18 KiB
18 KiB
项目异步文件上传功能 - 设计文档
文档信息
- 创建日期: 2026-03-05
- 版本: v1.0
- 作者: Claude
- 状态: 已批准
1. 概述
1.1 功能描述
实现项目流水文件的异步批量上传功能,支持文件上传到流水分析平台、轮询解析状态、获取解析结果、保存流水数据到本地数据库的完整流程。
1.2 核心需求
- 批量上传流水文件(最多100个文件)
- 异步处理每个文件的上传→解析→存储流程
- 线程池容量100,超载时等待30秒重试
- 实时跟踪文件处理状态
- 生成独立的批次日志文件便于维护
1.3 技术栈
- Spring @Async 异步处理
- ThreadPoolTaskExecutor 线程池
- MyBatis Plus 批量操作
- Logback 自定义日志
- Vue + Element UI 前端
2. 数据库设计
2.1 文件上传记录表
CREATE TABLE `ccdi_file_upload_record` (
`id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID',
`project_id` bigint(20) NOT NULL COMMENT '项目ID',
`lsfx_project_id` int(11) DEFAULT NULL COMMENT '流水分析平台项目ID',
`log_id` int(11) DEFAULT NULL COMMENT '流水分析平台返回的logId',
`file_name` varchar(255) NOT NULL COMMENT '文件名称',
`file_size` bigint(20) DEFAULT NULL COMMENT '文件大小(字节)',
`file_status` varchar(20) NOT NULL COMMENT '文件状态:uploading-上传中,parsing-解析中,parsed_success-解析成功,parsed_failed-解析失败',
`enterprise_names` text COMMENT '主体名称(多个用逗号分隔)',
`account_nos` text COMMENT '主体账号(多个用逗号分隔)',
`error_message` text COMMENT '错误信息(解析失败时记录)',
`upload_time` datetime NOT NULL COMMENT '上传时间',
`upload_user` varchar(64) NOT NULL COMMENT '上传人',
PRIMARY KEY (`id`),
KEY `idx_project_id` (`project_id`),
KEY `idx_log_id` (`log_id`),
KEY `idx_file_status` (`file_status`),
KEY `idx_upload_time` (`upload_time`),
KEY `idx_project_status` (`project_id`, `file_status`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='项目文件上传记录表';
2.2 字段说明
| 字段 | 类型 | 说明 | 备注 |
|---|---|---|---|
| id | bigint | 主键ID | 自增 |
| project_id | bigint | 项目ID | 外键关联 ccdi_project |
| lsfx_project_id | int | 流水分析平台项目ID | 用于调用流水分析接口 |
| log_id | int | 流水分析平台返回的logId | 关键字段,用于查询解析状态和流水数据 |
| file_name | varchar(255) | 文件名称 | 原始文件名 |
| file_size | bigint | 文件大小 | 字节数 |
| file_status | varchar(20) | 文件状态 | uploading/parsing/parsed_success/parsed_failed |
| enterprise_names | text | 主体名称 | 解析成功后存储,多个用逗号分隔 |
| account_nos | text | 主体账号 | 解析成功后存储,多个用逗号分隔 |
| error_message | text | 错误信息 | 解析失败时记录原因 |
| upload_time | datetime | 上传时间 | 记录创建时间 |
| upload_user | varchar(64) | 上传人 | 操作用户 |
3. 后端架构设计
3.1 模块结构
ccdi-project/src/main/java/com/ruoyi/ccdi/project/
├── controller/
│ └── CcdiFileUploadController.java # 文件上传接口
├── service/
│ ├── ICcdiFileUploadService.java # 文件上传服务接口
│ └── impl/
│ └── CcdiFileUploadServiceImpl.java # 文件上传服务实现
├── mapper/
│ └── CcdiFileUploadRecordMapper.java # 文件上传记录Mapper
├── domain/
│ ├── entity/
│ │ └── CcdiFileUploadRecord.java # 文件上传记录实体
│ ├── dto/
│ │ └── CcdiFileUploadQueryDTO.java # 查询DTO
│ └── vo/
│ ├── CcdiFileUploadVO.java # 文件上传响应VO
│ └── CcdiFileUploadStatisticsVO.java # 统计VO
├── config/
│ └── AsyncThreadPoolConfig.java # 异步线程池配置
└── log/
└── FileUploadLogAppender.java # 自定义日志Appender
ccdi-project/src/main/resources/
└── mapper/ccdi/project/
└── CcdiFileUploadRecordMapper.xml # Mapper XML映射文件
3.2 Controller 接口设计
| 接口路径 | 方法 | 功能 | 参数 | 返回值 |
|---|---|---|---|---|
/ccdi/file-upload/batch |
POST | 批量上传文件 | projectId, files[] | batchId |
/ccdi/file-upload/list |
GET | 查询上传记录列表 | projectId, fileStatus, pageNum, pageSize | 分页列表 |
/ccdi/file-upload/statistics/{projectId} |
GET | 查询上传统计 | projectId | 各状态数量 |
/ccdi/file-upload/detail/{id} |
GET | 查询记录详情 | id | 完整信息 |
/ccdi/file-upload/thread-pool/status |
GET | 查询线程池状态 | - | 线程池状态信息 |
3.3 Service 核心方法
ICcdiFileUploadService 接口
public interface ICcdiFileUploadService {
/**
* 批量上传文件
* @param projectId 项目ID
* @param files 文件数组
* @param username 上传人
* @return 批次ID
*/
String batchUploadFiles(Long projectId, MultipartFile[] files, String username);
/**
* 异步处理单个文件
* @Async("fileUploadExecutor")
*/
void processFileAsync(Long projectId, Integer lsfxProjectId, MultipartFile file,
Long recordId, String batchId, CcdiFileUploadRecord record);
/**
* 查询上传记录列表
*/
Page<CcdiFileUploadRecord> selectPage(Page<CcdiFileUploadRecord> page,
CcdiFileUploadQueryDTO queryDTO);
/**
* 统计各状态文件数量
*/
Map<String, Long> countByStatus(Long projectId);
}
核心处理流程
// 1. batchUploadFiles - 主入口
String batchUploadFiles(Long projectId, MultipartFile[] files, String username) {
// 1.1 生成批次ID
String batchId = UUID.randomUUID().toString().replace("-", "");
// 1.2 获取项目的 lsfxProjectId
Integer lsfxProjectId = project.getLsfxProjectId();
// 1.3 批量插入文件记录(status=uploading)
List<CcdiFileUploadRecord> records = createRecords(projectId, lsfxProjectId, files, username);
recordMapper.insertBatch(records);
// 1.4 异步启动调度线程提交任务
CompletableFuture.runAsync(() -> {
submitTasksAsync(projectId, lsfxProjectId, files, records, batchId);
});
// 1.5 立即返回 batchId
return batchId;
}
// 2. submitTasksAsync - 调度线程
void submitTasksAsync(Long projectId, Integer lsfxProjectId, MultipartFile[] files,
List<CcdiFileUploadRecord> records, String batchId) {
// 2.1 创建批次日志文件
FileUploadLogAppender.createBatchLogFile(projectId, batchId);
// 2.2 循环提交任务,支持重试
for (int i = 0; i < files.length; i++) {
boolean submitted = false;
int retryCount = 0;
while (!submitted && retryCount < 2) {
try {
// 提交异步任务到线程池
CompletableFuture.runAsync(
() -> processFileAsync(projectId, lsfxProjectId, files[i],
records.get(i).getId(), batchId, records.get(i)),
fileUploadExecutor
);
submitted = true;
} catch (RejectedExecutionException e) {
retryCount++;
if (retryCount == 1) {
Thread.sleep(30000); // 等待30秒
} else {
// 重试失败,更新记录状态
updateRecordStatus(records.get(i).getId(), "parsed_failed", "系统繁忙");
}
}
}
}
}
// 3. processFileAsync - 文件处理线程
@Async("fileUploadExecutor")
void processFileAsync(Long projectId, Integer lsfxProjectId, MultipartFile file,
Long recordId, String batchId, CcdiFileUploadRecord record) {
try {
// 3.1 上传文件到流水分析平台
UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, file);
Integer logId = uploadResponse.getData().getLogId();
// 3.2 更新状态为 parsing
record.setLogId(logId);
record.setFileStatus("parsing");
recordMapper.updateById(record);
// 3.3 轮询解析状态(最多300次,间隔2秒)
boolean parsingComplete = waitForParsingComplete(lsfxProjectId, logId.toString());
// 3.4 获取文件上传状态
GetFileUploadStatusResponse statusResponse = lsfxClient.getFileUploadStatus(...);
// 3.5 判断解析结果
if (status == -5 && desc == "data.wait.confirm.newaccount") {
// 解析成功
record.setFileStatus("parsed_success");
record.setEnterpriseNames(...);
record.setAccountNos(...);
recordMapper.updateById(record);
// 3.6 获取流水数据并批量保存
fetchAndSaveBankStatements(projectId, lsfxProjectId, logId, totalCount);
} else {
// 解析失败
record.setFileStatus("parsed_failed");
record.setErrorMessage(...);
recordMapper.updateById(record);
}
} catch (Exception e) {
updateRecordStatus(recordId, "parsed_failed", e.getMessage());
}
}
4. 线程池配置
4.1 配置类
@Configuration
@EnableAsync
public class AsyncThreadPoolConfig {
@Bean("fileUploadExecutor")
public Executor fileUploadExecutor() {
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
executor.setCorePoolSize(100); // 核心线程数
executor.setMaxPoolSize(100); // 最大线程数
executor.setQueueCapacity(0); // 队列容量(0表示不使用队列)
executor.setThreadNamePrefix("file-upload-"); // 线程名称前缀
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 拒绝策略
executor.setKeepAliveSeconds(60); // 空闲线程存活时间
executor.setWaitForTasksToCompleteOnShutdown(true); // 等待任务完成再关闭
executor.setAwaitTerminationSeconds(60); // 最长等待时间
executor.initialize();
return executor;
}
}
4.2 拒绝策略
- 策略: AbortPolicy
- 行为: 抛出 RejectedExecutionException
- 处理: 调度线程捕获异常,等待30秒后重试1次
- 重试失败: 更新记录状态为
parsed_failed,错误信息"系统繁忙"
5. 日志管理
5.1 日志文件组织
- 路径格式:
logs/file-upload/{projectId}/{timestamp}.log - 示例:
logs/file-upload/123/20260305-103025.log - 特点: 每个批次生成独立的日志文件
5.2 Logback 配置
<!-- logback-fileupload.xml -->
<appender name="FILE_UPLOAD" class="com.ruoyi.ccdi.project.log.FileUploadLogAppender">
<layout class="ch.qos.logback.classic.PatternLayout">
<pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
</layout>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>logs/file-upload/%d{yyyy-MM-dd}/%d{HH}.log</fileNamePattern>
<maxHistory>30</maxHistory>
<maxFileSize>100MB</maxFileSize>
</rollingPolicy>
</appender>
<logger name="com.ruoyi.ccdi.project.service.impl.CcdiFileUploadServiceImpl"
level="INFO" additivity="false">
<appender-ref ref="FILE_UPLOAD"/>
</logger>
5.3 自定义 Appender
public class FileUploadLogAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
private static final ThreadLocal<FileAppender<ILoggingEvent>> currentAppender =
new ThreadLocal<>();
/**
* 为指定批次创建独立的日志文件
*/
public static void createBatchLogFile(Long projectId, String batchId) {
String timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date());
String logPath = String.format("logs/file-upload/%d/%s.log", projectId, timestamp);
FileAppender<ILoggingEvent> appender = new FileAppender<>();
appender.setFile(logPath);
appender.setLayout(...);
appender.start();
currentAppender.set(appender);
}
@Override
protected void append(ILoggingEvent event) {
FileAppender<ILoggingEvent> appender = currentAppender.get();
if (appender != null) {
appender.doAppend(event);
}
}
}
6. 前端交互设计
6.1 上传流程
用户选择文件 → 确认上传 → 显示loading
↓
调用 batchUploadFiles() API
↓
后端立即返回 batchId
↓
前端提示"上传任务已提交"
↓
跳转到上传记录列表页
↓
每5秒自动刷新列表(可关闭)
6.2 列表页展示
统计卡片:
- 上传中: 2
- 解析中: 3
- 解析成功: 15
- 解析失败: 1
文件列表:
| 文件名 | 大小 | 状态 | 主体名称 | 上传时间 | 操作 |
|---|---|---|---|---|---|
| 流水1.xlsx | 2.5MB | 🔄 解析中 | - | 10:30:25 | - |
| 流水2.xlsx | 1.8MB | ✅ 解析成功 | 张三,李四 | 10:28:15 | 查看流水 |
| 流水3.xlsx | 3.2MB | ❌ 解析失败 | - | 10:25:30 | 查看错误 |
6.3 API 接口
// 批量上传文件
POST /ccdi/file-upload/batch
参数: FormData(projectId, files[])
返回: { code: 200, msg: "上传任务已提交", data: batchId }
// 查询上传记录列表
GET /ccdi/file-upload/list
参数: { projectId, fileStatus, pageNum, pageSize }
返回: { rows: [], total: 100 }
// 查询上传统计
GET /ccdi/file-upload/statistics/{projectId}
返回: { uploading: 2, parsing: 3, parsed_success: 15, parsed_failed: 1 }
7. 异常处理
7.1 Controller 层异常
| 异常类型 | 处理方式 | 返回信息 |
|---|---|---|
| 参数为空 | 参数校验 | "项目ID不能为空" |
| 文件数量超限 | 参数校验 | "单次最多上传100个文件" |
| 文件大小超限 | 参数校验 | "文件超过50MB限制" |
| 文件格式错误 | 参数校验 | "仅支持Excel文件" |
| 项目不存在 | 业务校验 | "项目不存在" |
7.2 Service 层异常
| 异常类型 | 处理方式 | 记录状态 |
|---|---|---|
| 流水分析平台接口异常 | 捕获并记录 | parsed_failed |
| 轮询超时(>300次) | 捕获并记录 | parsed_failed |
| 文件解析失败 | 捕获并记录 | parsed_failed |
| 线程池满且重试失败 | 捕获并记录 | parsed_failed |
| 其他未知异常 | 捕获并记录 | parsed_failed |
7.3 异常处理代码示例
try {
// 处理文件
processFileInternal(projectId, lsfxProjectId, file, record);
} catch (LsfxApiException e) {
log.error("流水分析平台接口异常", e);
updateRecordStatus(recordId, "parsed_failed", "流水分析平台接口异常:" + e.getMessage());
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("处理被中断", e);
updateRecordStatus(recordId, "parsed_failed", "处理被中断");
} catch (Exception e) {
log.error("处理失败(未知异常)", e);
updateRecordStatus(recordId, "parsed_failed", "处理失败:" + e.getMessage());
}
8. 性能优化
8.1 数据库优化
索引建议:
-- 组合索引提升查询性能
ALTER TABLE ccdi_file_upload_record
ADD INDEX idx_project_status (project_id, file_status);
ALTER TABLE ccdi_bank_statement
ADD INDEX idx_project_log (project_id, batch_id);
批量插入:
- 使用 MyBatis Plus 的
saveBatch(statements, 500) - 每批500条,避免单次插入过多数据
8.2 轮询优化
动态间隔策略:
- 前10次:1秒间隔
- 11-50次:2秒间隔
- 51次后:5秒间隔
8.3 线程池监控
@GetMapping("/thread-pool/status")
public AjaxResult getThreadPoolStatus() {
ThreadPoolExecutor pool = fileUploadExecutor.getThreadPoolExecutor();
Map<String, Object> status = new HashMap<>();
status.put("activeCount", pool.getActiveCount());
status.put("corePoolSize", pool.getCorePoolSize());
status.put("queueSize", pool.getQueue().size());
status.put("completedTaskCount", pool.getCompletedTaskCount());
return AjaxResult.success(status);
}
9. 测试场景
9.1 功能测试
| 场景 | 输入 | 预期结果 |
|---|---|---|
| 正常上传 | 10个Excel文件,每个5MB | 所有文件处理成功 |
| 大文件上传 | 1个50MB文件 | 处理成功 |
| 文件数量超限 | 101个文件 | 返回错误提示 |
| 文件格式错误 | 上传PDF文件 | 返回错误提示 |
| 解析失败 | 格式错误的Excel | 状态更新为parsed_failed |
9.2 压力测试
| 场景 | 并发数 | 预期结果 |
|---|---|---|
| 正常并发 | 100个线程同时上传 | 所有任务正常处理 |
| 超载测试 | 150个文件同时上传 | 超过100的文件等待30秒重试 |
| 持续运行 | 1000次循环上传 | 无内存泄漏,无线程死锁 |
9.3 边界测试
| 场景 | 操作 | 预期结果 |
|---|---|---|
| 项目被删除 | 上传中删除项目 | 任务取消,状态更新为失败 |
| 重复上传 | 同一文件上传2次 | 生成2条独立记录和logId |
| 网络中断 | 轮询时网络断开 | 捕获异常,状态更新为失败 |
10. 部署注意事项
10.1 配置检查清单
- 线程池容量配置(默认100)
- 文件上传大小限制(默认50MB)
- 日志文件路径权限
- 数据库索引创建
- 流水分析平台地址配置
- 应用认证信息配置
10.2 监控指标
- 线程池活跃线程数
- 文件上传成功率(parsed_success / total)
- 平均处理时长
- 线程池拒绝次数
- 日志文件大小和数量
10.3 运维建议
- 定期清理30天前的日志文件
- 监控线程池状态,必要时调整容量
- 关注数据库连接池使用情况
- 流水分析平台接口调用成功率监控
11. 附录
11.1 状态机转换
uploading (初始状态)
↓
parsing (上传成功,轮询中)
↓
parsed_success (解析成功) 或 parsed_failed (解析失败)
11.2 关键时序
- 文件上传:2-5秒(取决于文件大小)
- 轮询解析:最多10分钟(300次 × 2秒)
- 获取流水数据:1-3分钟(取决于流水数量)
- 总处理时长:约3-15分钟/文件
11.3 数据量估算
- 单个Excel文件:平均5000条流水
- 100个文件:约50万条流水
- 数据库存储:约200MB
- 日志文件:约5-10MB/批次
文档结束