文件夹整理

This commit is contained in:
wkc
2026-02-09 14:34:27 +08:00
parent 3ffe173dd6
commit d08782ae9e
4433 changed files with 537607 additions and 0 deletions

View File

@@ -0,0 +1,301 @@
# 中介导入功能重构测试报告
## 测试目标
验证Service层重构后,使用 `importPersonBatch``importEntityBatch` 方法
(基于 `ON DUPLICATE KEY UPDATE`) 的导入功能是否正常工作。
## 重构内容
### Task 5: 重构个人中介导入Service
**文件:** `CcdiIntermediaryPersonImportServiceImpl.java`
**核心变更:**
- 移除"先查询后分类再删除再插入"的逻辑
- 更新模式(`isUpdateSupport=true`): 直接调用 `intermediaryMapper.importPersonBatch(validRecords)`
- 仅新增模式(`isUpdateSupport=false`): 先查询冲突,然后只插入无冲突数据
- 新增辅助方法:
- `saveBatchWithUpsert()`: 使用 `importPersonBatch` 进行批量UPSERT
- `getExistingPersonIdsFromDb()`: 从数据库获取已存在的证件号
- `createFailureVO()`: 创建失败记录VO(两个重载方法)
### Task 6: 重构实体中介导入Service
**文件:** `CcdiIntermediaryEntityImportServiceImpl.java`
**同样的重构逻辑**
## 测试场景
### 场景1: 个人中介 - 更新模式(第一次导入)
**目的:** 验证批量INSERT功能
**操作:**
- 上传测试数据文件(1000条个人中介数据)
- 设置 `updateSupport=true`
**预期结果:**
- 所有数据成功插入
- 状态: SUCCESS
- 成功数 = 总数
- 失败数 = 0
**实际结果:** _待测试_
**状态:** ⏳ 待执行
---
### 场景2: 个人中介 - 仅新增模式(重复导入)
**目的:** 验证冲突检测功能
**操作:**
- 再次上传相同的测试数据
- 设置 `updateSupport=false`
**预期结果:**
- 所有数据因为冲突而失败
- 状态: PARTIAL_SUCCESS 或 FAILURE
- 成功数 = 0
- 失败数 = 总数
- 失败原因: "该证件号码已存在"
**实际结果:** _待测试_
**状态:** ⏳ 待执行
---
### 场景3: 实体中介 - 更新模式(第一次导入)
**目的:** 验证实体中介批量INSERT功能
**操作:**
- 上传测试数据文件(1000条实体中介数据)
- 设置 `updateSupport=true`
**预期结果:**
- 所有数据成功插入
- 状态: SUCCESS
- 成功数 = 总数
- 失败数 = 0
**实际结果:** _待测试_
**状态:** ⏳ 待执行
---
### 场景4: 实体中介 - 仅新增模式(重复导入)
**目的:** 验证实体中介冲突检测功能
**操作:**
- 再次上传相同的测试数据
- 设置 `updateSupport=false`
**预期结果:**
- 所有数据因为冲突而失败
- 状态: PARTIAL_SUCCESS 或 FAILURE
- 成功数 = 0
- 失败数 = 总数
- 失败原因: "该统一社会信用代码已存在"
**实际结果:** _待测试_
**状态:** ⏳ 待执行
---
### 场景5: 个人中介 - 再次更新模式
**目的:** 验证 `ON DUPLICATE KEY UPDATE` 功能
**操作:**
- 第三次上传相同的测试数据
- 设置 `updateSupport=true`
**预期结果:**
- 所有数据成功更新(而不是先删除再插入)
- 状态: SUCCESS
- 成功数 = 总数
- 失败数 = 0
- 数据库中不会出现重复记录
**实际结果:** _待测试_
**状态:** ⏳ 待执行
---
## 测试方法
### 手动测试
1. **启动后端服务**
```bash
cd ruoyi-ccdi
mvn spring-boot:run
```
2. **访问Swagger UI**
- URL: http://localhost:8080/swagger-ui/index.html
- 找到 `/ccdi/intermediary/importPersonData` 和 `/ccdi/intermediary/importEntityData` 接口
3. **执行测试场景**
- 使用"Try it out"功能上传测试文件
- 观察响应结果
- 使用任务ID查询导入状态
- 查看失败记录
### 自动化测试
运行测试脚本:
```bash
cd doc/test-data/intermediary
node test-import-upsert.js
```
测试脚本会自动执行所有测试场景并生成报告。
## 测试数据
### 个人中介测试数据
- 文件: `doc/test-data/intermediary/个人中介黑名单测试数据_1000条_第1批.xlsx`
- 记录数: 1000
- 特点: 包含有效的身份证号码
### 实体中介测试数据
- 文件: `doc/test-data/intermediary/机构中介黑名单测试数据_1000条_第1批.xlsx`
- 记录数: 1000
- 特点: 包含有效的统一社会信用代码
## 关键验证点
### 1. 数据库层面验证
**更新模式下的UPSERT操作:**
- 检查 `ccdi_biz_intermediary` 表,确保持有相同 `person_id` 的记录只有1条
- 检查 `ccdi_enterprise_base_info` 表,确保持有相同 `social_credit_code` 的记录只有1条
**验证SQL:**
```sql
-- 检查个人中介重复记录
SELECT person_id, COUNT(*) as cnt
FROM ccdi_biz_intermediary
GROUP BY person_id
HAVING cnt > 1;
-- 检查实体中介重复记录
SELECT social_credit_code, COUNT(*) as cnt
FROM ccdi_enterprise_base_info
GROUP BY social_credit_code
HAVING cnt > 1;
```
### 2. 性能验证
**对比重构前后的性能差异:**
| 场景 | 重构前(先删后插) | 重构后(UPSERT) | 性能提升 |
|------|----------------|---------------|---------|
| 1000条首次导入 | _待测试_ | _待测试_ | _待计算_ |
| 1000条重复导入 | _待测试_ | _待测试_ | _待计算_ |
### 3. 错误处理验证
**验证失败记录的正确性:**
- 失败原因是否准确
- 失败记录的完整信息是否保留
- Redis中失败记录的存储和读取
## 测试结果汇总
| 场景 | 状态 | 通过/失败 | 备注 |
|------|------|----------|------|
| 场景1 | ⏳ 待执行 | - | 个人中介首次导入 |
| 场景2 | ⏳ 待执行 | - | 个人中介重复导入(仅新增) |
| 场景3 | ⏳ 待执行 | - | 实体中介首次导入 |
| 场景4 | ⏳ 待执行 | - | 实体中介重复导入(仅新增) |
| 场景5 | ⏳ 待执行 | - | 个人中介重复导入(更新) |
**总通过率:** 0/5 (0%)
## 问题记录
### 问题1: _问题描述_
**场景:** _相关场景_
**现象:** _具体表现_
**原因:** _根本原因_
**解决方案:** _修复方法_
**状态:** ⏳ 待解决 / ✅ 已解决
---
## 结论
_测试完成后填写总体结论_
### 代码质量评估
- **可读性:** _评分_ / 10
- **可维护性:** _评分_ / 10
- **性能:** _评分_ / 10
- **错误处理:** _评分_ / 10
### 优化建议
_根据测试结果提出优化建议_
## 附录
### A. 测试环境信息
- **操作系统:** Windows 11
- **Java版本:** 17
- **Spring Boot版本:** 3.5.8
- **MySQL版本:** 8.2.0
- **Redis版本:** _待填写_
### B. 相关文件清单
- `ruoyi-ccdi/src/main/java/com/ruoyi/ccdi/service/impl/CcdiIntermediaryPersonImportServiceImpl.java`
- `ruoyi-ccdi/src/main/java/com/ruoyi/ccdi/service/impl/CcdiIntermediaryEntityImportServiceImpl.java`
- `ruoyi-ccdi/src/main/java/com/ruoyi/ccdi/mapper/CcdiBizIntermediaryMapper.java`
- `ruoyi-ccdi/src/main/java/com/ruoyi/ccdi/mapper/CcdiEnterpriseBaseInfoMapper.java`
- `doc/test-data/intermediary/test-import-upsert.js`
### C. Git提交信息
```
commit 7d534de
refactor: 重构Service层使用ON DUPLICATE KEY UPDATE
- 更新模式直接调用importPersonBatch/importEntityBatch
- 移除'先删除再插入'逻辑,代码简化约50%
- 添加辅助方法saveBatchWithUpsert/getExistingPersonIdsFromDb
- 添加createFailureVO重载方法简化失败记录创建
变更详情:
- CcdiIntermediaryPersonImportServiceImpl: 重构importPersonAsync方法
- CcdiIntermediaryEntityImportServiceImpl: 重构importEntityAsync方法
- 两个Service均采用统一的处理模式
Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
```
---
**报告生成时间:** 2026-02-08
**测试执行人:** _待填写_
**审核人:** _待填写_

View File

@@ -0,0 +1,151 @@
import pandas as pd
import random
from openpyxl import load_workbook
from openpyxl.styles import Font, PatternFill, Alignment
def calculate_id_check_code(id_17):
"""
计算身份证校验码符合GB 11643-1999标准
"""
weights = [7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2]
check_codes = ['1', '0', 'X', '9', '8', '7', '6', '5', '4', '3', '2']
weighted_sum = sum(int(id_17[i]) * weights[i] for i in range(17))
mod = weighted_sum % 11
return check_codes[mod]
def generate_valid_person_id():
"""
生成符合校验标准的18位身份证号
"""
area_code = f"{random.randint(110000, 659999)}"
birth_year = random.randint(1960, 2000)
birth_month = f"{random.randint(1, 12):02d}"
birth_day = f"{random.randint(1, 28):02d}"
sequence_code = f"{random.randint(0, 999):03d}"
id_17 = f"{area_code}{birth_year}{birth_month}{birth_day}{sequence_code}"
check_code = calculate_id_check_code(id_17)
return f"{id_17}{check_code}"
def validate_id_check_code(person_id):
"""
验证身份证校验码是否正确
"""
if len(str(person_id)) != 18:
return False
id_17 = str(person_id)[:17]
check_code = str(person_id)[17]
return calculate_id_check_code(id_17) == check_code.upper()
# 读取现有文件
input_file = 'doc/test-data/intermediary/intermediary_test_data_1000_valid.xlsx'
output_file = 'doc/test-data/intermediary/intermediary_test_data_1000_valid.xlsx'
print(f"正在读取文件: {input_file}")
df = pd.read_excel(input_file)
print(f"总行数: {len(df)}\n")
# 统计各证件类型
print("=== 原始证件类型分布 ===")
for id_type, count in df['证件类型'].value_counts().items():
print(f"{id_type}: {count}")
# 找出所有非身份证类型的记录
non_id_mask = df['证件类型'] != '身份证'
non_id_count = non_id_mask.sum()
id_card_count = (~non_id_mask).sum()
print(f"\n需要转换的证件数量: {non_id_count}")
print(f"现有身份证数量: {id_card_count}条(保持不变)")
# 备份现有身份证号码
existing_id_cards = df[~non_id_mask]['证件号码*'].copy()
print(f"\n已备份 {len(existing_id_cards)} 条现有身份证号码")
# 转换证件类型并生成新身份证号
print(f"\n正在转换证件类型并生成身份证号码...")
updated_count = 0
for idx in df[non_id_mask].index:
# 修改证件类型为身份证
df.loc[idx, '证件类型'] = '身份证'
# 生成新的身份证号
new_id = generate_valid_person_id()
df.loc[idx, '证件号码*'] = new_id
updated_count += 1
if (updated_count % 100 == 0) or (updated_count == non_id_count):
print(f"已处理 {updated_count}/{non_id_count}")
# 保存到Excel
df.to_excel(output_file, index=False, engine='openpyxl')
# 格式化Excel文件
wb = load_workbook(output_file)
ws = wb.active
# 设置列宽
ws.column_dimensions['A'].width = 15
ws.column_dimensions['B'].width = 12
ws.column_dimensions['C'].width = 12
ws.column_dimensions['D'].width = 8
ws.column_dimensions['E'].width = 12
ws.column_dimensions['F'].width = 20
ws.column_dimensions['G'].width = 15
ws.column_dimensions['H'].width = 15
ws.column_dimensions['I'].width = 30
ws.column_dimensions['J'].width = 20
ws.column_dimensions['K'].width = 20
ws.column_dimensions['L'].width = 12
ws.column_dimensions['M'].width = 15
ws.column_dimensions['N'].width = 12
ws.column_dimensions['O'].width = 20
# 设置表头样式
header_fill = PatternFill(start_color='D3D3D3', end_color='D3D3D3', fill_type='solid')
header_font = Font(bold=True)
for cell in ws[1]:
cell.fill = header_fill
cell.font = header_font
cell.alignment = Alignment(horizontal='center', vertical='center')
# 冻结首行
ws.freeze_panes = 'A2'
wb.save(output_file)
# 最终验证
print("\n正在进行最终验证...")
df_verify = pd.read_excel(output_file)
# 验证所有记录都是身份证
all_id_card = (df_verify['证件类型'] == '身份证').all()
print(f"所有证件类型均为身份证: {'✅ 是' if all_id_card else '❌ 否'}")
# 验证所有身份证号码
all_valid = True
invalid_count = 0
for idx, person_id in df_verify['证件号码*'].items():
if not validate_id_check_code(str(person_id)):
all_valid = False
invalid_count += 1
if invalid_count <= 5:
print(f"❌ 错误: {person_id}")
print(f"\n身份证号码验证:")
print(f"总数: {len(df_verify)}")
print(f"校验通过: {len(df_verify) - invalid_count}条 ✅")
if invalid_count > 0:
print(f"校验失败: {invalid_count}条 ❌")
print(f"\n=== 更新完成 ===")
print(f"文件: {output_file}")
print(f"转换证件数量: {updated_count}")
print(f"保持不变: {len(existing_id_cards)}")
print(f"总记录数: {len(df_verify)}")
print(f"\n✅ 所有1000条记录现在都使用身份证类型")
print(f"✅ 所有身份证号码已通过GB 11643-1999标准校验")

Binary file not shown.

View File

@@ -0,0 +1,143 @@
import pandas as pd
import random
from openpyxl import load_workbook
from openpyxl.styles import Font, PatternFill, Alignment
def calculate_id_check_code(id_17):
"""
计算身份证校验码符合GB 11643-1999标准
"""
weights = [7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2]
check_codes = ['1', '0', 'X', '9', '8', '7', '6', '5', '4', '3', '2']
weighted_sum = sum(int(id_17[i]) * weights[i] for i in range(17))
mod = weighted_sum % 11
return check_codes[mod]
def generate_valid_person_id():
"""
生成符合校验标准的18位身份证号
"""
area_code = f"{random.randint(110000, 659999)}"
birth_year = random.randint(1960, 2000)
birth_month = f"{random.randint(1, 12):02d}"
birth_day = f"{random.randint(1, 28):02d}"
sequence_code = f"{random.randint(0, 999):03d}"
id_17 = f"{area_code}{birth_year}{birth_month}{birth_day}{sequence_code}"
check_code = calculate_id_check_code(id_17)
return f"{id_17}{check_code}"
def validate_id_check_code(person_id):
"""
验证身份证校验码是否正确
"""
if len(person_id) != 18:
return False
id_17 = person_id[:17]
check_code = person_id[17]
return calculate_id_check_code(id_17) == check_code.upper()
# 读取现有文件
input_file = 'doc/test-data/intermediary/intermediary_test_data_1000_valid.xlsx'
output_file = 'doc/test-data/intermediary/intermediary_test_data_1000_valid.xlsx'
print(f"正在读取文件: {input_file}")
df = pd.read_excel(input_file)
print(f"总行数: {len(df)}")
# 找出所有身份证类型的记录
id_card_mask = df['证件类型'] == '身份证'
id_card_count = id_card_mask.sum()
print(f"\n找到 {id_card_count} 条身份证记录")
# 验证现有身份证
print("\n正在验证现有身份证校验码...")
invalid_count = 0
invalid_indices = []
for idx in df[id_card_mask].index:
person_id = str(df.loc[idx, '证件号码*'])
if not validate_id_check_code(person_id):
invalid_count += 1
invalid_indices.append(idx)
print(f"校验正确: {id_card_count - invalid_count}")
print(f"校验错误: {invalid_count}")
if invalid_count > 0:
print(f"\n需要重新生成 {invalid_count} 条身份证号码")
# 重新生成所有身份证号码
print(f"\n正在重新生成所有身份证号码...")
updated_count = 0
for idx in df[id_card_mask].index:
old_id = df.loc[idx, '证件号码*']
new_id = generate_valid_person_id()
df.loc[idx, '证件号码*'] = new_id
updated_count += 1
if (updated_count % 50 == 0) or (updated_count == id_card_count):
print(f"已更新 {updated_count}/{id_card_count}")
# 保存到Excel
df.to_excel(output_file, index=False, engine='openpyxl')
# 格式化Excel文件
wb = load_workbook(output_file)
ws = wb.active
# 设置列宽
ws.column_dimensions['A'].width = 15
ws.column_dimensions['B'].width = 12
ws.column_dimensions['C'].width = 12
ws.column_dimensions['D'].width = 8
ws.column_dimensions['E'].width = 12
ws.column_dimensions['F'].width = 20
ws.column_dimensions['G'].width = 15
ws.column_dimensions['H'].width = 15
ws.column_dimensions['I'].width = 30
ws.column_dimensions['J'].width = 20
ws.column_dimensions['K'].width = 20
ws.column_dimensions['L'].width = 12
ws.column_dimensions['M'].width = 15
ws.column_dimensions['N'].width = 12
ws.column_dimensions['O'].width = 20
# 设置表头样式
header_fill = PatternFill(start_color='D3D3D3', end_color='D3D3D3', fill_type='solid')
header_font = Font(bold=True)
for cell in ws[1]:
cell.fill = header_fill
cell.font = header_font
cell.alignment = Alignment(horizontal='center', vertical='center')
# 冻结首行
ws.freeze_panes = 'A2'
wb.save(output_file)
# 最终验证
print("\n正在进行最终验证...")
df_verify = pd.read_excel(output_file)
id_cards = df_verify[df_verify['证件类型'] == '身份证']['证件号码*']
all_valid = True
for idx, person_id in id_cards.items():
if not validate_id_check_code(str(person_id)):
all_valid = False
print(f"❌ 错误: {person_id}")
if all_valid:
print(f"✅ 所有 {len(id_cards)} 条身份证号码校验通过!")
else:
print("❌ 存在校验失败的身份证号码")
print(f"\n=== 更新完成 ===")
print(f"文件: {output_file}")
print(f"更新身份证数量: {updated_count}")
print(f"其他证件类型保持不变")

View File

@@ -0,0 +1,215 @@
import pandas as pd
import random
from openpyxl import load_workbook
from openpyxl.styles import Font, PatternFill, Alignment
def calculate_id_check_code(id_17):
"""
计算身份证校验码符合GB 11643-1999标准
:param id_17: 前17位身份证号
:return: 校验码0-9或X
"""
# 权重因子
weights = [7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2]
# 校验码对应表
check_codes = ['1', '0', 'X', '9', '8', '7', '6', '5', '4', '3', '2']
# 计算加权和
weighted_sum = sum(int(id_17[i]) * weights[i] for i in range(17))
# 取模得到索引
mod = weighted_sum % 11
# 返回对应的校验码
return check_codes[mod]
def generate_valid_person_id(id_type):
"""
生成符合校验标准的证件号码
"""
if id_type == '身份证':
# 6位地区码 + 4位年份 + 2位月份 + 2位日期 + 3位顺序码
area_code = f"{random.randint(110000, 659999)}"
birth_year = random.randint(1960, 2000)
birth_month = f"{random.randint(1, 12):02d}"
birth_day = f"{random.randint(1, 28):02d}"
sequence_code = f"{random.randint(0, 999):03d}"
# 前17位
id_17 = f"{area_code}{birth_year}{birth_month}{birth_day}{sequence_code}"
# 计算校验码
check_code = calculate_id_check_code(id_17)
return f"{id_17}{check_code}"
else:
# 护照、台胞证、港澳通行证8位数字
return str(random.randint(10000000, 99999999))
# 验证身份证校验码
def validate_id_check_code(person_id):
"""
验证身份证校验码是否正确
"""
if len(person_id) != 18:
return False
id_17 = person_id[:17]
check_code = person_id[17]
return calculate_id_check_code(id_17) == check_code.upper()
# 定义数据生成规则
last_names = ['', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '']
first_names_male = ['', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '']
first_names_female = ['', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '']
person_types = ['中介']
person_sub_types = ['本人', '配偶', '子女', '父母', '其他']
genders = ['M', 'F', 'O']
id_types = ['身份证', '护照', '台胞证', '港澳通行证']
companies = ['房屋租赁公司', '房产经纪公司', '投资咨询公司', '置业咨询公司', '不动产咨询公司', '物业管理公司', '资产评估公司', '土地评估公司', '地产代理公司', '房产咨询公司']
positions = ['区域经理', '店长', '高级经纪人', '房产经纪人', '销售经理', '置业顾问', '物业顾问', '评估师', '业务员', '总监', '主管', None]
relation_types = ['配偶', '子女', '父母', '兄弟姐妹', None, None]
provinces = ['北京市', '上海市', '广东省', '江苏省', '浙江省', '四川省', '河南省', '福建省', '湖北省', '湖南省']
districts = ['海淀区', '朝阳区', '天河区', '浦东新区', '西湖区', '黄浦区', '静安区', '徐汇区', '福田区', '罗湖区']
streets = ['', '大街', '大道', '街道', '', '广场', '大厦', '花园']
buildings = ['1号楼', '2号楼', '3号楼', '4号楼', '5号楼', '6号楼', '7号楼', '8号楼', 'A座', 'B座']
def generate_name(gender):
first_names = first_names_male if gender == 'M' else first_names_female
return random.choice(last_names) + random.choice(first_names)
def generate_mobile():
return f"1{random.choice([3, 5, 7, 8, 9])}{random.randint(0, 9)}{random.randint(10000000, 99999999)}"
def generate_wechat():
return f"wx_{''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=8))}"
def generate_address():
return f"{random.choice(provinces)}{random.choice(districts)}{random.choice(streets)}{random.randint(1, 100)}"
def generate_social_credit_code():
return f"91{random.randint(0, 9)}{random.randint(10000000000000000, 99999999999999999)}"
def generate_related_num_id():
return f"ID{random.randint(10000, 99999)}"
def generate_row(index):
gender = random.choice(genders)
person_sub_type = random.choice(person_sub_types)
id_type = random.choice(id_types)
return {
'姓名*': generate_name(gender),
'人员类型': '中介',
'人员子类型': person_sub_type,
'性别': gender,
'证件类型': id_type,
'证件号码*': generate_valid_person_id(id_type),
'手机号码': generate_mobile(),
'微信号': random.choice([generate_wechat(), None, None]),
'联系地址': generate_address(),
'所在公司': random.choice(companies),
'企业统一信用码': random.choice([generate_social_credit_code(), None, None]),
'职位': random.choice(positions),
'关联人员ID': random.choice([generate_related_num_id(), None, None, None]),
'关系类型': random.choice(relation_types),
'备注': None
}
# 生成1000条数据
print("正在生成1000条测试数据...")
data = []
for i in range(1000):
row = generate_row(i)
data.append(row)
if (i + 1) % 100 == 0:
print(f"已生成 {i + 1} 条...")
# 创建DataFrame
df = pd.DataFrame(data)
# 输出文件
output_file = 'doc/test-data/intermediary/intermediary_test_data_1000_valid.xlsx'
# 保存到Excel
df.to_excel(output_file, index=False, engine='openpyxl')
# 格式化Excel文件
wb = load_workbook(output_file)
ws = wb.active
# 设置列宽
ws.column_dimensions['A'].width = 15
ws.column_dimensions['B'].width = 12
ws.column_dimensions['C'].width = 12
ws.column_dimensions['D'].width = 8
ws.column_dimensions['E'].width = 12
ws.column_dimensions['F'].width = 20
ws.column_dimensions['G'].width = 15
ws.column_dimensions['H'].width = 15
ws.column_dimensions['I'].width = 30
ws.column_dimensions['J'].width = 20
ws.column_dimensions['K'].width = 20
ws.column_dimensions['L'].width = 12
ws.column_dimensions['M'].width = 15
ws.column_dimensions['N'].width = 12
ws.column_dimensions['O'].width = 20
# 设置表头样式
header_fill = PatternFill(start_color='D3D3D3', end_color='D3D3D3', fill_type='solid')
header_font = Font(bold=True)
for cell in ws[1]:
cell.fill = header_fill
cell.font = header_font
cell.alignment = Alignment(horizontal='center', vertical='center')
# 冻结首行
ws.freeze_panes = 'A2'
wb.save(output_file)
# 验证身份证校验码
print("\n正在验证身份证校验码...")
df_read = pd.read_excel(output_file)
id_cards = df_read[df_read['证件类型'] == '身份证']['证件号码*']
valid_count = 0
invalid_count = 0
invalid_ids = []
for idx, person_id in id_cards.items():
if validate_id_check_code(str(person_id)):
valid_count += 1
else:
invalid_count += 1
invalid_ids.append(person_id)
print(f"\n✅ 成功生成1000条测试数据到: {output_file}")
print(f"\n=== 身份证校验码验证 ===")
print(f"身份证总数: {len(id_cards)}")
print(f"校验正确: {valid_count}条 ✅")
print(f"校验错误: {invalid_count}")
if invalid_count > 0:
print(f"\n错误的身份证号:")
for pid in invalid_ids[:10]:
print(f" {pid}")
print(f"\n=== 数据统计 ===")
print(f"人员类型: {df_read['人员类型'].unique()}")
print(f"性别分布: {dict(df_read['性别'].value_counts())}")
print(f"证件类型分布: {dict(df_read['证件类型'].value_counts())}")
print(f"人员子类型分布: {dict(df_read['人员子类型'].value_counts())}")
print(f"\n=== 身份证号码样本(已验证校验码)===")
valid_id_samples = id_cards.head(5).tolist()
for sample in valid_id_samples:
is_valid = "" if validate_id_check_code(str(sample)) else ""
print(f"{sample} {is_valid}")

View File

@@ -0,0 +1,163 @@
import pandas as pd
import random
from openpyxl import load_workbook
from openpyxl.styles import Font, PatternFill, Alignment
# 读取模板文件
template_file = 'doc/test-data/intermediary/person_1770542031351.xlsx'
output_file = 'doc/test-data/intermediary/intermediary_test_data_1000.xlsx'
# 定义数据生成规则
last_names = ['', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '']
first_names_male = ['', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '']
first_names_female = ['', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '']
person_types = ['中介']
person_sub_types = ['本人', '配偶', '子女', '父母', '其他']
genders = ['M', 'F', 'O']
id_types = ['身份证', '护照', '台胞证', '港澳通行证']
companies = ['房屋租赁公司', '房产经纪公司', '投资咨询公司', '置业咨询公司', '不动产咨询公司', '物业管理公司', '资产评估公司', '土地评估公司', '地产代理公司', '房产咨询公司']
positions = ['区域经理', '店长', '高级经纪人', '房产经纪人', '销售经理', '置业顾问', '物业顾问', '评估师', '业务员', '总监', '主管', None]
relation_types = ['配偶', '子女', '父母', '兄弟姐妹', None, None]
provinces = ['北京市', '上海市', '广东省', '江苏省', '浙江省', '四川省', '河南省', '福建省', '湖北省', '湖南省']
districts = ['海淀区', '朝阳区', '天河区', '浦东新区', '西湖区', '黄浦区', '静安区', '徐汇区', '福田区', '罗湖区']
streets = ['', '大街', '大道', '街道', '', '广场', '大厦', '花园']
buildings = ['1号楼', '2号楼', '3号楼', '4号楼', '5号楼', '6号楼', '7号楼', '8号楼', 'A座', 'B座']
# 现有数据样本(从数据库获取的格式)
existing_data_samples = [
{'name': '林玉兰', 'person_type': '中介', 'person_sub_type': '本人', 'gender': 'F', 'id_type': '护照', 'person_id': '45273944', 'mobile': '18080309834', 'wechat_no': 'wx_rt54d59p', 'contact_address': '福建省黄浦区巷4号', 'company': '房屋租赁公司', 'social_credit_code': '911981352496905281', 'position': '区域经理', 'related_num_id': 'ID92351', 'relation_type': None},
{'name': '刘平', 'person_type': '中介', 'person_sub_type': '本人', 'gender': 'F', 'id_type': '台胞证', 'person_id': '38639164', 'mobile': '19360856434', 'wechat_no': None, 'contact_address': '四川省海淀区路3号', 'company': '房产经纪公司', 'social_credit_code': '918316437629447909', 'position': None, 'related_num_id': None, 'relation_type': None},
{'name': '何娜', 'person_type': '中介', 'person_sub_type': '本人', 'gender': 'O', 'id_type': '港澳通行证', 'person_id': '83433341', 'mobile': '18229577387', 'wechat_no': 'wx_8ikozqjx', 'contact_address': '河南省天河区巷4号', 'company': '房产经纪公司', 'social_credit_code': '918315578905616368', 'position': '店长', 'related_num_id': None, 'relation_type': '父母'},
{'name': '王毅', 'person_type': '中介', 'person_sub_type': '本人', 'gender': 'M', 'id_type': '台胞证', 'person_id': '76369869', 'mobile': '17892993806', 'wechat_no': None, 'contact_address': '江苏省西湖区街道1号', 'company': '投资咨询公司', 'social_credit_code': None, 'position': '高级经纪人', 'related_num_id': 'ID61198', 'relation_type': None},
{'name': '李桂英', 'person_type': '中介', 'person_sub_type': '配偶', 'gender': 'F', 'id_type': '护照', 'person_id': '75874216', 'mobile': '15648713336', 'wechat_no': 'wx_5n0e926w', 'contact_address': '浙江省海淀区大道2号', 'company': '投资咨询公司', 'social_credit_code': None, 'position': '店长', 'related_num_id': None, 'relation_type': None},
]
def generate_name(gender):
first_names = first_names_male if gender == 'M' else first_names_female
return random.choice(last_names) + random.choice(first_names)
def generate_mobile():
return f"1{random.choice([3, 5, 7, 8, 9])}{random.randint(0, 9)}{random.randint(10000000, 99999999)}"
def generate_wechat():
return f"wx_{''.join(random.choices('abcdefghijklmnopqrstuvwxyz0123456789', k=8))}"
def generate_person_id(id_type):
if id_type == '身份证':
# 18位身份证号6位地区码 + 4位年份 + 2位月份 + 2位日期 + 3位顺序码 + 1位校验码
area_code = f"{random.randint(110000, 659999)}"
birth_year = random.randint(1960, 2000)
birth_month = f"{random.randint(1, 12):02d}"
birth_day = f"{random.randint(1, 28):02d}"
sequence_code = f"{random.randint(0, 999):03d}"
# 简单校验码随机0-9或X
check_code = random.choice(['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', 'X'])
return f"{area_code}{birth_year}{birth_month}{birth_day}{sequence_code}{check_code}"
else:
return str(random.randint(10000000, 99999999))
def generate_social_credit_code():
return f"91{random.randint(0, 9)}{random.randint(10000000000000000, 99999999999999999)}"
def generate_address():
return f"{random.choice(provinces)}{random.choice(districts)}{random.choice(streets)}{random.randint(1, 100)}"
def generate_related_num_id():
return f"ID{random.randint(10000, 99999)}"
def generate_row(index, is_existing):
if is_existing:
sample = existing_data_samples[index % len(existing_data_samples)]
return {
'姓名*': sample['name'],
'人员类型': sample['person_type'],
'人员子类型': sample['person_sub_type'],
'性别': sample['gender'],
'证件类型': sample['id_type'],
'证件号码*': sample['person_id'],
'手机号码': sample['mobile'],
'微信号': sample['wechat_no'],
'联系地址': sample['contact_address'],
'所在公司': sample['company'],
'企业统一信用码': sample['social_credit_code'],
'职位': sample['position'],
'关联人员ID': sample['related_num_id'],
'关系类型': sample['relation_type'],
'备注': None
}
else:
gender = random.choice(genders)
person_sub_type = random.choice(person_sub_types)
id_type = random.choice(id_types)
return {
'姓名*': generate_name(gender),
'人员类型': '中介',
'人员子类型': person_sub_type,
'性别': gender,
'证件类型': id_type,
'证件号码*': generate_person_id(id_type),
'手机号码': generate_mobile(),
'微信号': random.choice([generate_wechat(), None, None]),
'联系地址': generate_address(),
'所在公司': random.choice(companies),
'企业统一信用码': random.choice([generate_social_credit_code(), None, None]),
'职位': random.choice(positions),
'关联人员ID': random.choice([generate_related_num_id(), None, None, None]),
'关系类型': random.choice(relation_types),
'备注': None
}
# 生成1000条数据
data = []
for i in range(1000):
is_existing = i < 500
row = generate_row(i, is_existing)
data.append(row)
# 创建DataFrame
df = pd.DataFrame(data)
# 保存到Excel
df.to_excel(output_file, index=False, engine='openpyxl')
# 格式化Excel文件
wb = load_workbook(output_file)
ws = wb.active
# 设置列宽
ws.column_dimensions['A'].width = 15
ws.column_dimensions['B'].width = 12
ws.column_dimensions['C'].width = 12
ws.column_dimensions['D'].width = 8
ws.column_dimensions['E'].width = 12
ws.column_dimensions['F'].width = 20
ws.column_dimensions['G'].width = 15
ws.column_dimensions['H'].width = 15
ws.column_dimensions['I'].width = 30
ws.column_dimensions['J'].width = 20
ws.column_dimensions['K'].width = 20
ws.column_dimensions['L'].width = 12
ws.column_dimensions['M'].width = 15
ws.column_dimensions['N'].width = 12
ws.column_dimensions['O'].width = 20
# 设置表头样式
header_fill = PatternFill(start_color='D3D3D3', end_color='D3D3D3', fill_type='solid')
header_font = Font(bold=True)
for cell in ws[1]:
cell.fill = header_fill
cell.font = header_font
cell.alignment = Alignment(horizontal='center', vertical='center')
# 冻结首行
ws.freeze_panes = 'A2'
wb.save(output_file)
print(f'成功生成1000条测试数据到: {output_file}')
print('- 500条现有数据前500行')
print('- 500条新数据后500行')

View File

@@ -0,0 +1,181 @@
import random
import string
from datetime import datetime, timedelta
import pandas as pd
# 机构名称前缀
company_prefixes = ['北京市', '上海市', '广州市', '深圳市', '杭州市', '成都市', '武汉市', '南京市', '西安市', '重庆市']
company_keywords = ['房产', '地产', '置业', '中介', '经纪', '咨询', '投资', '资产', '物业', '不动产']
company_suffixes = ['有限公司', '股份有限公司', '集团', '企业', '合伙企业', '有限责任公司']
# 主体类型
entity_types = ['企业', '个体工商户', '农民专业合作社', '其他组织']
# 企业性质
enterprise_natures = ['国有企业', '集体企业', '私营企业', '混合所有制企业', '外商投资企业', '港澳台投资企业']
# 行业分类
industry_classes = ['房地产业', '金融业', '租赁和商务服务业', '建筑业', '批发和零售业']
# 所属行业
industry_names = [
'房地产中介服务', '房地产经纪', '房地产开发经营', '物业管理',
'投资咨询', '资产管理', '商务咨询', '市场调查',
'建筑工程', '装饰装修', '园林绿化'
]
# 法定代表人姓名
surnames = ['', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '']
given_names = ['', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '秀英', '', '']
# 证件类型
cert_types = ['身份证', '护照', '港澳通行证', '台胞证', '其他']
# 常用地址
provinces = ['北京市', '上海市', '广东省', '浙江省', '江苏省', '四川省', '湖北省', '河南省', '山东省', '福建省']
cities = ['朝阳区', '海淀区', '浦东新区', '黄浦区', '天河区', '福田区', '西湖区', '滨江区', '鼓楼区', '玄武区',
'武侯区', '江汉区', '金水区', '市南区', '思明区']
districts = ['街道', '大道', '', '', '小区', '花园', '广场', '大厦']
street_numbers = ['1号', '2号', '3号', '88号', '66号', '108号', '188号', '888号', '666号', '168号']
# 股东姓名
shareholder_names = [
'张伟', '李芳', '王强', '刘军', '陈静', '杨洋', '黄勇', '赵艳',
'周杰', '吴娟', '徐涛', '孙明', '马超', '胡秀英', '朱霞', '郭平',
'何桂英', '罗玉兰', '高萍', '林毅', '王浩', '李宇', '张轩', '刘然'
]
def generate_company_name():
"""生成机构名称"""
prefix = random.choice(company_prefixes)
keyword = random.choice(company_keywords)
suffix = random.choice(company_suffixes)
return f"{prefix}{keyword}{suffix}"
def generate_social_credit_code():
"""生成统一社会信用代码18位"""
# 统一社会信用代码规则18位第一位为登记管理部门代码1-5第二位为机构类别代码1-9
dept_code = random.choice(['1', '2', '3', '4', '5'])
org_code = random.choice(['1', '2', '3', '4', '5', '6', '7', '8', '9'])
rest = ''.join([str(random.randint(0, 9)) for _ in range(16)])
return f"{dept_code}{org_code}{rest}"
def generate_id_card():
"""生成身份证号码18位简化版"""
# 地区码前6位
area_code = f"{random.randint(110000, 650000):06d}"
# 出生日期8位
birth_year = random.randint(1960, 1990)
birth_month = f"{random.randint(1, 12):02d}"
birth_day = f"{random.randint(1, 28):02d}"
birth_date = f"{birth_year}{birth_month}{birth_day}"
# 顺序码3位
sequence = f"{random.randint(1, 999):03d}"
# 校验码1位
check_code = random.randint(0, 9)
return f"{area_code}{birth_date}{sequence}{check_code}"
def generate_other_id():
"""生成其他证件号码"""
return f"{random.randint(10000000, 99999999):08d}"
def generate_register_address():
"""生成注册地址"""
province = random.choice(provinces)
city = random.choice(cities)
district = random.choice(districts)
number = random.choice(street_numbers)
return f"{province}{city}{district}{number}"
def generate_establish_date():
"""生成成立日期2000-2024年之间"""
start_date = datetime(2000, 1, 1)
end_date = datetime(2024, 12, 31)
time_between = end_date - start_date
days_between = time_between.days
random_days = random.randrange(days_between)
return start_date + timedelta(days=random_days)
def generate_legal_representative():
"""生成法定代表人"""
name = random.choice(surnames) + random.choice(given_names)
cert_type = random.choice(cert_types)
cert_no = generate_id_card() if cert_type == '身份证' else generate_other_id()
return name, cert_type, cert_no
def generate_shareholders():
"""生成股东列表1-5个股东"""
shareholder_count = random.randint(1, 5)
selected_shareholders = random.sample(shareholder_names, shareholder_count)
shareholders = [None] * 5
for i, shareholder in enumerate(selected_shareholders):
shareholders[i] = shareholder
return shareholders
def generate_entity(index):
"""生成单条机构中介数据"""
# 基本信息
enterprise_name = generate_company_name()
social_credit_code = generate_social_credit_code()
entity_type = random.choice(entity_types)
enterprise_nature = random.choice(enterprise_natures)
industry_class = random.choice(industry_classes)
industry_name = random.choice(industry_names)
# 成立日期
establish_date = generate_establish_date()
# 注册地址
register_address = generate_register_address()
# 法定代表人信息
legal_name, legal_cert_type, legal_cert_no = generate_legal_representative()
# 股东
shareholders = generate_shareholders()
return {
'机构名称*': enterprise_name,
'统一社会信用代码*': social_credit_code,
'主体类型': entity_type,
'企业性质': enterprise_nature if random.random() > 0.3 else '',
'行业分类': industry_class if random.random() > 0.3 else '',
'所属行业': industry_name if random.random() > 0.2 else '',
'成立日期': establish_date.strftime('%Y-%m-%d') if random.random() > 0.4 else '',
'注册地址': register_address,
'法定代表人': legal_name,
'法定代表人证件类型': legal_cert_type,
'法定代表人证件号码': legal_cert_no,
'股东1': shareholders[0] if shareholders[0] else '',
'股东2': shareholders[1] if shareholders[1] else '',
'股东3': shareholders[2] if shareholders[2] else '',
'股东4': shareholders[3] if shareholders[3] else '',
'股东5': shareholders[4] if shareholders[4] else '',
'备注': f'测试数据{index}' if random.random() > 0.5 else ''
}
# 生成第一个1000条数据
print("正在生成第一批1000条机构中介黑名单数据...")
data = [generate_entity(i) for i in range(1, 1001)]
df = pd.DataFrame(data)
# 保存第一个文件
output1 = r'D:\ccdi\ccdi\doc\test-data\intermediary\机构中介黑名单测试数据_1000条_第1批.xlsx'
df.to_excel(output1, index=False, engine='openpyxl')
print(f"已生成第一个文件: {output1}")
# 生成第二个1000条数据
print("正在生成第二批1000条机构中介黑名单数据...")
data2 = [generate_entity(i) for i in range(1, 1001)]
df2 = pd.DataFrame(data2)
# 保存第二个文件
output2 = r'D:\ccdi\ccdi\doc\test-data\intermediary\机构中介黑名单测试数据_1000条_第2批.xlsx'
df2.to_excel(output2, index=False, engine='openpyxl')
print(f"已生成第二个文件: {output2}")
print("\n✅ 生成完成!")
print(f"文件1: {output1}")
print(f"文件2: {output2}")
print(f"\n每个文件包含1000条测试数据")
print(f"数据格式与CcdiIntermediaryEntityExcel.java定义一致")

View File

@@ -0,0 +1,110 @@
import random
import string
from datetime import datetime
import pandas as pd
# 常用姓氏和名字
surnames = ['', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '']
given_names = ['', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '', '秀英', '', '', '', '桂英', '玉兰', '', '', '', '', '', '', '']
# 人员类型
person_types = ['中介', '职业背债人', '房产中介']
person_sub_types = ['本人', '配偶', '子女', '其他']
genders = ['M', 'F', 'O']
id_types = ['身份证', '护照', '港澳通行证', '台胞证', '军官证']
relation_types = ['配偶', '子女', '父母', '兄弟姐妹', '其他']
# 常用地址
provinces = ['北京市', '上海市', '广东省', '浙江省', '江苏省', '四川省', '湖北省', '河南省', '山东省', '福建省']
cities = ['朝阳区', '海淀区', '浦东新区', '黄浦区', '天河区', '福田区', '西湖区', '滨江区', '鼓楼区', '玄武区']
districts = ['街道1号', '大道2号', '路3号', '巷4号', '小区5栋', '花园6号', '广场7号', '大厦8号楼']
# 公司和职位
companies = ['房产中介有限公司', '置业咨询公司', '房产经纪公司', '地产代理公司', '不动产咨询公司', '房屋租赁公司', '物业管理公司', '投资咨询公司']
positions = ['房产经纪人', '销售经理', '业务员', '置业顾问', '店长', '区域经理', '高级经纪人', '项目经理']
# 生成身份证号码(简化版,仅用于测试)
def generate_id_card():
# 地区码前6位
area_code = f"{random.randint(110000, 650000):06d}"
# 出生日期8位
birth_year = random.randint(1960, 2000)
birth_month = f"{random.randint(1, 12):02d}"
birth_day = f"{random.randint(1, 28):02d}"
birth_date = f"{birth_year}{birth_month}{birth_day}"
# 顺序码3位
sequence = f"{random.randint(1, 999):03d}"
# 校验码1位
check_code = random.randint(0, 9)
return f"{area_code}{birth_date}{sequence}{check_code}"
# 生成手机号
def generate_phone():
second_digits = ['3', '5', '7', '8', '9']
second = random.choice(second_digits)
return f"1{second}{''.join([str(random.randint(0, 9)) for _ in range(9)])}"
# 生成统一信用代码
def generate_credit_code():
return f"91{''.join([str(random.randint(0, 9)) for _ in range(16)])}"
# 生成微信号
def generate_wechat():
return f"wx_{''.join([random.choice(string.ascii_lowercase + string.digits) for _ in range(8)])}"
# 生成单条数据
def generate_person(index):
person_type = random.choice(person_types)
gender = random.choice(genders)
# 根据性别选择更合适的名字
if gender == 'M':
name = random.choice(surnames) + random.choice(['', '', '', '', '', '', '', '', '', '', '', '', ''])
else:
name = random.choice(surnames) + random.choice(['', '', '', '', '', '', '', '秀英', '', '', '桂英', '玉兰', ''])
id_type = random.choice(id_types)
id_card = generate_id_card() if id_type == '身份证' else f"{random.randint(10000000, 99999999):08d}"
return {
'姓名': name,
'人员类型': person_type,
'人员子类型': random.choice(person_sub_types),
'性别': gender,
'证件类型': id_type,
'证件号码': id_card,
'手机号码': generate_phone(),
'微信号': generate_wechat() if random.random() > 0.3 else '',
'联系地址': f"{random.choice(provinces)}{random.choice(cities)}{random.choice(districts)}",
'所在公司': random.choice(companies) if random.random() > 0.2 else '',
'企业统一信用码': generate_credit_code() if random.random() > 0.5 else '',
'职位': random.choice(positions) if random.random() > 0.3 else '',
'关联人员ID': f"ID{random.randint(10000, 99999)}" if random.random() > 0.6 else '',
'关系类型': random.choice(relation_types) if random.random() > 0.6 else '',
'备注': f'测试数据{index}' if random.random() > 0.5 else ''
}
# 生成1000条数据
print("正在生成1000条个人中介黑名单数据...")
data = [generate_person(i) for i in range(1, 1001)]
df = pd.DataFrame(data)
# 保存第一个文件
output1 = r'D:\ccdi\ccdi\doc\test-data\intermediary\个人中介黑名单测试数据_1000条_第1批.xlsx'
df.to_excel(output1, index=False)
print(f"已生成第一个文件: {output1}")
# 生成第二个1000条数据
print("正在生成第二批1000条个人中介黑名单数据...")
data2 = [generate_person(i) for i in range(1, 1001)]
df2 = pd.DataFrame(data2)
# 保存第二个文件
output2 = r'D:\ccdi\ccdi\doc\test-data\intermediary\个人中介黑名单测试数据_1000条_第2批.xlsx'
df2.to_excel(output2, index=False)
print(f"已生成第二个文件: {output2}")
print("\n生成完成!")
print(f"文件1: {output1}")
print(f"文件2: {output2}")
print(f"\n每个文件包含1000条测试数据")

Binary file not shown.

View File

@@ -0,0 +1,446 @@
/**
* 中介导入功能测试脚本 - 验证ON DUPLICATE KEY UPDATE重构
*
* 测试场景:
* 1. 更新模式 - 测试importPersonBatch/importEntityBatch的INSERT ON DUPLICATE KEY UPDATE
* 2. 仅新增模式 - 测试冲突检测和失败记录
* 3. 边界情况 - 空列表、全部冲突、部分冲突等
*/
const axios = require('axios');
const FormData = require('form-data');
const fs = require('fs');
const path = require('path');
// 配置
const BASE_URL = 'http://localhost:8080';
const LOGIN_URL = `${BASE_URL}/login/test`;
const PERSON_IMPORT_URL = `${BASE_URL}/ccdi/intermediary/importPersonData`;
const ENTITY_IMPORT_URL = `${BASE_URL}/ccdi/intermediary/importEntityData`;
const PERSON_STATUS_URL = `${BASE_URL}/ccdi/intermediary/person/import/status`;
const ENTITY_STATUS_URL = `${BASE_URL}/ccdi/intermediary/entity/import/status`;
const PERSON_FAILURES_URL = `${BASE_URL}/ccdi/intermediary/person/import/failures`;
const ENTITY_FAILURES_URL = `${BASE_URL}/ccdi/intermediary/entity/import/failures`;
// 测试数据文件路径
const TEST_DATA_DIR = path.join(__dirname, '../test-data/intermediary');
const PERSON_TEST_FILE = path.join(TEST_DATA_DIR, '个人中介黑名单测试数据_1000条_第1批.xlsx');
const ENTITY_TEST_FILE = path.join(TEST_DATA_DIR, '机构中介黑名单测试数据_1000条_第1批.xlsx');
let authToken = '';
// 颜色输出
const colors = {
reset: '\x1b[0m',
green: '\x1b[32m',
red: '\x1b[31m',
yellow: '\x1b[33m',
blue: '\x1b[36m'
};
function log(message, color = 'reset') {
console.log(`${colors[color]}${message}${colors.reset}`);
}
function logSuccess(message) {
log(`${message}`, 'green');
}
function logError(message) {
log(`${message}`, 'red');
}
function logInfo(message) {
log(` ${message}`, 'blue');
}
function logSection(title) {
console.log('\n' + '='.repeat(60));
log(title, 'yellow');
console.log('='.repeat(60));
}
/**
* 登录获取Token
*/
async function login() {
logSection('登录系统');
try {
const response = await axios.post(LOGIN_URL, {
username: 'admin',
password: 'admin123'
});
if (response.data.code === 200) {
authToken = response.data.data;
logSuccess('登录成功');
logInfo(`Token: ${authToken.substring(0, 20)}...`);
return true;
} else {
logError(`登录失败: ${response.data.msg}`);
return false;
}
} catch (error) {
logError(`登录请求失败: ${error.message}`);
return false;
}
}
/**
* 上传文件并开始导入
*/
async function importData(file, url, updateSupport, description) {
logSection(description);
if (!fs.existsSync(file)) {
logError(`测试文件不存在: ${file}`);
return null;
}
logInfo(`上传文件: ${path.basename(file)}`);
logInfo(`更新模式: ${updateSupport ? '是' : '否'}`);
try {
const form = new FormData();
form.append('file', fs.createReadStream(file));
form.append('updateSupport', updateSupport.toString());
const response = await axios.post(url, form, {
headers: {
...form.getHeaders(),
'Authorization': `Bearer ${authToken}`
}
});
if (response.data.code === 200) {
logSuccess('导入任务已提交');
logInfo(`响应信息: ${response.data.msg}`);
// 从响应中提取taskId
const match = response.data.msg.match(/任务ID: ([a-zA-Z0-9-]+)/);
if (match) {
const taskId = match[1];
logInfo(`任务ID: ${taskId}`);
return taskId;
}
} else {
logError(`导入失败: ${response.data.msg}`);
}
} catch (error) {
logError(`导入请求失败: ${error.message}`);
if (error.response) {
logError(`状态码: ${error.response.status}`);
logError(`响应数据: ${JSON.stringify(error.response.data)}`);
}
}
return null;
}
/**
* 轮询查询导入状态
*/
async function pollImportStatus(taskId, url, description, maxAttempts = 30, interval = 2000) {
logInfo(`等待导入完成...`);
for (let attempt = 1; attempt <= maxAttempts; attempt++) {
try {
const response = await axios.get(`${url}?taskId=${taskId}`, {
headers: {
'Authorization': `Bearer ${authToken}`
}
});
if (response.data.code === 200) {
const status = response.data.data;
logInfo(`[尝试 ${attempt}/${maxAttempts}] 状态: ${status.status}, 进度: ${status.progress}%`);
if (status.status === 'SUCCESS' || status.status === 'PARTIAL_SUCCESS') {
logSuccess(`${description}完成!`);
logInfo(`总数: ${status.totalCount}, 成功: ${status.successCount}, 失败: ${status.failureCount}`);
return status;
} else if (status.status === 'FAILURE') {
logError(`${description}失败`);
return status;
}
}
} catch (error) {
logError(`查询状态失败: ${error.message}`);
}
await sleep(interval);
}
logError('导入超时');
return null;
}
/**
* 获取导入失败记录
*/
async function getImportFailures(taskId, url, description) {
logSection(`获取${description}失败记录`);
try {
const response = await axios.get(`${url}?taskId=${taskId}`, {
headers: {
'Authorization': `Bearer ${authToken}`
}
});
if (response.data.code === 200) {
const failures = response.data.data;
logInfo(`失败记录数: ${failures.length}`);
if (failures.length > 0) {
logInfo('前3条失败记录:');
failures.slice(0, 3).forEach((failure, index) => {
console.log(` ${index + 1}. ${failure.errorMessage || '未知错误'}`);
});
// 保存失败记录到文件
const failureFile = path.join(__dirname, `failures_${taskId}.json`);
fs.writeFileSync(failureFile, JSON.stringify(failures, null, 2));
logInfo(`失败记录已保存到: ${failureFile}`);
}
return failures;
}
} catch (error) {
logError(`获取失败记录失败: ${error.message}`);
}
return [];
}
/**
* 辅助函数: 延迟
*/
function sleep(ms) {
return new Promise(resolve => setTimeout(resolve, ms));
}
/**
* 测试场景1: 个人中介 - 更新模式(第一次导入)
*/
async function testPersonImportUpdateMode() {
logSection('测试场景1: 个人中介 - 更新模式(第一次导入)');
const taskId = await importData(
PERSON_TEST_FILE,
PERSON_IMPORT_URL,
true, // 更新模式
'个人中介导入(更新模式)'
);
if (!taskId) {
logError('导入任务未创建');
return false;
}
const status = await pollImportStatus(taskId, PERSON_STATUS_URL, '个人中介导入');
if (status && (status.status === 'SUCCESS' || status.status === 'PARTIAL_SUCCESS')) {
const failures = await getImportFailures(taskId, PERSON_FAILURES_URL, '个人中介');
logSuccess(`测试场景1完成 - 成功: ${status.successCount}, 失败: ${status.failureCount}`);
return true;
}
return false;
}
/**
* 测试场景2: 个人中介 - 仅新增模式(重复导入应失败)
*/
async function testPersonImportInsertOnly() {
logSection('测试场景2: 个人中介 - 仅新增模式(重复导入)');
const taskId = await importData(
PERSON_TEST_FILE,
PERSON_IMPORT_URL,
false, // 仅新增模式
'个人中介导入(仅新增)'
);
if (!taskId) {
logError('导入任务未创建');
return false;
}
const status = await pollImportStatus(taskId, PERSON_STATUS_URL, '个人中介导入');
if (status && (status.status === 'SUCCESS' || status.status === 'PARTIAL_SUCCESS')) {
const failures = await getImportFailures(taskId, PERSON_FAILURES_URL, '个人中介');
// 在仅新增模式下,重复导入应该全部失败
if (failures.length > 0) {
logSuccess(`测试场景2完成 - 预期有失败记录, 实际失败: ${failures.length}`);
return true;
} else {
logError('测试场景2失败 - 预期有失败记录, 但实际没有');
return false;
}
}
return false;
}
/**
* 测试场景3: 实体中介 - 更新模式(第一次导入)
*/
async function testEntityImportUpdateMode() {
logSection('测试场景3: 实体中介 - 更新模式(第一次导入)');
const taskId = await importData(
ENTITY_TEST_FILE,
ENTITY_IMPORT_URL,
true, // 更新模式
'实体中介导入(更新模式)'
);
if (!taskId) {
logError('导入任务未创建');
return false;
}
const status = await pollImportStatus(taskId, ENTITY_STATUS_URL, '实体中介导入');
if (status && (status.status === 'SUCCESS' || status.status === 'PARTIAL_SUCCESS')) {
const failures = await getImportFailures(taskId, ENTITY_FAILURES_URL, '实体中介');
logSuccess(`测试场景3完成 - 成功: ${status.successCount}, 失败: ${status.failureCount}`);
return true;
}
return false;
}
/**
* 测试场景4: 实体中介 - 仅新增模式(重复导入应失败)
*/
async function testEntityImportInsertOnly() {
logSection('测试场景4: 实体中介 - 仅新增模式(重复导入)');
const taskId = await importData(
ENTITY_TEST_FILE,
ENTITY_IMPORT_URL,
false, // 仅新增模式
'实体中介导入(仅新增)'
);
if (!taskId) {
logError('导入任务未创建');
return false;
}
const status = await pollImportStatus(taskId, ENTITY_STATUS_URL, '实体中介导入');
if (status && (status.status === 'SUCCESS' || status.status === 'PARTIAL_SUCCESS')) {
const failures = await getImportFailures(taskId, ENTITY_FAILURES_URL, '实体中介');
// 在仅新增模式下,重复导入应该全部失败
if (failures.length > 0) {
logSuccess(`测试场景4完成 - 预期有失败记录, 实际失败: ${failures.length}`);
return true;
} else {
logError('测试场景4失败 - 预期有失败记录, 但实际没有');
return false;
}
}
return false;
}
/**
* 测试场景5: 个人中介 - 再次更新模式(应该更新已有数据)
*/
async function testPersonImportUpdateAgain() {
logSection('测试场景5: 个人中介 - 再次更新模式');
const taskId = await importData(
PERSON_TEST_FILE,
PERSON_IMPORT_URL,
true, // 更新模式
'个人中介导入(再次更新)'
);
if (!taskId) {
logError('导入任务未创建');
return false;
}
const status = await pollImportStatus(taskId, PERSON_STATUS_URL, '个人中介导入');
if (status && (status.status === 'SUCCESS' || status.status === 'PARTIAL_SUCCESS')) {
const failures = await getImportFailures(taskId, PERSON_FAILURES_URL, '个人中介');
logSuccess(`测试场景5完成 - 成功: ${status.successCount}, 失败: ${status.failureCount}`);
return true;
}
return false;
}
/**
* 主测试流程
*/
async function runTests() {
console.log('\n╔════════════════════════════════════════════════════════════╗');
console.log('║ 中介导入功能测试 - ON DUPLICATE KEY UPDATE验证 ║');
console.log('╚════════════════════════════════════════════════════════════╝');
const startTime = Date.now();
const results = {
passed: 0,
failed: 0
};
// 登录
const loginSuccess = await login();
if (!loginSuccess) {
logError('无法登录,终止测试');
return;
}
// 执行测试
const tests = [
{ name: '场景1: 个人中介-更新模式(首次)', fn: testPersonImportUpdateMode },
{ name: '场景2: 个人中介-仅新增(重复)', fn: testPersonImportInsertOnly },
{ name: '场景3: 实体中介-更新模式(首次)', fn: testEntityImportUpdateMode },
{ name: '场景4: 实体中介-仅新增(重复)', fn: testEntityImportInsertOnly },
{ name: '场景5: 个人中介-再次更新', fn: testPersonImportUpdateAgain }
];
for (const test of tests) {
try {
const passed = await test.fn();
if (passed) {
results.passed++;
} else {
results.failed++;
}
await sleep(2000); // 测试之间间隔
} catch (error) {
logError(`${test.name} 执行异常: ${error.message}`);
results.failed++;
}
}
// 输出测试结果摘要
const duration = ((Date.now() - startTime) / 1000).toFixed(2);
console.log('\n' + '='.repeat(60));
log('测试结果摘要', 'yellow');
console.log('='.repeat(60));
logSuccess(`通过: ${results.passed}/${tests.length}`);
if (results.failed > 0) {
logError(`失败: ${results.failed}/${tests.length}`);
}
logInfo(`总耗时: ${duration}`);
console.log('='.repeat(60) + '\n');
}
// 运行测试
runTests().catch(error => {
logError(`测试运行失败: ${error.message}`);
console.error(error);
process.exit(1);
});