# Source: ccdi/lsfx-mock-server/services/credit_payload_service.py
# (file-viewer metadata at capture time: 113 lines, 3.8 KiB, Python)


import json
import random
from datetime import date, timedelta
from pathlib import Path
from typing import Dict, List, Optional
class CreditPayloadService:
    """Generate deterministic pseudo-random mock credit payloads from a field schema.

    The schema is a JSON list of field descriptors. Each descriptor is read
    for the keys ``domain``, ``field`` and ``type``; descriptors whose type is
    ``"status"`` must also carry an ``options`` sequence to choose from.
    The same (model, h_type, filename, identity) inputs always produce the
    same payload because the random generator is seeded from them.
    """

    def __init__(self, schema_path: str):
        """Remember *schema_path* and eagerly load the schema it points to.

        Args:
            schema_path: Absolute path, or a path relative to the parent of
                the directory that contains this module.
        """
        self.schema_path = schema_path
        self.schema = self._load_schema()

    def generate_payload(
        self,
        model: str,
        h_type: str,
        filename: str,
        subject_identity: Optional[Dict[str, str]] = None,
    ) -> dict:
        """Build one mock payload covering every field in the schema.

        Args:
            model: Part of the random seed.
            h_type: Part of the random seed.
            filename: Part of the random seed.
            subject_identity: Optional mapping; ``staff_name`` and
                ``staff_id_card`` are used both in the seed and as fixed
                values for ``query_cust_name`` / ``query_cert_no``.

        Returns:
            A dict with the keys ``lx_header``, ``lx_debt`` and
            ``lx_publictype``, each mapping field names to generated values.

        Raises:
            KeyError: If a schema item names a domain other than the three
                above, or lacks a required key.
            ValueError: If a schema item has an unsupported ``type``.
        """
        identity = subject_identity or {}
        # A str seed makes random.Random reproducible across processes.
        rng = random.Random(self._build_seed(model, h_type, filename, identity))
        payload = {
            "lx_header": {},
            "lx_debt": {},
            "lx_publictype": {},
        }
        for item in self.schema:
            field = item["field"]
            payload[item["domain"]][field] = self._generate_value(
                field, item["type"], item, rng, identity
            )
        return payload

    def _load_schema(self) -> List[dict]:
        """Read and parse the JSON schema file referenced by ``schema_path``."""
        schema_file = Path(self.schema_path)
        if not schema_file.is_absolute():
            # Relative paths are anchored two levels above this module file.
            schema_file = Path(__file__).resolve().parent.parent / schema_file
        return json.loads(schema_file.read_text(encoding="utf-8"))

    @staticmethod
    def _build_seed(
        model: str,
        h_type: str,
        filename: str,
        subject_identity: Dict[str, str],
    ) -> str:
        """Join the request attributes into a ``|``-separated seed string."""
        return (
            f"{model}|{h_type}|{filename}|"
            f"{subject_identity.get('staff_name', '')}|"
            f"{subject_identity.get('staff_id_card', '')}"
        )

    def _generate_value(
        self,
        field: str,
        field_type: str,
        item: dict,
        rng: random.Random,
        subject_identity: Dict[str, str],
    ) -> str:
        """Produce the value for one schema field according to its type.

        Supported types: ``string``, ``amount`` (0-500000 with two decimals),
        ``count`` (integer 0-20 as text) and ``status`` (one of
        ``item["options"]``).

        Raises:
            ValueError: If *field_type* is none of the supported types.
        """
        if field_type == "string":
            return self._generate_string(field, rng, subject_identity)
        if field_type == "amount":
            return f"{rng.uniform(0, 500000):.2f}"
        if field_type == "count":
            return str(rng.randint(0, 20))
        if field_type == "status":
            return rng.choice(item["options"])
        raise ValueError(f"Unsupported field type: {field_type}")

    def _generate_string(
        self,
        field: str,
        rng: random.Random,
        subject_identity: Dict[str, str],
    ) -> str:
        """Return a string value for *field*.

        ``query_cert_no`` and ``query_cust_name`` echo the supplied identity
        when it has a non-empty value and are generated otherwise;
        ``report_time`` is a generated date; every other field gets a
        ``mock_NNNN`` placeholder.
        """
        if field == "query_cert_no":
            # No random draw happens when the identity supplies the value.
            return subject_identity.get("staff_id_card") or self._generate_cert_no(rng)
        if field == "query_cust_name":
            return subject_identity.get("staff_name") or self._generate_name(rng)
        if field == "report_time":
            return self._generate_report_date(rng)
        return f"mock_{rng.randint(1000, 9999)}"

    @staticmethod
    def _generate_cert_no(rng: random.Random) -> str:
        """Return an 18-character ID number with a valid check character.

        Layout: fixed area code ``330781``, a birthday within 14000 days of
        1980-01-01, a three-digit sequence, and the check character computed
        from the first 17 digits (weighted sum modulo 11), so the result
        passes checksum validation instead of carrying a random last digit.
        """
        weights = (7, 9, 10, 5, 8, 4, 2, 1, 6, 3, 7, 9, 10, 5, 8, 4, 2)
        check_codes = "10X98765432"  # indexed by weighted sum % 11

        area_code = "330781"
        birthday = date(1980, 1, 1) + timedelta(days=rng.randint(0, 14000))
        sequence = f"{rng.randint(100, 999)}"
        body = f"{area_code}{birthday.strftime('%Y%m%d')}{sequence}"
        weighted_sum = sum(int(digit) * weight for digit, weight in zip(body, weights))
        return f"{body}{check_codes[weighted_sum % 11]}"

    @staticmethod
    def _generate_name(rng: random.Random) -> str:
        """Return a three-character name: one surname plus two given-name characters.

        The pools hold real characters so the result is never empty; their
        sizes (8 and 10) are kept so the random draw pattern is unchanged.
        """
        surnames = ["王", "李", "张", "刘", "陈", "杨", "赵", "黄"]
        given_names = ["伟", "芳", "娜", "敏", "静", "磊", "洋", "勇", "军", "杰"]
        return f"{rng.choice(surnames)}{rng.choice(given_names)}{rng.choice(given_names)}"

    @staticmethod
    def _generate_report_date(rng: random.Random) -> str:
        """Return a ``YYYY-MM-DD`` date between 2024-01-01 and 2024-12-31 inclusive."""
        base_date = date(2024, 1, 1)
        # 2024 is a leap year, so an offset of 365 days is still 2024-12-31.
        report_date = base_date + timedelta(days=rng.randint(0, 365))
        return report_date.strftime("%Y-%m-%d")