Files
ccdi/docs/plans/2026-03-02-lsfx-mock-server-design.md
wkc b022ec75b8 fix(lsfx): 修复流水分析对接模块的代码质量问题
1. 修复配置问题
   - 替换app-secret占位符为正确的密钥dXj6eHRmPv

2. 添加异常处理
   - HttpUtil所有方法添加完整的异常处理
   - 统一使用LsfxApiException包装异常
   - 检查HTTP状态码和响应体

3. 添加日志记录
   - Client所有方法添加详细的日志记录
   - 记录请求参数、响应结果、耗时
   - 异常情况记录错误日志

4. 完善参数校验
   - 接口1:添加6个必填字段校验
   - 接口2:添加groupId和文件校验,限制文件大小10MB
   - 接口3:添加7个参数校验和日期范围校验
   - 接口4:添加groupId和inprogressList校验

5. 性能优化
   - RestTemplate使用Apache HttpClient连接池
   - 最大连接数100,每个路由最大20个连接
   - 支持连接复用,提升性能

6. 代码审查文档
   - 添加详细的代码审查报告
   - 记录发现的问题和改进建议

修改的文件:
- ccdi-lsfx/pom.xml
- ccdi-lsfx/src/main/java/com/ruoyi/lsfx/client/LsfxAnalysisClient.java
- ccdi-lsfx/src/main/java/com/ruoyi/lsfx/config/RestTemplateConfig.java
- ccdi-lsfx/src/main/java/com/ruoyi/lsfx/controller/LsfxTestController.java
- ccdi-lsfx/src/main/java/com/ruoyi/lsfx/util/HttpUtil.java
- ruoyi-admin/src/main/resources/application-dev.yml
- doc/implementation/lsfx-code-review-20260302.md
2026-03-03 09:35:27 +08:00

15 KiB
Raw Blame History

流水分析 Mock 服务器设计方案

创建日期: 2026-03-02 作者: Claude Code

项目概述

背景

当前项目需要与流水分析平台进行接口对接,但在开发和测试过程中,依赖外部真实服务存在以下问题:

  • 网络连接不稳定,影响测试效率
  • 无法控制返回数据,难以测试各种场景
  • 无法模拟错误场景和边界情况
  • 团队成员无法共享测试环境

解决方案

开发一个独立的 Mock 服务器,基于 Python + FastAPI 技术栈,模拟流水分析平台的 7 个核心接口,支持:

  • 配置文件驱动的响应数据
  • 文件上传解析延迟模拟
  • 错误场景触发机制
  • 自动生成的 API 文档

技术选型

技术组件 选择 理由
Web框架 FastAPI 现代异步框架自动生成API文档强类型支持
数据验证 Pydantic 与FastAPI原生集成类型提示清晰
配置管理 JSON文件 易于修改,非开发人员也能调整测试数据
状态存储 内存字典 轻量级重启清空适合Mock场景

整体架构

lsfx-mock-server/
├── main.py                 # 应用入口
├── config/
│   ├── settings.py         # 全局配置
│   └── responses/          # 响应模板配置文件
│       ├── token.json
│       ├── upload.json
│       ├── parse_status.json
│       └── bank_statement.json
├── models/
│   ├── request.py          # 请求模型Pydantic
│   └── response.py         # 响应模型Pydantic
├── services/
│   ├── token_service.py    # Token管理
│   ├── file_service.py     # 文件上传和解析模拟
│   └── statement_service.py # 流水数据管理
├── routers/
│   └── api.py              # 所有API路由
├── utils/
│   ├── response_builder.py # 响应构建器
│   └── error_simulator.py  # 错误场景模拟
└── requirements.txt

核心设计思想

  1. 配置驱动 - 所有响应数据在JSON配置文件中方便修改
  2. 内存状态管理 - 使用全局字典存储运行时状态tokens、文件记录等
  3. 异步任务 - 使用FastAPI后台任务模拟文件解析延迟
  4. 错误标记识别 - 检测请求参数中的特殊标记触发错误响应

数据模型设计

请求模型

对应Java项目中的DTO类

# models/request.py
from pydantic import BaseModel
from typing import Optional

class GetTokenRequest(BaseModel):
    projectNo: str
    entityName: str
    userId: str
    userName: str
    orgCode: str
    entityId: Optional[str] = None
    xdRelatedPersons: Optional[str] = None
    jzDataDateId: Optional[str] = "0"
    innerBSStartDateId: Optional[str] = "0"
    innerBSEndDateId: Optional[str] = "0"
    analysisType: Optional[int] = -1
    departmentCode: Optional[str] = None

class UploadFileRequest(BaseModel):
    groupId: int

class FetchInnerFlowRequest(BaseModel):
    groupId: int
    customerNo: str
    dataChannelCode: str
    requestDateId: int
    dataStartDateId: int
    dataEndDateId: int
    uploadUserId: int

class CheckParseStatusRequest(BaseModel):
    groupId: int
    inprogressList: str

class GetBankStatementRequest(BaseModel):
    groupId: int
    logId: int
    pageNow: int
    pageSize: int

响应模型

对应Java项目中的VO类

# models/response.py
from pydantic import BaseModel
from typing import Optional, List, Dict, Any

class TokenData(BaseModel):
    token: str
    projectId: int
    projectNo: str
    entityName: str
    analysisType: int

class GetTokenResponse(BaseModel):
    code: str = "200"
    data: Optional[TokenData] = None
    message: str = "create.token.success"
    status: str = "200"
    successResponse: bool = True

# 其他响应模型类似...

核心业务逻辑

文件解析延迟模拟

实现机制:

  1. 上传接口立即返回,状态为"解析中"
  2. 使用FastAPI的BackgroundTasks在后台延迟执行
  3. 延迟3-5秒后更新状态为"解析完成"
  4. 轮询检查接口返回当前解析状态
# services/file_service.py
class FileService:
    def __init__(self):
        self.file_records: Dict[int, Dict] = {}
        self.parsing_status: Dict[int, bool] = {}

    async def upload_file(self, group_id: int, file, background_tasks: BackgroundTasks):
        log_id = generate_log_id()

        # 立即存储记录,标记为解析中
        self.file_records[log_id] = {
            "logId": log_id,
            "status": -5,
            "uploadStatusDesc": "parsing",
            ...
        }
        self.parsing_status[log_id] = True

        # 启动后台任务延迟4秒后完成解析
        background_tasks.add_task(
            self._simulate_parsing,
            log_id,
            delay_seconds=4
        )

        return log_id

    def _simulate_parsing(self, log_id: int, delay_seconds: int):
        time.sleep(delay_seconds)
        if log_id in self.file_records:
            self.file_records[log_id]["status"] = -5
            self.file_records[log_id]["uploadStatusDesc"] = "data.wait.confirm.newaccount"
            self.parsing_status[log_id] = False

错误场景模拟机制

错误触发规则

通过请求参数中的特殊标记触发对应的错误响应:

错误码映射表:

ERROR_CODES = {
    "40101": {"code": "40101", "message": "appId错误"},
    "40102": {"code": "40102", "message": "appSecretCode错误"},
    "40104": {"code": "40104", "message": "可使用项目次数为0无法创建项目"},
    "40105": {"code": "40105", "message": "只读模式下无法新建项目"},
    "40106": {"code": "40106", "message": "错误的分析类型,不在规定的取值范围内"},
    "40107": {"code": "40107", "message": "当前系统不支持的分析类型"},
    "40108": {"code": "40108", "message": "当前用户所属行社无权限"},
    "501014": {"code": "501014", "message": "无行内流水文件"},
}

检测逻辑:

@staticmethod
def detect_error_marker(value: str) -> Optional[str]:
    """检测字符串中的错误标记

    规则:如果字符串包含 error_XXXX则返回 XXXX
    例如:
    - "project_error_40101" -> "40101"
    - "test_error_501014" -> "501014"
    """
    if not value:
        return None

    import re
    pattern = r'error_(\d+)'
    match = re.search(pattern, value)
    if match:
        return match.group(1)
    return None

使用示例:

# 在服务中使用
def get_token(request: GetTokenRequest):
    error_code = ErrorSimulator.detect_error_marker(request.projectNo)
    if error_code:
        return ErrorSimulator.build_error_response(error_code)

    # 正常流程...

测试方式:

# 触发 40101 错误
request_data = {
    "projectNo": "test_project_error_40101",  # 包含错误标记
    "entityName": "测试企业",
    ...
}

配置文件结构

响应模板配置

// config/responses/token.json
{
  "success_response": {
    "code": "200",
    "data": {
      "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.mock_token_{project_id}",
      "projectId": "{project_id}",
      "projectNo": "{project_no}",
      "entityName": "{entity_name}",
      "analysisType": 0
    },
    "message": "create.token.success",
    "status": "200",
    "successResponse": true
  }
}
// config/responses/upload.json
{
  "success_response": {
    "code": "200",
    "data": {
      "accountsOfLog": {},
      "uploadLogList": [
        {
          "logId": "{log_id}",
          "status": -5,
          "uploadStatusDesc": "data.wait.confirm.newaccount",
          ...
        }
      ]
    }
  }
}

全局配置

# config/settings.py
from pydantic import BaseSettings

class Settings(BaseSettings):
    APP_NAME: str = "流水分析Mock服务"
    APP_VERSION: str = "1.0.0"
    DEBUG: bool = True
    HOST: str = "0.0.0.0"
    PORT: int = 8000

    # 模拟配置
    PARSE_DELAY_SECONDS: int = 4
    MAX_FILE_SIZE: int = 10485760  # 10MB

    class Config:
        env_file = ".env"

settings = Settings()

API 路由实现

核心接口

# routers/api.py
from fastapi import APIRouter, BackgroundTasks, UploadFile, File

router = APIRouter()

# 接口1获取Token
@router.post("/account/common/getToken")
async def get_token(request: GetTokenRequest):
    error_code = ErrorSimulator.detect_error_marker(request.projectNo)
    if error_code:
        return ErrorSimulator.build_error_response(error_code)
    return token_service.create_token(request)

# 接口2上传文件
@router.post("/watson/api/project/remoteUploadSplitFile")
async def upload_file(
    background_tasks: BackgroundTasks,
    groupId: int = Form(...),
    file: UploadFile = File(...)
):
    return file_service.upload_file(groupId, file, background_tasks)

# 接口3拉取行内流水
@router.post("/watson/api/project/getJZFileOrZjrcuFile")
async def fetch_inner_flow(request: FetchInnerFlowRequest):
    error_code = ErrorSimulator.detect_error_marker(request.customerNo)
    if error_code:
        return ErrorSimulator.build_error_response(error_code)
    return file_service.fetch_inner_flow(request)

# 接口4检查解析状态
@router.post("/watson/api/project/upload/getpendings")
async def check_parse_status(request: CheckParseStatusRequest):
    return file_service.check_parse_status(request.groupId, request.inprogressList)

# 接口5删除文件
@router.post("/watson/api/project/batchDeleteUploadFile")
async def delete_files(request: dict):
    return file_service.delete_files(
        request.get("groupId"),
        request.get("logIds"),
        request.get("userId")
    )

# 接口6获取银行流水
@router.post("/watson/api/project/getBSByLogId")
async def get_bank_statement(request: GetBankStatementRequest):
    return statement_service.get_bank_statement(request)

主应用

# main.py
from fastapi import FastAPI
from routers import api

app = FastAPI(
    title="流水分析Mock服务",
    description="模拟流水分析平台的7个核心接口",
    version="1.0.0",
    docs_url="/docs",
    redoc_url="/redoc"
)

app.include_router(api.router, tags=["流水分析接口"])

if __name__ == "__main__":
    import uvicorn
    uvicorn.run(app, host="0.0.0.0", port=8000)

测试和使用说明

启动服务

# 安装依赖
pip install -r requirements.txt

# 启动服务
python main.py

# 或使用uvicorn启动支持热重载
uvicorn main:app --reload --host 0.0.0.0 --port 8000

访问API文档

测试示例

1. 正常流程测试

import requests

# 获取Token
response = requests.post(
    "http://localhost:8000/account/common/getToken",
    json={
        "projectNo": "test_project_001",
        "entityName": "测试企业",
        "userId": "902001",
        "userName": "902001",
        "orgCode": "902000"
    }
)
result = response.json()
token = result["data"]["token"]
project_id = result["data"]["projectId"]

# 上传文件
files = {"file": ("test.csv", open("test.csv", "rb"), "text/csv")}
response = requests.post(
    "http://localhost:8000/watson/api/project/remoteUploadSplitFile",
    files=files,
    data={"groupId": project_id},
    headers={"X-Xencio-Client-Id": "26e5b9239853436b85c623f4b7a6d0e6"}
)
log_id = response.json()["data"]["uploadLogList"][0]["logId"]

# 轮询检查解析状态
import time
for i in range(10):
    response = requests.post(
        "http://localhost:8000/watson/api/project/upload/getpendings",
        json={"groupId": project_id, "inprogressList": str(log_id)},
        headers={"X-Xencio-Client-Id": "26e5b9239853436b85c623f4b7a6d0e6"}
    )
    result = response.json()
    if not result["data"]["parsing"]:
        print("解析完成")
        break
    time.sleep(1)

# 获取银行流水
response = requests.post(
    "http://localhost:8000/watson/api/project/getBSByLogId",
    json={
        "groupId": project_id,
        "logId": log_id,
        "pageNow": 1,
        "pageSize": 10
    },
    headers={"X-Xencio-Client-Id": "26e5b9239853436b85c623f4b7a6d0e6"}
)

2. 错误场景测试

# 触发 40101 错误appId错误
response = requests.post(
    "http://localhost:8000/account/common/getToken",
    json={
        "projectNo": "test_project_error_40101",  # 包含错误标记
        "entityName": "测试企业",
        "userId": "902001",
        "userName": "902001",
        "orgCode": "902000"
    }
)
# 返回: {"code": "40101", "message": "appId错误", ...}

# 触发 501014 错误(无行内流水文件)
response = requests.post(
    "http://localhost:8000/watson/api/project/getJZFileOrZjrcuFile",
    json={
        "groupId": 1,
        "customerNo": "test_error_501014",  # 包含错误标记
        "dataChannelCode": "ZJRCU",
        "requestDateId": 20260302,
        "dataStartDateId": 20260201,
        "dataEndDateId": 20260228,
        "uploadUserId": 902001
    }
)
# 返回: {"code": "501014", "message": "无行内流水文件", ...}

配置修改

  • 修改 config/responses/ 下的JSON文件可以自定义响应数据
  • 修改 config/settings.py 可以调整延迟时间、端口等配置
  • 支持 .env 文件覆盖配置

依赖清单

# requirements.txt
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
python-multipart==0.0.6

使用场景

A. 开发阶段测试

在业务代码开发过程中,修改配置文件 application-dev.yml,将 lsfx.api.base-url 改为 http://localhost:8000启动Mock服务器后业务代码即可连接Mock服务进行测试。

B. 完全替换测试

直接使用 Mock 服务器进行接口测试,验证业务逻辑的正确性。生产环境切换到真实服务。

C. CI/CD 集成

在持续集成流程中使用 Mock 服务器,自动化测试接口调用逻辑。


扩展性考虑

后续可能的增强功能

  1. 数据持久化 - 如需保留历史记录可集成SQLite
  2. 更复杂的场景模拟 - 支持配置文件定义多个场景
  3. 请求日志记录 - 记录所有请求用于调试
  4. Web管理界面 - 可视化管理Mock数据和状态
  5. Docker部署 - 提供Dockerfile方便部署

当前设计已满足核心需求,保持简洁实用。


总结

这是一个配置驱动、轻量级、易于使用的 Mock 服务器设计,核心特点:

完整性 - 覆盖所有7个核心接口 真实性 - 模拟文件解析延迟等真实场景 灵活性 - 配置文件驱动,错误场景可触发 易用性 - 自动API文档零配置启动 可维护 - 代码结构清晰与Java项目对应

满足您的Mock测试需求提升开发和测试效率。