修复all模式月固定收入规则命中隔离问题

This commit is contained in:
wkc
2026-03-25 10:28:08 +08:00
parent 5eea3c66ff
commit 071c02192d
4 changed files with 268 additions and 10 deletions

View File

@@ -48,6 +48,21 @@ PHASE2_BASELINE_RULE_CODES = [
"SUPPLIER_CONCENTRATION",
]
# Rule codes that are filtered OUT of the monthly-fixed-income isolation plan,
# one table per plan section. They are only consulted with ``in`` / ``not in``,
# so they are frozensets: a shared filter table cannot be mutated by accident.

# Removed from the "large_transaction_hit_rules" section.
MONTHLY_FIXED_INCOME_ISOLATED_LARGE_TRANSACTION_RULE_CODES = frozenset(
    {
        "SINGLE_LARGE_INCOME",
        "CUMULATIVE_INCOME",
        "ANNUAL_TURNOVER",
        "LARGE_CASH_DEPOSIT",
        "FREQUENT_CASH_DEPOSIT",
    }
)
# Removed from the "phase1_hit_rules" section.
MONTHLY_FIXED_INCOME_ISOLATED_PHASE1_RULE_CODES = frozenset(
    {
        "SUSPICIOUS_INCOME_KEYWORD",
        "FOREX_SELL_AMT",
    }
)
# Removed from the "phase2_statement_hit_rules" section.
MONTHLY_FIXED_INCOME_ISOLATED_PHASE2_RULE_CODES = frozenset(
    {
        "FIXED_COUNTERPARTY_TRANSFER",
    }
)
RULE_CONFLICT_GROUPS = []
ALL_MODE_STATEMENT_BASELINE_RULE_CODES = {
"LOW_INCOME_RELATIVE_LARGE_TRANSACTION",
@@ -209,6 +224,28 @@ class FileService:
"phase2_baseline_hit_rules": list(PHASE2_BASELINE_RULE_CODES),
}
def _build_monthly_fixed_income_isolated_rule_hit_plan(self) -> dict:
    """Prepare an all-mode plan for the monthly-fixed-income sample.

    Starts from the full all-compatible plan and removes, section by
    section, the rule codes listed in the ``MONTHLY_FIXED_INCOME_ISOLATED_*``
    tables so that those rules cannot pollute the sample. The
    ``phase2_baseline_hit_rules`` section is copied through unfiltered.

    Returns:
        A new dict with the same four section keys as the full plan, each
        holding a freshly built list.
    """
    source_plan = self._build_all_compatible_rule_hit_plan()
    # (section key, codes to drop) pairs, in the order the keys are emitted.
    filtered_sections = (
        (
            "large_transaction_hit_rules",
            MONTHLY_FIXED_INCOME_ISOLATED_LARGE_TRANSACTION_RULE_CODES,
        ),
        (
            "phase1_hit_rules",
            MONTHLY_FIXED_INCOME_ISOLATED_PHASE1_RULE_CODES,
        ),
        (
            "phase2_statement_hit_rules",
            MONTHLY_FIXED_INCOME_ISOLATED_PHASE2_RULE_CODES,
        ),
    )

    isolated_plan = {}
    for section, dropped_codes in filtered_sections:
        isolated_plan[section] = [
            code for code in source_plan[section] if code not in dropped_codes
        ]
    isolated_plan["phase2_baseline_hit_rules"] = list(
        source_plan["phase2_baseline_hit_rules"]
    )
    return isolated_plan
def _apply_conflict_groups(self, rule_plan: dict) -> dict:
"""按显式互斥组裁剪规则计划,同组仅保留固定优先级的首个规则。"""
resolved_plan = {plan_key: list(rule_codes) for plan_key, rule_codes in rule_plan.items()}
@@ -235,6 +272,42 @@ class FileService:
return self._apply_conflict_groups(self._build_all_compatible_rule_hit_plan())
return self._build_subset_rule_hit_plan(log_id)
def _apply_rule_hit_plan_to_record(self, file_record: FileRecord, rule_hit_plan: dict) -> None:
"""将规则命中计划回填到指定文件记录。"""
file_record.large_transaction_hit_rules = list(
rule_hit_plan.get("large_transaction_hit_rules", [])
)
file_record.phase1_hit_rules = list(rule_hit_plan.get("phase1_hit_rules", []))
file_record.phase2_statement_hit_rules = list(
rule_hit_plan.get("phase2_statement_hit_rules", [])
)
file_record.phase2_baseline_hit_rules = list(
rule_hit_plan.get("phase2_baseline_hit_rules", [])
)
def _rebalance_all_mode_group_rule_plans(self, group_id: int) -> None:
    """Re-assign rule hit plans across the records of one group in ``all`` mode.

    When a group holds more than one file record, the first record keeps the
    monthly-fixed-income sample isolated by receiving the filtered plan, and
    every other record receives the full all-compatible plan. Both plans go
    through ``_apply_conflict_groups`` first.

    Does nothing when ``settings.RULE_HIT_MODE`` is not ``"all"`` or when the
    group has fewer than two records (the existing plans are left untouched).

    Args:
        group_id: Identifier of the group whose records are rebalanced.
    """
    if settings.RULE_HIT_MODE != "all":
        return

    group_records = [
        record
        for record in self.file_records.values()
        if record.group_id == group_id
    ]
    # Empty and single-record groups need no rebalancing; bail out before
    # building any plan so no work is done that would only be thrown away.
    if len(group_records) < 2:
        return

    full_plan = self._apply_conflict_groups(self._build_all_compatible_rule_hit_plan())
    monthly_safe_plan = self._apply_conflict_groups(
        self._build_monthly_fixed_income_isolated_rule_hit_plan()
    )

    # "First" follows the iteration order of self.file_records.
    # NOTE(review): presumably that is insertion order (plain dict) — confirm.
    self._apply_rule_hit_plan_to_record(group_records[0], monthly_safe_plan)
    for record in group_records[1:]:
        self._apply_rule_hit_plan_to_record(record, full_plan)
def _create_file_record(
self,
*,
@@ -375,6 +448,7 @@ class FileService:
# 存储记录
self.file_records[log_id] = file_record
self._rebalance_all_mode_group_rule_plans(group_id)
self._apply_phase2_baselines(file_record)
# 添加后台任务(延迟解析)
@@ -705,6 +779,7 @@ class FileService:
)
self.file_records[log_id] = file_record
self._rebalance_all_mode_group_rule_plans(group_id)
self._apply_phase2_baselines(file_record)
# 返回成功的响应包含logId数组

View File

@@ -228,6 +228,56 @@ def test_build_rule_hit_plan_should_keep_sql_aligned_target_rules_in_all_mode(mo
assert "FIXED_COUNTERPARTY_TRANSFER" in plan["phase2_statement_hit_rules"]
def test_fetch_inner_flow_should_rebalance_all_mode_records_for_monthly_fixed_income(monkeypatch):
    """Two fetches into one group: the first record is isolated, the second is full."""
    monkeypatch.setattr("services.file_service.settings.RULE_HIT_MODE", "all")
    service = FileService(staff_identity_repository=FakeStaffIdentityRepository())
    monkeypatch.setattr(
        service,
        "_generate_primary_binding",
        lambda: ("全命中主体", "6222000099990001"),
    )

    base_request = {
        "groupId": 1001,
        "dataChannelCode": "test_code",
        "requestDateId": 20240101,
        "dataStartDateId": 20240101,
        "dataEndDateId": 20240131,
        "uploadUserId": 902001,
    }

    def fetch_log_id(customer_no):
        # Same request for every call, only the customer number differs.
        response = service.fetch_inner_flow({**base_request, "customerNo": customer_no})
        return response["data"][0]

    first_log_id = fetch_log_id("customer_monthly_safe_1")
    second_log_id = fetch_log_id("customer_monthly_safe_2")
    isolated_record = service.file_records[first_log_id]
    full_record = service.file_records[second_log_id]

    # The first record keeps MONTHLY_FIXED_INCOME but loses the isolated rules.
    assert "MONTHLY_FIXED_INCOME" in isolated_record.phase2_statement_hit_rules
    assert "FIXED_COUNTERPARTY_TRANSFER" not in isolated_record.phase2_statement_hit_rules
    for rule_code in ("SUSPICIOUS_INCOME_KEYWORD", "FOREX_SELL_AMT"):
        assert rule_code not in isolated_record.phase1_hit_rules
    for rule_code in (
        "SINGLE_LARGE_INCOME",
        "CUMULATIVE_INCOME",
        "ANNUAL_TURNOVER",
        "LARGE_CASH_DEPOSIT",
        "FREQUENT_CASH_DEPOSIT",
    ):
        assert rule_code not in isolated_record.large_transaction_hit_rules

    # The second record still carries the rules removed from the first one.
    assert "FIXED_COUNTERPARTY_TRANSFER" in full_record.phase2_statement_hit_rules
    assert "SUSPICIOUS_INCOME_KEYWORD" in full_record.phase1_hit_rules
    assert "SINGLE_LARGE_INCOME" in full_record.large_transaction_hit_rules
def test_build_rule_hit_plan_should_not_include_placeholder_rules_in_all_mode(monkeypatch):
monkeypatch.setattr("services.file_service.settings.RULE_HIT_MODE", "all")
service = FileService(staff_identity_repository=FakeStaffIdentityRepository())

View File

@@ -258,6 +258,58 @@ def test_generate_statements_should_stay_within_single_employee_scope_per_log_id
assert {item["cretNo"] for item in statements}.issubset(allowed_id_cards)
def test_all_mode_monthly_fixed_income_log_should_keep_monthly_income_stable(monkeypatch):
    """The isolated (first) log of a two-file group yields a stable monthly income.

    Sums the qualifying credit amounts per ``trxDate[:7]`` month for the first
    log's staff member and expects exactly six months of 7200.0 each.
    """
    monkeypatch.setattr("services.file_service.settings.RULE_HIT_MODE", "all")
    file_service = FileService(staff_identity_repository=FakeStaffIdentityRepository())
    service = StatementService(file_service=file_service)
    monkeypatch.setattr(
        file_service,
        "_generate_primary_binding",
        lambda: ("全命中主体", "6222000012345678"),
    )
    request = {
        "groupId": 1001,
        "dataChannelCode": "test_code",
        "requestDateId": 20240101,
        "dataStartDateId": 20240101,
        "dataEndDateId": 20240131,
        "uploadUserId": 902001,
    }

    first_log_id = file_service.fetch_inner_flow(
        {**request, "customerNo": "customer_monthly_income_1"}
    )["data"][0]
    # A second file in the same group is what triggers the rebalancing.
    file_service.fetch_inner_flow(
        {**request, "customerNo": "customer_monthly_income_2"}
    )

    record = file_service.file_records[first_log_id]
    statements = service._generate_statements(group_id=1001, log_id=first_log_id, count=200)

    payroll_bank_name = "浙江兰溪农村商业银行股份有限公司"
    # The original tuple also contained an empty string "". Because
    # `"" in some_str` is always True, that entry made the memo check pass for
    # every statement and silently excluded ALL credits from the bank above.
    # It is dropped here so only memos with a real keyword are excluded.
    # NOTE(review): if "" stood in for a keyword lost in transit, restore it.
    payroll_memo_keywords = (
        "代发", "工资", "奖金", "薪酬", "薪金", "补贴", "年终奖", "年金",
        "加班费", "劳务费", "劳务外包", "提成", "劳务派遣", "绩效", "酬劳",
        "PAYROLL", "SALA", "CPF", "directors", "批量代付",
    )
    payroll_cash_type_keywords = ("代发", "工资", "劳务费")

    monthly_amounts = defaultdict(float)
    for item in statements:
        if item["cretNo"] != record.staff_id_card:
            continue
        if item["crAmount"] <= 0:
            continue
        if not item["customerName"]:
            continue
        if item["leName"] == item["customerName"]:
            continue
        # Credits from the named bank that carry a payroll-style memo or cash
        # type are left out of the monthly totals.
        if item["customerName"] == payroll_bank_name and (
            any(keyword in item["userMemo"] for keyword in payroll_memo_keywords)
            or any(keyword in item["cashType"] for keyword in payroll_cash_type_keywords)
        ):
            continue
        monthly_amounts[item["trxDate"][:7]] += item["crAmount"]

    assert monthly_amounts
    assert len(monthly_amounts) == 6
    assert set(monthly_amounts.values()) == {7200.0}
def test_generate_statements_should_only_use_recognizable_identity_cards():
"""命中样本和随机噪声都只能使用现库可识别的身份证号。"""
service = StatementService()