Files
ccdi/assets/requirements/plans/2026-02-08-intermediary-import-on-duplicate-key-update-design.md
2026-03-03 16:14:16 +08:00

491 lines
15 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 中介导入功能优化设计文档
## 概述
本设计文档描述了如何使用 MySQL 的 `INSERT ... ON DUPLICATE KEY UPDATE` 语句优化中介信息导入功能,替代现有的"先删除再插入"
更新模式,提升性能并简化代码逻辑。
**设计日期**: 2026-02-08
**目标**: 优化个人中介和实体中介的批量导入性能
**核心改进**: 使用 `ON DUPLICATE KEY UPDATE` 实现 Upsert 操作
---
## 一、整体架构设计
### 1.1 核心变更
**保持现有架构:**
- Controller 层:`CcdiIntermediaryController` - 无需修改
- Service 层:`CcdiIntermediaryServiceImpl` - 简化逻辑
- Mapper 层:新增批量导入方法
**架构优化点:**
| 层级 | 现有方案 | 优化方案 | 改进点 |
|---------|--------------------------|-----------------------------------------|--------------|
| Mapper | `insertBatch` + `delete` | `importBatch` (ON DUPLICATE KEY UPDATE) | 单次SQL完成插入或更新 |
| Service | 查询→分类→删除→插入 | 验证→直接导入 | 减少50%代码量 |
| 数据库 | 2-3次操作 | 1次操作 | 减少30-40%响应时间 |
### 1.2 数据流变化
**优化前流程:**
```
解析Excel → 验证数据 → 批量查询已存在记录 → 分类数据
→ 批量删除已存在记录 → 批量插入新记录和更新记录
```
**优化后流程:**
```
解析Excel → 验证数据 → 批量 INSERT ON DUPLICATE KEY UPDATE
```
**简化关键点:**
- 移除"批量查询已存在记录"步骤
- 移除"分类新增/更新记录"步骤
- 移除"批量删除已存在记录"步骤
---
## 二、SQL实现细节
### 2.1 个人中介批量导入SQL
**Mapper方法签名:**
```java
void importPersonBatch(@Param("list") List<CcdiBizIntermediary> list);
```
**SQL实现 (CcdiBizIntermediaryMapper.xml):**
```xml
<insert id="importPersonBatch">
INSERT INTO cdi_biz_intermediary (
person_id, name, gender, phone, address,
intermediary_type, data_source, created_by, updated_by
) VALUES
<foreach collection="list" item="item" separator=",">
(
#{item.personId}, #{item.name}, #{item.gender},
#{item.phone}, #{item.address}, #{item.intermediaryType},
#{item.dataSource}, #{item.createdBy}, #{item.updatedBy}
)
</foreach>
ON DUPLICATE KEY UPDATE
name = IF(#{item.name} IS NOT NULL AND #{item.name} != '', #{item.name}, name),
gender = IF(#{item.gender} IS NOT NULL AND #{item.gender} != '', #{item.gender}, gender),
phone = IF(#{item.phone} IS NOT NULL AND #{item.phone} != '', #{item.phone}, phone),
address = IF(#{item.address} IS NOT NULL AND #{item.address} != '', #{item.address}, address),
intermediary_type = IF(#{item.intermediaryType} IS NOT NULL AND #{item.intermediaryType} != '', #{item.intermediaryType}, intermediary_type),
update_time = NOW(),
update_by = #{item.updatedBy}
</insert>
```
### 2.2 实体中介批量导入SQL
**Mapper方法签名:**
```java
void importEntityBatch(@Param("list") List<CcdiEnterpriseBaseInfo> list);
```
**SQL实现 (CcdiEnterpriseBaseInfoMapper.xml):**
```xml
<insert id="importEntityBatch">
INSERT INTO cdi_enterprise_base_info (
social_credit_code, enterprise_name, legal_representative,
phone, address, risk_level, ent_source, data_source,
created_by, updated_by
) VALUES
<foreach collection="list" item="item" separator=",">
(
#{item.socialCreditCode}, #{item.enterpriseName},
#{item.legalRepresentative}, #{item.phone}, #{item.address},
#{item.riskLevel}, #{item.entSource}, #{item.dataSource},
#{item.createdBy}, #{item.updatedBy}
)
</foreach>
ON DUPLICATE KEY UPDATE
enterprise_name = IF(#{item.enterpriseName} IS NOT NULL AND #{item.enterpriseName} != '', #{item.enterpriseName}, enterprise_name),
legal_representative = IF(#{item.legalRepresentative} IS NOT NULL AND #{item.legalRepresentative} != '', #{item.legalRepresentative}, legal_representative),
phone = IF(#{item.phone} IS NOT NULL AND #{item.phone} != '', #{item.phone}, phone),
address = IF(#{item.address} IS NOT NULL AND #{item.address} != '', #{item.address}, address),
update_time = NOW(),
update_by = #{item.updatedBy}
</insert>
```
### 2.3 关键设计要点
**1. 非空字段更新策略:**
```sql
field = IF(#{item.field} IS NOT NULL AND #{item.field} != '', #{item.field}, field)
```
- 只更新Excel中非空的字段
- 保留数据库中的原有值
- 避免误清空数据
**2. 审计字段处理:**
| 字段 | INSERT时 | UPDATE时 |
|------|----------|----------|
| created_by | 设置当前用户 | 不更新 |
| create_time | 数据库默认NOW() | 不更新 |
| updated_by | NULL | 设置当前用户 |
| update_time | 数据库默认NOW() | 更新为NOW() |
**3. 唯一键约束:**
- 个人中介: `person_id` (证件号)
- 实体中介: `social_credit_code` (统一社会信用代码)
**4. 批量操作优化:**
- 每批最多500条记录
- 避免SQL过长导致性能问题
- 超过500条时分批处理
---
## 三、Service层实现
### 3.1 isUpdateSupport参数处理
采用**方案C: Service层预处理**
```java
@Override
@Async
@Transactional(rollbackFor = Exception.class)
public void importPersonAsync(List<CcdiIntermediaryPersonExcel> excelList,
Boolean isUpdateSupport,
String taskId,
String userName) {
List<CcdiBizIntermediary> validRecords = new ArrayList<>();
List<IntermediaryPersonImportFailureVO> failures = new ArrayList<>();
// 1. 数据验证阶段
for (CcdiIntermediaryPersonExcel excel : excelList) {
try {
validatePersonData(excel);
CcdiBizIntermediary intermediary = new CcdiBizIntermediary();
BeanUtils.copyProperties(excel, intermediary);
intermediary.setDataSource("IMPORT");
intermediary.setCreatedBy(userName);
if (isUpdateSupport) {
intermediary.setUpdatedBy(userName);
}
validRecords.add(intermediary);
} catch (Exception e) {
IntermediaryPersonImportFailureVO failure = new IntermediaryPersonImportFailureVO();
BeanUtils.copyProperties(excel, failure);
failure.setErrorMessage(e.getMessage());
failures.add(failure);
}
}
// 2. 根据isUpdateSupport选择处理方式
if (isUpdateSupport) {
// 更新模式直接批量导入数据库自动处理INSERT或UPDATE
importBatchWithUpdateSupport(validRecords, 500);
} else {
// 仅新增模式:先查询已存在的记录,对冲突的抛出异常
Set<String> existingIds = getExistingPersonIds(validRecords);
for (CcdiBizIntermediary record : validRecords) {
if (existingIds.contains(record.getPersonId())) {
throw new RuntimeException("该证件号已存在");
}
}
// 确认无冲突后,批量插入
importBatchWithoutUpdateSupport(validRecords, 500);
}
// 3. 更新导入状态
ImportResult result = new ImportResult();
result.setTotalCount(excelList.size());
result.setSuccessCount(validRecords.size());
result.setFailureCount(failures.size());
String finalStatus = result.getFailureCount() == 0 ? "SUCCESS" : "PARTIAL_SUCCESS";
updateImportStatus(taskId, finalStatus, result);
}
```
### 3.2 代码简化对比
**优化前 (约120行):**
```java
// 1. 批量查询已存在记录
Set<String> existingIds = getExistingPersonIds(excelList);
// 2. 分类数据
for (excel : excelList) {
if (existingIds.contains(excel.getPersonId())) {
if (isUpdateSupport) {
updateRecords.add(convert(excel));
} else {
throw new RuntimeException("已存在");
}
} else {
newRecords.add(convert(excel));
}
}
// 3. 批量插入新数据
if (!newRecords.isEmpty()) {
saveBatch(newRecords, 500);
}
// 4. 批量更新已有数据(先删除再插入)
if (!updateRecords.isEmpty() && isUpdateSupport) {
List<String> personIds = updateRecords.stream()
.map(CcdiBizIntermediary::getPersonId)
.collect(Collectors.toList());
LambdaQueryWrapper<CcdiBizIntermediary> deleteWrapper = new LambdaQueryWrapper<>();
deleteWrapper.in(CcdiBizIntermediary::getPersonId, personIds);
intermediaryMapper.delete(deleteWrapper);
intermediaryMapper.insertBatch(updateRecords);
}
```
**优化后 (约60行):**
```java
// 1. 验证数据并转换
for (excel : excelList) {
validatePersonData(excel);
validRecords.add(convert(excel));
}
// 2. 直接批量导入数据库自动处理INSERT或UPDATE
if (isUpdateSupport) {
intermediaryMapper.importPersonBatch(validRecords);
} else {
// 仅新增模式:检查唯一性
checkUniqueAndInsert(validRecords);
}
```
---
## 四、错误处理与边界情况
### 4.1 错误分类处理
| 错误类型 | 处理方式 | 状态标记 |
|-------------------------------|--------------------|-----------------|
| 数据验证错误 | 添加到失败列表,继续处理后续数据 | PARTIAL_SUCCESS |
| 唯一性冲突 (isUpdateSupport=false) | 抛出异常,添加到失败列表 | PARTIAL_SUCCESS |
| SQL执行错误 | 事务回滚,记录详细错误信息 | FAILED |
| 所有记录失败 | 状态为FAILED | FAILED |
| 部分成功 | 状态为PARTIAL_SUCCESS | PARTIAL_SUCCESS |
### 4.2 边界情况处理
| 场景 | 处理方式 |
|---------------|------------------------|
| Excel为空 | 返回"至少需要一条数据" |
| 所有数据格式错误 | 成功数=0失败数=总数,状态=FAILED |
| 超大数据量(>5000条) | 分批处理每批500条 |
| 并发导入相同数据 | 依靠数据库唯一索引保证一致性 |
| NULL字段更新 | 使用IF语句跳过保留原值 |
| 空字符串字段更新 | 视为NULL不更新 |
### 4.3 事务处理
```java
@Async
@Transactional(rollbackFor = Exception.class)
public void importPersonAsync(...) {
try {
// 数据验证
// 批量导入
// 更新状态
} catch (Exception e) {
// 事务自动回滚
// 记录错误日志
// 更新状态为FAILED
throw e;
}
}
```
---
## 五、测试策略
### 5.1 单元测试
**Mapper层测试:**
- ✅ 批量插入全新记录
- ✅ 批量更新已存在记录
- ✅ 混合场景(部分新记录+部分已存在)
- ✅ NULL值字段不覆盖原值
- ✅ 审计字段正确设置和更新
- ✅ 唯一键冲突处理
**Service层测试:**
-`isUpdateSupport=true` 的完整流程
-`isUpdateSupport=false` 时重复数据抛异常
- ✅ 数据验证逻辑(必填字段、格式校验)
- ✅ 事务回滚机制
- ✅ 失败记录保存到Redis
### 5.2 集成测试场景
| 测试场景 | 测试步骤 | 预期结果 |
|---------|--------------------------------|------------------|
| 新增模式测试 | 导入100条全新记录 | 全部成功插入,审计字段正确 |
| 更新模式测试 | 导入→修改→再导入 | 数据正确更新NULL字段保留 |
| 混合模式测试 | 50新+50已存在记录 | 新记录插入,旧记录更新 |
| 仅新增冲突测试 | 导入已存在记录isUpdateSupport=false | 抛出异常,记录失败 |
| 空文件测试 | 导入空Excel | 返回"至少需要一条数据" |
| 全部失败测试 | 所有数据格式错误 | 状态=FAILED失败数=总数 |
| 大数据量测试 | 导入2000+条记录 | 分批处理,全部成功 |
| 并发测试 | 同时导入相同数据 | 依靠唯一索引保证一致性 |
### 5.3 性能测试
**测试数据:**
- 500条记录
- 1000条记录
- 2000条记录
**性能指标:**
- 总响应时间
- 数据库操作次数
- 内存使用情况
**预期性能提升:**
- 更新模式下性能提升 30-40%
- 数据库操作次数减少 2次查询+删除)
---
## 六、实施计划
### 6.1 实施步骤
1. **数据库准备**
- 确认 `cdi_biz_intermediary.person_id` 有唯一索引
- 确认 `cdi_enterprise_base_info.social_credit_code` 有唯一索引
2. **Mapper层实现**
-`CcdiBizIntermediaryMapper` 接口添加 `importPersonBatch` 方法
-`CcdiEnterpriseBaseInfoMapper` 接口添加 `importEntityBatch` 方法
- 在对应的XML文件实现SQL语句
3. **Service层重构**
- 修改 `CcdiIntermediaryPersonImportServiceImpl.importPersonAsync` 方法
- 修改 `CcdiIntermediaryEntityImportServiceImpl.importEntityAsync` 方法
- 简化逻辑,移除删除操作
4. **单元测试**
- 编写Mapper层测试
- 编写Service层测试
5. **集成测试**
- 使用现有测试数据验证功能
- 对比优化前后的性能
6. **文档更新**
- 更新API文档
- 记录性能优化结果
### 6.2 向后兼容性
- ✅ API接口保持不变前端无需修改
- ✅ 返回数据格式不变
- ✅ 错误处理机制不变
- ✅ Redis状态管理不变
### 6.3 风险评估
| 风险 | 影响 | 缓解措施 |
|----------|----------|--------------|
| 唯一索引缺失 | 功能失败 | 实施前检查索引存在性 |
| 数据库版本兼容性 | SQL语法不支持 | 确认MySQL 5.7+ |
| 并发冲突 | 数据不一致 | 依赖数据库唯一索引和事务 |
| 性能回退 | 响应变慢 | 进行性能测试对比 |
---
## 七、预期收益
### 7.1 性能提升
| 指标 | 优化前 | 优化后 | 提升 |
|---------------|--------------|------------|--------|
| 数据库操作次数 | 3次查询+删除+插入) | 1次UPSERT | -66% |
| 代码行数 | ~120行 | ~60行 | -50% |
| 响应时间(1000条更新) | 基准 | 减少30-40% | 30-40% |
### 7.2 代码质量
- ✅ 逻辑更清晰,易于维护
- ✅ 减少出错可能性
- ✅ 更好的事务一致性
- ✅ 符合数据库最佳实践
### 7.3 可维护性
- SQL集中在XML文件易于优化
- 业务逻辑简化,降低认知负担
- 错误处理更精确
- 测试覆盖更全面
---
## 八、附录
### 8.1 相关文件
- Controller: `CcdiIntermediaryController.java`
- Service接口: `ICcdiIntermediaryService.java`
- Service实现: `CcdiIntermediaryServiceImpl.java`
- Import Service: `CcdiIntermediaryPersonImportServiceImpl.java`
- Mapper接口: `CcdiBizIntermediaryMapper.java`
- Mapper XML: `CcdiBizIntermediaryMapper.xml`
### 8.2 数据库表结构
**个人中介表 (cdi_biz_intermediary):**
```sql
UNIQUE KEY `uk_person_id` (`person_id`)
```
**实体中介表 (cdi_enterprise_base_info):**
```sql
PRIMARY KEY (`social_credit_code`)
```
### 8.3 测试数据
- 测试文件: `doc/test-data/purchase_transaction/purchase_test_data_2000_final.xlsx`
- 测试脚本: 待生成
---
**文档版本**: 1.0
**最后更新**: 2026-02-08
**状态**: 待评审