# 项目异步文件上传功能 - 设计文档 ## 文档信息 - **创建日期**: 2026-03-05 - **版本**: v1.0 - **作者**: Claude - **状态**: 已批准 ## 1. 概述 ### 1.1 功能描述 实现项目流水文件的异步批量上传功能,支持文件上传到流水分析平台、轮询解析状态、获取解析结果、保存流水数据到本地数据库的完整流程。 ### 1.2 核心需求 - 批量上传流水文件(最多100个文件) - 异步处理每个文件的上传→解析→存储流程 - 线程池容量100,超载时等待30秒重试 - 实时跟踪文件处理状态 - 生成独立的批次日志文件便于维护 ### 1.3 技术栈 - Spring @Async 异步处理 - ThreadPoolTaskExecutor 线程池 - MyBatis Plus 批量操作 - Logback 自定义日志 - Vue + Element UI 前端 ## 2. 数据库设计 ### 2.1 文件上传记录表 ```sql CREATE TABLE `ccdi_file_upload_record` ( `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID', `project_id` bigint(20) NOT NULL COMMENT '项目ID', `lsfx_project_id` int(11) DEFAULT NULL COMMENT '流水分析平台项目ID', `log_id` int(11) DEFAULT NULL COMMENT '流水分析平台返回的logId', `file_name` varchar(255) NOT NULL COMMENT '文件名称', `file_size` bigint(20) DEFAULT NULL COMMENT '文件大小(字节)', `file_status` varchar(20) NOT NULL COMMENT '文件状态:uploading-上传中,parsing-解析中,parsed_success-解析成功,parsed_failed-解析失败', `enterprise_names` text COMMENT '主体名称(多个用逗号分隔)', `account_nos` text COMMENT '主体账号(多个用逗号分隔)', `error_message` text COMMENT '错误信息(解析失败时记录)', `upload_time` datetime NOT NULL COMMENT '上传时间', `upload_user` varchar(64) NOT NULL COMMENT '上传人', PRIMARY KEY (`id`), KEY `idx_project_id` (`project_id`), KEY `idx_log_id` (`log_id`), KEY `idx_file_status` (`file_status`), KEY `idx_upload_time` (`upload_time`), KEY `idx_project_status` (`project_id`, `file_status`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='项目文件上传记录表'; ``` ### 2.2 字段说明 | 字段 | 类型 | 说明 | 备注 | |------|------|------|------| | id | bigint | 主键ID | 自增 | | project_id | bigint | 项目ID | 外键关联 ccdi_project | | lsfx_project_id | int | 流水分析平台项目ID | 用于调用流水分析接口 | | log_id | int | 流水分析平台返回的logId | 关键字段,用于查询解析状态和流水数据 | | file_name | varchar(255) | 文件名称 | 原始文件名 | | file_size | bigint | 文件大小 | 字节数 | | file_status | varchar(20) | 文件状态 | uploading/parsing/parsed_success/parsed_failed | | enterprise_names | text | 主体名称 | 解析成功后存储,多个用逗号分隔 | | account_nos | text | 主体账号 | 解析成功后存储,多个用逗号分隔 | | error_message | text | 错误信息 | 解析失败时记录原因 | | upload_time | datetime | 上传时间 | 记录创建时间 | | upload_user | varchar(64) | 上传人 | 操作用户 | ## 3. 后端架构设计 ### 3.1 模块结构 ``` ccdi-project/src/main/java/com/ruoyi/ccdi/project/ ├── controller/ │ └── CcdiFileUploadController.java # 文件上传接口 ├── service/ │ ├── ICcdiFileUploadService.java # 文件上传服务接口 │ └── impl/ │ └── CcdiFileUploadServiceImpl.java # 文件上传服务实现 ├── mapper/ │ └── CcdiFileUploadRecordMapper.java # 文件上传记录Mapper ├── domain/ │ ├── entity/ │ │ └── CcdiFileUploadRecord.java # 文件上传记录实体 │ ├── dto/ │ │ └── CcdiFileUploadQueryDTO.java # 查询DTO │ └── vo/ │ ├── CcdiFileUploadVO.java # 文件上传响应VO │ └── CcdiFileUploadStatisticsVO.java # 统计VO ├── config/ │ └── AsyncThreadPoolConfig.java # 异步线程池配置 └── log/ └── FileUploadLogAppender.java # 自定义日志Appender ccdi-project/src/main/resources/ └── mapper/ccdi/project/ └── CcdiFileUploadRecordMapper.xml # Mapper XML映射文件 ``` ### 3.2 Controller 接口设计 | 接口路径 | 方法 | 功能 | 参数 | 返回值 | |---------|------|------|------|--------| | `/ccdi/file-upload/batch` | POST | 批量上传文件 | projectId, files[] | batchId | | `/ccdi/file-upload/list` | GET | 查询上传记录列表 | projectId, fileStatus, pageNum, pageSize | 分页列表 | | `/ccdi/file-upload/statistics/{projectId}` | GET | 查询上传统计 | projectId | 各状态数量 | | `/ccdi/file-upload/detail/{id}` | GET | 查询记录详情 | id | 完整信息 | | `/ccdi/file-upload/thread-pool/status` | GET | 查询线程池状态 | - | 线程池状态信息 | ### 3.3 Service 核心方法 #### ICcdiFileUploadService 接口 ```java public interface ICcdiFileUploadService { /** * 批量上传文件 * @param projectId 项目ID * @param files 文件数组 * @param username 上传人 * @return 批次ID */ String batchUploadFiles(Long projectId, MultipartFile[] files, String username); /** * 异步处理单个文件 * @Async("fileUploadExecutor") */ void processFileAsync(Long projectId, Integer lsfxProjectId, MultipartFile file, Long recordId, String batchId, CcdiFileUploadRecord record); /** * 查询上传记录列表 */ Page selectPage(Page page, CcdiFileUploadQueryDTO queryDTO); /** * 统计各状态文件数量 */ Map countByStatus(Long projectId); } ``` #### 核心处理流程 ```java // 1. batchUploadFiles - 主入口 String batchUploadFiles(Long projectId, MultipartFile[] files, String username) { // 1.1 生成批次ID String batchId = UUID.randomUUID().toString().replace("-", ""); // 1.2 获取项目的 lsfxProjectId Integer lsfxProjectId = project.getLsfxProjectId(); // 1.3 批量插入文件记录(status=uploading) List records = createRecords(projectId, lsfxProjectId, files, username); recordMapper.insertBatch(records); // 1.4 异步启动调度线程提交任务 CompletableFuture.runAsync(() -> { submitTasksAsync(projectId, lsfxProjectId, files, records, batchId); }); // 1.5 立即返回 batchId return batchId; } // 2. submitTasksAsync - 调度线程 void submitTasksAsync(Long projectId, Integer lsfxProjectId, MultipartFile[] files, List records, String batchId) { // 2.1 创建批次日志文件 FileUploadLogAppender.createBatchLogFile(projectId, batchId); // 2.2 循环提交任务,支持重试 for (int i = 0; i < files.length; i++) { boolean submitted = false; int retryCount = 0; while (!submitted && retryCount < 2) { try { // 提交异步任务到线程池 CompletableFuture.runAsync( () -> processFileAsync(projectId, lsfxProjectId, files[i], records.get(i).getId(), batchId, records.get(i)), fileUploadExecutor ); submitted = true; } catch (RejectedExecutionException e) { retryCount++; if (retryCount == 1) { Thread.sleep(30000); // 等待30秒 } else { // 重试失败,更新记录状态 updateRecordStatus(records.get(i).getId(), "parsed_failed", "系统繁忙"); } } } } } // 3. processFileAsync - 文件处理线程 @Async("fileUploadExecutor") void processFileAsync(Long projectId, Integer lsfxProjectId, MultipartFile file, Long recordId, String batchId, CcdiFileUploadRecord record) { try { // 3.1 上传文件到流水分析平台 UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, file); Integer logId = uploadResponse.getData().getLogId(); // 3.2 更新状态为 parsing record.setLogId(logId); record.setFileStatus("parsing"); recordMapper.updateById(record); // 3.3 轮询解析状态(最多300次,间隔2秒) boolean parsingComplete = waitForParsingComplete(lsfxProjectId, logId.toString()); // 3.4 获取文件上传状态 GetFileUploadStatusResponse statusResponse = lsfxClient.getFileUploadStatus(...); // 3.5 判断解析结果 if (status == -5 && desc == "data.wait.confirm.newaccount") { // 解析成功 record.setFileStatus("parsed_success"); record.setEnterpriseNames(...); record.setAccountNos(...); recordMapper.updateById(record); // 3.6 获取流水数据并批量保存 fetchAndSaveBankStatements(projectId, lsfxProjectId, logId, totalCount); } else { // 解析失败 record.setFileStatus("parsed_failed"); record.setErrorMessage(...); recordMapper.updateById(record); } } catch (Exception e) { updateRecordStatus(recordId, "parsed_failed", e.getMessage()); } } ``` ## 4. 线程池配置 ### 4.1 配置类 ```java @Configuration @EnableAsync public class AsyncThreadPoolConfig { @Bean("fileUploadExecutor") public Executor fileUploadExecutor() { ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); executor.setCorePoolSize(100); // 核心线程数 executor.setMaxPoolSize(100); // 最大线程数 executor.setQueueCapacity(0); // 队列容量(0表示不使用队列) executor.setThreadNamePrefix("file-upload-"); // 线程名称前缀 executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); // 拒绝策略 executor.setKeepAliveSeconds(60); // 空闲线程存活时间 executor.setWaitForTasksToCompleteOnShutdown(true); // 等待任务完成再关闭 executor.setAwaitTerminationSeconds(60); // 最长等待时间 executor.initialize(); return executor; } } ``` ### 4.2 拒绝策略 - **策略**: AbortPolicy - **行为**: 抛出 RejectedExecutionException - **处理**: 调度线程捕获异常,等待30秒后重试1次 - **重试失败**: 更新记录状态为 `parsed_failed`,错误信息"系统繁忙" ## 5. 日志管理 ### 5.1 日志文件组织 - **路径格式**: `logs/file-upload/{projectId}/{timestamp}.log` - **示例**: `logs/file-upload/123/20260305-103025.log` - **特点**: 每个批次生成独立的日志文件 ### 5.2 Logback 配置 ```xml %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n logs/file-upload/%d{yyyy-MM-dd}/%d{HH}.log 30 100MB ``` ### 5.3 自定义 Appender ```java public class FileUploadLogAppender extends UnsynchronizedAppenderBase { private static final ThreadLocal> currentAppender = new ThreadLocal<>(); /** * 为指定批次创建独立的日志文件 */ public static void createBatchLogFile(Long projectId, String batchId) { String timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date()); String logPath = String.format("logs/file-upload/%d/%s.log", projectId, timestamp); FileAppender appender = new FileAppender<>(); appender.setFile(logPath); appender.setLayout(...); appender.start(); currentAppender.set(appender); } @Override protected void append(ILoggingEvent event) { FileAppender appender = currentAppender.get(); if (appender != null) { appender.doAppend(event); } } } ``` ## 6. 前端交互设计 ### 6.1 上传流程 ``` 用户选择文件 → 确认上传 → 显示loading ↓ 调用 batchUploadFiles() API ↓ 后端立即返回 batchId ↓ 前端提示"上传任务已提交" ↓ 跳转到上传记录列表页 ↓ 每5秒自动刷新列表(可关闭) ``` ### 6.2 列表页展示 **统计卡片:** - 上传中: 2 - 解析中: 3 - 解析成功: 15 - 解析失败: 1 **文件列表:** | 文件名 | 大小 | 状态 | 主体名称 | 上传时间 | 操作 | |--------|------|------|----------|----------|------| | 流水1.xlsx | 2.5MB | 🔄 解析中 | - | 10:30:25 | - | | 流水2.xlsx | 1.8MB | ✅ 解析成功 | 张三,李四 | 10:28:15 | 查看流水 | | 流水3.xlsx | 3.2MB | ❌ 解析失败 | - | 10:25:30 | 查看错误 | ### 6.3 API 接口 ```javascript // 批量上传文件 POST /ccdi/file-upload/batch 参数: FormData(projectId, files[]) 返回: { code: 200, msg: "上传任务已提交", data: batchId } // 查询上传记录列表 GET /ccdi/file-upload/list 参数: { projectId, fileStatus, pageNum, pageSize } 返回: { rows: [], total: 100 } // 查询上传统计 GET /ccdi/file-upload/statistics/{projectId} 返回: { uploading: 2, parsing: 3, parsed_success: 15, parsed_failed: 1 } ``` ## 7. 异常处理 ### 7.1 Controller 层异常 | 异常类型 | 处理方式 | 返回信息 | |---------|---------|---------| | 参数为空 | 参数校验 | "项目ID不能为空" | | 文件数量超限 | 参数校验 | "单次最多上传100个文件" | | 文件大小超限 | 参数校验 | "文件超过50MB限制" | | 文件格式错误 | 参数校验 | "仅支持Excel文件" | | 项目不存在 | 业务校验 | "项目不存在" | ### 7.2 Service 层异常 | 异常类型 | 处理方式 | 记录状态 | |---------|---------|---------| | 流水分析平台接口异常 | 捕获并记录 | parsed_failed | | 轮询超时(>300次) | 捕获并记录 | parsed_failed | | 文件解析失败 | 捕获并记录 | parsed_failed | | 线程池满且重试失败 | 捕获并记录 | parsed_failed | | 其他未知异常 | 捕获并记录 | parsed_failed | ### 7.3 异常处理代码示例 ```java try { // 处理文件 processFileInternal(projectId, lsfxProjectId, file, record); } catch (LsfxApiException e) { log.error("流水分析平台接口异常", e); updateRecordStatus(recordId, "parsed_failed", "流水分析平台接口异常:" + e.getMessage()); } catch (InterruptedException e) { Thread.currentThread().interrupt(); log.error("处理被中断", e); updateRecordStatus(recordId, "parsed_failed", "处理被中断"); } catch (Exception e) { log.error("处理失败(未知异常)", e); updateRecordStatus(recordId, "parsed_failed", "处理失败:" + e.getMessage()); } ``` ## 8. 性能优化 ### 8.1 数据库优化 **索引建议:** ```sql -- 组合索引提升查询性能 ALTER TABLE ccdi_file_upload_record ADD INDEX idx_project_status (project_id, file_status); ALTER TABLE ccdi_bank_statement ADD INDEX idx_project_log (project_id, batch_id); ``` **批量插入:** - 使用 MyBatis Plus 的 `saveBatch(statements, 500)` - 每批500条,避免单次插入过多数据 ### 8.2 轮询优化 **动态间隔策略:** - 前10次:1秒间隔 - 11-50次:2秒间隔 - 51次后:5秒间隔 ### 8.3 线程池监控 ```java @GetMapping("/thread-pool/status") public AjaxResult getThreadPoolStatus() { ThreadPoolExecutor pool = fileUploadExecutor.getThreadPoolExecutor(); Map status = new HashMap<>(); status.put("activeCount", pool.getActiveCount()); status.put("corePoolSize", pool.getCorePoolSize()); status.put("queueSize", pool.getQueue().size()); status.put("completedTaskCount", pool.getCompletedTaskCount()); return AjaxResult.success(status); } ``` ## 9. 测试场景 ### 9.1 功能测试 | 场景 | 输入 | 预期结果 | |------|------|---------| | 正常上传 | 10个Excel文件,每个5MB | 所有文件处理成功 | | 大文件上传 | 1个50MB文件 | 处理成功 | | 文件数量超限 | 101个文件 | 返回错误提示 | | 文件格式错误 | 上传PDF文件 | 返回错误提示 | | 解析失败 | 格式错误的Excel | 状态更新为parsed_failed | ### 9.2 压力测试 | 场景 | 并发数 | 预期结果 | |------|--------|---------| | 正常并发 | 100个线程同时上传 | 所有任务正常处理 | | 超载测试 | 150个文件同时上传 | 超过100的文件等待30秒重试 | | 持续运行 | 1000次循环上传 | 无内存泄漏,无线程死锁 | ### 9.3 边界测试 | 场景 | 操作 | 预期结果 | |------|------|---------| | 项目被删除 | 上传中删除项目 | 任务取消,状态更新为失败 | | 重复上传 | 同一文件上传2次 | 生成2条独立记录和logId | | 网络中断 | 轮询时网络断开 | 捕获异常,状态更新为失败 | ## 10. 部署注意事项 ### 10.1 配置检查清单 - [ ] 线程池容量配置(默认100) - [ ] 文件上传大小限制(默认50MB) - [ ] 日志文件路径权限 - [ ] 数据库索引创建 - [ ] 流水分析平台地址配置 - [ ] 应用认证信息配置 ### 10.2 监控指标 - 线程池活跃线程数 - 文件上传成功率(parsed_success / total) - 平均处理时长 - 线程池拒绝次数 - 日志文件大小和数量 ### 10.3 运维建议 - 定期清理30天前的日志文件 - 监控线程池状态,必要时调整容量 - 关注数据库连接池使用情况 - 流水分析平台接口调用成功率监控 ## 11. 附录 ### 11.1 状态机转换 ``` uploading (初始状态) ↓ parsing (上传成功,轮询中) ↓ parsed_success (解析成功) 或 parsed_failed (解析失败) ``` ### 11.2 关键时序 - 文件上传:2-5秒(取决于文件大小) - 轮询解析:最多10分钟(300次 × 2秒) - 获取流水数据:1-3分钟(取决于流水数量) - 总处理时长:约3-15分钟/文件 ### 11.3 数据量估算 - 单个Excel文件:平均5000条流水 - 100个文件:约50万条流水 - 数据库存储:约200MB - 日志文件:约5-10MB/批次 --- **文档结束**