diff --git a/doc/design/2026-03-05-async-file-upload-design.md b/doc/design/2026-03-05-async-file-upload-design.md new file mode 100644 index 0000000..7d1c9c0 --- /dev/null +++ b/doc/design/2026-03-05-async-file-upload-design.md @@ -0,0 +1,560 @@ +# 项目异步文件上传功能 - 设计文档 + +## 文档信息 +- **创建日期**: 2026-03-05 +- **版本**: v1.0 +- **作者**: Claude +- **状态**: 已批准 + +## 1. 概述 + +### 1.1 功能描述 +实现项目流水文件的异步批量上传功能,支持文件上传到流水分析平台、轮询解析状态、获取解析结果、保存流水数据到本地数据库的完整流程。 + +### 1.2 核心需求 +- 批量上传流水文件(最多100个文件) +- 异步处理每个文件的上传→解析→存储流程 +- 线程池容量100,超载时等待30秒重试 +- 实时跟踪文件处理状态 +- 生成独立的批次日志文件便于维护 + +### 1.3 技术栈 +- Spring @Async 异步处理 +- ThreadPoolTaskExecutor 线程池 +- MyBatis Plus 批量操作 +- Logback 自定义日志 +- Vue + Element UI 前端 + +## 2. 数据库设计 + +### 2.1 文件上传记录表 + +```sql +CREATE TABLE `ccdi_file_upload_record` ( + `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID', + `project_id` bigint(20) NOT NULL COMMENT '项目ID', + `lsfx_project_id` int(11) DEFAULT NULL COMMENT '流水分析平台项目ID', + `log_id` int(11) DEFAULT NULL COMMENT '流水分析平台返回的logId', + `file_name` varchar(255) NOT NULL COMMENT '文件名称', + `file_size` bigint(20) DEFAULT NULL COMMENT '文件大小(字节)', + `file_status` varchar(20) NOT NULL COMMENT '文件状态:uploading-上传中,parsing-解析中,parsed_success-解析成功,parsed_failed-解析失败', + `enterprise_names` text COMMENT '主体名称(多个用逗号分隔)', + `account_nos` text COMMENT '主体账号(多个用逗号分隔)', + `error_message` text COMMENT '错误信息(解析失败时记录)', + `upload_time` datetime NOT NULL COMMENT '上传时间', + `upload_user` varchar(64) NOT NULL COMMENT '上传人', + PRIMARY KEY (`id`), + KEY `idx_project_id` (`project_id`), + KEY `idx_log_id` (`log_id`), + KEY `idx_file_status` (`file_status`), + KEY `idx_upload_time` (`upload_time`), + KEY `idx_project_status` (`project_id`, `file_status`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='项目文件上传记录表'; +``` + +### 2.2 字段说明 + +| 字段 | 类型 | 说明 | 备注 | +|------|------|------|------| +| id | bigint | 主键ID | 自增 | +| project_id | bigint | 项目ID | 外键关联 ccdi_project | +| lsfx_project_id | int | 流水分析平台项目ID | 用于调用流水分析接口 | +| log_id | int | 流水分析平台返回的logId | 关键字段,用于查询解析状态和流水数据 | +| file_name | varchar(255) | 文件名称 | 原始文件名 | +| file_size | bigint | 文件大小 | 字节数 | +| file_status | varchar(20) | 文件状态 | uploading/parsing/parsed_success/parsed_failed | +| enterprise_names | text | 主体名称 | 解析成功后存储,多个用逗号分隔 | +| account_nos | text | 主体账号 | 解析成功后存储,多个用逗号分隔 | +| error_message | text | 错误信息 | 解析失败时记录原因 | +| upload_time | datetime | 上传时间 | 记录创建时间 | +| upload_user | varchar(64) | 上传人 | 操作用户 | + +## 3. 后端架构设计 + +### 3.1 模块结构 + +``` +ccdi-project/src/main/java/com/ruoyi/ccdi/project/ +├── controller/ +│ └── CcdiFileUploadController.java # 文件上传接口 +├── service/ +│ ├── ICcdiFileUploadService.java # 文件上传服务接口 +│ └── impl/ +│ └── CcdiFileUploadServiceImpl.java # 文件上传服务实现 +├── mapper/ +│ └── CcdiFileUploadRecordMapper.java # 文件上传记录Mapper +├── domain/ +│ ├── entity/ +│ │ └── CcdiFileUploadRecord.java # 文件上传记录实体 +│ ├── dto/ +│ │ └── CcdiFileUploadQueryDTO.java # 查询DTO +│ └── vo/ +│ ├── CcdiFileUploadVO.java # 文件上传响应VO +│ └── CcdiFileUploadStatisticsVO.java # 统计VO +├── config/ +│ └── AsyncThreadPoolConfig.java # 异步线程池配置 +└── log/ + └── FileUploadLogAppender.java # 自定义日志Appender + +ccdi-project/src/main/resources/ +└── mapper/ccdi/project/ + └── CcdiFileUploadRecordMapper.xml # Mapper XML映射文件 +``` + +### 3.2 Controller 接口设计 + +| 接口路径 | 方法 | 功能 | 参数 | 返回值 | +|---------|------|------|------|--------| +| `/ccdi/file-upload/batch` | POST | 批量上传文件 | projectId, files[] | batchId | +| `/ccdi/file-upload/list` | GET | 查询上传记录列表 | projectId, fileStatus, pageNum, pageSize | 分页列表 | +| `/ccdi/file-upload/statistics/{projectId}` | GET | 查询上传统计 | projectId | 各状态数量 | +| `/ccdi/file-upload/detail/{id}` | GET | 查询记录详情 | id | 完整信息 | +| `/ccdi/file-upload/thread-pool/status` | GET | 查询线程池状态 | - | 线程池状态信息 | + +### 3.3 Service 核心方法 + +#### ICcdiFileUploadService 接口 + +```java +public interface ICcdiFileUploadService { + /** + * 批量上传文件 + * @param projectId 项目ID + * @param files 文件数组 + * @param username 上传人 + * @return 批次ID + */ + String batchUploadFiles(Long projectId, MultipartFile[] files, String username); + + /** + * 异步处理单个文件 + * @Async("fileUploadExecutor") + */ + void processFileAsync(Long projectId, Integer lsfxProjectId, MultipartFile file, + Long recordId, String batchId, CcdiFileUploadRecord record); + + /** + * 查询上传记录列表 + */ + Page selectPage(Page page, + CcdiFileUploadQueryDTO queryDTO); + + /** + * 统计各状态文件数量 + */ + Map countByStatus(Long projectId); +} +``` + +#### 核心处理流程 + +```java +// 1. batchUploadFiles - 主入口 +String batchUploadFiles(Long projectId, MultipartFile[] files, String username) { + // 1.1 生成批次ID + String batchId = UUID.randomUUID().toString().replace("-", ""); + + // 1.2 获取项目的 lsfxProjectId + Integer lsfxProjectId = project.getLsfxProjectId(); + + // 1.3 批量插入文件记录(status=uploading) + List records = createRecords(projectId, lsfxProjectId, files, username); + recordMapper.insertBatch(records); + + // 1.4 异步启动调度线程提交任务 + CompletableFuture.runAsync(() -> { + submitTasksAsync(projectId, lsfxProjectId, files, records, batchId); + }); + + // 1.5 立即返回 batchId + return batchId; +} + +// 2. submitTasksAsync - 调度线程 +void submitTasksAsync(Long projectId, Integer lsfxProjectId, MultipartFile[] files, + List records, String batchId) { + // 2.1 创建批次日志文件 + FileUploadLogAppender.createBatchLogFile(projectId, batchId); + + // 2.2 循环提交任务,支持重试 + for (int i = 0; i < files.length; i++) { + boolean submitted = false; + int retryCount = 0; + + while (!submitted && retryCount < 2) { + try { + // 提交异步任务到线程池 + CompletableFuture.runAsync( + () -> processFileAsync(projectId, lsfxProjectId, files[i], + records.get(i).getId(), batchId, records.get(i)), + fileUploadExecutor + ); + submitted = true; + } catch (RejectedExecutionException e) { + retryCount++; + if (retryCount == 1) { + Thread.sleep(30000); // 等待30秒 + } else { + // 重试失败,更新记录状态 + updateRecordStatus(records.get(i).getId(), "parsed_failed", "系统繁忙"); + } + } + } + } +} + +// 3. processFileAsync - 文件处理线程 +@Async("fileUploadExecutor") +void processFileAsync(Long projectId, Integer lsfxProjectId, MultipartFile file, + Long recordId, String batchId, CcdiFileUploadRecord record) { + try { + // 3.1 上传文件到流水分析平台 + UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, file); + Integer logId = uploadResponse.getData().getLogId(); + + // 3.2 更新状态为 parsing + record.setLogId(logId); + record.setFileStatus("parsing"); + recordMapper.updateById(record); + + // 3.3 轮询解析状态(最多300次,间隔2秒) + boolean parsingComplete = waitForParsingComplete(lsfxProjectId, logId.toString()); + + // 3.4 获取文件上传状态 + GetFileUploadStatusResponse statusResponse = lsfxClient.getFileUploadStatus(...); + + // 3.5 判断解析结果 + if (status == -5 && desc == "data.wait.confirm.newaccount") { + // 解析成功 + record.setFileStatus("parsed_success"); + record.setEnterpriseNames(...); + record.setAccountNos(...); + recordMapper.updateById(record); + + // 3.6 获取流水数据并批量保存 + fetchAndSaveBankStatements(projectId, lsfxProjectId, logId, totalCount); + } else { + // 解析失败 + record.setFileStatus("parsed_failed"); + record.setErrorMessage(...); + recordMapper.updateById(record); + } + } catch (Exception e) { + updateRecordStatus(recordId, "parsed_failed", e.getMessage()); + } +} +``` + +## 4. 线程池配置 + +### 4.1 配置类 + +```java +@Configuration +@EnableAsync +public class AsyncThreadPoolConfig { + + @Bean("fileUploadExecutor") + public Executor fileUploadExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + executor.setCorePoolSize(100); // 核心线程数 + executor.setMaxPoolSize(100); // 最大线程数 + executor.setQueueCapacity(0); // 队列容量(0表示不使用队列) + executor.setThreadNamePrefix("file-upload-"); // 线程名称前缀 + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 拒绝策略 + executor.setKeepAliveSeconds(60); // 空闲线程存活时间 + executor.setWaitForTasksToCompleteOnShutdown(true); // 等待任务完成再关闭 + executor.setAwaitTerminationSeconds(60); // 最长等待时间 + executor.initialize(); + return executor; + } +} +``` + +### 4.2 拒绝策略 + +- **策略**: AbortPolicy +- **行为**: 抛出 RejectedExecutionException +- **处理**: 调度线程捕获异常,等待30秒后重试1次 +- **重试失败**: 更新记录状态为 `parsed_failed`,错误信息"系统繁忙" + +## 5. 日志管理 + +### 5.1 日志文件组织 + +- **路径格式**: `logs/file-upload/{projectId}/{timestamp}.log` +- **示例**: `logs/file-upload/123/20260305-103025.log` +- **特点**: 每个批次生成独立的日志文件 + +### 5.2 Logback 配置 + +```xml + + + + %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n + + + logs/file-upload/%d{yyyy-MM-dd}/%d{HH}.log + 30 + 100MB + + + + + + +``` + +### 5.3 自定义 Appender + +```java +public class FileUploadLogAppender extends UnsynchronizedAppenderBase { + + private static final ThreadLocal> currentAppender = + new ThreadLocal<>(); + + /** + * 为指定批次创建独立的日志文件 + */ + public static void createBatchLogFile(Long projectId, String batchId) { + String timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date()); + String logPath = String.format("logs/file-upload/%d/%s.log", projectId, timestamp); + + FileAppender appender = new FileAppender<>(); + appender.setFile(logPath); + appender.setLayout(...); + appender.start(); + + currentAppender.set(appender); + } + + @Override + protected void append(ILoggingEvent event) { + FileAppender appender = currentAppender.get(); + if (appender != null) { + appender.doAppend(event); + } + } +} +``` + +## 6. 前端交互设计 + +### 6.1 上传流程 + +``` +用户选择文件 → 确认上传 → 显示loading + ↓ +调用 batchUploadFiles() API + ↓ +后端立即返回 batchId + ↓ +前端提示"上传任务已提交" + ↓ +跳转到上传记录列表页 + ↓ +每5秒自动刷新列表(可关闭) +``` + +### 6.2 列表页展示 + +**统计卡片:** +- 上传中: 2 +- 解析中: 3 +- 解析成功: 15 +- 解析失败: 1 + +**文件列表:** + +| 文件名 | 大小 | 状态 | 主体名称 | 上传时间 | 操作 | +|--------|------|------|----------|----------|------| +| 流水1.xlsx | 2.5MB | 🔄 解析中 | - | 10:30:25 | - | +| 流水2.xlsx | 1.8MB | ✅ 解析成功 | 张三,李四 | 10:28:15 | 查看流水 | +| 流水3.xlsx | 3.2MB | ❌ 解析失败 | - | 10:25:30 | 查看错误 | + +### 6.3 API 接口 + +```javascript +// 批量上传文件 +POST /ccdi/file-upload/batch +参数: FormData(projectId, files[]) +返回: { code: 200, msg: "上传任务已提交", data: batchId } + +// 查询上传记录列表 +GET /ccdi/file-upload/list +参数: { projectId, fileStatus, pageNum, pageSize } +返回: { rows: [], total: 100 } + +// 查询上传统计 +GET /ccdi/file-upload/statistics/{projectId} +返回: { uploading: 2, parsing: 3, parsed_success: 15, parsed_failed: 1 } +``` + +## 7. 异常处理 + +### 7.1 Controller 层异常 + +| 异常类型 | 处理方式 | 返回信息 | +|---------|---------|---------| +| 参数为空 | 参数校验 | "项目ID不能为空" | +| 文件数量超限 | 参数校验 | "单次最多上传100个文件" | +| 文件大小超限 | 参数校验 | "文件超过50MB限制" | +| 文件格式错误 | 参数校验 | "仅支持Excel文件" | +| 项目不存在 | 业务校验 | "项目不存在" | + +### 7.2 Service 层异常 + +| 异常类型 | 处理方式 | 记录状态 | +|---------|---------|---------| +| 流水分析平台接口异常 | 捕获并记录 | parsed_failed | +| 轮询超时(>300次) | 捕获并记录 | parsed_failed | +| 文件解析失败 | 捕获并记录 | parsed_failed | +| 线程池满且重试失败 | 捕获并记录 | parsed_failed | +| 其他未知异常 | 捕获并记录 | parsed_failed | + +### 7.3 异常处理代码示例 + +```java +try { + // 处理文件 + processFileInternal(projectId, lsfxProjectId, file, record); +} catch (LsfxApiException e) { + log.error("流水分析平台接口异常", e); + updateRecordStatus(recordId, "parsed_failed", "流水分析平台接口异常:" + e.getMessage()); +} catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("处理被中断", e); + updateRecordStatus(recordId, "parsed_failed", "处理被中断"); +} catch (Exception e) { + log.error("处理失败(未知异常)", e); + updateRecordStatus(recordId, "parsed_failed", "处理失败:" + e.getMessage()); +} +``` + +## 8. 性能优化 + +### 8.1 数据库优化 + +**索引建议:** +```sql +-- 组合索引提升查询性能 +ALTER TABLE ccdi_file_upload_record +ADD INDEX idx_project_status (project_id, file_status); + +ALTER TABLE ccdi_bank_statement +ADD INDEX idx_project_log (project_id, batch_id); +``` + +**批量插入:** +- 使用 MyBatis Plus 的 `saveBatch(statements, 500)` +- 每批500条,避免单次插入过多数据 + +### 8.2 轮询优化 + +**动态间隔策略:** +- 前10次:1秒间隔 +- 11-50次:2秒间隔 +- 51次后:5秒间隔 + +### 8.3 线程池监控 + +```java +@GetMapping("/thread-pool/status") +public AjaxResult getThreadPoolStatus() { + ThreadPoolExecutor pool = fileUploadExecutor.getThreadPoolExecutor(); + + Map status = new HashMap<>(); + status.put("activeCount", pool.getActiveCount()); + status.put("corePoolSize", pool.getCorePoolSize()); + status.put("queueSize", pool.getQueue().size()); + status.put("completedTaskCount", pool.getCompletedTaskCount()); + + return AjaxResult.success(status); +} +``` + +## 9. 测试场景 + +### 9.1 功能测试 + +| 场景 | 输入 | 预期结果 | +|------|------|---------| +| 正常上传 | 10个Excel文件,每个5MB | 所有文件处理成功 | +| 大文件上传 | 1个50MB文件 | 处理成功 | +| 文件数量超限 | 101个文件 | 返回错误提示 | +| 文件格式错误 | 上传PDF文件 | 返回错误提示 | +| 解析失败 | 格式错误的Excel | 状态更新为parsed_failed | + +### 9.2 压力测试 + +| 场景 | 并发数 | 预期结果 | +|------|--------|---------| +| 正常并发 | 100个线程同时上传 | 所有任务正常处理 | +| 超载测试 | 150个文件同时上传 | 超过100的文件等待30秒重试 | +| 持续运行 | 1000次循环上传 | 无内存泄漏,无线程死锁 | + +### 9.3 边界测试 + +| 场景 | 操作 | 预期结果 | +|------|------|---------| +| 项目被删除 | 上传中删除项目 | 任务取消,状态更新为失败 | +| 重复上传 | 同一文件上传2次 | 生成2条独立记录和logId | +| 网络中断 | 轮询时网络断开 | 捕获异常,状态更新为失败 | + +## 10. 部署注意事项 + +### 10.1 配置检查清单 + +- [ ] 线程池容量配置(默认100) +- [ ] 文件上传大小限制(默认50MB) +- [ ] 日志文件路径权限 +- [ ] 数据库索引创建 +- [ ] 流水分析平台地址配置 +- [ ] 应用认证信息配置 + +### 10.2 监控指标 + +- 线程池活跃线程数 +- 文件上传成功率(parsed_success / total) +- 平均处理时长 +- 线程池拒绝次数 +- 日志文件大小和数量 + +### 10.3 运维建议 + +- 定期清理30天前的日志文件 +- 监控线程池状态,必要时调整容量 +- 关注数据库连接池使用情况 +- 流水分析平台接口调用成功率监控 + +## 11. 附录 + +### 11.1 状态机转换 + +``` +uploading (初始状态) + ↓ +parsing (上传成功,轮询中) + ↓ +parsed_success (解析成功) 或 parsed_failed (解析失败) +``` + +### 11.2 关键时序 + +- 文件上传:2-5秒(取决于文件大小) +- 轮询解析:最多10分钟(300次 × 2秒) +- 获取流水数据:1-3分钟(取决于流水数量) +- 总处理时长:约3-15分钟/文件 + +### 11.3 数据量估算 + +- 单个Excel文件:平均5000条流水 +- 100个文件:约50万条流水 +- 数据库存储:约200MB +- 日志文件:约5-10MB/批次 + +--- + +**文档结束**