Files
ccdi/doc/design/2026-03-05-async-file-upload-design.md

18 KiB
Raw Permalink Blame History

项目异步文件上传功能 - 设计文档

文档信息

  • 创建日期: 2026-03-05
  • 版本: v1.0
  • 作者: Claude
  • 状态: 已批准

1. 概述

1.1 功能描述

实现项目流水文件的异步批量上传功能,支持文件上传到流水分析平台、轮询解析状态、获取解析结果、保存流水数据到本地数据库的完整流程。

1.2 核心需求

  • 批量上传流水文件最多100个文件
  • 异步处理每个文件的上传→解析→存储流程
  • 线程池容量100超载时等待30秒重试
  • 实时跟踪文件处理状态
  • 生成独立的批次日志文件便于维护

1.3 技术栈

  • Spring @Async 异步处理
  • ThreadPoolTaskExecutor 线程池
  • MyBatis Plus 批量操作
  • Logback 自定义日志
  • Vue + Element UI 前端

2. 数据库设计

2.1 文件上传记录表

CREATE TABLE `ccdi_file_upload_record` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
  `project_id` bigint(20) NOT NULL COMMENT '项目ID',
  `lsfx_project_id` int(11) DEFAULT NULL COMMENT '流水分析平台项目ID',
  `log_id` int(11) DEFAULT NULL COMMENT '流水分析平台返回的logId',
  `file_name` varchar(255) NOT NULL COMMENT '文件名称',
  `file_size` bigint(20) DEFAULT NULL COMMENT '文件大小(字节)',
  `file_status` varchar(20) NOT NULL COMMENT '文件状态uploading-上传中parsing-解析中parsed_success-解析成功parsed_failed-解析失败',
  `enterprise_names` text COMMENT '主体名称(多个用逗号分隔)',
  `account_nos` text COMMENT '主体账号(多个用逗号分隔)',
  `error_message` text COMMENT '错误信息(解析失败时记录)',
  `upload_time` datetime NOT NULL COMMENT '上传时间',
  `upload_user` varchar(64) NOT NULL COMMENT '上传人',
  PRIMARY KEY (`id`),
  KEY `idx_project_id` (`project_id`),
  KEY `idx_log_id` (`log_id`),
  KEY `idx_file_status` (`file_status`),
  KEY `idx_upload_time` (`upload_time`),
  KEY `idx_project_status` (`project_id`, `file_status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='项目文件上传记录表';

2.2 字段说明

字段 类型 说明 备注
id bigint 主键ID 自增
project_id bigint 项目ID 外键关联 ccdi_project
lsfx_project_id int 流水分析平台项目ID 用于调用流水分析接口
log_id int 流水分析平台返回的logId 关键字段,用于查询解析状态和流水数据
file_name varchar(255) 文件名称 原始文件名
file_size bigint 文件大小 字节数
file_status varchar(20) 文件状态 uploading/parsing/parsed_success/parsed_failed
enterprise_names text 主体名称 解析成功后存储,多个用逗号分隔
account_nos text 主体账号 解析成功后存储,多个用逗号分隔
error_message text 错误信息 解析失败时记录原因
upload_time datetime 上传时间 记录创建时间
upload_user varchar(64) 上传人 操作用户

3. 后端架构设计

3.1 模块结构

ccdi-project/src/main/java/com/ruoyi/ccdi/project/
├── controller/
│   └── CcdiFileUploadController.java          # 文件上传接口
├── service/
│   ├── ICcdiFileUploadService.java            # 文件上传服务接口
│   └── impl/
│       └── CcdiFileUploadServiceImpl.java     # 文件上传服务实现
├── mapper/
│   └── CcdiFileUploadRecordMapper.java        # 文件上传记录Mapper
├── domain/
│   ├── entity/
│   │   └── CcdiFileUploadRecord.java          # 文件上传记录实体
│   ├── dto/
│   │   └── CcdiFileUploadQueryDTO.java        # 查询DTO
│   └── vo/
│       ├── CcdiFileUploadVO.java              # 文件上传响应VO
│       └── CcdiFileUploadStatisticsVO.java    # 统计VO
├── config/
│   └── AsyncThreadPoolConfig.java             # 异步线程池配置
└── log/
    └── FileUploadLogAppender.java             # 自定义日志Appender

ccdi-project/src/main/resources/
└── mapper/ccdi/project/
    └── CcdiFileUploadRecordMapper.xml         # Mapper XML映射文件

3.2 Controller 接口设计

接口路径 方法 功能 参数 返回值
/ccdi/file-upload/batch POST 批量上传文件 projectId, files[] batchId
/ccdi/file-upload/list GET 查询上传记录列表 projectId, fileStatus, pageNum, pageSize 分页列表
/ccdi/file-upload/statistics/{projectId} GET 查询上传统计 projectId 各状态数量
/ccdi/file-upload/detail/{id} GET 查询记录详情 id 完整信息
/ccdi/file-upload/thread-pool/status GET 查询线程池状态 - 线程池状态信息

3.3 Service 核心方法

ICcdiFileUploadService 接口

public interface ICcdiFileUploadService {
    /**
     * 批量上传文件
     * @param projectId 项目ID
     * @param files 文件数组
     * @param username 上传人
     * @return 批次ID
     */
    String batchUploadFiles(Long projectId, MultipartFile[] files, String username);

    /**
     * 异步处理单个文件
     * @Async("fileUploadExecutor")
     */
    void processFileAsync(Long projectId, Integer lsfxProjectId, MultipartFile file,
                          Long recordId, String batchId, CcdiFileUploadRecord record);

    /**
     * 查询上传记录列表
     */
    Page<CcdiFileUploadRecord> selectPage(Page<CcdiFileUploadRecord> page,
                                          CcdiFileUploadQueryDTO queryDTO);

    /**
     * 统计各状态文件数量
     */
    Map<String, Long> countByStatus(Long projectId);
}

核心处理流程

// 1. batchUploadFiles - 主入口
String batchUploadFiles(Long projectId, MultipartFile[] files, String username) {
    // 1.1 生成批次ID
    String batchId = UUID.randomUUID().toString().replace("-", "");

    // 1.2 获取项目的 lsfxProjectId
    Integer lsfxProjectId = project.getLsfxProjectId();

    // 1.3 批量插入文件记录status=uploading
    List<CcdiFileUploadRecord> records = createRecords(projectId, lsfxProjectId, files, username);
    recordMapper.insertBatch(records);

    // 1.4 异步启动调度线程提交任务
    CompletableFuture.runAsync(() -> {
        submitTasksAsync(projectId, lsfxProjectId, files, records, batchId);
    });

    // 1.5 立即返回 batchId
    return batchId;
}

// 2. submitTasksAsync - 调度线程
void submitTasksAsync(Long projectId, Integer lsfxProjectId, MultipartFile[] files,
                      List<CcdiFileUploadRecord> records, String batchId) {
    // 2.1 创建批次日志文件
    FileUploadLogAppender.createBatchLogFile(projectId, batchId);

    // 2.2 循环提交任务,支持重试
    for (int i = 0; i < files.length; i++) {
        boolean submitted = false;
        int retryCount = 0;

        while (!submitted && retryCount < 2) {
            try {
                // 提交异步任务到线程池
                CompletableFuture.runAsync(
                    () -> processFileAsync(projectId, lsfxProjectId, files[i],
                                           records.get(i).getId(), batchId, records.get(i)),
                    fileUploadExecutor
                );
                submitted = true;
            } catch (RejectedExecutionException e) {
                retryCount++;
                if (retryCount == 1) {
                    Thread.sleep(30000); // 等待30秒
                } else {
                    // 重试失败,更新记录状态
                    updateRecordStatus(records.get(i).getId(), "parsed_failed", "系统繁忙");
                }
            }
        }
    }
}

// 3. processFileAsync - 文件处理线程
@Async("fileUploadExecutor")
void processFileAsync(Long projectId, Integer lsfxProjectId, MultipartFile file,
                      Long recordId, String batchId, CcdiFileUploadRecord record) {
    try {
        // 3.1 上传文件到流水分析平台
        UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, file);
        Integer logId = uploadResponse.getData().getLogId();

        // 3.2 更新状态为 parsing
        record.setLogId(logId);
        record.setFileStatus("parsing");
        recordMapper.updateById(record);

        // 3.3 轮询解析状态最多300次间隔2秒
        boolean parsingComplete = waitForParsingComplete(lsfxProjectId, logId.toString());

        // 3.4 获取文件上传状态
        GetFileUploadStatusResponse statusResponse = lsfxClient.getFileUploadStatus(...);

        // 3.5 判断解析结果
        if (status == -5 && desc == "data.wait.confirm.newaccount") {
            // 解析成功
            record.setFileStatus("parsed_success");
            record.setEnterpriseNames(...);
            record.setAccountNos(...);
            recordMapper.updateById(record);

            // 3.6 获取流水数据并批量保存
            fetchAndSaveBankStatements(projectId, lsfxProjectId, logId, totalCount);
        } else {
            // 解析失败
            record.setFileStatus("parsed_failed");
            record.setErrorMessage(...);
            recordMapper.updateById(record);
        }
    } catch (Exception e) {
        updateRecordStatus(recordId, "parsed_failed", e.getMessage());
    }
}

4. 线程池配置

4.1 配置类

@Configuration
@EnableAsync
public class AsyncThreadPoolConfig {

    @Bean("fileUploadExecutor")
    public Executor fileUploadExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(100);              // 核心线程数
        executor.setMaxPoolSize(100);               // 最大线程数
        executor.setQueueCapacity(0);               // 队列容量0表示不使用队列
        executor.setThreadNamePrefix("file-upload-"); // 线程名称前缀
        executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 拒绝策略
        executor.setKeepAliveSeconds(60);           // 空闲线程存活时间
        executor.setWaitForTasksToCompleteOnShutdown(true); // 等待任务完成再关闭
        executor.setAwaitTerminationSeconds(60);    // 最长等待时间
        executor.initialize();
        return executor;
    }
}

4.2 拒绝策略

  • 策略: AbortPolicy
  • 行为: 抛出 RejectedExecutionException
  • 处理: 调度线程捕获异常等待30秒后重试1次
  • 重试失败: 更新记录状态为 parsed_failed,错误信息"系统繁忙"

5. 日志管理

5.1 日志文件组织

  • 路径格式: logs/file-upload/{projectId}/{timestamp}.log
  • 示例: logs/file-upload/123/20260305-103025.log
  • 特点: 每个批次生成独立的日志文件

5.2 Logback 配置

<!-- logback-fileupload.xml -->
<appender name="FILE_UPLOAD" class="com.ruoyi.ccdi.project.log.FileUploadLogAppender">
    <layout class="ch.qos.logback.classic.PatternLayout">
        <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
    </layout>
    <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
        <fileNamePattern>logs/file-upload/%d{yyyy-MM-dd}/%d{HH}.log</fileNamePattern>
        <maxHistory>30</maxHistory>
        <maxFileSize>100MB</maxFileSize>
    </rollingPolicy>
</appender>

<logger name="com.ruoyi.ccdi.project.service.impl.CcdiFileUploadServiceImpl"
        level="INFO" additivity="false">
    <appender-ref ref="FILE_UPLOAD"/>
</logger>

5.3 自定义 Appender

public class FileUploadLogAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {

    private static final ThreadLocal<FileAppender<ILoggingEvent>> currentAppender =
        new ThreadLocal<>();

    /**
     * 为指定批次创建独立的日志文件
     */
    public static void createBatchLogFile(Long projectId, String batchId) {
        String timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date());
        String logPath = String.format("logs/file-upload/%d/%s.log", projectId, timestamp);

        FileAppender<ILoggingEvent> appender = new FileAppender<>();
        appender.setFile(logPath);
        appender.setLayout(...);
        appender.start();

        currentAppender.set(appender);
    }

    @Override
    protected void append(ILoggingEvent event) {
        FileAppender<ILoggingEvent> appender = currentAppender.get();
        if (appender != null) {
            appender.doAppend(event);
        }
    }
}

6. 前端交互设计

6.1 上传流程

用户选择文件 → 确认上传 → 显示loading
    ↓
调用 batchUploadFiles() API
    ↓
后端立即返回 batchId
    ↓
前端提示"上传任务已提交"
    ↓
跳转到上传记录列表页
    ↓
每5秒自动刷新列表可关闭

6.2 列表页展示

统计卡片:

  • 上传中: 2
  • 解析中: 3
  • 解析成功: 15
  • 解析失败: 1

文件列表:

文件名 大小 状态 主体名称 上传时间 操作
流水1.xlsx 2.5MB 🔄 解析中 - 10:30:25 -
流水2.xlsx 1.8MB 解析成功 张三,李四 10:28:15 查看流水
流水3.xlsx 3.2MB 解析失败 - 10:25:30 查看错误

6.3 API 接口

// 批量上传文件
POST /ccdi/file-upload/batch
参数: FormData(projectId, files[])
返回: { code: 200, msg: "上传任务已提交", data: batchId }

// 查询上传记录列表
GET /ccdi/file-upload/list
参数: { projectId, fileStatus, pageNum, pageSize }
返回: { rows: [], total: 100 }

// 查询上传统计
GET /ccdi/file-upload/statistics/{projectId}
返回: { uploading: 2, parsing: 3, parsed_success: 15, parsed_failed: 1 }

7. 异常处理

7.1 Controller 层异常

异常类型 处理方式 返回信息
参数为空 参数校验 "项目ID不能为空"
文件数量超限 参数校验 "单次最多上传100个文件"
文件大小超限 参数校验 "文件超过50MB限制"
文件格式错误 参数校验 "仅支持Excel文件"
项目不存在 业务校验 "项目不存在"

7.2 Service 层异常

异常类型 处理方式 记录状态
流水分析平台接口异常 捕获并记录 parsed_failed
轮询超时(>300次 捕获并记录 parsed_failed
文件解析失败 捕获并记录 parsed_failed
线程池满且重试失败 捕获并记录 parsed_failed
其他未知异常 捕获并记录 parsed_failed

7.3 异常处理代码示例

try {
    // 处理文件
    processFileInternal(projectId, lsfxProjectId, file, record);
} catch (LsfxApiException e) {
    log.error("流水分析平台接口异常", e);
    updateRecordStatus(recordId, "parsed_failed", "流水分析平台接口异常:" + e.getMessage());
} catch (InterruptedException e) {
    Thread.currentThread().interrupt();
    log.error("处理被中断", e);
    updateRecordStatus(recordId, "parsed_failed", "处理被中断");
} catch (Exception e) {
    log.error("处理失败(未知异常)", e);
    updateRecordStatus(recordId, "parsed_failed", "处理失败:" + e.getMessage());
}

8. 性能优化

8.1 数据库优化

索引建议:

-- 组合索引提升查询性能
ALTER TABLE ccdi_file_upload_record
ADD INDEX idx_project_status (project_id, file_status);

ALTER TABLE ccdi_bank_statement
ADD INDEX idx_project_log (project_id, batch_id);

批量插入:

  • 使用 MyBatis Plus 的 saveBatch(statements, 500)
  • 每批500条避免单次插入过多数据

8.2 轮询优化

动态间隔策略:

  • 前10次1秒间隔
  • 11-50次2秒间隔
  • 51次后5秒间隔

8.3 线程池监控

@GetMapping("/thread-pool/status")
public AjaxResult getThreadPoolStatus() {
    ThreadPoolExecutor pool = fileUploadExecutor.getThreadPoolExecutor();

    Map<String, Object> status = new HashMap<>();
    status.put("activeCount", pool.getActiveCount());
    status.put("corePoolSize", pool.getCorePoolSize());
    status.put("queueSize", pool.getQueue().size());
    status.put("completedTaskCount", pool.getCompletedTaskCount());

    return AjaxResult.success(status);
}

9. 测试场景

9.1 功能测试

场景 输入 预期结果
正常上传 10个Excel文件每个5MB 所有文件处理成功
大文件上传 1个50MB文件 处理成功
文件数量超限 101个文件 返回错误提示
文件格式错误 上传PDF文件 返回错误提示
解析失败 格式错误的Excel 状态更新为parsed_failed

9.2 压力测试

场景 并发数 预期结果
正常并发 100个线程同时上传 所有任务正常处理
超载测试 150个文件同时上传 超过100的文件等待30秒重试
持续运行 1000次循环上传 无内存泄漏,无线程死锁

9.3 边界测试

场景 操作 预期结果
项目被删除 上传中删除项目 任务取消,状态更新为失败
重复上传 同一文件上传2次 生成2条独立记录和logId
网络中断 轮询时网络断开 捕获异常,状态更新为失败

10. 部署注意事项

10.1 配置检查清单

  • 线程池容量配置默认100
  • 文件上传大小限制默认50MB
  • 日志文件路径权限
  • 数据库索引创建
  • 流水分析平台地址配置
  • 应用认证信息配置

10.2 监控指标

  • 线程池活跃线程数
  • 文件上传成功率parsed_success / total
  • 平均处理时长
  • 线程池拒绝次数
  • 日志文件大小和数量

10.3 运维建议

  • 定期清理30天前的日志文件
  • 监控线程池状态,必要时调整容量
  • 关注数据库连接池使用情况
  • 流水分析平台接口调用成功率监控

11. 附录

11.1 状态机转换

uploading (初始状态)
    ↓
parsing (上传成功,轮询中)
    ↓
parsed_success (解析成功) 或 parsed_failed (解析失败)

11.2 关键时序

  • 文件上传2-5秒取决于文件大小
  • 轮询解析最多10分钟300次 × 2秒
  • 获取流水数据1-3分钟取决于流水数量
  • 总处理时长约3-15分钟/文件

11.3 数据量估算

  • 单个Excel文件平均5000条流水
  • 100个文件约50万条流水
  • 数据库存储约200MB
  • 日志文件约5-10MB/批次

文档结束