diff --git a/assets/database/2026-03-10-bank-statement-dedup.sql b/assets/database/2026-03-10-bank-statement-dedup.sql new file mode 100644 index 0000000..6219ad4 --- /dev/null +++ b/assets/database/2026-03-10-bank-statement-dedup.sql @@ -0,0 +1,33 @@ +SET NAMES utf8mb4; + +DELETE FROM ccdi_bank_statement +WHERE project_id IS NULL + OR LE_ACCOUNT_NO IS NULL + OR ACCOUNTING_DATE_ID IS NULL; + +UPDATE ccdi_bank_statement +SET LE_ACCOUNT_NO = TRIM(LE_ACCOUNT_NO); + +DELETE t1 +FROM ccdi_bank_statement t1 +JOIN ccdi_bank_statement t2 + ON t1.bank_statement_id > t2.bank_statement_id + AND t1.project_id = t2.project_id + AND t1.LE_ACCOUNT_NO = t2.LE_ACCOUNT_NO + AND t1.ACCOUNTING_DATE_ID = t2.ACCOUNTING_DATE_ID + AND t1.AMOUNT_DR = t2.AMOUNT_DR + AND t1.AMOUNT_CR = t2.AMOUNT_CR; + +ALTER TABLE ccdi_bank_statement + MODIFY COLUMN project_id bigint(20) NOT NULL COMMENT '关联项目ID', + MODIFY COLUMN LE_ACCOUNT_NO varchar(240) NOT NULL DEFAULT '' COMMENT '企业银行账号', + MODIFY COLUMN ACCOUNTING_DATE_ID int(11) NOT NULL COMMENT '账号日期ID'; + +ALTER TABLE ccdi_bank_statement + ADD UNIQUE KEY uk_bank_statement_dedup ( + project_id, + LE_ACCOUNT_NO, + ACCOUNTING_DATE_ID, + AMOUNT_DR, + AMOUNT_CR + ); diff --git a/assets/对接流水分析/ccdi_bank_statement.md b/assets/对接流水分析/ccdi_bank_statement.md index a27108c..bce81a1 100644 --- a/assets/对接流水分析/ccdi_bank_statement.md +++ b/assets/对接流水分析/ccdi_bank_statement.md @@ -1,13 +1,24 @@ -# 兰溪存储的流水表的表结构 +# 银行流水中间表 `ccdi_bank_statement` + +## 去重相关字段 + +- `project_id`:业务项目主键,导入时由后端写入,迁移后要求 `NOT NULL`。 +- `LE_ACCOUNT_NO`:企业银行账号,作为去重键之一;入库前服务层会先执行 `trim`,迁移后要求 `NOT NULL DEFAULT ''`。 +- `ACCOUNTING_DATE_ID`:账期日期 ID,作为去重键之一,迁移后要求 `NOT NULL`。 +- 去重唯一键:`(project_id, LE_ACCOUNT_NO, ACCOUNTING_DATE_ID, AMOUNT_DR, AMOUNT_CR)`。 +- 唯一键语义:同一项目、同一账号、同一账期、同一借贷金额的流水只保留一条;重复导入时应通过 no-op upsert 跳过,不改写已有记录。 + +## 迁移后的关键结构 ```sql CREATE TABLE `ccdi_bank_statement` ( `bank_statement_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID', + `project_id` bigint(20) NOT NULL COMMENT '关联项目ID', `LE_ID` int(10) unsigned DEFAULT '0' COMMENT '企业ID', `ACCOUNT_ID` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT '账号ID', `LE_ACCOUNT_NAME` varchar(240) DEFAULT 'NONE' COMMENT '企业账号名称', - `LE_ACCOUNT_NO` varchar(240) DEFAULT NULL COMMENT '企业银行账号', - `ACCOUNTING_DATE_ID` int(11) DEFAULT NULL COMMENT '账号日期ID', + `LE_ACCOUNT_NO` varchar(240) NOT NULL DEFAULT '' COMMENT '企业银行账号', + `ACCOUNTING_DATE_ID` int(11) NOT NULL COMMENT '账号日期ID', `ACCOUNTING_DATE` varchar(10) DEFAULT '0000-00-00' COMMENT '账号日期', `TRX_DATE` varchar(20) NOT NULL COMMENT '交易日期', `CURRENCY` varchar(10) DEFAULT NULL COMMENT '币种', @@ -30,7 +41,7 @@ CREATE TABLE `ccdi_bank_statement` ( `internal_flag` tinyint(1) DEFAULT '0' COMMENT '"是否为内部交易1 是 0 否"', `batch_id` int(11) NOT NULL DEFAULT '0' COMMENT '上传logId对应upload_log', `batch_sequence` int(11) NOT NULL COMMENT '每次上传在文件中的line', - `CREATE_DATE` datetime DEFAULT NULL COMMENT '创建时间内', + `CREATE_DATE` datetime DEFAULT NULL COMMENT '创建时间', `created_by` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT '创建者', `meta_json` text COMMENT '"meta json"', `no_balance` tinyint(1) DEFAULT '0' COMMENT '是否包含余额', @@ -39,54 +50,62 @@ CREATE TABLE `ccdi_bank_statement` ( `group_id` int(11) DEFAULT '0' COMMENT '项目id', `override_bs_id` bigint(20) DEFAULT '0' COMMENT '=0表示该数据未覆盖主表,>0表示覆盖主表,<0表示被主表覆盖', `payment_method` varchar(500) DEFAULT NULL COMMENT '微信、支付宝流水字段,交易方式', - `cret_no` varchar(20) COMMENT '身份证号', + `cret_no` varchar(20) DEFAULT NULL COMMENT '身份证号', PRIMARY KEY (`bank_statement_id`), - KEY `idx_batch_id_account` (`batch_id`,`LE_ACCOUNT_NO`,`ACCOUNTING_DATE_ID`), + KEY `idx_batch_id_account` (`batch_id`, `LE_ACCOUNT_NO`, `ACCOUNTING_DATE_ID`), KEY `GROUP_ID` (`group_id`), - KEY `c4c_bank_statement_stg_batch_id_IDX` (`batch_id`,`LE_ACCOUNT_NO`) USING BTREE -) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='银行流水的中间处理表'; + KEY `c4c_bank_statement_stg_batch_id_IDX` (`batch_id`, `LE_ACCOUNT_NO`) USING BTREE, + KEY `idx_project_id` (`project_id`), + UNIQUE KEY `uk_bank_statement_dedup` ( + `project_id`, + `LE_ACCOUNT_NO`, + `ACCOUNTING_DATE_ID`, + `AMOUNT_DR`, + `AMOUNT_CR` + ) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='银行流水的中间处理表'; ``` -流水表和返回值的对应关系 +## 字段映射 -| 序号 | ccdi_bank_statement | 返回值 | +| 序号 | ccdi_bank_statement | 流水分析返回字段 | | --- | --- | --- | | 1 | bank_statement_id | bankStatementId | -| 2 | LE_ID | leId | -| 3 | ACCOUNT_ID | accountId | -| 4 | LE_ACCOUNT_NAME | leName | -| 5 | LE_ACCOUNT_NO | accountNo | -| 6 | ACCOUNTING_DATE_ID | accountingDateId | -| 7 | ACCOUNTING_DATE | accountingDate | -| 8 | TRX_DATE | trxDate | -| 9 | CURRENCY | currency | -| 10 | AMOUNT_DR | drAmount | -| 11 | AMOUNT_CR | crAmount | -| 12 | AMOUNT_BALANCE | balanceAmount | -| 13 | CASH_TYPE | cashType | -| 14 | CUSTOMER_LE_ID | customerId | -| 15 | CUSTOMER_ACCOUNT_NAME | customerName | -| 16 | CUSTOMER_ACCOUNT_NO | customerAccountNo | -| 17 | customer_bank | customerBank | -| 18 | customer_reference | customerReference | -| 19 | USER_MEMO | userMemo | -| 20 | BANK_COMMENTS | bankComments | -| 21 | BANK_TRX_NUMBER | bankTrxNumber | -| 22 | BANK | bank | -| 23 | TRX_FLAG | transFlag | -| 24 | TRX_TYPE | transTypeId | -| 25 | EXCEPTION_TYPE | exceptionType | -| 26 | internal_flag | internalFlag | -| 27 | batch_id | batchId | -| 28 | batch_sequence | uploadSequnceNumber | -| 29 | CREATE_DATE | createDate | -| 30 | created_by | createdBy | -| 31 | meta_json | 设置为null | -| 32 | no_balance | isNoBalance | -| 33 | begin_balance | isBeginBalance | -| 34 | end_balance | isEndBalance | -| 35 | override_bs_id | overrideBsId | -| 36 | payment_method | paymentMethod | -| 37 | cret_no | cretNo | -| 38 | group_id | groupId | - +| 2 | project_id | 后端按业务写入 | +| 3 | LE_ID | leId | +| 4 | ACCOUNT_ID | accountId | +| 5 | LE_ACCOUNT_NAME | leName | +| 6 | LE_ACCOUNT_NO | accountMaskNo | +| 7 | ACCOUNTING_DATE_ID | accountingDateId | +| 8 | ACCOUNTING_DATE | accountingDate | +| 9 | TRX_DATE | trxDate | +| 10 | CURRENCY | currency | +| 11 | AMOUNT_DR | drAmount | +| 12 | AMOUNT_CR | crAmount | +| 13 | AMOUNT_BALANCE | balanceAmount | +| 14 | CASH_TYPE | cashType | +| 15 | CUSTOMER_LE_ID | customerId | +| 16 | CUSTOMER_ACCOUNT_NAME | customerName | +| 17 | CUSTOMER_ACCOUNT_NO | customerAccountMaskNo | +| 18 | customer_bank | customerBank | +| 19 | customer_reference | customerReference | +| 20 | USER_MEMO | userMemo | +| 21 | BANK_COMMENTS | bankComments | +| 22 | BANK_TRX_NUMBER | bankTrxNumber | +| 23 | BANK | bank | +| 24 | TRX_FLAG | transFlag | +| 25 | TRX_TYPE | transTypeId | +| 26 | EXCEPTION_TYPE | exceptionType | +| 27 | internal_flag | internalFlag | +| 28 | batch_id | batchId | +| 29 | batch_sequence | uploadSequnceNumber | +| 30 | CREATE_DATE | createDate | +| 31 | created_by | createdBy | +| 32 | meta_json | 固定写入 `null` | +| 33 | no_balance | noBalance | +| 34 | begin_balance | beginBalance | +| 35 | end_balance | endBalance | +| 36 | group_id | groupId | +| 37 | override_bs_id | overrideBsId | +| 38 | payment_method | paymentMethod | +| 39 | cret_no | cretNo | diff --git a/ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java b/ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java index 41352fb..3a1201f 100644 --- a/ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java +++ b/ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java @@ -56,7 +56,7 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { private static class FetchBankStatementResult { private boolean success; private int totalCount; - private int savedCount; + private int attemptedCount; private String errorMessage; } @@ -352,6 +352,14 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { return errorMessage.substring(0, MAX_ERROR_MESSAGE_LENGTH); } + private String trimAccountNo(String value) { + return value == null ? null : value.trim(); + } + + private void normalizeDedupFields(CcdiBankStatement statement) { + statement.setLeAccountNo(trimAccountNo(statement.getLeAccountNo())); + } + /** * 异步处理单个文件的完整流程 * 包含:上传 → 轮询解析状态 → 获取结果 → 保存流水数据 @@ -583,14 +591,13 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { if (totalCount == null || totalCount <= 0) { log.warn("【文件上传】无流水数据需要保存: totalCount={}", totalCount); result.setSuccess(true); - result.setSavedCount(0); return result; } int pageSize = 1000; int batchSize = 1000; int totalPages = (int) Math.ceil((double) totalCount / pageSize); - int totalSaved = 0; + int totalAttempted = 0; List batchList = new ArrayList<>(batchSize); log.info("【文件上传】获取到总数: totalCount={}", totalCount); @@ -622,13 +629,14 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { if (statement != null) { statement.setBatchId(logId); statement.setProjectId(projectId); + normalizeDedupFields(statement); batchList.add(statement); if (batchList.size() >= batchSize) { + int currentBatchSize = batchList.size(); bankStatementMapper.insertBatch(batchList); - totalSaved += batchList.size(); - log.debug("【文件上传】批量插入流水 {}条, 累计{}条", - batchList.size(), totalSaved); + totalAttempted += currentBatchSize; + log.debug("【文件上传】批量写入流水 {}条", currentBatchSize); batchList.clear(); } } @@ -643,14 +651,16 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { } if (!batchList.isEmpty()) { + int currentBatchSize = batchList.size(); bankStatementMapper.insertBatch(batchList); - totalSaved += batchList.size(); + totalAttempted += currentBatchSize; log.debug("【文件上传】批量插入剩余流水 {}条", batchList.size()); } - log.info("【文件上传】流水数据保存完成: 总共保存{}条", totalSaved); + log.info("【文件上传】流水入库完成: fetchedCount={}, attemptedCount={}", + totalCount, totalAttempted); result.setSuccess(true); - result.setSavedCount(totalSaved); + result.setAttemptedCount(totalAttempted); return result; } catch (Exception e) { log.error("【文件上传】获取或保存流水数据失败: logId={}", logId, e); diff --git a/ccdi-project/src/main/resources/mapper/ccdi/project/CcdiBankStatementMapper.xml b/ccdi-project/src/main/resources/mapper/ccdi/project/CcdiBankStatementMapper.xml index 3bfe650..da66c4a 100644 --- a/ccdi-project/src/main/resources/mapper/ccdi/project/CcdiBankStatementMapper.xml +++ b/ccdi-project/src/main/resources/mapper/ccdi/project/CcdiBankStatementMapper.xml @@ -84,6 +84,8 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" #{item.overrideBsId}, #{item.paymentMethod}, #{item.cretNo} ) + on duplicate key update + bank_statement_id = bank_statement_id diff --git a/ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java b/ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java index 335fdbb..83f7f28 100644 --- a/ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java +++ b/ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java @@ -1,5 +1,8 @@ package com.ruoyi.ccdi.project.service.impl; +import ch.qos.logback.classic.Logger; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.read.ListAppender; import com.ruoyi.ccdi.project.domain.entity.CcdiFileUploadRecord; import com.ruoyi.ccdi.project.mapper.CcdiBankStatementMapper; import com.ruoyi.ccdi.project.mapper.CcdiFileUploadRecordMapper; @@ -16,14 +19,18 @@ import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.InjectMocks; import org.mockito.Mock; import org.mockito.junit.jupiter.MockitoExtension; +import org.slf4j.LoggerFactory; +import org.springframework.test.util.ReflectionTestUtils; import java.io.IOException; +import java.math.BigDecimal; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.List; import java.util.concurrent.atomic.AtomicInteger; +import static org.junit.jupiter.api.Assertions.assertEquals; import static org.junit.jupiter.api.Assertions.assertFalse; import static org.junit.jupiter.api.Assertions.assertTrue; import static org.mockito.ArgumentMatchers.any; @@ -187,6 +194,84 @@ class CcdiFileUploadServiceImplTest { assertTrue(failedRecord.getErrorMessage().length() <= MAX_ERROR_MESSAGE_LENGTH); } + @Test + void fetchAndSaveBankStatements_shouldTrimLeAccountNoBeforeInsert() throws IOException { + when(lsfxClient.uploadFile(eq(LSFX_PROJECT_ID), any())).thenReturn(buildUploadResponse()); + when(lsfxClient.checkParseStatus(LSFX_PROJECT_ID, String.valueOf(LOG_ID))) + .thenReturn(buildCheckParseStatusResponse(false)); + when(lsfxClient.getFileUploadStatus(any())).thenReturn(buildParsedSuccessStatusResponse()); + when(lsfxClient.getBankStatement(any(GetBankStatementRequest.class))) + .thenReturn(buildBankStatementResponseWithItems(1, List.of(buildBankStatementItem(" 62220001 ")))) + .thenReturn(buildBankStatementResponseWithItems(1, List.of(buildBankStatementItem(" 62220001 ")))); + + CcdiFileUploadRecord record = buildRecord(); + Path tempFile = createTempFile(); + + service.processFileAsync(PROJECT_ID, LSFX_PROJECT_ID, tempFile.toString(), RECORD_ID, "batch-1", record); + + verify(bankStatementMapper).insertBatch(any()); + verify(bankStatementMapper).insertBatch(org.mockito.ArgumentMatchers.argThat(list -> + list.size() == 1 && "62220001".equals(list.get(0).getLeAccountNo()))); + } + + @Test + void fetchAndSaveBankStatements_shouldLogConservativeCountsWhenAffectedRowsAreAmbiguous() { + when(lsfxClient.getBankStatement(any(GetBankStatementRequest.class))) + .thenReturn(buildBankStatementResponseWithItems(1, List.of(buildBankStatementItem("62220001")))) + .thenReturn(buildBankStatementResponseWithItems(1, List.of(buildBankStatementItem("62220001")))); + when(bankStatementMapper.insertBatch(any())).thenReturn(1); + + Logger logger = (Logger) LoggerFactory.getLogger(CcdiFileUploadServiceImpl.class); + ListAppender logAppender = new ListAppender<>(); + logAppender.start(); + logger.addAppender(logAppender); + + try { + Object result = ReflectionTestUtils.invokeMethod( + service, + "fetchAndSaveBankStatements", + PROJECT_ID, + LSFX_PROJECT_ID, + LOG_ID + ); + + assertTrue(Boolean.TRUE.equals(ReflectionTestUtils.getField(result, "success"))); + assertEquals(1, ReflectionTestUtils.getField(result, "totalCount")); + assertEquals(1, ReflectionTestUtils.getField(result, "attemptedCount")); + assertTrue(logAppender.list.stream().map(ILoggingEvent::getFormattedMessage) + .anyMatch(message -> message.contains("流水入库完成: fetchedCount=1, attemptedCount=1"))); + assertFalse(logAppender.list.stream().map(ILoggingEvent::getFormattedMessage) + .anyMatch(message -> message.contains("insertedCount="))); + } finally { + logger.detachAppender(logAppender); + } + } + + @Test + void processFileAsync_shouldMarkParsedFailedWhenInsertBatchThrowsUnexpectedSqlError() throws IOException { + List events = new ArrayList<>(); + AtomicInteger sequence = new AtomicInteger(); + captureRecordStatus(events, sequence); + + when(lsfxClient.uploadFile(eq(LSFX_PROJECT_ID), any())).thenReturn(buildUploadResponse()); + when(lsfxClient.checkParseStatus(LSFX_PROJECT_ID, String.valueOf(LOG_ID))) + .thenReturn(buildCheckParseStatusResponse(false)); + when(lsfxClient.getFileUploadStatus(any())).thenReturn(buildParsedSuccessStatusResponse()); + when(lsfxClient.getBankStatement(any(GetBankStatementRequest.class))) + .thenReturn(buildBankStatementResponseWithItems(1, List.of(buildBankStatementItem("62220001")))) + .thenReturn(buildBankStatementResponseWithItems(1, List.of(buildBankStatementItem("62220001")))); + when(bankStatementMapper.insertBatch(any())) + .thenThrow(new RuntimeException("sql syntax error")); + + CcdiFileUploadRecord record = buildRecord(); + Path tempFile = createTempFile(); + + service.processFileAsync(PROJECT_ID, LSFX_PROJECT_ID, tempFile.toString(), RECORD_ID, "batch-1", record); + + assertTrue(events.stream().anyMatch(event -> event.endsWith("record:parsed_failed"))); + assertFalse(events.stream().anyMatch(event -> event.endsWith("record:parsed_success"))); + } + private void captureRecordStatus(List events, AtomicInteger sequence) { doAnswer(invocation -> { CcdiFileUploadRecord record = invocation.getArgument(0); @@ -279,6 +364,28 @@ class CcdiFileUploadServiceImplTest { return response; } + private GetBankStatementResponse buildBankStatementResponseWithItems( + int totalCount, + List items + ) { + GetBankStatementResponse.BankStatementData data = new GetBankStatementResponse.BankStatementData(); + data.setTotalCount(totalCount); + data.setBankStatementList(items); + + GetBankStatementResponse response = new GetBankStatementResponse(); + response.setData(data); + return response; + } + + private GetBankStatementResponse.BankStatementItem buildBankStatementItem(String accountMaskNo) { + GetBankStatementResponse.BankStatementItem item = new GetBankStatementResponse.BankStatementItem(); + item.setAccountMaskNo(accountMaskNo); + item.setAccountingDateId(20260310); + item.setDrAmount(new BigDecimal("100.00")); + item.setCrAmount(new BigDecimal("0.00")); + return item; + } + private int findEventIndex(List events, String suffix) { for (int i = 0; i < events.size(); i++) { if (events.get(i).endsWith(suffix)) {