diff --git a/ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java b/ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java index cbdbdb5..550a8f3 100644 --- a/ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java +++ b/ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java @@ -12,9 +12,18 @@ import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.scheduling.annotation.Async; import org.springframework.stereotype.Service; +import org.springframework.transaction.annotation.Transactional; +import org.springframework.transaction.support.TransactionSynchronization; +import org.springframework.transaction.support.TransactionSynchronizationManager; import org.springframework.util.StringUtils; import org.springframework.web.multipart.MultipartFile; +import java.io.File; +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.nio.file.StandardCopyOption; import java.util.List; import java.util.Map; import java.util.concurrent.Executor; @@ -34,6 +43,12 @@ import java.util.UUID; @Service public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { + /** + * 临时文件存储目录 + * TODO: 应该从配置文件中读取 + */ + private static final String TEMP_FILE_DIR = System.getProperty("java.io.tmpdir") + File.separator + "ccdi-upload"; + @Resource private CcdiFileUploadRecordMapper recordMapper; @@ -107,6 +122,7 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { return recordMapper.selectById(id); } + @Transactional @Override public String batchUploadFiles(Long projectId, MultipartFile[] files, String username) { log.info("【文件上传】开始批量上传: projectId={}, 文件数量={}, username={}", @@ -120,11 +136,39 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { // Integer lsfxProjectId = project.getLsfxProjectId(); Integer lsfxProjectId = 1; // 临时硬编码,稍后修复 + // Critical Fix #2: 保存MultipartFile到临时存储,避免异步处理时文件已被清理 + List tempFilePaths = new ArrayList<>(); + try { + // 确保临时目录存在 + Path tempDir = Paths.get(TEMP_FILE_DIR); + if (!Files.exists(tempDir)) { + Files.createDirectories(tempDir); + } + + // 保存所有文件到临时目录 + for (MultipartFile file : files) { + String originalFilename = file.getOriginalFilename(); + String tempFileName = batchId + "_" + System.currentTimeMillis() + "_" + originalFilename; + Path tempFilePath = tempDir.resolve(tempFileName); + + // 将MultipartFile内容复制到临时文件 + Files.copy(file.getInputStream(), tempFilePath, StandardCopyOption.REPLACE_EXISTING); + tempFilePaths.add(tempFilePath.toString()); + + log.debug("【文件上传】保存临时文件: originalName={}, tempPath={}", + originalFilename, tempFilePath); + } + } catch (IOException e) { + log.error("【文件上传】保存临时文件失败", e); + throw new RuntimeException("保存临时文件失败: " + e.getMessage(), e); + } + // 3. 批量插入文件记录(status=uploading) List records = new ArrayList<>(); Date now = new Date(); - for (MultipartFile file : files) { + for (int i = 0; i < files.length; i++) { + MultipartFile file = files[i]; CcdiFileUploadRecord record = new CcdiFileUploadRecord(); record.setProjectId(projectId); record.setLsfxProjectId(lsfxProjectId); @@ -139,10 +183,24 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { recordMapper.insertBatch(records); log.info("【文件上传】批量插入记录成功: 数量={}", records.size()); - // 4. 异步启动调度线程提交任务 + // Critical Fix #3: 验证ID已生成 + for (CcdiFileUploadRecord record : records) { + if (record.getId() == null) { + throw new RuntimeException("批量插入失败: 未生成记录ID,请检查Mapper配置useGeneratedKeys=true"); + } + } + log.debug("【文件上传】ID验证通过: 所有记录ID已生成"); + + // Critical Fix #1: 使用TransactionSynchronization确保异步任务在事务提交后启动 final Integer finalLsfxProjectId = lsfxProjectId; - CompletableFuture.runAsync(() -> { - submitTasksAsync(projectId, finalLsfxProjectId, files, records, batchId); + TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { + @Override + public void afterCommit() { + log.info("【文件上传】事务已提交,启动异步任务"); + CompletableFuture.runAsync(() -> { + submitTasksAsync(projectId, finalLsfxProjectId, tempFilePaths, records, batchId); + }); + } }); log.info("【文件上传】批量上传任务已提交: batchId={}", batchId); @@ -152,16 +210,28 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { /** * 调度线程:循环提交任务到线程池 * 支持等待30秒重试机制 + * + * @param projectId 项目ID + * @param lsfxProjectId 流水分析项目ID + * @param tempFilePaths 临时文件路径列表 + * @param records 文件上传记录列表 + * @param batchId 批次ID */ private void submitTasksAsync(Long projectId, Integer lsfxProjectId, - MultipartFile[] files, + List tempFilePaths, List records, String batchId) { log.info("【文件上传】调度线程启动: projectId={}, batchId={}", projectId, batchId); // 循环提交任务 - for (int i = 0; i < files.length; i++) { - MultipartFile file = files[i]; + for (int i = 0; i < tempFilePaths.size(); i++) { + // Critical Fix #6: 检查线程中断状态 + if (Thread.currentThread().isInterrupted()) { + log.warn("【文件上传】调度线程被中断,停止提交剩余任务"); + break; + } + + String tempFilePath = tempFilePaths.get(i); CcdiFileUploadRecord record = records.get(i); boolean submitted = false; @@ -171,28 +241,28 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { try { // 尝试提交异步任务 CompletableFuture.runAsync( - () -> processFileAsync(projectId, lsfxProjectId, file, + () -> processFileAsync(projectId, lsfxProjectId, tempFilePath, record.getId(), batchId, record), fileUploadExecutor ); submitted = true; log.info("【文件上传】任务提交成功: fileName={}, recordId={}", - file.getOriginalFilename(), record.getId()); + record.getFileName(), record.getId()); } catch (RejectedExecutionException e) { retryCount++; if (retryCount == 1) { log.warn("【文件上传】线程池已满,等待30秒后重试: fileName={}", - file.getOriginalFilename()); + record.getFileName()); try { Thread.sleep(30000); } catch (InterruptedException ie) { Thread.currentThread().interrupt(); - log.error("【文件上传】等待被中断: fileName={}", file.getOriginalFilename()); + log.error("【文件上传】等待被中断: fileName={}", record.getFileName()); updateRecordStatus(record.getId(), "parsed_failed", "任务提交被中断"); break; } } else { - log.error("【文件上传】重试失败,放弃任务: fileName={}", file.getOriginalFilename()); + log.error("【文件上传】重试失败,放弃任务: fileName={}", record.getFileName()); updateRecordStatus(record.getId(), "parsed_failed", "系统繁忙,请稍后重试"); } } @@ -216,20 +286,34 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { /** * 异步处理单个文件的完整流程 * 包含:上传 → 轮询解析状态 → 获取结果 → 保存流水数据 + * + * @param projectId 项目ID + * @param lsfxProjectId 流水分析项目ID + * @param tempFilePath 临时文件路径 + * @param recordId 记录ID + * @param batchId 批次ID + * @param record 文件上传记录 */ @Async("fileUploadExecutor") - public void processFileAsync(Long projectId, Integer lsfxProjectId, MultipartFile file, + public void processFileAsync(Long projectId, Integer lsfxProjectId, String tempFilePath, Long recordId, String batchId, CcdiFileUploadRecord record) { - log.info("【文件上传】开始处理文件: fileName={}, recordId={}", - file.getOriginalFilename(), recordId); + log.info("【文件上传】开始处理文件: fileName={}, recordId={}, tempPath={}", + record.getFileName(), recordId, tempFilePath); try { // 步骤1:状态已是uploading,记录已存在 + // 从临时文件路径读取文件 + Path filePath = Paths.get(tempFilePath); + if (!Files.exists(filePath)) { + throw new RuntimeException("临时文件不存在: " + tempFilePath); + } + // 步骤2:上传文件到流水分析平台 log.info("【文件上传】步骤2: 上传文件到流水分析平台"); // TODO: 调用 lsfxClient.uploadFile() - // UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, file); + // 需要将临时文件转换为MultipartFile或直接使用文件路径 + // UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, filePath.toFile()); // Integer logId = uploadResponse.getData().getLogId(); // 临时模拟 logId @@ -264,6 +348,7 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { // 解析成功 log.info("【文件上传】步骤6: 解析成功,保存主体信息"); record.setFileStatus("parsed_success"); + // TODO: 从实际的解析结果中获取 record.setEnterpriseNames("测试主体1,测试主体2"); record.setAccountNos("622xxx,623xxx"); recordMapper.updateById(record); @@ -281,11 +366,22 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { recordMapper.updateById(record); } - log.info("【文件上传】处理完成: fileName={}", file.getOriginalFilename()); + log.info("【文件上传】处理完成: fileName={}", record.getFileName()); } catch (Exception e) { - log.error("【文件上传】处理失败: fileName={}", file.getOriginalFilename(), e); + log.error("【文件上传】处理失败: fileName={}", record.getFileName(), e); updateRecordStatus(recordId, "parsed_failed", e.getMessage()); + } finally { + // 清理临时文件 + try { + Path filePath = Paths.get(tempFilePath); + if (Files.exists(filePath)) { + Files.delete(filePath); + log.debug("【文件上传】清理临时文件: {}", tempFilePath); + } + } catch (IOException e) { + log.warn("【文件上传】清理临时文件失败: {}", tempFilePath, e); + } } } diff --git a/ccdi-project/src/main/resources/mapper/ccdi/project/CcdiFileUploadRecordMapper.xml b/ccdi-project/src/main/resources/mapper/ccdi/project/CcdiFileUploadRecordMapper.xml index 63bbbec..7190245 100644 --- a/ccdi-project/src/main/resources/mapper/ccdi/project/CcdiFileUploadRecordMapper.xml +++ b/ccdi-project/src/main/resources/mapper/ccdi/project/CcdiFileUploadRecordMapper.xml @@ -27,7 +27,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" - + insert into ccdi_file_upload_record ( project_id, lsfx_project_id, file_name, file_size, file_status, upload_time, upload_user