Files
ccdi/lsfx-mock-server/services/file_service.py

890 lines
34 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from fastapi import BackgroundTasks, UploadFile
from utils.response_builder import ResponseBuilder
from config.settings import settings
from services.phase2_baseline_service import Phase2BaselineService
from services.staff_identity_repository import StaffIdentityRepository
from typing import Dict, List, Union
from dataclasses import dataclass, field
import time
from datetime import datetime, timedelta
import random
import uuid
# Rule-code pools per detection family. Subset/all rule-hit plans are drawn
# from these lists, and ordering here is the canonical display order
# (_pick_rule_subset preserves pool order when sampling).
LARGE_TRANSACTION_RULE_CODES = [
    "HOUSE_OR_CAR_EXPENSE",
    "TAX_EXPENSE",
    "SINGLE_LARGE_INCOME",
    "CUMULATIVE_INCOME",
    "ANNUAL_TURNOVER",
    "LARGE_CASH_DEPOSIT",
    "FREQUENT_CASH_DEPOSIT",
    "LARGE_TRANSFER",
]
# Phase-1 statement rules (keyword / amount / count based checks).
PHASE1_RULE_CODES = [
    "GAMBLING_SENSITIVE_KEYWORD",
    "SPECIAL_AMOUNT_TRANSACTION",
    "SUSPICIOUS_INCOME_KEYWORD",
    "FOREX_BUY_AMT",
    "FOREX_SELL_AMT",
    "STOCK_TFR_LARGE",
    "LARGE_STOCK_TRADING",
    "WITHDRAW_CNT",
]
# Phase-2 rules evaluated against the bank statement itself.
PHASE2_STATEMENT_RULE_CODES = [
    "LOW_INCOME_RELATIVE_LARGE_TRANSACTION",
    "MULTI_PARTY_GAMBLING_TRANSFER",
    "MONTHLY_FIXED_INCOME",
    "FIXED_COUNTERPARTY_TRANSFER",
    "SALARY_QUICK_TRANSFER",
    "SALARY_UNUSED",
]
# Phase-2 rules that require external baseline facts (registrations, taxes, suppliers).
PHASE2_BASELINE_RULE_CODES = [
    "HOUSE_REGISTRATION_MISMATCH",
    "PROPERTY_FEE_REGISTRATION_MISMATCH",
    "TAX_ASSET_REGISTRATION_MISMATCH",
    "SUPPLIER_CONCENTRATION",
]
# Rules backed by abnormal-account facts (see _build_abnormal_accounts).
ABNORMAL_ACCOUNT_RULE_CODES = [
    "SUDDEN_ACCOUNT_CLOSURE",
    "DORMANT_ACCOUNT_LARGE_ACTIVATION",
]
# Inflow-style rules excluded from a MONTHLY_FIXED_INCOME sample in "all" mode,
# so that its fixed-income pattern is not polluted by other positive inflows
# (see _build_monthly_fixed_income_isolated_rule_hit_plan).
MONTHLY_FIXED_INCOME_ISOLATED_LARGE_TRANSACTION_RULE_CODES = {
    "SINGLE_LARGE_INCOME",
    "CUMULATIVE_INCOME",
    "ANNUAL_TURNOVER",
    "LARGE_CASH_DEPOSIT",
    "FREQUENT_CASH_DEPOSIT",
}
MONTHLY_FIXED_INCOME_ISOLATED_PHASE1_RULE_CODES = {
    "SUSPICIOUS_INCOME_KEYWORD",
    "FOREX_SELL_AMT",
}
MONTHLY_FIXED_INCOME_ISOLATED_PHASE2_RULE_CODES = {
    "FIXED_COUNTERPARTY_TRANSFER",
}
# Mutual-exclusion groups; within a group only the first matching rule survives
# (_apply_conflict_groups). Currently empty, i.e. no conflicts configured.
RULE_CONFLICT_GROUPS = []
# Statement rules that, in "all" mode, also get their baseline facts applied
# (merged into the baseline rule list by _apply_phase2_baselines).
ALL_MODE_STATEMENT_BASELINE_RULE_CODES = {
    "LOW_INCOME_RELATIVE_LARGE_TRANSACTION",
}
@dataclass
class FileRecord:
    """File record model (extended) mirroring the upstream upload-log schema."""
    # Original fields
    log_id: int
    group_id: int
    file_name: str
    status: int = -5  # -5 means "parsed successfully"
    upload_status_desc: str = "data.wait.confirm.newaccount"
    parsing: bool = True  # True means the (simulated) parse is still in progress
    # Added fields - own-side account and primary-party binding
    primary_enterprise_name: str = ""
    primary_account_no: str = ""
    account_no_list: List[str] = field(default_factory=list)
    enterprise_name_list: List[str] = field(default_factory=list)
    # Added fields - bank and template info
    bank_name: str = "ZJRCU"
    real_bank_name: str = "ZJRCU"
    template_name: str = "ZJRCU_T251114"
    data_type_info: List[str] = field(default_factory=lambda: ["CSV", ","])
    # Added fields - file metadata
    file_size: int = 50000
    download_file_name: str = ""
    # 32-char hex id (UUID4 with dashes stripped), unique per record
    file_package_id: str = field(default_factory=lambda: str(uuid.uuid4()).replace('-', ''))
    # Added fields - uploading-user info
    file_upload_by: int = 448
    file_upload_by_user_name: str = "admin@support.com"
    file_upload_time: str = field(default_factory=lambda: datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
    # Added fields - legal-entity info
    le_id: int = 10000
    login_le_id: int = 10000
    log_type: str = "bankstatement"
    log_meta: str = "{\"lostHeader\":[],\"balanceAmount\":\"-1\"}"
    lost_header: List[str] = field(default_factory=list)
    # Added fields - record statistics
    rows: int = 0
    source: str = "http"
    total_records: int = 150
    is_split: int = 0
    # Added fields - transaction date range (YYYYMMDD integers)
    trx_date_start_id: int = 20240101
    trx_date_end_id: int = 20241231
    # Added fields - staff identity binding
    staff_name: str = ""
    staff_id_card: str = ""
    family_id_cards: List[str] = field(default_factory=list)
    # Rule-hit plan per detection family (codes from the module-level pools)
    large_transaction_hit_rules: List[str] = field(default_factory=list)
    phase1_hit_rules: List[str] = field(default_factory=list)
    phase2_statement_hit_rules: List[str] = field(default_factory=list)
    phase2_baseline_hit_rules: List[str] = field(default_factory=list)
    abnormal_account_hit_rules: List[str] = field(default_factory=list)
    # Abnormal-account fact dicts generated from abnormal_account_hit_rules
    abnormal_accounts: List[dict] = field(default_factory=list)
class _NoopAbnormalAccountBaselineService:
def apply(self, staff_id_card: str, abnormal_accounts: List[dict]) -> None:
return None
class FileService:
    """File upload and parsing mock service.

    Keeps all state in an in-memory dict (log_id -> FileRecord); uploads get a
    random unused log_id, a deterministic rule-hit plan, and a simulated
    background parse delay.
    """
    # Fixed record count reported for records created via fetch_inner_flow.
    INNER_FLOW_TOTAL_RECORDS = 200
    # Inclusive range from which random log ids are drawn.
    LOG_ID_MIN = settings.INITIAL_LOG_ID
    LOG_ID_MAX = 99999
    def __init__(
        self,
        staff_identity_repository=None,
        phase2_baseline_service=None,
        abnormal_account_baseline_service=None,
    ):
        # All collaborators are injectable for tests; defaults are real
        # services except the abnormal-account baseline, which is a no-op.
        self.file_records: Dict[int, FileRecord] = {}  # logId -> FileRecord
        self.log_counter = settings.INITIAL_LOG_ID  # last issued log id (informational)
        self.staff_identity_repository = staff_identity_repository or StaffIdentityRepository()
        self.phase2_baseline_service = phase2_baseline_service or Phase2BaselineService()
        self.abnormal_account_baseline_service = (
            abnormal_account_baseline_service or _NoopAbnormalAccountBaselineService()
        )
    def get_file_record(self, log_id: int) -> Union[FileRecord, None]:
        """Return the existing file record for log_id, or None if unknown."""
        return self.file_records.get(log_id)
    def _generate_log_id(self) -> int:
        """Generate a random logId not yet used within this process.

        Raises:
            RuntimeError: when every id in [LOG_ID_MIN, LOG_ID_MAX] is taken.
        """
        available_capacity = self.LOG_ID_MAX - self.LOG_ID_MIN + 1
        if len(self.file_records) >= available_capacity:
            raise RuntimeError("可用 logId 已耗尽")
        # Rejection sampling; the capacity guard above ensures termination.
        while True:
            candidate = random.randint(self.LOG_ID_MIN, self.LOG_ID_MAX)
            if candidate not in self.file_records:
                self.log_counter = candidate
                return candidate
    def _infer_bank_name(self, filename: str) -> tuple:
        """Infer (bank_name, template_name) from the uploaded file name."""
        if "支付宝" in filename or "alipay" in filename.lower():
            return "ALIPAY", "ALIPAY_T220708"
        elif "绍兴银行" in filename or "BSX" in filename:
            return "BSX", "BSX_T240925"
        else:
            # Default bank/template when no marker matches.
            return "ZJRCU", "ZJRCU_T251114"
    def _generate_primary_binding(self) -> tuple:
        """Generate a single stable (enterprise_name, account_no) own-side binding."""
        primary_account_no = f"{random.randint(10000000000, 99999999999)}"
        primary_enterprise_name = "测试主体"
        return primary_enterprise_name, primary_account_no
    def _generate_primary_binding_from_rng(self, rng: random.Random) -> tuple:
        """Same as _generate_primary_binding but seeded from a local RNG for determinism."""
        primary_account_no = f"{rng.randint(10000000000, 99999999999)}"
        primary_enterprise_name = "测试主体"
        return primary_enterprise_name, primary_account_no
    def _build_primary_binding_lists(
        self, primary_enterprise_name: str, primary_account_no: str
    ) -> dict:
        """Build the response list fields from the single primary binding (source of truth)."""
        return {
            "accountNoList": [primary_account_no],
            "enterpriseNameList": [primary_enterprise_name],
        }
    def _pick_rule_subset(
        self,
        rng: random.Random,
        rule_codes: List[str],
        min_count: int,
        max_count: int,
    ) -> List[str]:
        """Pick a stable subset via the supplied RNG, preserving the pool's original order."""
        selected_codes = set(rng.sample(rule_codes, rng.randint(min_count, max_count)))
        return [rule_code for rule_code in rule_codes if rule_code in selected_codes]
    def _build_subset_rule_hit_plan(self, log_id: int) -> dict:
        """Build a rule-hit plan deterministically seeded by log_id ("subset" mode)."""
        rng = random.Random(f"rule-plan:{log_id}")
        return {
            "large_transaction_hit_rules": self._pick_rule_subset(
                rng, LARGE_TRANSACTION_RULE_CODES, 2, 4
            ),
            "phase1_hit_rules": self._pick_rule_subset(rng, PHASE1_RULE_CODES, 2, 4),
            "phase2_statement_hit_rules": self._pick_rule_subset(
                rng, PHASE2_STATEMENT_RULE_CODES, 2, 4
            ),
            "phase2_baseline_hit_rules": self._pick_rule_subset(
                rng, PHASE2_BASELINE_RULE_CODES, 2, 4
            ),
            "abnormal_account_hit_rules": self._pick_rule_subset(
                rng, ABNORMAL_ACCOUNT_RULE_CODES, 1, len(ABNORMAL_ACCOUNT_RULE_CODES)
            ),
        }
    def _build_all_compatible_rule_hit_plan(self) -> dict:
        """Build a plan that hits every rule in every family ("all" mode)."""
        return {
            "large_transaction_hit_rules": list(LARGE_TRANSACTION_RULE_CODES),
            "phase1_hit_rules": list(PHASE1_RULE_CODES),
            "phase2_statement_hit_rules": list(PHASE2_STATEMENT_RULE_CODES),
            "phase2_baseline_hit_rules": list(PHASE2_BASELINE_RULE_CODES),
            "abnormal_account_hit_rules": list(ABNORMAL_ACCOUNT_RULE_CODES),
        }
    def _build_monthly_fixed_income_isolated_rule_hit_plan(self) -> dict:
        """Build an "all"-mode plan with inflow rules removed, so a
        MONTHLY_FIXED_INCOME sample is not polluted by other positive inflows.

        NOTE(review): this plan intentionally carries no
        "abnormal_account_hit_rules" key; downstream code reads it via .get().
        """
        full_plan = self._build_all_compatible_rule_hit_plan()
        return {
            "large_transaction_hit_rules": [
                rule_code
                for rule_code in full_plan["large_transaction_hit_rules"]
                if rule_code not in MONTHLY_FIXED_INCOME_ISOLATED_LARGE_TRANSACTION_RULE_CODES
            ],
            "phase1_hit_rules": [
                rule_code
                for rule_code in full_plan["phase1_hit_rules"]
                if rule_code not in MONTHLY_FIXED_INCOME_ISOLATED_PHASE1_RULE_CODES
            ],
            "phase2_statement_hit_rules": [
                rule_code
                for rule_code in full_plan["phase2_statement_hit_rules"]
                if rule_code not in MONTHLY_FIXED_INCOME_ISOLATED_PHASE2_RULE_CODES
            ],
            "phase2_baseline_hit_rules": list(full_plan["phase2_baseline_hit_rules"]),
        }
    def _apply_conflict_groups(self, rule_plan: dict) -> dict:
        """Prune the plan by explicit mutual-exclusion groups, keeping only the
        highest-priority (first listed) rule of each group per family."""
        resolved_plan = {plan_key: list(rule_codes) for plan_key, rule_codes in rule_plan.items()}
        for plan_key, rule_codes in resolved_plan.items():
            filtered_codes = list(rule_codes)
            for conflict_group in RULE_CONFLICT_GROUPS:
                # First rule of the group present in this family wins.
                kept_rule_code = next(
                    (rule_code for rule_code in conflict_group if rule_code in filtered_codes),
                    None,
                )
                if kept_rule_code is None:
                    continue
                filtered_codes = [
                    rule_code
                    for rule_code in filtered_codes
                    if rule_code == kept_rule_code or rule_code not in conflict_group
                ]
            resolved_plan[plan_key] = filtered_codes
        return resolved_plan
    def _build_rule_hit_plan(self, log_id: int) -> dict:
        """Build the rule-hit plan per the configured mode (settings.RULE_HIT_MODE)."""
        if settings.RULE_HIT_MODE == "all":
            return self._apply_conflict_groups(self._build_all_compatible_rule_hit_plan())
        # Any other mode falls back to the deterministic subset plan.
        return self._build_subset_rule_hit_plan(log_id)
    def _apply_rule_hit_plan_to_record(self, file_record: FileRecord, rule_hit_plan: dict) -> None:
        """Write the rule-hit plan back onto the record (copies, never aliases),
        and regenerate the abnormal-account facts to match the new plan."""
        file_record.large_transaction_hit_rules = list(
            rule_hit_plan.get("large_transaction_hit_rules", [])
        )
        file_record.phase1_hit_rules = list(rule_hit_plan.get("phase1_hit_rules", []))
        file_record.phase2_statement_hit_rules = list(
            rule_hit_plan.get("phase2_statement_hit_rules", [])
        )
        file_record.phase2_baseline_hit_rules = list(
            rule_hit_plan.get("phase2_baseline_hit_rules", [])
        )
        file_record.abnormal_account_hit_rules = list(
            rule_hit_plan.get("abnormal_account_hit_rules", [])
        )
        # Facts are derived from (log_id, hit rules); rebuild keeps them consistent.
        file_record.abnormal_accounts = self._build_abnormal_accounts(
            log_id=file_record.log_id,
            staff_id_card=file_record.staff_id_card,
            abnormal_account_hit_rules=file_record.abnormal_account_hit_rules,
        )
    def _build_abnormal_accounts(
        self,
        *,
        log_id: int,
        staff_id_card: str,
        abnormal_account_hit_rules: List[str],
    ) -> List[dict]:
        """Generate minimal abnormal-account facts for the hit rules.

        Deterministic per log_id (seeded RNG), one fact dict per rule code.
        Returns an empty list when no abnormal-account rules are hit.
        """
        if not abnormal_account_hit_rules:
            return []
        rng = random.Random(f"abnormal-account:{log_id}")
        accounts = []
        for index, rule_code in enumerate(abnormal_account_hit_rules, start=1):
            # 16-digit card number with a fixed "622200" BIN-style prefix.
            account_no = f"622200{rng.randint(10**9, 10**10 - 1)}"
            account_fact = {
                "account_no": account_no,
                "owner_id_card": staff_id_card,
                "account_name": "测试员工工资卡",
                "status": 1,
                "effective_date": "2025-01-01",
                "invalid_date": None,
                "rule_code": rule_code,
            }
            if rule_code == "SUDDEN_ACCOUNT_CLOSURE":
                # Closed account: status 2 with a concrete invalidation date.
                account_fact["status"] = 2
                account_fact["effective_date"] = "2024-01-01"
                account_fact["invalid_date"] = "2026-03-20"
            elif rule_code == "DORMANT_ACCOUNT_LARGE_ACTIVATION":
                account_fact["status"] = 1
                account_fact["effective_date"] = "2025-01-01"
                account_fact["invalid_date"] = None
                # Suffix with the 1-based index to avoid colliding with other facts.
                account_fact["account_no"] = f"{account_no[:-2]}{index:02d}"
            accounts.append(account_fact)
        return accounts
    def _rebalance_all_mode_group_rule_plans(self, group_id: int) -> None:
        """When a project has multiple files in "all" mode, isolate one sample
        for MONTHLY_FIXED_INCOME so it is not polluted by other inflow rules.

        The first record (dict insertion order) gets the isolated plan; the
        rest get the full plan. No-op outside "all" mode or for single files.
        """
        if settings.RULE_HIT_MODE != "all":
            return
        group_records = [
            record for record in self.file_records.values()
            if record.group_id == group_id
        ]
        if not group_records:
            return
        full_plan = self._apply_conflict_groups(self._build_all_compatible_rule_hit_plan())
        if len(group_records) == 1:
            # Single file keeps its original full plan untouched.
            return
        monthly_safe_plan = self._apply_conflict_groups(
            self._build_monthly_fixed_income_isolated_rule_hit_plan()
        )
        self._apply_rule_hit_plan_to_record(group_records[0], monthly_safe_plan)
        for record in group_records[1:]:
            self._apply_rule_hit_plan_to_record(record, full_plan)
    def _create_file_record(
        self,
        *,
        log_id: int,
        group_id: int,
        file_name: str,
        download_file_name: str,
        bank_name: str,
        template_name: str,
        primary_enterprise_name: str,
        primary_account_no: str,
        file_size: int,
        total_records: int,
        trx_date_start_id: int,
        trx_date_end_id: int,
        le_id: int,
        login_le_id: int,
        staff_name: str = "",
        staff_id_card: str = "",
        family_id_cards: Union[List[str], None] = None,
        large_transaction_hit_rules: Union[List[str], None] = None,
        phase1_hit_rules: Union[List[str], None] = None,
        phase2_statement_hit_rules: Union[List[str], None] = None,
        phase2_baseline_hit_rules: Union[List[str], None] = None,
        abnormal_account_hit_rules: Union[List[str], None] = None,
        abnormal_accounts: Union[List[dict], None] = None,
        parsing: bool = True,
        status: int = -5,
    ) -> FileRecord:
        """Create a FileRecord, deriving list fields from the primary binding.

        All incoming lists are defensively copied so the record never aliases
        caller-owned data.
        """
        binding_lists = self._build_primary_binding_lists(
            primary_enterprise_name,
            primary_account_no,
        )
        return FileRecord(
            log_id=log_id,
            group_id=group_id,
            file_name=file_name,
            download_file_name=download_file_name,
            bank_name=bank_name,
            real_bank_name=bank_name,
            template_name=template_name,
            primary_enterprise_name=primary_enterprise_name,
            primary_account_no=primary_account_no,
            account_no_list=binding_lists["accountNoList"],
            enterprise_name_list=binding_lists["enterpriseNameList"],
            le_id=le_id,
            login_le_id=login_le_id,
            file_size=file_size,
            total_records=total_records,
            trx_date_start_id=trx_date_start_id,
            trx_date_end_id=trx_date_end_id,
            staff_name=staff_name,
            staff_id_card=staff_id_card,
            family_id_cards=list(family_id_cards or []),
            large_transaction_hit_rules=list(large_transaction_hit_rules or []),
            phase1_hit_rules=list(phase1_hit_rules or []),
            phase2_statement_hit_rules=list(phase2_statement_hit_rules or []),
            phase2_baseline_hit_rules=list(phase2_baseline_hit_rules or []),
            abnormal_account_hit_rules=list(abnormal_account_hit_rules or []),
            abnormal_accounts=[dict(account) for account in (abnormal_accounts or [])],
            parsing=parsing,
            status=status,
        )
    def _select_staff_identity_scope(self) -> dict:
        """Fetch one staff member plus family identity scope from the repository.

        NOTE(review): callers read keys "staff_name", "staff_id_card",
        "family_id_cards" from the returned dict — confirm against repository.
        """
        return self.staff_identity_repository.select_random_staff_with_families()
    def _apply_phase2_baselines(self, file_record: FileRecord) -> None:
        """Idempotently apply external baseline facts for the record's phase-2
        baseline rules; in "all" mode, designated statement rules are merged in."""
        baseline_rule_codes = list(file_record.phase2_baseline_hit_rules)
        if settings.RULE_HIT_MODE == "all":
            for rule_code in file_record.phase2_statement_hit_rules:
                if (
                    rule_code in ALL_MODE_STATEMENT_BASELINE_RULE_CODES
                    and rule_code not in baseline_rule_codes
                ):
                    baseline_rule_codes.append(rule_code)
        self.phase2_baseline_service.apply(
            staff_id_card=file_record.staff_id_card,
            family_id_cards=file_record.family_id_cards,
            baseline_rule_codes=baseline_rule_codes,
        )
    def _apply_abnormal_account_baselines(self, file_record: FileRecord) -> None:
        """Idempotently apply abnormal-account facts for the record's hit rules.

        Raises:
            RuntimeError: when hit rules exist but no account facts were generated
                (invariant violation — facts must be built alongside the plan).
        """
        if not file_record.abnormal_account_hit_rules:
            return
        if not file_record.abnormal_accounts:
            raise RuntimeError("异常账户命中计划存在,但未生成账户事实")
        self.abnormal_account_baseline_service.apply(
            staff_id_card=file_record.staff_id_card,
            abnormal_accounts=file_record.abnormal_accounts,
        )
    async def upload_file(
        self, group_id: int, file: UploadFile, background_tasks: BackgroundTasks
    ) -> Dict:
        """Accept an upload and schedule the simulated background parse.

        Args:
            group_id: Project id the file belongs to.
            file: The uploaded file (only the filename is used by the mock).
            background_tasks: FastAPI background-task queue.
        Returns:
            Upload response dict in the upstream wire format.
        """
        # Allocate a unique logId.
        log_id = self._generate_log_id()
        # Infer bank/template from the file name.
        bank_name, template_name = self._infer_bank_name(file.filename)
        rule_hit_plan = self._build_rule_hit_plan(log_id)
        # Generate a plausible transaction date range ending today.
        end_date = datetime.now()
        start_date = end_date - timedelta(days=random.randint(90, 365))
        trx_date_start_id = int(start_date.strftime("%Y%m%d"))
        trx_date_end_id = int(end_date.strftime("%Y%m%d"))
        # Generate the single primary binding.
        primary_enterprise_name, primary_account_no = self._generate_primary_binding()
        identity_scope = self._select_staff_identity_scope()
        # Create the full file record.
        file_record = self._create_file_record(
            log_id=log_id,
            group_id=group_id,
            file_name=file.filename,
            download_file_name=file.filename,
            bank_name=bank_name,
            template_name=template_name,
            primary_enterprise_name=primary_enterprise_name,
            primary_account_no=primary_account_no,
            file_size=random.randint(10000, 100000),
            total_records=random.randint(150, 200),
            trx_date_start_id=trx_date_start_id,
            trx_date_end_id=trx_date_end_id,
            le_id=10000 + random.randint(0, 9999),
            login_le_id=10000 + random.randint(0, 9999),
            staff_name=identity_scope["staff_name"],
            staff_id_card=identity_scope["staff_id_card"],
            family_id_cards=identity_scope["family_id_cards"],
            large_transaction_hit_rules=rule_hit_plan.get("large_transaction_hit_rules", []),
            phase1_hit_rules=rule_hit_plan.get("phase1_hit_rules", []),
            phase2_statement_hit_rules=rule_hit_plan.get("phase2_statement_hit_rules", []),
            phase2_baseline_hit_rules=rule_hit_plan.get("phase2_baseline_hit_rules", []),
            abnormal_account_hit_rules=rule_hit_plan.get("abnormal_account_hit_rules", []),
            abnormal_accounts=self._build_abnormal_accounts(
                log_id=log_id,
                staff_id_card=identity_scope["staff_id_card"],
                abnormal_account_hit_rules=rule_hit_plan.get("abnormal_account_hit_rules", []),
            ),
        )
        # Order matters: abnormal-account facts first, then store, then the
        # group rebalance (which may overwrite plans), then phase-2 baselines.
        self._apply_abnormal_account_baselines(file_record)
        self.file_records[log_id] = file_record
        self._rebalance_all_mode_group_rule_plans(group_id)
        self._apply_phase2_baselines(file_record)
        # Schedule the delayed parse task.
        background_tasks.add_task(self._delayed_parse, log_id)
        # Build the response.
        return self._build_upload_response(file_record)
    def _build_upload_response(self, file_record: FileRecord) -> dict:
        """Build the complete upload-endpoint response for one record."""
        return {
            "code": "200",
            "data": {
                "accountsOfLog": {
                    str(file_record.log_id): [
                        {
                            "bank": file_record.bank_name,
                            "accountName": file_record.primary_enterprise_name,
                            "accountNo": file_record.primary_account_no,
                            "currency": "CNY"
                        }
                    ]
                },
                "uploadLogList": [
                    {
                        **self._build_primary_binding_lists(
                            file_record.primary_enterprise_name,
                            file_record.primary_account_no,
                        ),
                        "bankName": file_record.bank_name,
                        "dataTypeInfo": file_record.data_type_info,
                        "downloadFileName": file_record.download_file_name,
                        "filePackageId": file_record.file_package_id,
                        "fileSize": file_record.file_size,
                        "fileUploadBy": file_record.file_upload_by,
                        "fileUploadByUserName": file_record.file_upload_by_user_name,
                        "fileUploadTime": file_record.file_upload_time,
                        "leId": file_record.le_id,
                        "logId": file_record.log_id,
                        "logMeta": file_record.log_meta,
                        "logType": file_record.log_type,
                        "loginLeId": file_record.login_le_id,
                        "lostHeader": file_record.lost_header,
                        "realBankName": file_record.real_bank_name,
                        "rows": file_record.rows,
                        "source": file_record.source,
                        "status": file_record.status,
                        "templateName": file_record.template_name,
                        "totalRecords": file_record.total_records,
                        "trxDateEndId": file_record.trx_date_end_id,
                        "trxDateStartId": file_record.trx_date_start_id,
                        "uploadFileName": file_record.file_name,
                        "uploadStatusDesc": file_record.upload_status_desc
                    }
                ],
                "uploadStatus": 1
            },
            "status": "200",
            "successResponse": True
        }
    def _delayed_parse(self, log_id: int):
        """Background task: simulate the parse by sleeping, then clear the flag.

        Args:
            log_id: Log id of the record to mark as parsed.
        """
        time.sleep(settings.PARSE_DELAY_SECONDS)
        # Parse done — flip the in-progress flag (record may have been deleted).
        if log_id in self.file_records:
            self.file_records[log_id].parsing = False
    def _generate_deterministic_record(
        self, log_id: int, group_id: int, rng: random.Random
    ) -> dict:
        """
        Generate a deterministic file-record dict keyed off log_id.

        Args:
            log_id: File id, used (via rng) as the random seed.
            group_id: Project id (currently unused in the payload).
        Returns:
            File-record dict in the upstream wire format (26 fields).
        """
        # Bank/template options.
        bank_options = [
            ("ALIPAY", "ALIPAY_T220708"),
            ("BSX", "BSX_T240925"),
            ("ZJRCU", "ZJRCU_T251114")
        ]
        bank_name, template_name = rng.choice(bank_options)
        # Seed-stable time range so repeated queries for one logId are identical.
        base_datetime = datetime(2024, 1, 1, 8, 0, 0)
        end_date = base_datetime + timedelta(days=rng.randint(180, 540))
        start_date = end_date - timedelta(days=rng.randint(90, 365))
        file_upload_time = (
            base_datetime
            + timedelta(
                days=rng.randint(0, 540),
                hours=rng.randint(0, 23),
                minutes=rng.randint(0, 59),
                seconds=rng.randint(0, 59),
            )
        )
        # Generate the primary account/party binding.
        primary_enterprise_name, primary_account_no = self._generate_primary_binding_from_rng(rng)
        binding_lists = self._build_primary_binding_lists(
            primary_enterprise_name, primary_account_no
        )
        return {
            **binding_lists,
            "bankName": bank_name,
            "dataTypeInfo": ["CSV", ","],
            "downloadFileName": f"测试文件_{log_id}.csv",
            "fileSize": rng.randint(10000, 100000),
            "fileUploadBy": 448,
            "fileUploadByUserName": "admin@support.com",
            "fileUploadTime": file_upload_time.strftime("%Y-%m-%d %H:%M:%S"),
            "isSplit": 0,
            "leId": 10000 + rng.randint(0, 9999),
            "logId": log_id,
            "logMeta": "{\"lostHeader\":[],\"balanceAmount\":\"-1\"}",
            "logType": "bankstatement",
            "loginLeId": 10000 + rng.randint(0, 9999),
            "lostHeader": [],
            "realBankName": bank_name,
            "rows": 0,
            "source": "http",
            "status": -5,
            "templateName": template_name,
            "totalRecords": rng.randint(100, 300),
            "trxDateEndId": int(end_date.strftime("%Y%m%d")),
            "trxDateStartId": int(start_date.strftime("%Y%m%d")),
            "uploadFileName": f"测试文件_{log_id}.pdf",
            "uploadStatusDesc": "data.wait.confirm.newaccount"
        }
    def _build_deterministic_log_detail(self, log_id: int, group_id: int) -> dict:
        """Build a single log detail via the deterministic fallback (unknown logId)."""
        rng = random.Random(log_id)
        return self._generate_deterministic_record(log_id, group_id, rng)
    def _build_log_detail(self, record: FileRecord) -> dict:
        """Build the log-detail dict for a stored record (wire format)."""
        return {
            **self._build_primary_binding_lists(
                record.primary_enterprise_name,
                record.primary_account_no,
            ),
            "bankName": record.bank_name,
            "dataTypeInfo": record.data_type_info,
            "downloadFileName": record.download_file_name,
            "fileSize": record.file_size,
            "fileUploadBy": record.file_upload_by,
            "fileUploadByUserName": record.file_upload_by_user_name,
            "fileUploadTime": record.file_upload_time,
            "isSplit": record.is_split,
            "leId": record.le_id,
            "logId": record.log_id,
            "logMeta": record.log_meta,
            "logType": record.log_type,
            "loginLeId": record.login_le_id,
            "lostHeader": record.lost_header,
            "realBankName": record.real_bank_name,
            "rows": record.rows,
            "source": record.source,
            "status": record.status,
            "templateName": record.template_name,
            "totalRecords": record.total_records,
            "trxDateEndId": record.trx_date_end_id,
            "trxDateStartId": record.trx_date_start_id,
            "uploadFileName": record.file_name,
            "uploadStatusDesc": record.upload_status_desc
        }
    def check_parse_status(self, group_id: int, inprogress_list: str) -> Dict:
        """Check parse status for a set of files.

        Args:
            group_id: Project id (not used for filtering here).
            inprogress_list: Comma-separated log-id list.
        Returns:
            Status response dict; "parsing" is True while any known id is
            still parsing. Unknown ids are silently ignored.
        """
        # Parse the logId list.
        log_ids = [int(x.strip()) for x in inprogress_list.split(",") if x.strip()]
        pending_list = []
        all_parsing_complete = True
        for log_id in log_ids:
            if log_id in self.file_records:
                record = self.file_records[log_id]
                if record.parsing:
                    all_parsing_complete = False
                    pending_list.append(self._build_log_detail(record))
        return {
            "code": "200",
            "data": {
                "parsing": not all_parsing_complete,
                "pendingList": pending_list
            },
            "status": "200",
            "successResponse": True
        }
    def get_upload_status(self, group_id: int, log_id: Union[int, None] = None) -> dict:
        """
        Get upload status (falls back to deterministic data for unknown logIds).

        Args:
            group_id: Project id.
            log_id: Optional file id; when omitted, "logs" is empty.
        Returns:
            Upload-status response dict.
        """
        logs = []
        if log_id is not None:
            if log_id in self.file_records:
                log_detail = self._build_log_detail(self.file_records[log_id])
            else:
                # Unknown id: synthesize a stable record seeded by log_id.
                log_detail = self._build_deterministic_log_detail(log_id, group_id)
            logs.append(log_detail)
        # Build the response.
        return {
            "code": "200",
            "data": {
                "logs": logs,
                "status": "",
                "accountId": 8954,
                "currency": "CNY"
            },
            "status": "200",
            "successResponse": True
        }
    def delete_files(self, group_id: int, log_ids: List[int], user_id: int) -> Dict:
        """Delete files by log id.

        Args:
            group_id: Project id (not checked against the records).
            log_ids: Log ids to delete; unknown ids are ignored.
            user_id: Acting user id (unused by the mock).
        Returns:
            Deletion response dict. NOTE(review): deleted_count is computed
            but not reported in the response.
        """
        # Remove matching records.
        deleted_count = 0
        for log_id in log_ids:
            if log_id in self.file_records:
                del self.file_records[log_id]
                deleted_count += 1
        return {
            "code": "200 OK",  # NOTE: intentionally "200 OK", not "200"
            "data": {
                "message": "delete.files.success"
            },
            "message": "delete.files.success",
            "status": "200",
            "successResponse": True
        }
    def fetch_inner_flow(self, request: Union[Dict, object]) -> Dict:
        """Pull in-bank flow data (creates and stores a bound record).

        Args:
            request: Pull request — a dict (with camelCase keys and defaults)
                or an object exposing groupId/customerNo/dataStartDateId/
                dataEndDateId attributes.
        Returns:
            Flow response dict whose "data" is the array of created logIds.
        """
        # Accept either a dict or an attribute-style request object.
        if isinstance(request, dict):
            group_id = request.get("groupId", 1000)
            customer_no = request.get("customerNo", "")
            data_start_date_id = request.get("dataStartDateId", 20240101)
            data_end_date_id = request.get("dataEndDateId", 20241231)
        else:
            group_id = request.groupId
            customer_no = request.customerNo
            data_start_date_id = request.dataStartDateId
            data_end_date_id = request.dataEndDateId
        # Use a random logId so this path matches the upload flow and never
        # overwrites an existing record.
        log_id = self._generate_log_id()
        rule_hit_plan = self._build_rule_hit_plan(log_id)
        primary_enterprise_name, primary_account_no = self._generate_primary_binding()
        identity_scope = self._select_staff_identity_scope()
        file_record = self._create_file_record(
            log_id=log_id,
            group_id=group_id,
            file_name=f"{customer_no or 'inner_flow'}_{log_id}.csv",
            download_file_name=f"{customer_no or 'inner_flow'}_{log_id}.csv",
            bank_name="ZJRCU",
            template_name="ZJRCU_T251114",
            primary_enterprise_name=primary_enterprise_name,
            primary_account_no=primary_account_no,
            file_size=random.randint(10000, 100000),
            total_records=self.INNER_FLOW_TOTAL_RECORDS,
            trx_date_start_id=data_start_date_id,
            trx_date_end_id=data_end_date_id,
            le_id=10000 + random.randint(0, 9999),
            login_le_id=10000 + random.randint(0, 9999),
            staff_name=identity_scope["staff_name"],
            staff_id_card=identity_scope["staff_id_card"],
            family_id_cards=identity_scope["family_id_cards"],
            large_transaction_hit_rules=rule_hit_plan.get("large_transaction_hit_rules", []),
            phase1_hit_rules=rule_hit_plan.get("phase1_hit_rules", []),
            phase2_statement_hit_rules=rule_hit_plan.get("phase2_statement_hit_rules", []),
            phase2_baseline_hit_rules=rule_hit_plan.get("phase2_baseline_hit_rules", []),
            abnormal_account_hit_rules=rule_hit_plan.get("abnormal_account_hit_rules", []),
            abnormal_accounts=self._build_abnormal_accounts(
                log_id=log_id,
                staff_id_card=identity_scope["staff_id_card"],
                abnormal_account_hit_rules=rule_hit_plan.get("abnormal_account_hit_rules", []),
            ),
            parsing=False,
        )
        # Same ordering as upload_file: facts, store, rebalance, baselines.
        self._apply_abnormal_account_baselines(file_record)
        self.file_records[log_id] = file_record
        self._rebalance_all_mode_group_rule_plans(group_id)
        self._apply_phase2_baselines(file_record)
        # Return success with the array of created logIds.
        return {
            "code": "200",
            "data": [log_id],
            "status": "200",
            "successResponse": True,
        }