Files
ccdi/docs/plans/2026-03-02-lsfx-mock-server-design.md

573 lines
15 KiB
Markdown
Raw Permalink Normal View History

# 流水分析 Mock 服务器设计方案
**创建日期**: 2026-03-02
**作者**: Claude Code
## 项目概述
### 背景
当前项目需要与流水分析平台进行接口对接,但在开发和测试过程中,依赖外部真实服务存在以下问题:
- 网络连接不稳定,影响测试效率
- 无法控制返回数据,难以测试各种场景
- 无法模拟错误场景和边界情况
- 团队成员无法共享测试环境
### 解决方案
开发一个独立的 Mock 服务器,基于 Python + FastAPI 技术栈,模拟流水分析平台的 7 个核心接口,支持:
- 配置文件驱动的响应数据
- 文件上传解析延迟模拟
- 错误场景触发机制
- 自动生成的 API 文档
### 技术选型
| 技术组件 | 选择 | 理由 |
|---------|------|------|
| Web框架 | FastAPI | 现代异步框架自动生成API文档强类型支持 |
| 数据验证 | Pydantic | 与FastAPI原生集成类型提示清晰 |
| 配置管理 | JSON文件 | 易于修改,非开发人员也能调整测试数据 |
| 状态存储 | 内存字典 | 轻量级重启清空适合Mock场景 |
---
## 整体架构
```
lsfx-mock-server/
├── main.py # 应用入口
├── config/
│ ├── settings.py # 全局配置
│ └── responses/ # 响应模板配置文件
│ ├── token.json
│ ├── upload.json
│ ├── parse_status.json
│ └── bank_statement.json
├── models/
│ ├── request.py # 请求模型Pydantic
│ └── response.py # 响应模型Pydantic
├── services/
│ ├── token_service.py # Token管理
│ ├── file_service.py # 文件上传和解析模拟
│ └── statement_service.py # 流水数据管理
├── routers/
│ └── api.py # 所有API路由
├── utils/
│ ├── response_builder.py # 响应构建器
│ └── error_simulator.py # 错误场景模拟
└── requirements.txt
```
### 核心设计思想
1. **配置驱动** - 所有响应数据在JSON配置文件中方便修改
2. **内存状态管理** - 使用全局字典存储运行时状态tokens、文件记录等
3. **异步任务** - 使用FastAPI后台任务模拟文件解析延迟
4. **错误标记识别** - 检测请求参数中的特殊标记触发错误响应
---
## 数据模型设计
### 请求模型
对应Java项目中的DTO类
```python
# models/request.py
from pydantic import BaseModel
from typing import Optional
class GetTokenRequest(BaseModel):
projectNo: str
entityName: str
userId: str
userName: str
orgCode: str
entityId: Optional[str] = None
xdRelatedPersons: Optional[str] = None
jzDataDateId: Optional[str] = "0"
innerBSStartDateId: Optional[str] = "0"
innerBSEndDateId: Optional[str] = "0"
analysisType: Optional[int] = -1
departmentCode: Optional[str] = None
class UploadFileRequest(BaseModel):
groupId: int
class FetchInnerFlowRequest(BaseModel):
groupId: int
customerNo: str
dataChannelCode: str
requestDateId: int
dataStartDateId: int
dataEndDateId: int
uploadUserId: int
class CheckParseStatusRequest(BaseModel):
groupId: int
inprogressList: str
class GetBankStatementRequest(BaseModel):
groupId: int
logId: int
pageNow: int
pageSize: int
```
### 响应模型
对应Java项目中的VO类
```python
# models/response.py
from pydantic import BaseModel
from typing import Optional, List, Dict, Any
class TokenData(BaseModel):
token: str
projectId: int
projectNo: str
entityName: str
analysisType: int
class GetTokenResponse(BaseModel):
code: str = "200"
data: Optional[TokenData] = None
message: str = "create.token.success"
status: str = "200"
successResponse: bool = True
# 其他响应模型类似...
```
---
## 核心业务逻辑
### 文件解析延迟模拟
**实现机制:**
1. 上传接口立即返回,状态为"解析中"
2. 使用FastAPI的BackgroundTasks在后台延迟执行
3. 延迟3-5秒后更新状态为"解析完成"
4. 轮询检查接口返回当前解析状态
```python
# services/file_service.py
class FileService:
def __init__(self):
self.file_records: Dict[int, Dict] = {}
self.parsing_status: Dict[int, bool] = {}
async def upload_file(self, group_id: int, file, background_tasks: BackgroundTasks):
log_id = generate_log_id()
# 立即存储记录,标记为解析中
self.file_records[log_id] = {
"logId": log_id,
"status": -5,
"uploadStatusDesc": "parsing",
...
}
self.parsing_status[log_id] = True
# 启动后台任务延迟4秒后完成解析
background_tasks.add_task(
self._simulate_parsing,
log_id,
delay_seconds=4
)
return log_id
def _simulate_parsing(self, log_id: int, delay_seconds: int):
time.sleep(delay_seconds)
if log_id in self.file_records:
self.file_records[log_id]["status"] = -5
self.file_records[log_id]["uploadStatusDesc"] = "data.wait.confirm.newaccount"
self.parsing_status[log_id] = False
```
---
## 错误场景模拟机制
### 错误触发规则
通过请求参数中的特殊标记触发对应的错误响应:
**错误码映射表:**
```python
ERROR_CODES = {
"40101": {"code": "40101", "message": "appId错误"},
"40102": {"code": "40102", "message": "appSecretCode错误"},
"40104": {"code": "40104", "message": "可使用项目次数为0无法创建项目"},
"40105": {"code": "40105", "message": "只读模式下无法新建项目"},
"40106": {"code": "40106", "message": "错误的分析类型,不在规定的取值范围内"},
"40107": {"code": "40107", "message": "当前系统不支持的分析类型"},
"40108": {"code": "40108", "message": "当前用户所属行社无权限"},
"501014": {"code": "501014", "message": "无行内流水文件"},
}
```
**检测逻辑:**
```python
@staticmethod
def detect_error_marker(value: str) -> Optional[str]:
"""检测字符串中的错误标记
规则:如果字符串包含 error_XXXX则返回 XXXX
例如:
- "project_error_40101" -> "40101"
- "test_error_501014" -> "501014"
"""
if not value:
return None
import re
pattern = r'error_(\d+)'
match = re.search(pattern, value)
if match:
return match.group(1)
return None
```
**使用示例:**
```python
# 在服务中使用
def get_token(request: GetTokenRequest):
error_code = ErrorSimulator.detect_error_marker(request.projectNo)
if error_code:
return ErrorSimulator.build_error_response(error_code)
# 正常流程...
```
**测试方式:**
```python
# 触发 40101 错误
request_data = {
"projectNo": "test_project_error_40101", # 包含错误标记
"entityName": "测试企业",
...
}
```
---
## 配置文件结构
### 响应模板配置
```json
// config/responses/token.json
{
"success_response": {
"code": "200",
"data": {
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.mock_token_{project_id}",
"projectId": "{project_id}",
"projectNo": "{project_no}",
"entityName": "{entity_name}",
"analysisType": 0
},
"message": "create.token.success",
"status": "200",
"successResponse": true
}
}
```
```json
// config/responses/upload.json
{
"success_response": {
"code": "200",
"data": {
"accountsOfLog": {},
"uploadLogList": [
{
"logId": "{log_id}",
"status": -5,
"uploadStatusDesc": "data.wait.confirm.newaccount",
...
}
]
}
}
}
```
### 全局配置
```python
# config/settings.py
from pydantic import BaseSettings
class Settings(BaseSettings):
APP_NAME: str = "流水分析Mock服务"
APP_VERSION: str = "1.0.0"
DEBUG: bool = True
HOST: str = "0.0.0.0"
PORT: int = 8000
# 模拟配置
PARSE_DELAY_SECONDS: int = 4
MAX_FILE_SIZE: int = 10485760 # 10MB
class Config:
env_file = ".env"
settings = Settings()
```
---
## API 路由实现
### 核心接口
```python
# routers/api.py
from fastapi import APIRouter, BackgroundTasks, UploadFile, File
router = APIRouter()
# 接口1获取Token
@router.post("/account/common/getToken")
async def get_token(request: GetTokenRequest):
error_code = ErrorSimulator.detect_error_marker(request.projectNo)
if error_code:
return ErrorSimulator.build_error_response(error_code)
return token_service.create_token(request)
# 接口2上传文件
@router.post("/watson/api/project/remoteUploadSplitFile")
async def upload_file(
background_tasks: BackgroundTasks,
groupId: int = Form(...),
file: UploadFile = File(...)
):
return file_service.upload_file(groupId, file, background_tasks)
# 接口3拉取行内流水
@router.post("/watson/api/project/getJZFileOrZjrcuFile")
async def fetch_inner_flow(request: FetchInnerFlowRequest):
error_code = ErrorSimulator.detect_error_marker(request.customerNo)
if error_code:
return ErrorSimulator.build_error_response(error_code)
return file_service.fetch_inner_flow(request)
# 接口4检查解析状态
@router.post("/watson/api/project/upload/getpendings")
async def check_parse_status(request: CheckParseStatusRequest):
return file_service.check_parse_status(request.groupId, request.inprogressList)
# 接口5删除文件
@router.post("/watson/api/project/batchDeleteUploadFile")
async def delete_files(request: dict):
return file_service.delete_files(
request.get("groupId"),
request.get("logIds"),
request.get("userId")
)
# 接口6获取银行流水
@router.post("/watson/api/project/getBSByLogId")
async def get_bank_statement(request: GetBankStatementRequest):
return statement_service.get_bank_statement(request)
```
### 主应用
```python
# main.py
from fastapi import FastAPI
from routers import api
app = FastAPI(
title="流水分析Mock服务",
description="模拟流水分析平台的7个核心接口",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc"
)
app.include_router(api.router, tags=["流水分析接口"])
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
```
---
## 测试和使用说明
### 启动服务
```bash
# 安装依赖
pip install -r requirements.txt
# 启动服务
python main.py
# 或使用uvicorn启动支持热重载
uvicorn main:app --reload --host 0.0.0.0 --port 8000
```
### 访问API文档
- **Swagger UI:** http://localhost:8000/docs
- **ReDoc:** http://localhost:8000/redoc
### 测试示例
#### 1. 正常流程测试
```python
import requests
# 获取Token
response = requests.post(
"http://localhost:8000/account/common/getToken",
json={
"projectNo": "test_project_001",
"entityName": "测试企业",
"userId": "902001",
"userName": "902001",
"orgCode": "902000"
}
)
result = response.json()
token = result["data"]["token"]
project_id = result["data"]["projectId"]
# 上传文件
files = {"file": ("test.csv", open("test.csv", "rb"), "text/csv")}
response = requests.post(
"http://localhost:8000/watson/api/project/remoteUploadSplitFile",
files=files,
data={"groupId": project_id},
headers={"X-Xencio-Client-Id": "26e5b9239853436b85c623f4b7a6d0e6"}
)
log_id = response.json()["data"]["uploadLogList"][0]["logId"]
# 轮询检查解析状态
import time
for i in range(10):
response = requests.post(
"http://localhost:8000/watson/api/project/upload/getpendings",
json={"groupId": project_id, "inprogressList": str(log_id)},
headers={"X-Xencio-Client-Id": "26e5b9239853436b85c623f4b7a6d0e6"}
)
result = response.json()
if not result["data"]["parsing"]:
print("解析完成")
break
time.sleep(1)
# 获取银行流水
response = requests.post(
"http://localhost:8000/watson/api/project/getBSByLogId",
json={
"groupId": project_id,
"logId": log_id,
"pageNow": 1,
"pageSize": 10
},
headers={"X-Xencio-Client-Id": "26e5b9239853436b85c623f4b7a6d0e6"}
)
```
#### 2. 错误场景测试
```python
# 触发 40101 错误appId错误
response = requests.post(
"http://localhost:8000/account/common/getToken",
json={
"projectNo": "test_project_error_40101", # 包含错误标记
"entityName": "测试企业",
"userId": "902001",
"userName": "902001",
"orgCode": "902000"
}
)
# 返回: {"code": "40101", "message": "appId错误", ...}
# 触发 501014 错误(无行内流水文件)
response = requests.post(
"http://localhost:8000/watson/api/project/getJZFileOrZjrcuFile",
json={
"groupId": 1,
"customerNo": "test_error_501014", # 包含错误标记
"dataChannelCode": "ZJRCU",
"requestDateId": 20260302,
"dataStartDateId": 20260201,
"dataEndDateId": 20260228,
"uploadUserId": 902001
}
)
# 返回: {"code": "501014", "message": "无行内流水文件", ...}
```
### 配置修改
- 修改 `config/responses/` 下的JSON文件可以自定义响应数据
- 修改 `config/settings.py` 可以调整延迟时间、端口等配置
- 支持 `.env` 文件覆盖配置
---
## 依赖清单
```txt
# requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
python-multipart==0.0.6
```
---
## 使用场景
### A. 开发阶段测试
在业务代码开发过程中,修改配置文件 `application-dev.yml`,将 `lsfx.api.base-url` 改为 `http://localhost:8000`启动Mock服务器后业务代码即可连接Mock服务进行测试。
### B. 完全替换测试
直接使用 Mock 服务器进行接口测试,验证业务逻辑的正确性。生产环境切换到真实服务。
### C. CI/CD 集成
在持续集成流程中使用 Mock 服务器,自动化测试接口调用逻辑。
---
## 扩展性考虑
### 后续可能的增强功能
1. **数据持久化** - 如需保留历史记录可集成SQLite
2. **更复杂的场景模拟** - 支持配置文件定义多个场景
3. **请求日志记录** - 记录所有请求用于调试
4. **Web管理界面** - 可视化管理Mock数据和状态
5. **Docker部署** - 提供Dockerfile方便部署
当前设计已满足核心需求,保持简洁实用。
---
## 总结
这是一个**配置驱动、轻量级、易于使用**的 Mock 服务器设计,核心特点:
**完整性** - 覆盖所有7个核心接口
**真实性** - 模拟文件解析延迟等真实场景
**灵活性** - 配置文件驱动,错误场景可触发
**易用性** - 自动API文档零配置启动
**可维护** - 代码结构清晰与Java项目对应
满足您的Mock测试需求提升开发和测试效率。