From 388c70ce041f2379e9964baa4a5b2924e18cdb39 Mon Sep 17 00:00:00 2001 From: wkc <978997012@qq.com> Date: Thu, 5 Mar 2026 12:39:58 +0800 Subject: [PATCH] =?UTF-8?q?docs:=20=E6=B7=BB=E5=8A=A0=E5=BC=82=E6=AD=A5?= =?UTF-8?q?=E6=96=87=E4=BB=B6=E4=B8=8A=E4=BC=A0=E6=9C=8D=E5=8A=A1=E5=AE=9E?= =?UTF-8?q?=E7=8E=B0=E8=AE=BE=E8=AE=A1=E6=96=87=E6=A1=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - 完整设计 CcdiFileUploadServiceImpl 所有 TODO 实现方案 - 包含依赖注入、文件上传、轮询解析、批量保存等详细设计 - 确定设计决策:固定间隔轮询、大批量分页、严格失败策略 - 实现批次日志管理器 FileUploadLogAppender - 包含完整的测试策略和部署注意事项 --- ...async-file-upload-implementation-design.md | 544 ++++++++++++++++++ 1 file changed, 544 insertions(+) create mode 100644 docs/plans/2026-03-05-async-file-upload-implementation-design.md diff --git a/docs/plans/2026-03-05-async-file-upload-implementation-design.md b/docs/plans/2026-03-05-async-file-upload-implementation-design.md new file mode 100644 index 0000000..3c2b5d2 --- /dev/null +++ b/docs/plans/2026-03-05-async-file-upload-implementation-design.md @@ -0,0 +1,544 @@ +# 异步文件上传服务实现设计文档 + +## 文档信息 +- **创建日期**: 2026-03-05 +- **版本**: v1.0 +- **作者**: Claude +- **状态**: 已批准 + +## 1. 概述 + +### 1.1 功能描述 +实现 `CcdiFileUploadServiceImpl` 中所有 TODO 方法,完成项目流水文件的异步批量上传功能的端到端流程。 + +### 1.2 核心需求 +- 集成流水分析平台客户端(LsfxAnalysisClient) +- 实现文件上传到流水分析平台 +- 实现轮询解析状态(固定间隔策略) +- 获取并判断解析结果 +- 批量获取并保存流水数据到本地数据库 +- 实现批次日志管理 + +### 1.3 技术栈 +- Spring @Async 异步处理 +- ThreadPoolTaskExecutor 线程池 +- MyBatis Plus 批量操作 +- Logback 自定义日志 +- 流水分析平台 API + +## 2. 设计决策 + +### 2.1 轮询策略 +**决策**: 固定间隔策略 +- 轮询次数: 300次 +- 间隔时间: 2秒 +- 最长等待: 10分钟 +- **理由**: 简单可靠,符合设计文档要求 + +### 2.2 分页获取策略 +**决策**: 大批量分页 +- 每页数量: 1000条 +- 批量插入: 每批1000条 +- 先调用一次获取 totalCount +- **理由**: 性能与内存占用的平衡 + +### 2.3 错误处理策略 +**决策**: 严格失败策略 +- 任何异常直接标记为 `parsed_failed` +- 记录详细的错误信息到 `error_message` 字段 +- 不进行额外重试(线程池层面已有重试机制) +- **理由**: 简单明了,便于排查问题 + +### 2.4 日志管理策略 +**决策**: 完整实现批次日志 +- 实现自定义 `FileUploadLogAppender` +- 每个批次生成独立日志文件 +- 路径基于 `ruoyi.profile` 配置 +- **理由**: 便于运维排查问题 + +## 3. 详细设计 + +### 3.1 依赖注入 + +```java +@Slf4j +@Service +public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { + + @Value("${ruoyi.profile}") + private String uploadPath; + + @Resource + private CcdiFileUploadRecordMapper recordMapper; + + @Resource + private CcdiProjectMapper projectMapper; + + @Resource + @Qualifier("fileUploadExecutor") + private Executor fileUploadExecutor; + + @Resource + private LsfxAnalysisClient lsfxClient; // 新增 + + @Resource + private CcdiBankStatementMapper bankStatementMapper; // 新增 +``` + +### 3.2 文件上传逻辑(processFileAsync 第329-333行) + +**核心流程**: +1. 将临时文件路径转换为 File 对象 +2. 验证文件存在性 +3. 调用 `lsfxClient.uploadFile(lsfxProjectId, file)` +4. 提取并验证返回的 logId + +**关键代码**: +```java +File file = filePath.toFile(); +if (!file.exists()) { + throw new RuntimeException("临时文件不存在: " + tempFilePath); +} + +UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, file); +if (uploadResponse == null || uploadResponse.getData() == null) { + throw new RuntimeException("上传文件失败: 响应数据为空"); +} + +Integer logId = uploadResponse.getData().getLogId(); +if (logId == null) { + throw new RuntimeException("上传文件失败: 未返回logId"); +} +``` + +### 3.3 轮询解析状态逻辑(waitForParsingComplete) + +**核心流程**: +1. 调用 `checkParseStatus(groupId, logId)` +2. 检查 `parsing` 字段 +3. `parsing=false` 表示解析完成 +4. 固定间隔2秒,最多300次 + +**关键代码**: +```java +for (int i = 1; i <= maxRetries; i++) { + CheckParseStatusResponse response = lsfxClient.checkParseStatus(groupId, logId); + + if (response == null || response.getData() == null) { + log.warn("【文件上传】轮询第{}次: 响应数据为空", i); + Thread.sleep(intervalSeconds * 1000L); + continue; + } + + Boolean parsing = response.getData().getParsing(); + + // parsing=false 表示解析完成 + if (Boolean.FALSE.equals(parsing)) { + log.info("【文件上传】解析完成: logId={}, 轮询次数={}", logId, i); + return true; + } + + if (i < maxRetries) { + Thread.sleep(intervalSeconds * 1000L); + } +} +``` + +**异常处理**: +- `InterruptedException`: 恢复中断状态,返回 false +- 其他异常: 记录日志,继续轮询 + +### 3.4 获取解析结果逻辑(processFileAsync 第355-383行) + +**核心流程**: +1. 调用 `getFileUploadStatus(groupId, logId)` +2. 判断 `status == -5 && uploadStatusDesc == "data.wait.confirm.newaccount"` +3. 提取 `enterpriseNameList` 和 `accountNoList` +4. 解析成功则调用 `fetchAndSaveBankStatements()` + +**关键代码**: +```java +GetFileUploadStatusRequest statusRequest = new GetFileUploadStatusRequest(); +statusRequest.setGroupId(lsfxProjectId); +statusRequest.setLogId(logId); + +GetFileUploadStatusResponse statusResponse = lsfxClient.getFileUploadStatus(statusRequest); + +GetFileUploadStatusResponse.LogItem logItem = statusResponse.getData().getLogs().get(0); +Integer status = logItem.getStatus(); +String uploadStatusDesc = logItem.getUploadStatusDesc(); + +// 判断解析结果 +boolean parseSuccess = status != null && status == -5 + && "data.wait.confirm.newaccount".equals(uploadStatusDesc); + +if (parseSuccess) { + // 提取主体信息 + List enterpriseNames = logItem.getEnterpriseNameList(); + List accountNos = logItem.getAccountNoList(); + + String enterpriseNamesStr = enterpriseNames != null ? String.join(",", enterpriseNames) : ""; + String accountNosStr = accountNos != null ? String.join(",", accountNos) : ""; + + record.setFileStatus("parsed_success"); + record.setEnterpriseNames(enterpriseNamesStr); + record.setAccountNos(accountNosStr); + recordMapper.updateById(record); + + // 获取流水数据 + fetchAndSaveBankStatements(projectId, lsfxProjectId, logId); +} else { + record.setFileStatus("parsed_failed"); + record.setErrorMessage("解析失败: " + uploadStatusDesc); + recordMapper.updateById(record); +} +``` + +### 3.5 批量保存流水数据逻辑(fetchAndSaveBankStatements) + +**核心流程**: +1. 先调用一次接口获取 totalCount(pageSize=1, pageNow=1) +2. 计算分页信息(每页1000条) +3. 循环分页获取所有数据 +4. 每累积1000条批量插入一次 +5. 设置 projectId 到每条流水记录 + +**关键代码**: +```java +// 步骤1: 先调用一次接口获取 totalCount +GetBankStatementRequest firstRequest = new GetBankStatementRequest(); +firstRequest.setGroupId(groupId); +firstRequest.setLogId(logId); +firstRequest.setPageNow(1); +firstRequest.setPageSize(1); + +GetBankStatementResponse firstResponse = lsfxClient.getBankStatement(firstRequest); +Integer totalCount = firstResponse.getData().getTotalCount(); + +// 步骤2: 计算分页信息 +int pageSize = 1000; +int batchSize = 1000; +int totalPages = (int) Math.ceil((double) totalCount / pageSize); + +List batchList = new ArrayList<>(batchSize); + +// 步骤3: 循环分页获取 +for (int pageNow = 1; pageNow <= totalPages; pageNow++) { + GetBankStatementRequest request = new GetBankStatementRequest(); + request.setGroupId(groupId); + request.setLogId(logId); + request.setPageNow(pageNow); + request.setPageSize(pageSize); + + GetBankStatementResponse response = lsfxClient.getBankStatement(request); + + for (GetBankStatementResponse.BankStatementItem item : items) { + CcdiBankStatement statement = CcdiBankStatement.fromResponse(item); + statement.setProjectId(projectId); // 设置业务项目ID + batchList.add(statement); + + // 达到批量插入阈值(1000条) + if (batchList.size() >= batchSize) { + bankStatementMapper.insertBatch(batchList); + batchList.clear(); + } + } +} + +// 步骤4: 保存剩余的数据 +if (!batchList.isEmpty()) { + bankStatementMapper.insertBatch(batchList); +} +``` + +**性能优化**: +- 每页1000条,减少请求次数 +- 批量插入1000条,提高数据库性能 +- 异常不中断,继续处理下一页 + +### 3.6 批次日志管理(FileUploadLogAppender) + +**核心功能**: +1. 继承 `UnsynchronizedAppenderBase` +2. 使用 `ThreadLocal` 存储当前批次的 FileAppender +3. 为每个批次创建独立的日志文件 + +**日志文件路径**: +``` +{ruoyi.profile}/logs/file-upload/{projectId}/{timestamp}.log +``` + +**示例**: +- Windows: `D:/ruoyi/uploadPath/logs/file-upload/123/20260305-103025.log` +- Linux: `/var/ruoyi/logs/file-upload/123/20260305-103025.log` + +**关键方法**: +```java +/** + * 为指定批次创建独立的日志文件 + */ +public static void createBatchLogFile(String uploadPath, Long projectId, String batchId) { + String timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date()); + String logDirPath = uploadPath + File.separator + "logs" + File.separator + + "file-upload" + File.separator + projectId; + + File logDir = new File(logDirPath); + if (!logDir.exists()) { + logDir.mkdirs(); + } + + String logFilePath = logDirPath + File.separator + timestamp + ".log"; + + FileAppender appender = new FileAppender<>(); + appender.setFile(logFilePath); + + PatternLayout layout = new PatternLayout(); + layout.setPattern("%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"); + layout.start(); + + appender.setLayout(layout); + appender.start(); + + currentAppender.set(appender); +} + +/** + * 关闭当前批次的日志文件 + */ +public static void closeBatchLogFile() { + FileAppender appender = currentAppender.get(); + if (appender != null) { + appender.stop(); + currentAppender.remove(); + } +} +``` + +**使用方式**: +```java +private void submitTasksAsync(...) { + // 创建批次日志文件 + FileUploadLogAppender.createBatchLogFile(uploadPath, projectId, batchId); + + try { + // 任务提交逻辑 + } finally { + // 关闭日志文件 + FileUploadLogAppender.closeBatchLogFile(); + } +} +``` + +## 4. 实现细节 + +### 4.1 文件上传完整流程 + +```java +@Async("fileUploadExecutor") +public void processFileAsync(Long projectId, Integer lsfxProjectId, String tempFilePath, + Long recordId, String batchId, CcdiFileUploadRecord record) { + try { + // 步骤1: 状态已是uploading,记录已存在 + Path filePath = Paths.get(tempFilePath); + if (!Files.exists(filePath)) { + throw new RuntimeException("临时文件不存在: " + tempFilePath); + } + + // 步骤2: 上传文件到流水分析平台 + File file = filePath.toFile(); + UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, file); + Integer logId = uploadResponse.getData().getLogId(); + + // 步骤3: 更新状态为 parsing + record.setLogId(logId); + record.setFileStatus("parsing"); + recordMapper.updateById(record); + + // 步骤4: 轮询解析状态(最多300次,间隔2秒) + boolean parsingComplete = waitForParsingComplete(lsfxProjectId, logId.toString()); + if (!parsingComplete) { + throw new RuntimeException("解析超时(超过10分钟)"); + } + + // 步骤5: 获取文件上传状态 + GetFileUploadStatusRequest statusRequest = new GetFileUploadStatusRequest(); + statusRequest.setGroupId(lsfxProjectId); + statusRequest.setLogId(logId); + GetFileUploadStatusResponse statusResponse = lsfxClient.getFileUploadStatus(statusRequest); + + GetFileUploadStatusResponse.LogItem logItem = statusResponse.getData().getLogs().get(0); + Integer status = logItem.getStatus(); + String uploadStatusDesc = logItem.getUploadStatusDesc(); + + // 步骤6: 判断解析结果 + boolean parseSuccess = status != null && status == -5 + && "data.wait.confirm.newaccount".equals(uploadStatusDesc); + + if (parseSuccess) { + // 解析成功 + List enterpriseNames = logItem.getEnterpriseNameList(); + List accountNos = logItem.getAccountNoList(); + + record.setFileStatus("parsed_success"); + record.setEnterpriseNames(enterpriseNames != null ? String.join(",", enterpriseNames) : ""); + record.setAccountNos(accountNos != null ? String.join(",", accountNos) : ""); + recordMapper.updateById(record); + + // 步骤7: 获取流水数据并保存 + fetchAndSaveBankStatements(projectId, lsfxProjectId, logId); + } else { + // 解析失败 + record.setFileStatus("parsed_failed"); + record.setErrorMessage("解析失败: " + uploadStatusDesc); + recordMapper.updateById(record); + } + + } catch (Exception e) { + log.error("【文件上传】处理失败: fileName={}", record.getFileName(), e); + updateRecordStatus(recordId, "parsed_failed", e.getMessage()); + } finally { + // 清理临时文件 + try { + Path filePath = Paths.get(tempFilePath); + if (Files.exists(filePath)) { + Files.delete(filePath); + } + } catch (IOException e) { + log.warn("【文件上传】清理临时文件失败: {}", tempFilePath, e); + } + } +} +``` + +### 4.2 错误处理规范 + +**异常分类**: +1. **文件异常**: 临时文件不存在、文件转换失败 +2. **网络异常**: 流水分析平台接口调用失败 +3. **业务异常**: 解析失败、解析超时 +4. **数据库异常**: 批量插入失败 + +**处理策略**: +- 所有异常统一捕获,记录详细日志 +- 直接标记为 `parsed_failed` +- 记录错误信息到 `error_message` 字段 +- finally 块确保临时文件被清理 + +### 4.3 日志记录规范 + +**日志级别**: +- `INFO`: 关键步骤(开始上传、上传成功、解析完成、保存成功) +- `DEBUG`: 详细信息(轮询次数、每页数据量) +- `WARN`: 警告信息(响应数据为空、清理失败) +- `ERROR`: 错误信息(处理失败、异常) + +**日志格式**: +``` +【文件上传】{步骤描述}: {关键参数}={值} +``` + +**示例**: +``` +【文件上传】开始处理文件: fileName=流水1.xlsx, recordId=123 +【文件上传】文件上传成功: logId=456789 +【文件上传】解析完成: logId=456789, 轮询次数=15 +【文件上传】流水数据保存完成: 总共保存5000条 +``` + +## 5. 文件清单 + +### 5.1 需要修改的文件 + +| 文件路径 | 修改内容 | +|---------|---------| +| `CcdiFileUploadServiceImpl.java` | 实现 processFileAsync、waitForParsingComplete、fetchAndSaveBankStatements 中的 TODO | + +### 5.2 需要新增的文件 + +| 文件路径 | 说明 | +|---------|------| +| `ccdi-project/src/main/java/com/ruoyi/ccdi/project/log/FileUploadLogAppender.java` | 批次日志管理器 | + +## 6. 测试策略 + +### 6.1 单元测试 + +**测试用例**: +1. `waitForParsingComplete` - 正常轮询成功 +2. `waitForParsingComplete` - 轮询超时 +3. `waitForParsingComplete` - 轮询被中断 +4. `fetchAndSaveBankStatements` - 无数据 +5. `fetchAndSaveBankStatements` - 单页数据 +6. `fetchAndSaveBankStatements` - 多页数据 +7. `fetchAndSaveBankStatements` - 异常处理 + +### 6.2 集成测试 + +**测试场景**: +1. 完整流程测试(单个文件,正常场景) +2. 大文件测试(50MB) +3. 批量文件测试(10个文件) +4. 解析失败场景 +5. 网络异常场景 +6. 线程池满载场景 + +### 6.3 性能测试 + +**测试指标**: +- 单个文件处理时长: 3-15分钟 +- 100个文件并发处理 +- 数据库批量插入性能 +- 内存占用情况 + +## 7. 部署注意事项 + +### 7.1 配置检查 + +- [ ] `ruoyi.profile` 配置正确且目录有写权限 +- [ ] 线程池容量配置(默认100) +- [ ] 流水分析平台地址配置正确 +- [ ] 应用认证信息配置正确 + +### 7.2 监控指标 + +- 线程池活跃线程数 +- 文件上传成功率 +- 平均处理时长 +- 批量插入性能 +- 日志文件大小和数量 + +### 7.3 运维建议 + +- 定期清理30天前的日志文件 +- 监控线程池状态 +- 关注数据库连接池使用情况 +- 流水分析平台接口调用成功率监控 + +## 8. 风险与缓解 + +### 8.1 风险识别 + +| 风险 | 影响 | 概率 | 缓解措施 | +|------|------|------|----------| +| 流水分析平台不稳定 | 高 | 中 | 异常捕获,标记失败,详细日志 | +| 大文件内存溢出 | 高 | 低 | 批量插入,及时清理临时文件 | +| 线程池满载 | 中 | 中 | 重试机制,提示系统繁忙 | +| 日志文件过大 | 低 | 中 | 按批次分离,定期清理 | + +### 8.2 回滚方案 + +如遇严重问题,可以: +1. 禁用异步上传功能 +2. 回退到同步上传方式 +3. 暂停新的上传任务 + +## 9. 参考资料 + +- [项目异步文件上传功能设计文档](../../design/2026-03-05-async-file-upload-design.md) +- [项目异步文件上传需求](../../assets/项目异步文件上传/task.md) +- [流水分析平台接口文档](../2026-03-02-lsfx-integration-design.md) +- [银行流水实体设计](../2026-03-04-bank-statement-entity-design.md) + +--- + +**文档结束**