让Mock流水查询复用logId主体账号绑定

This commit is contained in:
wkc
2026-03-18 15:54:11 +08:00
parent 0a85c098e8
commit 5195617a70
5 changed files with 150 additions and 7 deletions

View File

@@ -13,7 +13,7 @@ router = APIRouter()
# 初始化服务实例
token_service = TokenService()
file_service = FileService()
statement_service = StatementService()
statement_service = StatementService(file_service=file_service)
def _parse_log_ids(log_ids: str) -> List[int]:

View File

@@ -67,6 +67,10 @@ class FileService:
self.file_records: Dict[int, FileRecord] = {} # logId -> FileRecord
self.log_counter = settings.INITIAL_LOG_ID
def get_file_record(self, log_id: int) -> FileRecord:
"""按 logId 获取已存在的文件记录。"""
return self.file_records.get(log_id)
def _infer_bank_name(self, filename: str) -> tuple:
"""根据文件名推断银行名称和模板名称"""
if "支付宝" in filename or "alipay" in filename.lower():

View File

@@ -13,19 +13,38 @@ logger = logging.getLogger(__name__)
class StatementService:
"""流水数据服务"""
def __init__(self):
def __init__(self, file_service=None):
# 缓存logId -> (statements_list, total_count)
self._cache: Dict[int, tuple] = {}
self.file_service = file_service
# 配置日志级别为 INFO
logger.info(f"StatementService initialized with empty cache")
def _generate_random_statement(self, index: int, group_id: int, log_id: int) -> Dict:
def _resolve_primary_binding(self, log_id: int) -> tuple:
"""优先从 FileService 读取真实主绑定,不存在时再走 fallback。"""
if self.file_service is not None:
record = self.file_service.get_file_record(log_id)
if record is not None:
return record.primary_enterprise_name, record.primary_account_no
return "张传伟", f"{random.randint(100000000000000, 999999999999999)}"
def _generate_random_statement(
self,
index: int,
group_id: int,
log_id: int,
primary_enterprise_name: str,
primary_account_no: str,
) -> Dict:
"""生成单条随机流水记录
Args:
index: 流水序号
group_id: 项目ID
log_id: 文件ID
primary_enterprise_name: 本方主体名称
primary_account_no: 本方账号
Returns:
单条流水记录字典
@@ -75,7 +94,7 @@ class StatementService:
return {
"accountId": 0,
"accountMaskNo": f"{random.randint(100000000000000, 999999999999999)}",
"accountMaskNo": primary_account_no,
"accountingDate": accounting_date,
"accountingDateId": accounting_date_id,
"archivingFlag": 0,
@@ -104,7 +123,7 @@ class StatementService:
"groupId": group_id,
"internalFlag": 0,
"leId": 16308,
"leName": "张传伟",
"leName": primary_enterprise_name,
"overrideBsId": 0,
"paymentMethod": "",
"sourceCatalogId": 0,
@@ -137,11 +156,31 @@ class StatementService:
Returns:
流水记录列表
"""
primary_enterprise_name, primary_account_no = self._resolve_primary_binding(log_id)
statements = []
for i in range(count):
statements.append(self._generate_random_statement(i, group_id, log_id))
statements.append(
self._generate_random_statement(
i,
group_id,
log_id,
primary_enterprise_name,
primary_account_no,
)
)
return statements
def _apply_primary_binding(
self,
statements: List[Dict],
primary_enterprise_name: str,
primary_account_no: str,
) -> None:
"""将解析出的主绑定统一回填到已有流水记录。"""
for statement in statements:
statement["leName"] = primary_enterprise_name
statement["accountMaskNo"] = primary_account_no
def get_bank_statement(self, request: Union[Dict, object]) -> Dict:
"""获取银行流水列表
@@ -174,6 +213,12 @@ class StatementService:
# 从缓存获取数据
all_statements, total_count = self._cache[log_id]
primary_enterprise_name, primary_account_no = self._resolve_primary_binding(log_id)
self._apply_primary_binding(
all_statements,
primary_enterprise_name,
primary_account_no,
)
# 模拟分页
start = (page_now - 1) * page_size

View File

@@ -2,7 +2,6 @@
集成测试 - 完整的接口调用流程测试
"""
import pytest
import time
def test_complete_workflow(client):
@@ -123,3 +122,51 @@ def test_pagination(client):
if page1["data"]["totalCount"] > 1:
assert len(page1["data"]["bankStatementList"]) == 1
assert len(page2["data"]["bankStatementList"]) >= 0
def test_upload_status_and_bank_statement_share_same_primary_binding(client, monkeypatch):
"""上传状态接口与银行流水接口对同一 logId 必须使用同一组主体/账号绑定。"""
from routers.api import file_service
monkeypatch.setattr(
file_service,
"_generate_primary_binding",
lambda: ("链路主体", "6222555566667777"),
)
fetch_response = client.post(
"/watson/api/project/getJZFileOrZjrcuFile",
data={
"groupId": 1001,
"customerNo": "customer_002",
"dataChannelCode": "channel_code",
"requestDateId": 20240101,
"dataStartDateId": 20240101,
"dataEndDateId": 20240131,
"uploadUserId": 902001,
},
)
assert fetch_response.status_code == 200
log_id = fetch_response.json()["data"][0]
status_response = client.get(f"/watson/api/project/bs/upload?groupId=1001&logId={log_id}")
assert status_response.status_code == 200
status_log = status_response.json()["data"]["logs"][0]
statement_response = client.post(
"/watson/api/project/getBSByLogId",
data={
"groupId": 1001,
"logId": log_id,
"pageNow": 1,
"pageSize": 5,
},
)
assert statement_response.status_code == 200
statements = statement_response.json()["data"]["bankStatementList"]
assert status_log["enterpriseNameList"] == ["链路主体"]
assert status_log["accountNoList"] == ["6222555566667777"]
assert statements
assert all(item["leName"] == status_log["enterpriseNameList"][0] for item in statements)
assert all(item["accountMaskNo"] == status_log["accountNoList"][0] for item in statements)

View File

@@ -0,0 +1,47 @@
"""
StatementService 主绑定注入测试
"""
from services.file_service import FileService
from services.statement_service import StatementService
def test_get_bank_statement_uses_primary_binding_from_file_service(monkeypatch):
"""同一 logId 的流水记录必须复用 FileService 中的主体与账号绑定。"""
file_service = FileService()
statement_service = StatementService(file_service=file_service)
monkeypatch.setattr(
file_service,
"_generate_primary_binding",
lambda: ("绑定主体", "6222000011112222"),
)
response = file_service.fetch_inner_flow(
{
"groupId": 1001,
"customerNo": "customer_001",
"dataChannelCode": "test",
"requestDateId": 20240101,
"dataStartDateId": 20240101,
"dataEndDateId": 20240131,
"uploadUserId": 902001,
}
)
log_id = response["data"][0]
record = file_service.file_records[log_id]
statement_response = statement_service.get_bank_statement(
{
"groupId": 1001,
"logId": log_id,
"pageNow": 1,
"pageSize": 5,
}
)
statements = statement_response["data"]["bankStatementList"]
assert statements
assert all(item["leName"] == record.primary_enterprise_name for item in statements)
assert all(item["accountMaskNo"] == record.primary_account_no for item in statements)