Files
ccdi/lsfx-mock-server/services/statement_service.py

212 lines
7.8 KiB
Python
Raw Normal View History

from typing import Dict, List, Union
import logging
import random
import uuid
from datetime import datetime, timedelta
from services.statement_rule_samples import (
IDENTITY_CARD_POOL,
build_large_transaction_seed_statements,
)
# 配置日志
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class StatementService:
"""流水数据服务"""
def __init__(self, file_service=None):
# 缓存logId -> (statements_list, total_count)
self._cache: Dict[int, tuple] = {}
self.file_service = file_service
logger.info("StatementService initialized with empty cache")
def _resolve_primary_binding(self, log_id: int) -> tuple:
"""优先从 FileService 读取真实主绑定,不存在时再走 deterministic fallback。"""
if self.file_service is not None:
record = self.file_service.get_file_record(log_id)
if record is not None:
return record.primary_enterprise_name, record.primary_account_no
rng = random.Random(f"binding:{log_id}")
return "张传伟", f"{rng.randint(100000000000000, 999999999999999)}"
def _generate_random_statement(
self,
group_id: int,
log_id: int,
primary_enterprise_name: str,
primary_account_no: str,
rng: random.Random,
) -> Dict:
"""生成单条随机噪声流水记录。"""
reference_now = datetime(2026, 3, 18, 9, 0, 0)
days_ago = rng.randint(0, 365)
trx_datetime = reference_now - timedelta(days=days_ago, minutes=rng.randint(0, 1439))
trans_amount = round(rng.uniform(10, 10000), 2)
if rng.random() > 0.5:
dr_amount = trans_amount
cr_amount = 0.0
trans_flag = "P"
else:
cr_amount = trans_amount
dr_amount = 0.0
trans_flag = "R"
customer_name = rng.choice(
["小店", "支付宝", "微信支付", "财付通", "美团", "京东", "淘宝", "银行转账"]
)
user_memo = rng.choice(
[
f"消费_{customer_name}",
f"转账_{customer_name}",
f"收款_{customer_name}",
f"支付_{customer_name}",
f"退款_{customer_name}",
]
)
return {
"accountId": 0,
"accountMaskNo": primary_account_no,
"accountingDate": trx_datetime.strftime("%Y-%m-%d"),
"accountingDateId": int(trx_datetime.strftime("%Y%m%d")),
"archivingFlag": 0,
"attachments": 0,
"balanceAmount": round(rng.uniform(1000, 50000), 2),
"bank": "ZJRCU",
"bankComments": "",
"bankStatementId": 0,
"bankTrxNumber": "",
"batchId": log_id,
"cashType": "1",
"commentsNum": 0,
"crAmount": cr_amount,
"createDate": reference_now.strftime("%Y-%m-%d %H:%M:%S"),
"createdBy": "902001",
"cretNo": rng.choice(IDENTITY_CARD_POOL),
"currency": "CNY",
"customerAccountMaskNo": str(rng.randint(100000000, 999999999)),
"customerBank": "",
"customerId": -1,
"customerName": customer_name,
"customerReference": "",
"downPaymentFlag": 0,
"drAmount": dr_amount,
"exceptionType": "",
"groupId": group_id,
"internalFlag": 0,
"leId": 16308,
"leName": primary_enterprise_name,
"overrideBsId": 0,
"paymentMethod": "",
"sourceCatalogId": 0,
"split": 0,
"subBankstatementId": 0,
"toDoFlag": 0,
"transAmount": trans_amount,
"transFlag": trans_flag,
"transTypeId": 0,
"transformAmount": 0,
"transformCrAmount": 0,
"transformDrAmount": 0,
"transfromBalanceAmount": 0,
"trxBalance": 0,
"trxDate": trx_datetime.strftime("%Y-%m-%d %H:%M:%S"),
"uploadSequnceNumber": 0,
"userMemo": user_memo,
}
def _assign_statement_ids(self, statements: List[Dict], group_id: int, log_id: int) -> List[Dict]:
"""为样本与噪声流水统一补齐稳定的流水标识。"""
assigned: List[Dict] = []
base_id = log_id * 100000
for index, statement in enumerate(statements, start=1):
item = dict(statement)
item["groupId"] = group_id
item["batchId"] = log_id
item["bankStatementId"] = base_id + index
item["bankTrxNumber"] = uuid.uuid5(
uuid.NAMESPACE_DNS, f"lsfx-mock-{log_id}-{index}"
).hex
item["uploadSequnceNumber"] = index
item["transAmount"] = round(item.get("drAmount", 0) + item.get("crAmount", 0), 2)
assigned.append(item)
return assigned
def _generate_statements(self, group_id: int, log_id: int, count: int) -> List[Dict]:
"""生成指定数量的流水记录。"""
primary_enterprise_name, primary_account_no = self._resolve_primary_binding(log_id)
rng = random.Random(f"statement:{log_id}")
seeded_statements = build_large_transaction_seed_statements(
group_id=group_id,
log_id=log_id,
primary_enterprise_name=primary_enterprise_name,
primary_account_no=primary_account_no,
)
total_count = max(count, len(seeded_statements))
statements = list(seeded_statements)
for _ in range(total_count - len(seeded_statements)):
statements.append(
self._generate_random_statement(
group_id,
log_id,
primary_enterprise_name,
primary_account_no,
rng,
)
)
statements = self._assign_statement_ids(statements, group_id, log_id)
rng.shuffle(statements)
return statements
def _apply_primary_binding(
self,
statements: List[Dict],
primary_enterprise_name: str,
primary_account_no: str,
) -> None:
"""将解析出的主绑定统一回填到已有流水记录。"""
for statement in statements:
statement["leName"] = primary_enterprise_name
statement["accountMaskNo"] = primary_account_no
def get_bank_statement(self, request: Union[Dict, object]) -> Dict:
"""获取银行流水列表。"""
if isinstance(request, dict):
group_id = request.get("groupId", 1000)
log_id = request.get("logId", 10000)
page_now = request.get("pageNow", 1)
page_size = request.get("pageSize", 10)
else:
group_id = request.groupId
log_id = request.logId
page_now = request.pageNow
page_size = request.pageSize
if log_id not in self._cache:
total_rng = random.Random(f"total:{log_id}")
total_count = total_rng.randint(1200, 1500)
all_statements = self._generate_statements(group_id, log_id, total_count)
self._cache[log_id] = (all_statements, total_count)
all_statements, total_count = self._cache[log_id]
primary_enterprise_name, primary_account_no = self._resolve_primary_binding(log_id)
self._apply_primary_binding(all_statements, primary_enterprise_name, primary_account_no)
start = (page_now - 1) * page_size
end = start + page_size
page_data = all_statements[start:end]
return {
"code": "200",
"data": {"bankStatementList": page_data, "totalCount": total_count},
"status": "200",
"successResponse": True,
}