Files
ccdi/doc/test-scripts/test_import_duplicate_detection.py

929 lines
33 KiB
Python
Raw Normal View History

#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
导入文件内部主键重复检测功能测试脚本
测试目标:
1. 采购交易导入 - Excel内采购事项ID重复检测
2. 员工信息导入 - Excel内柜员号和身份证号重复检测
作者: 测试专家
日期: 2026-02-09
"""
import os
import sys
import time
import json
import requests
from openpyxl import Workbook
from datetime import datetime, timedelta
from typing import List, Dict, Tuple
# ==================== 配置部分 ====================
BASE_URL = "http://localhost:8080"
LOGIN_URL = f"{BASE_URL}/login/test"
# 测试账号
USERNAME = "admin"
PASSWORD = "admin123"
# 测试结果保存目录
REPORT_DIR = "D:/ccdi/ccdi/doc/test-reports"
EXCEL_DIR = "D:/ccdi/ccdi/doc/test-data/temp"
# 创建必要目录
os.makedirs(REPORT_DIR, exist_ok=True)
os.makedirs(EXCEL_DIR, exist_ok=True)
class APIClient:
"""API客户端"""
def __init__(self, base_url: str):
self.base_url = base_url
self.token = None
self.session = requests.Session()
def login(self, username: str, password: str) -> bool:
"""登录获取token"""
try:
response = self.session.post(
LOGIN_URL,
json={"username": username, "password": password},
timeout=10
)
result = response.json()
if result.get("code") == 200:
self.token = result.get("data")
print(f"✓ 登录成功, Token: {self.token[:20]}...")
return True
else:
print(f"✗ 登录失败: {result.get('msg')}")
return False
except Exception as e:
print(f"✗ 登录异常: {str(e)}")
return False
def get_headers(self) -> Dict:
"""获取请求头"""
return {
"Authorization": f"Bearer {self.token}",
"Content-Type": "application/json"
}
def upload_file(self, url: str, file_path: str) -> Dict:
"""上传文件"""
try:
with open(file_path, 'rb') as f:
files = {'file': (os.path.basename(file_path), f, 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')}
headers = {"Authorization": f"Bearer {self.token}"}
response = self.session.post(url, files=files, headers=headers, timeout=30)
return response.json()
except Exception as e:
return {"code": 500, "msg": f"上传失败: {str(e)}"}
def get_import_status(self, url: str) -> Dict:
"""查询导入状态"""
try:
response = self.session.get(url, headers=self.get_headers(), timeout=10)
return response.json()
except Exception as e:
return {"code": 500, "msg": f"查询状态失败: {str(e)}"}
def get_import_failures(self, url: str) -> Dict:
"""查询导入失败记录"""
try:
response = self.session.get(url, headers=self.get_headers(), timeout=10)
return response.json()
except Exception as e:
return {"code": 500, "msg": f"查询失败记录失败: {str(e)}"}
class ExcelGenerator:
"""Excel测试数据生成器"""
@staticmethod
def create_purchase_duplicate_data() -> str:
"""
场景1: Excel内采购事项ID重复
3条记录,采购事项ID都是 PURCHASE001
"""
wb = Workbook()
ws = wb.active
ws.title = "采购交易重复测试"
# 表头
headers = [
"采购事项ID", "采购类别", "项目名称", "标的物名称", "标的物描述",
"采购数量", "预算金额", "采购方式", "采购申请日期",
"申请人工号", "申请人姓名", "申请部门"
]
ws.append(headers)
# 测试数据 - 3条相同采购事项ID
base_date = datetime.now()
for i in range(3):
apply_date = base_date + timedelta(days=i)
ws.append([
f"PURCHASE001", # 相同的采购事项ID
f"采购类别{i+1}",
f"项目名称{i+1}",
f"标的物名称{i+1}",
f"标的物描述{i+1}",
10 + i,
10000.00 + i * 1000,
"公开招标",
apply_date.strftime("%Y-%m-%d"),
"1000001",
"张三",
"技术部"
])
file_path = os.path.join(EXCEL_DIR, "purchase_duplicate.xlsx")
wb.save(file_path)
return file_path
@staticmethod
def create_employee_employee_id_duplicate() -> str:
"""
场景2: Excel内员工柜员号重复
3条记录,柜员号都是 10001,身份证号不同
"""
wb = Workbook()
ws = wb.active
ws.title = "员工柜员号重复测试"
# 表头
headers = ["姓名", "柜员号", "所属部门ID", "身份证号", "电话", "入职时间", "状态"]
ws.append(headers)
# 测试数据 - 3条相同柜员号
for i in range(3):
ws.append([
f"员工{i+1}",
10001, # 相同的柜员号
103,
f"110101199001011{234+i}", # 不同的身份证号
f"1380000000{i}",
"2024-01-01",
"0"
])
file_path = os.path.join(EXCEL_DIR, "employee_employee_id_duplicate.xlsx")
wb.save(file_path)
return file_path
@staticmethod
def create_employee_id_card_duplicate() -> str:
"""
场景3: Excel内员工身份证号重复
3条记录,柜员号不同,身份证号相同
"""
wb = Workbook()
ws = wb.active
ws.title = "员工身份证号重复测试"
# 表头
headers = ["姓名", "柜员号", "所属部门ID", "身份证号", "电话", "入职时间", "状态"]
ws.append(headers)
# 测试数据 - 3条相同身份证号
for i in range(3):
ws.append([
f"员工{i+1}",
10001 + i, # 不同的柜员号
103,
"110101199001011234", # 相同的身份证号
f"1380000000{i}",
"2024-01-01",
"0"
])
file_path = os.path.join(EXCEL_DIR, "employee_id_card_duplicate.xlsx")
wb.save(file_path)
return file_path
@staticmethod
def create_mixed_duplicate_scenario() -> Tuple[str, str]:
"""
场景4: 混合重复(数据库+Excel)
- 第1条: 数据库中已存在
- 第2条: 全新数据
- 第3条: 与第2条Excel内重复
- 第4条: 全新数据
"""
# 采购交易混合重复数据
wb_purchase = Workbook()
ws_purchase = wb_purchase.active
ws_purchase.title = "采购混合重复测试"
headers = [
"采购事项ID", "采购类别", "项目名称", "标的物名称", "标的物描述",
"采购数量", "预算金额", "采购方式", "采购申请日期",
"申请人工号", "申请人姓名", "申请部门"
]
ws_purchase.append(headers)
base_date = datetime.now()
# 第1条: 数据库中已存在(需要先手动插入数据库)
ws_purchase.append([
"EXIST001", # 假设数据库中已存在
"采购类别1",
"项目名称1",
"标的物名称1",
"标的物描述1",
10,
10000.00,
"公开招标",
base_date.strftime("%Y-%m-%d"),
"1000001",
"张三",
"技术部"
])
# 第2条: 全新数据
ws_purchase.append([
"NEW001", # 新的采购事项ID
"采购类别2",
"项目名称2",
"标的物名称2",
"标的物描述2",
20,
20000.00,
"邀请招标",
(base_date + timedelta(days=1)).strftime("%Y-%m-%d"),
"1000002",
"李四",
"市场部"
])
# 第3条: 与第2条Excel内重复
ws_purchase.append([
"NEW001", # 与第2条重复
"采购类别3",
"项目名称3",
"标的物名称3",
"标的物描述3",
30,
30000.00,
"竞争性谈判",
(base_date + timedelta(days=2)).strftime("%Y-%m-%d"),
"1000003",
"王五",
"财务部"
])
# 第4条: 全新数据
ws_purchase.append([
"NEW002", # 新的采购事项ID
"采购类别4",
"项目名称4",
"标的物名称4",
"标的物描述4",
40,
40000.00,
"单一来源",
(base_date + timedelta(days=3)).strftime("%Y-%m-%d"),
"1000004",
"赵六",
"人事部"
])
purchase_file = os.path.join(EXCEL_DIR, "purchase_mixed_duplicate.xlsx")
wb_purchase.save(purchase_file)
# 员工混合重复数据
wb_employee = Workbook()
ws_employee = wb_employee.active
ws_employee.title = "员工混合重复测试"
headers = ["姓名", "柜员号", "所属部门ID", "身份证号", "电话", "入职时间", "状态"]
ws_employee.append(headers)
# 第1条: 数据库中已存在(假设柜员号99999已存在)
ws_employee.append([
"已存在员工",
99999, # 假设数据库中已存在
103,
"110101199001019999",
"13900000000",
"2024-01-01",
"0"
])
# 第2条: 全新数据
ws_employee.append([
"新员工1",
90001, # 新柜员号
103,
"110101199001011111",
"13800000001",
"2024-01-01",
"0"
])
# 第3条: 与第2条Excel内重复(柜员号重复)
ws_employee.append([
"新员工2",
90001, # 与第2条柜员号重复
103,
"110101199001012222",
"13800000002",
"2024-01-01",
"0"
])
# 第4条: 全新数据
ws_employee.append([
"新员工3",
90002, # 新柜员号
103,
"110101199001013333",
"13800000003",
"2024-01-01",
"0"
])
employee_file = os.path.join(EXCEL_DIR, "employee_mixed_duplicate.xlsx")
wb_employee.save(employee_file)
return purchase_file, employee_file
class TestCase:
"""测试用例基类"""
def __init__(self, name: str, description: str):
self.name = name
self.description = description
self.start_time = None
self.end_time = None
self.passed = False
self.error_message = None
self.details = {}
def run(self, client: APIClient):
"""运行测试用例"""
raise NotImplementedError
def to_dict(self) -> Dict:
"""转换为字典"""
return {
"name": self.name,
"description": self.description,
"passed": self.passed,
"error_message": self.error_message,
"details": self.details,
"duration": f"{(self.end_time - self.start_time).total_seconds():.2f}s" if self.start_time and self.end_time else "N/A"
}
class PurchaseDuplicateTestCase(TestCase):
"""场景1: Excel内采购事项ID重复测试"""
def __init__(self):
super().__init__(
"采购交易 - Excel内采购事项ID重复",
"测试导入3条采购事项ID相同的记录,预期第1条成功,第2、3条失败"
)
def run(self, client: APIClient):
self.start_time = datetime.now()
try:
# 生成测试数据
file_path = ExcelGenerator.create_purchase_duplicate_data()
print(f" ✓ 生成测试数据: {file_path}")
# 上传文件
upload_url = f"{BASE_URL}/ccdi/purchaseTransaction/importData"
upload_result = client.upload_file(upload_url, file_path)
if upload_result.get("code") != 200:
self.error_message = f"上传失败: {upload_result.get('msg')}"
self.end_time = datetime.now()
return
task_id = upload_result.get("data", {}).get("taskId")
print(f" ✓ 上传成功, TaskID: {task_id}")
# 等待异步任务完成
time.sleep(3)
# 查询导入状态
status_url = f"{BASE_URL}/ccdi/purchaseTransaction/importStatus/{task_id}"
status_result = client.get_import_status(status_url)
if status_result.get("code") != 200:
self.error_message = f"查询状态失败: {status_result.get('msg')}"
self.end_time = datetime.now()
return
status_data = status_result.get("data", {})
print(f" ✓ 导入状态: {status_data}")
# 查询失败记录
failures_url = f"{BASE_URL}/ccdi/purchaseTransaction/importFailures/{task_id}"
failures_result = client.get_import_failures(failures_url)
if failures_result.get("code") != 200:
self.error_message = f"查询失败记录失败: {failures_result.get('msg')}"
self.end_time = datetime.now()
return
failures = failures_result.get("rows", [])
# 验证结果
# 预期: 成功1条,失败2条
expected_success = 1
expected_failure = 2
actual_success = status_data.get("successCount", 0)
actual_failure = status_data.get("failureCount", 0)
self.details = {
"expected_success": expected_success,
"expected_failure": expected_failure,
"actual_success": actual_success,
"actual_failure": actual_failure,
"failures": failures
}
# 验证成功/失败数量
if actual_success != expected_success or actual_failure != expected_failure:
self.error_message = f"数量不匹配: 预期成功{expected_success}失败{expected_failure}, 实际成功{actual_success}失败{actual_failure}"
self.end_time = datetime.now()
return
# 验证失败消息
if len(failures) < 2:
self.error_message = f"失败记录数量不足: 预期2条, 实际{len(failures)}"
self.end_time = datetime.now()
return
# 检查失败消息是否包含"在导入文件中重复"
error_msg_1 = failures[0].get("errorMessage", "")
error_msg_2 = failures[1].get("errorMessage", "")
if "在导入文件中重复" not in error_msg_1 or "在导入文件中重复" not in error_msg_2:
self.error_message = f"错误消息不正确: {error_msg_1}, {error_msg_2}"
self.end_time = datetime.now()
return
self.passed = True
print(f" ✓ 测试通过")
except Exception as e:
self.error_message = f"测试异常: {str(e)}"
print(f" ✗ 测试异常: {str(e)}")
self.end_time = datetime.now()
class EmployeeEmployeeIdDuplicateTestCase(TestCase):
"""场景2: Excel内员工柜员号重复测试"""
def __init__(self):
super().__init__(
"员工信息 - Excel内柜员号重复",
"测试导入3条柜员号相同的记录,预期第1条成功,第2、3条失败"
)
def run(self, client: APIClient):
self.start_time = datetime.now()
try:
# 生成测试数据
file_path = ExcelGenerator.create_employee_employee_id_duplicate()
print(f" ✓ 生成测试数据: {file_path}")
# 上传文件
upload_url = f"{BASE_URL}/ccdi/employee/importData"
upload_result = client.upload_file(upload_url, file_path)
if upload_result.get("code") != 200:
self.error_message = f"上传失败: {upload_result.get('msg')}"
self.end_time = datetime.now()
return
task_id = upload_result.get("data", {}).get("taskId")
print(f" ✓ 上传成功, TaskID: {task_id}")
# 等待异步任务完成
time.sleep(3)
# 查询导入状态
status_url = f"{BASE_URL}/ccdi/employee/importStatus/{task_id}"
status_result = client.get_import_status(status_url)
if status_result.get("code") != 200:
self.error_message = f"查询状态失败: {status_result.get('msg')}"
self.end_time = datetime.now()
return
status_data = status_result.get("data", {})
print(f" ✓ 导入状态: {status_data}")
# 查询失败记录
failures_url = f"{BASE_URL}/ccdi/employee/importFailures/{task_id}"
failures_result = client.get_import_failures(failures_url)
if failures_result.get("code") != 200:
self.error_message = f"查询失败记录失败: {failures_result.get('msg')}"
self.end_time = datetime.now()
return
failures = failures_result.get("rows", [])
# 验证结果
expected_success = 1
expected_failure = 2
actual_success = status_data.get("successCount", 0)
actual_failure = status_data.get("failureCount", 0)
self.details = {
"expected_success": expected_success,
"expected_failure": expected_failure,
"actual_success": actual_success,
"actual_failure": actual_failure,
"failures": failures
}
if actual_success != expected_success or actual_failure != expected_failure:
self.error_message = f"数量不匹配: 预期成功{expected_success}失败{expected_failure}, 实际成功{actual_success}失败{actual_failure}"
self.end_time = datetime.now()
return
if len(failures) < 2:
self.error_message = f"失败记录数量不足: 预期2条, 实际{len(failures)}"
self.end_time = datetime.now()
return
# 验证失败消息
error_msg_1 = failures[0].get("errorMessage", "")
error_msg_2 = failures[1].get("errorMessage", "")
if "柜员号" not in error_msg_1 or "在导入文件中重复" not in error_msg_1:
self.error_message = f"错误消息不正确(第1条): {error_msg_1}"
self.end_time = datetime.now()
return
if "柜员号" not in error_msg_2 or "在导入文件中重复" not in error_msg_2:
self.error_message = f"错误消息不正确(第2条): {error_msg_2}"
self.end_time = datetime.now()
return
self.passed = True
print(f" ✓ 测试通过")
except Exception as e:
self.error_message = f"测试异常: {str(e)}"
print(f" ✗ 测试异常: {str(e)}")
self.end_time = datetime.now()
class EmployeeIdCardDuplicateTestCase(TestCase):
"""场景3: Excel内员工身份证号重复测试"""
def __init__(self):
super().__init__(
"员工信息 - Excel内身份证号重复",
"测试导入3条身份证号相同的记录,预期第1条成功,第2、3条失败"
)
def run(self, client: APIClient):
self.start_time = datetime.now()
try:
# 生成测试数据
file_path = ExcelGenerator.create_employee_id_card_duplicate()
print(f" ✓ 生成测试数据: {file_path}")
# 上传文件
upload_url = f"{BASE_URL}/ccdi/employee/importData"
upload_result = client.upload_file(upload_url, file_path)
if upload_result.get("code") != 200:
self.error_message = f"上传失败: {upload_result.get('msg')}"
self.end_time = datetime.now()
return
task_id = upload_result.get("data", {}).get("taskId")
print(f" ✓ 上传成功, TaskID: {task_id}")
# 等待异步任务完成
time.sleep(3)
# 查询导入状态
status_url = f"{BASE_URL}/ccdi/employee/importStatus/{task_id}"
status_result = client.get_import_status(status_url)
if status_result.get("code") != 200:
self.error_message = f"查询状态失败: {status_result.get('msg')}"
self.end_time = datetime.now()
return
status_data = status_result.get("data", {})
print(f" ✓ 导入状态: {status_data}")
# 查询失败记录
failures_url = f"{BASE_URL}/ccdi/employee/importFailures/{task_id}"
failures_result = client.get_import_failures(failures_url)
if failures_result.get("code") != 200:
self.error_message = f"查询失败记录失败: {failures_result.get('msg')}"
self.end_time = datetime.now()
return
failures = failures_result.get("rows", [])
# 验证结果
expected_success = 1
expected_failure = 2
actual_success = status_data.get("successCount", 0)
actual_failure = status_data.get("failureCount", 0)
self.details = {
"expected_success": expected_success,
"expected_failure": expected_failure,
"actual_success": actual_success,
"actual_failure": actual_failure,
"failures": failures
}
if actual_success != expected_success or actual_failure != expected_failure:
self.error_message = f"数量不匹配: 预期成功{expected_success}失败{expected_failure}, 实际成功{actual_success}失败{actual_failure}"
self.end_time = datetime.now()
return
if len(failures) < 2:
self.error_message = f"失败记录数量不足: 预期2条, 实际{len(failures)}"
self.end_time = datetime.now()
return
# 验证失败消息
error_msg_1 = failures[0].get("errorMessage", "")
error_msg_2 = failures[1].get("errorMessage", "")
if "身份证号" not in error_msg_1 or "在导入文件中重复" not in error_msg_1:
self.error_message = f"错误消息不正确(第1条): {error_msg_1}"
self.end_time = datetime.now()
return
if "身份证号" not in error_msg_2 or "在导入文件中重复" not in error_msg_2:
self.error_message = f"错误消息不正确(第2条): {error_msg_2}"
self.end_time = datetime.now()
return
self.passed = True
print(f" ✓ 测试通过")
except Exception as e:
self.error_message = f"测试异常: {str(e)}"
print(f" ✗ 测试异常: {str(e)}")
self.end_time = datetime.now()
class MixedDuplicateTestCase(TestCase):
"""场景4: 混合重复(数据库+Excel)测试"""
def __init__(self):
super().__init__(
"混合重复 - 数据库+Excel重复",
"测试数据库已存在+Excel内重复的混合场景"
)
def run(self, client: APIClient):
self.start_time = datetime.now()
try:
# 生成测试数据
purchase_file, employee_file = ExcelGenerator.create_mixed_duplicate_scenario()
print(f" ✓ 生成测试数据: {purchase_file}, {employee_file}")
# 测试采购交易
print("\n >> 测试采购交易混合重复")
purchase_upload_url = f"{BASE_URL}/ccdi/purchaseTransaction/importData"
purchase_upload_result = client.upload_file(purchase_upload_url, purchase_file)
purchase_passed = False
purchase_details = {}
if purchase_upload_result.get("code") == 200:
purchase_task_id = purchase_upload_result.get("data", {}).get("taskId")
print(f" ✓ 采购交易上传成功, TaskID: {purchase_task_id}")
time.sleep(3)
# 查询导入状态
purchase_status_url = f"{BASE_URL}/ccdi/purchaseTransaction/importStatus/{purchase_task_id}"
purchase_status_result = client.get_import_status(purchase_status_url)
if purchase_status_result.get("code") == 200:
purchase_status_data = purchase_status_result.get("data", {})
# 查询失败记录
purchase_failures_url = f"{BASE_URL}/ccdi/purchaseTransaction/importFailures/{purchase_task_id}"
purchase_failures_result = client.get_import_failures(purchase_failures_url)
if purchase_failures_result.get("code") == 200:
purchase_failures = purchase_failures_result.get("rows", [])
purchase_details = {
"success_count": purchase_status_data.get("successCount", 0),
"failure_count": purchase_status_data.get("failureCount", 0),
"failures": purchase_failures
}
# 验证: 第1条失败(数据库重复), 第2条成功, 第3条失败(Excel内重复), 第4条成功
# 预期: 成功2条,失败2条
if purchase_status_data.get("successCount") == 2 and purchase_status_data.get("failureCount") == 2:
purchase_passed = True
print(f" ✓ 采购交易测试通过: 成功2条,失败2条")
else:
print(f" ✗ 采购交易测试失败: 预期成功2失败2, 实际成功{purchase_status_data.get('successCount')}失败{purchase_status_data.get('failureCount')}")
# 测试员工信息
print("\n >> 测试员工信息混合重复")
employee_upload_url = f"{BASE_URL}/ccdi/employee/importData"
employee_upload_result = client.upload_file(employee_upload_url, employee_file)
employee_passed = False
employee_details = {}
if employee_upload_result.get("code") == 200:
employee_task_id = employee_upload_result.get("data", {}).get("taskId")
print(f" ✓ 员工信息上传成功, TaskID: {employee_task_id}")
time.sleep(3)
# 查询导入状态
employee_status_url = f"{BASE_URL}/ccdi/employee/importStatus/{employee_task_id}"
employee_status_result = client.get_import_status(employee_status_url)
if employee_status_result.get("code") == 200:
employee_status_data = employee_status_result.get("data", {})
# 查询失败记录
employee_failures_url = f"{BASE_URL}/ccdi/employee/importFailures/{employee_task_id}"
employee_failures_result = client.get_import_failures(employee_failures_url)
if employee_failures_result.get("code") == 200:
employee_failures = employee_failures_result.get("rows", [])
employee_details = {
"success_count": employee_status_data.get("successCount", 0),
"failure_count": employee_status_data.get("failureCount", 0),
"failures": employee_failures
}
# 验证: 第1条失败(数据库重复), 第2条成功, 第3条失败(Excel内重复), 第4条成功
# 预期: 成功2条,失败2条
if employee_status_data.get("successCount") == 2 and employee_status_data.get("failureCount") == 2:
employee_passed = True
print(f" ✓ 员工信息测试通过: 成功2条,失败2条")
else:
print(f" ✗ 员工信息测试失败: 预期成功2失败2, 实际成功{employee_status_data.get('successCount')}失败{employee_status_data.get('failureCount')}")
self.details = {
"purchase": {
"passed": purchase_passed,
"details": purchase_details
},
"employee": {
"passed": employee_passed,
"details": employee_details
}
}
# 至少一个通过则认为测试通过(因为数据库可能不存在预置数据)
self.passed = purchase_passed or employee_passed
if self.passed:
print(f" ✓ 测试通过")
except Exception as e:
self.error_message = f"测试异常: {str(e)}"
print(f" ✗ 测试异常: {str(e)}")
self.end_time = datetime.now()
class TestRunner:
"""测试运行器"""
def __init__(self):
self.client = APIClient(BASE_URL)
self.test_cases: List[TestCase] = []
self.results = []
def add_test_case(self, test_case: TestCase):
"""添加测试用例"""
self.test_cases.append(test_case)
def run_all(self):
"""运行所有测试用例"""
print("=" * 80)
print("导入文件内部主键重复检测功能测试")
print("=" * 80)
print(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
print(f"测试环境: {BASE_URL}")
print("=" * 80)
# 登录
print("\n[1/2] 登录系统...")
if not self.client.login(USERNAME, PASSWORD):
print("✗ 登录失败,测试终止")
return
# 运行测试
print("\n[2/2] 运行测试用例...")
print("-" * 80)
for i, test_case in enumerate(self.test_cases, 1):
print(f"\n测试用例 {i}/{len(self.test_cases)}: {test_case.name}")
print(f"描述: {test_case.description}")
print("-" * 80)
test_case.run(self.client)
self.results.append(test_case.to_dict())
# 生成报告
self.generate_report()
def generate_report(self):
"""生成测试报告"""
print("\n" + "=" * 80)
print("测试报告")
print("=" * 80)
passed_count = sum(1 for r in self.results if r["passed"])
failed_count = len(self.results) - passed_count
print(f"\n总测试用例数: {len(self.results)}")
print(f"通过: {passed_count}")
print(f"失败: {failed_count}")
print(f"通过率: {passed_count / len(self.results) * 100:.1f}%")
print("\n详细结果:")
print("-" * 80)
for i, result in enumerate(self.results, 1):
status = "✓ PASS" if result["passed"] else "✗ FAIL"
print(f"\n{i}. {result['name']}")
print(f" 状态: {status}")
print(f" 耗时: {result['duration']}")
if not result["passed"]:
print(f" 错误: {result['error_message']}")
if result["details"]:
print(f" 详情: {json.dumps(result['details'], ensure_ascii=False, indent=6)}")
# 保存报告到文件
report_file = os.path.join(REPORT_DIR, f"test_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
with open(report_file, 'w', encoding='utf-8') as f:
json.dump({
"test_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
"environment": BASE_URL,
"total_count": len(self.results),
"passed_count": passed_count,
"failed_count": failed_count,
"pass_rate": f"{passed_count / len(self.results) * 100:.1f}%",
"results": self.results
}, f, ensure_ascii=False, indent=2)
print(f"\n报告已保存到: {report_file}")
print("=" * 80)
def main():
"""主函数"""
runner = TestRunner()
# 添加测试用例
runner.add_test_case(PurchaseDuplicateTestCase())
runner.add_test_case(EmployeeEmployeeIdDuplicateTestCase())
runner.add_test_case(EmployeeIdCardDuplicateTestCase())
runner.add_test_case(MixedDuplicateTestCase())
# 运行所有测试
try:
runner.run_all()
except KeyboardInterrupt:
print("\n\n测试被用户中断")
except Exception as e:
print(f"\n\n测试运行异常: {str(e)}")
import traceback
traceback.print_exc()
if __name__ == "__main__":
main()