From 51810a325e5e211aee2a488eb913f1461cd3ee98 Mon Sep 17 00:00:00 2001 From: wkc <978997012@qq.com> Date: Tue, 31 Mar 2026 22:15:41 +0800 Subject: [PATCH] =?UTF-8?q?=E6=96=B0=E5=A2=9E=E5=BC=82=E5=B8=B8=E8=B4=A6?= =?UTF-8?q?=E6=88=B7=E5=9F=BA=E7=BA=BF=E5=86=99=E5=BA=93=E6=9C=8D=E5=8A=A1?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../abnormal_account_baseline_service.py | 114 ++++++++++++++ lsfx-mock-server/services/file_service.py | 9 +- .../test_abnormal_account_baseline_service.py | 149 ++++++++++++++++++ 3 files changed, 265 insertions(+), 7 deletions(-) create mode 100644 lsfx-mock-server/services/abnormal_account_baseline_service.py create mode 100644 lsfx-mock-server/tests/test_abnormal_account_baseline_service.py diff --git a/lsfx-mock-server/services/abnormal_account_baseline_service.py b/lsfx-mock-server/services/abnormal_account_baseline_service.py new file mode 100644 index 00000000..7f68b714 --- /dev/null +++ b/lsfx-mock-server/services/abnormal_account_baseline_service.py @@ -0,0 +1,114 @@ +from typing import List + +from config.settings import settings + + +class AbnormalAccountBaselineService: + """异常账户基线写库服务。""" + + UPSERT_SQL = """ + INSERT INTO ccdi_account_info ( + account_no, + account_type, + account_name, + owner_type, + owner_id, + bank, + bank_code, + currency, + is_self_account, + trans_risk_level, + status, + effective_date, + invalid_date, + create_by, + update_by + ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s) + ON DUPLICATE KEY UPDATE + account_name = VALUES(account_name), + owner_type = VALUES(owner_type), + owner_id = VALUES(owner_id), + bank = VALUES(bank), + bank_code = VALUES(bank_code), + currency = VALUES(currency), + is_self_account = VALUES(is_self_account), + trans_risk_level = VALUES(trans_risk_level), + status = VALUES(status), + effective_date = VALUES(effective_date), + invalid_date = VALUES(invalid_date), + update_by = VALUES(update_by), + update_time = NOW() + """ + + def __init__(self): + self.db_config = { + "host": settings.CCDI_DB_HOST, + "port": settings.CCDI_DB_PORT, + "database": settings.CCDI_DB_NAME, + "username": settings.CCDI_DB_USERNAME, + "password": settings.CCDI_DB_PASSWORD, + "connect_timeout_seconds": settings.CCDI_DB_CONNECT_TIMEOUT_SECONDS, + } + + def _connect(self): + try: + import pymysql + except ImportError as exc: + raise RuntimeError("缺少 PyMySQL 依赖,无法写入异常账户基线") from exc + + return pymysql.connect( + host=settings.CCDI_DB_HOST, + port=settings.CCDI_DB_PORT, + user=settings.CCDI_DB_USERNAME, + password=settings.CCDI_DB_PASSWORD, + database=settings.CCDI_DB_NAME, + charset="utf8mb4", + connect_timeout=settings.CCDI_DB_CONNECT_TIMEOUT_SECONDS, + autocommit=False, + ) + + def _validate_fact_owner(self, staff_id_card: str, abnormal_accounts: List[dict]) -> None: + for account_fact in abnormal_accounts: + owner_id_card = account_fact.get("owner_id_card") + if owner_id_card != staff_id_card: + raise RuntimeError( + f"异常账户 owner_id_card 与 staff_id_card 不一致: {owner_id_card}" + ) + + def _build_upsert_params(self, account_fact: dict) -> tuple: + return ( + account_fact["account_no"], + "DEBIT", + account_fact["account_name"], + "EMPLOYEE", + account_fact["owner_id_card"], + "兰溪农商银行", + "LXNCSY", + "CNY", + 1, + "HIGH", + account_fact["status"], + account_fact["effective_date"], + account_fact.get("invalid_date"), + "lsfx-mock-server", + "lsfx-mock-server", + ) + + def apply(self, staff_id_card: str, abnormal_accounts: List[dict]) -> None: + if not abnormal_accounts: + return + + self._validate_fact_owner(staff_id_card, abnormal_accounts) + + connection = self._connect() + try: + with connection.cursor() as cursor: + for account_fact in abnormal_accounts: + cursor.execute(self.UPSERT_SQL, self._build_upsert_params(account_fact)) + connection.commit() + except Exception: + connection.rollback() + raise + finally: + connection.close() diff --git a/lsfx-mock-server/services/file_service.py b/lsfx-mock-server/services/file_service.py index 758196a6..b81c5827 100644 --- a/lsfx-mock-server/services/file_service.py +++ b/lsfx-mock-server/services/file_service.py @@ -1,6 +1,7 @@ from fastapi import BackgroundTasks, UploadFile from utils.response_builder import ResponseBuilder from config.settings import settings +from services.abnormal_account_baseline_service import AbnormalAccountBaselineService from services.phase2_baseline_service import Phase2BaselineService from services.staff_identity_repository import StaffIdentityRepository from typing import Dict, List, Union @@ -135,12 +136,6 @@ class FileRecord: abnormal_account_hit_rules: List[str] = field(default_factory=list) abnormal_accounts: List[dict] = field(default_factory=list) - -class _NoopAbnormalAccountBaselineService: - def apply(self, staff_id_card: str, abnormal_accounts: List[dict]) -> None: - return None - - class FileService: """文件上传和解析服务""" @@ -159,7 +154,7 @@ class FileService: self.staff_identity_repository = staff_identity_repository or StaffIdentityRepository() self.phase2_baseline_service = phase2_baseline_service or Phase2BaselineService() self.abnormal_account_baseline_service = ( - abnormal_account_baseline_service or _NoopAbnormalAccountBaselineService() + abnormal_account_baseline_service or AbnormalAccountBaselineService() ) def get_file_record(self, log_id: int) -> FileRecord: diff --git a/lsfx-mock-server/tests/test_abnormal_account_baseline_service.py b/lsfx-mock-server/tests/test_abnormal_account_baseline_service.py new file mode 100644 index 00000000..3ec9599d --- /dev/null +++ b/lsfx-mock-server/tests/test_abnormal_account_baseline_service.py @@ -0,0 +1,149 @@ +import pytest + +from services.abnormal_account_baseline_service import AbnormalAccountBaselineService + + +class FakeCursor: + def __init__(self, connection): + self.connection = connection + + def execute(self, sql, params=None): + self.connection.executed_sql.append( + { + "sql": sql, + "params": params, + } + ) + + def __enter__(self): + return self + + def __exit__(self, exc_type, exc, tb): + return False + + +class FakeConnection: + def __init__(self): + self.executed_sql = [] + self.commit_count = 0 + self.rollback_count = 0 + + def cursor(self): + return FakeCursor(self) + + def commit(self): + self.commit_count += 1 + + def rollback(self): + self.rollback_count += 1 + + def close(self): + return None + + +def test_apply_should_skip_when_abnormal_accounts_is_empty(): + service = AbnormalAccountBaselineService() + fake_connection = FakeConnection() + service._connect = lambda: fake_connection + + service.apply("330101199001010001", []) + + assert fake_connection.executed_sql == [] + assert fake_connection.commit_count == 0 + assert fake_connection.rollback_count == 0 + + +def test_apply_should_raise_when_fact_owner_mismatches_staff(): + service = AbnormalAccountBaselineService() + + with pytest.raises(RuntimeError, match="owner_id_card"): + service.apply( + "330101199001010001", + [ + { + "account_no": "6222000000000001", + "owner_id_card": "330101199001010099", + "account_name": "测试员工工资卡", + "status": 2, + "effective_date": "2024-01-01", + "invalid_date": "2026-03-20", + "rule_code": "SUDDEN_ACCOUNT_CLOSURE", + } + ], + ) + + +def test_apply_should_insert_new_account_fact_by_account_no(): + service = AbnormalAccountBaselineService() + fake_connection = FakeConnection() + service._connect = lambda: fake_connection + + service.apply( + "330101199001010001", + [ + { + "account_no": "6222000000000001", + "owner_id_card": "330101199001010001", + "account_name": "测试员工工资卡", + "status": 2, + "effective_date": "2024-01-01", + "invalid_date": "2026-03-20", + "rule_code": "SUDDEN_ACCOUNT_CLOSURE", + } + ], + ) + + assert len(fake_connection.executed_sql) == 1 + executed = fake_connection.executed_sql[0] + assert "INSERT INTO ccdi_account_info" in executed["sql"] + assert executed["params"] == ( + "6222000000000001", + "DEBIT", + "测试员工工资卡", + "EMPLOYEE", + "330101199001010001", + "兰溪农商银行", + "LXNCSY", + "CNY", + 1, + "HIGH", + 2, + "2024-01-01", + "2026-03-20", + "lsfx-mock-server", + "lsfx-mock-server", + ) + assert fake_connection.commit_count == 1 + assert fake_connection.rollback_count == 0 + + +def test_apply_should_update_existing_account_fact_by_account_no(): + service = AbnormalAccountBaselineService() + fake_connection = FakeConnection() + service._connect = lambda: fake_connection + + service.apply( + "330101199001010001", + [ + { + "account_no": "6222000000000001", + "owner_id_card": "330101199001010001", + "account_name": "测试员工结算卡", + "status": 1, + "effective_date": "2025-01-01", + "invalid_date": None, + "rule_code": "DORMANT_ACCOUNT_LARGE_ACTIVATION", + } + ], + ) + + assert len(fake_connection.executed_sql) == 1 + executed = fake_connection.executed_sql[0] + assert "ON DUPLICATE KEY UPDATE" in executed["sql"] + assert executed["params"][0] == "6222000000000001" + assert executed["params"][2] == "测试员工结算卡" + assert executed["params"][10] == 1 + assert executed["params"][11] == "2025-01-01" + assert executed["params"][12] is None + assert fake_connection.commit_count == 1 + assert fake_connection.rollback_count == 0