Files
ccdi/test/test_pagination.py

438 lines
14 KiB
Python
Raw Normal View History

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""分页接口总数测试脚本
测试接口
1. /dpc/employee/list - 员工列表MyBatis Plus分页
2. /dpc/intermediary/list - 中介黑名单列表若依startPage分页
"""
import sys
import io
import requests
import json
from datetime import datetime
# 设置stdout编码为UTF-8
if sys.platform == 'win32':
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding='utf-8')
BASE_URL = "http://localhost:8080"
LOGIN_URL = f"{BASE_URL}/login/test"
EMPLOYEE_LIST_URL = f"{BASE_URL}/dpc/employee/list"
INTERMEDIARY_LIST_URL = f"{BASE_URL}/dpc/intermediary/list"
# 测试结果存储
test_results = []
def login():
"""登录获取token"""
print("=" * 60)
print("步骤1: 获取认证Token")
print("=" * 60)
login_body = {
"username": "admin",
"password": "admin123"
}
try:
# 使用json参数发送JSON格式请求体
response = requests.post(LOGIN_URL, json=login_body)
print(f"请求URL: {LOGIN_URL}")
print(f"请求参数: {login_body}")
print(f"响应状态码: {response.status_code}")
data = response.json()
if data.get("code") == 200:
token = data.get("token")
print(f"✓ Token获取成功: {token[:20]}...")
return token
else:
print(f"✗ Token获取失败: {data.get('msg')}")
return None
except Exception as e:
print(f"✗ 异常: {e}")
return None
def test_page(url, token, page_num, page_size, test_name, api_type):
"""测试指定分页参数的接口"""
print(f"\n========== 测试: {test_name} ==========")
print(f"API类型: {api_type}")
print(f"URL: {url}")
print(f"参数: pageNum={page_num}, pageSize={page_size}")
params = {
"pageNum": page_num,
"pageSize": page_size
}
headers = {
"Authorization": f"Bearer {token}"
}
try:
response = requests.get(url, params=params, headers=headers)
print(f"响应状态码: {response.status_code}")
if response.status_code != 200:
print(f"✗ HTTP错误: {response.text}")
test_results.append({
"test_name": test_name,
"api_type": api_type,
"status": "FAIL",
"error": f"HTTP {response.status_code}"
})
return None
data = response.json()
print(f"响应内容:\n{json.dumps(data, indent=2, ensure_ascii=False)}")
code = data.get("code", 0)
if code != 200:
print(f"✗ 业务错误: {data.get('msg')}")
test_results.append({
"test_name": test_name,
"api_type": api_type,
"status": "FAIL",
"error": data.get("msg", "Unknown error")
})
return None
# 检查响应结构 - 支持两种格式
# 格式1: {data: {total, rows}, code, msg}
# 格式2: {total, rows, code, msg}
if "data" in data:
response_data = data["data"]
rows = response_data.get("rows", [])
total = response_data.get("total")
else:
# 扁平结构格式
rows = data.get("rows", [])
total = data.get("total")
rows_count = len(rows)
print(f"\n--- 分页数据分析 ---")
print(f"返回行数(rows): {rows_count}")
print(f"总数(total): {total}")
# 验证total字段
if total is None:
print(f"✗ 响应缺少total字段")
test_results.append({
"test_name": test_name,
"api_type": api_type,
"status": "FAIL",
"error": "响应缺少total字段"
})
return None
if not isinstance(total, int):
print(f"✗ total类型错误: {type(total)}")
test_results.append({
"test_name": test_name,
"api_type": api_type,
"status": "FAIL",
"error": f"total类型错误: {type(total)}"
})
return None
if total < 0:
print(f"✗ total值无效: {total}")
test_results.append({
"test_name": test_name,
"api_type": api_type,
"status": "FAIL",
"error": f"total值无效: {total}"
})
return None
# 计算预期值
expected_total_pages = (total + page_size - 1) // page_size
if page_num < expected_total_pages:
expected_rows = page_size
elif page_num == expected_total_pages:
expected_rows = total - (page_size * (page_num - 1))
else:
expected_rows = 0
print(f"预期行数: {expected_rows}")
print(f"预期总页数: {expected_total_pages}")
# 验证行数是否正确
is_correct = rows_count == expected_rows
if is_correct:
print(f"✓ 测试通过 - 分页总数返回正常")
test_results.append({
"test_name": test_name,
"api_type": api_type,
"status": "PASS",
"page_num": page_num,
"page_size": page_size,
"rows_count": rows_count,
"total": total,
"expected_rows": expected_rows
})
else:
print(f"✗ 测试失败 - 预期{expected_rows}行,实际{rows_count}")
test_results.append({
"test_name": test_name,
"api_type": api_type,
"status": "FAIL",
"page_num": page_num,
"page_size": page_size,
"rows_count": rows_count,
"total": total,
"expected_rows": expected_rows,
"error": f"行数不匹配"
})
# 显示返回的ID
if rows:
if "employeeId" in rows[0]:
ids = ', '.join([str(r.get("employeeId")) for r in rows])
print(f"员工ID: {ids}")
elif "intermediaryId" in rows[0]:
ids = ', '.join([str(r.get("intermediaryId")) for r in rows])
print(f"中介ID: {ids}")
return {
"success": True,
"rows": rows_count,
"total": total,
"expected_rows": expected_rows,
"is_correct": is_correct
}
except Exception as e:
print(f"✗ 异常: {e}")
test_results.append({
"test_name": test_name,
"api_type": api_type,
"status": "ERROR",
"error": str(e)
})
return None
def test_consistency(url, token, test_name, api_type):
"""测试不同pageSize下total是否一致"""
print(f"\n========== 测试总数一致性: {test_name} ==========")
page_sizes = [10, 20, 50]
totals = []
for size in page_sizes:
params = {"pageNum": 1, "pageSize": size}
headers = {"Authorization": f"Bearer {token}"}
try:
response = requests.get(url, params=params, headers=headers)
if response.status_code == 200:
data = response.json()
if data.get("code") == 200:
# 支持两种响应格式
if "data" in data:
total = data.get("data", {}).get("total")
else:
total = data.get("total")
totals.append(total)
print(f"pageSize={size}: total={total}")
except Exception as e:
print(f"✗ 异常: {e}")
if len(set(totals)) == 1 and totals[0] is not None:
print(f"✓ 不同pageSize下总数一致: {totals[0]}")
return True
else:
print(f"✗ 不同pageSize下总数不一致: {totals}")
return False
def generate_report():
"""生成测试报告"""
print("\n" + "=" * 60)
print("测试报告")
print("=" * 60)
# 按API类型分组
employee_results = [r for r in test_results if "员工" in r.get("test_name", "")]
intermediary_results = [r for r in test_results if "中介" in r.get("test_name", "")]
pass_count = sum(1 for r in test_results if r["status"] == "PASS")
fail_count = sum(1 for r in test_results if r["status"] == "FAIL")
error_count = sum(1 for r in test_results if r["status"] == "ERROR")
print(f"\n总测试数: {len(test_results)}")
print(f"通过: {pass_count}")
print(f"失败: {fail_count}")
print(f"错误: {error_count}")
# 员工接口结果
print(f"\n--- 员工列表接口 (MyBatis Plus) ---")
print(f"测试数: {len(employee_results)}")
for r in employee_results:
status_icon = "" if r["status"] == "PASS" else ""
print(f"{status_icon} {r['test_name']}: {r['status']}")
if r["status"] == "PASS":
print(f" 页码: {r.get('page_num')}/{r.get('page_size')}, "
f"返回行数: {r.get('rows_count')}, 总数: {r.get('total')}")
else:
print(f" 错误: {r.get('error', 'Unknown')}")
# 中介黑名单接口结果
print(f"\n--- 中介黑名单接口 (若依startPage) ---")
print(f"测试数: {len(intermediary_results)}")
for r in intermediary_results:
status_icon = "" if r["status"] == "PASS" else ""
print(f"{status_icon} {r['test_name']}: {r['status']}")
if r["status"] == "PASS":
print(f" 页码: {r.get('page_num')}/{r.get('page_size')}, "
f"返回行数: {r.get('rows_count')}, 总数: {r.get('total')}")
else:
print(f" 错误: {r.get('error', 'Unknown')}")
# 总体结论
print(f"\n--- 测试结论 ---")
if fail_count == 0 and error_count == 0:
print("✓ 所有分页接口总数返回正常")
else:
print("✗ 存在分页接口总数返回异常")
# 保存报告到文件
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
report_file = f"test/pagination_test_report_{timestamp}.txt"
with open(report_file, "w", encoding="utf-8") as f:
f.write("=" * 60 + "\n")
f.write("分页接口总数测试报告\n")
f.write("=" * 60 + "\n\n")
f.write(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}\n\n")
f.write("测试统计:\n")
f.write(f" 总测试数: {len(test_results)}\n")
f.write(f" 通过: {pass_count}\n")
f.write(f" 失败: {fail_count}\n")
f.write(f" 错误: {error_count}\n\n")
f.write("测试接口:\n")
f.write(" 1. /dpc/employee/list - 员工列表MyBatis Plus分页\n")
f.write(" 2. /dpc/intermediary/list - 中介黑名单列表若依startPage分页\n\n")
f.write("-" * 60 + "\n")
f.write("详细结果:\n")
f.write("-" * 60 + "\n\n")
for r in test_results:
f.write(f"测试: {r['test_name']}\n")
f.write(f"API类型: {r['api_type']}\n")
f.write(f"状态: {r['status']}\n")
if r['status'] == 'PASS':
f.write(f" 页码: {r.get('page_num')}/{r.get('page_size')}\n")
f.write(f" 返回行数: {r.get('rows_count')}\n")
f.write(f" 总数: {r.get('total')}\n")
f.write(f" 预期行数: {r.get('expected_rows')}\n")
else:
f.write(f" 错误: {r.get('error', 'Unknown')}\n")
f.write("\n")
f.write("-" * 60 + "\n")
f.write("测试结论:\n")
if fail_count == 0 and error_count == 0:
f.write("✓ 所有分页接口总数返回正常\n")
else:
f.write("✗ 存在分页接口总数返回异常\n")
print(f"\n报告已保存至: {report_file}")
def main():
"""主函数"""
print("\n" + "=" * 60)
print("分页接口总数测试")
print("=" * 60)
print(f"测试时间: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
# 获取token
token = login()
if not token:
print("\n✗ 无法获取token测试终止")
return
print("\n✓ 登录成功,开始测试")
# 员工列表接口测试用例
employee_test_cases = [
{"page_num": 1, "page_size": 10, "desc": "员工列表 - 第1页(10条/页)"},
{"page_num": 2, "page_size": 10, "desc": "员工列表 - 第2页(10条/页)"},
{"page_num": 1, "page_size": 5, "desc": "员工列表 - 第1页(5条/页)"},
{"page_num": 1, "page_size": 20, "desc": "员工列表 - 第1页(20条/页)"},
]
print("\n" + "=" * 60)
print("测试员工列表接口MyBatis Plus分页")
print("=" * 60)
for test_case in employee_test_cases:
test_page(
EMPLOYEE_LIST_URL,
token,
test_case["page_num"],
test_case["page_size"],
test_case["desc"],
"MyBatis Plus"
)
# 测试总数一致性
test_consistency(
EMPLOYEE_LIST_URL,
token,
"员工列表-总数一致性",
"MyBatis Plus"
)
# 中介黑名单接口测试用例
intermediary_test_cases = [
{"page_num": 1, "page_size": 10, "desc": "中介黑名单 - 第1页(10条/页)"},
{"page_num": 2, "page_size": 10, "desc": "中介黑名单 - 第2页(10条/页)"},
{"page_num": 1, "page_size": 5, "desc": "中介黑名单 - 第1页(5条/页)"},
{"page_num": 1, "page_size": 20, "desc": "中介黑名单 - 第1页(20条/页)"},
]
print("\n" + "=" * 60)
print("测试中介黑名单接口若依startPage分页")
print("=" * 60)
for test_case in intermediary_test_cases:
test_page(
INTERMEDIARY_LIST_URL,
token,
test_case["page_num"],
test_case["page_size"],
test_case["desc"],
"若依startPage"
)
# 测试总数一致性
test_consistency(
INTERMEDIARY_LIST_URL,
token,
"中介黑名单-总数一致性",
"若依startPage"
)
# 生成报告
generate_report()
print("\n" + "=" * 60)
print("测试完成")
print("=" * 60)
if __name__ == "__main__":
main()