feat(services): implement token, file, and statement services

This commit is contained in:
wkc
2026-03-03 09:26:07 +08:00
parent e6bc2d64dd
commit 0d4fcd089b
6 changed files with 234 additions and 0 deletions

View File

@@ -0,0 +1 @@
# Services package

View File

@@ -0,0 +1,151 @@
from fastapi import BackgroundTasks, UploadFile
from models.request import FetchInnerFlowRequest
from utils.response_builder import ResponseBuilder
from config.settings import settings
from typing import Dict, List
import time
from datetime import datetime
class FileService:
"""文件上传和解析服务"""
def __init__(self):
self.file_records = {} # logId -> record
self.parsing_status = {} # logId -> is_parsing
self.log_counter = settings.INITIAL_LOG_ID
async def upload_file(
self, group_id: int, file: UploadFile, background_tasks: BackgroundTasks
) -> Dict:
"""上传文件并启动后台解析任务
Args:
group_id: 项目ID
file: 上传的文件
background_tasks: FastAPI后台任务
Returns:
上传响应字典
"""
# 生成唯一logId
self.log_counter += 1
log_id = self.log_counter
# 获取当前时间
upload_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 立即存储文件记录(初始状态:解析中)
self.file_records[log_id] = {
"logId": log_id,
"groupId": group_id,
"status": -5,
"uploadStatusDesc": "parsing",
"uploadFileName": file.filename,
"fileSize": 0, # 简化处理
"bankName": "MOCK",
"uploadTime": upload_time,
}
# 标记为解析中
self.parsing_status[log_id] = True
# 启动后台任务,延迟解析
background_tasks.add_task(
self._simulate_parsing, log_id, settings.PARSE_DELAY_SECONDS
)
# 构建响应
response = ResponseBuilder.build_success_response(
"upload", log_id=log_id, upload_time=upload_time
)
return response
def _simulate_parsing(self, log_id: int, delay_seconds: int):
"""后台任务:模拟文件解析过程
Args:
log_id: 日志ID
delay_seconds: 延迟秒数
"""
time.sleep(delay_seconds)
# 解析完成,更新状态
if log_id in self.file_records:
self.file_records[log_id]["uploadStatusDesc"] = (
"data.wait.confirm.newaccount"
)
self.parsing_status[log_id] = False
def check_parse_status(self, group_id: int, inprogress_list: str) -> Dict:
"""检查文件解析状态
Args:
group_id: 项目ID
inprogress_list: 文件ID列表逗号分隔
Returns:
解析状态响应字典
"""
# 解析logId列表
log_ids = [int(x.strip()) for x in inprogress_list.split(",") if x.strip()]
# 检查是否还在解析中
is_parsing = any(
self.parsing_status.get(log_id, False) for log_id in log_ids
)
# 获取待处理列表
pending_list = [
self.file_records[log_id]
for log_id in log_ids
if log_id in self.file_records
]
return {
"code": "200",
"data": {"parsing": is_parsing, "pendingList": pending_list},
"status": "200",
"successResponse": True,
}
def delete_files(self, group_id: int, log_ids: List[int], user_id: int) -> Dict:
"""删除文件
Args:
group_id: 项目ID
log_ids: 文件ID列表
user_id: 用户ID
Returns:
删除响应字典
"""
# 删除文件记录
for log_id in log_ids:
self.file_records.pop(log_id, None)
self.parsing_status.pop(log_id, None)
return {
"code": "200",
"data": {"message": "delete.files.success"},
"status": "200",
"successResponse": True,
}
def fetch_inner_flow(self, request: FetchInnerFlowRequest) -> Dict:
"""拉取行内流水(模拟无数据场景)
Args:
request: 拉取流水请求
Returns:
流水响应字典
"""
# 模拟无行内流水文件场景
return {
"code": "200",
"data": {"code": "501014", "message": "无行内流水文件"},
"status": "200",
"successResponse": True,
}

View File

@@ -0,0 +1,33 @@
from models.request import GetBankStatementRequest
from utils.response_builder import ResponseBuilder
from typing import Dict
class StatementService:
"""流水数据服务"""
def get_bank_statement(self, request: GetBankStatementRequest) -> Dict:
"""获取银行流水列表
Args:
request: 获取银行流水请求
Returns:
银行流水响应字典
"""
# 加载模板
template = ResponseBuilder.load_template("bank_statement")
statements = template["success_response"]["data"]["bankStatementList"]
total_count = len(statements)
# 模拟分页
start = (request.pageNow - 1) * request.pageSize
end = start + request.pageSize
page_data = statements[start:end]
return {
"code": "200",
"data": {"bankStatementList": page_data, "totalCount": total_count},
"status": "200",
"successResponse": True,
}

View File

@@ -0,0 +1,49 @@
from models.request import GetTokenRequest
from utils.response_builder import ResponseBuilder
from config.settings import settings
from typing import Dict
class TokenService:
"""Token管理服务"""
def __init__(self):
self.project_counter = settings.INITIAL_PROJECT_ID
self.tokens = {} # projectId -> token_data
def create_token(self, request: GetTokenRequest) -> Dict:
"""创建Token
Args:
request: 获取Token请求
Returns:
Token响应字典
"""
# 生成唯一项目ID
self.project_counter += 1
project_id = self.project_counter
# 构建响应
response = ResponseBuilder.build_success_response(
"token",
project_id=project_id,
project_no=request.projectNo,
entity_name=request.entityName
)
# 存储token信息
self.tokens[project_id] = response.get("data")
return response
def get_project(self, project_id: int) -> Dict:
"""获取项目信息
Args:
project_id: 项目ID
Returns:
项目信息字典
"""
return self.tokens.get(project_id)