""" StatementService 主绑定注入测试 """ from collections import Counter, defaultdict import services.statement_rule_samples as statement_rule_samples from services.file_service import FileService from services.statement_service import StatementService from services.statement_rule_samples import ( DEFAULT_LARGE_TRANSACTION_THRESHOLDS, build_fixed_counterparty_transfer_samples, build_large_transaction_seed_statements, build_low_income_relative_large_transaction_samples, build_monthly_fixed_income_samples, build_seed_statements_for_rule_plan, build_special_amount_transaction_samples, build_suspicious_income_keyword_samples, ) class FakeStaffIdentityRepository: def select_random_staff_with_families(self): return { "staff_name": "数据库员工", "staff_id_card": "320101199001010030", "family_id_cards": ["320101199201010051", "320101199301010052"], } class FakeAbnormalAccountBaselineService: def apply(self, staff_id_card, abnormal_accounts): return None def test_generate_statements_should_include_seeded_samples_before_noise_when_rule_plan_exists(): """存在规则命中计划时,生成流水必须先混入被选中的命中样本。""" file_service = FileService(staff_identity_repository=FakeStaffIdentityRepository()) service = StatementService(file_service=file_service) response = file_service.fetch_inner_flow( { "groupId": 1001, "customerNo": "customer_seeded_samples", "dataChannelCode": "test_code", "requestDateId": 20240101, "dataStartDateId": 20240101, "dataEndDateId": 20240131, "uploadUserId": 902001, } ) log_id = response["data"][0] record = file_service.file_records[log_id] record.large_transaction_hit_rules = ["HOUSE_OR_CAR_EXPENSE"] record.phase1_hit_rules = [] statements = service._generate_statements(group_id=1000, log_id=log_id, count=30) assert len(statements) >= 30 assert any(item["userMemo"] == "购买房产首付款" for item in statements) def test_build_seed_statements_for_rule_plan_should_only_include_requested_phase1_rules(): plan = { "large_transaction_hit_rules": [], "phase1_hit_rules": ["GAMBLING_SENSITIVE_KEYWORD", "FOREX_BUY_AMT"], } statements = build_seed_statements_for_rule_plan( group_id=1000, log_id=20001, rule_plan=plan, ) assert any("游戏" in item["userMemo"] for item in statements) assert any("购汇" in item["userMemo"] for item in statements) assert not any("证券" in item["userMemo"] for item in statements) def test_build_seed_statements_for_rule_plan_should_generate_withdraw_cnt_samples(): plan = { "large_transaction_hit_rules": [], "phase1_hit_rules": ["WITHDRAW_CNT"], } statements = build_seed_statements_for_rule_plan( group_id=1000, log_id=20001, rule_plan=plan, ) assert len( [ item for item in statements if "微信提现" in item["userMemo"] or "支付宝提现" in item["userMemo"] ] ) >= 4 def test_build_seed_statements_for_rule_plan_should_only_include_requested_phase2_rules(): plan = { "large_transaction_hit_rules": [], "phase1_hit_rules": [], "phase2_statement_hit_rules": [ "MULTI_PARTY_GAMBLING_TRANSFER", "SALARY_QUICK_TRANSFER", ], "phase2_baseline_hit_rules": [], } statements = build_seed_statements_for_rule_plan( group_id=1000, log_id=30001, rule_plan=plan, ) assert any(item["userMemo"] == "工资入账" for item in statements) assert any(item["customerName"] == "欢乐游戏科技有限公司" for item in statements) assert not any(item["userMemo"] == "季度稳定兼职收入" for item in statements) def test_salary_quick_transfer_and_salary_unused_should_use_different_identity_groups(): plan = { "large_transaction_hit_rules": [], "phase1_hit_rules": [], "phase2_statement_hit_rules": [ "SALARY_QUICK_TRANSFER", "SALARY_UNUSED", ], "phase2_baseline_hit_rules": [], } statements = build_seed_statements_for_rule_plan( group_id=1000, log_id=30001, rule_plan=plan, ) salary_id_cards = { item["cretNo"] for item in statements if item["userMemo"] == "工资入账" } assert len(salary_id_cards) >= 2 def test_large_transaction_seed_should_cover_all_eight_rules(): """大额交易样本生成器必须覆盖 8 条已实现规则的关键口径。""" statements = build_large_transaction_seed_statements(group_id=1000, log_id=20001) assert any( item["userMemo"] == "购买房产首付款" and item["drAmount"] > 0 for item in statements ) assert any( "税" in item["userMemo"] and item["drAmount"] > 0 for item in statements ) assert any( item["crAmount"] > DEFAULT_LARGE_TRANSACTION_THRESHOLDS["SINGLE_TRANSACTION_AMOUNT"] for item in statements ) assert sum( 1 for item in statements if item["customerName"] == "浙江远望贸易有限公司" and item["crAmount"] > 0 ) >= 3 assert sum( 1 for item in statements if item["cashType"] == "现金存款" and item["crAmount"] > DEFAULT_LARGE_TRANSACTION_THRESHOLDS["LARGE_CASH_DEPOSIT"] ) >= 1 assert any( item["userMemo"] == "手机银行转账" and item["drAmount"] > DEFAULT_LARGE_TRANSACTION_THRESHOLDS["FREQUENT_TRANSFER"] for item in statements ) same_day_cash_deposits = [ item for item in statements if item["cretNo"] == "330101198801010011" and item["trxDate"].startswith("2026-03-10") and item["crAmount"] > DEFAULT_LARGE_TRANSACTION_THRESHOLDS["LARGE_CASH_DEPOSIT"] ] assert len(same_day_cash_deposits) >= ( DEFAULT_LARGE_TRANSACTION_THRESHOLDS["FREQUENT_CASH_DEPOSIT"] + 1 ) def test_large_transaction_seed_income_should_avoid_salary_exclusion(): """收入样本不得误带工资代发关键词,否则会被后端过滤。""" statements = build_large_transaction_seed_statements(group_id=1000, log_id=20001) income_samples = [item for item in statements if item["crAmount"] > 0] assert income_samples assert all(item["customerName"] != "浙江兰溪农村商业银行股份有限公司" for item in income_samples) assert all( keyword not in item["userMemo"] for item in income_samples for keyword in ("代发", "工资", "奖金", "薪酬", "薪金") ) def test_generate_statements_should_fill_noise_up_to_requested_count(): """样本不足请求总数时,服务层需要自动补齐噪声流水。""" service = StatementService() statements = service._generate_statements(group_id=1000, log_id=20001, count=80) assert len(statements) == 80 def test_generate_statements_should_follow_rule_hit_plan_from_file_record(): file_service = FileService(staff_identity_repository=FakeStaffIdentityRepository()) statement_service = StatementService(file_service=file_service) response = file_service.fetch_inner_flow( { "groupId": 1001, "customerNo": "customer_rule_plan", "dataChannelCode": "test_code", "requestDateId": 20240101, "dataStartDateId": 20240101, "dataEndDateId": 20240131, "uploadUserId": 902001, } ) log_id = response["data"][0] record = file_service.file_records[log_id] record.large_transaction_hit_rules = ["HOUSE_OR_CAR_EXPENSE"] record.phase1_hit_rules = ["GAMBLING_SENSITIVE_KEYWORD"] statements = statement_service._generate_statements(group_id=1001, log_id=log_id, count=200) assert any("房产首付款" in item["userMemo"] for item in statements) assert any("游戏" in item["userMemo"] for item in statements) assert not any("购汇" in item["userMemo"] for item in statements) def test_sudden_account_closure_samples_should_stay_within_30_days_before_invalid_date(): statements = statement_rule_samples.build_sudden_account_closure_samples( group_id=1000, log_id=20001, account_fact={ "account_no": "6222000000000001", "owner_id_card": "320101199001010030", "account_name": "测试员工工资卡", "status": 2, "effective_date": "2024-01-01", "invalid_date": "2026-03-20", }, le_name="测试主体", ) assert statements assert all("6222000000000001" == item["accountMaskNo"] for item in statements) assert all("2026-02-18" <= item["trxDate"][:10] < "2026-03-20" for item in statements) def test_dormant_account_large_activation_samples_should_exceed_threshold_after_6_months(): statements = statement_rule_samples.build_dormant_account_large_activation_samples( group_id=1000, log_id=20001, account_fact={ "account_no": "6222000000000002", "owner_id_card": "320101199001010030", "account_name": "测试员工工资卡", "status": 1, "effective_date": "2025-01-01", "invalid_date": None, }, le_name="测试主体", ) assert statements assert min(item["trxDate"][:10] for item in statements) >= "2025-07-01" assert sum(item["drAmount"] + item["crAmount"] for item in statements) >= 500000 assert max(item["drAmount"] + item["crAmount"] for item in statements) >= 100000 def test_generate_statements_should_follow_abnormal_account_rule_plan_from_file_record(): file_service = FileService(staff_identity_repository=FakeStaffIdentityRepository()) statement_service = StatementService(file_service=file_service) response = file_service.fetch_inner_flow( { "groupId": 1001, "customerNo": "customer_abnormal_rule_plan", "dataChannelCode": "test_code", "requestDateId": 20240101, "dataStartDateId": 20240101, "dataEndDateId": 20240131, "uploadUserId": 902001, } ) log_id = response["data"][0] record = file_service.file_records[log_id] record.abnormal_account_hit_rules = ["SUDDEN_ACCOUNT_CLOSURE"] record.abnormal_accounts = [ { "account_no": "6222000000000001", "owner_id_card": record.staff_id_card, "account_name": "测试员工工资卡", "status": 2, "effective_date": "2024-01-01", "invalid_date": "2026-03-20", } ] statements = statement_service._generate_statements(group_id=1001, log_id=log_id, count=80) assert any(item["accountMaskNo"] == "6222000000000001" for item in statements) assert any("销户" in item["userMemo"] or "异常账户" in item["userMemo"] for item in statements) def test_get_bank_statement_should_preserve_abnormal_account_mask_no(): file_service = FileService(staff_identity_repository=FakeStaffIdentityRepository()) statement_service = StatementService(file_service=file_service) response = file_service.fetch_inner_flow( { "groupId": 1001, "customerNo": "customer_abnormal_api", "dataChannelCode": "test_code", "requestDateId": 20240101, "dataStartDateId": 20240101, "dataEndDateId": 20240131, "uploadUserId": 902001, } ) log_id = response["data"][0] record = file_service.file_records[log_id] record.abnormal_account_hit_rules = [ "SUDDEN_ACCOUNT_CLOSURE", "DORMANT_ACCOUNT_LARGE_ACTIVATION", ] record.abnormal_accounts = [ { "account_no": "6222000000000001", "owner_id_card": record.staff_id_card, "account_name": "测试员工工资卡", "status": 2, "effective_date": "2024-01-01", "invalid_date": "2026-03-20", "rule_code": "SUDDEN_ACCOUNT_CLOSURE", }, { "account_no": "6222000000000002", "owner_id_card": record.staff_id_card, "account_name": "测试员工工资卡", "status": 1, "effective_date": "2025-01-01", "invalid_date": None, "rule_code": "DORMANT_ACCOUNT_LARGE_ACTIVATION", }, ] response = statement_service.get_bank_statement( { "groupId": 1001, "logId": log_id, "pageNow": 1, "pageSize": 500, } ) statements = response["data"]["bankStatementList"] abnormal_statements = [ item for item in statements if "销户" in item["userMemo"] or "激活" in item["userMemo"] ] assert abnormal_statements assert any(item["accountMaskNo"] == "6222000000000001" for item in abnormal_statements) assert any(item["accountMaskNo"] == "6222000000000002" for item in abnormal_statements) def test_get_bank_statement_should_only_use_abnormal_account_numbers_from_file_record(): file_service = FileService( staff_identity_repository=FakeStaffIdentityRepository(), abnormal_account_baseline_service=FakeAbnormalAccountBaselineService(), ) statement_service = StatementService(file_service=file_service) response = file_service.fetch_inner_flow( { "groupId": 1001, "customerNo": "customer_abnormal_statement_consistency", "dataChannelCode": "test_code", "requestDateId": 20240101, "dataStartDateId": 20240101, "dataEndDateId": 20240131, "uploadUserId": 902001, } ) log_id = response["data"][0] record = file_service.file_records[log_id] record.abnormal_account_hit_rules = ["SUDDEN_ACCOUNT_CLOSURE"] record.abnormal_accounts = [ { "account_no": "6222000000000099", "owner_id_card": record.staff_id_card, "account_name": "测试员工工资卡", "status": 2, "effective_date": "2024-01-01", "invalid_date": "2026-03-20", "rule_code": "SUDDEN_ACCOUNT_CLOSURE", } ] result = statement_service.get_bank_statement( {"groupId": 1001, "logId": log_id, "pageNow": 1, "pageSize": 500} ) abnormal_numbers = { item["accountMaskNo"] for item in result["data"]["bankStatementList"] if "销户" in item["userMemo"] or "异常账户" in item["userMemo"] } assert abnormal_numbers == {"6222000000000099"} def test_generate_statements_should_stay_within_single_employee_scope_per_log_id(): """同一 logId 的流水只能落在 FileRecord 绑定的员工及亲属身份证内。""" file_service = FileService(staff_identity_repository=FakeStaffIdentityRepository()) service = StatementService(file_service=file_service) response = file_service.fetch_inner_flow( { "groupId": 1001, "customerNo": "customer_scope", "dataChannelCode": "test", "requestDateId": 20240101, "dataStartDateId": 20240101, "dataEndDateId": 20240131, "uploadUserId": 902001, } ) log_id = response["data"][0] record = file_service.file_records[log_id] allowed_id_cards = {record.staff_id_card, *record.family_id_cards} statements = service._generate_statements(group_id=1000, log_id=log_id, count=1600) assert {item["cretNo"] for item in statements}.issubset(allowed_id_cards) def test_get_bank_statement_should_include_counterparty_identity_fields(): service = StatementService() response = service.get_bank_statement( { "groupId": 1000, "logId": 20001, "pageNow": 1, "pageSize": 5, } ) statements = response["data"]["bankStatementList"] assert statements assert all("customerCertNo" in item for item in statements) assert all("customerSocialCreditCode" in item for item in statements) def test_all_mode_monthly_fixed_income_log_should_keep_monthly_income_stable(monkeypatch): monkeypatch.setattr("services.file_service.settings.RULE_HIT_MODE", "all") file_service = FileService(staff_identity_repository=FakeStaffIdentityRepository()) service = StatementService(file_service=file_service) monkeypatch.setattr( file_service, "_generate_primary_binding", lambda: ("全命中主体", "6222000012345678"), ) request = { "groupId": 1001, "dataChannelCode": "test_code", "requestDateId": 20240101, "dataStartDateId": 20240101, "dataEndDateId": 20240131, "uploadUserId": 902001, } first_log_id = file_service.fetch_inner_flow( {**request, "customerNo": "customer_monthly_income_1"} )["data"][0] file_service.fetch_inner_flow( {**request, "customerNo": "customer_monthly_income_2"} ) record = file_service.file_records[first_log_id] statements = service._generate_statements(group_id=1001, log_id=first_log_id, count=200) monthly_amounts = defaultdict(float) for item in statements: if item["cretNo"] != record.staff_id_card: continue if item["crAmount"] <= 0: continue if not item["customerName"]: continue if item["leName"] == item["customerName"]: continue if item["customerName"] == "浙江兰溪农村商业银行股份有限公司" and ( any(keyword in item["userMemo"] for keyword in ("代发", "工资", "奖金", "薪酬", "薪金", "补贴", "薪", "年终奖", "年金", "加班费", "劳务费", "劳务外包", "提成", "劳务派遣", "绩效", "酬劳", "PAYROLL", "SALA", "CPF", "directors", "批量代付")) or any(keyword in item["cashType"] for keyword in ("代发", "工资", "劳务费")) ): continue monthly_amounts[item["trxDate"][:7]] += item["crAmount"] assert monthly_amounts assert len(monthly_amounts) == 6 assert set(monthly_amounts.values()) == {7200.0} def test_generate_statements_should_only_use_recognizable_identity_cards(): """命中样本和随机噪声都只能使用现库可识别的身份证号。""" service = StatementService() statements = service._generate_statements(group_id=1000, log_id=20005, count=1600) assert {item["cretNo"] for item in statements}.issubset( { "330101198801010011", "330101199001010022", "330101198802020033", "330101199202020044", } ) def test_get_bank_statement_should_keep_same_cached_result_for_same_log_id(): """同一 logId 首次生成后应复用缓存,避免分页结果漂移。""" service = StatementService() page1 = service.get_bank_statement( {"groupId": 1000, "logId": 30001, "pageNow": 1, "pageSize": 20} ) page2 = service.get_bank_statement( {"groupId": 1000, "logId": 30001, "pageNow": 1, "pageSize": 20} ) assert page1["data"]["bankStatementList"] == page2["data"]["bankStatementList"] def test_get_bank_statement_should_use_fixed_total_count_200(): """兰溪本地流水列表首次生成后,总条数应固定为 200。""" service = StatementService() response = service.get_bank_statement( {"groupId": 1000, "logId": 30002, "pageNow": 1, "pageSize": 500} ) assert response["data"]["totalCount"] == 200 assert len(response["data"]["bankStatementList"]) == 200 def test_get_bank_statement_uses_primary_binding_from_file_service(monkeypatch): """同一 logId 的流水记录必须复用 FileService 中的主体与账号绑定。""" file_service = FileService(staff_identity_repository=FakeStaffIdentityRepository()) statement_service = StatementService(file_service=file_service) monkeypatch.setattr( file_service, "_generate_primary_binding", lambda: ("绑定主体", "6222000011112222"), ) response = file_service.fetch_inner_flow( { "groupId": 1001, "customerNo": "customer_001", "dataChannelCode": "test", "requestDateId": 20240101, "dataStartDateId": 20240101, "dataEndDateId": 20240131, "uploadUserId": 902001, } ) log_id = response["data"][0] record = file_service.file_records[log_id] statement_response = statement_service.get_bank_statement( { "groupId": 1001, "logId": log_id, "pageNow": 1, "pageSize": 5, } ) statements = statement_response["data"]["bankStatementList"] assert statements assert all(item["leName"] == record.primary_enterprise_name for item in statements) assert all(item["accountMaskNo"] == record.primary_account_no for item in statements) def test_get_bank_statement_contains_large_transaction_hit_samples(monkeypatch): """流水 Mock 首次生成时必须稳定包含可命中大额交易规则的样本簇。""" file_service = FileService(staff_identity_repository=FakeStaffIdentityRepository()) statement_service = StatementService(file_service=file_service) monkeypatch.setattr( file_service, "_generate_primary_binding", lambda: ("命中主体", "6222000099998888"), ) response = file_service.fetch_inner_flow( { "groupId": 1001, "customerNo": "customer_large_transaction", "dataChannelCode": "test", "requestDateId": 20240101, "dataStartDateId": 20240101, "dataEndDateId": 20240131, "uploadUserId": 902001, } ) log_id = response["data"][0] record = file_service.file_records[log_id] record.large_transaction_hit_rules = [ "HOUSE_OR_CAR_EXPENSE", "TAX_EXPENSE", "CUMULATIVE_INCOME", "FREQUENT_CASH_DEPOSIT", "LARGE_TRANSFER", ] record.phase1_hit_rules = [] staff_id_card = record.staff_id_card family_id_card = record.family_id_cards[0] statement_response = statement_service.get_bank_statement( { "groupId": 1001, "logId": log_id, "pageNow": 1, "pageSize": 2000, } ) statements = statement_response["data"]["bankStatementList"] assert statements assert any( item["cretNo"] in {staff_id_card, family_id_card} for item in statements ) assert any("房产首付款" in item["userMemo"] for item in statements) assert any("税" in item["userMemo"] or "税务" in item["customerName"] for item in statements) income_amounts = defaultdict(float) cash_deposit_daily_counter = Counter() has_large_transfer = False for item in statements: if ( item["cretNo"] == staff_id_card and item["customerName"] == "浙江远望贸易有限公司" and item["crAmount"] > 0 ): income_amounts[(item["cretNo"], item["customerName"])] += item["crAmount"] if item["crAmount"] > 2000001 and "现金" in item["cashType"]: cash_deposit_daily_counter[(item["cretNo"], item["trxDate"][:10])] += 1 if item["drAmount"] > 100001 and item["userMemo"] == "手机银行转账": has_large_transfer = True assert any(amount > 50000001 for amount in income_amounts.values()) assert any(count >= 6 for count in cash_deposit_daily_counter.values()) assert has_large_transfer def test_special_amount_transaction_samples_should_use_sql_supported_amounts(): statements = build_special_amount_transaction_samples( group_id=1000, log_id=20001, ) assert statements assert {item["transAmount"] for item in statements}.issubset({520.0, 1314.0}) def test_monthly_fixed_income_samples_should_cover_at_least_six_months(): statements = build_monthly_fixed_income_samples( group_id=1000, log_id=20001, ) income_months = {item["trxDate"][:7] for item in statements} assert len(income_months) >= 6 def test_suspicious_income_keyword_samples_should_hit_sql_keyword_set(): statements = build_suspicious_income_keyword_samples( group_id=1000, log_id=20001, ) assert statements assert any( any(keyword in item["userMemo"] for keyword in ("工资", "分红", "红利", "奖金", "劳务费", "绩效")) or any(keyword in item["cashType"] for keyword in ("工资", "劳务费", "代发")) for item in statements ) def test_fixed_counterparty_transfer_samples_should_use_staff_identity(): statements = build_fixed_counterparty_transfer_samples( group_id=1000, log_id=20001, staff_id_card="320101199001010030", family_id_cards=["320101199201010051", "320101199301010052"], ) assert statements assert {item["cretNo"] for item in statements} == {"320101199001010030"} def test_low_income_relative_large_transaction_samples_should_exceed_total_threshold(): statements = build_low_income_relative_large_transaction_samples( group_id=1000, log_id=20001, staff_id_card="320101199001010030", family_id_cards=["320101199201010051", "320101199301010052"], ) assert statements assert len({item["cretNo"] for item in statements}) == 1 assert sum(item["crAmount"] + item["drAmount"] for item in statements) > 100000 def test_generate_statements_should_keep_all_mode_noise_as_safe_debits(monkeypatch): file_service = FileService(staff_identity_repository=FakeStaffIdentityRepository()) statement_service = StatementService(file_service=file_service) monkeypatch.setattr("services.file_service.settings.RULE_HIT_MODE", "all") response = file_service.fetch_inner_flow( { "groupId": 1001, "customerNo": "customer_safe_noise", "dataChannelCode": "test_code", "requestDateId": 20240101, "dataStartDateId": 20240101, "dataEndDateId": 20240131, "uploadUserId": 902001, } ) log_id = response["data"][0] record = file_service.file_records[log_id] record.large_transaction_hit_rules = [] record.phase1_hit_rules = [] record.phase2_statement_hit_rules = [ "MONTHLY_FIXED_INCOME", "FIXED_COUNTERPARTY_TRANSFER", ] record.abnormal_account_hit_rules = [] record.abnormal_accounts = [] statements = statement_service._generate_statements(group_id=1001, log_id=log_id, count=30) noise_statements = [ item for item in statements if item["userMemo"] not in {"月度稳定兼职收入", "季度稳定兼职收入"} ] assert noise_statements assert all(item["drAmount"] > 0 and item["crAmount"] == 0 for item in noise_statements)