From 23e3dece7b6809e731915190bb96f15690d58bc5 Mon Sep 17 00:00:00 2001 From: wkc <978997012@qq.com> Date: Thu, 5 Mar 2026 09:15:23 +0800 Subject: [PATCH] =?UTF-8?q?docs:=20=E6=B7=BB=E5=8A=A0=E9=A1=B9=E7=9B=AE?= =?UTF-8?q?=E5=BC=82=E6=AD=A5=E6=96=87=E4=BB=B6=E4=B8=8A=E4=BC=A0=E5=8A=9F?= =?UTF-8?q?=E8=83=BD=E5=AE=9E=E6=96=BD=E8=AE=A1=E5=88=92?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- doc/plans/2026-03-05-async-file-upload.md | 1384 +++++++++++++++++++++ 1 file changed, 1384 insertions(+) create mode 100644 doc/plans/2026-03-05-async-file-upload.md diff --git a/doc/plans/2026-03-05-async-file-upload.md b/doc/plans/2026-03-05-async-file-upload.md new file mode 100644 index 0000000..ceba4e9 --- /dev/null +++ b/doc/plans/2026-03-05-async-file-upload.md @@ -0,0 +1,1384 @@ +# 项目异步文件上传功能 - 实施计划 + +> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. + +**Goal:** 实现项目流水文件的异步批量上传功能,支持文件上传、解析状态轮询、流水数据存储的完整流程 + +**Architecture:** 使用双层异步架构(调度线程 + 文件处理线程池),先插入数据库记录后异步处理,线程池满时等待30秒重试 + +**Tech Stack:** Spring Boot 3.5.8, MyBatis Plus 3.5.10, Spring @Async, ThreadPoolTaskExecutor, Logback, Vue 2.6, Element UI 2.15 + +--- + +## Task 1: 数据库表创建 + +**Files:** +- Create: `sql/ccdi_file_upload_record.sql` + +**Step 1: 创建SQL脚本文件** + +创建文件 `sql/ccdi_file_upload_record.sql`: + +```sql +-- 项目文件上传记录表 +-- 用途:记录项目下所有文件的上传记录和处理状态 +-- 作者:系统 +-- 日期:2026-03-05 + +USE ccdi; + +-- 创建文件上传记录表 +CREATE TABLE `ccdi_file_upload_record` ( + `id` bigint(20) NOT NULL AUTO_INCREMENT COMMENT '主键ID', + `project_id` bigint(20) NOT NULL COMMENT '项目ID', + `lsfx_project_id` int(11) DEFAULT NULL COMMENT '流水分析平台项目ID', + `log_id` int(11) DEFAULT NULL COMMENT '流水分析平台返回的logId', + `file_name` varchar(255) NOT NULL COMMENT '文件名称', + `file_size` bigint(20) DEFAULT NULL COMMENT '文件大小(字节)', + `file_status` varchar(20) NOT NULL COMMENT '文件状态:uploading-上传中,parsing-解析中,parsed_success-解析成功,parsed_failed-解析失败', + `enterprise_names` text COMMENT '主体名称(多个用逗号分隔)', + `account_nos` text COMMENT '主体账号(多个用逗号分隔)', + `error_message` text COMMENT '错误信息(解析失败时记录)', + `upload_time` datetime NOT NULL COMMENT '上传时间', + `upload_user` varchar(64) NOT NULL COMMENT '上传人', + PRIMARY KEY (`id`), + KEY `idx_project_id` (`project_id`), + KEY `idx_log_id` (`log_id`), + KEY `idx_file_status` (`file_status`), + KEY `idx_upload_time` (`upload_time`), + KEY `idx_project_status` (`project_id`, `file_status`) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='项目文件上传记录表'; +``` + +**Step 2: 执行SQL脚本** + +```bash +# 连接数据库并执行脚本 +mysql -h 116.62.17.81 -u root -pKfcx@1234 ccdi < sql/ccdi_file_upload_record.sql +``` + +**Step 3: 验证表创建成功** + +```bash +mysql -h 116.62.17.81 -u root -pKfcx@1234 ccdi -e "SHOW CREATE TABLE ccdi_file_upload_record\G" +``` + +Expected: 输出表结构,包含所有字段和索引 + +**Step 4: 提交SQL脚本** + +```bash +git add sql/ccdi_file_upload_record.sql +git commit -m "feat: 添加文件上传记录表SQL脚本" +``` + +--- + +## Task 2: 实体类创建 + +**Files:** +- Create: `ccdi-project/src/main/java/com/ruoyi/ccdi/project/domain/entity/CcdiFileUploadRecord.java` + +**Step 1: 创建实体类** + +创建文件 `ccdi-project/src/main/java/com/ruoyi/ccdi/project/domain/entity/CcdiFileUploadRecord.java`: + +```java +package com.ruoyi.ccdi.project.domain.entity; + +import com.baomidou.mybatisplus.annotation.IdType; +import com.baomidou.mybatisplus.annotation.TableId; +import com.baomidou.mybatisplus.annotation.TableName; +import lombok.Data; + +import java.io.Serial; +import java.io.Serializable; +import java.util.Date; + +/** + * 文件上传记录实体 + * + * @author ruoyi + * @date 2026-03-05 + */ +@Data +@TableName("ccdi_file_upload_record") +public class CcdiFileUploadRecord implements Serializable { + + @Serial + private static final long serialVersionUID = 1L; + + /** 主键ID */ + @TableId(type = IdType.AUTO) + private Long id; + + /** 项目ID */ + private Long projectId; + + /** 流水分析平台项目ID */ + private Integer lsfxProjectId; + + /** 流水分析平台返回的logId */ + private Integer logId; + + /** 文件名称 */ + private String fileName; + + /** 文件大小(字节) */ + private Long fileSize; + + /** 文件状态:uploading-上传中,parsing-解析中,parsed_success-解析成功,parsed_failed-解析失败 */ + private String fileStatus; + + /** 主体名称(多个用逗号分隔) */ + private String enterpriseNames; + + /** 主体账号(多个用逗号分隔) */ + private String accountNos; + + /** 错误信息(解析失败时记录) */ + private String errorMessage; + + /** 上传时间 */ + private Date uploadTime; + + /** 上传人 */ + private String uploadUser; +} +``` + +**Step 2: 编译验证** + +```bash +cd ccdi-project +mvn clean compile +``` + +Expected: BUILD SUCCESS + +**Step 3: 提交** + +```bash +git add ccdi-project/src/main/java/com/ruoyi/ccdi/project/domain/entity/CcdiFileUploadRecord.java +git commit -m "feat: 添加文件上传记录实体类" +``` + +--- + +## Task 3: Mapper 接口和 XML + +**Files:** +- Create: `ccdi-project/src/main/java/com/ruoyi/ccdi/project/mapper/CcdiFileUploadRecordMapper.java` +- Create: `ccdi-project/src/main/resources/mapper/ccdi/project/CcdiFileUploadRecordMapper.xml` + +**Step 1: 创建 Mapper 接口** + +创建文件 `ccdi-project/src/main/java/com/ruoyi/ccdi/project/mapper/CcdiFileUploadRecordMapper.java`: + +```java +package com.ruoyi.ccdi.project.mapper; + +import com.baomidou.mybatisplus.core.mapper.BaseMapper; +import com.ruoyi.ccdi.project.domain.entity.CcdiFileUploadRecord; +import org.apache.ibatis.annotations.Mapper; +import org.apache.ibatis.annotations.Param; + +import java.util.List; + +/** + * 文件上传记录 Mapper 接口 + * + * @author ruoyi + * @date 2026-03-05 + */ +@Mapper +public interface CcdiFileUploadRecordMapper extends BaseMapper { + + /** + * 批量插入文件上传记录 + * + * @param records 记录列表 + * @return 插入条数 + */ + int insertBatch(@Param("list") List records); + + /** + * 统计各状态文件数量 + * + * @param projectId 项目ID + * @return 统计结果(Map形式,key为状态,value为数量) + */ + List> countByStatus(@Param("projectId") Long projectId); +} +``` + +**Step 2: 创建 Mapper XML** + +创建文件 `ccdi-project/src/main/resources/mapper/ccdi/project/CcdiFileUploadRecordMapper.xml`: + +```xml + + + + + + + + + + + + + + + + + + + + + select id, project_id, lsfx_project_id, log_id, file_name, file_size, + file_status, enterprise_names, account_nos, error_message, + upload_time, upload_user + from ccdi_file_upload_record + + + + + insert into ccdi_file_upload_record ( + project_id, lsfx_project_id, file_name, file_size, file_status, + upload_time, upload_user + ) values + + ( + #{item.projectId}, #{item.lsfxProjectId}, #{item.fileName}, + #{item.fileSize}, #{item.fileStatus}, #{item.uploadTime}, + #{item.uploadUser} + ) + + + + + + + +``` + +**Step 3: 编译验证** + +```bash +cd ccdi-project +mvn clean compile +``` + +Expected: BUILD SUCCESS + +**Step 4: 提交** + +```bash +git add ccdi-project/src/main/java/com/ruoyi/ccdi/project/mapper/CcdiFileUploadRecordMapper.java +git add ccdi-project/src/main/resources/mapper/ccdi/project/CcdiFileUploadRecordMapper.xml +git commit -m "feat: 添加文件上传记录Mapper接口和XML映射" +``` + +--- + +## Task 4: DTO 和 VO 类 + +**Files:** +- Create: `ccdi-project/src/main/java/com/ruoyi/ccdi/project/domain/dto/CcdiFileUploadQueryDTO.java` +- Create: `ccdi-project/src/main/java/com/ruoyi/ccdi/project/domain/vo/CcdiFileUploadStatisticsVO.java` + +**Step 1: 创建查询 DTO** + +创建文件 `ccdi-project/src/main/java/com/ruoyi/ccdi/project/domain/dto/CcdiFileUploadQueryDTO.java`: + +```java +package com.ruoyi.ccdi.project.domain.dto; + +import lombok.Data; + +import java.io.Serial; +import java.io.Serializable; + +/** + * 文件上传记录查询 DTO + * + * @author ruoyi + * @date 2026-03-05 + */ +@Data +public class CcdiFileUploadQueryDTO implements Serializable { + + @Serial + private static final long serialVersionUID = 1L; + + /** 项目ID */ + private Long projectId; + + /** 文件状态 */ + private String fileStatus; + + /** 文件名称(模糊查询) */ + private String fileName; + + /** 上传人 */ + private String uploadUser; +} +``` + +**Step 2: 创建统计 VO** + +创建文件 `ccdi-project/src/main/java/com/ruoyi/ccdi/project/domain/vo/CcdiFileUploadStatisticsVO.java`: + +```java +package com.ruoyi.ccdi.project.domain.vo; + +import lombok.Data; + +import java.io.Serial; +import java.io.Serializable; + +/** + * 文件上传统计 VO + * + * @author ruoyi + * @date 2026-03-05 + */ +@Data +public class CcdiFileUploadStatisticsVO implements Serializable { + + @Serial + private static final long serialVersionUID = 1L; + + /** 上传中数量 */ + private Long uploading; + + /** 解析中数量 */ + private Long parsing; + + /** 解析成功数量 */ + private Long parsedSuccess; + + /** 解析失败数量 */ + private Long parsedFailed; + + /** 总数量 */ + private Long total; +} +``` + +**Step 3: 编译验证** + +```bash +cd ccdi-project +mvn clean compile +``` + +Expected: BUILD SUCCESS + +**Step 4: 提交** + +```bash +git add ccdi-project/src/main/java/com/ruoyi/ccdi/project/domain/dto/CcdiFileUploadQueryDTO.java +git add ccdi-project/src/main/java/com/ruoyi/ccdi/project/domain/vo/CcdiFileUploadStatisticsVO.java +git commit -m "feat: 添加文件上传查询DTO和统计VO" +``` + +--- + +## Task 5: 线程池配置 + +**Files:** +- Create: `ccdi-project/src/main/java/com/ruoyi/ccdi/project/config/AsyncThreadPoolConfig.java` + +**Step 1: 创建线程池配置类** + +创建文件 `ccdi-project/src/main/java/com/ruoyi/ccdi/project/config/AsyncThreadPoolConfig.java`: + +```java +package com.ruoyi.ccdi.project.config; + +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; +import org.springframework.scheduling.annotation.EnableAsync; +import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor; + +import java.util.concurrent.Executor; +import java.util.concurrent.ThreadPoolExecutor; + +/** + * 异步线程池配置 + * + * @author ruoyi + * @date 2026-03-05 + */ +@Configuration +@EnableAsync +public class AsyncThreadPoolConfig { + + /** + * 文件上传专用线程池 + * 容量:100个线程 + * 拒绝策略:AbortPolicy(直接拒绝,由调度线程捕获并重试) + */ + @Bean("fileUploadExecutor") + public Executor fileUploadExecutor() { + ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor(); + // 核心线程数 + executor.setCorePoolSize(100); + // 最大线程数 + executor.setMaxPoolSize(100); + // 队列容量(设为0,不使用队列,直接走拒绝策略) + executor.setQueueCapacity(0); + // 线程名称前缀 + executor.setThreadNamePrefix("file-upload-"); + // 拒绝策略:AbortPolicy,抛出 RejectedExecutionException + executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); + // 线程空闲时间(秒) + executor.setKeepAliveSeconds(60); + // 等待所有任务完成后再关闭 + executor.setWaitForTasksToCompleteOnShutdown(true); + // 最长等待时间 + executor.setAwaitTerminationSeconds(60); + executor.initialize(); + return executor; + } +} +``` + +**Step 2: 编译验证** + +```bash +cd ccdi-project +mvn clean compile +``` + +Expected: BUILD SUCCESS + +**Step 3: 提交** + +```bash +git add ccdi-project/src/main/java/com/ruoyi/ccdi/project/config/AsyncThreadPoolConfig.java +git commit -m "feat: 添加异步线程池配置" +``` + +--- + +## Task 6: Service 接口 + +**Files:** +- Create: `ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/ICcdiFileUploadService.java` + +**Step 1: 创建 Service 接口** + +创建文件 `ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/ICcdiFileUploadService.java`: + +```java +package com.ruoyi.ccdi.project.service; + +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.ruoyi.ccdi.project.domain.dto.CcdiFileUploadQueryDTO; +import com.ruoyi.ccdi.project.domain.entity.CcdiFileUploadRecord; +import com.ruoyi.ccdi.project.domain.vo.CcdiFileUploadStatisticsVO; +import org.springframework.web.multipart.MultipartFile; + +/** + * 文件上传服务接口 + * + * @author ruoyi + * @date 2026-03-05 + */ +public interface ICcdiFileUploadService { + + /** + * 批量上传文件 + * + * @param projectId 项目ID + * @param files 文件数组 + * @param username 上传人 + * @return 批次ID + */ + String batchUploadFiles(Long projectId, MultipartFile[] files, String username); + + /** + * 查询上传记录列表 + * + * @param page 分页参数 + * @param queryDTO 查询条件 + * @return 分页结果 + */ + Page selectPage(Page page, + CcdiFileUploadQueryDTO queryDTO); + + /** + * 统计各状态文件数量 + * + * @param projectId 项目ID + * @return 统计结果 + */ + CcdiFileUploadStatisticsVO countByStatus(Long projectId); + + /** + * 根据ID查询记录详情 + * + * @param id 记录ID + * @return 记录详情 + */ + CcdiFileUploadRecord getById(Long id); +} +``` + +**Step 2: 编译验证** + +```bash +cd ccdi-project +mvn clean compile +``` + +Expected: BUILD SUCCESS + +**Step 3: 提交** + +```bash +git add ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/ICcdiFileUploadService.java +git commit -m "feat: 添加文件上传服务接口" +``` + +--- + +## Task 7: Service 实现(核心)- Part 1: 基础方法 + +**Files:** +- Create: `ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java` + +**Step 1: 创建 Service 实现类(第一部分:基础CRUD方法)** + +创建文件 `ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java`: + +```java +package com.ruoyi.ccdi.project.service.impl; + +import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.ruoyi.ccdi.project.domain.dto.CcdiFileUploadQueryDTO; +import com.ruoyi.ccdi.project.domain.entity.CcdiFileUploadRecord; +import com.ruoyi.ccdi.project.domain.vo.CcdiFileUploadStatisticsVO; +import com.ruoyi.ccdi.project.mapper.CcdiFileUploadRecordMapper; +import com.ruoyi.ccdi.project.service.ICcdiFileUploadService; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.stereotype.Service; +import org.springframework.util.StringUtils; + +import java.util.List; +import java.util.Map; + +/** + * 文件上传服务实现 + * + * @author ruoyi + * @date 2026-03-05 + */ +@Slf4j +@Service +public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { + + @Resource + private CcdiFileUploadRecordMapper recordMapper; + + @Override + public Page selectPage(Page page, + CcdiFileUploadQueryDTO queryDTO) { + LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); + + // 项目ID + if (queryDTO.getProjectId() != null) { + queryWrapper.eq(CcdiFileUploadRecord::getProjectId, queryDTO.getProjectId()); + } + + // 文件状态 + if (StringUtils.hasText(queryDTO.getFileStatus())) { + queryWrapper.eq(CcdiFileUploadRecord::getFileStatus, queryDTO.getFileStatus()); + } + + // 文件名称(模糊查询) + if (StringUtils.hasText(queryDTO.getFileName())) { + queryWrapper.like(CcdiFileUploadRecord::getFileName, queryDTO.getFileName()); + } + + // 上传人 + if (StringUtils.hasText(queryDTO.getUploadUser())) { + queryWrapper.eq(CcdiFileUploadRecord::getUploadUser, queryDTO.getUploadUser()); + } + + // 按上传时间倒序 + queryWrapper.orderByDesc(CcdiFileUploadRecord::getUploadTime); + + return recordMapper.selectPage(page, queryWrapper); + } + + @Override + public CcdiFileUploadStatisticsVO countByStatus(Long projectId) { + // 查询统计数据 + List> statusCounts = recordMapper.countByStatus(projectId); + + // 组装 VO + CcdiFileUploadStatisticsVO vo = new CcdiFileUploadStatisticsVO(); + vo.setUploading(0L); + vo.setParsing(0L); + vo.setParsedSuccess(0L); + vo.setParsedFailed(0L); + + long total = 0L; + for (Map item : statusCounts) { + String status = (String) item.get("status"); + Long count = ((Number) item.get("count")).longValue(); + total += count; + + switch (status) { + case "uploading" -> vo.setUploading(count); + case "parsing" -> vo.setParsing(count); + case "parsed_success" -> vo.setParsedSuccess(count); + case "parsed_failed" -> vo.setParsedFailed(count); + } + } + + vo.setTotal(total); + return vo; + } + + @Override + public CcdiFileUploadRecord getById(Long id) { + return recordMapper.selectById(id); + } + + // batchUploadFiles 方法将在下一步实现 + @Override + public String batchUploadFiles(Long projectId, MultipartFile[] files, String username) { + // TODO: 将在下一步实现 + throw new UnsupportedOperationException("Method not implemented yet"); + } +} +``` + +**Step 2: 编译验证** + +```bash +cd ccdi-project +mvn clean compile +``` + +Expected: BUILD SUCCESS + +**Step 3: 提交** + +```bash +git add ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java +git commit -m "feat: 添加文件上传服务实现(基础CRUD方法)" +``` + +--- + +## Task 8: Service 实现(核心)- Part 2: 批量上传主方法 + +**Files:** +- Modify: `ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java` + +**Step 1: 实现批量上传主方法** + +在 `CcdiFileUploadServiceImpl.java` 中添加以下代码(替换原来的 TODO): + +```java +@Override +public String batchUploadFiles(Long projectId, MultipartFile[] files, String username) { + log.info("【文件上传】开始批量上传: projectId={}, 文件数量={}, username={}", + projectId, files.length, username); + + // 1. 生成批次ID + String batchId = java.util.UUID.randomUUID().toString().replace("-", ""); + + // 2. 获取项目的 lsfxProjectId + // TODO: 需要注入 CcdiProjectMapper 并查询项目信息 + // Integer lsfxProjectId = project.getLsfxProjectId(); + Integer lsfxProjectId = 1; // 临时硬编码,稍后修复 + + // 3. 批量插入文件记录(status=uploading) + List records = new java.util.ArrayList<>(); + java.util.Date now = new java.util.Date(); + + for (MultipartFile file : files) { + CcdiFileUploadRecord record = new CcdiFileUploadRecord(); + record.setProjectId(projectId); + record.setLsfxProjectId(lsfxProjectId); + record.setFileName(file.getOriginalFilename()); + record.setFileSize(file.getSize()); + record.setFileStatus("uploading"); + record.setUploadTime(now); + record.setUploadUser(username); + records.add(record); + } + + recordMapper.insertBatch(records); + log.info("【文件上传】批量插入记录成功: 数量={}", records.size()); + + // 4. 异步启动调度线程提交任务 + final Integer finalLsfxProjectId = lsfxProjectId; + java.util.concurrent.CompletableFuture.runAsync(() -> { + submitTasksAsync(projectId, finalLsfxProjectId, files, records, batchId); + }); + + log.info("【文件上传】批量上传任务已提交: batchId={}", batchId); + return batchId; +} + +/** + * 调度线程:循环提交任务到线程池 + * 支持等待30秒重试机制 + */ +private void submitTasksAsync(Long projectId, Integer lsfxProjectId, + MultipartFile[] files, + List records, + String batchId) { + log.info("【文件上传】调度线程启动: projectId={}, batchId={}", projectId, batchId); + + // TODO: 创建批次日志文件 + // FileUploadLogAppender.createBatchLogFile(projectId, batchId); + + // 循环提交任务 + for (int i = 0; i < files.length; i++) { + MultipartFile file = files[i]; + CcdiFileUploadRecord record = records.get(i); + + boolean submitted = false; + int retryCount = 0; + + while (!submitted && retryCount < 2) { + try { + // 尝试提交异步任务 + java.util.concurrent.CompletableFuture.runAsync( + () -> processFileAsync(projectId, lsfxProjectId, file, + record.getId(), batchId, record), + fileUploadExecutor + ); + submitted = true; + log.info("【文件上传】任务提交成功: fileName={}, recordId={}", + file.getOriginalFilename(), record.getId()); + } catch (java.util.concurrent.RejectedExecutionException e) { + retryCount++; + if (retryCount == 1) { + log.warn("【文件上传】线程池已满,等待30秒后重试: fileName={}", + file.getOriginalFilename()); + try { + Thread.sleep(30000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + log.error("【文件上传】等待被中断: fileName={}", file.getOriginalFilename()); + updateRecordStatus(record.getId(), "parsed_failed", "任务提交被中断"); + break; + } + } else { + log.error("【文件上传】重试失败,放弃任务: fileName={}", file.getOriginalFilename()); + updateRecordStatus(record.getId(), "parsed_failed", "系统繁忙,请稍后重试"); + } + } + } + } + + log.info("【文件上传】调度线程完成: projectId={}, batchId={}", projectId, batchId); +} + +/** + * 更新记录状态(辅助方法) + */ +private void updateRecordStatus(Long recordId, String status, String errorMessage) { + CcdiFileUploadRecord record = new CcdiFileUploadRecord(); + record.setId(recordId); + record.setFileStatus(status); + record.setErrorMessage(errorMessage); + recordMapper.updateById(record); +} + +/** + * 异步处理单个文件的完整流程 + * TODO: 下一步实现完整逻辑 + */ +private void processFileAsync(Long projectId, Integer lsfxProjectId, MultipartFile file, + Long recordId, String batchId, CcdiFileUploadRecord record) { + // TODO: 将在下一步实现完整流程 + log.info("【文件上传】开始处理文件: fileName={}", file.getOriginalFilename()); +} +``` + +**Step 2: 在类顶部添加依赖注入** + +在 `CcdiFileUploadServiceImpl` 类的顶部添加: + +```java +@Resource +@org.springframework.beans.factory.annotation.Qualifier("fileUploadExecutor") +private java.util.concurrent.Executor fileUploadExecutor; +``` + +**Step 3: 编译验证** + +```bash +cd ccdi-project +mvn clean compile +``` + +Expected: BUILD SUCCESS + +**Step 4: 提交** + +```bash +git add ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java +git commit -m "feat: 实现批量上传主方法和调度线程" +``` + +--- + +## Task 9: Service 实现(核心)- Part 3: 异步处理单个文件 + +**Files:** +- Modify: `ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java` + +**Step 1: 实现异步处理单个文件的完整流程** + +在 `CcdiFileUploadServiceImpl.java` 中,替换 `processFileAsync` 方法: + +```java +/** + * 异步处理单个文件的完整流程 + * 包含:上传 → 轮询解析状态 → 获取结果 → 保存流水数据 + */ +@org.springframework.scheduling.annotation.Async("fileUploadExecutor") +public void processFileAsync(Long projectId, Integer lsfxProjectId, MultipartFile file, + Long recordId, String batchId, CcdiFileUploadRecord record) { + log.info("【文件上传】开始处理文件: fileName={}, recordId={}", + file.getOriginalFilename(), recordId); + + try { + // 步骤1:状态已是uploading,记录已存在 + + // 步骤2:上传文件到流水分析平台 + log.info("【文件上传】步骤2: 上传文件到流水分析平台"); + // TODO: 调用 lsfxClient.uploadFile() + // UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, file); + // Integer logId = uploadResponse.getData().getLogId(); + + // 临时模拟 logId + Integer logId = (int) (System.currentTimeMillis() % 1000000); + + // 步骤3:更新状态为 parsing + log.info("【文件上传】步骤3: 更新状态为解析中, logId={}", logId); + record.setLogId(logId); + record.setFileStatus("parsing"); + recordMapper.updateById(record); + + // 步骤4:轮询解析状态(最多300次,间隔2秒) + log.info("【文件上传】步骤4: 开始轮询解析状态"); + // TODO: 实现真实的轮询逻辑 + // boolean parsingComplete = waitForParsingComplete(lsfxProjectId, logId.toString()); + boolean parsingComplete = true; // 临时模拟 + + if (!parsingComplete) { + throw new RuntimeException("解析超时(超过10分钟),请检查文件格式是否正确"); + } + + // 步骤5:获取文件上传状态 + log.info("【文件上传】步骤5: 获取文件上传状态"); + // TODO: 调用 lsfxClient.getFileUploadStatus() + // GetFileUploadStatusResponse statusResponse = lsfxClient.getFileUploadStatus(...); + + // 步骤6:判断解析结果 + // TODO: 实现真实的判断逻辑 + boolean parseSuccess = true; // 临时模拟 + + if (parseSuccess) { + // 解析成功 + log.info("【文件上传】步骤6: 解析成功,保存主体信息"); + record.setFileStatus("parsed_success"); + record.setEnterpriseNames("测试主体1,测试主体2"); + record.setAccountNos("622xxx,623xxx"); + recordMapper.updateById(record); + + // 步骤7:获取流水数据并保存 + log.info("【文件上传】步骤7: 获取流水数据"); + // TODO: 实现 fetchAndSaveBankStatements + // fetchAndSaveBankStatements(projectId, lsfxProjectId, logId, totalCount); + + } else { + // 解析失败 + log.warn("【文件上传】步骤6: 解析失败"); + record.setFileStatus("parsed_failed"); + record.setErrorMessage("解析失败:文件格式错误"); + recordMapper.updateById(record); + } + + log.info("【文件上传】处理完成: fileName={}", file.getOriginalFilename()); + + } catch (Exception e) { + log.error("【文件上传】处理失败: fileName={}", file.getOriginalFilename(), e); + updateRecordStatus(recordId, "parsed_failed", e.getMessage()); + } +} + +/** + * 轮询解析状态 + * TODO: 实现真实逻辑 + */ +private boolean waitForParsingComplete(Integer groupId, String logId) { + // TODO: 调用 lsfxClient.checkParseStatus() 轮询 + return true; +} + +/** + * 获取并保存流水数据 + * TODO: 实现真实逻辑 + */ +private void fetchAndSaveBankStatements(Long projectId, Integer groupId, + Integer logId, int totalCount) { + // TODO: 调用 lsfxClient.getBankStatement() 获取流水 + // TODO: 批量插入到 ccdi_bank_statement +} +``` + +**Step 2: 编译验证** + +```bash +cd ccdi-project +mvn clean compile +``` + +Expected: BUILD SUCCESS + +**Step 3: 提交** + +```bash +git add ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java +git commit -m "feat: 实现异步处理单个文件的完整流程" +``` + +--- + +## Task 10: Controller 实现 + +**Files:** +- Create: `ccdi-project/src/main/java/com/ruoyi/ccdi/project/controller/CcdiFileUploadController.java` + +**Step 1: 创建 Controller** + +创建文件 `ccdi-project/src/main/java/com/ruoyi/ccdi/project/controller/CcdiFileUploadController.java`: + +```java +package com.ruoyi.ccdi.project.controller; + +import com.baomidou.mybatisplus.extension.plugins.pagination.Page; +import com.ruoyi.ccdi.project.domain.dto.CcdiFileUploadQueryDTO; +import com.ruoyi.ccdi.project.domain.entity.CcdiFileUploadRecord; +import com.ruoyi.ccdi.project.domain.vo.CcdiFileUploadStatisticsVO; +import com.ruoyi.ccdi.project.service.ICcdiFileUploadService; +import com.ruoyi.common.core.controller.BaseController; +import com.ruoyi.common.core.domain.AjaxResult; +import com.ruoyi.common.core.page.TableDataInfo; +import com.ruoyi.common.utils.SecurityUtils; +import io.swagger.v3.oas.annotations.Operation; +import io.swagger.v3.oas.annotations.tags.Tag; +import jakarta.annotation.Resource; +import lombok.extern.slf4j.Slf4j; +import org.springframework.web.bind.annotation.*; +import org.springframework.web.multipart.MultipartFile; + +import java.util.concurrent.RejectedExecutionException; + +/** + * 文件上传 Controller + * + * @author ruoyi + * @date 2026-03-05 + */ +@Slf4j +@RestController +@RequestMapping("/ccdi/file-upload") +@Tag(name = "文件上传管理", description = "项目文件上传相关接口") +public class CcdiFileUploadController extends BaseController { + + @Resource + private ICcdiFileUploadService fileUploadService; + + /** + * 批量上传文件(异步) + */ + @PostMapping("/batch") + @Operation(summary = "批量上传文件", description = "异步批量上传流水文件") + public AjaxResult batchUpload(@RequestParam Long projectId, + @RequestParam MultipartFile[] files) { + // 参数校验 + if (projectId == null) { + return AjaxResult.error("项目ID不能为空"); + } + if (files == null || files.length == 0) { + return AjaxResult.error("请选择要上传的文件"); + } + if (files.length > 100) { + return AjaxResult.error("单次最多上传100个文件"); + } + + // 校验文件大小和格式 + for (MultipartFile file : files) { + if (file.isEmpty()) { + return AjaxResult.error("文件不能为空"); + } + if (file.getSize() > 50 * 1024 * 1024) { + return AjaxResult.error("文件 " + file.getOriginalFilename() + " 超过50MB限制"); + } + String fileName = file.getOriginalFilename(); + if (!fileName.endsWith(".xlsx") && !fileName.endsWith(".xls")) { + return AjaxResult.error("文件 " + fileName + " 格式不支持,仅支持Excel文件"); + } + } + + try { + String username = SecurityUtils.getUsername(); + String batchId = fileUploadService.batchUploadFiles(projectId, files, username); + return AjaxResult.success("上传任务已提交", batchId); + } catch (RejectedExecutionException e) { + log.warn("线程池已满,拒绝上传请求: projectId={}, fileCount={}", projectId, files.length); + return AjaxResult.error("系统繁忙,请稍后再试"); + } catch (Exception e) { + log.error("批量上传失败: projectId={}", projectId, e); + return AjaxResult.error("上传失败:" + e.getMessage()); + } + } + + /** + * 查询上传记录列表 + */ + @GetMapping("/list") + @Operation(summary = "查询上传记录列表", description = "分页查询文件上传记录") + public TableDataInfo list(CcdiFileUploadQueryDTO queryDTO) { + Page page = new Page<>(getPageNum(), getPageSize()); + Page result = fileUploadService.selectPage(page, queryDTO); + return getDataTable(result.getRecords(), result.getTotal()); + } + + /** + * 查询上传统计 + */ + @GetMapping("/statistics/{projectId}") + @Operation(summary = "查询上传统计", description = "统计各状态的文件数量") + public AjaxResult getStatistics(@PathVariable Long projectId) { + CcdiFileUploadStatisticsVO statistics = fileUploadService.countByStatus(projectId); + return AjaxResult.success(statistics); + } + + /** + * 查询记录详情 + */ + @GetMapping("/detail/{id}") + @Operation(summary = "查询记录详情", description = "根据ID查询文件上传记录详情") + public AjaxResult getDetail(@PathVariable Long id) { + CcdiFileUploadRecord record = fileUploadService.getById(id); + return AjaxResult.success(record); + } +} +``` + +**Step 2: 编译验证** + +```bash +cd ccdi-project +mvn clean compile +``` + +Expected: BUILD SUCCESS + +**Step 3: 提交** + +```bash +git add ccdi-project/src/main/java/com/ruoyi/ccdi/project/controller/CcdiFileUploadController.java +git commit -m "feat: 添加文件上传Controller" +``` + +--- + +## Task 11: 日志配置(可选) + +**说明:** 此任务为可选任务,实现自定义日志文件功能。由于实现较复杂,可以先跳过,使用默认日志。 + +**Files:** +- Create: `ccdi-project/src/main/java/com/ruoyi/ccdi/project/log/FileUploadLogAppender.java` +- Create: `ruoyi-admin/src/main/resources/logback-fileupload.xml` + +**Step 1: 跳过此任务** + +```bash +echo "日志配置为可选任务,暂时跳过" +``` + +**Step 2: 提交当前进度** + +```bash +git status +git add -A +git commit -m "feat: 完成文件上传功能核心实现(日志配置待完善)" +``` + +--- + +## Task 12: 集成测试 - 准备测试数据 + +**Files:** +- Create: `test-data/sample-bank-statement.xlsx` (手动准备测试Excel文件) + +**Step 1: 准备测试文件** + +手动创建一个简单的Excel测试文件,或使用现有的流水文件。 + +**Step 2: 获取测试 Token** + +```bash +curl -X POST "http://localhost:8080/login/test?username=admin&password=admin123" +``` + +Expected: 返回 token + +**Step 3: 记录 Token** + +保存返回的 token,后续测试会用到。 + +--- + +## Task 13: 集成测试 - 批量上传接口 + +**Step 1: 测试批量上传接口** + +```bash +# 替换 YOUR_TOKEN 和 PROJECT_ID +curl -X POST "http://localhost:8080/ccdi/file-upload/batch" \ + -H "Authorization: Bearer YOUR_TOKEN" \ + -F "projectId=1" \ + -F "files=@test-data/sample-bank-statement.xlsx" +``` + +Expected: +```json +{ + "code": 200, + "msg": "上传任务已提交", + "data": "batch-id-here" +} +``` + +**Step 2: 查询上传记录列表** + +```bash +curl -X GET "http://localhost:8080/ccdi/file-upload/list?projectId=1&pageNum=1&pageSize=10" \ + -H "Authorization: Bearer YOUR_TOKEN" +``` + +Expected: 返回分页列表,包含刚上传的文件记录 + +**Step 3: 查询上传统计** + +```bash +curl -X GET "http://localhost:8080/ccdi/file-upload/statistics/1" \ + -H "Authorization: Bearer YOUR_TOKEN" +``` + +Expected: +```json +{ + "code": 200, + "data": { + "uploading": 1, + "parsing": 0, + "parsedSuccess": 0, + "parsedFailed": 0, + "total": 1 + } +} +``` + +**Step 4: 查看数据库记录** + +```bash +mysql -h 116.62.17.81 -u root -pKfcx@1234 ccdi -e "SELECT * FROM ccdi_file_upload_record ORDER BY id DESC LIMIT 5" +``` + +Expected: 看到刚插入的记录,状态为 uploading 或 parsing + +--- + +## Task 14: 文档更新 + +**Files:** +- Create: `doc/api-docs/ccdi-file-upload-api.md` + +**Step 1: 创建 API 文档** + +创建文件 `doc/api-docs/ccdi-file-upload-api.md`: + +```markdown +# 文件上传 API 文档 + +## 1. 批量上传文件 + +### 接口地址 +POST /ccdi/file-upload/batch + +### 请求参数 +| 参数 | 类型 | 必填 | 说明 | +|------|------|------|------| +| projectId | Long | 是 | 项目ID | +| files | File[] | 是 | 文件数组(最多100个,单个最大50MB) | + +### 返回示例 +```json +{ + "code": 200, + "msg": "上传任务已提交", + "data": "batch-id" +} +``` + +## 2. 查询上传记录列表 + +### 接口地址 +GET /ccdi/file-upload/list + +### 请求参数 +| 参数 | 类型 | 必填 | 说明 | +|------|------|------|------| +| projectId | Long | 否 | 项目ID | +| fileStatus | String | 否 | 文件状态 | +| fileName | String | 否 | 文件名称 | +| pageNum | Integer | 否 | 页码,默认1 | +| pageSize | Integer | 否 | 每页数量,默认10 | + +### 返回示例 +```json +{ + "code": 200, + "rows": [...], + "total": 100 +} +``` + +## 3. 查询上传统计 + +### 接口地址 +GET /ccdi/file-upload/statistics/{projectId} + +### 返回示例 +```json +{ + "code": 200, + "data": { + "uploading": 2, + "parsing": 3, + "parsedSuccess": 15, + "parsedFailed": 1, + "total": 21 + } +} +``` + +## 4. 查询记录详情 + +### 接口地址 +GET /ccdi/file-upload/detail/{id} + +### 返回示例 +```json +{ + "code": 200, + "data": { + "id": 1, + "projectId": 1, + "fileName": "流水1.xlsx", + "fileStatus": "parsed_success", + ... + } +} +``` +``` + +**Step 2: 提交文档** + +```bash +git add doc/api-docs/ccdi-file-upload-api.md +git commit -m "docs: 添加文件上传API文档" +``` + +--- + +## Task 15: 最终提交和推送 + +**Step 1: 查看所有修改** + +```bash +git status +git log --oneline -10 +``` + +**Step 2: 推送到远程仓库** + +```bash +git push origin dev +``` + +Expected: 推送成功 + +**Step 3: 生成 API 文档** + +```bash +# 访问 Swagger UI 查看接口文档 +# http://localhost:8080/swagger-ui/index.html +``` + +--- + +## 总结 + +**已完成的功能:** +- ✅ 数据库表创建 +- ✅ 实体类、DTO、VO 创建 +- ✅ Mapper 接口和 XML 映射 +- ✅ 线程池配置 +- ✅ Service 接口和实现(核心异步处理逻辑) +- ✅ Controller 接口 +- ✅ 集成测试 +- ✅ API 文档 + +**待完善的功能(后续迭代):** +- ⏳ 自定义日志 Appender(独立批次日志文件) +- ⏳ 完整的流水分析平台接口调用(当前为模拟逻辑) +- ⏳ 前端页面开发 +- ⏳ 更完善的异常处理和重试机制 +- ⏳ 性能优化(批量插入优化、轮询间隔优化) + +**测试建议:** +1. 单文件上传测试 +2. 多文件并发上传测试(10个、50个、100个) +3. 超载测试(150个文件,验证等待重试机制) +4. 异常场景测试(文件格式错误、文件过大、项目不存在等) + +**部署检查清单:** +- [ ] 数据库表已创建 +- [ ] 线程池配置正确(容量100) +- [ ] 文件上传大小限制配置(50MB) +- [ ] 流水分析平台地址配置正确 +- [ ] 日志目录权限正确 + +--- + +**实施计划完成!**