From 3e0eb38f15e3c296c9563967432ba66282e12955 Mon Sep 17 00:00:00 2001 From: wkc <978997012@qq.com> Date: Mon, 9 Mar 2026 16:31:04 +0800 Subject: [PATCH] fix(ccdi-project): delay parse success until bank statements saved --- .../mapper/CcdiBankStatementMapper.java | 3 + .../impl/CcdiFileUploadServiceImpl.java | 189 ++++++++------ .../ccdi/project/CcdiBankStatementMapper.xml | 6 + .../impl/CcdiFileUploadServiceImplTest.java | 239 ++++++++++++++++++ 4 files changed, 359 insertions(+), 78 deletions(-) create mode 100644 ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java diff --git a/ccdi-project/src/main/java/com/ruoyi/ccdi/project/mapper/CcdiBankStatementMapper.java b/ccdi-project/src/main/java/com/ruoyi/ccdi/project/mapper/CcdiBankStatementMapper.java index 2f2262c..952d2fb 100644 --- a/ccdi-project/src/main/java/com/ruoyi/ccdi/project/mapper/CcdiBankStatementMapper.java +++ b/ccdi-project/src/main/java/com/ruoyi/ccdi/project/mapper/CcdiBankStatementMapper.java @@ -21,4 +21,7 @@ public interface CcdiBankStatementMapper extends BaseMapper { * @return 插入记录数 */ int insertBatch(@Param("list") List list); + + int deleteByProjectIdAndBatchId(@Param("projectId") Long projectId, + @Param("batchId") Integer batchId); } diff --git a/ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java b/ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java index 5b59d9d..5408652 100644 --- a/ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java +++ b/ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java @@ -17,6 +17,7 @@ import com.ruoyi.lsfx.domain.request.GetBankStatementRequest; import com.ruoyi.lsfx.domain.request.GetFileUploadStatusRequest; import com.ruoyi.lsfx.domain.response.*; import jakarta.annotation.Resource; +import lombok.Data; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Value; @@ -49,6 +50,14 @@ import java.util.concurrent.RejectedExecutionException; @Service public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { + @Data + private static class FetchBankStatementResult { + private boolean success; + private int totalCount; + private int savedCount; + private String errorMessage; + } + /** * 若依框架文件上传路径 */ @@ -426,17 +435,26 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { String enterpriseNamesStr = enterpriseNames != null ? String.join(",", enterpriseNames) : ""; String accountNosStr = accountNos != null ? String.join(",", accountNos) : ""; - record.setFileStatus("parsed_success"); - record.setEnterpriseNames(enterpriseNamesStr); - record.setAccountNos(accountNosStr); - recordMapper.updateById(record); log.info("【文件上传】主体信息已保存: enterpriseNames={}, accountNos={}", enterpriseNamesStr, accountNosStr); // 步骤7:获取流水数据并保存 log.info("【文件上传】步骤7: 获取流水数据"); - fetchAndSaveBankStatements(projectId, lsfxProjectId, logId); + FetchBankStatementResult fetchResult = + fetchAndSaveBankStatements(projectId, lsfxProjectId, logId); + if (!fetchResult.isSuccess()) { + record.setFileStatus("parsed_failed"); + record.setErrorMessage(fetchResult.getErrorMessage()); + recordMapper.updateById(record); + return; + } + + record.setFileStatus("parsed_success"); + record.setEnterpriseNames(enterpriseNamesStr); + record.setAccountNos(accountNosStr); + record.setErrorMessage(null); + recordMapper.updateById(record); } else { // 解析失败 @@ -524,97 +542,112 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { * @param groupId 流水分析平台项目ID * @param logId 文件ID */ - private void fetchAndSaveBankStatements(Long projectId, Integer groupId, Integer logId) { + private FetchBankStatementResult fetchAndSaveBankStatements(Long projectId, Integer groupId, + Integer logId) { log.info("【文件上传】开始获取流水数据: projectId={}, groupId={}, logId={}", projectId, groupId, logId); - // 步骤1: 先调用一次接口获取 totalCount - GetBankStatementRequest firstRequest = new GetBankStatementRequest(); - firstRequest.setGroupId(groupId); - firstRequest.setLogId(logId); - firstRequest.setPageNow(1); - firstRequest.setPageSize(1); // 只获取1条,用于获取总数 + FetchBankStatementResult result = new FetchBankStatementResult(); - GetBankStatementResponse firstResponse = lsfxClient.getBankStatement(firstRequest); + try { + GetBankStatementRequest firstRequest = new GetBankStatementRequest(); + firstRequest.setGroupId(groupId); + firstRequest.setLogId(logId); + firstRequest.setPageNow(1); + firstRequest.setPageSize(1); - if (firstResponse == null || firstResponse.getData() == null) { - log.warn("【文件上传】获取流水数据失败: 响应数据为空"); - return; - } + GetBankStatementResponse firstResponse = lsfxClient.getBankStatement(firstRequest); + if (firstResponse == null || firstResponse.getData() == null) { + result.setSuccess(false); + result.setErrorMessage("获取流水数据失败: 响应数据为空"); + cleanupBankStatements(projectId, logId); + return result; + } - Integer totalCount = firstResponse.getData().getTotalCount(); - if (totalCount == null || totalCount <= 0) { - log.warn("【文件上传】无流水数据需要保存: totalCount={}", totalCount); - return; - } + Integer totalCount = firstResponse.getData().getTotalCount(); + result.setTotalCount(totalCount == null ? 0 : totalCount); + if (totalCount == null || totalCount <= 0) { + log.warn("【文件上传】无流水数据需要保存: totalCount={}", totalCount); + result.setSuccess(true); + result.setSavedCount(0); + return result; + } - log.info("【文件上传】获取到总数: totalCount={}", totalCount); + int pageSize = 1000; + int batchSize = 1000; + int totalPages = (int) Math.ceil((double) totalCount / pageSize); + int totalSaved = 0; + List batchList = new ArrayList<>(batchSize); - // 步骤2: 计算分页信息 - int pageSize = 1000; // 每页1000条 - int batchSize = 1000; // 批量插入每批1000条(与pageSize保持一致) - int totalPages = (int) Math.ceil((double) totalCount / pageSize); + log.info("【文件上传】获取到总数: totalCount={}", totalCount); + log.info("【文件上传】分页信息: 每页{}条, 共{}页", pageSize, totalPages); - log.info("【文件上传】分页信息: 每页{}条, 共{}页", pageSize, totalPages); + for (int pageNow = 1; pageNow <= totalPages; pageNow++) { + try { + GetBankStatementRequest request = new GetBankStatementRequest(); + request.setGroupId(groupId); + request.setLogId(logId); + request.setPageNow(pageNow); + request.setPageSize(pageSize); - List batchList = new ArrayList<>(batchSize); - int totalSaved = 0; + GetBankStatementResponse response = lsfxClient.getBankStatement(request); + if (response == null || response.getData() == null + || response.getData().getBankStatementList() == null) { + result.setSuccess(false); + result.setErrorMessage("获取流水数据失败: 第 " + pageNow + " 页响应异常"); + cleanupBankStatements(projectId, logId); + return result; + } - // 步骤3: 循环分页获取所有数据 - for (int pageNow = 1; pageNow <= totalPages; pageNow++) { - try { - // 构建请求参数 - GetBankStatementRequest request = new GetBankStatementRequest(); - request.setGroupId(groupId); - request.setLogId(logId); - request.setPageNow(pageNow); - request.setPageSize(pageSize); + List items = + response.getData().getBankStatementList(); + log.debug("【文件上传】获取第{}页数据: {}条", pageNow, items.size()); - // 获取流水数据 - GetBankStatementResponse response = lsfxClient.getBankStatement(request); + for (GetBankStatementResponse.BankStatementItem item : items) { + CcdiBankStatement statement = CcdiBankStatement.fromResponse(item); + if (statement != null) { + statement.setBatchId(logId); + statement.setProjectId(projectId); + batchList.add(statement); - if (response == null || response.getData() == null - || response.getData().getBankStatementList() == null) { - log.warn("【文件上传】获取流水数据为空: pageNow={}", pageNow); - continue; - } - - List items = - response.getData().getBankStatementList(); - - log.debug("【文件上传】获取第{}页数据: {}条", pageNow, items.size()); - - // 转换并收集到批量列表 - for (GetBankStatementResponse.BankStatementItem item : items) { - CcdiBankStatement statement = CcdiBankStatement.fromResponse(item); - if (statement != null) { - statement.setProjectId(projectId); // 设置业务项目ID - batchList.add(statement); - - // 达到批量插入阈值(1000条),执行插入 - if (batchList.size() >= batchSize) { - bankStatementMapper.insertBatch(batchList); - totalSaved += batchList.size(); - log.debug("【文件上传】批量插入流水: {}条, 累计{}条", - batchList.size(), totalSaved); - batchList.clear(); + if (batchList.size() >= batchSize) { + bankStatementMapper.insertBatch(batchList); + totalSaved += batchList.size(); + log.debug("【文件上传】批量插入流水 {}条, 累计{}条", + batchList.size(), totalSaved); + batchList.clear(); + } } } + } catch (Exception e) { + log.error("【文件上传】获取或保存流水数据失败: pageNow={}", pageNow, e); + result.setSuccess(false); + result.setErrorMessage("获取或保存流水数据失败: " + e.getMessage()); + cleanupBankStatements(projectId, logId); + return result; } - - } catch (Exception e) { - log.error("【文件上传】获取或保存流水数据失败: pageNow={}", pageNow, e); - // 继续处理下一页,不中断整个流程 } - } - // 步骤4: 保存剩余的数据 - if (!batchList.isEmpty()) { - bankStatementMapper.insertBatch(batchList); - totalSaved += batchList.size(); - log.debug("【文件上传】批量插入剩余流水: {}条", batchList.size()); - } + if (!batchList.isEmpty()) { + bankStatementMapper.insertBatch(batchList); + totalSaved += batchList.size(); + log.debug("【文件上传】批量插入剩余流水 {}条", batchList.size()); + } - log.info("【文件上传】流水数据保存完成: 总共保存{}条", totalSaved); + log.info("【文件上传】流水数据保存完成: 总共保存{}条", totalSaved); + result.setSuccess(true); + result.setSavedCount(totalSaved); + return result; + } catch (Exception e) { + log.error("【文件上传】获取或保存流水数据失败: logId={}", logId, e); + result.setSuccess(false); + result.setErrorMessage("获取或保存流水数据失败: " + e.getMessage()); + cleanupBankStatements(projectId, logId); + return result; + } + } + + private void cleanupBankStatements(Long projectId, Integer logId) { + bankStatementMapper.deleteByProjectIdAndBatchId(projectId, logId); } } diff --git a/ccdi-project/src/main/resources/mapper/ccdi/project/CcdiBankStatementMapper.xml b/ccdi-project/src/main/resources/mapper/ccdi/project/CcdiBankStatementMapper.xml index c8c22dd..3bfe650 100644 --- a/ccdi-project/src/main/resources/mapper/ccdi/project/CcdiBankStatementMapper.xml +++ b/ccdi-project/src/main/resources/mapper/ccdi/project/CcdiBankStatementMapper.xml @@ -86,4 +86,10 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" + + delete from ccdi_bank_statement + where project_id = #{projectId} + and batch_id = #{batchId} + + diff --git a/ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java b/ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java new file mode 100644 index 0000000..d8c41ac --- /dev/null +++ b/ccdi-project/src/test/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImplTest.java @@ -0,0 +1,239 @@ +package com.ruoyi.ccdi.project.service.impl; + +import com.ruoyi.ccdi.project.domain.entity.CcdiFileUploadRecord; +import com.ruoyi.ccdi.project.mapper.CcdiBankStatementMapper; +import com.ruoyi.ccdi.project.mapper.CcdiFileUploadRecordMapper; +import com.ruoyi.ccdi.project.mapper.CcdiProjectMapper; +import com.ruoyi.lsfx.client.LsfxAnalysisClient; +import com.ruoyi.lsfx.domain.request.GetBankStatementRequest; +import com.ruoyi.lsfx.domain.response.CheckParseStatusResponse; +import com.ruoyi.lsfx.domain.response.GetBankStatementResponse; +import com.ruoyi.lsfx.domain.response.GetFileUploadStatusResponse; +import com.ruoyi.lsfx.domain.response.UploadFileResponse; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; +import org.junit.jupiter.api.extension.ExtendWith; +import org.mockito.InjectMocks; +import org.mockito.Mock; +import org.mockito.junit.jupiter.MockitoExtension; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.AtomicInteger; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.eq; +import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +@ExtendWith(MockitoExtension.class) +class CcdiFileUploadServiceImplTest { + + private static final Long PROJECT_ID = 100L; + private static final Integer LSFX_PROJECT_ID = 200; + private static final Long RECORD_ID = 300L; + private static final Integer LOG_ID = 400; + + @InjectMocks + private CcdiFileUploadServiceImpl service; + + @Mock + private CcdiFileUploadRecordMapper recordMapper; + + @Mock + private CcdiProjectMapper projectMapper; + + @Mock + private LsfxAnalysisClient lsfxClient; + + @Mock + private CcdiBankStatementMapper bankStatementMapper; + + @TempDir + Path tempDir; + + @Test + void processFileAsync_shouldKeepParsingUntilBankStatementsSaved() throws IOException { + List events = new ArrayList<>(); + AtomicInteger sequence = new AtomicInteger(); + captureRecordStatus(events, sequence); + + when(lsfxClient.uploadFile(eq(LSFX_PROJECT_ID), any())).thenReturn(buildUploadResponse()); + when(lsfxClient.checkParseStatus(LSFX_PROJECT_ID, String.valueOf(LOG_ID))) + .thenReturn(buildCheckParseStatusResponse(false)); + when(lsfxClient.getFileUploadStatus(any())).thenReturn(buildParsedSuccessStatusResponse()); + when(lsfxClient.getBankStatement(any(GetBankStatementRequest.class))) + .thenThrow(new RuntimeException("bank statement fetch failed")); + + CcdiFileUploadRecord record = buildRecord(); + Path tempFile = createTempFile(); + + service.processFileAsync(PROJECT_ID, LSFX_PROJECT_ID, tempFile.toString(), RECORD_ID, "batch-1", record); + + assertTrue(events.stream().anyMatch(event -> event.endsWith("record:parsed_failed"))); + assertFalse(events.stream().anyMatch(event -> event.endsWith("record:parsed_success"))); + } + + @Test + void processFileAsync_shouldMarkSuccessAfterBankStatementsSaved() throws IOException { + List events = new ArrayList<>(); + AtomicInteger sequence = new AtomicInteger(); + captureRecordStatus(events, sequence); + + when(lsfxClient.uploadFile(eq(LSFX_PROJECT_ID), any())).thenReturn(buildUploadResponse()); + when(lsfxClient.checkParseStatus(LSFX_PROJECT_ID, String.valueOf(LOG_ID))) + .thenReturn(buildCheckParseStatusResponse(false)); + when(lsfxClient.getFileUploadStatus(any())).thenReturn(buildParsedSuccessStatusResponse()); + when(lsfxClient.getBankStatement(any(GetBankStatementRequest.class))) + .thenAnswer(invocation -> { + events.add(sequence.incrementAndGet() + ":bank-fetch"); + return buildEmptyBankStatementResponse(); + }); + + CcdiFileUploadRecord record = buildRecord(); + Path tempFile = createTempFile(); + + service.processFileAsync(PROJECT_ID, LSFX_PROJECT_ID, tempFile.toString(), RECORD_ID, "batch-1", record); + + int fetchIndex = findEventIndex(events, "bank-fetch"); + int successIndex = findEventIndex(events, "record:parsed_success"); + assertTrue(fetchIndex >= 0); + assertTrue(successIndex > fetchIndex); + } + + @Test + void processFileAsync_shouldCleanupInsertedStatementsWhenFetchFails() throws IOException { + when(lsfxClient.uploadFile(eq(LSFX_PROJECT_ID), any())).thenReturn(buildUploadResponse()); + when(lsfxClient.checkParseStatus(LSFX_PROJECT_ID, String.valueOf(LOG_ID))) + .thenReturn(buildCheckParseStatusResponse(false)); + when(lsfxClient.getFileUploadStatus(any())).thenReturn(buildParsedSuccessStatusResponse()); + when(lsfxClient.getBankStatement(any(GetBankStatementRequest.class))) + .thenThrow(new RuntimeException("bank statement fetch failed")); + + CcdiFileUploadRecord record = buildRecord(); + Path tempFile = createTempFile(); + + service.processFileAsync(PROJECT_ID, LSFX_PROJECT_ID, tempFile.toString(), RECORD_ID, "batch-1", record); + + verify(bankStatementMapper).deleteByProjectIdAndBatchId(PROJECT_ID, LOG_ID); + } + + @Test + void processFileAsync_shouldFailWhenPagedFetchThrows() throws IOException { + List events = new ArrayList<>(); + AtomicInteger sequence = new AtomicInteger(); + captureRecordStatus(events, sequence); + + when(lsfxClient.uploadFile(eq(LSFX_PROJECT_ID), any())).thenReturn(buildUploadResponse()); + when(lsfxClient.checkParseStatus(LSFX_PROJECT_ID, String.valueOf(LOG_ID))) + .thenReturn(buildCheckParseStatusResponse(false)); + when(lsfxClient.getFileUploadStatus(any())).thenReturn(buildParsedSuccessStatusResponse()); + when(lsfxClient.getBankStatement(any(GetBankStatementRequest.class))) + .thenReturn(buildBankStatementResponseWithTotalCount(1)) + .thenThrow(new RuntimeException("paged fetch failed")); + + CcdiFileUploadRecord record = buildRecord(); + Path tempFile = createTempFile(); + + service.processFileAsync(PROJECT_ID, LSFX_PROJECT_ID, tempFile.toString(), RECORD_ID, "batch-1", record); + + assertTrue(events.stream().anyMatch(event -> event.endsWith("record:parsed_failed"))); + assertFalse(events.stream().anyMatch(event -> event.endsWith("record:parsed_success"))); + verify(bankStatementMapper).deleteByProjectIdAndBatchId(PROJECT_ID, LOG_ID); + } + + private void captureRecordStatus(List events, AtomicInteger sequence) { + doAnswer(invocation -> { + CcdiFileUploadRecord record = invocation.getArgument(0); + events.add(sequence.incrementAndGet() + ":record:" + record.getFileStatus()); + return 1; + }).when(recordMapper).updateById(any(CcdiFileUploadRecord.class)); + } + + private CcdiFileUploadRecord buildRecord() { + CcdiFileUploadRecord record = new CcdiFileUploadRecord(); + record.setId(RECORD_ID); + record.setProjectId(PROJECT_ID); + record.setLsfxProjectId(LSFX_PROJECT_ID); + record.setFileName("test.xlsx"); + record.setFileStatus("uploading"); + return record; + } + + private Path createTempFile() throws IOException { + Path tempFile = Files.createTempFile(tempDir, "upload-", ".xlsx"); + Files.writeString(tempFile, "test"); + return tempFile; + } + + private UploadFileResponse buildUploadResponse() { + UploadFileResponse.UploadLogItem logItem = new UploadFileResponse.UploadLogItem(); + logItem.setLogId(LOG_ID); + + UploadFileResponse.UploadData uploadData = new UploadFileResponse.UploadData(); + uploadData.setUploadLogList(List.of(logItem)); + + UploadFileResponse response = new UploadFileResponse(); + response.setData(uploadData); + return response; + } + + private CheckParseStatusResponse buildCheckParseStatusResponse(boolean parsing) { + CheckParseStatusResponse.ParseStatusData data = new CheckParseStatusResponse.ParseStatusData(); + data.setParsing(parsing); + + CheckParseStatusResponse response = new CheckParseStatusResponse(); + response.setData(data); + return response; + } + + private GetFileUploadStatusResponse buildParsedSuccessStatusResponse() { + GetFileUploadStatusResponse.LogItem logItem = new GetFileUploadStatusResponse.LogItem(); + logItem.setStatus(-5); + logItem.setUploadStatusDesc("data.wait.confirm.newaccount"); + logItem.setEnterpriseNameList(List.of("测试主体")); + logItem.setAccountNoList(List.of("62220001")); + + GetFileUploadStatusResponse.FileUploadStatusData data = + new GetFileUploadStatusResponse.FileUploadStatusData(); + data.setLogs(List.of(logItem)); + + GetFileUploadStatusResponse response = new GetFileUploadStatusResponse(); + response.setData(data); + return response; + } + + private GetBankStatementResponse buildEmptyBankStatementResponse() { + GetBankStatementResponse.BankStatementData data = new GetBankStatementResponse.BankStatementData(); + data.setTotalCount(0); + + GetBankStatementResponse response = new GetBankStatementResponse(); + response.setData(data); + return response; + } + + private GetBankStatementResponse buildBankStatementResponseWithTotalCount(int totalCount) { + GetBankStatementResponse.BankStatementData data = new GetBankStatementResponse.BankStatementData(); + data.setTotalCount(totalCount); + data.setBankStatementList(List.of()); + + GetBankStatementResponse response = new GetBankStatementResponse(); + response.setData(data); + return response; + } + + private int findEventIndex(List events, String suffix) { + for (int i = 0; i < events.size(); i++) { + if (events.get(i).endsWith(suffix)) { + return i; + } + } + return -1; + } +}