docs: 添加异步文件上传服务实现设计文档
- 完整设计 CcdiFileUploadServiceImpl 所有 TODO 实现方案 - 包含依赖注入、文件上传、轮询解析、批量保存等详细设计 - 确定设计决策:固定间隔轮询、大批量分页、严格失败策略 - 实现批次日志管理器 FileUploadLogAppender - 包含完整的测试策略和部署注意事项
This commit is contained in:
544
docs/plans/2026-03-05-async-file-upload-implementation-design.md
Normal file
544
docs/plans/2026-03-05-async-file-upload-implementation-design.md
Normal file
@@ -0,0 +1,544 @@
|
||||
# 异步文件上传服务实现设计文档
|
||||
|
||||
## 文档信息
|
||||
- **创建日期**: 2026-03-05
|
||||
- **版本**: v1.0
|
||||
- **作者**: Claude
|
||||
- **状态**: 已批准
|
||||
|
||||
## 1. 概述
|
||||
|
||||
### 1.1 功能描述
|
||||
实现 `CcdiFileUploadServiceImpl` 中所有 TODO 方法,完成项目流水文件的异步批量上传功能的端到端流程。
|
||||
|
||||
### 1.2 核心需求
|
||||
- 集成流水分析平台客户端(LsfxAnalysisClient)
|
||||
- 实现文件上传到流水分析平台
|
||||
- 实现轮询解析状态(固定间隔策略)
|
||||
- 获取并判断解析结果
|
||||
- 批量获取并保存流水数据到本地数据库
|
||||
- 实现批次日志管理
|
||||
|
||||
### 1.3 技术栈
|
||||
- Spring @Async 异步处理
|
||||
- ThreadPoolTaskExecutor 线程池
|
||||
- MyBatis Plus 批量操作
|
||||
- Logback 自定义日志
|
||||
- 流水分析平台 API
|
||||
|
||||
## 2. 设计决策
|
||||
|
||||
### 2.1 轮询策略
|
||||
**决策**: 固定间隔策略
|
||||
- 轮询次数: 300次
|
||||
- 间隔时间: 2秒
|
||||
- 最长等待: 10分钟
|
||||
- **理由**: 简单可靠,符合设计文档要求
|
||||
|
||||
### 2.2 分页获取策略
|
||||
**决策**: 大批量分页
|
||||
- 每页数量: 1000条
|
||||
- 批量插入: 每批1000条
|
||||
- 先调用一次获取 totalCount
|
||||
- **理由**: 性能与内存占用的平衡
|
||||
|
||||
### 2.3 错误处理策略
|
||||
**决策**: 严格失败策略
|
||||
- 任何异常直接标记为 `parsed_failed`
|
||||
- 记录详细的错误信息到 `error_message` 字段
|
||||
- 不进行额外重试(线程池层面已有重试机制)
|
||||
- **理由**: 简单明了,便于排查问题
|
||||
|
||||
### 2.4 日志管理策略
|
||||
**决策**: 完整实现批次日志
|
||||
- 实现自定义 `FileUploadLogAppender`
|
||||
- 每个批次生成独立日志文件
|
||||
- 路径基于 `ruoyi.profile` 配置
|
||||
- **理由**: 便于运维排查问题
|
||||
|
||||
## 3. 详细设计
|
||||
|
||||
### 3.1 依赖注入
|
||||
|
||||
```java
|
||||
@Slf4j
|
||||
@Service
|
||||
public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
|
||||
|
||||
@Value("${ruoyi.profile}")
|
||||
private String uploadPath;
|
||||
|
||||
@Resource
|
||||
private CcdiFileUploadRecordMapper recordMapper;
|
||||
|
||||
@Resource
|
||||
private CcdiProjectMapper projectMapper;
|
||||
|
||||
@Resource
|
||||
@Qualifier("fileUploadExecutor")
|
||||
private Executor fileUploadExecutor;
|
||||
|
||||
@Resource
|
||||
private LsfxAnalysisClient lsfxClient; // 新增
|
||||
|
||||
@Resource
|
||||
private CcdiBankStatementMapper bankStatementMapper; // 新增
|
||||
```
|
||||
|
||||
### 3.2 文件上传逻辑(processFileAsync 第329-333行)
|
||||
|
||||
**核心流程**:
|
||||
1. 将临时文件路径转换为 File 对象
|
||||
2. 验证文件存在性
|
||||
3. 调用 `lsfxClient.uploadFile(lsfxProjectId, file)`
|
||||
4. 提取并验证返回的 logId
|
||||
|
||||
**关键代码**:
|
||||
```java
|
||||
File file = filePath.toFile();
|
||||
if (!file.exists()) {
|
||||
throw new RuntimeException("临时文件不存在: " + tempFilePath);
|
||||
}
|
||||
|
||||
UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, file);
|
||||
if (uploadResponse == null || uploadResponse.getData() == null) {
|
||||
throw new RuntimeException("上传文件失败: 响应数据为空");
|
||||
}
|
||||
|
||||
Integer logId = uploadResponse.getData().getLogId();
|
||||
if (logId == null) {
|
||||
throw new RuntimeException("上传文件失败: 未返回logId");
|
||||
}
|
||||
```
|
||||
|
||||
### 3.3 轮询解析状态逻辑(waitForParsingComplete)
|
||||
|
||||
**核心流程**:
|
||||
1. 调用 `checkParseStatus(groupId, logId)`
|
||||
2. 检查 `parsing` 字段
|
||||
3. `parsing=false` 表示解析完成
|
||||
4. 固定间隔2秒,最多300次
|
||||
|
||||
**关键代码**:
|
||||
```java
|
||||
for (int i = 1; i <= maxRetries; i++) {
|
||||
CheckParseStatusResponse response = lsfxClient.checkParseStatus(groupId, logId);
|
||||
|
||||
if (response == null || response.getData() == null) {
|
||||
log.warn("【文件上传】轮询第{}次: 响应数据为空", i);
|
||||
Thread.sleep(intervalSeconds * 1000L);
|
||||
continue;
|
||||
}
|
||||
|
||||
Boolean parsing = response.getData().getParsing();
|
||||
|
||||
// parsing=false 表示解析完成
|
||||
if (Boolean.FALSE.equals(parsing)) {
|
||||
log.info("【文件上传】解析完成: logId={}, 轮询次数={}", logId, i);
|
||||
return true;
|
||||
}
|
||||
|
||||
if (i < maxRetries) {
|
||||
Thread.sleep(intervalSeconds * 1000L);
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**异常处理**:
|
||||
- `InterruptedException`: 恢复中断状态,返回 false
|
||||
- 其他异常: 记录日志,继续轮询
|
||||
|
||||
### 3.4 获取解析结果逻辑(processFileAsync 第355-383行)
|
||||
|
||||
**核心流程**:
|
||||
1. 调用 `getFileUploadStatus(groupId, logId)`
|
||||
2. 判断 `status == -5 && uploadStatusDesc == "data.wait.confirm.newaccount"`
|
||||
3. 提取 `enterpriseNameList` 和 `accountNoList`
|
||||
4. 解析成功则调用 `fetchAndSaveBankStatements()`
|
||||
|
||||
**关键代码**:
|
||||
```java
|
||||
GetFileUploadStatusRequest statusRequest = new GetFileUploadStatusRequest();
|
||||
statusRequest.setGroupId(lsfxProjectId);
|
||||
statusRequest.setLogId(logId);
|
||||
|
||||
GetFileUploadStatusResponse statusResponse = lsfxClient.getFileUploadStatus(statusRequest);
|
||||
|
||||
GetFileUploadStatusResponse.LogItem logItem = statusResponse.getData().getLogs().get(0);
|
||||
Integer status = logItem.getStatus();
|
||||
String uploadStatusDesc = logItem.getUploadStatusDesc();
|
||||
|
||||
// 判断解析结果
|
||||
boolean parseSuccess = status != null && status == -5
|
||||
&& "data.wait.confirm.newaccount".equals(uploadStatusDesc);
|
||||
|
||||
if (parseSuccess) {
|
||||
// 提取主体信息
|
||||
List<String> enterpriseNames = logItem.getEnterpriseNameList();
|
||||
List<String> accountNos = logItem.getAccountNoList();
|
||||
|
||||
String enterpriseNamesStr = enterpriseNames != null ? String.join(",", enterpriseNames) : "";
|
||||
String accountNosStr = accountNos != null ? String.join(",", accountNos) : "";
|
||||
|
||||
record.setFileStatus("parsed_success");
|
||||
record.setEnterpriseNames(enterpriseNamesStr);
|
||||
record.setAccountNos(accountNosStr);
|
||||
recordMapper.updateById(record);
|
||||
|
||||
// 获取流水数据
|
||||
fetchAndSaveBankStatements(projectId, lsfxProjectId, logId);
|
||||
} else {
|
||||
record.setFileStatus("parsed_failed");
|
||||
record.setErrorMessage("解析失败: " + uploadStatusDesc);
|
||||
recordMapper.updateById(record);
|
||||
}
|
||||
```
|
||||
|
||||
### 3.5 批量保存流水数据逻辑(fetchAndSaveBankStatements)
|
||||
|
||||
**核心流程**:
|
||||
1. 先调用一次接口获取 totalCount(pageSize=1, pageNow=1)
|
||||
2. 计算分页信息(每页1000条)
|
||||
3. 循环分页获取所有数据
|
||||
4. 每累积1000条批量插入一次
|
||||
5. 设置 projectId 到每条流水记录
|
||||
|
||||
**关键代码**:
|
||||
```java
|
||||
// 步骤1: 先调用一次接口获取 totalCount
|
||||
GetBankStatementRequest firstRequest = new GetBankStatementRequest();
|
||||
firstRequest.setGroupId(groupId);
|
||||
firstRequest.setLogId(logId);
|
||||
firstRequest.setPageNow(1);
|
||||
firstRequest.setPageSize(1);
|
||||
|
||||
GetBankStatementResponse firstResponse = lsfxClient.getBankStatement(firstRequest);
|
||||
Integer totalCount = firstResponse.getData().getTotalCount();
|
||||
|
||||
// 步骤2: 计算分页信息
|
||||
int pageSize = 1000;
|
||||
int batchSize = 1000;
|
||||
int totalPages = (int) Math.ceil((double) totalCount / pageSize);
|
||||
|
||||
List<CcdiBankStatement> batchList = new ArrayList<>(batchSize);
|
||||
|
||||
// 步骤3: 循环分页获取
|
||||
for (int pageNow = 1; pageNow <= totalPages; pageNow++) {
|
||||
GetBankStatementRequest request = new GetBankStatementRequest();
|
||||
request.setGroupId(groupId);
|
||||
request.setLogId(logId);
|
||||
request.setPageNow(pageNow);
|
||||
request.setPageSize(pageSize);
|
||||
|
||||
GetBankStatementResponse response = lsfxClient.getBankStatement(request);
|
||||
|
||||
for (GetBankStatementResponse.BankStatementItem item : items) {
|
||||
CcdiBankStatement statement = CcdiBankStatement.fromResponse(item);
|
||||
statement.setProjectId(projectId); // 设置业务项目ID
|
||||
batchList.add(statement);
|
||||
|
||||
// 达到批量插入阈值(1000条)
|
||||
if (batchList.size() >= batchSize) {
|
||||
bankStatementMapper.insertBatch(batchList);
|
||||
batchList.clear();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// 步骤4: 保存剩余的数据
|
||||
if (!batchList.isEmpty()) {
|
||||
bankStatementMapper.insertBatch(batchList);
|
||||
}
|
||||
```
|
||||
|
||||
**性能优化**:
|
||||
- 每页1000条,减少请求次数
|
||||
- 批量插入1000条,提高数据库性能
|
||||
- 异常不中断,继续处理下一页
|
||||
|
||||
### 3.6 批次日志管理(FileUploadLogAppender)
|
||||
|
||||
**核心功能**:
|
||||
1. 继承 `UnsynchronizedAppenderBase<ILoggingEvent>`
|
||||
2. 使用 `ThreadLocal` 存储当前批次的 FileAppender
|
||||
3. 为每个批次创建独立的日志文件
|
||||
|
||||
**日志文件路径**:
|
||||
```
|
||||
{ruoyi.profile}/logs/file-upload/{projectId}/{timestamp}.log
|
||||
```
|
||||
|
||||
**示例**:
|
||||
- Windows: `D:/ruoyi/uploadPath/logs/file-upload/123/20260305-103025.log`
|
||||
- Linux: `/var/ruoyi/logs/file-upload/123/20260305-103025.log`
|
||||
|
||||
**关键方法**:
|
||||
```java
|
||||
/**
|
||||
* 为指定批次创建独立的日志文件
|
||||
*/
|
||||
public static void createBatchLogFile(String uploadPath, Long projectId, String batchId) {
|
||||
String timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date());
|
||||
String logDirPath = uploadPath + File.separator + "logs" + File.separator
|
||||
+ "file-upload" + File.separator + projectId;
|
||||
|
||||
File logDir = new File(logDirPath);
|
||||
if (!logDir.exists()) {
|
||||
logDir.mkdirs();
|
||||
}
|
||||
|
||||
String logFilePath = logDirPath + File.separator + timestamp + ".log";
|
||||
|
||||
FileAppender<ILoggingEvent> appender = new FileAppender<>();
|
||||
appender.setFile(logFilePath);
|
||||
|
||||
PatternLayout layout = new PatternLayout();
|
||||
layout.setPattern("%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n");
|
||||
layout.start();
|
||||
|
||||
appender.setLayout(layout);
|
||||
appender.start();
|
||||
|
||||
currentAppender.set(appender);
|
||||
}
|
||||
|
||||
/**
|
||||
* 关闭当前批次的日志文件
|
||||
*/
|
||||
public static void closeBatchLogFile() {
|
||||
FileAppender<ILoggingEvent> appender = currentAppender.get();
|
||||
if (appender != null) {
|
||||
appender.stop();
|
||||
currentAppender.remove();
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
**使用方式**:
|
||||
```java
|
||||
private void submitTasksAsync(...) {
|
||||
// 创建批次日志文件
|
||||
FileUploadLogAppender.createBatchLogFile(uploadPath, projectId, batchId);
|
||||
|
||||
try {
|
||||
// 任务提交逻辑
|
||||
} finally {
|
||||
// 关闭日志文件
|
||||
FileUploadLogAppender.closeBatchLogFile();
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
## 4. 实现细节
|
||||
|
||||
### 4.1 文件上传完整流程
|
||||
|
||||
```java
|
||||
@Async("fileUploadExecutor")
|
||||
public void processFileAsync(Long projectId, Integer lsfxProjectId, String tempFilePath,
|
||||
Long recordId, String batchId, CcdiFileUploadRecord record) {
|
||||
try {
|
||||
// 步骤1: 状态已是uploading,记录已存在
|
||||
Path filePath = Paths.get(tempFilePath);
|
||||
if (!Files.exists(filePath)) {
|
||||
throw new RuntimeException("临时文件不存在: " + tempFilePath);
|
||||
}
|
||||
|
||||
// 步骤2: 上传文件到流水分析平台
|
||||
File file = filePath.toFile();
|
||||
UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, file);
|
||||
Integer logId = uploadResponse.getData().getLogId();
|
||||
|
||||
// 步骤3: 更新状态为 parsing
|
||||
record.setLogId(logId);
|
||||
record.setFileStatus("parsing");
|
||||
recordMapper.updateById(record);
|
||||
|
||||
// 步骤4: 轮询解析状态(最多300次,间隔2秒)
|
||||
boolean parsingComplete = waitForParsingComplete(lsfxProjectId, logId.toString());
|
||||
if (!parsingComplete) {
|
||||
throw new RuntimeException("解析超时(超过10分钟)");
|
||||
}
|
||||
|
||||
// 步骤5: 获取文件上传状态
|
||||
GetFileUploadStatusRequest statusRequest = new GetFileUploadStatusRequest();
|
||||
statusRequest.setGroupId(lsfxProjectId);
|
||||
statusRequest.setLogId(logId);
|
||||
GetFileUploadStatusResponse statusResponse = lsfxClient.getFileUploadStatus(statusRequest);
|
||||
|
||||
GetFileUploadStatusResponse.LogItem logItem = statusResponse.getData().getLogs().get(0);
|
||||
Integer status = logItem.getStatus();
|
||||
String uploadStatusDesc = logItem.getUploadStatusDesc();
|
||||
|
||||
// 步骤6: 判断解析结果
|
||||
boolean parseSuccess = status != null && status == -5
|
||||
&& "data.wait.confirm.newaccount".equals(uploadStatusDesc);
|
||||
|
||||
if (parseSuccess) {
|
||||
// 解析成功
|
||||
List<String> enterpriseNames = logItem.getEnterpriseNameList();
|
||||
List<String> accountNos = logItem.getAccountNoList();
|
||||
|
||||
record.setFileStatus("parsed_success");
|
||||
record.setEnterpriseNames(enterpriseNames != null ? String.join(",", enterpriseNames) : "");
|
||||
record.setAccountNos(accountNos != null ? String.join(",", accountNos) : "");
|
||||
recordMapper.updateById(record);
|
||||
|
||||
// 步骤7: 获取流水数据并保存
|
||||
fetchAndSaveBankStatements(projectId, lsfxProjectId, logId);
|
||||
} else {
|
||||
// 解析失败
|
||||
record.setFileStatus("parsed_failed");
|
||||
record.setErrorMessage("解析失败: " + uploadStatusDesc);
|
||||
recordMapper.updateById(record);
|
||||
}
|
||||
|
||||
} catch (Exception e) {
|
||||
log.error("【文件上传】处理失败: fileName={}", record.getFileName(), e);
|
||||
updateRecordStatus(recordId, "parsed_failed", e.getMessage());
|
||||
} finally {
|
||||
// 清理临时文件
|
||||
try {
|
||||
Path filePath = Paths.get(tempFilePath);
|
||||
if (Files.exists(filePath)) {
|
||||
Files.delete(filePath);
|
||||
}
|
||||
} catch (IOException e) {
|
||||
log.warn("【文件上传】清理临时文件失败: {}", tempFilePath, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### 4.2 错误处理规范
|
||||
|
||||
**异常分类**:
|
||||
1. **文件异常**: 临时文件不存在、文件转换失败
|
||||
2. **网络异常**: 流水分析平台接口调用失败
|
||||
3. **业务异常**: 解析失败、解析超时
|
||||
4. **数据库异常**: 批量插入失败
|
||||
|
||||
**处理策略**:
|
||||
- 所有异常统一捕获,记录详细日志
|
||||
- 直接标记为 `parsed_failed`
|
||||
- 记录错误信息到 `error_message` 字段
|
||||
- finally 块确保临时文件被清理
|
||||
|
||||
### 4.3 日志记录规范
|
||||
|
||||
**日志级别**:
|
||||
- `INFO`: 关键步骤(开始上传、上传成功、解析完成、保存成功)
|
||||
- `DEBUG`: 详细信息(轮询次数、每页数据量)
|
||||
- `WARN`: 警告信息(响应数据为空、清理失败)
|
||||
- `ERROR`: 错误信息(处理失败、异常)
|
||||
|
||||
**日志格式**:
|
||||
```
|
||||
【文件上传】{步骤描述}: {关键参数}={值}
|
||||
```
|
||||
|
||||
**示例**:
|
||||
```
|
||||
【文件上传】开始处理文件: fileName=流水1.xlsx, recordId=123
|
||||
【文件上传】文件上传成功: logId=456789
|
||||
【文件上传】解析完成: logId=456789, 轮询次数=15
|
||||
【文件上传】流水数据保存完成: 总共保存5000条
|
||||
```
|
||||
|
||||
## 5. 文件清单
|
||||
|
||||
### 5.1 需要修改的文件
|
||||
|
||||
| 文件路径 | 修改内容 |
|
||||
|---------|---------|
|
||||
| `CcdiFileUploadServiceImpl.java` | 实现 processFileAsync、waitForParsingComplete、fetchAndSaveBankStatements 中的 TODO |
|
||||
|
||||
### 5.2 需要新增的文件
|
||||
|
||||
| 文件路径 | 说明 |
|
||||
|---------|------|
|
||||
| `ccdi-project/src/main/java/com/ruoyi/ccdi/project/log/FileUploadLogAppender.java` | 批次日志管理器 |
|
||||
|
||||
## 6. 测试策略
|
||||
|
||||
### 6.1 单元测试
|
||||
|
||||
**测试用例**:
|
||||
1. `waitForParsingComplete` - 正常轮询成功
|
||||
2. `waitForParsingComplete` - 轮询超时
|
||||
3. `waitForParsingComplete` - 轮询被中断
|
||||
4. `fetchAndSaveBankStatements` - 无数据
|
||||
5. `fetchAndSaveBankStatements` - 单页数据
|
||||
6. `fetchAndSaveBankStatements` - 多页数据
|
||||
7. `fetchAndSaveBankStatements` - 异常处理
|
||||
|
||||
### 6.2 集成测试
|
||||
|
||||
**测试场景**:
|
||||
1. 完整流程测试(单个文件,正常场景)
|
||||
2. 大文件测试(50MB)
|
||||
3. 批量文件测试(10个文件)
|
||||
4. 解析失败场景
|
||||
5. 网络异常场景
|
||||
6. 线程池满载场景
|
||||
|
||||
### 6.3 性能测试
|
||||
|
||||
**测试指标**:
|
||||
- 单个文件处理时长: 3-15分钟
|
||||
- 100个文件并发处理
|
||||
- 数据库批量插入性能
|
||||
- 内存占用情况
|
||||
|
||||
## 7. 部署注意事项
|
||||
|
||||
### 7.1 配置检查
|
||||
|
||||
- [ ] `ruoyi.profile` 配置正确且目录有写权限
|
||||
- [ ] 线程池容量配置(默认100)
|
||||
- [ ] 流水分析平台地址配置正确
|
||||
- [ ] 应用认证信息配置正确
|
||||
|
||||
### 7.2 监控指标
|
||||
|
||||
- 线程池活跃线程数
|
||||
- 文件上传成功率
|
||||
- 平均处理时长
|
||||
- 批量插入性能
|
||||
- 日志文件大小和数量
|
||||
|
||||
### 7.3 运维建议
|
||||
|
||||
- 定期清理30天前的日志文件
|
||||
- 监控线程池状态
|
||||
- 关注数据库连接池使用情况
|
||||
- 流水分析平台接口调用成功率监控
|
||||
|
||||
## 8. 风险与缓解
|
||||
|
||||
### 8.1 风险识别
|
||||
|
||||
| 风险 | 影响 | 概率 | 缓解措施 |
|
||||
|------|------|------|----------|
|
||||
| 流水分析平台不稳定 | 高 | 中 | 异常捕获,标记失败,详细日志 |
|
||||
| 大文件内存溢出 | 高 | 低 | 批量插入,及时清理临时文件 |
|
||||
| 线程池满载 | 中 | 中 | 重试机制,提示系统繁忙 |
|
||||
| 日志文件过大 | 低 | 中 | 按批次分离,定期清理 |
|
||||
|
||||
### 8.2 回滚方案
|
||||
|
||||
如遇严重问题,可以:
|
||||
1. 禁用异步上传功能
|
||||
2. 回退到同步上传方式
|
||||
3. 暂停新的上传任务
|
||||
|
||||
## 9. 参考资料
|
||||
|
||||
- [项目异步文件上传功能设计文档](../../design/2026-03-05-async-file-upload-design.md)
|
||||
- [项目异步文件上传需求](../../assets/项目异步文件上传/task.md)
|
||||
- [流水分析平台接口文档](../2026-03-02-lsfx-integration-design.md)
|
||||
- [银行流水实体设计](../2026-03-04-bank-statement-entity-design.md)
|
||||
|
||||
---
|
||||
|
||||
**文档结束**
|
||||
Reference in New Issue
Block a user