# 流水分析 Mock 服务器 - 实施计划 **创建日期**: 2026-03-02 **状态**: 待执行 **预计完成时间**: 2-3 天 --- ## 项目目标 开发一个基于 Python + FastAPI 的 Mock 服务器,用于模拟流水分析平台的 7 个核心接口,支持: - 配置文件驱动的响应数据 - 文件上传解析延迟模拟(4秒) - 错误场景触发机制(通过 error_XXXX 标记) - 自动生成的 Swagger API 文档 --- ## 技术栈 | 技术 | 版本 | 用途 | |------|------|------| | Python | 3.11+ | 编程语言 | | FastAPI | 0.104.1 | Web框架 | | Pydantic | 2.5.0 | 数据验证 | | Uvicorn | 0.24.0 | ASGI服务器 | | pytest | latest | 测试框架 | --- ## 实施任务列表 ### Task 1: 项目初始化和基础设置 **状态**: ⏳ 待开始 **预计时间**: 1 小时 **阻塞任务**: 无 **目标**: 创建项目目录结构、配置文件和依赖管理 **实施步骤**: 1. 创建项目根目录 `lsfx-mock-server/` 2. 创建目录结构: ``` lsfx-mock-server/ ├── config/ │ └── responses/ ├── models/ ├── services/ ├── routers/ ├── utils/ └── tests/ ``` 3. 创建 `requirements.txt`: ```txt fastapi==0.104.1 uvicorn[standard]==0.24.0 pydantic==2.5.0 python-multipart==0.0.6 pytest>=7.0.0 pytest-cov>=4.0.0 httpx>=0.25.0 ``` 4. 创建 `config/settings.py`: - 使用 Pydantic BaseSettings - 支持环境变量覆盖(.env 文件) - 配置项:APP_NAME, HOST, PORT, DEBUG, PARSE_DELAY_SECONDS 5. 创建 4 个 JSON 响应模板文件: - `config/responses/token.json` - Token 响应模板 - `config/responses/upload.json` - 上传文件响应模板 - `config/responses/parse_status.json` - 解析状态响应模板 - `config/responses/bank_statement.json` - 银行流水响应模板 - 每个模板包含占位符(如 {project_id}, {log_id}) **验证标准**: - ✅ 虚拟环境创建并激活 - ✅ 依赖安装成功(无错误) - ✅ 配置文件能正确导入(`from config.settings import settings`) - ✅ JSON 模板文件格式正确(使用 `json.load()` 验证) - ✅ settings 能读取环境变量 **提交检查点**: ```bash git add requirements.txt config/ git commit -m "feat(mock): initialize project structure and configuration" ``` --- ### Task 2: 实现数据模型层 **状态**: ⏳ 待开始(等待 Task 1) **预计时间**: 1.5 小时 **阻塞任务**: Task 1 **目标**: 创建所有请求和响应的 Pydantic 模型类 **实施步骤**: 1. 创建 `models/__init__.py`(空文件) 2. 创建 `models/request.py`: - 定义 6 个请求模型: - GetTokenRequest(10+ 字段,可选字段有默认值) - UploadFileRequest(通过 Form 数据接收) - FetchInnerFlowRequest(7 个必填字段) - CheckParseStatusRequest(2 个字段) - DeleteFilesRequest(3 个字段) - GetBankStatementRequest(4 个字段) - 所有字段添加 Field 描述(用于 Swagger) - 可选字段使用 `Optional[Type] = default_value` 3. 创建 `models/response.py`: - 定义嵌套数据模型: - TokenData(5 个字段) - UploadLogItem(15+ 字段) - BankStatementItem(30+ 字段) - PendingItem(15+ 字段) - 定义 6 个响应模型: - GetTokenResponse - UploadFileResponse - FetchInnerFlowResponse - CheckParseStatusResponse - DeleteFilesResponse - GetBankStatementResponse - 所有响应模型包含通用字段:code, message, status, successResponse **验证标准**: - ✅ 所有模型类能正确实例化 - ✅ 可选字段默认值正确 - ✅ Pydantic 验证功能正常(类型错误会抛出 ValidationError) - ✅ 模型序列化为 JSON 正确(`model.model_dump_json()`) - ✅ Swagger 自动文档显示所有字段和描述 **提交检查点**: ```bash git add models/ git commit -m "feat(models): implement Pydantic request and response models" ``` --- ### Task 3: 实现工具类 **状态**: ⏳ 待开始(可与 Task 2 并行) **预计时间**: 1 小时 **阻塞任务**: 无 **目标**: 实现错误检测和响应构建工具 **实施步骤**: 1. 创建 `utils/__init__.py` 2. 创建 `utils/error_simulator.py`: ```python class ErrorSimulator: ERROR_CODES = { "40101": {"code": "40101", "message": "appId错误"}, "40102": {"code": "40102", "message": "appSecretCode错误"}, "40104": {"code": "40104", "message": "可使用项目次数为0"}, "40105": {"code": "40105", "message": "只读模式下无法新建项目"}, "40106": {"code": "40106", "message": "错误的分析类型"}, "40107": {"code": "40107", "message": "当前系统不支持的分析类型"}, "40108": {"code": "40108", "message": "当前用户所属行社无权限"}, "501014": {"code": "501014", "message": "无行内流水文件"}, } @staticmethod def detect_error_marker(value: str) -> Optional[str]: """检测 error_XXXX 模式""" import re if not value: return None pattern = r'error_(\d+)' match = re.search(pattern, value) return match.group(1) if match else None @staticmethod def build_error_response(error_code: str) -> Dict: """构建错误响应""" if error_code in ErrorSimulator.ERROR_CODES: error_info = ErrorSimulator.ERROR_CODES[error_code] return { "code": error_info["code"], "message": error_info["message"], "status": error_info["code"], "successResponse": False } return None ``` 3. 创建 `utils/response_builder.py`: ```python import json from pathlib import Path from typing import Dict, Any class ResponseBuilder: TEMPLATE_DIR = Path(__file__).parent.parent / "config" / "responses" @staticmethod def load_template(template_name: str) -> Dict: """加载 JSON 模板""" file_path = ResponseBuilder.TEMPLATE_DIR / f"{template_name}.json" with open(file_path, 'r', encoding='utf-8') as f: return json.load(f) @staticmethod def replace_placeholders(template: Dict, **kwargs) -> Dict: """递归替换占位符""" def replace_value(value): if isinstance(value, str): for key, val in kwargs.items(): placeholder = f"{{{key}}}" if placeholder in value: return value.replace(placeholder, str(val)) return value elif isinstance(value, dict): return {k: replace_value(v) for k, v in value.items()} elif isinstance(value, list): return [replace_value(item) for item in value] return value return replace_value(template) @staticmethod def build_success_response(template_name: str, **kwargs) -> Dict: """构建成功响应""" template = ResponseBuilder.load_template(template_name) return ResponseBuilder.replace_placeholders( template["success_response"], **kwargs ) ``` **验证标准**: - ✅ ErrorSimulator.detect_error_marker() 能正确识别错误标记 - ✅ ErrorSimulator.build_error_response() 返回正确的错误响应 - ✅ ResponseBuilder 能正确加载 JSON 模板 - ✅ 占位符替换功能正常(支持嵌套字典和列表) - ✅ 所有 8 个错误码都有对应响应 **提交检查点**: ```bash git add utils/ git commit -m "feat(utils): implement error simulator and response builder" ``` --- ### Task 4: 实现服务层 **状态**: ⏳ 待开始(等待 Task 1, 2, 3) **预计时间**: 2 小时 **阻塞任务**: Task 1, Task 2, Task 3 **目标**: 实现核心业务服务类 **实施步骤**: 1. 创建 `services/__init__.py` 2. 创建 `services/token_service.py`: ```python class TokenService: def __init__(self): self.project_counter = 0 self.tokens = {} # projectId -> token_data def create_token(self, request: GetTokenRequest) -> Dict: self.project_counter += 1 project_id = self.project_counter token = f"mock_token_{project_id}" return ResponseBuilder.build_success_response( "token", project_id=project_id, project_no=request.projectNo, entity_name=request.entityName ) ``` 3. 创建 `services/file_service.py`: ```python from fastapi import BackgroundTasks import time from uuid import uuid4 class FileService: def __init__(self): self.file_records = {} # logId -> record self.parsing_status = {} # logId -> is_parsing self.log_counter = 0 async def upload_file(self, group_id: int, file, background_tasks: BackgroundTasks) -> Dict: self.log_counter += 1 log_id = self.log_counter # 立即存储记录 self.file_records[log_id] = { "logId": log_id, "groupId": group_id, "status": -5, "uploadStatusDesc": "parsing", "uploadFileName": file.filename, "fileSize": file.size, # ... 其他字段 } self.parsing_status[log_id] = True # 启动后台任务 background_tasks.add_task( self._simulate_parsing, log_id, settings.PARSE_DELAY_SECONDS ) return ResponseBuilder.build_success_response( "upload", log_id=log_id ) def _simulate_parsing(self, log_id: int, delay_seconds: int): """后台任务:模拟解析""" time.sleep(delay_seconds) if log_id in self.file_records: self.file_records[log_id]["uploadStatusDesc"] = "data.wait.confirm.newaccount" self.parsing_status[log_id] = False def check_parse_status(self, group_id: int, inprogress_list: str) -> Dict: """检查解析状态""" log_ids = [int(x.strip()) for x in inprogress_list.split(",")] is_parsing = any( self.parsing_status.get(log_id, False) for log_id in log_ids ) pending_list = [ self.file_records[log_id] for log_id in log_ids if log_id in self.file_records ] return { "code": "200", "data": { "parsing": is_parsing, "pendingList": pending_list }, "status": "200", "successResponse": True } def delete_files(self, group_id: int, log_ids: List[int], user_id: int) -> Dict: """删除文件""" for log_id in log_ids: self.file_records.pop(log_id, None) self.parsing_status.pop(log_id, None) return { "code": "200", "data": {"message": "delete.files.success"}, "status": "200", "successResponse": True } def fetch_inner_flow(self, request: FetchInnerFlowRequest) -> Dict: """拉取行内流水(模拟无数据)""" return { "code": "200", "data": { "code": "501014", "message": "无行内流水文件" }, "status": "200", "successResponse": True } ``` 4. 创建 `services/statement_service.py`: ```python class StatementService: def get_bank_statement(self, request: GetBankStatementRequest) -> Dict: # 加载模板 template = ResponseBuilder.load_template("bank_statement") statements = template["success_response"]["data"]["bankStatementList"] # 模拟分页 start = (request.pageNow - 1) * request.pageSize end = start + request.pageSize page_data = statements[start:end] return { "code": "200", "data": { "bankStatementList": page_data, "totalCount": len(statements) }, "status": "200", "successResponse": True } ``` **验证标准**: - ✅ TokenService 能创建唯一 token - ✅ FileService.upload_file() 返回正确状态 - ✅ 后台任务执行后,解析状态从 True 变为 False - ✅ check_parse_status() 正确返回 parsing 状态 - ✅ StatementService 支持分页功能 - ✅ 所有方法返回正确格式 **提交检查点**: ```bash git add services/ git commit -m "feat(services): implement token, file, and statement services" ``` --- ### Task 5: 实现 API 路由 **状态**: ⏳ 待开始(等待 Task 2, 3, 4) **预计时间**: 1.5 小时 **阻塞任务**: Task 1, Task 2, Task 3, Task 4 **目标**: 实现所有 6 个 API 接口路由 **实施步骤**: 1. 创建 `routers/__init__.py` 2. 创建 `routers/api.py`: ```python from fastapi import APIRouter, BackgroundTasks, UploadFile, File, Form from models.request import * from models.response import * from services.token_service import TokenService from services.file_service import FileService from services.statement_service import StatementService from utils.error_simulator import ErrorSimulator router = APIRouter() token_service = TokenService() file_service = FileService() statement_service = StatementService() @router.post("/account/common/getToken") async def get_token(request: GetTokenRequest): """获取Token""" error_code = ErrorSimulator.detect_error_marker(request.projectNo) if error_code: return ErrorSimulator.build_error_response(error_code) return token_service.create_token(request) @router.post("/watson/api/project/remoteUploadSplitFile") async def upload_file( background_tasks: BackgroundTasks, groupId: int = Form(...), file: UploadFile = File(...) ): """上传文件""" return await file_service.upload_file(groupId, file, background_tasks) @router.post("/watson/api/project/getJZFileOrZjrcuFile") async def fetch_inner_flow(request: FetchInnerFlowRequest): """拉取行内流水""" error_code = ErrorSimulator.detect_error_marker(request.customerNo) if error_code: return ErrorSimulator.build_error_response(error_code) return file_service.fetch_inner_flow(request) @router.post("/watson/api/project/upload/getpendings") async def check_parse_status(request: CheckParseStatusRequest): """检查文件解析状态""" return file_service.check_parse_status(request.groupId, request.inprogressList) @router.post("/watson/api/project/batchDeleteUploadFile") async def delete_files( groupId: int, logIds: List[int], userId: int ): """删除文件""" return file_service.delete_files(groupId, logIds, userId) @router.post("/watson/api/project/getBSByLogId") async def get_bank_statement(request: GetBankStatementRequest): """获取银行流水""" return statement_service.get_bank_statement(request) ``` **验证标准**: - ✅ 所有 6 个接口在 Swagger UI 中可见 - ✅ 每个接口能正常响应 - ✅ 错误标记功能正常(包含 error_XXXX 的参数触发错误) - ✅ 文件上传接口能接收文件 - ✅ 所有接口有正确的 Swagger 描述 - ✅ 响应格式符合文档要求 **提交检查点**: ```bash git add routers/ git commit -m "feat(routers): implement all 6 API endpoints" ``` --- ### Task 6: 实现主应用 **状态**: ⏳ 待开始(等待 Task 1, 4, 5) **预计时间**: 0.5 小时 **阻塞任务**: Task 1, Task 4, Task 5 **目标**: 实现 FastAPI 应用主入口 **实施步骤**: 1. 创建 `main.py`: ```python from fastapi import FastAPI from routers import api from config.settings import settings app = FastAPI( title=settings.APP_NAME, description="模拟流水分析平台的7个核心接口", version="1.0.0", docs_url="/docs", redoc_url="/redoc" ) app.include_router(api.router, tags=["流水分析接口"]) @app.get("/health") async def health_check(): return {"status": "healthy", "service": settings.APP_NAME} if __name__ == "__main__": import uvicorn uvicorn.run( app, host=settings.HOST, port=settings.PORT, log_level="debug" if settings.DEBUG else "info" ) ``` **验证标准**: - ✅ 应用能启动:`python main.py` - ✅ 访问 http://localhost:8000/docs 显示 Swagger UI - ✅ 访问 http://localhost:8000/redoc 显示 ReDoc - ✅ 健康检查端点返回正确响应 - ✅ 所有接口在文档中可见 **提交检查点**: ```bash git add main.py git commit -m "feat(main): implement FastAPI application entry point" ``` --- ### Task 7: 编写测试套件 **状态**: ⏳ 待开始(等待 Task 1-6) **预计时间**: 2 小时 **阻塞任务**: Task 1, Task 2, Task 3, Task 4, Task 5, Task 6 **目标**: 创建完整的测试套件 **实施步骤**: 1. 创建 `tests/conftest.py`: ```python import pytest from fastapi.testclient import TestClient from main import app @pytest.fixture def client(): return TestClient(app) ``` 2. 创建 `tests/test_models.py` - 测试所有数据模型 3. 创建 `tests/test_utils.py` - 测试工具类 4. 创建 `tests/test_services.py` - 测试服务类 5. 创建 `tests/test_api.py` - 测试 API 端点 **验证标准**: - ✅ 运行 `pytest tests/ -v` 所有测试通过 - ✅ 代码覆盖率 > 80% - ✅ 所有错误场景有测试 - ✅ 生成 HTML 覆盖率报告 **提交检查点**: ```bash git add tests/ git commit -m "test: add comprehensive test suite" ``` --- ### Task 8: 编写文档和部署配置 **状态**: ⏳ 待开始(等待 Task 1-7) **预计时间**: 1 小时 **阻塞任务**: Task 1-7 **目标**: 创建项目文档和部署说明 **实施步骤**: 1. 创建 `README.md`(包含安装、使用、测试说明) 2. 创建 `.env.example` 3. 创建 `Dockerfile` 4. 创建 `docker-compose.yml` **验证标准**: - ✅ README 中所有命令可执行 - ✅ Docker 镜像构建成功 - ✅ Docker Compose 启动成功 **提交检查点**: ```bash git add README.md .env.example Dockerfile docker-compose.yml git commit -m "docs: add README and deployment configuration" ``` --- ### Task 9: 创建集成测试 **状态**: ⏳ 待开始(等待 Task 8) **预计时间**: 1 小时 **阻塞任务**: Task 8 **目标**: 创建端到端集成测试脚本 **实施步骤**: 1. 创建 `tests/integration/test_full_workflow.py` 2. 实现完整的接口调用流程测试 3. 添加错误场景测试 **验证标准**: - ✅ 集成测试通过 - ✅ 完整流程测试成功 - ✅ 错误场景测试成功 **提交检查点**: ```bash git add tests/integration/ git commit -m "test: add integration tests for full workflow" ``` --- ### Task 10: 代码审查和提交 **状态**: ⏳ 待开始(等待 Task 1-9) **预计时间**: 1 小时 **阻塞任务**: Task 1-9 **目标**: 代码审查、优化和 Git 提交 **审查清单**: 1. **代码质量** - ✅ 所有代码符合 PEP 8 - ✅ 类型提示完整 - ✅ 无硬编码配置 - ✅ 注释充分 2. **安全性** - ✅ 输入验证完整(Pydantic) - ✅ 无注入风险 3. **测试覆盖** - ✅ 单元测试覆盖率 > 80% - ✅ 集成测试通过 **验证标准**: - ✅ 所有测试通过 - ✅ 代码覆盖率报告生成 - ✅ 手动测试所有接口 - ✅ README 验证完成 **最终提交**: ```bash git add . git commit -m "feat(lsfx-mock): complete lsfx mock server implementation" git push origin feature/lsfx-mock-server ``` --- ## 开发注意事项 ### 环境要求 - Python 3.11+ - 虚拟环境(venv) - 端口 8000 可用 ### 开发流程 1. 每完成一个任务,立即提交代码 2. 运行相关测试确保功能正确 3. 更新任务状态 4. 开始下一个任务 ### 测试策略 - **单元测试**: 每个模块独立测试 - **集成测试**: 完整流程测试 - **手动测试**: 使用 Swagger UI 验证接口 ### 代码规范 - 遵循 PEP 8 - 使用类型提示 - 函数和类添加文档字符串 - 保持代码简洁(YAGNI, DRY) --- ## 预期成果 1. ✅ 完整的 Mock 服务器,模拟 7 个核心接口 2. ✅ 配置文件驱动的响应数据 3. ✅ 文件解析延迟模拟 4. ✅ 错误场景触发机制 5. ✅ 自动生成的 API 文档 6. ✅ 完整的测试套件(覆盖率 > 80%) 7. ✅ 清晰的 README 和部署文档 8. ✅ Docker 部署支持 --- ## 风险和缓解 | 风险 | 影响 | 缓解措施 | |------|------|----------| | FastAPI 框架不熟悉 | 延期 | 变更预计时间到 3-4 天 | | 异步任务调试困难 | 中等 | 添加详细日志,分步测试 | | 响应格式与真实接口不符 | 高 | 严格对照接口文档,多次验证 | --- ## 后续优化方向 1. 添加数据库持久化(SQLite) 2. 实现更复杂的场景模拟 3. 添加请求日志记录 4. 创建 Web 管理界面 5. 支持 WebSocket 实时通知 --- **预计总开发时间**: 10-12 小时 **建议开发模式**: 按顺序执行,每完成一个任务立即测试验证