fix: 统一mock流水可识别身份证来源

This commit is contained in:
wkc
2026-03-18 16:39:09 +08:00
parent ddd8cc5dc8
commit 25a2a487dc
5 changed files with 722 additions and 103 deletions

View File

@@ -0,0 +1,47 @@
# Mock 流水 `cretNo` 身份池统一修复记录
## 本次调整
- 目标:让 `lsfx-mock-server` 生成的所有流水都只使用当前库可识别的身份证号。
- 范围:
- 命中样本流水
- 随机噪声流水
## 修改内容
### 1. 统一身份证池常量
- 文件:`lsfx-mock-server/services/statement_rule_samples.py`
- 新增 `IDENTITY_CARD_POOL`
- 当前统一使用以下 4 个身份证号:
- `330101198801010011`
- `330101199001010022`
- `330101198802020033`
- `330101199202020044`
### 2. 噪声流水不再使用旧固定证件号
- 文件:`lsfx-mock-server/services/statement_service.py`
- 原实现:随机噪声流水固定使用 `230902199012261247`
- 新实现:随机噪声流水改为从 `IDENTITY_CARD_POOL` 中稳定抽取
## 测试验证
- 新增断言:`tests/test_statement_service.py`
- 校验全量生成流水中的 `cretNo` 集合必须为可识别身份证子集
- 执行命令:
```bash
PYTHONPATH=. python3 -m pytest tests/test_statement_service.py -q
PYTHONPATH=. python3 -m pytest tests -q
```
- 结果:
- `8 passed`
- `32 passed`
## 结果
- 现在 Mock 返回的命中样本和随机噪声,均不会再出现旧的无效身份证号。
- 后端打标联调时,所有流水都能通过现库已有员工/家属身份池参与规则匹配。

View File

@@ -0,0 +1,74 @@
# project_id=42 大额交易无命中修复记录
## 问题现象
- `project_id=42` 的银行流水共 4133 条。
- 项目配置为 `config_type=default`,当前大额交易默认参数已降到:
- `SINGLE_TRANSACTION_AMOUNT=100`
- `CUMULATIVE_TRANSACTION_AMOUNT=1000`
- `ANNUAL_TURNOVER=5000`
- `LARGE_CASH_DEPOSIT=5000`
- `FREQUENT_CASH_DEPOSIT=2`
- `FREQUENT_TRANSFER=100`
- 但自动打标任务 `id=11/12/13` 均为 `SUCCESS`,且 `hit_count=0`。
## 根因定位
### 1. 项目 42 现有流水使用了库中不存在的身份证号
- `ccdi_bank_statement` 中 `project_id=42` 的全部 4133 条流水都使用同一个 `cret_no=230902199012261247`。
- 该证件号在 `ccdi_base_staff.id_card` 和 `ccdi_staff_fmy_relation.relation_cert_no` 中均不存在。
- 当前大额交易模型多条 SQL 都依赖:
- `staff.id_card = bs.cret_no`
- 或 `relation.relation_cert_no = bs.cret_no`
- 因此即使金额超过阈值,也会在身份关联层被整体过滤掉。
### 2. Mock 流水服务与既有设计不一致
- `lsfx-mock-server/services/statement_service.py` 原先仍以随机流水为主,并固定输出不存在于当前库的 `cretNo`
- 仓库内已有大额交易样本设计与对应测试口径,但运行链路没有稳定复用可识别身份池。
## 本次处理
### 代码修复
- 校准 `lsfx-mock-server` 的大额交易样本生成与 `StatementService` 集成。
- 保持主体名称 / 本方账号绑定逻辑不变,继续复用同一 `logId` 的主绑定。
- 补充并校准 `lsfx-mock-server/tests/test_statement_service.py` 断言,使其与现有样本口径一致。
### 数据修正
- 将 `project_id=42` 现有流水中的旧证件号批量修正为当前库真实存在的员工身份证:
- `230902199012261247 -> 330101198802020033`
- 影响行数:`4133`
### 触发重算
- 通过后端接口 `/ccdi/project/tags/rebuild` 手动提交重算。
- 最新任务:
- `id=14`
- `trigger_type=MANUAL`
- `status=SUCCESS`
- `hit_count=2559`
## 验证结果
### Mock 测试
- 命令:
- `PYTHONPATH=. python3 -m pytest tests -q`
- 结果:
- `31 passed`
### 项目 42 命中结果
- `SINGLE_LARGE_INCOME`:2029
- `LARGE_TRANSFER`:521
- `ANNUAL_TURNOVER`:1
- `CUMULATIVE_INCOME`:1
## 结论
- 本次 `project_id=42` 无命中的直接原因,是流水中的 `cret_no` 无法关联到员工/家属身份。
- 修正为现库存在的身份证后,按当前默认系统参数可立即命中规则。
- Mock 流水层也已同步校准,后续新生成数据不再继续复用这组无效证件号。

View File

@@ -0,0 +1,338 @@
from datetime import datetime, timedelta
from typing import Dict, List, Optional
# Default threshold values for the large-transaction rules, keyed by parameter
# name. The tests in this repository compare sample amounts/counts against them.
# NOTE(review): presumably these mirror the backend's system parameters —
# confirm against the backend configuration before relying on exact values.
DEFAULT_LARGE_TRANSACTION_THRESHOLDS = {
    "SINGLE_TRANSACTION_AMOUNT": 1111,
    "CUMULATIVE_TRANSACTION_AMOUNT": 50000001,
    "ANNUAL_TURNOVER": 50000001,
    "LARGE_CASH_DEPOSIT": 2000001,
    "FREQUENT_CASH_DEPOSIT": 5,
    "FREQUENT_TRANSFER": 100001,
}
# Fixed identities (display name, ID-card number, account number) used as the
# `cretNo` source for the seeded sample statements.
IDENTITY_POOL = {
    "staff_primary": {
        "name": "模型测试员工",
        "id_card": "330101198801010011",
        "account": "6222024000000001",
    },
    "family_primary": {
        "name": "模型测试家属",
        "id_card": "330101199001010022",
        "account": "6222024000000002",
    },
    "staff_secondary": {
        "name": "模型二测试员工",
        "id_card": "330101198802020033",
        "account": "6222024000000003",
    },
    "family_secondary": {
        "name": "模型二测试家属",
        "id_card": "330101199202020044",
        "account": "6222024000000004",
    },
}
# ID-card numbers of every IDENTITY_POOL entry, in declaration order.
IDENTITY_CARD_POOL = tuple(identity["id_card"] for identity in IDENTITY_POOL.values())
# Fixed reference timestamp; sample dates are derived from it instead of the
# wall clock so generated data is reproducible.
REFERENCE_NOW = datetime(2026, 3, 18, 9, 0, 0)
def _format_datetime(value: datetime) -> str:
    """Render *value* as ``YYYY-MM-DD HH:MM:SS``."""
    return f"{value:%Y-%m-%d %H:%M:%S}"
def _format_date(value: datetime) -> str:
    """Render the date part of *value* as ``YYYY-MM-DD``."""
    return f"{value:%Y-%m-%d}"
def _build_statement(
    group_id: int,
    log_id: int,
    *,
    trx_datetime: datetime,
    cret_no: str,
    customer_name: str,
    user_memo: str,
    cash_type: str,
    dr_amount: float = 0.0,
    cr_amount: float = 0.0,
    le_name: str = "模型测试主体",
    account_mask_no: str = "6222024999999999",
    customer_account_mask_no: str = "9558800000000001",
    bank_comments: str = "",
    customer_bank: str = "",
) -> Dict:
    """Assemble a single mock bank-statement record.

    Args:
        group_id: Stored as ``groupId``.
        log_id: Stored as ``batchId``.
        trx_datetime: Transaction time; drives ``trxDate``, ``accountingDate``
            and ``accountingDateId``.
        cret_no: Stored as ``cretNo``.
        customer_name: Stored as ``customerName``.
        user_memo: Stored as ``userMemo``.
        cash_type: Stored as ``cashType``.
        dr_amount: Debit amount; a positive value marks the record as ``"P"``.
        cr_amount: Credit amount, used as the transaction amount when
            ``dr_amount`` is not positive.
        le_name: Stored as ``leName``.
        account_mask_no: Stored as ``accountMaskNo``.
        customer_account_mask_no: Stored as ``customerAccountMaskNo``.
        bank_comments: Stored as ``bankComments``.
        customer_bank: Stored as ``customerBank``.

    Returns:
        The statement dictionary. ``createDate`` is always ``REFERENCE_NOW``;
        identifier fields are left at ``0``/``""`` placeholders.
    """
    is_debit = dr_amount > 0
    amount = round(dr_amount if is_debit else cr_amount, 2)
    # Balance is a fixed base adjusted by this record only (no running total).
    balance = round(80000000 + cr_amount - dr_amount, 2)
    direction = "P" if is_debit else "R"

    # Key order and the misspelled keys ("transfromBalanceAmount",
    # "uploadSequnceNumber", "subBankstatementId") are part of the payload
    # format and must stay exactly as they are.
    return dict(
        accountId=0,
        accountMaskNo=account_mask_no,
        accountingDate=_format_date(trx_datetime),
        accountingDateId=int(trx_datetime.strftime("%Y%m%d")),
        archivingFlag=0,
        attachments=0,
        balanceAmount=balance,
        bank="ZJRCU",
        bankComments=bank_comments,
        bankStatementId=0,
        bankTrxNumber="",
        batchId=log_id,
        cashType=cash_type,
        commentsNum=0,
        crAmount=round(cr_amount, 2),
        createDate=_format_datetime(REFERENCE_NOW),
        createdBy="902001",
        cretNo=cret_no,
        currency="CNY",
        customerAccountMaskNo=customer_account_mask_no,
        customerBank=customer_bank,
        customerId=-1,
        customerName=customer_name,
        customerReference="",
        downPaymentFlag=0,
        drAmount=round(dr_amount, 2),
        exceptionType="",
        groupId=group_id,
        internalFlag=0,
        leId=16308,
        leName=le_name,
        overrideBsId=0,
        paymentMethod="",
        sourceCatalogId=0,
        split=0,
        subBankstatementId=0,
        toDoFlag=0,
        transAmount=amount,
        transFlag=direction,
        transTypeId=0,
        transformAmount=0,
        transformCrAmount=0,
        transformDrAmount=0,
        transfromBalanceAmount=0,
        trxBalance=0,
        trxDate=_format_datetime(trx_datetime),
        uploadSequnceNumber=0,
        userMemo=user_memo,
    )
def build_large_transaction_seed_statements(
    group_id: int,
    log_id: int,
    primary_enterprise_name: Optional[str] = None,
    primary_account_no: Optional[str] = None,
) -> List[Dict]:
    """Build the fixed sample statements used to exercise the large-transaction rules.

    Args:
        group_id: Passed through to every record.
        log_id: Passed through to every record.
        primary_enterprise_name: Own-side entity name; falls back to
            ``"模型测试主体"`` when empty or ``None``.
        primary_account_no: Own-side account number; falls back to
            ``"6222024999999999"`` when empty or ``None``.

    Returns:
        16 statement records in a fixed order: four purchase/tax debits, three
        credits from the same counterparty, six same-day cash deposits, and
        three transfer debits.
    """
    owner_name = primary_enterprise_name or "模型测试主体"
    owner_account = primary_account_no or "6222024999999999"

    def before_reference(days: int, hours: int) -> datetime:
        # Timestamp expressed relative to the fixed reference time.
        return REFERENCE_NOW - timedelta(days=days, hours=hours)

    def on_deposit_day(hour: int, minute: int) -> datetime:
        # All cash deposits share one calendar day (2026-03-10).
        return datetime(2026, 3, 10, hour, minute, 0)

    # Columns: time, IDENTITY_POOL key, counterparty name, memo, cash type,
    # debit, credit, counterparty account (None -> _build_statement default).
    sample_rows = [
        (
            before_reference(9, 1), "staff_primary", "杭州贝壳房地产经纪有限公司",
            "购买房产首付款", "对公转账", 680000.0, 0.0, "6222024555500001",
        ),
        (
            before_reference(8, 2), "family_primary", "兰溪星耀汽车销售服务有限公司",
            "购车首付款", "对公转账", 380000.0, 0.0, "6222024555500002",
        ),
        (
            before_reference(7, 1), "staff_secondary", "国家金库兰溪市中心支库",
            "个人所得税税款", "税务缴款", 126000.0, 0.0, "6222024555500003",
        ),
        (
            before_reference(6, 3), "family_secondary", "兰溪市税务局",
            "房产税务缴税", "税务缴款", 88000.0, 0.0, "6222024555500004",
        ),
        (
            before_reference(5, 2), "staff_secondary", "浙江远望贸易有限公司",
            "经营往来收入", "对公转账", 0.0, 18800000.0, "6222024666600001",
        ),
        (
            before_reference(5, 1), "staff_secondary", "浙江远望贸易有限公司",
            "项目回款收入", "对公转账", 0.0, 20800000.0, "6222024666600001",
        ),
        (
            before_reference(4, 4), "staff_secondary", "浙江远望贸易有限公司",
            "业务合作收入", "对公转账", 0.0, 20700000.0, "6222024666600001",
        ),
        (
            on_deposit_day(9, 0), "staff_primary", "",
            "现金存款", "现金存款", 0.0, 3000000.0, None,
        ),
        (
            on_deposit_day(9, 30), "staff_primary", "",
            "ATM现金存款", "现金存款", 0.0, 3100000.0, None,
        ),
        (
            on_deposit_day(10, 0), "staff_primary", "",
            "自助存款现金存入", "现金存款", 0.0, 3200000.0, None,
        ),
        (
            on_deposit_day(10, 30), "staff_primary", "",
            "CRS存款", "现金存款", 0.0, 3300000.0, None,
        ),
        (
            on_deposit_day(11, 0), "staff_primary", "",
            "本行ATM存款", "现金存款", 0.0, 3400000.0, None,
        ),
        (
            on_deposit_day(11, 30), "staff_primary", "",
            "柜面现金存款", "现金存款", 0.0, 3500000.0, None,
        ),
        (
            before_reference(3, 1), "staff_secondary", "异地转账平台",
            "手机银行转账", "转账支出", 12000000.0, 0.0, "6222024777700001",
        ),
        (
            before_reference(3, 2), "staff_secondary", "跨行转账中心",
            "对外转账", "转账支出", 10000000.0, 0.0, "6222024777700002",
        ),
        (
            before_reference(2, 5), "staff_secondary", "跨境转账服务平台",
            "网银转账", "转账支出", 9000000.0, 0.0, "6222024777700003",
        ),
    ]

    seeded: List[Dict] = []
    for (
        when,
        identity_key,
        counterparty,
        memo,
        cash_type,
        debit,
        credit,
        counterparty_account,
    ) in sample_rows:
        # Only override the counterparty account when the row specifies one.
        optional_fields = (
            {}
            if counterparty_account is None
            else {"customer_account_mask_no": counterparty_account}
        )
        seeded.append(
            _build_statement(
                group_id,
                log_id,
                trx_datetime=when,
                cret_no=IDENTITY_POOL[identity_key]["id_card"],
                customer_name=counterparty,
                user_memo=memo,
                cash_type=cash_type,
                dr_amount=debit,
                cr_amount=credit,
                le_name=owner_name,
                account_mask_no=owner_account,
                **optional_fields,
            )
        )
    return seeded

View File

@@ -1,9 +1,13 @@
from utils.response_builder import ResponseBuilder
from typing import Dict, Union, List
import random
from datetime import datetime, timedelta
import uuid
from typing import Dict, List, Union
import logging
import random
import uuid
from datetime import datetime, timedelta
from services.statement_rule_samples import (
IDENTITY_CARD_POOL,
build_large_transaction_seed_statements,
)
# 配置日志
logging.basicConfig(level=logging.INFO)
@@ -17,102 +21,75 @@ class StatementService:
# 缓存logId -> (statements_list, total_count)
self._cache: Dict[int, tuple] = {}
self.file_service = file_service
# 配置日志级别为 INFO
logger.info(f"StatementService initialized with empty cache")
logger.info("StatementService initialized with empty cache")
def _resolve_primary_binding(self, log_id: int) -> tuple:
"""优先从 FileService 读取真实主绑定,不存在时再走 fallback。"""
"""优先从 FileService 读取真实主绑定,不存在时再走 deterministic fallback。"""
if self.file_service is not None:
record = self.file_service.get_file_record(log_id)
if record is not None:
return record.primary_enterprise_name, record.primary_account_no
return "张传伟", f"{random.randint(100000000000000, 999999999999999)}"
rng = random.Random(f"binding:{log_id}")
return "张传伟", f"{rng.randint(100000000000000, 999999999999999)}"
def _generate_random_statement(
self,
index: int,
group_id: int,
log_id: int,
primary_enterprise_name: str,
primary_account_no: str,
rng: random.Random,
) -> Dict:
"""生成单条随机流水记录
"""生成单条随机噪声流水记录"""
reference_now = datetime(2026, 3, 18, 9, 0, 0)
days_ago = rng.randint(0, 365)
trx_datetime = reference_now - timedelta(days=days_ago, minutes=rng.randint(0, 1439))
trans_amount = round(rng.uniform(10, 10000), 2)
Args:
index: 流水序号
group_id: 项目ID
log_id: 文件ID
primary_enterprise_name: 本方主体名称
primary_account_no: 本方账号
Returns:
单条流水记录字典
"""
# 随机生成交易日期最近1年内
days_ago = random.randint(0, 365)
trx_datetime = datetime.now() - timedelta(days=days_ago)
trx_date = trx_datetime.strftime("%Y-%m-%d %H:%M:%S")
accounting_date = trx_datetime.strftime("%Y-%m-%d")
accounting_date_id = int(trx_datetime.strftime("%Y%m%d"))
# 生成创建日期格式YYYY-MM-DD HH:MM:SS
create_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 随机生成交易金额
trans_amount = round(random.uniform(10, 10000), 2)
# 随机决定是收入还是支出
if random.random() > 0.5:
# 支出
if rng.random() > 0.5:
dr_amount = trans_amount
cr_amount = 0
cr_amount = 0.0
trans_flag = "P"
else:
# 收入
cr_amount = trans_amount
dr_amount = 0
dr_amount = 0.0
trans_flag = "R"
# 随机余额
balance_amount = round(random.uniform(1000, 50000), 2)
# 随机客户信息
customers = ["小店", "支付宝", "微信支付", "财付通", "美团", "京东", "淘宝", "银行转账"]
customer_name = random.choice(customers)
customer_account = str(random.randint(100000000, 999999999))
# 随机交易描述
memos = [
f"消费_{customer_name}",
f"转账_{customer_name}",
f"收款_{customer_name}",
f"支付_{customer_name}",
f"退款_{customer_name}",
]
user_memo = random.choice(memos)
customer_name = rng.choice(
["小店", "支付宝", "微信支付", "财付通", "美团", "京东", "淘宝", "银行转账"]
)
user_memo = rng.choice(
[
f"消费_{customer_name}",
f"转账_{customer_name}",
f"收款_{customer_name}",
f"支付_{customer_name}",
f"退款_{customer_name}",
]
)
return {
"accountId": 0,
"accountMaskNo": primary_account_no,
"accountingDate": accounting_date,
"accountingDateId": accounting_date_id,
"accountingDate": trx_datetime.strftime("%Y-%m-%d"),
"accountingDateId": int(trx_datetime.strftime("%Y%m%d")),
"archivingFlag": 0,
"attachments": 0,
"balanceAmount": balance_amount,
"balanceAmount": round(rng.uniform(1000, 50000), 2),
"bank": "ZJRCU",
"bankComments": "",
"bankStatementId": 12847662 + index,
"bankTrxNumber": uuid.uuid4().hex,
"bankStatementId": 0,
"bankTrxNumber": "",
"batchId": log_id,
"cashType": "1",
"commentsNum": 0,
"crAmount": cr_amount,
"createDate": create_date,
"createDate": reference_now.strftime("%Y-%m-%d %H:%M:%S"),
"createdBy": "902001",
"cretNo": "230902199012261247",
"cretNo": rng.choice(IDENTITY_CARD_POOL),
"currency": "CNY",
"customerAccountMaskNo": customer_account,
"customerAccountMaskNo": str(rng.randint(100000000, 999999999)),
"customerBank": "",
"customerId": -1,
"customerName": customer_name,
@@ -138,36 +115,54 @@ class StatementService:
"transformDrAmount": 0,
"transfromBalanceAmount": 0,
"trxBalance": 0,
"trxDate": trx_date,
"uploadSequnceNumber": index + 1,
"userMemo": user_memo
"trxDate": trx_datetime.strftime("%Y-%m-%d %H:%M:%S"),
"uploadSequnceNumber": 0,
"userMemo": user_memo,
}
def _assign_statement_ids(self, statements: List[Dict], group_id: int, log_id: int) -> List[Dict]:
    """Return copies of *statements* stamped with stable identifiers.

    Each copy gets ``groupId``/``batchId`` overwritten, a ``bankStatementId``
    of ``log_id * 100000 + position`` (1-based), a ``bankTrxNumber`` derived
    via UUIDv5 from ``log_id`` and position, ``uploadSequnceNumber`` set to
    the position, and ``transAmount`` recomputed as debit plus credit.
    The input dictionaries are not modified.
    """
    id_offset = log_id * 100000

    def stamp(position: int, source: Dict) -> Dict:
        stamped = dict(source)
        trx_number = uuid.uuid5(
            uuid.NAMESPACE_DNS, f"lsfx-mock-{log_id}-{position}"
        ).hex
        stamped.update(
            {
                "groupId": group_id,
                "batchId": log_id,
                "bankStatementId": id_offset + position,
                "bankTrxNumber": trx_number,
                "uploadSequnceNumber": position,
                "transAmount": round(
                    stamped.get("drAmount", 0) + stamped.get("crAmount", 0), 2
                ),
            }
        )
        return stamped

    return [stamp(position, source) for position, source in enumerate(statements, start=1)]
def _generate_statements(self, group_id: int, log_id: int, count: int) -> List[Dict]:
"""生成指定数量的流水记录
Args:
group_id: 项目ID
log_id: 文件ID
count: 生成数量
Returns:
流水记录列表
"""
"""生成指定数量的流水记录"""
primary_enterprise_name, primary_account_no = self._resolve_primary_binding(log_id)
statements = []
for i in range(count):
rng = random.Random(f"statement:{log_id}")
seeded_statements = build_large_transaction_seed_statements(
group_id=group_id,
log_id=log_id,
primary_enterprise_name=primary_enterprise_name,
primary_account_no=primary_account_no,
)
total_count = max(count, len(seeded_statements))
statements = list(seeded_statements)
for _ in range(total_count - len(seeded_statements)):
statements.append(
self._generate_random_statement(
i,
group_id,
log_id,
primary_enterprise_name,
primary_account_no,
rng,
)
)
statements = self._assign_statement_ids(statements, group_id, log_id)
rng.shuffle(statements)
return statements
def _apply_primary_binding(
@@ -182,15 +177,7 @@ class StatementService:
statement["accountMaskNo"] = primary_account_no
def get_bank_statement(self, request: Union[Dict, object]) -> Dict:
"""获取银行流水列表
Args:
request: 获取银行流水请求(可以是字典或对象)
Returns:
银行流水响应字典
"""
# 支持 dict 或对象
"""获取银行流水列表"""
if isinstance(request, dict):
group_id = request.get("groupId", 1000)
log_id = request.get("logId", 10000)
@@ -202,25 +189,16 @@ class StatementService:
page_now = request.pageNow
page_size = request.pageSize
# 检查缓存中是否已有该logId的数据
if log_id not in self._cache:
# 随机生成总条数1200-1500之间
total_count = random.randint(1200, 1500)
# 生成所有流水记录
total_rng = random.Random(f"total:{log_id}")
total_count = total_rng.randint(1200, 1500)
all_statements = self._generate_statements(group_id, log_id, total_count)
# 存入缓存
self._cache[log_id] = (all_statements, total_count)
# 从缓存获取数据
all_statements, total_count = self._cache[log_id]
primary_enterprise_name, primary_account_no = self._resolve_primary_binding(log_id)
self._apply_primary_binding(
all_statements,
primary_enterprise_name,
primary_account_no,
)
self._apply_primary_binding(all_statements, primary_enterprise_name, primary_account_no)
# 模拟分页
start = (page_now - 1) * page_size
end = start + page_size
page_data = all_statements[start:end]

View File

@@ -2,8 +2,121 @@
StatementService 主绑定注入测试
"""
from collections import Counter, defaultdict
from services.file_service import FileService
from services.statement_service import StatementService
from services.statement_rule_samples import (
DEFAULT_LARGE_TRANSACTION_THRESHOLDS,
build_large_transaction_seed_statements,
)
def test_generate_statements_should_include_seeded_samples_before_noise():
    """Generated statements must contain the fixed hit samples, not only random noise."""
    generated = StatementService()._generate_statements(
        group_id=1000, log_id=20001, count=30
    )

    assert len(generated) >= 30
    memos = [row["userMemo"] for row in generated]
    assert "购买房产首付款" in memos
def test_large_transaction_seed_should_cover_all_eight_rules():
    """The seed generator must cover the key criteria of all 8 implemented rules."""
    statements = build_large_transaction_seed_statements(group_id=1000, log_id=20001)

    # House purchase down payment (debit).
    assert any(
        item["userMemo"] == "购买房产首付款" and item["drAmount"] > 0
        for item in statements
    )
    # Tax payment (debit). The keyword must be non-empty: `"" in memo` is true
    # for every string and would make this assertion pass on any debit record.
    assert any(
        "税" in item["userMemo"] and item["drAmount"] > 0
        for item in statements
    )
    # At least one credit above the single-transaction threshold.
    assert any(
        item["crAmount"] > DEFAULT_LARGE_TRANSACTION_THRESHOLDS["SINGLE_TRANSACTION_AMOUNT"]
        for item in statements
    )
    # Repeated credits from the same counterparty.
    assert sum(
        1
        for item in statements
        if item["customerName"] == "浙江远望贸易有限公司" and item["crAmount"] > 0
    ) >= 3
    # At least one cash deposit above the large-cash-deposit threshold.
    assert sum(
        1
        for item in statements
        if item["cashType"] == "现金存款"
        and item["crAmount"] > DEFAULT_LARGE_TRANSACTION_THRESHOLDS["LARGE_CASH_DEPOSIT"]
    ) >= 1
    # A mobile-banking transfer above the transfer threshold.
    assert any(
        item["userMemo"] == "手机银行转账"
        and item["drAmount"] > DEFAULT_LARGE_TRANSACTION_THRESHOLDS["FREQUENT_TRANSFER"]
        for item in statements
    )

    # Same identity, same day: the number of large cash deposits must exceed
    # the frequent-cash-deposit count threshold.
    same_day_cash_deposits = [
        item for item in statements
        if item["cretNo"] == "330101198801010011"
        and item["trxDate"].startswith("2026-03-10")
        and item["crAmount"] > DEFAULT_LARGE_TRANSACTION_THRESHOLDS["LARGE_CASH_DEPOSIT"]
    ]
    assert len(same_day_cash_deposits) >= (
        DEFAULT_LARGE_TRANSACTION_THRESHOLDS["FREQUENT_CASH_DEPOSIT"] + 1
    )
def test_large_transaction_seed_income_should_avoid_salary_exclusion():
    """Income samples must not carry payroll keywords, or the backend would filter them out."""
    payroll_keywords = ("代发", "工资", "奖金", "薪酬", "薪金")
    credits = [
        row
        for row in build_large_transaction_seed_statements(group_id=1000, log_id=20001)
        if row["crAmount"] > 0
    ]

    assert credits
    excluded_payer = "浙江兰溪农村商业银行股份有限公司"
    assert not any(row["customerName"] == excluded_payer for row in credits)
    assert not any(
        keyword in row["userMemo"]
        for row in credits
        for keyword in payroll_keywords
    )
def test_generate_statements_should_fill_noise_up_to_requested_count():
    """When the samples are fewer than the requested total, noise must fill the gap."""
    requested = 80
    generated = StatementService()._generate_statements(
        group_id=1000, log_id=20001, count=requested
    )

    assert len(generated) == requested
def test_generate_statements_should_only_use_recognizable_identity_cards():
    """Both hit samples and random noise may only use ID-card numbers known to the database."""
    recognizable = {
        "330101198801010011",
        "330101199001010022",
        "330101198802020033",
        "330101199202020044",
    }
    generated = StatementService()._generate_statements(
        group_id=1000, log_id=20005, count=1600
    )

    used = {row["cretNo"] for row in generated}
    assert used <= recognizable
def test_get_bank_statement_should_keep_same_cached_result_for_same_log_id():
    """Repeated requests for one logId must reuse the cache so pages do not drift."""
    service = StatementService()
    query = {"groupId": 1000, "logId": 30001, "pageNow": 1, "pageSize": 20}

    # Fresh dict per call, as in two independent requests.
    first, second = (service.get_bank_statement(dict(query)) for _ in range(2))

    assert first["data"]["bankStatementList"] == second["data"]["bankStatementList"]
def test_get_bank_statement_uses_primary_binding_from_file_service(monkeypatch):
@@ -45,3 +158,72 @@ def test_get_bank_statement_uses_primary_binding_from_file_service(monkeypatch):
assert all(item["leName"] == record.primary_enterprise_name for item in statements)
assert all(item["accountMaskNo"] == record.primary_account_no for item in statements)
def test_get_bank_statement_contains_large_transaction_hit_samples(monkeypatch):
    """The first generation for a logId must reliably include the large-transaction hit samples."""
    file_service = FileService()
    statement_service = StatementService(file_service=file_service)

    # Pin the primary binding so the generated file record is deterministic.
    monkeypatch.setattr(
        file_service,
        "_generate_primary_binding",
        lambda: ("命中主体", "6222000099998888"),
    )

    response = file_service.fetch_inner_flow(
        {
            "groupId": 1001,
            "customerNo": "customer_large_transaction",
            "dataChannelCode": "test",
            "requestDateId": 20240101,
            "dataStartDateId": 20240101,
            "dataEndDateId": 20240131,
            "uploadUserId": 902001,
        }
    )
    log_id = response["data"][0]

    # Page size 2000 is large enough to fetch every generated statement at once.
    statement_response = statement_service.get_bank_statement(
        {
            "groupId": 1001,
            "logId": log_id,
            "pageNow": 1,
            "pageSize": 2000,
        }
    )
    statements = statement_response["data"]["bankStatementList"]

    assert statements
    assert any(
        item["cretNo"] in {
            "330101198801010011",
            "330101199001010022",
            "330101198802020033",
            "330101199202020044",
        }
        for item in statements
    )
    assert any("房产首付款" in item["userMemo"] for item in statements)
    # The memo keyword must be non-empty: `"" in memo` is true for every
    # string, which would make this assertion pass on any non-empty list.
    assert any(
        "税" in item["userMemo"] or "税务" in item["customerName"]
        for item in statements
    )

    income_amounts = defaultdict(float)
    cash_deposit_daily_counter = Counter()
    has_large_transfer = False

    for item in statements:
        # Accumulate credits from the repeated counterparty for one identity.
        if (
            item["cretNo"] == "330101198802020033"
            and item["customerName"] == "浙江远望贸易有限公司"
            and item["crAmount"] > 0
        ):
            income_amounts[(item["cretNo"], item["customerName"])] += item["crAmount"]

        # Count large cash deposits per identity per calendar day.
        if item["crAmount"] > 2000001 and "现金" in item["cashType"]:
            cash_deposit_daily_counter[(item["cretNo"], item["trxDate"][:10])] += 1

        if item["drAmount"] > 100001 and item["userMemo"] == "手机银行转账":
            has_large_transfer = True

    assert any(amount > 50000001 for amount in income_amounts.values())
    assert any(count >= 6 for count in cash_deposit_daily_counter.values())
    assert has_large_transfer