# ccdi/lsfx-mock-server/services/file_service.py
import random
import time
import uuid
from dataclasses import dataclass, field
from datetime import datetime, timedelta
from typing import Dict, List, Optional, Union

from fastapi import BackgroundTasks, UploadFile

from config.settings import settings
from services.staff_identity_repository import StaffIdentityRepository
from utils.response_builder import ResponseBuilder
2026-03-20 14:42:11 +08:00
LARGE_TRANSACTION_RULE_CODES = [
"HOUSE_OR_CAR_EXPENSE",
"TAX_EXPENSE",
"SINGLE_LARGE_INCOME",
"CUMULATIVE_INCOME",
"ANNUAL_TURNOVER",
"LARGE_CASH_DEPOSIT",
"FREQUENT_CASH_DEPOSIT",
"LARGE_TRANSFER",
]
PHASE1_RULE_CODES = [
"GAMBLING_SENSITIVE_KEYWORD",
"SPECIAL_AMOUNT_TRANSACTION",
"SUSPICIOUS_INCOME_KEYWORD",
"FOREX_BUY_AMT",
"FOREX_SELL_AMT",
"STOCK_TFR_LARGE",
"LARGE_STOCK_TRADING",
"WITHDRAW_CNT",
]
@dataclass
class FileRecord:
"""文件记录模型(扩展版)"""
# 原有字段
log_id: int
group_id: int
file_name: str
status: int = -5 # -5 表示解析成功
upload_status_desc: str = "data.wait.confirm.newaccount"
parsing: bool = True # True表示正在解析
# 新增字段 - 账号和主体信息
primary_enterprise_name: str = ""
primary_account_no: str = ""
account_no_list: List[str] = field(default_factory=list)
enterprise_name_list: List[str] = field(default_factory=list)
# 新增字段 - 银行和模板信息
bank_name: str = "ZJRCU"
real_bank_name: str = "ZJRCU"
template_name: str = "ZJRCU_T251114"
data_type_info: List[str] = field(default_factory=lambda: ["CSV", ","])
# 新增字段 - 文件元数据
file_size: int = 50000
download_file_name: str = ""
file_package_id: str = field(default_factory=lambda: str(uuid.uuid4()).replace('-', ''))
# 新增字段 - 上传用户信息
file_upload_by: int = 448
file_upload_by_user_name: str = "admin@support.com"
file_upload_time: str = field(default_factory=lambda: datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# 新增字段 - 法律实体信息
le_id: int = 10000
login_le_id: int = 10000
log_type: str = "bankstatement"
log_meta: str = "{\"lostHeader\":[],\"balanceAmount\":\"-1\"}"
lost_header: List[str] = field(default_factory=list)
# 新增字段 - 记录统计
rows: int = 0
source: str = "http"
total_records: int = 150
is_split: int = 0
# 新增字段 - 交易日期范围
trx_date_start_id: int = 20240101
trx_date_end_id: int = 20241231
# 新增字段 - 身份绑定
staff_name: str = ""
staff_id_card: str = ""
family_id_cards: List[str] = field(default_factory=list)
2026-03-20 14:42:11 +08:00
large_transaction_hit_rules: List[str] = field(default_factory=list)
phase1_hit_rules: List[str] = field(default_factory=list)
class FileService:
"""文件上传和解析服务"""
2026-03-19 17:18:02 +08:00
INNER_FLOW_TOTAL_RECORDS = 200
def __init__(self, staff_identity_repository=None):
self.file_records: Dict[int, FileRecord] = {} # logId -> FileRecord
self.log_counter = settings.INITIAL_LOG_ID
self.staff_identity_repository = staff_identity_repository or StaffIdentityRepository()
def get_file_record(self, log_id: int) -> FileRecord:
"""按 logId 获取已存在的文件记录。"""
return self.file_records.get(log_id)
def _infer_bank_name(self, filename: str) -> tuple:
"""根据文件名推断银行名称和模板名称"""
if "支付宝" in filename or "alipay" in filename.lower():
return "ALIPAY", "ALIPAY_T220708"
elif "绍兴银行" in filename or "BSX" in filename:
return "BSX", "BSX_T240925"
else:
return "ZJRCU", "ZJRCU_T251114"
def _generate_primary_binding(self) -> tuple:
"""生成单一稳定的本方主体/本方账号绑定。"""
primary_account_no = f"{random.randint(10000000000, 99999999999)}"
primary_enterprise_name = "测试主体"
return primary_enterprise_name, primary_account_no
def _generate_primary_binding_from_rng(self, rng: random.Random) -> tuple:
"""使用局部随机源生成单一稳定的本方主体/本方账号绑定。"""
primary_account_no = f"{rng.randint(10000000000, 99999999999)}"
primary_enterprise_name = "测试主体"
return primary_enterprise_name, primary_account_no
def _build_primary_binding_lists(
self, primary_enterprise_name: str, primary_account_no: str
) -> dict:
"""基于主绑定事实源构建列表字段。"""
return {
"accountNoList": [primary_account_no],
"enterpriseNameList": [primary_enterprise_name],
}
2026-03-20 14:42:11 +08:00
def _pick_rule_subset(
self,
rng: random.Random,
rule_codes: List[str],
min_count: int,
max_count: int,
) -> List[str]:
"""按固定随机源选择稳定规则子集,并保留规则池原始顺序。"""
selected_codes = set(rng.sample(rule_codes, rng.randint(min_count, max_count)))
return [rule_code for rule_code in rule_codes if rule_code in selected_codes]
def _build_rule_hit_plan(self, log_id: int) -> dict:
"""基于 logId 生成稳定的规则命中计划。"""
rng = random.Random(f"rule-plan:{log_id}")
return {
"large_transaction_hit_rules": self._pick_rule_subset(
rng, LARGE_TRANSACTION_RULE_CODES, 2, 4
),
"phase1_hit_rules": self._pick_rule_subset(rng, PHASE1_RULE_CODES, 2, 4),
}
def _create_file_record(
self,
*,
log_id: int,
group_id: int,
file_name: str,
download_file_name: str,
bank_name: str,
template_name: str,
primary_enterprise_name: str,
primary_account_no: str,
file_size: int,
total_records: int,
trx_date_start_id: int,
trx_date_end_id: int,
le_id: int,
login_le_id: int,
staff_name: str = "",
staff_id_card: str = "",
family_id_cards: List[str] = None,
2026-03-20 14:42:11 +08:00
large_transaction_hit_rules: List[str] = None,
phase1_hit_rules: List[str] = None,
parsing: bool = True,
status: int = -5,
) -> FileRecord:
"""创建文件记录并写入主绑定信息。"""
binding_lists = self._build_primary_binding_lists(
primary_enterprise_name,
primary_account_no,
)
return FileRecord(
log_id=log_id,
group_id=group_id,
file_name=file_name,
download_file_name=download_file_name,
bank_name=bank_name,
real_bank_name=bank_name,
template_name=template_name,
primary_enterprise_name=primary_enterprise_name,
primary_account_no=primary_account_no,
account_no_list=binding_lists["accountNoList"],
enterprise_name_list=binding_lists["enterpriseNameList"],
le_id=le_id,
login_le_id=login_le_id,
file_size=file_size,
total_records=total_records,
trx_date_start_id=trx_date_start_id,
trx_date_end_id=trx_date_end_id,
staff_name=staff_name,
staff_id_card=staff_id_card,
family_id_cards=list(family_id_cards or []),
2026-03-20 14:42:11 +08:00
large_transaction_hit_rules=list(large_transaction_hit_rules or []),
phase1_hit_rules=list(phase1_hit_rules or []),
parsing=parsing,
status=status,
)
def _select_staff_identity_scope(self) -> dict:
"""读取一个员工及其亲属身份范围。"""
return self.staff_identity_repository.select_random_staff_with_families()
async def upload_file(
self, group_id: int, file: UploadFile, background_tasks: BackgroundTasks
) -> Dict:
"""上传文件并启动后台解析任务
Args:
group_id: 项目ID
file: 上传的文件
background_tasks: FastAPI后台任务
Returns:
上传响应字典
"""
# 生成唯一logId
self.log_counter += 1
log_id = self.log_counter
# 推断银行信息
bank_name, template_name = self._infer_bank_name(file.filename)
2026-03-20 14:42:11 +08:00
rule_hit_plan = self._build_rule_hit_plan(log_id)
# 生成合理的交易日期范围
end_date = datetime.now()
start_date = end_date - timedelta(days=random.randint(90, 365))
trx_date_start_id = int(start_date.strftime("%Y%m%d"))
trx_date_end_id = int(end_date.strftime("%Y%m%d"))
# 生成单一主绑定
primary_enterprise_name, primary_account_no = self._generate_primary_binding()
identity_scope = self._select_staff_identity_scope()
# 创建完整的文件记录
file_record = self._create_file_record(
log_id=log_id,
group_id=group_id,
file_name=file.filename,
download_file_name=file.filename,
bank_name=bank_name,
template_name=template_name,
primary_enterprise_name=primary_enterprise_name,
primary_account_no=primary_account_no,
file_size=random.randint(10000, 100000),
total_records=random.randint(150, 200),
trx_date_start_id=trx_date_start_id,
trx_date_end_id=trx_date_end_id,
le_id=10000 + random.randint(0, 9999),
login_le_id=10000 + random.randint(0, 9999),
staff_name=identity_scope["staff_name"],
staff_id_card=identity_scope["staff_id_card"],
family_id_cards=identity_scope["family_id_cards"],
2026-03-20 14:42:11 +08:00
large_transaction_hit_rules=rule_hit_plan["large_transaction_hit_rules"],
phase1_hit_rules=rule_hit_plan["phase1_hit_rules"],
)
# 存储记录
self.file_records[log_id] = file_record
# 添加后台任务(延迟解析)
background_tasks.add_task(self._delayed_parse, log_id)
# 构建响应
return self._build_upload_response(file_record)
def _build_upload_response(self, file_record: FileRecord) -> dict:
"""构建上传接口的完整响应"""
return {
"code": "200",
"data": {
"accountsOfLog": {
str(file_record.log_id): [
{
"bank": file_record.bank_name,
"accountName": file_record.primary_enterprise_name,
"accountNo": file_record.primary_account_no,
"currency": "CNY"
}
]
},
"uploadLogList": [
{
**self._build_primary_binding_lists(
file_record.primary_enterprise_name,
file_record.primary_account_no,
),
"bankName": file_record.bank_name,
"dataTypeInfo": file_record.data_type_info,
"downloadFileName": file_record.download_file_name,
"filePackageId": file_record.file_package_id,
"fileSize": file_record.file_size,
"fileUploadBy": file_record.file_upload_by,
"fileUploadByUserName": file_record.file_upload_by_user_name,
"fileUploadTime": file_record.file_upload_time,
"leId": file_record.le_id,
"logId": file_record.log_id,
"logMeta": file_record.log_meta,
"logType": file_record.log_type,
"loginLeId": file_record.login_le_id,
"lostHeader": file_record.lost_header,
"realBankName": file_record.real_bank_name,
"rows": file_record.rows,
"source": file_record.source,
"status": file_record.status,
"templateName": file_record.template_name,
"totalRecords": file_record.total_records,
"trxDateEndId": file_record.trx_date_end_id,
"trxDateStartId": file_record.trx_date_start_id,
"uploadFileName": file_record.file_name,
"uploadStatusDesc": file_record.upload_status_desc
}
],
"uploadStatus": 1
},
"status": "200",
"successResponse": True
}
def _delayed_parse(self, log_id: int):
"""后台任务:模拟文件解析过程
Args:
log_id: 日志ID
"""
time.sleep(settings.PARSE_DELAY_SECONDS)
# 解析完成,更新状态
if log_id in self.file_records:
self.file_records[log_id].parsing = False
def _generate_deterministic_record(
self, log_id: int, group_id: int, rng: random.Random
) -> dict:
"""
基于 logId 生成确定性的文件记录
Args:
log_id: 文件ID用作随机种子
group_id: 项目ID
Returns:
文件记录字典26个字段
"""
# 银行类型选项
bank_options = [
("ALIPAY", "ALIPAY_T220708"),
("BSX", "BSX_T240925"),
("ZJRCU", "ZJRCU_T251114")
]
bank_name, template_name = rng.choice(bank_options)
# 生成基于种子的稳定时间范围,确保同一 logId 重复查询完全一致
base_datetime = datetime(2024, 1, 1, 8, 0, 0)
end_date = base_datetime + timedelta(days=rng.randint(180, 540))
start_date = end_date - timedelta(days=rng.randint(90, 365))
file_upload_time = (
base_datetime
+ timedelta(
days=rng.randint(0, 540),
hours=rng.randint(0, 23),
minutes=rng.randint(0, 59),
seconds=rng.randint(0, 59),
)
)
# 生成账号和主体
primary_enterprise_name, primary_account_no = self._generate_primary_binding_from_rng(rng)
binding_lists = self._build_primary_binding_lists(
primary_enterprise_name, primary_account_no
)
return {
**binding_lists,
"bankName": bank_name,
"dataTypeInfo": ["CSV", ","],
"downloadFileName": f"测试文件_{log_id}.csv",
"fileSize": rng.randint(10000, 100000),
"fileUploadBy": 448,
"fileUploadByUserName": "admin@support.com",
"fileUploadTime": file_upload_time.strftime("%Y-%m-%d %H:%M:%S"),
"isSplit": 0,
"leId": 10000 + rng.randint(0, 9999),
"logId": log_id,
"logMeta": "{\"lostHeader\":[],\"balanceAmount\":\"-1\"}",
"logType": "bankstatement",
"loginLeId": 10000 + rng.randint(0, 9999),
"lostHeader": [],
"realBankName": bank_name,
"rows": 0,
"source": "http",
"status": -5,
"templateName": template_name,
"totalRecords": rng.randint(100, 300),
"trxDateEndId": int(end_date.strftime("%Y%m%d")),
"trxDateStartId": int(start_date.strftime("%Y%m%d")),
"uploadFileName": f"测试文件_{log_id}.pdf",
"uploadStatusDesc": "data.wait.confirm.newaccount"
}
def _build_deterministic_log_detail(self, log_id: int, group_id: int) -> dict:
"""构建 deterministic 回退的单条日志详情。"""
rng = random.Random(log_id)
return self._generate_deterministic_record(log_id, group_id, rng)
def _build_log_detail(self, record: FileRecord) -> dict:
"""构建日志详情对象"""
return {
**self._build_primary_binding_lists(
record.primary_enterprise_name,
record.primary_account_no,
),
"bankName": record.bank_name,
"dataTypeInfo": record.data_type_info,
"downloadFileName": record.download_file_name,
"fileSize": record.file_size,
"fileUploadBy": record.file_upload_by,
"fileUploadByUserName": record.file_upload_by_user_name,
"fileUploadTime": record.file_upload_time,
"isSplit": record.is_split,
"leId": record.le_id,
"logId": record.log_id,
"logMeta": record.log_meta,
"logType": record.log_type,
"loginLeId": record.login_le_id,
"lostHeader": record.lost_header,
"realBankName": record.real_bank_name,
"rows": record.rows,
"source": record.source,
"status": record.status,
"templateName": record.template_name,
"totalRecords": record.total_records,
"trxDateEndId": record.trx_date_end_id,
"trxDateStartId": record.trx_date_start_id,
"uploadFileName": record.file_name,
"uploadStatusDesc": record.upload_status_desc
}
def check_parse_status(self, group_id: int, inprogress_list: str) -> Dict:
"""检查文件解析状态
Args:
group_id: 项目ID
inprogress_list: 文件ID列表逗号分隔
Returns:
解析状态响应字典
"""
# 解析logId列表
log_ids = [int(x.strip()) for x in inprogress_list.split(",") if x.strip()]
pending_list = []
all_parsing_complete = True
for log_id in log_ids:
if log_id in self.file_records:
record = self.file_records[log_id]
if record.parsing:
all_parsing_complete = False
pending_list.append(self._build_log_detail(record))
return {
"code": "200",
"data": {
"parsing": not all_parsing_complete,
"pendingList": pending_list
},
"status": "200",
"successResponse": True
}
def get_upload_status(self, group_id: int, log_id: int = None) -> dict:
"""
获取文件上传状态基于 logId 生成确定性数据
Args:
group_id: 项目ID
log_id: 文件ID可选
Returns:
上传状态响应字典
"""
logs = []
if log_id is not None:
if log_id in self.file_records:
log_detail = self._build_log_detail(self.file_records[log_id])
else:
log_detail = self._build_deterministic_log_detail(log_id, group_id)
logs.append(log_detail)
# 返回响应
return {
"code": "200",
"data": {
"logs": logs,
"status": "",
"accountId": 8954,
"currency": "CNY"
},
"status": "200",
"successResponse": True
}
def delete_files(self, group_id: int, log_ids: List[int], user_id: int) -> Dict:
"""删除文件
Args:
group_id: 项目ID
log_ids: 文件ID列表
user_id: 用户ID
Returns:
删除响应字典
"""
# 删除文件记录
deleted_count = 0
for log_id in log_ids:
if log_id in self.file_records:
del self.file_records[log_id]
deleted_count += 1
return {
"code": "200 OK", # 注意:这里是 "200 OK" 不是 "200"
"data": {
"message": "delete.files.success"
},
"message": "delete.files.success",
"status": "200",
"successResponse": True
}
def fetch_inner_flow(self, request: Union[Dict, object]) -> Dict:
"""拉取行内流水(创建并保存绑定记录)
Args:
request: 拉取流水请求可以是字典或对象
Returns:
流水响应字典包含创建并保存的logId数组
"""
# 支持 dict 或对象
if isinstance(request, dict):
group_id = request.get("groupId", 1000)
customer_no = request.get("customerNo", "")
data_start_date_id = request.get("dataStartDateId", 20240101)
data_end_date_id = request.get("dataEndDateId", 20241231)
else:
group_id = request.groupId
customer_no = request.customerNo
data_start_date_id = request.dataStartDateId
data_end_date_id = request.dataEndDateId
# 使用递增 logId确保与上传链路一致
self.log_counter += 1
log_id = self.log_counter
2026-03-20 14:42:11 +08:00
rule_hit_plan = self._build_rule_hit_plan(log_id)
primary_enterprise_name, primary_account_no = self._generate_primary_binding()
identity_scope = self._select_staff_identity_scope()
file_record = self._create_file_record(
log_id=log_id,
group_id=group_id,
file_name=f"{customer_no or 'inner_flow'}_{log_id}.csv",
download_file_name=f"{customer_no or 'inner_flow'}_{log_id}.csv",
bank_name="ZJRCU",
template_name="ZJRCU_T251114",
primary_enterprise_name=primary_enterprise_name,
primary_account_no=primary_account_no,
file_size=random.randint(10000, 100000),
2026-03-19 17:18:02 +08:00
total_records=self.INNER_FLOW_TOTAL_RECORDS,
trx_date_start_id=data_start_date_id,
trx_date_end_id=data_end_date_id,
le_id=10000 + random.randint(0, 9999),
login_le_id=10000 + random.randint(0, 9999),
staff_name=identity_scope["staff_name"],
staff_id_card=identity_scope["staff_id_card"],
family_id_cards=identity_scope["family_id_cards"],
2026-03-20 14:42:11 +08:00
large_transaction_hit_rules=rule_hit_plan["large_transaction_hit_rules"],
phase1_hit_rules=rule_hit_plan["phase1_hit_rules"],
parsing=False,
)
self.file_records[log_id] = file_record
# 返回成功的响应包含logId数组
return {
"code": "200",
"data": [log_id],
"status": "200",
"successResponse": True,
}