Files
ccdi/docs/plans/2026-03-05-async-file-upload-implementation-design.md

545 lines
16 KiB
Markdown
Raw Normal View History

# 异步文件上传服务实现设计文档
## 文档信息
- **创建日期**: 2026-03-05
- **版本**: v1.0
- **作者**: Claude
- **状态**: 已批准
## 1. 概述
### 1.1 功能描述
实现 `CcdiFileUploadServiceImpl` 中所有 TODO 方法,完成项目流水文件的异步批量上传功能的端到端流程。
### 1.2 核心需求
- 集成流水分析平台客户端LsfxAnalysisClient
- 实现文件上传到流水分析平台
- 实现轮询解析状态(固定间隔策略)
- 获取并判断解析结果
- 批量获取并保存流水数据到本地数据库
- 实现批次日志管理
### 1.3 技术栈
- Spring @Async 异步处理
- ThreadPoolTaskExecutor 线程池
- MyBatis Plus 批量操作
- Logback 自定义日志
- 流水分析平台 API
## 2. 设计决策
### 2.1 轮询策略
**决策**: 固定间隔策略
- 轮询次数: 300次
- 间隔时间: 2秒
- 最长等待: 10分钟
- **理由**: 简单可靠,符合设计文档要求
### 2.2 分页获取策略
**决策**: 大批量分页
- 每页数量: 1000条
- 批量插入: 每批1000条
- 先调用一次获取 totalCount
- **理由**: 性能与内存占用的平衡
### 2.3 错误处理策略
**决策**: 严格失败策略
- 任何异常直接标记为 `parsed_failed`
- 记录详细的错误信息到 `error_message` 字段
- 不进行额外重试(线程池层面已有重试机制)
- **理由**: 简单明了,便于排查问题
### 2.4 日志管理策略
**决策**: 完整实现批次日志
- 实现自定义 `FileUploadLogAppender`
- 每个批次生成独立日志文件
- 路径基于 `ruoyi.profile` 配置
- **理由**: 便于运维排查问题
## 3. 详细设计
### 3.1 依赖注入
```java
@Slf4j
@Service
public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
@Value("${ruoyi.profile}")
private String uploadPath;
@Resource
private CcdiFileUploadRecordMapper recordMapper;
@Resource
private CcdiProjectMapper projectMapper;
@Resource
@Qualifier("fileUploadExecutor")
private Executor fileUploadExecutor;
@Resource
private LsfxAnalysisClient lsfxClient; // 新增
@Resource
private CcdiBankStatementMapper bankStatementMapper; // 新增
```
### 3.2 文件上传逻辑processFileAsync 第329-333行
**核心流程**:
1. 将临时文件路径转换为 File 对象
2. 验证文件存在性
3. 调用 `lsfxClient.uploadFile(lsfxProjectId, file)`
4. 提取并验证返回的 logId
**关键代码**:
```java
File file = filePath.toFile();
if (!file.exists()) {
throw new RuntimeException("临时文件不存在: " + tempFilePath);
}
UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, file);
if (uploadResponse == null || uploadResponse.getData() == null) {
throw new RuntimeException("上传文件失败: 响应数据为空");
}
Integer logId = uploadResponse.getData().getLogId();
if (logId == null) {
throw new RuntimeException("上传文件失败: 未返回logId");
}
```
### 3.3 轮询解析状态逻辑waitForParsingComplete
**核心流程**:
1. 调用 `checkParseStatus(groupId, logId)`
2. 检查 `parsing` 字段
3. `parsing=false` 表示解析完成
4. 固定间隔2秒最多300次
**关键代码**:
```java
for (int i = 1; i <= maxRetries; i++) {
CheckParseStatusResponse response = lsfxClient.checkParseStatus(groupId, logId);
if (response == null || response.getData() == null) {
log.warn("【文件上传】轮询第{}次: 响应数据为空", i);
Thread.sleep(intervalSeconds * 1000L);
continue;
}
Boolean parsing = response.getData().getParsing();
// parsing=false 表示解析完成
if (Boolean.FALSE.equals(parsing)) {
log.info("【文件上传】解析完成: logId={}, 轮询次数={}", logId, i);
return true;
}
if (i < maxRetries) {
Thread.sleep(intervalSeconds * 1000L);
}
}
```
**异常处理**:
- `InterruptedException`: 恢复中断状态,返回 false
- 其他异常: 记录日志,继续轮询
### 3.4 获取解析结果逻辑processFileAsync 第355-383行
**核心流程**:
1. 调用 `getFileUploadStatus(groupId, logId)`
2. 判断 `status == -5 && uploadStatusDesc == "data.wait.confirm.newaccount"`
3. 提取 `enterpriseNameList``accountNoList`
4. 解析成功则调用 `fetchAndSaveBankStatements()`
**关键代码**:
```java
GetFileUploadStatusRequest statusRequest = new GetFileUploadStatusRequest();
statusRequest.setGroupId(lsfxProjectId);
statusRequest.setLogId(logId);
GetFileUploadStatusResponse statusResponse = lsfxClient.getFileUploadStatus(statusRequest);
GetFileUploadStatusResponse.LogItem logItem = statusResponse.getData().getLogs().get(0);
Integer status = logItem.getStatus();
String uploadStatusDesc = logItem.getUploadStatusDesc();
// 判断解析结果
boolean parseSuccess = status != null && status == -5
&& "data.wait.confirm.newaccount".equals(uploadStatusDesc);
if (parseSuccess) {
// 提取主体信息
List<String> enterpriseNames = logItem.getEnterpriseNameList();
List<String> accountNos = logItem.getAccountNoList();
String enterpriseNamesStr = enterpriseNames != null ? String.join(",", enterpriseNames) : "";
String accountNosStr = accountNos != null ? String.join(",", accountNos) : "";
record.setFileStatus("parsed_success");
record.setEnterpriseNames(enterpriseNamesStr);
record.setAccountNos(accountNosStr);
recordMapper.updateById(record);
// 获取流水数据
fetchAndSaveBankStatements(projectId, lsfxProjectId, logId);
} else {
record.setFileStatus("parsed_failed");
record.setErrorMessage("解析失败: " + uploadStatusDesc);
recordMapper.updateById(record);
}
```
### 3.5 批量保存流水数据逻辑fetchAndSaveBankStatements
**核心流程**:
1. 先调用一次接口获取 totalCountpageSize=1, pageNow=1
2. 计算分页信息每页1000条
3. 循环分页获取所有数据
4. 每累积1000条批量插入一次
5. 设置 projectId 到每条流水记录
**关键代码**:
```java
// 步骤1: 先调用一次接口获取 totalCount
GetBankStatementRequest firstRequest = new GetBankStatementRequest();
firstRequest.setGroupId(groupId);
firstRequest.setLogId(logId);
firstRequest.setPageNow(1);
firstRequest.setPageSize(1);
GetBankStatementResponse firstResponse = lsfxClient.getBankStatement(firstRequest);
Integer totalCount = firstResponse.getData().getTotalCount();
// 步骤2: 计算分页信息
int pageSize = 1000;
int batchSize = 1000;
int totalPages = (int) Math.ceil((double) totalCount / pageSize);
List<CcdiBankStatement> batchList = new ArrayList<>(batchSize);
// 步骤3: 循环分页获取
for (int pageNow = 1; pageNow <= totalPages; pageNow++) {
GetBankStatementRequest request = new GetBankStatementRequest();
request.setGroupId(groupId);
request.setLogId(logId);
request.setPageNow(pageNow);
request.setPageSize(pageSize);
GetBankStatementResponse response = lsfxClient.getBankStatement(request);
for (GetBankStatementResponse.BankStatementItem item : items) {
CcdiBankStatement statement = CcdiBankStatement.fromResponse(item);
statement.setProjectId(projectId); // 设置业务项目ID
batchList.add(statement);
// 达到批量插入阈值1000条
if (batchList.size() >= batchSize) {
bankStatementMapper.insertBatch(batchList);
batchList.clear();
}
}
}
// 步骤4: 保存剩余的数据
if (!batchList.isEmpty()) {
bankStatementMapper.insertBatch(batchList);
}
```
**性能优化**:
- 每页1000条减少请求次数
- 批量插入1000条提高数据库性能
- 异常不中断,继续处理下一页
### 3.6 批次日志管理FileUploadLogAppender
**核心功能**:
1. 继承 `UnsynchronizedAppenderBase<ILoggingEvent>`
2. 使用 `ThreadLocal` 存储当前批次的 FileAppender
3. 为每个批次创建独立的日志文件
**日志文件路径**:
```
{ruoyi.profile}/logs/file-upload/{projectId}/{timestamp}.log
```
**示例**:
- Windows: `D:/ruoyi/uploadPath/logs/file-upload/123/20260305-103025.log`
- Linux: `/var/ruoyi/logs/file-upload/123/20260305-103025.log`
**关键方法**:
```java
/**
* 为指定批次创建独立的日志文件
*/
public static void createBatchLogFile(String uploadPath, Long projectId, String batchId) {
String timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date());
String logDirPath = uploadPath + File.separator + "logs" + File.separator
+ "file-upload" + File.separator + projectId;
File logDir = new File(logDirPath);
if (!logDir.exists()) {
logDir.mkdirs();
}
String logFilePath = logDirPath + File.separator + timestamp + ".log";
FileAppender<ILoggingEvent> appender = new FileAppender<>();
appender.setFile(logFilePath);
PatternLayout layout = new PatternLayout();
layout.setPattern("%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n");
layout.start();
appender.setLayout(layout);
appender.start();
currentAppender.set(appender);
}
/**
* 关闭当前批次的日志文件
*/
public static void closeBatchLogFile() {
FileAppender<ILoggingEvent> appender = currentAppender.get();
if (appender != null) {
appender.stop();
currentAppender.remove();
}
}
```
**使用方式**:
```java
private void submitTasksAsync(...) {
// 创建批次日志文件
FileUploadLogAppender.createBatchLogFile(uploadPath, projectId, batchId);
try {
// 任务提交逻辑
} finally {
// 关闭日志文件
FileUploadLogAppender.closeBatchLogFile();
}
}
```
## 4. 实现细节
### 4.1 文件上传完整流程
```java
@Async("fileUploadExecutor")
public void processFileAsync(Long projectId, Integer lsfxProjectId, String tempFilePath,
Long recordId, String batchId, CcdiFileUploadRecord record) {
try {
// 步骤1: 状态已是uploading记录已存在
Path filePath = Paths.get(tempFilePath);
if (!Files.exists(filePath)) {
throw new RuntimeException("临时文件不存在: " + tempFilePath);
}
// 步骤2: 上传文件到流水分析平台
File file = filePath.toFile();
UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, file);
Integer logId = uploadResponse.getData().getLogId();
// 步骤3: 更新状态为 parsing
record.setLogId(logId);
record.setFileStatus("parsing");
recordMapper.updateById(record);
// 步骤4: 轮询解析状态最多300次间隔2秒
boolean parsingComplete = waitForParsingComplete(lsfxProjectId, logId.toString());
if (!parsingComplete) {
throw new RuntimeException("解析超时(超过10分钟)");
}
// 步骤5: 获取文件上传状态
GetFileUploadStatusRequest statusRequest = new GetFileUploadStatusRequest();
statusRequest.setGroupId(lsfxProjectId);
statusRequest.setLogId(logId);
GetFileUploadStatusResponse statusResponse = lsfxClient.getFileUploadStatus(statusRequest);
GetFileUploadStatusResponse.LogItem logItem = statusResponse.getData().getLogs().get(0);
Integer status = logItem.getStatus();
String uploadStatusDesc = logItem.getUploadStatusDesc();
// 步骤6: 判断解析结果
boolean parseSuccess = status != null && status == -5
&& "data.wait.confirm.newaccount".equals(uploadStatusDesc);
if (parseSuccess) {
// 解析成功
List<String> enterpriseNames = logItem.getEnterpriseNameList();
List<String> accountNos = logItem.getAccountNoList();
record.setFileStatus("parsed_success");
record.setEnterpriseNames(enterpriseNames != null ? String.join(",", enterpriseNames) : "");
record.setAccountNos(accountNos != null ? String.join(",", accountNos) : "");
recordMapper.updateById(record);
// 步骤7: 获取流水数据并保存
fetchAndSaveBankStatements(projectId, lsfxProjectId, logId);
} else {
// 解析失败
record.setFileStatus("parsed_failed");
record.setErrorMessage("解析失败: " + uploadStatusDesc);
recordMapper.updateById(record);
}
} catch (Exception e) {
log.error("【文件上传】处理失败: fileName={}", record.getFileName(), e);
updateRecordStatus(recordId, "parsed_failed", e.getMessage());
} finally {
// 清理临时文件
try {
Path filePath = Paths.get(tempFilePath);
if (Files.exists(filePath)) {
Files.delete(filePath);
}
} catch (IOException e) {
log.warn("【文件上传】清理临时文件失败: {}", tempFilePath, e);
}
}
}
```
### 4.2 错误处理规范
**异常分类**:
1. **文件异常**: 临时文件不存在、文件转换失败
2. **网络异常**: 流水分析平台接口调用失败
3. **业务异常**: 解析失败、解析超时
4. **数据库异常**: 批量插入失败
**处理策略**:
- 所有异常统一捕获,记录详细日志
- 直接标记为 `parsed_failed`
- 记录错误信息到 `error_message` 字段
- finally 块确保临时文件被清理
### 4.3 日志记录规范
**日志级别**:
- `INFO`: 关键步骤(开始上传、上传成功、解析完成、保存成功)
- `DEBUG`: 详细信息(轮询次数、每页数据量)
- `WARN`: 警告信息(响应数据为空、清理失败)
- `ERROR`: 错误信息(处理失败、异常)
**日志格式**:
```
【文件上传】{步骤描述}: {关键参数}={值}
```
**示例**:
```
【文件上传】开始处理文件: fileName=流水1.xlsx, recordId=123
【文件上传】文件上传成功: logId=456789
【文件上传】解析完成: logId=456789, 轮询次数=15
【文件上传】流水数据保存完成: 总共保存5000条
```
## 5. 文件清单
### 5.1 需要修改的文件
| 文件路径 | 修改内容 |
|---------|---------|
| `CcdiFileUploadServiceImpl.java` | 实现 processFileAsync、waitForParsingComplete、fetchAndSaveBankStatements 中的 TODO |
### 5.2 需要新增的文件
| 文件路径 | 说明 |
|---------|------|
| `ccdi-project/src/main/java/com/ruoyi/ccdi/project/log/FileUploadLogAppender.java` | 批次日志管理器 |
## 6. 测试策略
### 6.1 单元测试
**测试用例**:
1. `waitForParsingComplete` - 正常轮询成功
2. `waitForParsingComplete` - 轮询超时
3. `waitForParsingComplete` - 轮询被中断
4. `fetchAndSaveBankStatements` - 无数据
5. `fetchAndSaveBankStatements` - 单页数据
6. `fetchAndSaveBankStatements` - 多页数据
7. `fetchAndSaveBankStatements` - 异常处理
### 6.2 集成测试
**测试场景**:
1. 完整流程测试(单个文件,正常场景)
2. 大文件测试50MB
3. 批量文件测试10个文件
4. 解析失败场景
5. 网络异常场景
6. 线程池满载场景
### 6.3 性能测试
**测试指标**:
- 单个文件处理时长: 3-15分钟
- 100个文件并发处理
- 数据库批量插入性能
- 内存占用情况
## 7. 部署注意事项
### 7.1 配置检查
- [ ] `ruoyi.profile` 配置正确且目录有写权限
- [ ] 线程池容量配置默认100
- [ ] 流水分析平台地址配置正确
- [ ] 应用认证信息配置正确
### 7.2 监控指标
- 线程池活跃线程数
- 文件上传成功率
- 平均处理时长
- 批量插入性能
- 日志文件大小和数量
### 7.3 运维建议
- 定期清理30天前的日志文件
- 监控线程池状态
- 关注数据库连接池使用情况
- 流水分析平台接口调用成功率监控
## 8. 风险与缓解
### 8.1 风险识别
| 风险 | 影响 | 概率 | 缓解措施 |
|------|------|------|----------|
| 流水分析平台不稳定 | 高 | 中 | 异常捕获,标记失败,详细日志 |
| 大文件内存溢出 | 高 | 低 | 批量插入,及时清理临时文件 |
| 线程池满载 | 中 | 中 | 重试机制,提示系统繁忙 |
| 日志文件过大 | 低 | 中 | 按批次分离,定期清理 |
### 8.2 回滚方案
如遇严重问题,可以:
1. 禁用异步上传功能
2. 回退到同步上传方式
3. 暂停新的上传任务
## 9. 参考资料
- [项目异步文件上传功能设计文档](../../design/2026-03-05-async-file-upload-design.md)
- [项目异步文件上传需求](../../assets/项目异步文件上传/task.md)
- [流水分析平台接口文档](../2026-03-02-lsfx-integration-design.md)
- [银行流水实体设计](../2026-03-04-bank-statement-entity-design.md)
---
**文档结束**