from textwrap import dedent
from typing import Dict, List, Tuple

from config.settings import settings


class Phase2BaselineService:
    """Phase-2 database baseline service.

    Builds the SQL statements that seed baseline rows for a set of rule
    codes (supplier concentration, low-income relative, asset registration
    mismatches) and can execute them in a single transaction.
    """

    SUPPLIER_PURCHASE_ID = "LSFXMOCKP2PUR001"
    SUPPLIER_NAME = "兰溪市联调供应链有限公司"
    LOW_INCOME_RELATIVE_RULE_CODE = "LOW_INCOME_RELATIVE_LARGE_TRANSACTION"
    # Rule code -> asset_name used to identify (and replace) the baseline row.
    ASSET_IDENTIFIERS = {
        "HOUSE_REGISTRATION_MISMATCH": "LSFX Mock P2 HOUSE_REGISTRATION_MISMATCH",
        "PROPERTY_FEE_REGISTRATION_MISMATCH": "LSFX Mock P2 PROPERTY_FEE_REGISTRATION_MISMATCH",
        "TAX_ASSET_REGISTRATION_MISMATCH": "LSFX Mock P2 TAX_ASSET_REGISTRATION_MISMATCH",
    }

    def __init__(self):
        """Snapshot the database connection settings into ``self.db_config``."""
        self.db_config = {
            "host": settings.CCDI_DB_HOST,
            "port": settings.CCDI_DB_PORT,
            "database": settings.CCDI_DB_NAME,
            "username": settings.CCDI_DB_USERNAME,
            "password": settings.CCDI_DB_PASSWORD,
            "connect_timeout_seconds": settings.CCDI_DB_CONNECT_TIMEOUT_SECONDS,
        }

    @staticmethod
    def _sql_quote(value: str) -> str:
        """Return ``value`` as a single-quoted MySQL string literal.

        Backslashes are doubled before quotes are doubled: with MySQL's
        default SQL mode a backslash is an escape character inside string
        literals, so an unescaped trailing ``\\`` would swallow the closing
        quote and break out of the literal.

        NOTE(review): assumes the server does not run with
        NO_BACKSLASH_ESCAPES; under that mode the doubled backslash would be
        stored literally.
        """
        escaped = value.replace("\\", "\\\\").replace("'", "''")
        return "'" + escaped + "'"

    def _connect(self):
        """Open a PyMySQL connection (autocommit off) using the settings.

        Raises:
            RuntimeError: if PyMySQL is not installed.
        """
        try:
            # Imported lazily so the SQL-plan builders work without PyMySQL.
            import pymysql
        except ImportError as exc:
            raise RuntimeError("缺少 PyMySQL 依赖,无法写入第二期数据库基线") from exc

        return pymysql.connect(
            host=settings.CCDI_DB_HOST,
            port=settings.CCDI_DB_PORT,
            user=settings.CCDI_DB_USERNAME,
            password=settings.CCDI_DB_PASSWORD,
            database=settings.CCDI_DB_NAME,
            charset="utf8mb4",
            connect_timeout=settings.CCDI_DB_CONNECT_TIMEOUT_SECONDS,
            autocommit=False,
        )

    def _resolve_asset_owner_scopes(
        self,
        staff_id_card: str,
        family_id_cards: List[str],
    ) -> Dict[str, Tuple[str, str]]:
        """Map each asset rule code to its ``(family_id, person_id)`` pair.

        ``family_id`` is always the staff ID card. ``person_id`` is the staff
        member for the house rule, the first non-empty family ID card for the
        property-fee rule, and the second one for the tax rule; each falls
        back to the previous choice when not enough family cards are given.
        """
        family_pool = [card for card in family_id_cards if card]
        first_family = family_pool[0] if family_pool else staff_id_card
        second_family = family_pool[1] if len(family_pool) > 1 else first_family
        return {
            "HOUSE_REGISTRATION_MISMATCH": (staff_id_card, staff_id_card),
            "PROPERTY_FEE_REGISTRATION_MISMATCH": (staff_id_card, first_family),
            "TAX_ASSET_REGISTRATION_MISMATCH": (staff_id_card, second_family),
        }

    def _build_supplier_concentration_sql(self, staff_id_card: str) -> List[str]:
        """Build the DELETE + INSERT pair for the supplier-concentration row.

        The INSERT selects the applicant from ``ccdi_base_staff`` by ID card,
        so it inserts nothing when no matching staff row exists.
        """
        purchase_id = self.SUPPLIER_PURCHASE_ID
        supplier_name = self.SUPPLIER_NAME
        sql = [
            dedent(
                f"""
                DELETE FROM ccdi_purchase_transaction
                WHERE purchase_id = {self._sql_quote(purchase_id)};
                """
            ).strip(),
            dedent(
                f"""
                INSERT INTO ccdi_purchase_transaction (
                    purchase_id, purchase_category, project_name, subject_name, subject_desc,
                    purchase_qty, budget_amount, bid_amount, actual_amount, contract_amount,
                    settlement_amount, purchase_method, supplier_name, contact_person, contact_phone,
                    supplier_uscc, supplier_bank_account, apply_date, plan_approve_date, announce_date,
                    bid_open_date, contract_sign_date, expected_delivery_date, actual_delivery_date,
                    acceptance_date, settlement_date, applicant_id, applicant_name, apply_department,
                    purchase_leader_id, purchase_leader_name, purchase_department, created_by, updated_by
                )
                SELECT
                    {self._sql_quote(purchase_id)}, '设备采购', 'LSFX Mock P2', '可疑采购基线', '用于命中 SUPPLIER_CONCENTRATION 真实规则',
                    1, 188000.00, 186000.00, 186000.00, 186000.00,
                    186000.00, '竞争性谈判', {self._sql_quote(supplier_name)}, '联调联系人', '13800000000',
                    '91330781P2PUR00011', '6222000000001234', CURRENT_DATE, CURRENT_DATE, CURRENT_DATE,
                    CURRENT_DATE, CURRENT_DATE, CURRENT_DATE, CURRENT_DATE,
                    CURRENT_DATE, CURRENT_DATE, CAST(s.staff_id AS CHAR), s.name, '纪检初核部',
                    NULL, NULL, NULL, 'admin', 'admin'
                FROM ccdi_base_staff s
                WHERE s.id_card = {self._sql_quote(staff_id_card)}
                LIMIT 1;
                """
            ).strip(),
        ]
        return sql

    def _build_asset_mismatch_sql(
        self,
        rule_code: str,
        family_id: str,
        person_id: str,
    ) -> List[str]:
        """Build the DELETE + INSERT pair for one asset-mismatch baseline row.

        The row is identified by the asset name registered for ``rule_code``
        in ``ASSET_IDENTIFIERS``; the DELETE removes every row with that name.

        Raises:
            KeyError: if ``rule_code`` is not in ``ASSET_IDENTIFIERS``.
        """
        asset_name = self.ASSET_IDENTIFIERS[rule_code]
        asset_main_type = "房产"
        asset_sub_type = "商铺"
        remarks = f"用于命中 {rule_code} 真实规则的第二期基线"
        return [
            dedent(
                f"""
                DELETE FROM ccdi_asset_info
                WHERE asset_name = {self._sql_quote(asset_name)};
                """
            ).strip(),
            dedent(
                f"""
                INSERT INTO ccdi_asset_info (
                    family_id, person_id, asset_main_type, asset_sub_type, asset_name,
                    ownership_ratio, purchase_eval_date, original_value, current_value,
                    valuation_date, asset_status, remarks, create_by, update_by
                )
                VALUES (
                    {self._sql_quote(family_id)}, {self._sql_quote(person_id)},
                    {self._sql_quote(asset_main_type)}, {self._sql_quote(asset_sub_type)},
                    {self._sql_quote(asset_name)},
                    100.00, CURRENT_DATE, 1880000.00, 1880000.00,
                    CURRENT_DATE, '正常', {self._sql_quote(remarks)}, 'admin', 'admin'
                );
                """
            ).strip(),
        ]

    def _build_low_income_family_baseline_sql(
        self,
        staff_id_card: str,
        family_id_cards: List[str],
    ) -> List[str]:
        """Build the upsert that marks one relative as having zero income.

        Uses the first non-empty entry of ``family_id_cards`` as the relative;
        returns an empty list when there is none.

        NOTE(review): the ``ON DUPLICATE KEY UPDATE`` branch only fires if the
        table has a unique key covering this relation - confirm against the
        schema.
        """
        target_family_id_card = next((card for card in family_id_cards if card), None)
        if not target_family_id_card:
            return []

        return [
            dedent(
                f"""
                INSERT INTO ccdi_staff_fmy_relation (
                    person_id, relation_type, relation_name, gender, relation_cert_type,
                    relation_cert_no, relation_desc, status, effective_date, remark,
                    data_source, is_emp_family, is_cust_family, created_by, updated_by,
                    annual_income
                )
                VALUES (
                    {self._sql_quote(staff_id_card)}, '父亲', 'LSFX低收入亲属', 'M', '身份证',
                    {self._sql_quote(target_family_id_card)}, '用于命中 LOW_INCOME_RELATIVE_LARGE_TRANSACTION 真实规则', 1, NOW(), 'LSFX Mock 低收入亲属基线',
                    'SYSTEM', 1, 0, 'admin', 'admin',
                    0.00
                )
                ON DUPLICATE KEY UPDATE
                    annual_income = 0.00,
                    status = 1,
                    updated_by = 'admin',
                    update_time = CURRENT_TIMESTAMP;
                """
            ).strip()
        ]

    def build_sql_plan(
        self,
        staff_id_card: str,
        family_id_cards: List[str],
        baseline_rule_codes: List[str],
    ) -> List[str]:
        """Build the phase-2 baseline SQL plan.

        Args:
            staff_id_card: ID card number of the staff member.
            family_id_cards: ID card numbers of relatives; may be empty/None.
            baseline_rule_codes: rule codes to seed; may be empty/None.

        Returns:
            SQL statements in rule-code order (first occurrence wins for
            duplicates). Rule codes this service does not know are skipped.
        """
        # dict.fromkeys de-duplicates while keeping first-seen order.
        selected_rule_codes = list(dict.fromkeys(baseline_rule_codes or []))
        family_cards = family_id_cards or []

        sql_plan: List[str] = []
        asset_owner_ids = self._resolve_asset_owner_scopes(staff_id_card, family_cards)

        for rule_code in selected_rule_codes:
            if rule_code == "SUPPLIER_CONCENTRATION":
                sql_plan.extend(self._build_supplier_concentration_sql(staff_id_card))
            elif rule_code == self.LOW_INCOME_RELATIVE_RULE_CODE:
                sql_plan.extend(
                    self._build_low_income_family_baseline_sql(
                        staff_id_card=staff_id_card,
                        family_id_cards=family_cards,
                    )
                )
            elif rule_code in self.ASSET_IDENTIFIERS:
                family_id, person_id = asset_owner_ids[rule_code]
                sql_plan.extend(
                    self._build_asset_mismatch_sql(
                        rule_code=rule_code,
                        family_id=family_id,
                        person_id=person_id,
                    )
                )

        return sql_plan

    def apply(
        self,
        staff_id_card: str,
        family_id_cards: List[str],
        baseline_rule_codes: List[str],
    ) -> None:
        """Write the phase-2 baseline for the given rule codes to the database.

        All statements of the plan run in one transaction: it is committed
        when every statement succeeds and rolled back (with the exception
        re-raised) otherwise. Nothing is connected or executed when the plan
        is empty.

        Raises:
            RuntimeError: if PyMySQL is not installed.
        """
        sql_plan = self.build_sql_plan(staff_id_card, family_id_cards, baseline_rule_codes)
        if not sql_plan:
            return None

        with self._connect() as connection:
            try:
                with connection.cursor() as cursor:
                    for sql in sql_plan:
                        cursor.execute(sql)
                connection.commit()
            except Exception:
                # Undo partial writes, then surface the original error.
                connection.rollback()
                raise
        return None