from typing import Dict, List, Tuple, Union
import logging
import random
import uuid
from datetime import datetime, timedelta

from config.settings import settings
from services.statement_rule_samples import (
    build_seed_statements_for_rule_plan,
    resolve_identity_cards,
)

# Configure logging
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)


class StatementService:
    """Bank-statement (流水) data service.

    For each ``logId`` it builds a deterministic list of statements (seed
    statements from ``build_seed_statements_for_rule_plan`` followed by random
    noise rows), caches that list in memory, and serves it page by page.
    """

    # Minimum number of statements generated per logId. Seed statements can push
    # the actual number above this; the reported totalCount is the real length.
    FIXED_TOTAL_COUNT = 200

    def __init__(self, file_service=None):
        """Create the service.

        Args:
            file_service: Optional object exposing ``get_file_record(log_id)``.
                When it returns a record, that record drives the primary binding,
                the rule plan and the allowed identity cards.
        """
        # Cache: logId -> (statements_list, total_count)
        self._cache: Dict[int, Tuple[List[Dict], int]] = {}
        self.file_service = file_service
        logger.info("StatementService initialized with empty cache")

    def _get_file_record(self, log_id: int):
        """Return the file record for ``log_id``, or None when there is no file service/record."""
        if self.file_service is None:
            return None
        return self.file_service.get_file_record(log_id)

    @staticmethod
    def _binding_from_record(log_id: int, record) -> tuple:
        """Return (enterprise name, account no) from ``record`` or a deterministic fallback."""
        if record is not None:
            return record.primary_enterprise_name, record.primary_account_no
        # Seeded by log_id so the same logId always yields the same fallback account.
        rng = random.Random(f"binding:{log_id}")
        return "张传伟", f"{rng.randint(100000000000000, 999999999999999)}"

    def _resolve_primary_binding(self, log_id: int) -> tuple:
        """Resolve the primary (enterprise name, account no) binding for ``log_id``.

        Prefers the real binding stored in the FileService record; falls back to
        a deterministic, log_id-seeded value when no record is available.
        """
        return self._binding_from_record(log_id, self._get_file_record(log_id))

    def _generate_random_statement(
        self,
        group_id: int,
        log_id: int,
        primary_enterprise_name: str,
        primary_account_no: str,
        allowed_identity_cards: tuple,
        rng: random.Random,
        noise_index: int = 0,
        safe_all_mode_noise: bool = False,
    ) -> Dict:
        """Generate one random "noise" statement record.

        Args:
            group_id: Group id written into the record.
            log_id: Log/batch id written into the record.
            primary_enterprise_name: Value for ``leName``.
            primary_account_no: Value for ``accountMaskNo``.
            allowed_identity_cards: Pool for ``cretNo`` / ``customerCertNo``;
                must be non-empty (``rng.choice`` raises IndexError otherwise).
            rng: Shared RNG; the exact call order below determines the output,
                so reordering calls changes the generated data.
            noise_index: Sequence number used to label safe-mode rows.
            safe_all_mode_noise: When True, only small outgoing ("P") amounts of
                10–200 with neutral merchant names are generated.

        Returns:
            A statement dict with the same key set as the seed statements.
        """
        # Fixed reference time keeps the generated data reproducible.
        reference_now = datetime(2026, 3, 18, 9, 0, 0)
        days_ago = rng.randint(0, 365)
        trx_datetime = reference_now - timedelta(days=days_ago, minutes=rng.randint(0, 1439))

        if safe_all_mode_noise:
            trans_amount = round(rng.uniform(10, 200), 2)
            dr_amount = trans_amount
            cr_amount = 0.0
            trans_flag = "P"
            customer_name = f"日常消费商户{noise_index}"
            user_memo = f"日常消费_{noise_index}"
        else:
            trans_amount = round(rng.uniform(10, 10000), 2)
            if rng.random() > 0.5:
                dr_amount = trans_amount
                cr_amount = 0.0
                trans_flag = "P"
            else:
                cr_amount = trans_amount
                dr_amount = 0.0
                trans_flag = "R"
            customer_name = rng.choice(
                ["小店", "支付宝", "微信支付", "财付通", "美团", "京东", "淘宝", "银行转账"]
            )
            user_memo = rng.choice(
                [
                    f"消费_{customer_name}",
                    f"转账_{customer_name}",
                    f"收款_{customer_name}",
                    f"支付_{customer_name}",
                    f"退款_{customer_name}",
                ]
            )

        # NOTE: dict literal entries are evaluated in order; the rng calls inside
        # (balanceAmount, cretNo, customerAccountMaskNo, customerCertNo) must stay
        # in this order to keep output stable.
        return {
            "accountId": 0,
            "accountMaskNo": primary_account_no,
            "accountingDate": trx_datetime.strftime("%Y-%m-%d"),
            "accountingDateId": int(trx_datetime.strftime("%Y%m%d")),
            "archivingFlag": 0,
            "attachments": 0,
            "balanceAmount": round(rng.uniform(1000, 50000), 2),
            "bank": "ZJRCU",
            "bankComments": "",
            "bankStatementId": 0,
            "bankTrxNumber": "",
            "batchId": log_id,
            "cashType": "1",
            "commentsNum": 0,
            "crAmount": cr_amount,
            "createDate": reference_now.strftime("%Y-%m-%d %H:%M:%S"),
            "createdBy": "902001",
            "cretNo": rng.choice(allowed_identity_cards),
            "currency": "CNY",
            "customerAccountMaskNo": str(rng.randint(100000000, 999999999)),
            "customerBank": "",
            "customerId": -1,
            "customerName": customer_name,
            "customerReference": "",
            "customerCertNo": rng.choice(allowed_identity_cards),
            "customerSocialCreditCode": "",
            "downPaymentFlag": 0,
            "drAmount": dr_amount,
            "exceptionType": "",
            "groupId": group_id,
            "internalFlag": 0,
            "leId": 16308,
            "leName": primary_enterprise_name,
            "overrideBsId": 0,
            "paymentMethod": "",
            "sourceCatalogId": 0,
            "split": 0,
            "subBankstatementId": 0,
            "toDoFlag": 0,
            "transAmount": trans_amount,
            "transFlag": trans_flag,
            "transTypeId": 0,
            "transformAmount": 0,
            "transformCrAmount": 0,
            "transformDrAmount": 0,
            "transfromBalanceAmount": 0,
            "trxBalance": 0,
            "trxDate": trx_datetime.strftime("%Y-%m-%d %H:%M:%S"),
            "uploadSequnceNumber": 0,
            "userMemo": user_memo,
        }

    def _assign_statement_ids(self, statements: List[Dict], group_id: int, log_id: int) -> List[Dict]:
        """Assign stable identifiers to seed and noise statements alike.

        Each statement is shallow-copied and given ``groupId``/``batchId``, a
        ``bankStatementId`` of ``log_id * 100000 + position`` (1-based), a
        deterministic UUID5 ``bankTrxNumber`` and a sequence number.
        ``transAmount`` is recomputed as ``drAmount + crAmount`` (missing or
        None amounts count as 0).
        """
        assigned: List[Dict] = []
        base_id = log_id * 100000
        for index, statement in enumerate(statements, start=1):
            item = dict(statement)
            item["groupId"] = group_id
            item["batchId"] = log_id
            item["bankStatementId"] = base_id + index
            item["bankTrxNumber"] = uuid.uuid5(
                uuid.NAMESPACE_DNS, f"lsfx-mock-{log_id}-{index}"
            ).hex
            item["uploadSequnceNumber"] = index
            # `or 0` also covers amounts explicitly set to None, which would
            # otherwise raise TypeError in the addition.
            item["transAmount"] = round(
                (item.get("drAmount") or 0) + (item.get("crAmount") or 0), 2
            )
            assigned.append(item)
        return assigned

    @staticmethod
    def _build_rule_plan(record) -> Dict[str, list]:
        """Collect the rule-hit lists from ``record`` (all empty when there is no record)."""
        rule_keys = (
            "large_transaction_hit_rules",
            "phase1_hit_rules",
            "phase2_statement_hit_rules",
            "abnormal_account_hit_rules",
        )
        if record is None:
            return {key: [] for key in rule_keys}
        return {key: list(getattr(record, key)) for key in rule_keys}

    def _generate_statements(self, group_id: int, log_id: int, count: int) -> List[Dict]:
        """Generate the statements for ``log_id``.

        Seed statements for the record's rule plan come first, then noise rows
        are added until at least ``count`` statements exist (seeds are never
        truncated, so the result can be longer than ``count``). IDs are
        assigned before a deterministic, log_id-seeded shuffle.
        """
        # Fetch the record once so binding, rule plan and identity cards all
        # come from the same snapshot.
        record = self._get_file_record(log_id)
        primary_enterprise_name, primary_account_no = self._binding_from_record(log_id, record)
        rule_plan = self._build_rule_plan(record)

        if record is not None and record.staff_id_card:
            # A missing (None) family list means "no family members".
            allowed_identity_cards = (record.staff_id_card, *(record.family_id_cards or ()))
        else:
            allowed_identity_cards = resolve_identity_cards(log_id)

        rng = random.Random(f"statement:{log_id}")
        seeded_statements = build_seed_statements_for_rule_plan(
            group_id=group_id,
            log_id=log_id,
            rule_plan=rule_plan,
            primary_enterprise_name=primary_enterprise_name,
            primary_account_no=primary_account_no,
            staff_id_card=record.staff_id_card if record is not None else None,
            family_id_cards=record.family_id_cards if record is not None else None,
            abnormal_accounts=record.abnormal_accounts if record is not None else None,
        )
        safe_all_mode_noise = settings.RULE_HIT_MODE == "all" and record is not None

        noise_count = max(count - len(seeded_statements), 0)
        statements = list(seeded_statements)
        statements.extend(
            self._generate_random_statement(
                group_id,
                log_id,
                primary_enterprise_name,
                primary_account_no,
                allowed_identity_cards,
                rng,
                noise_index=noise_index,
                safe_all_mode_noise=safe_all_mode_noise,
            )
            for noise_index in range(noise_count)
        )

        statements = self._assign_statement_ids(statements, group_id, log_id)
        rng.shuffle(statements)
        return statements

    def _apply_primary_binding(
        self,
        statements: List[Dict],
        primary_enterprise_name: str,
        primary_account_no: str,
    ) -> None:
        """Write the resolved primary binding back into existing statements in place.

        ``leName`` is always overwritten; ``accountMaskNo`` is filled only when
        it is missing or empty.
        """
        for statement in statements:
            statement["leName"] = primary_enterprise_name
            statement["accountMaskNo"] = statement.get("accountMaskNo") or primary_account_no

    def get_bank_statement(self, request: Union[Dict, object]) -> Dict:
        """Return one page of bank statements.

        Args:
            request: Either a dict with optional ``groupId`` (default 1000),
                ``logId`` (default 10000), ``pageNow`` (default 1) and
                ``pageSize`` (default 10), or an object with those attributes.
                ``pageNow`` values below 1 are treated as 1 and negative
                ``pageSize`` values as 0.

        Returns:
            A response dict whose ``data.totalCount`` equals the number of
            cached statements for the logId.
        """
        if isinstance(request, dict):
            group_id = request.get("groupId", 1000)
            log_id = request.get("logId", 10000)
            page_now = request.get("pageNow", 1)
            page_size = request.get("pageSize", 10)
        else:
            group_id = request.groupId
            log_id = request.logId
            page_now = request.pageNow
            page_size = request.pageSize

        # Clamp paging inputs: non-positive pages would otherwise produce
        # negative slice indices and return rows from the end of the list.
        page_now = max(int(page_now), 1)
        page_size = max(int(page_size), 0)

        # NOTE(review): the cache is keyed by logId only, so groupId from the
        # first request for a logId is what stays in the cached rows.
        cached = self._cache.get(log_id)
        if cached is None:
            all_statements = self._generate_statements(group_id, log_id, self.FIXED_TOTAL_COUNT)
            # Report the real length: seed statements may exceed FIXED_TOTAL_COUNT.
            cached = (all_statements, len(all_statements))
            self._cache[log_id] = cached
        all_statements, total_count = cached

        # Re-resolve on every request so a binding changed after caching is reflected.
        primary_enterprise_name, primary_account_no = self._resolve_primary_binding(log_id)
        self._apply_primary_binding(all_statements, primary_enterprise_name, primary_account_no)

        start = (page_now - 1) * page_size
        end = start + page_size
        page_data = all_statements[start:end]

        return {
            "code": "200",
            "data": {"bankStatementList": page_data, "totalCount": total_count},
            "status": "200",
            "successResponse": True,
        }