# File: ccdi/lsfx-mock-server/services/statement_service.py
# (253 lines, 9.8 KiB, Python)

from typing import Dict, List, Union
import logging
import random
import uuid
from datetime import datetime, timedelta
# (revision timestamp: 2026-03-25 10:05:30 +08:00)
from config.settings import settings
from services.statement_rule_samples import (
build_seed_statements_for_rule_plan,
resolve_identity_cards,
)
# Configure logging: set the root logger to INFO at import time and create a
# module-scoped logger that the service below uses for its messages.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class StatementService:
    """Bank statement data service for the mock server.

    For each ``logId`` the service builds a list of statement records (seeded
    rule samples plus random "noise" records), caches it in memory, and serves
    it page by page through :meth:`get_bank_statement`.
    """

    # Number of statements requested per logId; the generated list may be longer
    # when the seeded samples alone exceed this count.
    FIXED_TOTAL_COUNT = 200

    def __init__(self, file_service=None):
        """Create the service with an empty cache.

        Args:
            file_service: Optional collaborator exposing ``get_file_record(log_id)``;
                when ``None``, fallback values derived from ``log_id`` are used.
        """
        # Cache: logId -> (statements_list, total_count)
        self._cache: Dict[int, tuple] = {}
        self.file_service = file_service
        logger.info("StatementService initialized with empty cache")

    def _resolve_primary_binding(self, log_id: int) -> tuple:
        """Return ``(primary_enterprise_name, primary_account_no)`` for ``log_id``.

        The binding stored on the FileService record is preferred; when no
        record is available a fallback seeded from ``log_id`` is returned, so
        repeated calls for the same ``log_id`` give the same account number.
        """
        if self.file_service is not None:
            record = self.file_service.get_file_record(log_id)
            if record is not None:
                return record.primary_enterprise_name, record.primary_account_no

        rng = random.Random(f"binding:{log_id}")
        return "张传伟", f"{rng.randint(100000000000000, 999999999999999)}"

    def _generate_random_statement(
        self,
        group_id: int,
        log_id: int,
        primary_enterprise_name: str,
        primary_account_no: str,
        allowed_identity_cards: tuple,
        rng: random.Random,
        noise_index: int = 0,
        safe_all_mode_noise: bool = False,
    ) -> Dict:
        """Generate a single random "noise" statement record.

        Args:
            group_id: Value stored in ``groupId``.
            log_id: Value stored in ``batchId``.
            primary_enterprise_name: Value stored in ``leName``.
            primary_account_no: Value stored in ``accountMaskNo``.
            allowed_identity_cards: Non-empty pool from which ``cretNo`` and
                ``customerCertNo`` are drawn.
            rng: Random source; every random field is drawn from it, so the
                record is reproducible for a given generator state.
            noise_index: Sequence number embedded in the counterparty name and
                memo when ``safe_all_mode_noise`` is set.
            safe_all_mode_noise: When true, produce a small (10-200) outgoing
                payment with an indexed name/memo instead of a fully random
                transaction.

        Returns:
            A dict describing one statement record.
        """
        # A fixed reference time keeps generated dates stable across runs.
        reference_now = datetime(2026, 3, 18, 9, 0, 0)
        # NOTE: the order of rng draws below determines the generated data;
        # do not reorder them.
        days_ago = rng.randint(0, 365)
        trx_datetime = reference_now - timedelta(days=days_ago, minutes=rng.randint(0, 1439))

        if safe_all_mode_noise:
            trans_amount = round(rng.uniform(10, 200), 2)
            dr_amount = trans_amount
            cr_amount = 0.0
            trans_flag = "P"
            customer_name = f"日常消费商户{noise_index}"
            user_memo = f"日常消费_{noise_index}"
        else:
            trans_amount = round(rng.uniform(10, 10000), 2)
            if rng.random() > 0.5:
                # Debit side ("P").
                dr_amount = trans_amount
                cr_amount = 0.0
                trans_flag = "P"
            else:
                # Credit side ("R").
                cr_amount = trans_amount
                dr_amount = 0.0
                trans_flag = "R"
            customer_name = rng.choice(
                ["小店", "支付宝", "微信支付", "财付通", "美团", "京东", "淘宝", "银行转账"]
            )
            user_memo = rng.choice(
                [
                    f"消费_{customer_name}",
                    f"转账_{customer_name}",
                    f"收款_{customer_name}",
                    f"支付_{customer_name}",
                    f"退款_{customer_name}",
                ]
            )

        return {
            "accountId": 0,
            "accountMaskNo": primary_account_no,
            "accountingDate": trx_datetime.strftime("%Y-%m-%d"),
            "accountingDateId": int(trx_datetime.strftime("%Y%m%d")),
            "archivingFlag": 0,
            "attachments": 0,
            "balanceAmount": round(rng.uniform(1000, 50000), 2),
            "bank": "ZJRCU",
            "bankComments": "",
            "bankStatementId": 0,
            "bankTrxNumber": "",
            "batchId": log_id,
            "cashType": "1",
            "commentsNum": 0,
            "crAmount": cr_amount,
            "createDate": reference_now.strftime("%Y-%m-%d %H:%M:%S"),
            "createdBy": "902001",
            "cretNo": rng.choice(allowed_identity_cards),
            "currency": "CNY",
            "customerAccountMaskNo": str(rng.randint(100000000, 999999999)),
            "customerBank": "",
            "customerId": -1,
            "customerName": customer_name,
            "customerReference": "",
            "customerCertNo": rng.choice(allowed_identity_cards),
            "customerSocialCreditCode": "",
            "downPaymentFlag": 0,
            "drAmount": dr_amount,
            "exceptionType": "",
            "groupId": group_id,
            "internalFlag": 0,
            "leId": 16308,
            "leName": primary_enterprise_name,
            "overrideBsId": 0,
            "paymentMethod": "",
            "sourceCatalogId": 0,
            "split": 0,
            "subBankstatementId": 0,
            "toDoFlag": 0,
            "transAmount": trans_amount,
            "transFlag": trans_flag,
            "transTypeId": 0,
            "transformAmount": 0,
            "transformCrAmount": 0,
            "transformDrAmount": 0,
            "transfromBalanceAmount": 0,
            "trxBalance": 0,
            "trxDate": trx_datetime.strftime("%Y-%m-%d %H:%M:%S"),
            "uploadSequnceNumber": 0,
            "userMemo": user_memo,
        }

    def _assign_statement_ids(self, statements: List[Dict], group_id: int, log_id: int) -> List[Dict]:
        """Return copies of ``statements`` with stable identifiers filled in.

        Each copy gets ``groupId``, ``batchId``, a ``bankStatementId`` of
        ``log_id * 100000 + position``, a UUIDv5-derived ``bankTrxNumber``, a
        1-based ``uploadSequnceNumber`` and a ``transAmount`` recomputed as
        ``drAmount + crAmount``. The input dicts are not modified.
        """
        assigned: List[Dict] = []
        base_id = log_id * 100000
        for index, statement in enumerate(statements, start=1):
            item = dict(statement)  # shallow copy so callers' dicts stay untouched
            item["groupId"] = group_id
            item["batchId"] = log_id
            item["bankStatementId"] = base_id + index
            # uuid5 is name-based, so the same (log_id, index) always maps to the same id.
            item["bankTrxNumber"] = uuid.uuid5(
                uuid.NAMESPACE_DNS, f"lsfx-mock-{log_id}-{index}"
            ).hex
            item["uploadSequnceNumber"] = index
            item["transAmount"] = round(item.get("drAmount", 0) + item.get("crAmount", 0), 2)
            assigned.append(item)
        return assigned

    def _generate_statements(self, group_id: int, log_id: int, count: int) -> List[Dict]:
        """Generate the full statement list for ``log_id``.

        Seeded rule samples are built first, then padded with random noise
        records until at least ``count`` records exist. The result can be
        longer than ``count`` when the seeded samples alone exceed it.
        Identifiers are assigned before the list is shuffled with an RNG
        seeded from ``log_id``.
        """
        primary_enterprise_name, primary_account_no = self._resolve_primary_binding(log_id)
        record = self.file_service.get_file_record(log_id) if self.file_service is not None else None

        # Without a file record, every rule list in the plan is empty.
        rule_plan = {
            key: (list(getattr(record, key)) if record is not None else [])
            for key in (
                "large_transaction_hit_rules",
                "phase1_hit_rules",
                "phase2_statement_hit_rules",
                "abnormal_account_hit_rules",
            )
        }

        if record is not None and record.staff_id_card:
            allowed_identity_cards = (record.staff_id_card, *record.family_id_cards)
        else:
            allowed_identity_cards = resolve_identity_cards(log_id)

        rng = random.Random(f"statement:{log_id}")
        seeded_statements = build_seed_statements_for_rule_plan(
            group_id=group_id,
            log_id=log_id,
            rule_plan=rule_plan,
            primary_enterprise_name=primary_enterprise_name,
            primary_account_no=primary_account_no,
            staff_id_card=record.staff_id_card if record is not None else None,
            family_id_cards=record.family_id_cards if record is not None else None,
            abnormal_accounts=record.abnormal_accounts if record is not None else None,
        )

        # The restricted noise profile is used only in "all" rule-hit mode and
        # only when a real file record exists.
        safe_all_mode_noise = settings.RULE_HIT_MODE == "all" and record is not None
        total_count = max(count, len(seeded_statements))
        statements = list(seeded_statements)
        for noise_index in range(total_count - len(seeded_statements)):
            statements.append(
                self._generate_random_statement(
                    group_id,
                    log_id,
                    primary_enterprise_name,
                    primary_account_no,
                    allowed_identity_cards,
                    rng,
                    noise_index=noise_index,
                    safe_all_mode_noise=safe_all_mode_noise,
                )
            )

        # Ids are assigned before shuffling, so they follow generation order
        # rather than the final list order.
        statements = self._assign_statement_ids(statements, group_id, log_id)
        rng.shuffle(statements)
        return statements

    def _apply_primary_binding(
        self,
        statements: List[Dict],
        primary_enterprise_name: str,
        primary_account_no: str,
    ) -> None:
        """Write the resolved primary binding into every statement, in place."""
        for statement in statements:
            statement["leName"] = primary_enterprise_name
            statement["accountMaskNo"] = primary_account_no

    def get_bank_statement(self, request: Union[Dict, object]) -> Dict:
        """Return one page of bank statements for the requested ``logId``.

        Args:
            request: Either a dict with optional ``groupId``, ``logId``,
                ``pageNow`` and ``pageSize`` keys (defaults 1000, 10000, 1 and
                10), or an object exposing those names as attributes.

        Returns:
            A response dict whose ``data`` holds ``bankStatementList`` (the
            requested page) and ``totalCount`` (the number of cached
            statements). A ``pageNow`` or ``pageSize`` below 1 yields an
            empty page.
        """
        if isinstance(request, dict):
            group_id = request.get("groupId", 1000)
            log_id = request.get("logId", 10000)
            page_now = request.get("pageNow", 1)
            page_size = request.get("pageSize", 10)
        else:
            group_id = request.groupId
            log_id = request.logId
            page_now = request.pageNow
            page_size = request.pageSize

        if log_id not in self._cache:
            all_statements = self._generate_statements(group_id, log_id, self.FIXED_TOTAL_COUNT)
            # Cache the real length: the generated list can exceed
            # FIXED_TOTAL_COUNT, and totalCount must match the data served.
            self._cache[log_id] = (all_statements, len(all_statements))

        all_statements, total_count = self._cache[log_id]

        # Re-apply the binding on every call so cached records follow any
        # change in the file record since they were generated.
        primary_enterprise_name, primary_account_no = self._resolve_primary_binding(log_id)
        self._apply_primary_binding(all_statements, primary_enterprise_name, primary_account_no)

        if page_now < 1 or page_size < 1:
            # Negative slice bounds would silently return records from the
            # tail of the list; treat invalid paging as an empty page.
            page_data = []
        else:
            start = (page_now - 1) * page_size
            page_data = all_statements[start:start + page_size]

        return {
            "code": "200",
            "data": {"bankStatementList": page_data, "totalCount": total_count},
            "status": "200",
            "successResponse": True,
        }