Files
ccdi/doc/requirements/plans/2026-02-08-intermediary-import-on-duplicate-key-update-design.md
2026-02-09 14:28:25 +08:00

14 KiB
Raw Blame History

中介导入功能优化设计文档

概述

本设计文档描述了如何使用 MySQL 的 INSERT ... ON DUPLICATE KEY UPDATE 语句优化中介信息导入功能,替代现有的"先删除再插入"更新模式,提升性能并简化代码逻辑。

设计日期: 2026-02-08 目标: 优化个人中介和实体中介的批量导入性能 核心改进: 使用 ON DUPLICATE KEY UPDATE 实现 Upsert 操作


一、整体架构设计

1.1 核心变更

保持现有架构:

  • Controller 层:CcdiIntermediaryController - 无需修改
  • Service 层:CcdiIntermediaryServiceImpl - 简化逻辑
  • Mapper 层:新增批量导入方法

架构优化点:

层级 现有方案 优化方案 改进点
Mapper insertBatch + delete importBatch (ON DUPLICATE KEY UPDATE) 单次SQL完成插入或更新
Service 查询→分类→删除→插入 验证→直接导入 减少50%代码量
数据库 2-3次操作 1次操作 减少30-40%响应时间

1.2 数据流变化

优化前流程:

解析Excel → 验证数据 → 批量查询已存在记录 → 分类数据
  → 批量删除已存在记录 → 批量插入新记录和更新记录

优化后流程:

解析Excel → 验证数据 → 批量 INSERT ON DUPLICATE KEY UPDATE

简化关键点:

  • 移除"批量查询已存在记录"步骤
  • 移除"分类新增/更新记录"步骤
  • 移除"批量删除已存在记录"步骤

二、SQL实现细节

2.1 个人中介批量导入SQL

Mapper方法签名:

void importPersonBatch(@Param("list") List<CcdiBizIntermediary> list);

SQL实现 (CcdiBizIntermediaryMapper.xml):

<insert id="importPersonBatch">
    INSERT INTO cdi_biz_intermediary (
        person_id, name, gender, phone, address,
        intermediary_type, data_source, created_by, updated_by
    ) VALUES
    <foreach collection="list" item="item" separator=",">
        (
            #{item.personId}, #{item.name}, #{item.gender},
            #{item.phone}, #{item.address}, #{item.intermediaryType},
            #{item.dataSource}, #{item.createdBy}, #{item.updatedBy}
        )
    </foreach>
    ON DUPLICATE KEY UPDATE
        name = IF(#{item.name} IS NOT NULL AND #{item.name} != '', #{item.name}, name),
        gender = IF(#{item.gender} IS NOT NULL AND #{item.gender} != '', #{item.gender}, gender),
        phone = IF(#{item.phone} IS NOT NULL AND #{item.phone} != '', #{item.phone}, phone),
        address = IF(#{item.address} IS NOT NULL AND #{item.address} != '', #{item.address}, address),
        intermediary_type = IF(#{item.intermediaryType} IS NOT NULL AND #{item.intermediaryType} != '', #{item.intermediaryType}, intermediary_type),
        update_time = NOW(),
        update_by = #{item.updatedBy}
</insert>

2.2 实体中介批量导入SQL

Mapper方法签名:

void importEntityBatch(@Param("list") List<CcdiEnterpriseBaseInfo> list);

SQL实现 (CcdiEnterpriseBaseInfoMapper.xml):

<insert id="importEntityBatch">
    INSERT INTO cdi_enterprise_base_info (
        social_credit_code, enterprise_name, legal_representative,
        phone, address, risk_level, ent_source, data_source,
        created_by, updated_by
    ) VALUES
    <foreach collection="list" item="item" separator=",">
        (
            #{item.socialCreditCode}, #{item.enterpriseName},
            #{item.legalRepresentative}, #{item.phone}, #{item.address},
            #{item.riskLevel}, #{item.entSource}, #{item.dataSource},
            #{item.createdBy}, #{item.updatedBy}
        )
    </foreach>
    ON DUPLICATE KEY UPDATE
        enterprise_name = IF(#{item.enterpriseName} IS NOT NULL AND #{item.enterpriseName} != '', #{item.enterpriseName}, enterprise_name),
        legal_representative = IF(#{item.legalRepresentative} IS NOT NULL AND #{item.legalRepresentative} != '', #{item.legalRepresentative}, legal_representative),
        phone = IF(#{item.phone} IS NOT NULL AND #{item.phone} != '', #{item.phone}, phone),
        address = IF(#{item.address} IS NOT NULL AND #{item.address} != '', #{item.address}, address),
        update_time = NOW(),
        update_by = #{item.updatedBy}
</insert>

2.3 关键设计要点

1. 非空字段更新策略:

field = IF(#{item.field} IS NOT NULL AND #{item.field} != '', #{item.field}, field)
  • 只更新Excel中非空的字段
  • 保留数据库中的原有值
  • 避免误清空数据

2. 审计字段处理:

字段 INSERT时 UPDATE时
created_by 设置当前用户 不更新
create_time 数据库默认NOW() 不更新
updated_by NULL 设置当前用户
update_time 数据库默认NOW() 更新为NOW()

3. 唯一键约束:

  • 个人中介: person_id (证件号)
  • 实体中介: social_credit_code (统一社会信用代码)

4. 批量操作优化:

  • 每批最多500条记录
  • 避免SQL过长导致性能问题
  • 超过500条时分批处理

三、Service层实现

3.1 isUpdateSupport参数处理

采用方案C: Service层预处理

@Override
@Async
@Transactional(rollbackFor = Exception.class)
public void importPersonAsync(List<CcdiIntermediaryPersonExcel> excelList,
                              Boolean isUpdateSupport,
                              String taskId,
                              String userName) {
    List<CcdiBizIntermediary> validRecords = new ArrayList<>();
    List<IntermediaryPersonImportFailureVO> failures = new ArrayList<>();

    // 1. 数据验证阶段
    for (CcdiIntermediaryPersonExcel excel : excelList) {
        try {
            validatePersonData(excel);
            CcdiBizIntermediary intermediary = new CcdiBizIntermediary();
            BeanUtils.copyProperties(excel, intermediary);
            intermediary.setDataSource("IMPORT");
            intermediary.setCreatedBy(userName);
            if (isUpdateSupport) {
                intermediary.setUpdatedBy(userName);
            }
            validRecords.add(intermediary);
        } catch (Exception e) {
            IntermediaryPersonImportFailureVO failure = new IntermediaryPersonImportFailureVO();
            BeanUtils.copyProperties(excel, failure);
            failure.setErrorMessage(e.getMessage());
            failures.add(failure);
        }
    }

    // 2. 根据isUpdateSupport选择处理方式
    if (isUpdateSupport) {
        // 更新模式直接批量导入数据库自动处理INSERT或UPDATE
        importBatchWithUpdateSupport(validRecords, 500);
    } else {
        // 仅新增模式:先查询已存在的记录,对冲突的抛出异常
        Set<String> existingIds = getExistingPersonIds(validRecords);
        for (CcdiBizIntermediary record : validRecords) {
            if (existingIds.contains(record.getPersonId())) {
                throw new RuntimeException("该证件号已存在");
            }
        }
        // 确认无冲突后,批量插入
        importBatchWithoutUpdateSupport(validRecords, 500);
    }

    // 3. 更新导入状态
    ImportResult result = new ImportResult();
    result.setTotalCount(excelList.size());
    result.setSuccessCount(validRecords.size());
    result.setFailureCount(failures.size());

    String finalStatus = result.getFailureCount() == 0 ? "SUCCESS" : "PARTIAL_SUCCESS";
    updateImportStatus(taskId, finalStatus, result);
}

3.2 代码简化对比

优化前 (约120行):

// 1. 批量查询已存在记录
Set<String> existingIds = getExistingPersonIds(excelList);

// 2. 分类数据
for (excel : excelList) {
    if (existingIds.contains(excel.getPersonId())) {
        if (isUpdateSupport) {
            updateRecords.add(convert(excel));
        } else {
            throw new RuntimeException("已存在");
        }
    } else {
        newRecords.add(convert(excel));
    }
}

// 3. 批量插入新数据
if (!newRecords.isEmpty()) {
    saveBatch(newRecords, 500);
}

// 4. 批量更新已有数据(先删除再插入)
if (!updateRecords.isEmpty() && isUpdateSupport) {
    List<String> personIds = updateRecords.stream()
        .map(CcdiBizIntermediary::getPersonId)
        .collect(Collectors.toList());

    LambdaQueryWrapper<CcdiBizIntermediary> deleteWrapper = new LambdaQueryWrapper<>();
    deleteWrapper.in(CcdiBizIntermediary::getPersonId, personIds);
    intermediaryMapper.delete(deleteWrapper);

    intermediaryMapper.insertBatch(updateRecords);
}

优化后 (约60行):

// 1. 验证数据并转换
for (excel : excelList) {
    validatePersonData(excel);
    validRecords.add(convert(excel));
}

// 2. 直接批量导入数据库自动处理INSERT或UPDATE
if (isUpdateSupport) {
    intermediaryMapper.importPersonBatch(validRecords);
} else {
    // 仅新增模式:检查唯一性
    checkUniqueAndInsert(validRecords);
}

四、错误处理与边界情况

4.1 错误分类处理

错误类型 处理方式 状态标记
数据验证错误 添加到失败列表,继续处理后续数据 PARTIAL_SUCCESS
唯一性冲突 (isUpdateSupport=false) 抛出异常,添加到失败列表 PARTIAL_SUCCESS
SQL执行错误 事务回滚,记录详细错误信息 FAILED
所有记录失败 状态为FAILED FAILED
部分成功 状态为PARTIAL_SUCCESS PARTIAL_SUCCESS

4.2 边界情况处理

场景 处理方式
Excel为空 返回"至少需要一条数据"
所有数据格式错误 成功数=0失败数=总数,状态=FAILED
超大数据量(>5000条) 分批处理每批500条
并发导入相同数据 依靠数据库唯一索引保证一致性
NULL字段更新 使用IF语句跳过保留原值
空字符串字段更新 视为NULL不更新

4.3 事务处理

@Async
@Transactional(rollbackFor = Exception.class)
public void importPersonAsync(...) {
    try {
        // 数据验证
        // 批量导入
        // 更新状态
    } catch (Exception e) {
        // 事务自动回滚
        // 记录错误日志
        // 更新状态为FAILED
        throw e;
    }
}

五、测试策略

5.1 单元测试

Mapper层测试:

  • 批量插入全新记录
  • 批量更新已存在记录
  • 混合场景(部分新记录+部分已存在)
  • NULL值字段不覆盖原值
  • 审计字段正确设置和更新
  • 唯一键冲突处理

Service层测试:

  • isUpdateSupport=true 的完整流程
  • isUpdateSupport=false 时重复数据抛异常
  • 数据验证逻辑(必填字段、格式校验)
  • 事务回滚机制
  • 失败记录保存到Redis

5.2 集成测试场景

测试场景 测试步骤 预期结果
新增模式测试 导入100条全新记录 全部成功插入,审计字段正确
更新模式测试 导入→修改→再导入 数据正确更新NULL字段保留
混合模式测试 50新+50已存在记录 新记录插入,旧记录更新
仅新增冲突测试 导入已存在记录isUpdateSupport=false 抛出异常,记录失败
空文件测试 导入空Excel 返回"至少需要一条数据"
全部失败测试 所有数据格式错误 状态=FAILED失败数=总数
大数据量测试 导入2000+条记录 分批处理,全部成功
并发测试 同时导入相同数据 依靠唯一索引保证一致性

5.3 性能测试

测试数据:

  • 500条记录
  • 1000条记录
  • 2000条记录

性能指标:

  • 总响应时间
  • 数据库操作次数
  • 内存使用情况

预期性能提升:

  • 更新模式下性能提升 30-40%
  • 数据库操作次数减少 2次查询+删除)

六、实施计划

6.1 实施步骤

  1. 数据库准备

    • 确认 cdi_biz_intermediary.person_id 有唯一索引
    • 确认 cdi_enterprise_base_info.social_credit_code 有唯一索引
  2. Mapper层实现

    • CcdiBizIntermediaryMapper 接口添加 importPersonBatch 方法
    • CcdiEnterpriseBaseInfoMapper 接口添加 importEntityBatch 方法
    • 在对应的XML文件实现SQL语句
  3. Service层重构

    • 修改 CcdiIntermediaryPersonImportServiceImpl.importPersonAsync 方法
    • 修改 CcdiIntermediaryEntityImportServiceImpl.importEntityAsync 方法
    • 简化逻辑,移除删除操作
  4. 单元测试

    • 编写Mapper层测试
    • 编写Service层测试
  5. 集成测试

    • 使用现有测试数据验证功能
    • 对比优化前后的性能
  6. 文档更新

    • 更新API文档
    • 记录性能优化结果

6.2 向后兼容性

  • API接口保持不变前端无需修改
  • 返回数据格式不变
  • 错误处理机制不变
  • Redis状态管理不变

6.3 风险评估

风险 影响 缓解措施
唯一索引缺失 功能失败 实施前检查索引存在性
数据库版本兼容性 SQL语法不支持 确认MySQL 5.7+
并发冲突 数据不一致 依赖数据库唯一索引和事务
性能回退 响应变慢 进行性能测试对比

七、预期收益

7.1 性能提升

指标 优化前 优化后 提升
数据库操作次数 3次查询+删除+插入) 1次UPSERT -66%
代码行数 ~120行 ~60行 -50%
响应时间(1000条更新) 基准 减少30-40% 30-40%

7.2 代码质量

  • 逻辑更清晰,易于维护
  • 减少出错可能性
  • 更好的事务一致性
  • 符合数据库最佳实践

7.3 可维护性

  • SQL集中在XML文件易于优化
  • 业务逻辑简化,降低认知负担
  • 错误处理更精确
  • 测试覆盖更全面

八、附录

8.1 相关文件

  • Controller: CcdiIntermediaryController.java
  • Service接口: ICcdiIntermediaryService.java
  • Service实现: CcdiIntermediaryServiceImpl.java
  • Import Service: CcdiIntermediaryPersonImportServiceImpl.java
  • Mapper接口: CcdiBizIntermediaryMapper.java
  • Mapper XML: CcdiBizIntermediaryMapper.xml

8.2 数据库表结构

个人中介表 (cdi_biz_intermediary):

UNIQUE KEY `uk_person_id` (`person_id`)

实体中介表 (cdi_enterprise_base_info):

PRIMARY KEY (`social_credit_code`)

8.3 测试数据

  • 测试文件: doc/test-data/purchase_transaction/purchase_test_data_2000_final.xlsx
  • 测试脚本: 待生成

文档版本: 1.0 最后更新: 2026-02-08 状态: 待评审