Merge branch 'codex/lsfx-mock-random-hit-rule-backend' into dev

This commit is contained in:
wkc
2026-03-20 14:52:17 +08:00
9 changed files with 897 additions and 223 deletions

View File

@@ -0,0 +1,43 @@
# LSFX Mock 随机命中规则后端实施记录
## 修改范围
- `lsfx-mock-server/services/file_service.py`
- `lsfx-mock-server/services/statement_rule_samples.py`
- `lsfx-mock-server/services/statement_service.py`
- `lsfx-mock-server/tests/test_file_service.py`
- `lsfx-mock-server/tests/test_statement_service.py`
- `lsfx-mock-server/tests/integration/test_full_workflow.py`
- `sql/migration/2026-03-20-lsfx-mock-random-hit-rule-purchase-baseline.sql`
## 规则命中计划生成方式
- 在 `FileService` 中新增 `LARGE_TRANSACTION_RULE_CODES`、`PHASE1_RULE_CODES` 两组规则池。
- 新增 `_build_rule_hit_plan(log_id)`,使用 `random.Random(f"rule-plan:{log_id}")` 生成稳定随机源。
- 通过 `_pick_rule_subset()` 从两组规则池内分别稳定选出 `2-4` 条规则,并保留规则池原始顺序。
- 在 `upload_file()`、`fetch_inner_flow()` 创建 `FileRecord` 时同步写入:
- `large_transaction_hit_rules`
- `phase1_hit_rules`
## 样本模块按规则子集装配
- 将原有“大额交易全量样本”拆成按规则代码独立调用的 builder。
- 新增 `LARGE_TRANSACTION_BUILDERS`、`PHASE1_RULE_BUILDERS` 两组映射,覆盖:
- 大额交易 8 条规则
- 第一期可由银行流水构造的 8 条规则
- 提供统一入口 `build_seed_statements_for_rule_plan(...)`,仅按 `rule_plan` 中被选中的规则拼装最小命中样本,不再默认返回全量命中样本。
- `build_large_transaction_seed_statements(...)` 保留为兼容测试入口,但内部已改为走新的规则映射。
## StatementService 接通方式
- `StatementService._generate_statements()` 改为优先读取 `FileRecord` 中保存的命中计划。
- 若存在真实 `FileRecord`,则复用其主体、账号、员工及亲属身份证范围,并把命中计划传给 `build_seed_statements_for_rule_plan(...)`
- 命中样本与随机噪声流水继续统一走 `_assign_statement_ids()` 分配稳定 ID。
- 首次生成后仍缓存固定 `200` 条流水;同一 `logId` 重复分页读取保持结果稳定。
## LARGE_PURCHASE_TRANSACTION 单独补数据库基线原因
- `LARGE_PURCHASE_TRANSACTION` 的真实命中来源是 `ccdi_purchase_transaction`,不依赖 `ccdi_bank_statement`
- 为避免伪造银行流水造成业务链路偏移,本次不把该规则塞进 Mock 流水样本。
- 通过新增 `sql/migration/2026-03-20-lsfx-mock-random-hit-rule-purchase-baseline.sql`,只补一条最小采购记录 `LSFXMOCKPUR001`
- 脚本通过 `ccdi_base_staff` 选取一条真实员工主数据作为 `applicant_id/applicant_name` 来源,`actual_amount=186000.00`,满足真实 SQL 的 `>100000` 命中门槛。
## 实施结果
- `FileService -> StatementService -> 缓存分页` 主链路保持不变。
- 大额交易规则与第一期新增规则均已支持“按 `logId` 稳定随机命中一部分”。
- `LARGE_PURCHASE_TRANSACTION` 已通过独立数据库基线补齐联调数据来源。

View File

@@ -0,0 +1,70 @@
# LSFX Mock 随机命中规则后端验证记录
## 执行命令
```bash
cd lsfx-mock-server
python3 -m pytest tests/test_file_service.py -k "rule_hit_plan or persist_rule_hit_plan" -v
python3 -m pytest tests/test_statement_service.py -k "rule_plan_should_only_include or withdraw_cnt_samples" -v
python3 -m pytest tests/test_statement_service.py -k "follow_rule_hit_plan or fixed_total_count_200 or cached_result" -v
python3 -m pytest tests/integration/test_full_workflow.py -k "same_rule_subset or share_same_primary_binding" -v
python3 -m pytest tests/test_file_service.py tests/test_statement_service.py tests/test_api.py tests/integration/test_full_workflow.py -v
cd ..
bin/mysql_utf8_exec.sh sql/migration/2026-03-20-lsfx-mock-random-hit-rule-purchase-baseline.sql
python3 - <<'PY'
import pymysql
from pathlib import Path
import re
text = Path('ruoyi-admin/src/main/resources/application-dev.yml').read_text(encoding='utf-8')
match = re.search(r"url:\s*jdbc:mysql://(?P<host>[^:/?#]+):(?P<port>\d+)/(?P<db>[^?\n]+).*?\n\s*username:\s*(?P<user>[^\n]+)\n\s*password:\s*(?P<pwd>[^\n]+)", text, re.S)
conn = pymysql.connect(
host=match.group('host'),
port=int(match.group('port')),
user=match.group('user').strip(),
password=match.group('pwd').strip(),
database=match.group('db').strip(),
charset='utf8mb4',
cursorclass=pymysql.cursors.DictCursor,
)
with conn, conn.cursor() as cursor:
cursor.execute("""
SELECT COUNT(1) AS cnt
FROM ccdi_purchase_transaction
WHERE purchase_id = 'LSFXMOCKPUR001'
AND actual_amount > 100000
""")
print(cursor.fetchone()['cnt'])
PY
```
## 执行时间与结果
- 2026-03-20 14:50 CST 完成目标回归:
`python3 -m pytest tests/test_file_service.py tests/test_statement_service.py tests/test_api.py tests/integration/test_full_workflow.py -v`
- 回归结果:`38 passed, 20 warnings in 4.20s`
- warnings 为现有 `pydantic`、`httpx` 弃用提示,本次改动未新增失败或 error。
## SQL 基线脚本执行结果
- 执行命令:`bin/mysql_utf8_exec.sh sql/migration/2026-03-20-lsfx-mock-random-hit-rule-purchase-baseline.sql`
- 执行结果:脚本执行成功,无报错输出。
- 脚本内容采用“先删后插”的幂等方式,避免重复执行造成脏数据。
## 采购基线查询结果
- 执行前查询:
`SELECT COUNT(1) FROM ccdi_purchase_transaction WHERE purchase_id = 'LSFXMOCKPUR001'`
返回 `0`
- 执行后查询:
`SELECT purchase_id, actual_amount, supplier_name FROM ccdi_purchase_transaction WHERE purchase_id = 'LSFXMOCKPUR001'`
返回:
- `purchase_id = LSFXMOCKPUR001`
- `actual_amount = 186000.00`
- `supplier_name = 兰溪市联调供应链有限公司`
- 最终门槛校验:
`SELECT COUNT(1) ... WHERE purchase_id = 'LSFXMOCKPUR001' AND actual_amount > 100000`
返回 `1`
## 是否发现回归
- 未发现本次改动引入的功能回归。
- `FileService` 的规则命中计划生成、`statement_rule_samples.py` 的按规则子集装配、`StatementService` 的缓存稳定性、端到端接口链路均已通过验证。
- 本次验证未启动额外前后端常驻进程,因此无需执行进程清理。

View File

@@ -9,6 +9,28 @@ from datetime import datetime, timedelta
import random
import uuid
# Pool of large-transaction rule codes that a rule-hit plan is drawn from.
# Order is significant: _pick_rule_subset samples from this list and returns
# the selected codes in this same order.
LARGE_TRANSACTION_RULE_CODES = [
"HOUSE_OR_CAR_EXPENSE",
"TAX_EXPENSE",
"SINGLE_LARGE_INCOME",
"CUMULATIVE_INCOME",
"ANNUAL_TURNOVER",
"LARGE_CASH_DEPOSIT",
"FREQUENT_CASH_DEPOSIT",
"LARGE_TRANSFER",
]
# Pool of phase-1 rule codes that a rule-hit plan is drawn from.
# Order is significant: _pick_rule_subset samples from this list and returns
# the selected codes in this same order.
PHASE1_RULE_CODES = [
"GAMBLING_SENSITIVE_KEYWORD",
"SPECIAL_AMOUNT_TRANSACTION",
"SUSPICIOUS_INCOME_KEYWORD",
"FOREX_BUY_AMT",
"FOREX_SELL_AMT",
"STOCK_TFR_LARGE",
"LARGE_STOCK_TRADING",
"WITHDRAW_CNT",
]
@dataclass
class FileRecord:
@@ -64,6 +86,8 @@ class FileRecord:
staff_name: str = ""
staff_id_card: str = ""
family_id_cards: List[str] = field(default_factory=list)
large_transaction_hit_rules: List[str] = field(default_factory=list)
phase1_hit_rules: List[str] = field(default_factory=list)
class FileService:
@@ -110,6 +134,27 @@ class FileService:
"enterpriseNameList": [primary_enterprise_name],
}
def _pick_rule_subset(
self,
rng: random.Random,
rule_codes: List[str],
min_count: int,
max_count: int,
) -> List[str]:
"""按固定随机源选择稳定规则子集,并保留规则池原始顺序。"""
selected_codes = set(rng.sample(rule_codes, rng.randint(min_count, max_count)))
return [rule_code for rule_code in rule_codes if rule_code in selected_codes]
def _build_rule_hit_plan(self, log_id: int) -> dict:
    """Derive the rule-hit plan for ``log_id`` from an RNG seeded by that id.

    Returns:
        A dict with the keys ``large_transaction_hit_rules`` and
        ``phase1_hit_rules``, each holding 2 to 4 rule codes from its pool.
    """
    seeded_rng = random.Random(f"rule-plan:{log_id}")
    # Both pools share one RNG stream, so the draw order must stay
    # large-transaction first, phase-1 second.
    large_transaction_rules = self._pick_rule_subset(
        seeded_rng, LARGE_TRANSACTION_RULE_CODES, 2, 4
    )
    phase1_rules = self._pick_rule_subset(seeded_rng, PHASE1_RULE_CODES, 2, 4)
    return {
        "large_transaction_hit_rules": large_transaction_rules,
        "phase1_hit_rules": phase1_rules,
    }
def _create_file_record(
self,
*,
@@ -130,6 +175,8 @@ class FileService:
staff_name: str = "",
staff_id_card: str = "",
family_id_cards: List[str] = None,
large_transaction_hit_rules: List[str] = None,
phase1_hit_rules: List[str] = None,
parsing: bool = True,
status: int = -5,
) -> FileRecord:
@@ -160,6 +207,8 @@ class FileService:
staff_name=staff_name,
staff_id_card=staff_id_card,
family_id_cards=list(family_id_cards or []),
large_transaction_hit_rules=list(large_transaction_hit_rules or []),
phase1_hit_rules=list(phase1_hit_rules or []),
parsing=parsing,
status=status,
)
@@ -187,6 +236,7 @@ class FileService:
# 推断银行信息
bank_name, template_name = self._infer_bank_name(file.filename)
rule_hit_plan = self._build_rule_hit_plan(log_id)
# 生成合理的交易日期范围
end_date = datetime.now()
@@ -217,6 +267,8 @@ class FileService:
staff_name=identity_scope["staff_name"],
staff_id_card=identity_scope["staff_id_card"],
family_id_cards=identity_scope["family_id_cards"],
large_transaction_hit_rules=rule_hit_plan["large_transaction_hit_rules"],
phase1_hit_rules=rule_hit_plan["phase1_hit_rules"],
)
# 存储记录
@@ -521,6 +573,7 @@ class FileService:
# 使用递增 logId确保与上传链路一致
self.log_counter += 1
log_id = self.log_counter
rule_hit_plan = self._build_rule_hit_plan(log_id)
primary_enterprise_name, primary_account_no = self._generate_primary_binding()
identity_scope = self._select_staff_identity_scope()
@@ -542,6 +595,8 @@ class FileService:
staff_name=identity_scope["staff_name"],
staff_id_card=identity_scope["staff_id_card"],
family_id_cards=identity_scope["family_id_cards"],
large_transaction_hit_rules=rule_hit_plan["large_transaction_hit_rules"],
phase1_hit_rules=rule_hit_plan["phase1_hit_rules"],
parsing=False,
)

View File

@@ -141,6 +141,468 @@ def _build_statement(
}
def _build_sample_context(
    log_id: int,
    primary_enterprise_name: Optional[str] = None,
    primary_account_no: Optional[str] = None,
    staff_id_card: Optional[str] = None,
    family_id_cards: Optional[List[str]] = None,
) -> Dict[str, str]:
    """Resolve the subject, account and ID cards shared by all seed builders.

    Args:
        log_id: Log id used to resolve the default identity scope.
        primary_enterprise_name: Subject name; defaults to "模型测试主体".
        primary_account_no: Account number; defaults to "6222024999999999".
        staff_id_card: Staff ID card; defaults to the resolved staff identity.
        family_id_cards: Family ID cards; only the first one is used.

    Returns:
        A dict with the keys ``le_name``, ``account_no``, ``staff_id_card``
        and ``family_id_card``.
    """
    identity_scope = resolve_identity_scope(log_id)
    default_staff_id_card = identity_scope["staff"]["id_card"]
    default_family_id_card = identity_scope["family"]["id_card"]

    selected_staff_id_card = staff_id_card or default_staff_id_card
    if family_id_cards:
        family_id_card = list(family_id_cards)[0]
    elif staff_id_card and family_id_cards is not None:
        # The caller pinned a staff identity with an explicitly empty family
        # list: fall back to the staff card instead of pulling in a family
        # card from outside that scope. The original expression had this
        # fallback but it could never be reached.
        family_id_card = selected_staff_id_card
    else:
        family_id_card = default_family_id_card

    return {
        "le_name": primary_enterprise_name or "模型测试主体",
        "account_no": primary_account_no or "6222024999999999",
        "staff_id_card": selected_staff_id_card,
        "family_id_card": family_id_card,
    }
def build_house_or_car_samples(group_id: int, log_id: int, **kwargs) -> List[Dict]:
    """Build the two seed statements registered for HOUSE_OR_CAR_EXPENSE.

    The first debit is booked under the staff ID card and the second under
    the family ID card. ``kwargs`` are forwarded to ``_build_sample_context``.
    """
    ctx = _build_sample_context(log_id, **kwargs)
    # Fields that are identical on both statements.
    shared = {
        "cash_type": "对公转账",
        "le_name": ctx["le_name"],
        "account_mask_no": ctx["account_no"],
    }
    house_payment = _build_statement(
        group_id,
        log_id,
        trx_datetime=REFERENCE_NOW - timedelta(days=9, hours=1),
        cret_no=ctx["staff_id_card"],
        customer_name="杭州贝壳房地产经纪有限公司",
        user_memo="购买房产首付款",
        dr_amount=680000.0,
        customer_account_mask_no="6222024555500001",
        **shared,
    )
    car_payment = _build_statement(
        group_id,
        log_id,
        trx_datetime=REFERENCE_NOW - timedelta(days=8, hours=2),
        cret_no=ctx["family_id_card"],
        customer_name="兰溪星耀汽车销售服务有限公司",
        user_memo="购车首付款",
        dr_amount=380000.0,
        customer_account_mask_no="6222024555500002",
        **shared,
    )
    return [house_payment, car_payment]
def build_tax_samples(group_id: int, log_id: int, **kwargs) -> List[Dict]:
    """Build the two seed statements registered for TAX_EXPENSE.

    One tax payment is booked under the staff ID card and one under the
    family ID card. ``kwargs`` are forwarded to ``_build_sample_context``.
    """
    ctx = _build_sample_context(log_id, **kwargs)
    # (age relative to REFERENCE_NOW, ID card, payee, memo, amount, payee account)
    tax_specs = (
        (
            timedelta(days=7, hours=1),
            ctx["staff_id_card"],
            "国家金库兰溪市中心支库",
            "个人所得税税款",
            126000.0,
            "6222024555500003",
        ),
        (
            timedelta(days=6, hours=3),
            ctx["family_id_card"],
            "兰溪市税务局",
            "房产税务缴税",
            88000.0,
            "6222024555500004",
        ),
    )
    samples: List[Dict] = []
    for age, id_card, payee, memo, amount, payee_account in tax_specs:
        samples.append(
            _build_statement(
                group_id,
                log_id,
                trx_datetime=REFERENCE_NOW - age,
                cret_no=id_card,
                customer_name=payee,
                user_memo=memo,
                cash_type="税务缴款",
                dr_amount=amount,
                le_name=ctx["le_name"],
                account_mask_no=ctx["account_no"],
                customer_account_mask_no=payee_account,
            )
        )
    return samples
def build_single_large_income_samples(group_id: int, log_id: int, **kwargs) -> List[Dict]:
    """Build the single credit statement registered for SINGLE_LARGE_INCOME.

    ``kwargs`` are forwarded to ``_build_sample_context``.
    """
    ctx = _build_sample_context(log_id, **kwargs)
    occurred_at = REFERENCE_NOW - timedelta(days=5, hours=2)
    income = _build_statement(
        group_id,
        log_id,
        trx_datetime=occurred_at,
        cret_no=ctx["staff_id_card"],
        customer_name="浙江远望贸易有限公司",
        user_memo="经营往来收入",
        cash_type="对公转账",
        cr_amount=18800000.0,
        le_name=ctx["le_name"],
        account_mask_no=ctx["account_no"],
        customer_account_mask_no="6222024666600001",
    )
    return [income]
def build_cumulative_income_samples(group_id: int, log_id: int, **kwargs) -> List[Dict]:
    """Build the three credit statements registered for CUMULATIVE_INCOME.

    All three come from the same counterparty and are booked under the staff
    ID card. ``kwargs`` are forwarded to ``_build_sample_context``.
    """
    ctx = _build_sample_context(log_id, **kwargs)
    # (age relative to REFERENCE_NOW, memo, credited amount)
    income_specs = (
        (timedelta(days=5, hours=2), "经营往来收入", 18800000.0),
        (timedelta(days=5, hours=1), "项目回款收入", 20800000.0),
        (timedelta(days=4, hours=4), "业务合作收入", 20700000.0),
    )
    samples: List[Dict] = []
    for age, memo, amount in income_specs:
        samples.append(
            _build_statement(
                group_id,
                log_id,
                trx_datetime=REFERENCE_NOW - age,
                cret_no=ctx["staff_id_card"],
                customer_name="浙江远望贸易有限公司",
                user_memo=memo,
                cash_type="对公转账",
                cr_amount=amount,
                le_name=ctx["le_name"],
                account_mask_no=ctx["account_no"],
                customer_account_mask_no="6222024666600001",
            )
        )
    return samples
def build_annual_turnover_supporting_samples(group_id: int, log_id: int, **kwargs) -> List[Dict]:
    """Build the three credit statements registered for ANNUAL_TURNOVER.

    All three come from the same counterparty and are booked under the staff
    ID card. ``kwargs`` are forwarded to ``_build_sample_context``.
    """
    ctx = _build_sample_context(log_id, **kwargs)
    # (age relative to REFERENCE_NOW, memo, credited amount)
    turnover_specs = (
        (timedelta(days=15, hours=2), "年度经营回款", 17200000.0),
        (timedelta(days=11, hours=3), "年度项目回款", 17600000.0),
        (timedelta(days=9, hours=4), "年度合作收入", 17800000.0),
    )
    samples: List[Dict] = []
    for age, memo, amount in turnover_specs:
        samples.append(
            _build_statement(
                group_id,
                log_id,
                trx_datetime=REFERENCE_NOW - age,
                cret_no=ctx["staff_id_card"],
                customer_name="浙江金穗供应链有限公司",
                user_memo=memo,
                cash_type="对公转账",
                cr_amount=amount,
                le_name=ctx["le_name"],
                account_mask_no=ctx["account_no"],
                customer_account_mask_no="6222024666600002",
            )
        )
    return samples
def build_large_cash_deposit_samples(group_id: int, log_id: int, **kwargs) -> List[Dict]:
    """Build the single cash-deposit statement registered for LARGE_CASH_DEPOSIT.

    The statement uses a fixed timestamp and an empty counterparty name.
    ``kwargs`` are forwarded to ``_build_sample_context``.
    """
    ctx = _build_sample_context(log_id, **kwargs)
    deposited_at = datetime(2026, 3, 10, 9, 0, 0)
    deposit = _build_statement(
        group_id,
        log_id,
        trx_datetime=deposited_at,
        cret_no=ctx["staff_id_card"],
        customer_name="",
        user_memo="现金存款",
        cash_type="现金存款",
        cr_amount=3000000.0,
        le_name=ctx["le_name"],
        account_mask_no=ctx["account_no"],
    )
    return [deposit]
def build_frequent_cash_deposit_samples(group_id: int, log_id: int, **kwargs) -> List[Dict]:
    """Build the six cash-deposit statements registered for FREQUENT_CASH_DEPOSIT.

    The deposits are spaced 30 minutes apart on one fixed day.
    ``kwargs`` are forwarded to ``_build_sample_context``.
    """
    ctx = _build_sample_context(log_id, **kwargs)
    # (timestamp, memo, credited amount)
    deposit_plan = (
        (datetime(2026, 3, 10, 9, 0, 0), "现金存款", 3000000.0),
        (datetime(2026, 3, 10, 9, 30, 0), "ATM现金存款", 3100000.0),
        (datetime(2026, 3, 10, 10, 0, 0), "自助存款现金存入", 3200000.0),
        (datetime(2026, 3, 10, 10, 30, 0), "CRS存款", 3300000.0),
        (datetime(2026, 3, 10, 11, 0, 0), "本行ATM存款", 3400000.0),
        (datetime(2026, 3, 10, 11, 30, 0), "柜面现金存款", 3500000.0),
    )
    samples: List[Dict] = []
    for deposited_at, memo, amount in deposit_plan:
        samples.append(
            _build_statement(
                group_id,
                log_id,
                trx_datetime=deposited_at,
                cret_no=ctx["staff_id_card"],
                customer_name="",
                user_memo=memo,
                cash_type="现金存款",
                cr_amount=amount,
                le_name=ctx["le_name"],
                account_mask_no=ctx["account_no"],
            )
        )
    return samples
def build_large_transfer_samples(group_id: int, log_id: int, **kwargs) -> List[Dict]:
    """Build the single debit statement registered for LARGE_TRANSFER.

    ``kwargs`` are forwarded to ``_build_sample_context``.
    """
    ctx = _build_sample_context(log_id, **kwargs)
    occurred_at = REFERENCE_NOW - timedelta(days=3, hours=1)
    transfer = _build_statement(
        group_id,
        log_id,
        trx_datetime=occurred_at,
        cret_no=ctx["staff_id_card"],
        customer_name="异地转账平台",
        user_memo="手机银行转账",
        cash_type="转账支出",
        dr_amount=12000000.0,
        le_name=ctx["le_name"],
        account_mask_no=ctx["account_no"],
        customer_account_mask_no="6222024777700001",
    )
    return [transfer]
def build_gambling_sensitive_keyword_samples(group_id: int, log_id: int, **kwargs) -> List[Dict]:
    """Build the single debit statement registered for GAMBLING_SENSITIVE_KEYWORD.

    ``kwargs`` are forwarded to ``_build_sample_context``.
    """
    ctx = _build_sample_context(log_id, **kwargs)
    occurred_at = REFERENCE_NOW - timedelta(days=4, hours=2)
    payment = _build_statement(
        group_id,
        log_id,
        trx_datetime=occurred_at,
        cret_no=ctx["staff_id_card"],
        customer_name="欢乐游戏科技有限公司",
        user_memo="游戏充值",
        cash_type="快捷支付",
        dr_amount=6888.0,
        le_name=ctx["le_name"],
        account_mask_no=ctx["account_no"],
        customer_account_mask_no="6222024888800001",
    )
    return [payment]
def build_special_amount_transaction_samples(group_id: int, log_id: int, **kwargs) -> List[Dict]:
    """Build the single debit statement registered for SPECIAL_AMOUNT_TRANSACTION.

    ``kwargs`` are forwarded to ``_build_sample_context``.
    """
    ctx = _build_sample_context(log_id, **kwargs)
    occurred_at = REFERENCE_NOW - timedelta(days=4, hours=1)
    transfer = _build_statement(
        group_id,
        log_id,
        trx_datetime=occurred_at,
        cret_no=ctx["staff_id_card"],
        customer_name="兰溪特别金额结算中心",
        user_memo="特殊金额转账",
        cash_type="转账支出",
        dr_amount=88888.88,
        le_name=ctx["le_name"],
        account_mask_no=ctx["account_no"],
        customer_account_mask_no="6222024888800002",
    )
    return [transfer]
def build_suspicious_income_keyword_samples(group_id: int, log_id: int, **kwargs) -> List[Dict]:
    """Build the single credit statement registered for SUSPICIOUS_INCOME_KEYWORD.

    ``kwargs`` are forwarded to ``_build_sample_context``.
    """
    ctx = _build_sample_context(log_id, **kwargs)
    occurred_at = REFERENCE_NOW - timedelta(days=3, hours=5)
    income = _build_statement(
        group_id,
        log_id,
        trx_datetime=occurred_at,
        cret_no=ctx["staff_id_card"],
        customer_name="灰度信息咨询有限公司",
        user_memo="咨询返现收入",
        cash_type="对公转账",
        cr_amount=166666.0,
        le_name=ctx["le_name"],
        account_mask_no=ctx["account_no"],
        customer_account_mask_no="6222024888800003",
    )
    return [income]
def build_forex_buy_samples(group_id: int, log_id: int, **kwargs) -> List[Dict]:
    """Build the single debit statement registered for FOREX_BUY_AMT.

    ``kwargs`` are forwarded to ``_build_sample_context``.
    """
    ctx = _build_sample_context(log_id, **kwargs)
    occurred_at = REFERENCE_NOW - timedelta(days=2, hours=6)
    purchase = _build_statement(
        group_id,
        log_id,
        trx_datetime=occurred_at,
        cret_no=ctx["staff_id_card"],
        customer_name="中国银行外汇业务中心",
        user_memo="个人购汇",
        cash_type="购汇支出",
        dr_amount=126000.0,
        le_name=ctx["le_name"],
        account_mask_no=ctx["account_no"],
        customer_account_mask_no="6222024999900001",
    )
    return [purchase]
def build_forex_sell_samples(group_id: int, log_id: int, **kwargs) -> List[Dict]:
    """Build the single credit statement registered for FOREX_SELL_AMT.

    ``kwargs`` are forwarded to ``_build_sample_context``.
    """
    ctx = _build_sample_context(log_id, **kwargs)
    occurred_at = REFERENCE_NOW - timedelta(days=2, hours=4)
    settlement = _build_statement(
        group_id,
        log_id,
        trx_datetime=occurred_at,
        cret_no=ctx["staff_id_card"],
        customer_name="中国银行外汇业务中心",
        user_memo="个人结汇",
        cash_type="结汇收入",
        cr_amount=132000.0,
        le_name=ctx["le_name"],
        account_mask_no=ctx["account_no"],
        customer_account_mask_no="6222024999900002",
    )
    return [settlement]
def build_stock_transfer_large_samples(group_id: int, log_id: int, **kwargs) -> List[Dict]:
    """Build the single debit statement registered for STOCK_TFR_LARGE.

    ``kwargs`` are forwarded to ``_build_sample_context``.
    """
    ctx = _build_sample_context(log_id, **kwargs)
    occurred_at = REFERENCE_NOW - timedelta(days=2, hours=2)
    transfer = _build_statement(
        group_id,
        log_id,
        trx_datetime=occurred_at,
        cret_no=ctx["staff_id_card"],
        customer_name="国信证券资金账户",
        user_memo="证券大额转托管转出",
        cash_type="转账支出",
        dr_amount=560000.0,
        le_name=ctx["le_name"],
        account_mask_no=ctx["account_no"],
        customer_account_mask_no="6222024999900003",
    )
    return [transfer]
def build_large_stock_trading_samples(group_id: int, log_id: int, **kwargs) -> List[Dict]:
    """Build the single debit statement registered for LARGE_STOCK_TRADING.

    ``kwargs`` are forwarded to ``_build_sample_context``.
    """
    ctx = _build_sample_context(log_id, **kwargs)
    occurred_at = REFERENCE_NOW - timedelta(days=1, hours=3)
    trade = _build_statement(
        group_id,
        log_id,
        trx_datetime=occurred_at,
        cret_no=ctx["staff_id_card"],
        customer_name="华泰证券资金账户",
        user_memo="证券大额交易买入",
        cash_type="证券交易",
        dr_amount=880000.0,
        le_name=ctx["le_name"],
        account_mask_no=ctx["account_no"],
        customer_account_mask_no="6222024999900004",
    )
    return [trade]
def build_withdraw_cnt_samples(group_id: int, log_id: int, **kwargs) -> List[Dict]:
    """Build the four withdrawal statements registered for WITHDRAW_CNT.

    The withdrawals are one hour apart on one fixed day. The counterparty
    name is chosen from the memo text. ``kwargs`` are forwarded to
    ``_build_sample_context``.
    """
    ctx = _build_sample_context(log_id, **kwargs)
    # (timestamp, memo, debited amount)
    withdraw_plan = (
        (datetime(2026, 3, 12, 9, 0, 0), "微信提现", 8000.0),
        (datetime(2026, 3, 12, 10, 0, 0), "支付宝提现", 9000.0),
        (datetime(2026, 3, 12, 11, 0, 0), "微信提现", 8500.0),
        (datetime(2026, 3, 12, 12, 0, 0), "支付宝提现", 9200.0),
    )
    samples: List[Dict] = []
    for withdrawn_at, memo, amount in withdraw_plan:
        if "微信" in memo:
            counterparty = "财付通结算账户"
        else:
            counterparty = "支付宝结算账户"
        samples.append(
            _build_statement(
                group_id,
                log_id,
                trx_datetime=withdrawn_at,
                cret_no=ctx["staff_id_card"],
                customer_name=counterparty,
                user_memo=memo,
                cash_type="提现支出",
                dr_amount=amount,
                le_name=ctx["le_name"],
                account_mask_no=ctx["account_no"],
            )
        )
    return samples
# Maps each large-transaction rule code to the builder that produces its seed
# statements. build_seed_statements_for_rule_plan looks builders up here and
# skips any rule code that has no entry.
LARGE_TRANSACTION_BUILDERS = {
"HOUSE_OR_CAR_EXPENSE": build_house_or_car_samples,
"TAX_EXPENSE": build_tax_samples,
"SINGLE_LARGE_INCOME": build_single_large_income_samples,
"CUMULATIVE_INCOME": build_cumulative_income_samples,
"ANNUAL_TURNOVER": build_annual_turnover_supporting_samples,
"LARGE_CASH_DEPOSIT": build_large_cash_deposit_samples,
"FREQUENT_CASH_DEPOSIT": build_frequent_cash_deposit_samples,
"LARGE_TRANSFER": build_large_transfer_samples,
}
# Maps each phase-1 rule code to the builder that produces its seed
# statements. build_seed_statements_for_rule_plan looks builders up here and
# skips any rule code that has no entry.
PHASE1_RULE_BUILDERS = {
"GAMBLING_SENSITIVE_KEYWORD": build_gambling_sensitive_keyword_samples,
"SPECIAL_AMOUNT_TRANSACTION": build_special_amount_transaction_samples,
"SUSPICIOUS_INCOME_KEYWORD": build_suspicious_income_keyword_samples,
"FOREX_BUY_AMT": build_forex_buy_samples,
"FOREX_SELL_AMT": build_forex_sell_samples,
"STOCK_TFR_LARGE": build_stock_transfer_large_samples,
"LARGE_STOCK_TRADING": build_large_stock_trading_samples,
"WITHDRAW_CNT": build_withdraw_cnt_samples,
}
def build_seed_statements_for_rule_plan(
    group_id: int,
    log_id: int,
    rule_plan: Dict,
    **kwargs,
) -> List[Dict]:
    """Assemble seed statements for only the rules selected in ``rule_plan``.

    Args:
        group_id: Group id passed through to every builder.
        log_id: Log id passed through to every builder.
        rule_plan: Dict that may hold ``large_transaction_hit_rules`` and
            ``phase1_hit_rules``. A missing key or a ``None`` value is
            treated as "no rules selected".
        **kwargs: Extra keyword arguments forwarded unchanged to each builder.

    Returns:
        Large-transaction samples first, then phase-1 samples, each group in
        plan order. Rule codes without a registered builder are skipped.
    """
    statements: List[Dict] = []
    # `or ()` covers a key that is present but None, which `.get(key, [])`
    # would pass straight to the loop and crash on.
    for rule_code in rule_plan.get("large_transaction_hit_rules") or ():
        builder = LARGE_TRANSACTION_BUILDERS.get(rule_code)
        if builder is not None:
            statements.extend(builder(group_id, log_id, **kwargs))
    for rule_code in rule_plan.get("phase1_hit_rules") or ():
        builder = PHASE1_RULE_BUILDERS.get(rule_code)
        if builder is not None:
            statements.extend(builder(group_id, log_id, **kwargs))
    return statements
def build_large_transaction_seed_statements(
group_id: int,
log_id: int,
@@ -149,220 +611,15 @@ def build_large_transaction_seed_statements(
staff_id_card: Optional[str] = None,
family_id_cards: Optional[List[str]] = None,
) -> List[Dict]:
le_name = primary_enterprise_name or "模型测试主体"
account_no = primary_account_no or "6222024999999999"
identity_scope = resolve_identity_scope(log_id)
staff_identity = identity_scope["staff"]
family_identity = identity_scope["family"]
selected_staff_id_card = staff_id_card or staff_identity["id_card"]
selected_family_id_cards = list(family_id_cards or [family_identity["id_card"]])
primary_family_id_card = selected_family_id_cards[0] if selected_family_id_cards else selected_staff_id_card
statements: List[Dict] = []
statements.extend([
_build_statement(
group_id,
log_id,
trx_datetime=REFERENCE_NOW - timedelta(days=9, hours=1),
cret_no=selected_staff_id_card,
customer_name="杭州贝壳房地产经纪有限公司",
user_memo="购买房产首付款",
cash_type="对公转账",
dr_amount=680000.0,
le_name=le_name,
account_mask_no=account_no,
customer_account_mask_no="6222024555500001",
),
_build_statement(
group_id,
log_id,
trx_datetime=REFERENCE_NOW - timedelta(days=8, hours=2),
cret_no=primary_family_id_card,
customer_name="兰溪星耀汽车销售服务有限公司",
user_memo="购车首付款",
cash_type="对公转账",
dr_amount=380000.0,
le_name=le_name,
account_mask_no=account_no,
customer_account_mask_no="6222024555500002",
),
_build_statement(
group_id,
log_id,
trx_datetime=REFERENCE_NOW - timedelta(days=7, hours=1),
cret_no=selected_staff_id_card,
customer_name="国家金库兰溪市中心支库",
user_memo="个人所得税税款",
cash_type="税务缴款",
dr_amount=126000.0,
le_name=le_name,
account_mask_no=account_no,
customer_account_mask_no="6222024555500003",
),
_build_statement(
group_id,
log_id,
trx_datetime=REFERENCE_NOW - timedelta(days=6, hours=3),
cret_no=primary_family_id_card,
customer_name="兰溪市税务局",
user_memo="房产税务缴税",
cash_type="税务缴款",
dr_amount=88000.0,
le_name=le_name,
account_mask_no=account_no,
customer_account_mask_no="6222024555500004",
),
_build_statement(
group_id,
log_id,
trx_datetime=REFERENCE_NOW - timedelta(days=5, hours=2),
cret_no=selected_staff_id_card,
customer_name="浙江远望贸易有限公司",
user_memo="经营往来收入",
cash_type="对公转账",
cr_amount=18800000.0,
le_name=le_name,
account_mask_no=account_no,
customer_account_mask_no="6222024666600001",
),
_build_statement(
group_id,
log_id,
trx_datetime=REFERENCE_NOW - timedelta(days=5, hours=1),
cret_no=selected_staff_id_card,
customer_name="浙江远望贸易有限公司",
user_memo="项目回款收入",
cash_type="对公转账",
cr_amount=20800000.0,
le_name=le_name,
account_mask_no=account_no,
customer_account_mask_no="6222024666600001",
),
_build_statement(
group_id,
log_id,
trx_datetime=REFERENCE_NOW - timedelta(days=4, hours=4),
cret_no=selected_staff_id_card,
customer_name="浙江远望贸易有限公司",
user_memo="业务合作收入",
cash_type="对公转账",
cr_amount=20700000.0,
le_name=le_name,
account_mask_no=account_no,
customer_account_mask_no="6222024666600001",
),
_build_statement(
group_id,
log_id,
trx_datetime=datetime(2026, 3, 10, 9, 0, 0),
cret_no=selected_staff_id_card,
customer_name="",
user_memo="现金存款",
cash_type="现金存款",
cr_amount=3000000.0,
le_name=le_name,
account_mask_no=account_no,
),
_build_statement(
group_id,
log_id,
trx_datetime=datetime(2026, 3, 10, 9, 30, 0),
cret_no=selected_staff_id_card,
customer_name="",
user_memo="ATM现金存款",
cash_type="现金存款",
cr_amount=3100000.0,
le_name=le_name,
account_mask_no=account_no,
),
_build_statement(
group_id,
log_id,
trx_datetime=datetime(2026, 3, 10, 10, 0, 0),
cret_no=selected_staff_id_card,
customer_name="",
user_memo="自助存款现金存入",
cash_type="现金存款",
cr_amount=3200000.0,
le_name=le_name,
account_mask_no=account_no,
),
_build_statement(
group_id,
log_id,
trx_datetime=datetime(2026, 3, 10, 10, 30, 0),
cret_no=selected_staff_id_card,
customer_name="",
user_memo="CRS存款",
cash_type="现金存款",
cr_amount=3300000.0,
le_name=le_name,
account_mask_no=account_no,
),
_build_statement(
group_id,
log_id,
trx_datetime=datetime(2026, 3, 10, 11, 0, 0),
cret_no=selected_staff_id_card,
customer_name="",
user_memo="本行ATM存款",
cash_type="现金存款",
cr_amount=3400000.0,
le_name=le_name,
account_mask_no=account_no,
),
_build_statement(
group_id,
log_id,
trx_datetime=datetime(2026, 3, 10, 11, 30, 0),
cret_no=selected_staff_id_card,
customer_name="",
user_memo="柜面现金存款",
cash_type="现金存款",
cr_amount=3500000.0,
le_name=le_name,
account_mask_no=account_no,
),
_build_statement(
group_id,
log_id,
trx_datetime=REFERENCE_NOW - timedelta(days=3, hours=1),
cret_no=selected_staff_id_card,
customer_name="异地转账平台",
user_memo="手机银行转账",
cash_type="转账支出",
dr_amount=12000000.0,
le_name=le_name,
account_mask_no=account_no,
customer_account_mask_no="6222024777700001",
),
_build_statement(
group_id,
log_id,
trx_datetime=REFERENCE_NOW - timedelta(days=3, hours=2),
cret_no=selected_staff_id_card,
customer_name="跨行转账中心",
user_memo="对外转账",
cash_type="转账支出",
dr_amount=10000000.0,
le_name=le_name,
account_mask_no=account_no,
customer_account_mask_no="6222024777700002",
),
_build_statement(
group_id,
log_id,
trx_datetime=REFERENCE_NOW - timedelta(days=2, hours=5),
cret_no=selected_staff_id_card,
customer_name="跨境转账服务平台",
user_memo="网银转账",
cash_type="转账支出",
dr_amount=9000000.0,
le_name=le_name,
account_mask_no=account_no,
customer_account_mask_no="6222024777700003",
),
])
return statements
return build_seed_statements_for_rule_plan(
group_id=group_id,
log_id=log_id,
rule_plan={
"large_transaction_hit_rules": list(LARGE_TRANSACTION_BUILDERS.keys()),
"phase1_hit_rules": [],
},
primary_enterprise_name=primary_enterprise_name,
primary_account_no=primary_account_no,
staff_id_card=staff_id_card,
family_id_cards=family_id_cards,
)

View File

@@ -5,7 +5,7 @@ import uuid
from datetime import datetime, timedelta
from services.statement_rule_samples import (
build_large_transaction_seed_statements,
build_seed_statements_for_rule_plan,
resolve_identity_cards,
)
@@ -144,14 +144,21 @@ class StatementService:
"""生成指定数量的流水记录。"""
primary_enterprise_name, primary_account_no = self._resolve_primary_binding(log_id)
record = self.file_service.get_file_record(log_id) if self.file_service is not None else None
rule_plan = {
"large_transaction_hit_rules": (
list(record.large_transaction_hit_rules) if record is not None else []
),
"phase1_hit_rules": list(record.phase1_hit_rules) if record is not None else [],
}
if record is not None and record.staff_id_card:
allowed_identity_cards = tuple([record.staff_id_card, *record.family_id_cards])
else:
allowed_identity_cards = resolve_identity_cards(log_id)
rng = random.Random(f"statement:{log_id}")
seeded_statements = build_large_transaction_seed_statements(
seeded_statements = build_seed_statements_for_rule_plan(
group_id=group_id,
log_id=log_id,
rule_plan=rule_plan,
primary_enterprise_name=primary_enterprise_name,
primary_account_no=primary_account_no,
staff_id_card=record.staff_id_card if record is not None else None,

View File

@@ -170,3 +170,30 @@ def test_upload_status_and_bank_statement_share_same_primary_binding(client, mon
assert statements
assert all(item["leName"] == status_log["enterpriseNameList"][0] for item in statements)
assert all(item["accountMaskNo"] == status_log["accountNoList"][0] for item in statements)
def test_inner_flow_bank_statement_should_keep_same_rule_subset(client):
    """Reading the same page twice for one logId must return identical statements."""
    fetch_response = client.post(
        "/watson/api/project/getJZFileOrZjrcuFile",
        data={
            "groupId": 1001,
            "customerNo": "customer_subset",
            "dataChannelCode": "test_code",
            "requestDateId": 20240101,
            "dataStartDateId": 20240101,
            "dataEndDateId": 20240131,
            "uploadUserId": 902001,
        },
    )
    log_id = fetch_response.json()["data"][0]

    page_request = {"groupId": 1001, "logId": log_id, "pageNow": 1, "pageSize": 10}
    first_read = client.post(
        "/watson/api/project/getBSByLogId",
        data=page_request,
    ).json()
    second_read = client.post(
        "/watson/api/project/getBSByLogId",
        data=page_request,
    ).json()

    first_statements = first_read["data"]["bankStatementList"]
    # Guard against a vacuous pass: two empty pages would also compare equal.
    assert first_statements
    assert first_statements == second_read["data"]["bankStatementList"]

View File

@@ -154,3 +154,49 @@ def test_fetch_inner_flow_persists_primary_binding_record(monkeypatch):
assert record.enterprise_name_list == ["行内主体"]
assert record.account_no_list == ["6210987654321098"]
assert record.total_records == 200
def test_build_rule_hit_plan_should_be_deterministic_for_same_log_id():
    """The same logId must always yield the same plan, with 2-4 rules per pool."""
    file_service = FileService(staff_identity_repository=FakeStaffIdentityRepository())

    first_plan = file_service._build_rule_hit_plan(10001)
    second_plan = file_service._build_rule_hit_plan(10001)

    assert first_plan == second_plan
    for plan_key in ("large_transaction_hit_rules", "phase1_hit_rules"):
        assert 2 <= len(first_plan[plan_key]) <= 4
def test_fetch_inner_flow_should_persist_rule_hit_plan(monkeypatch):
    """fetch_inner_flow must store the generated rule-hit plan on the FileRecord."""
    expected_large_rules = ["HOUSE_OR_CAR_EXPENSE", "TAX_EXPENSE"]
    expected_phase1_rules = ["GAMBLING_SENSITIVE_KEYWORD", "FOREX_BUY_AMT"]

    def fixed_plan(log_id):
        # Fresh lists on every call, matching the original lambda.
        return {
            "large_transaction_hit_rules": ["HOUSE_OR_CAR_EXPENSE", "TAX_EXPENSE"],
            "phase1_hit_rules": ["GAMBLING_SENSITIVE_KEYWORD", "FOREX_BUY_AMT"],
        }

    service = FileService(staff_identity_repository=FakeStaffIdentityRepository())
    monkeypatch.setattr(service, "_build_rule_hit_plan", fixed_plan)

    request = {
        "groupId": 1001,
        "customerNo": "test_customer_001",
        "dataChannelCode": "test_code",
        "requestDateId": 20240101,
        "dataStartDateId": 20240101,
        "dataEndDateId": 20240131,
        "uploadUserId": 902001,
    }
    response = service.fetch_inner_flow(request)
    stored_record = service.file_records[response["data"][0]]

    assert stored_record.large_transaction_hit_rules == expected_large_rules
    assert stored_record.phase1_hit_rules == expected_phase1_rules

View File

@@ -9,6 +9,7 @@ from services.statement_service import StatementService
from services.statement_rule_samples import (
DEFAULT_LARGE_TRANSACTION_THRESHOLDS,
build_large_transaction_seed_statements,
build_seed_statements_for_rule_plan,
)
@@ -21,16 +22,71 @@ class FakeStaffIdentityRepository:
}
def test_generate_statements_should_include_seeded_samples_before_noise():
"""生成流水必须先混入固定命中样本,而不是纯随机噪声"""
service = StatementService()
def test_generate_statements_should_include_seeded_samples_before_noise_when_rule_plan_exists():
"""存在规则命中计划时,生成流水必须先混入被选中的命中样本"""
file_service = FileService(staff_identity_repository=FakeStaffIdentityRepository())
service = StatementService(file_service=file_service)
statements = service._generate_statements(group_id=1000, log_id=20001, count=30)
response = file_service.fetch_inner_flow(
{
"groupId": 1001,
"customerNo": "customer_seeded_samples",
"dataChannelCode": "test_code",
"requestDateId": 20240101,
"dataStartDateId": 20240101,
"dataEndDateId": 20240131,
"uploadUserId": 902001,
}
)
log_id = response["data"][0]
record = file_service.file_records[log_id]
record.large_transaction_hit_rules = ["HOUSE_OR_CAR_EXPENSE"]
record.phase1_hit_rules = []
statements = service._generate_statements(group_id=1000, log_id=log_id, count=30)
assert len(statements) >= 30
assert any(item["userMemo"] == "购买房产首付款" for item in statements)
def test_build_seed_statements_for_rule_plan_should_only_include_requested_phase1_rules():
    """Only the phase-1 rules named in the plan may contribute seed statements."""
    statements = build_seed_statements_for_rule_plan(
        group_id=1000,
        log_id=20001,
        rule_plan={
            "large_transaction_hit_rules": [],
            "phase1_hit_rules": ["GAMBLING_SENSITIVE_KEYWORD", "FOREX_BUY_AMT"],
        },
    )
    memos = [item["userMemo"] for item in statements]

    assert any("游戏" in memo for memo in memos)
    assert any("购汇" in memo for memo in memos)
    assert all("证券" not in memo for memo in memos)
def test_build_seed_statements_for_rule_plan_should_generate_withdraw_cnt_samples():
    """A WITHDRAW_CNT plan must yield at least four WeChat/Alipay withdrawals."""
    statements = build_seed_statements_for_rule_plan(
        group_id=1000,
        log_id=20001,
        rule_plan={
            "large_transaction_hit_rules": [],
            "phase1_hit_rules": ["WITHDRAW_CNT"],
        },
    )

    withdraw_count = 0
    for item in statements:
        memo = item["userMemo"]
        if "微信提现" in memo or "支付宝提现" in memo:
            withdraw_count += 1

    assert withdraw_count >= 4
def test_large_transaction_seed_should_cover_all_eight_rules():
"""大额交易样本生成器必须覆盖 8 条已实现规则的关键口径。"""
statements = build_large_transaction_seed_statements(group_id=1000, log_id=20001)
@@ -98,6 +154,33 @@ def test_generate_statements_should_fill_noise_up_to_requested_count():
assert len(statements) == 80
def test_generate_statements_should_follow_rule_hit_plan_from_file_record():
    """Generated statements must reflect the plan stored on the FileRecord."""
    file_service = FileService(staff_identity_repository=FakeStaffIdentityRepository())
    statement_service = StatementService(file_service=file_service)

    request = {
        "groupId": 1001,
        "customerNo": "customer_rule_plan",
        "dataChannelCode": "test_code",
        "requestDateId": 20240101,
        "dataStartDateId": 20240101,
        "dataEndDateId": 20240131,
        "uploadUserId": 902001,
    }
    log_id = file_service.fetch_inner_flow(request)["data"][0]

    # Override the generated plan so the expected memos are known in advance.
    record = file_service.file_records[log_id]
    record.large_transaction_hit_rules = ["HOUSE_OR_CAR_EXPENSE"]
    record.phase1_hit_rules = ["GAMBLING_SENSITIVE_KEYWORD"]

    statements = statement_service._generate_statements(
        group_id=1001,
        log_id=log_id,
        count=200,
    )
    memos = [item["userMemo"] for item in statements]

    assert any("房产首付款" in memo for memo in memos)
    assert any("游戏" in memo for memo in memos)
    assert all("购汇" not in memo for memo in memos)
def test_generate_statements_should_stay_within_single_employee_scope_per_log_id():
"""同一 logId 的流水只能落在 FileRecord 绑定的员工及亲属身份证内。"""
file_service = FileService(staff_identity_repository=FakeStaffIdentityRepository())
@@ -228,6 +311,14 @@ def test_get_bank_statement_contains_large_transaction_hit_samples(monkeypatch):
)
log_id = response["data"][0]
record = file_service.file_records[log_id]
record.large_transaction_hit_rules = [
"HOUSE_OR_CAR_EXPENSE",
"TAX_EXPENSE",
"CUMULATIVE_INCOME",
"FREQUENT_CASH_DEPOSIT",
"LARGE_TRANSFER",
]
record.phase1_hit_rules = []
staff_id_card = record.staff_id_card
family_id_card = record.family_id_cards[0]

View File

@@ -0,0 +1,78 @@
-- Baseline purchase record for the LARGE_PURCHASE_TRANSACTION rule.
-- Delete-then-insert keeps the script safe to run repeatedly.
DELETE FROM ccdi_purchase_transaction
WHERE purchase_id = 'LSFXMOCKPUR001';

-- Insert one purchase row whose applicant is taken from ccdi_base_staff.
-- If no staff row has a non-blank id and name, nothing is inserted.
INSERT INTO ccdi_purchase_transaction (
    purchase_id,
    purchase_category,
    project_name,
    subject_name,
    subject_desc,
    purchase_qty,
    budget_amount,
    bid_amount,
    actual_amount,
    contract_amount,
    settlement_amount,
    purchase_method,
    supplier_name,
    contact_person,
    contact_phone,
    supplier_uscc,
    supplier_bank_account,
    apply_date,
    plan_approve_date,
    announce_date,
    bid_open_date,
    contract_sign_date,
    expected_delivery_date,
    actual_delivery_date,
    acceptance_date,
    settlement_date,
    applicant_id,
    applicant_name,
    apply_department,
    purchase_leader_id,
    purchase_leader_name,
    purchase_department,
    created_by,
    updated_by
)
SELECT
    'LSFXMOCKPUR001',
    '设备采购',
    'LSFX Mock 联调',
    '反洗钱终端设备采购',
    '用于命中 LARGE_PURCHASE_TRANSACTION 真实规则',
    1,
    188000.00,
    186000.00,
    186000.00,
    186000.00,
    186000.00,
    '竞争性谈判',
    '兰溪市联调供应链有限公司',
    '联调联系人',
    '13800000000',
    '91330781MOCKPUR001',
    '6222000000001234',
    CURRENT_DATE,
    CURRENT_DATE,
    CURRENT_DATE,
    CURRENT_DATE,
    CURRENT_DATE,
    CURRENT_DATE,
    CURRENT_DATE,
    CURRENT_DATE,
    CURRENT_DATE,
    CAST(s.staff_id AS CHAR),
    s.name,
    '纪检初核部',
    NULL,
    NULL,
    NULL,
    'admin',
    'admin'
FROM ccdi_base_staff s
WHERE COALESCE(TRIM(CAST(s.staff_id AS CHAR)), '') <> ''
  AND COALESCE(TRIM(s.name), '') <> ''
-- Without ORDER BY, LIMIT 1 may return a different staff row on each run;
-- ordering by staff_id makes the chosen applicant reproducible.
ORDER BY s.staff_id
LIMIT 1;