feat: 修复接口参数并改为form-data格式

- 添加缺失的认证参数:appId, appSecretCode, role
- 修复 analysisType 和 departmentCode 参数
- 将所有接口改为使用 Form 参数(form-data 格式)
- 更新服务层支持字典参数
- 更新所有测试代码
- 所有测试通过(7/7)
This commit is contained in:
wkc
2026-03-03 13:40:56 +08:00
parent a1f062d09d
commit 626f7d566b
21 changed files with 2527 additions and 52 deletions

View File

@@ -1,8 +1,7 @@
from fastapi import BackgroundTasks, UploadFile
from models.request import FetchInnerFlowRequest
from utils.response_builder import ResponseBuilder
from config.settings import settings
from typing import Dict, List
from typing import Dict, List, Union
import time
from datetime import datetime
@@ -133,11 +132,11 @@ class FileService:
"successResponse": True,
}
def fetch_inner_flow(self, request: FetchInnerFlowRequest) -> Dict:
def fetch_inner_flow(self, request: Union[Dict, object]) -> Dict:
"""拉取行内流水(模拟无数据场景)
Args:
request: 拉取流水请求
request: 拉取流水请求(可以是字典或对象)
Returns:
流水响应字典

View File

@@ -1,28 +1,35 @@
from models.request import GetBankStatementRequest
from utils.response_builder import ResponseBuilder
from typing import Dict
from typing import Dict, Union
class StatementService:
"""流水数据服务"""
def get_bank_statement(self, request: GetBankStatementRequest) -> Dict:
def get_bank_statement(self, request: Union[Dict, object]) -> Dict:
"""获取银行流水列表
Args:
request: 获取银行流水请求
request: 获取银行流水请求(可以是字典或对象)
Returns:
银行流水响应字典
"""
# 支持 dict 或对象
if isinstance(request, dict):
page_now = request.get("pageNow", 1)
page_size = request.get("pageSize", 10)
else:
page_now = request.pageNow
page_size = request.pageSize
# 加载模板
template = ResponseBuilder.load_template("bank_statement")
statements = template["success_response"]["data"]["bankStatementList"]
total_count = len(statements)
# 模拟分页
start = (request.pageNow - 1) * request.pageSize
end = start + request.pageSize
start = (page_now - 1) * page_size
end = start + page_size
page_data = statements[start:end]
return {

View File

@@ -1,7 +1,7 @@
from models.request import GetTokenRequest
from utils.response_builder import ResponseBuilder
from config.settings import settings
from typing import Dict
from typing import Dict, Union
class TokenService:
@@ -11,15 +11,23 @@ class TokenService:
self.project_counter = settings.INITIAL_PROJECT_ID
self.tokens = {} # projectId -> token_data
def create_token(self, request: GetTokenRequest) -> Dict:
def create_token(self, request: Union[GetTokenRequest, Dict]) -> Dict:
"""创建Token
Args:
request: 获取Token请求
request: 获取Token请求(可以是 GetTokenRequest 对象或字典)
Returns:
Token响应字典
"""
# 支持 dict 或 GetTokenRequest 对象
if isinstance(request, dict):
project_no = request.get("projectNo")
entity_name = request.get("entityName")
else:
project_no = request.projectNo
entity_name = request.entityName
# 生成唯一项目ID
self.project_counter += 1
project_id = self.project_counter
@@ -28,8 +36,8 @@ class TokenService:
response = ResponseBuilder.build_success_response(
"token",
project_id=project_id,
project_no=request.projectNo,
entity_name=request.entityName
project_no=project_no,
entity_name=entity_name
)
# 存储token信息