# File: ccdi/lsfx-mock-server/tests/test_file_service.py
"""
Tests for FileService single primary-binding semantics.
"""
import asyncio
import importlib
import io

from fastapi import BackgroundTasks
from fastapi.datastructures import UploadFile

from services.file_service import FileRecord, FileService
class FakeStaffIdentityRepository:
    """Stub staff-identity repository returning one fixed staff/family payload."""

    def select_random_staff_with_families(self):
        # Deterministic fixture data so the tests can pin exact values.
        return {
            "staff_name": "数据库员工",
            "staff_id_card": "320101199001010030",
            "family_id_cards": ["320101199201010051", "320101199301010052"],
        }
def test_upload_file_primary_binding_response(monkeypatch):
    """The primary binding of a logId must be stable and carry exactly one subject/account pair."""
    service = FileService(staff_identity_repository=FakeStaffIdentityRepository())
    expected_name, expected_account = "测试主体A", "6222021234567890"
    monkeypatch.setattr(
        service,
        "_generate_primary_binding",
        lambda: ("测试主体A", "6222021234567890"),
    )

    tasks = BackgroundTasks()
    upload = UploadFile(filename="测试文件.csv", file=io.BytesIO(b"mock"))
    result = asyncio.run(service.upload_file(1001, upload, tasks))

    log_entry = result["data"]["uploadLogList"][0]
    account = result["data"]["accountsOfLog"][str(log_entry["logId"])][0]
    stored = service.file_records[log_entry["logId"]]

    # Response payload exposes exactly the injected binding.
    assert log_entry["enterpriseNameList"] == [expected_name]
    assert log_entry["accountNoList"] == [expected_account]
    assert account["accountName"] == expected_name
    assert account["accountNo"] == expected_account
    # Persisted record mirrors the same single binding.
    assert stored.primary_enterprise_name == expected_name
    assert stored.primary_account_no == expected_account
    assert stored.enterprise_name_list == [expected_name]
    assert stored.account_no_list == [expected_account]
def test_upload_file_total_records_range(monkeypatch):
    """totalRecords returned by an upload must be clamped into the 150-200 range.

    The service draws ``random.randint(100, 300)``; this test forces that draw
    to the out-of-range value 300 and checks the response still lands in range.
    """
    service = FileService(staff_identity_repository=FakeStaffIdentityRepository())
    monkeypatch.setattr(
        service,
        "_generate_primary_binding",
        lambda: ("测试主体C", "6222000011112222"),
    )
    # importlib.import_module returns the submodule directly, unlike the
    # __import__(..., fromlist=...) idiom it replaces.
    original_randint = importlib.import_module("services.file_service").random.randint

    def fake_randint(start, end):
        # Only hijack the (100, 300) draw used for totalRecords; every other
        # (start, end) pair keeps its real randomness.
        if (start, end) == (100, 300):
            return 300
        return original_randint(start, end)

    monkeypatch.setattr("services.file_service.random.randint", fake_randint)
    background_tasks = BackgroundTasks()
    file = UploadFile(filename="测试文件.csv", file=io.BytesIO(b"mock"))
    response = asyncio.run(service.upload_file(1001, file, background_tasks))
    total_records = response["data"]["uploadLogList"][0]["totalRecords"]
    assert 150 <= total_records <= 200
def test_upload_file_then_upload_status_reads_same_record(monkeypatch):
    """After an upload, the status endpoint must read back the same real record."""
    service = FileService(staff_identity_repository=FakeStaffIdentityRepository())
    monkeypatch.setattr(
        service,
        "_generate_primary_binding",
        lambda: ("测试主体B", "6222333344445555"),
    )
    tasks = BackgroundTasks()
    upload = UploadFile(filename="测试文件.csv", file=io.BytesIO(b"mock"))
    upload_response = asyncio.run(service.upload_file(1001, upload, tasks))
    uploaded_log = upload_response["data"]["uploadLogList"][0]

    def _forbidden_fallback(*_args, **_kwargs):
        raise AssertionError("真实记录存在时不应走 deterministic fallback")

    # With a real record stored, the deterministic fallback must never run.
    monkeypatch.setattr(service, "_build_deterministic_log_detail", _forbidden_fallback)

    status_response = service.get_upload_status(1001, uploaded_log["logId"])
    status_log = status_response["data"]["logs"][0]

    # Every surfaced field must match the record written at upload time.
    mirrored_fields = (
        "enterpriseNameList",
        "accountNoList",
        "bankName",
        "templateName",
        "uploadFileName",
        "trxDateStartId",
        "trxDateEndId",
    )
    for field in mirrored_fields:
        assert status_log[field] == uploaded_log[field]

    # And the binding stays a single, exact subject/account pair.
    assert status_log["enterpriseNameList"] == ["测试主体B"]
    assert status_log["accountNoList"] == ["6222333344445555"]
    assert len(status_log["enterpriseNameList"]) == 1
    assert len(status_log["accountNoList"]) == 1
def test_fetch_inner_flow_persists_primary_binding_record(monkeypatch):
    """Fetching inner-bank flow must create and persist the DB staff and family identities."""
    service = FileService(staff_identity_repository=FakeStaffIdentityRepository())
    monkeypatch.setattr(
        service,
        "_generate_primary_binding",
        lambda: ("行内主体", "6210987654321098"),
    )
    payload = {
        "groupId": 1001,
        "customerNo": "test_customer_001",
        "dataChannelCode": "test_code",
        "requestDateId": 20240101,
        "dataStartDateId": 20240101,
        "dataEndDateId": 20240131,
        "uploadUserId": 902001,
    }

    log_id = service.fetch_inner_flow(payload)["data"][0]

    assert 10000 <= log_id <= 99999
    assert log_id in service.file_records

    saved = service.file_records[log_id]
    assert saved.parsing is False
    assert saved.primary_enterprise_name
    assert saved.primary_account_no
    # Staff/family identities come from the injected repository fixture.
    assert saved.staff_name == "数据库员工"
    assert saved.staff_id_card == "320101199001010030"
    assert saved.family_id_cards == ["320101199201010051", "320101199301010052"]
    # The injected primary binding is persisted verbatim, as a single pair.
    assert saved.primary_enterprise_name == "行内主体"
    assert saved.primary_account_no == "6210987654321098"
    assert saved.enterprise_name_list == ["行内主体"]
    assert saved.account_no_list == ["6210987654321098"]
    assert saved.total_records == 200
def test_generate_log_id_should_retry_when_random_value_conflicts(monkeypatch):
    """When the random logId hits an existing record, generation must retry until free."""
    service = FileService(staff_identity_repository=FakeStaffIdentityRepository())
    taken_id, free_id = 34567, 45678
    service.file_records[taken_id] = FileRecord(
        log_id=taken_id,
        group_id=1001,
        file_name="existing.csv",
    )
    # First draw collides with the existing record, second one is free.
    draws = iter([taken_id, free_id])
    monkeypatch.setattr(
        "services.file_service.random.randint",
        lambda _start, _end: next(draws),
    )
    assert service._generate_log_id() == free_id
def test_build_rule_hit_plan_should_be_deterministic_for_same_log_id():
    """The rule-hit plan for a fixed logId is reproducible and bounded in size."""
    service = FileService(staff_identity_repository=FakeStaffIdentityRepository())
    first_plan = service._build_rule_hit_plan(10001)
    second_plan = service._build_rule_hit_plan(10001)
    assert first_plan == second_plan
    # Both rule buckets must carry between 2 and 4 entries.
    for bucket in ("large_transaction_hit_rules", "phase1_hit_rules"):
        assert 2 <= len(first_plan[bucket]) <= 4
def test_fetch_inner_flow_should_persist_rule_hit_plan(monkeypatch):
    """fetch_inner_flow must store the generated rule-hit plan on the new record."""
    service = FileService(staff_identity_repository=FakeStaffIdentityRepository())
    # Pin the plan so the persisted lists can be asserted exactly.
    monkeypatch.setattr(
        service,
        "_build_rule_hit_plan",
        lambda log_id: {
            "large_transaction_hit_rules": ["HOUSE_OR_CAR_EXPENSE", "TAX_EXPENSE"],
            "phase1_hit_rules": ["GAMBLING_SENSITIVE_KEYWORD", "FOREX_BUY_AMT"],
        },
    )
    request = {
        "groupId": 1001,
        "customerNo": "test_customer_001",
        "dataChannelCode": "test_code",
        "requestDateId": 20240101,
        "dataStartDateId": 20240101,
        "dataEndDateId": 20240131,
        "uploadUserId": 902001,
    }

    log_id = service.fetch_inner_flow(request)["data"][0]
    stored = service.file_records[log_id]

    assert stored.large_transaction_hit_rules == [
        "HOUSE_OR_CAR_EXPENSE",
        "TAX_EXPENSE",
    ]
    assert stored.phase1_hit_rules == [
        "GAMBLING_SENSITIVE_KEYWORD",
        "FOREX_BUY_AMT",
    ]