Files
ccdi/lsfx-mock-server/services/file_service.py

555 lines
20 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from fastapi import BackgroundTasks, UploadFile
from utils.response_builder import ResponseBuilder
from config.settings import settings
from services.staff_identity_repository import StaffIdentityRepository
from typing import Dict, List, Union
from dataclasses import dataclass, field
import time
from datetime import datetime, timedelta
import random
import uuid
@dataclass
class FileRecord:
"""文件记录模型(扩展版)"""
# 原有字段
log_id: int
group_id: int
file_name: str
status: int = -5 # -5 表示解析成功
upload_status_desc: str = "data.wait.confirm.newaccount"
parsing: bool = True # True表示正在解析
# 新增字段 - 账号和主体信息
primary_enterprise_name: str = ""
primary_account_no: str = ""
account_no_list: List[str] = field(default_factory=list)
enterprise_name_list: List[str] = field(default_factory=list)
# 新增字段 - 银行和模板信息
bank_name: str = "ZJRCU"
real_bank_name: str = "ZJRCU"
template_name: str = "ZJRCU_T251114"
data_type_info: List[str] = field(default_factory=lambda: ["CSV", ","])
# 新增字段 - 文件元数据
file_size: int = 50000
download_file_name: str = ""
file_package_id: str = field(default_factory=lambda: str(uuid.uuid4()).replace('-', ''))
# 新增字段 - 上传用户信息
file_upload_by: int = 448
file_upload_by_user_name: str = "admin@support.com"
file_upload_time: str = field(default_factory=lambda: datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
# 新增字段 - 法律实体信息
le_id: int = 10000
login_le_id: int = 10000
log_type: str = "bankstatement"
log_meta: str = "{\"lostHeader\":[],\"balanceAmount\":\"-1\"}"
lost_header: List[str] = field(default_factory=list)
# 新增字段 - 记录统计
rows: int = 0
source: str = "http"
total_records: int = 150
is_split: int = 0
# 新增字段 - 交易日期范围
trx_date_start_id: int = 20240101
trx_date_end_id: int = 20241231
# 新增字段 - 身份绑定
staff_name: str = ""
staff_id_card: str = ""
family_id_cards: List[str] = field(default_factory=list)
class FileService:
"""文件上传和解析服务"""
def __init__(self, staff_identity_repository=None):
self.file_records: Dict[int, FileRecord] = {} # logId -> FileRecord
self.log_counter = settings.INITIAL_LOG_ID
self.staff_identity_repository = staff_identity_repository or StaffIdentityRepository()
def get_file_record(self, log_id: int) -> FileRecord:
"""按 logId 获取已存在的文件记录。"""
return self.file_records.get(log_id)
def _infer_bank_name(self, filename: str) -> tuple:
"""根据文件名推断银行名称和模板名称"""
if "支付宝" in filename or "alipay" in filename.lower():
return "ALIPAY", "ALIPAY_T220708"
elif "绍兴银行" in filename or "BSX" in filename:
return "BSX", "BSX_T240925"
else:
return "ZJRCU", "ZJRCU_T251114"
def _generate_primary_binding(self) -> tuple:
"""生成单一稳定的本方主体/本方账号绑定。"""
primary_account_no = f"{random.randint(10000000000, 99999999999)}"
primary_enterprise_name = "测试主体"
return primary_enterprise_name, primary_account_no
def _generate_primary_binding_from_rng(self, rng: random.Random) -> tuple:
"""使用局部随机源生成单一稳定的本方主体/本方账号绑定。"""
primary_account_no = f"{rng.randint(10000000000, 99999999999)}"
primary_enterprise_name = "测试主体"
return primary_enterprise_name, primary_account_no
def _build_primary_binding_lists(
self, primary_enterprise_name: str, primary_account_no: str
) -> dict:
"""基于主绑定事实源构建列表字段。"""
return {
"accountNoList": [primary_account_no],
"enterpriseNameList": [primary_enterprise_name],
}
def _create_file_record(
self,
*,
log_id: int,
group_id: int,
file_name: str,
download_file_name: str,
bank_name: str,
template_name: str,
primary_enterprise_name: str,
primary_account_no: str,
file_size: int,
total_records: int,
trx_date_start_id: int,
trx_date_end_id: int,
le_id: int,
login_le_id: int,
staff_name: str = "",
staff_id_card: str = "",
family_id_cards: List[str] = None,
parsing: bool = True,
status: int = -5,
) -> FileRecord:
"""创建文件记录并写入主绑定信息。"""
binding_lists = self._build_primary_binding_lists(
primary_enterprise_name,
primary_account_no,
)
return FileRecord(
log_id=log_id,
group_id=group_id,
file_name=file_name,
download_file_name=download_file_name,
bank_name=bank_name,
real_bank_name=bank_name,
template_name=template_name,
primary_enterprise_name=primary_enterprise_name,
primary_account_no=primary_account_no,
account_no_list=binding_lists["accountNoList"],
enterprise_name_list=binding_lists["enterpriseNameList"],
le_id=le_id,
login_le_id=login_le_id,
file_size=file_size,
total_records=total_records,
trx_date_start_id=trx_date_start_id,
trx_date_end_id=trx_date_end_id,
staff_name=staff_name,
staff_id_card=staff_id_card,
family_id_cards=list(family_id_cards or []),
parsing=parsing,
status=status,
)
def _select_staff_identity_scope(self) -> dict:
"""读取一个员工及其亲属身份范围。"""
return self.staff_identity_repository.select_random_staff_with_families()
async def upload_file(
self, group_id: int, file: UploadFile, background_tasks: BackgroundTasks
) -> Dict:
"""上传文件并启动后台解析任务
Args:
group_id: 项目ID
file: 上传的文件
background_tasks: FastAPI后台任务
Returns:
上传响应字典
"""
# 生成唯一logId
self.log_counter += 1
log_id = self.log_counter
# 推断银行信息
bank_name, template_name = self._infer_bank_name(file.filename)
# 生成合理的交易日期范围
end_date = datetime.now()
start_date = end_date - timedelta(days=random.randint(90, 365))
trx_date_start_id = int(start_date.strftime("%Y%m%d"))
trx_date_end_id = int(end_date.strftime("%Y%m%d"))
# 生成单一主绑定
primary_enterprise_name, primary_account_no = self._generate_primary_binding()
identity_scope = self._select_staff_identity_scope()
# 创建完整的文件记录
file_record = self._create_file_record(
log_id=log_id,
group_id=group_id,
file_name=file.filename,
download_file_name=file.filename,
bank_name=bank_name,
template_name=template_name,
primary_enterprise_name=primary_enterprise_name,
primary_account_no=primary_account_no,
file_size=random.randint(10000, 100000),
total_records=random.randint(150, 200),
trx_date_start_id=trx_date_start_id,
trx_date_end_id=trx_date_end_id,
le_id=10000 + random.randint(0, 9999),
login_le_id=10000 + random.randint(0, 9999),
staff_name=identity_scope["staff_name"],
staff_id_card=identity_scope["staff_id_card"],
family_id_cards=identity_scope["family_id_cards"],
)
# 存储记录
self.file_records[log_id] = file_record
# 添加后台任务(延迟解析)
background_tasks.add_task(self._delayed_parse, log_id)
# 构建响应
return self._build_upload_response(file_record)
def _build_upload_response(self, file_record: FileRecord) -> dict:
"""构建上传接口的完整响应"""
return {
"code": "200",
"data": {
"accountsOfLog": {
str(file_record.log_id): [
{
"bank": file_record.bank_name,
"accountName": file_record.primary_enterprise_name,
"accountNo": file_record.primary_account_no,
"currency": "CNY"
}
]
},
"uploadLogList": [
{
**self._build_primary_binding_lists(
file_record.primary_enterprise_name,
file_record.primary_account_no,
),
"bankName": file_record.bank_name,
"dataTypeInfo": file_record.data_type_info,
"downloadFileName": file_record.download_file_name,
"filePackageId": file_record.file_package_id,
"fileSize": file_record.file_size,
"fileUploadBy": file_record.file_upload_by,
"fileUploadByUserName": file_record.file_upload_by_user_name,
"fileUploadTime": file_record.file_upload_time,
"leId": file_record.le_id,
"logId": file_record.log_id,
"logMeta": file_record.log_meta,
"logType": file_record.log_type,
"loginLeId": file_record.login_le_id,
"lostHeader": file_record.lost_header,
"realBankName": file_record.real_bank_name,
"rows": file_record.rows,
"source": file_record.source,
"status": file_record.status,
"templateName": file_record.template_name,
"totalRecords": file_record.total_records,
"trxDateEndId": file_record.trx_date_end_id,
"trxDateStartId": file_record.trx_date_start_id,
"uploadFileName": file_record.file_name,
"uploadStatusDesc": file_record.upload_status_desc
}
],
"uploadStatus": 1
},
"status": "200",
"successResponse": True
}
def _delayed_parse(self, log_id: int):
"""后台任务:模拟文件解析过程
Args:
log_id: 日志ID
"""
time.sleep(settings.PARSE_DELAY_SECONDS)
# 解析完成,更新状态
if log_id in self.file_records:
self.file_records[log_id].parsing = False
def _generate_deterministic_record(
self, log_id: int, group_id: int, rng: random.Random
) -> dict:
"""
基于 logId 生成确定性的文件记录
Args:
log_id: 文件ID用作随机种子
group_id: 项目ID
Returns:
文件记录字典26个字段
"""
# 银行类型选项
bank_options = [
("ALIPAY", "ALIPAY_T220708"),
("BSX", "BSX_T240925"),
("ZJRCU", "ZJRCU_T251114")
]
bank_name, template_name = rng.choice(bank_options)
# 生成基于种子的稳定时间范围,确保同一 logId 重复查询完全一致
base_datetime = datetime(2024, 1, 1, 8, 0, 0)
end_date = base_datetime + timedelta(days=rng.randint(180, 540))
start_date = end_date - timedelta(days=rng.randint(90, 365))
file_upload_time = (
base_datetime
+ timedelta(
days=rng.randint(0, 540),
hours=rng.randint(0, 23),
minutes=rng.randint(0, 59),
seconds=rng.randint(0, 59),
)
)
# 生成账号和主体
primary_enterprise_name, primary_account_no = self._generate_primary_binding_from_rng(rng)
binding_lists = self._build_primary_binding_lists(
primary_enterprise_name, primary_account_no
)
return {
**binding_lists,
"bankName": bank_name,
"dataTypeInfo": ["CSV", ","],
"downloadFileName": f"测试文件_{log_id}.csv",
"fileSize": rng.randint(10000, 100000),
"fileUploadBy": 448,
"fileUploadByUserName": "admin@support.com",
"fileUploadTime": file_upload_time.strftime("%Y-%m-%d %H:%M:%S"),
"isSplit": 0,
"leId": 10000 + rng.randint(0, 9999),
"logId": log_id,
"logMeta": "{\"lostHeader\":[],\"balanceAmount\":\"-1\"}",
"logType": "bankstatement",
"loginLeId": 10000 + rng.randint(0, 9999),
"lostHeader": [],
"realBankName": bank_name,
"rows": 0,
"source": "http",
"status": -5,
"templateName": template_name,
"totalRecords": rng.randint(100, 300),
"trxDateEndId": int(end_date.strftime("%Y%m%d")),
"trxDateStartId": int(start_date.strftime("%Y%m%d")),
"uploadFileName": f"测试文件_{log_id}.pdf",
"uploadStatusDesc": "data.wait.confirm.newaccount"
}
def _build_deterministic_log_detail(self, log_id: int, group_id: int) -> dict:
"""构建 deterministic 回退的单条日志详情。"""
rng = random.Random(log_id)
return self._generate_deterministic_record(log_id, group_id, rng)
def _build_log_detail(self, record: FileRecord) -> dict:
"""构建日志详情对象"""
return {
**self._build_primary_binding_lists(
record.primary_enterprise_name,
record.primary_account_no,
),
"bankName": record.bank_name,
"dataTypeInfo": record.data_type_info,
"downloadFileName": record.download_file_name,
"fileSize": record.file_size,
"fileUploadBy": record.file_upload_by,
"fileUploadByUserName": record.file_upload_by_user_name,
"fileUploadTime": record.file_upload_time,
"isSplit": record.is_split,
"leId": record.le_id,
"logId": record.log_id,
"logMeta": record.log_meta,
"logType": record.log_type,
"loginLeId": record.login_le_id,
"lostHeader": record.lost_header,
"realBankName": record.real_bank_name,
"rows": record.rows,
"source": record.source,
"status": record.status,
"templateName": record.template_name,
"totalRecords": record.total_records,
"trxDateEndId": record.trx_date_end_id,
"trxDateStartId": record.trx_date_start_id,
"uploadFileName": record.file_name,
"uploadStatusDesc": record.upload_status_desc
}
def check_parse_status(self, group_id: int, inprogress_list: str) -> Dict:
"""检查文件解析状态
Args:
group_id: 项目ID
inprogress_list: 文件ID列表逗号分隔
Returns:
解析状态响应字典
"""
# 解析logId列表
log_ids = [int(x.strip()) for x in inprogress_list.split(",") if x.strip()]
pending_list = []
all_parsing_complete = True
for log_id in log_ids:
if log_id in self.file_records:
record = self.file_records[log_id]
if record.parsing:
all_parsing_complete = False
pending_list.append(self._build_log_detail(record))
return {
"code": "200",
"data": {
"parsing": not all_parsing_complete,
"pendingList": pending_list
},
"status": "200",
"successResponse": True
}
def get_upload_status(self, group_id: int, log_id: int = None) -> dict:
"""
获取文件上传状态(基于 logId 生成确定性数据)
Args:
group_id: 项目ID
log_id: 文件ID可选
Returns:
上传状态响应字典
"""
logs = []
if log_id is not None:
if log_id in self.file_records:
log_detail = self._build_log_detail(self.file_records[log_id])
else:
log_detail = self._build_deterministic_log_detail(log_id, group_id)
logs.append(log_detail)
# 返回响应
return {
"code": "200",
"data": {
"logs": logs,
"status": "",
"accountId": 8954,
"currency": "CNY"
},
"status": "200",
"successResponse": True
}
def delete_files(self, group_id: int, log_ids: List[int], user_id: int) -> Dict:
"""删除文件
Args:
group_id: 项目ID
log_ids: 文件ID列表
user_id: 用户ID
Returns:
删除响应字典
"""
# 删除文件记录
deleted_count = 0
for log_id in log_ids:
if log_id in self.file_records:
del self.file_records[log_id]
deleted_count += 1
return {
"code": "200 OK", # 注意:这里是 "200 OK" 不是 "200"
"data": {
"message": "delete.files.success"
},
"message": "delete.files.success",
"status": "200",
"successResponse": True
}
def fetch_inner_flow(self, request: Union[Dict, object]) -> Dict:
"""拉取行内流水(创建并保存绑定记录)
Args:
request: 拉取流水请求(可以是字典或对象)
Returns:
流水响应字典包含创建并保存的logId数组
"""
# 支持 dict 或对象
if isinstance(request, dict):
group_id = request.get("groupId", 1000)
customer_no = request.get("customerNo", "")
data_start_date_id = request.get("dataStartDateId", 20240101)
data_end_date_id = request.get("dataEndDateId", 20241231)
else:
group_id = request.groupId
customer_no = request.customerNo
data_start_date_id = request.dataStartDateId
data_end_date_id = request.dataEndDateId
# 使用递增 logId确保与上传链路一致
self.log_counter += 1
log_id = self.log_counter
primary_enterprise_name, primary_account_no = self._generate_primary_binding()
identity_scope = self._select_staff_identity_scope()
file_record = self._create_file_record(
log_id=log_id,
group_id=group_id,
file_name=f"{customer_no or 'inner_flow'}_{log_id}.csv",
download_file_name=f"{customer_no or 'inner_flow'}_{log_id}.csv",
bank_name="ZJRCU",
template_name="ZJRCU_T251114",
primary_enterprise_name=primary_enterprise_name,
primary_account_no=primary_account_no,
file_size=random.randint(10000, 100000),
total_records=random.randint(100, 300),
trx_date_start_id=data_start_date_id,
trx_date_end_id=data_end_date_id,
le_id=10000 + random.randint(0, 9999),
login_le_id=10000 + random.randint(0, 9999),
staff_name=identity_scope["staff_name"],
staff_id_card=identity_scope["staff_id_card"],
family_id_cards=identity_scope["family_id_cards"],
parsing=False,
)
self.file_records[log_id] = file_record
# 返回成功的响应包含logId数组
return {
"code": "200",
"data": [log_id],
"status": "200",
"successResponse": True,
}