完成LSFX Mock第二期稳定随机命中后端实施
This commit is contained in:
264
lsfx-mock-server/services/phase2_baseline_service.py
Normal file
264
lsfx-mock-server/services/phase2_baseline_service.py
Normal file
@@ -0,0 +1,264 @@
|
||||
from textwrap import dedent
|
||||
from typing import Dict, List, Tuple
|
||||
|
||||
from config.settings import settings
|
||||
|
||||
|
||||
class Phase2BaselineService:
|
||||
"""第二期数据库基线服务。
|
||||
"""
|
||||
|
||||
SUPPLIER_PURCHASE_ID = "LSFXMOCKP2PUR001"
|
||||
SUPPLIER_NAME = "兰溪市联调供应链有限公司"
|
||||
ASSET_IDENTIFIERS = {
|
||||
"HOUSE_REGISTRATION_MISMATCH": "LSFX Mock P2 HOUSE_REGISTRATION_MISMATCH",
|
||||
"PROPERTY_FEE_REGISTRATION_MISMATCH": "LSFX Mock P2 PROPERTY_FEE_REGISTRATION_MISMATCH",
|
||||
"TAX_ASSET_REGISTRATION_MISMATCH": "LSFX Mock P2 TAX_ASSET_REGISTRATION_MISMATCH",
|
||||
}
|
||||
|
||||
def __init__(self):
|
||||
self.db_config = {
|
||||
"host": settings.CCDI_DB_HOST,
|
||||
"port": settings.CCDI_DB_PORT,
|
||||
"database": settings.CCDI_DB_NAME,
|
||||
"username": settings.CCDI_DB_USERNAME,
|
||||
"password": settings.CCDI_DB_PASSWORD,
|
||||
"connect_timeout_seconds": settings.CCDI_DB_CONNECT_TIMEOUT_SECONDS,
|
||||
}
|
||||
|
||||
@staticmethod
|
||||
def _sql_quote(value: str) -> str:
|
||||
return "'" + value.replace("'", "''") + "'"
|
||||
|
||||
def _connect(self):
|
||||
try:
|
||||
import pymysql
|
||||
except ImportError as exc:
|
||||
raise RuntimeError("缺少 PyMySQL 依赖,无法写入第二期数据库基线") from exc
|
||||
|
||||
return pymysql.connect(
|
||||
host=settings.CCDI_DB_HOST,
|
||||
port=settings.CCDI_DB_PORT,
|
||||
user=settings.CCDI_DB_USERNAME,
|
||||
password=settings.CCDI_DB_PASSWORD,
|
||||
database=settings.CCDI_DB_NAME,
|
||||
charset="utf8mb4",
|
||||
connect_timeout=settings.CCDI_DB_CONNECT_TIMEOUT_SECONDS,
|
||||
autocommit=False,
|
||||
)
|
||||
|
||||
def _resolve_asset_owner_scopes(
|
||||
self,
|
||||
staff_id_card: str,
|
||||
family_id_cards: List[str],
|
||||
) -> Dict[str, Tuple[str, str]]:
|
||||
family_pool = [card for card in family_id_cards if card]
|
||||
first_family = family_pool[0] if family_pool else staff_id_card
|
||||
second_family = family_pool[1] if len(family_pool) > 1 else first_family
|
||||
return {
|
||||
"HOUSE_REGISTRATION_MISMATCH": (staff_id_card, staff_id_card),
|
||||
"PROPERTY_FEE_REGISTRATION_MISMATCH": (staff_id_card, first_family),
|
||||
"TAX_ASSET_REGISTRATION_MISMATCH": (staff_id_card, second_family),
|
||||
}
|
||||
|
||||
def _build_supplier_concentration_sql(self, staff_id_card: str) -> List[str]:
|
||||
purchase_id = self.SUPPLIER_PURCHASE_ID
|
||||
supplier_name = self.SUPPLIER_NAME
|
||||
sql = [
|
||||
dedent(
|
||||
f"""
|
||||
DELETE FROM ccdi_purchase_transaction
|
||||
WHERE purchase_id = {self._sql_quote(purchase_id)};
|
||||
"""
|
||||
).strip(),
|
||||
dedent(
|
||||
f"""
|
||||
INSERT INTO ccdi_purchase_transaction (
|
||||
purchase_id,
|
||||
purchase_category,
|
||||
project_name,
|
||||
subject_name,
|
||||
subject_desc,
|
||||
purchase_qty,
|
||||
budget_amount,
|
||||
bid_amount,
|
||||
actual_amount,
|
||||
contract_amount,
|
||||
settlement_amount,
|
||||
purchase_method,
|
||||
supplier_name,
|
||||
contact_person,
|
||||
contact_phone,
|
||||
supplier_uscc,
|
||||
supplier_bank_account,
|
||||
apply_date,
|
||||
plan_approve_date,
|
||||
announce_date,
|
||||
bid_open_date,
|
||||
contract_sign_date,
|
||||
expected_delivery_date,
|
||||
actual_delivery_date,
|
||||
acceptance_date,
|
||||
settlement_date,
|
||||
applicant_id,
|
||||
applicant_name,
|
||||
apply_department,
|
||||
purchase_leader_id,
|
||||
purchase_leader_name,
|
||||
purchase_department,
|
||||
created_by,
|
||||
updated_by
|
||||
)
|
||||
SELECT
|
||||
{self._sql_quote(purchase_id)},
|
||||
'设备采购',
|
||||
'LSFX Mock P2',
|
||||
'可疑采购基线',
|
||||
'用于命中 SUPPLIER_CONCENTRATION 真实规则',
|
||||
1,
|
||||
188000.00,
|
||||
186000.00,
|
||||
186000.00,
|
||||
186000.00,
|
||||
186000.00,
|
||||
'竞争性谈判',
|
||||
{self._sql_quote(supplier_name)},
|
||||
'联调联系人',
|
||||
'13800000000',
|
||||
'91330781P2PUR00011',
|
||||
'6222000000001234',
|
||||
CURRENT_DATE,
|
||||
CURRENT_DATE,
|
||||
CURRENT_DATE,
|
||||
CURRENT_DATE,
|
||||
CURRENT_DATE,
|
||||
CURRENT_DATE,
|
||||
CURRENT_DATE,
|
||||
CURRENT_DATE,
|
||||
CURRENT_DATE,
|
||||
CAST(s.staff_id AS CHAR),
|
||||
s.name,
|
||||
'纪检初核部',
|
||||
NULL,
|
||||
NULL,
|
||||
NULL,
|
||||
'admin',
|
||||
'admin'
|
||||
FROM ccdi_base_staff s
|
||||
WHERE s.id_card = {self._sql_quote(staff_id_card)}
|
||||
LIMIT 1;
|
||||
"""
|
||||
).strip(),
|
||||
]
|
||||
return sql
|
||||
|
||||
def _build_asset_mismatch_sql(
|
||||
self,
|
||||
rule_code: str,
|
||||
family_id: str,
|
||||
person_id: str,
|
||||
) -> List[str]:
|
||||
asset_name = self.ASSET_IDENTIFIERS[rule_code]
|
||||
asset_main_type = "房产"
|
||||
asset_sub_type = "商铺"
|
||||
|
||||
return [
|
||||
dedent(
|
||||
f"""
|
||||
DELETE FROM ccdi_asset_info
|
||||
WHERE asset_name = {self._sql_quote(asset_name)};
|
||||
"""
|
||||
).strip(),
|
||||
dedent(
|
||||
f"""
|
||||
INSERT INTO ccdi_asset_info (
|
||||
family_id,
|
||||
person_id,
|
||||
asset_main_type,
|
||||
asset_sub_type,
|
||||
asset_name,
|
||||
ownership_ratio,
|
||||
purchase_eval_date,
|
||||
original_value,
|
||||
current_value,
|
||||
valuation_date,
|
||||
asset_status,
|
||||
remarks,
|
||||
create_by,
|
||||
update_by
|
||||
)
|
||||
VALUES (
|
||||
{self._sql_quote(family_id)},
|
||||
{self._sql_quote(person_id)},
|
||||
{self._sql_quote(asset_main_type)},
|
||||
{self._sql_quote(asset_sub_type)},
|
||||
{self._sql_quote(asset_name)},
|
||||
100.00,
|
||||
CURRENT_DATE,
|
||||
1880000.00,
|
||||
1880000.00,
|
||||
CURRENT_DATE,
|
||||
'正常',
|
||||
{self._sql_quote(f'用于命中 {rule_code} 真实规则的第二期基线')},
|
||||
'admin',
|
||||
'admin'
|
||||
);
|
||||
"""
|
||||
).strip(),
|
||||
]
|
||||
|
||||
def build_sql_plan(
|
||||
self,
|
||||
staff_id_card: str,
|
||||
family_id_cards: List[str],
|
||||
baseline_rule_codes: List[str],
|
||||
) -> List[str]:
|
||||
"""生成第二期基线 SQL 计划。"""
|
||||
selected_rule_codes = []
|
||||
for rule_code in baseline_rule_codes or []:
|
||||
if rule_code not in selected_rule_codes:
|
||||
selected_rule_codes.append(rule_code)
|
||||
|
||||
sql_plan: List[str] = []
|
||||
asset_owner_ids = self._resolve_asset_owner_scopes(
|
||||
staff_id_card,
|
||||
family_id_cards or [],
|
||||
)
|
||||
|
||||
for rule_code in selected_rule_codes:
|
||||
if rule_code == "SUPPLIER_CONCENTRATION":
|
||||
sql_plan.extend(self._build_supplier_concentration_sql(staff_id_card))
|
||||
elif rule_code in self.ASSET_IDENTIFIERS:
|
||||
family_id, person_id = asset_owner_ids[rule_code]
|
||||
sql_plan.extend(
|
||||
self._build_asset_mismatch_sql(
|
||||
rule_code=rule_code,
|
||||
family_id=family_id,
|
||||
person_id=person_id,
|
||||
)
|
||||
)
|
||||
|
||||
return sql_plan
|
||||
|
||||
def apply(
|
||||
self,
|
||||
staff_id_card: str,
|
||||
family_id_cards: List[str],
|
||||
baseline_rule_codes: List[str],
|
||||
) -> None:
|
||||
"""按当前命中的规则将第二期基线幂等写入数据库。"""
|
||||
sql_plan = self.build_sql_plan(staff_id_card, family_id_cards, baseline_rule_codes)
|
||||
if not sql_plan:
|
||||
return None
|
||||
|
||||
with self._connect() as connection:
|
||||
try:
|
||||
with connection.cursor() as cursor:
|
||||
for sql in sql_plan:
|
||||
cursor.execute(sql)
|
||||
connection.commit()
|
||||
except Exception:
|
||||
connection.rollback()
|
||||
raise
|
||||
|
||||
return None
|
||||
Reference in New Issue
Block a user