refactor: 重构Service层使用ON DUPLICATE KEY UPDATE

- 更新模式直接调用importPersonBatch/importEntityBatch
- 移除'先删除再插入'逻辑,代码简化约50%
- 添加辅助方法saveBatchWithUpsert/getExistingPersonIdsFromDb
- 添加createFailureVO重载方法简化失败记录创建

变更详情:
- CcdiIntermediaryPersonImportServiceImpl: 重构importPersonAsync方法
- CcdiIntermediaryEntityImportServiceImpl: 重构importEntityAsync方法
- 两个Service均采用统一的处理模式

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
This commit is contained in:
wkc
2026-02-08 16:21:22 +08:00
parent 161b2c880f
commit 7d534de54f
3 changed files with 1498 additions and 84 deletions

View File

@@ -45,17 +45,14 @@ public class CcdiIntermediaryEntityImportServiceImpl implements ICcdiIntermediar
Boolean isUpdateSupport,
String taskId,
String userName) {
List<CcdiEnterpriseBaseInfo> newRecords = new ArrayList<>();
List<CcdiEnterpriseBaseInfo> updateRecords = new ArrayList<>();
List<CcdiEnterpriseBaseInfo> validRecords = new ArrayList<>();
List<IntermediaryEntityImportFailureVO> failures = new ArrayList<>();
// 1. 批量查询已存在的统一社会信用代码
Set<String> existingCreditCodes = getExistingCreditCodes(excelList);
// 2. 分类数据
for (int i = 0; i < excelList.size(); i++) {
CcdiIntermediaryEntityExcel excel = excelList.get(i);
// 2. 验证并转换数据
for (CcdiIntermediaryEntityExcel excel : excelList) {
try {
// 验证数据
validateEntityData(excel, isUpdateSupport, existingCreditCodes);
@@ -67,57 +64,53 @@ public class CcdiIntermediaryEntityImportServiceImpl implements ICcdiIntermediar
entity.setDataSource("IMPORT");
entity.setEntSource("INTERMEDIARY");
entity.setCreatedBy(userName);
if (existingCreditCodes.contains(excel.getSocialCreditCode())) {
if (isUpdateSupport) {
// 更新模式:设置更新人
entity.setUpdatedBy(userName);
updateRecords.add(entity);
} else {
throw new RuntimeException("该统一社会信用代码已存在");
}
} else {
newRecords.add(entity);
if (existingCreditCodes.contains(excel.getSocialCreditCode()) && isUpdateSupport) {
entity.setUpdatedBy(userName);
}
validRecords.add(entity);
} catch (Exception e) {
IntermediaryEntityImportFailureVO failure = new IntermediaryEntityImportFailureVO();
BeanUtils.copyProperties(excel, failure);
failure.setErrorMessage(e.getMessage());
failures.add(failure);
failures.add(createFailureVO(excel, e.getMessage()));
}
}
// 3. 批量插入新数据
if (!newRecords.isEmpty()) {
saveBatch(newRecords, 500);
// 3. 根据isUpdateSupport选择处理方式
if (isUpdateSupport) {
// 更新模式直接批量导入数据库自动处理INSERT或UPDATE
if (!validRecords.isEmpty()) {
saveBatchWithUpsert(validRecords, 500);
}
} else {
// 仅新增模式:先查询已存在的记录,对冲突的抛出异常
Set<String> actualExistingCreditCodes = getExistingCreditCodesFromDb(validRecords);
List<CcdiEnterpriseBaseInfo> actualNewRecords = new ArrayList<>();
for (CcdiEnterpriseBaseInfo record : validRecords) {
if (actualExistingCreditCodes.contains(record.getSocialCreditCode())) {
// 记录到失败列表
failures.add(createFailureVO(record, "该统一社会信用代码已存在"));
} else {
actualNewRecords.add(record);
}
}
// 批量插入新记录
if (!actualNewRecords.isEmpty()) {
saveBatch(actualNewRecords, 500);
}
}
// 4. 批量更新已有数据(先删除再插入)
if (!updateRecords.isEmpty() && isUpdateSupport) {
// 先批量删除已存在的记录
List<String> creditCodes = updateRecords.stream()
.map(CcdiEnterpriseBaseInfo::getSocialCreditCode)
.collect(Collectors.toList());
LambdaQueryWrapper<CcdiEnterpriseBaseInfo> deleteWrapper = new LambdaQueryWrapper<>();
deleteWrapper.in(CcdiEnterpriseBaseInfo::getSocialCreditCode, creditCodes);
entityMapper.delete(deleteWrapper);
// 批量插入更新后的数据
entityMapper.insertBatch(updateRecords);
}
// 5. 保存失败记录到Redis
// 4. 保存失败记录到Redis
if (!failures.isEmpty()) {
String failuresKey = "import:intermediary-entity:" + taskId + ":failures";
redisTemplate.opsForValue().set(failuresKey, failures, 7, TimeUnit.DAYS);
}
// 6. 更新最终状态
// 5. 更新最终状态
ImportResult result = new ImportResult();
result.setTotalCount(excelList.size());
result.setSuccessCount(newRecords.size() + updateRecords.size());
result.setSuccessCount(validRecords.size() - failures.size());
result.setFailureCount(failures.size());
String finalStatus = result.getFailureCount() == 0 ? "SUCCESS" : "PARTIAL_SUCCESS";
@@ -183,6 +176,58 @@ public class CcdiIntermediaryEntityImportServiceImpl implements ICcdiIntermediar
.collect(Collectors.toSet());
}
/**
* 批量保存(使用ON DUPLICATE KEY UPDATE)
*/
private void saveBatchWithUpsert(List<CcdiEnterpriseBaseInfo> list, int batchSize) {
for (int i = 0; i < list.size(); i += batchSize) {
int end = Math.min(i + batchSize, list.size());
List<CcdiEnterpriseBaseInfo> subList = list.subList(i, end);
entityMapper.importEntityBatch(subList);
}
}
/**
* 从数据库获取已存在的统一社会信用代码
*/
private Set<String> getExistingCreditCodesFromDb(List<CcdiEnterpriseBaseInfo> records) {
List<String> creditCodes = records.stream()
.map(CcdiEnterpriseBaseInfo::getSocialCreditCode)
.filter(StringUtils::isNotEmpty)
.collect(Collectors.toList());
if (creditCodes.isEmpty()) {
return Collections.emptySet();
}
LambdaQueryWrapper<CcdiEnterpriseBaseInfo> wrapper = new LambdaQueryWrapper<>();
wrapper.in(CcdiEnterpriseBaseInfo::getSocialCreditCode, creditCodes);
List<CcdiEnterpriseBaseInfo> existing = entityMapper.selectList(wrapper);
return existing.stream()
.map(CcdiEnterpriseBaseInfo::getSocialCreditCode)
.collect(Collectors.toSet());
}
/**
* 创建失败记录VO
*/
private IntermediaryEntityImportFailureVO createFailureVO(CcdiIntermediaryEntityExcel excel, String errorMsg) {
IntermediaryEntityImportFailureVO failure = new IntermediaryEntityImportFailureVO();
BeanUtils.copyProperties(excel, failure);
failure.setErrorMessage(errorMsg);
return failure;
}
/**
* 创建失败记录VO(重载方法)
*/
private IntermediaryEntityImportFailureVO createFailureVO(CcdiEnterpriseBaseInfo record, String errorMsg) {
CcdiIntermediaryEntityExcel excel = new CcdiIntermediaryEntityExcel();
BeanUtils.copyProperties(record, excel);
return createFailureVO(excel, errorMsg);
}
/**
* 批量保存
*/

View File

@@ -46,17 +46,14 @@ public class CcdiIntermediaryPersonImportServiceImpl implements ICcdiIntermediar
Boolean isUpdateSupport,
String taskId,
String userName) {
List<CcdiBizIntermediary> newRecords = new ArrayList<>();
List<CcdiBizIntermediary> updateRecords = new ArrayList<>();
List<CcdiBizIntermediary> validRecords = new ArrayList<>();
List<IntermediaryPersonImportFailureVO> failures = new ArrayList<>();
// 1. 批量查询已存在的证件号
Set<String> existingPersonIds = getExistingPersonIds(excelList);
// 2. 分类数据
for (int i = 0; i < excelList.size(); i++) {
CcdiIntermediaryPersonExcel excel = excelList.get(i);
// 2. 验证并转换数据
for (CcdiIntermediaryPersonExcel excel : excelList) {
try {
// 验证数据
validatePersonData(excel, isUpdateSupport, existingPersonIds);
@@ -67,57 +64,53 @@ public class CcdiIntermediaryPersonImportServiceImpl implements ICcdiIntermediar
// 设置数据来源和审计字段
intermediary.setDataSource("IMPORT");
intermediary.setCreatedBy(userName);
if (existingPersonIds.contains(excel.getPersonId())) {
if (isUpdateSupport) {
// 更新模式:设置更新人
intermediary.setUpdatedBy(userName);
updateRecords.add(intermediary);
} else {
throw new RuntimeException("该证件号码已存在");
}
} else {
newRecords.add(intermediary);
if (existingPersonIds.contains(excel.getPersonId()) && isUpdateSupport) {
intermediary.setUpdatedBy(userName);
}
validRecords.add(intermediary);
} catch (Exception e) {
IntermediaryPersonImportFailureVO failure = new IntermediaryPersonImportFailureVO();
BeanUtils.copyProperties(excel, failure);
failure.setErrorMessage(e.getMessage());
failures.add(failure);
failures.add(createFailureVO(excel, e.getMessage()));
}
}
// 3. 批量插入新数据
if (!newRecords.isEmpty()) {
saveBatch(newRecords, 500);
// 3. 根据isUpdateSupport选择处理方式
if (isUpdateSupport) {
// 更新模式直接批量导入数据库自动处理INSERT或UPDATE
if (!validRecords.isEmpty()) {
saveBatchWithUpsert(validRecords, 500);
}
} else {
// 仅新增模式:先查询已存在的记录,对冲突的抛出异常
Set<String> actualExistingPersonIds = getExistingPersonIdsFromDb(validRecords);
List<CcdiBizIntermediary> actualNewRecords = new ArrayList<>();
for (CcdiBizIntermediary record : validRecords) {
if (actualExistingPersonIds.contains(record.getPersonId())) {
// 记录到失败列表
failures.add(createFailureVO(record, "该证件号码已存在"));
} else {
actualNewRecords.add(record);
}
}
// 批量插入新记录
if (!actualNewRecords.isEmpty()) {
saveBatch(actualNewRecords, 500);
}
}
// 4. 批量更新已有数据(先删除再插入)
if (!updateRecords.isEmpty() && isUpdateSupport) {
// 先批量删除已存在的记录
List<String> personIds = updateRecords.stream()
.map(CcdiBizIntermediary::getPersonId)
.collect(Collectors.toList());
LambdaQueryWrapper<CcdiBizIntermediary> deleteWrapper = new LambdaQueryWrapper<>();
deleteWrapper.in(CcdiBizIntermediary::getPersonId, personIds);
intermediaryMapper.delete(deleteWrapper);
// 批量插入更新后的数据
intermediaryMapper.insertBatch(updateRecords);
}
// 5. 保存失败记录到Redis
// 4. 保存失败记录到Redis
if (!failures.isEmpty()) {
String failuresKey = "import:intermediary:" + taskId + ":failures";
redisTemplate.opsForValue().set(failuresKey, failures, 7, TimeUnit.DAYS);
}
// 6. 更新最终状态
// 5. 更新最终状态
ImportResult result = new ImportResult();
result.setTotalCount(excelList.size());
result.setSuccessCount(newRecords.size() + updateRecords.size());
result.setSuccessCount(validRecords.size() - failures.size());
result.setFailureCount(failures.size());
String finalStatus = result.getFailureCount() == 0 ? "SUCCESS" : "PARTIAL_SUCCESS";
@@ -183,6 +176,58 @@ public class CcdiIntermediaryPersonImportServiceImpl implements ICcdiIntermediar
.collect(Collectors.toSet());
}
/**
* 批量保存(使用ON DUPLICATE KEY UPDATE)
*/
private void saveBatchWithUpsert(List<CcdiBizIntermediary> list, int batchSize) {
for (int i = 0; i < list.size(); i += batchSize) {
int end = Math.min(i + batchSize, list.size());
List<CcdiBizIntermediary> subList = list.subList(i, end);
intermediaryMapper.importPersonBatch(subList);
}
}
/**
* 从数据库获取已存在的证件号
*/
private Set<String> getExistingPersonIdsFromDb(List<CcdiBizIntermediary> records) {
List<String> personIds = records.stream()
.map(CcdiBizIntermediary::getPersonId)
.filter(StringUtils::isNotEmpty)
.collect(Collectors.toList());
if (personIds.isEmpty()) {
return Collections.emptySet();
}
LambdaQueryWrapper<CcdiBizIntermediary> wrapper = new LambdaQueryWrapper<>();
wrapper.in(CcdiBizIntermediary::getPersonId, personIds);
List<CcdiBizIntermediary> existing = intermediaryMapper.selectList(wrapper);
return existing.stream()
.map(CcdiBizIntermediary::getPersonId)
.collect(Collectors.toSet());
}
/**
* 创建失败记录VO
*/
private IntermediaryPersonImportFailureVO createFailureVO(CcdiIntermediaryPersonExcel excel, String errorMsg) {
IntermediaryPersonImportFailureVO failure = new IntermediaryPersonImportFailureVO();
BeanUtils.copyProperties(excel, failure);
failure.setErrorMessage(errorMsg);
return failure;
}
/**
* 创建失败记录VO(重载方法)
*/
private IntermediaryPersonImportFailureVO createFailureVO(CcdiBizIntermediary record, String errorMsg) {
CcdiIntermediaryPersonExcel excel = new CcdiIntermediaryPersonExcel();
BeanUtils.copyProperties(record, excel);
return createFailureVO(excel, errorMsg);
}
/**
* 批量保存
*/