新增征信员工HTML样本并改造Mock解析

This commit is contained in:
wkc
2026-03-23 20:35:52 +08:00
parent 27aab7f9bc
commit 823e9f75a9
1016 changed files with 16400 additions and 9 deletions

View File

@@ -0,0 +1,37 @@
import html
import re
from typing import Dict, Union
class CreditHtmlIdentityService:
    """Extract the staff name and ID-card number from a credit sample HTML page.

    The identity is read from two ``<meta>`` tags named ``ccdi-staff-name``
    and ``ccdi-staff-id-card``.
    """

    # Matches ``<meta name="..." content="...">`` with the attributes in
    # exactly this order and double-quoted; ``{name}`` is filled per lookup.
    _META_TEMPLATE = r'<meta\s+name="{name}"\s+content="(?P<value>[^"]+)"\s*/?>'

    def extract_identity(self, html_content: Union[str, bytes]) -> Dict[str, str]:
        """Return the identity fields found in ``html_content``.

        Args:
            html_content: The HTML document, as text or as raw bytes.

        Returns:
            A dict that may contain ``staff_name`` and/or ``staff_id_card``.
            A key is present only when its meta tag yields a non-empty value.
        """
        document = self._normalize_html(html_content)
        lookups = (
            ("staff_name", "ccdi-staff-name"),
            ("staff_id_card", "ccdi-staff-id-card"),
        )
        extracted = (
            (key, self._extract_meta_content(document, meta_name))
            for key, meta_name in lookups
        )
        return {key: value for key, value in extracted if value}

    def _extract_meta_content(self, text: str, meta_name: str) -> str:
        """Return the unescaped, stripped ``content`` of the named meta tag.

        The search is case-insensitive. An empty string is returned when no
        matching tag exists.
        """
        pattern = re.compile(
            self._META_TEMPLATE.format(name=re.escape(meta_name)),
            re.IGNORECASE,
        )
        hit = pattern.search(text)
        return html.unescape(hit.group("value")).strip() if hit else ""

    @staticmethod
    def _normalize_html(html_content: Union[str, bytes]) -> str:
        """Return ``html_content`` as text.

        Bytes are decoded as UTF-8 with undecodable bytes dropped; any other
        value is returned unchanged.
        """
        if not isinstance(html_content, bytes):
            return html_content
        return html_content.decode("utf-8", errors="ignore")

View File

@@ -2,7 +2,7 @@ import json
import random
from datetime import date, timedelta
from pathlib import Path
from typing import Dict, List
from typing import Dict, List, Optional
class CreditPayloadService:
@@ -12,8 +12,15 @@ class CreditPayloadService:
self.schema_path = schema_path
self.schema = self._load_schema()
def generate_payload(self, model: str, h_type: str, filename: str) -> dict:
rng = random.Random(self._build_seed(model, h_type, filename))
def generate_payload(
self,
model: str,
h_type: str,
filename: str,
subject_identity: Optional[Dict[str, str]] = None,
) -> dict:
identity = subject_identity or {}
rng = random.Random(self._build_seed(model, h_type, filename, identity))
payload = {
"lx_header": {},
"lx_debt": {},
@@ -24,7 +31,7 @@ class CreditPayloadService:
domain = item["domain"]
field = item["field"]
field_type = item["type"]
payload[domain][field] = self._generate_value(field, field_type, item, rng)
payload[domain][field] = self._generate_value(field, field_type, item, rng, identity)
return payload
@@ -35,8 +42,17 @@ class CreditPayloadService:
return json.loads(schema_file.read_text(encoding="utf-8"))
@staticmethod
def _build_seed(model: str, h_type: str, filename: str) -> str:
return f"{model}|{h_type}|{filename}"
def _build_seed(
model: str,
h_type: str,
filename: str,
subject_identity: Dict[str, str],
) -> str:
return (
f"{model}|{h_type}|{filename}|"
f"{subject_identity.get('staff_name', '')}|"
f"{subject_identity.get('staff_id_card', '')}"
)
def _generate_value(
self,
@@ -44,9 +60,10 @@ class CreditPayloadService:
field_type: str,
item: dict,
rng: random.Random,
subject_identity: Dict[str, str],
) -> str:
if field_type == "string":
return self._generate_string(field, rng)
return self._generate_string(field, rng, subject_identity)
if field_type == "amount":
return f"{rng.uniform(0, 500000):.2f}"
if field_type == "count":
@@ -55,7 +72,16 @@ class CreditPayloadService:
return rng.choice(item["options"])
raise ValueError(f"Unsupported field type: {field_type}")
def _generate_string(self, field: str, rng: random.Random) -> str:
def _generate_string(
self,
field: str,
rng: random.Random,
subject_identity: Dict[str, str],
) -> str:
if field == "query_cert_no" and subject_identity.get("staff_id_card"):
return subject_identity["staff_id_card"]
if field == "query_cust_name" and subject_identity.get("staff_name"):
return subject_identity["staff_name"]
if field == "query_cert_no":
return self._generate_cert_no(rng)
if field == "query_cust_name":

View File

@@ -0,0 +1,63 @@
import html
import re
from pathlib import Path
from typing import List
class StaffCreditHtmlExportService:
    """Export staff names and ID-card numbers as credit-parsing sample HTML files."""

    def __init__(self, staff_identity_repository):
        # The repository must provide ``select_active_staff_identities()``,
        # returning dicts with ``staff_name`` and ``staff_id_card`` keys.
        self.staff_identity_repository = staff_identity_repository

    def export(self, output_dir: Path) -> List[Path]:
        """Regenerate one sample HTML file per staff record in ``output_dir``.

        Existing ``*.html`` files in the directory are removed before the new
        files are written.

        Args:
            output_dir: Target directory; created (with parents) if missing.

        Returns:
            The paths of the files written, in repository order.
        """
        # Query before touching the filesystem: if the repository call fails,
        # the samples from the previous run must not have been deleted yet.
        staff_list = self.staff_identity_repository.select_active_staff_identities()

        output_path = Path(output_dir)
        output_path.mkdir(parents=True, exist_ok=True)
        self._clear_existing_html(output_path)

        generated_files: List[Path] = []
        for index, staff in enumerate(staff_list, start=1):
            staff_name = staff["staff_name"]
            staff_id_card = staff["staff_id_card"]
            file_path = output_path / self._build_filename(
                index, staff_name, staff_id_card
            )
            file_path.write_text(
                self.build_html(staff_name, staff_id_card),
                encoding="utf-8",
            )
            generated_files.append(file_path)
        return generated_files

    def _clear_existing_html(self, output_dir: Path) -> None:
        """Delete every ``*.html`` entry directly inside ``output_dir``."""
        for html_file in output_dir.glob("*.html"):
            html_file.unlink()

    def _build_filename(self, index: int, staff_name: str, staff_id_card: str) -> str:
        """Return ``<index>_<name>_<last four ID chars>.html``.

        The index is zero-padded to four digits. Both the name and the
        ID-card suffix are sanitized so the result is a single path component.
        """
        safe_name = self._sanitize_filename(staff_name)
        # Strip first so padding does not count towards the last four
        # characters, then sanitize like the name: an unsanitized separator
        # here would turn the filename into a nested path.
        safe_suffix = self._sanitize_filename(staff_id_card.strip()[-4:])
        return f"{index:04d}_{safe_name}_{safe_suffix}.html"

    @staticmethod
    def _sanitize_filename(name: str) -> str:
        """Replace runs of ``\\ / : * ? " < > |`` with ``_`` and strip whitespace.

        Returns ``"unknown"`` when nothing is left after sanitizing.
        """
        sanitized = re.sub(r'[\\/:*?"<>|]+', "_", name).strip()
        return sanitized or "unknown"

    @staticmethod
    def build_html(staff_name: str, staff_id_card: str) -> str:
        """Render the sample page for one staff member.

        Both values are HTML-escaped (quotes included) and appear in the
        ``ccdi-staff-name`` / ``ccdi-staff-id-card`` meta tags as well as in
        the visible body.
        """
        escaped_name = html.escape(staff_name, quote=True)
        escaped_id_card = html.escape(staff_id_card, quote=True)
        lines = (
            "<!DOCTYPE html>",
            '<html lang="zh-CN">',
            "<head>",
            '<meta charset="UTF-8" />',
            f'<meta name="ccdi-staff-name" content="{escaped_name}" />',
            f'<meta name="ccdi-staff-id-card" content="{escaped_id_card}" />',
            "<title>征信解析员工样本</title>",
            "</head>",
            "<body>",
            "<main>",
            "<h1>征信解析员工样本</h1>",
            f"<p>姓名:{escaped_name}</p>",
            f"<p>身份证号:{escaped_id_card}</p>",
            "</main>",
            "</body>",
            "</html>",
        )
        # The document ends with a trailing newline.
        return "\n".join(lines) + "\n"

View File

@@ -85,3 +85,27 @@ class StaffIdentityRepository:
"staff_id_card": staff["staff_id_card"],
"family_id_cards": family_id_cards,
}
def select_active_staff_identities(self) -> List[Dict[str, str]]:
    """Read the staff identities used for the credit sample export.

    Selects the name and ID-card number from ``ccdi_base_staff`` for rows
    whose name and ID card are non-blank and whose ``status`` is ``'0'``
    (presumably the "active" flag -- confirm against the schema), ordered
    by ``staff_id``.

    Returns:
        One dict per row with the keys ``staff_name`` and ``staff_id_card``.
    """
    query = (
        "\n"
        "SELECT s.name AS staff_name, s.id_card AS staff_id_card\n"
        "FROM ccdi_base_staff s\n"
        "WHERE COALESCE(TRIM(s.name), '') <> ''\n"
        "AND COALESCE(TRIM(s.id_card), '') <> ''\n"
        "AND s.status = '0'\n"
        "ORDER BY s.staff_id\n"
    )
    with self._connect() as connection, connection.cursor() as cursor:
        cursor.execute(query)
        fetched = cursor.fetchall()

    # Keep only the two identity columns; rows are read by column name.
    identities = []
    for record in fetched:
        identities.append(
            {
                "staff_name": record["staff_name"],
                "staff_id_card": record["staff_id_card"],
            }
        )
    return identities