中介新增 修改接口
This commit is contained in:
268
doc/test-data/test_import_fix.py
Normal file
268
doc/test-data/test_import_fix.py
Normal file
@@ -0,0 +1,268 @@
|
||||
"""
|
||||
中介黑名单导入功能测试脚本
|
||||
|
||||
测试目标:
|
||||
1. 验证机构中介导入时 certificate_no 字段不能为 null 的修复
|
||||
2. 验证个人中介导入功能正常
|
||||
3. 验证更新模式功能正常
|
||||
|
||||
测试数据准备:
|
||||
- 个人中介:2条记录
|
||||
- 机构中介:2条记录
|
||||
"""
|
||||
|
||||
import requests
|
||||
import json
|
||||
from datetime import datetime
|
||||
|
||||
BASE_URL = "http://localhost:8080"
|
||||
|
||||
def login():
|
||||
"""登录并获取token"""
|
||||
url = f"{BASE_URL}/login/test"
|
||||
data = {
|
||||
"username": "admin",
|
||||
"password": "admin123"
|
||||
}
|
||||
response = requests.post(url, json=data)
|
||||
if response.status_code == 200:
|
||||
result = response.json()
|
||||
if result.get("code") == 200:
|
||||
token = result.get("token")
|
||||
print(f"✓ 登录成功,获取token: {token[:20]}...")
|
||||
return token
|
||||
print(f"✗ 登录失败: {response.text}")
|
||||
return None
|
||||
|
||||
def get_headers(token):
|
||||
"""获取请求头"""
|
||||
return {
|
||||
"Authorization": f"Bearer {token}"
|
||||
}
|
||||
|
||||
def test_import_person_intermediary(token):
|
||||
"""测试个人中介导入"""
|
||||
print("\n" + "="*60)
|
||||
print("测试1: 个人中介导入功能")
|
||||
print("="*60)
|
||||
|
||||
# 准备个人中介数据(直接通过API调用测试)
|
||||
url = f"{BASE_URL}/dpc/intermediary"
|
||||
headers = get_headers(token)
|
||||
headers["Content-Type"] = "application/json"
|
||||
|
||||
person_data = {
|
||||
"name": "测试个人中介",
|
||||
"certificateNo": "110101199001011234",
|
||||
"intermediaryType": "1",
|
||||
"status": "0",
|
||||
"remark": "测试个人中介导入",
|
||||
"indivType": "中介",
|
||||
"indivSubType": "本人",
|
||||
"indivGender": "M",
|
||||
"indivCertType": "身份证",
|
||||
"indivPhone": "13800138000",
|
||||
"indivWechat": "test_wx_id",
|
||||
"indivAddress": "北京市朝阳区",
|
||||
"indivCompany": "测试公司",
|
||||
"indivPosition": "经纪人"
|
||||
}
|
||||
|
||||
response = requests.post(url, json=person_data, headers=headers)
|
||||
if response.status_code == 200:
|
||||
result = response.json()
|
||||
if result.get("code") == 200:
|
||||
print("✓ 个人中介导入成功")
|
||||
return True
|
||||
else:
|
||||
print(f"✗ 个人中介导入失败: {result.get('msg')}")
|
||||
return False
|
||||
else:
|
||||
print(f"✗ 个人中介导入请求失败: {response.status_code} - {response.text}")
|
||||
return False
|
||||
|
||||
def test_import_entity_intermediary(token):
|
||||
"""测试机构中介导入"""
|
||||
print("\n" + "="*60)
|
||||
print("测试2: 机构中介导入功能")
|
||||
print("="*60)
|
||||
|
||||
# 准备机构中介数据
|
||||
url = f"{BASE_URL}/dpc/intermediary"
|
||||
headers = get_headers(token)
|
||||
headers["Content-Type"] = "application/json"
|
||||
|
||||
entity_data = {
|
||||
"name": "测试机构中介有限公司",
|
||||
"certificateNo": "91110108MA0000001A", # 统一社会信用代码
|
||||
"intermediaryType": "2",
|
||||
"status": "0",
|
||||
"remark": "测试机构中介导入",
|
||||
"corpCreditCode": "91110108MA0000001A",
|
||||
"corpType": "有限责任公司",
|
||||
"corpNature": "民营企业",
|
||||
"corpIndustryCategory": "房地产业",
|
||||
"corpIndustry": "房地产中介服务",
|
||||
"corpEstablishDate": "2020-01-01",
|
||||
"corpAddress": "北京市海淀区",
|
||||
"corpLegalRep": "张三",
|
||||
"corpLegalCertType": "身份证",
|
||||
"corpLegalCertNo": "110101199001011235",
|
||||
"corpShareholder1": "李四",
|
||||
"corpShareholder2": "王五"
|
||||
}
|
||||
|
||||
response = requests.post(url, json=entity_data, headers=headers)
|
||||
if response.status_code == 200:
|
||||
result = response.json()
|
||||
if result.get("code") == 200:
|
||||
print("✓ 机构中介导入成功")
|
||||
print(f" - 机构名称: {entity_data['name']}")
|
||||
print(f" - 统一社会信用代码: {entity_data['corpCreditCode']}")
|
||||
print(f" - 证件号字段: {entity_data['certificateNo']}")
|
||||
return True
|
||||
else:
|
||||
print(f"✗ 机构中介导入失败: {result.get('msg')}")
|
||||
return False
|
||||
else:
|
||||
print(f"✗ 机构中介导入请求失败: {response.status_code} - {response.text}")
|
||||
return False
|
||||
|
||||
def test_import_entity_without_credit_code(token):
|
||||
"""测试机构中介导入时统一社会信用代码为空的情况"""
|
||||
print("\n" + "="*60)
|
||||
print("测试4: 机构中介导入时统一社会信用代码为空(应该失败)")
|
||||
print("="*60)
|
||||
|
||||
url = f"{BASE_URL}/dpc/intermediary"
|
||||
headers = get_headers(token)
|
||||
headers["Content-Type"] = "application/json"
|
||||
|
||||
# 故意不提供统一社会信用代码
|
||||
entity_data = {
|
||||
"name": "测试机构中介有限公司(无信用代码)",
|
||||
"certificateNo": "", # 空字符串
|
||||
"intermediaryType": "2",
|
||||
"status": "0",
|
||||
"remark": "测试统一社会信用代码为空的情况",
|
||||
"corpCreditCode": "", # 空字符串
|
||||
"corpType": "有限责任公司"
|
||||
}
|
||||
|
||||
response = requests.post(url, json=entity_data, headers=headers)
|
||||
if response.status_code == 200:
|
||||
result = response.json()
|
||||
if result.get("code") != 200:
|
||||
# 预期失败
|
||||
print(f"✓ 预期行为:导入被拒绝,错误信息: {result.get('msg')}")
|
||||
return True
|
||||
else:
|
||||
# 不应该成功
|
||||
print(f"✗ 测试失败:统一社会信用代码为空时不应该导入成功")
|
||||
return False
|
||||
else:
|
||||
print(f"✗ 请求失败: {response.status_code} - {response.text}")
|
||||
return False
|
||||
|
||||
def test_query_intermediary_list(token):
|
||||
"""测试查询中介列表"""
|
||||
print("\n" + "="*60)
|
||||
print("测试3: 查询中介列表")
|
||||
print("="*60)
|
||||
|
||||
url = f"{BASE_URL}/dpc/intermediary/list"
|
||||
headers = get_headers(token)
|
||||
|
||||
params = {
|
||||
"pageNum": 1,
|
||||
"pageSize": 10
|
||||
}
|
||||
|
||||
response = requests.get(url, params=params, headers=headers)
|
||||
if response.status_code == 200:
|
||||
result = response.json()
|
||||
if result.get("code") == 200:
|
||||
rows = result.get("rows", [])
|
||||
total = result.get("total", 0)
|
||||
print(f"✓ 查询成功,共 {total} 条记录")
|
||||
for item in rows:
|
||||
print(f" - {item['name']} ({item.get('intermediaryTypeName', '未知')}) - 证件号: {item.get('certificateNo', '无')}")
|
||||
return True
|
||||
else:
|
||||
print(f"✗ 查询失败: {result.get('msg')}")
|
||||
return False
|
||||
else:
|
||||
print(f"✗ 查询请求失败: {response.status_code} - {response.text}")
|
||||
return False
|
||||
|
||||
def generate_test_report(results):
|
||||
"""生成测试报告"""
|
||||
print("\n" + "="*60)
|
||||
print("测试报告")
|
||||
print("="*60)
|
||||
|
||||
total_tests = len(results)
|
||||
passed_tests = sum(1 for r in results.values() if r)
|
||||
failed_tests = total_tests - passed_tests
|
||||
|
||||
print(f"\n总测试数: {total_tests}")
|
||||
print(f"通过: {passed_tests}")
|
||||
print(f"失败: {failed_tests}")
|
||||
print(f"通过率: {passed_tests/total_tests*100:.1f}%")
|
||||
|
||||
print("\n详细结果:")
|
||||
for test_name, result in results.items():
|
||||
status = "✓ 通过" if result else "✗ 失败"
|
||||
print(f" {test_name}: {status}")
|
||||
|
||||
# 保存报告到文件
|
||||
report_content = {
|
||||
"测试时间": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
||||
"总测试数": total_tests,
|
||||
"通过": passed_tests,
|
||||
"失败": failed_tests,
|
||||
"通过率": f"{passed_tests/total_tests*100:.1f}%",
|
||||
"详细结果": {k: "通过" if v else "失败" for k, v in results.items()}
|
||||
}
|
||||
|
||||
with open("doc/test-data/import_test_report.json", "w", encoding="utf-8") as f:
|
||||
json.dump(report_content, f, ensure_ascii=False, indent=2)
|
||||
|
||||
print(f"\n测试报告已保存至: doc/test-data/import_test_report.json")
|
||||
|
||||
def main():
|
||||
"""主测试函数"""
|
||||
print("="*60)
|
||||
print("中介黑名单导入功能测试")
|
||||
print(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
||||
print("="*60)
|
||||
|
||||
results = {}
|
||||
|
||||
# 1. 登录
|
||||
token = login()
|
||||
if not token:
|
||||
print("登录失败,无法继续测试")
|
||||
return
|
||||
|
||||
# 2. 测试个人中介导入
|
||||
results["个人中介导入"] = test_import_person_intermediary(token)
|
||||
|
||||
# 3. 测试机构中介导入
|
||||
results["机构中介导入"] = test_import_entity_intermediary(token)
|
||||
|
||||
# 4. 测试统一社会信用代码为空的情况
|
||||
results["机构中介无信用代码校验"] = test_import_entity_without_credit_code(token)
|
||||
|
||||
# 5. 测试查询列表
|
||||
results["查询列表"] = test_query_intermediary_list(token)
|
||||
|
||||
# 5. 生成测试报告
|
||||
generate_test_report(results)
|
||||
|
||||
print("\n" + "="*60)
|
||||
print("测试完成")
|
||||
print("="*60)
|
||||
|
||||
if __name__ == "__main__":
|
||||
main()
|
||||
Reference in New Issue
Block a user