41 lines
1.3 KiB
Python
41 lines
1.3 KiB
Python
|
|
"""
|
||
|
|
FileService 单一主绑定语义测试
|
||
|
|
"""
|
||
|
|
|
||
|
|
import asyncio
|
||
|
|
import io
|
||
|
|
|
||
|
|
from fastapi import BackgroundTasks
|
||
|
|
from fastapi.datastructures import UploadFile
|
||
|
|
|
||
|
|
from services.file_service import FileService
|
||
|
|
|
||
|
|
|
||
|
|
def test_upload_file_primary_binding_response(monkeypatch):
    """Check that one upload exposes exactly one primary entity/account pair.

    The service's ``_generate_primary_binding`` is stubbed to return a fixed
    (name, account number) pair. The upload response and the record stored
    under the returned ``logId`` must both carry that single pair.
    """
    expected_name = "测试主体A"
    expected_account = "6222021234567890"

    service = FileService()
    # Patch on the instance (no ``self`` argument) so the binding is deterministic.
    monkeypatch.setattr(
        service,
        "_generate_primary_binding",
        lambda: (expected_name, expected_account),
    )

    tasks = BackgroundTasks()
    upload = UploadFile(filename="测试文件.csv", file=io.BytesIO(b"mock"))

    # upload_file is a coroutine; drive it to completion synchronously.
    response = asyncio.run(service.upload_file(1001, upload, tasks))

    payload = response["data"]
    upload_log = payload["uploadLogList"][0]
    log_id = upload_log["logId"]
    # accountsOfLog is keyed by the stringified log id.
    first_account = payload["accountsOfLog"][str(log_id)][0]
    stored = service.file_records[log_id]

    # Response: upload log entry lists only the stubbed pair.
    assert upload_log["enterpriseNameList"] == [expected_name]
    assert upload_log["accountNoList"] == [expected_account]

    # Response: first account entry for this log matches the stubbed pair.
    assert first_account["accountName"] == expected_name
    assert first_account["accountNo"] == expected_account

    # Stored record: primary fields and list fields agree with the stubbed pair.
    assert stored.primary_enterprise_name == expected_name
    assert stored.primary_account_no == expected_account
    assert stored.enterprise_name_list == [expected_name]
    assert stored.account_no_list == [expected_account]
|