Files
ccdi/assets/requirements/plans/2026-02-08-intermediary-import-on-duplicate-key-update-design.md

491 lines
15 KiB
Markdown
Raw Permalink Normal View History

# 中介导入功能优化设计文档
## 概述
2026-03-03 16:14:16 +08:00
本设计文档描述了如何使用 MySQL 的 `INSERT ... ON DUPLICATE KEY UPDATE` 语句优化中介信息导入功能,替代现有的"先删除再插入"
更新模式,提升性能并简化代码逻辑。
**设计日期**: 2026-02-08
**目标**: 优化个人中介和实体中介的批量导入性能
**核心改进**: 使用 `ON DUPLICATE KEY UPDATE` 实现 Upsert 操作
---
## 一、整体架构设计
### 1.1 核心变更
**保持现有架构:**
2026-03-03 16:14:16 +08:00
- Controller 层:`CcdiIntermediaryController` - 无需修改
- Service 层:`CcdiIntermediaryServiceImpl` - 简化逻辑
- Mapper 层:新增批量导入方法
**架构优化点:**
2026-03-03 16:14:16 +08:00
| 层级 | 现有方案 | 优化方案 | 改进点 |
|---------|--------------------------|-----------------------------------------|--------------|
| Mapper | `insertBatch` + `delete` | `importBatch` (ON DUPLICATE KEY UPDATE) | 单次SQL完成插入或更新 |
| Service | 查询→分类→删除→插入 | 验证→直接导入 | 减少50%代码量 |
| 数据库 | 2-3次操作 | 1次操作 | 减少30-40%响应时间 |
### 1.2 数据流变化
**优化前流程:**
2026-03-03 16:14:16 +08:00
```
解析Excel → 验证数据 → 批量查询已存在记录 → 分类数据
→ 批量删除已存在记录 → 批量插入新记录和更新记录
```
**优化后流程:**
2026-03-03 16:14:16 +08:00
```
解析Excel → 验证数据 → 批量 INSERT ON DUPLICATE KEY UPDATE
```
**简化关键点:**
2026-03-03 16:14:16 +08:00
- 移除"批量查询已存在记录"步骤
- 移除"分类新增/更新记录"步骤
- 移除"批量删除已存在记录"步骤
---
## 二、SQL实现细节
### 2.1 个人中介批量导入SQL
**Mapper方法签名:**
2026-03-03 16:14:16 +08:00
```java
void importPersonBatch(@Param("list") List<CcdiBizIntermediary> list);
```
**SQL实现 (CcdiBizIntermediaryMapper.xml):**
2026-03-03 16:14:16 +08:00
```xml
<insert id="importPersonBatch">
INSERT INTO cdi_biz_intermediary (
person_id, name, gender, phone, address,
intermediary_type, data_source, created_by, updated_by
) VALUES
<foreach collection="list" item="item" separator=",">
(
#{item.personId}, #{item.name}, #{item.gender},
#{item.phone}, #{item.address}, #{item.intermediaryType},
#{item.dataSource}, #{item.createdBy}, #{item.updatedBy}
)
</foreach>
ON DUPLICATE KEY UPDATE
name = IF(#{item.name} IS NOT NULL AND #{item.name} != '', #{item.name}, name),
gender = IF(#{item.gender} IS NOT NULL AND #{item.gender} != '', #{item.gender}, gender),
phone = IF(#{item.phone} IS NOT NULL AND #{item.phone} != '', #{item.phone}, phone),
address = IF(#{item.address} IS NOT NULL AND #{item.address} != '', #{item.address}, address),
intermediary_type = IF(#{item.intermediaryType} IS NOT NULL AND #{item.intermediaryType} != '', #{item.intermediaryType}, intermediary_type),
update_time = NOW(),
update_by = #{item.updatedBy}
</insert>
```
### 2.2 实体中介批量导入SQL
**Mapper方法签名:**
2026-03-03 16:14:16 +08:00
```java
void importEntityBatch(@Param("list") List<CcdiEnterpriseBaseInfo> list);
```
**SQL实现 (CcdiEnterpriseBaseInfoMapper.xml):**
2026-03-03 16:14:16 +08:00
```xml
<insert id="importEntityBatch">
INSERT INTO cdi_enterprise_base_info (
social_credit_code, enterprise_name, legal_representative,
phone, address, risk_level, ent_source, data_source,
created_by, updated_by
) VALUES
<foreach collection="list" item="item" separator=",">
(
#{item.socialCreditCode}, #{item.enterpriseName},
#{item.legalRepresentative}, #{item.phone}, #{item.address},
#{item.riskLevel}, #{item.entSource}, #{item.dataSource},
#{item.createdBy}, #{item.updatedBy}
)
</foreach>
ON DUPLICATE KEY UPDATE
enterprise_name = IF(#{item.enterpriseName} IS NOT NULL AND #{item.enterpriseName} != '', #{item.enterpriseName}, enterprise_name),
legal_representative = IF(#{item.legalRepresentative} IS NOT NULL AND #{item.legalRepresentative} != '', #{item.legalRepresentative}, legal_representative),
phone = IF(#{item.phone} IS NOT NULL AND #{item.phone} != '', #{item.phone}, phone),
address = IF(#{item.address} IS NOT NULL AND #{item.address} != '', #{item.address}, address),
update_time = NOW(),
update_by = #{item.updatedBy}
</insert>
```
### 2.3 关键设计要点
**1. 非空字段更新策略:**
2026-03-03 16:14:16 +08:00
```sql
field = IF(#{item.field} IS NOT NULL AND #{item.field} != '', #{item.field}, field)
```
2026-03-03 16:14:16 +08:00
- 只更新Excel中非空的字段
- 保留数据库中的原有值
- 避免误清空数据
**2. 审计字段处理:**
| 字段 | INSERT时 | UPDATE时 |
|------|----------|----------|
| created_by | 设置当前用户 | 不更新 |
| create_time | 数据库默认NOW() | 不更新 |
| updated_by | NULL | 设置当前用户 |
| update_time | 数据库默认NOW() | 更新为NOW() |
**3. 唯一键约束:**
2026-03-03 16:14:16 +08:00
- 个人中介: `person_id` (证件号)
- 实体中介: `social_credit_code` (统一社会信用代码)
**4. 批量操作优化:**
2026-03-03 16:14:16 +08:00
- 每批最多500条记录
- 避免SQL过长导致性能问题
- 超过500条时分批处理
---
## 三、Service层实现
### 3.1 isUpdateSupport参数处理
采用**方案C: Service层预处理**
```java
@Override
@Async
@Transactional(rollbackFor = Exception.class)
public void importPersonAsync(List<CcdiIntermediaryPersonExcel> excelList,
Boolean isUpdateSupport,
String taskId,
String userName) {
List<CcdiBizIntermediary> validRecords = new ArrayList<>();
List<IntermediaryPersonImportFailureVO> failures = new ArrayList<>();
// 1. 数据验证阶段
for (CcdiIntermediaryPersonExcel excel : excelList) {
try {
validatePersonData(excel);
CcdiBizIntermediary intermediary = new CcdiBizIntermediary();
BeanUtils.copyProperties(excel, intermediary);
intermediary.setDataSource("IMPORT");
intermediary.setCreatedBy(userName);
if (isUpdateSupport) {
intermediary.setUpdatedBy(userName);
}
validRecords.add(intermediary);
} catch (Exception e) {
IntermediaryPersonImportFailureVO failure = new IntermediaryPersonImportFailureVO();
BeanUtils.copyProperties(excel, failure);
failure.setErrorMessage(e.getMessage());
failures.add(failure);
}
}
// 2. 根据isUpdateSupport选择处理方式
if (isUpdateSupport) {
// 更新模式直接批量导入数据库自动处理INSERT或UPDATE
importBatchWithUpdateSupport(validRecords, 500);
} else {
// 仅新增模式:先查询已存在的记录,对冲突的抛出异常
Set<String> existingIds = getExistingPersonIds(validRecords);
for (CcdiBizIntermediary record : validRecords) {
if (existingIds.contains(record.getPersonId())) {
throw new RuntimeException("该证件号已存在");
}
}
// 确认无冲突后,批量插入
importBatchWithoutUpdateSupport(validRecords, 500);
}
// 3. 更新导入状态
ImportResult result = new ImportResult();
result.setTotalCount(excelList.size());
result.setSuccessCount(validRecords.size());
result.setFailureCount(failures.size());
String finalStatus = result.getFailureCount() == 0 ? "SUCCESS" : "PARTIAL_SUCCESS";
updateImportStatus(taskId, finalStatus, result);
}
```
### 3.2 代码简化对比
**优化前 (约120行):**
2026-03-03 16:14:16 +08:00
```java
// 1. 批量查询已存在记录
Set<String> existingIds = getExistingPersonIds(excelList);
// 2. 分类数据
for (excel : excelList) {
if (existingIds.contains(excel.getPersonId())) {
if (isUpdateSupport) {
updateRecords.add(convert(excel));
} else {
throw new RuntimeException("已存在");
}
} else {
newRecords.add(convert(excel));
}
}
// 3. 批量插入新数据
if (!newRecords.isEmpty()) {
saveBatch(newRecords, 500);
}
// 4. 批量更新已有数据(先删除再插入)
if (!updateRecords.isEmpty() && isUpdateSupport) {
List<String> personIds = updateRecords.stream()
.map(CcdiBizIntermediary::getPersonId)
.collect(Collectors.toList());
LambdaQueryWrapper<CcdiBizIntermediary> deleteWrapper = new LambdaQueryWrapper<>();
deleteWrapper.in(CcdiBizIntermediary::getPersonId, personIds);
intermediaryMapper.delete(deleteWrapper);
intermediaryMapper.insertBatch(updateRecords);
}
```
**优化后 (约60行):**
2026-03-03 16:14:16 +08:00
```java
// 1. 验证数据并转换
for (excel : excelList) {
validatePersonData(excel);
validRecords.add(convert(excel));
}
// 2. 直接批量导入数据库自动处理INSERT或UPDATE
if (isUpdateSupport) {
intermediaryMapper.importPersonBatch(validRecords);
} else {
// 仅新增模式:检查唯一性
checkUniqueAndInsert(validRecords);
}
```
---
## 四、错误处理与边界情况
### 4.1 错误分类处理
2026-03-03 16:14:16 +08:00
| 错误类型 | 处理方式 | 状态标记 |
|-------------------------------|--------------------|-----------------|
| 数据验证错误 | 添加到失败列表,继续处理后续数据 | PARTIAL_SUCCESS |
| 唯一性冲突 (isUpdateSupport=false) | 抛出异常,添加到失败列表 | PARTIAL_SUCCESS |
| SQL执行错误 | 事务回滚,记录详细错误信息 | FAILED |
| 所有记录失败 | 状态为FAILED | FAILED |
| 部分成功 | 状态为PARTIAL_SUCCESS | PARTIAL_SUCCESS |
### 4.2 边界情况处理
2026-03-03 16:14:16 +08:00
| 场景 | 处理方式 |
|---------------|------------------------|
| Excel为空 | 返回"至少需要一条数据" |
| 所有数据格式错误 | 成功数=0失败数=总数,状态=FAILED |
| 超大数据量(>5000条) | 分批处理每批500条 |
| 并发导入相同数据 | 依靠数据库唯一索引保证一致性 |
| NULL字段更新 | 使用IF语句跳过保留原值 |
| 空字符串字段更新 | 视为NULL不更新 |
### 4.3 事务处理
```java
@Async
@Transactional(rollbackFor = Exception.class)
public void importPersonAsync(...) {
try {
// 数据验证
// 批量导入
// 更新状态
} catch (Exception e) {
// 事务自动回滚
// 记录错误日志
// 更新状态为FAILED
throw e;
}
}
```
---
## 五、测试策略
### 5.1 单元测试
**Mapper层测试:**
2026-03-03 16:14:16 +08:00
- ✅ 批量插入全新记录
- ✅ 批量更新已存在记录
- ✅ 混合场景(部分新记录+部分已存在)
- ✅ NULL值字段不覆盖原值
- ✅ 审计字段正确设置和更新
- ✅ 唯一键冲突处理
**Service层测试:**
2026-03-03 16:14:16 +08:00
-`isUpdateSupport=true` 的完整流程
-`isUpdateSupport=false` 时重复数据抛异常
- ✅ 数据验证逻辑(必填字段、格式校验)
- ✅ 事务回滚机制
- ✅ 失败记录保存到Redis
### 5.2 集成测试场景
2026-03-03 16:14:16 +08:00
| 测试场景 | 测试步骤 | 预期结果 |
|---------|--------------------------------|------------------|
| 新增模式测试 | 导入100条全新记录 | 全部成功插入,审计字段正确 |
| 更新模式测试 | 导入→修改→再导入 | 数据正确更新NULL字段保留 |
| 混合模式测试 | 50新+50已存在记录 | 新记录插入,旧记录更新 |
| 仅新增冲突测试 | 导入已存在记录isUpdateSupport=false | 抛出异常,记录失败 |
| 空文件测试 | 导入空Excel | 返回"至少需要一条数据" |
| 全部失败测试 | 所有数据格式错误 | 状态=FAILED失败数=总数 |
| 大数据量测试 | 导入2000+条记录 | 分批处理,全部成功 |
| 并发测试 | 同时导入相同数据 | 依靠唯一索引保证一致性 |
### 5.3 性能测试
**测试数据:**
2026-03-03 16:14:16 +08:00
- 500条记录
- 1000条记录
- 2000条记录
**性能指标:**
2026-03-03 16:14:16 +08:00
- 总响应时间
- 数据库操作次数
- 内存使用情况
**预期性能提升:**
2026-03-03 16:14:16 +08:00
- 更新模式下性能提升 30-40%
- 数据库操作次数减少 2次查询+删除)
---
## 六、实施计划
### 6.1 实施步骤
1. **数据库准备**
2026-03-03 16:14:16 +08:00
- 确认 `cdi_biz_intermediary.person_id` 有唯一索引
- 确认 `cdi_enterprise_base_info.social_credit_code` 有唯一索引
2. **Mapper层实现**
2026-03-03 16:14:16 +08:00
-`CcdiBizIntermediaryMapper` 接口添加 `importPersonBatch` 方法
-`CcdiEnterpriseBaseInfoMapper` 接口添加 `importEntityBatch` 方法
- 在对应的XML文件实现SQL语句
3. **Service层重构**
2026-03-03 16:14:16 +08:00
- 修改 `CcdiIntermediaryPersonImportServiceImpl.importPersonAsync` 方法
- 修改 `CcdiIntermediaryEntityImportServiceImpl.importEntityAsync` 方法
- 简化逻辑,移除删除操作
4. **单元测试**
2026-03-03 16:14:16 +08:00
- 编写Mapper层测试
- 编写Service层测试
5. **集成测试**
2026-03-03 16:14:16 +08:00
- 使用现有测试数据验证功能
- 对比优化前后的性能
6. **文档更新**
2026-03-03 16:14:16 +08:00
- 更新API文档
- 记录性能优化结果
### 6.2 向后兼容性
- ✅ API接口保持不变前端无需修改
- ✅ 返回数据格式不变
- ✅ 错误处理机制不变
- ✅ Redis状态管理不变
### 6.3 风险评估
2026-03-03 16:14:16 +08:00
| 风险 | 影响 | 缓解措施 |
|----------|----------|--------------|
| 唯一索引缺失 | 功能失败 | 实施前检查索引存在性 |
| 数据库版本兼容性 | SQL语法不支持 | 确认MySQL 5.7+ |
2026-03-03 16:14:16 +08:00
| 并发冲突 | 数据不一致 | 依赖数据库唯一索引和事务 |
| 性能回退 | 响应变慢 | 进行性能测试对比 |
---
## 七、预期收益
### 7.1 性能提升
2026-03-03 16:14:16 +08:00
| 指标 | 优化前 | 优化后 | 提升 |
|---------------|--------------|------------|--------|
| 数据库操作次数 | 3次查询+删除+插入) | 1次UPSERT | -66% |
| 代码行数 | ~120行 | ~60行 | -50% |
| 响应时间(1000条更新) | 基准 | 减少30-40% | 30-40% |
### 7.2 代码质量
- ✅ 逻辑更清晰,易于维护
- ✅ 减少出错可能性
- ✅ 更好的事务一致性
- ✅ 符合数据库最佳实践
### 7.3 可维护性
- SQL集中在XML文件易于优化
- 业务逻辑简化,降低认知负担
- 错误处理更精确
- 测试覆盖更全面
---
## 八、附录
### 8.1 相关文件
- Controller: `CcdiIntermediaryController.java`
- Service接口: `ICcdiIntermediaryService.java`
- Service实现: `CcdiIntermediaryServiceImpl.java`
- Import Service: `CcdiIntermediaryPersonImportServiceImpl.java`
- Mapper接口: `CcdiBizIntermediaryMapper.java`
- Mapper XML: `CcdiBizIntermediaryMapper.xml`
### 8.2 数据库表结构
**个人中介表 (cdi_biz_intermediary):**
2026-03-03 16:14:16 +08:00
```sql
UNIQUE KEY `uk_person_id` (`person_id`)
```
**实体中介表 (cdi_enterprise_base_info):**
2026-03-03 16:14:16 +08:00
```sql
PRIMARY KEY (`social_credit_code`)
```
### 8.3 测试数据
- 测试文件: `doc/test-data/purchase_transaction/purchase_test_data_2000_final.xlsx`
- 测试脚本: 待生成
---
**文档版本**: 1.0
**最后更新**: 2026-02-08
**状态**: 待评审