# LSFX Mock Server Abnormal Account Baseline Sync Backend Implementation Plan

> **For agentic workers:** REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (`- [ ]`) syntax for tracking.
>
> Repository constraint: this repository explicitly forbids enabling subagents, so always execute with `superpowers:executing-plans`.

**Goal:** When `lsfx-mock-server` creates a `logId`, sync the abnormal account facts to `ccdi_account_info` in one pass, so that the abnormal account facts, the matched statement records, and the preconditions for real tagging under the same `logId` form a stable closed loop.

**Architecture:** Keep reusing the existing `FileService -> FileRecord -> StatementService` main path. Add no new endpoints, and do not mix database-write side effects into `getBSByLogId`. Add a small `AbnormalAccountBaselineService` that idempotently upserts `ccdi_account_info` by `account_no`; `FileService` calls it before saving `file_records[log_id]`. `StatementService` still only reads `FileRecord` to generate the abnormal account statement samples.

**Tech Stack:** Python 3, FastAPI, PyMySQL, pytest, dataclasses, Markdown docs

---

## File Structure

- `lsfx-mock-server/services/abnormal_account_baseline_service.py`: New abnormal account baseline write service. Encapsulates the database connection, input validation, and the idempotent upsert by account number.
- `lsfx-mock-server/services/file_service.py`: Inject the abnormal account baseline service, and trigger the baseline sync before saving the `FileRecord` in `fetch_inner_flow(...)` and in the upload record-creation path.
- `lsfx-mock-server/services/statement_service.py`: Only add chain-consistency assertions; no new database-write logic.
- `lsfx-mock-server/tests/test_abnormal_account_baseline_service.py`: New service-level unit tests covering empty input, invalid input, insert, and update.
- `lsfx-mock-server/tests/test_file_service.py`: Lock in that `fetch_inner_flow(...)` calls the baseline sync service, and lock in the failure rollback semantics.
- `lsfx-mock-server/tests/test_statement_service.py`: Lock in that the abnormal account sample statements use the same account numbers as `record.abnormal_accounts`.
- `docs/reports/implementation/2026-03-31-lsfx-mock-server-abnormal-account-baseline-sync-backend-implementation.md`: Record the results of this backend implementation.
- `docs/tests/records/2026-03-31-lsfx-mock-server-abnormal-account-baseline-sync-backend-verification.md`: Record the pytest verification commands, their results, and the process cleanup conclusion.

## Task 1: Lock in the baseline sync trigger point in `FileService` first

**Files:**

- Modify: `lsfx-mock-server/tests/test_file_service.py`
- Modify: `lsfx-mock-server/services/file_service.py`
- Reference: `docs/design/2026-03-31-lsfx-mock-server-abnormal-account-baseline-sync-design.md`

- [ ] **Step 1: Write the failing test**

In `lsfx-mock-server/tests/test_file_service.py`, add a fake baseline service and two failing tests:

```python
class FakeAbnormalAccountBaselineService:
    """Test double that records every apply() call and can be told to fail."""

    def __init__(self, should_fail=False):
        self.should_fail = should_fail
        self.calls = []

    def apply(self, staff_id_card, abnormal_accounts):
        self.calls.append(
            {
                "staff_id_card": staff_id_card,
                "abnormal_accounts": [dict(item) for item in abnormal_accounts],
            }
        )
        if self.should_fail:
            raise RuntimeError("baseline sync failed")


def test_fetch_inner_flow_should_sync_abnormal_account_baselines_before_caching():
    baseline_service = FakeAbnormalAccountBaselineService()
    service = FileService(
        staff_identity_repository=FakeStaffIdentityRepository(),
        abnormal_account_baseline_service=baseline_service,
    )

    response = service.fetch_inner_flow(
        {
            "groupId": 1001,
            "customerNo": "customer_abnormal_baseline",
            "dataChannelCode": "test_code",
            "requestDateId": 20240101,
            "dataStartDateId": 20240101,
            "dataEndDateId": 20240131,
            "uploadUserId": 902001,
        }
    )
    log_id = response["data"][0]
    record = service.file_records[log_id]

    # The baseline service must have been called with the record's own facts.
    assert baseline_service.calls
    assert baseline_service.calls[0]["staff_id_card"] == record.staff_id_card
    assert baseline_service.calls[0]["abnormal_accounts"] == record.abnormal_accounts


def test_fetch_inner_flow_should_not_cache_log_id_when_abnormal_account_baseline_sync_fails():
    baseline_service = FakeAbnormalAccountBaselineService(should_fail=True)
    service = FileService(
        staff_identity_repository=FakeStaffIdentityRepository(),
        abnormal_account_baseline_service=baseline_service,
    )

    with pytest.raises(RuntimeError, match="baseline sync failed"):
        service.fetch_inner_flow(
            {
                "groupId": 1001,
                "customerNo": "customer_abnormal_baseline_fail",
                "dataChannelCode": "test_code",
                "requestDateId": 20240101,
                "dataStartDateId": 20240101,
                "dataEndDateId": 20240131,
                "uploadUserId": 902001,
            }
        )

    # A failed sync must not leave a half-built logId in the cache.
    assert service.file_records == {}
```

- [ ] **Step 2: Run test to verify it fails**

Run:

```bash
cd lsfx-mock-server
python3 -m pytest tests/test_file_service.py -k "abnormal_account_baseline" -v
```

Expected:

- `FAIL`
- The cause is that `FileService` does not yet accept the `abnormal_account_baseline_service` injection and does not trigger the database write during record creation.

- [ ] **Step 3: Write minimal implementation**

In `lsfx-mock-server/services/file_service.py`, fill in the minimal path:

1. Add a constructor parameter:

   ```python
   # Existing parameters are elided as "..." here.
   def __init__(..., abnormal_account_baseline_service=None):
       self.abnormal_account_baseline_service = (
           abnormal_account_baseline_service or AbnormalAccountBaselineService()
       )
   ```

2. Add an internal wrapper:

   ```python
   def _apply_abnormal_account_baselines(self, file_record: FileRecord) -> None:
       # No abnormal account rules hit: nothing to sync.
       if not file_record.abnormal_account_hit_rules:
           return
       if not file_record.abnormal_accounts:
           # Message: "an abnormal account hit plan exists, but no account facts were generated"
           raise RuntimeError("异常账户命中计划存在,但未生成账户事实")
       self.abnormal_account_baseline_service.apply(
           staff_id_card=file_record.staff_id_card,
           abnormal_accounts=file_record.abnormal_accounts,
       )
   ```

3. In `fetch_inner_flow(...)` and in the upload path, change:

   ```python
   self.file_records[log_id] = file_record
   ```

   to:

   ```python
   self._apply_abnormal_account_baselines(file_record)
   self.file_records[log_id] = file_record
   ```

- [ ] **Step 4: Run test to verify it passes**

Run:

```bash
cd lsfx-mock-server
python3 -m pytest tests/test_file_service.py -k "abnormal_account_baseline" -v
```

Expected:

- `PASS`
- The success path calls the baseline service first.
- The failure path does not write a half-built `logId` into `file_records`.

- [ ] **Step 5: Commit**

```bash
git add lsfx-mock-server/services/file_service.py lsfx-mock-server/tests/test_file_service.py
# Message: "Wire in the abnormal account baseline sync trigger point"
git commit -m "接入异常账户基线同步触发点"
```

## Task 2: Implement the abnormal account baseline write service

**Files:**

- Create: `lsfx-mock-server/services/abnormal_account_baseline_service.py`
- Create: `lsfx-mock-server/tests/test_abnormal_account_baseline_service.py`
- Reference: `docs/design/2026-03-31-lsfx-mock-server-abnormal-account-baseline-sync-design.md`

- [ ] **Step 1: Write the failing tests**

Create `lsfx-mock-server/tests/test_abnormal_account_baseline_service.py` and lock in four behaviors first:

```python
def test_apply_should_skip_when_abnormal_accounts_is_empty():
    service = AbnormalAccountBaselineService()
    fake_connection = FakeConnection()
    service._connect = lambda: fake_connection

    service.apply("330101199001010001", [])

    assert fake_connection.executed_sql == []


def test_apply_should_raise_when_fact_owner_mismatches_staff():
    service = AbnormalAccountBaselineService()

    with pytest.raises(RuntimeError, match="owner_id_card"):
        service.apply(
            "330101199001010001",
            [
                {
                    "account_no": "6222000000000001",
                    "owner_id_card": "330101199001010099",
                    "account_name": "测试员工工资卡",
                    "status": 2,
                    "effective_date": "2024-01-01",
                    "invalid_date": "2026-03-20",
                    "rule_code": "SUDDEN_ACCOUNT_CLOSURE",
                }
            ],
        )


def test_apply_should_insert_new_account_fact_by_account_no():
    ...


def test_apply_should_update_existing_account_fact_by_account_no():
    ...
```

`FakeConnection` / `FakeCursor` only need to record `execute(...)` calls and the commit count; no real database is required.

- [ ] **Step 2: Run test to verify it fails**

Run:

```bash
cd lsfx-mock-server
python3 -m pytest tests/test_abnormal_account_baseline_service.py -v
```

Expected:

- `FAIL`
- The cause is that the service file does not exist yet.

- [ ] **Step 3: Write minimal implementation**

Implement the following in `lsfx-mock-server/services/abnormal_account_baseline_service.py`:

1. `__init__` reuses the existing `settings.CCDI_DB_*` values directly.
2. `_connect()` uses `pymysql.connect(..., charset="utf8mb4", autocommit=False)`.
3. `apply(staff_id_card, abnormal_accounts)` follows these rules:
   - Return immediately on an empty list.
   - Raise immediately if any `owner_id_card` differs from `staff_id_card`.
   - Execute one upsert per fact.
   - Call `commit()` once after all upserts succeed; call `rollback()` on failure.

Suggested shape of the upsert statement:

```sql
INSERT INTO ccdi_account_info (
    account_no,
    account_type,
    account_name,
    owner_type,
    owner_id,
    bank,
    bank_code,
    currency,
    is_self_account,
    trans_risk_level,
    status,
    effective_date,
    invalid_date,
    create_by,
    update_by
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
    account_name = VALUES(account_name),
    owner_type = VALUES(owner_type),
    owner_id = VALUES(owner_id),
    bank = VALUES(bank),
    bank_code = VALUES(bank_code),
    currency = VALUES(currency),
    is_self_account = VALUES(is_self_account),
    trans_risk_level = VALUES(trans_risk_level),
    status = VALUES(status),
    effective_date = VALUES(effective_date),
    invalid_date = VALUES(invalid_date),
    update_by = VALUES(update_by),
    update_time = NOW()
```

Fixed value constraints:

- `account_type = 'DEBIT'`
- `owner_type = 'EMPLOYEE'`
- `bank = '兰溪农商银行'`
- `bank_code = 'LXNCSY'`
- `currency = 'CNY'`
- `is_self_account = 1`
- `trans_risk_level = 'HIGH'`
- `create_by/update_by = 'lsfx-mock-server'`

- [ ] **Step 4: Run test to verify it passes**

Run:

```bash
cd lsfx-mock-server
python3 -m pytest tests/test_abnormal_account_baseline_service.py -v
```

Expected:

- `PASS`
- Covers all four behaviors: empty input, validation failure, insert, and update.

- [ ] **Step 5: Commit**

```bash
git add lsfx-mock-server/services/abnormal_account_baseline_service.py lsfx-mock-server/tests/test_abnormal_account_baseline_service.py
# Message: "Add the abnormal account baseline write service"
git commit -m "新增异常账户基线写库服务"
```

## Task 3: Lock in consistency between abnormal account facts and returned statements

**Files:**

- Modify: `lsfx-mock-server/tests/test_statement_service.py`
- Reference: `lsfx-mock-server/services/statement_service.py`
- Reference: `lsfx-mock-server/services/statement_rule_samples.py`

- [ ] **Step 1: Write the failing test**

In `lsfx-mock-server/tests/test_statement_service.py`, add one test that checks consistency only:

```python
def test_get_bank_statement_should_only_use_abnormal_account_numbers_from_file_record():
    file_service = FileService(
        staff_identity_repository=FakeStaffIdentityRepository(),
        abnormal_account_baseline_service=FakeAbnormalAccountBaselineService(),
    )
    statement_service = StatementService(file_service=file_service)

    response = file_service.fetch_inner_flow(
        {
            "groupId": 1001,
            "customerNo": "customer_abnormal_statement_consistency",
            "dataChannelCode": "test_code",
            "requestDateId": 20240101,
            "dataStartDateId": 20240101,
            "dataEndDateId": 20240131,
            "uploadUserId": 902001,
        }
    )
    log_id = response["data"][0]
    record = file_service.file_records[log_id]

    # Pin the record to a single known abnormal account.
    record.abnormal_account_hit_rules = ["SUDDEN_ACCOUNT_CLOSURE"]
    record.abnormal_accounts = [
        {
            "account_no": "6222000000000099",
            "owner_id_card": record.staff_id_card,
            "account_name": "测试员工工资卡",
            "status": 2,
            "effective_date": "2024-01-01",
            "invalid_date": "2026-03-20",
            "rule_code": "SUDDEN_ACCOUNT_CLOSURE",
        }
    ]

    result = statement_service.get_bank_statement(
        {"groupId": 1001, "logId": log_id, "pageNow": 1, "pageSize": 500}
    )

    # Abnormal-account samples are identified by their memo text
    # ("销户" = account closure, "异常账户" = abnormal account).
    abnormal_numbers = {
        item["accountMaskNo"]
        for item in result["data"]["bankStatementList"]
        if "销户" in item["userMemo"] or "异常账户" in item["userMemo"]
    }
    assert abnormal_numbers == {"6222000000000099"}
```

- [ ] **Step 2: Run test to verify it fails when chain drifts**

Run:

```bash
cd lsfx-mock-server
python3 -m pytest tests/test_statement_service.py::test_get_bank_statement_should_only_use_abnormal_account_numbers_from_file_record -v
```

Expected:

- If the current implementation already satisfies the test, it may `PASS` directly.
- If it fails, only the account-number backfill path may be fixed; introducing database-write logic into `StatementService` is forbidden.

- [ ] **Step 3: Keep implementation minimal**

If the test fails, only the following minimal fixes are allowed in `lsfx-mock-server/services/statement_service.py`:

- `_apply_primary_binding(...)` continues to act only as a fallback for empty account numbers.
- Do not overwrite an `accountMaskNo` that an abnormal account sample already has.
- Do not add database connections or database-write logic.

- [ ] **Step 4: Run focused statement tests**

Run:

```bash
cd lsfx-mock-server
python3 -m pytest tests/test_statement_service.py -k "abnormal_account" -v
```

Expected:

- `PASS`
- The existing abnormal account sample date/amount tests and the new consistency test pass together.

- [ ] **Step 5: Commit**

```bash
git add lsfx-mock-server/services/statement_service.py lsfx-mock-server/tests/test_statement_service.py
# Message: "Lock in consistency between abnormal account statements and account facts"
git commit -m "锁定异常账户流水与账户事实一致性"
```

## Task 4: Complete regression verification and write the implementation records

**Files:**

- Create: `docs/reports/implementation/2026-03-31-lsfx-mock-server-abnormal-account-baseline-sync-backend-implementation.md`
- Create: `docs/tests/records/2026-03-31-lsfx-mock-server-abnormal-account-baseline-sync-backend-verification.md`

- [ ] **Step 1: Run full targeted backend tests**

Run:

```bash
cd lsfx-mock-server
python3 -m pytest \
  tests/test_abnormal_account_baseline_service.py \
  tests/test_file_service.py \
  tests/test_statement_service.py -k "abnormal_account or abnormal_account_baseline" -v
```

Expected:

- `PASS`
- No new abnormal-account-related failures.

- [ ] **Step 2: Write implementation record**

In `docs/reports/implementation/2026-03-31-lsfx-mock-server-abnormal-account-baseline-sync-backend-implementation.md`, record:

- The new `AbnormalAccountBaselineService`
- `FileService` syncing the abnormal account baseline when it creates a `logId`
- The failure rollback semantics
- The consistency constraint between abnormal account facts and returned statements

- [ ] **Step 3: Write verification record**

In `docs/tests/records/2026-03-31-lsfx-mock-server-abnormal-account-baseline-sync-backend-verification.md`, record:

- The pytest commands that were executed
- The key passing points
- That no long-running frontend or backend process was started this time, so no extra process cleanup is needed

- [ ] **Step 4: Verify final diff scope**

Run:

```bash
git diff --name-only HEAD~3..HEAD
```

Expected:

- Only services, tests, and docs related to this abnormal account baseline sync are in scope. `HEAD~3..HEAD` covers the three commits from Tasks 1 to 3; the two docs from Steps 2 and 3 are not committed until Step 5, so until then they show up in `git status` rather than in this diff.

- [ ] **Step 5: Commit**

```bash
git add \
  docs/reports/implementation/2026-03-31-lsfx-mock-server-abnormal-account-baseline-sync-backend-implementation.md \
  docs/tests/records/2026-03-31-lsfx-mock-server-abnormal-account-baseline-sync-backend-verification.md
# Message: "Record the abnormal account baseline sync backend implementation"
git commit -m "记录异常账户基线同步后端实施"
```
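
## Reference sketch: `apply` rules from Task 2

Task 2, Step 3 describes the `apply(...)` rules in prose only, so the following is a minimal sketch of one way they could fit together, not the definitive implementation. Several details are assumptions that the plan does not state: the `db_config` dict stands in for `settings.CCDI_DB_*`, `owner_id` is filled with the staff ID card number, the names `FIXED_VALUES`, `COLUMNS`, `UPDATED`, and `UPSERT_SQL` are illustrative, and the `FakeConnection` / `FakeCursor` shapes are one possible reading of "only record `execute(...)` calls and the commit count".

```python
from typing import Any, Dict, List, Optional

OPERATOR = "lsfx-mock-server"
# Fixed column values copied from the plan's fixed-value list.
FIXED_VALUES = {
    "account_type": "DEBIT", "owner_type": "EMPLOYEE",
    "bank": "兰溪农商银行", "bank_code": "LXNCSY", "currency": "CNY",
    "is_self_account": 1, "trans_risk_level": "HIGH",
    "create_by": OPERATOR, "update_by": OPERATOR,
}
COLUMNS = (
    "account_no", "account_type", "account_name", "owner_type", "owner_id",
    "bank", "bank_code", "currency", "is_self_account", "trans_risk_level",
    "status", "effective_date", "invalid_date", "create_by", "update_by",
)
# Same statement shape as the plan: account_no is the duplicate key, and
# account_type / create_by are not overwritten on update.
UPDATED = [c for c in COLUMNS if c not in ("account_no", "account_type", "create_by")]
UPSERT_SQL = (
    f"INSERT INTO ccdi_account_info ({', '.join(COLUMNS)}) "
    f"VALUES ({', '.join(['%s'] * len(COLUMNS))}) "
    "ON DUPLICATE KEY UPDATE "
    + ", ".join(f"{c} = VALUES({c})" for c in UPDATED)
    + ", update_time = NOW()"
)


class AbnormalAccountBaselineService:
    def __init__(self, db_config: Optional[Dict[str, Any]] = None):
        # Assumption: the real service reads settings.CCDI_DB_*; a plain dict
        # of pymysql.connect keyword arguments stands in for it here.
        self.db_config = db_config or {}

    def _connect(self):
        import pymysql  # lazy import, so tests can replace _connect entirely

        return pymysql.connect(charset="utf8mb4", autocommit=False, **self.db_config)

    def apply(self, staff_id_card: str, abnormal_accounts: List[Dict[str, Any]]) -> None:
        if not abnormal_accounts:
            return  # nothing to sync, so no connection is opened
        for fact in abnormal_accounts:
            if fact.get("owner_id_card") != staff_id_card:
                raise RuntimeError(
                    f"owner_id_card mismatch for account {fact.get('account_no')}"
                )
        connection = self._connect()
        try:
            with connection.cursor() as cursor:
                for fact in abnormal_accounts:
                    # Assumption: owner_id stores the staff ID card number.
                    row = {**fact, **FIXED_VALUES, "owner_id": staff_id_card}
                    cursor.execute(UPSERT_SQL, tuple(row[c] for c in COLUMNS))
            connection.commit()  # one commit for the whole batch
        except Exception:
            connection.rollback()
            raise
        finally:
            connection.close()


class FakeCursor:
    """Hypothetical test double: forwards execute() calls to its connection."""

    def __init__(self, connection):
        self.connection = connection

    def __enter__(self):
        return self

    def __exit__(self, *exc_info):
        return False

    def execute(self, sql, params=None):
        self.connection.executed_sql.append((sql, params))


class FakeConnection:
    """Hypothetical test double: records executed SQL and commit/rollback counts."""

    def __init__(self):
        self.executed_sql, self.commits, self.rollbacks = [], 0, 0

    def cursor(self):
        return FakeCursor(self)

    def commit(self):
        self.commits += 1

    def rollback(self):
        self.rollbacks += 1

    def close(self):
        pass
```

Validation runs before `_connect()` is called, which is why the owner-mismatch test in Task 2 can run without replacing `_connect`. Replacing `service._connect` with `lambda: FakeConnection()` is enough to exercise the insert and update paths without a real database.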