import json import random from datetime import date, timedelta from pathlib import Path from typing import Dict, List, Optional class CreditPayloadService: """根据征信字段 schema 生成稳定随机的 mock payload。""" def __init__(self, schema_path: str): self.schema_path = schema_path self.schema = self._load_schema() def generate_payload( self, model: str, h_type: str, filename: str, subject_identity: Optional[Dict[str, str]] = None, ) -> dict: identity = subject_identity or {} rng = random.Random(self._build_seed(model, h_type, filename, identity)) payload = { "lx_header": {}, "lx_debt": {}, "lx_publictype": {}, } for item in self.schema: domain = item["domain"] field = item["field"] field_type = item["type"] payload[domain][field] = self._generate_value(field, field_type, item, rng, identity) return payload def _load_schema(self) -> List[dict]: schema_file = Path(self.schema_path) if not schema_file.is_absolute(): schema_file = Path(__file__).resolve().parent.parent / schema_file return json.loads(schema_file.read_text(encoding="utf-8")) @staticmethod def _build_seed( model: str, h_type: str, filename: str, subject_identity: Dict[str, str], ) -> str: return ( f"{model}|{h_type}|{filename}|" f"{subject_identity.get('staff_name', '')}|" f"{subject_identity.get('staff_id_card', '')}" ) def _generate_value( self, field: str, field_type: str, item: dict, rng: random.Random, subject_identity: Dict[str, str], ) -> str: if field_type == "string": return self._generate_string(field, rng, subject_identity) if field_type == "amount": return f"{rng.uniform(0, 500000):.2f}" if field_type == "count": return str(rng.randint(0, 20)) if field_type == "status": return rng.choice(item["options"]) raise ValueError(f"Unsupported field type: {field_type}") def _generate_string( self, field: str, rng: random.Random, subject_identity: Dict[str, str], ) -> str: if field == "query_cert_no" and subject_identity.get("staff_id_card"): return subject_identity["staff_id_card"] if field == "query_cust_name" and subject_identity.get("staff_name"): return subject_identity["staff_name"] if field == "query_cert_no": return self._generate_cert_no(rng) if field == "query_cust_name": return self._generate_name(rng) if field == "report_time": return self._generate_report_date(rng) return f"mock_{rng.randint(1000, 9999)}" @staticmethod def _generate_cert_no(rng: random.Random) -> str: area_code = "330781" start_date = date(1980, 1, 1) birthday = start_date + timedelta(days=rng.randint(0, 14000)) sequence = f"{rng.randint(100, 999)}" check_code = rng.choice("0123456789X") return f"{area_code}{birthday.strftime('%Y%m%d')}{sequence}{check_code}" @staticmethod def _generate_name(rng: random.Random) -> str: surnames = ["张", "王", "李", "赵", "陈", "刘", "周", "吴"] given_names = ["伟", "芳", "娜", "敏", "静", "磊", "洋", "婷", "超", "洁"] return f"{rng.choice(surnames)}{rng.choice(given_names)}{rng.choice(given_names)}" @staticmethod def _generate_report_date(rng: random.Random) -> str: base_date = date(2024, 1, 1) report_date = base_date + timedelta(days=rng.randint(0, 365)) return report_date.strftime("%Y-%m-%d")