Files
ccdi/lsfx-mock-server/tests/integration/test_full_workflow.py
2026-03-25 10:05:30 +08:00

306 lines
10 KiB
Python

"""
集成测试 - 完整的接口调用流程测试
"""
import pytest
def test_complete_workflow(client):
    """Walk the API end to end: token -> parsing status -> bank statements."""
    # Step 1: request a token. The response also carries the project id,
    # which the later calls pass as their groupId.
    token_payload = {
        "projectNo": "integration_test_001",
        "entityName": "集成测试企业",
        "userId": "902001",
        "userName": "902001",
        "appId": "remote_app",
        "appSecretCode": "test_secret_code_12345",
        "role": "VIEWER",
        "orgCode": "902000",
        "departmentCode": "902000",
    }
    token_resp = client.post("/account/common/getToken", data=token_payload)
    assert token_resp.status_code == 200

    token_body = token_resp.json()
    assert token_body["code"] == "200"
    issued = token_body["data"]
    project_id = issued["projectId"]
    assert issued["token"] is not None

    # Step 2: file upload (simulated).
    # NOTE: the real upload is skipped in the test environment; the
    # remaining endpoints are exercised directly.

    # Step 3: query the parsing status.
    pending_resp = client.post(
        "/watson/api/project/upload/getpendings",
        data={"groupId": project_id, "inprogressList": "10001"},
    )
    assert pending_resp.status_code == 200
    pending_body = pending_resp.json()
    assert "parsing" in pending_body["data"]

    # Step 4: fetch the bank statements.
    statement_query = {
        "groupId": project_id,
        "logId": 10001,
        "pageNow": 1,
        "pageSize": 10,
    }
    statement_resp = client.post(
        "/watson/api/project/getBSByLogId",
        data=statement_query,
    )
    assert statement_resp.status_code == 200
    statement_body = statement_resp.json()
    assert statement_body["code"] == "200"
    # The payload must expose both the page content and the overall count.
    for required_key in ("bankStatementList", "totalCount"):
        assert required_key in statement_body["data"]
def test_all_error_codes(client):
    """Verify that every listed error code can be triggered through getToken.

    For each code the test requests a token with ``projectNo`` set to
    ``test_error_<code>`` and expects the JSON body to carry that same code
    together with ``successResponse`` set to ``False``. The HTTP status is
    expected to remain 200; the error is reported inside the body.
    """
    error_codes = ["40101", "40102", "40104", "40105", "40106", "40107", "40108"]

    # Fields that are identical for every request; only projectNo varies.
    common_fields = {
        "entityName": "测试企业",
        "userId": "902001",
        "userName": "902001",
        "appId": "remote_app",
        "appSecretCode": "test_secret_code_12345",
        "role": "VIEWER",
        "orgCode": "902000",
        "departmentCode": "902000",
    }

    for error_code in error_codes:
        response = client.post(
            "/account/common/getToken",
            data={"projectNo": f"test_error_{error_code}", **common_fields},
        )
        assert response.status_code == 200

        data = response.json()
        assert data["code"] == error_code, f"错误码 {error_code} 未正确触发"
        # Identity check: the flag must be the boolean False itself, not
        # merely a value that compares equal to it (such as 0).
        assert data["successResponse"] is False
def test_pagination(client):
    """Check that getBSByLogId pages consistently when pageSize is 1.

    Requests page 1 and page 2 for the same logId and verifies that both
    report the same ``totalCount``. When more than one record exists, each
    of the two pages must contain exactly one record.
    """
    # Obtain a token; its projectId is used as groupId below.
    token_response = client.post(
        "/account/common/getToken",
        data={
            "projectNo": "pagination_test",
            "entityName": "分页测试",
            "userId": "902001",
            "userName": "902001",
            "appId": "remote_app",
            "appSecretCode": "test_secret_code_12345",
            "role": "VIEWER",
            "orgCode": "902000",
            "departmentCode": "902000",
        },
    )
    # Fail with a clear message here rather than with a KeyError further down.
    assert token_response.status_code == 200
    project_id = token_response.json()["data"]["projectId"]

    def fetch_page(page_now):
        """Return the decoded JSON body of one single-record page."""
        response = client.post(
            "/watson/api/project/getBSByLogId",
            data={
                "groupId": project_id,
                "logId": 10001,
                "pageNow": page_now,
                "pageSize": 1,
            },
        )
        assert response.status_code == 200
        return response.json()

    page1 = fetch_page(1)
    page2 = fetch_page(2)

    # The total record count must not depend on the requested page.
    total_count = page1["data"]["totalCount"]
    assert total_count == page2["data"]["totalCount"]

    # With at least two records and pageSize=1, both pages hold one record.
    # (The previous `>= 0` check on page 2 could never fail.)
    if total_count > 1:
        assert len(page1["data"]["bankStatementList"]) == 1
        assert len(page2["data"]["bankStatementList"]) == 1
def test_upload_status_and_bank_statement_share_same_primary_binding(
    client, monkeypatch
):
    """Upload status and bank statements must agree on the binding of a logId.

    The primary (entity name, account number) binding generator is replaced
    with a fixed pair, a file is fetched to obtain a logId, and both the
    upload-status endpoint and the bank-statement endpoint are then expected
    to report exactly that pair for the logId.
    """
    from routers.api import file_service

    def fixed_binding():
        # Deterministic stand-in for the generated binding.
        return ("链路主体", "6222555566667777")

    monkeypatch.setattr(file_service, "_generate_primary_binding", fixed_binding)

    fetch_payload = {
        "groupId": 1001,
        "customerNo": "customer_002",
        "dataChannelCode": "channel_code",
        "requestDateId": 20240101,
        "dataStartDateId": 20240101,
        "dataEndDateId": 20240131,
        "uploadUserId": 902001,
    }
    fetch_response = client.post(
        "/watson/api/project/getJZFileOrZjrcuFile",
        data=fetch_payload,
    )
    assert fetch_response.status_code == 200
    log_id = fetch_response.json()["data"][0]

    # Upload-status view of the same logId.
    status_response = client.get(f"/watson/api/project/bs/upload?groupId=1001&logId={log_id}")
    assert status_response.status_code == 200
    status_log = status_response.json()["data"]["logs"][0]

    # Bank-statement view of the same logId.
    statement_query = {
        "groupId": 1001,
        "logId": log_id,
        "pageNow": 1,
        "pageSize": 5,
    }
    statement_response = client.post(
        "/watson/api/project/getBSByLogId",
        data=statement_query,
    )
    assert statement_response.status_code == 200
    statements = statement_response.json()["data"]["bankStatementList"]

    # The status log must carry exactly the patched binding ...
    assert status_log["enterpriseNameList"] == ["链路主体"]
    assert status_log["accountNoList"] == ["6222555566667777"]

    # ... and every returned statement row must reference the same pair.
    assert statements
    bound_name = status_log["enterpriseNameList"][0]
    assert all(row["leName"] == bound_name for row in statements)
    bound_account = status_log["accountNoList"][0]
    assert all(row["accountMaskNo"] == bound_account for row in statements)
def test_inner_flow_bank_statement_should_keep_same_rule_subset(client):
    """Repeated identical statement queries for one logId must match.

    A file is fetched to obtain a logId, then the same first page of bank
    statements is requested twice; both statement lists must be equal.
    """
    fetch_payload = {
        "groupId": 1001,
        "customerNo": "customer_subset",
        "dataChannelCode": "test_code",
        "requestDateId": 20240101,
        "dataStartDateId": 20240101,
        "dataEndDateId": 20240131,
        "uploadUserId": 902001,
    }
    fetch_response = client.post(
        "/watson/api/project/getJZFileOrZjrcuFile",
        data=fetch_payload,
    )
    log_id = fetch_response.json()["data"][0]

    def query_first_page():
        """Issue the (identical) first-page query and decode its JSON body."""
        reply = client.post(
            "/watson/api/project/getBSByLogId",
            data={"groupId": 1001, "logId": log_id, "pageNow": 1, "pageSize": 10},
        )
        return reply.json()

    # Same request twice on purpose: the result must be repeatable.
    first_result = query_first_page()
    second_result = query_first_page()

    first_rows = first_result["data"]["bankStatementList"]
    second_rows = second_result["data"]["bankStatementList"]
    assert first_rows == second_rows
def test_inner_flow_should_apply_phase2_baselines_before_get_bank_statement(
    client, monkeypatch
):
    """Fetching a file must apply the phase-2 baseline rules of the hit plan.

    ``phase2_baseline_service.apply`` is replaced with a recorder and the
    rule-hit plan with a fixed one; after calling getJZFileOrZjrcuFile the
    recorder must have been invoked with the plan's baseline rule codes.
    """
    from routers.api import file_service

    recorded = {}

    def fake_apply(**kwargs):
        # Remember that the service was invoked and with which rule codes.
        recorded["called"] = True
        recorded["baseline_rule_codes"] = kwargs["baseline_rule_codes"]

    def fixed_hit_plan(log_id):
        # Fresh dict per call; only the two phase-2 lists are populated.
        return {
            "large_transaction_hit_rules": [],
            "phase1_hit_rules": [],
            "phase2_statement_hit_rules": ["MONTHLY_FIXED_INCOME"],
            "phase2_baseline_hit_rules": ["SUPPLIER_CONCENTRATION"],
        }

    monkeypatch.setattr(file_service.phase2_baseline_service, "apply", fake_apply)
    monkeypatch.setattr(file_service, "_build_rule_hit_plan", fixed_hit_plan)

    fetch_payload = {
        "groupId": 1001,
        "customerNo": "phase2_customer",
        "dataChannelCode": "channel_code",
        "requestDateId": 20240101,
        "dataStartDateId": 20240101,
        "dataEndDateId": 20240131,
        "uploadUserId": 902001,
    }
    response = client.post(
        "/watson/api/project/getJZFileOrZjrcuFile",
        data=fetch_payload,
    )

    assert response.status_code == 200
    assert recorded["called"] is True
    assert recorded["baseline_rule_codes"] == ["SUPPLIER_CONCENTRATION"]
def test_all_mode_should_expose_sql_aligned_target_rule_samples(
    client, monkeypatch
):
    """With RULE_HIT_MODE "all", the expected sample rows must be returned.

    The rule-hit mode is forced to ``"all"``, the baseline service is
    replaced with a recorder and the rule-hit plan with a fixed one. After
    fetching a file, the recorder must have received the expected baseline
    rule codes, and the bank statements for the new logId must contain the
    sample amount and memo values checked below.
    """
    from routers.api import file_service

    recorded = {}

    def fake_apply(**kwargs):
        # Remember that the service was invoked and with which rule codes.
        recorded["called"] = True
        recorded["baseline_rule_codes"] = kwargs["baseline_rule_codes"]

    def fixed_hit_plan(log_id):
        # Fresh dict per call, mirroring the plan shape used by the service.
        return {
            "large_transaction_hit_rules": [],
            "phase1_hit_rules": [
                "SPECIAL_AMOUNT_TRANSACTION",
                "SUSPICIOUS_INCOME_KEYWORD",
            ],
            "phase2_statement_hit_rules": [
                "LOW_INCOME_RELATIVE_LARGE_TRANSACTION",
                "MONTHLY_FIXED_INCOME",
                "FIXED_COUNTERPARTY_TRANSFER",
            ],
            "phase2_baseline_hit_rules": [],
        }

    monkeypatch.setattr("services.file_service.settings.RULE_HIT_MODE", "all")
    monkeypatch.setattr(file_service.phase2_baseline_service, "apply", fake_apply)
    monkeypatch.setattr(file_service, "_build_rule_hit_plan", fixed_hit_plan)

    fetch_payload = {
        "groupId": 1001,
        "customerNo": "customer_sql_aligned_all_mode",
        "dataChannelCode": "test_code",
        "requestDateId": 20240101,
        "dataStartDateId": 20240101,
        "dataEndDateId": 20240131,
        "uploadUserId": 902001,
    }
    fetch_response = client.post(
        "/watson/api/project/getJZFileOrZjrcuFile",
        data=fetch_payload,
    )
    assert fetch_response.status_code == 200
    log_id = fetch_response.json()["data"][0]

    assert recorded["called"] is True
    assert recorded["baseline_rule_codes"] == ["LOW_INCOME_RELATIVE_LARGE_TRANSACTION"]

    # A large page size is requested so the sample rows land on one page.
    statement_query = {
        "groupId": 1001,
        "logId": log_id,
        "pageNow": 1,
        "pageSize": 200,
    }
    statement_response = client.post(
        "/watson/api/project/getBSByLogId",
        data=statement_query,
    )
    assert statement_response.status_code == 200
    statements = statement_response.json()["data"]["bankStatementList"]
    assert statements

    # At least one row must carry the sample amount ...
    assert any(row["transAmount"] == 1314.0 for row in statements)
    # ... each of these memos must appear on at least one row ...
    for expected_memo in ("劳务费发放", "月度稳定兼职收入", "季度稳定兼职收入"):
        assert any(row["userMemo"] == expected_memo for row in statements)
    # ... and at least one row must carry either of these two memos.
    assert any(row["userMemo"] in {"亲属大额转入", "亲属经营补贴"} for row in statements)