# 项目异步文件上传功能 - 子计划2:Service层核心实现 > **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task. **Goal:** 实现文件上传的核心业务逻辑,包括批量上传、异步处理、状态更新 **Architecture:** 双层异步架构(调度线程 + 文件处理线程池),先插入记录后异步处理 **Tech Stack:** Spring @Async, CompletableFuture, MyBatis Plus --- ## Task 1: Service 接口 **Files:** - Create: `ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/ICcdiFileUploadService.java` **Step 1: 创建 Service 接口** 创建文件 `ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/ICcdiFileUploadService.java`: ```java package com.ruoyi.ccdi.project.service; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.ruoyi.ccdi.project.domain.dto.CcdiFileUploadQueryDTO; import com.ruoyi.ccdi.project.domain.entity.CcdiFileUploadRecord; import com.ruoyi.ccdi.project.domain.vo.CcdiFileUploadStatisticsVO; import org.springframework.web.multipart.MultipartFile; /** * 文件上传服务接口 * * @author ruoyi * @date 2026-03-05 */ public interface ICcdiFileUploadService { /** * 批量上传文件 * * @param projectId 项目ID * @param files 文件数组 * @param username 上传人 * @return 批次ID */ String batchUploadFiles(Long projectId, MultipartFile[] files, String username); /** * 查询上传记录列表 * * @param page 分页参数 * @param queryDTO 查询条件 * @return 分页结果 */ Page selectPage(Page page, CcdiFileUploadQueryDTO queryDTO); /** * 统计各状态文件数量 * * @param projectId 项目ID * @return 统计结果 */ CcdiFileUploadStatisticsVO countByStatus(Long projectId); /** * 根据ID查询记录详情 * * @param id 记录ID * @return 记录详情 */ CcdiFileUploadRecord getById(Long id); } ``` **Step 2: 编译验证** ```bash cd ccdi-project mvn clean compile ``` Expected: BUILD SUCCESS **Step 3: 提交** ```bash git add ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/ICcdiFileUploadService.java git commit -m "feat: 添加文件上传服务接口" ``` --- ## Task 2: Service 实现 - Part 1: 基础CRUD方法 **Files:** - Create: `ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java` **Step 1: 创建 Service 实现类** 创建文件 `ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java`: ```java package com.ruoyi.ccdi.project.service.impl; import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.ruoyi.ccdi.project.domain.dto.CcdiFileUploadQueryDTO; import com.ruoyi.ccdi.project.domain.entity.CcdiFileUploadRecord; import com.ruoyi.ccdi.project.domain.vo.CcdiFileUploadStatisticsVO; import com.ruoyi.ccdi.project.mapper.CcdiFileUploadRecordMapper; import com.ruoyi.ccdi.project.service.ICcdiFileUploadService; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import java.util.List; import java.util.Map; /** * 文件上传服务实现 * * @author ruoyi * @date 2026-03-05 */ @Slf4j @Service public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { @Resource private CcdiFileUploadRecordMapper recordMapper; @Override public Page selectPage(Page page, CcdiFileUploadQueryDTO queryDTO) { LambdaQueryWrapper queryWrapper = new LambdaQueryWrapper<>(); // 项目ID if (queryDTO.getProjectId() != null) { queryWrapper.eq(CcdiFileUploadRecord::getProjectId, queryDTO.getProjectId()); } // 文件状态 if (StringUtils.hasText(queryDTO.getFileStatus())) { queryWrapper.eq(CcdiFileUploadRecord::getFileStatus, queryDTO.getFileStatus()); } // 文件名称(模糊查询) if (StringUtils.hasText(queryDTO.getFileName())) { queryWrapper.like(CcdiFileUploadRecord::getFileName, queryDTO.getFileName()); } // 上传人 if (StringUtils.hasText(queryDTO.getUploadUser())) { queryWrapper.eq(CcdiFileUploadRecord::getUploadUser, queryDTO.getUploadUser()); } // 按上传时间倒序 queryWrapper.orderByDesc(CcdiFileUploadRecord::getUploadTime); return recordMapper.selectPage(page, queryWrapper); } @Override public CcdiFileUploadStatisticsVO countByStatus(Long projectId) { // 查询统计数据 List> statusCounts = recordMapper.countByStatus(projectId); // 组装 VO CcdiFileUploadStatisticsVO vo = new CcdiFileUploadStatisticsVO(); vo.setUploading(0L); vo.setParsing(0L); vo.setParsedSuccess(0L); vo.setParsedFailed(0L); long total = 0L; for (Map item : statusCounts) { String status = (String) item.get("status"); Long count = ((Number) item.get("count")).longValue(); total += count; switch (status) { case "uploading" -> vo.setUploading(count); case "parsing" -> vo.setParsing(count); case "parsed_success" -> vo.setParsedSuccess(count); case "parsed_failed" -> vo.setParsedFailed(count); } } vo.setTotal(total); return vo; } @Override public CcdiFileUploadRecord getById(Long id) { return recordMapper.selectById(id); } // batchUploadFiles 方法将在下一步实现 @Override public String batchUploadFiles(Long projectId, MultipartFile[] files, String username) { // TODO: 将在下一步实现 throw new UnsupportedOperationException("Method not implemented yet"); } } ``` **Step 2: 编译验证** ```bash cd ccdi-project mvn clean compile ``` Expected: BUILD SUCCESS **Step 3: 提交** ```bash git add ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java git commit -m "feat: 添加文件上传服务实现(基础CRUD方法)" ``` --- ## Task 3: Service 实现 - Part 2: 批量上传主方法 **Files:** - Modify: `ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java` **Step 1: 实现批量上传主方法** 在 `CcdiFileUploadServiceImpl.java` 中添加以下代码(替换原来的 TODO): ```java @Resource @org.springframework.beans.factory.annotation.Qualifier("fileUploadExecutor") private java.util.concurrent.Executor fileUploadExecutor; @Override public String batchUploadFiles(Long projectId, MultipartFile[] files, String username) { log.info("【文件上传】开始批量上传: projectId={}, 文件数量={}, username={}", projectId, files.length, username); // 1. 生成批次ID String batchId = java.util.UUID.randomUUID().toString().replace("-", ""); // 2. 获取项目的 lsfxProjectId // TODO: 需要注入 CcdiProjectMapper 并查询项目信息 // Integer lsfxProjectId = project.getLsfxProjectId(); Integer lsfxProjectId = 1; // 临时硬编码,稍后修复 // 3. 批量插入文件记录(status=uploading) List records = new java.util.ArrayList<>(); java.util.Date now = new java.util.Date(); for (MultipartFile file : files) { CcdiFileUploadRecord record = new CcdiFileUploadRecord(); record.setProjectId(projectId); record.setLsfxProjectId(lsfxProjectId); record.setFileName(file.getOriginalFilename()); record.setFileSize(file.getSize()); record.setFileStatus("uploading"); record.setUploadTime(now); record.setUploadUser(username); records.add(record); } recordMapper.insertBatch(records); log.info("【文件上传】批量插入记录成功: 数量={}", records.size()); // 4. 异步启动调度线程提交任务 final Integer finalLsfxProjectId = lsfxProjectId; java.util.concurrent.CompletableFuture.runAsync(() -> { submitTasksAsync(projectId, finalLsfxProjectId, files, records, batchId); }); log.info("【文件上传】批量上传任务已提交: batchId={}", batchId); return batchId; } /** * 调度线程:循环提交任务到线程池 * 支持等待30秒重试机制 */ private void submitTasksAsync(Long projectId, Integer lsfxProjectId, MultipartFile[] files, List records, String batchId) { log.info("【文件上传】调度线程启动: projectId={}, batchId={}", projectId, batchId); // 循环提交任务 for (int i = 0; i < files.length; i++) { MultipartFile file = files[i]; CcdiFileUploadRecord record = records.get(i); boolean submitted = false; int retryCount = 0; while (!submitted && retryCount < 2) { try { // 尝试提交异步任务 java.util.concurrent.CompletableFuture.runAsync( () -> processFileAsync(projectId, lsfxProjectId, file, record.getId(), batchId, record), fileUploadExecutor ); submitted = true; log.info("【文件上传】任务提交成功: fileName={}, recordId={}", file.getOriginalFilename(), record.getId()); } catch (java.util.concurrent.RejectedExecutionException e) { retryCount++; if (retryCount == 1) { log.warn("【文件上传】线程池已满,等待30秒后重试: fileName={}", file.getOriginalFilename()); try { Thread.sleep(30000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); log.error("【文件上传】等待被中断: fileName={}", file.getOriginalFilename()); updateRecordStatus(record.getId(), "parsed_failed", "任务提交被中断"); break; } } else { log.error("【文件上传】重试失败,放弃任务: fileName={}", file.getOriginalFilename()); updateRecordStatus(record.getId(), "parsed_failed", "系统繁忙,请稍后重试"); } } } } log.info("【文件上传】调度线程完成: projectId={}, batchId={}", projectId, batchId); } /** * 更新记录状态(辅助方法) */ private void updateRecordStatus(Long recordId, String status, String errorMessage) { CcdiFileUploadRecord record = new CcdiFileUploadRecord(); record.setId(recordId); record.setFileStatus(status); record.setErrorMessage(errorMessage); recordMapper.updateById(record); } /** * 异步处理单个文件的完整流程 * TODO: 下一步实现完整逻辑 */ private void processFileAsync(Long projectId, Integer lsfxProjectId, MultipartFile file, Long recordId, String batchId, CcdiFileUploadRecord record) { // TODO: 将在下一步实现 log.info("【文件上传】开始处理文件: fileName={}", file.getOriginalFilename()); } ``` **Step 2: 编译验证** ```bash cd ccdi-project mvn clean compile ``` Expected: BUILD SUCCESS **Step 3: 提交** ```bash git add ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java git commit -m "feat: 实现批量上传主方法和调度线程" ``` --- ## Task 4: Service 实现 - Part 3: 异步处理单个文件 **Files:** - Modify: `ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java` **Step 1: 实现异步处理单个文件的完整流程** 在 `CcdiFileUploadServiceImpl.java` 中,替换 `processFileAsync` 方法: ```java /** * 异步处理单个文件的完整流程 * 包含:上传 → 轮询解析状态 → 获取结果 → 保存流水数据 */ @org.springframework.scheduling.annotation.Async("fileUploadExecutor") public void processFileAsync(Long projectId, Integer lsfxProjectId, MultipartFile file, Long recordId, String batchId, CcdiFileUploadRecord record) { log.info("【文件上传】开始处理文件: fileName={}, recordId={}", file.getOriginalFilename(), recordId); try { // 步骤1:状态已是uploading,记录已存在 // 步骤2:上传文件到流水分析平台 log.info("【文件上传】步骤2: 上传文件到流水分析平台"); // TODO: 调用 lsfxClient.uploadFile() // UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, file); // Integer logId = uploadResponse.getData().getLogId(); // 临时模拟 logId Integer logId = (int) (System.currentTimeMillis() % 1000000); // 步骤3:更新状态为 parsing log.info("【文件上传】步骤3: 更新状态为解析中, logId={}", logId); record.setLogId(logId); record.setFileStatus("parsing"); recordMapper.updateById(record); // 步骤4:轮询解析状态(最多300次,间隔2秒) log.info("【文件上传】步骤4: 开始轮询解析状态"); // TODO: 实现真实的轮询逻辑 // boolean parsingComplete = waitForParsingComplete(lsfxProjectId, logId.toString()); boolean parsingComplete = true; // 临时模拟 if (!parsingComplete) { throw new RuntimeException("解析超时(超过10分钟),请检查文件格式是否正确"); } // 步骤5:获取文件上传状态 log.info("【文件上传】步骤5: 获取文件上传状态"); // TODO: 调用 lsfxClient.getFileUploadStatus() // GetFileUploadStatusResponse statusResponse = lsfxClient.getFileUploadStatus(...); // 步骤6:判断解析结果 // TODO: 实现真实的判断逻辑 boolean parseSuccess = true; // 临时模拟 if (parseSuccess) { // 解析成功 log.info("【文件上传】步骤6: 解析成功,保存主体信息"); record.setFileStatus("parsed_success"); record.setEnterpriseNames("测试主体1,测试主体2"); record.setAccountNos("622xxx,623xxx"); recordMapper.updateById(record); // 步骤7:获取流水数据并保存 log.info("【文件上传】步骤7: 获取流水数据"); // TODO: 实现 fetchAndSaveBankStatements // fetchAndSaveBankStatements(projectId, lsfxProjectId, logId, totalCount); } else { // 解析失败 log.warn("【文件上传】步骤6: 解析失败"); record.setFileStatus("parsed_failed"); record.setErrorMessage("解析失败:文件格式错误"); recordMapper.updateById(record); } log.info("【文件上传】处理完成: fileName={}", file.getOriginalFilename()); } catch (Exception e) { log.error("【文件上传】处理失败: fileName={}", file.getOriginalFilename(), e); updateRecordStatus(recordId, "parsed_failed", e.getMessage()); } } /** * 轮询解析状态 * TODO: 实现真实逻辑 */ private boolean waitForParsingComplete(Integer groupId, String logId) { // TODO: 调用 lsfxClient.checkParseStatus() 轮询 return true; } /** * 获取并保存流水数据 * TODO: 实现真实逻辑 */ private void fetchAndSaveBankStatements(Long projectId, Integer groupId, Integer logId, int totalCount) { // TODO: 调用 lsfxClient.getBankStatement() 获取流水 // TODO: 批量插入到 ccdi_bank_statement } ``` **Step 2: 编译验证** ```bash cd ccdi-project mvn clean compile ``` Expected: BUILD SUCCESS **Step 3: 提交** ```bash git add ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java git commit -m "feat: 实现异步处理单个文件的完整流程" ``` --- ## 子计划2完成检查清单 - [ ] Service接口创建完成 - [ ] 基础CRUD方法实现并测试通过 - [ ] 批量上传主方法实现完成 - [ ] 调度线程和重试机制实现 - [ ] 异步处理单个文件流程实现 - [ ] 所有代码已提交到git **下一步:** 执行子计划3 - Controller和API文档