feat(ccdi-project): harden bank statement dedup import

This commit is contained in:
wkc
2026-03-10 10:39:06 +08:00
parent 924605ac3a
commit 6007f3284a
5 changed files with 228 additions and 57 deletions

View File

@@ -0,0 +1,33 @@
SET NAMES utf8mb4;
DELETE FROM ccdi_bank_statement
WHERE project_id IS NULL
OR LE_ACCOUNT_NO IS NULL
OR ACCOUNTING_DATE_ID IS NULL;
UPDATE ccdi_bank_statement
SET LE_ACCOUNT_NO = TRIM(LE_ACCOUNT_NO);
DELETE t1
FROM ccdi_bank_statement t1
JOIN ccdi_bank_statement t2
ON t1.bank_statement_id > t2.bank_statement_id
AND t1.project_id = t2.project_id
AND t1.LE_ACCOUNT_NO = t2.LE_ACCOUNT_NO
AND t1.ACCOUNTING_DATE_ID = t2.ACCOUNTING_DATE_ID
AND t1.AMOUNT_DR = t2.AMOUNT_DR
AND t1.AMOUNT_CR = t2.AMOUNT_CR;
ALTER TABLE ccdi_bank_statement
MODIFY COLUMN project_id bigint(20) NOT NULL COMMENT '关联项目ID',
MODIFY COLUMN LE_ACCOUNT_NO varchar(240) NOT NULL DEFAULT '' COMMENT '企业银行账号',
MODIFY COLUMN ACCOUNTING_DATE_ID int(11) NOT NULL COMMENT '账号日期ID';
ALTER TABLE ccdi_bank_statement
ADD UNIQUE KEY uk_bank_statement_dedup (
project_id,
LE_ACCOUNT_NO,
ACCOUNTING_DATE_ID,
AMOUNT_DR,
AMOUNT_CR
);

View File

@@ -1,13 +1,24 @@
# 兰溪存储的流水表的表结构
# 银行流水中间表 `ccdi_bank_statement`
## 去重相关字段
- `project_id`:业务项目主键,导入时由后端写入,迁移后要求 `NOT NULL`
- `LE_ACCOUNT_NO`:企业银行账号,作为去重键之一;入库前服务层会先执行 `trim`,迁移后要求 `NOT NULL DEFAULT ''`
- `ACCOUNTING_DATE_ID`:账期日期 ID作为去重键之一迁移后要求 `NOT NULL`
- 去重唯一键:`(project_id, LE_ACCOUNT_NO, ACCOUNTING_DATE_ID, AMOUNT_DR, AMOUNT_CR)`
- 唯一键语义:同一项目、同一账号、同一账期、同一借贷金额的流水只保留一条;重复导入时应通过 no-op upsert 跳过,不改写已有记录。
## 迁移后的关键结构
```sql
CREATE TABLE `ccdi_bank_statement` (
`bank_statement_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',
`project_id` bigint(20) NOT NULL COMMENT '关联项目ID',
`LE_ID` int(10) unsigned DEFAULT '0' COMMENT '企业ID',
`ACCOUNT_ID` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT '账号ID',
`LE_ACCOUNT_NAME` varchar(240) DEFAULT 'NONE' COMMENT '企业账号名称',
`LE_ACCOUNT_NO` varchar(240) DEFAULT NULL COMMENT '企业银行账号',
`ACCOUNTING_DATE_ID` int(11) DEFAULT NULL COMMENT '账号日期ID',
`LE_ACCOUNT_NO` varchar(240) NOT NULL DEFAULT '' COMMENT '企业银行账号',
`ACCOUNTING_DATE_ID` int(11) NOT NULL COMMENT '账号日期ID',
`ACCOUNTING_DATE` varchar(10) DEFAULT '0000-00-00' COMMENT '账号日期',
`TRX_DATE` varchar(20) NOT NULL COMMENT '交易日期',
`CURRENCY` varchar(10) DEFAULT NULL COMMENT '币种',
@@ -30,7 +41,7 @@ CREATE TABLE `ccdi_bank_statement` (
`internal_flag` tinyint(1) DEFAULT '0' COMMENT '"是否为内部交易1 是 0 否"',
`batch_id` int(11) NOT NULL DEFAULT '0' COMMENT '上传logId对应upload_log',
`batch_sequence` int(11) NOT NULL COMMENT '每次上传在文件中的line',
`CREATE_DATE` datetime DEFAULT NULL COMMENT '创建时间',
`CREATE_DATE` datetime DEFAULT NULL COMMENT '创建时间',
`created_by` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT '创建者',
`meta_json` text COMMENT '"meta json"',
`no_balance` tinyint(1) DEFAULT '0' COMMENT '是否包含余额',
@@ -39,54 +50,62 @@ CREATE TABLE `ccdi_bank_statement` (
`group_id` int(11) DEFAULT '0' COMMENT '项目id',
`override_bs_id` bigint(20) DEFAULT '0' COMMENT '=0表示该数据未覆盖主表>0表示覆盖主表<0表示被主表覆盖',
`payment_method` varchar(500) DEFAULT NULL COMMENT '微信、支付宝流水字段,交易方式',
`cret_no` varchar(20) COMMENT '身份证号',
`cret_no` varchar(20) DEFAULT NULL COMMENT '身份证号',
PRIMARY KEY (`bank_statement_id`),
KEY `idx_batch_id_account` (`batch_id`,`LE_ACCOUNT_NO`,`ACCOUNTING_DATE_ID`),
KEY `idx_batch_id_account` (`batch_id`, `LE_ACCOUNT_NO`, `ACCOUNTING_DATE_ID`),
KEY `GROUP_ID` (`group_id`),
KEY `c4c_bank_statement_stg_batch_id_IDX` (`batch_id`,`LE_ACCOUNT_NO`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='银行流水的中间处理表';
KEY `c4c_bank_statement_stg_batch_id_IDX` (`batch_id`, `LE_ACCOUNT_NO`) USING BTREE,
KEY `idx_project_id` (`project_id`),
UNIQUE KEY `uk_bank_statement_dedup` (
`project_id`,
`LE_ACCOUNT_NO`,
`ACCOUNTING_DATE_ID`,
`AMOUNT_DR`,
`AMOUNT_CR`
)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='银行流水的中间处理表';
```
流水表和返回值的对应关系
## 字段映射
| 序号 | ccdi_bank_statement | 返回值 |
| 序号 | ccdi_bank_statement | 流水分析返回字段 |
| --- | --- | --- |
| 1 | bank_statement_id | bankStatementId |
| 2 | LE_ID | leId |
| 3 | ACCOUNT_ID | accountId |
| 4 | LE_ACCOUNT_NAME | leName |
| 5 | LE_ACCOUNT_NO | accountNo |
| 6 | ACCOUNTING_DATE_ID | accountingDateId |
| 7 | ACCOUNTING_DATE | accountingDate |
| 8 | TRX_DATE | trxDate |
| 9 | CURRENCY | currency |
| 10 | AMOUNT_DR | drAmount |
| 11 | AMOUNT_CR | crAmount |
| 12 | AMOUNT_BALANCE | balanceAmount |
| 13 | CASH_TYPE | cashType |
| 14 | CUSTOMER_LE_ID | customerId |
| 15 | CUSTOMER_ACCOUNT_NAME | customerName |
| 16 | CUSTOMER_ACCOUNT_NO | customerAccountNo |
| 17 | customer_bank | customerBank |
| 18 | customer_reference | customerReference |
| 19 | USER_MEMO | userMemo |
| 20 | BANK_COMMENTS | bankComments |
| 21 | BANK_TRX_NUMBER | bankTrxNumber |
| 22 | BANK | bank |
| 23 | TRX_FLAG | transFlag |
| 24 | TRX_TYPE | transTypeId |
| 25 | EXCEPTION_TYPE | exceptionType |
| 26 | internal_flag | internalFlag |
| 27 | batch_id | batchId |
| 28 | batch_sequence | uploadSequnceNumber |
| 29 | CREATE_DATE | createDate |
| 30 | created_by | createdBy |
| 31 | meta_json | 设置为null |
| 32 | no_balance | isNoBalance |
| 33 | begin_balance | isBeginBalance |
| 34 | end_balance | isEndBalance |
| 35 | override_bs_id | overrideBsId |
| 36 | payment_method | paymentMethod |
| 37 | cret_no | cretNo |
| 38 | group_id | groupId |
| 2 | project_id | 后端按业务写入 |
| 3 | LE_ID | leId |
| 4 | ACCOUNT_ID | accountId |
| 5 | LE_ACCOUNT_NAME | leName |
| 6 | LE_ACCOUNT_NO | accountMaskNo |
| 7 | ACCOUNTING_DATE_ID | accountingDateId |
| 8 | ACCOUNTING_DATE | accountingDate |
| 9 | TRX_DATE | trxDate |
| 10 | CURRENCY | currency |
| 11 | AMOUNT_DR | drAmount |
| 12 | AMOUNT_CR | crAmount |
| 13 | AMOUNT_BALANCE | balanceAmount |
| 14 | CASH_TYPE | cashType |
| 15 | CUSTOMER_LE_ID | customerId |
| 16 | CUSTOMER_ACCOUNT_NAME | customerName |
| 17 | CUSTOMER_ACCOUNT_NO | customerAccountMaskNo |
| 18 | customer_bank | customerBank |
| 19 | customer_reference | customerReference |
| 20 | USER_MEMO | userMemo |
| 21 | BANK_COMMENTS | bankComments |
| 22 | BANK_TRX_NUMBER | bankTrxNumber |
| 23 | BANK | bank |
| 24 | TRX_FLAG | transFlag |
| 25 | TRX_TYPE | transTypeId |
| 26 | EXCEPTION_TYPE | exceptionType |
| 27 | internal_flag | internalFlag |
| 28 | batch_id | batchId |
| 29 | batch_sequence | uploadSequnceNumber |
| 30 | CREATE_DATE | createDate |
| 31 | created_by | createdBy |
| 32 | meta_json | 固定写入 `null` |
| 33 | no_balance | noBalance |
| 34 | begin_balance | beginBalance |
| 35 | end_balance | endBalance |
| 36 | group_id | groupId |
| 37 | override_bs_id | overrideBsId |
| 38 | payment_method | paymentMethod |
| 39 | cret_no | cretNo |

View File

@@ -56,7 +56,7 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
private static class FetchBankStatementResult {
private boolean success;
private int totalCount;
private int savedCount;
private int attemptedCount;
private String errorMessage;
}
@@ -352,6 +352,14 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
return errorMessage.substring(0, MAX_ERROR_MESSAGE_LENGTH);
}
private String trimAccountNo(String value) {
return value == null ? null : value.trim();
}
private void normalizeDedupFields(CcdiBankStatement statement) {
statement.setLeAccountNo(trimAccountNo(statement.getLeAccountNo()));
}
/**
* 异步处理单个文件的完整流程
* 包含:上传 → 轮询解析状态 → 获取结果 → 保存流水数据
@@ -583,14 +591,13 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
if (totalCount == null || totalCount <= 0) {
log.warn("【文件上传】无流水数据需要保存: totalCount={}", totalCount);
result.setSuccess(true);
result.setSavedCount(0);
return result;
}
int pageSize = 1000;
int batchSize = 1000;
int totalPages = (int) Math.ceil((double) totalCount / pageSize);
int totalSaved = 0;
int totalAttempted = 0;
List<CcdiBankStatement> batchList = new ArrayList<>(batchSize);
log.info("【文件上传】获取到总数: totalCount={}", totalCount);
@@ -622,13 +629,14 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
if (statement != null) {
statement.setBatchId(logId);
statement.setProjectId(projectId);
normalizeDedupFields(statement);
batchList.add(statement);
if (batchList.size() >= batchSize) {
int currentBatchSize = batchList.size();
bankStatementMapper.insertBatch(batchList);
totalSaved += batchList.size();
log.debug("【文件上传】批量入流水 {}条, 累计{}条",
batchList.size(), totalSaved);
totalAttempted += currentBatchSize;
log.debug("【文件上传】批量入流水 {}条", currentBatchSize);
batchList.clear();
}
}
@@ -643,14 +651,16 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
}
if (!batchList.isEmpty()) {
int currentBatchSize = batchList.size();
bankStatementMapper.insertBatch(batchList);
totalSaved += batchList.size();
totalAttempted += currentBatchSize;
log.debug("【文件上传】批量插入剩余流水 {}条", batchList.size());
}
log.info("【文件上传】流水数据保存完成: 总共保存{}条", totalSaved);
log.info("【文件上传】流水入库完成: fetchedCount={}, attemptedCount={}",
totalCount, totalAttempted);
result.setSuccess(true);
result.setSavedCount(totalSaved);
result.setAttemptedCount(totalAttempted);
return result;
} catch (Exception e) {
log.error("【文件上传】获取或保存流水数据失败: logId={}", logId, e);

View File

@@ -84,6 +84,8 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
#{item.overrideBsId}, #{item.paymentMethod}, #{item.cretNo}
)
</foreach>
on duplicate key update
bank_statement_id = bank_statement_id
</insert>
<delete id="deleteByProjectIdAndBatchId">

View File

@@ -1,5 +1,8 @@
package com.ruoyi.ccdi.project.service.impl;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import com.ruoyi.ccdi.project.domain.entity.CcdiFileUploadRecord;
import com.ruoyi.ccdi.project.mapper.CcdiBankStatementMapper;
import com.ruoyi.ccdi.project.mapper.CcdiFileUploadRecordMapper;
@@ -16,14 +19,18 @@ import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.LoggerFactory;
import org.springframework.test.util.ReflectionTestUtils;
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
@@ -187,6 +194,84 @@ class CcdiFileUploadServiceImplTest {
assertTrue(failedRecord.getErrorMessage().length() <= MAX_ERROR_MESSAGE_LENGTH);
}
@Test
void fetchAndSaveBankStatements_shouldTrimLeAccountNoBeforeInsert() throws IOException {
when(lsfxClient.uploadFile(eq(LSFX_PROJECT_ID), any())).thenReturn(buildUploadResponse());
when(lsfxClient.checkParseStatus(LSFX_PROJECT_ID, String.valueOf(LOG_ID)))
.thenReturn(buildCheckParseStatusResponse(false));
when(lsfxClient.getFileUploadStatus(any())).thenReturn(buildParsedSuccessStatusResponse());
when(lsfxClient.getBankStatement(any(GetBankStatementRequest.class)))
.thenReturn(buildBankStatementResponseWithItems(1, List.of(buildBankStatementItem(" 62220001 "))))
.thenReturn(buildBankStatementResponseWithItems(1, List.of(buildBankStatementItem(" 62220001 "))));
CcdiFileUploadRecord record = buildRecord();
Path tempFile = createTempFile();
service.processFileAsync(PROJECT_ID, LSFX_PROJECT_ID, tempFile.toString(), RECORD_ID, "batch-1", record);
verify(bankStatementMapper).insertBatch(any());
verify(bankStatementMapper).insertBatch(org.mockito.ArgumentMatchers.argThat(list ->
list.size() == 1 && "62220001".equals(list.get(0).getLeAccountNo())));
}
@Test
void fetchAndSaveBankStatements_shouldLogConservativeCountsWhenAffectedRowsAreAmbiguous() {
when(lsfxClient.getBankStatement(any(GetBankStatementRequest.class)))
.thenReturn(buildBankStatementResponseWithItems(1, List.of(buildBankStatementItem("62220001"))))
.thenReturn(buildBankStatementResponseWithItems(1, List.of(buildBankStatementItem("62220001"))));
when(bankStatementMapper.insertBatch(any())).thenReturn(1);
Logger logger = (Logger) LoggerFactory.getLogger(CcdiFileUploadServiceImpl.class);
ListAppender<ILoggingEvent> logAppender = new ListAppender<>();
logAppender.start();
logger.addAppender(logAppender);
try {
Object result = ReflectionTestUtils.invokeMethod(
service,
"fetchAndSaveBankStatements",
PROJECT_ID,
LSFX_PROJECT_ID,
LOG_ID
);
assertTrue(Boolean.TRUE.equals(ReflectionTestUtils.getField(result, "success")));
assertEquals(1, ReflectionTestUtils.getField(result, "totalCount"));
assertEquals(1, ReflectionTestUtils.getField(result, "attemptedCount"));
assertTrue(logAppender.list.stream().map(ILoggingEvent::getFormattedMessage)
.anyMatch(message -> message.contains("流水入库完成: fetchedCount=1, attemptedCount=1")));
assertFalse(logAppender.list.stream().map(ILoggingEvent::getFormattedMessage)
.anyMatch(message -> message.contains("insertedCount=")));
} finally {
logger.detachAppender(logAppender);
}
}
@Test
void processFileAsync_shouldMarkParsedFailedWhenInsertBatchThrowsUnexpectedSqlError() throws IOException {
List<String> events = new ArrayList<>();
AtomicInteger sequence = new AtomicInteger();
captureRecordStatus(events, sequence);
when(lsfxClient.uploadFile(eq(LSFX_PROJECT_ID), any())).thenReturn(buildUploadResponse());
when(lsfxClient.checkParseStatus(LSFX_PROJECT_ID, String.valueOf(LOG_ID)))
.thenReturn(buildCheckParseStatusResponse(false));
when(lsfxClient.getFileUploadStatus(any())).thenReturn(buildParsedSuccessStatusResponse());
when(lsfxClient.getBankStatement(any(GetBankStatementRequest.class)))
.thenReturn(buildBankStatementResponseWithItems(1, List.of(buildBankStatementItem("62220001"))))
.thenReturn(buildBankStatementResponseWithItems(1, List.of(buildBankStatementItem("62220001"))));
when(bankStatementMapper.insertBatch(any()))
.thenThrow(new RuntimeException("sql syntax error"));
CcdiFileUploadRecord record = buildRecord();
Path tempFile = createTempFile();
service.processFileAsync(PROJECT_ID, LSFX_PROJECT_ID, tempFile.toString(), RECORD_ID, "batch-1", record);
assertTrue(events.stream().anyMatch(event -> event.endsWith("record:parsed_failed")));
assertFalse(events.stream().anyMatch(event -> event.endsWith("record:parsed_success")));
}
private void captureRecordStatus(List<String> events, AtomicInteger sequence) {
doAnswer(invocation -> {
CcdiFileUploadRecord record = invocation.getArgument(0);
@@ -279,6 +364,28 @@ class CcdiFileUploadServiceImplTest {
return response;
}
private GetBankStatementResponse buildBankStatementResponseWithItems(
int totalCount,
List<GetBankStatementResponse.BankStatementItem> items
) {
GetBankStatementResponse.BankStatementData data = new GetBankStatementResponse.BankStatementData();
data.setTotalCount(totalCount);
data.setBankStatementList(items);
GetBankStatementResponse response = new GetBankStatementResponse();
response.setData(data);
return response;
}
private GetBankStatementResponse.BankStatementItem buildBankStatementItem(String accountMaskNo) {
GetBankStatementResponse.BankStatementItem item = new GetBankStatementResponse.BankStatementItem();
item.setAccountMaskNo(accountMaskNo);
item.setAccountingDateId(20260310);
item.setDrAmount(new BigDecimal("100.00"));
item.setCrAmount(new BigDecimal("0.00"));
return item;
}
private int findEventIndex(List<String> events, String suffix) {
for (int i = 0; i < events.size(); i++) {
if (events.get(i).endsWith(suffix)) {