16 KiB
LSFX Mock Server 异常账户基线同步后端 Implementation Plan
For agentic workers: REQUIRED: Use superpowers:subagent-driven-development (if subagents available) or superpowers:executing-plans to implement this plan. Steps use checkbox (
- [ ]) syntax for tracking.仓库约束:当前仓库明确禁止开启 subagent,执行时统一使用
superpowers:executing-plans。
Goal: 在 lsfx-mock-server 创建 logId 时一次性把异常账户事实同步到 ccdi_account_info,让同一个 logId 下的异常账户事实、命中流水和真实打标前提稳定闭环。
Architecture: 继续复用现有 FileService -> FileRecord -> StatementService 主链路,不新增接口,也不把写库副作用混进 getBSByLogId。新增一个很小的 AbnormalAccountBaselineService 负责按 account_no 幂等 upsert ccdi_account_info,由 FileService 在保存 file_records[log_id] 前调用;StatementService 仍只读取 FileRecord 生成异常账户流水样本。
Tech Stack: Python 3, FastAPI, PyMySQL, pytest, dataclasses, Markdown docs
File Structure
lsfx-mock-server/services/abnormal_account_baseline_service.py: 新增异常账户基线写库服务,封装数据库连接、输入校验和按账号幂等 upsert。lsfx-mock-server/services/file_service.py: 注入异常账户基线服务,并在fetch_inner_flow(...)/ 上传建档链路中于保存FileRecord前触发基线同步。lsfx-mock-server/services/statement_service.py: 只补链路一致性断言,不新增写库逻辑。lsfx-mock-server/tests/test_abnormal_account_baseline_service.py: 新增服务层单测,覆盖空输入、异常输入、插入和更新。lsfx-mock-server/tests/test_file_service.py: 锁定fetch_inner_flow(...)调用基线同步服务及失败回滚语义。lsfx-mock-server/tests/test_statement_service.py: 锁定异常账户样本流水与record.abnormal_accounts账号一致。docs/reports/implementation/2026-03-31-lsfx-mock-server-abnormal-account-baseline-sync-backend-implementation.md: 记录本次后端实施结果。docs/tests/records/2026-03-31-lsfx-mock-server-abnormal-account-baseline-sync-backend-verification.md: 记录 pytest 验证命令、结果和进程清理结论。
Task 1: 先锁定 FileService 的基线同步触发点
Files:
-
Modify:
lsfx-mock-server/tests/test_file_service.py -
Modify:
lsfx-mock-server/services/file_service.py -
Reference:
docs/design/2026-03-31-lsfx-mock-server-abnormal-account-baseline-sync-design.md -
Step 1: Write the failing test
在 lsfx-mock-server/tests/test_file_service.py 中新增 fake baseline service 和两条失败测试:
class FakeAbnormalAccountBaselineService:
def __init__(self, should_fail=False):
self.should_fail = should_fail
self.calls = []
def apply(self, staff_id_card, abnormal_accounts):
self.calls.append(
{
"staff_id_card": staff_id_card,
"abnormal_accounts": [dict(item) for item in abnormal_accounts],
}
)
if self.should_fail:
raise RuntimeError("baseline sync failed")
def test_fetch_inner_flow_should_sync_abnormal_account_baselines_before_caching():
baseline_service = FakeAbnormalAccountBaselineService()
service = FileService(
staff_identity_repository=FakeStaffIdentityRepository(),
abnormal_account_baseline_service=baseline_service,
)
response = service.fetch_inner_flow(
{
"groupId": 1001,
"customerNo": "customer_abnormal_baseline",
"dataChannelCode": "test_code",
"requestDateId": 20240101,
"dataStartDateId": 20240101,
"dataEndDateId": 20240131,
"uploadUserId": 902001,
}
)
log_id = response["data"][0]
record = service.file_records[log_id]
assert baseline_service.calls
assert baseline_service.calls[0]["staff_id_card"] == record.staff_id_card
assert baseline_service.calls[0]["abnormal_accounts"] == record.abnormal_accounts
def test_fetch_inner_flow_should_not_cache_log_id_when_abnormal_account_baseline_sync_fails():
baseline_service = FakeAbnormalAccountBaselineService(should_fail=True)
service = FileService(
staff_identity_repository=FakeStaffIdentityRepository(),
abnormal_account_baseline_service=baseline_service,
)
with pytest.raises(RuntimeError, match="baseline sync failed"):
service.fetch_inner_flow(
{
"groupId": 1001,
"customerNo": "customer_abnormal_baseline_fail",
"dataChannelCode": "test_code",
"requestDateId": 20240101,
"dataStartDateId": 20240101,
"dataEndDateId": 20240131,
"uploadUserId": 902001,
}
)
assert service.file_records == {}
- Step 2: Run test to verify it fails
Run:
cd lsfx-mock-server
python3 -m pytest tests/test_file_service.py -k "abnormal_account_baseline" -v
Expected:
-
FAIL -
原因是
FileService还不接受abnormal_account_baseline_service注入,也没有在建档阶段触发写库 -
Step 3: Write minimal implementation
在 lsfx-mock-server/services/file_service.py 中按最小路径补齐:
- 新增构造参数:
def __init__(..., abnormal_account_baseline_service=None):
self.abnormal_account_baseline_service = (
abnormal_account_baseline_service or AbnormalAccountBaselineService()
)
- 新增内部封装:
def _apply_abnormal_account_baselines(self, file_record: FileRecord) -> None:
if not file_record.abnormal_account_hit_rules:
return
if not file_record.abnormal_accounts:
raise RuntimeError("异常账户命中计划存在,但未生成账户事实")
self.abnormal_account_baseline_service.apply(
staff_id_card=file_record.staff_id_card,
abnormal_accounts=file_record.abnormal_accounts,
)
- 在
fetch_inner_flow(...)和上传链路中,将:
self.file_records[log_id] = file_record
调整为:
self._apply_abnormal_account_baselines(file_record)
self.file_records[log_id] = file_record
- Step 4: Run test to verify it passes
Run:
cd lsfx-mock-server
python3 -m pytest tests/test_file_service.py -k "abnormal_account_baseline" -v
Expected:
-
PASS -
成功路径会先调用 baseline service
-
失败路径不会把半成品
logId写入file_records -
Step 5: Commit
git add lsfx-mock-server/services/file_service.py lsfx-mock-server/tests/test_file_service.py
git commit -m "接入异常账户基线同步触发点"
Task 2: 实现异常账户基线写库服务
Files:
-
Create:
lsfx-mock-server/services/abnormal_account_baseline_service.py -
Modify:
lsfx-mock-server/tests/test_abnormal_account_baseline_service.py -
Reference:
docs/design/2026-03-31-lsfx-mock-server-abnormal-account-baseline-sync-design.md -
Step 1: Write the failing tests
新建 lsfx-mock-server/tests/test_abnormal_account_baseline_service.py,先锁定四类行为:
def test_apply_should_skip_when_abnormal_accounts_is_empty():
service = AbnormalAccountBaselineService()
fake_connection = FakeConnection()
service._connect = lambda: fake_connection
service.apply("330101199001010001", [])
assert fake_connection.executed_sql == []
def test_apply_should_raise_when_fact_owner_mismatches_staff():
service = AbnormalAccountBaselineService()
with pytest.raises(RuntimeError, match="owner_id_card"):
service.apply(
"330101199001010001",
[
{
"account_no": "6222000000000001",
"owner_id_card": "330101199001010099",
"account_name": "测试员工工资卡",
"status": 2,
"effective_date": "2024-01-01",
"invalid_date": "2026-03-20",
"rule_code": "SUDDEN_ACCOUNT_CLOSURE",
}
],
)
def test_apply_should_insert_new_account_fact_by_account_no():
...
def test_apply_should_update_existing_account_fact_by_account_no():
...
FakeConnection / FakeCursor 只需记录 execute(...) 调用和提交次数,不需要真实数据库。
- Step 2: Run test to verify it fails
Run:
cd lsfx-mock-server
python3 -m pytest tests/test_abnormal_account_baseline_service.py -v
Expected:
-
FAIL -
原因是服务文件尚不存在
-
Step 3: Write minimal implementation
在 lsfx-mock-server/services/abnormal_account_baseline_service.py 中实现:
__init__直接复用现有settings.CCDI_DB_*_connect()使用pymysql.connect(..., charset="utf8mb4", autocommit=False)apply(staff_id_card, abnormal_accounts)内部规则:- 空列表直接返回
- 若任一
owner_id_card与staff_id_card不一致,直接抛错 - 对每条 fact 执行单条 upsert
- 成功后统一
commit(),失败则rollback()
建议 upsert 语句形态:
INSERT INTO ccdi_account_info (
account_no,
account_type,
account_name,
owner_type,
owner_id,
bank,
bank_code,
currency,
is_self_account,
trans_risk_level,
status,
effective_date,
invalid_date,
create_by,
update_by
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
account_name = VALUES(account_name),
owner_type = VALUES(owner_type),
owner_id = VALUES(owner_id),
bank = VALUES(bank),
bank_code = VALUES(bank_code),
currency = VALUES(currency),
is_self_account = VALUES(is_self_account),
trans_risk_level = VALUES(trans_risk_level),
status = VALUES(status),
effective_date = VALUES(effective_date),
invalid_date = VALUES(invalid_date),
update_by = VALUES(update_by),
update_time = NOW()
固定值约束:
-
account_type = 'DEBIT' -
owner_type = 'EMPLOYEE' -
bank = '兰溪农商银行' -
bank_code = 'LXNCSY' -
currency = 'CNY' -
is_self_account = 1 -
trans_risk_level = 'HIGH' -
create_by/update_by = 'lsfx-mock-server' -
Step 4: Run test to verify it passes
Run:
cd lsfx-mock-server
python3 -m pytest tests/test_abnormal_account_baseline_service.py -v
Expected:
-
PASS -
覆盖空输入、校验失败、插入和更新四类行为
-
Step 5: Commit
git add lsfx-mock-server/services/abnormal_account_baseline_service.py lsfx-mock-server/tests/test_abnormal_account_baseline_service.py
git commit -m "新增异常账户基线写库服务"
Task 3: 锁定异常账户事实与返回流水的一致性
Files:
-
Modify:
lsfx-mock-server/tests/test_statement_service.py -
Reference:
lsfx-mock-server/services/statement_service.py -
Reference:
lsfx-mock-server/services/statement_rule_samples.py -
Step 1: Write the failing test
在 lsfx-mock-server/tests/test_statement_service.py 中新增一条只校验一致性的用例:
def test_get_bank_statement_should_only_use_abnormal_account_numbers_from_file_record():
file_service = FileService(
staff_identity_repository=FakeStaffIdentityRepository(),
abnormal_account_baseline_service=FakeAbnormalAccountBaselineService(),
)
statement_service = StatementService(file_service=file_service)
response = file_service.fetch_inner_flow(
{
"groupId": 1001,
"customerNo": "customer_abnormal_statement_consistency",
"dataChannelCode": "test_code",
"requestDateId": 20240101,
"dataStartDateId": 20240101,
"dataEndDateId": 20240131,
"uploadUserId": 902001,
}
)
log_id = response["data"][0]
record = file_service.file_records[log_id]
record.abnormal_account_hit_rules = ["SUDDEN_ACCOUNT_CLOSURE"]
record.abnormal_accounts = [
{
"account_no": "6222000000000099",
"owner_id_card": record.staff_id_card,
"account_name": "测试员工工资卡",
"status": 2,
"effective_date": "2024-01-01",
"invalid_date": "2026-03-20",
"rule_code": "SUDDEN_ACCOUNT_CLOSURE",
}
]
result = statement_service.get_bank_statement(
{"groupId": 1001, "logId": log_id, "pageNow": 1, "pageSize": 500}
)
abnormal_numbers = {
item["accountMaskNo"]
for item in result["data"]["bankStatementList"]
if "销户" in item["userMemo"] or "异常账户" in item["userMemo"]
}
assert abnormal_numbers == {"6222000000000099"}
- Step 2: Run test to verify it fails when chain drifts
Run:
cd lsfx-mock-server
python3 -m pytest tests/test_statement_service.py::test_get_bank_statement_should_only_use_abnormal_account_numbers_from_file_record -v
Expected:
-
若当前实现已满足,可直接
PASS -
若失败,只允许修正账号回填链路,禁止引入写库逻辑到
StatementService -
Step 3: Keep implementation minimal
若失败,仅允许在 lsfx-mock-server/services/statement_service.py 中做最小修正:
-
_apply_primary_binding(...)继续只兜底空账号 -
不覆盖异常账户样本已有
accountMaskNo -
不新增数据库连接或写库逻辑
-
Step 4: Run focused statement tests
Run:
cd lsfx-mock-server
python3 -m pytest tests/test_statement_service.py -k "abnormal_account" -v
Expected:
-
PASS -
既有异常账户样本日期/金额测试与新增一致性测试同时通过
-
Step 5: Commit
git add lsfx-mock-server/services/statement_service.py lsfx-mock-server/tests/test_statement_service.py
git commit -m "锁定异常账户流水与账户事实一致性"
Task 4: 完成回归验证并补实施记录
Files:
-
Create:
docs/reports/implementation/2026-03-31-lsfx-mock-server-abnormal-account-baseline-sync-backend-implementation.md -
Create:
docs/tests/records/2026-03-31-lsfx-mock-server-abnormal-account-baseline-sync-backend-verification.md -
Step 1: Run full targeted backend tests
Run:
cd lsfx-mock-server
python3 -m pytest \
tests/test_abnormal_account_baseline_service.py \
tests/test_file_service.py \
tests/test_statement_service.py -k "abnormal_account or abnormal_account_baseline" -v
Expected:
-
PASS -
无新增异常账户相关失败
-
Step 2: Write implementation record
在 docs/reports/implementation/2026-03-31-lsfx-mock-server-abnormal-account-baseline-sync-backend-implementation.md 中记录:
-
新增
AbnormalAccountBaselineService -
FileService在建logId时同步异常账户基线 -
失败回滚语义
-
异常账户事实与返回流水的一致性约束
-
Step 3: Write verification record
在 docs/tests/records/2026-03-31-lsfx-mock-server-abnormal-account-baseline-sync-backend-verification.md 中记录:
-
执行过的 pytest 命令
-
关键通过点
-
本次未启动前后端长驻进程,因此无需额外杀进程
-
Step 4: Verify final diff scope
Run:
git diff --name-only HEAD~3..HEAD
Expected:
-
仅包含本次异常账户基线同步相关服务、测试和文档
-
Step 5: Commit
git add \
docs/reports/implementation/2026-03-31-lsfx-mock-server-abnormal-account-baseline-sync-backend-implementation.md \
docs/tests/records/2026-03-31-lsfx-mock-server-abnormal-account-baseline-sync-backend-verification.md
git commit -m "记录异常账户基线同步后端实施"