Merge branch 'codex/lsfx-logid-primary-binding' into dev

This commit is contained in:
wkc
2026-03-18 15:57:22 +08:00
10 changed files with 610 additions and 49 deletions

View File

@@ -8,6 +8,22 @@
**Tech Stack:** Python 3, FastAPI, pytest, httpx TestClient
## 执行结果
- 2026-03-18 已按计划完成 Task 1 至 Task 5,功能实现、实施记录与最终验证均已落地。
- 代码提交顺序:
- `0120d09` `收敛Mock文件记录主体账号绑定模型`
- `6fb7287` `让拉取本行信息链路复用Mock主体账号绑定`
- `0a85c09` `统一Mock上传状态主体账号绑定优先级`
- `5195617` `让Mock流水查询复用logId主体账号绑定`
- 最终验证已通过:
- `python3 -m pytest tests/test_file_service.py -v`
- `python3 -m pytest tests/test_statement_service.py -v`
- `python3 -m pytest tests/test_api.py -v`
- `python3 -m pytest tests/integration/test_full_workflow.py -v`
- `python3 verify_implementation.py`
- 实施细节见 `docs/reports/implementation/2026-03-18-lsfx-logid-primary-binding-implementation.md`
---
### Task 1: 收敛 FileRecord 为单一主体账号绑定模型

View File

@@ -0,0 +1,49 @@
# LSFX Mock LogId 主体账号绑定实施记录
## 变更概述
- `lsfx-mock-server/services/file_service.py` 的 `FileRecord` 增加 `primary_enterprise_name` 与 `primary_account_no`,统一维护 `logId -> 主体/账号` 单一主绑定。
- 上传文件、拉取本行信息两条链路都改为创建并保存完整 `FileRecord`,后续上传状态查询优先读取真实记录,未命中时才回退到 deterministic 生成。
- `lsfx-mock-server/services/statement_service.py` 改为按 `logId``FileService` 读取主绑定,并将 `leName``accountMaskNo` 统一注入分页流水结果。
- `lsfx-mock-server/routers/api.py` 改为让 `statement_service``file_service` 共享同一份 `FileService` 实例,确保上传状态接口与查流水接口使用同一组绑定数据。
- 补齐 `tests/test_file_service.py``tests/test_statement_service.py``tests/test_api.py``tests/integration/test_full_workflow.py` 回归测试,并在 `tests/conftest.py` 增加单例状态重置夹具,避免测试串扰。
## 联动结果
- 上传文件:创建 `FileRecord` 时同时生成主绑定,并同步回填 `accountsOfLog``uploadLogList`
- 拉取本行信息:返回 `logId` 前即落库 `FileRecord`,后续 `bs/upload``getpendings``getBSByLogId` 都可复用同一主绑定。
- 上传状态:优先读取真实 `FileRecord`;只有当 `logId` 没有已存记录时,才走 deterministic fallback。
- 银行流水:优先从 `FileService` 读取 `primary_enterprise_name``primary_account_no`,统一写入每条流水的 `leName``accountMaskNo`
## 优先级说明
- `FileService.get_upload_status()`:真实 `FileRecord` 优先deterministic fallback 兜底。
- `StatementService` 主绑定解析:真实 `FileRecord` 优先,服务内 fallback 仅在记录不存在时生效。
- deterministic fallback 的时间字段、主体字段、账号字段都已改为基于 `logId` 的稳定生成,保证同一 `logId` 重复查询结果一致。
## 提交记录
- `0120d09` `收敛Mock文件记录主体账号绑定模型`
- `6fb7287` `让拉取本行信息链路复用Mock主体账号绑定`
- `0a85c09` `统一Mock上传状态主体账号绑定优先级`
- `5195617` `让Mock流水查询复用logId主体账号绑定`
## 验证记录
- `cd lsfx-mock-server && python3 -m pytest tests/test_file_service.py -v`
- `cd lsfx-mock-server && python3 -m pytest tests/test_statement_service.py -v`
- `cd lsfx-mock-server && python3 -m pytest tests/test_api.py -v`
- `cd lsfx-mock-server && python3 -m pytest tests/integration/test_full_workflow.py -v`
- `cd lsfx-mock-server && python3 verify_implementation.py`
## 验证结果
- `tests/test_file_service.py`:3 个用例全部通过。
- `tests/test_statement_service.py`:1 个用例通过。
- `tests/test_api.py`:14 个用例全部通过。
- `tests/integration/test_full_workflow.py`:4 个用例全部通过。
- `verify_implementation.py`:接口字段完整性与模板文件校验全部通过。
## 进程说明
- 本次验证仅运行 pytest 与 `verify_implementation.py`,未启动 `python main.py``uvicorn`,无需额外停止服务进程。

View File

@@ -13,7 +13,7 @@ router = APIRouter()
# 初始化服务实例
token_service = TokenService()
file_service = FileService()
statement_service = StatementService()
statement_service = StatementService(file_service=file_service)
def _parse_log_ids(log_ids: str) -> List[int]:

View File

@@ -21,6 +21,8 @@ class FileRecord:
parsing: bool = True # True表示正在解析
# 新增字段 - 账号和主体信息
primary_enterprise_name: str = ""
primary_account_no: str = ""
account_no_list: List[str] = field(default_factory=list)
enterprise_name_list: List[str] = field(default_factory=list)
@@ -65,6 +67,10 @@ class FileService:
self.file_records: Dict[int, FileRecord] = {} # logId -> FileRecord
self.log_counter = settings.INITIAL_LOG_ID
def get_file_record(self, log_id: int) -> FileRecord:
"""按 logId 获取已存在的文件记录。"""
return self.file_records.get(log_id)
def _infer_bank_name(self, filename: str) -> tuple:
"""根据文件名推断银行名称和模板名称"""
if "支付宝" in filename or "alipay" in filename.lower():
@@ -74,6 +80,75 @@ class FileService:
else:
return "ZJRCU", "ZJRCU_T251114"
def _generate_primary_binding(self) -> tuple:
"""生成单一稳定的本方主体/本方账号绑定。"""
primary_account_no = f"{random.randint(10000000000, 99999999999)}"
primary_enterprise_name = "测试主体"
return primary_enterprise_name, primary_account_no
def _generate_primary_binding_from_rng(self, rng: random.Random) -> tuple:
"""使用局部随机源生成单一稳定的本方主体/本方账号绑定。"""
primary_account_no = f"{rng.randint(10000000000, 99999999999)}"
primary_enterprise_name = "测试主体"
return primary_enterprise_name, primary_account_no
def _build_primary_binding_lists(
self, primary_enterprise_name: str, primary_account_no: str
) -> dict:
"""基于主绑定事实源构建列表字段。"""
return {
"accountNoList": [primary_account_no],
"enterpriseNameList": [primary_enterprise_name],
}
def _create_file_record(
    self,
    *,
    log_id: int,
    group_id: int,
    file_name: str,
    download_file_name: str,
    bank_name: str,
    template_name: str,
    primary_enterprise_name: str,
    primary_account_no: str,
    file_size: int,
    total_records: int,
    trx_date_start_id: int,
    trx_date_end_id: int,
    le_id: int,
    login_le_id: int,
    parsing: bool = True,
    status: int = -5,
) -> FileRecord:
    """Assemble a FileRecord carrying the single primary binding.

    The list fields are derived from the primary binding so the record
    keeps exactly one source of truth for enterprise/account data.
    ``real_bank_name`` mirrors ``bank_name`` by design.
    """
    lists = self._build_primary_binding_lists(
        primary_enterprise_name, primary_account_no
    )
    return FileRecord(
        log_id=log_id,
        group_id=group_id,
        file_name=file_name,
        download_file_name=download_file_name,
        bank_name=bank_name,
        real_bank_name=bank_name,
        template_name=template_name,
        primary_enterprise_name=primary_enterprise_name,
        primary_account_no=primary_account_no,
        account_no_list=lists["accountNoList"],
        enterprise_name_list=lists["enterpriseNameList"],
        file_size=file_size,
        total_records=total_records,
        trx_date_start_id=trx_date_start_id,
        trx_date_end_id=trx_date_end_id,
        le_id=le_id,
        login_le_id=login_le_id,
        parsing=parsing,
        status=status,
    )
async def upload_file(
self, group_id: int, file: UploadFile, background_tasks: BackgroundTasks
) -> Dict:
@@ -100,29 +175,25 @@ class FileService:
trx_date_start_id = int(start_date.strftime("%Y%m%d"))
trx_date_end_id = int(end_date.strftime("%Y%m%d"))
# 生成随机账号和主体
account_no = f"{random.randint(10000000000, 99999999999)}"
enterprise_names = ["测试主体"] if random.random() > 0.3 else [""]
# 生成单一主绑定
primary_enterprise_name, primary_account_no = self._generate_primary_binding()
# 创建完整的文件记录
file_record = FileRecord(
file_record = self._create_file_record(
log_id=log_id,
group_id=group_id,
file_name=file.filename,
download_file_name=file.filename,
bank_name=bank_name,
real_bank_name=bank_name,
template_name=template_name,
account_no_list=[account_no],
enterprise_name_list=enterprise_names,
le_id=10000 + random.randint(0, 9999),
login_le_id=10000 + random.randint(0, 9999),
primary_enterprise_name=primary_enterprise_name,
primary_account_no=primary_account_no,
file_size=random.randint(10000, 100000),
total_records=random.randint(100, 300),
trx_date_start_id=trx_date_start_id,
trx_date_end_id=trx_date_end_id,
parsing=True,
status=-5
le_id=10000 + random.randint(0, 9999),
login_le_id=10000 + random.randint(0, 9999),
)
# 存储记录
@@ -143,19 +214,21 @@ class FileService:
str(file_record.log_id): [
{
"bank": file_record.bank_name,
"accountName": file_record.enterprise_name_list[0] if file_record.enterprise_name_list else "",
"accountNo": file_record.account_no_list[0] if file_record.account_no_list else "",
"accountName": file_record.primary_enterprise_name,
"accountNo": file_record.primary_account_no,
"currency": "CNY"
}
]
},
"uploadLogList": [
{
"accountNoList": file_record.account_no_list,
**self._build_primary_binding_lists(
file_record.primary_enterprise_name,
file_record.primary_account_no,
),
"bankName": file_record.bank_name,
"dataTypeInfo": file_record.data_type_info,
"downloadFileName": file_record.download_file_name,
"enterpriseNameList": file_record.enterprise_name_list,
"filePackageId": file_record.file_package_id,
"fileSize": file_record.file_size,
"fileUploadBy": file_record.file_upload_by,
@@ -197,7 +270,9 @@ class FileService:
if log_id in self.file_records:
self.file_records[log_id].parsing = False
def _generate_deterministic_record(self, log_id: int, group_id: int) -> dict:
def _generate_deterministic_record(
self, log_id: int, group_id: int, rng: random.Random
) -> dict:
"""
基于 logId 生成确定性的文件记录
@@ -215,53 +290,71 @@ class FileService:
("ZJRCU", "ZJRCU_T251114")
]
bank_name, template_name = random.choice(bank_options)
bank_name, template_name = rng.choice(bank_options)
# 生成交易日期范围
end_date = datetime.now()
start_date = end_date - timedelta(days=random.randint(90, 365))
# 生成基于种子的稳定时间范围,确保同一 logId 重复查询完全一致
base_datetime = datetime(2024, 1, 1, 8, 0, 0)
end_date = base_datetime + timedelta(days=rng.randint(180, 540))
start_date = end_date - timedelta(days=rng.randint(90, 365))
file_upload_time = (
base_datetime
+ timedelta(
days=rng.randint(0, 540),
hours=rng.randint(0, 23),
minutes=rng.randint(0, 59),
seconds=rng.randint(0, 59),
)
)
# 生成账号和主体
account_no = f"{random.randint(10000000000, 99999999999)}"
enterprise_names = ["测试主体"] if random.random() > 0.3 else [""]
primary_enterprise_name, primary_account_no = self._generate_primary_binding_from_rng(rng)
binding_lists = self._build_primary_binding_lists(
primary_enterprise_name, primary_account_no
)
return {
"accountNoList": [account_no],
**binding_lists,
"bankName": bank_name,
"dataTypeInfo": ["CSV", ","],
"downloadFileName": f"测试文件_{log_id}.csv",
"enterpriseNameList": enterprise_names,
"fileSize": random.randint(10000, 100000),
"fileSize": rng.randint(10000, 100000),
"fileUploadBy": 448,
"fileUploadByUserName": "admin@support.com",
"fileUploadTime": datetime.now().strftime("%Y-%m-%d %H:%M:%S"),
"fileUploadTime": file_upload_time.strftime("%Y-%m-%d %H:%M:%S"),
"isSplit": 0,
"leId": 10000 + random.randint(0, 9999),
"leId": 10000 + rng.randint(0, 9999),
"logId": log_id,
"logMeta": "{\"lostHeader\":[],\"balanceAmount\":\"-1\"}",
"logType": "bankstatement",
"loginLeId": 10000 + random.randint(0, 9999),
"loginLeId": 10000 + rng.randint(0, 9999),
"lostHeader": [],
"realBankName": bank_name,
"rows": 0,
"source": "http",
"status": -5,
"templateName": template_name,
"totalRecords": random.randint(100, 300),
"totalRecords": rng.randint(100, 300),
"trxDateEndId": int(end_date.strftime("%Y%m%d")),
"trxDateStartId": int(start_date.strftime("%Y%m%d")),
"uploadFileName": f"测试文件_{log_id}.pdf",
"uploadStatusDesc": "data.wait.confirm.newaccount"
}
def _build_deterministic_log_detail(self, log_id: int, group_id: int) -> dict:
    """Build the deterministic-fallback log detail for an unknown logId.

    Seeding a dedicated Random with the logId guarantees identical
    output for repeated queries of the same logId without mutating the
    global random state.
    """
    seeded_rng = random.Random(log_id)
    return self._generate_deterministic_record(log_id, group_id, seeded_rng)
def _build_log_detail(self, record: FileRecord) -> dict:
"""构建日志详情对象"""
return {
"accountNoList": record.account_no_list,
**self._build_primary_binding_lists(
record.primary_enterprise_name,
record.primary_account_no,
),
"bankName": record.bank_name,
"dataTypeInfo": record.data_type_info,
"downloadFileName": record.download_file_name,
"enterpriseNameList": record.enterprise_name_list,
"fileSize": record.file_size,
"fileUploadBy": record.file_upload_by,
"fileUploadByUserName": record.file_upload_by_user_name,
@@ -332,13 +425,13 @@ class FileService:
"""
logs = []
if log_id:
# 使用 logId 作为随机种子,确保相同 logId 返回相同数据
random.seed(log_id)
if log_id is not None:
if log_id in self.file_records:
log_detail = self._build_log_detail(self.file_records[log_id])
else:
log_detail = self._build_deterministic_log_detail(log_id, group_id)
# 生成确定性的文件记录
record = self._generate_deterministic_record(log_id, group_id)
logs.append(record)
logs.append(log_detail)
# 返回响应
return {
@@ -382,16 +475,50 @@ class FileService:
}
def fetch_inner_flow(self, request: Union[Dict, object]) -> Dict:
"""拉取行内流水(返回随机logId
"""拉取行内流水(创建并保存绑定记录
Args:
request: 拉取流水请求(保留参数以符合接口规范当前Mock实现不使用
request: 拉取流水请求(可以是字典或对象
Returns:
流水响应字典,包含随机生成的logId数组
流水响应字典,包含创建并保存的logId数组
"""
# 随机生成一个logId范围10000-99999
log_id = random.randint(10000, 99999)
# 支持 dict 或对象
if isinstance(request, dict):
group_id = request.get("groupId", 1000)
customer_no = request.get("customerNo", "")
data_start_date_id = request.get("dataStartDateId", 20240101)
data_end_date_id = request.get("dataEndDateId", 20241231)
else:
group_id = request.groupId
customer_no = request.customerNo
data_start_date_id = request.dataStartDateId
data_end_date_id = request.dataEndDateId
# 使用递增 logId确保与上传链路一致
self.log_counter += 1
log_id = self.log_counter
primary_enterprise_name, primary_account_no = self._generate_primary_binding()
file_record = self._create_file_record(
log_id=log_id,
group_id=group_id,
file_name=f"{customer_no or 'inner_flow'}_{log_id}.csv",
download_file_name=f"{customer_no or 'inner_flow'}_{log_id}.csv",
bank_name="ZJRCU",
template_name="ZJRCU_T251114",
primary_enterprise_name=primary_enterprise_name,
primary_account_no=primary_account_no,
file_size=random.randint(10000, 100000),
total_records=random.randint(100, 300),
trx_date_start_id=data_start_date_id,
trx_date_end_id=data_end_date_id,
le_id=10000 + random.randint(0, 9999),
login_le_id=10000 + random.randint(0, 9999),
parsing=False,
)
self.file_records[log_id] = file_record
# 返回成功的响应包含logId数组
return {

View File

@@ -13,19 +13,38 @@ logger = logging.getLogger(__name__)
class StatementService:
"""流水数据服务"""
def __init__(self):
def __init__(self, file_service=None):
# 缓存logId -> (statements_list, total_count)
self._cache: Dict[int, tuple] = {}
self.file_service = file_service
# 配置日志级别为 INFO
logger.info(f"StatementService initialized with empty cache")
def _generate_random_statement(self, index: int, group_id: int, log_id: int) -> Dict:
def _resolve_primary_binding(self, log_id: int) -> tuple:
"""优先从 FileService 读取真实主绑定,不存在时再走 fallback。"""
if self.file_service is not None:
record = self.file_service.get_file_record(log_id)
if record is not None:
return record.primary_enterprise_name, record.primary_account_no
return "张传伟", f"{random.randint(100000000000000, 999999999999999)}"
def _generate_random_statement(
self,
index: int,
group_id: int,
log_id: int,
primary_enterprise_name: str,
primary_account_no: str,
) -> Dict:
"""生成单条随机流水记录
Args:
index: 流水序号
group_id: 项目ID
log_id: 文件ID
primary_enterprise_name: 本方主体名称
primary_account_no: 本方账号
Returns:
单条流水记录字典
@@ -75,7 +94,7 @@ class StatementService:
return {
"accountId": 0,
"accountMaskNo": f"{random.randint(100000000000000, 999999999999999)}",
"accountMaskNo": primary_account_no,
"accountingDate": accounting_date,
"accountingDateId": accounting_date_id,
"archivingFlag": 0,
@@ -104,7 +123,7 @@ class StatementService:
"groupId": group_id,
"internalFlag": 0,
"leId": 16308,
"leName": "张传伟",
"leName": primary_enterprise_name,
"overrideBsId": 0,
"paymentMethod": "",
"sourceCatalogId": 0,
@@ -137,11 +156,31 @@ class StatementService:
Returns:
流水记录列表
"""
primary_enterprise_name, primary_account_no = self._resolve_primary_binding(log_id)
statements = []
for i in range(count):
statements.append(self._generate_random_statement(i, group_id, log_id))
statements.append(
self._generate_random_statement(
i,
group_id,
log_id,
primary_enterprise_name,
primary_account_no,
)
)
return statements
def _apply_primary_binding(
self,
statements: List[Dict],
primary_enterprise_name: str,
primary_account_no: str,
) -> None:
"""将解析出的主绑定统一回填到已有流水记录。"""
for statement in statements:
statement["leName"] = primary_enterprise_name
statement["accountMaskNo"] = primary_account_no
def get_bank_statement(self, request: Union[Dict, object]) -> Dict:
"""获取银行流水列表
@@ -174,6 +213,12 @@ class StatementService:
# 从缓存获取数据
all_statements, total_count = self._cache[log_id]
primary_enterprise_name, primary_account_no = self._resolve_primary_binding(log_id)
self._apply_primary_binding(
all_statements,
primary_enterprise_name,
primary_account_no,
)
# 模拟分页
start = (page_now - 1) * page_size

View File

@@ -10,6 +10,18 @@ import os
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from main import app
from config.settings import settings
from routers.api import file_service
@pytest.fixture(autouse=True)
def reset_file_service_state():
    """Reset the shared file_service singleton around every test.

    The router module holds a single FileService instance; clearing its
    records and restoring the log counter both before and after each
    test keeps state from leaking across tests in any execution order.
    """
    def _restore():
        file_service.file_records.clear()
        file_service.log_counter = settings.INITIAL_LOG_ID

    _restore()
    yield
    _restore()
@pytest.fixture

View File

@@ -2,7 +2,6 @@
集成测试 - 完整的接口调用流程测试
"""
import pytest
import time
def test_complete_workflow(client):
@@ -123,3 +122,51 @@ def test_pagination(client):
if page1["data"]["totalCount"] > 1:
assert len(page1["data"]["bankStatementList"]) == 1
assert len(page2["data"]["bankStatementList"]) >= 0
def test_upload_status_and_bank_statement_share_same_primary_binding(client, monkeypatch):
    """Upload-status and bank-statement endpoints must share one primary binding per logId."""
    from routers.api import file_service

    # Pin the generated binding so both endpoints can be checked exactly.
    monkeypatch.setattr(
        file_service,
        "_generate_primary_binding",
        lambda: ("链路主体", "6222555566667777"),
    )
    fetch_payload = {
        "groupId": 1001,
        "customerNo": "customer_002",
        "dataChannelCode": "channel_code",
        "requestDateId": 20240101,
        "dataStartDateId": 20240101,
        "dataEndDateId": 20240131,
        "uploadUserId": 902001,
    }
    fetched = client.post("/watson/api/project/getJZFileOrZjrcuFile", data=fetch_payload)
    assert fetched.status_code == 200
    log_id = fetched.json()["data"][0]

    status_resp = client.get(f"/watson/api/project/bs/upload?groupId=1001&logId={log_id}")
    assert status_resp.status_code == 200
    status_log = status_resp.json()["data"]["logs"][0]

    statement_resp = client.post(
        "/watson/api/project/getBSByLogId",
        data={"groupId": 1001, "logId": log_id, "pageNow": 1, "pageSize": 5},
    )
    assert statement_resp.status_code == 200
    rows = statement_resp.json()["data"]["bankStatementList"]

    assert status_log["enterpriseNameList"] == ["链路主体"]
    assert status_log["accountNoList"] == ["6222555566667777"]
    assert rows
    expected_name = status_log["enterpriseNameList"][0]
    expected_account = status_log["accountNoList"][0]
    for row in rows:
        assert row["leName"] == expected_name
        assert row["accountMaskNo"] == expected_account

View File

@@ -2,6 +2,8 @@
API 端点测试
"""
from routers.api import file_service
def test_root_endpoint(client):
"""测试根路径"""
@@ -87,6 +89,103 @@ def test_fetch_inner_flow_error_501014(client):
assert data["successResponse"] == False
def test_fetch_inner_flow_followed_by_upload_status(client):
    """After fetching inner-bank flow, the upload-status query must hit the same saved binding record."""
    response = client.post(
        "/watson/api/project/getJZFileOrZjrcuFile",
        data={
            "groupId": 1001,
            "customerNo": "test_customer_002",
            "dataChannelCode": "test_code",
            "requestDateId": 20240101,
            "dataStartDateId": 20240101,
            "dataEndDateId": 20240131,
            "uploadUserId": 902001,
        },
    )
    assert response.status_code == 200
    log_id = response.json()["data"][0]
    # The fetch must have persisted a real record for this logId.
    assert log_id in file_service.file_records
    record = file_service.file_records[log_id]

    upload_response = client.get(
        f"/watson/api/project/bs/upload?groupId=1001&logId={log_id}"
    )
    assert upload_response.status_code == 200
    upload_data = upload_response.json()
    assert upload_data["code"] == "200"
    assert upload_data["successResponse"] is True
    assert len(upload_data["data"]["logs"]) == 1
    log = upload_data["data"]["logs"][0]
    # Whole-list equality already covers the element-wise checks, so the
    # previously duplicated [0]-index assertions were removed.
    assert log["enterpriseNameList"] == [record.primary_enterprise_name]
    assert log["accountNoList"] == [record.primary_account_no]
def test_upload_file_followed_by_upload_status_reads_real_record(client, monkeypatch):
    """After an upload, the status endpoint must serve the real record, not the deterministic fallback."""
    # Pin the binding so exact values can be asserted on both endpoints.
    monkeypatch.setattr(
        file_service,
        "_generate_primary_binding",
        lambda: ("上传主体", "6222777788889999"),
    )
    upload_response = client.post(
        "/watson/api/project/remoteUploadSplitFile",
        data={"groupId": 1001},
        files={"files": ("测试文件.csv", b"mock", "text/csv")},
    )
    assert upload_response.status_code == 200
    upload_log = upload_response.json()["data"]["uploadLogList"][0]
    log_id = upload_log["logId"]

    status_response = client.get(f"/watson/api/project/bs/upload?groupId=1001&logId={log_id}")
    assert status_response.status_code == 200
    status_log = status_response.json()["data"]["logs"][0]

    # Exact single-element list equality below already implies length 1,
    # so the former separate len() assertions were removed as redundant.
    assert status_log["enterpriseNameList"] == upload_log["enterpriseNameList"]
    assert status_log["accountNoList"] == upload_log["accountNoList"]
    assert status_log["enterpriseNameList"] == ["上传主体"]
    assert status_log["accountNoList"] == ["6222777788889999"]
def test_fetch_inner_flow_marks_pending_complete(client):
    """After fetching inner-bank flow, getpendings should report the record as no longer parsing."""
    fetch_payload = {
        "groupId": 1001,
        "customerNo": "test_customer_003",
        "dataChannelCode": "test_code",
        "requestDateId": 20240101,
        "dataStartDateId": 20240101,
        "dataEndDateId": 20240131,
        "uploadUserId": 902001,
    }
    fetched = client.post("/watson/api/project/getJZFileOrZjrcuFile", data=fetch_payload)
    assert fetched.status_code == 200
    log_id = fetched.json()["data"][0]

    pending = client.post(
        "/watson/api/project/upload/getpendings",
        data={"groupId": 1001, "inprogressList": str(log_id)},
    )
    assert pending.status_code == 200
    body = pending.json()["data"]
    assert body["parsing"] is False
    assert len(body["pendingList"]) == 1
    assert body["pendingList"][0]["logId"] == log_id
def test_get_upload_status_with_log_id(client):
"""测试带 logId 参数查询返回非空 logs"""
response = client.get("/watson/api/project/bs/upload?groupId=1000&logId=13994")
@@ -150,9 +249,12 @@ def test_deterministic_data_generation(client):
assert log1["bankName"] == log2["bankName"]
assert log1["accountNoList"] == log2["accountNoList"]
assert log1["enterpriseNameList"] == log2["enterpriseNameList"]
assert len(log1["accountNoList"]) == 1
assert len(log1["enterpriseNameList"]) == 1
assert log1["status"] == log2["status"]
assert log1["logMeta"] == log2["logMeta"]
assert log1["templateName"] == log2["templateName"]
assert log1["fileUploadTime"] == log2["fileUploadTime"]
assert log1["trxDateStartId"] == log2["trxDateStartId"]
assert log1["trxDateEndId"] == log2["trxDateEndId"]

View File

@@ -0,0 +1,116 @@
"""
FileService 单一主绑定语义测试
"""
import asyncio
import io
from fastapi import BackgroundTasks
from fastapi.datastructures import UploadFile
from services.file_service import FileService
def test_upload_file_primary_binding_response(monkeypatch):
    """A logId's primary binding must be stable and carry exactly one enterprise/account pair."""
    service = FileService()
    monkeypatch.setattr(
        service,
        "_generate_primary_binding",
        lambda: ("测试主体A", "6222021234567890"),
    )
    upload = UploadFile(filename="测试文件.csv", file=io.BytesIO(b"mock"))
    response = asyncio.run(service.upload_file(1001, upload, BackgroundTasks()))

    log = response["data"]["uploadLogList"][0]
    log_id = log["logId"]
    account_info = response["data"]["accountsOfLog"][str(log_id)][0]
    record = service.file_records[log_id]

    expected_name, expected_account = "测试主体A", "6222021234567890"
    assert log["enterpriseNameList"] == [expected_name]
    assert log["accountNoList"] == [expected_account]
    assert account_info["accountName"] == expected_name
    assert account_info["accountNo"] == expected_account
    assert record.primary_enterprise_name == expected_name
    assert record.primary_account_no == expected_account
    assert record.enterprise_name_list == [expected_name]
    assert record.account_no_list == [expected_account]
def test_upload_file_then_upload_status_reads_same_record(monkeypatch):
    """Once a real record exists, the status endpoint must read it and never hit the fallback."""
    service = FileService()
    monkeypatch.setattr(
        service,
        "_generate_primary_binding",
        lambda: ("测试主体B", "6222333344445555"),
    )
    upload = UploadFile(filename="测试文件.csv", file=io.BytesIO(b"mock"))
    uploaded = asyncio.run(service.upload_file(1001, upload, BackgroundTasks()))
    log = uploaded["data"]["uploadLogList"][0]

    # Any call into the deterministic fallback is a test failure.
    def _fail_fallback(*_args, **_kwargs):
        raise AssertionError("真实记录存在时不应走 deterministic fallback")

    monkeypatch.setattr(service, "_build_deterministic_log_detail", _fail_fallback)

    status_log = service.get_upload_status(1001, log["logId"])["data"]["logs"][0]

    for key in (
        "enterpriseNameList",
        "accountNoList",
        "bankName",
        "templateName",
        "uploadFileName",
        "trxDateStartId",
        "trxDateEndId",
    ):
        assert status_log[key] == log[key]
    assert status_log["enterpriseNameList"] == ["测试主体B"]
    assert status_log["accountNoList"] == ["6222333344445555"]
    assert len(status_log["enterpriseNameList"]) == 1
    assert len(status_log["accountNoList"]) == 1
def test_fetch_inner_flow_persists_primary_binding_record(monkeypatch):
    """Fetching inner-bank flow must create and persist a binding record."""
    service = FileService()
    monkeypatch.setattr(
        service,
        "_generate_primary_binding",
        lambda: ("行内主体", "6210987654321098"),
    )
    payload = {
        "groupId": 1001,
        "customerNo": "test_customer_001",
        "dataChannelCode": "test_code",
        "requestDateId": 20240101,
        "dataStartDateId": 20240101,
        "dataEndDateId": 20240131,
        "uploadUserId": 902001,
    }
    log_id = service.fetch_inner_flow(payload)["data"][0]

    # The new logId comes from the shared counter and must be stored.
    assert log_id == service.log_counter
    assert log_id in service.file_records
    record = service.file_records[log_id]
    assert record.parsing is False
    assert record.primary_enterprise_name == "行内主体"
    assert record.primary_account_no == "6210987654321098"
    assert record.enterprise_name_list == ["行内主体"]
    assert record.account_no_list == ["6210987654321098"]

View File

@@ -0,0 +1,47 @@
"""
StatementService 主绑定注入测试
"""
from services.file_service import FileService
from services.statement_service import StatementService
def test_get_bank_statement_uses_primary_binding_from_file_service(monkeypatch):
    """Statement rows for a logId must reuse the enterprise/account binding held by FileService."""
    file_service = FileService()
    statement_service = StatementService(file_service=file_service)
    monkeypatch.setattr(
        file_service,
        "_generate_primary_binding",
        lambda: ("绑定主体", "6222000011112222"),
    )
    flow_request = {
        "groupId": 1001,
        "customerNo": "customer_001",
        "dataChannelCode": "test",
        "requestDateId": 20240101,
        "dataStartDateId": 20240101,
        "dataEndDateId": 20240131,
        "uploadUserId": 902001,
    }
    log_id = file_service.fetch_inner_flow(flow_request)["data"][0]
    record = file_service.file_records[log_id]

    page = statement_service.get_bank_statement(
        {"groupId": 1001, "logId": log_id, "pageNow": 1, "pageSize": 5}
    )
    rows = page["data"]["bankStatementList"]

    assert rows
    for row in rows:
        assert row["leName"] == record.primary_enterprise_name
        assert row["accountMaskNo"] == record.primary_account_no