Files
ccdi/docs/plans/2026-03-02-lsfx-mock-server-implementation-plan.md
wkc b022ec75b8 fix(lsfx): 修复流水分析对接模块的代码质量问题
1. 修复配置问题
   - 替换app-secret占位符为正确的密钥dXj6eHRmPv

2. 添加异常处理
   - HttpUtil所有方法添加完整的异常处理
   - 统一使用LsfxApiException包装异常
   - 检查HTTP状态码和响应体

3. 添加日志记录
   - Client所有方法添加详细的日志记录
   - 记录请求参数、响应结果、耗时
   - 异常情况记录错误日志

4. 完善参数校验
   - 接口1:添加6个必填字段校验
   - 接口2:添加groupId和文件校验,限制文件大小10MB
   - 接口3:添加7个参数校验和日期范围校验
   - 接口4:添加groupId和inprogressList校验

5. 性能优化
   - RestTemplate使用Apache HttpClient连接池
   - 最大连接数100,每个路由最大20个连接
   - 支持连接复用,提升性能

6. 代码审查文档
   - 添加详细的代码审查报告
   - 记录发现的问题和改进建议

修改的文件:
- ccdi-lsfx/pom.xml
- ccdi-lsfx/src/main/java/com/ruoyi/lsfx/client/LsfxAnalysisClient.java
- ccdi-lsfx/src/main/java/com/ruoyi/lsfx/config/RestTemplateConfig.java
- ccdi-lsfx/src/main/java/com/ruoyi/lsfx/controller/LsfxTestController.java
- ccdi-lsfx/src/main/java/com/ruoyi/lsfx/util/HttpUtil.java
- ruoyi-admin/src/main/resources/application-dev.yml
- doc/implementation/lsfx-code-review-20260302.md
2026-03-03 09:35:27 +08:00

21 KiB
Raw Permalink Blame History

流水分析 Mock 服务器 - 实施计划

创建日期: 2026-03-02 状态: 待执行 预计完成时间: 2-3 天


项目目标

开发一个基于 Python + FastAPI 的 Mock 服务器,用于模拟流水分析平台的 7 个核心接口,支持:

  • 配置文件驱动的响应数据
  • 文件上传解析延迟模拟4秒
  • 错误场景触发机制(通过 error_XXXX 标记)
  • 自动生成的 Swagger API 文档

技术栈

技术 版本 用途
Python 3.11+ 编程语言
FastAPI 0.104.1 Web框架
Pydantic 2.5.0 数据验证
Uvicorn 0.24.0 ASGI服务器
pytest latest 测试框架

实施任务列表

Task 1: 项目初始化和基础设置

状态: 待开始 预计时间: 1 小时 阻塞任务: 无

目标: 创建项目目录结构、配置文件和依赖管理

实施步骤:

  1. 创建项目根目录 lsfx-mock-server/

  2. 创建目录结构:

    lsfx-mock-server/
    ├── config/
    │   └── responses/
    ├── models/
    ├── services/
    ├── routers/
    ├── utils/
    └── tests/
    
  3. 创建 requirements.txt:

    fastapi==0.104.1
    uvicorn[standard]==0.24.0
    pydantic==2.5.0
    python-multipart==0.0.6
    pytest>=7.0.0
    pytest-cov>=4.0.0
    httpx>=0.25.0
    
  4. 创建 config/settings.py:

    • 使用 Pydantic BaseSettings
    • 支持环境变量覆盖(.env 文件)
    • 配置项APP_NAME, HOST, PORT, DEBUG, PARSE_DELAY_SECONDS
  5. 创建 4 个 JSON 响应模板文件:

    • config/responses/token.json - Token 响应模板
    • config/responses/upload.json - 上传文件响应模板
    • config/responses/parse_status.json - 解析状态响应模板
    • config/responses/bank_statement.json - 银行流水响应模板
    • 每个模板包含占位符(如 {project_id}, {log_id}

验证标准:

  • 虚拟环境创建并激活
  • 依赖安装成功(无错误)
  • 配置文件能正确导入(from config.settings import settings
  • JSON 模板文件格式正确(使用 json.load() 验证)
  • settings 能读取环境变量

提交检查点:

git add requirements.txt config/
git commit -m "feat(mock): initialize project structure and configuration"

Task 2: 实现数据模型层

状态: 待开始(等待 Task 1 预计时间: 1.5 小时 阻塞任务: Task 1

目标: 创建所有请求和响应的 Pydantic 模型类

实施步骤:

  1. 创建 models/__init__.py(空文件)

  2. 创建 models/request.py:

    • 定义 6 个请求模型:
      • GetTokenRequest10+ 字段,可选字段有默认值)
      • UploadFileRequest通过 Form 数据接收)
      • FetchInnerFlowRequest7 个必填字段)
      • CheckParseStatusRequest2 个字段)
      • DeleteFilesRequest3 个字段)
      • GetBankStatementRequest4 个字段)
    • 所有字段添加 Field 描述(用于 Swagger
    • 可选字段使用 Optional[Type] = default_value
  3. 创建 models/response.py:

    • 定义嵌套数据模型:
      • TokenData5 个字段)
      • UploadLogItem15+ 字段)
      • BankStatementItem30+ 字段)
      • PendingItem15+ 字段)
    • 定义 6 个响应模型:
      • GetTokenResponse
      • UploadFileResponse
      • FetchInnerFlowResponse
      • CheckParseStatusResponse
      • DeleteFilesResponse
      • GetBankStatementResponse
    • 所有响应模型包含通用字段code, message, status, successResponse

验证标准:

  • 所有模型类能正确实例化
  • 可选字段默认值正确
  • Pydantic 验证功能正常(类型错误会抛出 ValidationError
  • 模型序列化为 JSON 正确(model.model_dump_json()
  • Swagger 自动文档显示所有字段和描述

提交检查点:

git add models/
git commit -m "feat(models): implement Pydantic request and response models"

Task 3: 实现工具类

状态: 待开始(可与 Task 2 并行) 预计时间: 1 小时 阻塞任务: 无

目标: 实现错误检测和响应构建工具

实施步骤:

  1. 创建 utils/__init__.py

  2. 创建 utils/error_simulator.py:

    class ErrorSimulator:
        ERROR_CODES = {
            "40101": {"code": "40101", "message": "appId错误"},
            "40102": {"code": "40102", "message": "appSecretCode错误"},
            "40104": {"code": "40104", "message": "可使用项目次数为0"},
            "40105": {"code": "40105", "message": "只读模式下无法新建项目"},
            "40106": {"code": "40106", "message": "错误的分析类型"},
            "40107": {"code": "40107", "message": "当前系统不支持的分析类型"},
            "40108": {"code": "40108", "message": "当前用户所属行社无权限"},
            "501014": {"code": "501014", "message": "无行内流水文件"},
        }
    
        @staticmethod
        def detect_error_marker(value: str) -> Optional[str]:
            """检测 error_XXXX 模式"""
            import re
            if not value:
                return None
            pattern = r'error_(\d+)'
            match = re.search(pattern, value)
            return match.group(1) if match else None
    
        @staticmethod
        def build_error_response(error_code: str) -> Dict:
            """构建错误响应"""
            if error_code in ErrorSimulator.ERROR_CODES:
                error_info = ErrorSimulator.ERROR_CODES[error_code]
                return {
                    "code": error_info["code"],
                    "message": error_info["message"],
                    "status": error_info["code"],
                    "successResponse": False
                }
            return None
    
  3. 创建 utils/response_builder.py:

    import json
    from pathlib import Path
    from typing import Dict, Any
    
    class ResponseBuilder:
        TEMPLATE_DIR = Path(__file__).parent.parent / "config" / "responses"
    
        @staticmethod
        def load_template(template_name: str) -> Dict:
            """加载 JSON 模板"""
            file_path = ResponseBuilder.TEMPLATE_DIR / f"{template_name}.json"
            with open(file_path, 'r', encoding='utf-8') as f:
                return json.load(f)
    
        @staticmethod
        def replace_placeholders(template: Dict, **kwargs) -> Dict:
            """递归替换占位符"""
            def replace_value(value):
                if isinstance(value, str):
                    for key, val in kwargs.items():
                        placeholder = f"{{{key}}}"
                        if placeholder in value:
                            return value.replace(placeholder, str(val))
                    return value
                elif isinstance(value, dict):
                    return {k: replace_value(v) for k, v in value.items()}
                elif isinstance(value, list):
                    return [replace_value(item) for item in value]
                return value
    
            return replace_value(template)
    
        @staticmethod
        def build_success_response(template_name: str, **kwargs) -> Dict:
            """构建成功响应"""
            template = ResponseBuilder.load_template(template_name)
            return ResponseBuilder.replace_placeholders(
                template["success_response"],
                **kwargs
            )
    

验证标准:

  • ErrorSimulator.detect_error_marker() 能正确识别错误标记
  • ErrorSimulator.build_error_response() 返回正确的错误响应
  • ResponseBuilder 能正确加载 JSON 模板
  • 占位符替换功能正常(支持嵌套字典和列表)
  • 所有 8 个错误码都有对应响应

提交检查点:

git add utils/
git commit -m "feat(utils): implement error simulator and response builder"

Task 4: 实现服务层

状态: 待开始(等待 Task 1, 2, 3 预计时间: 2 小时 阻塞任务: Task 1, Task 2, Task 3

目标: 实现核心业务服务类

实施步骤:

  1. 创建 services/__init__.py

  2. 创建 services/token_service.py:

    class TokenService:
        def __init__(self):
            self.project_counter = 0
            self.tokens = {}  # projectId -> token_data
    
        def create_token(self, request: GetTokenRequest) -> Dict:
            self.project_counter += 1
            project_id = self.project_counter
            token = f"mock_token_{project_id}"
    
            return ResponseBuilder.build_success_response(
                "token",
                project_id=project_id,
                project_no=request.projectNo,
                entity_name=request.entityName
            )
    
  3. 创建 services/file_service.py:

    from fastapi import BackgroundTasks
    import time
    from uuid import uuid4
    
    class FileService:
        def __init__(self):
            self.file_records = {}  # logId -> record
            self.parsing_status = {}  # logId -> is_parsing
            self.log_counter = 0
    
        async def upload_file(self, group_id: int, file, background_tasks: BackgroundTasks) -> Dict:
            self.log_counter += 1
            log_id = self.log_counter
    
            # 立即存储记录
            self.file_records[log_id] = {
                "logId": log_id,
                "groupId": group_id,
                "status": -5,
                "uploadStatusDesc": "parsing",
                "uploadFileName": file.filename,
                "fileSize": file.size,
                # ... 其他字段
            }
            self.parsing_status[log_id] = True
    
            # 启动后台任务
            background_tasks.add_task(
                self._simulate_parsing,
                log_id,
                settings.PARSE_DELAY_SECONDS
            )
    
            return ResponseBuilder.build_success_response(
                "upload",
                log_id=log_id
            )
    
        def _simulate_parsing(self, log_id: int, delay_seconds: int):
            """后台任务:模拟解析"""
            time.sleep(delay_seconds)
            if log_id in self.file_records:
                self.file_records[log_id]["uploadStatusDesc"] = "data.wait.confirm.newaccount"
                self.parsing_status[log_id] = False
    
        def check_parse_status(self, group_id: int, inprogress_list: str) -> Dict:
            """检查解析状态"""
            log_ids = [int(x.strip()) for x in inprogress_list.split(",")]
            is_parsing = any(
                self.parsing_status.get(log_id, False)
                for log_id in log_ids
            )
    
            pending_list = [
                self.file_records[log_id]
                for log_id in log_ids
                if log_id in self.file_records
            ]
    
            return {
                "code": "200",
                "data": {
                    "parsing": is_parsing,
                    "pendingList": pending_list
                },
                "status": "200",
                "successResponse": True
            }
    
        def delete_files(self, group_id: int, log_ids: List[int], user_id: int) -> Dict:
            """删除文件"""
            for log_id in log_ids:
                self.file_records.pop(log_id, None)
                self.parsing_status.pop(log_id, None)
    
            return {
                "code": "200",
                "data": {"message": "delete.files.success"},
                "status": "200",
                "successResponse": True
            }
    
        def fetch_inner_flow(self, request: FetchInnerFlowRequest) -> Dict:
            """拉取行内流水(模拟无数据)"""
            return {
                "code": "200",
                "data": {
                    "code": "501014",
                    "message": "无行内流水文件"
                },
                "status": "200",
                "successResponse": True
            }
    
  4. 创建 services/statement_service.py:

    class StatementService:
        def get_bank_statement(self, request: GetBankStatementRequest) -> Dict:
            # 加载模板
            template = ResponseBuilder.load_template("bank_statement")
            statements = template["success_response"]["data"]["bankStatementList"]
    
            # 模拟分页
            start = (request.pageNow - 1) * request.pageSize
            end = start + request.pageSize
            page_data = statements[start:end]
    
            return {
                "code": "200",
                "data": {
                    "bankStatementList": page_data,
                    "totalCount": len(statements)
                },
                "status": "200",
                "successResponse": True
            }
    

验证标准:

  • TokenService 能创建唯一 token
  • FileService.upload_file() 返回正确状态
  • 后台任务执行后,解析状态从 True 变为 False
  • check_parse_status() 正确返回 parsing 状态
  • StatementService 支持分页功能
  • 所有方法返回正确格式

提交检查点:

git add services/
git commit -m "feat(services): implement token, file, and statement services"

Task 5: 实现 API 路由

状态: 待开始(等待 Task 2, 3, 4 预计时间: 1.5 小时 阻塞任务: Task 1, Task 2, Task 3, Task 4

目标: 实现所有 6 个 API 接口路由

实施步骤:

  1. 创建 routers/__init__.py
  2. 创建 routers/api.py:
    from fastapi import APIRouter, BackgroundTasks, UploadFile, File, Form
    from models.request import *
    from models.response import *
    from services.token_service import TokenService
    from services.file_service import FileService
    from services.statement_service import StatementService
    from utils.error_simulator import ErrorSimulator
    
    router = APIRouter()
    token_service = TokenService()
    file_service = FileService()
    statement_service = StatementService()
    
    @router.post("/account/common/getToken")
    async def get_token(request: GetTokenRequest):
        """获取Token"""
        error_code = ErrorSimulator.detect_error_marker(request.projectNo)
        if error_code:
            return ErrorSimulator.build_error_response(error_code)
        return token_service.create_token(request)
    
    @router.post("/watson/api/project/remoteUploadSplitFile")
    async def upload_file(
        background_tasks: BackgroundTasks,
        groupId: int = Form(...),
        file: UploadFile = File(...)
    ):
        """上传文件"""
        return await file_service.upload_file(groupId, file, background_tasks)
    
    @router.post("/watson/api/project/getJZFileOrZjrcuFile")
    async def fetch_inner_flow(request: FetchInnerFlowRequest):
        """拉取行内流水"""
        error_code = ErrorSimulator.detect_error_marker(request.customerNo)
        if error_code:
            return ErrorSimulator.build_error_response(error_code)
        return file_service.fetch_inner_flow(request)
    
    @router.post("/watson/api/project/upload/getpendings")
    async def check_parse_status(request: CheckParseStatusRequest):
        """检查文件解析状态"""
        return file_service.check_parse_status(request.groupId, request.inprogressList)
    
    @router.post("/watson/api/project/batchDeleteUploadFile")
    async def delete_files(
        groupId: int,
        logIds: List[int],
        userId: int
    ):
        """删除文件"""
        return file_service.delete_files(groupId, logIds, userId)
    
    @router.post("/watson/api/project/getBSByLogId")
    async def get_bank_statement(request: GetBankStatementRequest):
        """获取银行流水"""
        return statement_service.get_bank_statement(request)
    

验证标准:

  • 所有 6 个接口在 Swagger UI 中可见
  • 每个接口能正常响应
  • 错误标记功能正常(包含 error_XXXX 的参数触发错误)
  • 文件上传接口能接收文件
  • 所有接口有正确的 Swagger 描述
  • 响应格式符合文档要求

提交检查点:

git add routers/
git commit -m "feat(routers): implement all 6 API endpoints"

Task 6: 实现主应用

状态: 待开始(等待 Task 1, 4, 5 预计时间: 0.5 小时 阻塞任务: Task 1, Task 4, Task 5

目标: 实现 FastAPI 应用主入口

实施步骤:

  1. 创建 main.py:
    from fastapi import FastAPI
    from routers import api
    from config.settings import settings
    
    app = FastAPI(
        title=settings.APP_NAME,
        description="模拟流水分析平台的7个核心接口",
        version="1.0.0",
        docs_url="/docs",
        redoc_url="/redoc"
    )
    
    app.include_router(api.router, tags=["流水分析接口"])
    
    @app.get("/health")
    async def health_check():
        return {"status": "healthy", "service": settings.APP_NAME}
    
    if __name__ == "__main__":
        import uvicorn
        uvicorn.run(
            app,
            host=settings.HOST,
            port=settings.PORT,
            log_level="debug" if settings.DEBUG else "info"
        )
    

验证标准:

提交检查点:

git add main.py
git commit -m "feat(main): implement FastAPI application entry point"

Task 7: 编写测试套件

状态: 待开始(等待 Task 1-6 预计时间: 2 小时 阻塞任务: Task 1, Task 2, Task 3, Task 4, Task 5, Task 6

目标: 创建完整的测试套件

实施步骤:

  1. 创建 tests/conftest.py:

    import pytest
    from fastapi.testclient import TestClient
    from main import app
    
    @pytest.fixture
    def client():
        return TestClient(app)
    
  2. 创建 tests/test_models.py - 测试所有数据模型

  3. 创建 tests/test_utils.py - 测试工具类

  4. 创建 tests/test_services.py - 测试服务类

  5. 创建 tests/test_api.py - 测试 API 端点

验证标准:

  • 运行 pytest tests/ -v 所有测试通过
  • 代码覆盖率 > 80%
  • 所有错误场景有测试
  • 生成 HTML 覆盖率报告

提交检查点:

git add tests/
git commit -m "test: add comprehensive test suite"

Task 8: 编写文档和部署配置

状态: 待开始(等待 Task 1-7 预计时间: 1 小时 阻塞任务: Task 1-7

目标: 创建项目文档和部署说明

实施步骤:

  1. 创建 README.md(包含安装、使用、测试说明)
  2. 创建 .env.example
  3. 创建 Dockerfile
  4. 创建 docker-compose.yml

验证标准:

  • README 中所有命令可执行
  • Docker 镜像构建成功
  • Docker Compose 启动成功

提交检查点:

git add README.md .env.example Dockerfile docker-compose.yml
git commit -m "docs: add README and deployment configuration"

Task 9: 创建集成测试

状态: 待开始(等待 Task 8 预计时间: 1 小时 阻塞任务: Task 8

目标: 创建端到端集成测试脚本

实施步骤:

  1. 创建 tests/integration/test_full_workflow.py
  2. 实现完整的接口调用流程测试
  3. 添加错误场景测试

验证标准:

  • 集成测试通过
  • 完整流程测试成功
  • 错误场景测试成功

提交检查点:

git add tests/integration/
git commit -m "test: add integration tests for full workflow"

Task 10: 代码审查和提交

状态: 待开始(等待 Task 1-9 预计时间: 1 小时 阻塞任务: Task 1-9

目标: 代码审查、优化和 Git 提交

审查清单:

  1. 代码质量

    • 所有代码符合 PEP 8
    • 类型提示完整
    • 无硬编码配置
    • 注释充分
  2. 安全性

    • 输入验证完整Pydantic
    • 无注入风险
  3. 测试覆盖

    • 单元测试覆盖率 > 80%
    • 集成测试通过

验证标准:

  • 所有测试通过
  • 代码覆盖率报告生成
  • 手动测试所有接口
  • README 验证完成

最终提交:

git add .
git commit -m "feat(lsfx-mock): complete lsfx mock server implementation"
git push origin feature/lsfx-mock-server

开发注意事项

环境要求

  • Python 3.11+
  • 虚拟环境venv
  • 端口 8000 可用

开发流程

  1. 每完成一个任务,立即提交代码
  2. 运行相关测试确保功能正确
  3. 更新任务状态
  4. 开始下一个任务

测试策略

  • 单元测试: 每个模块独立测试
  • 集成测试: 完整流程测试
  • 手动测试: 使用 Swagger UI 验证接口

代码规范

  • 遵循 PEP 8
  • 使用类型提示
  • 函数和类添加文档字符串
  • 保持代码简洁YAGNI, DRY

预期成果

  1. 完整的 Mock 服务器,模拟 7 个核心接口
  2. 配置文件驱动的响应数据
  3. 文件解析延迟模拟
  4. 错误场景触发机制
  5. 自动生成的 API 文档
  6. 完整的测试套件(覆盖率 > 80%
  7. 清晰的 README 和部署文档
  8. Docker 部署支持

风险和缓解

风险 影响 缓解措施
FastAPI 框架不熟悉 延期 变更预计时间到 3-4 天
异步任务调试困难 中等 添加详细日志,分步测试
响应格式与真实接口不符 严格对照接口文档,多次验证

后续优化方向

  1. 添加数据库持久化SQLite
  2. 实现更复杂的场景模拟
  3. 添加请求日志记录
  4. 创建 Web 管理界面
  5. 支持 WebSocket 实时通知

预计总开发时间: 10-12 小时 建议开发模式: 按顺序执行,每完成一个任务立即测试验证