feat: 接入流水标签自动触发并完成验证

This commit is contained in:
wkc
2026-03-16 18:25:10 +08:00
parent 147f234109
commit 7d7cfa813b
3 changed files with 839 additions and 10 deletions

View File

@@ -4,6 +4,7 @@ import com.alibaba.excel.EasyExcel;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.ruoyi.ccdi.project.domain.CcdiProject;
import com.ruoyi.ccdi.project.domain.enums.TriggerType;
import com.ruoyi.ccdi.project.domain.dto.CcdiFileUploadQueryDTO;
import com.ruoyi.ccdi.project.domain.entity.CcdiBankStatement;
import com.ruoyi.ccdi.project.domain.entity.CcdiFileUploadRecord;
@@ -12,6 +13,7 @@ import com.ruoyi.ccdi.project.domain.vo.CcdiFileUploadStatisticsVO;
import com.ruoyi.ccdi.project.mapper.CcdiBankStatementMapper;
import com.ruoyi.ccdi.project.mapper.CcdiFileUploadRecordMapper;
import com.ruoyi.ccdi.project.mapper.CcdiProjectMapper;
import com.ruoyi.ccdi.project.service.ICcdiBankTagService;
import com.ruoyi.ccdi.project.service.ICcdiFileUploadService;
import com.ruoyi.lsfx.client.LsfxAnalysisClient;
import com.ruoyi.lsfx.constants.LsfxConstants;
@@ -91,6 +93,9 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
@Resource
private CcdiBankStatementMapper bankStatementMapper;
@Resource
private ICcdiBankTagService bankTagService;
/**
* 获取临时文件存储目录
*/
@@ -414,6 +419,8 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
String batchId) {
log.info("【文件上传】调度线程启动: projectId={}, batchId={}", projectId, batchId);
List<CompletableFuture<Boolean>> futures = new ArrayList<>();
// 循环提交任务
for (int i = 0; i < tempFilePaths.size(); i++) {
// Critical Fix #6: 检查线程中断状态
@@ -431,10 +438,11 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
while (!submitted && retryCount < 2) {
try {
// 尝试提交异步任务
CompletableFuture.runAsync(
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(
() -> processFileAsync(projectId, lsfxProjectId, tempFilePath, record.getId(), batchId, record),
fileUploadExecutor
);
futures.add(future);
submitted = true;
log.info("【文件上传】任务提交成功: fileName={}, recordId={}",
record.getFileName(), record.getId());
@@ -449,16 +457,24 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
Thread.currentThread().interrupt();
log.error("【文件上传】等待被中断: fileName={}", record.getFileName());
updateRecordStatus(record.getId(), "parsed_failed", "任务提交被中断");
futures.add(CompletableFuture.completedFuture(Boolean.FALSE));
break;
}
} else {
log.error("【文件上传】重试失败,放弃任务: fileName={}", record.getFileName());
updateRecordStatus(record.getId(), "parsed_failed", "系统繁忙,请稍后重试");
futures.add(CompletableFuture.completedFuture(Boolean.FALSE));
}
}
}
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.whenComplete((unused, throwable) -> {
boolean anySuccess = futures.stream().anyMatch(future -> Boolean.TRUE.equals(future.getNow(Boolean.FALSE)));
handleTagRebuildAfterBatchCompletion(projectId, TriggerType.AUTO_BATCH_UPLOAD, anySuccess);
});
log.info("【文件上传】调度线程完成: projectId={}, batchId={}", projectId, batchId);
}
@@ -506,6 +522,8 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
String batchId) {
log.info("【拉取本行信息】调度线程启动: projectId={}, batchId={}", projectId, batchId);
List<CompletableFuture<Boolean>> futures = new ArrayList<>();
for (int i = 0; i < records.size(); i++) {
if (Thread.currentThread().isInterrupted()) {
log.warn("【拉取本行信息】调度线程被中断,停止提交剩余任务");
@@ -519,10 +537,11 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
while (!submitted && retryCount < 2) {
try {
CompletableFuture.runAsync(
CompletableFuture<Boolean> future = CompletableFuture.supplyAsync(
() -> processPullBankInfoAsync(projectId, lsfxProjectId, record, idCard, startDate, endDate),
fileUploadExecutor
);
futures.add(future);
submitted = true;
log.info("【拉取本行信息】任务提交成功: idCard={}, recordId={}", idCard, record.getId());
} catch (RejectedExecutionException e) {
@@ -535,23 +554,31 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
Thread.currentThread().interrupt();
log.error("【拉取本行信息】等待被中断: idCard={}", idCard);
updateRecordStatus(record.getId(), "parsed_failed", "任务提交被中断");
futures.add(CompletableFuture.completedFuture(Boolean.FALSE));
break;
}
} else {
log.error("【拉取本行信息】重试失败,放弃任务: idCard={}", idCard);
updateRecordStatus(record.getId(), "parsed_failed", "系统繁忙,请稍后重试");
futures.add(CompletableFuture.completedFuture(Boolean.FALSE));
}
}
}
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.whenComplete((unused, throwable) -> {
boolean anySuccess = futures.stream().anyMatch(future -> Boolean.TRUE.equals(future.getNow(Boolean.FALSE)));
handleTagRebuildAfterBatchCompletion(projectId, TriggerType.AUTO_PULL_BANK_INFO, anySuccess);
});
}
public void processPullBankInfoAsync(Long projectId,
Integer lsfxProjectId,
CcdiFileUploadRecord record,
String idCard,
String startDate,
String endDate ) {
public boolean processPullBankInfoAsync(Long projectId,
Integer lsfxProjectId,
CcdiFileUploadRecord record,
String idCard,
String startDate,
String endDate ) {
try {
FetchInnerFlowRequest request = new FetchInnerFlowRequest();
request.setGroupId(lsfxProjectId);
@@ -573,9 +600,11 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
}
processRecordAfterLogIdReady(projectId, lsfxProjectId, record, logId);
return true;
} catch (Exception e) {
log.error("【拉取本行信息】处理失败: idCard={}, recordId={}", idCard, record.getId(), e);
updateFailedRecord(record, e.getMessage());
return false;
}
}
@@ -591,8 +620,8 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
* @param record 文件上传记录
*/
@Async("fileUploadExecutor")
public void processFileAsync(Long projectId, Integer lsfxProjectId, String tempFilePath,
Long recordId, String batchId, CcdiFileUploadRecord record) {
public boolean processFileAsync(Long projectId, Integer lsfxProjectId, String tempFilePath,
Long recordId, String batchId, CcdiFileUploadRecord record) {
log.info("【文件上传】开始处理文件: fileName={}, recordId={}, tempPath={}",
record.getFileName(), recordId, tempFilePath);
@@ -629,10 +658,12 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
log.info("【文件上传】文件上传成功: logId={}", logId);
processRecordAfterLogIdReady(projectId, lsfxProjectId, record, logId);
log.info("【文件上传】处理完成: fileName={}", record.getFileName());
return true;
} catch (Exception e) {
log.error("【文件上传】处理失败: fileName={}", record.getFileName(), e);
updateRecordStatus(recordId, "parsed_failed", e.getMessage());
return false;
} finally {
// 清理临时文件
try {
@@ -647,6 +678,13 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
}
}
private void handleTagRebuildAfterBatchCompletion(Long projectId, TriggerType triggerType, Boolean anySuccess) {
if (!Boolean.TRUE.equals(anySuccess)) {
return;
}
bankTagService.submitAutoRebuild(projectId, triggerType);
}
private void processRecordAfterLogIdReady(Long projectId,
Integer lsfxProjectId,
CcdiFileUploadRecord record,

View File

@@ -5,11 +5,13 @@ import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.read.ListAppender;
import com.alibaba.excel.EasyExcel;
import com.ruoyi.ccdi.project.domain.CcdiProject;
import com.ruoyi.ccdi.project.domain.enums.TriggerType;
import com.ruoyi.ccdi.project.domain.vo.CcdiFileUploadStatisticsVO;
import com.ruoyi.ccdi.project.domain.entity.CcdiFileUploadRecord;
import com.ruoyi.ccdi.project.mapper.CcdiBankStatementMapper;
import com.ruoyi.ccdi.project.mapper.CcdiFileUploadRecordMapper;
import com.ruoyi.ccdi.project.mapper.CcdiProjectMapper;
import com.ruoyi.ccdi.project.service.ICcdiBankTagService;
import com.ruoyi.lsfx.client.LsfxAnalysisClient;
import com.ruoyi.lsfx.domain.request.GetBankStatementRequest;
import com.ruoyi.lsfx.domain.response.CheckParseStatusResponse;
@@ -84,6 +86,9 @@ class CcdiFileUploadServiceImplTest {
@Mock
private Executor fileUploadExecutor;
@Mock
private ICcdiBankTagService bankTagService;
@TempDir
Path tempDir;
@@ -488,6 +493,32 @@ class CcdiFileUploadServiceImplTest {
assertEquals(3L, result.getTotal());
}
@Test
void batchUploadCompletion_shouldSubmitProjectTagRebuildWhenAnyFileSucceeded() {
ReflectionTestUtils.invokeMethod(
service,
"handleTagRebuildAfterBatchCompletion",
PROJECT_ID,
TriggerType.AUTO_BATCH_UPLOAD,
Boolean.TRUE
);
verify(bankTagService).submitAutoRebuild(PROJECT_ID, TriggerType.AUTO_BATCH_UPLOAD);
}
@Test
void pullBankInfoCompletion_shouldSubmitProjectTagRebuildWhenAnyTaskSucceeded() {
ReflectionTestUtils.invokeMethod(
service,
"handleTagRebuildAfterBatchCompletion",
PROJECT_ID,
TriggerType.AUTO_PULL_BANK_INFO,
Boolean.TRUE
);
verify(bankTagService).submitAutoRebuild(PROJECT_ID, TriggerType.AUTO_PULL_BANK_INFO);
}
private void captureRecordStatus(List<String> events, AtomicInteger sequence) {
doAnswer(invocation -> {
CcdiFileUploadRecord record = invocation.getArgument(0);