Files
ccdi/doc/test-data/test_import_fix.py

269 lines
8.6 KiB
Python
Raw Permalink Normal View History

2026-01-29 22:03:42 +08:00
"""
中介黑名单导入功能测试脚本
测试目标
1. 验证机构中介导入时 certificate_no 字段不能为 null 的修复
2. 验证个人中介导入功能正常
3. 验证更新模式功能正常
测试数据准备
- 个人中介2条记录
- 机构中介2条记录
"""
import requests
import json
from datetime import datetime
BASE_URL = "http://localhost:8080"
def login():
"""登录并获取token"""
url = f"{BASE_URL}/login/test"
data = {
"username": "admin",
"password": "admin123"
}
response = requests.post(url, json=data)
if response.status_code == 200:
result = response.json()
if result.get("code") == 200:
token = result.get("token")
print(f"✓ 登录成功获取token: {token[:20]}...")
return token
print(f"✗ 登录失败: {response.text}")
return None
def get_headers(token):
"""获取请求头"""
return {
"Authorization": f"Bearer {token}"
}
def test_import_person_intermediary(token):
"""测试个人中介导入"""
print("\n" + "="*60)
print("测试1: 个人中介导入功能")
print("="*60)
# 准备个人中介数据直接通过API调用测试
url = f"{BASE_URL}/dpc/intermediary"
headers = get_headers(token)
headers["Content-Type"] = "application/json"
person_data = {
"name": "测试个人中介",
"certificateNo": "110101199001011234",
"intermediaryType": "1",
"status": "0",
"remark": "测试个人中介导入",
"indivType": "中介",
"indivSubType": "本人",
"indivGender": "M",
"indivCertType": "身份证",
"indivPhone": "13800138000",
"indivWechat": "test_wx_id",
"indivAddress": "北京市朝阳区",
"indivCompany": "测试公司",
"indivPosition": "经纪人"
}
response = requests.post(url, json=person_data, headers=headers)
if response.status_code == 200:
result = response.json()
if result.get("code") == 200:
print("✓ 个人中介导入成功")
return True
else:
print(f"✗ 个人中介导入失败: {result.get('msg')}")
return False
else:
print(f"✗ 个人中介导入请求失败: {response.status_code} - {response.text}")
return False
def test_import_entity_intermediary(token):
"""测试机构中介导入"""
print("\n" + "="*60)
print("测试2: 机构中介导入功能")
print("="*60)
# 准备机构中介数据
url = f"{BASE_URL}/dpc/intermediary"
headers = get_headers(token)
headers["Content-Type"] = "application/json"
entity_data = {
"name": "测试机构中介有限公司",
"certificateNo": "91110108MA0000001A", # 统一社会信用代码
"intermediaryType": "2",
"status": "0",
"remark": "测试机构中介导入",
"corpCreditCode": "91110108MA0000001A",
"corpType": "有限责任公司",
"corpNature": "民营企业",
"corpIndustryCategory": "房地产业",
"corpIndustry": "房地产中介服务",
"corpEstablishDate": "2020-01-01",
"corpAddress": "北京市海淀区",
"corpLegalRep": "张三",
"corpLegalCertType": "身份证",
"corpLegalCertNo": "110101199001011235",
"corpShareholder1": "李四",
"corpShareholder2": "王五"
}
response = requests.post(url, json=entity_data, headers=headers)
if response.status_code == 200:
result = response.json()
if result.get("code") == 200:
print("✓ 机构中介导入成功")
print(f" - 机构名称: {entity_data['name']}")
print(f" - 统一社会信用代码: {entity_data['corpCreditCode']}")
print(f" - 证件号字段: {entity_data['certificateNo']}")
return True
else:
print(f"✗ 机构中介导入失败: {result.get('msg')}")
return False
else:
print(f"✗ 机构中介导入请求失败: {response.status_code} - {response.text}")
return False
def test_import_entity_without_credit_code(token):
"""测试机构中介导入时统一社会信用代码为空的情况"""
print("\n" + "="*60)
print("测试4: 机构中介导入时统一社会信用代码为空(应该失败)")
print("="*60)
url = f"{BASE_URL}/dpc/intermediary"
headers = get_headers(token)
headers["Content-Type"] = "application/json"
# 故意不提供统一社会信用代码
entity_data = {
"name": "测试机构中介有限公司(无信用代码)",
"certificateNo": "", # 空字符串
"intermediaryType": "2",
"status": "0",
"remark": "测试统一社会信用代码为空的情况",
"corpCreditCode": "", # 空字符串
"corpType": "有限责任公司"
}
response = requests.post(url, json=entity_data, headers=headers)
if response.status_code == 200:
result = response.json()
if result.get("code") != 200:
# 预期失败
print(f"✓ 预期行为:导入被拒绝,错误信息: {result.get('msg')}")
return True
else:
# 不应该成功
print(f"✗ 测试失败:统一社会信用代码为空时不应该导入成功")
return False
else:
print(f"✗ 请求失败: {response.status_code} - {response.text}")
return False
def test_query_intermediary_list(token):
"""测试查询中介列表"""
print("\n" + "="*60)
print("测试3: 查询中介列表")
print("="*60)
url = f"{BASE_URL}/dpc/intermediary/list"
headers = get_headers(token)
params = {
"pageNum": 1,
"pageSize": 10
}
response = requests.get(url, params=params, headers=headers)
if response.status_code == 200:
result = response.json()
if result.get("code") == 200:
rows = result.get("rows", [])
total = result.get("total", 0)
print(f"✓ 查询成功,共 {total} 条记录")
for item in rows:
print(f" - {item['name']} ({item.get('intermediaryTypeName', '未知')}) - 证件号: {item.get('certificateNo', '')}")
return True
else:
print(f"✗ 查询失败: {result.get('msg')}")
return False
else:
print(f"✗ 查询请求失败: {response.status_code} - {response.text}")
return False
def generate_test_report(results):
"""生成测试报告"""
print("\n" + "="*60)
print("测试报告")
print("="*60)
total_tests = len(results)
passed_tests = sum(1 for r in results.values() if r)
failed_tests = total_tests - passed_tests
print(f"\n总测试数: {total_tests}")
print(f"通过: {passed_tests}")
print(f"失败: {failed_tests}")
print(f"通过率: {passed_tests/total_tests*100:.1f}%")
print("\n详细结果:")
for test_name, result in results.items():
status = "✓ 通过" if result else "✗ 失败"
print(f" {test_name}: {status}")
# 保存报告到文件
report_content = {
"测试时间": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"总测试数": total_tests,
"通过": passed_tests,
"失败": failed_tests,
"通过率": f"{passed_tests/total_tests*100:.1f}%",
"详细结果": {k: "通过" if v else "失败" for k, v in results.items()}
}
with open("doc/test-data/import_test_report.json", "w", encoding="utf-8") as f:
json.dump(report_content, f, ensure_ascii=False, indent=2)
print(f"\n测试报告已保存至: doc/test-data/import_test_report.json")
def main():
"""主测试函数"""
print("="*60)
print("中介黑名单导入功能测试")
print(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print("="*60)
results = {}
# 1. 登录
token = login()
if not token:
print("登录失败,无法继续测试")
return
# 2. 测试个人中介导入
results["个人中介导入"] = test_import_person_intermediary(token)
# 3. 测试机构中介导入
results["机构中介导入"] = test_import_entity_intermediary(token)
# 4. 测试统一社会信用代码为空的情况
results["机构中介无信用代码校验"] = test_import_entity_without_credit_code(token)
# 5. 测试查询列表
results["查询列表"] = test_query_intermediary_list(token)
# 5. 生成测试报告
generate_test_report(results)
print("\n" + "="*60)
print("测试完成")
print("="*60)
if __name__ == "__main__":
main()