"""Tests for the single-primary-binding semantics of FileService."""

import asyncio
import io

from fastapi import BackgroundTasks
from fastapi.datastructures import UploadFile

from services.file_service import (
    LARGE_TRANSACTION_RULE_CODES,
    PHASE1_RULE_CODES,
    PHASE2_BASELINE_RULE_CODES,
    PHASE2_STATEMENT_RULE_CODES,
    FileRecord,
    FileService,
)


class FakeStaffIdentityRepository:
    """Stand-in repository that always hands back one fixed staff identity."""

    def select_random_staff_with_families(self):
        """Return a fresh dict with a fixed staff name, id card and family id cards."""
        return {
            "staff_name": "数据库员工",
            "staff_id_card": "320101199001010030",
            "family_id_cards": ["320101199201010051", "320101199301010052"],
        }


def _new_service():
    """Build a FileService wired to the fake staff identity repository."""
    return FileService(staff_identity_repository=FakeStaffIdentityRepository())


def _pin_primary_binding(monkeypatch, service, enterprise_name, account_no):
    """Patch ``_generate_primary_binding`` on *service* to return a fixed pair."""
    monkeypatch.setattr(
        service,
        "_generate_primary_binding",
        lambda: (enterprise_name, account_no),
    )


def _upload_mock_csv(service):
    """Upload a tiny in-memory CSV for group 1001 and return the response."""
    tasks = BackgroundTasks()
    upload = UploadFile(filename="测试文件.csv", file=io.BytesIO(b"mock"))
    return asyncio.run(service.upload_file(1001, upload, tasks))


def _inner_flow_request():
    """Return a fresh copy of the request payload used by the inner-flow tests."""
    return {
        "groupId": 1001,
        "customerNo": "test_customer_001",
        "dataChannelCode": "test_code",
        "requestDateId": 20240101,
        "dataStartDateId": 20240101,
        "dataEndDateId": 20240131,
        "uploadUserId": 902001,
    }


def test_upload_file_primary_binding_response(monkeypatch):
    """The primary binding of a logId must be stable and hold exactly one enterprise/account pair."""
    expected_name = "测试主体A"
    expected_account = "6222021234567890"

    service = _new_service()
    _pin_primary_binding(monkeypatch, service, expected_name, expected_account)

    response = _upload_mock_csv(service)

    payload = response["data"]
    log = payload["uploadLogList"][0]
    account_info = payload["accountsOfLog"][str(log["logId"])][0]
    record = service.file_records[log["logId"]]

    # Upload log entry in the response.
    assert log["enterpriseNameList"] == [expected_name]
    assert log["accountNoList"] == [expected_account]
    # Account entry keyed by the stringified logId.
    assert account_info["accountName"] == expected_name
    assert account_info["accountNo"] == expected_account
    # Record stored on the service.
    assert record.primary_enterprise_name == expected_name
    assert record.primary_account_no == expected_account
    assert record.enterprise_name_list == [expected_name]
    assert record.account_no_list == [expected_account]


def test_upload_file_total_records_range(monkeypatch):
    """The transaction count returned by an upload must stay within 150-200."""
    service = _new_service()
    _pin_primary_binding(monkeypatch, service, "测试主体C", "6222000011112222")

    file_service_module = __import__("services.file_service", fromlist=["random"])
    original_randint = file_service_module.random.randint

    def fake_randint(start, end):
        # Force the upper bound for a (100, 300) draw; delegate everything else.
        if (start, end) == (100, 300):
            return 300
        return original_randint(start, end)

    monkeypatch.setattr("services.file_service.random.randint", fake_randint)

    response = _upload_mock_csv(service)

    total_records = response["data"]["uploadLogList"][0]["totalRecords"]
    assert 150 <= total_records <= 200


def test_upload_file_then_upload_status_reads_same_record(monkeypatch):
    """Querying upload status after an upload must read the same real record."""
    service = _new_service()
    _pin_primary_binding(monkeypatch, service, "测试主体B", "6222333344445555")

    upload_response = _upload_mock_csv(service)
    log = upload_response["data"]["uploadLogList"][0]

    def fail_if_called(*args, **kwargs):
        # The deterministic fallback must not run while a real record exists.
        raise AssertionError("真实记录存在时不应走 deterministic fallback")

    monkeypatch.setattr(service, "_build_deterministic_log_detail", fail_if_called)

    status_response = service.get_upload_status(1001, log["logId"])
    status_log = status_response["data"]["logs"][0]

    mirrored_fields = (
        "enterpriseNameList",
        "accountNoList",
        "bankName",
        "templateName",
        "uploadFileName",
        "trxDateStartId",
        "trxDateEndId",
    )
    for field in mirrored_fields:
        assert status_log[field] == log[field]

    assert status_log["enterpriseNameList"] == ["测试主体B"]
    assert status_log["accountNoList"] == ["6222333344445555"]
    assert len(status_log["enterpriseNameList"]) == 1
    assert len(status_log["accountNoList"]) == 1


def test_fetch_inner_flow_persists_primary_binding_record(monkeypatch):
    """Fetching inner flow must create and persist the database staff and family identities."""
    service = _new_service()
    _pin_primary_binding(monkeypatch, service, "行内主体", "6210987654321098")

    response = service.fetch_inner_flow(_inner_flow_request())
    log_id = response["data"][0]

    assert 10000 <= log_id <= 99999
    assert log_id in service.file_records

    record = service.file_records[log_id]
    assert record.parsing is False
    assert record.primary_enterprise_name
    assert record.primary_account_no
    # Identity values come from FakeStaffIdentityRepository.
    assert record.staff_name == "数据库员工"
    assert record.staff_id_card == "320101199001010030"
    assert record.family_id_cards == ["320101199201010051", "320101199301010052"]
    # Binding values come from the patched _generate_primary_binding.
    assert record.primary_enterprise_name == "行内主体"
    assert record.primary_account_no == "6210987654321098"
    assert record.enterprise_name_list == ["行内主体"]
    assert record.account_no_list == ["6210987654321098"]
    assert record.total_records == 200


def test_generate_log_id_should_retry_when_random_value_conflicts(monkeypatch):
    """When a random logId collides with an existing record, generation must retry and return an unused value."""
    service = _new_service()
    occupied_id = 34567
    service.file_records[occupied_id] = FileRecord(
        log_id=34567,
        group_id=1001,
        file_name="existing.csv",
    )

    # First draw collides with the occupied id, second draw is free.
    draws = iter([34567, 45678])
    monkeypatch.setattr(
        "services.file_service.random.randint",
        lambda start, end: next(draws),
    )

    assert service._generate_log_id() == 45678


def test_build_rule_hit_plan_should_be_deterministic_for_same_log_id():
    """Two plans for the same logId are equal, with 2-4 large-transaction and phase1 rules."""
    service = _new_service()

    first_plan = service._build_rule_hit_plan(10001)
    second_plan = service._build_rule_hit_plan(10001)

    assert first_plan == second_plan
    for key in ("large_transaction_hit_rules", "phase1_hit_rules"):
        assert 2 <= len(first_plan[key]) <= 4


def test_phase2_rule_hit_plan_should_be_deterministic_for_same_log_id():
    """Two plans for the same logId are equal, with 2-4 rules in each phase2 list."""
    service = _new_service()

    first_plan = service._build_rule_hit_plan(10001)
    second_plan = service._build_rule_hit_plan(10001)

    assert first_plan == second_plan
    for key in ("phase2_statement_hit_rules", "phase2_baseline_hit_rules"):
        assert 2 <= len(first_plan[key]) <= 4


def test_build_rule_hit_plan_should_return_all_compatible_rules_in_all_mode(monkeypatch):
    """With RULE_HIT_MODE set to "all", each plan list equals its full rule-code constant."""
    monkeypatch.setattr("services.file_service.settings.RULE_HIT_MODE", "all")
    service = _new_service()

    plan = service._build_rule_hit_plan(10001)

    assert plan["large_transaction_hit_rules"] == LARGE_TRANSACTION_RULE_CODES
    assert plan["phase1_hit_rules"] == PHASE1_RULE_CODES
    assert plan["phase2_statement_hit_rules"] == PHASE2_STATEMENT_RULE_CODES
    assert plan["phase2_baseline_hit_rules"] == PHASE2_BASELINE_RULE_CODES


def test_build_rule_hit_plan_should_keep_subset_mode_as_default():
    """Without patching RULE_HIT_MODE, the plan is repeatable and has 2-4 large-transaction rules."""
    service = _new_service()

    first_plan = service._build_rule_hit_plan(10001)
    second_plan = service._build_rule_hit_plan(10001)

    assert first_plan == second_plan
    assert 2 <= len(first_plan["large_transaction_hit_rules"]) <= 4


def test_build_rule_hit_plan_should_drop_conflicting_rules_from_all_mode(monkeypatch):
    """In "all" mode, two rules from one conflict group never both appear in the plan."""
    monkeypatch.setattr("services.file_service.settings.RULE_HIT_MODE", "all")
    monkeypatch.setattr(
        "services.file_service.RULE_CONFLICT_GROUPS",
        [["SALARY_QUICK_TRANSFER", "SALARY_UNUSED"]],
    )
    service = _new_service()

    plan = service._build_rule_hit_plan(10001)

    # At least one of the two conflicting rules must be absent.
    assert (
        "SALARY_QUICK_TRANSFER" not in plan["phase2_statement_hit_rules"]
        or "SALARY_UNUSED" not in plan["phase2_statement_hit_rules"]
    )


def test_fetch_inner_flow_should_persist_rule_hit_plan(monkeypatch):
    """fetch_inner_flow stores the plan's large-transaction and phase1 rules on the record."""
    service = _new_service()
    monkeypatch.setattr(
        service,
        "_build_rule_hit_plan",
        lambda log_id: {
            "large_transaction_hit_rules": ["HOUSE_OR_CAR_EXPENSE", "TAX_EXPENSE"],
            "phase1_hit_rules": ["GAMBLING_SENSITIVE_KEYWORD", "FOREX_BUY_AMT"],
        },
    )

    response = service.fetch_inner_flow(_inner_flow_request())
    log_id = response["data"][0]
    record = service.file_records[log_id]

    assert record.large_transaction_hit_rules == [
        "HOUSE_OR_CAR_EXPENSE",
        "TAX_EXPENSE",
    ]
    assert record.phase1_hit_rules == [
        "GAMBLING_SENSITIVE_KEYWORD",
        "FOREX_BUY_AMT",
    ]


def test_fetch_inner_flow_should_persist_phase2_rule_hit_plan(monkeypatch):
    """fetch_inner_flow stores the plan's phase2 statement and baseline rules on the record."""
    service = _new_service()
    monkeypatch.setattr(
        service,
        "_build_rule_hit_plan",
        lambda log_id: {
            "large_transaction_hit_rules": ["HOUSE_OR_CAR_EXPENSE", "TAX_EXPENSE"],
            "phase1_hit_rules": ["GAMBLING_SENSITIVE_KEYWORD", "FOREX_BUY_AMT"],
            "phase2_statement_hit_rules": [
                "LOW_INCOME_RELATIVE_LARGE_TRANSACTION",
                "SALARY_QUICK_TRANSFER",
            ],
            "phase2_baseline_hit_rules": [
                "HOUSE_REGISTRATION_MISMATCH",
                "SUPPLIER_CONCENTRATION",
            ],
        },
    )

    response = service.fetch_inner_flow(_inner_flow_request())
    log_id = response["data"][0]
    record = service.file_records[log_id]

    assert record.phase2_statement_hit_rules == [
        "LOW_INCOME_RELATIVE_LARGE_TRANSACTION",
        "SALARY_QUICK_TRANSFER",
    ]
    assert record.phase2_baseline_hit_rules == [
        "HOUSE_REGISTRATION_MISMATCH",
        "SUPPLIER_CONCENTRATION",
    ]