""" 员工亲属关系功能测试脚本 测试员工身份证号字段改造后的功能 """ import requests import json BASE_URL = "http://localhost:8080" # 登录获取 token def login(): url = f"{BASE_URL}/login/test" data = { "username": "admin", "password": "admin123" } response = requests.post(url, json=data) result = response.json() if result.get("code") == 200: token = result.get("token") print("✓ 登录成功") return token else: print(f"✗ 登录失败: {result.get('msg')}") return None # 获取请求头 def get_headers(token): return { "Authorization": f"Bearer {token}", "Content-Type": "application/json" } # 测试1: 查询员工列表(用于下拉选择) def test_list_staff(): print("\n【测试1】查询员工列表") url = f"{BASE_URL}/ccdi/baseStaff/list" params = { "pageNum": 1, "pageSize": 10 } response = requests.get(url, params=params, headers=get_headers(token)) result = response.json() if result.get("code") == 200: rows = result.get("rows", []) print(f"✓ 查询成功,共 {len(rows)} 条员工记录") if rows: print(f" 示例: {rows[0].get('name')} - {rows[0].get('idCard')}") return rows[0].get("idCard") # 返回第一个员工的身份证号 else: print(" 警告: 暂无员工数据") return None else: print(f"✗ 查询失败: {result.get('msg')}") return None # 测试2: 根据身份证号搜索员工 def test_search_staff(keyword): print(f"\n【测试2】根据身份证号搜索员工: {keyword}") url = f"{BASE_URL}/ccdi/baseStaff/list" params = { "idCard": keyword, "pageNum": 1, "pageSize": 10 } response = requests.get(url, params=params, headers=get_headers(token)) result = response.json() if result.get("code") == 200: rows = result.get("rows", []) print(f"✓ 搜索成功,找到 {len(rows)} 条记录") for row in rows[:3]: # 只显示前3条 print(f" - {row.get('name')} - {row.get('idCard')}") return rows else: print(f"✗ 搜索失败: {result.get('msg')}") return [] # 测试3: 查询已有亲属关系(避免重复插入) def test_get_existing_relation(person_id): print(f"\n【测试3】查询已有员工亲属关系") url = f"{BASE_URL}/ccdi/staffFmyRelation/list" params = { "personId": person_id, "pageNum": 1, "pageSize": 1 } response = requests.get(url, params=params, headers=get_headers(token)) result = response.json() if result.get("code") == 200: rows = result.get("rows", []) if rows: relation_id = rows[0].get("id") print(f"✓ 找到已有记录, ID: {relation_id}") return relation_id else: print(" 未找到已有记录,准备新增测试数据") return None else: print(f"✗ 查询失败: {result.get('msg')}") return None # 测试4: 新增员工亲属关系(如果没有现有数据) def test_add_relation(person_id): print(f"\n【测试4】新增员工亲属关系") url = f"{BASE_URL}/ccdi/staffFmyRelation" import random random_suffix = random.randint(10000, 99999) data = { "personId": person_id, "relationType": "01", # 配偶 "relationName": "测试亲属", "gender": "M", "relationCertType": "01", # 身份证 "relationCertNo": f"11010119900101{random_suffix}", # 随机生成避免重复 "status": 1 } response = requests.post(url, json=data, headers=get_headers(token)) result = response.json() if result.get("code") == 200: # 若依框架新增接口返回的是 "操作成功" 消息,没有返回ID # 需要通过查询列表来获取新增的记录 print(f"✓ 新增成功") # 查询列表获取最新记录的ID list_url = f"{BASE_URL}/ccdi/staffFmyRelation/list" list_params = { "personId": person_id, "pageNum": 1, "pageSize": 1 } list_response = requests.get(list_url, params=list_params, headers=get_headers(token)) list_result = list_response.json() if list_result.get("rows"): relation_id = list_result.get("rows")[0].get("id") print(f" 获取到记录ID: {relation_id}") return relation_id return None else: print(f"✗ 新增失败: {result.get('msg')}") return None # 测试5: 查询员工亲属关系列表 def test_list_relations(person_id): print(f"\n【测试5】查询员工亲属关系列表") url = f"{BASE_URL}/ccdi/staffFmyRelation/list" params = { "personId": person_id, "pageNum": 1, "pageSize": 10 } response = requests.get(url, params=params, headers=get_headers(token)) result = response.json() if result.get("code") == 200: rows = result.get("rows", []) total = result.get("total", 0) print(f"✓ 查询成功,共 {total} 条记录") for row in rows: print(f" - {row.get('personName')}({row.get('personId')}) 的{row.get('relationType')} - {row.get('relationName')}") return rows else: print(f"✗ 查询失败: {result.get('msg')}") return [] # 测试6: 查询亲属关系详情 def test_get_relation_detail(relation_id): print(f"\n【测试6】查询亲属关系详情: ID={relation_id}") url = f"{BASE_URL}/ccdi/staffFmyRelation/{relation_id}" response = requests.get(url, headers=get_headers(token)) result = response.json() if result.get("code") == 200: data = result.get("data") print(f"✓ 查询成功") print(f" 员工: {data.get('personName')}({data.get('personId')})") print(f" 关系: {data.get('relationType')} - {data.get('relationName')}") print(f" 状态: {'有效' if data.get('status') == 1 else '无效'}") return data else: print(f"✗ 查询失败: {result.get('msg')}") return None # 测试7: 编辑员工亲属关系(只修改非身份证号字段) def test_update_relation(relation_id): print(f"\n【测试7】编辑员工亲属关系: ID={relation_id}") url = f"{BASE_URL}/ccdi/staffFmyRelation" data = { "id": relation_id, "relationName": "测试亲属(已修改)", "mobilePhone1": "13800138000", "status": 1 } response = requests.put(url, json=data, headers=get_headers(token)) result = response.json() if result.get("code") == 200: print(f"✓ 修改成功") return True else: print(f"✗ 修改失败: {result.get('msg')}") return False # 测试8: 删除员工亲属关系 def test_delete_relation(relation_id): print(f"\n【测试8】删除员工亲属关系: ID={relation_id}") url = f"{BASE_URL}/ccdi/staffFmyRelation/{relation_id}" response = requests.delete(url, headers=get_headers(token)) result = response.json() if result.get("code") == 200: print(f"✓ 删除成功") return True else: print(f"✗ 删除失败: {result.get('msg')}") return False # 主测试流程 def main(): global token print("=" * 60) print("员工亲属关系功能测试") print("=" * 60) # 登录 token = login() if not token: return # 测试1: 查询员工列表 person_id = test_list_staff() if not person_id: print("\n提示: 请先在系统中添加员工数据") return # 测试2: 搜索员工 test_search_staff(person_id[:6]) # 使用身份证号前6位搜索 # 测试3: 查询已有亲属关系 relation_id = test_get_existing_relation(person_id) # 如果没有已有记录,则新增 if not relation_id: relation_id = test_add_relation(person_id) if not relation_id: print("\n提示: 无法获取测试记录ID,跳过后续测试") return # 测试5: 查询亲属关系列表 test_list_relations(person_id) # 测试6: 查询详情 test_get_relation_detail(relation_id) # 测试7: 编辑亲属关系 test_update_relation(relation_id) # 测试8: 再次查看详情(验证修改) test_get_relation_detail(relation_id) # 清理测试数据(仅限新增的数据) print("\n【清理】测试结束(保留现有数据)") print("\n" + "=" * 60) print("测试完成!") print("=" * 60) if __name__ == "__main__": main()