Files
ccdi/doc/测试数据/员工调动记录/test_staff_transfer_complete.py

424 lines
18 KiB
Python
Raw Normal View History

2026-02-11 10:42:38 +08:00
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""
员工调动记录功能完整测试脚本
测试新的员工搜索接口和Treeselect部门选择功能
"""
import requests
import json
from datetime import datetime
# 配置
BASE_URL = "http://localhost:8080"
USERNAME = "admin"
PASSWORD = "admin123"
class StaffTransferTester:
def __init__(self):
self.base_url = BASE_URL
self.token = None
self.headers = {}
self.test_results = []
def log_test(self, test_name, passed, message=""):
"""记录测试结果"""
status = "✅ PASS" if passed else "❌ FAIL"
result = {
"test": test_name,
"status": status,
"message": message,
"timestamp": datetime.now().strftime("%Y-%m-%d %H:%M:%S")
}
self.test_results.append(result)
print(f"{status} - {test_name}")
if message:
print(f" {message}")
def login(self):
"""登录获取token"""
print("\n=== 测试1: 用户登录 ===")
try:
url = f"{self.base_url}/login/test"
data = {
"username": USERNAME,
"password": PASSWORD
}
response = requests.post(url, json=data)
if response.status_code == 200:
result = response.json()
if result.get("code") == 200:
self.token = result.get("token")
self.headers = {"Authorization": f"Bearer {self.token}"}
self.log_test("用户登录", True, f"成功获取token: {self.token[:20]}...")
return True
else:
self.log_test("用户登录", False, f"登录失败: {result.get('msg')}")
return False
else:
self.log_test("用户登录", False, f"HTTP错误: {response.status_code}")
return False
except Exception as e:
self.log_test("用户登录", False, f"异常: {str(e)}")
return False
def test_staff_search_no_param(self):
"""测试员工搜索接口 - 不带参数"""
print("\n=== 测试2: 员工搜索(不带参数)===")
try:
url = f"{self.base_url}/ccdi/baseStaff/options"
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
result = response.json()
if result.get("code") == 200:
data = result.get("data", [])
self.log_test("员工搜索(不带参数)", True, f"返回{len(data)}条记录")
if data:
print(f" 示例数据: {json.dumps(data[0], ensure_ascii=False)}")
return data
else:
self.log_test("员工搜索(不带参数)", False, f"业务错误: {result.get('msg')}")
return []
else:
self.log_test("员工搜索(不带参数)", False, f"HTTP错误: {response.status_code}")
return []
except Exception as e:
self.log_test("员工搜索(不带参数)", False, f"异常: {str(e)}")
return []
def test_staff_search_by_id(self, staff_list):
"""测试员工搜索接口 - 按员工ID搜索"""
print("\n=== 测试3: 员工搜索按员工ID===")
if not staff_list:
self.log_test("员工搜索按员工ID", False, "无可用员工数据")
return None
try:
staff_id = staff_list[0]["staffId"]
url = f"{self.base_url}/ccdi/baseStaff/options"
params = {"query": str(staff_id)}
response = requests.get(url, headers=self.headers, params=params)
if response.status_code == 200:
result = response.json()
if result.get("code") == 200:
data = result.get("data", [])
self.log_test("员工搜索按员工ID", True, f"搜索'{staff_id}'返回{len(data)}条记录")
return staff_list[0]
else:
self.log_test("员工搜索按员工ID", False, f"业务错误: {result.get('msg')}")
return None
else:
self.log_test("员工搜索按员工ID", False, f"HTTP错误: {response.status_code}")
return None
except Exception as e:
self.log_test("员工搜索按员工ID", False, f"异常: {str(e)}")
return None
def test_staff_search_by_name(self, staff_list):
"""测试员工搜索接口 - 按姓名搜索"""
print("\n=== 测试4: 员工搜索(按姓名)===")
if not staff_list:
self.log_test("员工搜索(按姓名)", False, "无可用员工数据")
return []
try:
# 获取第一个员工的姓名进行搜索
staff_name = staff_list[0]["staffName"]
url = f"{self.base_url}/ccdi/baseStaff/options"
params = {"query": staff_name}
response = requests.get(url, headers=self.headers, params=params)
if response.status_code == 200:
result = response.json()
if result.get("code") == 200:
data = result.get("data", [])
self.log_test("员工搜索(按姓名)", True, f"搜索'{staff_name}'返回{len(data)}条记录")
if data:
print(f" 匹配结果: {json.dumps(data[0], ensure_ascii=False)}")
return data
else:
self.log_test("员工搜索(按姓名)", False, f"业务错误: {result.get('msg')}")
return []
else:
self.log_test("员工搜索(按姓名)", False, f"HTTP错误: {response.status_code}")
return []
except Exception as e:
self.log_test("员工搜索(按姓名)", False, f"异常: {str(e)}")
return []
def test_add_transfer(self, staff_data):
"""测试新增调动记录"""
print("\n=== 测试5: 新增员工调动记录 ===")
try:
url = f"{self.base_url}/ccdi/staffTransfer"
data = {
"staffId": staff_data["staffId"],
"transferType": "LATERAL",
"transferSubType": "测试调动",
"deptIdBefore": 100, # 示例部门ID
"deptNameBefore": "测试部门A",
"gradeBefore": "初级",
"positionBefore": "员工",
"salaryLevelBefore": "P1",
"deptIdAfter": 101, # 示例部门ID
"deptNameAfter": "测试部门B",
"gradeAfter": "中级",
"positionAfter": "主管",
"salaryLevelAfter": "P2",
"transferDate": datetime.now().strftime("%Y-%m-%d")
}
response = requests.post(url, json=data, headers=self.headers)
if response.status_code == 200:
result = response.json()
if result.get("code") == 200:
self.log_test("新增员工调动记录", True, "新增成功")
return True
else:
self.log_test("新增员工调动记录", False, f"业务错误: {result.get('msg')}")
return False
else:
self.log_test("新增员工调动记录", False, f"HTTP错误: {response.status_code}")
return False
except Exception as e:
self.log_test("新增员工调动记录", False, f"异常: {str(e)}")
return False
def test_query_transfer_list(self):
"""测试查询调动记录列表"""
print("\n=== 测试6: 查询调动记录列表 ===")
try:
url = f"{self.base_url}/ccdi/staffTransfer/list"
params = {
"pageNum": 1,
"pageSize": 10
}
response = requests.get(url, headers=self.headers, params=params)
if response.status_code == 200:
result = response.json()
if result.get("code") == 200:
rows = result.get("rows", [])
total = result.get("total", 0)
self.log_test("查询调动记录列表", True, f"查询成功,共{total}条记录,当前页{len(rows)}")
if rows:
print(f" 示例记录ID: {rows[0].get('id', 'N/A')}")
return rows
else:
self.log_test("查询调动记录列表", False, f"业务错误: {result.get('msg')}")
return []
else:
self.log_test("查询调动记录列表", False, f"HTTP错误: {response.status_code}")
return []
except Exception as e:
self.log_test("查询调动记录列表", False, f"异常: {str(e)}")
return []
def test_get_transfer_detail(self, transfer_list):
"""测试获取调动记录详情"""
print("\n=== 测试7: 获取调动记录详情 ===")
if not transfer_list:
self.log_test("获取调动记录详情", False, "无可用调动记录")
return None
try:
transfer_id = transfer_list[0].get("id")
url = f"{self.base_url}/ccdi/staffTransfer/{transfer_id}"
response = requests.get(url, headers=self.headers)
if response.status_code == 200:
result = response.json()
if result.get("code") == 200:
data = result.get("data")
self.log_test("获取调动记录详情", True, f"获取成功记录ID: {transfer_id}")
print(f" 员工ID: {data.get('staffId')}")
print(f" 调动类型: {data.get('transferType')}")
return data
else:
self.log_test("获取调动记录详情", False, f"业务错误: {result.get('msg')}")
return None
else:
self.log_test("获取调动记录详情", False, f"HTTP错误: {response.status_code}")
return None
except Exception as e:
self.log_test("获取调动记录详情", False, f"异常: {str(e)}")
return None
def test_edit_transfer(self, transfer_list):
"""测试编辑调动记录"""
print("\n=== 测试8: 编辑员工调动记录 ===")
if not transfer_list:
self.log_test("编辑员工调动记录", False, "无可用调动记录")
return False
try:
transfer_id = transfer_list[0].get("id")
url = f"{self.base_url}/ccdi/staffTransfer"
data = {
"id": transfer_id,
"staffId": transfer_list[0].get("staffId"), # 员工ID不可修改但需要传递
"transferType": "PROMOTION", # 修改调动类型
"transferSubType": "测试调动-已编辑",
"deptIdBefore": 100,
"deptNameBefore": "测试部门A",
"gradeBefore": "初级",
"positionBefore": "员工",
"salaryLevelBefore": "P1",
"deptIdAfter": 102,
"deptNameAfter": "测试部门C",
"gradeAfter": "高级",
"positionAfter": "经理",
"salaryLevelAfter": "P3",
"transferDate": datetime.now().strftime("%Y-%m-%d")
}
response = requests.put(url, json=data, headers=self.headers)
if response.status_code == 200:
result = response.json()
if result.get("code") == 200:
self.log_test("编辑员工调动记录", True, "编辑成功")
return True
else:
self.log_test("编辑员工调动记录", False, f"业务错误: {result.get('msg')}")
return False
else:
self.log_test("编辑员工调动记录", False, f"HTTP错误: {response.status_code}")
return False
except Exception as e:
self.log_test("编辑员工调动记录", False, f"异常: {str(e)}")
return False
def test_delete_transfer(self, transfer_list):
"""测试删除调动记录"""
print("\n=== 测试9: 删除员工调动记录 ===")
if not transfer_list:
self.log_test("删除员工调动记录", False, "无可用调动记录")
return False
try:
transfer_id = transfer_list[0].get("id")
url = f"{self.base_url}/ccdi/staffTransfer/{transfer_id}"
response = requests.delete(url, headers=self.headers)
if response.status_code == 200:
result = response.json()
if result.get("code") == 200:
self.log_test("删除员工调动记录", True, f"删除成功记录ID: {transfer_id}")
return True
else:
self.log_test("删除员工调动记录", False, f"业务错误: {result.get('msg')}")
return False
else:
self.log_test("删除员工调动记录", False, f"HTTP错误: {response.status_code}")
return False
except Exception as e:
self.log_test("删除员工调动记录", False, f"异常: {str(e)}")
return False
def test_export_transfer(self):
"""测试导出调动记录"""
print("\n=== 测试10: 导出调动记录 ===")
try:
url = f"{self.base_url}/ccdi/staffTransfer/export"
params = {
"staffId": "",
"transferType": "",
"deptIdAfter": ""
}
response = requests.post(url, headers=self.headers, params=params)
if response.status_code == 200:
# 检查是否返回Excel文件
content_type = response.headers.get("Content-Type", "")
if "excel" in content_type or "spreadsheet" in content_type or response.status_code == 200:
self.log_test("导出调动记录", True, f"导出成功,文件类型: {content_type}")
return True
else:
self.log_test("导出调动记录", False, f"返回格式错误: {content_type}")
return False
else:
self.log_test("导出调动记录", False, f"HTTP错误: {response.status_code}")
return False
except Exception as e:
self.log_test("导出调动记录", False, f"异常: {str(e)}")
return False
def generate_report(self):
"""生成测试报告"""
print("\n" + "="*60)
print("测试报告")
print("="*60)
passed = sum(1 for r in self.test_results if "PASS" in r["status"])
failed = sum(1 for r in self.test_results if "FAIL" in r["status"])
print(f"\n总计: {len(self.test_results)} 个测试")
print(f"通过: {passed}")
print(f"失败: {failed}")
if len(self.test_results) > 0:
print(f"成功率: {passed/len(self.test_results)*100:.1f}%\n")
# 保存到文件
report_path = "D:\\ccdi\\ccdi\\doc\\测试数据\\员工调动记录\\test_report.txt"
with open(report_path, "w", encoding="utf-8") as f:
f.write("员工调动记录功能测试报告\n")
f.write("="*60 + "\n")
f.write(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n")
f.write(f"总计: {len(self.test_results)} 个测试\n")
f.write(f"通过: {passed}\n")
f.write(f"失败: {failed}\n")
if len(self.test_results) > 0:
f.write(f"成功率: {passed/len(self.test_results)*100:.1f}%\n\n")
f.write("详细结果:\n")
f.write("-"*60 + "\n")
for result in self.test_results:
f.write(f"{result['status']} - {result['test']}\n")
if result['message']:
f.write(f" {result['message']}\n")
print(f"测试报告已保存至: {report_path}")
def run_all_tests(self):
"""运行所有测试"""
print("="*60)
print("员工调动记录功能测试")
print("="*60)
# 测试1: 登录
if not self.login():
print("登录失败,终止测试")
self.generate_report()
return
# 测试2: 搜索员工(不带参数)
staff_list = self.test_staff_search_no_param()
# 测试3: 搜索员工按ID
staff_data = self.test_staff_search_by_id(staff_list)
# 测试4: 搜索员工(按姓名)
self.test_staff_search_by_name(staff_list)
# 测试5: 新增调动记录
if staff_data:
add_success = self.test_add_transfer(staff_data)
else:
add_success = False
print(" 跳过新增测试:无有效员工数据")
# 测试6: 查询调动记录列表
transfer_list = self.test_query_transfer_list()
# 测试7: 获取调动记录详情
self.test_get_transfer_detail(transfer_list)
# 测试8: 编辑调动记录
if add_success and transfer_list:
self.test_edit_transfer(transfer_list)
# 测试9: 删除调动记录(可选,谨慎执行)
# self.test_delete_transfer(transfer_list)
# 测试10: 导出调动记录
self.test_export_transfer()
# 生成报告
self.generate_report()
if __name__ == "__main__":
tester = StaffTransferTester()
tester.run_all_tests()