fix: 修复3个Critical代码问题

Critical Fix #1: 事务边界违规
- 添加@Transactional注解
- 使用TransactionSynchronizationManager确保异步任务在事务提交后启动
- 避免事务回滚导致的数据不一致问题

Critical Fix #2: MultipartFile生命周期问题
- 在启动异步任务前将MultipartFile保存到临时存储
- 使用临时文件路径替代MultipartFile对象
- 在处理完成后清理临时文件

Critical Fix #3: 批量插入后ID生成验证
- 在XML映射中添加useGeneratedKeys=true和keyProperty=id
- 在批量插入后验证所有记录ID已生成
- 抛出异常如果ID未生成

Additional Fix: 线程中断处理
- 在调度线程中检查中断状态
- 被中断时停止提交剩余任务
This commit is contained in:
wkc
2026-03-05 10:30:36 +08:00
parent 83e2f39a4e
commit ebf66ea70b
2 changed files with 115 additions and 19 deletions

View File

@@ -12,9 +12,18 @@ import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.scheduling.annotation.Async;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import org.springframework.transaction.support.TransactionSynchronization;
import org.springframework.transaction.support.TransactionSynchronizationManager;
import org.springframework.util.StringUtils;
import org.springframework.web.multipart.MultipartFile;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Executor;
@@ -34,6 +43,12 @@ import java.util.UUID;
@Service
public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
/**
* 临时文件存储目录
* TODO: 应该从配置文件中读取
*/
private static final String TEMP_FILE_DIR = System.getProperty("java.io.tmpdir") + File.separator + "ccdi-upload";
@Resource
private CcdiFileUploadRecordMapper recordMapper;
@@ -107,6 +122,7 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
return recordMapper.selectById(id);
}
@Transactional
@Override
public String batchUploadFiles(Long projectId, MultipartFile[] files, String username) {
log.info("【文件上传】开始批量上传: projectId={}, 文件数量={}, username={}",
@@ -120,11 +136,39 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
// Integer lsfxProjectId = project.getLsfxProjectId();
Integer lsfxProjectId = 1; // 临时硬编码,稍后修复
// Critical Fix #2: 保存MultipartFile到临时存储,避免异步处理时文件已被清理
List<String> tempFilePaths = new ArrayList<>();
try {
// 确保临时目录存在
Path tempDir = Paths.get(TEMP_FILE_DIR);
if (!Files.exists(tempDir)) {
Files.createDirectories(tempDir);
}
// 保存所有文件到临时目录
for (MultipartFile file : files) {
String originalFilename = file.getOriginalFilename();
String tempFileName = batchId + "_" + System.currentTimeMillis() + "_" + originalFilename;
Path tempFilePath = tempDir.resolve(tempFileName);
// 将MultipartFile内容复制到临时文件
Files.copy(file.getInputStream(), tempFilePath, StandardCopyOption.REPLACE_EXISTING);
tempFilePaths.add(tempFilePath.toString());
log.debug("【文件上传】保存临时文件: originalName={}, tempPath={}",
originalFilename, tempFilePath);
}
} catch (IOException e) {
log.error("【文件上传】保存临时文件失败", e);
throw new RuntimeException("保存临时文件失败: " + e.getMessage(), e);
}
// 3. 批量插入文件记录(status=uploading)
List<CcdiFileUploadRecord> records = new ArrayList<>();
Date now = new Date();
for (MultipartFile file : files) {
for (int i = 0; i < files.length; i++) {
MultipartFile file = files[i];
CcdiFileUploadRecord record = new CcdiFileUploadRecord();
record.setProjectId(projectId);
record.setLsfxProjectId(lsfxProjectId);
@@ -139,10 +183,24 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
recordMapper.insertBatch(records);
log.info("【文件上传】批量插入记录成功: 数量={}", records.size());
// 4. 异步启动调度线程提交任务
// Critical Fix #3: 验证ID已生成
for (CcdiFileUploadRecord record : records) {
if (record.getId() == null) {
throw new RuntimeException("批量插入失败: 未生成记录ID,请检查Mapper配置useGeneratedKeys=true");
}
}
log.debug("【文件上传】ID验证通过: 所有记录ID已生成");
// Critical Fix #1: 使用TransactionSynchronization确保异步任务在事务提交后启动
final Integer finalLsfxProjectId = lsfxProjectId;
CompletableFuture.runAsync(() -> {
submitTasksAsync(projectId, finalLsfxProjectId, files, records, batchId);
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
log.info("【文件上传】事务已提交,启动异步任务");
CompletableFuture.runAsync(() -> {
submitTasksAsync(projectId, finalLsfxProjectId, tempFilePaths, records, batchId);
});
}
});
log.info("【文件上传】批量上传任务已提交: batchId={}", batchId);
@@ -152,16 +210,28 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
/**
* 调度线程:循环提交任务到线程池
* 支持等待30秒重试机制
*
* @param projectId 项目ID
* @param lsfxProjectId 流水分析项目ID
* @param tempFilePaths 临时文件路径列表
* @param records 文件上传记录列表
* @param batchId 批次ID
*/
private void submitTasksAsync(Long projectId, Integer lsfxProjectId,
MultipartFile[] files,
List<String> tempFilePaths,
List<CcdiFileUploadRecord> records,
String batchId) {
log.info("【文件上传】调度线程启动: projectId={}, batchId={}", projectId, batchId);
// 循环提交任务
for (int i = 0; i < files.length; i++) {
MultipartFile file = files[i];
for (int i = 0; i < tempFilePaths.size(); i++) {
// Critical Fix #6: 检查线程中断状态
if (Thread.currentThread().isInterrupted()) {
log.warn("【文件上传】调度线程被中断,停止提交剩余任务");
break;
}
String tempFilePath = tempFilePaths.get(i);
CcdiFileUploadRecord record = records.get(i);
boolean submitted = false;
@@ -171,28 +241,28 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
try {
// 尝试提交异步任务
CompletableFuture.runAsync(
() -> processFileAsync(projectId, lsfxProjectId, file,
() -> processFileAsync(projectId, lsfxProjectId, tempFilePath,
record.getId(), batchId, record),
fileUploadExecutor
);
submitted = true;
log.info("【文件上传】任务提交成功: fileName={}, recordId={}",
file.getOriginalFilename(), record.getId());
record.getFileName(), record.getId());
} catch (RejectedExecutionException e) {
retryCount++;
if (retryCount == 1) {
log.warn("【文件上传】线程池已满,等待30秒后重试: fileName={}",
file.getOriginalFilename());
record.getFileName());
try {
Thread.sleep(30000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.error("【文件上传】等待被中断: fileName={}", file.getOriginalFilename());
log.error("【文件上传】等待被中断: fileName={}", record.getFileName());
updateRecordStatus(record.getId(), "parsed_failed", "任务提交被中断");
break;
}
} else {
log.error("【文件上传】重试失败,放弃任务: fileName={}", file.getOriginalFilename());
log.error("【文件上传】重试失败,放弃任务: fileName={}", record.getFileName());
updateRecordStatus(record.getId(), "parsed_failed", "系统繁忙,请稍后重试");
}
}
@@ -216,20 +286,34 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
/**
* 异步处理单个文件的完整流程
* 包含:上传 → 轮询解析状态 → 获取结果 → 保存流水数据
*
* @param projectId 项目ID
* @param lsfxProjectId 流水分析项目ID
* @param tempFilePath 临时文件路径
* @param recordId 记录ID
* @param batchId 批次ID
* @param record 文件上传记录
*/
@Async("fileUploadExecutor")
public void processFileAsync(Long projectId, Integer lsfxProjectId, MultipartFile file,
public void processFileAsync(Long projectId, Integer lsfxProjectId, String tempFilePath,
Long recordId, String batchId, CcdiFileUploadRecord record) {
log.info("【文件上传】开始处理文件: fileName={}, recordId={}",
file.getOriginalFilename(), recordId);
log.info("【文件上传】开始处理文件: fileName={}, recordId={}, tempPath={}",
record.getFileName(), recordId, tempFilePath);
try {
// 步骤1:状态已是uploading,记录已存在
// 从临时文件路径读取文件
Path filePath = Paths.get(tempFilePath);
if (!Files.exists(filePath)) {
throw new RuntimeException("临时文件不存在: " + tempFilePath);
}
// 步骤2:上传文件到流水分析平台
log.info("【文件上传】步骤2: 上传文件到流水分析平台");
// TODO: 调用 lsfxClient.uploadFile()
// UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, file);
// 需要将临时文件转换为MultipartFile或直接使用文件路径
// UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, filePath.toFile());
// Integer logId = uploadResponse.getData().getLogId();
// 临时模拟 logId
@@ -264,6 +348,7 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
// 解析成功
log.info("【文件上传】步骤6: 解析成功,保存主体信息");
record.setFileStatus("parsed_success");
// TODO: 从实际的解析结果中获取
record.setEnterpriseNames("测试主体1,测试主体2");
record.setAccountNos("622xxx,623xxx");
recordMapper.updateById(record);
@@ -281,11 +366,22 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
recordMapper.updateById(record);
}
log.info("【文件上传】处理完成: fileName={}", file.getOriginalFilename());
log.info("【文件上传】处理完成: fileName={}", record.getFileName());
} catch (Exception e) {
log.error("【文件上传】处理失败: fileName={}", file.getOriginalFilename(), e);
log.error("【文件上传】处理失败: fileName={}", record.getFileName(), e);
updateRecordStatus(recordId, "parsed_failed", e.getMessage());
} finally {
// 清理临时文件
try {
Path filePath = Paths.get(tempFilePath);
if (Files.exists(filePath)) {
Files.delete(filePath);
log.debug("【文件上传】清理临时文件: {}", tempFilePath);
}
} catch (IOException e) {
log.warn("【文件上传】清理临时文件失败: {}", tempFilePath, e);
}
}
}

View File

@@ -27,7 +27,7 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
</sql>
<!-- 批量插入 -->
<insert id="insertBatch" parameterType="java.util.List">
<insert id="insertBatch" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id">
insert into ccdi_file_upload_record (
project_id, lsfx_project_id, file_name, file_size, file_status,
upload_time, upload_user