929 lines
33 KiB
Python
929 lines
33 KiB
Python
#!/usr/bin/env python
|
|
# -*- coding: utf-8 -*-
|
|
"""
|
|
导入文件内部主键重复检测功能测试脚本
|
|
|
|
测试目标:
|
|
1. 采购交易导入 - Excel内采购事项ID重复检测
|
|
2. 员工信息导入 - Excel内柜员号和身份证号重复检测
|
|
|
|
作者: 测试专家
|
|
日期: 2026-02-09
|
|
"""
|
|
|
|
import os
|
|
import sys
|
|
import time
|
|
import json
|
|
import requests
|
|
from openpyxl import Workbook
|
|
from datetime import datetime, timedelta
|
|
from typing import List, Dict, Tuple
|
|
|
|
# ==================== 配置部分 ====================
|
|
BASE_URL = "http://localhost:8080"
|
|
LOGIN_URL = f"{BASE_URL}/login/test"
|
|
|
|
# 测试账号
|
|
USERNAME = "admin"
|
|
PASSWORD = "admin123"
|
|
|
|
# 测试结果保存目录
|
|
REPORT_DIR = "D:/ccdi/ccdi/doc/test-reports"
|
|
EXCEL_DIR = "D:/ccdi/ccdi/doc/test-data/temp"
|
|
|
|
# 创建必要目录
|
|
os.makedirs(REPORT_DIR, exist_ok=True)
|
|
os.makedirs(EXCEL_DIR, exist_ok=True)
|
|
|
|
|
|
class APIClient:
|
|
"""API客户端"""
|
|
|
|
def __init__(self, base_url: str):
|
|
self.base_url = base_url
|
|
self.token = None
|
|
self.session = requests.Session()
|
|
|
|
def login(self, username: str, password: str) -> bool:
|
|
"""登录获取token"""
|
|
try:
|
|
response = self.session.post(
|
|
LOGIN_URL,
|
|
json={"username": username, "password": password},
|
|
timeout=10
|
|
)
|
|
result = response.json()
|
|
|
|
if result.get("code") == 200:
|
|
self.token = result.get("data")
|
|
print(f"✓ 登录成功, Token: {self.token[:20]}...")
|
|
return True
|
|
else:
|
|
print(f"✗ 登录失败: {result.get('msg')}")
|
|
return False
|
|
except Exception as e:
|
|
print(f"✗ 登录异常: {str(e)}")
|
|
return False
|
|
|
|
def get_headers(self) -> Dict:
|
|
"""获取请求头"""
|
|
return {
|
|
"Authorization": f"Bearer {self.token}",
|
|
"Content-Type": "application/json"
|
|
}
|
|
|
|
def upload_file(self, url: str, file_path: str) -> Dict:
|
|
"""上传文件"""
|
|
try:
|
|
with open(file_path, 'rb') as f:
|
|
files = {'file': (os.path.basename(file_path), f, 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet')}
|
|
headers = {"Authorization": f"Bearer {self.token}"}
|
|
|
|
response = self.session.post(url, files=files, headers=headers, timeout=30)
|
|
return response.json()
|
|
except Exception as e:
|
|
return {"code": 500, "msg": f"上传失败: {str(e)}"}
|
|
|
|
def get_import_status(self, url: str) -> Dict:
|
|
"""查询导入状态"""
|
|
try:
|
|
response = self.session.get(url, headers=self.get_headers(), timeout=10)
|
|
return response.json()
|
|
except Exception as e:
|
|
return {"code": 500, "msg": f"查询状态失败: {str(e)}"}
|
|
|
|
def get_import_failures(self, url: str) -> Dict:
|
|
"""查询导入失败记录"""
|
|
try:
|
|
response = self.session.get(url, headers=self.get_headers(), timeout=10)
|
|
return response.json()
|
|
except Exception as e:
|
|
return {"code": 500, "msg": f"查询失败记录失败: {str(e)}"}
|
|
|
|
|
|
class ExcelGenerator:
|
|
"""Excel测试数据生成器"""
|
|
|
|
@staticmethod
|
|
def create_purchase_duplicate_data() -> str:
|
|
"""
|
|
场景1: Excel内采购事项ID重复
|
|
3条记录,采购事项ID都是 PURCHASE001
|
|
"""
|
|
wb = Workbook()
|
|
ws = wb.active
|
|
ws.title = "采购交易重复测试"
|
|
|
|
# 表头
|
|
headers = [
|
|
"采购事项ID", "采购类别", "项目名称", "标的物名称", "标的物描述",
|
|
"采购数量", "预算金额", "采购方式", "采购申请日期",
|
|
"申请人工号", "申请人姓名", "申请部门"
|
|
]
|
|
ws.append(headers)
|
|
|
|
# 测试数据 - 3条相同采购事项ID
|
|
base_date = datetime.now()
|
|
|
|
for i in range(3):
|
|
apply_date = base_date + timedelta(days=i)
|
|
ws.append([
|
|
f"PURCHASE001", # 相同的采购事项ID
|
|
f"采购类别{i+1}",
|
|
f"项目名称{i+1}",
|
|
f"标的物名称{i+1}",
|
|
f"标的物描述{i+1}",
|
|
10 + i,
|
|
10000.00 + i * 1000,
|
|
"公开招标",
|
|
apply_date.strftime("%Y-%m-%d"),
|
|
"1000001",
|
|
"张三",
|
|
"技术部"
|
|
])
|
|
|
|
file_path = os.path.join(EXCEL_DIR, "purchase_duplicate.xlsx")
|
|
wb.save(file_path)
|
|
return file_path
|
|
|
|
@staticmethod
|
|
def create_employee_employee_id_duplicate() -> str:
|
|
"""
|
|
场景2: Excel内员工柜员号重复
|
|
3条记录,柜员号都是 10001,身份证号不同
|
|
"""
|
|
wb = Workbook()
|
|
ws = wb.active
|
|
ws.title = "员工柜员号重复测试"
|
|
|
|
# 表头
|
|
headers = ["姓名", "柜员号", "所属部门ID", "身份证号", "电话", "入职时间", "状态"]
|
|
ws.append(headers)
|
|
|
|
# 测试数据 - 3条相同柜员号
|
|
for i in range(3):
|
|
ws.append([
|
|
f"员工{i+1}",
|
|
10001, # 相同的柜员号
|
|
103,
|
|
f"110101199001011{234+i}", # 不同的身份证号
|
|
f"1380000000{i}",
|
|
"2024-01-01",
|
|
"0"
|
|
])
|
|
|
|
file_path = os.path.join(EXCEL_DIR, "employee_employee_id_duplicate.xlsx")
|
|
wb.save(file_path)
|
|
return file_path
|
|
|
|
@staticmethod
|
|
def create_employee_id_card_duplicate() -> str:
|
|
"""
|
|
场景3: Excel内员工身份证号重复
|
|
3条记录,柜员号不同,身份证号相同
|
|
"""
|
|
wb = Workbook()
|
|
ws = wb.active
|
|
ws.title = "员工身份证号重复测试"
|
|
|
|
# 表头
|
|
headers = ["姓名", "柜员号", "所属部门ID", "身份证号", "电话", "入职时间", "状态"]
|
|
ws.append(headers)
|
|
|
|
# 测试数据 - 3条相同身份证号
|
|
for i in range(3):
|
|
ws.append([
|
|
f"员工{i+1}",
|
|
10001 + i, # 不同的柜员号
|
|
103,
|
|
"110101199001011234", # 相同的身份证号
|
|
f"1380000000{i}",
|
|
"2024-01-01",
|
|
"0"
|
|
])
|
|
|
|
file_path = os.path.join(EXCEL_DIR, "employee_id_card_duplicate.xlsx")
|
|
wb.save(file_path)
|
|
return file_path
|
|
|
|
@staticmethod
|
|
def create_mixed_duplicate_scenario() -> Tuple[str, str]:
|
|
"""
|
|
场景4: 混合重复(数据库+Excel)
|
|
- 第1条: 数据库中已存在
|
|
- 第2条: 全新数据
|
|
- 第3条: 与第2条Excel内重复
|
|
- 第4条: 全新数据
|
|
"""
|
|
# 采购交易混合重复数据
|
|
wb_purchase = Workbook()
|
|
ws_purchase = wb_purchase.active
|
|
ws_purchase.title = "采购混合重复测试"
|
|
|
|
headers = [
|
|
"采购事项ID", "采购类别", "项目名称", "标的物名称", "标的物描述",
|
|
"采购数量", "预算金额", "采购方式", "采购申请日期",
|
|
"申请人工号", "申请人姓名", "申请部门"
|
|
]
|
|
ws_purchase.append(headers)
|
|
|
|
base_date = datetime.now()
|
|
|
|
# 第1条: 数据库中已存在(需要先手动插入数据库)
|
|
ws_purchase.append([
|
|
"EXIST001", # 假设数据库中已存在
|
|
"采购类别1",
|
|
"项目名称1",
|
|
"标的物名称1",
|
|
"标的物描述1",
|
|
10,
|
|
10000.00,
|
|
"公开招标",
|
|
base_date.strftime("%Y-%m-%d"),
|
|
"1000001",
|
|
"张三",
|
|
"技术部"
|
|
])
|
|
|
|
# 第2条: 全新数据
|
|
ws_purchase.append([
|
|
"NEW001", # 新的采购事项ID
|
|
"采购类别2",
|
|
"项目名称2",
|
|
"标的物名称2",
|
|
"标的物描述2",
|
|
20,
|
|
20000.00,
|
|
"邀请招标",
|
|
(base_date + timedelta(days=1)).strftime("%Y-%m-%d"),
|
|
"1000002",
|
|
"李四",
|
|
"市场部"
|
|
])
|
|
|
|
# 第3条: 与第2条Excel内重复
|
|
ws_purchase.append([
|
|
"NEW001", # 与第2条重复
|
|
"采购类别3",
|
|
"项目名称3",
|
|
"标的物名称3",
|
|
"标的物描述3",
|
|
30,
|
|
30000.00,
|
|
"竞争性谈判",
|
|
(base_date + timedelta(days=2)).strftime("%Y-%m-%d"),
|
|
"1000003",
|
|
"王五",
|
|
"财务部"
|
|
])
|
|
|
|
# 第4条: 全新数据
|
|
ws_purchase.append([
|
|
"NEW002", # 新的采购事项ID
|
|
"采购类别4",
|
|
"项目名称4",
|
|
"标的物名称4",
|
|
"标的物描述4",
|
|
40,
|
|
40000.00,
|
|
"单一来源",
|
|
(base_date + timedelta(days=3)).strftime("%Y-%m-%d"),
|
|
"1000004",
|
|
"赵六",
|
|
"人事部"
|
|
])
|
|
|
|
purchase_file = os.path.join(EXCEL_DIR, "purchase_mixed_duplicate.xlsx")
|
|
wb_purchase.save(purchase_file)
|
|
|
|
# 员工混合重复数据
|
|
wb_employee = Workbook()
|
|
ws_employee = wb_employee.active
|
|
ws_employee.title = "员工混合重复测试"
|
|
|
|
headers = ["姓名", "柜员号", "所属部门ID", "身份证号", "电话", "入职时间", "状态"]
|
|
ws_employee.append(headers)
|
|
|
|
# 第1条: 数据库中已存在(假设柜员号99999已存在)
|
|
ws_employee.append([
|
|
"已存在员工",
|
|
99999, # 假设数据库中已存在
|
|
103,
|
|
"110101199001019999",
|
|
"13900000000",
|
|
"2024-01-01",
|
|
"0"
|
|
])
|
|
|
|
# 第2条: 全新数据
|
|
ws_employee.append([
|
|
"新员工1",
|
|
90001, # 新柜员号
|
|
103,
|
|
"110101199001011111",
|
|
"13800000001",
|
|
"2024-01-01",
|
|
"0"
|
|
])
|
|
|
|
# 第3条: 与第2条Excel内重复(柜员号重复)
|
|
ws_employee.append([
|
|
"新员工2",
|
|
90001, # 与第2条柜员号重复
|
|
103,
|
|
"110101199001012222",
|
|
"13800000002",
|
|
"2024-01-01",
|
|
"0"
|
|
])
|
|
|
|
# 第4条: 全新数据
|
|
ws_employee.append([
|
|
"新员工3",
|
|
90002, # 新柜员号
|
|
103,
|
|
"110101199001013333",
|
|
"13800000003",
|
|
"2024-01-01",
|
|
"0"
|
|
])
|
|
|
|
employee_file = os.path.join(EXCEL_DIR, "employee_mixed_duplicate.xlsx")
|
|
wb_employee.save(employee_file)
|
|
|
|
return purchase_file, employee_file
|
|
|
|
|
|
class TestCase:
|
|
"""测试用例基类"""
|
|
|
|
def __init__(self, name: str, description: str):
|
|
self.name = name
|
|
self.description = description
|
|
self.start_time = None
|
|
self.end_time = None
|
|
self.passed = False
|
|
self.error_message = None
|
|
self.details = {}
|
|
|
|
def run(self, client: APIClient):
|
|
"""运行测试用例"""
|
|
raise NotImplementedError
|
|
|
|
def to_dict(self) -> Dict:
|
|
"""转换为字典"""
|
|
return {
|
|
"name": self.name,
|
|
"description": self.description,
|
|
"passed": self.passed,
|
|
"error_message": self.error_message,
|
|
"details": self.details,
|
|
"duration": f"{(self.end_time - self.start_time).total_seconds():.2f}s" if self.start_time and self.end_time else "N/A"
|
|
}
|
|
|
|
|
|
class PurchaseDuplicateTestCase(TestCase):
|
|
"""场景1: Excel内采购事项ID重复测试"""
|
|
|
|
def __init__(self):
|
|
super().__init__(
|
|
"采购交易 - Excel内采购事项ID重复",
|
|
"测试导入3条采购事项ID相同的记录,预期第1条成功,第2、3条失败"
|
|
)
|
|
|
|
def run(self, client: APIClient):
|
|
self.start_time = datetime.now()
|
|
|
|
try:
|
|
# 生成测试数据
|
|
file_path = ExcelGenerator.create_purchase_duplicate_data()
|
|
print(f" ✓ 生成测试数据: {file_path}")
|
|
|
|
# 上传文件
|
|
upload_url = f"{BASE_URL}/ccdi/purchaseTransaction/importData"
|
|
upload_result = client.upload_file(upload_url, file_path)
|
|
|
|
if upload_result.get("code") != 200:
|
|
self.error_message = f"上传失败: {upload_result.get('msg')}"
|
|
self.end_time = datetime.now()
|
|
return
|
|
|
|
task_id = upload_result.get("data", {}).get("taskId")
|
|
print(f" ✓ 上传成功, TaskID: {task_id}")
|
|
|
|
# 等待异步任务完成
|
|
time.sleep(3)
|
|
|
|
# 查询导入状态
|
|
status_url = f"{BASE_URL}/ccdi/purchaseTransaction/importStatus/{task_id}"
|
|
status_result = client.get_import_status(status_url)
|
|
|
|
if status_result.get("code") != 200:
|
|
self.error_message = f"查询状态失败: {status_result.get('msg')}"
|
|
self.end_time = datetime.now()
|
|
return
|
|
|
|
status_data = status_result.get("data", {})
|
|
print(f" ✓ 导入状态: {status_data}")
|
|
|
|
# 查询失败记录
|
|
failures_url = f"{BASE_URL}/ccdi/purchaseTransaction/importFailures/{task_id}"
|
|
failures_result = client.get_import_failures(failures_url)
|
|
|
|
if failures_result.get("code") != 200:
|
|
self.error_message = f"查询失败记录失败: {failures_result.get('msg')}"
|
|
self.end_time = datetime.now()
|
|
return
|
|
|
|
failures = failures_result.get("rows", [])
|
|
|
|
# 验证结果
|
|
# 预期: 成功1条,失败2条
|
|
expected_success = 1
|
|
expected_failure = 2
|
|
actual_success = status_data.get("successCount", 0)
|
|
actual_failure = status_data.get("failureCount", 0)
|
|
|
|
self.details = {
|
|
"expected_success": expected_success,
|
|
"expected_failure": expected_failure,
|
|
"actual_success": actual_success,
|
|
"actual_failure": actual_failure,
|
|
"failures": failures
|
|
}
|
|
|
|
# 验证成功/失败数量
|
|
if actual_success != expected_success or actual_failure != expected_failure:
|
|
self.error_message = f"数量不匹配: 预期成功{expected_success}失败{expected_failure}, 实际成功{actual_success}失败{actual_failure}"
|
|
self.end_time = datetime.now()
|
|
return
|
|
|
|
# 验证失败消息
|
|
if len(failures) < 2:
|
|
self.error_message = f"失败记录数量不足: 预期2条, 实际{len(failures)}条"
|
|
self.end_time = datetime.now()
|
|
return
|
|
|
|
# 检查失败消息是否包含"在导入文件中重复"
|
|
error_msg_1 = failures[0].get("errorMessage", "")
|
|
error_msg_2 = failures[1].get("errorMessage", "")
|
|
|
|
if "在导入文件中重复" not in error_msg_1 or "在导入文件中重复" not in error_msg_2:
|
|
self.error_message = f"错误消息不正确: {error_msg_1}, {error_msg_2}"
|
|
self.end_time = datetime.now()
|
|
return
|
|
|
|
self.passed = True
|
|
print(f" ✓ 测试通过")
|
|
|
|
except Exception as e:
|
|
self.error_message = f"测试异常: {str(e)}"
|
|
print(f" ✗ 测试异常: {str(e)}")
|
|
|
|
self.end_time = datetime.now()
|
|
|
|
|
|
class EmployeeEmployeeIdDuplicateTestCase(TestCase):
|
|
"""场景2: Excel内员工柜员号重复测试"""
|
|
|
|
def __init__(self):
|
|
super().__init__(
|
|
"员工信息 - Excel内柜员号重复",
|
|
"测试导入3条柜员号相同的记录,预期第1条成功,第2、3条失败"
|
|
)
|
|
|
|
def run(self, client: APIClient):
|
|
self.start_time = datetime.now()
|
|
|
|
try:
|
|
# 生成测试数据
|
|
file_path = ExcelGenerator.create_employee_employee_id_duplicate()
|
|
print(f" ✓ 生成测试数据: {file_path}")
|
|
|
|
# 上传文件
|
|
upload_url = f"{BASE_URL}/ccdi/employee/importData"
|
|
upload_result = client.upload_file(upload_url, file_path)
|
|
|
|
if upload_result.get("code") != 200:
|
|
self.error_message = f"上传失败: {upload_result.get('msg')}"
|
|
self.end_time = datetime.now()
|
|
return
|
|
|
|
task_id = upload_result.get("data", {}).get("taskId")
|
|
print(f" ✓ 上传成功, TaskID: {task_id}")
|
|
|
|
# 等待异步任务完成
|
|
time.sleep(3)
|
|
|
|
# 查询导入状态
|
|
status_url = f"{BASE_URL}/ccdi/employee/importStatus/{task_id}"
|
|
status_result = client.get_import_status(status_url)
|
|
|
|
if status_result.get("code") != 200:
|
|
self.error_message = f"查询状态失败: {status_result.get('msg')}"
|
|
self.end_time = datetime.now()
|
|
return
|
|
|
|
status_data = status_result.get("data", {})
|
|
print(f" ✓ 导入状态: {status_data}")
|
|
|
|
# 查询失败记录
|
|
failures_url = f"{BASE_URL}/ccdi/employee/importFailures/{task_id}"
|
|
failures_result = client.get_import_failures(failures_url)
|
|
|
|
if failures_result.get("code") != 200:
|
|
self.error_message = f"查询失败记录失败: {failures_result.get('msg')}"
|
|
self.end_time = datetime.now()
|
|
return
|
|
|
|
failures = failures_result.get("rows", [])
|
|
|
|
# 验证结果
|
|
expected_success = 1
|
|
expected_failure = 2
|
|
actual_success = status_data.get("successCount", 0)
|
|
actual_failure = status_data.get("failureCount", 0)
|
|
|
|
self.details = {
|
|
"expected_success": expected_success,
|
|
"expected_failure": expected_failure,
|
|
"actual_success": actual_success,
|
|
"actual_failure": actual_failure,
|
|
"failures": failures
|
|
}
|
|
|
|
if actual_success != expected_success or actual_failure != expected_failure:
|
|
self.error_message = f"数量不匹配: 预期成功{expected_success}失败{expected_failure}, 实际成功{actual_success}失败{actual_failure}"
|
|
self.end_time = datetime.now()
|
|
return
|
|
|
|
if len(failures) < 2:
|
|
self.error_message = f"失败记录数量不足: 预期2条, 实际{len(failures)}条"
|
|
self.end_time = datetime.now()
|
|
return
|
|
|
|
# 验证失败消息
|
|
error_msg_1 = failures[0].get("errorMessage", "")
|
|
error_msg_2 = failures[1].get("errorMessage", "")
|
|
|
|
if "柜员号" not in error_msg_1 or "在导入文件中重复" not in error_msg_1:
|
|
self.error_message = f"错误消息不正确(第1条): {error_msg_1}"
|
|
self.end_time = datetime.now()
|
|
return
|
|
|
|
if "柜员号" not in error_msg_2 or "在导入文件中重复" not in error_msg_2:
|
|
self.error_message = f"错误消息不正确(第2条): {error_msg_2}"
|
|
self.end_time = datetime.now()
|
|
return
|
|
|
|
self.passed = True
|
|
print(f" ✓ 测试通过")
|
|
|
|
except Exception as e:
|
|
self.error_message = f"测试异常: {str(e)}"
|
|
print(f" ✗ 测试异常: {str(e)}")
|
|
|
|
self.end_time = datetime.now()
|
|
|
|
|
|
class EmployeeIdCardDuplicateTestCase(TestCase):
|
|
"""场景3: Excel内员工身份证号重复测试"""
|
|
|
|
def __init__(self):
|
|
super().__init__(
|
|
"员工信息 - Excel内身份证号重复",
|
|
"测试导入3条身份证号相同的记录,预期第1条成功,第2、3条失败"
|
|
)
|
|
|
|
def run(self, client: APIClient):
|
|
self.start_time = datetime.now()
|
|
|
|
try:
|
|
# 生成测试数据
|
|
file_path = ExcelGenerator.create_employee_id_card_duplicate()
|
|
print(f" ✓ 生成测试数据: {file_path}")
|
|
|
|
# 上传文件
|
|
upload_url = f"{BASE_URL}/ccdi/employee/importData"
|
|
upload_result = client.upload_file(upload_url, file_path)
|
|
|
|
if upload_result.get("code") != 200:
|
|
self.error_message = f"上传失败: {upload_result.get('msg')}"
|
|
self.end_time = datetime.now()
|
|
return
|
|
|
|
task_id = upload_result.get("data", {}).get("taskId")
|
|
print(f" ✓ 上传成功, TaskID: {task_id}")
|
|
|
|
# 等待异步任务完成
|
|
time.sleep(3)
|
|
|
|
# 查询导入状态
|
|
status_url = f"{BASE_URL}/ccdi/employee/importStatus/{task_id}"
|
|
status_result = client.get_import_status(status_url)
|
|
|
|
if status_result.get("code") != 200:
|
|
self.error_message = f"查询状态失败: {status_result.get('msg')}"
|
|
self.end_time = datetime.now()
|
|
return
|
|
|
|
status_data = status_result.get("data", {})
|
|
print(f" ✓ 导入状态: {status_data}")
|
|
|
|
# 查询失败记录
|
|
failures_url = f"{BASE_URL}/ccdi/employee/importFailures/{task_id}"
|
|
failures_result = client.get_import_failures(failures_url)
|
|
|
|
if failures_result.get("code") != 200:
|
|
self.error_message = f"查询失败记录失败: {failures_result.get('msg')}"
|
|
self.end_time = datetime.now()
|
|
return
|
|
|
|
failures = failures_result.get("rows", [])
|
|
|
|
# 验证结果
|
|
expected_success = 1
|
|
expected_failure = 2
|
|
actual_success = status_data.get("successCount", 0)
|
|
actual_failure = status_data.get("failureCount", 0)
|
|
|
|
self.details = {
|
|
"expected_success": expected_success,
|
|
"expected_failure": expected_failure,
|
|
"actual_success": actual_success,
|
|
"actual_failure": actual_failure,
|
|
"failures": failures
|
|
}
|
|
|
|
if actual_success != expected_success or actual_failure != expected_failure:
|
|
self.error_message = f"数量不匹配: 预期成功{expected_success}失败{expected_failure}, 实际成功{actual_success}失败{actual_failure}"
|
|
self.end_time = datetime.now()
|
|
return
|
|
|
|
if len(failures) < 2:
|
|
self.error_message = f"失败记录数量不足: 预期2条, 实际{len(failures)}条"
|
|
self.end_time = datetime.now()
|
|
return
|
|
|
|
# 验证失败消息
|
|
error_msg_1 = failures[0].get("errorMessage", "")
|
|
error_msg_2 = failures[1].get("errorMessage", "")
|
|
|
|
if "身份证号" not in error_msg_1 or "在导入文件中重复" not in error_msg_1:
|
|
self.error_message = f"错误消息不正确(第1条): {error_msg_1}"
|
|
self.end_time = datetime.now()
|
|
return
|
|
|
|
if "身份证号" not in error_msg_2 or "在导入文件中重复" not in error_msg_2:
|
|
self.error_message = f"错误消息不正确(第2条): {error_msg_2}"
|
|
self.end_time = datetime.now()
|
|
return
|
|
|
|
self.passed = True
|
|
print(f" ✓ 测试通过")
|
|
|
|
except Exception as e:
|
|
self.error_message = f"测试异常: {str(e)}"
|
|
print(f" ✗ 测试异常: {str(e)}")
|
|
|
|
self.end_time = datetime.now()
|
|
|
|
|
|
class MixedDuplicateTestCase(TestCase):
|
|
"""场景4: 混合重复(数据库+Excel)测试"""
|
|
|
|
def __init__(self):
|
|
super().__init__(
|
|
"混合重复 - 数据库+Excel重复",
|
|
"测试数据库已存在+Excel内重复的混合场景"
|
|
)
|
|
|
|
def run(self, client: APIClient):
|
|
self.start_time = datetime.now()
|
|
|
|
try:
|
|
# 生成测试数据
|
|
purchase_file, employee_file = ExcelGenerator.create_mixed_duplicate_scenario()
|
|
print(f" ✓ 生成测试数据: {purchase_file}, {employee_file}")
|
|
|
|
# 测试采购交易
|
|
print("\n >> 测试采购交易混合重复")
|
|
purchase_upload_url = f"{BASE_URL}/ccdi/purchaseTransaction/importData"
|
|
purchase_upload_result = client.upload_file(purchase_upload_url, purchase_file)
|
|
|
|
purchase_passed = False
|
|
purchase_details = {}
|
|
|
|
if purchase_upload_result.get("code") == 200:
|
|
purchase_task_id = purchase_upload_result.get("data", {}).get("taskId")
|
|
print(f" ✓ 采购交易上传成功, TaskID: {purchase_task_id}")
|
|
|
|
time.sleep(3)
|
|
|
|
# 查询导入状态
|
|
purchase_status_url = f"{BASE_URL}/ccdi/purchaseTransaction/importStatus/{purchase_task_id}"
|
|
purchase_status_result = client.get_import_status(purchase_status_url)
|
|
|
|
if purchase_status_result.get("code") == 200:
|
|
purchase_status_data = purchase_status_result.get("data", {})
|
|
|
|
# 查询失败记录
|
|
purchase_failures_url = f"{BASE_URL}/ccdi/purchaseTransaction/importFailures/{purchase_task_id}"
|
|
purchase_failures_result = client.get_import_failures(purchase_failures_url)
|
|
|
|
if purchase_failures_result.get("code") == 200:
|
|
purchase_failures = purchase_failures_result.get("rows", [])
|
|
|
|
purchase_details = {
|
|
"success_count": purchase_status_data.get("successCount", 0),
|
|
"failure_count": purchase_status_data.get("failureCount", 0),
|
|
"failures": purchase_failures
|
|
}
|
|
|
|
# 验证: 第1条失败(数据库重复), 第2条成功, 第3条失败(Excel内重复), 第4条成功
|
|
# 预期: 成功2条,失败2条
|
|
if purchase_status_data.get("successCount") == 2 and purchase_status_data.get("failureCount") == 2:
|
|
purchase_passed = True
|
|
print(f" ✓ 采购交易测试通过: 成功2条,失败2条")
|
|
else:
|
|
print(f" ✗ 采购交易测试失败: 预期成功2失败2, 实际成功{purchase_status_data.get('successCount')}失败{purchase_status_data.get('failureCount')}")
|
|
|
|
# 测试员工信息
|
|
print("\n >> 测试员工信息混合重复")
|
|
employee_upload_url = f"{BASE_URL}/ccdi/employee/importData"
|
|
employee_upload_result = client.upload_file(employee_upload_url, employee_file)
|
|
|
|
employee_passed = False
|
|
employee_details = {}
|
|
|
|
if employee_upload_result.get("code") == 200:
|
|
employee_task_id = employee_upload_result.get("data", {}).get("taskId")
|
|
print(f" ✓ 员工信息上传成功, TaskID: {employee_task_id}")
|
|
|
|
time.sleep(3)
|
|
|
|
# 查询导入状态
|
|
employee_status_url = f"{BASE_URL}/ccdi/employee/importStatus/{employee_task_id}"
|
|
employee_status_result = client.get_import_status(employee_status_url)
|
|
|
|
if employee_status_result.get("code") == 200:
|
|
employee_status_data = employee_status_result.get("data", {})
|
|
|
|
# 查询失败记录
|
|
employee_failures_url = f"{BASE_URL}/ccdi/employee/importFailures/{employee_task_id}"
|
|
employee_failures_result = client.get_import_failures(employee_failures_url)
|
|
|
|
if employee_failures_result.get("code") == 200:
|
|
employee_failures = employee_failures_result.get("rows", [])
|
|
|
|
employee_details = {
|
|
"success_count": employee_status_data.get("successCount", 0),
|
|
"failure_count": employee_status_data.get("failureCount", 0),
|
|
"failures": employee_failures
|
|
}
|
|
|
|
# 验证: 第1条失败(数据库重复), 第2条成功, 第3条失败(Excel内重复), 第4条成功
|
|
# 预期: 成功2条,失败2条
|
|
if employee_status_data.get("successCount") == 2 and employee_status_data.get("failureCount") == 2:
|
|
employee_passed = True
|
|
print(f" ✓ 员工信息测试通过: 成功2条,失败2条")
|
|
else:
|
|
print(f" ✗ 员工信息测试失败: 预期成功2失败2, 实际成功{employee_status_data.get('successCount')}失败{employee_status_data.get('failureCount')}")
|
|
|
|
self.details = {
|
|
"purchase": {
|
|
"passed": purchase_passed,
|
|
"details": purchase_details
|
|
},
|
|
"employee": {
|
|
"passed": employee_passed,
|
|
"details": employee_details
|
|
}
|
|
}
|
|
|
|
# 至少一个通过则认为测试通过(因为数据库可能不存在预置数据)
|
|
self.passed = purchase_passed or employee_passed
|
|
|
|
if self.passed:
|
|
print(f" ✓ 测试通过")
|
|
|
|
except Exception as e:
|
|
self.error_message = f"测试异常: {str(e)}"
|
|
print(f" ✗ 测试异常: {str(e)}")
|
|
|
|
self.end_time = datetime.now()
|
|
|
|
|
|
class TestRunner:
|
|
"""测试运行器"""
|
|
|
|
def __init__(self):
|
|
self.client = APIClient(BASE_URL)
|
|
self.test_cases: List[TestCase] = []
|
|
self.results = []
|
|
|
|
def add_test_case(self, test_case: TestCase):
|
|
"""添加测试用例"""
|
|
self.test_cases.append(test_case)
|
|
|
|
def run_all(self):
|
|
"""运行所有测试用例"""
|
|
print("=" * 80)
|
|
print("导入文件内部主键重复检测功能测试")
|
|
print("=" * 80)
|
|
print(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
|
|
print(f"测试环境: {BASE_URL}")
|
|
print("=" * 80)
|
|
|
|
# 登录
|
|
print("\n[1/2] 登录系统...")
|
|
if not self.client.login(USERNAME, PASSWORD):
|
|
print("✗ 登录失败,测试终止")
|
|
return
|
|
|
|
# 运行测试
|
|
print("\n[2/2] 运行测试用例...")
|
|
print("-" * 80)
|
|
|
|
for i, test_case in enumerate(self.test_cases, 1):
|
|
print(f"\n测试用例 {i}/{len(self.test_cases)}: {test_case.name}")
|
|
print(f"描述: {test_case.description}")
|
|
print("-" * 80)
|
|
|
|
test_case.run(self.client)
|
|
self.results.append(test_case.to_dict())
|
|
|
|
# 生成报告
|
|
self.generate_report()
|
|
|
|
def generate_report(self):
|
|
"""生成测试报告"""
|
|
print("\n" + "=" * 80)
|
|
print("测试报告")
|
|
print("=" * 80)
|
|
|
|
passed_count = sum(1 for r in self.results if r["passed"])
|
|
failed_count = len(self.results) - passed_count
|
|
|
|
print(f"\n总测试用例数: {len(self.results)}")
|
|
print(f"通过: {passed_count}")
|
|
print(f"失败: {failed_count}")
|
|
print(f"通过率: {passed_count / len(self.results) * 100:.1f}%")
|
|
|
|
print("\n详细结果:")
|
|
print("-" * 80)
|
|
|
|
for i, result in enumerate(self.results, 1):
|
|
status = "✓ PASS" if result["passed"] else "✗ FAIL"
|
|
print(f"\n{i}. {result['name']}")
|
|
print(f" 状态: {status}")
|
|
print(f" 耗时: {result['duration']}")
|
|
|
|
if not result["passed"]:
|
|
print(f" 错误: {result['error_message']}")
|
|
|
|
if result["details"]:
|
|
print(f" 详情: {json.dumps(result['details'], ensure_ascii=False, indent=6)}")
|
|
|
|
# 保存报告到文件
|
|
report_file = os.path.join(REPORT_DIR, f"test_report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
|
|
with open(report_file, 'w', encoding='utf-8') as f:
|
|
json.dump({
|
|
"test_time": datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
|
|
"environment": BASE_URL,
|
|
"total_count": len(self.results),
|
|
"passed_count": passed_count,
|
|
"failed_count": failed_count,
|
|
"pass_rate": f"{passed_count / len(self.results) * 100:.1f}%",
|
|
"results": self.results
|
|
}, f, ensure_ascii=False, indent=2)
|
|
|
|
print(f"\n报告已保存到: {report_file}")
|
|
print("=" * 80)
|
|
|
|
|
|
def main():
|
|
"""主函数"""
|
|
runner = TestRunner()
|
|
|
|
# 添加测试用例
|
|
runner.add_test_case(PurchaseDuplicateTestCase())
|
|
runner.add_test_case(EmployeeEmployeeIdDuplicateTestCase())
|
|
runner.add_test_case(EmployeeIdCardDuplicateTestCase())
|
|
runner.add_test_case(MixedDuplicateTestCase())
|
|
|
|
# 运行所有测试
|
|
try:
|
|
runner.run_all()
|
|
except KeyboardInterrupt:
|
|
print("\n\n测试被用户中断")
|
|
except Exception as e:
|
|
print(f"\n\n测试运行异常: {str(e)}")
|
|
import traceback
|
|
traceback.print_exc()
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|