Files
ccdi/docs/plans/2026-03-05-async-file-upload-implementation-design.md
wkc 388c70ce04 docs: 添加异步文件上传服务实现设计文档
- 完整设计 CcdiFileUploadServiceImpl 所有 TODO 实现方案
- 包含依赖注入、文件上传、轮询解析、批量保存等详细设计
- 确定设计决策:固定间隔轮询、大批量分页、严格失败策略
- 实现批次日志管理器 FileUploadLogAppender
- 包含完整的测试策略和部署注意事项
2026-03-05 12:39:58 +08:00

16 KiB
Raw Blame History

异步文件上传服务实现设计文档

文档信息

  • 创建日期: 2026-03-05
  • 版本: v1.0
  • 作者: Claude
  • 状态: 已批准

1. 概述

1.1 功能描述

实现 CcdiFileUploadServiceImpl 中所有 TODO 方法,完成项目流水文件的异步批量上传功能的端到端流程。

1.2 核心需求

  • 集成流水分析平台客户端LsfxAnalysisClient
  • 实现文件上传到流水分析平台
  • 实现轮询解析状态(固定间隔策略)
  • 获取并判断解析结果
  • 批量获取并保存流水数据到本地数据库
  • 实现批次日志管理

1.3 技术栈

  • Spring @Async 异步处理
  • ThreadPoolTaskExecutor 线程池
  • MyBatis Plus 批量操作
  • Logback 自定义日志
  • 流水分析平台 API

2. 设计决策

2.1 轮询策略

决策: 固定间隔策略

  • 轮询次数: 300次
  • 间隔时间: 2秒
  • 最长等待: 10分钟
  • 理由: 简单可靠,符合设计文档要求

2.2 分页获取策略

决策: 大批量分页

  • 每页数量: 1000条
  • 批量插入: 每批1000条
  • 先调用一次获取 totalCount
  • 理由: 性能与内存占用的平衡

2.3 错误处理策略

决策: 严格失败策略

  • 任何异常直接标记为 parsed_failed
  • 记录详细的错误信息到 error_message 字段
  • 不进行额外重试(线程池层面已有重试机制)
  • 理由: 简单明了,便于排查问题

2.4 日志管理策略

决策: 完整实现批次日志

  • 实现自定义 FileUploadLogAppender
  • 每个批次生成独立日志文件
  • 路径基于 ruoyi.profile 配置
  • 理由: 便于运维排查问题

3. 详细设计

3.1 依赖注入

@Slf4j
@Service
public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {

    @Value("${ruoyi.profile}")
    private String uploadPath;

    @Resource
    private CcdiFileUploadRecordMapper recordMapper;

    @Resource
    private CcdiProjectMapper projectMapper;

    @Resource
    @Qualifier("fileUploadExecutor")
    private Executor fileUploadExecutor;

    @Resource
    private LsfxAnalysisClient lsfxClient;  // 新增

    @Resource
    private CcdiBankStatementMapper bankStatementMapper;  // 新增

3.2 文件上传逻辑processFileAsync 第329-333行

核心流程:

  1. 将临时文件路径转换为 File 对象
  2. 验证文件存在性
  3. 调用 lsfxClient.uploadFile(lsfxProjectId, file)
  4. 提取并验证返回的 logId

关键代码:

File file = filePath.toFile();
if (!file.exists()) {
    throw new RuntimeException("临时文件不存在: " + tempFilePath);
}

UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, file);
if (uploadResponse == null || uploadResponse.getData() == null) {
    throw new RuntimeException("上传文件失败: 响应数据为空");
}

Integer logId = uploadResponse.getData().getLogId();
if (logId == null) {
    throw new RuntimeException("上传文件失败: 未返回logId");
}

3.3 轮询解析状态逻辑waitForParsingComplete

核心流程:

  1. 调用 checkParseStatus(groupId, logId)
  2. 检查 parsing 字段
  3. parsing=false 表示解析完成
  4. 固定间隔2秒最多300次

关键代码:

for (int i = 1; i <= maxRetries; i++) {
    CheckParseStatusResponse response = lsfxClient.checkParseStatus(groupId, logId);

    if (response == null || response.getData() == null) {
        log.warn("【文件上传】轮询第{}次: 响应数据为空", i);
        Thread.sleep(intervalSeconds * 1000L);
        continue;
    }

    Boolean parsing = response.getData().getParsing();

    // parsing=false 表示解析完成
    if (Boolean.FALSE.equals(parsing)) {
        log.info("【文件上传】解析完成: logId={}, 轮询次数={}", logId, i);
        return true;
    }

    if (i < maxRetries) {
        Thread.sleep(intervalSeconds * 1000L);
    }
}

异常处理:

  • InterruptedException: 恢复中断状态,返回 false
  • 其他异常: 记录日志,继续轮询

3.4 获取解析结果逻辑processFileAsync 第355-383行

核心流程:

  1. 调用 getFileUploadStatus(groupId, logId)
  2. 判断 status == -5 && uploadStatusDesc == "data.wait.confirm.newaccount"
  3. 提取 enterpriseNameListaccountNoList
  4. 解析成功则调用 fetchAndSaveBankStatements()

关键代码:

GetFileUploadStatusRequest statusRequest = new GetFileUploadStatusRequest();
statusRequest.setGroupId(lsfxProjectId);
statusRequest.setLogId(logId);

GetFileUploadStatusResponse statusResponse = lsfxClient.getFileUploadStatus(statusRequest);

GetFileUploadStatusResponse.LogItem logItem = statusResponse.getData().getLogs().get(0);
Integer status = logItem.getStatus();
String uploadStatusDesc = logItem.getUploadStatusDesc();

// 判断解析结果
boolean parseSuccess = status != null && status == -5
    && "data.wait.confirm.newaccount".equals(uploadStatusDesc);

if (parseSuccess) {
    // 提取主体信息
    List<String> enterpriseNames = logItem.getEnterpriseNameList();
    List<String> accountNos = logItem.getAccountNoList();

    String enterpriseNamesStr = enterpriseNames != null ? String.join(",", enterpriseNames) : "";
    String accountNosStr = accountNos != null ? String.join(",", accountNos) : "";

    record.setFileStatus("parsed_success");
    record.setEnterpriseNames(enterpriseNamesStr);
    record.setAccountNos(accountNosStr);
    recordMapper.updateById(record);

    // 获取流水数据
    fetchAndSaveBankStatements(projectId, lsfxProjectId, logId);
} else {
    record.setFileStatus("parsed_failed");
    record.setErrorMessage("解析失败: " + uploadStatusDesc);
    recordMapper.updateById(record);
}

3.5 批量保存流水数据逻辑fetchAndSaveBankStatements

核心流程:

  1. 先调用一次接口获取 totalCountpageSize=1, pageNow=1
  2. 计算分页信息每页1000条
  3. 循环分页获取所有数据
  4. 每累积1000条批量插入一次
  5. 设置 projectId 到每条流水记录

关键代码:

// 步骤1: 先调用一次接口获取 totalCount
GetBankStatementRequest firstRequest = new GetBankStatementRequest();
firstRequest.setGroupId(groupId);
firstRequest.setLogId(logId);
firstRequest.setPageNow(1);
firstRequest.setPageSize(1);

GetBankStatementResponse firstResponse = lsfxClient.getBankStatement(firstRequest);
Integer totalCount = firstResponse.getData().getTotalCount();

// 步骤2: 计算分页信息
int pageSize = 1000;
int batchSize = 1000;
int totalPages = (int) Math.ceil((double) totalCount / pageSize);

List<CcdiBankStatement> batchList = new ArrayList<>(batchSize);

// 步骤3: 循环分页获取
for (int pageNow = 1; pageNow <= totalPages; pageNow++) {
    GetBankStatementRequest request = new GetBankStatementRequest();
    request.setGroupId(groupId);
    request.setLogId(logId);
    request.setPageNow(pageNow);
    request.setPageSize(pageSize);

    GetBankStatementResponse response = lsfxClient.getBankStatement(request);

    for (GetBankStatementResponse.BankStatementItem item : items) {
        CcdiBankStatement statement = CcdiBankStatement.fromResponse(item);
        statement.setProjectId(projectId);  // 设置业务项目ID
        batchList.add(statement);

        // 达到批量插入阈值1000条
        if (batchList.size() >= batchSize) {
            bankStatementMapper.insertBatch(batchList);
            batchList.clear();
        }
    }
}

// 步骤4: 保存剩余的数据
if (!batchList.isEmpty()) {
    bankStatementMapper.insertBatch(batchList);
}

性能优化:

  • 每页1000条减少请求次数
  • 批量插入1000条提高数据库性能
  • 异常不中断,继续处理下一页

3.6 批次日志管理FileUploadLogAppender

核心功能:

  1. 继承 UnsynchronizedAppenderBase<ILoggingEvent>
  2. 使用 ThreadLocal 存储当前批次的 FileAppender
  3. 为每个批次创建独立的日志文件

日志文件路径:

{ruoyi.profile}/logs/file-upload/{projectId}/{timestamp}.log

示例:

  • Windows: D:/ruoyi/uploadPath/logs/file-upload/123/20260305-103025.log
  • Linux: /var/ruoyi/logs/file-upload/123/20260305-103025.log

关键方法:

/**
 * 为指定批次创建独立的日志文件
 */
public static void createBatchLogFile(String uploadPath, Long projectId, String batchId) {
    String timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date());
    String logDirPath = uploadPath + File.separator + "logs" + File.separator
                      + "file-upload" + File.separator + projectId;

    File logDir = new File(logDirPath);
    if (!logDir.exists()) {
        logDir.mkdirs();
    }

    String logFilePath = logDirPath + File.separator + timestamp + ".log";

    FileAppender<ILoggingEvent> appender = new FileAppender<>();
    appender.setFile(logFilePath);

    PatternLayout layout = new PatternLayout();
    layout.setPattern("%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n");
    layout.start();

    appender.setLayout(layout);
    appender.start();

    currentAppender.set(appender);
}

/**
 * 关闭当前批次的日志文件
 */
public static void closeBatchLogFile() {
    FileAppender<ILoggingEvent> appender = currentAppender.get();
    if (appender != null) {
        appender.stop();
        currentAppender.remove();
    }
}

使用方式:

private void submitTasksAsync(...) {
    // 创建批次日志文件
    FileUploadLogAppender.createBatchLogFile(uploadPath, projectId, batchId);

    try {
        // 任务提交逻辑
    } finally {
        // 关闭日志文件
        FileUploadLogAppender.closeBatchLogFile();
    }
}

4. 实现细节

4.1 文件上传完整流程

@Async("fileUploadExecutor")
public void processFileAsync(Long projectId, Integer lsfxProjectId, String tempFilePath,
                              Long recordId, String batchId, CcdiFileUploadRecord record) {
    try {
        // 步骤1: 状态已是uploading记录已存在
        Path filePath = Paths.get(tempFilePath);
        if (!Files.exists(filePath)) {
            throw new RuntimeException("临时文件不存在: " + tempFilePath);
        }

        // 步骤2: 上传文件到流水分析平台
        File file = filePath.toFile();
        UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, file);
        Integer logId = uploadResponse.getData().getLogId();

        // 步骤3: 更新状态为 parsing
        record.setLogId(logId);
        record.setFileStatus("parsing");
        recordMapper.updateById(record);

        // 步骤4: 轮询解析状态最多300次间隔2秒
        boolean parsingComplete = waitForParsingComplete(lsfxProjectId, logId.toString());
        if (!parsingComplete) {
            throw new RuntimeException("解析超时(超过10分钟)");
        }

        // 步骤5: 获取文件上传状态
        GetFileUploadStatusRequest statusRequest = new GetFileUploadStatusRequest();
        statusRequest.setGroupId(lsfxProjectId);
        statusRequest.setLogId(logId);
        GetFileUploadStatusResponse statusResponse = lsfxClient.getFileUploadStatus(statusRequest);

        GetFileUploadStatusResponse.LogItem logItem = statusResponse.getData().getLogs().get(0);
        Integer status = logItem.getStatus();
        String uploadStatusDesc = logItem.getUploadStatusDesc();

        // 步骤6: 判断解析结果
        boolean parseSuccess = status != null && status == -5
            && "data.wait.confirm.newaccount".equals(uploadStatusDesc);

        if (parseSuccess) {
            // 解析成功
            List<String> enterpriseNames = logItem.getEnterpriseNameList();
            List<String> accountNos = logItem.getAccountNoList();

            record.setFileStatus("parsed_success");
            record.setEnterpriseNames(enterpriseNames != null ? String.join(",", enterpriseNames) : "");
            record.setAccountNos(accountNos != null ? String.join(",", accountNos) : "");
            recordMapper.updateById(record);

            // 步骤7: 获取流水数据并保存
            fetchAndSaveBankStatements(projectId, lsfxProjectId, logId);
        } else {
            // 解析失败
            record.setFileStatus("parsed_failed");
            record.setErrorMessage("解析失败: " + uploadStatusDesc);
            recordMapper.updateById(record);
        }

    } catch (Exception e) {
        log.error("【文件上传】处理失败: fileName={}", record.getFileName(), e);
        updateRecordStatus(recordId, "parsed_failed", e.getMessage());
    } finally {
        // 清理临时文件
        try {
            Path filePath = Paths.get(tempFilePath);
            if (Files.exists(filePath)) {
                Files.delete(filePath);
            }
        } catch (IOException e) {
            log.warn("【文件上传】清理临时文件失败: {}", tempFilePath, e);
        }
    }
}

4.2 错误处理规范

异常分类:

  1. 文件异常: 临时文件不存在、文件转换失败
  2. 网络异常: 流水分析平台接口调用失败
  3. 业务异常: 解析失败、解析超时
  4. 数据库异常: 批量插入失败

处理策略:

  • 所有异常统一捕获,记录详细日志
  • 直接标记为 parsed_failed
  • 记录错误信息到 error_message 字段
  • finally 块确保临时文件被清理

4.3 日志记录规范

日志级别:

  • INFO: 关键步骤(开始上传、上传成功、解析完成、保存成功)
  • DEBUG: 详细信息(轮询次数、每页数据量)
  • WARN: 警告信息(响应数据为空、清理失败)
  • ERROR: 错误信息(处理失败、异常)

日志格式:

【文件上传】{步骤描述}: {关键参数}={值}

示例:

【文件上传】开始处理文件: fileName=流水1.xlsx, recordId=123
【文件上传】文件上传成功: logId=456789
【文件上传】解析完成: logId=456789, 轮询次数=15
【文件上传】流水数据保存完成: 总共保存5000条

5. 文件清单

5.1 需要修改的文件

文件路径 修改内容
CcdiFileUploadServiceImpl.java 实现 processFileAsync、waitForParsingComplete、fetchAndSaveBankStatements 中的 TODO

5.2 需要新增的文件

文件路径 说明
ccdi-project/src/main/java/com/ruoyi/ccdi/project/log/FileUploadLogAppender.java 批次日志管理器

6. 测试策略

6.1 单元测试

测试用例:

  1. waitForParsingComplete - 正常轮询成功
  2. waitForParsingComplete - 轮询超时
  3. waitForParsingComplete - 轮询被中断
  4. fetchAndSaveBankStatements - 无数据
  5. fetchAndSaveBankStatements - 单页数据
  6. fetchAndSaveBankStatements - 多页数据
  7. fetchAndSaveBankStatements - 异常处理

6.2 集成测试

测试场景:

  1. 完整流程测试(单个文件,正常场景)
  2. 大文件测试50MB
  3. 批量文件测试10个文件
  4. 解析失败场景
  5. 网络异常场景
  6. 线程池满载场景

6.3 性能测试

测试指标:

  • 单个文件处理时长: 3-15分钟
  • 100个文件并发处理
  • 数据库批量插入性能
  • 内存占用情况

7. 部署注意事项

7.1 配置检查

  • ruoyi.profile 配置正确且目录有写权限
  • 线程池容量配置默认100
  • 流水分析平台地址配置正确
  • 应用认证信息配置正确

7.2 监控指标

  • 线程池活跃线程数
  • 文件上传成功率
  • 平均处理时长
  • 批量插入性能
  • 日志文件大小和数量

7.3 运维建议

  • 定期清理30天前的日志文件
  • 监控线程池状态
  • 关注数据库连接池使用情况
  • 流水分析平台接口调用成功率监控

8. 风险与缓解

8.1 风险识别

风险 影响 概率 缓解措施
流水分析平台不稳定 异常捕获,标记失败,详细日志
大文件内存溢出 批量插入,及时清理临时文件
线程池满载 重试机制,提示系统繁忙
日志文件过大 按批次分离,定期清理

8.2 回滚方案

如遇严重问题,可以:

  1. 禁用异步上传功能
  2. 回退到同步上传方式
  3. 暂停新的上传任务

9. 参考资料


文档结束