feat: 实现批量上传主方法和调度线程

This commit is contained in:
wkc
2026-03-05 09:55:18 +08:00
parent 10cc8e87a5
commit 85a03a001d

View File

@@ -9,12 +9,19 @@ import com.ruoyi.ccdi.project.mapper.CcdiFileUploadRecordMapper;
import com.ruoyi.ccdi.project.service.ICcdiFileUploadService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import org.springframework.util.StringUtils;
import org.springframework.web.multipart.MultipartFile;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.RejectedExecutionException;
import java.util.ArrayList;
import java.util.Date;
import java.util.UUID;
/**
* 文件上传服务实现
@@ -29,6 +36,10 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
@Resource
private CcdiFileUploadRecordMapper recordMapper;
@Resource
@Qualifier("fileUploadExecutor")
private Executor fileUploadExecutor;
@Override
public Page<CcdiFileUploadRecord> selectPage(Page<CcdiFileUploadRecord> page,
CcdiFileUploadQueryDTO queryDTO) {
@@ -95,10 +106,119 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
return recordMapper.selectById(id);
}
// batchUploadFiles 方法将在下一步实现
@Override
public String batchUploadFiles(Long projectId, MultipartFile[] files, String username) {
log.info("【文件上传】开始批量上传: projectId={}, 文件数量={}, username={}",
projectId, files.length, username);
// 1. 生成批次ID
String batchId = UUID.randomUUID().toString().replace("-", "");
// 2. 获取项目的 lsfxProjectId
// TODO: 需要注入 CcdiProjectMapper 并查询项目信息
// Integer lsfxProjectId = project.getLsfxProjectId();
Integer lsfxProjectId = 1; // 临时硬编码,稍后修复
// 3. 批量插入文件记录(status=uploading)
List<CcdiFileUploadRecord> records = new ArrayList<>();
Date now = new Date();
for (MultipartFile file : files) {
CcdiFileUploadRecord record = new CcdiFileUploadRecord();
record.setProjectId(projectId);
record.setLsfxProjectId(lsfxProjectId);
record.setFileName(file.getOriginalFilename());
record.setFileSize(file.getSize());
record.setFileStatus("uploading");
record.setUploadTime(now);
record.setUploadUser(username);
records.add(record);
}
recordMapper.insertBatch(records);
log.info("【文件上传】批量插入记录成功: 数量={}", records.size());
// 4. 异步启动调度线程提交任务
final Integer finalLsfxProjectId = lsfxProjectId;
CompletableFuture.runAsync(() -> {
submitTasksAsync(projectId, finalLsfxProjectId, files, records, batchId);
});
log.info("【文件上传】批量上传任务已提交: batchId={}", batchId);
return batchId;
}
/**
* 调度线程:循环提交任务到线程池
* 支持等待30秒重试机制
*/
private void submitTasksAsync(Long projectId, Integer lsfxProjectId,
MultipartFile[] files,
List<CcdiFileUploadRecord> records,
String batchId) {
log.info("【文件上传】调度线程启动: projectId={}, batchId={}", projectId, batchId);
// 循环提交任务
for (int i = 0; i < files.length; i++) {
MultipartFile file = files[i];
CcdiFileUploadRecord record = records.get(i);
boolean submitted = false;
int retryCount = 0;
while (!submitted && retryCount < 2) {
try {
// 尝试提交异步任务
CompletableFuture.runAsync(
() -> processFileAsync(projectId, lsfxProjectId, file,
record.getId(), batchId, record),
fileUploadExecutor
);
submitted = true;
log.info("【文件上传】任务提交成功: fileName={}, recordId={}",
file.getOriginalFilename(), record.getId());
} catch (RejectedExecutionException e) {
retryCount++;
if (retryCount == 1) {
log.warn("【文件上传】线程池已满,等待30秒后重试: fileName={}",
file.getOriginalFilename());
try {
Thread.sleep(30000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.error("【文件上传】等待被中断: fileName={}", file.getOriginalFilename());
updateRecordStatus(record.getId(), "parsed_failed", "任务提交被中断");
break;
}
} else {
log.error("【文件上传】重试失败,放弃任务: fileName={}", file.getOriginalFilename());
updateRecordStatus(record.getId(), "parsed_failed", "系统繁忙,请稍后重试");
}
}
}
}
log.info("【文件上传】调度线程完成: projectId={}, batchId={}", projectId, batchId);
}
/**
* 更新记录状态(辅助方法)
*/
private void updateRecordStatus(Long recordId, String status, String errorMessage) {
CcdiFileUploadRecord record = new CcdiFileUploadRecord();
record.setId(recordId);
record.setFileStatus(status);
record.setErrorMessage(errorMessage);
recordMapper.updateById(record);
}
/**
* 异步处理单个文件的完整流程
* TODO: 下一步实现完整逻辑
*/
private void processFileAsync(Long projectId, Integer lsfxProjectId, MultipartFile file,
Long recordId, String batchId, CcdiFileUploadRecord record) {
// TODO: 将在下一步实现
throw new UnsupportedOperationException("Method not implemented yet");
log.info("【文件上传】开始处理文件: fileName={}", file.getOriginalFilename());
}
}