from fastapi import BackgroundTasks, UploadFile from utils.response_builder import ResponseBuilder from config.settings import settings from typing import Dict, List, Union import time from datetime import datetime class FileService: """文件上传和解析服务""" def __init__(self): self.file_records = {} # logId -> record self.parsing_status = {} # logId -> is_parsing self.log_counter = settings.INITIAL_LOG_ID async def upload_file( self, group_id: int, file: UploadFile, background_tasks: BackgroundTasks ) -> Dict: """上传文件并启动后台解析任务 Args: group_id: 项目ID file: 上传的文件 background_tasks: FastAPI后台任务 Returns: 上传响应字典 """ # 生成唯一logId self.log_counter += 1 log_id = self.log_counter # 获取当前时间 upload_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S") # 立即存储文件记录(初始状态:解析中) self.file_records[log_id] = { "logId": log_id, "groupId": group_id, "status": -5, "uploadStatusDesc": "parsing", "uploadFileName": file.filename, "fileSize": 0, # 简化处理 "bankName": "MOCK", "uploadTime": upload_time, } # 标记为解析中 self.parsing_status[log_id] = True # 启动后台任务,延迟解析 background_tasks.add_task( self._simulate_parsing, log_id, settings.PARSE_DELAY_SECONDS ) # 构建响应 response = ResponseBuilder.build_success_response( "upload", log_id=log_id, upload_time=upload_time ) return response def _simulate_parsing(self, log_id: int, delay_seconds: int): """后台任务:模拟文件解析过程 Args: log_id: 日志ID delay_seconds: 延迟秒数 """ time.sleep(delay_seconds) # 解析完成,更新状态 if log_id in self.file_records: self.file_records[log_id]["uploadStatusDesc"] = ( "data.wait.confirm.newaccount" ) self.parsing_status[log_id] = False def check_parse_status(self, group_id: int, inprogress_list: str) -> Dict: """检查文件解析状态 Args: group_id: 项目ID inprogress_list: 文件ID列表(逗号分隔) Returns: 解析状态响应字典 """ # 解析logId列表 log_ids = [int(x.strip()) for x in inprogress_list.split(",") if x.strip()] # 检查是否还在解析中 is_parsing = any( self.parsing_status.get(log_id, False) for log_id in log_ids ) # 获取待处理列表 pending_list = [ self.file_records[log_id] for log_id in log_ids if log_id in self.file_records ] return { "code": "200", "data": {"parsing": is_parsing, "pendingList": pending_list}, "status": "200", "successResponse": True, } def delete_files(self, group_id: int, log_ids: List[int], user_id: int) -> Dict: """删除文件 Args: group_id: 项目ID log_ids: 文件ID列表 user_id: 用户ID Returns: 删除响应字典 """ # 删除文件记录 for log_id in log_ids: self.file_records.pop(log_id, None) self.parsing_status.pop(log_id, None) return { "code": "200", "data": {"message": "delete.files.success"}, "status": "200", "successResponse": True, } def fetch_inner_flow(self, request: Union[Dict, object]) -> Dict: """拉取行内流水(模拟无数据场景) Args: request: 拉取流水请求(可以是字典或对象) Returns: 流水响应字典 """ # 模拟无行内流水文件场景 return { "code": "200", "data": {"code": "501014", "message": "无行内流水文件"}, "status": "200", "successResponse": True, }