fix(ccdi-project): delay parse success until bank statements saved

This commit is contained in:
wkc
2026-03-09 16:31:04 +08:00
parent e5116d000b
commit 3e0eb38f15
4 changed files with 359 additions and 78 deletions

View File

@@ -21,4 +21,7 @@ public interface CcdiBankStatementMapper extends BaseMapper<CcdiBankStatement> {
* @return 插入记录数
*/
int insertBatch(@Param("list") List<CcdiBankStatement> list);
int deleteByProjectIdAndBatchId(@Param("projectId") Long projectId,
@Param("batchId") Integer batchId);
}

View File

@@ -17,6 +17,7 @@ import com.ruoyi.lsfx.domain.request.GetBankStatementRequest;
import com.ruoyi.lsfx.domain.request.GetFileUploadStatusRequest;
import com.ruoyi.lsfx.domain.response.*;
import jakarta.annotation.Resource;
import lombok.Data;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.beans.factory.annotation.Value;
@@ -49,6 +50,14 @@ import java.util.concurrent.RejectedExecutionException;
@Service
public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
@Data
private static class FetchBankStatementResult {
private boolean success;
private int totalCount;
private int savedCount;
private String errorMessage;
}
/**
* 若依框架文件上传路径
*/
@@ -426,17 +435,26 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
String enterpriseNamesStr = enterpriseNames != null ? String.join(",", enterpriseNames) : "";
String accountNosStr = accountNos != null ? String.join(",", accountNos) : "";
record.setFileStatus("parsed_success");
record.setEnterpriseNames(enterpriseNamesStr);
record.setAccountNos(accountNosStr);
recordMapper.updateById(record);
log.info("【文件上传】主体信息已保存: enterpriseNames={}, accountNos={}",
enterpriseNamesStr, accountNosStr);
// 步骤7:获取流水数据并保存
log.info("【文件上传】步骤7: 获取流水数据");
fetchAndSaveBankStatements(projectId, lsfxProjectId, logId);
FetchBankStatementResult fetchResult =
fetchAndSaveBankStatements(projectId, lsfxProjectId, logId);
if (!fetchResult.isSuccess()) {
record.setFileStatus("parsed_failed");
record.setErrorMessage(fetchResult.getErrorMessage());
recordMapper.updateById(record);
return;
}
record.setFileStatus("parsed_success");
record.setEnterpriseNames(enterpriseNamesStr);
record.setAccountNos(accountNosStr);
record.setErrorMessage(null);
recordMapper.updateById(record);
} else {
// 解析失败
@@ -524,97 +542,112 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
* @param groupId 流水分析平台项目ID
* @param logId 文件ID
*/
private void fetchAndSaveBankStatements(Long projectId, Integer groupId, Integer logId) {
private FetchBankStatementResult fetchAndSaveBankStatements(Long projectId, Integer groupId,
Integer logId) {
log.info("【文件上传】开始获取流水数据: projectId={}, groupId={}, logId={}",
projectId, groupId, logId);
// 步骤1: 先调用一次接口获取 totalCount
GetBankStatementRequest firstRequest = new GetBankStatementRequest();
firstRequest.setGroupId(groupId);
firstRequest.setLogId(logId);
firstRequest.setPageNow(1);
firstRequest.setPageSize(1); // 只获取1条用于获取总数
FetchBankStatementResult result = new FetchBankStatementResult();
GetBankStatementResponse firstResponse = lsfxClient.getBankStatement(firstRequest);
try {
GetBankStatementRequest firstRequest = new GetBankStatementRequest();
firstRequest.setGroupId(groupId);
firstRequest.setLogId(logId);
firstRequest.setPageNow(1);
firstRequest.setPageSize(1);
if (firstResponse == null || firstResponse.getData() == null) {
log.warn("【文件上传】获取流水数据失败: 响应数据为空");
return;
}
GetBankStatementResponse firstResponse = lsfxClient.getBankStatement(firstRequest);
if (firstResponse == null || firstResponse.getData() == null) {
result.setSuccess(false);
result.setErrorMessage("获取流水数据失败: 响应数据为空");
cleanupBankStatements(projectId, logId);
return result;
}
Integer totalCount = firstResponse.getData().getTotalCount();
if (totalCount == null || totalCount <= 0) {
log.warn("【文件上传】无流水数据需要保存: totalCount={}", totalCount);
return;
}
Integer totalCount = firstResponse.getData().getTotalCount();
result.setTotalCount(totalCount == null ? 0 : totalCount);
if (totalCount == null || totalCount <= 0) {
log.warn("【文件上传】无流水数据需要保存: totalCount={}", totalCount);
result.setSuccess(true);
result.setSavedCount(0);
return result;
}
log.info("【文件上传】获取到总数: totalCount={}", totalCount);
int pageSize = 1000;
int batchSize = 1000;
int totalPages = (int) Math.ceil((double) totalCount / pageSize);
int totalSaved = 0;
List<CcdiBankStatement> batchList = new ArrayList<>(batchSize);
// 步骤2: 计算分页信息
int pageSize = 1000; // 每页1000条
int batchSize = 1000; // 批量插入每批1000条与pageSize保持一致
int totalPages = (int) Math.ceil((double) totalCount / pageSize);
log.info("【文件上传】获取到总数: totalCount={}", totalCount);
log.info("【文件上传】分页信息: 每页{}条, 共{}页", pageSize, totalPages);
log.info("【文件上传】分页信息: 每页{}条, 共{}页", pageSize, totalPages);
for (int pageNow = 1; pageNow <= totalPages; pageNow++) {
try {
GetBankStatementRequest request = new GetBankStatementRequest();
request.setGroupId(groupId);
request.setLogId(logId);
request.setPageNow(pageNow);
request.setPageSize(pageSize);
List<CcdiBankStatement> batchList = new ArrayList<>(batchSize);
int totalSaved = 0;
GetBankStatementResponse response = lsfxClient.getBankStatement(request);
if (response == null || response.getData() == null
|| response.getData().getBankStatementList() == null) {
result.setSuccess(false);
result.setErrorMessage("获取流水数据失败: 第 " + pageNow + " 页响应异常");
cleanupBankStatements(projectId, logId);
return result;
}
// 步骤3: 循环分页获取所有数据
for (int pageNow = 1; pageNow <= totalPages; pageNow++) {
try {
// 构建请求参数
GetBankStatementRequest request = new GetBankStatementRequest();
request.setGroupId(groupId);
request.setLogId(logId);
request.setPageNow(pageNow);
request.setPageSize(pageSize);
List<GetBankStatementResponse.BankStatementItem> items =
response.getData().getBankStatementList();
log.debug("【文件上传】获取第{}页数据: {}条", pageNow, items.size());
// 获取流水数据
GetBankStatementResponse response = lsfxClient.getBankStatement(request);
for (GetBankStatementResponse.BankStatementItem item : items) {
CcdiBankStatement statement = CcdiBankStatement.fromResponse(item);
if (statement != null) {
statement.setBatchId(logId);
statement.setProjectId(projectId);
batchList.add(statement);
if (response == null || response.getData() == null
|| response.getData().getBankStatementList() == null) {
log.warn("【文件上传】获取流水数据为空: pageNow={}", pageNow);
continue;
}
List<GetBankStatementResponse.BankStatementItem> items =
response.getData().getBankStatementList();
log.debug("【文件上传】获取第{}页数据: {}条", pageNow, items.size());
// 转换并收集到批量列表
for (GetBankStatementResponse.BankStatementItem item : items) {
CcdiBankStatement statement = CcdiBankStatement.fromResponse(item);
if (statement != null) {
statement.setProjectId(projectId); // 设置业务项目ID
batchList.add(statement);
// 达到批量插入阈值1000条执行插入
if (batchList.size() >= batchSize) {
bankStatementMapper.insertBatch(batchList);
totalSaved += batchList.size();
log.debug("【文件上传】批量插入流水: {}条, 累计{}条",
batchList.size(), totalSaved);
batchList.clear();
if (batchList.size() >= batchSize) {
bankStatementMapper.insertBatch(batchList);
totalSaved += batchList.size();
log.debug("【文件上传】批量插入流水 {}条, 累计{}条",
batchList.size(), totalSaved);
batchList.clear();
}
}
}
} catch (Exception e) {
log.error("【文件上传】获取或保存流水数据失败: pageNow={}", pageNow, e);
result.setSuccess(false);
result.setErrorMessage("获取或保存流水数据失败: " + e.getMessage());
cleanupBankStatements(projectId, logId);
return result;
}
} catch (Exception e) {
log.error("【文件上传】获取或保存流水数据失败: pageNow={}", pageNow, e);
// 继续处理下一页,不中断整个流程
}
}
// 步骤4: 保存剩余的数据
if (!batchList.isEmpty()) {
bankStatementMapper.insertBatch(batchList);
totalSaved += batchList.size();
log.debug("【文件上传】批量插入剩余流水: {}条", batchList.size());
}
if (!batchList.isEmpty()) {
bankStatementMapper.insertBatch(batchList);
totalSaved += batchList.size();
log.debug("【文件上传】批量插入剩余流水 {}条", batchList.size());
}
log.info("【文件上传】流水数据保存完成: 总共保存{}条", totalSaved);
log.info("【文件上传】流水数据保存完成: 总共保存{}条", totalSaved);
result.setSuccess(true);
result.setSavedCount(totalSaved);
return result;
} catch (Exception e) {
log.error("【文件上传】获取或保存流水数据失败: logId={}", logId, e);
result.setSuccess(false);
result.setErrorMessage("获取或保存流水数据失败: " + e.getMessage());
cleanupBankStatements(projectId, logId);
return result;
}
}
private void cleanupBankStatements(Long projectId, Integer logId) {
bankStatementMapper.deleteByProjectIdAndBatchId(projectId, logId);
}
}

View File

@@ -86,4 +86,10 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
</foreach>
</insert>
<delete id="deleteByProjectIdAndBatchId">
delete from ccdi_bank_statement
where project_id = #{projectId}
and batch_id = #{batchId}
</delete>
</mapper>

View File

@@ -0,0 +1,239 @@
package com.ruoyi.ccdi.project.service.impl;
import com.ruoyi.ccdi.project.domain.entity.CcdiFileUploadRecord;
import com.ruoyi.ccdi.project.mapper.CcdiBankStatementMapper;
import com.ruoyi.ccdi.project.mapper.CcdiFileUploadRecordMapper;
import com.ruoyi.ccdi.project.mapper.CcdiProjectMapper;
import com.ruoyi.lsfx.client.LsfxAnalysisClient;
import com.ruoyi.lsfx.domain.request.GetBankStatementRequest;
import com.ruoyi.lsfx.domain.response.CheckParseStatusResponse;
import com.ruoyi.lsfx.domain.response.GetBankStatementResponse;
import com.ruoyi.lsfx.domain.response.GetFileUploadStatusResponse;
import com.ruoyi.lsfx.domain.response.UploadFileResponse;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@ExtendWith(MockitoExtension.class)
class CcdiFileUploadServiceImplTest {
private static final Long PROJECT_ID = 100L;
private static final Integer LSFX_PROJECT_ID = 200;
private static final Long RECORD_ID = 300L;
private static final Integer LOG_ID = 400;
@InjectMocks
private CcdiFileUploadServiceImpl service;
@Mock
private CcdiFileUploadRecordMapper recordMapper;
@Mock
private CcdiProjectMapper projectMapper;
@Mock
private LsfxAnalysisClient lsfxClient;
@Mock
private CcdiBankStatementMapper bankStatementMapper;
@TempDir
Path tempDir;
@Test
void processFileAsync_shouldKeepParsingUntilBankStatementsSaved() throws IOException {
List<String> events = new ArrayList<>();
AtomicInteger sequence = new AtomicInteger();
captureRecordStatus(events, sequence);
when(lsfxClient.uploadFile(eq(LSFX_PROJECT_ID), any())).thenReturn(buildUploadResponse());
when(lsfxClient.checkParseStatus(LSFX_PROJECT_ID, String.valueOf(LOG_ID)))
.thenReturn(buildCheckParseStatusResponse(false));
when(lsfxClient.getFileUploadStatus(any())).thenReturn(buildParsedSuccessStatusResponse());
when(lsfxClient.getBankStatement(any(GetBankStatementRequest.class)))
.thenThrow(new RuntimeException("bank statement fetch failed"));
CcdiFileUploadRecord record = buildRecord();
Path tempFile = createTempFile();
service.processFileAsync(PROJECT_ID, LSFX_PROJECT_ID, tempFile.toString(), RECORD_ID, "batch-1", record);
assertTrue(events.stream().anyMatch(event -> event.endsWith("record:parsed_failed")));
assertFalse(events.stream().anyMatch(event -> event.endsWith("record:parsed_success")));
}
@Test
void processFileAsync_shouldMarkSuccessAfterBankStatementsSaved() throws IOException {
List<String> events = new ArrayList<>();
AtomicInteger sequence = new AtomicInteger();
captureRecordStatus(events, sequence);
when(lsfxClient.uploadFile(eq(LSFX_PROJECT_ID), any())).thenReturn(buildUploadResponse());
when(lsfxClient.checkParseStatus(LSFX_PROJECT_ID, String.valueOf(LOG_ID)))
.thenReturn(buildCheckParseStatusResponse(false));
when(lsfxClient.getFileUploadStatus(any())).thenReturn(buildParsedSuccessStatusResponse());
when(lsfxClient.getBankStatement(any(GetBankStatementRequest.class)))
.thenAnswer(invocation -> {
events.add(sequence.incrementAndGet() + ":bank-fetch");
return buildEmptyBankStatementResponse();
});
CcdiFileUploadRecord record = buildRecord();
Path tempFile = createTempFile();
service.processFileAsync(PROJECT_ID, LSFX_PROJECT_ID, tempFile.toString(), RECORD_ID, "batch-1", record);
int fetchIndex = findEventIndex(events, "bank-fetch");
int successIndex = findEventIndex(events, "record:parsed_success");
assertTrue(fetchIndex >= 0);
assertTrue(successIndex > fetchIndex);
}
@Test
void processFileAsync_shouldCleanupInsertedStatementsWhenFetchFails() throws IOException {
when(lsfxClient.uploadFile(eq(LSFX_PROJECT_ID), any())).thenReturn(buildUploadResponse());
when(lsfxClient.checkParseStatus(LSFX_PROJECT_ID, String.valueOf(LOG_ID)))
.thenReturn(buildCheckParseStatusResponse(false));
when(lsfxClient.getFileUploadStatus(any())).thenReturn(buildParsedSuccessStatusResponse());
when(lsfxClient.getBankStatement(any(GetBankStatementRequest.class)))
.thenThrow(new RuntimeException("bank statement fetch failed"));
CcdiFileUploadRecord record = buildRecord();
Path tempFile = createTempFile();
service.processFileAsync(PROJECT_ID, LSFX_PROJECT_ID, tempFile.toString(), RECORD_ID, "batch-1", record);
verify(bankStatementMapper).deleteByProjectIdAndBatchId(PROJECT_ID, LOG_ID);
}
@Test
void processFileAsync_shouldFailWhenPagedFetchThrows() throws IOException {
List<String> events = new ArrayList<>();
AtomicInteger sequence = new AtomicInteger();
captureRecordStatus(events, sequence);
when(lsfxClient.uploadFile(eq(LSFX_PROJECT_ID), any())).thenReturn(buildUploadResponse());
when(lsfxClient.checkParseStatus(LSFX_PROJECT_ID, String.valueOf(LOG_ID)))
.thenReturn(buildCheckParseStatusResponse(false));
when(lsfxClient.getFileUploadStatus(any())).thenReturn(buildParsedSuccessStatusResponse());
when(lsfxClient.getBankStatement(any(GetBankStatementRequest.class)))
.thenReturn(buildBankStatementResponseWithTotalCount(1))
.thenThrow(new RuntimeException("paged fetch failed"));
CcdiFileUploadRecord record = buildRecord();
Path tempFile = createTempFile();
service.processFileAsync(PROJECT_ID, LSFX_PROJECT_ID, tempFile.toString(), RECORD_ID, "batch-1", record);
assertTrue(events.stream().anyMatch(event -> event.endsWith("record:parsed_failed")));
assertFalse(events.stream().anyMatch(event -> event.endsWith("record:parsed_success")));
verify(bankStatementMapper).deleteByProjectIdAndBatchId(PROJECT_ID, LOG_ID);
}
private void captureRecordStatus(List<String> events, AtomicInteger sequence) {
doAnswer(invocation -> {
CcdiFileUploadRecord record = invocation.getArgument(0);
events.add(sequence.incrementAndGet() + ":record:" + record.getFileStatus());
return 1;
}).when(recordMapper).updateById(any(CcdiFileUploadRecord.class));
}
private CcdiFileUploadRecord buildRecord() {
CcdiFileUploadRecord record = new CcdiFileUploadRecord();
record.setId(RECORD_ID);
record.setProjectId(PROJECT_ID);
record.setLsfxProjectId(LSFX_PROJECT_ID);
record.setFileName("test.xlsx");
record.setFileStatus("uploading");
return record;
}
private Path createTempFile() throws IOException {
Path tempFile = Files.createTempFile(tempDir, "upload-", ".xlsx");
Files.writeString(tempFile, "test");
return tempFile;
}
private UploadFileResponse buildUploadResponse() {
UploadFileResponse.UploadLogItem logItem = new UploadFileResponse.UploadLogItem();
logItem.setLogId(LOG_ID);
UploadFileResponse.UploadData uploadData = new UploadFileResponse.UploadData();
uploadData.setUploadLogList(List.of(logItem));
UploadFileResponse response = new UploadFileResponse();
response.setData(uploadData);
return response;
}
private CheckParseStatusResponse buildCheckParseStatusResponse(boolean parsing) {
CheckParseStatusResponse.ParseStatusData data = new CheckParseStatusResponse.ParseStatusData();
data.setParsing(parsing);
CheckParseStatusResponse response = new CheckParseStatusResponse();
response.setData(data);
return response;
}
private GetFileUploadStatusResponse buildParsedSuccessStatusResponse() {
GetFileUploadStatusResponse.LogItem logItem = new GetFileUploadStatusResponse.LogItem();
logItem.setStatus(-5);
logItem.setUploadStatusDesc("data.wait.confirm.newaccount");
logItem.setEnterpriseNameList(List.of("测试主体"));
logItem.setAccountNoList(List.of("62220001"));
GetFileUploadStatusResponse.FileUploadStatusData data =
new GetFileUploadStatusResponse.FileUploadStatusData();
data.setLogs(List.of(logItem));
GetFileUploadStatusResponse response = new GetFileUploadStatusResponse();
response.setData(data);
return response;
}
private GetBankStatementResponse buildEmptyBankStatementResponse() {
GetBankStatementResponse.BankStatementData data = new GetBankStatementResponse.BankStatementData();
data.setTotalCount(0);
GetBankStatementResponse response = new GetBankStatementResponse();
response.setData(data);
return response;
}
private GetBankStatementResponse buildBankStatementResponseWithTotalCount(int totalCount) {
GetBankStatementResponse.BankStatementData data = new GetBankStatementResponse.BankStatementData();
data.setTotalCount(totalCount);
data.setBankStatementList(List.of());
GetBankStatementResponse response = new GetBankStatementResponse();
response.setData(data);
return response;
}
private int findEventIndex(List<String> events, String suffix) {
for (int i = 0; i < events.size(); i++) {
if (events.get(i).endsWith(suffix)) {
return i;
}
}
return -1;
}
}