Files
ccdi/lsfx-mock-server/services/phase2_baseline_service.py

265 lines
8.7 KiB
Python

from textwrap import dedent
from typing import Dict, List, Tuple
from config.settings import settings
class Phase2BaselineService:
"""第二期数据库基线服务。
"""
SUPPLIER_PURCHASE_ID = "LSFXMOCKP2PUR001"
SUPPLIER_NAME = "兰溪市联调供应链有限公司"
ASSET_IDENTIFIERS = {
"HOUSE_REGISTRATION_MISMATCH": "LSFX Mock P2 HOUSE_REGISTRATION_MISMATCH",
"PROPERTY_FEE_REGISTRATION_MISMATCH": "LSFX Mock P2 PROPERTY_FEE_REGISTRATION_MISMATCH",
"TAX_ASSET_REGISTRATION_MISMATCH": "LSFX Mock P2 TAX_ASSET_REGISTRATION_MISMATCH",
}
def __init__(self):
self.db_config = {
"host": settings.CCDI_DB_HOST,
"port": settings.CCDI_DB_PORT,
"database": settings.CCDI_DB_NAME,
"username": settings.CCDI_DB_USERNAME,
"password": settings.CCDI_DB_PASSWORD,
"connect_timeout_seconds": settings.CCDI_DB_CONNECT_TIMEOUT_SECONDS,
}
@staticmethod
def _sql_quote(value: str) -> str:
return "'" + value.replace("'", "''") + "'"
def _connect(self):
try:
import pymysql
except ImportError as exc:
raise RuntimeError("缺少 PyMySQL 依赖,无法写入第二期数据库基线") from exc
return pymysql.connect(
host=settings.CCDI_DB_HOST,
port=settings.CCDI_DB_PORT,
user=settings.CCDI_DB_USERNAME,
password=settings.CCDI_DB_PASSWORD,
database=settings.CCDI_DB_NAME,
charset="utf8mb4",
connect_timeout=settings.CCDI_DB_CONNECT_TIMEOUT_SECONDS,
autocommit=False,
)
def _resolve_asset_owner_scopes(
self,
staff_id_card: str,
family_id_cards: List[str],
) -> Dict[str, Tuple[str, str]]:
family_pool = [card for card in family_id_cards if card]
first_family = family_pool[0] if family_pool else staff_id_card
second_family = family_pool[1] if len(family_pool) > 1 else first_family
return {
"HOUSE_REGISTRATION_MISMATCH": (staff_id_card, staff_id_card),
"PROPERTY_FEE_REGISTRATION_MISMATCH": (staff_id_card, first_family),
"TAX_ASSET_REGISTRATION_MISMATCH": (staff_id_card, second_family),
}
def _build_supplier_concentration_sql(self, staff_id_card: str) -> List[str]:
purchase_id = self.SUPPLIER_PURCHASE_ID
supplier_name = self.SUPPLIER_NAME
sql = [
dedent(
f"""
DELETE FROM ccdi_purchase_transaction
WHERE purchase_id = {self._sql_quote(purchase_id)};
"""
).strip(),
dedent(
f"""
INSERT INTO ccdi_purchase_transaction (
purchase_id,
purchase_category,
project_name,
subject_name,
subject_desc,
purchase_qty,
budget_amount,
bid_amount,
actual_amount,
contract_amount,
settlement_amount,
purchase_method,
supplier_name,
contact_person,
contact_phone,
supplier_uscc,
supplier_bank_account,
apply_date,
plan_approve_date,
announce_date,
bid_open_date,
contract_sign_date,
expected_delivery_date,
actual_delivery_date,
acceptance_date,
settlement_date,
applicant_id,
applicant_name,
apply_department,
purchase_leader_id,
purchase_leader_name,
purchase_department,
created_by,
updated_by
)
SELECT
{self._sql_quote(purchase_id)},
'设备采购',
'LSFX Mock P2',
'可疑采购基线',
'用于命中 SUPPLIER_CONCENTRATION 真实规则',
1,
188000.00,
186000.00,
186000.00,
186000.00,
186000.00,
'竞争性谈判',
{self._sql_quote(supplier_name)},
'联调联系人',
'13800000000',
'91330781P2PUR00011',
'6222000000001234',
CURRENT_DATE,
CURRENT_DATE,
CURRENT_DATE,
CURRENT_DATE,
CURRENT_DATE,
CURRENT_DATE,
CURRENT_DATE,
CURRENT_DATE,
CURRENT_DATE,
CAST(s.staff_id AS CHAR),
s.name,
'纪检初核部',
NULL,
NULL,
NULL,
'admin',
'admin'
FROM ccdi_base_staff s
WHERE s.id_card = {self._sql_quote(staff_id_card)}
LIMIT 1;
"""
).strip(),
]
return sql
def _build_asset_mismatch_sql(
self,
rule_code: str,
family_id: str,
person_id: str,
) -> List[str]:
asset_name = self.ASSET_IDENTIFIERS[rule_code]
asset_main_type = "房产"
asset_sub_type = "商铺"
return [
dedent(
f"""
DELETE FROM ccdi_asset_info
WHERE asset_name = {self._sql_quote(asset_name)};
"""
).strip(),
dedent(
f"""
INSERT INTO ccdi_asset_info (
family_id,
person_id,
asset_main_type,
asset_sub_type,
asset_name,
ownership_ratio,
purchase_eval_date,
original_value,
current_value,
valuation_date,
asset_status,
remarks,
create_by,
update_by
)
VALUES (
{self._sql_quote(family_id)},
{self._sql_quote(person_id)},
{self._sql_quote(asset_main_type)},
{self._sql_quote(asset_sub_type)},
{self._sql_quote(asset_name)},
100.00,
CURRENT_DATE,
1880000.00,
1880000.00,
CURRENT_DATE,
'正常',
{self._sql_quote(f'用于命中 {rule_code} 真实规则的第二期基线')},
'admin',
'admin'
);
"""
).strip(),
]
def build_sql_plan(
self,
staff_id_card: str,
family_id_cards: List[str],
baseline_rule_codes: List[str],
) -> List[str]:
"""生成第二期基线 SQL 计划。"""
selected_rule_codes = []
for rule_code in baseline_rule_codes or []:
if rule_code not in selected_rule_codes:
selected_rule_codes.append(rule_code)
sql_plan: List[str] = []
asset_owner_ids = self._resolve_asset_owner_scopes(
staff_id_card,
family_id_cards or [],
)
for rule_code in selected_rule_codes:
if rule_code == "SUPPLIER_CONCENTRATION":
sql_plan.extend(self._build_supplier_concentration_sql(staff_id_card))
elif rule_code in self.ASSET_IDENTIFIERS:
family_id, person_id = asset_owner_ids[rule_code]
sql_plan.extend(
self._build_asset_mismatch_sql(
rule_code=rule_code,
family_id=family_id,
person_id=person_id,
)
)
return sql_plan
def apply(
self,
staff_id_card: str,
family_id_cards: List[str],
baseline_rule_codes: List[str],
) -> None:
"""按当前命中的规则将第二期基线幂等写入数据库。"""
sql_plan = self.build_sql_plan(staff_id_card, family_id_cards, baseline_rule_codes)
if not sql_plan:
return None
with self._connect() as connection:
try:
with connection.cursor() as cursor:
for sql in sql_plan:
cursor.execute(sql)
connection.commit()
except Exception:
connection.rollback()
raise
return None