269 lines
8.6 KiB
Python
269 lines
8.6 KiB
Python
|
|
"""
|
|||
|
|
中介黑名单导入功能测试脚本
|
|||
|
|
|
|||
|
|
测试目标:
|
|||
|
|
1. 验证机构中介导入时 certificate_no 字段不能为 null 的修复
|
|||
|
|
2. 验证个人中介导入功能正常
|
|||
|
|
3. 验证更新模式功能正常
|
|||
|
|
|
|||
|
|
测试数据准备:
|
|||
|
|
- 个人中介:2条记录
|
|||
|
|
- 机构中介:2条记录
|
|||
|
|
"""
|
|||
|
|
|
|||
|
|
import requests
|
|||
|
|
import json
|
|||
|
|
from datetime import datetime
|
|||
|
|
|
|||
|
|
BASE_URL = "http://localhost:8080"
|
|||
|
|
|
|||
|
|
def login():
|
|||
|
|
"""登录并获取token"""
|
|||
|
|
url = f"{BASE_URL}/login/test"
|
|||
|
|
data = {
|
|||
|
|
"username": "admin",
|
|||
|
|
"password": "admin123"
|
|||
|
|
}
|
|||
|
|
response = requests.post(url, json=data)
|
|||
|
|
if response.status_code == 200:
|
|||
|
|
result = response.json()
|
|||
|
|
if result.get("code") == 200:
|
|||
|
|
token = result.get("token")
|
|||
|
|
print(f"✓ 登录成功,获取token: {token[:20]}...")
|
|||
|
|
return token
|
|||
|
|
print(f"✗ 登录失败: {response.text}")
|
|||
|
|
return None
|
|||
|
|
|
|||
|
|
def get_headers(token):
|
|||
|
|
"""获取请求头"""
|
|||
|
|
return {
|
|||
|
|
"Authorization": f"Bearer {token}"
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
def test_import_person_intermediary(token):
|
|||
|
|
"""测试个人中介导入"""
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("测试1: 个人中介导入功能")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
# 准备个人中介数据(直接通过API调用测试)
|
|||
|
|
url = f"{BASE_URL}/dpc/intermediary"
|
|||
|
|
headers = get_headers(token)
|
|||
|
|
headers["Content-Type"] = "application/json"
|
|||
|
|
|
|||
|
|
person_data = {
|
|||
|
|
"name": "测试个人中介",
|
|||
|
|
"certificateNo": "110101199001011234",
|
|||
|
|
"intermediaryType": "1",
|
|||
|
|
"status": "0",
|
|||
|
|
"remark": "测试个人中介导入",
|
|||
|
|
"indivType": "中介",
|
|||
|
|
"indivSubType": "本人",
|
|||
|
|
"indivGender": "M",
|
|||
|
|
"indivCertType": "身份证",
|
|||
|
|
"indivPhone": "13800138000",
|
|||
|
|
"indivWechat": "test_wx_id",
|
|||
|
|
"indivAddress": "北京市朝阳区",
|
|||
|
|
"indivCompany": "测试公司",
|
|||
|
|
"indivPosition": "经纪人"
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
response = requests.post(url, json=person_data, headers=headers)
|
|||
|
|
if response.status_code == 200:
|
|||
|
|
result = response.json()
|
|||
|
|
if result.get("code") == 200:
|
|||
|
|
print("✓ 个人中介导入成功")
|
|||
|
|
return True
|
|||
|
|
else:
|
|||
|
|
print(f"✗ 个人中介导入失败: {result.get('msg')}")
|
|||
|
|
return False
|
|||
|
|
else:
|
|||
|
|
print(f"✗ 个人中介导入请求失败: {response.status_code} - {response.text}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def test_import_entity_intermediary(token):
|
|||
|
|
"""测试机构中介导入"""
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("测试2: 机构中介导入功能")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
# 准备机构中介数据
|
|||
|
|
url = f"{BASE_URL}/dpc/intermediary"
|
|||
|
|
headers = get_headers(token)
|
|||
|
|
headers["Content-Type"] = "application/json"
|
|||
|
|
|
|||
|
|
entity_data = {
|
|||
|
|
"name": "测试机构中介有限公司",
|
|||
|
|
"certificateNo": "91110108MA0000001A", # 统一社会信用代码
|
|||
|
|
"intermediaryType": "2",
|
|||
|
|
"status": "0",
|
|||
|
|
"remark": "测试机构中介导入",
|
|||
|
|
"corpCreditCode": "91110108MA0000001A",
|
|||
|
|
"corpType": "有限责任公司",
|
|||
|
|
"corpNature": "民营企业",
|
|||
|
|
"corpIndustryCategory": "房地产业",
|
|||
|
|
"corpIndustry": "房地产中介服务",
|
|||
|
|
"corpEstablishDate": "2020-01-01",
|
|||
|
|
"corpAddress": "北京市海淀区",
|
|||
|
|
"corpLegalRep": "张三",
|
|||
|
|
"corpLegalCertType": "身份证",
|
|||
|
|
"corpLegalCertNo": "110101199001011235",
|
|||
|
|
"corpShareholder1": "李四",
|
|||
|
|
"corpShareholder2": "王五"
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
response = requests.post(url, json=entity_data, headers=headers)
|
|||
|
|
if response.status_code == 200:
|
|||
|
|
result = response.json()
|
|||
|
|
if result.get("code") == 200:
|
|||
|
|
print("✓ 机构中介导入成功")
|
|||
|
|
print(f" - 机构名称: {entity_data['name']}")
|
|||
|
|
print(f" - 统一社会信用代码: {entity_data['corpCreditCode']}")
|
|||
|
|
print(f" - 证件号字段: {entity_data['certificateNo']}")
|
|||
|
|
return True
|
|||
|
|
else:
|
|||
|
|
print(f"✗ 机构中介导入失败: {result.get('msg')}")
|
|||
|
|
return False
|
|||
|
|
else:
|
|||
|
|
print(f"✗ 机构中介导入请求失败: {response.status_code} - {response.text}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def test_import_entity_without_credit_code(token):
|
|||
|
|
"""测试机构中介导入时统一社会信用代码为空的情况"""
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("测试4: 机构中介导入时统一社会信用代码为空(应该失败)")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
url = f"{BASE_URL}/dpc/intermediary"
|
|||
|
|
headers = get_headers(token)
|
|||
|
|
headers["Content-Type"] = "application/json"
|
|||
|
|
|
|||
|
|
# 故意不提供统一社会信用代码
|
|||
|
|
entity_data = {
|
|||
|
|
"name": "测试机构中介有限公司(无信用代码)",
|
|||
|
|
"certificateNo": "", # 空字符串
|
|||
|
|
"intermediaryType": "2",
|
|||
|
|
"status": "0",
|
|||
|
|
"remark": "测试统一社会信用代码为空的情况",
|
|||
|
|
"corpCreditCode": "", # 空字符串
|
|||
|
|
"corpType": "有限责任公司"
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
response = requests.post(url, json=entity_data, headers=headers)
|
|||
|
|
if response.status_code == 200:
|
|||
|
|
result = response.json()
|
|||
|
|
if result.get("code") != 200:
|
|||
|
|
# 预期失败
|
|||
|
|
print(f"✓ 预期行为:导入被拒绝,错误信息: {result.get('msg')}")
|
|||
|
|
return True
|
|||
|
|
else:
|
|||
|
|
# 不应该成功
|
|||
|
|
print(f"✗ 测试失败:统一社会信用代码为空时不应该导入成功")
|
|||
|
|
return False
|
|||
|
|
else:
|
|||
|
|
print(f"✗ 请求失败: {response.status_code} - {response.text}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def test_query_intermediary_list(token):
|
|||
|
|
"""测试查询中介列表"""
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("测试3: 查询中介列表")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
url = f"{BASE_URL}/dpc/intermediary/list"
|
|||
|
|
headers = get_headers(token)
|
|||
|
|
|
|||
|
|
params = {
|
|||
|
|
"pageNum": 1,
|
|||
|
|
"pageSize": 10
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
response = requests.get(url, params=params, headers=headers)
|
|||
|
|
if response.status_code == 200:
|
|||
|
|
result = response.json()
|
|||
|
|
if result.get("code") == 200:
|
|||
|
|
rows = result.get("rows", [])
|
|||
|
|
total = result.get("total", 0)
|
|||
|
|
print(f"✓ 查询成功,共 {total} 条记录")
|
|||
|
|
for item in rows:
|
|||
|
|
print(f" - {item['name']} ({item.get('intermediaryTypeName', '未知')}) - 证件号: {item.get('certificateNo', '无')}")
|
|||
|
|
return True
|
|||
|
|
else:
|
|||
|
|
print(f"✗ 查询失败: {result.get('msg')}")
|
|||
|
|
return False
|
|||
|
|
else:
|
|||
|
|
print(f"✗ 查询请求失败: {response.status_code} - {response.text}")
|
|||
|
|
return False
|
|||
|
|
|
|||
|
|
def generate_test_report(results):
|
|||
|
|
"""生成测试报告"""
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("测试报告")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
total_tests = len(results)
|
|||
|
|
passed_tests = sum(1 for r in results.values() if r)
|
|||
|
|
failed_tests = total_tests - passed_tests
|
|||
|
|
|
|||
|
|
print(f"\n总测试数: {total_tests}")
|
|||
|
|
print(f"通过: {passed_tests}")
|
|||
|
|
print(f"失败: {failed_tests}")
|
|||
|
|
print(f"通过率: {passed_tests/total_tests*100:.1f}%")
|
|||
|
|
|
|||
|
|
print("\n详细结果:")
|
|||
|
|
for test_name, result in results.items():
|
|||
|
|
status = "✓ 通过" if result else "✗ 失败"
|
|||
|
|
print(f" {test_name}: {status}")
|
|||
|
|
|
|||
|
|
# 保存报告到文件
|
|||
|
|
report_content = {
|
|||
|
|
"测试时间": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
|
|||
|
|
"总测试数": total_tests,
|
|||
|
|
"通过": passed_tests,
|
|||
|
|
"失败": failed_tests,
|
|||
|
|
"通过率": f"{passed_tests/total_tests*100:.1f}%",
|
|||
|
|
"详细结果": {k: "通过" if v else "失败" for k, v in results.items()}
|
|||
|
|
}
|
|||
|
|
|
|||
|
|
with open("doc/test-data/import_test_report.json", "w", encoding="utf-8") as f:
|
|||
|
|
json.dump(report_content, f, ensure_ascii=False, indent=2)
|
|||
|
|
|
|||
|
|
print(f"\n测试报告已保存至: doc/test-data/import_test_report.json")
|
|||
|
|
|
|||
|
|
def main():
|
|||
|
|
"""主测试函数"""
|
|||
|
|
print("="*60)
|
|||
|
|
print("中介黑名单导入功能测试")
|
|||
|
|
print(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
results = {}
|
|||
|
|
|
|||
|
|
# 1. 登录
|
|||
|
|
token = login()
|
|||
|
|
if not token:
|
|||
|
|
print("登录失败,无法继续测试")
|
|||
|
|
return
|
|||
|
|
|
|||
|
|
# 2. 测试个人中介导入
|
|||
|
|
results["个人中介导入"] = test_import_person_intermediary(token)
|
|||
|
|
|
|||
|
|
# 3. 测试机构中介导入
|
|||
|
|
results["机构中介导入"] = test_import_entity_intermediary(token)
|
|||
|
|
|
|||
|
|
# 4. 测试统一社会信用代码为空的情况
|
|||
|
|
results["机构中介无信用代码校验"] = test_import_entity_without_credit_code(token)
|
|||
|
|
|
|||
|
|
# 5. 测试查询列表
|
|||
|
|
results["查询列表"] = test_query_intermediary_list(token)
|
|||
|
|
|
|||
|
|
# 5. 生成测试报告
|
|||
|
|
generate_test_report(results)
|
|||
|
|
|
|||
|
|
print("\n" + "="*60)
|
|||
|
|
print("测试完成")
|
|||
|
|
print("="*60)
|
|||
|
|
|
|||
|
|
if __name__ == "__main__":
|
|||
|
|
main()
|