Compare commits

...

10 Commits

Author SHA1 Message Date
wkc
cf290624a9 提交 2026-03-10 10:52:16 +08:00
wkc
37f5b835fa Merge branch 'codex/bank-statement-duplicate-check' into dev
# Conflicts:
#	ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java
2026-03-10 10:47:05 +08:00
wkc
6007f3284a feat(ccdi-project): harden bank statement dedup import 2026-03-10 10:39:06 +08:00
wkc
1241ba0bd6 fix(ccdi-project): remove local upload log files 2026-03-10 10:25:23 +08:00
wkc
924605ac3a docs: update bank statement dedup design key 2026-03-10 09:38:53 +08:00
wkc
4148bea5a9 fix(ccdi-project): initialize file upload log context 2026-03-09 16:55:14 +08:00
wkc
041974b318 fix(ccdi-project): truncate upload error messages 2026-03-09 16:51:28 +08:00
wkc
763e39d153 docs: add bank statement duplicate check design 2026-03-09 16:51:04 +08:00
wkc
3e0eb38f15 fix(ccdi-project): delay parse success until bank statements saved 2026-03-09 16:31:04 +08:00
wkc
e5116d000b docs: add file upload parse success timing design and plan 2026-03-09 15:33:52 +08:00
13 changed files with 2057 additions and 245 deletions

View File

@@ -0,0 +1,33 @@
-- Migration: enforce the bank-statement dedup key
-- (project_id, LE_ACCOUNT_NO, ACCOUNTING_DATE_ID, AMOUNT_DR, AMOUNT_CR)
-- on ccdi_bank_statement.
-- Statement order is load-bearing: rows must be cleaned and deduplicated
-- BEFORE the NOT NULL changes and the unique key can be applied.
SET NAMES utf8mb4;
-- Step 1: drop rows that cannot satisfy the upcoming NOT NULL constraints.
DELETE FROM ccdi_bank_statement
WHERE project_id IS NULL
OR LE_ACCOUNT_NO IS NULL
OR ACCOUNTING_DATE_ID IS NULL;
-- Step 2: whole-table UPDATE (intentionally no WHERE) — normalize the
-- account number the same way the service layer does (trim) so the dedup
-- key compares consistently.
UPDATE ccdi_bank_statement
SET LE_ACCOUNT_NO = TRIM(LE_ACCOUNT_NO);
-- Step 3: self-join dedup — within each (project, account, accounting date,
-- debit, credit) group keep only the row with the smallest bank_statement_id.
-- NOTE(review): rows with NULL AMOUNT_DR/AMOUNT_CR never match the equality
-- join and are kept; MySQL unique keys also permit multiple NULLs, so such
-- rows stay un-deduplicated — confirm this is acceptable.
DELETE t1
FROM ccdi_bank_statement t1
JOIN ccdi_bank_statement t2
ON t1.bank_statement_id > t2.bank_statement_id
AND t1.project_id = t2.project_id
AND t1.LE_ACCOUNT_NO = t2.LE_ACCOUNT_NO
AND t1.ACCOUNTING_DATE_ID = t2.ACCOUNTING_DATE_ID
AND t1.AMOUNT_DR = t2.AMOUNT_DR
AND t1.AMOUNT_CR = t2.AMOUNT_CR;
-- Step 4: tighten the dedup-key columns now that offending rows are gone.
ALTER TABLE ccdi_bank_statement
MODIFY COLUMN project_id bigint(20) NOT NULL COMMENT '关联项目ID',
MODIFY COLUMN LE_ACCOUNT_NO varchar(240) NOT NULL DEFAULT '' COMMENT '企业银行账号',
MODIFY COLUMN ACCOUNTING_DATE_ID int(11) NOT NULL COMMENT '账号日期ID';
-- Step 5: add the unique key that backs the importer's no-op upsert
-- (insert ... on duplicate key update bank_statement_id = bank_statement_id).
ALTER TABLE ccdi_bank_statement
ADD UNIQUE KEY uk_bank_statement_dedup (
project_id,
LE_ACCOUNT_NO,
ACCOUNTING_DATE_ID,
AMOUNT_DR,
AMOUNT_CR
);

View File

@@ -1,13 +1,24 @@
# 兰溪存储的流水表的表结构
# 银行流水中间表 `ccdi_bank_statement`
## 去重相关字段
- `project_id`:业务项目主键,导入时由后端写入,迁移后要求 `NOT NULL`
- `LE_ACCOUNT_NO`:企业银行账号,作为去重键之一;入库前服务层会先执行 `trim`,迁移后要求 `NOT NULL DEFAULT ''`
- `ACCOUNTING_DATE_ID`:账期日期 ID,作为去重键之一;迁移后要求 `NOT NULL`
- 去重唯一键:`(project_id, LE_ACCOUNT_NO, ACCOUNTING_DATE_ID, AMOUNT_DR, AMOUNT_CR)`
- 唯一键语义:同一项目、同一账号、同一账期、同一借贷金额的流水只保留一条;重复导入时应通过 no-op upsert 跳过,不改写已有记录。
## 迁移后的关键结构
```sql
CREATE TABLE `ccdi_bank_statement` (
`bank_statement_id` bigint(20) unsigned NOT NULL AUTO_INCREMENT COMMENT 'ID',
`project_id` bigint(20) NOT NULL COMMENT '关联项目ID',
`LE_ID` int(10) unsigned DEFAULT '0' COMMENT '企业ID',
`ACCOUNT_ID` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT '账号ID',
`LE_ACCOUNT_NAME` varchar(240) DEFAULT 'NONE' COMMENT '企业账号名称',
`LE_ACCOUNT_NO` varchar(240) DEFAULT NULL COMMENT '企业银行账号',
`ACCOUNTING_DATE_ID` int(11) DEFAULT NULL COMMENT '账号日期ID',
`LE_ACCOUNT_NO` varchar(240) NOT NULL DEFAULT '' COMMENT '企业银行账号',
`ACCOUNTING_DATE_ID` int(11) NOT NULL COMMENT '账号日期ID',
`ACCOUNTING_DATE` varchar(10) DEFAULT '0000-00-00' COMMENT '账号日期',
`TRX_DATE` varchar(20) NOT NULL COMMENT '交易日期',
`CURRENCY` varchar(10) DEFAULT NULL COMMENT '币种',
@@ -30,7 +41,7 @@ CREATE TABLE `ccdi_bank_statement` (
`internal_flag` tinyint(1) DEFAULT '0' COMMENT '"是否为内部交易1 是 0 否"',
`batch_id` int(11) NOT NULL DEFAULT '0' COMMENT '上传logId对应upload_log',
`batch_sequence` int(11) NOT NULL COMMENT '每次上传在文件中的line',
`CREATE_DATE` datetime DEFAULT NULL COMMENT '创建时间',
`CREATE_DATE` datetime DEFAULT NULL COMMENT '创建时间',
`created_by` bigint(20) unsigned NOT NULL DEFAULT '0' COMMENT '创建者',
`meta_json` text COMMENT '"meta json"',
`no_balance` tinyint(1) DEFAULT '0' COMMENT '是否包含余额',
@@ -39,54 +50,62 @@ CREATE TABLE `ccdi_bank_statement` (
`group_id` int(11) DEFAULT '0' COMMENT '项目id',
`override_bs_id` bigint(20) DEFAULT '0' COMMENT '=0表示该数据未覆盖主表>0表示覆盖主表<0表示被主表覆盖',
`payment_method` varchar(500) DEFAULT NULL COMMENT '微信、支付宝流水字段,交易方式',
`cret_no` varchar(20) COMMENT '身份证号',
`cret_no` varchar(20) DEFAULT NULL COMMENT '身份证号',
PRIMARY KEY (`bank_statement_id`),
KEY `idx_batch_id_account` (`batch_id`,`LE_ACCOUNT_NO`,`ACCOUNTING_DATE_ID`),
KEY `idx_batch_id_account` (`batch_id`, `LE_ACCOUNT_NO`, `ACCOUNTING_DATE_ID`),
KEY `GROUP_ID` (`group_id`),
KEY `c4c_bank_statement_stg_batch_id_IDX` (`batch_id`,`LE_ACCOUNT_NO`) USING BTREE
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COMMENT='银行流水的中间处理表';
KEY `c4c_bank_statement_stg_batch_id_IDX` (`batch_id`, `LE_ACCOUNT_NO`) USING BTREE,
KEY `idx_project_id` (`project_id`),
UNIQUE KEY `uk_bank_statement_dedup` (
`project_id`,
`LE_ACCOUNT_NO`,
`ACCOUNTING_DATE_ID`,
`AMOUNT_DR`,
`AMOUNT_CR`
)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='银行流水的中间处理表';
```
流水表和返回值的对应关系
## 字段映射
| 序号 | ccdi_bank_statement | 返回值 |
| 序号 | ccdi_bank_statement | 流水分析返回字段 |
| --- | --- | --- |
| 1 | bank_statement_id | bankStatementId |
| 2 | LE_ID | leId |
| 3 | ACCOUNT_ID | accountId |
| 4 | LE_ACCOUNT_NAME | leName |
| 5 | LE_ACCOUNT_NO | accountNo |
| 6 | ACCOUNTING_DATE_ID | accountingDateId |
| 7 | ACCOUNTING_DATE | accountingDate |
| 8 | TRX_DATE | trxDate |
| 9 | CURRENCY | currency |
| 10 | AMOUNT_DR | drAmount |
| 11 | AMOUNT_CR | crAmount |
| 12 | AMOUNT_BALANCE | balanceAmount |
| 13 | CASH_TYPE | cashType |
| 14 | CUSTOMER_LE_ID | customerId |
| 15 | CUSTOMER_ACCOUNT_NAME | customerName |
| 16 | CUSTOMER_ACCOUNT_NO | customerAccountNo |
| 17 | customer_bank | customerBank |
| 18 | customer_reference | customerReference |
| 19 | USER_MEMO | userMemo |
| 20 | BANK_COMMENTS | bankComments |
| 21 | BANK_TRX_NUMBER | bankTrxNumber |
| 22 | BANK | bank |
| 23 | TRX_FLAG | transFlag |
| 24 | TRX_TYPE | transTypeId |
| 25 | EXCEPTION_TYPE | exceptionType |
| 26 | internal_flag | internalFlag |
| 27 | batch_id | batchId |
| 28 | batch_sequence | uploadSequnceNumber |
| 29 | CREATE_DATE | createDate |
| 30 | created_by | createdBy |
| 31 | meta_json | 设置为null |
| 32 | no_balance | isNoBalance |
| 33 | begin_balance | isBeginBalance |
| 34 | end_balance | isEndBalance |
| 35 | override_bs_id | overrideBsId |
| 36 | payment_method | paymentMethod |
| 37 | cret_no | cretNo |
| 38 | group_id | groupId |
| 2 | project_id | 后端按业务写入 |
| 3 | LE_ID | leId |
| 4 | ACCOUNT_ID | accountId |
| 5 | LE_ACCOUNT_NAME | leName |
| 6 | LE_ACCOUNT_NO | accountMaskNo |
| 7 | ACCOUNTING_DATE_ID | accountingDateId |
| 8 | ACCOUNTING_DATE | accountingDate |
| 9 | TRX_DATE | trxDate |
| 10 | CURRENCY | currency |
| 11 | AMOUNT_DR | drAmount |
| 12 | AMOUNT_CR | crAmount |
| 13 | AMOUNT_BALANCE | balanceAmount |
| 14 | CASH_TYPE | cashType |
| 15 | CUSTOMER_LE_ID | customerId |
| 16 | CUSTOMER_ACCOUNT_NAME | customerName |
| 17 | CUSTOMER_ACCOUNT_NO | customerAccountMaskNo |
| 18 | customer_bank | customerBank |
| 19 | customer_reference | customerReference |
| 20 | USER_MEMO | userMemo |
| 21 | BANK_COMMENTS | bankComments |
| 22 | BANK_TRX_NUMBER | bankTrxNumber |
| 23 | BANK | bank |
| 24 | TRX_FLAG | transFlag |
| 25 | TRX_TYPE | transTypeId |
| 26 | EXCEPTION_TYPE | exceptionType |
| 27 | internal_flag | internalFlag |
| 28 | batch_id | batchId |
| 29 | batch_sequence | uploadSequnceNumber |
| 30 | CREATE_DATE | createDate |
| 31 | created_by | createdBy |
| 32 | meta_json | 固定写入 `null` |
| 33 | no_balance | noBalance |
| 34 | begin_balance | beginBalance |
| 35 | end_balance | endBalance |
| 36 | group_id | groupId |
| 37 | override_bs_id | overrideBsId |
| 38 | payment_method | paymentMethod |
| 39 | cret_no | cretNo |

View File

@@ -1,103 +0,0 @@
package com.ruoyi.ccdi.project.log;
import ch.qos.logback.classic.PatternLayout;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.FileAppender;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
 * File-upload batch log appender.
 * Creates a dedicated log file per upload batch and mirrors logging events
 * into it. The active per-batch {@link FileAppender} is stored in a
 * ThreadLocal, so only events logged on the thread that called
 * {@link #createBatchLogFile(String, Long, String)} are captured.
 *
 * @author ruoyi
 * @date 2026-03-05
 */
@Slf4j
public class FileUploadLogAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
    // Holds the per-batch file appender for the current thread; set by
    // createBatchLogFile() and cleared by closeBatchLogFile().
    // NOTE(review): if createBatchLogFile() is called twice on a thread
    // without an intervening close, the previous appender is replaced
    // without being stopped — confirm callers always pair create/close.
    private static final ThreadLocal<FileAppender<ILoggingEvent>> currentAppender = new ThreadLocal<>();
    // NOTE(review): initialized in start() but never referenced by append()
    // or createBatchLogFile() (which builds its own layout) — looks removable.
    private PatternLayout layout;
    @Override
    public void start() {
        // Initialize the log pattern layout.
        this.layout = new PatternLayout();
        this.layout.setPattern("%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n");
        this.layout.setContext(getContext());
        this.layout.start();
        super.start();
        log.info("【文件上传日志】FileUploadLogAppender已启动");
    }
    @Override
    protected void append(ILoggingEvent event) {
        // Forward the event to the current thread's batch appender, if any;
        // events arriving on threads without a batch file are dropped.
        FileAppender<ILoggingEvent> appender = currentAppender.get();
        if (appender != null) {
            appender.doAppend(event);
        }
    }
    /**
     * Creates a dedicated log file for the given batch and binds it to the
     * calling thread.
     *
     * @param uploadPath upload root directory (the ruoyi.profile setting)
     * @param projectId  project ID, used as a subdirectory name
     * @param batchId    batch ID (logged only; the file name uses a timestamp)
     */
    public static void createBatchLogFile(String uploadPath, Long projectId, String batchId) {
        try {
            // Log file path: {ruoyi.profile}/logs/file-upload/{projectId}/{timestamp}.log
            String timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date());
            String logDirPath = uploadPath + File.separator + "logs" + File.separator
                    + "file-upload" + File.separator + projectId;
            // Ensure the directory exists.
            File logDir = new File(logDirPath);
            if (!logDir.exists()) {
                logDir.mkdirs();
            }
            String logFilePath = logDirPath + File.separator + timestamp + ".log";
            // Build and start a FileAppender for this batch.
            FileAppender<ILoggingEvent> appender = new FileAppender<>();
            appender.setFile(logFilePath);
            PatternLayout layout = new PatternLayout();
            layout.setPattern("%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n");
            layout.setContext(appender.getContext());
            layout.start();
            appender.setLayout(layout);
            appender.setAppend(true);
            // NOTE(review): appender.getContext() is still null on a freshly
            // constructed appender, so this setContext call is a no-op and the
            // appender (and layout above) start without a logger context —
            // confirm this was intended; normally the enclosing appender's
            // context would be passed in.
            appender.setContext(appender.getContext());
            appender.start();
            currentAppender.set(appender);
            log.info("【文件上传日志】创建批次日志文件: path={}, batchId={}", logFilePath, batchId);
        } catch (Exception e) {
            // Best-effort: batch logging failure must not break the upload flow.
            log.error("【文件上传日志】创建批次日志文件失败: projectId={}, batchId={}", projectId, batchId, e);
        }
    }
    /**
     * Stops and unbinds the current thread's batch log file, releasing the
     * ThreadLocal to avoid leaking the appender on pooled threads.
     */
    public static void closeBatchLogFile() {
        FileAppender<ILoggingEvent> appender = currentAppender.get();
        if (appender != null) {
            appender.stop();
            currentAppender.remove();
            log.info("【文件上传日志】关闭批次日志文件");
        }
    }
}

View File

@@ -21,4 +21,7 @@ public interface CcdiBankStatementMapper extends BaseMapper<CcdiBankStatement> {
* @return 插入记录数
*/
int insertBatch(@Param("list") List<CcdiBankStatement> list);
int deleteByProjectIdAndBatchId(@Param("projectId") Long projectId,
@Param("batchId") Integer batchId);
}

View File

@@ -7,7 +7,6 @@ import com.ruoyi.ccdi.project.domain.dto.CcdiFileUploadQueryDTO;
import com.ruoyi.ccdi.project.domain.entity.CcdiBankStatement;
import com.ruoyi.ccdi.project.domain.entity.CcdiFileUploadRecord;
import com.ruoyi.ccdi.project.domain.vo.CcdiFileUploadStatisticsVO;
import com.ruoyi.ccdi.project.log.FileUploadLogAppender;
import com.ruoyi.ccdi.project.mapper.CcdiBankStatementMapper;
import com.ruoyi.ccdi.project.mapper.CcdiFileUploadRecordMapper;
import com.ruoyi.ccdi.project.mapper.CcdiProjectMapper;
@@ -17,6 +16,7 @@ import com.ruoyi.lsfx.domain.request.GetBankStatementRequest;
import com.ruoyi.lsfx.domain.request.GetFileUploadStatusRequest;
import com.ruoyi.lsfx.domain.response.*;
import jakarta.annotation.Resource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
@@ -49,6 +49,16 @@ import java.util.concurrent.RejectedExecutionException;
@Service
public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
private static final int MAX_ERROR_MESSAGE_LENGTH = 2000;
@Data
private static class FetchBankStatementResult {
    // Outcome holder for fetchAndSaveBankStatements.
    private boolean success;       // true when every page was fetched and written
    private int totalCount;        // totalCount reported by the remote analysis platform
    private int attemptedCount;    // rows handed to insertBatch (duplicates may be skipped by the DB upsert)
    private String errorMessage;   // set only on failure; length-capped before persisting
}
/**
* 若依框架文件上传路径
*/
@@ -258,12 +268,8 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
String batchId) {
log.info("【文件上传】调度线程启动: projectId={}, batchId={}", projectId, batchId);
// 创建批次日志文件
FileUploadLogAppender.createBatchLogFile(uploadPath, projectId, batchId);
try {
// 循环提交任务
for (int i = 0; i < tempFilePaths.size(); i++) {
// 循环提交任务
for (int i = 0; i < tempFilePaths.size(); i++) {
// Critical Fix #6: 检查线程中断状态
if (Thread.currentThread().isInterrupted()) {
log.warn("【文件上传】调度线程被中断,停止提交剩余任务");
@@ -307,11 +313,7 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
}
}
log.info("【文件上传】调度线程完成: projectId={}, batchId={}", projectId, batchId);
} finally {
// 关闭批次日志文件
FileUploadLogAppender.closeBatchLogFile();
}
log.info("【文件上传】调度线程完成: projectId={}, batchId={}", projectId, batchId);
}
/**
@@ -321,10 +323,34 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
CcdiFileUploadRecord record = new CcdiFileUploadRecord();
record.setId(recordId);
record.setFileStatus(status);
record.setErrorMessage(errorMessage);
record.setErrorMessage(normalizeErrorMessage(errorMessage));
recordMapper.updateById(record);
}
/**
 * Marks an upload record as parsed_failed and persists the error message
 * (normalized/truncated so the DB update cannot fail on length).
 */
private void updateFailedRecord(CcdiFileUploadRecord record, String errorMessage) {
    record.setFileStatus("parsed_failed");
    record.setErrorMessage(normalizeErrorMessage(errorMessage));
    recordMapper.updateById(record);
}
/**
 * Normalizes an error message for persistence: blank/whitespace-only input
 * becomes null, and anything longer than {@code MAX_ERROR_MESSAGE_LENGTH}
 * is truncated to that length.
 */
private String normalizeErrorMessage(String errorMessage) {
    if (!StringUtils.hasText(errorMessage)) {
        return null;
    }
    // Cap the stored message so the column write cannot overflow.
    return errorMessage.length() > MAX_ERROR_MESSAGE_LENGTH
            ? errorMessage.substring(0, MAX_ERROR_MESSAGE_LENGTH)
            : errorMessage;
}
/**
 * Null-safe trim for account numbers; whitespace must be stripped before
 * insert so the dedup key on LE_ACCOUNT_NO compares consistently.
 */
private String trimAccountNo(String value) {
    if (value == null) {
        return null;
    }
    return value.trim();
}
/**
 * Normalizes the dedup-key fields of a statement in place before insert
 * (currently: trims LE_ACCOUNT_NO).
 */
private void normalizeDedupFields(CcdiBankStatement statement) {
    String accountNo = statement.getLeAccountNo();
    statement.setLeAccountNo(trimAccountNo(accountNo));
}
/**
* 异步处理单个文件的完整流程
* 包含:上传 → 轮询解析状态 → 获取结果 → 保存流水数据
@@ -426,24 +452,29 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
String enterpriseNamesStr = enterpriseNames != null ? String.join(",", enterpriseNames) : "";
String accountNosStr = accountNos != null ? String.join(",", accountNos) : "";
record.setFileStatus("parsed_success");
record.setEnterpriseNames(enterpriseNamesStr);
record.setAccountNos(accountNosStr);
recordMapper.updateById(record);
log.info("【文件上传】主体信息已保存: enterpriseNames={}, accountNos={}",
enterpriseNamesStr, accountNosStr);
// 步骤7:获取流水数据并保存
log.info("【文件上传】步骤7: 获取流水数据");
fetchAndSaveBankStatements(projectId, lsfxProjectId, logId);
FetchBankStatementResult fetchResult =
fetchAndSaveBankStatements(projectId, lsfxProjectId, logId);
if (!fetchResult.isSuccess()) {
updateFailedRecord(record, fetchResult.getErrorMessage());
return;
}
record.setFileStatus("parsed_success");
record.setEnterpriseNames(enterpriseNamesStr);
record.setAccountNos(accountNosStr);
record.setErrorMessage(null);
recordMapper.updateById(record);
} else {
// 解析失败
log.warn("【文件上传】步骤6: 解析失败: status={}, desc={}", status, uploadStatusDesc);
record.setFileStatus("parsed_failed");
record.setErrorMessage("解析失败: " + uploadStatusDesc);
recordMapper.updateById(record);
updateFailedRecord(record, "解析失败: " + uploadStatusDesc);
}
log.info("【文件上传】处理完成: fileName={}", record.getFileName());
@@ -524,97 +555,114 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
* @param groupId 流水分析平台项目ID
* @param logId 文件ID
*/
private void fetchAndSaveBankStatements(Long projectId, Integer groupId, Integer logId) {
private FetchBankStatementResult fetchAndSaveBankStatements(Long projectId, Integer groupId,
Integer logId) {
log.info("【文件上传】开始获取流水数据: projectId={}, groupId={}, logId={}",
projectId, groupId, logId);
// 步骤1: 先调用一次接口获取 totalCount
GetBankStatementRequest firstRequest = new GetBankStatementRequest();
firstRequest.setGroupId(groupId);
firstRequest.setLogId(logId);
firstRequest.setPageNow(1);
firstRequest.setPageSize(1); // 只获取1条用于获取总数
FetchBankStatementResult result = new FetchBankStatementResult();
GetBankStatementResponse firstResponse = lsfxClient.getBankStatement(firstRequest);
try {
GetBankStatementRequest firstRequest = new GetBankStatementRequest();
firstRequest.setGroupId(groupId);
firstRequest.setLogId(logId);
firstRequest.setPageNow(1);
firstRequest.setPageSize(1);
if (firstResponse == null || firstResponse.getData() == null) {
log.warn("【文件上传】获取流水数据失败: 响应数据为空");
return;
}
GetBankStatementResponse firstResponse = lsfxClient.getBankStatement(firstRequest);
if (firstResponse == null || firstResponse.getData() == null) {
result.setSuccess(false);
result.setErrorMessage("获取流水数据失败: 响应数据为空");
cleanupBankStatements(projectId, logId);
return result;
}
Integer totalCount = firstResponse.getData().getTotalCount();
if (totalCount == null || totalCount <= 0) {
log.warn("【文件上传】无流水数据需要保存: totalCount={}", totalCount);
return;
}
Integer totalCount = firstResponse.getData().getTotalCount();
result.setTotalCount(totalCount == null ? 0 : totalCount);
if (totalCount == null || totalCount <= 0) {
log.warn("【文件上传】无流水数据需要保存: totalCount={}", totalCount);
result.setSuccess(true);
return result;
}
log.info("【文件上传】获取到总数: totalCount={}", totalCount);
int pageSize = 1000;
int batchSize = 1000;
int totalPages = (int) Math.ceil((double) totalCount / pageSize);
int totalAttempted = 0;
List<CcdiBankStatement> batchList = new ArrayList<>(batchSize);
// 步骤2: 计算分页信息
int pageSize = 1000; // 每页1000条
int batchSize = 1000; // 批量插入每批1000条与pageSize保持一致
int totalPages = (int) Math.ceil((double) totalCount / pageSize);
log.info("【文件上传】获取到总数: totalCount={}", totalCount);
log.info("【文件上传】分页信息: 每页{}条, 共{}页", pageSize, totalPages);
log.info("【文件上传】分页信息: 每页{}条, 共{}页", pageSize, totalPages);
for (int pageNow = 1; pageNow <= totalPages; pageNow++) {
try {
GetBankStatementRequest request = new GetBankStatementRequest();
request.setGroupId(groupId);
request.setLogId(logId);
request.setPageNow(pageNow);
request.setPageSize(pageSize);
List<CcdiBankStatement> batchList = new ArrayList<>(batchSize);
int totalSaved = 0;
GetBankStatementResponse response = lsfxClient.getBankStatement(request);
if (response == null || response.getData() == null
|| response.getData().getBankStatementList() == null) {
result.setSuccess(false);
result.setErrorMessage("获取流水数据失败: 第 " + pageNow + " 页响应异常");
cleanupBankStatements(projectId, logId);
return result;
}
// 步骤3: 循环分页获取所有数据
for (int pageNow = 1; pageNow <= totalPages; pageNow++) {
try {
// 构建请求参数
GetBankStatementRequest request = new GetBankStatementRequest();
request.setGroupId(groupId);
request.setLogId(logId);
request.setPageNow(pageNow);
request.setPageSize(pageSize);
List<GetBankStatementResponse.BankStatementItem> items =
response.getData().getBankStatementList();
log.debug("【文件上传】获取第{}页数据: {}条", pageNow, items.size());
// 获取流水数据
GetBankStatementResponse response = lsfxClient.getBankStatement(request);
for (GetBankStatementResponse.BankStatementItem item : items) {
CcdiBankStatement statement = CcdiBankStatement.fromResponse(item);
if (statement != null) {
statement.setBatchId(logId);
statement.setProjectId(projectId);
normalizeDedupFields(statement);
batchList.add(statement);
if (response == null || response.getData() == null
|| response.getData().getBankStatementList() == null) {
log.warn("【文件上传】获取流水数据为空: pageNow={}", pageNow);
continue;
}
List<GetBankStatementResponse.BankStatementItem> items =
response.getData().getBankStatementList();
log.debug("【文件上传】获取第{}页数据: {}条", pageNow, items.size());
// 转换并收集到批量列表
for (GetBankStatementResponse.BankStatementItem item : items) {
CcdiBankStatement statement = CcdiBankStatement.fromResponse(item);
if (statement != null) {
statement.setProjectId(projectId); // 设置业务项目ID
batchList.add(statement);
// 达到批量插入阈值1000条执行插入
if (batchList.size() >= batchSize) {
bankStatementMapper.insertBatch(batchList);
totalSaved += batchList.size();
log.debug("【文件上传】批量插入流水: {}条, 累计{}条",
batchList.size(), totalSaved);
batchList.clear();
if (batchList.size() >= batchSize) {
int currentBatchSize = batchList.size();
bankStatementMapper.insertBatch(batchList);
totalAttempted += currentBatchSize;
log.debug("【文件上传】批量写入流水 {}条", currentBatchSize);
batchList.clear();
}
}
}
} catch (Exception e) {
log.error("【文件上传】获取或保存流水数据失败: pageNow={}", pageNow, e);
result.setSuccess(false);
result.setErrorMessage("获取或保存流水数据失败: " + e.getMessage());
cleanupBankStatements(projectId, logId);
return result;
}
} catch (Exception e) {
log.error("【文件上传】获取或保存流水数据失败: pageNow={}", pageNow, e);
// 继续处理下一页,不中断整个流程
}
}
// 步骤4: 保存剩余的数据
if (!batchList.isEmpty()) {
bankStatementMapper.insertBatch(batchList);
totalSaved += batchList.size();
log.debug("【文件上传】批量插入剩余流水: {}条", batchList.size());
}
if (!batchList.isEmpty()) {
int currentBatchSize = batchList.size();
bankStatementMapper.insertBatch(batchList);
totalAttempted += currentBatchSize;
log.debug("【文件上传】批量插入剩余流水 {}条", batchList.size());
}
log.info("【文件上传】流水数据保存完成: 总共保存{}条", totalSaved);
log.info("【文件上传】流水入库完成: fetchedCount={}, attemptedCount={}",
totalCount, totalAttempted);
result.setSuccess(true);
result.setAttemptedCount(totalAttempted);
return result;
} catch (Exception e) {
log.error("【文件上传】获取或保存流水数据失败: logId={}", logId, e);
result.setSuccess(false);
result.setErrorMessage("获取或保存流水数据失败: " + e.getMessage());
cleanupBankStatements(projectId, logId);
return result;
}
}
/**
 * Rolls back a failed fetch: deletes every statement row already inserted
 * for this project/batch so a retry starts from a clean slate.
 * (batch_id stores the remote logId — see statement.setBatchId(logId).)
 */
private void cleanupBankStatements(Long projectId, Integer logId) {
    bankStatementMapper.deleteByProjectIdAndBatchId(projectId, logId);
}
}

View File

@@ -84,6 +84,14 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
#{item.overrideBsId}, #{item.paymentMethod}, #{item.cretNo}
)
</foreach>
on duplicate key update
bank_statement_id = bank_statement_id
</insert>
<delete id="deleteByProjectIdAndBatchId">
    <!-- Rollback helper: remove all statements inserted for one upload batch
         (batch_id = remote logId) of a project, so a failed fetch can retry cleanly. -->
    delete from ccdi_bank_statement
    where project_id = #{projectId}
    and batch_id = #{batchId}
</delete>
</mapper>

View File

@@ -0,0 +1,441 @@
package com.ruoyi.ccdi.project.service.impl;
import ch.qos.logback.classic.Logger;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import com.ruoyi.ccdi.project.domain.entity.CcdiFileUploadRecord;
import com.ruoyi.ccdi.project.mapper.CcdiBankStatementMapper;
import com.ruoyi.ccdi.project.mapper.CcdiFileUploadRecordMapper;
import com.ruoyi.ccdi.project.mapper.CcdiProjectMapper;
import com.ruoyi.lsfx.client.LsfxAnalysisClient;
import com.ruoyi.lsfx.domain.request.GetBankStatementRequest;
import com.ruoyi.lsfx.domain.response.CheckParseStatusResponse;
import com.ruoyi.lsfx.domain.response.GetBankStatementResponse;
import com.ruoyi.lsfx.domain.response.GetFileUploadStatusResponse;
import com.ruoyi.lsfx.domain.response.UploadFileResponse;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import org.slf4j.LoggerFactory;
import org.springframework.test.util.ReflectionTestUtils;
import java.io.IOException;
import java.lang.reflect.Field;
import java.lang.reflect.Method;
import java.math.BigDecimal;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class CcdiFileUploadServiceImplTest {
private static final Long PROJECT_ID = 100L;
private static final Integer LSFX_PROJECT_ID = 200;
private static final Long RECORD_ID = 300L;
private static final Integer LOG_ID = 400;
private static final int MAX_ERROR_MESSAGE_LENGTH = 2000;
@InjectMocks
private CcdiFileUploadServiceImpl service;
@Mock
private CcdiFileUploadRecordMapper recordMapper;
@Mock
private CcdiProjectMapper projectMapper;
@Mock
private LsfxAnalysisClient lsfxClient;
@Mock
private CcdiBankStatementMapper bankStatementMapper;
@Mock
private Executor fileUploadExecutor;
@TempDir
Path tempDir;
@Test
void submitTasksAsync_shouldNotCreateLocalBatchLogFiles() throws Exception {
setField("uploadPath", tempDir.toString());
Path tempFile = createTempFile();
CcdiFileUploadRecord record = buildRecord();
invokeSubmitTasksAsync(List.of(tempFile.toString()), List.of(record), "batch-1");
Path batchLogDir = tempDir.resolve("logs").resolve("file-upload").resolve(String.valueOf(PROJECT_ID));
assertFalse(Files.exists(batchLogDir));
}
@Test
void processFileAsync_shouldKeepParsingUntilBankStatementsSaved() throws IOException {
List<String> events = new ArrayList<>();
AtomicInteger sequence = new AtomicInteger();
captureRecordStatus(events, sequence);
when(lsfxClient.uploadFile(eq(LSFX_PROJECT_ID), any())).thenReturn(buildUploadResponse());
when(lsfxClient.checkParseStatus(LSFX_PROJECT_ID, String.valueOf(LOG_ID)))
.thenReturn(buildCheckParseStatusResponse(false));
when(lsfxClient.getFileUploadStatus(any())).thenReturn(buildParsedSuccessStatusResponse());
when(lsfxClient.getBankStatement(any(GetBankStatementRequest.class)))
.thenThrow(new RuntimeException("bank statement fetch failed"));
CcdiFileUploadRecord record = buildRecord();
Path tempFile = createTempFile();
service.processFileAsync(PROJECT_ID, LSFX_PROJECT_ID, tempFile.toString(), RECORD_ID, "batch-1", record);
assertTrue(events.stream().anyMatch(event -> event.endsWith("record:parsed_failed")));
assertFalse(events.stream().anyMatch(event -> event.endsWith("record:parsed_success")));
}
@Test
void processFileAsync_shouldMarkSuccessAfterBankStatementsSaved() throws IOException {
List<String> events = new ArrayList<>();
AtomicInteger sequence = new AtomicInteger();
captureRecordStatus(events, sequence);
when(lsfxClient.uploadFile(eq(LSFX_PROJECT_ID), any())).thenReturn(buildUploadResponse());
when(lsfxClient.checkParseStatus(LSFX_PROJECT_ID, String.valueOf(LOG_ID)))
.thenReturn(buildCheckParseStatusResponse(false));
when(lsfxClient.getFileUploadStatus(any())).thenReturn(buildParsedSuccessStatusResponse());
when(lsfxClient.getBankStatement(any(GetBankStatementRequest.class)))
.thenAnswer(invocation -> {
events.add(sequence.incrementAndGet() + ":bank-fetch");
return buildEmptyBankStatementResponse();
});
CcdiFileUploadRecord record = buildRecord();
Path tempFile = createTempFile();
service.processFileAsync(PROJECT_ID, LSFX_PROJECT_ID, tempFile.toString(), RECORD_ID, "batch-1", record);
int fetchIndex = findEventIndex(events, "bank-fetch");
int successIndex = findEventIndex(events, "record:parsed_success");
assertTrue(fetchIndex >= 0);
assertTrue(successIndex > fetchIndex);
}
@Test
void processFileAsync_shouldCleanupInsertedStatementsWhenFetchFails() throws IOException {
when(lsfxClient.uploadFile(eq(LSFX_PROJECT_ID), any())).thenReturn(buildUploadResponse());
when(lsfxClient.checkParseStatus(LSFX_PROJECT_ID, String.valueOf(LOG_ID)))
.thenReturn(buildCheckParseStatusResponse(false));
when(lsfxClient.getFileUploadStatus(any())).thenReturn(buildParsedSuccessStatusResponse());
when(lsfxClient.getBankStatement(any(GetBankStatementRequest.class)))
.thenThrow(new RuntimeException("bank statement fetch failed"));
CcdiFileUploadRecord record = buildRecord();
Path tempFile = createTempFile();
service.processFileAsync(PROJECT_ID, LSFX_PROJECT_ID, tempFile.toString(), RECORD_ID, "batch-1", record);
verify(bankStatementMapper).deleteByProjectIdAndBatchId(PROJECT_ID, LOG_ID);
}
@Test
void processFileAsync_shouldFailWhenPagedFetchThrows() throws IOException {
List<String> events = new ArrayList<>();
AtomicInteger sequence = new AtomicInteger();
captureRecordStatus(events, sequence);
when(lsfxClient.uploadFile(eq(LSFX_PROJECT_ID), any())).thenReturn(buildUploadResponse());
when(lsfxClient.checkParseStatus(LSFX_PROJECT_ID, String.valueOf(LOG_ID)))
.thenReturn(buildCheckParseStatusResponse(false));
when(lsfxClient.getFileUploadStatus(any())).thenReturn(buildParsedSuccessStatusResponse());
when(lsfxClient.getBankStatement(any(GetBankStatementRequest.class)))
.thenReturn(buildBankStatementResponseWithTotalCount(1))
.thenThrow(new RuntimeException("paged fetch failed"));
CcdiFileUploadRecord record = buildRecord();
Path tempFile = createTempFile();
service.processFileAsync(PROJECT_ID, LSFX_PROJECT_ID, tempFile.toString(), RECORD_ID, "batch-1", record);
assertTrue(events.stream().anyMatch(event -> event.endsWith("record:parsed_failed")));
assertFalse(events.stream().anyMatch(event -> event.endsWith("record:parsed_success")));
verify(bankStatementMapper).deleteByProjectIdAndBatchId(PROJECT_ID, LOG_ID);
}
@Test
void processFileAsync_shouldTruncateLongErrorMessageWhenBankStatementFetchFails() throws IOException {
List<CcdiFileUploadRecord> updates = new ArrayList<>();
captureUpdatedRecords(updates);
when(lsfxClient.uploadFile(eq(LSFX_PROJECT_ID), any())).thenReturn(buildUploadResponse());
when(lsfxClient.checkParseStatus(LSFX_PROJECT_ID, String.valueOf(LOG_ID)))
.thenReturn(buildCheckParseStatusResponse(false));
when(lsfxClient.getFileUploadStatus(any())).thenReturn(buildParsedSuccessStatusResponse());
when(lsfxClient.getBankStatement(any(GetBankStatementRequest.class)))
.thenThrow(new RuntimeException("bank statement fetch failed:" + "x".repeat(3000)));
CcdiFileUploadRecord record = buildRecord();
Path tempFile = createTempFile();
service.processFileAsync(PROJECT_ID, LSFX_PROJECT_ID, tempFile.toString(), RECORD_ID, "batch-1", record);
CcdiFileUploadRecord failedRecord = findLastUpdatedRecordByStatus(updates, "parsed_failed");
assertTrue(failedRecord.getErrorMessage().length() <= MAX_ERROR_MESSAGE_LENGTH);
}
@Test
void processFileAsync_shouldTruncateLongErrorMessageWhenUnexpectedFailureOccurs() throws IOException {
List<CcdiFileUploadRecord> updates = new ArrayList<>();
captureUpdatedRecords(updates);
when(lsfxClient.uploadFile(eq(LSFX_PROJECT_ID), any()))
.thenThrow(new RuntimeException("upload failed:" + "x".repeat(3000)));
CcdiFileUploadRecord record = buildRecord();
Path tempFile = createTempFile();
service.processFileAsync(PROJECT_ID, LSFX_PROJECT_ID, tempFile.toString(), RECORD_ID, "batch-1", record);
CcdiFileUploadRecord failedRecord = findLastUpdatedRecordByStatus(updates, "parsed_failed");
assertTrue(failedRecord.getErrorMessage().length() <= MAX_ERROR_MESSAGE_LENGTH);
}
@Test
void fetchAndSaveBankStatements_shouldTrimLeAccountNoBeforeInsert() throws IOException {
when(lsfxClient.uploadFile(eq(LSFX_PROJECT_ID), any())).thenReturn(buildUploadResponse());
when(lsfxClient.checkParseStatus(LSFX_PROJECT_ID, String.valueOf(LOG_ID)))
.thenReturn(buildCheckParseStatusResponse(false));
when(lsfxClient.getFileUploadStatus(any())).thenReturn(buildParsedSuccessStatusResponse());
when(lsfxClient.getBankStatement(any(GetBankStatementRequest.class)))
.thenReturn(buildBankStatementResponseWithItems(1, List.of(buildBankStatementItem(" 62220001 "))))
.thenReturn(buildBankStatementResponseWithItems(1, List.of(buildBankStatementItem(" 62220001 "))));
CcdiFileUploadRecord record = buildRecord();
Path tempFile = createTempFile();
service.processFileAsync(PROJECT_ID, LSFX_PROJECT_ID, tempFile.toString(), RECORD_ID, "batch-1", record);
verify(bankStatementMapper).insertBatch(any());
verify(bankStatementMapper).insertBatch(org.mockito.ArgumentMatchers.argThat(list ->
list.size() == 1 && "62220001".equals(list.get(0).getLeAccountNo())));
}
@Test
void fetchAndSaveBankStatements_shouldLogConservativeCountsWhenAffectedRowsAreAmbiguous() {
    // One statement is fetched across the two paged calls; the mapper reports a
    // single affected row, which is ambiguous under a no-op upsert.
    when(lsfxClient.getBankStatement(any(GetBankStatementRequest.class)))
            .thenReturn(buildBankStatementResponseWithItems(1, List.of(buildBankStatementItem("62220001"))))
            .thenReturn(buildBankStatementResponseWithItems(1, List.of(buildBankStatementItem("62220001"))));
    when(bankStatementMapper.insertBatch(any())).thenReturn(1);

    // Attach an in-memory appender so the service's log output can be asserted on.
    Logger serviceLogger = (Logger) LoggerFactory.getLogger(CcdiFileUploadServiceImpl.class);
    ListAppender<ILoggingEvent> appender = new ListAppender<>();
    appender.start();
    serviceLogger.addAppender(appender);
    try {
        Object fetchResult = ReflectionTestUtils.invokeMethod(
                service,
                "fetchAndSaveBankStatements",
                PROJECT_ID,
                LSFX_PROJECT_ID,
                LOG_ID
        );

        assertTrue(Boolean.TRUE.equals(ReflectionTestUtils.getField(fetchResult, "success")));
        assertEquals(1, ReflectionTestUtils.getField(fetchResult, "totalCount"));
        assertEquals(1, ReflectionTestUtils.getField(fetchResult, "attemptedCount"));

        // Conservative logging: fetched/attempted counts appear, inserted count does not.
        List<String> messages = appender.list.stream()
                .map(ILoggingEvent::getFormattedMessage)
                .toList();
        assertTrue(messages.stream()
                .anyMatch(message -> message.contains("流水入库完成: fetchedCount=1, attemptedCount=1")));
        assertFalse(messages.stream()
                .anyMatch(message -> message.contains("insertedCount=")));
    } finally {
        serviceLogger.detachAppender(appender);
    }
}
@Test
void processFileAsync_shouldMarkParsedFailedWhenInsertBatchThrowsUnexpectedSqlError() throws IOException {
    List<String> statusEvents = new ArrayList<>();
    AtomicInteger eventSequence = new AtomicInteger();
    captureRecordStatus(statusEvents, eventSequence);
    // Happy path up to persistence: upload, parse polling and status checks all succeed.
    when(lsfxClient.uploadFile(eq(LSFX_PROJECT_ID), any())).thenReturn(buildUploadResponse());
    when(lsfxClient.checkParseStatus(LSFX_PROJECT_ID, String.valueOf(LOG_ID)))
            .thenReturn(buildCheckParseStatusResponse(false));
    when(lsfxClient.getFileUploadStatus(any())).thenReturn(buildParsedSuccessStatusResponse());
    when(lsfxClient.getBankStatement(any(GetBankStatementRequest.class)))
            .thenReturn(buildBankStatementResponseWithItems(1, List.of(buildBankStatementItem("62220001"))))
            .thenReturn(buildBankStatementResponseWithItems(1, List.of(buildBankStatementItem("62220001"))));
    // The batch insert blows up with a non-duplicate SQL error.
    when(bankStatementMapper.insertBatch(any()))
            .thenThrow(new RuntimeException("sql syntax error"));
    CcdiFileUploadRecord uploadRecord = buildRecord();
    Path sourceFile = createTempFile();

    service.processFileAsync(PROJECT_ID, LSFX_PROJECT_ID, sourceFile.toString(), RECORD_ID, "batch-1", uploadRecord);

    // The record must end in parsed_failed and must never have been marked successful.
    assertTrue(statusEvents.stream().anyMatch(event -> event.endsWith("record:parsed_failed")));
    assertFalse(statusEvents.stream().anyMatch(event -> event.endsWith("record:parsed_success")));
}
private void captureRecordStatus(List<String> events, AtomicInteger sequence) {
    // Append a "<seq>:record:<status>" event each time the upload record is updated.
    doAnswer(invocation -> {
        CcdiFileUploadRecord updated = invocation.getArgument(0);
        String event = new StringBuilder()
                .append(sequence.incrementAndGet())
                .append(":record:")
                .append(updated.getFileStatus())
                .toString();
        events.add(event);
        return 1;
    }).when(recordMapper).updateById(any(CcdiFileUploadRecord.class));
}
private void captureUpdatedRecords(List<CcdiFileUploadRecord> updates) {
    // Snapshot the asserted fields, because the service mutates the same record instance.
    doAnswer(invocation -> {
        CcdiFileUploadRecord source = invocation.getArgument(0);
        CcdiFileUploadRecord copy = new CcdiFileUploadRecord();
        copy.setId(source.getId());
        copy.setFileStatus(source.getFileStatus());
        copy.setErrorMessage(source.getErrorMessage());
        updates.add(copy);
        return 1;
    }).when(recordMapper).updateById(any(CcdiFileUploadRecord.class));
}
private CcdiFileUploadRecord buildRecord() {
    // Fresh upload record in its initial "uploading" state.
    CcdiFileUploadRecord uploadRecord = new CcdiFileUploadRecord();
    uploadRecord.setId(RECORD_ID);
    uploadRecord.setProjectId(PROJECT_ID);
    uploadRecord.setLsfxProjectId(LSFX_PROJECT_ID);
    uploadRecord.setFileName("test.xlsx");
    uploadRecord.setFileStatus("uploading");
    return uploadRecord;
}
private Path createTempFile() throws IOException {
    // Minimal non-empty placeholder file under the test's temp directory.
    Path file = Files.createTempFile(tempDir, "upload-", ".xlsx");
    Files.writeString(file, "test");
    return file;
}
private UploadFileResponse buildUploadResponse() {
    // Upload response carrying a single upload-log entry identified by LOG_ID.
    UploadFileResponse response = new UploadFileResponse();
    UploadFileResponse.UploadData payload = new UploadFileResponse.UploadData();
    UploadFileResponse.UploadLogItem entry = new UploadFileResponse.UploadLogItem();
    entry.setLogId(LOG_ID);
    payload.setUploadLogList(List.of(entry));
    response.setData(payload);
    return response;
}
private CheckParseStatusResponse buildCheckParseStatusResponse(boolean parsing) {
    // Parse-status response; `parsing` controls whether the poll loop keeps waiting.
    CheckParseStatusResponse response = new CheckParseStatusResponse();
    CheckParseStatusResponse.ParseStatusData statusData = new CheckParseStatusResponse.ParseStatusData();
    statusData.setParsing(parsing);
    response.setData(statusData);
    return response;
}
private GetFileUploadStatusResponse buildParsedSuccessStatusResponse() {
    // Platform status meaning "parsed, waiting for new-account confirmation" (status -5).
    GetFileUploadStatusResponse response = new GetFileUploadStatusResponse();
    GetFileUploadStatusResponse.FileUploadStatusData statusData =
            new GetFileUploadStatusResponse.FileUploadStatusData();
    GetFileUploadStatusResponse.LogItem entry = new GetFileUploadStatusResponse.LogItem();
    entry.setStatus(-5);
    entry.setUploadStatusDesc("data.wait.confirm.newaccount");
    entry.setEnterpriseNameList(List.of("测试主体"));
    entry.setAccountNoList(List.of("62220001"));
    statusData.setLogs(List.of(entry));
    response.setData(statusData);
    return response;
}
private GetBankStatementResponse buildEmptyBankStatementResponse() {
    // Response reporting zero statements and no list payload.
    GetBankStatementResponse response = new GetBankStatementResponse();
    GetBankStatementResponse.BankStatementData payload = new GetBankStatementResponse.BankStatementData();
    payload.setTotalCount(0);
    response.setData(payload);
    return response;
}
private GetBankStatementResponse buildBankStatementResponseWithTotalCount(int totalCount) {
    // Response advertising `totalCount` rows while returning an empty page.
    GetBankStatementResponse response = new GetBankStatementResponse();
    GetBankStatementResponse.BankStatementData payload = new GetBankStatementResponse.BankStatementData();
    payload.setTotalCount(totalCount);
    payload.setBankStatementList(List.of());
    response.setData(payload);
    return response;
}
private void invokeSubmitTasksAsync(List<String> tempFilePaths,
                                    List<CcdiFileUploadRecord> records,
                                    String batchId) throws Exception {
    // Reflectively call the private submitTasksAsync(...) entry point on the service.
    Method target = CcdiFileUploadServiceImpl.class.getDeclaredMethod("submitTasksAsync",
            Long.class, Integer.class, List.class, List.class, String.class);
    target.setAccessible(true);
    target.invoke(service, PROJECT_ID, LSFX_PROJECT_ID, tempFilePaths, records, batchId);
}
private void setField(String fieldName, Object value) throws Exception {
    // Overwrite a private field on the service under test via reflection.
    Field target = CcdiFileUploadServiceImpl.class.getDeclaredField(fieldName);
    target.setAccessible(true);
    target.set(service, value);
}
private GetBankStatementResponse buildBankStatementResponseWithItems(
        int totalCount,
        List<GetBankStatementResponse.BankStatementItem> items
) {
    // Response advertising `totalCount` rows and carrying the given page of items.
    GetBankStatementResponse response = new GetBankStatementResponse();
    GetBankStatementResponse.BankStatementData payload = new GetBankStatementResponse.BankStatementData();
    payload.setTotalCount(totalCount);
    payload.setBankStatementList(items);
    response.setData(payload);
    return response;
}
private GetBankStatementResponse.BankStatementItem buildBankStatementItem(String accountMaskNo) {
    // Single statement row: fixed accounting date and amounts, variable account mask.
    GetBankStatementResponse.BankStatementItem row = new GetBankStatementResponse.BankStatementItem();
    row.setAccountingDateId(20260310);
    row.setAccountMaskNo(accountMaskNo);
    row.setDrAmount(new BigDecimal("100.00"));
    row.setCrAmount(new BigDecimal("0.00"));
    return row;
}
private int findEventIndex(List<String> events, String suffix) {
    // Linear scan: index of the first event whose text ends with `suffix`, else -1.
    int index = 0;
    for (String event : events) {
        if (event.endsWith(suffix)) {
            return index;
        }
        index++;
    }
    return -1;
}
private CcdiFileUploadRecord findLastUpdatedRecordByStatus(List<CcdiFileUploadRecord> updates,
                                                           String status) {
    // Forward scan keeping the most recent update matching the requested status.
    CcdiFileUploadRecord latestMatch = null;
    for (CcdiFileUploadRecord update : updates) {
        if (status.equals(update.getFileStatus())) {
            latestMatch = update;
        }
    }
    if (latestMatch == null) {
        throw new AssertionError("未找到状态为 " + status + " 的更新记录");
    }
    return latestMatch;
}
}

View File

@@ -0,0 +1,313 @@
# 银行流水入库重复校验设计
## 概述
`fetchAndSaveBankStatements(Long projectId, Integer groupId, Integer logId)` 中,
接口返回的银行流水写入 `ccdi_bank_statement` 前,需要基于业务键避免重复插入。
本次确认的重复判定键为:
- `project_id`
- `LE_ACCOUNT_NO`
- `ACCOUNTING_DATE_ID`
- `AMOUNT_DR`
- `AMOUNT_CR`
目标是将“什么叫重复”尽量固化到数据库约束层,服务层只负责轻量标准化和保留现有异步处理链路。
## 背景
当前实现位于 `ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java`
1. 分页调用流水分析接口获取流水数据
2. 将返回项转换为 `CcdiBankStatement`
3. 直接调用 `CcdiBankStatementMapper.insertBatch(...)` 批量入库
现状没有任何重复校验,重复导入同一批流水时会再次插入。
已确认的业务边界:
- 接口返回的同一批流水自身不会重复
- 接口返回的 `LE_ACCOUNT_NO``ACCOUNTING_DATE_ID``AMOUNT_DR``AMOUNT_CR` 不会是 `null`
- 如果数据库里已存在相同业务键的流水,保留原记录,不更新原数据
- 命中重复不应让整次文件处理失败
## 方案对比
### 方案一:服务层先查库再插入
做法:
- 服务层先按业务键查库
- 过滤已存在记录
- 仅插入剩余记录
优点:
- 语义直观
- 不需要调整批量插入 SQL
缺点:
- 规则只在当前入口生效,其他写入入口仍可能写入重复数据
- 并发导入时存在竞态窗口
- 代码和 SQL 都会变复杂
### 方案二:数据库唯一键 + no-op upsert
做法:
- 对业务键加唯一约束
- 批量插入改为 `INSERT ... ON DUPLICATE KEY UPDATE bank_statement_id = bank_statement_id`
- 服务层只做必要的字段标准化
优点:
- 重复规则由数据库统一约束
- 并发下稳定
- 代码改动集中且可控
缺点:
- 需要先处理测试库中的存量异常/重复数据
- `ON DUPLICATE KEY` 的受影响行数语义需要在本地 MySQL 实测确认
### 方案三:`INSERT IGNORE`
做法:
- 数据库加唯一键
- 批量插入改为 `INSERT IGNORE`
优点:
- SQL 最短
- 重复会被自动跳过
缺点:
- 可能连非重复键类的数据问题也一起吞掉
- 不利于保留真实错误
## 最终方案
采用方案二:`数据库唯一键 + 写入前标准化 + no-op upsert`
核心决策:
1. 不做本次接口结果的内存去重
2. 去重定义整体切换为 `project_id + LE_ACCOUNT_NO + ACCOUNTING_DATE_ID + AMOUNT_DR + AMOUNT_CR`
3. 服务层只保留与新去重键相关的轻量标准化
4. 数据库侧增加唯一键统一兜底重复规则
5. 命中重复时跳过写入,不更新原有业务数据
6. 非重复键类数据库错误仍然向上抛出,并按现有流程标记 `parsed_failed`
## 详细设计
### 1. 去重键定义
重复判定使用以下五元组:
```text
project_id + LE_ACCOUNT_NO + ACCOUNTING_DATE_ID + AMOUNT_DR + AMOUNT_CR
```
对应到 Java 字段为:
- `projectId`
- `leAccountNo`
- `accountingDateId`
- `amountDr`
- `amountCr`
旧设计中的 `LE_ACCOUNT_NAME``TRX_DATE``CUSTOMER_ACCOUNT_NAME``AMOUNT_BALANCE`
不再参与重复判定。
### 2. 服务层标准化
`CcdiFileUploadServiceImpl.fetchAndSaveBankStatements(...)` 中,
`CcdiBankStatement.fromResponse(...)` 返回实体后,只保留与新去重键相关的标准化:
- `leAccountNo = leAccountNo.trim()`
- `accountingDateId` 保持接口返回值
- `amountDr``amountCr` 保持 `BigDecimal` 语义写入数据库 `decimal(19,2)`
- `projectId` 继续由服务层显式设置
建议新增私有辅助方法,例如:
```java
private String trimAccountNo(String value) {
return value == null ? null : value.trim();
}
private void normalizeDedupFields(CcdiBankStatement statement) {
statement.setLeAccountNo(trimAccountNo(statement.getLeAccountNo()));
}
```
说明:
- 因为接口已保证 `LE_ACCOUNT_NO``ACCOUNTING_DATE_ID``AMOUNT_DR``AMOUNT_CR` 不为 `null`
服务层不再额外承担空值回填逻辑
- 标准化的目标是避免账号前后空格导致同一条流水被误判为不同记录
### 3. 数据库结构调整
为保证唯一键对所有写入入口都有效,需要先清理测试库中的异常数据,再加唯一键。
测试库迁移步骤:
1. 删除 `project_id``LE_ACCOUNT_NO``ACCOUNTING_DATE_ID` 缺失的测试数据
2.`LE_ACCOUNT_NO` 执行 `TRIM`
3. 按新五元组清理已存在的重复测试数据,只保留一条
4.`project_id``LE_ACCOUNT_NO``ACCOUNTING_DATE_ID` 收紧为 `NOT NULL`
5. 新增唯一键
建议迁移脚本内容包含:
```sql
DELETE FROM ccdi_bank_statement
WHERE project_id IS NULL
OR LE_ACCOUNT_NO IS NULL
OR ACCOUNTING_DATE_ID IS NULL;
UPDATE ccdi_bank_statement
SET LE_ACCOUNT_NO = TRIM(LE_ACCOUNT_NO);
DELETE t1
FROM ccdi_bank_statement t1
JOIN ccdi_bank_statement t2
ON t1.bank_statement_id > t2.bank_statement_id
AND t1.project_id = t2.project_id
AND t1.LE_ACCOUNT_NO = t2.LE_ACCOUNT_NO
AND t1.ACCOUNTING_DATE_ID = t2.ACCOUNTING_DATE_ID
AND t1.AMOUNT_DR = t2.AMOUNT_DR
AND t1.AMOUNT_CR = t2.AMOUNT_CR;
ALTER TABLE ccdi_bank_statement
MODIFY COLUMN project_id bigint(20) NOT NULL COMMENT '关联项目ID',
MODIFY COLUMN LE_ACCOUNT_NO varchar(240) NOT NULL DEFAULT '' COMMENT '企业银行账号',
MODIFY COLUMN ACCOUNTING_DATE_ID int(11) NOT NULL COMMENT '账号日期ID';
ALTER TABLE ccdi_bank_statement
ADD UNIQUE KEY uk_bank_statement_dedup (
project_id,
LE_ACCOUNT_NO,
ACCOUNTING_DATE_ID,
AMOUNT_DR,
AMOUNT_CR
);
```
备注:
- `AMOUNT_DR``AMOUNT_CR` 在现有表设计中已是 `NOT NULL DEFAULT 0.00`
- `project_id` 是当前业务写入必填字段,迁移前应确认测试库不存在空值
- 由于库未上线、测试数据可调整,删除不完整测试数据是可接受方案
### 4. Mapper SQL 调整
`CcdiBankStatementMapper.xml` 中的批量插入改为 no-op upsert
```sql
INSERT INTO ccdi_bank_statement (...)
VALUES
(...),
(...)
ON DUPLICATE KEY UPDATE
bank_statement_id = bank_statement_id
```
语义说明:
- 新记录:正常插入
- 重复记录:命中唯一键,走 duplicate 分支,但不改任何业务字段
- 非重复键类 SQL 错误:仍然抛出异常
这样满足“保留原数据,不进行更新”的业务要求。
### 5. 日志与统计
服务层日志增加三类计数:
- 接口返回数
- 实际新增数
- 重复跳过数
这里有一个实现细节需要在本地 MySQL 上确认:
- `ON DUPLICATE KEY UPDATE bank_statement_id = bank_statement_id`
在 MySQL/JDBC 下的受影响行数,是否稳定返回“新增 1、重复 0”
如果实测成立,则可以直接计算:
```text
重复跳过数 = 尝试写入数 - 实际新增数
```
如果实测不稳定,则降级为保守日志,不伪造精确的重复数。
### 6. 异常处理
命中重复不应视为失败。
处理规则:
- 命中唯一键重复:不抛业务失败,继续处理后续批次
- 真实数据库错误:保持现有异常传播路径
- 外层 `processFileAsync(...)` 捕获真实异常后,仍更新上传记录为 `parsed_failed`
### 7. 文档同步
当前 `assets/对接流水分析/ccdi_bank_statement.md` 中的建表说明与现有实体/Mapper 已有漂移,
例如当前代码已经使用 `project_id`,而该文档片段未体现。
本次实现后应同步更新以下文档,避免数据库说明继续失真:
- `assets/对接流水分析/ccdi_bank_statement.md`
- 如有必要,同步 `docs/plans/2026-03-04-bank-statement-entity-design.md` 中的表结构补充说明
## 影响范围
后端代码:
- `ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java`
- `ccdi-project/src/main/java/com/ruoyi/ccdi/project/mapper/CcdiBankStatementMapper.java`
- `ccdi-project/src/main/resources/mapper/ccdi/project/CcdiBankStatementMapper.xml`
测试:
- `ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java`
数据库与文档:
- 建议新增测试库迁移脚本到 `assets/database/`
- `assets/对接流水分析/ccdi_bank_statement.md`
## 测试设计
至少覆盖以下场景:
1. 标准化逻辑:`LE_ACCOUNT_NO` 前后空格
2. 首次导入:记录正常插入
3. 重复导入:不报错、不更新原记录
4. 混合批次:重复记录跳过,新增记录写入
5. 非唯一键类数据库异常:仍然向上抛出并触发 `parsed_failed`
6. 本地 MySQL 验证:确认 no-op upsert 的受影响行数语义
## 风险与缓解
| 风险 | 影响 | 缓解方案 |
|------|------|----------|
| 测试库已有异常/重复数据,新增唯一键失败 | 高 | 先清洗异常行和重复行,再加唯一键 |
| `project_id` / `LE_ACCOUNT_NO` / `ACCOUNTING_DATE_ID` 空值绕过唯一键语义 | 高 | 迁移时删除异常测试数据并收紧为 `NOT NULL` |
| `ON DUPLICATE KEY` 受影响行数语义与预期不一致 | 中 | 实测后决定日志计数方案,不影响去重正确性 |
| 资产文档与代码继续漂移 | 中 | 实现后同步更新表结构说明 |
## 验收标准
1. 使用相同五元组重复导入时,数据库仅保留原记录
2. 重复导入不会更新原记录的任何业务字段
3. 命中重复不会导致上传记录失败
4. 非重复键类数据库错误仍会让上传记录进入 `parsed_failed`
5. 唯一键规则对后续其他写入入口同样生效

View File

@@ -0,0 +1,420 @@
# 银行流水重复校验 Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:**`fetchAndSaveBankStatements(...)` 增加基于 `project_id + LE_ACCOUNT_NO + ACCOUNTING_DATE_ID + AMOUNT_DR + AMOUNT_CR` 的数据库级重复校验,确保重复流水跳过插入且保留原数据不变。
**Architecture:** 先清理测试库中的异常行和重复行,再为 `ccdi_bank_statement` 增加新唯一键。业务代码只负责对 `LE_ACCOUNT_NO` 做轻量标准化,并将批量插入改为 no-op upsert真实数据库错误仍按现有异步文件处理链路向上抛出。
**Tech Stack:** Java 21, Spring Boot 3, MyBatis Plus, MySQL, JUnit 5, Mockito
---
### Task 1: 固化新去重键的标准化失败测试
**Files:**
- Modify: `ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java`
- Modify: `ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java`
**Step 1: Write the failing test**
在现有 `CcdiFileUploadServiceImplTest` 中新增一个聚焦标准化的测试。通过 mock `lsfxClient.getBankStatement(...)` 返回一条带空白账号的流水,并捕获传给 `bankStatementMapper.insertBatch(...)` 的实体。
```java
@Test
void fetchAndSaveBankStatements_shouldTrimLeAccountNoBeforeInsert() {
// arrange
// leAccountNo = " 62220001 "
// accountingDateId = 20260310
// amountDr = new BigDecimal("100.00")
// amountCr = new BigDecimal("0.00")
// act
// assert
verify(bankStatementMapper).insertBatch(argThat(list ->
"62220001".equals(list.get(0).getLeAccountNo())));
}
```
**Step 2: Run test to verify it fails**
Run:
```bash
mvn test -pl ccdi-project -Dtest=CcdiFileUploadServiceImplTest#fetchAndSaveBankStatements_shouldTrimLeAccountNoBeforeInsert
```
Expected:
- FAIL
- 失败原因体现当前实现尚未对 `LE_ACCOUNT_NO``trim`
**Step 3: Write the minimal implementation**
`CcdiFileUploadServiceImpl` 内增加最小辅助方法:
```java
private String trimAccountNo(String value) {
return value == null ? null : value.trim();
}
private void normalizeDedupFields(CcdiBankStatement statement) {
statement.setLeAccountNo(trimAccountNo(statement.getLeAccountNo()));
}
```
并在 `fromResponse(...)` 结果加入批次列表前调用。
**Step 4: Run test to verify it passes**
Run:
```bash
mvn test -pl ccdi-project -Dtest=CcdiFileUploadServiceImplTest#fetchAndSaveBankStatements_shouldTrimLeAccountNoBeforeInsert
```
Expected:
- PASS
**Step 5: Commit**
```bash
git add ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java
git commit -m "test(ccdi-project): cover bank statement account no normalization"
```
### Task 2: 用失败测试锁定“重复不失败”的服务层语义
**Files:**
- Modify: `ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java`
- Modify: `ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java`
**Step 1: Write the failing test**
新增一个测试,模拟 Mapper 在重复导入场景下不抛异常,并验证上传记录最终不会因为重复而进入 `parsed_failed`
```java
@Test
void processFileAsync_shouldNotFailWhenDuplicateStatementsAreSkipped() {
// mock upload success / parse success
// mock bankStatementMapper.insertBatch(...) 返回“未报错的重复跳过结果”
// assert recordMapper 不会收到 parsed_failed
}
```
**Step 2: Run test to verify it fails**
Run:
```bash
mvn test -pl ccdi-project -Dtest=CcdiFileUploadServiceImplTest#processFileAsync_shouldNotFailWhenDuplicateStatementsAreSkipped
```
Expected:
- FAIL
- 失败点应体现当前实现还没有为重复跳过设计明确语义或日志
**Step 3: Write minimal implementation**
只做本任务所需的最小改动:
-`fetchAndSaveBankStatements(...)` 中补充接口返回数、尝试写入数的日志
- 保持 duplicate 场景不抛异常
- 不调整其他与重复无关的异常路径
示例日志:
```java
log.info("【文件上传】流水入库完成: fetchedCount={}, attemptedCount={}, insertedCount={}, duplicateCount={}",
fetchedCount, attemptedCount, insertedCount, duplicateCount);
```
**Step 4: Run the test to verify it passes**
Run:
```bash
mvn test -pl ccdi-project -Dtest=CcdiFileUploadServiceImplTest#processFileAsync_shouldNotFailWhenDuplicateStatementsAreSkipped
```
Expected:
- PASS
**Step 5: Commit**
```bash
git add ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java
git commit -m "test(ccdi-project): keep duplicate bank statements from failing uploads"
```
### Task 3: 为测试库编写数据清洗和唯一键迁移脚本
**Files:**
- Create: `assets/database/2026-03-10-bank-statement-dedup.sql`
- Modify: `assets/对接流水分析/ccdi_bank_statement.md`
**Step 1: Write the migration script**
创建测试库迁移脚本,包含以下 SQL
```sql
DELETE FROM ccdi_bank_statement
WHERE project_id IS NULL
OR LE_ACCOUNT_NO IS NULL
OR ACCOUNTING_DATE_ID IS NULL;
UPDATE ccdi_bank_statement
SET LE_ACCOUNT_NO = TRIM(LE_ACCOUNT_NO);
DELETE t1
FROM ccdi_bank_statement t1
JOIN ccdi_bank_statement t2
ON t1.bank_statement_id > t2.bank_statement_id
AND t1.project_id = t2.project_id
AND t1.LE_ACCOUNT_NO = t2.LE_ACCOUNT_NO
AND t1.ACCOUNTING_DATE_ID = t2.ACCOUNTING_DATE_ID
AND t1.AMOUNT_DR = t2.AMOUNT_DR
AND t1.AMOUNT_CR = t2.AMOUNT_CR;
ALTER TABLE ccdi_bank_statement
MODIFY COLUMN project_id bigint(20) NOT NULL COMMENT '关联项目ID',
MODIFY COLUMN LE_ACCOUNT_NO varchar(240) NOT NULL DEFAULT '' COMMENT '企业银行账号',
MODIFY COLUMN ACCOUNTING_DATE_ID int(11) NOT NULL COMMENT '账号日期ID';
ALTER TABLE ccdi_bank_statement
ADD UNIQUE KEY uk_bank_statement_dedup (
project_id,
LE_ACCOUNT_NO,
ACCOUNTING_DATE_ID,
AMOUNT_DR,
AMOUNT_CR
);
```
**Step 2: Manually run the migration in the local test database**
Run:
```bash
mysql -u <user> -p <database> < assets/database/2026-03-10-bank-statement-dedup.sql
```
Expected:
- SQL 全部执行成功
- `SHOW INDEX FROM ccdi_bank_statement;` 能看到 `uk_bank_statement_dedup`
**Step 3: Update the bank statement schema note**
同步更新 `assets/对接流水分析/ccdi_bank_statement.md`,补齐:
- `project_id`
- 新唯一键说明
- `project_id` / `LE_ACCOUNT_NO` / `ACCOUNTING_DATE_ID` 的非空语义
**Step 4: Verify migration result**
Run:
```bash
mysql -u <user> -p -e "SHOW CREATE TABLE ccdi_bank_statement\G"
```
Expected:
- 表结构包含新的唯一键
- `project_id``LE_ACCOUNT_NO``ACCOUNTING_DATE_ID``NOT NULL`
**Step 5: Commit**
```bash
git add assets/database/2026-03-10-bank-statement-dedup.sql assets/对接流水分析/ccdi_bank_statement.md
git commit -m "feat(database): add bank statement dedup unique key"
```
### Task 4: 将 Mapper 批量插入改为 no-op upsert
**Files:**
- Modify: `ccdi-project/src/main/java/com/ruoyi/ccdi/project/mapper/CcdiBankStatementMapper.java`
- Modify: `ccdi-project/src/main/resources/mapper/ccdi/project/CcdiBankStatementMapper.xml`
**Step 1: Write the failing verification case**
先在本地 MySQL 中准备两次相同业务键的数据,第二次执行当前批量插入 SQL确认现状会抛唯一键冲突。
Run:
```bash
mvn test -pl ccdi-project -Dtest=CcdiFileUploadServiceImplTest#processFileAsync_shouldNotFailWhenDuplicateStatementsAreSkipped
```
Expected:
- FAIL 或本地重复场景仍未被正确跳过
**Step 2: Change the insert SQL**
将 XML 中的批量插入改为:
```xml
<insert id="insertBatch" parameterType="java.util.List">
insert into ccdi_bank_statement (...)
values
<foreach collection="list" item="item" separator=",">
(...)
</foreach>
on duplicate key update
bank_statement_id = bank_statement_id
</insert>
```
**Step 3: Keep mapper signature unchanged**
保持 `CcdiBankStatementMapper.insertBatch(@Param("list") List<CcdiBankStatement> list)` 不变,避免扩大调用面。
**Step 4: Run the targeted test suite**
Run:
```bash
mvn test -pl ccdi-project -Dtest=CcdiFileUploadServiceImplTest
```
Expected:
- PASS
**Step 5: Manual duplicate verification**
在本地测试库重复导入同一批数据后执行:
```bash
mysql -u <user> -p -e "SELECT COUNT(*) FROM ccdi_bank_statement WHERE project_id = <projectId> AND LE_ACCOUNT_NO = '62220001' AND ACCOUNTING_DATE_ID = 20260310 AND AMOUNT_DR = 100.00 AND AMOUNT_CR = 0.00;"
```
Expected:
- 结果始终为 `1`
**Step 6: Commit**
```bash
git add ccdi-project/src/main/java/com/ruoyi/ccdi/project/mapper/CcdiBankStatementMapper.java ccdi-project/src/main/resources/mapper/ccdi/project/CcdiBankStatementMapper.xml
git commit -m "feat(ccdi-project): skip duplicate bank statements on insert"
```
### Task 5: 校准日志计数并验证 MySQL 受影响行数语义
**Files:**
- Modify: `ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java`
- Modify: `ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java`
**Step 1: Add a temporary probe in the service**
在开发阶段先打印:
```java
log.info("【文件上传】dedup probe: batchSize={}, mapperAffectedRows={}", batchList.size(), affectedRows);
```
**Step 2: Run a duplicate import manually**
使用相同测试文件连续导入两次,并观察第二次日志。
Expected:
- 能确认 duplicate 分支下 MyBatis/JDBC 返回的 `affectedRows` 语义
**Step 3: Finalize the counting logic**
如果实测 duplicate 返回 `0`,则直接落正式逻辑:
```java
insertedCount += affectedRows;
duplicateCount += batchSize - affectedRows;
```
如果实测不稳定,则不要伪造精确计数,改为保守日志:
```java
log.info("【文件上传】流水入库完成: fetchedCount={}, attemptedCount={}", fetchedCount, attemptedCount);
```
**Step 4: Run the test suite**
Run:
```bash
mvn test -pl ccdi-project -Dtest=CcdiFileUploadServiceImplTest
```
Expected:
- PASS
**Step 5: Remove temporary probe and commit**
```bash
git add ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java
git commit -m "refactor(ccdi-project): finalize bank statement dedup logging"
```
### Task 6: 做最终回归验证并整理交付
**Files:**
- Review: `ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java`
- Review: `ccdi-project/src/main/resources/mapper/ccdi/project/CcdiBankStatementMapper.xml`
- Review: `assets/database/2026-03-10-bank-statement-dedup.sql`
- Review: `assets/对接流水分析/ccdi_bank_statement.md`
**Step 1: Run automated tests**
Run:
```bash
mvn test -pl ccdi-project -Dtest=CcdiFileUploadServiceImplTest
```
Expected:
- PASS
**Step 2: Run a focused compile**
Run:
```bash
mvn clean compile -pl ccdi-project -am
```
Expected:
- BUILD SUCCESS
**Step 3: Execute manual acceptance checks**
验证以下结果:
- 首次导入:数据写入成功
- 第二次导入同一数据:原记录不变,数量不增加
- 非重复数据再次导入:仅新增新记录
- 制造非唯一键类 SQL 错误:上传记录进入 `parsed_failed`
**Step 4: Prepare the delivery summary**
总结:
- 唯一键是否已生效
- 重复导入是否跳过
- 原数据是否保持不变
- 日志计数是否为精确值还是保守值
**Step 5: Commit**
```bash
git status --short
git add <verified files>
git commit -m "docs: finalize bank statement dedup verification notes"
```

View File

@@ -0,0 +1,263 @@
# 流水文件解析成功状态延后到流水入库完成设计
## 概述
调整流水文件上传异步处理链路中“解析成功”的业务含义。
当前实现里,只要流水分析平台返回“解析成功且可确认账户”,系统就会立即把上传记录状态更新为 `parsed_success`,随后才执行步骤 7 获取流水数据并写入本地数据库。
本次设计将 `parsed_success` 的含义收紧为:
- 流水分析平台解析成功
- 步骤 7 获取流水数据成功
- 流水数据成功写入本地数据库
在步骤 7 完成前,页面继续显示“解析中”。
## 背景
### 当前现状
当前核心逻辑位于 `ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java`
1. 上传文件到流水分析平台
2. 轮询解析状态
3. 调用上传状态接口判断是否解析成功
4. 立即更新 `ccdi_file_upload_record.file_status = parsed_success`
5. 再调用 `fetchAndSaveBankStatements(...)` 获取流水并入库
这会产生两个问题:
1. 前端会看到“解析成功”,但数据库里的流水可能还没有写完
2. 步骤 7 失败时,记录可能先显示成功,随后又因为异常被改成失败,状态语义不稳定
### 前端约束
当前前端只识别以下四种状态:
- `uploading`
- `parsing`
- `parsed_success`
- `parsed_failed`
本次需求明确要求:
- 不新增前端状态
- 当步骤 6 已经确认平台解析成功,但步骤 7 尚未完成时,页面继续显示“解析中”
## 方案对比
### 方案一:仅后移 `parsed_success` 更新时机
做法:
- 步骤 6 解析成功后,不更新状态
- 执行步骤 7
- 步骤 7 执行结束后,再更新为 `parsed_success`
优点:
- 改动最小
- 前端和数据库状态枚举都不需要调整
缺点:
- 当前 `fetchAndSaveBankStatements(...)` 没有显式返回成功或失败结果
- 方法内部存在“记录异常后继续处理”的行为,容易把部分失败误判为成功
### 方案二:后移成功状态,并让步骤 7 返回明确执行结果
做法:
- 步骤 6 只确认“平台解析成功且可以获取流水”
- 记录状态继续保持 `parsing`
- 步骤 7 返回结构化结果,例如 `success``savedCount``errorMessage`
- 只有步骤 7 明确成功后,才更新 `parsed_success`
- 步骤 7 任一关键失败,则更新为 `parsed_failed`
优点:
- 状态语义完整且稳定
- 能避免“伪成功”
- 与当前前端状态模型兼容
缺点:
- 需要对步骤 7 做一定重构
### 方案三:拆分为解析状态和入库状态两个维度
做法:
- 新增“解析状态”和“入库状态”两个字段
- 前端组合展示
优点:
- 状态表达最完整
缺点:
- 涉及数据库、后端查询统计、前端状态映射等多处改动
- 超出本次需求范围
## 最终方案
采用方案二。
### 核心决策
1. `parsed_success` 只表示“流水数据已经成功入库”
2. 步骤 6 解析成功后,记录状态继续保持 `parsing`
3. 步骤 7 必须显式返回成功或失败结果
4. 步骤 7 失败时,将上传记录更新为 `parsed_failed`
5. 步骤 7 失败时,清理本次 `logId` 对应的已落库流水,避免半成品数据残留
## 详细设计
### 1. 主流程状态流转
调整 `processFileAsync(...)` 的状态流转如下:
1. 初始创建记录时为 `uploading`
2. 文件上传到流水分析平台成功后,更新为 `parsing`
3. 轮询解析完成
4. 调用文件上传状态接口判断平台是否解析成功
5. 若平台解析失败,更新为 `parsed_failed`
6. 若平台解析成功,不更新为 `parsed_success`,继续保持 `parsing`
7. 执行步骤 7 获取流水并入库
8. 步骤 7 成功后,一次性更新:
- `file_status = parsed_success`
- `enterprise_names`
- `account_nos`
- 清空可能残留的 `error_message`
9. 步骤 7 失败后,更新:
- `file_status = parsed_failed`
- `error_message = 失败原因`
### 2. 步骤 7 返回结构化结果
`fetchAndSaveBankStatements(Long projectId, Integer groupId, Integer logId)``void` 改为返回结构化结果对象。
建议新增内部结果对象,例如:
```java
@Data
private static class FetchBankStatementResult {
private boolean success;
private int totalCount;
private int savedCount;
private String errorMessage;
}
```
返回语义建议如下:
- `success = true`
- 已成功完成全部分页拉取和数据库落库
- `savedCount` 为实际保存条数
- `success = false`
- 任一关键步骤失败
- `errorMessage` 写明失败原因
### 3. 步骤 7 的成功判定
步骤 7 需同时满足以下条件才算成功:
1. 首次 `getBankStatement` 请求成功返回
2. 分页总数计算正常
3. 所有分页请求成功完成
4. 所有批量插入操作成功完成
5. 最终保存条数与已拉取条数一致
其中 `totalCount = 0` 的场景按成功处理,原因如下:
- 平台已经解析成功
- 业务上允许“解析成功但无流水”
- 否则记录会长期停留在 `parsing` 或被错误标记为失败
### 4. 步骤 7 的失败处理
当前实现中,分页循环内部发生异常后会记录日志并继续下一页。该行为不适用于本次状态语义。
调整后规则:
1. 首次查询总数失败,直接返回失败
2. 任一分页请求失败,直接返回失败
3. 任一批量插入失败,直接返回失败
4. 返回失败前,清理当前 `logId` 已写入的流水数据
### 5. 半成品流水清理
`ccdi_bank_statement` 已存在 `batch_id` 字段,且当前实体 `CcdiBankStatement.batchId` 已映射该字段。
因此步骤 7 中应确保每条流水都带上本次上传的 `logId`
```java
statement.setProjectId(projectId);
statement.setBatchId(logId);
```
同时在 `CcdiBankStatementMapper` 中新增清理接口,例如:
```java
int deleteByProjectIdAndBatchId(@Param("projectId") Long projectId,
@Param("batchId") Integer batchId);
```
用于在步骤 7 失败时删除本次已插入的流水,避免出现“部分落库但上传记录失败”的脏数据。
### 6. 为什么不使用长事务
不建议把步骤 7 做成覆盖远程接口调用和全部分页落库的单个数据库事务,原因如下:
1. 远程接口调用时间不可控
2. 全量分页获取可能持续较久
3. 长事务会占用数据库连接并增加锁持有时间
因此本次采用“显式成功判定 + 失败补偿清理”的方式,而不是“长事务回滚”。
## 影响范围
### 后端
- `ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java`
- `ccdi-project/src/main/java/com/ruoyi/ccdi/project/mapper/CcdiBankStatementMapper.java`
- `ccdi-project/src/main/resources/mapper/ccdi/project/CcdiBankStatementMapper.xml`
### 测试
- 新增 `ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java`
### 前端
无须改动。
当前前端的 `parsing` 状态即可承载“平台解析成功但流水尚未入库完成”的阶段。
## 测试设计
至少覆盖以下场景:
1. 平台解析成功,步骤 7 全量拉取并入库成功,最终状态应为 `parsed_success`
2. 平台解析成功,但首次获取流水总数失败,最终状态应为 `parsed_failed`
3. 分页处理中途失败,最终状态应为 `parsed_failed`,且已写入流水被清理
4. 批量插入失败,最终状态应为 `parsed_failed`,且已写入流水被清理
5. `totalCount = 0`,最终状态应为 `parsed_success`
6. 平台解析失败,保持现有失败路径
## 风险与缓解
| 风险 | 影响 | 缓解方案 |
|------|------|----------|
| 步骤 7 重构后改变现有异常处理行为 | 中 | 使用单元测试锁定成功、失败、零数据三类分支 |
| 清理逻辑误删其他流水 | 高 | 删除条件必须同时绑定 `projectId``batchId(logId)` |
| 失败原因不清晰 | 中 | 统一由步骤 7 返回明确 `errorMessage`,最终写入 `ccdi_file_upload_record.error_message` |
## 验收标准
1. 当流水平台解析成功但本地仍在入库时,上传记录保持 `parsing`
2. 只有本地流水入库完成后,上传记录才变为 `parsed_success`
3. 任一步骤 7 失败,上传记录为 `parsed_failed`
4. 步骤 7 失败后,不残留本次 `logId` 的半成品流水
5. 前端无需新增状态,现有页面展示符合预期

View File

@@ -0,0 +1,362 @@
# 流水文件解析成功状态延后到流水入库完成 Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** 让流水文件上传记录只有在步骤 7 获取并保存流水数据成功后才更新为 `parsed_success`,在此之前继续显示 `parsing`
**Architecture:** 重构 `CcdiFileUploadServiceImpl` 的步骤 7使其返回结构化执行结果而不是吞异常主流程基于该结果决定最终状态。使用 `ccdi_bank_statement.batch_id` 绑定本次上传 `logId`,在步骤 7 失败时通过 Mapper 补偿删除本次已写入流水,避免半成品数据残留。
**Tech Stack:** Java 21, Spring Boot 3, MyBatis Plus, JUnit 5, Mockito(来自 `spring-boot-starter-test`
---
### Task 1: 为状态延后规则编写服务层失败测试
**Files:**
- Create: `ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java`
- Modify: `ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java:340-619`
**Step 1: Write the failing test**
在新测试类中先写“平台解析成功但步骤 7 失败时,记录最终为 `parsed_failed`”的测试。使用 Mockito mock 以下依赖:
- `LsfxAnalysisClient`
- `CcdiFileUploadRecordMapper`
- `CcdiBankStatementMapper`
示例骨架:
```java
@ExtendWith(MockitoExtension.class)
class CcdiFileUploadServiceImplTest {
@InjectMocks
private CcdiFileUploadServiceImpl service;
@Mock
private CcdiFileUploadRecordMapper recordMapper;
@Mock
private CcdiProjectMapper projectMapper;
@Mock
private LsfxAnalysisClient lsfxClient;
@Mock
private CcdiBankStatementMapper bankStatementMapper;
@Test
void processFileAsync_shouldKeepParsingUntilBankStatementsSaved() {
// arrange
// mock 上传成功、轮询完成、状态接口解析成功
// mock getBankStatement 首次调用抛异常
// act
// assert
verify(recordMapper, never()).updateById(argThat(record ->
"parsed_success".equals(record.getFileStatus())));
verify(recordMapper).updateById(argThat(record ->
"parsed_failed".equals(record.getFileStatus())));
}
}
```
**Step 2: Run test to verify it fails**
Run:
```bash
mvn test -pl ccdi-project -Dtest=CcdiFileUploadServiceImplTest#processFileAsync_shouldKeepParsingUntilBankStatementsSaved
```
Expected:
- FAIL
- 失败原因应体现当前实现会先更新 `parsed_success`,或测试类尚未编译通过
**Step 3: Write a second failing test for the success path**
补一条成功路径测试,验证步骤 7 成功后才更新为 `parsed_success`
```java
@Test
void processFileAsync_shouldMarkSuccessAfterBankStatementsSaved() {
// mock 上传成功、解析成功、getBankStatement 返回 totalCount=0
// 执行后应只在步骤7完成后出现 parsed_success
verify(recordMapper).updateById(argThat(record ->
"parsed_success".equals(record.getFileStatus())));
}
```
**Step 4: Run both tests to verify they fail**
Run:
```bash
mvn test -pl ccdi-project -Dtest=CcdiFileUploadServiceImplTest
```
Expected:
- FAIL
- 至少一条断言失败,证明当前实现不符合新设计
**Step 5: Commit**
```bash
git add ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java
git commit -m "test(ccdi-project): add file upload status transition tests"
```
### Task 2: 重构步骤 7 返回结果对象并延后成功状态更新
**Files:**
- Modify: `ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java:340-619`
- Test: `ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java`
**Step 1: Add a result object inside the service**
`CcdiFileUploadServiceImpl` 内新增私有静态结果类:
```java
@Data
private static class FetchBankStatementResult {
private boolean success;
private int totalCount;
private int savedCount;
private String errorMessage;
}
```
**Step 2: Change the fetch method signature**
把:
```java
private void fetchAndSaveBankStatements(Long projectId, Integer groupId, Integer logId)
```
改为:
```java
private FetchBankStatementResult fetchAndSaveBankStatements(Long projectId, Integer groupId, Integer logId)
```
**Step 3: Return explicit failure instead of swallowing exceptions**
把当前“记录错误后继续下一页”的逻辑改成显式失败返回。例如:
```java
catch (Exception e) {
result.setSuccess(false);
result.setErrorMessage("获取或保存流水数据失败: " + e.getMessage());
return result;
}
```
**Step 4: Delay `parsed_success` update until the fetch result succeeds**
`processFileAsync(...)` 中当前这段提前成功逻辑:
```java
record.setFileStatus("parsed_success");
record.setEnterpriseNames(enterpriseNamesStr);
record.setAccountNos(accountNosStr);
recordMapper.updateById(record);
fetchAndSaveBankStatements(projectId, lsfxProjectId, logId);
```
改成:
```java
FetchBankStatementResult fetchResult = fetchAndSaveBankStatements(projectId, lsfxProjectId, logId);
if (!fetchResult.isSuccess()) {
record.setFileStatus("parsed_failed");
record.setErrorMessage(fetchResult.getErrorMessage());
recordMapper.updateById(record);
return;
}
record.setFileStatus("parsed_success");
record.setEnterpriseNames(enterpriseNamesStr);
record.setAccountNos(accountNosStr);
record.setErrorMessage(null);
recordMapper.updateById(record);
```
**Step 5: Handle the zero-data path explicitly**
在首次总数查询后,若 `totalCount == null || totalCount <= 0`,返回成功结果:
```java
result.setSuccess(true);
result.setTotalCount(0);
result.setSavedCount(0);
return result;
```
**Step 6: Run targeted tests**
Run:
```bash
mvn test -pl ccdi-project -Dtest=CcdiFileUploadServiceImplTest
```
Expected:
- PASS
- 新增的状态延后测试全部通过
**Step 7: Commit**
```bash
git add ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java
git commit -m "refactor(ccdi-project): delay parsed success until bank statements saved"
```
### Task 3: 为本次上传绑定 batchId 并补偿清理半成品流水
**Files:**
- Modify: `ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java:527-619`
- Modify: `ccdi-project/src/main/java/com/ruoyi/ccdi/project/mapper/CcdiBankStatementMapper.java:15-23`
- Modify: `ccdi-project/src/main/resources/mapper/ccdi/project/CcdiBankStatementMapper.xml:62-87`
- Test: `ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java`
**Step 1: Attach the upload logId to each statement**
在流水转换循环内补齐:
```java
statement.setProjectId(projectId);
statement.setBatchId(logId);
```
**Step 2: Add the cleanup mapper method**
`CcdiBankStatementMapper.java` 中新增:
```java
int deleteByProjectIdAndBatchId(@Param("projectId") Long projectId,
@Param("batchId") Integer batchId);
```
并在 XML 中实现:
```xml
<delete id="deleteByProjectIdAndBatchId">
delete from ccdi_bank_statement
where project_id = #{projectId}
and batch_id = #{batchId}
</delete>
```
**Step 3: Call cleanup before returning failure**
`fetchAndSaveBankStatements(...)` 的失败分支中调用:
```java
bankStatementMapper.deleteByProjectIdAndBatchId(projectId, logId);
```
只允许使用 `projectId + logId(batchId)` 双条件,避免误删其他批次数据。
**Step 4: Write a failing cleanup test**
在测试类中新增:
```java
@Test
void processFileAsync_shouldCleanupInsertedStatementsWhenFetchFails() {
// mock 某页或某批插入失败
// assert deleteByProjectIdAndBatchId(projectId, logId) 被调用
}
```
**Step 5: Run the new test**
Run:
```bash
mvn test -pl ccdi-project -Dtest=CcdiFileUploadServiceImplTest#processFileAsync_shouldCleanupInsertedStatementsWhenFetchFails
```
Expected:
- 先 FAIL,再在实现后 PASS
**Step 6: Run the module tests**
Run:
```bash
mvn test -pl ccdi-project -Dtest=CcdiBankStatementTest,CcdiFileUploadServiceImplTest
```
Expected:
- PASS
- 旧的 `CcdiBankStatementTest` 不回归
**Step 7: Commit**
```bash
git add ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java ccdi-project/src/main/java/com/ruoyi/ccdi/project/mapper/CcdiBankStatementMapper.java ccdi-project/src/main/resources/mapper/ccdi/project/CcdiBankStatementMapper.xml ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java
git commit -m "fix(ccdi-project): cleanup partial bank statements on upload failure"
```
### Task 4: 回归验证并整理交付
**Files:**
- Modify: `docs/plans/2026-03-09-file-upload-parse-success-after-bank-statement-design.md`
- Modify: `docs/plans/2026-03-09-file-upload-parse-success-after-bank-statement.md`
**Step 1: Run final verification**
Run:
```bash
mvn test -pl ccdi-project -Dtest=CcdiBankStatementTest,CcdiFileUploadServiceImplTest
```
Expected:
- PASS
- 无编译错误
**Step 2: Inspect git diff**
Run:
```bash
git diff -- ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java ccdi-project/src/main/java/com/ruoyi/ccdi/project/mapper/CcdiBankStatementMapper.java ccdi-project/src/main/resources/mapper/ccdi/project/CcdiBankStatementMapper.xml ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java
```
Expected:
- 只包含状态时机调整、结果对象、清理接口和测试
**Step 3: Update docs if implementation deviates**
若实现中出现与设计或计划不一致的细节,及时回写到这两份文档,避免文档失真。
**Step 4: Commit**
```bash
git add ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java ccdi-project/src/main/java/com/ruoyi/ccdi/project/mapper/CcdiBankStatementMapper.java ccdi-project/src/main/resources/mapper/ccdi/project/CcdiBankStatementMapper.xml ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java docs/plans/2026-03-09-file-upload-parse-success-after-bank-statement-design.md docs/plans/2026-03-09-file-upload-parse-success-after-bank-statement.md
git commit -m "docs: finalize file upload parse success timing plan"
```
---
## Implementation Checklist
- [ ] 服务测试已覆盖“成功延后”和“步骤 7 失败”场景
- [ ] `fetchAndSaveBankStatements(...)` 改为返回结构化结果
- [ ] 步骤 7 完成前记录状态保持 `parsing`
- [ ] 步骤 7 成功后才更新 `parsed_success`
- [ ] 步骤 7 失败后更新 `parsed_failed`
- [ ] 本次 `logId` 对应流水写入 `batch_id`
- [ ] 步骤 7 失败时清理本次半成品流水
- [ ] `totalCount = 0` 场景按成功处理
- [ ] `ccdi-project` 相关测试通过

View File

@@ -18,6 +18,7 @@ logging:
level:
com.ruoyi: debug
org.springframework: warn
"com.ruoyi.ccdi.project.mapper.CcdiBankStatementMapper.insertBatch": info
# 用户配置
user:

View File

@@ -0,0 +1,4 @@
USE ccdi;
ALTER TABLE ccdi_file_upload_record
MODIFY COLUMN error_message TEXT COMMENT '错误信息(解析失败时记录)';