Files
ccdi/lsfx-mock-server/docs/plans/2026-03-04-interface-alignment-implementation.md

21 KiB
Raw Blame History

接口完整对齐实施计划

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: 根据 兰溪-流水分析对接3.md 文档,完整对齐 Mock 服务器的所有7个接口实现

Architecture: 保持无数据库架构通过扩展内存数据模型支持完整字段新增1个接口完善6个现有接口的响应字段

Tech Stack: FastAPI, Python 3.8+, Pydantic


Task 1: 扩展 FileRecord 数据模型

Files:

  • Modify: services/file_service.py

Step 1: 读取现有 file_service.py 文件

先查看当前的 FileRecord 实现。

Step 2: 扩展 FileRecord 类添加所有新字段

FileRecord 类中添加以下字段:

from dataclasses import dataclass, field
from typing import List
import uuid
from datetime import datetime

@dataclass
class FileRecord:
    """文件记录模型(扩展版)"""
    # 原有字段
    log_id: int
    group_id: int
    file_name: str
    status: int = -5  # -5 表示解析成功
    upload_status_desc: str = "data.wait.confirm.newaccount"
    parsing: bool = True  # True表示正在解析

    # 新增字段 - 账号和主体信息
    account_no_list: List[str] = field(default_factory=list)
    enterprise_name_list: List[str] = field(default_factory=list)

    # 新增字段 - 银行和模板信息
    bank_name: str = "ZJRCU"
    real_bank_name: str = "ZJRCU"
    template_name: str = "ZJRCU_T251114"
    data_type_info: List[str] = field(default_factory=lambda: ["CSV", ","])

    # 新增字段 - 文件元数据
    file_size: int = 50000
    download_file_name: str = ""
    file_package_id: str = field(default_factory=lambda: str(uuid.uuid4()).replace('-', ''))

    # 新增字段 - 上传用户信息
    file_upload_by: int = 448
    file_upload_by_user_name: str = "admin@support.com"
    file_upload_time: str = field(default_factory=lambda: datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

    # 新增字段 - 法律实体信息
    le_id: int = 10000
    login_le_id: int = 10000
    log_type: str = "bankstatement"
    log_meta: str = "{\"lostHeader\":[],\"balanceAmount\":true}"
    lost_header: List[str] = field(default_factory=list)

    # 新增字段 - 记录统计
    rows: int = 0
    source: str = "http"
    total_records: int = 150
    is_split: int = 0

    # 新增字段 - 交易日期范围
    trx_date_start_id: int = 20240101
    trx_date_end_id: int = 20241231

Step 3: 验证服务能正常启动

python main.py

预期:服务启动成功,无报错信息。


Task 2: 更新 upload_file 方法初始化所有字段

Files:

  • Modify: services/file_service.py

Step 1: 读取 upload_file 方法

查看当前的 upload_file 方法实现。

Step 2: 根据文件名推断银行名称

upload_file 方法中添加银行名称推断逻辑:

def _infer_bank_name(self, filename: str) -> tuple:
    """根据文件名推断银行名称和模板名称"""
    if "支付宝" in filename or "alipay" in filename.lower():
        return "ALIPAY", "ALIPAY_T220708"
    elif "绍兴银行" in filename or "BSX" in filename:
        return "BSX", "BSX_T240925"
    else:
        return "ZJRCU", "ZJRCU_T251114"

async def upload_file(self, group_id: int, file: UploadFile, background_tasks: BackgroundTasks) -> dict:
    """上传文件并初始化所有字段"""
    # 生成新的 log_id
    self.current_log_id += 1
    log_id = self.current_log_id

    # 推断银行信息
    bank_name, template_name = self._infer_bank_name(file.filename)

    # 生成合理的交易日期范围
    import random
    from datetime import datetime, timedelta

    end_date = datetime.now()
    start_date = end_date - timedelta(days=random.randint(90, 365))
    trx_date_start_id = int(start_date.strftime("%Y%m%d"))
    trx_date_end_id = int(end_date.strftime("%Y%m%d"))

    # 生成随机账号和主体
    account_no = f"{random.randint(10000000000, 99999999999)}"
    enterprise_names = ["测试主体"] if random.random() > 0.3 else [""]

    # 创建完整的文件记录
    file_record = FileRecord(
        log_id=log_id,
        group_id=group_id,
        file_name=file.filename,
        download_file_name=file.filename,
        bank_name=bank_name,
        real_bank_name=bank_name,
        template_name=template_name,
        account_no_list=[account_no],
        enterprise_name_list=enterprise_names,
        le_id=10000 + random.randint(0, 9999),
        login_le_id=10000 + random.randint(0, 9999),
        file_size=random.randint(10000, 100000),
        total_records=random.randint(100, 300),
        trx_date_start_id=trx_date_start_id,
        trx_date_end_id=trx_date_end_id,
        parsing=True,
        status=-5
    )

    # 存储记录
    self.file_records[log_id] = file_record

    # 添加后台任务(延迟解析)
    background_tasks.add_task(self._delayed_parse, log_id)

    # 构建响应
    return self._build_upload_response(file_record)

Step 3: 实现 _build_upload_response 方法

def _build_upload_response(self, file_record: FileRecord) -> dict:
    """构建上传接口的完整响应"""
    return {
        "code": "200",
        "data": {
            "accountsOfLog": {
                str(file_record.log_id): [
                    {
                        "bank": file_record.bank_name,
                        "accountName": file_record.enterprise_name_list[0] if file_record.enterprise_name_list else "",
                        "accountNo": file_record.account_no_list[0] if file_record.account_no_list else "",
                        "currency": "CNY"
                    }
                ]
            },
            "uploadLogList": [
                {
                    "accountNoList": file_record.account_no_list,
                    "bankName": file_record.bank_name,
                    "dataTypeInfo": file_record.data_type_info,
                    "downloadFileName": file_record.download_file_name,
                    "enterpriseNameList": file_record.enterprise_name_list,
                    "filePackageId": file_record.file_package_id,
                    "fileSize": file_record.file_size,
                    "fileUploadBy": file_record.file_upload_by,
                    "fileUploadByUserName": file_record.file_upload_by_user_name,
                    "fileUploadTime": file_record.file_upload_time,
                    "leId": file_record.le_id,
                    "logId": file_record.log_id,
                    "logMeta": file_record.log_meta,
                    "logType": file_record.log_type,
                    "loginLeId": file_record.login_le_id,
                    "lostHeader": file_record.lost_header,
                    "realBankName": file_record.real_bank_name,
                    "rows": file_record.rows,
                    "source": file_record.source,
                    "status": file_record.status,
                    "templateName": file_record.template_name,
                    "totalRecords": file_record.total_records,
                    "trxDateEndId": file_record.trx_date_end_id,
                    "trxDateStartId": file_record.trx_date_start_id,
                    "uploadFileName": file_record.file_name,
                    "uploadStatusDesc": file_record.upload_status_desc
                }
            ],
            "uploadStatus": 1
        },
        "status": "200",
        "successResponse": True
    }

Step 4: 验证上传接口返回完整字段

重启服务并调用上传接口,检查响应是否包含所有字段。


Task 3: 添加 get_upload_status 方法

Files:

  • Modify: services/file_service.py

Step 1: 实现 get_upload_status 方法

FileService 类中添加新方法:

def get_upload_status(self, group_id: int, log_id: int = None) -> dict:
    """获取文件上传状态接口5"""
    logs = []

    if log_id:
        # 返回特定文件的状态
        if log_id in self.file_records:
            record = self.file_records[log_id]
            if record.group_id == group_id:
                logs.append(self._build_log_detail(record))
    else:
        # 返回该项目的所有文件状态
        for record in self.file_records.values():
            if record.group_id == group_id:
                logs.append(self._build_log_detail(record))

    # 构建响应
    return {
        "code": "200",
        "data": {
            "logs": logs,
            "status": "",
            "accountId": 8954,
            "currency": "CNY"
        },
        "status": "200",
        "successResponse": True
    }

def _build_log_detail(self, record: FileRecord) -> dict:
    """构建日志详情对象"""
    return {
        "accountNoList": record.account_no_list,
        "bankName": record.bank_name,
        "dataTypeInfo": record.data_type_info,
        "downloadFileName": record.download_file_name,
        "enterpriseNameList": record.enterprise_name_list,
        "fileSize": record.file_size,
        "fileUploadBy": record.file_upload_by,
        "fileUploadByUserName": record.file_upload_by_user_name,
        "fileUploadTime": record.file_upload_time,
        "isSplit": record.is_split,
        "leId": record.le_id,
        "logId": record.log_id,
        "logMeta": record.log_meta,
        "logType": record.log_type,
        "loginLeId": record.login_le_id,
        "lostHeader": record.lost_header,
        "realBankName": record.real_bank_name,
        "rows": record.rows,
        "source": record.source,
        "status": record.status,
        "templateName": record.template_name,
        "totalRecords": record.total_records,
        "trxDateEndId": record.trx_date_end_id,
        "trxDateStartId": record.trx_date_start_id,
        "uploadFileName": record.file_name,
        "uploadStatusDesc": record.upload_status_desc
    }

Step 2: 验证方法能正确查询文件记录

在代码中确保 file_records 字典正确初始化和管理。


Task 4: 在 API 路由中添加新接口

Files:

  • Modify: routers/api.py

Step 1: 读取现有 api.py 文件

查看当前的路由定义。

Step 2: 添加 GET 接口路由

在接口5的位置check_parse_status 和 delete_files 之间)添加:

# ==================== 接口5获取文件上传状态 ====================
@router.get("/watson/api/project/bs/upload")
async def get_upload_status(
    groupId: int = Form(..., description="项目id"),
    logId: Optional[int] = Form(None, description="文件id"),
):
    """获取单个文件上传后的状态

    如果不提供 logId返回该项目的所有文件状态
    """
    return file_service.get_upload_status(groupId, logId)

Step 3: 确认导入了 Optional

在文件顶部确认:

from typing import List, Optional

Step 4: 验证新接口出现在 Swagger 文档中

重启服务,访问 http://localhost:8000/docs确认能看到新的 GET 接口。


Task 5: 更新 check_parse_status 响应字段

Files:

  • Modify: services/file_service.py

Step 1: 修改 check_parse_status 方法

确保返回的 pendingList 包含所有字段:

def check_parse_status(self, group_id: int, inprogress_list: str) -> dict:
    """检查文件解析状态"""
    log_ids = [int(id.strip()) for id in inprogress_list.split(",")]

    pending_list = []
    all_parsing_complete = True

    for log_id in log_ids:
        if log_id in self.file_records:
            record = self.file_records[log_id]
            if record.parsing:
                all_parsing_complete = False

            pending_list.append(self._build_log_detail(record))

    return {
        "code": "200",
        "data": {
            "parsing": not all_parsing_complete,
            "pendingList": pending_list
        },
        "status": "200",
        "successResponse": True
    }

Step 2: 验证解析状态接口返回完整字段

调用接口4检查响应中的 pendingList 是否包含所有字段。


Task 6: 更新 delete_files 方法响应格式

Files:

  • Modify: services/file_service.py

Step 1: 修改 delete_files 方法

确保响应的 code 字段为 "200 OK"

def delete_files(self, group_id: int, log_ids: List[int], user_id: int) -> dict:
    """批量删除文件"""
    deleted_count = 0
    for log_id in log_ids:
        if log_id in self.file_records:
            del self.file_records[log_id]
            deleted_count += 1

    return {
        "code": "200 OK",  # 注意:这里是 "200 OK" 不是 "200"
        "data": {
            "message": "delete.files.success"
        },
        "message": "delete.files.success",
        "status": "200",
        "successResponse": True
    }

Step 2: 验证删除接口响应格式正确

调用删除接口,检查响应的 code 字段是否为 "200 OK"。


Task 7: 更新 token.json 响应模板

Files:

  • Modify: config/responses/token.json

Step 1: 确认 analysisType 为 Integer

确保 token.json 中的 analysisType 字段类型正确:

{
  "success_response": {
    "code": "200",
    "data": {
      "token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.mock_token_{project_id}",
      "projectId": "{project_id}",
      "projectNo": "{project_no}",
      "entityName": "{entity_name}",
      "analysisType": 0
    },
    "message": "create.token.success",
    "status": "200",
    "successResponse": true
  }
}

确认 analysisType 的值是数字 0不是字符串 "0"。

Step 2: 验证接口1响应正确

调用 getToken 接口,检查 analysisType 的类型。


Task 8: 创建 upload_status.json 响应模板

Files:

  • Create: config/responses/upload_status.json

Step 1: 创建新的响应模板文件

创建文件并添加内容(虽然实际响应在代码中构建,但保留模板作为参考):

{
  "success_response": {
    "code": "200",
    "data": {
      "logs": [
        {
          "accountNoList": ["18785967364"],
          "bankName": "ALIPAY",
          "dataTypeInfo": ["CSV", ","],
          "downloadFileName": "支付宝.csv",
          "enterpriseNameList": ["曾孝成"],
          "fileSize": 16322,
          "fileUploadBy": 448,
          "fileUploadByUserName": "admin@support.com",
          "fileUploadTime": "2025-03-13 08:45:32",
          "isSplit": 0,
          "leId": 10741,
          "logId": 13994,
          "logMeta": "{\"lostHeader\":[],\"balanceAmount\":\"-1\"}",
          "logType": "bankstatement",
          "loginLeId": 10741,
          "lostHeader": [],
          "realBankName": "ALIPAY",
          "rows": 0,
          "source": "http",
          "status": -5,
          "templateName": "ALIPAY_T220708",
          "totalRecords": 127,
          "trxDateEndId": 20231231,
          "trxDateStartId": 20230102,
          "uploadFileName": "支付宝.pdf",
          "uploadStatusDesc": "data.wait.confirm.newaccount"
        }
      ],
      "status": "",
      "accountId": 8954,
      "currency": "CNY"
    },
    "status": "200",
    "successResponse": true
  }
}

Task 9: 更新 bank_statement.json 响应模板

Files:

  • Modify: config/responses/bank_statement.json

Step 1: 补充流水记录的所有字段

确保 bank_statement.json 包含所有50+个字段:

{
  "success_response": {
    "code": "200",
    "data": {
      "bankStatementList": [
        {
          "accountId": 0,
          "accountMaskNo": "101015251071645",
          "accountingDate": "2024-02-01",
          "accountingDateId": 20240201,
          "archivingFlag": 0,
          "attachments": 0,
          "balanceAmount": 4814.82,
          "bank": "ZJRCU",
          "bankComments": "",
          "bankStatementId": 12847662,
          "bankTrxNumber": "1a10458dd5c3366d7272285812d434fc",
          "batchId": 19135,
          "cashType": "1",
          "commentsNum": 0,
          "crAmount": 0,
          "cretNo": "230902199012261247",
          "currency": "CNY",
          "customerAccountMaskNo": "597671502",
          "customerBank": "",
          "customerId": -1,
          "customerName": "小店",
          "customerReference": "",
          "downPaymentFlag": 0,
          "drAmount": 245.8,
          "exceptionType": "",
          "groupId": 16238,
          "internalFlag": 0,
          "leId": 16308,
          "leName": "张传伟",
          "overrideBsId": 0,
          "paymentMethod": "",
          "sourceCatalogId": 0,
          "split": 0,
          "subBankstatementId": 0,
          "toDoFlag": 0,
          "transAmount": 245.8,
          "transFlag": "P",
          "transTypeId": 0,
          "transformAmount": 0,
          "transformCrAmount": 0,
          "transformDrAmount": 0,
          "transfromBalanceAmount": 0,
          "trxBalance": 0,
          "trxDate": "2024-02-01 10:33:44",
          "userMemo": "财付通消费_小店"
        }
      ],
      "totalCount": 131
    },
    "status": "200",
    "successResponse": true
  }
}

Step 2: 验证流水查询接口返回所有字段

调用 getBSByLogId 接口,检查响应是否包含所有字段。


Task 10: 添加 40100 错误码

Files:

  • Modify: utils/error_simulator.py

Step 1: 在 ERROR_CODES 字典中添加新错误码

ERROR_CODES = {
    "40100": {"code": "40100", "message": "未知异常"},
    "40101": {"code": "40101", "message": "appId错误"},
    "40102": {"code": "40102", "message": "appSecretCode错误"},
    "40104": {"code": "40104", "message": "可使用项目次数为0无法创建项目"},
    "40105": {"code": "40105", "message": "只读模式下无法新建项目"},
    "40106": {"code": "40106", "message": "错误的分析类型,不在规定的取值范围内"},
    "40107": {"code": "40107", "message": "当前系统不支持的分析类型"},
    "40108": {"code": "40108", "message": "当前用户所属行社无权限"},
    "501014": {"code": "501014", "message": "无行内流水文件"},
}

Step 2: 验证错误码能正确触发

调用任意接口,在参数中包含 error_40100,检查是否返回对应错误。


Task 11: 更新 CLAUDE.md 文档

Files:

  • Modify: CLAUDE.md

Step 1: 更新接口列表说明

在 "API 接口说明" 部分更新为:

## API 接口说明

7个核心接口

1. `/account/common/getToken` (POST) - 创建项目并获取 Token
2. `/watson/api/project/remoteUploadSplitFile` (POST) - 上传流水文件multipart/form-data
3. `/watson/api/project/getJZFileOrZjrcuFile` (POST) - 拉取行内流水
4. `/watson/api/project/upload/getpendings` (POST) - 检查文件解析状态
5. `/watson/api/project/bs/upload` (GET) - 获取单个文件上传后的状态
6. `/watson/api/project/batchDeleteUploadFile` (POST) - 批量删除文件
7. `/watson/api/project/getBSByLogId` (POST) - 获取银行流水(分页)

详细接口文档请访问 Swagger UI (`/docs`) 或查看 `assets/兰溪-流水分析对接3.md`

Step 2: 更新注意事项

添加关于响应字段完整性的说明:

## 注意事项

- **数据持久化**: 所有数据存储在内存中,服务重启后数据丢失
- **响应字段完整性**: 所有接口响应字段完全对齐接口文档示例
- **并发安全**: 当前实现未考虑多线程安全,生产环境需要加锁
- **文件存储**: 上传的文件不实际保存,仅模拟元数据
- **错误标记**: 错误触发通过字符串匹配实现,确保测试数据唯一性
- **后台任务**: FastAPI BackgroundTasks 在同一进程内执行,不会阻塞响应
- **请求头处理**: X-Xencio-Client-Id 请求头不验证,接受任意值

Task 12: 最终验证

Files:

  • All modified files

Step 1: 启动服务

python main.py

预期:服务正常启动,无报错。

Step 2: 访问 Swagger 文档

访问 http://localhost:8000/docs

预期能看到所有7个接口包括新增的 GET 接口。

Step 3: 测试所有7个接口

使用 Swagger UI 或 curl 测试每个接口,确保:

  1. 接口1返回包含 analysisType (Integer) 的响应
  2. 接口2返回包含 accountsOfLog 和完整 uploadLogList 的响应
  3. 接口3返回 logId 数组
  4. 接口4返回包含完整字段的 pendingList
  5. 接口5返回包含完整字段的 logs 数组
  6. 接口6返回 code 为 "200 OK" 的响应
  7. 接口7返回包含所有50+字段的 bankStatementList

Step 4: 测试错误码

调用接口1使用参数 projectNo: "test_error_40100"

预期:返回 40100 错误。


Success Criteria

  • FileRecord 包含所有必需字段
  • upload_file 方法正确初始化所有字段
  • get_upload_status 方法正确实现
  • 新接口出现在 /docs 中
  • 所有响应字段完全对齐文档示例
  • 40100 错误码能正确触发
  • 服务启动无报错
  • 所有7个接口都能正常调用

Notes

  • 所有代码修改都保持向后兼容
  • 无需数据库迁移(使用内存存储)
  • 错误处理机制保持不变
  • 请求头 X-Xencio-Client-Id 不验证