新增异常账户基线写库服务

This commit is contained in:
wkc
2026-03-31 22:15:41 +08:00
parent 6b24e02ba9
commit 51810a325e
3 changed files with 265 additions and 7 deletions

View File

@@ -0,0 +1,114 @@
from typing import List
from config.settings import settings
class AbnormalAccountBaselineService:
"""异常账户基线写库服务。"""
UPSERT_SQL = """
INSERT INTO ccdi_account_info (
account_no,
account_type,
account_name,
owner_type,
owner_id,
bank,
bank_code,
currency,
is_self_account,
trans_risk_level,
status,
effective_date,
invalid_date,
create_by,
update_by
)
VALUES (%s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s, %s)
ON DUPLICATE KEY UPDATE
account_name = VALUES(account_name),
owner_type = VALUES(owner_type),
owner_id = VALUES(owner_id),
bank = VALUES(bank),
bank_code = VALUES(bank_code),
currency = VALUES(currency),
is_self_account = VALUES(is_self_account),
trans_risk_level = VALUES(trans_risk_level),
status = VALUES(status),
effective_date = VALUES(effective_date),
invalid_date = VALUES(invalid_date),
update_by = VALUES(update_by),
update_time = NOW()
"""
def __init__(self):
self.db_config = {
"host": settings.CCDI_DB_HOST,
"port": settings.CCDI_DB_PORT,
"database": settings.CCDI_DB_NAME,
"username": settings.CCDI_DB_USERNAME,
"password": settings.CCDI_DB_PASSWORD,
"connect_timeout_seconds": settings.CCDI_DB_CONNECT_TIMEOUT_SECONDS,
}
def _connect(self):
try:
import pymysql
except ImportError as exc:
raise RuntimeError("缺少 PyMySQL 依赖,无法写入异常账户基线") from exc
return pymysql.connect(
host=settings.CCDI_DB_HOST,
port=settings.CCDI_DB_PORT,
user=settings.CCDI_DB_USERNAME,
password=settings.CCDI_DB_PASSWORD,
database=settings.CCDI_DB_NAME,
charset="utf8mb4",
connect_timeout=settings.CCDI_DB_CONNECT_TIMEOUT_SECONDS,
autocommit=False,
)
def _validate_fact_owner(self, staff_id_card: str, abnormal_accounts: List[dict]) -> None:
for account_fact in abnormal_accounts:
owner_id_card = account_fact.get("owner_id_card")
if owner_id_card != staff_id_card:
raise RuntimeError(
f"异常账户 owner_id_card 与 staff_id_card 不一致: {owner_id_card}"
)
def _build_upsert_params(self, account_fact: dict) -> tuple:
return (
account_fact["account_no"],
"DEBIT",
account_fact["account_name"],
"EMPLOYEE",
account_fact["owner_id_card"],
"兰溪农商银行",
"LXNCSY",
"CNY",
1,
"HIGH",
account_fact["status"],
account_fact["effective_date"],
account_fact.get("invalid_date"),
"lsfx-mock-server",
"lsfx-mock-server",
)
def apply(self, staff_id_card: str, abnormal_accounts: List[dict]) -> None:
if not abnormal_accounts:
return
self._validate_fact_owner(staff_id_card, abnormal_accounts)
connection = self._connect()
try:
with connection.cursor() as cursor:
for account_fact in abnormal_accounts:
cursor.execute(self.UPSERT_SQL, self._build_upsert_params(account_fact))
connection.commit()
except Exception:
connection.rollback()
raise
finally:
connection.close()

View File

@@ -1,6 +1,7 @@
from fastapi import BackgroundTasks, UploadFile
from utils.response_builder import ResponseBuilder
from config.settings import settings
from services.abnormal_account_baseline_service import AbnormalAccountBaselineService
from services.phase2_baseline_service import Phase2BaselineService
from services.staff_identity_repository import StaffIdentityRepository
from typing import Dict, List, Union
@@ -135,12 +136,6 @@ class FileRecord:
abnormal_account_hit_rules: List[str] = field(default_factory=list)
abnormal_accounts: List[dict] = field(default_factory=list)
class _NoopAbnormalAccountBaselineService:
def apply(self, staff_id_card: str, abnormal_accounts: List[dict]) -> None:
return None
class FileService:
"""文件上传和解析服务"""
@@ -159,7 +154,7 @@ class FileService:
self.staff_identity_repository = staff_identity_repository or StaffIdentityRepository()
self.phase2_baseline_service = phase2_baseline_service or Phase2BaselineService()
self.abnormal_account_baseline_service = (
abnormal_account_baseline_service or _NoopAbnormalAccountBaselineService()
abnormal_account_baseline_service or AbnormalAccountBaselineService()
)
def get_file_record(self, log_id: int) -> FileRecord:

View File

@@ -0,0 +1,149 @@
import pytest
from services.abnormal_account_baseline_service import AbnormalAccountBaselineService
class FakeCursor:
def __init__(self, connection):
self.connection = connection
def execute(self, sql, params=None):
self.connection.executed_sql.append(
{
"sql": sql,
"params": params,
}
)
def __enter__(self):
return self
def __exit__(self, exc_type, exc, tb):
return False
class FakeConnection:
def __init__(self):
self.executed_sql = []
self.commit_count = 0
self.rollback_count = 0
def cursor(self):
return FakeCursor(self)
def commit(self):
self.commit_count += 1
def rollback(self):
self.rollback_count += 1
def close(self):
return None
def test_apply_should_skip_when_abnormal_accounts_is_empty():
service = AbnormalAccountBaselineService()
fake_connection = FakeConnection()
service._connect = lambda: fake_connection
service.apply("330101199001010001", [])
assert fake_connection.executed_sql == []
assert fake_connection.commit_count == 0
assert fake_connection.rollback_count == 0
def test_apply_should_raise_when_fact_owner_mismatches_staff():
service = AbnormalAccountBaselineService()
with pytest.raises(RuntimeError, match="owner_id_card"):
service.apply(
"330101199001010001",
[
{
"account_no": "6222000000000001",
"owner_id_card": "330101199001010099",
"account_name": "测试员工工资卡",
"status": 2,
"effective_date": "2024-01-01",
"invalid_date": "2026-03-20",
"rule_code": "SUDDEN_ACCOUNT_CLOSURE",
}
],
)
def test_apply_should_insert_new_account_fact_by_account_no():
service = AbnormalAccountBaselineService()
fake_connection = FakeConnection()
service._connect = lambda: fake_connection
service.apply(
"330101199001010001",
[
{
"account_no": "6222000000000001",
"owner_id_card": "330101199001010001",
"account_name": "测试员工工资卡",
"status": 2,
"effective_date": "2024-01-01",
"invalid_date": "2026-03-20",
"rule_code": "SUDDEN_ACCOUNT_CLOSURE",
}
],
)
assert len(fake_connection.executed_sql) == 1
executed = fake_connection.executed_sql[0]
assert "INSERT INTO ccdi_account_info" in executed["sql"]
assert executed["params"] == (
"6222000000000001",
"DEBIT",
"测试员工工资卡",
"EMPLOYEE",
"330101199001010001",
"兰溪农商银行",
"LXNCSY",
"CNY",
1,
"HIGH",
2,
"2024-01-01",
"2026-03-20",
"lsfx-mock-server",
"lsfx-mock-server",
)
assert fake_connection.commit_count == 1
assert fake_connection.rollback_count == 0
def test_apply_should_update_existing_account_fact_by_account_no():
service = AbnormalAccountBaselineService()
fake_connection = FakeConnection()
service._connect = lambda: fake_connection
service.apply(
"330101199001010001",
[
{
"account_no": "6222000000000001",
"owner_id_card": "330101199001010001",
"account_name": "测试员工结算卡",
"status": 1,
"effective_date": "2025-01-01",
"invalid_date": None,
"rule_code": "DORMANT_ACCOUNT_LARGE_ACTIVATION",
}
],
)
assert len(fake_connection.executed_sql) == 1
executed = fake_connection.executed_sql[0]
assert "ON DUPLICATE KEY UPDATE" in executed["sql"]
assert executed["params"][0] == "6222000000000001"
assert executed["params"][2] == "测试员工结算卡"
assert executed["params"][10] == 1
assert executed["params"][11] == "2025-01-01"
assert executed["params"][12] is None
assert fake_connection.commit_count == 1
assert fake_connection.rollback_count == 0