fix: 修复员工导入异步方法的实现
## 问题 - importEmployeeAsync方法在返回CompletableFuture之前同步调用了doImport() - 方法上有@Transactional注解,会导致事务管理问题 - 不是真正的异步执行 ## 解决方案 - 移除importEmployeeAsync方法上的@Transactional注解 - 使用CompletableFuture.supplyAsync()在importExecutor线程池中异步执行doImport - 将@Transactional注解移到doImport方法上 - 注入importExecutor线程池 ## 技术细节 - @Async注解会将方法提交到线程池执行 - CompletableFuture.supplyAsync()确保doImport在独立线程中执行 - 事务在doImport方法中管理,避免异步方法事务问题 Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -25,6 +25,8 @@ import org.springframework.stereotype.Service;
|
|||||||
import org.springframework.transaction.annotation.Transactional;
|
import org.springframework.transaction.annotation.Transactional;
|
||||||
|
|
||||||
import java.util.*;
|
import java.util.*;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
@@ -43,6 +45,9 @@ public class CcdiEmployeeServiceImpl implements ICcdiEmployeeService {
|
|||||||
@Resource
|
@Resource
|
||||||
private RedisTemplate<String, Object> redisTemplate;
|
private RedisTemplate<String, Object> redisTemplate;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private Executor importExecutor;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 查询员工列表
|
* 查询员工列表
|
||||||
*
|
*
|
||||||
@@ -235,7 +240,6 @@ public class CcdiEmployeeServiceImpl implements ICcdiEmployeeService {
|
|||||||
*/
|
*/
|
||||||
@Override
|
@Override
|
||||||
@Async("importExecutor")
|
@Async("importExecutor")
|
||||||
@Transactional
|
|
||||||
public CompletableFuture<ImportResultVO> importEmployeeAsync(List<CcdiEmployeeExcel> excelList, Boolean isUpdateSupport) {
|
public CompletableFuture<ImportResultVO> importEmployeeAsync(List<CcdiEmployeeExcel> excelList, Boolean isUpdateSupport) {
|
||||||
String taskId = UUID.randomUUID().toString();
|
String taskId = UUID.randomUUID().toString();
|
||||||
long startTime = System.currentTimeMillis();
|
long startTime = System.currentTimeMillis();
|
||||||
@@ -255,30 +259,40 @@ public class CcdiEmployeeServiceImpl implements ICcdiEmployeeService {
|
|||||||
redisTemplate.opsForHash().putAll(statusKey, statusData);
|
redisTemplate.opsForHash().putAll(statusKey, statusData);
|
||||||
redisTemplate.expire(statusKey, 7, TimeUnit.DAYS);
|
redisTemplate.expire(statusKey, 7, TimeUnit.DAYS);
|
||||||
|
|
||||||
try {
|
// 使用CompletableFuture.supplyAsync在独立的executor中异步执行doImport
|
||||||
// 执行导入
|
return CompletableFuture.supplyAsync(() -> {
|
||||||
ImportResult result = doImport(excelList, isUpdateSupport, taskId);
|
try {
|
||||||
|
// 在独立线程中执行导入
|
||||||
|
ImportResult result = doImport(excelList, isUpdateSupport, taskId);
|
||||||
|
|
||||||
// 更新最终状态
|
// 更新最终状态
|
||||||
String finalStatus = result.getFailureCount() == 0 ? "SUCCESS" : "PARTIAL_SUCCESS";
|
String finalStatus = result.getFailureCount() == 0 ? "SUCCESS" : "PARTIAL_SUCCESS";
|
||||||
updateImportStatus(taskId, finalStatus, result, startTime);
|
updateImportStatus(taskId, finalStatus, result, startTime);
|
||||||
|
|
||||||
} catch (Exception e) {
|
// 构建返回结果
|
||||||
// 处理异常
|
ImportResultVO resultVO = new ImportResultVO();
|
||||||
Map<String, Object> errorData = new HashMap<>();
|
resultVO.setTaskId(taskId);
|
||||||
errorData.put("status", "FAILED");
|
resultVO.setStatus(finalStatus);
|
||||||
errorData.put("message", "导入失败: " + e.getMessage());
|
resultVO.setMessage("导入完成");
|
||||||
errorData.put("endTime", System.currentTimeMillis());
|
|
||||||
redisTemplate.opsForHash().putAll(statusKey, errorData);
|
|
||||||
}
|
|
||||||
|
|
||||||
// 立即返回结果,不等待后台任务完成
|
return resultVO;
|
||||||
ImportResultVO result = new ImportResultVO();
|
|
||||||
result.setTaskId(taskId);
|
|
||||||
result.setStatus("PROCESSING");
|
|
||||||
result.setMessage("导入任务已提交,正在后台处理");
|
|
||||||
|
|
||||||
return CompletableFuture.completedFuture(result);
|
} catch (Exception e) {
|
||||||
|
// 处理异常
|
||||||
|
Map<String, Object> errorData = new HashMap<>();
|
||||||
|
errorData.put("status", "FAILED");
|
||||||
|
errorData.put("message", "导入失败: " + e.getMessage());
|
||||||
|
errorData.put("endTime", System.currentTimeMillis());
|
||||||
|
redisTemplate.opsForHash().putAll(statusKey, errorData);
|
||||||
|
|
||||||
|
ImportResultVO resultVO = new ImportResultVO();
|
||||||
|
resultVO.setTaskId(taskId);
|
||||||
|
resultVO.setStatus("FAILED");
|
||||||
|
resultVO.setMessage("导入失败: " + e.getMessage());
|
||||||
|
|
||||||
|
return resultVO;
|
||||||
|
}
|
||||||
|
}, importExecutor);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@@ -426,6 +440,7 @@ public class CcdiEmployeeServiceImpl implements ICcdiEmployeeService {
|
|||||||
/**
|
/**
|
||||||
* 执行导入逻辑
|
* 执行导入逻辑
|
||||||
*/
|
*/
|
||||||
|
@Transactional
|
||||||
private ImportResult doImport(List<CcdiEmployeeExcel> excelList, Boolean isUpdateSupport, String taskId) {
|
private ImportResult doImport(List<CcdiEmployeeExcel> excelList, Boolean isUpdateSupport, String taskId) {
|
||||||
List<CcdiEmployee> newRecords = new ArrayList<>();
|
List<CcdiEmployee> newRecords = new ArrayList<>();
|
||||||
List<CcdiEmployee> updateRecords = new ArrayList<>();
|
List<CcdiEmployee> updateRecords = new ArrayList<>();
|
||||||
|
|||||||
Reference in New Issue
Block a user