Files
ccdi/lsfx-mock-server/docs/plans/2026-03-12-upload-status-api-implementation.md

12 KiB
Raw Blame History

获取单个文件上传状态接口优化实施计划

For Claude: REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.

Goal: 优化 /watson/api/project/bs/upload 接口,实现基于 logId 的确定性数据生成,不依赖上传记录。

Architecture: 使用 random.seed(log_id) 确保相同 logId 生成相同数据,完全独立于文件上传记录,符合 Mock 服务器定位。

Tech Stack: FastAPI, Python random/datetime, pytest


Task 1: 修复 FileRecord 类的 log_meta 默认值

Files:

  • Modify: services/file_service.py:47

Step 1: 修改 log_meta 默认值

services/file_service.py 第 47 行,将:

log_meta: str = "{\"lostHeader\":[],\"balanceAmount\":true}"

改为:

log_meta: str = "{\"lostHeader\":[],\"balanceAmount\":\"-1\"}"

Step 2: 验证修改

运行: python -c "from services.file_service import FileRecord; r = FileRecord(log_id=1, group_id=1, file_name='test.csv'); print(r.log_meta)"

预期输出:

{"lostHeader":[],"balanceAmount":"-1"}

Step 3: 提交修复

git add services/file_service.py
git commit -m "fix: 修复 FileRecord.log_meta 中 balanceAmount 值为字符串 '-1'"

Task 2: 编写测试 - 带 logId 查询返回数据

Files:

  • Modify: tests/test_api.py

Step 1: 编写测试用例

tests/test_api.py 文件末尾添加:

def test_get_upload_status_with_log_id():
    """测试带 logId 参数查询返回非空 logs"""
    response = client.get("/watson/api/project/bs/upload?groupId=1000&logId=13994")

    assert response.status_code == 200
    data = response.json()

    # 验证基本响应结构
    assert data["code"] == "200"
    assert data["status"] == "200"
    assert data["successResponse"] is True

    # 验证 logs 不为空
    assert len(data["data"]["logs"]) == 1

    # 验证返回的 logId 正确
    log = data["data"]["logs"][0]
    assert log["logId"] == 13994

    # 验证固定成功状态
    assert log["status"] == -5
    assert log["uploadStatusDesc"] == "data.wait.confirm.newaccount"

    # 验证 logMeta 格式正确
    assert log["logMeta"] == "{\"lostHeader\":[],\"balanceAmount\":\"-1\"}"

Step 2: 运行测试验证失败

运行: pytest tests/test_api.py::test_get_upload_status_with_log_id -v

预期: FAIL因为还未实现

Step 3: 提交测试

git add tests/test_api.py
git commit -m "test: 添加带 logId 查询的测试用例"

Task 3: 编写测试 - 不带 logId 查询返回空数组

Files:

  • Modify: tests/test_api.py

Step 1: 编写测试用例

tests/test_api.py 文件末尾添加:

def test_get_upload_status_without_log_id():
    """测试不带 logId 参数查询返回空 logs 数组"""
    response = client.get("/watson/api/project/bs/upload?groupId=1000")

    assert response.status_code == 200
    data = response.json()

    # 验证基本响应结构
    assert data["code"] == "200"
    assert data["status"] == "200"
    assert data["successResponse"] is True

    # 验证 logs 为空
    assert len(data["data"]["logs"]) == 0

    # 验证其他字段存在
    assert data["data"]["status"] == ""
    assert data["data"]["accountId"] == 8954
    assert data["data"]["currency"] == "CNY"

Step 2: 运行测试验证失败

运行: pytest tests/test_api.py::test_get_upload_status_without_log_id -v

预期: FAIL因为还未实现

Step 3: 提交测试

git add tests/test_api.py
git commit -m "test: 添加不带 logId 查询的测试用例"

Task 4: 编写测试 - 确定性数据生成

Files:

  • Modify: tests/test_api.py

Step 1: 编写测试用例

tests/test_api.py 文件末尾添加:

def test_deterministic_data_generation():
    """测试相同 logId 多次查询返回相同的核心字段值"""
    # 第一次查询
    response1 = client.get("/watson/api/project/bs/upload?groupId=1000&logId=13994")
    log1 = response1.json()["data"]["logs"][0]

    # 第二次查询
    response2 = client.get("/watson/api/project/bs/upload?groupId=1000&logId=13994")
    log2 = response2.json()["data"]["logs"][0]

    # 验证关键字段相同
    assert log1["logId"] == log2["logId"]
    assert log1["bankName"] == log2["bankName"]
    assert log1["accountNoList"] == log2["accountNoList"]
    assert log1["enterpriseNameList"] == log2["enterpriseNameList"]
    assert log1["status"] == log2["status"]
    assert log1["logMeta"] == log2["logMeta"]
    assert log1["templateName"] == log2["templateName"]
    assert log1["trxDateStartId"] == log2["trxDateStartId"]
    assert log1["trxDateEndId"] == log2["trxDateEndId"]

def test_field_completeness():
    """测试返回数据包含完整的 26 个字段"""
    response = client.get("/watson/api/project/bs/upload?groupId=1000&logId=13994")
    log = response.json()["data"]["logs"][0]

    # 验证所有必需字段存在
    required_fields = [
        "accountNoList", "bankName", "dataTypeInfo", "downloadFileName",
        "enterpriseNameList", "fileSize", "fileUploadBy", "fileUploadByUserName",
        "fileUploadTime", "isSplit", "leId", "logId", "logMeta", "logType",
        "loginLeId", "lostHeader", "realBankName", "rows", "source", "status",
        "templateName", "totalRecords", "trxDateEndId", "trxDateStartId",
        "uploadFileName", "uploadStatusDesc"
    ]

    for field in required_fields:
        assert field in log, f"缺少字段: {field}"

Step 2: 运行测试验证失败

运行: pytest tests/test_api.py::test_deterministic_data_generation tests/test_api.py::test_field_completeness -v

预期: FAIL因为还未实现

Step 3: 提交测试

git add tests/test_api.py
git commit -m "test: 添加确定性和字段完整性测试用例"

Task 5: 实现 _generate_deterministic_record() 方法

Files:

  • Modify: services/file_service.py

Step 1: 在 FileService 类中添加新方法

services/file_service.pyFileService 类中,在 _delayed_parse 方法之后(约第 200 行)添加:

def _generate_deterministic_record(self, log_id: int, group_id: int) -> dict:
    """
    基于 logId 生成确定性的文件记录

    Args:
        log_id: 文件ID用作随机种子
        group_id: 项目ID

    Returns:
        文件记录字典26个字段
    """
    # 银行类型选项
    bank_options = [
        ("ALIPAY", "ALIPAY_T220708"),
        ("BSX", "BSX_T240925"),
        ("ZJRCU", "ZJRCU_T251114")
    ]

    bank_name, template_name = random.choice(bank_options)

    # 生成交易日期范围
    end_date = datetime.now()
    start_date = end_date - timedelta(days=random.randint(90, 365))

    # 生成账号和主体
    account_no = f"{random.randint(10000000000, 99999999999)}"
    enterprise_names = ["测试主体"] if random.random() > 0.3 else [""]

    return {
        "accountNoList": [account_no],
        "bankName": bank_name,
        "dataTypeInfo": ["CSV", ","],
        "downloadFileName": f"测试文件_{log_id}.csv",
        "enterpriseNameList": enterprise_names,
        "fileSize": random.randint(10000, 100000),
        "fileUploadBy": 448,
        "fileUploadByUserName": "admin@support.com",
        "fileUploadTime": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
        "isSplit": 0,
        "leId": 10000 + random.randint(0, 9999),
        "logId": log_id,
        "logMeta": "{\"lostHeader\":[],\"balanceAmount\":\"-1\"}",
        "logType": "bankstatement",
        "loginLeId": 10000 + random.randint(0, 9999),
        "lostHeader": [],
        "realBankName": bank_name,
        "rows": 0,
        "source": "http",
        "status": -5,
        "templateName": template_name,
        "totalRecords": random.randint(100, 300),
        "trxDateEndId": int(end_date.strftime("%Y%m%d")),
        "trxDateStartId": int(start_date.strftime("%Y%m%d")),
        "uploadFileName": f"测试文件_{log_id}.pdf",
        "uploadStatusDesc": "data.wait.confirm.newaccount"
    }

Step 2: 验证语法正确

运行: python -m py_compile services/file_service.py

预期: 无输出(表示语法正确)

Step 3: 提交代码

git add services/file_service.py
git commit -m "feat: 添加 _generate_deterministic_record 方法"

Task 6: 重构 get_upload_status() 方法

Files:

  • Modify: services/file_service.py:265-300

Step 1: 替换整个 get_upload_status() 方法

services/file_service.py 中,找到 get_upload_status 方法(约第 265-300 行),完全替换为:

def get_upload_status(self, group_id: int, log_id: int = None) -> dict:
    """
    获取文件上传状态(基于 logId 生成确定性数据)

    Args:
        group_id: 项目ID
        log_id: 文件ID可选

    Returns:
        上传状态响应字典
    """
    logs = []

    if log_id:
        # 使用 logId 作为随机种子,确保相同 logId 返回相同数据
        random.seed(log_id)

        # 生成确定性的文件记录
        record = self._generate_deterministic_record(log_id, group_id)
        logs.append(record)

    # 返回响应
    return {
        "code": "200",
        "data": {
            "logs": logs,
            "status": "",
            "accountId": 8954,
            "currency": "CNY"
        },
        "status": "200",
        "successResponse": True
    }

Step 2: 验证语法正确

运行: python -m py_compile services/file_service.py

预期: 无输出(表示语法正确)

Step 3: 提交重构

git add services/file_service.py
git commit -m "refactor: 重构 get_upload_status 方法实现独立数据生成"

Task 7: 运行所有测试验证功能

Files:

  • Test: tests/test_api.py

Step 1: 运行新增的测试用例

运行: pytest tests/test_api.py::test_get_upload_status_with_log_id tests/test_api.py::test_get_upload_status_without_log_id tests/test_api.py::test_deterministic_data_generation tests/test_api.py::test_field_completeness -v

预期: 所有测试 PASS

Step 2: 运行完整的测试套件

运行: pytest tests/ -v

预期: 所有测试 PASS确保没有破坏其他功能

Step 3: 手动测试接口

运行: python main.py(在后台启动服务器)

在另一个终端运行:

curl "http://localhost:8000/watson/api/project/bs/upload?groupId=1000&logId=13994"

预期: 返回包含 logId=13994 的 JSON 数据

Step 4: 提交验证记录

git add tests/
git commit -m "test: 验证所有测试通过"

Task 8: 更新文档并提交

Files:

  • Modify: CLAUDE.md(可选)

Step 1: 检查是否需要更新 CLAUDE.md

查看项目根目录的 CLAUDE.md 文件,确认是否需要添加关于接口独立性的说明。如果需要,在适当位置添加:

### 接口说明

**获取单个文件上传状态接口 (`/watson/api/project/bs/upload`)**
- 此接口完全独立工作,不依赖文件上传记录
- 基于 logId 参数生成确定性的随机数据
- 相同 logId 每次查询返回相同的核心字段值

Step 2: 提交文档更新(如果有)

git add CLAUDE.md
git commit -m "docs: 更新接口独立性说明"

Step 3: 最终提交

确保所有修改已提交:

git status

预期: 工作目录干净


验收清单

实施完成后,确认以下验收标准:

功能验收

  • 带 logId 参数查询返回非空 logs 数组
  • 不带 logId 参数查询返回空 logs 数组
  • 相同 logId 多次查询返回相同的核心字段值
  • 返回数据包含完整的 26 个字段
  • status 字段值为 -5
  • logMeta 字段中 balanceAmount 为字符串 "-1"

质量验收

  • 所有单元测试通过
  • 代码符合项目编码规范
  • 无语法错误和运行时错误
  • API 文档Swagger UI正确展示接口

文档验收

  • 代码注释完整清晰
  • 测试用例覆盖所有场景

实施说明

  1. TDD 流程: 严格遵循"先写测试 → 运行失败 → 写代码 → 运行通过 → 提交"的流程
  2. 频繁提交: 每个小的步骤都有独立的提交,便于回滚和追踪
  3. 独立性: 此修改不影响其他接口(上传、解析状态检查等)
  4. 确定性: 使用 random.seed(log_id) 确保相同 logId 生成相同数据
  5. 简单高效: 代码简洁,无过度设计,符合 YAGNI 原则

参考资料

  • 设计文档: docs/plans/2026-03-12-upload-status-api-design.md
  • 接口文档: assets/兰溪-流水分析对接3.md 第374-516行
  • 当前实现: services/file_service.py 第265-300行