from fastapi import BackgroundTasks, UploadFile
from utils.response_builder import ResponseBuilder
from config.settings import settings
from typing import Dict, List, Union
from dataclasses import dataclass, field
import time
from datetime import datetime, timedelta
import random
import uuid


@dataclass
class FileRecord:
    """In-memory record describing one uploaded (or pulled) statement file.

    Field names mirror the camelCase keys of the upstream API responses;
    defaults model a freshly uploaded CSV statement that is still parsing.
    """

    # core identity
    log_id: int
    group_id: int
    file_name: str
    status: int = -5  # -5 means "parsed successfully" in the upstream protocol
    upload_status_desc: str = "data.wait.confirm.newaccount"
    parsing: bool = True  # True while the simulated background parse runs

    # account / owning-entity information
    primary_enterprise_name: str = ""
    primary_account_no: str = ""
    account_no_list: List[str] = field(default_factory=list)
    enterprise_name_list: List[str] = field(default_factory=list)

    # bank and template information
    bank_name: str = "ZJRCU"
    real_bank_name: str = "ZJRCU"
    template_name: str = "ZJRCU_T251114"
    data_type_info: List[str] = field(default_factory=lambda: ["CSV", ","])

    # file metadata
    file_size: int = 50000
    download_file_name: str = ""
    file_package_id: str = field(
        default_factory=lambda: str(uuid.uuid4()).replace('-', '')
    )

    # uploading-user information
    file_upload_by: int = 448
    file_upload_by_user_name: str = "admin@support.com"
    file_upload_time: str = field(
        default_factory=lambda: datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    )

    # legal-entity information
    le_id: int = 10000
    login_le_id: int = 10000
    log_type: str = "bankstatement"
    log_meta: str = "{\"lostHeader\":[],\"balanceAmount\":\"-1\"}"
    lost_header: List[str] = field(default_factory=list)

    # record statistics
    rows: int = 0
    source: str = "http"
    total_records: int = 150
    is_split: int = 0

    # transaction date range (YYYYMMDD integers)
    trx_date_start_id: int = 20240101
    trx_date_end_id: int = 20241231


class FileService:
    """Mock file upload / parsing service backed by an in-memory store."""

    def __init__(self):
        # logId -> FileRecord; the counter hands out monotonically
        # increasing logIds starting from the configured base.
        self.file_records: Dict[int, FileRecord] = {}
        self.log_counter = settings.INITIAL_LOG_ID

    def get_file_record(self, log_id: int) -> FileRecord:
        """Return the existing record for *log_id*, or None when unknown."""
        return self.file_records.get(log_id)

    def _infer_bank_name(self, filename: str) -> tuple:
        """Infer ``(bank_name, template_name)`` from the uploaded file name.

        Falls back to the ZJRCU bank/template pair when no known marker
        substring is present.
        """
        if "支付宝" in filename or "alipay" in filename.lower():
            return "ALIPAY", "ALIPAY_T220708"
        if "绍兴银行" in filename or "BSX" in filename:
            return "BSX", "BSX_T240925"
        return "ZJRCU", "ZJRCU_T251114"

    def _generate_primary_binding(self) -> tuple:
        """Generate one stable (enterprise name, account number) binding.

        Delegates to the rng-based variant, passing the ``random`` module
        itself: it exposes the same ``randint`` API as a ``random.Random``
        instance, so module-level seeding still controls the output.
        """
        return self._generate_primary_binding_from_rng(random)

    def _generate_primary_binding_from_rng(self, rng) -> tuple:
        """Generate one stable binding from *rng* (anything with ``randint``)."""
        primary_account_no = f"{rng.randint(10000000000, 99999999999)}"
        primary_enterprise_name = "测试主体"
        return primary_enterprise_name, primary_account_no

    def _build_primary_binding_lists(
        self, primary_enterprise_name: str, primary_account_no: str
    ) -> dict:
        """Build the list-shaped response fields from the single binding.

        The single primary binding is the source of truth; the response
        schema wants one-element lists.
        """
        return {
            "accountNoList": [primary_account_no],
            "enterpriseNameList": [primary_enterprise_name],
        }

    def _create_file_record(
        self,
        *,
        log_id: int,
        group_id: int,
        file_name: str,
        download_file_name: str,
        bank_name: str,
        template_name: str,
        primary_enterprise_name: str,
        primary_account_no: str,
        file_size: int,
        total_records: int,
        trx_date_start_id: int,
        trx_date_end_id: int,
        le_id: int,
        login_le_id: int,
        parsing: bool = True,
        status: int = -5,
    ) -> FileRecord:
        """Create a FileRecord populated with the primary binding info."""
        binding_lists = self._build_primary_binding_lists(
            primary_enterprise_name,
            primary_account_no,
        )
        return FileRecord(
            log_id=log_id,
            group_id=group_id,
            file_name=file_name,
            download_file_name=download_file_name,
            bank_name=bank_name,
            real_bank_name=bank_name,  # real bank mirrors the inferred bank
            template_name=template_name,
            primary_enterprise_name=primary_enterprise_name,
            primary_account_no=primary_account_no,
            account_no_list=binding_lists["accountNoList"],
            enterprise_name_list=binding_lists["enterpriseNameList"],
            le_id=le_id,
            login_le_id=login_le_id,
            file_size=file_size,
            total_records=total_records,
            trx_date_start_id=trx_date_start_id,
            trx_date_end_id=trx_date_end_id,
            parsing=parsing,
            status=status,
        )

    async def upload_file(
        self, group_id: int, file: UploadFile, background_tasks: BackgroundTasks
    ) -> Dict:
        """Upload a file and schedule the simulated background parse.

        Args:
            group_id: Project ID.
            file: The uploaded file.
            background_tasks: FastAPI background task queue.

        Returns:
            The upload response dictionary.
        """
        # Allocate a unique logId.
        self.log_counter += 1
        log_id = self.log_counter

        # UploadFile.filename is Optional; normalise None to "" so the
        # substring checks in _infer_bank_name cannot raise TypeError.
        filename = file.filename or ""

        # Infer bank / template from the file name.
        bank_name, template_name = self._infer_bank_name(filename)

        # Generate a plausible transaction date range ending today.
        end_date = datetime.now()
        start_date = end_date - timedelta(days=random.randint(90, 365))
        trx_date_start_id = int(start_date.strftime("%Y%m%d"))
        trx_date_end_id = int(end_date.strftime("%Y%m%d"))

        # One stable primary binding per upload.
        primary_enterprise_name, primary_account_no = (
            self._generate_primary_binding()
        )

        # Build and store the full record.
        file_record = self._create_file_record(
            log_id=log_id,
            group_id=group_id,
            file_name=filename,
            download_file_name=filename,
            bank_name=bank_name,
            template_name=template_name,
            primary_enterprise_name=primary_enterprise_name,
            primary_account_no=primary_account_no,
            file_size=random.randint(10000, 100000),
            total_records=random.randint(100, 300),
            trx_date_start_id=trx_date_start_id,
            trx_date_end_id=trx_date_end_id,
            le_id=10000 + random.randint(0, 9999),
            login_le_id=10000 + random.randint(0, 9999),
        )
        self.file_records[log_id] = file_record

        # Schedule the delayed parse so an immediate status poll reports
        # "parsing" and a later one reports completion.
        background_tasks.add_task(self._delayed_parse, log_id)

        return self._build_upload_response(file_record)

    def _build_upload_response(self, file_record: FileRecord) -> dict:
        """Build the full response body for the upload endpoint."""
        return {
            "code": "200",
            "data": {
                "accountsOfLog": {
                    str(file_record.log_id): [
                        {
                            "bank": file_record.bank_name,
                            "accountName": file_record.primary_enterprise_name,
                            "accountNo": file_record.primary_account_no,
                            "currency": "CNY"
                        }
                    ]
                },
                "uploadLogList": [
                    {
                        **self._build_primary_binding_lists(
                            file_record.primary_enterprise_name,
                            file_record.primary_account_no,
                        ),
                        "bankName": file_record.bank_name,
                        "dataTypeInfo": file_record.data_type_info,
                        "downloadFileName": file_record.download_file_name,
                        "filePackageId": file_record.file_package_id,
                        "fileSize": file_record.file_size,
                        "fileUploadBy": file_record.file_upload_by,
                        "fileUploadByUserName": file_record.file_upload_by_user_name,
                        "fileUploadTime": file_record.file_upload_time,
                        "leId": file_record.le_id,
                        "logId": file_record.log_id,
                        "logMeta": file_record.log_meta,
                        "logType": file_record.log_type,
                        "loginLeId": file_record.login_le_id,
                        "lostHeader": file_record.lost_header,
                        "realBankName": file_record.real_bank_name,
                        "rows": file_record.rows,
                        "source": file_record.source,
                        "status": file_record.status,
                        "templateName": file_record.template_name,
                        "totalRecords": file_record.total_records,
                        "trxDateEndId": file_record.trx_date_end_id,
                        "trxDateStartId": file_record.trx_date_start_id,
                        "uploadFileName": file_record.file_name,
                        "uploadStatusDesc": file_record.upload_status_desc
                    }
                ],
                "uploadStatus": 1
            },
            "status": "200",
            "successResponse": True
        }

    def _delayed_parse(self, log_id: int):
        """Background task: simulate the parse by sleeping, then flip the flag.

        Args:
            log_id: Log ID of the record to mark as parsed.
        """
        time.sleep(settings.PARSE_DELAY_SECONDS)
        # Parsing finished; the record may have been deleted meanwhile.
        if log_id in self.file_records:
            self.file_records[log_id].parsing = False

    def _generate_deterministic_record(
        self, log_id: int, group_id: int, rng: random.Random
    ) -> dict:
        """Generate a deterministic record dict keyed off *log_id*.

        Args:
            log_id: File ID (its value seeds *rng* at the call site).
            group_id: Project ID (accepted for interface parity; not used
                in the generated payload).
            rng: Seeded random source so repeated queries for the same
                logId return identical data.

        Returns:
            A file-record dict with the 26 response fields.
        """
        bank_options = [
            ("ALIPAY", "ALIPAY_T220708"),
            ("BSX", "BSX_T240925"),
            ("ZJRCU", "ZJRCU_T251114")
        ]
        bank_name, template_name = rng.choice(bank_options)

        # Seed-stable date range anchored to a fixed base so the same
        # logId always yields the same dates.
        base_datetime = datetime(2024, 1, 1, 8, 0, 0)
        end_date = base_datetime + timedelta(days=rng.randint(180, 540))
        start_date = end_date - timedelta(days=rng.randint(90, 365))
        file_upload_time = (
            base_datetime
            + timedelta(
                days=rng.randint(0, 540),
                hours=rng.randint(0, 23),
                minutes=rng.randint(0, 59),
                seconds=rng.randint(0, 59),
            )
        )

        # Account / enterprise binding from the same seeded source.
        primary_enterprise_name, primary_account_no = (
            self._generate_primary_binding_from_rng(rng)
        )
        binding_lists = self._build_primary_binding_lists(
            primary_enterprise_name, primary_account_no
        )

        return {
            **binding_lists,
            "bankName": bank_name,
            "dataTypeInfo": ["CSV", ","],
            "downloadFileName": f"测试文件_{log_id}.csv",
            "fileSize": rng.randint(10000, 100000),
            "fileUploadBy": 448,
            "fileUploadByUserName": "admin@support.com",
            "fileUploadTime": file_upload_time.strftime("%Y-%m-%d %H:%M:%S"),
            "isSplit": 0,
            "leId": 10000 + rng.randint(0, 9999),
            "logId": log_id,
            "logMeta": "{\"lostHeader\":[],\"balanceAmount\":\"-1\"}",
            "logType": "bankstatement",
            "loginLeId": 10000 + rng.randint(0, 9999),
            "lostHeader": [],
            "realBankName": bank_name,
            "rows": 0,
            "source": "http",
            "status": -5,
            "templateName": template_name,
            "totalRecords": rng.randint(100, 300),
            "trxDateEndId": int(end_date.strftime("%Y%m%d")),
            "trxDateStartId": int(start_date.strftime("%Y%m%d")),
            # NOTE(review): uploadFileName uses .pdf while downloadFileName
            # uses .csv — preserved as-is; confirm whether intentional.
            "uploadFileName": f"测试文件_{log_id}.pdf",
            "uploadStatusDesc": "data.wait.confirm.newaccount"
        }

    def _build_deterministic_log_detail(self, log_id: int, group_id: int) -> dict:
        """Build the deterministic-fallback detail for one logId."""
        rng = random.Random(log_id)  # seed by logId for stable output
        return self._generate_deterministic_record(log_id, group_id, rng)

    def _build_log_detail(self, record: FileRecord) -> dict:
        """Build the log-detail dict for a stored FileRecord."""
        return {
            **self._build_primary_binding_lists(
                record.primary_enterprise_name,
                record.primary_account_no,
            ),
            "bankName": record.bank_name,
            "dataTypeInfo": record.data_type_info,
            "downloadFileName": record.download_file_name,
            "fileSize": record.file_size,
            "fileUploadBy": record.file_upload_by,
            "fileUploadByUserName": record.file_upload_by_user_name,
            "fileUploadTime": record.file_upload_time,
            "isSplit": record.is_split,
            "leId": record.le_id,
            "logId": record.log_id,
            "logMeta": record.log_meta,
            "logType": record.log_type,
            "loginLeId": record.login_le_id,
            "lostHeader": record.lost_header,
            "realBankName": record.real_bank_name,
            "rows": record.rows,
            "source": record.source,
            "status": record.status,
            "templateName": record.template_name,
            "totalRecords": record.total_records,
            "trxDateEndId": record.trx_date_end_id,
            "trxDateStartId": record.trx_date_start_id,
            "uploadFileName": record.file_name,
            "uploadStatusDesc": record.upload_status_desc
        }

    def check_parse_status(self, group_id: int, inprogress_list: str) -> Dict:
        """Check parse status for a batch of files.

        Args:
            group_id: Project ID.
            inprogress_list: Comma-separated list of file log IDs.

        Returns:
            Parse-status response dict; unknown logIds are treated as
            already complete and omitted from ``pendingList``.
        """
        log_ids = [
            int(x.strip()) for x in inprogress_list.split(",") if x.strip()
        ]

        pending_list = []
        all_parsing_complete = True
        for log_id in log_ids:
            record = self.file_records.get(log_id)
            if record is not None and record.parsing:
                all_parsing_complete = False
                pending_list.append(self._build_log_detail(record))

        return {
            "code": "200",
            "data": {
                "parsing": not all_parsing_complete,
                "pendingList": pending_list
            },
            "status": "200",
            "successResponse": True
        }

    def get_upload_status(
        self, group_id: int, log_id: Union[int, None] = None
    ) -> dict:
        """Get upload status; unknown logIds get deterministic fallback data.

        Args:
            group_id: Project ID.
            log_id: File ID (optional; when omitted, ``logs`` stays empty).

        Returns:
            Upload-status response dict.
        """
        logs = []
        if log_id is not None:
            if log_id in self.file_records:
                log_detail = self._build_log_detail(self.file_records[log_id])
            else:
                # No stored record: synthesize stable data from the logId.
                log_detail = self._build_deterministic_log_detail(
                    log_id, group_id
                )
            logs.append(log_detail)

        return {
            "code": "200",
            "data": {
                "logs": logs,
                "status": "",
                "accountId": 8954,
                "currency": "CNY"
            },
            "status": "200",
            "successResponse": True
        }

    def delete_files(self, group_id: int, log_ids: List[int], user_id: int) -> Dict:
        """Delete stored file records (best-effort, idempotent).

        Args:
            group_id: Project ID.
            log_ids: File IDs to delete.
            user_id: User ID.

        Returns:
            Deletion response dict; always reports success, even for
            unknown logIds.
        """
        for log_id in log_ids:
            # pop with a default silently ignores already-deleted ids.
            self.file_records.pop(log_id, None)

        return {
            "code": "200 OK",  # upstream quirk: "200 OK", not "200"
            "data": {
                "message": "delete.files.success"
            },
            "message": "delete.files.success",
            "status": "200",
            "successResponse": True
        }

    def fetch_inner_flow(self, request: Union[Dict, object]) -> Dict:
        """Pull an in-bank statement flow (creates and stores a record).

        Args:
            request: Flow request, either a dict or an object exposing
                ``groupId`` / ``customerNo`` / ``dataStartDateId`` /
                ``dataEndDateId`` attributes.

        Returns:
            Flow response dict whose ``data`` is the created logId array.
        """
        # Accept both dict-shaped and attribute-shaped requests.
        if isinstance(request, dict):
            group_id = request.get("groupId", 1000)
            customer_no = request.get("customerNo", "")
            data_start_date_id = request.get("dataStartDateId", 20240101)
            data_end_date_id = request.get("dataEndDateId", 20241231)
        else:
            group_id = request.groupId
            customer_no = request.customerNo
            data_start_date_id = request.dataStartDateId
            data_end_date_id = request.dataEndDateId

        # Use the shared incrementing logId so this stays consistent with
        # the upload path.
        self.log_counter += 1
        log_id = self.log_counter

        primary_enterprise_name, primary_account_no = (
            self._generate_primary_binding()
        )
        file_record = self._create_file_record(
            log_id=log_id,
            group_id=group_id,
            file_name=f"{customer_no or 'inner_flow'}_{log_id}.csv",
            download_file_name=f"{customer_no or 'inner_flow'}_{log_id}.csv",
            bank_name="ZJRCU",
            template_name="ZJRCU_T251114",
            primary_enterprise_name=primary_enterprise_name,
            primary_account_no=primary_account_no,
            file_size=random.randint(10000, 100000),
            total_records=random.randint(100, 300),
            trx_date_start_id=data_start_date_id,
            trx_date_end_id=data_end_date_id,
            le_id=10000 + random.randint(0, 9999),
            login_le_id=10000 + random.randint(0, 9999),
            parsing=False,  # pulled flows are immediately "parsed"
        )
        self.file_records[log_id] = file_record

        return {
            "code": "200",
            "data": [log_id],
            "status": "200",
            "successResponse": True,
        }