1. 修复配置问题 - 替换app-secret占位符为正确的密钥dXj6eHRmPv 2. 添加异常处理 - HttpUtil所有方法添加完整的异常处理 - 统一使用LsfxApiException包装异常 - 检查HTTP状态码和响应体 3. 添加日志记录 - Client所有方法添加详细的日志记录 - 记录请求参数、响应结果、耗时 - 异常情况记录错误日志 4. 完善参数校验 - 接口1:添加6个必填字段校验 - 接口2:添加groupId和文件校验,限制文件大小10MB - 接口3:添加7个参数校验和日期范围校验 - 接口4:添加groupId和inprogressList校验 5. 性能优化 - RestTemplate使用Apache HttpClient连接池 - 最大连接数100,每个路由最大20个连接 - 支持连接复用,提升性能 6. 代码审查文档 - 添加详细的代码审查报告 - 记录发现的问题和改进建议 修改的文件: - ccdi-lsfx/pom.xml - ccdi-lsfx/src/main/java/com/ruoyi/lsfx/client/LsfxAnalysisClient.java - ccdi-lsfx/src/main/java/com/ruoyi/lsfx/config/RestTemplateConfig.java - ccdi-lsfx/src/main/java/com/ruoyi/lsfx/controller/LsfxTestController.java - ccdi-lsfx/src/main/java/com/ruoyi/lsfx/util/HttpUtil.java - ruoyi-admin/src/main/resources/application-dev.yml - doc/implementation/lsfx-code-review-20260302.md
15 KiB
15 KiB
流水分析 Mock 服务器设计方案
创建日期: 2026-03-02 作者: Claude Code
项目概述
背景
当前项目需要与流水分析平台进行接口对接,但在开发和测试过程中,依赖外部真实服务存在以下问题:
- 网络连接不稳定,影响测试效率
- 无法控制返回数据,难以测试各种场景
- 无法模拟错误场景和边界情况
- 团队成员无法共享测试环境
解决方案
开发一个独立的 Mock 服务器,基于 Python + FastAPI 技术栈,模拟流水分析平台的 7 个核心接口,支持:
- 配置文件驱动的响应数据
- 文件上传解析延迟模拟
- 错误场景触发机制
- 自动生成的 API 文档
技术选型
| 技术组件 | 选择 | 理由 |
|---|---|---|
| Web框架 | FastAPI | 现代异步框架,自动生成API文档,强类型支持 |
| 数据验证 | Pydantic | 与FastAPI原生集成,类型提示清晰 |
| 配置管理 | JSON文件 | 易于修改,非开发人员也能调整测试数据 |
| 状态存储 | 内存字典 | 轻量级,重启清空,适合Mock场景 |
整体架构
lsfx-mock-server/
├── main.py # 应用入口
├── config/
│ ├── settings.py # 全局配置
│ └── responses/ # 响应模板配置文件
│ ├── token.json
│ ├── upload.json
│ ├── parse_status.json
│ └── bank_statement.json
├── models/
│ ├── request.py # 请求模型(Pydantic)
│ └── response.py # 响应模型(Pydantic)
├── services/
│ ├── token_service.py # Token管理
│ ├── file_service.py # 文件上传和解析模拟
│ └── statement_service.py # 流水数据管理
├── routers/
│ └── api.py # 所有API路由
├── utils/
│ ├── response_builder.py # 响应构建器
│ └── error_simulator.py # 错误场景模拟
└── requirements.txt
核心设计思想
- 配置驱动 - 所有响应数据在JSON配置文件中,方便修改
- 内存状态管理 - 使用全局字典存储运行时状态(tokens、文件记录等)
- 异步任务 - 使用FastAPI后台任务模拟文件解析延迟
- 错误标记识别 - 检测请求参数中的特殊标记触发错误响应
数据模型设计
请求模型
对应Java项目中的DTO类:
# models/request.py
from pydantic import BaseModel
from typing import Optional
class GetTokenRequest(BaseModel):
projectNo: str
entityName: str
userId: str
userName: str
orgCode: str
entityId: Optional[str] = None
xdRelatedPersons: Optional[str] = None
jzDataDateId: Optional[str] = "0"
innerBSStartDateId: Optional[str] = "0"
innerBSEndDateId: Optional[str] = "0"
analysisType: Optional[int] = -1
departmentCode: Optional[str] = None
class UploadFileRequest(BaseModel):
groupId: int
class FetchInnerFlowRequest(BaseModel):
groupId: int
customerNo: str
dataChannelCode: str
requestDateId: int
dataStartDateId: int
dataEndDateId: int
uploadUserId: int
class CheckParseStatusRequest(BaseModel):
groupId: int
inprogressList: str
class GetBankStatementRequest(BaseModel):
groupId: int
logId: int
pageNow: int
pageSize: int
响应模型
对应Java项目中的VO类:
# models/response.py
from pydantic import BaseModel
from typing import Optional, List, Dict, Any
class TokenData(BaseModel):
token: str
projectId: int
projectNo: str
entityName: str
analysisType: int
class GetTokenResponse(BaseModel):
code: str = "200"
data: Optional[TokenData] = None
message: str = "create.token.success"
status: str = "200"
successResponse: bool = True
# 其他响应模型类似...
核心业务逻辑
文件解析延迟模拟
实现机制:
- 上传接口立即返回,状态为"解析中"
- 使用FastAPI的BackgroundTasks在后台延迟执行
- 延迟3-5秒后更新状态为"解析完成"
- 轮询检查接口返回当前解析状态
# services/file_service.py
class FileService:
def __init__(self):
self.file_records: Dict[int, Dict] = {}
self.parsing_status: Dict[int, bool] = {}
async def upload_file(self, group_id: int, file, background_tasks: BackgroundTasks):
log_id = generate_log_id()
# 立即存储记录,标记为解析中
self.file_records[log_id] = {
"logId": log_id,
"status": -5,
"uploadStatusDesc": "parsing",
...
}
self.parsing_status[log_id] = True
# 启动后台任务,延迟4秒后完成解析
background_tasks.add_task(
self._simulate_parsing,
log_id,
delay_seconds=4
)
return log_id
def _simulate_parsing(self, log_id: int, delay_seconds: int):
time.sleep(delay_seconds)
if log_id in self.file_records:
self.file_records[log_id]["status"] = -5
self.file_records[log_id]["uploadStatusDesc"] = "data.wait.confirm.newaccount"
self.parsing_status[log_id] = False
错误场景模拟机制
错误触发规则
通过请求参数中的特殊标记触发对应的错误响应:
错误码映射表:
ERROR_CODES = {
"40101": {"code": "40101", "message": "appId错误"},
"40102": {"code": "40102", "message": "appSecretCode错误"},
"40104": {"code": "40104", "message": "可使用项目次数为0,无法创建项目"},
"40105": {"code": "40105", "message": "只读模式下无法新建项目"},
"40106": {"code": "40106", "message": "错误的分析类型,不在规定的取值范围内"},
"40107": {"code": "40107", "message": "当前系统不支持的分析类型"},
"40108": {"code": "40108", "message": "当前用户所属行社无权限"},
"501014": {"code": "501014", "message": "无行内流水文件"},
}
检测逻辑:
@staticmethod
def detect_error_marker(value: str) -> Optional[str]:
"""检测字符串中的错误标记
规则:如果字符串包含 error_XXXX,则返回 XXXX
例如:
- "project_error_40101" -> "40101"
- "test_error_501014" -> "501014"
"""
if not value:
return None
import re
pattern = r'error_(\d+)'
match = re.search(pattern, value)
if match:
return match.group(1)
return None
使用示例:
# 在服务中使用
def get_token(request: GetTokenRequest):
error_code = ErrorSimulator.detect_error_marker(request.projectNo)
if error_code:
return ErrorSimulator.build_error_response(error_code)
# 正常流程...
测试方式:
# 触发 40101 错误
request_data = {
"projectNo": "test_project_error_40101", # 包含错误标记
"entityName": "测试企业",
...
}
配置文件结构
响应模板配置
// config/responses/token.json
{
"success_response": {
"code": "200",
"data": {
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.mock_token_{project_id}",
"projectId": "{project_id}",
"projectNo": "{project_no}",
"entityName": "{entity_name}",
"analysisType": 0
},
"message": "create.token.success",
"status": "200",
"successResponse": true
}
}
// config/responses/upload.json
{
"success_response": {
"code": "200",
"data": {
"accountsOfLog": {},
"uploadLogList": [
{
"logId": "{log_id}",
"status": -5,
"uploadStatusDesc": "data.wait.confirm.newaccount",
...
}
]
}
}
}
全局配置
# config/settings.py
from pydantic import BaseSettings
class Settings(BaseSettings):
APP_NAME: str = "流水分析Mock服务"
APP_VERSION: str = "1.0.0"
DEBUG: bool = True
HOST: str = "0.0.0.0"
PORT: int = 8000
# 模拟配置
PARSE_DELAY_SECONDS: int = 4
MAX_FILE_SIZE: int = 10485760 # 10MB
class Config:
env_file = ".env"
settings = Settings()
API 路由实现
核心接口
# routers/api.py
from fastapi import APIRouter, BackgroundTasks, UploadFile, File
router = APIRouter()
# 接口1:获取Token
@router.post("/account/common/getToken")
async def get_token(request: GetTokenRequest):
error_code = ErrorSimulator.detect_error_marker(request.projectNo)
if error_code:
return ErrorSimulator.build_error_response(error_code)
return token_service.create_token(request)
# 接口2:上传文件
@router.post("/watson/api/project/remoteUploadSplitFile")
async def upload_file(
background_tasks: BackgroundTasks,
groupId: int = Form(...),
file: UploadFile = File(...)
):
return file_service.upload_file(groupId, file, background_tasks)
# 接口3:拉取行内流水
@router.post("/watson/api/project/getJZFileOrZjrcuFile")
async def fetch_inner_flow(request: FetchInnerFlowRequest):
error_code = ErrorSimulator.detect_error_marker(request.customerNo)
if error_code:
return ErrorSimulator.build_error_response(error_code)
return file_service.fetch_inner_flow(request)
# 接口4:检查解析状态
@router.post("/watson/api/project/upload/getpendings")
async def check_parse_status(request: CheckParseStatusRequest):
return file_service.check_parse_status(request.groupId, request.inprogressList)
# 接口5:删除文件
@router.post("/watson/api/project/batchDeleteUploadFile")
async def delete_files(request: dict):
return file_service.delete_files(
request.get("groupId"),
request.get("logIds"),
request.get("userId")
)
# 接口6:获取银行流水
@router.post("/watson/api/project/getBSByLogId")
async def get_bank_statement(request: GetBankStatementRequest):
return statement_service.get_bank_statement(request)
主应用
# main.py
from fastapi import FastAPI
from routers import api
app = FastAPI(
title="流水分析Mock服务",
description="模拟流水分析平台的7个核心接口",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc"
)
app.include_router(api.router, tags=["流水分析接口"])
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
测试和使用说明
启动服务
# 安装依赖
pip install -r requirements.txt
# 启动服务
python main.py
# 或使用uvicorn启动(支持热重载)
uvicorn main:app --reload --host 0.0.0.0 --port 8000
访问API文档
- Swagger UI: http://localhost:8000/docs
- ReDoc: http://localhost:8000/redoc
测试示例
1. 正常流程测试
import requests
# 获取Token
response = requests.post(
"http://localhost:8000/account/common/getToken",
json={
"projectNo": "test_project_001",
"entityName": "测试企业",
"userId": "902001",
"userName": "902001",
"orgCode": "902000"
}
)
result = response.json()
token = result["data"]["token"]
project_id = result["data"]["projectId"]
# 上传文件
files = {"file": ("test.csv", open("test.csv", "rb"), "text/csv")}
response = requests.post(
"http://localhost:8000/watson/api/project/remoteUploadSplitFile",
files=files,
data={"groupId": project_id},
headers={"X-Xencio-Client-Id": "26e5b9239853436b85c623f4b7a6d0e6"}
)
log_id = response.json()["data"]["uploadLogList"][0]["logId"]
# 轮询检查解析状态
import time
for i in range(10):
response = requests.post(
"http://localhost:8000/watson/api/project/upload/getpendings",
json={"groupId": project_id, "inprogressList": str(log_id)},
headers={"X-Xencio-Client-Id": "26e5b9239853436b85c623f4b7a6d0e6"}
)
result = response.json()
if not result["data"]["parsing"]:
print("解析完成")
break
time.sleep(1)
# 获取银行流水
response = requests.post(
"http://localhost:8000/watson/api/project/getBSByLogId",
json={
"groupId": project_id,
"logId": log_id,
"pageNow": 1,
"pageSize": 10
},
headers={"X-Xencio-Client-Id": "26e5b9239853436b85c623f4b7a6d0e6"}
)
2. 错误场景测试
# 触发 40101 错误(appId错误)
response = requests.post(
"http://localhost:8000/account/common/getToken",
json={
"projectNo": "test_project_error_40101", # 包含错误标记
"entityName": "测试企业",
"userId": "902001",
"userName": "902001",
"orgCode": "902000"
}
)
# 返回: {"code": "40101", "message": "appId错误", ...}
# 触发 501014 错误(无行内流水文件)
response = requests.post(
"http://localhost:8000/watson/api/project/getJZFileOrZjrcuFile",
json={
"groupId": 1,
"customerNo": "test_error_501014", # 包含错误标记
"dataChannelCode": "ZJRCU",
"requestDateId": 20260302,
"dataStartDateId": 20260201,
"dataEndDateId": 20260228,
"uploadUserId": 902001
}
)
# 返回: {"code": "501014", "message": "无行内流水文件", ...}
配置修改
- 修改
config/responses/下的JSON文件可以自定义响应数据 - 修改
config/settings.py可以调整延迟时间、端口等配置 - 支持
.env文件覆盖配置
依赖清单
# requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
python-multipart==0.0.6
使用场景
A. 开发阶段测试
在业务代码开发过程中,修改配置文件 application-dev.yml,将 lsfx.api.base-url 改为 http://localhost:8000,启动Mock服务器后,业务代码即可连接Mock服务进行测试。
B. 完全替换测试
直接使用 Mock 服务器进行接口测试,验证业务逻辑的正确性。生产环境切换到真实服务。
C. CI/CD 集成
在持续集成流程中使用 Mock 服务器,自动化测试接口调用逻辑。
扩展性考虑
后续可能的增强功能
- 数据持久化 - 如需保留历史记录,可集成SQLite
- 更复杂的场景模拟 - 支持配置文件定义多个场景
- 请求日志记录 - 记录所有请求用于调试
- Web管理界面 - 可视化管理Mock数据和状态
- Docker部署 - 提供Dockerfile方便部署
当前设计已满足核心需求,保持简洁实用。
总结
这是一个配置驱动、轻量级、易于使用的 Mock 服务器设计,核心特点:
✅ 完整性 - 覆盖所有7个核心接口 ✅ 真实性 - 模拟文件解析延迟等真实场景 ✅ 灵活性 - 配置文件驱动,错误场景可触发 ✅ 易用性 - 自动API文档,零配置启动 ✅ 可维护 - 代码结构清晰,与Java项目对应
满足您的Mock测试需求,提升开发和测试效率。