diff --git a/ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java b/ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java index 5155fe5..e13903c 100644 --- a/ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java +++ b/ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java @@ -9,12 +9,19 @@ import com.ruoyi.ccdi.project.mapper.CcdiFileUploadRecordMapper; import com.ruoyi.ccdi.project.service.ICcdiFileUploadService; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; +import org.springframework.beans.factory.annotation.Qualifier; import org.springframework.stereotype.Service; import org.springframework.util.StringUtils; import org.springframework.web.multipart.MultipartFile; import java.util.List; import java.util.Map; +import java.util.concurrent.Executor; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.RejectedExecutionException; +import java.util.ArrayList; +import java.util.Date; +import java.util.UUID; /** * 文件上传服务实现 @@ -29,6 +36,10 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { @Resource private CcdiFileUploadRecordMapper recordMapper; + @Resource + @Qualifier("fileUploadExecutor") + private Executor fileUploadExecutor; + @Override public Page selectPage(Page page, CcdiFileUploadQueryDTO queryDTO) { @@ -95,10 +106,119 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { return recordMapper.selectById(id); } - // batchUploadFiles 方法将在下一步实现 @Override public String batchUploadFiles(Long projectId, MultipartFile[] files, String username) { + log.info("【文件上传】开始批量上传: projectId={}, 文件数量={}, username={}", + projectId, files.length, username); + + // 1. 生成批次ID + String batchId = UUID.randomUUID().toString().replace("-", ""); + + // 2. 获取项目的 lsfxProjectId + // TODO: 需要注入 CcdiProjectMapper 并查询项目信息 + // Integer lsfxProjectId = project.getLsfxProjectId(); + Integer lsfxProjectId = 1; // 临时硬编码,稍后修复 + + // 3. 批量插入文件记录(status=uploading) + List records = new ArrayList<>(); + Date now = new Date(); + + for (MultipartFile file : files) { + CcdiFileUploadRecord record = new CcdiFileUploadRecord(); + record.setProjectId(projectId); + record.setLsfxProjectId(lsfxProjectId); + record.setFileName(file.getOriginalFilename()); + record.setFileSize(file.getSize()); + record.setFileStatus("uploading"); + record.setUploadTime(now); + record.setUploadUser(username); + records.add(record); + } + + recordMapper.insertBatch(records); + log.info("【文件上传】批量插入记录成功: 数量={}", records.size()); + + // 4. 异步启动调度线程提交任务 + final Integer finalLsfxProjectId = lsfxProjectId; + CompletableFuture.runAsync(() -> { + submitTasksAsync(projectId, finalLsfxProjectId, files, records, batchId); + }); + + log.info("【文件上传】批量上传任务已提交: batchId={}", batchId); + return batchId; + } + + /** + * 调度线程:循环提交任务到线程池 + * 支持等待30秒重试机制 + */ + private void submitTasksAsync(Long projectId, Integer lsfxProjectId, + MultipartFile[] files, + List records, + String batchId) { + log.info("【文件上传】调度线程启动: projectId={}, batchId={}", projectId, batchId); + + // 循环提交任务 + for (int i = 0; i < files.length; i++) { + MultipartFile file = files[i]; + CcdiFileUploadRecord record = records.get(i); + + boolean submitted = false; + int retryCount = 0; + + while (!submitted && retryCount < 2) { + try { + // 尝试提交异步任务 + CompletableFuture.runAsync( + () -> processFileAsync(projectId, lsfxProjectId, file, + record.getId(), batchId, record), + fileUploadExecutor + ); + submitted = true; + log.info("【文件上传】任务提交成功: fileName={}, recordId={}", + file.getOriginalFilename(), record.getId()); + } catch (RejectedExecutionException e) { + retryCount++; + if (retryCount == 1) { + log.warn("【文件上传】线程池已满,等待30秒后重试: fileName={}", + file.getOriginalFilename()); + try { + Thread.sleep(30000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + log.error("【文件上传】等待被中断: fileName={}", file.getOriginalFilename()); + updateRecordStatus(record.getId(), "parsed_failed", "任务提交被中断"); + break; + } + } else { + log.error("【文件上传】重试失败,放弃任务: fileName={}", file.getOriginalFilename()); + updateRecordStatus(record.getId(), "parsed_failed", "系统繁忙,请稍后重试"); + } + } + } + } + + log.info("【文件上传】调度线程完成: projectId={}, batchId={}", projectId, batchId); + } + + /** + * 更新记录状态(辅助方法) + */ + private void updateRecordStatus(Long recordId, String status, String errorMessage) { + CcdiFileUploadRecord record = new CcdiFileUploadRecord(); + record.setId(recordId); + record.setFileStatus(status); + record.setErrorMessage(errorMessage); + recordMapper.updateById(record); + } + + /** + * 异步处理单个文件的完整流程 + * TODO: 下一步实现完整逻辑 + */ + private void processFileAsync(Long projectId, Integer lsfxProjectId, MultipartFile file, + Long recordId, String batchId, CcdiFileUploadRecord record) { // TODO: 将在下一步实现 - throw new UnsupportedOperationException("Method not implemented yet"); + log.info("【文件上传】开始处理文件: fileName={}", file.getOriginalFilename()); } }