113 lines
3.8 KiB
Python
113 lines
3.8 KiB
Python
import json
|
|
import random
|
|
from datetime import date, timedelta
|
|
from pathlib import Path
|
|
from typing import Dict, List, Optional
|
|
|
|
|
|
class CreditPayloadService:
    """Generate stable pseudo-random mock payloads from a credit-field schema.

    The schema is a JSON list of field descriptors. Each descriptor is a dict
    with at least the keys ``"domain"``, ``"field"`` and ``"type"``;
    descriptors of type ``"status"`` additionally carry an ``"options"`` list.
    For identical inputs the generated payload is always identical, because
    the random generator is seeded from those inputs.
    """

    # Weights and check-character table of the MOD 11-2 checksum used by
    # 18-character resident ID numbers (GB 11643-1999).
    _CERT_WEIGHTS = (7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2)
    _CERT_CHECK_CODES = "10X98765432"

    def __init__(self, schema_path: str):
        """Remember *schema_path* and load the schema from it immediately.

        Args:
            schema_path: Path to the JSON schema file. See ``_load_schema``
                for how relative paths are resolved.
        """
        self.schema_path = schema_path
        self.schema = self._load_schema()

    def generate_payload(
        self,
        model: str,
        h_type: str,
        filename: str,
        subject_identity: Optional[Dict[str, str]] = None,
    ) -> dict:
        """Build a mock payload that is deterministic for the given inputs.

        Args:
            model: Part of the random seed.
            h_type: Part of the random seed.
            filename: Part of the random seed.
            subject_identity: Optional mapping. Its ``"staff_name"`` and
                ``"staff_id_card"`` entries take part in the seed and, when
                non-empty, are used verbatim for the ``query_cust_name`` and
                ``query_cert_no`` fields.

        Returns:
            A dict with the sections ``"lx_header"``, ``"lx_debt"`` and
            ``"lx_publictype"``, each mapping field names to generated values.

        Raises:
            KeyError: If a schema item names a domain other than the three
                sections above, or lacks a required key.
            ValueError: If a schema item has an unsupported ``"type"``.
        """
        identity = subject_identity or {}
        rng = random.Random(self._build_seed(model, h_type, filename, identity))
        payload = {
            "lx_header": {},
            "lx_debt": {},
            "lx_publictype": {},
        }

        # Fields are generated in schema order; each one consumes values from
        # the shared generator, so the order is part of the output contract.
        for item in self.schema:
            domain = item["domain"]
            field = item["field"]
            field_type = item["type"]
            payload[domain][field] = self._generate_value(
                field, field_type, item, rng, identity
            )

        return payload

    def _load_schema(self) -> List[dict]:
        """Read and parse the JSON schema file.

        A relative ``schema_path`` is resolved against the directory two
        levels above this module's file (the parent of the module's folder).
        """
        schema_file = Path(self.schema_path)
        if not schema_file.is_absolute():
            schema_file = Path(__file__).resolve().parent.parent / schema_file
        return json.loads(schema_file.read_text(encoding="utf-8"))

    @staticmethod
    def _build_seed(
        model: str,
        h_type: str,
        filename: str,
        subject_identity: Dict[str, str],
    ) -> str:
        """Join the inputs into the string used to seed the generator.

        Missing identity entries contribute an empty string.
        """
        return (
            f"{model}|{h_type}|{filename}|"
            f"{subject_identity.get('staff_name', '')}|"
            f"{subject_identity.get('staff_id_card', '')}"
        )

    def _generate_value(
        self,
        field: str,
        field_type: str,
        item: dict,
        rng: random.Random,
        subject_identity: Dict[str, str],
    ) -> str:
        """Generate one field value according to *field_type*.

        Supported types:
            ``"string"``: delegated to ``_generate_string``.
            ``"amount"``: a number in [0, 500000] formatted with two decimals.
            ``"count"``: an integer in [0, 20] as a string.
            ``"status"``: one element of ``item["options"]``.

        Raises:
            ValueError: If *field_type* is none of the above.
        """
        if field_type == "string":
            return self._generate_string(field, rng, subject_identity)
        if field_type == "amount":
            return f"{rng.uniform(0, 500000):.2f}"
        if field_type == "count":
            return str(rng.randint(0, 20))
        if field_type == "status":
            return rng.choice(item["options"])
        raise ValueError(f"Unsupported field type: {field_type}")

    def _generate_string(
        self,
        field: str,
        rng: random.Random,
        subject_identity: Dict[str, str],
    ) -> str:
        """Generate a string field, preferring supplied identity values.

        ``query_cert_no`` and ``query_cust_name`` use the non-empty
        ``staff_id_card`` / ``staff_name`` entry of *subject_identity* when
        present (no random values are consumed in that case) and fall back to
        generated values otherwise. ``report_time`` is a generated date; any
        other field becomes ``mock_`` followed by a four-digit number.
        """
        if field == "query_cert_no":
            return subject_identity.get("staff_id_card") or self._generate_cert_no(rng)
        if field == "query_cust_name":
            return subject_identity.get("staff_name") or self._generate_name(rng)
        if field == "report_time":
            return self._generate_report_date(rng)
        return f"mock_{rng.randint(1000, 9999)}"

    @classmethod
    def _compute_check_code(cls, body: str) -> str:
        """Return the MOD 11-2 check character for a 17-digit ID body."""
        total = sum(int(digit) * weight for digit, weight in zip(body, cls._CERT_WEIGHTS))
        return cls._CERT_CHECK_CODES[total % 11]

    @staticmethod
    def _generate_cert_no(rng: random.Random) -> str:
        """Generate an 18-character ID number with a valid check character.

        Layout: fixed six-digit area prefix, eight-digit birthday
        (1980-01-01 plus 0..14000 days), three-digit sequence (100..999) and
        the check character computed from the preceding 17 digits.
        """
        area_code = "330781"
        start_date = date(1980, 1, 1)
        birthday = start_date + timedelta(days=rng.randint(0, 14000))
        sequence = f"{rng.randint(100, 999)}"
        # The check character used to be picked at random here, which made
        # most generated numbers fail checksum validation. One draw is still
        # consumed (and discarded) so that fields generated afterwards keep
        # the values they previously had for the same seed.
        rng.choice("0123456789X")
        body = f"{area_code}{birthday.strftime('%Y%m%d')}{sequence}"
        return f"{body}{CreditPayloadService._compute_check_code(body)}"

    @staticmethod
    def _generate_name(rng: random.Random) -> str:
        """Generate a three-character name: one surname plus two given characters."""
        surnames = ["张", "王", "李", "赵", "陈", "刘", "周", "吴"]
        given_names = ["伟", "芳", "娜", "敏", "静", "磊", "洋", "婷", "超", "洁"]
        return f"{rng.choice(surnames)}{rng.choice(given_names)}{rng.choice(given_names)}"

    @staticmethod
    def _generate_report_date(rng: random.Random) -> str:
        """Generate a ``YYYY-MM-DD`` date 0..365 days after 2024-01-01."""
        base_date = date(2024, 1, 1)
        report_date = base_date + timedelta(days=rng.randint(0, 365))
        return report_date.strftime("%Y-%m-%d")
|