diff --git a/lsfx-mock-server/services/file_service.py b/lsfx-mock-server/services/file_service.py index 50a02d4b..758196a6 100644 --- a/lsfx-mock-server/services/file_service.py +++ b/lsfx-mock-server/services/file_service.py @@ -136,6 +136,11 @@ class FileRecord: abnormal_accounts: List[dict] = field(default_factory=list) +class _NoopAbnormalAccountBaselineService: + def apply(self, staff_id_card: str, abnormal_accounts: List[dict]) -> None: + return None + + class FileService: """文件上传和解析服务""" @@ -143,11 +148,19 @@ class FileService: LOG_ID_MIN = settings.INITIAL_LOG_ID LOG_ID_MAX = 99999 - def __init__(self, staff_identity_repository=None, phase2_baseline_service=None): + def __init__( + self, + staff_identity_repository=None, + phase2_baseline_service=None, + abnormal_account_baseline_service=None, + ): self.file_records: Dict[int, FileRecord] = {} # logId -> FileRecord self.log_counter = settings.INITIAL_LOG_ID self.staff_identity_repository = staff_identity_repository or StaffIdentityRepository() self.phase2_baseline_service = phase2_baseline_service or Phase2BaselineService() + self.abnormal_account_baseline_service = ( + abnormal_account_baseline_service or _NoopAbnormalAccountBaselineService() + ) def get_file_record(self, log_id: int) -> FileRecord: """按 logId 获取已存在的文件记录。""" @@ -452,6 +465,17 @@ class FileService: baseline_rule_codes=baseline_rule_codes, ) + def _apply_abnormal_account_baselines(self, file_record: FileRecord) -> None: + """按当前记录命中的异常账户规则幂等补齐账户事实。""" + if not file_record.abnormal_account_hit_rules: + return + if not file_record.abnormal_accounts: + raise RuntimeError("异常账户命中计划存在,但未生成账户事实") + self.abnormal_account_baseline_service.apply( + staff_id_card=file_record.staff_id_card, + abnormal_accounts=file_record.abnormal_accounts, + ) + async def upload_file( self, group_id: int, file: UploadFile, background_tasks: BackgroundTasks ) -> Dict: @@ -513,7 +537,7 @@ class FileService: ), ) - # 存储记录 + self._apply_abnormal_account_baselines(file_record) self.file_records[log_id] = file_record self._rebalance_all_mode_group_rule_plans(group_id) self._apply_phase2_baselines(file_record) @@ -851,6 +875,7 @@ class FileService: parsing=False, ) + self._apply_abnormal_account_baselines(file_record) self.file_records[log_id] = file_record self._rebalance_all_mode_group_rule_plans(group_id) self._apply_phase2_baselines(file_record) diff --git a/lsfx-mock-server/tests/test_file_service.py b/lsfx-mock-server/tests/test_file_service.py index aa90c767..d62251cc 100644 --- a/lsfx-mock-server/tests/test_file_service.py +++ b/lsfx-mock-server/tests/test_file_service.py @@ -5,6 +5,7 @@ FileService 单一主绑定语义测试 import asyncio import io +import pytest from fastapi import BackgroundTasks from fastapi.datastructures import UploadFile @@ -27,6 +28,22 @@ class FakeStaffIdentityRepository: } +class FakeAbnormalAccountBaselineService: + def __init__(self, should_fail=False): + self.should_fail = should_fail + self.calls = [] + + def apply(self, staff_id_card, abnormal_accounts): + self.calls.append( + { + "staff_id_card": staff_id_card, + "abnormal_accounts": [dict(item) for item in abnormal_accounts], + } + ) + if self.should_fail: + raise RuntimeError("baseline sync failed") + + def test_upload_file_primary_binding_response(monkeypatch): """同一 logId 的主绑定必须稳定且只保留一组主体/账号信息。""" service = FileService(staff_identity_repository=FakeStaffIdentityRepository()) @@ -186,6 +203,56 @@ def test_fetch_inner_flow_should_attach_abnormal_account_rule_plan(): assert isinstance(record.abnormal_accounts, list) +def test_fetch_inner_flow_should_sync_abnormal_account_baselines_before_caching(): + baseline_service = FakeAbnormalAccountBaselineService() + service = FileService( + staff_identity_repository=FakeStaffIdentityRepository(), + abnormal_account_baseline_service=baseline_service, + ) + + response = service.fetch_inner_flow( + { + "groupId": 1001, + "customerNo": "customer_abnormal_baseline", + "dataChannelCode": "test_code", + "requestDateId": 20240101, + "dataStartDateId": 20240101, + "dataEndDateId": 20240131, + "uploadUserId": 902001, + } + ) + + log_id = response["data"][0] + record = service.file_records[log_id] + + assert baseline_service.calls + assert baseline_service.calls[0]["staff_id_card"] == record.staff_id_card + assert baseline_service.calls[0]["abnormal_accounts"] == record.abnormal_accounts + + +def test_fetch_inner_flow_should_not_cache_log_id_when_abnormal_account_baseline_sync_fails(): + baseline_service = FakeAbnormalAccountBaselineService(should_fail=True) + service = FileService( + staff_identity_repository=FakeStaffIdentityRepository(), + abnormal_account_baseline_service=baseline_service, + ) + + with pytest.raises(RuntimeError, match="baseline sync failed"): + service.fetch_inner_flow( + { + "groupId": 1001, + "customerNo": "customer_abnormal_baseline_fail", + "dataChannelCode": "test_code", + "requestDateId": 20240101, + "dataStartDateId": 20240101, + "dataEndDateId": 20240131, + "uploadUserId": 902001, + } + ) + + assert service.file_records == {} + + def test_generate_log_id_should_retry_when_random_value_conflicts(monkeypatch): """随机 logId 命中已存在记录时必须重试并返回未占用值。""" service = FileService(staff_identity_repository=FakeStaffIdentityRepository())