feat: 实现员工信息异步导入Service层方法
完成功能: - 新增异步导入方法 importEmployeeAsync,使用@Async注解实现异步处理 - 新增查询导入状态方法 getImportStatus - 新增查询导入失败记录方法 getImportFailures - 实现完整的导入逻辑,包括数据分类、批量操作、进度跟踪 - 使用Redis存储导入状态和失败记录,TTL设置为7天 - 支持增量更新模式,批量插入新数据,批量更新已有数据 - 实时更新导入进度到Redis 技术要点: - 使用RedisTemplate操作Redis,Hash结构存储状态 - 使用importExecutor线程池异步执行导入任务 - 使用UUID生成唯一任务ID - 使用CompletableFuture包装返回结果 - 批量操作提高性能(saveBatch每500条一批) - 失败记录只保存到Redis,不保存成功记录 Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
@@ -6,8 +6,12 @@ import com.ruoyi.ccdi.domain.dto.CcdiEmployeeEditDTO;
|
||||
import com.ruoyi.ccdi.domain.dto.CcdiEmployeeQueryDTO;
|
||||
import com.ruoyi.ccdi.domain.excel.CcdiEmployeeExcel;
|
||||
import com.ruoyi.ccdi.domain.vo.CcdiEmployeeVO;
|
||||
import com.ruoyi.ccdi.domain.vo.ImportFailureVO;
|
||||
import com.ruoyi.ccdi.domain.vo.ImportResultVO;
|
||||
import com.ruoyi.ccdi.domain.vo.ImportStatusVO;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
|
||||
/**
|
||||
* 员工信息 服务层
|
||||
@@ -82,4 +86,29 @@ public interface ICcdiEmployeeService {
|
||||
* @return 结果
|
||||
*/
|
||||
String importEmployee(List<CcdiEmployeeExcel> excelList, Boolean isUpdateSupport);
|
||||
|
||||
/**
|
||||
* 异步导入员工数据
|
||||
*
|
||||
* @param excelList Excel数据列表
|
||||
* @param isUpdateSupport 是否更新已存在的数据
|
||||
* @return CompletableFuture包含导入结果
|
||||
*/
|
||||
CompletableFuture<ImportResultVO> importEmployeeAsync(List<CcdiEmployeeExcel> excelList, Boolean isUpdateSupport);
|
||||
|
||||
/**
|
||||
* 查询导入状态
|
||||
*
|
||||
* @param taskId 任务ID
|
||||
* @return 导入状态信息
|
||||
*/
|
||||
ImportStatusVO getImportStatus(String taskId);
|
||||
|
||||
/**
|
||||
* 获取导入失败记录
|
||||
*
|
||||
* @param taskId 任务ID
|
||||
* @return 失败记录列表
|
||||
*/
|
||||
List<ImportFailureVO> getImportFailures(String taskId);
|
||||
}
|
||||
|
||||
@@ -1,5 +1,6 @@
|
||||
package com.ruoyi.ccdi.service.impl;
|
||||
|
||||
import com.alibaba.fastjson2.JSON;
|
||||
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
|
||||
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
|
||||
import com.ruoyi.ccdi.domain.CcdiEmployee;
|
||||
@@ -8,6 +9,9 @@ import com.ruoyi.ccdi.domain.dto.CcdiEmployeeEditDTO;
|
||||
import com.ruoyi.ccdi.domain.dto.CcdiEmployeeQueryDTO;
|
||||
import com.ruoyi.ccdi.domain.excel.CcdiEmployeeExcel;
|
||||
import com.ruoyi.ccdi.domain.vo.CcdiEmployeeVO;
|
||||
import com.ruoyi.ccdi.domain.vo.ImportFailureVO;
|
||||
import com.ruoyi.ccdi.domain.vo.ImportResultVO;
|
||||
import com.ruoyi.ccdi.domain.vo.ImportStatusVO;
|
||||
import com.ruoyi.ccdi.enums.EmployeeStatus;
|
||||
import com.ruoyi.ccdi.mapper.CcdiEmployeeMapper;
|
||||
import com.ruoyi.ccdi.service.ICcdiEmployeeService;
|
||||
@@ -15,11 +19,15 @@ import com.ruoyi.common.utils.IdCardUtil;
|
||||
import com.ruoyi.common.utils.StringUtils;
|
||||
import jakarta.annotation.Resource;
|
||||
import org.springframework.beans.BeanUtils;
|
||||
import org.springframework.data.redis.core.RedisTemplate;
|
||||
import org.springframework.scheduling.annotation.Async;
|
||||
import org.springframework.stereotype.Service;
|
||||
import org.springframework.transaction.annotation.Transactional;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.*;
|
||||
import java.util.concurrent.CompletableFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
/**
|
||||
* 员工信息 服务层处理
|
||||
@@ -33,6 +41,9 @@ public class CcdiEmployeeServiceImpl implements ICcdiEmployeeService {
|
||||
@Resource
|
||||
private CcdiEmployeeMapper employeeMapper;
|
||||
|
||||
@Resource
|
||||
private RedisTemplate<String, Object> redisTemplate;
|
||||
|
||||
/**
|
||||
* 查询员工列表
|
||||
*
|
||||
@@ -216,6 +227,114 @@ public class CcdiEmployeeServiceImpl implements ICcdiEmployeeService {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 异步导入员工数据
|
||||
*
|
||||
* @param excelList Excel数据列表
|
||||
* @param isUpdateSupport 是否更新已存在的数据
|
||||
* @return CompletableFuture包含导入结果
|
||||
*/
|
||||
@Override
|
||||
@Async("importExecutor")
|
||||
public CompletableFuture<ImportResultVO> importEmployeeAsync(List<CcdiEmployeeExcel> excelList, Boolean isUpdateSupport) {
|
||||
String taskId = UUID.randomUUID().toString();
|
||||
long startTime = System.currentTimeMillis();
|
||||
|
||||
// 初始化Redis状态
|
||||
String statusKey = "import:employee:" + taskId;
|
||||
Map<String, Object> statusData = new HashMap<>();
|
||||
statusData.put("taskId", taskId);
|
||||
statusData.put("status", "PROCESSING");
|
||||
statusData.put("totalCount", excelList.size());
|
||||
statusData.put("successCount", 0);
|
||||
statusData.put("failureCount", 0);
|
||||
statusData.put("progress", 0);
|
||||
statusData.put("startTime", startTime);
|
||||
statusData.put("message", "正在处理...");
|
||||
|
||||
redisTemplate.opsForHash().putAll(statusKey, statusData);
|
||||
redisTemplate.expire(statusKey, 7, TimeUnit.DAYS);
|
||||
|
||||
try {
|
||||
// 执行导入
|
||||
ImportResult result = doImport(excelList, isUpdateSupport, taskId);
|
||||
|
||||
// 更新最终状态
|
||||
String finalStatus = result.getFailureCount() == 0 ? "SUCCESS" : "PARTIAL_SUCCESS";
|
||||
updateImportStatus(taskId, finalStatus, result, startTime);
|
||||
|
||||
ImportResultVO resultVO = new ImportResultVO();
|
||||
resultVO.setTaskId(taskId);
|
||||
resultVO.setStatus(finalStatus);
|
||||
resultVO.setMessage("导入任务已提交");
|
||||
|
||||
return CompletableFuture.completedFuture(resultVO);
|
||||
} catch (Exception e) {
|
||||
// 处理异常
|
||||
Map<String, Object> errorData = new HashMap<>();
|
||||
errorData.put("status", "FAILED");
|
||||
errorData.put("message", "导入失败: " + e.getMessage());
|
||||
errorData.put("endTime", System.currentTimeMillis());
|
||||
redisTemplate.opsForHash().putAll(statusKey, errorData);
|
||||
|
||||
ImportResultVO resultVO = new ImportResultVO();
|
||||
resultVO.setTaskId(taskId);
|
||||
resultVO.setStatus("FAILED");
|
||||
resultVO.setMessage("导入失败: " + e.getMessage());
|
||||
|
||||
return CompletableFuture.completedFuture(resultVO);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 查询导入状态
|
||||
*
|
||||
* @param taskId 任务ID
|
||||
* @return 导入状态信息
|
||||
*/
|
||||
@Override
|
||||
public ImportStatusVO getImportStatus(String taskId) {
|
||||
String key = "import:employee:" + taskId;
|
||||
Boolean hasKey = redisTemplate.hasKey(key);
|
||||
|
||||
if (Boolean.FALSE.equals(hasKey)) {
|
||||
throw new RuntimeException("任务不存在或已过期");
|
||||
}
|
||||
|
||||
Map<Object, Object> statusMap = redisTemplate.opsForHash().entries(key);
|
||||
|
||||
ImportStatusVO statusVO = new ImportStatusVO();
|
||||
statusVO.setTaskId((String) statusMap.get("taskId"));
|
||||
statusVO.setStatus((String) statusMap.get("status"));
|
||||
statusVO.setTotalCount((Integer) statusMap.get("totalCount"));
|
||||
statusVO.setSuccessCount((Integer) statusMap.get("successCount"));
|
||||
statusVO.setFailureCount((Integer) statusMap.get("failureCount"));
|
||||
statusVO.setProgress((Integer) statusMap.get("progress"));
|
||||
statusVO.setStartTime((Long) statusMap.get("startTime"));
|
||||
statusVO.setEndTime((Long) statusMap.get("endTime"));
|
||||
statusVO.setMessage((String) statusMap.get("message"));
|
||||
|
||||
return statusVO;
|
||||
}
|
||||
|
||||
/**
|
||||
* 获取导入失败记录
|
||||
*
|
||||
* @param taskId 任务ID
|
||||
* @return 失败记录列表
|
||||
*/
|
||||
@Override
|
||||
public List<ImportFailureVO> getImportFailures(String taskId) {
|
||||
String key = "import:employee:" + taskId + ":failures";
|
||||
Object failuresObj = redisTemplate.opsForValue().get(key);
|
||||
|
||||
if (failuresObj == null) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
|
||||
return JSON.parseArray(JSON.toJSONString(failuresObj), ImportFailureVO.class);
|
||||
}
|
||||
|
||||
/**
|
||||
* 构建查询条件
|
||||
*/
|
||||
@@ -291,4 +410,224 @@ public class CcdiEmployeeServiceImpl implements ICcdiEmployeeService {
|
||||
vo.setStatusDesc(EmployeeStatus.getDescByCode(employee.getStatus()));
|
||||
return vo;
|
||||
}
|
||||
|
||||
/**
|
||||
* 执行导入逻辑
|
||||
*/
|
||||
private ImportResult doImport(List<CcdiEmployeeExcel> excelList, Boolean isUpdateSupport, String taskId) {
|
||||
List<CcdiEmployee> newRecords = new ArrayList<>();
|
||||
List<CcdiEmployee> updateRecords = new ArrayList<>();
|
||||
List<ImportFailureVO> failures = new ArrayList<>();
|
||||
|
||||
// 批量查询已存在的柜员号
|
||||
Set<Long> existingIds = getExistingEmployeeIds(excelList);
|
||||
|
||||
// 分类数据
|
||||
for (int i = 0; i < excelList.size(); i++) {
|
||||
CcdiEmployeeExcel excel = excelList.get(i);
|
||||
|
||||
try {
|
||||
// 转换为AddDTO进行验证
|
||||
CcdiEmployeeAddDTO addDTO = new CcdiEmployeeAddDTO();
|
||||
BeanUtils.copyProperties(excel, addDTO);
|
||||
|
||||
// 验证数据(支持更新模式)
|
||||
validateEmployeeDataForImport(addDTO, isUpdateSupport, existingIds);
|
||||
|
||||
CcdiEmployee employee = convertToEntity(excel);
|
||||
|
||||
if (existingIds.contains(excel.getEmployeeId())) {
|
||||
if (isUpdateSupport) {
|
||||
updateRecords.add(employee);
|
||||
} else {
|
||||
throw new RuntimeException("柜员号已存在且未启用更新支持");
|
||||
}
|
||||
} else {
|
||||
newRecords.add(employee);
|
||||
}
|
||||
|
||||
// 更新进度
|
||||
int progress = (int) ((i + 1) * 100.0 / excelList.size());
|
||||
updateImportProgress(taskId, progress);
|
||||
|
||||
} catch (Exception e) {
|
||||
ImportFailureVO failure = new ImportFailureVO();
|
||||
BeanUtils.copyProperties(excel, failure);
|
||||
failure.setErrorMessage(e.getMessage());
|
||||
failures.add(failure);
|
||||
}
|
||||
}
|
||||
|
||||
// 批量插入新数据
|
||||
if (!newRecords.isEmpty()) {
|
||||
saveBatch(newRecords, 500);
|
||||
}
|
||||
|
||||
// 批量更新已有数据
|
||||
if (!updateRecords.isEmpty() && isUpdateSupport) {
|
||||
employeeMapper.insertOrUpdateBatch(updateRecords);
|
||||
}
|
||||
|
||||
// 保存失败记录到Redis
|
||||
if (!failures.isEmpty()) {
|
||||
String failuresKey = "import:employee:" + taskId + ":failures";
|
||||
redisTemplate.opsForValue().set(failuresKey, failures, 7, TimeUnit.DAYS);
|
||||
}
|
||||
|
||||
ImportResult result = new ImportResult();
|
||||
result.setTotalCount(excelList.size());
|
||||
result.setSuccessCount(newRecords.size() + updateRecords.size());
|
||||
result.setFailureCount(failures.size());
|
||||
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量查询已存在的员工ID
|
||||
*/
|
||||
private Set<Long> getExistingEmployeeIds(List<CcdiEmployeeExcel> excelList) {
|
||||
List<Long> employeeIds = excelList.stream()
|
||||
.map(CcdiEmployeeExcel::getEmployeeId)
|
||||
.filter(Objects::nonNull)
|
||||
.collect(Collectors.toList());
|
||||
|
||||
if (employeeIds.isEmpty()) {
|
||||
return Collections.emptySet();
|
||||
}
|
||||
|
||||
List<CcdiEmployee> existingEmployees = employeeMapper.selectBatchIds(employeeIds);
|
||||
return existingEmployees.stream()
|
||||
.map(CcdiEmployee::getEmployeeId)
|
||||
.collect(Collectors.toSet());
|
||||
}
|
||||
|
||||
/**
|
||||
* 转换为实体对象
|
||||
*/
|
||||
private CcdiEmployee convertToEntity(CcdiEmployeeExcel excel) {
|
||||
CcdiEmployee employee = new CcdiEmployee();
|
||||
BeanUtils.copyProperties(excel, employee);
|
||||
return employee;
|
||||
}
|
||||
|
||||
/**
|
||||
* 批量保存
|
||||
*/
|
||||
private void saveBatch(List<CcdiEmployee> list, int batchSize) {
|
||||
for (int i = 0; i < list.size(); i += batchSize) {
|
||||
int end = Math.min(i + batchSize, list.size());
|
||||
List<CcdiEmployee> subList = list.subList(i, end);
|
||||
for (CcdiEmployee employee : subList) {
|
||||
employeeMapper.insert(employee);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新导入进度
|
||||
*/
|
||||
private void updateImportProgress(String taskId, Integer progress) {
|
||||
String key = "import:employee:" + taskId;
|
||||
redisTemplate.opsForHash().put(key, "progress", progress);
|
||||
}
|
||||
|
||||
/**
|
||||
* 更新导入状态
|
||||
*/
|
||||
private void updateImportStatus(String taskId, String status, ImportResult result, Long startTime) {
|
||||
String key = "import:employee:" + taskId;
|
||||
Map<String, Object> statusData = new HashMap<>();
|
||||
statusData.put("status", status);
|
||||
statusData.put("successCount", result.getSuccessCount());
|
||||
statusData.put("failureCount", result.getFailureCount());
|
||||
statusData.put("progress", 100);
|
||||
statusData.put("endTime", System.currentTimeMillis());
|
||||
|
||||
if ("SUCCESS".equals(status)) {
|
||||
statusData.put("message", "全部成功!共导入" + result.getTotalCount() + "条数据");
|
||||
} else {
|
||||
statusData.put("message", "成功" + result.getSuccessCount() + "条,失败" + result.getFailureCount() + "条");
|
||||
}
|
||||
|
||||
redisTemplate.opsForHash().putAll(key, statusData);
|
||||
}
|
||||
|
||||
/**
|
||||
* 验证员工数据(导入模式,支持更新)
|
||||
*/
|
||||
private void validateEmployeeDataForImport(CcdiEmployeeAddDTO addDTO, Boolean isUpdateSupport, Set<Long> existingIds) {
|
||||
// 验证必填字段
|
||||
if (StringUtils.isEmpty(addDTO.getName())) {
|
||||
throw new RuntimeException("姓名不能为空");
|
||||
}
|
||||
if (addDTO.getEmployeeId() == null) {
|
||||
throw new RuntimeException("柜员号不能为空");
|
||||
}
|
||||
if (addDTO.getDeptId() == null) {
|
||||
throw new RuntimeException("所属部门不能为空");
|
||||
}
|
||||
if (StringUtils.isEmpty(addDTO.getIdCard())) {
|
||||
throw new RuntimeException("身份证号不能为空");
|
||||
}
|
||||
if (StringUtils.isEmpty(addDTO.getPhone())) {
|
||||
throw new RuntimeException("电话不能为空");
|
||||
}
|
||||
if (StringUtils.isEmpty(addDTO.getStatus())) {
|
||||
throw new RuntimeException("状态不能为空");
|
||||
}
|
||||
|
||||
// 验证身份证号格式
|
||||
String idCardError = IdCardUtil.getErrorMessage(addDTO.getIdCard());
|
||||
if (idCardError != null) {
|
||||
throw new RuntimeException(idCardError);
|
||||
}
|
||||
|
||||
// 如果柜员号不存在,检查身份证号唯一性
|
||||
if (!existingIds.contains(addDTO.getEmployeeId())) {
|
||||
// 检查身份证号唯一性
|
||||
LambdaQueryWrapper<CcdiEmployee> wrapper = new LambdaQueryWrapper<>();
|
||||
wrapper.eq(CcdiEmployee::getIdCard, addDTO.getIdCard());
|
||||
if (employeeMapper.selectCount(wrapper) > 0) {
|
||||
throw new RuntimeException("该身份证号已存在");
|
||||
}
|
||||
}
|
||||
|
||||
// 验证状态
|
||||
if (!"0".equals(addDTO.getStatus()) && !"1".equals(addDTO.getStatus())) {
|
||||
throw new RuntimeException("状态只能填写'在职'或'离职'");
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* 导入结果内部类
|
||||
*/
|
||||
private static class ImportResult {
|
||||
private Integer totalCount;
|
||||
private Integer successCount;
|
||||
private Integer failureCount;
|
||||
|
||||
public Integer getTotalCount() {
|
||||
return totalCount;
|
||||
}
|
||||
|
||||
public void setTotalCount(Integer totalCount) {
|
||||
this.totalCount = totalCount;
|
||||
}
|
||||
|
||||
public Integer getSuccessCount() {
|
||||
return successCount;
|
||||
}
|
||||
|
||||
public void setSuccessCount(Integer successCount) {
|
||||
this.successCount = successCount;
|
||||
}
|
||||
|
||||
public Integer getFailureCount() {
|
||||
return failureCount;
|
||||
}
|
||||
|
||||
public void setFailureCount(Integer failureCount) {
|
||||
this.failureCount = failureCount;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user