# 流水分析 Mock 服务器设计方案 **创建日期**: 2026-03-02 **作者**: Claude Code ## 项目概述 ### 背景 当前项目需要与流水分析平台进行接口对接,但在开发和测试过程中,依赖外部真实服务存在以下问题: - 网络连接不稳定,影响测试效率 - 无法控制返回数据,难以测试各种场景 - 无法模拟错误场景和边界情况 - 团队成员无法共享测试环境 ### 解决方案 开发一个独立的 Mock 服务器,基于 Python + FastAPI 技术栈,模拟流水分析平台的 7 个核心接口,支持: - 配置文件驱动的响应数据 - 文件上传解析延迟模拟 - 错误场景触发机制 - 自动生成的 API 文档 ### 技术选型 | 技术组件 | 选择 | 理由 | |---------|------|------| | Web框架 | FastAPI | 现代异步框架,自动生成API文档,强类型支持 | | 数据验证 | Pydantic | 与FastAPI原生集成,类型提示清晰 | | 配置管理 | JSON文件 | 易于修改,非开发人员也能调整测试数据 | | 状态存储 | 内存字典 | 轻量级,重启清空,适合Mock场景 | --- ## 整体架构 ``` lsfx-mock-server/ ├── main.py # 应用入口 ├── config/ │ ├── settings.py # 全局配置 │ └── responses/ # 响应模板配置文件 │ ├── token.json │ ├── upload.json │ ├── parse_status.json │ └── bank_statement.json ├── models/ │ ├── request.py # 请求模型(Pydantic) │ └── response.py # 响应模型(Pydantic) ├── services/ │ ├── token_service.py # Token管理 │ ├── file_service.py # 文件上传和解析模拟 │ └── statement_service.py # 流水数据管理 ├── routers/ │ └── api.py # 所有API路由 ├── utils/ │ ├── response_builder.py # 响应构建器 │ └── error_simulator.py # 错误场景模拟 └── requirements.txt ``` ### 核心设计思想 1. **配置驱动** - 所有响应数据在JSON配置文件中,方便修改 2. **内存状态管理** - 使用全局字典存储运行时状态(tokens、文件记录等) 3. **异步任务** - 使用FastAPI后台任务模拟文件解析延迟 4. **错误标记识别** - 检测请求参数中的特殊标记触发错误响应 --- ## 数据模型设计 ### 请求模型 对应Java项目中的DTO类: ```python # models/request.py from pydantic import BaseModel from typing import Optional class GetTokenRequest(BaseModel): projectNo: str entityName: str userId: str userName: str orgCode: str entityId: Optional[str] = None xdRelatedPersons: Optional[str] = None jzDataDateId: Optional[str] = "0" innerBSStartDateId: Optional[str] = "0" innerBSEndDateId: Optional[str] = "0" analysisType: Optional[int] = -1 departmentCode: Optional[str] = None class UploadFileRequest(BaseModel): groupId: int class FetchInnerFlowRequest(BaseModel): groupId: int customerNo: str dataChannelCode: str requestDateId: int dataStartDateId: int dataEndDateId: int uploadUserId: int class CheckParseStatusRequest(BaseModel): groupId: int inprogressList: str class GetBankStatementRequest(BaseModel): groupId: int logId: int pageNow: int pageSize: int ``` ### 响应模型 对应Java项目中的VO类: ```python # models/response.py from pydantic import BaseModel from typing import Optional, List, Dict, Any class TokenData(BaseModel): token: str projectId: int projectNo: str entityName: str analysisType: int class GetTokenResponse(BaseModel): code: str = "200" data: Optional[TokenData] = None message: str = "create.token.success" status: str = "200" successResponse: bool = True # 其他响应模型类似... ``` --- ## 核心业务逻辑 ### 文件解析延迟模拟 **实现机制:** 1. 上传接口立即返回,状态为"解析中" 2. 使用FastAPI的BackgroundTasks在后台延迟执行 3. 延迟3-5秒后更新状态为"解析完成" 4. 轮询检查接口返回当前解析状态 ```python # services/file_service.py class FileService: def __init__(self): self.file_records: Dict[int, Dict] = {} self.parsing_status: Dict[int, bool] = {} async def upload_file(self, group_id: int, file, background_tasks: BackgroundTasks): log_id = generate_log_id() # 立即存储记录,标记为解析中 self.file_records[log_id] = { "logId": log_id, "status": -5, "uploadStatusDesc": "parsing", ... } self.parsing_status[log_id] = True # 启动后台任务,延迟4秒后完成解析 background_tasks.add_task( self._simulate_parsing, log_id, delay_seconds=4 ) return log_id def _simulate_parsing(self, log_id: int, delay_seconds: int): time.sleep(delay_seconds) if log_id in self.file_records: self.file_records[log_id]["status"] = -5 self.file_records[log_id]["uploadStatusDesc"] = "data.wait.confirm.newaccount" self.parsing_status[log_id] = False ``` --- ## 错误场景模拟机制 ### 错误触发规则 通过请求参数中的特殊标记触发对应的错误响应: **错误码映射表:** ```python ERROR_CODES = { "40101": {"code": "40101", "message": "appId错误"}, "40102": {"code": "40102", "message": "appSecretCode错误"}, "40104": {"code": "40104", "message": "可使用项目次数为0,无法创建项目"}, "40105": {"code": "40105", "message": "只读模式下无法新建项目"}, "40106": {"code": "40106", "message": "错误的分析类型,不在规定的取值范围内"}, "40107": {"code": "40107", "message": "当前系统不支持的分析类型"}, "40108": {"code": "40108", "message": "当前用户所属行社无权限"}, "501014": {"code": "501014", "message": "无行内流水文件"}, } ``` **检测逻辑:** ```python @staticmethod def detect_error_marker(value: str) -> Optional[str]: """检测字符串中的错误标记 规则:如果字符串包含 error_XXXX,则返回 XXXX 例如: - "project_error_40101" -> "40101" - "test_error_501014" -> "501014" """ if not value: return None import re pattern = r'error_(\d+)' match = re.search(pattern, value) if match: return match.group(1) return None ``` **使用示例:** ```python # 在服务中使用 def get_token(request: GetTokenRequest): error_code = ErrorSimulator.detect_error_marker(request.projectNo) if error_code: return ErrorSimulator.build_error_response(error_code) # 正常流程... ``` **测试方式:** ```python # 触发 40101 错误 request_data = { "projectNo": "test_project_error_40101", # 包含错误标记 "entityName": "测试企业", ... } ``` --- ## 配置文件结构 ### 响应模板配置 ```json // config/responses/token.json { "success_response": { "code": "200", "data": { "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.mock_token_{project_id}", "projectId": "{project_id}", "projectNo": "{project_no}", "entityName": "{entity_name}", "analysisType": 0 }, "message": "create.token.success", "status": "200", "successResponse": true } } ``` ```json // config/responses/upload.json { "success_response": { "code": "200", "data": { "accountsOfLog": {}, "uploadLogList": [ { "logId": "{log_id}", "status": -5, "uploadStatusDesc": "data.wait.confirm.newaccount", ... } ] } } } ``` ### 全局配置 ```python # config/settings.py from pydantic import BaseSettings class Settings(BaseSettings): APP_NAME: str = "流水分析Mock服务" APP_VERSION: str = "1.0.0" DEBUG: bool = True HOST: str = "0.0.0.0" PORT: int = 8000 # 模拟配置 PARSE_DELAY_SECONDS: int = 4 MAX_FILE_SIZE: int = 10485760 # 10MB class Config: env_file = ".env" settings = Settings() ``` --- ## API 路由实现 ### 核心接口 ```python # routers/api.py from fastapi import APIRouter, BackgroundTasks, UploadFile, File router = APIRouter() # 接口1:获取Token @router.post("/account/common/getToken") async def get_token(request: GetTokenRequest): error_code = ErrorSimulator.detect_error_marker(request.projectNo) if error_code: return ErrorSimulator.build_error_response(error_code) return token_service.create_token(request) # 接口2:上传文件 @router.post("/watson/api/project/remoteUploadSplitFile") async def upload_file( background_tasks: BackgroundTasks, groupId: int = Form(...), file: UploadFile = File(...) ): return file_service.upload_file(groupId, file, background_tasks) # 接口3:拉取行内流水 @router.post("/watson/api/project/getJZFileOrZjrcuFile") async def fetch_inner_flow(request: FetchInnerFlowRequest): error_code = ErrorSimulator.detect_error_marker(request.customerNo) if error_code: return ErrorSimulator.build_error_response(error_code) return file_service.fetch_inner_flow(request) # 接口4:检查解析状态 @router.post("/watson/api/project/upload/getpendings") async def check_parse_status(request: CheckParseStatusRequest): return file_service.check_parse_status(request.groupId, request.inprogressList) # 接口5:删除文件 @router.post("/watson/api/project/batchDeleteUploadFile") async def delete_files(request: dict): return file_service.delete_files( request.get("groupId"), request.get("logIds"), request.get("userId") ) # 接口6:获取银行流水 @router.post("/watson/api/project/getBSByLogId") async def get_bank_statement(request: GetBankStatementRequest): return statement_service.get_bank_statement(request) ``` ### 主应用 ```python # main.py from fastapi import FastAPI from routers import api app = FastAPI( title="流水分析Mock服务", description="模拟流水分析平台的7个核心接口", version="1.0.0", docs_url="/docs", redoc_url="/redoc" ) app.include_router(api.router, tags=["流水分析接口"]) if __name__ == "__main__": import uvicorn uvicorn.run(app, host="0.0.0.0", port=8000) ``` --- ## 测试和使用说明 ### 启动服务 ```bash # 安装依赖 pip install -r requirements.txt # 启动服务 python main.py # 或使用uvicorn启动(支持热重载) uvicorn main:app --reload --host 0.0.0.0 --port 8000 ``` ### 访问API文档 - **Swagger UI:** http://localhost:8000/docs - **ReDoc:** http://localhost:8000/redoc ### 测试示例 #### 1. 正常流程测试 ```python import requests # 获取Token response = requests.post( "http://localhost:8000/account/common/getToken", json={ "projectNo": "test_project_001", "entityName": "测试企业", "userId": "902001", "userName": "902001", "orgCode": "902000" } ) result = response.json() token = result["data"]["token"] project_id = result["data"]["projectId"] # 上传文件 files = {"file": ("test.csv", open("test.csv", "rb"), "text/csv")} response = requests.post( "http://localhost:8000/watson/api/project/remoteUploadSplitFile", files=files, data={"groupId": project_id}, headers={"X-Xencio-Client-Id": "26e5b9239853436b85c623f4b7a6d0e6"} ) log_id = response.json()["data"]["uploadLogList"][0]["logId"] # 轮询检查解析状态 import time for i in range(10): response = requests.post( "http://localhost:8000/watson/api/project/upload/getpendings", json={"groupId": project_id, "inprogressList": str(log_id)}, headers={"X-Xencio-Client-Id": "26e5b9239853436b85c623f4b7a6d0e6"} ) result = response.json() if not result["data"]["parsing"]: print("解析完成") break time.sleep(1) # 获取银行流水 response = requests.post( "http://localhost:8000/watson/api/project/getBSByLogId", json={ "groupId": project_id, "logId": log_id, "pageNow": 1, "pageSize": 10 }, headers={"X-Xencio-Client-Id": "26e5b9239853436b85c623f4b7a6d0e6"} ) ``` #### 2. 错误场景测试 ```python # 触发 40101 错误(appId错误) response = requests.post( "http://localhost:8000/account/common/getToken", json={ "projectNo": "test_project_error_40101", # 包含错误标记 "entityName": "测试企业", "userId": "902001", "userName": "902001", "orgCode": "902000" } ) # 返回: {"code": "40101", "message": "appId错误", ...} # 触发 501014 错误(无行内流水文件) response = requests.post( "http://localhost:8000/watson/api/project/getJZFileOrZjrcuFile", json={ "groupId": 1, "customerNo": "test_error_501014", # 包含错误标记 "dataChannelCode": "ZJRCU", "requestDateId": 20260302, "dataStartDateId": 20260201, "dataEndDateId": 20260228, "uploadUserId": 902001 } ) # 返回: {"code": "501014", "message": "无行内流水文件", ...} ``` ### 配置修改 - 修改 `config/responses/` 下的JSON文件可以自定义响应数据 - 修改 `config/settings.py` 可以调整延迟时间、端口等配置 - 支持 `.env` 文件覆盖配置 --- ## 依赖清单 ```txt # requirements.txt fastapi==0.104.1 uvicorn[standard]==0.24.0 pydantic==2.5.0 python-multipart==0.0.6 ``` --- ## 使用场景 ### A. 开发阶段测试 在业务代码开发过程中,修改配置文件 `application-dev.yml`,将 `lsfx.api.base-url` 改为 `http://localhost:8000`,启动Mock服务器后,业务代码即可连接Mock服务进行测试。 ### B. 完全替换测试 直接使用 Mock 服务器进行接口测试,验证业务逻辑的正确性。生产环境切换到真实服务。 ### C. CI/CD 集成 在持续集成流程中使用 Mock 服务器,自动化测试接口调用逻辑。 --- ## 扩展性考虑 ### 后续可能的增强功能 1. **数据持久化** - 如需保留历史记录,可集成SQLite 2. **更复杂的场景模拟** - 支持配置文件定义多个场景 3. **请求日志记录** - 记录所有请求用于调试 4. **Web管理界面** - 可视化管理Mock数据和状态 5. **Docker部署** - 提供Dockerfile方便部署 当前设计已满足核心需求,保持简洁实用。 --- ## 总结 这是一个**配置驱动、轻量级、易于使用**的 Mock 服务器设计,核心特点: ✅ **完整性** - 覆盖所有7个核心接口 ✅ **真实性** - 模拟文件解析延迟等真实场景 ✅ **灵活性** - 配置文件驱动,错误场景可触发 ✅ **易用性** - 自动API文档,零配置启动 ✅ **可维护** - 代码结构清晰,与Java项目对应 满足您的Mock测试需求,提升开发和测试效率。