Files
ccdi/docs/plans/2026-03-02-lsfx-mock-server-design.md
wkc b022ec75b8 fix(lsfx): 修复流水分析对接模块的代码质量问题
1. 修复配置问题
   - 替换app-secret占位符为正确的密钥dXj6eHRmPv

2. 添加异常处理
   - HttpUtil所有方法添加完整的异常处理
   - 统一使用LsfxApiException包装异常
   - 检查HTTP状态码和响应体

3. 添加日志记录
   - Client所有方法添加详细的日志记录
   - 记录请求参数、响应结果、耗时
   - 异常情况记录错误日志

4. 完善参数校验
   - 接口1:添加6个必填字段校验
   - 接口2:添加groupId和文件校验,限制文件大小10MB
   - 接口3:添加7个参数校验和日期范围校验
   - 接口4:添加groupId和inprogressList校验

5. 性能优化
   - RestTemplate使用Apache HttpClient连接池
   - 最大连接数100,每个路由最大20个连接
   - 支持连接复用,提升性能

6. 代码审查文档
   - 添加详细的代码审查报告
   - 记录发现的问题和改进建议

修改的文件:
- ccdi-lsfx/pom.xml
- ccdi-lsfx/src/main/java/com/ruoyi/lsfx/client/LsfxAnalysisClient.java
- ccdi-lsfx/src/main/java/com/ruoyi/lsfx/config/RestTemplateConfig.java
- ccdi-lsfx/src/main/java/com/ruoyi/lsfx/controller/LsfxTestController.java
- ccdi-lsfx/src/main/java/com/ruoyi/lsfx/util/HttpUtil.java
- ruoyi-admin/src/main/resources/application-dev.yml
- doc/implementation/lsfx-code-review-20260302.md
2026-03-03 09:35:27 +08:00

573 lines
15 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# 流水分析 Mock 服务器设计方案
**创建日期**: 2026-03-02
**作者**: Claude Code
## 项目概述
### 背景
当前项目需要与流水分析平台进行接口对接,但在开发和测试过程中,依赖外部真实服务存在以下问题:
- 网络连接不稳定,影响测试效率
- 无法控制返回数据,难以测试各种场景
- 无法模拟错误场景和边界情况
- 团队成员无法共享测试环境
### 解决方案
开发一个独立的 Mock 服务器,基于 Python + FastAPI 技术栈,模拟流水分析平台的 7 个核心接口,支持:
- 配置文件驱动的响应数据
- 文件上传解析延迟模拟
- 错误场景触发机制
- 自动生成的 API 文档
### 技术选型
| 技术组件 | 选择 | 理由 |
|---------|------|------|
| Web框架 | FastAPI | 现代异步框架自动生成API文档强类型支持 |
| 数据验证 | Pydantic | 与FastAPI原生集成类型提示清晰 |
| 配置管理 | JSON文件 | 易于修改,非开发人员也能调整测试数据 |
| 状态存储 | 内存字典 | 轻量级重启清空适合Mock场景 |
---
## 整体架构
```
lsfx-mock-server/
├── main.py # 应用入口
├── config/
│ ├── settings.py # 全局配置
│ └── responses/ # 响应模板配置文件
│ ├── token.json
│ ├── upload.json
│ ├── parse_status.json
│ └── bank_statement.json
├── models/
│ ├── request.py # 请求模型Pydantic
│ └── response.py # 响应模型Pydantic
├── services/
│ ├── token_service.py # Token管理
│ ├── file_service.py # 文件上传和解析模拟
│ └── statement_service.py # 流水数据管理
├── routers/
│ └── api.py # 所有API路由
├── utils/
│ ├── response_builder.py # 响应构建器
│ └── error_simulator.py # 错误场景模拟
└── requirements.txt
```
### 核心设计思想
1. **配置驱动** - 所有响应数据在JSON配置文件中方便修改
2. **内存状态管理** - 使用全局字典存储运行时状态tokens、文件记录等
3. **异步任务** - 使用FastAPI后台任务模拟文件解析延迟
4. **错误标记识别** - 检测请求参数中的特殊标记触发错误响应
---
## 数据模型设计
### 请求模型
对应Java项目中的DTO类
```python
# models/request.py
from pydantic import BaseModel
from typing import Optional
class GetTokenRequest(BaseModel):
projectNo: str
entityName: str
userId: str
userName: str
orgCode: str
entityId: Optional[str] = None
xdRelatedPersons: Optional[str] = None
jzDataDateId: Optional[str] = "0"
innerBSStartDateId: Optional[str] = "0"
innerBSEndDateId: Optional[str] = "0"
analysisType: Optional[int] = -1
departmentCode: Optional[str] = None
class UploadFileRequest(BaseModel):
groupId: int
class FetchInnerFlowRequest(BaseModel):
groupId: int
customerNo: str
dataChannelCode: str
requestDateId: int
dataStartDateId: int
dataEndDateId: int
uploadUserId: int
class CheckParseStatusRequest(BaseModel):
groupId: int
inprogressList: str
class GetBankStatementRequest(BaseModel):
groupId: int
logId: int
pageNow: int
pageSize: int
```
### 响应模型
对应Java项目中的VO类
```python
# models/response.py
from pydantic import BaseModel
from typing import Optional, List, Dict, Any
class TokenData(BaseModel):
token: str
projectId: int
projectNo: str
entityName: str
analysisType: int
class GetTokenResponse(BaseModel):
code: str = "200"
data: Optional[TokenData] = None
message: str = "create.token.success"
status: str = "200"
successResponse: bool = True
# 其他响应模型类似...
```
---
## 核心业务逻辑
### 文件解析延迟模拟
**实现机制:**
1. 上传接口立即返回,状态为"解析中"
2. 使用FastAPI的BackgroundTasks在后台延迟执行
3. 延迟3-5秒后更新状态为"解析完成"
4. 轮询检查接口返回当前解析状态
```python
# services/file_service.py
class FileService:
def __init__(self):
self.file_records: Dict[int, Dict] = {}
self.parsing_status: Dict[int, bool] = {}
async def upload_file(self, group_id: int, file, background_tasks: BackgroundTasks):
log_id = generate_log_id()
# 立即存储记录,标记为解析中
self.file_records[log_id] = {
"logId": log_id,
"status": -5,
"uploadStatusDesc": "parsing",
...
}
self.parsing_status[log_id] = True
# 启动后台任务延迟4秒后完成解析
background_tasks.add_task(
self._simulate_parsing,
log_id,
delay_seconds=4
)
return log_id
def _simulate_parsing(self, log_id: int, delay_seconds: int):
time.sleep(delay_seconds)
if log_id in self.file_records:
self.file_records[log_id]["status"] = -5
self.file_records[log_id]["uploadStatusDesc"] = "data.wait.confirm.newaccount"
self.parsing_status[log_id] = False
```
---
## 错误场景模拟机制
### 错误触发规则
通过请求参数中的特殊标记触发对应的错误响应:
**错误码映射表:**
```python
ERROR_CODES = {
"40101": {"code": "40101", "message": "appId错误"},
"40102": {"code": "40102", "message": "appSecretCode错误"},
"40104": {"code": "40104", "message": "可使用项目次数为0无法创建项目"},
"40105": {"code": "40105", "message": "只读模式下无法新建项目"},
"40106": {"code": "40106", "message": "错误的分析类型,不在规定的取值范围内"},
"40107": {"code": "40107", "message": "当前系统不支持的分析类型"},
"40108": {"code": "40108", "message": "当前用户所属行社无权限"},
"501014": {"code": "501014", "message": "无行内流水文件"},
}
```
**检测逻辑:**
```python
@staticmethod
def detect_error_marker(value: str) -> Optional[str]:
"""检测字符串中的错误标记
规则:如果字符串包含 error_XXXX则返回 XXXX
例如:
- "project_error_40101" -> "40101"
- "test_error_501014" -> "501014"
"""
if not value:
return None
import re
pattern = r'error_(\d+)'
match = re.search(pattern, value)
if match:
return match.group(1)
return None
```
**使用示例:**
```python
# 在服务中使用
def get_token(request: GetTokenRequest):
error_code = ErrorSimulator.detect_error_marker(request.projectNo)
if error_code:
return ErrorSimulator.build_error_response(error_code)
# 正常流程...
```
**测试方式:**
```python
# 触发 40101 错误
request_data = {
"projectNo": "test_project_error_40101", # 包含错误标记
"entityName": "测试企业",
...
}
```
---
## 配置文件结构
### 响应模板配置
```json
// config/responses/token.json
{
"success_response": {
"code": "200",
"data": {
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.mock_token_{project_id}",
"projectId": "{project_id}",
"projectNo": "{project_no}",
"entityName": "{entity_name}",
"analysisType": 0
},
"message": "create.token.success",
"status": "200",
"successResponse": true
}
}
```
```json
// config/responses/upload.json
{
"success_response": {
"code": "200",
"data": {
"accountsOfLog": {},
"uploadLogList": [
{
"logId": "{log_id}",
"status": -5,
"uploadStatusDesc": "data.wait.confirm.newaccount",
...
}
]
}
}
}
```
### 全局配置
```python
# config/settings.py
from pydantic import BaseSettings
class Settings(BaseSettings):
APP_NAME: str = "流水分析Mock服务"
APP_VERSION: str = "1.0.0"
DEBUG: bool = True
HOST: str = "0.0.0.0"
PORT: int = 8000
# 模拟配置
PARSE_DELAY_SECONDS: int = 4
MAX_FILE_SIZE: int = 10485760 # 10MB
class Config:
env_file = ".env"
settings = Settings()
```
---
## API 路由实现
### 核心接口
```python
# routers/api.py
from fastapi import APIRouter, BackgroundTasks, UploadFile, File
router = APIRouter()
# 接口1获取Token
@router.post("/account/common/getToken")
async def get_token(request: GetTokenRequest):
error_code = ErrorSimulator.detect_error_marker(request.projectNo)
if error_code:
return ErrorSimulator.build_error_response(error_code)
return token_service.create_token(request)
# 接口2上传文件
@router.post("/watson/api/project/remoteUploadSplitFile")
async def upload_file(
background_tasks: BackgroundTasks,
groupId: int = Form(...),
file: UploadFile = File(...)
):
return file_service.upload_file(groupId, file, background_tasks)
# 接口3拉取行内流水
@router.post("/watson/api/project/getJZFileOrZjrcuFile")
async def fetch_inner_flow(request: FetchInnerFlowRequest):
error_code = ErrorSimulator.detect_error_marker(request.customerNo)
if error_code:
return ErrorSimulator.build_error_response(error_code)
return file_service.fetch_inner_flow(request)
# 接口4检查解析状态
@router.post("/watson/api/project/upload/getpendings")
async def check_parse_status(request: CheckParseStatusRequest):
return file_service.check_parse_status(request.groupId, request.inprogressList)
# 接口5删除文件
@router.post("/watson/api/project/batchDeleteUploadFile")
async def delete_files(request: dict):
return file_service.delete_files(
request.get("groupId"),
request.get("logIds"),
request.get("userId")
)
# 接口6获取银行流水
@router.post("/watson/api/project/getBSByLogId")
async def get_bank_statement(request: GetBankStatementRequest):
return statement_service.get_bank_statement(request)
```
### 主应用
```python
# main.py
from fastapi import FastAPI
from routers import api
app = FastAPI(
title="流水分析Mock服务",
description="模拟流水分析平台的7个核心接口",
version="1.0.0",
docs_url="/docs",
redoc_url="/redoc"
)
app.include_router(api.router, tags=["流水分析接口"])
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
```
---
## 测试和使用说明
### 启动服务
```bash
# 安装依赖
pip install -r requirements.txt
# 启动服务
python main.py
# 或使用uvicorn启动支持热重载
uvicorn main:app --reload --host 0.0.0.0 --port 8000
```
### 访问API文档
- **Swagger UI:** http://localhost:8000/docs
- **ReDoc:** http://localhost:8000/redoc
### 测试示例
#### 1. 正常流程测试
```python
import requests
# 获取Token
response = requests.post(
"http://localhost:8000/account/common/getToken",
json={
"projectNo": "test_project_001",
"entityName": "测试企业",
"userId": "902001",
"userName": "902001",
"orgCode": "902000"
}
)
result = response.json()
token = result["data"]["token"]
project_id = result["data"]["projectId"]
# 上传文件
files = {"file": ("test.csv", open("test.csv", "rb"), "text/csv")}
response = requests.post(
"http://localhost:8000/watson/api/project/remoteUploadSplitFile",
files=files,
data={"groupId": project_id},
headers={"X-Xencio-Client-Id": "26e5b9239853436b85c623f4b7a6d0e6"}
)
log_id = response.json()["data"]["uploadLogList"][0]["logId"]
# 轮询检查解析状态
import time
for i in range(10):
response = requests.post(
"http://localhost:8000/watson/api/project/upload/getpendings",
json={"groupId": project_id, "inprogressList": str(log_id)},
headers={"X-Xencio-Client-Id": "26e5b9239853436b85c623f4b7a6d0e6"}
)
result = response.json()
if not result["data"]["parsing"]:
print("解析完成")
break
time.sleep(1)
# 获取银行流水
response = requests.post(
"http://localhost:8000/watson/api/project/getBSByLogId",
json={
"groupId": project_id,
"logId": log_id,
"pageNow": 1,
"pageSize": 10
},
headers={"X-Xencio-Client-Id": "26e5b9239853436b85c623f4b7a6d0e6"}
)
```
#### 2. 错误场景测试
```python
# 触发 40101 错误appId错误
response = requests.post(
"http://localhost:8000/account/common/getToken",
json={
"projectNo": "test_project_error_40101", # 包含错误标记
"entityName": "测试企业",
"userId": "902001",
"userName": "902001",
"orgCode": "902000"
}
)
# 返回: {"code": "40101", "message": "appId错误", ...}
# 触发 501014 错误(无行内流水文件)
response = requests.post(
"http://localhost:8000/watson/api/project/getJZFileOrZjrcuFile",
json={
"groupId": 1,
"customerNo": "test_error_501014", # 包含错误标记
"dataChannelCode": "ZJRCU",
"requestDateId": 20260302,
"dataStartDateId": 20260201,
"dataEndDateId": 20260228,
"uploadUserId": 902001
}
)
# 返回: {"code": "501014", "message": "无行内流水文件", ...}
```
### 配置修改
- 修改 `config/responses/` 下的JSON文件可以自定义响应数据
- 修改 `config/settings.py` 可以调整延迟时间、端口等配置
- 支持 `.env` 文件覆盖配置
---
## 依赖清单
```txt
# requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
python-multipart==0.0.6
```
---
## 使用场景
### A. 开发阶段测试
在业务代码开发过程中,修改配置文件 `application-dev.yml`,将 `lsfx.api.base-url` 改为 `http://localhost:8000`启动Mock服务器后业务代码即可连接Mock服务进行测试。
### B. 完全替换测试
直接使用 Mock 服务器进行接口测试,验证业务逻辑的正确性。生产环境切换到真实服务。
### C. CI/CD 集成
在持续集成流程中使用 Mock 服务器,自动化测试接口调用逻辑。
---
## 扩展性考虑
### 后续可能的增强功能
1. **数据持久化** - 如需保留历史记录可集成SQLite
2. **更复杂的场景模拟** - 支持配置文件定义多个场景
3. **请求日志记录** - 记录所有请求用于调试
4. **Web管理界面** - 可视化管理Mock数据和状态
5. **Docker部署** - 提供Dockerfile方便部署
当前设计已满足核心需求,保持简洁实用。
---
## 总结
这是一个**配置驱动、轻量级、易于使用**的 Mock 服务器设计,核心特点:
**完整性** - 覆盖所有7个核心接口
**真实性** - 模拟文件解析延迟等真实场景
**灵活性** - 配置文件驱动,错误场景可触发
**易用性** - 自动API文档零配置启动
**可维护** - 代码结构清晰与Java项目对应
满足您的Mock测试需求提升开发和测试效率。