新增征信解析字段配置与生成服务
This commit is contained in:
86
lsfx-mock-server/services/credit_payload_service.py
Normal file
86
lsfx-mock-server/services/credit_payload_service.py
Normal file
@@ -0,0 +1,86 @@
|
||||
import json
|
||||
import random
|
||||
from datetime import date, timedelta
|
||||
from pathlib import Path
|
||||
from typing import Dict, List
|
||||
|
||||
|
||||
class CreditPayloadService:
|
||||
"""根据征信字段 schema 生成稳定随机的 mock payload。"""
|
||||
|
||||
def __init__(self, schema_path: str):
|
||||
self.schema_path = schema_path
|
||||
self.schema = self._load_schema()
|
||||
|
||||
def generate_payload(self, model: str, h_type: str, filename: str) -> dict:
|
||||
rng = random.Random(self._build_seed(model, h_type, filename))
|
||||
payload = {
|
||||
"lx_header": {},
|
||||
"lx_debt": {},
|
||||
"lx_publictype": {},
|
||||
}
|
||||
|
||||
for item in self.schema:
|
||||
domain = item["domain"]
|
||||
field = item["field"]
|
||||
field_type = item["type"]
|
||||
payload[domain][field] = self._generate_value(field, field_type, item, rng)
|
||||
|
||||
return payload
|
||||
|
||||
def _load_schema(self) -> List[dict]:
|
||||
schema_file = Path(self.schema_path)
|
||||
if not schema_file.is_absolute():
|
||||
schema_file = Path(__file__).resolve().parent.parent / schema_file
|
||||
return json.loads(schema_file.read_text(encoding="utf-8"))
|
||||
|
||||
@staticmethod
|
||||
def _build_seed(model: str, h_type: str, filename: str) -> str:
|
||||
return f"{model}|{h_type}|{filename}"
|
||||
|
||||
def _generate_value(
|
||||
self,
|
||||
field: str,
|
||||
field_type: str,
|
||||
item: dict,
|
||||
rng: random.Random,
|
||||
) -> str:
|
||||
if field_type == "string":
|
||||
return self._generate_string(field, rng)
|
||||
if field_type == "amount":
|
||||
return f"{rng.uniform(0, 500000):.2f}"
|
||||
if field_type == "count":
|
||||
return str(rng.randint(0, 20))
|
||||
if field_type == "status":
|
||||
return rng.choice(item["options"])
|
||||
raise ValueError(f"Unsupported field type: {field_type}")
|
||||
|
||||
def _generate_string(self, field: str, rng: random.Random) -> str:
|
||||
if field == "query_cert_no":
|
||||
return self._generate_cert_no(rng)
|
||||
if field == "query_cust_name":
|
||||
return self._generate_name(rng)
|
||||
if field == "report_time":
|
||||
return self._generate_report_date(rng)
|
||||
return f"mock_{rng.randint(1000, 9999)}"
|
||||
|
||||
@staticmethod
|
||||
def _generate_cert_no(rng: random.Random) -> str:
|
||||
area_code = "330781"
|
||||
start_date = date(1980, 1, 1)
|
||||
birthday = start_date + timedelta(days=rng.randint(0, 14000))
|
||||
sequence = f"{rng.randint(100, 999)}"
|
||||
check_code = rng.choice("0123456789X")
|
||||
return f"{area_code}{birthday.strftime('%Y%m%d')}{sequence}{check_code}"
|
||||
|
||||
@staticmethod
|
||||
def _generate_name(rng: random.Random) -> str:
|
||||
surnames = ["张", "王", "李", "赵", "陈", "刘", "周", "吴"]
|
||||
given_names = ["伟", "芳", "娜", "敏", "静", "磊", "洋", "婷", "超", "洁"]
|
||||
return f"{rng.choice(surnames)}{rng.choice(given_names)}{rng.choice(given_names)}"
|
||||
|
||||
@staticmethod
|
||||
def _generate_report_date(rng: random.Random) -> str:
|
||||
base_date = date(2024, 1, 1)
|
||||
report_date = base_date + timedelta(days=rng.randint(0, 365))
|
||||
return report_date.strftime("%Y-%m-%d")
|
||||
Reference in New Issue
Block a user