Files
ccdi/lsfx-mock-server/services/statement_service.py

189 lines
6.1 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from utils.response_builder import ResponseBuilder
from typing import Dict, Union, List
import random
from datetime import datetime, timedelta
import uuid
import logging
# Configure logging: set the root logger to INFO and create this module's logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
class StatementService:
    """Mock bank-statement (transaction flow) data service.

    Statements are generated randomly the first time a ``logId`` is requested
    and are then kept in an in-memory cache, so every later page request for
    the same ``logId`` is served from the same data set.
    """

    # Counterparty names used for the random ``customerName`` field.
    _CUSTOMERS = ["小店", "支付宝", "微信支付", "财付通", "美团", "京东", "淘宝", "银行转账"]
    # Prefixes joined with the counterparty name to build ``userMemo``.
    _MEMO_PREFIXES = ["消费", "转账", "收款", "支付", "退款"]
    # Paging defaults, also used when a request carries ``None`` for a field.
    _DEFAULT_PAGE_NOW = 1
    _DEFAULT_PAGE_SIZE = 10

    def __init__(self):
        # Cache: logId -> (statements_list, total_count).
        # NOTE: entries are never evicted; the cache grows with every new logId.
        self._cache: Dict[int, tuple] = {}
        logger.info("StatementService initialized with empty cache")

    def _generate_random_statement(self, index: int, group_id: int, log_id: int) -> Dict:
        """Generate a single random statement record.

        Args:
            index: Zero-based sequence number of the statement.
            group_id: Project ID, emitted as ``groupId``.
            log_id: File ID, emitted as ``batchId``.

        Returns:
            A dictionary describing one statement record.
        """
        # Transaction date: a random day within the last year (time of day is "now").
        days_ago = random.randint(0, 365)
        trx_datetime = datetime.now() - timedelta(days=days_ago)
        trx_date = trx_datetime.strftime("%Y-%m-%d %H:%M:%S")
        accounting_date = trx_datetime.strftime("%Y-%m-%d")
        accounting_date_id = int(trx_datetime.strftime("%Y%m%d"))

        # Creation timestamp, formatted as YYYY-MM-DD HH:MM:SS.
        create_date = datetime.now().strftime("%Y-%m-%d %H:%M:%S")

        # Random transaction amount.
        trans_amount = round(random.uniform(10, 10000), 2)

        # Randomly decide between an expense ("P") and an income ("R");
        # the amount goes to the debit or the credit side accordingly.
        if random.random() > 0.5:
            dr_amount, cr_amount, trans_flag = trans_amount, 0, "P"
        else:
            dr_amount, cr_amount, trans_flag = 0, trans_amount, "R"

        # Random balance, independent of the transaction amount.
        balance_amount = round(random.uniform(1000, 50000), 2)

        # Random counterparty information.
        customer_name = random.choice(self._CUSTOMERS)
        customer_account = str(random.randint(100000000, 999999999))

        # Random memo of the form "<prefix>_<counterparty>".
        user_memo = f"{random.choice(self._MEMO_PREFIXES)}_{customer_name}"

        account_mask_no = str(random.randint(100000000000000, 999999999999999))

        # Key spellings such as "transfromBalanceAmount" and "uploadSequnceNumber"
        # are kept exactly as they were; presumably they mirror the upstream API
        # payload -- confirm before "correcting" them.
        return {
            "accountId": 0,
            "accountMaskNo": account_mask_no,
            "accountingDate": accounting_date,
            "accountingDateId": accounting_date_id,
            "archivingFlag": 0,
            "attachments": 0,
            "balanceAmount": balance_amount,
            "bank": "ZJRCU",
            "bankComments": "",
            "bankStatementId": 12847662 + index,
            "bankTrxNumber": uuid.uuid4().hex,
            "batchId": log_id,
            "cashType": "1",
            "commentsNum": 0,
            "crAmount": cr_amount,
            "createDate": create_date,
            "createdBy": "902001",
            "cretNo": "230902199012261247",
            "currency": "CNY",
            "customerAccountMaskNo": customer_account,
            "customerBank": "",
            "customerId": -1,
            "customerName": customer_name,
            "customerReference": "",
            "downPaymentFlag": 0,
            "drAmount": dr_amount,
            "exceptionType": "",
            "groupId": group_id,
            "internalFlag": 0,
            "leId": 16308,
            "leName": "张传伟",
            "overrideBsId": 0,
            "paymentMethod": "",
            "sourceCatalogId": 0,
            "split": 0,
            "subBankstatementId": 0,
            "toDoFlag": 0,
            "transAmount": trans_amount,
            "transFlag": trans_flag,
            "transTypeId": 0,
            "transformAmount": 0,
            "transformCrAmount": 0,
            "transformDrAmount": 0,
            "transfromBalanceAmount": 0,
            "trxBalance": 0,
            "trxDate": trx_date,
            "uploadSequnceNumber": index + 1,
            "userMemo": user_memo,
        }

    def _generate_statements(self, group_id: int, log_id: int, count: int) -> List[Dict]:
        """Generate ``count`` random statement records.

        Args:
            group_id: Project ID.
            log_id: File ID.
            count: Number of records to generate.

        Returns:
            A list of statement record dictionaries.
        """
        return [
            self._generate_random_statement(i, group_id, log_id)
            for i in range(count)
        ]

    def _load_statements(self, group_id: int, log_id: int) -> tuple:
        """Return ``(statements, total_count)`` for ``log_id``, generating on first use.

        The cache is keyed by ``log_id`` only, so the ``group_id`` of the first
        request for a given ``log_id`` is the one stored in its records.
        """
        cached = self._cache.get(log_id)
        if cached is None:
            # Random total between 1200 and 1500 records.
            total_count = random.randint(1200, 1500)
            cached = (self._generate_statements(group_id, log_id, total_count), total_count)
            self._cache[log_id] = cached
        return cached

    @classmethod
    def _normalize_paging(cls, page_now, page_size) -> tuple:
        """Clamp paging values so the slice below never uses negative indices.

        ``None`` falls back to the defaults; ``page_now`` is at least 1 and
        ``page_size`` is at least 0. Without this, a page number <= 0 or a
        negative page size would be interpreted by Python slicing as offsets
        from the end of the list and return the wrong records.
        """
        if page_now is None:
            page_now = cls._DEFAULT_PAGE_NOW
        if page_size is None:
            page_size = cls._DEFAULT_PAGE_SIZE
        return max(page_now, 1), max(page_size, 0)

    def get_bank_statement(self, request: Union[Dict, object]) -> Dict:
        """Return one page of bank statements.

        Args:
            request: The query, either a dict with the keys ``groupId``,
                ``logId``, ``pageNow`` and ``pageSize`` (missing keys use
                defaults) or an object exposing attributes of the same names.

        Returns:
            The response dictionary; ``data`` holds ``bankStatementList`` (the
            requested page, empty when past the end) and ``totalCount``.
        """
        # Accept either a dict or an attribute-style request object.
        if isinstance(request, dict):
            group_id = request.get("groupId", 1000)
            log_id = request.get("logId", 10000)
            page_now = request.get("pageNow", self._DEFAULT_PAGE_NOW)
            page_size = request.get("pageSize", self._DEFAULT_PAGE_SIZE)
        else:
            group_id = request.groupId
            log_id = request.logId
            page_now = request.pageNow
            page_size = request.pageSize

        page_now, page_size = self._normalize_paging(page_now, page_size)

        # Served from the cache; generated on the first request for this logId.
        all_statements, total_count = self._load_statements(group_id, log_id)

        # Simulated pagination.
        start = (page_now - 1) * page_size
        page_data = all_statements[start:start + page_size]

        return {
            "code": "200",
            "data": {"bankStatementList": page_data, "totalCount": total_count},
            "status": "200",
            "successResponse": True,
        }