Merge branch 'worktree-lsfx-mock-server' into dev

This commit is contained in:
wkc
2026-03-03 09:40:24 +08:00
37 changed files with 1544 additions and 0 deletions

View File

@@ -0,0 +1,16 @@
# 应用配置
APP_NAME=流水分析Mock服务
APP_VERSION=1.0.0
DEBUG=true
# 服务器配置
HOST=0.0.0.0
PORT=8000
# 模拟配置
PARSE_DELAY_SECONDS=4
MAX_FILE_SIZE=10485760
# 初始ID配置
INITIAL_PROJECT_ID=1000
INITIAL_LOG_ID=10000

45
lsfx-mock-server/.gitignore vendored Normal file
View File

@@ -0,0 +1,45 @@
# Python
__pycache__/
*.py[cod]
*$py.class
*.so
.Python
build/
develop-eggs/
dist/
downloads/
eggs/
.eggs/
lib/
lib64/
parts/
sdist/
var/
wheels/
*.egg-info/
.installed.cfg
*.egg
# Virtual Environment
venv/
ENV/
env/
# IDE
.vscode/
.idea/
*.swp
*.swo
*~
# Environment
.env
# Testing
.pytest_cache/
.coverage
htmlcov/
# OS
.DS_Store
Thumbs.db

View File

@@ -0,0 +1,19 @@
FROM python:3.11-slim
# 设置工作目录
WORKDIR /app
# 复制依赖文件
COPY requirements.txt .
# 安装依赖
RUN pip install --no-cache-dir -r requirements.txt
# 复制项目文件
COPY . .
# 暴露端口
EXPOSE 8000
# 启动命令
CMD ["python", "main.py"]

236
lsfx-mock-server/README.md Normal file
View File

@@ -0,0 +1,236 @@
# 流水分析 Mock 服务器
基于 Python + FastAPI 的独立 Mock 服务器,用于模拟流水分析平台的 7 个核心接口。
## ✨ 特性
-**完整的接口模拟** - 实现所有 7 个核心接口
-**文件解析延迟** - 使用 FastAPI 后台任务模拟 4 秒解析延迟
-**错误场景触发** - 通过 `error_XXXX` 标记触发所有 8 个错误码
-**自动 API 文档** - Swagger UI 和 ReDoc 自动生成
-**配置驱动** - JSON 模板文件,易于修改响应数据
-**零配置启动** - 开箱即用,无需数据库
## 🚀 快速开始
### 1. 安装依赖
```bash
pip install -r requirements.txt
```
### 2. 启动服务
```bash
python main.py
```
或使用 uvicorn支持热重载
```bash
uvicorn main:app --reload --host 0.0.0.0 --port 8000
```
### 3. 访问 API 文档
- **Swagger UI**: http://localhost:8000/docs
- **ReDoc**: http://localhost:8000/redoc
## 📖 使用示例
### 正常流程
```python
import requests
# 1. 获取 Token
response = requests.post(
"http://localhost:8000/account/common/getToken",
json={
"projectNo": "test_project_001",
"entityName": "测试企业",
"userId": "902001",
"userName": "902001",
"orgCode": "902000"
}
)
token_data = response.json()
project_id = token_data["data"]["projectId"]
# 2. 上传文件
files = {"file": ("test.csv", open("test.csv", "rb"), "text/csv")}
response = requests.post(
"http://localhost:8000/watson/api/project/remoteUploadSplitFile",
files=files,
data={"groupId": project_id}
)
log_id = response.json()["data"]["uploadLogList"][0]["logId"]
# 3. 轮询检查解析状态
import time
for i in range(10):
response = requests.post(
"http://localhost:8000/watson/api/project/upload/getpendings",
json={"groupId": project_id, "inprogressList": str(log_id)}
)
result = response.json()
if not result["data"]["parsing"]:
print("解析完成")
break
time.sleep(1)
# 4. 获取银行流水
response = requests.post(
"http://localhost:8000/watson/api/project/getBSByLogId",
json={
"groupId": project_id,
"logId": log_id,
"pageNow": 1,
"pageSize": 10
}
)
```
### 错误场景测试
```python
# 触发 40101 错误appId错误
response = requests.post(
"http://localhost:8000/account/common/getToken",
json={
"projectNo": "test_error_40101", # 包含错误标记
"entityName": "测试企业",
"userId": "902001",
"userName": "902001",
"orgCode": "902000"
}
)
# 返回: {"code": "40101", "message": "appId错误", ...}
```
## 🔧 配置说明
### 环境变量
创建 `.env` 文件(参考 `.env.example`
```bash
# 应用配置
APP_NAME=流水分析Mock服务
APP_VERSION=1.0.0
DEBUG=true
# 服务器配置
HOST=0.0.0.0
PORT=8000
# 模拟配置
PARSE_DELAY_SECONDS=4
MAX_FILE_SIZE=10485760
```
### 响应模板
修改 `config/responses/` 下的 JSON 文件可以自定义响应数据:
- `token.json` - Token 响应模板
- `upload.json` - 上传文件响应模板
- `parse_status.json` - 解析状态响应模板
- `bank_statement.json` - 银行流水响应模板
## 🐳 Docker 部署
### 使用 Docker
```bash
# 构建镜像
docker build -t lsfx-mock-server .
# 运行容器
docker run -d -p 8000:8000 --name lsfx-mock lsfx-mock-server
```
### 使用 Docker Compose
```bash
docker-compose up -d
```
## 📁 项目结构
```
lsfx-mock-server/
├── main.py # 应用入口
├── config/
│ ├── settings.py # 全局配置
│ └── responses/ # 响应模板
├── models/
│ ├── request.py # 请求模型
│ └── response.py # 响应模型
├── services/
│ ├── token_service.py # Token 管理
│ ├── file_service.py # 文件上传和解析
│ └── statement_service.py # 流水数据管理
├── routers/
│ └── api.py # API 路由
├── utils/
│ ├── error_simulator.py # 错误模拟
│ └── response_builder.py # 响应构建器
└── tests/ # 测试套件
```
## 🧪 运行测试
```bash
# 运行所有测试
pytest tests/ -v
# 生成覆盖率报告
pytest tests/ -v --cov=. --cov-report=html
```
## 🔌 API 接口列表
| 接口 | 方法 | 路径 | 描述 |
|------|------|------|------|
| 1 | POST | `/account/common/getToken` | 获取 Token |
| 2 | POST | `/watson/api/project/remoteUploadSplitFile` | 上传文件 |
| 3 | POST | `/watson/api/project/getJZFileOrZjrcuFile` | 拉取行内流水 |
| 4 | POST | `/watson/api/project/upload/getpendings` | 检查解析状态 |
| 5 | POST | `/watson/api/project/batchDeleteUploadFile` | 删除文件 |
| 6 | POST | `/watson/api/project/getBSByLogId` | 获取银行流水 |
## ⚠️ 错误码列表
| 错误码 | 描述 |
|--------|------|
| 40101 | appId错误 |
| 40102 | appSecretCode错误 |
| 40104 | 可使用项目次数为0无法创建项目 |
| 40105 | 只读模式下无法新建项目 |
| 40106 | 错误的分析类型,不在规定的取值范围内 |
| 40107 | 当前系统不支持的分析类型 |
| 40108 | 当前用户所属行社无权限 |
| 501014 | 无行内流水文件 |
## 🛠️ 开发指南
### 添加新接口
1.`models/request.py``models/response.py` 中添加模型
2.`services/` 中添加服务类
3.`routers/api.py` 中添加路由
4.`config/responses/` 中添加响应模板
5. 编写测试
### 修改响应数据
直接编辑 `config/responses/` 下的 JSON 文件,重启服务即可生效。
## 📝 License
MIT
## 🤝 Contributing
欢迎提交 Issue 和 Pull Request

View File

@@ -0,0 +1,106 @@
{
"success_response": {
"code": "200",
"data": {
"bankStatementList": [
{
"accountId": 0,
"accountMaskNo": "101015251071645",
"accountingDate": "2024-02-01",
"accountingDateId": 20240201,
"archivingFlag": 0,
"attachments": 0,
"balanceAmount": 4814.82,
"bank": "ZJRCU",
"bankComments": "",
"bankStatementId": 12847662,
"bankTrxNumber": "1a10458dd5c3366d7272285812d434fc",
"batchId": 19135,
"cashType": "1",
"commentsNum": 0,
"crAmount": 0,
"cretNo": "230902199012261247",
"currency": "CNY",
"customerAccountMaskNo": "597671502",
"customerBank": "",
"customerId": -1,
"customerName": "小店",
"customerReference": "",
"downPaymentFlag": 0,
"drAmount": 245.8,
"exceptionType": "",
"groupId": 16238,
"internalFlag": 0,
"leId": 16308,
"leName": "张传伟",
"overrideBsId": 0,
"paymentMethod": "",
"sourceCatalogId": 0,
"split": 0,
"subBankstatementId": 0,
"toDoFlag": 0,
"transAmount": 245.8,
"transFlag": "P",
"transTypeId": 0,
"transformAmount": 0,
"transformCrAmount": 0,
"transformDrAmount": 0,
"transfromBalanceAmount": 0,
"trxBalance": 0,
"trxDate": "2024-02-01 10:33:44",
"userMemo": "财付通消费_小店"
},
{
"accountId": 0,
"accountMaskNo": "101015251071645",
"accountingDate": "2024-02-02",
"accountingDateId": 20240202,
"archivingFlag": 0,
"attachments": 0,
"balanceAmount": 5000.00,
"bank": "ZJRCU",
"bankComments": "",
"bankStatementId": 12847663,
"bankTrxNumber": "2b20568ee6d4477e8383396923e545gd",
"batchId": 19135,
"cashType": "1",
"commentsNum": 0,
"crAmount": 185.18,
"cretNo": "230902199012261247",
"currency": "CNY",
"customerAccountMaskNo": "123456789",
"customerBank": "",
"customerId": -1,
"customerName": "支付宝",
"customerReference": "",
"downPaymentFlag": 0,
"drAmount": 0,
"exceptionType": "",
"groupId": 16238,
"internalFlag": 0,
"leId": 16308,
"leName": "张传伟",
"overrideBsId": 0,
"paymentMethod": "",
"sourceCatalogId": 0,
"split": 0,
"subBankstatementId": 0,
"toDoFlag": 0,
"transAmount": 185.18,
"transFlag": "R",
"transTypeId": 0,
"transformAmount": 0,
"transformCrAmount": 0,
"transformDrAmount": 0,
"transfromBalanceAmount": 0,
"trxBalance": 0,
"trxDate": "2024-02-02 14:22:18",
"userMemo": "支付宝转账_支付宝"
}
],
"totalCount": 131
},
"status": "200",
"successResponse": true
}
}

View File

@@ -0,0 +1,41 @@
{
"success_response": {
"code": "200",
"data": {
"parsing": false,
"pendingList": [
{
"accountNoList": [],
"bankName": "ZJRCU",
"dataTypeInfo": ["CSV", ","],
"downloadFileName": "230902199012261247_20260201_20260201_1772096608615.csv",
"enterpriseNameList": [],
"filePackageId": "cde6c7cf5cab48e8892f0c1c36b2aa7d",
"fileSize": 53101,
"fileUploadBy": 448,
"fileUploadByUserName": "admin@support.com",
"fileUploadTime": "2026-02-27 09:50:18",
"isSplit": 0,
"leId": 16210,
"logId": "{log_id}",
"logMeta": "{\"lostHeader\":[],\"balanceAmount\":true}",
"logType": "bankstatement",
"loginLeId": 16210,
"lostHeader": [],
"realBankName": "ZJRCU",
"rows": 0,
"source": "http",
"status": -5,
"templateName": "ZJRCU_T251114",
"totalRecords": 131,
"trxDateEndId": 20240228,
"trxDateStartId": 20240201,
"uploadFileName": "230902199012261247_20260201_20260201_1772096608615.csv",
"uploadStatusDesc": "data.wait.confirm.newaccount"
}
]
},
"status": "200",
"successResponse": true
}
}

View File

@@ -0,0 +1,15 @@
{
"success_response": {
"code": "200",
"data": {
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.mock_token_{project_id}",
"projectId": "{project_id}",
"projectNo": "{project_no}",
"entityName": "{entity_name}",
"analysisType": 0
},
"message": "create.token.success",
"status": "200",
"successResponse": true
}
}

View File

@@ -0,0 +1,49 @@
{
"success_response": {
"code": "200",
"data": {
"accountsOfLog": {
"{log_id}": [
{
"bank": "BSX",
"accountName": "测试账户",
"accountNo": "6222021234567890",
"currency": "CNY"
}
]
},
"uploadLogList": [
{
"accountNoList": [],
"bankName": "BSX",
"dataTypeInfo": ["CSV", ","],
"downloadFileName": "测试流水.csv",
"enterpriseNameList": [],
"filePackageId": "14b13103010e4d32b5406c764cfe3644",
"fileSize": 46724,
"fileUploadBy": 448,
"fileUploadByUserName": "admin@support.com",
"fileUploadTime": "{upload_time}",
"leId": 10724,
"logId": "{log_id}",
"logMeta": "{\"lostHeader\":[],\"balanceAmount\":true}",
"logType": "bankstatement",
"loginLeId": 10724,
"realBankName": "BSX",
"rows": 0,
"source": "http",
"status": -5,
"templateName": "BSX_T240925",
"totalRecords": 280,
"trxDateEndId": 20240905,
"trxDateStartId": 20230914,
"uploadFileName": "测试流水.csv",
"uploadStatusDesc": "data.wait.confirm.newaccount"
}
],
"uploadStatus": 1
},
"status": "200",
"successResponse": true
}
}

View File

@@ -0,0 +1,30 @@
from pydantic_settings import BaseSettings
from typing import Optional
class Settings(BaseSettings):
"""全局配置类"""
# 应用配置
APP_NAME: str = "流水分析Mock服务"
APP_VERSION: str = "1.0.0"
DEBUG: bool = True
# 服务器配置
HOST: str = "0.0.0.0"
PORT: int = 8000
# 模拟配置
PARSE_DELAY_SECONDS: int = 4 # 文件解析延迟秒数
MAX_FILE_SIZE: int = 10485760 # 10MB
# 测试数据配置
INITIAL_PROJECT_ID: int = 1000
INITIAL_LOG_ID: int = 10000
class Config:
env_file = ".env"
env_file_encoding = "utf-8"
settings = Settings()

View File

@@ -0,0 +1,17 @@
version: '3.8'
services:
lsfx-mock-server:
build: .
container_name: lsfx-mock-server
ports:
- "8000:8000"
environment:
- APP_NAME=流水分析Mock服务
- APP_VERSION=1.0.0
- DEBUG=true
- HOST=0.0.0.0
- PORT=8000
- PARSE_DELAY_SECONDS=4
- MAX_FILE_SIZE=10485760
restart: unless-stopped

80
lsfx-mock-server/main.py Normal file
View File

@@ -0,0 +1,80 @@
"""
流水分析Mock服务器 - 主应用入口
基于 FastAPI 实现的 Mock 服务器,用于模拟流水分析平台的 7 个核心接口
"""
from fastapi import FastAPI
from routers import api
from config.settings import settings
# 创建 FastAPI 应用实例
app = FastAPI(
title=settings.APP_NAME,
description="""
## 流水分析 Mock 服务器
模拟流水分析平台的 7 个核心接口,用于开发和测试。
### 主要功能
- **Token管理** - 创建项目并获取访问Token
- **文件上传** - 上传流水文件支持异步解析4秒延迟
- **行内流水** - 拉取行内流水数据
- **解析状态** - 轮询检查文件解析状态
- **文件删除** - 批量删除上传的文件
- **流水查询** - 分页获取银行流水数据
### 错误模拟
在请求参数中包含 `error_XXXX` 标记可触发对应的错误响应。
例如:`projectNo: "test_error_40101"` 将返回 40101 错误。
### 使用方式
1. 获取Token: POST /account/common/getToken
2. 上传文件: POST /watson/api/project/remoteUploadSplitFile
3. 轮询解析状态: POST /watson/api/project/upload/getpendings
4. 获取流水: POST /watson/api/project/getBSByLogId
""",
version=settings.APP_VERSION,
docs_url="/docs",
redoc_url="/redoc",
)
# 包含 API 路由
app.include_router(api.router, tags=["流水分析接口"])
@app.get("/", summary="服务根路径")
async def root():
"""服务根路径,返回基本信息"""
return {
"service": settings.APP_NAME,
"version": settings.APP_VERSION,
"swagger_docs": "/docs",
"redoc": "/redoc",
"status": "running",
}
@app.get("/health", summary="健康检查")
async def health_check():
"""健康检查端点"""
return {
"status": "healthy",
"service": settings.APP_NAME,
"version": settings.APP_VERSION,
}
if __name__ == "__main__":
import uvicorn
# 启动服务器
uvicorn.run(
app,
host=settings.HOST,
port=settings.PORT,
log_level="debug" if settings.DEBUG else "info",
)

View File

@@ -0,0 +1 @@
# Models package

View File

@@ -0,0 +1,50 @@
from pydantic import BaseModel, Field
from typing import Optional, List
class GetTokenRequest(BaseModel):
"""获取Token请求模型"""
projectNo: str = Field(..., description="项目编号格式902000_当前时间戳")
entityName: str = Field(..., description="项目名称")
userId: str = Field(..., description="操作人员编号,固定值")
userName: str = Field(..., description="操作人员姓名,固定值")
orgCode: str = Field(..., description="行社机构号,固定值")
entityId: Optional[str] = Field(None, description="企业统信码或个人身份证号")
xdRelatedPersons: Optional[str] = Field(None, description="信贷关联人信息")
jzDataDateId: Optional[str] = Field("0", description="拉取指定日期推送过来的金综链流水")
innerBSStartDateId: Optional[str] = Field("0", description="拉取行内流水开始日期")
innerBSEndDateId: Optional[str] = Field("0", description="拉取行内流水结束日期")
analysisType: Optional[int] = Field(-1, description="分析类型")
departmentCode: Optional[str] = Field(None, description="客户经理所属营业部/分理处的机构编码")
class FetchInnerFlowRequest(BaseModel):
"""拉取行内流水请求模型"""
groupId: int = Field(..., description="项目id")
customerNo: str = Field(..., description="客户身份证号")
dataChannelCode: str = Field(..., description="校验码")
requestDateId: int = Field(..., description="发起请求的时间")
dataStartDateId: int = Field(..., description="拉取开始日期")
dataEndDateId: int = Field(..., description="拉取结束日期")
uploadUserId: int = Field(..., description="柜员号")
class CheckParseStatusRequest(BaseModel):
"""检查文件解析状态请求模型"""
groupId: int = Field(..., description="项目id")
inprogressList: str = Field(..., description="文件id列表逗号分隔")
class GetBankStatementRequest(BaseModel):
"""获取银行流水请求模型"""
groupId: int = Field(..., description="项目id")
logId: int = Field(..., description="文件id")
pageNow: int = Field(..., description="当前页码")
pageSize: int = Field(..., description="查询条数")
class DeleteFilesRequest(BaseModel):
"""删除文件请求模型"""
groupId: int = Field(..., description="项目id")
logIds: List[int] = Field(..., description="文件id数组")
userId: int = Field(..., description="用户柜员号")

View File

@@ -0,0 +1,187 @@
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
# ==================== Token相关模型 ====================
class TokenData(BaseModel):
"""Token数据"""
token: str = Field(..., description="token")
projectId: int = Field(..., description="见知项目Id")
projectNo: str = Field(..., description="项目编号")
entityName: str = Field(..., description="项目名称")
analysisType: int = Field(0, description="分析类型")
class GetTokenResponse(BaseModel):
"""获取Token响应"""
code: str = Field("200", description="返回码")
data: Optional[TokenData] = Field(None, description="返回数据")
message: str = Field("create.token.success", description="返回消息")
status: str = Field("200", description="状态")
successResponse: bool = Field(True, description="是否成功响应")
# ==================== 文件上传相关模型 ====================
class AccountInfo(BaseModel):
"""账户信息"""
bank: str = Field(..., description="银行")
accountName: str = Field(..., description="账户名称")
accountNo: str = Field(..., description="账号")
currency: str = Field(..., description="币种")
class UploadLogItem(BaseModel):
"""上传日志项"""
accountNoList: List[str] = Field(default=[], description="账号列表")
bankName: str = Field(..., description="银行名称")
dataTypeInfo: List[str] = Field(default=[], description="数据类型信息")
downloadFileName: str = Field(..., description="下载文件名")
enterpriseNameList: List[str] = Field(default=[], description="企业名称列表")
filePackageId: str = Field(..., description="文件包ID")
fileSize: int = Field(..., description="文件大小")
fileUploadBy: int = Field(..., description="上传者ID")
fileUploadByUserName: str = Field(..., description="上传者用户名")
fileUploadTime: str = Field(..., description="上传时间")
leId: int = Field(..., description="企业ID")
logId: int = Field(..., description="日志ID")
logMeta: str = Field(..., description="日志元数据")
logType: str = Field(..., description="日志类型")
loginLeId: int = Field(..., description="登录企业ID")
realBankName: str = Field(..., description="真实银行名称")
rows: int = Field(0, description="行数")
source: str = Field(..., description="来源")
status: int = Field(-5, description="状态值")
templateName: str = Field(..., description="模板名称")
totalRecords: int = Field(0, description="总记录数")
trxDateEndId: int = Field(..., description="交易结束日期ID")
trxDateStartId: int = Field(..., description="交易开始日期ID")
uploadFileName: str = Field(..., description="上传文件名")
uploadStatusDesc: str = Field(..., description="上传状态描述")
class UploadFileResponse(BaseModel):
"""上传文件响应"""
code: str = Field("200", description="返回码")
data: Optional[Dict[str, Any]] = Field(None, description="返回数据")
status: str = Field("200", description="状态")
successResponse: bool = Field(True, description="是否成功响应")
# ==================== 检查解析状态相关模型 ====================
class PendingItem(BaseModel):
"""待处理项"""
accountNoList: List[str] = Field(default=[], description="账号列表")
bankName: str = Field(..., description="银行名称")
dataTypeInfo: List[str] = Field(default=[], description="数据类型信息")
downloadFileName: str = Field(..., description="下载文件名")
enterpriseNameList: List[str] = Field(default=[], description="企业名称列表")
filePackageId: str = Field(..., description="文件包ID")
fileSize: int = Field(..., description="文件大小")
fileUploadBy: int = Field(..., description="上传者ID")
fileUploadByUserName: str = Field(..., description="上传者用户名")
fileUploadTime: str = Field(..., description="上传时间")
isSplit: int = Field(0, description="是否分割")
leId: int = Field(..., description="企业ID")
logId: int = Field(..., description="日志ID")
logMeta: str = Field(..., description="日志元数据")
logType: str = Field(..., description="日志类型")
loginLeId: int = Field(..., description="登录企业ID")
lostHeader: List[str] = Field(default=[], description="丢失的头部")
realBankName: str = Field(..., description="真实银行名称")
rows: int = Field(0, description="行数")
source: str = Field(..., description="来源")
status: int = Field(-5, description="状态值")
templateName: str = Field(..., description="模板名称")
totalRecords: int = Field(0, description="总记录数")
trxDateEndId: int = Field(..., description="交易结束日期ID")
trxDateStartId: int = Field(..., description="交易开始日期ID")
uploadFileName: str = Field(..., description="上传文件名")
uploadStatusDesc: str = Field(..., description="上传状态描述")
class CheckParseStatusResponse(BaseModel):
"""检查解析状态响应"""
code: str = Field("200", description="返回码")
data: Optional[Dict[str, Any]] = Field(None, description="返回数据包含parsing和pendingList")
status: str = Field("200", description="状态")
successResponse: bool = Field(True, description="是否成功响应")
# ==================== 银行流水相关模型 ====================
class BankStatementItem(BaseModel):
"""银行流水项"""
accountId: int = Field(0, description="账号ID")
accountMaskNo: str = Field(..., description="账号")
accountingDate: str = Field(..., description="记账日期")
accountingDateId: int = Field(..., description="记账日期ID")
archivingFlag: int = Field(0, description="归档标志")
attachments: int = Field(0, description="附件数")
balanceAmount: float = Field(..., description="余额")
bank: str = Field(..., description="银行")
bankComments: str = Field("", description="银行注释")
bankStatementId: int = Field(..., description="流水ID")
bankTrxNumber: str = Field(..., description="银行交易号")
batchId: int = Field(..., description="批次ID")
cashType: str = Field("1", description="现金类型")
commentsNum: int = Field(0, description="评论数")
crAmount: float = Field(0, description="贷方金额")
cretNo: str = Field(..., description="证件号")
currency: str = Field("CNY", description="币种")
customerAccountMaskNo: str = Field(..., description="客户账号")
customerBank: str = Field("", description="客户银行")
customerId: int = Field(-1, description="客户ID")
customerName: str = Field(..., description="客户名称")
customerReference: str = Field("", description="客户参考")
downPaymentFlag: int = Field(0, description="首付标志")
drAmount: float = Field(0, description="借方金额")
exceptionType: str = Field("", description="异常类型")
groupId: int = Field(0, description="项目ID")
internalFlag: int = Field(0, description="内部标志")
leId: int = Field(..., description="企业ID")
leName: str = Field(..., description="企业名称")
overrideBsId: int = Field(0, description="覆盖流水ID")
paymentMethod: str = Field("", description="支付方式")
sourceCatalogId: int = Field(0, description="来源目录ID")
split: int = Field(0, description="分割")
subBankstatementId: int = Field(0, description="子流水ID")
toDoFlag: int = Field(0, description="待办标志")
transAmount: float = Field(..., description="交易金额")
transFlag: str = Field("P", description="交易标志")
transTypeId: int = Field(0, description="交易类型ID")
transformAmount: int = Field(0, description="转换金额")
transformCrAmount: int = Field(0, description="转换贷方金额")
transformDrAmount: int = Field(0, description="转换借方金额")
transfromBalanceAmount: int = Field(0, description="转换余额")
trxBalance: int = Field(0, description="交易余额")
trxDate: str = Field(..., description="交易日期")
userMemo: str = Field(..., description="用户备注")
class GetBankStatementResponse(BaseModel):
"""获取银行流水响应"""
code: str = Field("200", description="返回码")
data: Optional[Dict[str, Any]] = Field(None, description="返回数据包含bankStatementList和totalCount")
status: str = Field("200", description="状态")
successResponse: bool = Field(True, description="是否成功响应")
# ==================== 其他响应模型 ====================
class FetchInnerFlowResponse(BaseModel):
"""拉取行内流水响应"""
code: str = Field("200", description="返回码")
data: Optional[Dict[str, Any]] = Field(None, description="返回数据")
status: str = Field("200", description="状态")
successResponse: bool = Field(True, description="是否成功响应")
class DeleteFilesResponse(BaseModel):
"""删除文件响应"""
code: str = Field("200", description="返回码")
data: Optional[Dict[str, str]] = Field(None, description="返回数据")
status: str = Field("200", description="状态")
successResponse: bool = Field(True, description="是否成功响应")

View File

@@ -0,0 +1,8 @@
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
pydantic-settings==2.1.0
python-multipart==0.0.6
pytest>=7.0.0
pytest-cov>=4.0.0
httpx>=0.25.0

View File

@@ -0,0 +1 @@
# Routers package

View File

@@ -0,0 +1,99 @@
from fastapi import APIRouter, BackgroundTasks, UploadFile, File, Form
from models.request import (
GetTokenRequest,
FetchInnerFlowRequest,
CheckParseStatusRequest,
GetBankStatementRequest,
DeleteFilesRequest,
)
from services.token_service import TokenService
from services.file_service import FileService
from services.statement_service import StatementService
from utils.error_simulator import ErrorSimulator
from typing import List
# 创建路由器
router = APIRouter()
# 初始化服务实例
token_service = TokenService()
file_service = FileService()
statement_service = StatementService()
# ==================== 接口1获取Token ====================
@router.post("/account/common/getToken")
async def get_token(request: GetTokenRequest):
"""创建项目并获取访问Token
如果 projectNo 包含 error_XXXX 标记,将返回对应的错误响应
"""
# 检测错误标记
error_code = ErrorSimulator.detect_error_marker(request.projectNo)
if error_code:
return ErrorSimulator.build_error_response(error_code)
# 正常流程
return token_service.create_token(request)
# ==================== 接口2上传文件 ====================
@router.post("/watson/api/project/remoteUploadSplitFile")
async def upload_file(
background_tasks: BackgroundTasks,
groupId: int = Form(..., description="项目ID"),
file: UploadFile = File(..., description="流水文件"),
):
"""上传流水文件
文件将立即返回并在后台延迟4秒完成解析
"""
return await file_service.upload_file(groupId, file, background_tasks)
# ==================== 接口3拉取行内流水 ====================
@router.post("/watson/api/project/getJZFileOrZjrcuFile")
async def fetch_inner_flow(request: FetchInnerFlowRequest):
"""拉取行内流水
如果 customerNo 包含 error_XXXX 标记,将返回对应的错误响应
"""
# 检测错误标记
error_code = ErrorSimulator.detect_error_marker(request.customerNo)
if error_code:
return ErrorSimulator.build_error_response(error_code)
# 正常流程
return file_service.fetch_inner_flow(request)
# ==================== 接口4检查文件解析状态 ====================
@router.post("/watson/api/project/upload/getpendings")
async def check_parse_status(request: CheckParseStatusRequest):
"""检查文件解析状态
返回文件是否还在解析中parsing字段
"""
return file_service.check_parse_status(
request.groupId, request.inprogressList
)
# ==================== 接口5删除文件 ====================
@router.post("/watson/api/project/batchDeleteUploadFile")
async def delete_files(request: DeleteFilesRequest):
"""批量删除上传的文件
根据logIds列表删除对应的文件记录
"""
return file_service.delete_files(request.groupId, request.logIds, request.userId)
# ==================== 接口6获取银行流水 ====================
@router.post("/watson/api/project/getBSByLogId")
async def get_bank_statement(request: GetBankStatementRequest):
"""获取银行流水列表
支持分页查询pageNow, pageSize
"""
return statement_service.get_bank_statement(request)

View File

@@ -0,0 +1 @@
# Services package

View File

@@ -0,0 +1,151 @@
from fastapi import BackgroundTasks, UploadFile
from models.request import FetchInnerFlowRequest
from utils.response_builder import ResponseBuilder
from config.settings import settings
from typing import Dict, List
import time
from datetime import datetime
class FileService:
"""文件上传和解析服务"""
def __init__(self):
self.file_records = {} # logId -> record
self.parsing_status = {} # logId -> is_parsing
self.log_counter = settings.INITIAL_LOG_ID
async def upload_file(
self, group_id: int, file: UploadFile, background_tasks: BackgroundTasks
) -> Dict:
"""上传文件并启动后台解析任务
Args:
group_id: 项目ID
file: 上传的文件
background_tasks: FastAPI后台任务
Returns:
上传响应字典
"""
# 生成唯一logId
self.log_counter += 1
log_id = self.log_counter
# 获取当前时间
upload_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
# 立即存储文件记录(初始状态:解析中)
self.file_records[log_id] = {
"logId": log_id,
"groupId": group_id,
"status": -5,
"uploadStatusDesc": "parsing",
"uploadFileName": file.filename,
"fileSize": 0, # 简化处理
"bankName": "MOCK",
"uploadTime": upload_time,
}
# 标记为解析中
self.parsing_status[log_id] = True
# 启动后台任务,延迟解析
background_tasks.add_task(
self._simulate_parsing, log_id, settings.PARSE_DELAY_SECONDS
)
# 构建响应
response = ResponseBuilder.build_success_response(
"upload", log_id=log_id, upload_time=upload_time
)
return response
def _simulate_parsing(self, log_id: int, delay_seconds: int):
"""后台任务:模拟文件解析过程
Args:
log_id: 日志ID
delay_seconds: 延迟秒数
"""
time.sleep(delay_seconds)
# 解析完成,更新状态
if log_id in self.file_records:
self.file_records[log_id]["uploadStatusDesc"] = (
"data.wait.confirm.newaccount"
)
self.parsing_status[log_id] = False
def check_parse_status(self, group_id: int, inprogress_list: str) -> Dict:
"""检查文件解析状态
Args:
group_id: 项目ID
inprogress_list: 文件ID列表逗号分隔
Returns:
解析状态响应字典
"""
# 解析logId列表
log_ids = [int(x.strip()) for x in inprogress_list.split(",") if x.strip()]
# 检查是否还在解析中
is_parsing = any(
self.parsing_status.get(log_id, False) for log_id in log_ids
)
# 获取待处理列表
pending_list = [
self.file_records[log_id]
for log_id in log_ids
if log_id in self.file_records
]
return {
"code": "200",
"data": {"parsing": is_parsing, "pendingList": pending_list},
"status": "200",
"successResponse": True,
}
def delete_files(self, group_id: int, log_ids: List[int], user_id: int) -> Dict:
"""删除文件
Args:
group_id: 项目ID
log_ids: 文件ID列表
user_id: 用户ID
Returns:
删除响应字典
"""
# 删除文件记录
for log_id in log_ids:
self.file_records.pop(log_id, None)
self.parsing_status.pop(log_id, None)
return {
"code": "200",
"data": {"message": "delete.files.success"},
"status": "200",
"successResponse": True,
}
def fetch_inner_flow(self, request: FetchInnerFlowRequest) -> Dict:
"""拉取行内流水(模拟无数据场景)
Args:
request: 拉取流水请求
Returns:
流水响应字典
"""
# 模拟无行内流水文件场景
return {
"code": "200",
"data": {"code": "501014", "message": "无行内流水文件"},
"status": "200",
"successResponse": True,
}

View File

@@ -0,0 +1,33 @@
from models.request import GetBankStatementRequest
from utils.response_builder import ResponseBuilder
from typing import Dict
class StatementService:
"""流水数据服务"""
def get_bank_statement(self, request: GetBankStatementRequest) -> Dict:
"""获取银行流水列表
Args:
request: 获取银行流水请求
Returns:
银行流水响应字典
"""
# 加载模板
template = ResponseBuilder.load_template("bank_statement")
statements = template["success_response"]["data"]["bankStatementList"]
total_count = len(statements)
# 模拟分页
start = (request.pageNow - 1) * request.pageSize
end = start + request.pageSize
page_data = statements[start:end]
return {
"code": "200",
"data": {"bankStatementList": page_data, "totalCount": total_count},
"status": "200",
"successResponse": True,
}

View File

@@ -0,0 +1,49 @@
from models.request import GetTokenRequest
from utils.response_builder import ResponseBuilder
from config.settings import settings
from typing import Dict
class TokenService:
"""Token管理服务"""
def __init__(self):
self.project_counter = settings.INITIAL_PROJECT_ID
self.tokens = {} # projectId -> token_data
def create_token(self, request: GetTokenRequest) -> Dict:
"""创建Token
Args:
request: 获取Token请求
Returns:
Token响应字典
"""
# 生成唯一项目ID
self.project_counter += 1
project_id = self.project_counter
# 构建响应
response = ResponseBuilder.build_success_response(
"token",
project_id=project_id,
project_no=request.projectNo,
entity_name=request.entityName
)
# 存储token信息
self.tokens[project_id] = response.get("data")
return response
def get_project(self, project_id: int) -> Dict:
"""获取项目信息
Args:
project_id: 项目ID
Returns:
项目信息字典
"""
return self.tokens.get(project_id)

View File

@@ -0,0 +1 @@
# Tests package

View File

@@ -0,0 +1,30 @@
"""
Pytest 配置和共享 fixtures
"""
import pytest
from fastapi.testclient import TestClient
import sys
import os
# 添加项目根目录到 sys.path
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from main import app
@pytest.fixture
def client():
"""创建测试客户端"""
return TestClient(app)
@pytest.fixture
def sample_token_request():
"""示例 Token 请求"""
return {
"projectNo": "test_project_001",
"entityName": "测试企业",
"userId": "902001",
"userName": "902001",
"orgCode": "902000",
}

View File

@@ -0,0 +1 @@
# Integration tests package

View File

@@ -0,0 +1,113 @@
"""
集成测试 - 完整的接口调用流程测试
"""
import pytest
import time
def test_complete_workflow(client):
"""测试完整的接口调用流程"""
# 1. 获取 Token
response = client.post(
"/account/common/getToken",
json={
"projectNo": "integration_test_001",
"entityName": "集成测试企业",
"userId": "902001",
"userName": "902001",
"orgCode": "902000",
},
)
assert response.status_code == 200
token_data = response.json()
assert token_data["code"] == "200"
project_id = token_data["data"]["projectId"]
token = token_data["data"]["token"]
assert token is not None
# 2. 上传文件(模拟)
# 注意:在测试环境中,我们跳过实际的文件上传,直接测试其他接口
# 3. 检查解析状态
response = client.post(
"/watson/api/project/upload/getpendings",
json={"groupId": project_id, "inprogressList": "10001"},
)
assert response.status_code == 200
status_data = response.json()
assert "parsing" in status_data["data"]
# 4. 获取银行流水
response = client.post(
"/watson/api/project/getBSByLogId",
json={
"groupId": project_id,
"logId": 10001,
"pageNow": 1,
"pageSize": 10,
},
)
assert response.status_code == 200
statement_data = response.json()
assert statement_data["code"] == "200"
assert "bankStatementList" in statement_data["data"]
assert "totalCount" in statement_data["data"]
def test_all_error_codes(client):
"""测试所有错误码"""
error_codes = ["40101", "40102", "40104", "40105", "40106", "40107", "40108"]
for error_code in error_codes:
response = client.post(
"/account/common/getToken",
json={
"projectNo": f"test_error_{error_code}",
"entityName": "测试企业",
"userId": "902001",
"userName": "902001",
"orgCode": "902000",
},
)
assert response.status_code == 200
data = response.json()
assert data["code"] == error_code, f"错误码 {error_code} 未正确触发"
assert data["successResponse"] == False
def test_pagination(client):
"""测试分页功能"""
# 获取 Token
response = client.post(
"/account/common/getToken",
json={
"projectNo": "pagination_test",
"entityName": "分页测试",
"userId": "902001",
"userName": "902001",
"orgCode": "902000",
},
)
project_id = response.json()["data"]["projectId"]
# 测试第一页
response = client.post(
"/watson/api/project/getBSByLogId",
json={"groupId": project_id, "logId": 10001, "pageNow": 1, "pageSize": 1},
)
page1 = response.json()
# 测试第二页
response = client.post(
"/watson/api/project/getBSByLogId",
json={"groupId": project_id, "logId": 10001, "pageNow": 2, "pageSize": 1},
)
page2 = response.json()
# 验证总记录数相同
assert page1["data"]["totalCount"] == page2["data"]["totalCount"]
# 验证页码不同
if page1["data"]["totalCount"] > 1:
assert len(page1["data"]["bankStatementList"]) == 1
assert len(page2["data"]["bankStatementList"]) >= 0

View File

@@ -0,0 +1,46 @@
"""
API 端点测试
"""
def test_root_endpoint(client):
"""测试根路径"""
response = client.get("/")
assert response.status_code == 200
data = response.json()
assert data["status"] == "running"
assert "swagger_docs" in data
def test_health_check(client):
"""测试健康检查端点"""
response = client.get("/health")
assert response.status_code == 200
data = response.json()
assert data["status"] == "healthy"
def test_get_token_success(client, sample_token_request):
"""测试获取 Token - 成功场景"""
response = client.post("/account/common/getToken", json=sample_token_request)
assert response.status_code == 200
data = response.json()
assert data["code"] == "200"
assert "token" in data["data"]
assert "projectId" in data["data"]
def test_get_token_error_40101(client):
"""测试获取 Token - 错误场景 40101"""
request_data = {
"projectNo": "test_error_40101",
"entityName": "测试企业",
"userId": "902001",
"userName": "902001",
"orgCode": "902000",
}
response = client.post("/account/common/getToken", json=request_data)
assert response.status_code == 200
data = response.json()
assert data["code"] == "40101"
assert data["successResponse"] == False

View File

@@ -0,0 +1 @@
# Utils package

View File

@@ -0,0 +1,49 @@
from typing import Dict, Optional
import re
class ErrorSimulator:
"""错误场景模拟器"""
# 错误码映射表
ERROR_CODES = {
"40101": {"code": "40101", "message": "appId错误"},
"40102": {"code": "40102", "message": "appSecretCode错误"},
"40104": {"code": "40104", "message": "可使用项目次数为0无法创建项目"},
"40105": {"code": "40105", "message": "只读模式下无法新建项目"},
"40106": {"code": "40106", "message": "错误的分析类型,不在规定的取值范围内"},
"40107": {"code": "40107", "message": "当前系统不支持的分析类型"},
"40108": {"code": "40108", "message": "当前用户所属行社无权限"},
"501014": {"code": "501014", "message": "无行内流水文件"},
}
@staticmethod
def detect_error_marker(value: str) -> Optional[str]:
"""检测字符串中的错误标记
规则:如果字符串包含 error_XXXX则返回 XXXX
例如:
- "project_error_40101" -> "40101"
- "test_error_501014" -> "501014"
"""
if not value:
return None
pattern = r'error_(\d+)'
match = re.search(pattern, value)
if match:
return match.group(1)
return None
@staticmethod
def build_error_response(error_code: str) -> Optional[Dict]:
"""构建错误响应"""
if error_code in ErrorSimulator.ERROR_CODES:
error_info = ErrorSimulator.ERROR_CODES[error_code]
return {
"code": error_info["code"],
"message": error_info["message"],
"status": error_info["code"],
"successResponse": False
}
return None

View File

@@ -0,0 +1,69 @@
import json
from pathlib import Path
from typing import Dict, Any
import copy
class ResponseBuilder:
"""响应构建器"""
TEMPLATE_DIR = Path(__file__).parent.parent / "config" / "responses"
@staticmethod
def load_template(template_name: str) -> Dict:
"""加载 JSON 模板
Args:
template_name: 模板名称(不含.json扩展名
Returns:
模板字典
"""
file_path = ResponseBuilder.TEMPLATE_DIR / f"{template_name}.json"
with open(file_path, 'r', encoding='utf-8') as f:
return json.load(f)
@staticmethod
def replace_placeholders(template: Dict, **kwargs) -> Dict:
"""递归替换占位符
Args:
template: 模板字典
**kwargs: 占位符键值对
Returns:
替换后的字典
"""
def replace_value(value):
if isinstance(value, str):
result = value
for key, val in kwargs.items():
placeholder = f"{{{key}}}"
if placeholder in result:
result = result.replace(placeholder, str(val))
return result
elif isinstance(value, dict):
return {k: replace_value(v) for k, v in value.items()}
elif isinstance(value, list):
return [replace_value(item) for item in value]
return value
# 深拷贝模板,避免修改原始数据
return replace_value(copy.deepcopy(template))
@staticmethod
def build_success_response(template_name: str, **kwargs) -> Dict:
"""构建成功响应
Args:
template_name: 模板名称
**kwargs: 占位符键值对
Returns:
响应字典
"""
template = ResponseBuilder.load_template(template_name)
return ResponseBuilder.replace_placeholders(
template["success_response"],
**kwargs
)