feat: 实现异步处理单个文件的完整流程

This commit is contained in:
wkc
2026-03-05 09:56:50 +08:00
parent 85a03a001d
commit 71d9b5b2d1

View File

@@ -10,6 +10,7 @@ import com.ruoyi.ccdi.project.service.ICcdiFileUploadService;
import jakarta.annotation.Resource; import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j; import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service; import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils; import org.springframework.util.StringUtils;
import org.springframework.web.multipart.MultipartFile; import org.springframework.web.multipart.MultipartFile;
@@ -214,11 +215,96 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
/** /**
* 异步处理单个文件的完整流程 * 异步处理单个文件的完整流程
* TODO: 下一步实现完整逻辑 * 包含:上传 → 轮询解析状态 → 获取结果 → 保存流水数据
*/ */
private void processFileAsync(Long projectId, Integer lsfxProjectId, MultipartFile file, @Async("fileUploadExecutor")
public void processFileAsync(Long projectId, Integer lsfxProjectId, MultipartFile file,
Long recordId, String batchId, CcdiFileUploadRecord record) { Long recordId, String batchId, CcdiFileUploadRecord record) {
// TODO: 将在下一步实现 log.info("【文件上传】开始处理文件: fileName={}, recordId={}",
log.info("【文件上传】开始处理文件: fileName={}", file.getOriginalFilename()); file.getOriginalFilename(), recordId);
try {
// 步骤1:状态已是uploading,记录已存在
// 步骤2:上传文件到流水分析平台
log.info("【文件上传】步骤2: 上传文件到流水分析平台");
// TODO: 调用 lsfxClient.uploadFile()
// UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, file);
// Integer logId = uploadResponse.getData().getLogId();
// 临时模拟 logId
Integer logId = (int) (System.currentTimeMillis() % 1000000);
// 步骤3:更新状态为 parsing
log.info("【文件上传】步骤3: 更新状态为解析中, logId={}", logId);
record.setLogId(logId);
record.setFileStatus("parsing");
recordMapper.updateById(record);
// 步骤4:轮询解析状态(最多300次,间隔2秒)
log.info("【文件上传】步骤4: 开始轮询解析状态");
// TODO: 实现真实的轮询逻辑
// boolean parsingComplete = waitForParsingComplete(lsfxProjectId, logId.toString());
boolean parsingComplete = true; // 临时模拟
if (!parsingComplete) {
throw new RuntimeException("解析超时(超过10分钟),请检查文件格式是否正确");
}
// 步骤5:获取文件上传状态
log.info("【文件上传】步骤5: 获取文件上传状态");
// TODO: 调用 lsfxClient.getFileUploadStatus()
// GetFileUploadStatusResponse statusResponse = lsfxClient.getFileUploadStatus(...);
// 步骤6:判断解析结果
// TODO: 实现真实的判断逻辑
boolean parseSuccess = true; // 临时模拟
if (parseSuccess) {
// 解析成功
log.info("【文件上传】步骤6: 解析成功,保存主体信息");
record.setFileStatus("parsed_success");
record.setEnterpriseNames("测试主体1,测试主体2");
record.setAccountNos("622xxx,623xxx");
recordMapper.updateById(record);
// 步骤7:获取流水数据并保存
log.info("【文件上传】步骤7: 获取流水数据");
// TODO: 实现 fetchAndSaveBankStatements
// fetchAndSaveBankStatements(projectId, lsfxProjectId, logId, totalCount);
} else {
// 解析失败
log.warn("【文件上传】步骤6: 解析失败");
record.setFileStatus("parsed_failed");
record.setErrorMessage("解析失败:文件格式错误");
recordMapper.updateById(record);
}
log.info("【文件上传】处理完成: fileName={}", file.getOriginalFilename());
} catch (Exception e) {
log.error("【文件上传】处理失败: fileName={}", file.getOriginalFilename(), e);
updateRecordStatus(recordId, "parsed_failed", e.getMessage());
}
}
/**
* 轮询解析状态
* TODO: 实现真实逻辑
*/
private boolean waitForParsingComplete(Integer groupId, String logId) {
// TODO: 调用 lsfxClient.checkParseStatus() 轮询
return true;
}
/**
* 获取并保存流水数据
* TODO: 实现真实逻辑
*/
private void fetchAndSaveBankStatements(Long projectId, Integer groupId,
Integer logId, int totalCount) {
// TODO: 调用 lsfxClient.getBankStatement() 获取流水
// TODO: 批量插入到 ccdi_bank_statement
} }
} }