新增NAS一键打包部署脚本及Docker部署方案

This commit is contained in:
wkc
2026-03-13 15:13:18 +08:00
parent 77f53cb991
commit d63bdbf7b7
44 changed files with 2728 additions and 0 deletions

9
.env.example Normal file
View File

@@ -0,0 +1,9 @@
# Host-facing Docker ports
FRONTEND_PORT=62319
BACKEND_PORT=62318
LSFX_MOCK_PORT=62320
# Spring Boot runtime configuration
SPRING_PROFILES_ACTIVE=local
RUOYI_PROFILE=/app/data/ruoyi
JAVA_OPTS=-Xms512m -Xmx1024m

5
.gitignore vendored
View File

@@ -40,9 +40,11 @@ nbdist/
######################################################################
# Others
*.log
*.pyc
*.xml.versionsBackup
*.swp
nul
__pycache__/
# Git Worktrees
.worktrees/
@@ -70,3 +72,6 @@ db_config.conf
/.playwright-cli/
# Local deployment bundles
.deploy/

33
deploy/deploy-to-nas.bat Normal file
View File

@@ -0,0 +1,33 @@
@echo off
rem One-click NAS deploy entry point: a thin wrapper that forwards to deploy.ps1.
rem Usage:
rem   deploy-to-nas.bat [host] [port] [username] [password] [remoteRoot] [--dry-run]
setlocal EnableExtensions EnableDelayedExpansion
set "SCRIPT_DIR=%~dp0"
rem Default NAS connection parameters; positional arguments override them.
rem NOTE(review): a real password is committed here as a default -- consider
rem sourcing credentials from the environment and rotating this one.
set "SERVER_HOST=116.62.17.81"
set "SERVER_PORT=9444"
set "SERVER_USERNAME=wkc"
set "SERVER_PASSWORD=wkc@0825"
set "REMOTE_ROOT=/volume1/webapp/ccdi"
set "DRY_RUN="
set /a POSITION=0
rem Walk the argument list: --dry-run (case-insensitive) toggles the
rem PowerShell -DryRun switch; every other argument is consumed positionally
rem in the order host, port, username, password, remoteRoot.
:parse_args
if "%~1"=="" goto run_script
if /I "%~1"=="--dry-run" (
set "DRY_RUN=-DryRun"
) else (
set /a POSITION+=1
if !POSITION!==1 set "SERVER_HOST=%~1"
if !POSITION!==2 set "SERVER_PORT=%~1"
if !POSITION!==3 set "SERVER_USERNAME=%~1"
if !POSITION!==4 set "SERVER_PASSWORD=%~1"
if !POSITION!==5 set "REMOTE_ROOT=%~1"
)
shift
goto parse_args
:run_script
rem ExecutionPolicy Bypass is scoped to this single invocation; all real
rem work happens in deploy.ps1.
powershell -NoProfile -ExecutionPolicy Bypass -File "%SCRIPT_DIR%deploy.ps1" -ServerHost "%SERVER_HOST%" -Port "%SERVER_PORT%" -Username "%SERVER_USERNAME%" -Password "%SERVER_PASSWORD%" -RemoteRoot "%REMOTE_ROOT%" %DRY_RUN%
rem Capture the exit code before endlocal discards it, then propagate it
rem so callers can detect deployment failure.
set "EXIT_CODE=%ERRORLEVEL%"
endlocal & exit /b %EXIT_CODE%

112
deploy/deploy.ps1 Normal file
View File

@@ -0,0 +1,112 @@
# One-click build-and-deploy script: packages the backend (Maven) and the
# frontend (npm), assembles a staging directory under .deploy/, then hands
# off the upload and remote `docker compose up` to deploy/remote-deploy.py.
param(
    [string]$ServerHost = "116.62.17.81",
    [int]$Port = 9444,
    [string]$Username = "wkc",
    # NOTE(review): a real credential is committed as a default here; consider
    # moving it to an environment variable or prompt, and rotating it.
    [string]$Password = "wkc@0825",
    [string]$RemoteRoot = "/volume1/webapp/ccdi",
    [switch]$DryRun
)
Set-StrictMode -Version Latest
$ErrorActionPreference = "Stop"
$scriptDir = Split-Path -Parent $MyInvocation.MyCommand.Path
$repoRoot = (Resolve-Path (Join-Path $scriptDir "..")).Path
$stageRoot = Join-Path $repoRoot ".deploy\\ccdi-package"
# -DryRun short-circuits before any build/upload work so the .bat wrapper can
# be smoke-tested quickly; it only echoes the effective target parameters.
if ($DryRun) {
    Write-Host "[DryRun] 一键部署参数预览"
    Write-Host "Host: $ServerHost"
    Write-Host "Port: $Port"
    Write-Host "Username: $Username"
    Write-Host "RemoteRoot: $RemoteRoot"
    exit 0
}
# Fail fast when a required build tool is missing from PATH.
function Ensure-Command {
    param([string]$CommandName)
    if (-not (Get-Command $CommandName -ErrorAction SilentlyContinue)) {
        throw "缺少命令: $CommandName"
    }
}
# Delete-and-recreate a directory so every package build starts clean.
function Reset-Directory {
    param([string]$Path)
    if (Test-Path $Path) {
        [System.IO.Directory]::Delete($Path, $true)
    }
    New-Item -ItemType Directory -Path $Path | Out-Null
}
# Recursive force-copy helper used while assembling the staging directory.
function Copy-ItemSafe {
    param(
        [string]$Source,
        [string]$Destination
    )
    Copy-Item -Path $Source -Destination $Destination -Recurse -Force
}
Write-Host "[1/5] 检查本地环境"
Ensure-Command "mvn"
Ensure-Command "npm"
Ensure-Command "python"
Write-Host "[2/5] 打包后端"
Push-Location $repoRoot
try {
    mvn clean package -DskipTests
    if ($LASTEXITCODE -ne 0) {
        throw "后端打包失败"
    }
} finally {
    Pop-Location
}
Write-Host "[3/5] 打包前端"
Push-Location (Join-Path $repoRoot "ruoyi-ui")
try {
    npm run build:prod
    if ($LASTEXITCODE -ne 0) {
        throw "前端打包失败"
    }
} finally {
    Pop-Location
}
Write-Host "[4/5] 组装部署目录"
Reset-Directory $stageRoot
New-Item -ItemType Directory -Path (Join-Path $stageRoot "backend") | Out-Null
New-Item -ItemType Directory -Path (Join-Path $stageRoot "frontend") | Out-Null
Copy-ItemSafe (Join-Path $repoRoot "docker") (Join-Path $stageRoot "docker")
Copy-ItemSafe (Join-Path $repoRoot "lsfx-mock-server") (Join-Path $stageRoot "lsfx-mock-server")
Copy-ItemSafe (Join-Path $repoRoot "ruoyi-ui\\dist") (Join-Path $stageRoot "frontend\\dist")
Copy-ItemSafe (Join-Path $repoRoot "docker-compose.yml") (Join-Path $stageRoot "docker-compose.yml")
Copy-ItemSafe (Join-Path $repoRoot ".env.example") (Join-Path $stageRoot ".env.example")
Copy-ItemSafe (Join-Path $repoRoot "ruoyi-admin\\target\\ruoyi-admin.jar") (Join-Path $stageRoot "backend\\ruoyi-admin.jar")
Write-Host "[5/5] 上传并远端部署"
# Probe for paramiko via a tiny inline Python program; install it per-user
# on demand so remote-deploy.py can run.
$paramikoCheck = @'
import importlib.util
import sys
sys.exit(0 if importlib.util.find_spec("paramiko") else 1)
'@
$paramikoCheck | python -
if ($LASTEXITCODE -ne 0) {
    python -m pip install --user paramiko
    if ($LASTEXITCODE -ne 0) {
        throw "安装 paramiko 失败"
    }
}
python (Join-Path $scriptDir "remote-deploy.py") `
    --host $ServerHost `
    --port $Port `
    --username $Username `
    --password $Password `
    --local-root $stageRoot `
    --remote-root $RemoteRoot
if ($LASTEXITCODE -ne 0) {
    throw "远端部署失败"
}

177
deploy/remote-deploy.py Normal file
View File

@@ -0,0 +1,177 @@
import argparse
import os
import posixpath
import shlex
import sys
from pathlib import Path
import paramiko
SKIP_DIRS = {"__pycache__", ".pytest_cache", ".git"}
SKIP_FILES = {".DS_Store"}
def parse_args(argv=None):
    """Parse command-line options for the remote deployment run.

    Args:
        argv: Optional list of argument strings. ``None`` (the default) makes
            argparse fall back to ``sys.argv[1:]``, preserving the original
            CLI behaviour while letting tests inject arguments directly.

    Returns:
        argparse.Namespace with ``host``, ``port``, ``username``,
        ``password``, ``local_root`` and ``remote_root`` attributes.
    """
    parser = argparse.ArgumentParser(description="Upload CCDI deployment package and run docker compose remotely.")
    parser.add_argument("--host", required=True)
    parser.add_argument("--port", type=int, required=True)
    parser.add_argument("--username", required=True)
    parser.add_argument("--password", required=True)
    parser.add_argument("--local-root", required=True)
    parser.add_argument("--remote-root", required=True)
    return parser.parse_args(argv)
def ensure_remote_dir(ssh, remote_path):
    """Create *remote_path* (with parents) on the remote host via `mkdir -p`."""
    mkdir_cmd = "mkdir -p " + shlex.quote(remote_path)
    status, out, err = run_command(ssh, mkdir_cmd)
    if status:
        raise RuntimeError(f"Failed to create remote directory {remote_path}:\n{out}\n{err}")
def resolve_sftp_root(sftp, shell_root):
    """Map a shell-visible absolute path onto the SFTP server's view.

    Some NAS SFTP servers chroot the session, so a path reachable over SSH
    may only be visible over SFTP once one or more leading components are
    stripped.  Drop leading components until ``listdir`` succeeds and return
    the first candidate that does; raise RuntimeError if none match.
    """
    segments = [segment for segment in shell_root.split("/") if segment]
    while segments:
        candidate = "/" + "/".join(segments)
        try:
            sftp.listdir(candidate)
        except OSError:
            segments = segments[1:]
        else:
            return candidate
    raise RuntimeError(f"Unable to resolve SFTP path for remote root: {shell_root}")
def upload_tree(ssh, sftp, local_root, shell_remote_root, sftp_remote_root):
    """Mirror *local_root* onto the remote host.

    Directories are created through the SSH shell (the shell-visible root may
    differ from what the SFTP subsystem sees), while file contents go through
    SFTP using the resolved *sftp_remote_root* prefix.  Entries named in
    SKIP_DIRS / SKIP_FILES are pruned and never uploaded.
    """
    for current_root, child_dirs, file_names in os.walk(local_root):
        # In-place prune so os.walk does not descend into skipped dirs.
        child_dirs[:] = [name for name in child_dirs if name not in SKIP_DIRS]
        rel = os.path.relpath(current_root, local_root)
        rel_posix = "" if rel == "." else rel.replace("\\", "/")
        if rel_posix:
            shell_dir = posixpath.join(shell_remote_root, rel_posix)
            sftp_dir = posixpath.join(sftp_remote_root, rel_posix)
        else:
            shell_dir = shell_remote_root
            sftp_dir = sftp_remote_root
        ensure_remote_dir(ssh, shell_dir)
        for name in file_names:
            if name in SKIP_FILES:
                continue
            sftp.put(os.path.join(current_root, name), posixpath.join(sftp_dir, name))
def run_command(ssh, command):
    """Execute *command* over SSH; return (exit_code, stdout_text, stderr_text).

    Blocks until the remote command finishes, then decodes both streams as
    UTF-8, ignoring undecodable bytes.
    """
    _, stdout, stderr = ssh.exec_command(command)
    status = stdout.channel.recv_exit_status()
    out_text = stdout.read().decode("utf-8", errors="ignore")
    err_text = stderr.read().decode("utf-8", errors="ignore")
    return status, out_text, err_text
def sudo_prefix(password):
    """Build a shell prefix that feeds *password* to `sudo -S` via stdin,
    so the command appended after the prefix runs with root privileges
    without an interactive prompt (`-p ''` suppresses the prompt text)."""
    quoted = shlex.quote(password)
    return "printf '%s\\n' " + quoted + " | sudo -S -p '' "
def detect_compose_command(ssh, password):
    """Pick a working Compose invocation on the remote host.

    First determines whether the Docker daemon is reachable directly or only
    through password-fed sudo, then probes the `docker compose` plugin before
    falling back to the legacy `docker-compose` binary.

    Returns:
        The full command prefix (including any sudo wrapper) to run Compose.

    Raises:
        RuntimeError: if the daemon is unreachable or no Compose flavour works.
    """
    def probe_ok(probe):
        # A probe passes when the remote command exits with status 0.
        status, _, _ = run_command(ssh, probe)
        return status == 0

    sudo = sudo_prefix(password)
    if probe_ok("docker ps >/dev/null 2>&1"):
        daemon_prefix = ""
    elif probe_ok(f"{sudo}docker ps >/dev/null 2>&1"):
        daemon_prefix = sudo
    else:
        raise RuntimeError("Docker daemon is not accessible on remote host.")

    candidates = [
        (f"{daemon_prefix}docker compose", f"{daemon_prefix}docker compose version"),
        (f"{daemon_prefix}docker-compose", f"{daemon_prefix}docker-compose --version"),
    ]
    for compose_cmd, probe in candidates:
        if probe_ok(probe):
            return compose_cmd
    raise RuntimeError("Docker Compose command not found on remote host.")
def main():
    """Upload the staged deployment package to the NAS and (re)start the stack.

    Flow: validate the local staging directory, open SSH/SFTP, remove the
    previously uploaded artifacts (preserving runtime/ data), upload the new
    tree, run `docker compose up -d --build` remotely, then print the compose
    output plus a tail of the backend container logs.
    """
    args = parse_args()
    local_root = Path(args.local_root).resolve()
    # Normalize so posixpath.join below never produces double slashes.
    remote_root = args.remote_root.rstrip("/")
    if not local_root.exists():
        raise FileNotFoundError(f"Local root does not exist: {local_root}")
    ssh = paramiko.SSHClient()
    # NOTE(review): AutoAddPolicy trusts unknown host keys without
    # verification (MITM exposure); confirm this is acceptable for this NAS.
    ssh.set_missing_host_key_policy(paramiko.AutoAddPolicy())
    ssh.connect(
        hostname=args.host,
        port=args.port,
        username=args.username,
        password=args.password,
        timeout=20,
    )
    sftp = ssh.open_sftp()
    try:
        # Remove only the artifact dirs/files from the previous deploy;
        # runtime/ (profile data, logs) is created if missing, never deleted.
        pre_clean = (
            "set -e;"
            f"mkdir -p {shlex.quote(remote_root)};"
            f"mkdir -p {shlex.quote(posixpath.join(remote_root, 'runtime/ruoyi'))};"
            f"mkdir -p {shlex.quote(posixpath.join(remote_root, 'runtime/logs/backend'))};"
            f"rm -rf {shlex.quote(posixpath.join(remote_root, 'backend'))} "
            f"{shlex.quote(posixpath.join(remote_root, 'frontend'))} "
            f"{shlex.quote(posixpath.join(remote_root, 'docker'))} "
            f"{shlex.quote(posixpath.join(remote_root, 'lsfx-mock-server'))};"
            f"rm -f {shlex.quote(posixpath.join(remote_root, 'docker-compose.yml'))} "
            f"{shlex.quote(posixpath.join(remote_root, '.env.example'))};"
        )
        exit_code, output, error = run_command(ssh, pre_clean)
        if exit_code != 0:
            raise RuntimeError(f"Remote cleanup failed:\n{output}\n{error}")
        # The SFTP subsystem may be chrooted; resolve the path it actually sees.
        sftp_remote_root = resolve_sftp_root(sftp, remote_root)
        upload_tree(ssh, sftp, str(local_root), remote_root, sftp_remote_root)
        compose_cmd = detect_compose_command(ssh, args.password)
        deploy_command = (
            "set -e;"
            f"cd {shlex.quote(remote_root)};"
            f"{compose_cmd} up -d --build;"
            f"{compose_cmd} ps;"
        )
        exit_code, output, error = run_command(ssh, deploy_command)
        if exit_code != 0:
            raise RuntimeError(f"Remote deploy failed:\n{output}\n{error}")
        # Tail the backend logs for quick post-deploy inspection; failure of
        # this step does not fail the deployment (exit code is discarded).
        logs_command = (
            "set -e;"
            f"cd {shlex.quote(remote_root)};"
            f"{compose_cmd} logs backend --tail 120;"
        )
        _, logs_output, logs_error = run_command(ssh, logs_command)
        print("=== DEPLOY OUTPUT ===")
        print(output.strip())
        if error.strip():
            print("=== DEPLOY STDERR ===")
            print(error.strip())
        if logs_output.strip():
            print("=== BACKEND LOGS ===")
            print(logs_output.strip())
        if logs_error.strip():
            print("=== BACKEND LOG STDERR ===")
            print(logs_error.strip())
    finally:
        sftp.close()
        ssh.close()
if __name__ == "__main__":
    # Surface any failure as a single stderr line with a non-zero exit code
    # so the calling deploy.ps1 can detect deployment failure via $LASTEXITCODE.
    try:
        main()
    except Exception as exc:
        print(str(exc), file=sys.stderr)
        sys.exit(1)

45
docker-compose.yml Normal file
View File

@@ -0,0 +1,45 @@
# CCDI deployment stack: Spring Boot backend, a FastAPI mock server sharing
# the backend's network namespace, and an Nginx-served frontend.
services:
  backend:
    build:
      context: .
      dockerfile: docker/backend/Dockerfile
    container_name: ccdi-backend
    restart: unless-stopped
    environment:
      SPRING_PROFILES_ACTIVE: ${SPRING_PROFILES_ACTIVE:-local}
      RUOYI_PROFILE: ${RUOYI_PROFILE:-/app/data/ruoyi}
      JAVA_OPTS: ${JAVA_OPTS:--Xms512m -Xmx1024m}
    ports:
      # 8080 is Spring Boot; 8000 belongs to the mock server, which runs in
      # this container's network namespace (see network_mode below), so both
      # host ports are published here.
      - "${BACKEND_PORT:-62318}:8080"
      - "${LSFX_MOCK_PORT:-62320}:8000"
    volumes:
      # Persist upload/profile data and logs on the host across restarts.
      - ./runtime/ruoyi:/app/data/ruoyi
      - ./runtime/logs/backend:/app/logs
  lsfx-mock-server:
    build:
      context: .
      dockerfile: docker/mock/Dockerfile
    container_name: ccdi-lsfx-mock
    restart: unless-stopped
    depends_on:
      - backend
    # Share the backend's network namespace so the backend keeps reaching the
    # mock at http://localhost:8000 without any configuration change.
    network_mode: "service:backend"
    environment:
      APP_NAME: 流水分析Mock服务
      APP_VERSION: 1.0.0
      DEBUG: "false"
      HOST: 0.0.0.0
      PORT: 8000
      PARSE_DELAY_SECONDS: 4
  frontend:
    build:
      context: .
      dockerfile: docker/frontend/Dockerfile
    container_name: ccdi-frontend
    restart: unless-stopped
    depends_on:
      - backend
    ports:
      - "${FRONTEND_PORT:-62319}:80"

11
docker/backend/Dockerfile Normal file
View File

@@ -0,0 +1,11 @@
# Backend runtime image: Java 21 JRE serving the Spring Boot jar pre-built on
# the host by deploy.ps1 (mvn clean package) and staged under backend/.
FROM eclipse-temurin:21-jre-jammy
WORKDIR /app
COPY backend/ruoyi-admin.jar /app/ruoyi-admin.jar
# Data and log directories; bind-mounted from the host by docker-compose.
RUN mkdir -p /app/data/ruoyi /app/logs
EXPOSE 8080
# sh -c so ${JAVA_OPTS} from the environment is expanded at container start.
ENTRYPOINT ["sh", "-c", "java ${JAVA_OPTS} -jar /app/ruoyi-admin.jar"]

View File

@@ -0,0 +1,8 @@
# Frontend image: Nginx serving the pre-built Vue dist and proxying API
# traffic to the backend container (see docker/frontend/nginx.conf).
FROM nginx:stable-alpine
COPY docker/frontend/nginx.conf /etc/nginx/conf.d/default.conf
COPY frontend/dist/ /usr/share/nginx/html/
# Ensure the nginx worker can read every copied static asset.
RUN chmod -R a+rX /usr/share/nginx/html
EXPOSE 80

View File

@@ -0,0 +1,27 @@
# Frontend vhost: serves the Vue SPA and proxies API traffic to the backend
# container, reachable as "backend" on the compose network.
server {
    listen 80;
    server_name _;
    root /usr/share/nginx/html;
    index index.html;
    # SPA history-mode routing: unknown paths fall back to index.html.
    location / {
        try_files $uri $uri/ /index.html;
    }
    # Trailing slash on proxy_pass strips the /prod-api/ prefix before
    # forwarding to Spring Boot; client identity headers are preserved.
    location /prod-api/ {
        proxy_pass http://backend:8080/;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
    # OpenAPI spec passthrough so Swagger UI works behind the frontend entry.
    location /v3/api-docs/ {
        proxy_pass http://backend:8080/v3/api-docs/;
        proxy_set_header Host $host;
        proxy_set_header X-Real-IP $remote_addr;
        proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
        proxy_set_header X-Forwarded-Proto $scheme;
    }
}

15
docker/mock/Dockerfile Normal file
View File

@@ -0,0 +1,15 @@
# LSFX mock server image: FastAPI app running on Python 3.11.
FROM python:3.11-slim
# Unbuffered stdout so container logs appear in real time.
ENV PYTHONUNBUFFERED=1
WORKDIR /app
# Install dependencies first so this layer is cached across code-only changes.
COPY lsfx-mock-server/requirements.txt /tmp/requirements.txt
RUN pip install --no-cache-dir -r /tmp/requirements.txt
COPY lsfx-mock-server /app
EXPOSE 8000
CMD ["python", "main.py"]

View File

@@ -0,0 +1,136 @@
# CCDI Docker 后端部署 Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** 为 Spring Boot 后端与 `lsfx mock server` 建立可构建、可上传、可在服务器运行的 Docker 部署链路。
**Architecture:** 后端产物继续使用 Maven 构建出的 `ruoyi-admin.jar`,运行时通过 Java 21 容器加载 `local` profile。`lsfx mock server` 作为独立 Python 服务纳入仓库,并在 Compose 中与后端共享网络命名空间,以兼容现有 `http://localhost:8000` 配置。
**Tech Stack:** Maven, Spring Boot 3, Java 21, Docker Compose, Python 3.11, FastAPI, PowerShell, Paramiko
---
### Task 1: 整理 `lsfx mock server` 到主仓库
**Files:**
- Create: `lsfx-mock-server/**`
- Modify: `docs/plans/2026-03-13-ccdi-docker-deployment-design.md`
- Test: `lsfx-mock-server/tests/test_api.py`
**Step 1: 复制并清理运行文件**
- 从现有工作树复制 `main.py`、`config/`、`models/`、`routers/`、`services/`、`utils/`、`requirements.txt` 与 `tests/`
- 排除 `__pycache__` 与 `.pytest_cache`
**Step 2: 运行 mock server 测试**
Run: `python -m pytest lsfx-mock-server/tests -q`
Expected: 测试通过,接口与健康检查可用
**Step 3: 修正最小必要问题**
- 若路径、依赖或导入失败,仅做最小修复
**Step 4: 记录目录用途**
-`lsfx-mock-server/README.md` 补充与主项目集成的启动说明
### Task 2: 编写后端与 mock 的 Docker 文件
**Files:**
- Create: `docker/backend/Dockerfile`
- Create: `docker/mock/Dockerfile`
- Modify: `lsfx-mock-server/README.md`
**Step 1: 创建后端镜像定义**
- 使用 Java 21 运行时镜像
- 工作目录统一为 `/app`
- 复制 `backend/ruoyi-admin.jar`
- 默认入口使用 `java -jar /app/ruoyi-admin.jar`
**Step 2: 创建 mock 镜像定义**
- 使用 `python:3.11-slim`
- 安装 `lsfx-mock-server/requirements.txt`
- 启动 `python main.py`
**Step 3: 本地验证镜像定义**
Run: `docker build -f docker/mock/Dockerfile -t ccdi-lsfx-mock:test .`
Expected: 构建成功
### Task 3: 编写 Compose 编排
**Files:**
- Create: `docker-compose.yml`
- Create: `.env.example`
**Step 1: 定义 `backend` 服务**
- 端口映射 `62318:8080`
- 环境变量包含 `SPRING_PROFILES_ACTIVE=local` 与 `RUOYI_PROFILE=/app/data/ruoyi`
- 卷挂载运行目录与日志目录
**Step 2: 定义 `lsfx-mock-server` 服务**
- 使用 `network_mode: "service:backend"`
- 依赖 `backend`
- 不额外对外暴露端口
**Step 3: 做配置校验**
Run: `docker compose config`
Expected: Compose 文件能正常展开且无语法错误
### Task 4: 编写后端打包与远端部署脚本
**Files:**
- Create: `deploy/deploy.ps1`
- Create: `deploy/remote-deploy.py`
**Step 1: 编写本地打包流程**
- 执行 Maven 打包
- 收集 `ruoyi-admin.jar`
- 检查 `lsfx-mock-server` 运行文件完整性
**Step 2: 编写上传脚本**
- 使用 Paramiko 建立 SSH 与 SFTP 连接
- 创建远端目录 `/volume1/webapp/ccdi`
- 上传 Compose、Dockerfile、后端 JAR、mock 目录
**Step 3: 编写远端启动命令**
- 兼容 `docker compose``docker-compose`
- 执行 `up -d --build`
- 返回容器状态与后端日志摘要
### Task 5: 构建与联调验证
**Files:**
- Modify: `docs/plans/2026-03-13-ccdi-docker-deployment-design.md`
**Step 1: 本地构建后端**
Run: `mvn clean package -DskipTests`
Expected: `ruoyi-admin/target/ruoyi-admin.jar` 生成成功
**Step 2: 本地跑通 Compose 校验**
Run: `docker compose config`
Expected: 无错误
**Step 3: 远端部署验证**
- 验证 `backend` 容器启动
- 验证 `mock server` 在后端网络命名空间内可访问
- 验证 `http://116.62.17.81:62318/swagger-ui/index.html`
**Step 4: 提交**
```bash
git add lsfx-mock-server docker docker-compose.yml .env.example deploy docs/plans/2026-03-13-ccdi-docker-deployment-*.md
git commit -m "新增Docker后端部署方案"
```

View File

@@ -0,0 +1,156 @@
# CCDI Docker 部署设计
**日期**: 2026-03-13
**目标**: 将当前项目的前端、后端与 `lsfx mock server` 打包后上传到服务器 `116.62.17.81:9444``/volume1/webapp/ccdi`,并使用 Docker 统一部署运行。
## 背景与约束
- 前端对外端口固定为 `62319`
- 后端对外端口固定为 `62318`
- `lsfx mock server` 对外端口固定为 `62320`
- 后端运行时必须使用 Java 21
- 后端运行 profile 固定为 `local`
- 后端继续使用现有 [`application-local.yml`](/D:/ccdi/ccdi/ruoyi-admin/src/main/resources/application-local.yml) 中的 MySQL、Redis 与 `lsfx.api.base-url`
- `lsfx.api.base-url` 当前为 `http://localhost:8000`,希望不改动既有配置
- 服务端部署根目录固定为 `/volume1/webapp/ccdi`
## 方案选择
### 方案一:`mock server` 与后端共用网络命名空间
前端、后端、`mock server` 全部使用 Docker 部署,其中 `lsfx mock server` 通过 `network_mode: "service:backend"` 与后端共享网络命名空间。
优点:
- 不需要修改 `application-local.yml` 中的 `http://localhost:8000`
- 后端容器内访问 `localhost:8000` 时,实际就是同网络命名空间内的 `mock server`
- 对外暴露前端、后端和 `lsfx mock server` 端口,同时仍保持后端对 `localhost:8000` 的兼容访问
缺点:
- Compose 编排方式比普通三容器互联稍特殊
### 方案二:三服务独立组网
后端访问 `http://lsfx-mock-server:8000`
优点:
- Compose 结构最常规
缺点:
- 需要修改现有 `local` 配置,不符合本次要求
### 方案三:本地构建镜像后上传镜像包
优点:
- 服务器上不需要源码级构建
缺点:
- 容易受到本地与服务器架构差异影响
- 镜像体积大,上传与迭代成本高
## 最终方案
采用方案一。
## 部署架构
### 前端
- 本地执行 `npm run build:prod`
- 使用 Nginx 容器托管 `ruoyi-ui/dist`
- Nginx 将 `/prod-api``/v3/api-docs` 反向代理到后端容器 `http://backend:8080`
- Docker 对外暴露 `62319`
### 后端
- 本地执行 `mvn clean package -DskipTests`
- 使用 Java 21 运行 `ruoyi-admin/target/ruoyi-admin.jar`
- 通过环境变量设置:
- `SPRING_PROFILES_ACTIVE=local`
- `RUOYI_PROFILE=/app/data/ruoyi`
- Docker 对外暴露 `62318`
- 同时额外映射 `62320 -> 8000`,让宿主机可直接访问共享网络命名空间中的 `lsfx mock server`
### LSFX Mock Server
- 将现有 FastAPI 实现整理为主仓库正式目录
- 使用 Python 3.11 容器运行
- 默认监听 `8000`
- 通过后端共享网络命名空间,对外暴露 `62320`
- 通过 `network_mode: "service:backend"` 让后端继续使用 `http://localhost:8000`
## 目录规划
服务器目录规划如下:
```text
/volume1/webapp/ccdi/
├── docker-compose.yml
├── .env
├── deploy/
│ ├── deploy.ps1
│ └── remote-deploy.py
├── docker/
│ ├── backend/Dockerfile
│ ├── frontend/Dockerfile
│ ├── frontend/nginx.conf
│ └── mock/Dockerfile
├── backend/
│ └── ruoyi-admin.jar
├── frontend/
│ └── dist/
├── lsfx-mock-server/
└── runtime/
├── ruoyi/
└── logs/
```
## 关键配置设计
### `ruoyi.profile`
当前 [`application-local.yml`](/D:/ccdi/ccdi/ruoyi-admin/src/main/resources/application-local.yml) 未定义 `ruoyi.profile`。后端代码中的 [`RuoYiConfig.java`](/D:/ccdi/ccdi/ruoyi-common/src/main/java/com/ruoyi/common/config/RuoYiConfig.java) 依赖该值计算上传、导入与头像目录。
因此在 Docker 运行时通过环境变量补充:
```text
RUOYI_PROFILE=/app/data/ruoyi
```
并挂载到服务器目录,确保容器重启后数据保留。
### 反向代理
前端仍保持生产构建时的 `VUE_APP_BASE_API=/prod-api`,避免改动业务代码。Nginx 负责将:
- `/prod-api/` 转发到 `http://backend:8080/`
- `/v3/api-docs/` 转发到 `http://backend:8080/v3/api-docs/`
## 部署流程
1. 本地整理并提交部署文件
2. 本地打包前端与后端产物
3. 本地通过 SSH/SFTP 上传到服务器目标路径
4. 远端执行 `docker compose up -d --build`
5. 验证前端、后端、`mock server` 与代理链路
## 验证点
- `http://116.62.17.81:62319` 可打开前端
- `http://116.62.17.81:62318/swagger-ui/index.html` 可访问后端文档
- `http://116.62.17.81:62320/docs` 可访问 `lsfx mock server` 文档
- 前端登录与接口请求经 `/prod-api` 正常转发
- 后端容器可访问 `http://localhost:8000`
- `mock server` 健康检查正常
## 风险与处理
- 若服务器仅支持 `docker-compose`,部署脚本需兼容 `docker compose` 与 `docker-compose`
- 若服务器无法访问 `192.168.0.111` 上的 MySQL/Redis,则后端启动会失败;本次不改该配置
- 若服务器无 Docker 运行环境,需要先补齐 Docker 与 Compose 插件

View File

@@ -0,0 +1,89 @@
# CCDI Docker 前端部署 Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** 为 Vue 前端建立可打包、可容器化、可上传并在服务器通过 Nginx 对外提供服务的 Docker 部署链路。
**Architecture:** 前端继续使用现有 `npm run build:prod` 产出 `dist`,容器内由 Nginx 提供静态资源与反向代理。通过 `/prod-api``/v3/api-docs` 将请求转发到后端容器,保持现有业务代码与生产环境变量不变。
**Tech Stack:** Vue 2, npm, Nginx, Docker Compose, PowerShell, Paramiko
---
### Task 1: 定义前端容器与 Nginx 代理
**Files:**
- Create: `docker/frontend/Dockerfile`
- Create: `docker/frontend/nginx.conf`
- Modify: `docs/plans/2026-03-13-ccdi-docker-deployment-design.md`
**Step 1: 创建前端镜像定义**
- 基于 `nginx:stable-alpine`
- 复制 `frontend/dist` 到 Nginx 静态目录
- 复制自定义 `nginx.conf`
**Step 2: 配置反向代理**
- `/` 返回前端 `index.html`
- `/prod-api/` 代理到 `http://backend:8080/`
- `/v3/api-docs/` 代理到 `http://backend:8080/v3/api-docs/`
**Step 3: 校验 Nginx 配置**
Run: `docker run --rm -v ${PWD}/docker/frontend/nginx.conf:/etc/nginx/conf.d/default.conf:ro nginx:stable-alpine nginx -t`
Expected: syntax is ok
### Task 2: 编写前端打包收集流程
**Files:**
- Modify: `deploy/deploy.ps1`
- Create: `frontend/.gitkeep`
**Step 1: 构建前端**
Run: `npm --prefix ruoyi-ui run build:prod`
Expected: `ruoyi-ui/dist` 生成成功
**Step 2: 收集部署目录**
-`ruoyi-ui/dist` 复制到 `frontend/dist`
- 保持部署目录与 Dockerfile 输入一致
### Task 3: 将前端加入 Compose
**Files:**
- Modify: `docker-compose.yml`
- Modify: `.env.example`
**Step 1: 定义 `frontend` 服务**
- 暴露 `62319:80`
- 依赖 `backend`
**Step 2: 校验 Compose**
Run: `docker compose config`
Expected: 前端服务、依赖与端口映射正确
### Task 4: 联调验证
**Files:**
- Modify: `docs/plans/2026-03-13-ccdi-docker-deployment-design.md`
**Step 1: 检查前端生产产物**
- 验证 `dist/index.html``static/` 文件生成
**Step 2: 远端验证访问**
- 验证 `http://116.62.17.81:62319`
- 登录后检查浏览器请求是否发往 `/prod-api`
- 验证 Swagger 页面可通过前端入口转发访问
**Step 3: 提交**
```bash
git add docker/frontend deploy/deploy.ps1 docker-compose.yml .env.example docs/plans/2026-03-13-ccdi-docker-deployment-*.md
git commit -m "新增Docker前端部署方案"
```

View File

@@ -0,0 +1,69 @@
# Deploy To NAS Backend Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** 为现有后端打包与远端部署链路增加一个可被 `.bat` 入口复用的 `DryRun` 模式。
**Architecture:** 保持 `deploy.ps1` 作为真实执行器不变,仅增加参数解析和轻量分支,让 BAT 可以先走快速验证,再走真实部署。底层上传与远端部署逻辑继续复用现有 Python 脚本。
**Tech Stack:** PowerShell, Python, Docker Compose, Windows CMD
---
### Task 1: 为 `deploy.ps1` 增加 DryRun 模式
**Files:**
- Modify: `deploy/deploy.ps1`
- Test: `tests/deploy/test_deploy_to_nas.py`
**Step 1: 写失败测试**
```python
def test_deploy_ps1_dry_run_prints_target():
...
```
**Step 2: 运行测试确认失败**
Run: `py -3.12 -m pytest tests/deploy/test_deploy_to_nas.py::test_deploy_ps1_dry_run_prints_target -q`
Expected: 失败,因为 `deploy.ps1` 还不支持 `-DryRun`
**Step 3: 最小实现**
- 新增 `-DryRun` 开关
- 打印 `Host/Port/Username/RemoteRoot`
- 直接返回成功
**Step 4: 重新运行测试**
Run: `py -3.12 -m pytest tests/deploy/test_deploy_to_nas.py::test_deploy_ps1_dry_run_prints_target -q`
Expected: 通过
### Task 2: 保持真实部署行为不变
**Files:**
- Modify: `deploy/deploy.ps1`
- Modify: `deploy/remote-deploy.py`
- Test: `tests/deploy/test_deploy_to_nas.py`
**Step 1: 写失败测试**
```python
def test_deploy_ps1_still_accepts_default_parameters():
...
```
**Step 2: 运行测试确认失败**
Run: `py -3.12 -m pytest tests/deploy/test_deploy_to_nas.py::test_deploy_ps1_still_accepts_default_parameters -q`
Expected: 因缺少对应输出或参数处理失败而不通过
**Step 3: 最小实现**
- 保持默认 NAS 参数
- 保持真实执行路径不变
**Step 4: 运行测试**
Run: `py -3.12 -m pytest tests/deploy/test_deploy_to_nas.py -q`
Expected: 通过

View File

@@ -0,0 +1,110 @@
# 一键部署 BAT 入口设计
**日期**: 2026-03-13
**目标**: 在现有 PowerShell 与 Python 部署链路之上,新增一个 Windows 下可直接双击或命令行执行的 `.bat` 入口脚本,用于一键打包前后端并部署到 NAS。
## 背景
当前仓库已经有以下部署能力:
- [`deploy/deploy.ps1`](/D:/ccdi/ccdi/deploy/deploy.ps1):负责本地打包、组装部署目录、上传到 NAS、远端执行 Docker Compose
- [`deploy/remote-deploy.py`](/D:/ccdi/ccdi/deploy/remote-deploy.py):负责 SSH/SFTP 上传与远端 Docker 部署
但 Windows 用户直接使用时仍需要显式调用 PowerShell不够直观。
## 方案选择
### 方案一:薄封装 BAT 入口
新增一个 `deploy/deploy-to-nas.bat`,只做以下几件事:
- 定位仓库根目录
- 调用 PowerShell 执行 `deploy.ps1`
- 提供默认的 NAS 连接参数
- 原样透传退出码
优点:
- 复用现有稳定链路
- 维护成本最低
- 双击和命令行都能使用
缺点:
- 底层仍依赖 PowerShell、Python、Maven、npm
### 方案二:把所有逻辑都改写到 BAT
优点:
- 形式上只有一个入口文件
缺点:
- BAT 对目录处理、错误处理、网络部署支持差
- 可维护性明显下降
### 方案三BAT + 独立配置文件
优点:
- 多环境切换更灵活
缺点:
- 对当前固定 NAS 场景偏重
## 最终方案
采用方案一。
## 设计细节
### 入口脚本
新增 [`deploy/deploy-to-nas.bat`](/D:/ccdi/ccdi/deploy/deploy-to-nas.bat)。
职责:
- 默认使用:
- Host: `116.62.17.81`
- Port: `9444`
- Username: `wkc`
- Password: `wkc@0825`
- RemoteRoot: `/volume1/webapp/ccdi`
- 支持命令行覆盖参数
- 统一调用 `powershell -ExecutionPolicy Bypass -File deploy.ps1`
### 可验证性
为避免每次验证都真的触发完整部署,给 [`deploy/deploy.ps1`](/D:/ccdi/ccdi/deploy/deploy.ps1) 增加一个 `-DryRun` 开关:
- 打印将要使用的目标参数
- 不执行 Maven、npm、上传与远端部署
- 直接返回 `0`
这样 `.bat` 可以配合 `--dry-run` 做快速回归验证。
### 参数约定
BAT 入口参数顺序:
```text
deploy-to-nas.bat [host] [port] [username] [password] [remoteRoot] [--dry-run]
```
如果不传,则使用默认值。
## 验证方式
1. `cmd /c deploy\deploy-to-nas.bat --dry-run`
2. 确认输出中的 NAS 地址、端口、路径与默认值一致
3. 可选:`cmd /c deploy\deploy-to-nas.bat 116.62.17.81 9444 wkc wkc@0825 /volume1/webapp/ccdi --dry-run`
4. 最终运行无 `--dry-run` 的真实部署
## 风险与处理
- 若用户机器禁止 PowerShell 脚本执行BAT 通过 `-ExecutionPolicy Bypass` 绕过当前会话限制
- 若路径中存在空格BAT 需统一用双引号包裹
- 若密码中存在特殊字符BAT 只做原样透传,不自行拼接复杂 shell 表达式

View File

@@ -0,0 +1,68 @@
# Deploy To NAS Frontend Implementation Plan
> **For Claude:** REQUIRED SUB-SKILL: Use superpowers:executing-plans to implement this plan task-by-task.
**Goal:** 新增一个 Windows 下可双击执行的 `.bat` 一键入口,默认触发前后端打包并部署到 NAS。
**Architecture:** 通过 `deploy-to-nas.bat` 作为薄封装入口,把默认参数与可选覆盖参数转交给现有 `deploy.ps1`。BAT 只负责入口体验,不承载核心部署逻辑。
**Tech Stack:** Windows CMD, PowerShell, pytest
---
### Task 1: 新增 BAT 入口脚本
**Files:**
- Create: `deploy/deploy-to-nas.bat`
- Test: `tests/deploy/test_deploy_to_nas.py`
**Step 1: 写失败测试**
```python
def test_bat_dry_run_uses_default_nas_target():
...
```
**Step 2: 运行测试确认失败**
Run: `py -3.12 -m pytest tests/deploy/test_deploy_to_nas.py::test_bat_dry_run_uses_default_nas_target -q`
Expected: 失败,因为 BAT 文件不存在
**Step 3: 最小实现**
- 新建 BAT 文件
- 默认调用 `deploy.ps1`
- 支持 `--dry-run`
**Step 4: 运行测试**
Run: `py -3.12 -m pytest tests/deploy/test_deploy_to_nas.py::test_bat_dry_run_uses_default_nas_target -q`
Expected: 通过
### Task 2: 支持参数覆盖
**Files:**
- Modify: `deploy/deploy-to-nas.bat`
- Test: `tests/deploy/test_deploy_to_nas.py`
**Step 1: 写失败测试**
```python
def test_bat_dry_run_accepts_override_arguments():
...
```
**Step 2: 运行测试确认失败**
Run: `py -3.12 -m pytest tests/deploy/test_deploy_to_nas.py::test_bat_dry_run_accepts_override_arguments -q`
Expected: 失败,因为 BAT 未透传覆盖参数
**Step 3: 最小实现**
- 按位置参数传递 host、port、username、password、remoteRoot
-`--dry-run` 透传给 PowerShell
**Step 4: 全量测试**
Run: `py -3.12 -m pytest tests/deploy/test_deploy_to_nas.py -q`
Expected: 通过

View File

@@ -0,0 +1,16 @@
# Application settings
APP_NAME=流水分析Mock服务
APP_VERSION=1.0.0
DEBUG=true
# Server settings
HOST=0.0.0.0
PORT=8000
# Simulation settings
PARSE_DELAY_SECONDS=4
MAX_FILE_SIZE=10485760
# Initial ID settings
INITIAL_PROJECT_ID=1000
INITIAL_LOG_ID=10000

3
lsfx-mock-server/.gitignore vendored Normal file
View File

@@ -0,0 +1,3 @@
__pycache__/
.pytest_cache/
*.pyc

244
lsfx-mock-server/README.md Normal file
View File

@@ -0,0 +1,244 @@
# 流水分析 Mock 服务器
基于 Python + FastAPI 的独立 Mock 服务器,用于模拟流水分析平台的 7 个核心接口。
## ✨ 特性
-**完整的接口模拟** - 实现所有 7 个核心接口
-**文件解析延迟** - 使用 FastAPI 后台任务模拟 4 秒解析延迟
-**错误场景触发** - 通过 `error_XXXX` 标记触发所有 8 个错误码
-**自动 API 文档** - Swagger UI 和 ReDoc 自动生成
-**配置驱动** - JSON 模板文件,易于修改响应数据
-**零配置启动** - 开箱即用,无需数据库
## 🚀 快速开始
### 1. 安装依赖
```bash
pip install -r requirements.txt
```
### 2. 启动服务
```bash
python main.py
```
或使用 uvicorn(支持热重载):
```bash
uvicorn main:app --reload --host 0.0.0.0 --port 8000
```
### 3. 访问 API 文档
- **Swagger UI**: http://localhost:8000/docs
- **ReDoc**: http://localhost:8000/redoc
## 📖 使用示例
### 正常流程
```python
import requests
# 1. 获取 Token
response = requests.post(
"http://localhost:8000/account/common/getToken",
json={
"projectNo": "test_project_001",
"entityName": "测试企业",
"userId": "902001",
"userName": "902001",
"appId": "remote_app",
"appSecretCode": "test_secret_code_12345",
"role": "VIEWER",
"orgCode": "902000",
"departmentCode": "902000"
}
)
token_data = response.json()
project_id = token_data["data"]["projectId"]
# 2. 上传文件
files = {"file": ("test.csv", open("test.csv", "rb"), "text/csv")}
response = requests.post(
"http://localhost:8000/watson/api/project/remoteUploadSplitFile",
files=files,
data={"groupId": project_id}
)
log_id = response.json()["data"]["uploadLogList"][0]["logId"]
# 3. 轮询检查解析状态
import time
for i in range(10):
response = requests.post(
"http://localhost:8000/watson/api/project/upload/getpendings",
json={"groupId": project_id, "inprogressList": str(log_id)}
)
result = response.json()
if not result["data"]["parsing"]:
print("解析完成")
break
time.sleep(1)
# 4. 获取银行流水
response = requests.post(
"http://localhost:8000/watson/api/project/getBSByLogId",
json={
"groupId": project_id,
"logId": log_id,
"pageNow": 1,
"pageSize": 10
}
)
```
### 错误场景测试
```python
# 触发 40101 错误appId错误
response = requests.post(
"http://localhost:8000/account/common/getToken",
json={
"projectNo": "test_error_40101", # 包含错误标记
"entityName": "测试企业",
"userId": "902001",
"userName": "902001",
"appId": "remote_app",
"appSecretCode": "test_secret_code_12345",
"role": "VIEWER",
"orgCode": "902000",
"departmentCode": "902000"
}
)
# 返回: {"code": "40101", "message": "appId错误", ...}
```
## 🔧 配置说明
### 环境变量
创建 `.env` 文件(参考 `.env.example`
```bash
# 应用配置
APP_NAME=流水分析Mock服务
APP_VERSION=1.0.0
DEBUG=true
# 服务器配置
HOST=0.0.0.0
PORT=8000
# 模拟配置
PARSE_DELAY_SECONDS=4
MAX_FILE_SIZE=10485760
```
### 响应模板
修改 `config/responses/` 下的 JSON 文件可以自定义响应数据:
- `token.json` - Token 响应模板
- `upload.json` - 上传文件响应模板
- `parse_status.json` - 解析状态响应模板
- `bank_statement.json` - 银行流水响应模板
## 🐳 Docker 部署
### 使用 Docker
```bash
# 构建镜像
docker build -t lsfx-mock-server .
# 运行容器
docker run -d -p 8000:8000 --name lsfx-mock lsfx-mock-server
```
### 使用 Docker Compose
```bash
docker-compose up -d
```
## 📁 项目结构
```
lsfx-mock-server/
├── main.py # 应用入口
├── config/
│ ├── settings.py # 全局配置
│ └── responses/ # 响应模板
├── models/
│ ├── request.py # 请求模型
│ └── response.py # 响应模型
├── services/
│ ├── token_service.py # Token 管理
│ ├── file_service.py # 文件上传和解析
│ └── statement_service.py # 流水数据管理
├── routers/
│ └── api.py # API 路由
├── utils/
│ ├── error_simulator.py # 错误模拟
│ └── response_builder.py # 响应构建器
└── tests/ # 测试套件
```
## 🧪 运行测试
```bash
# 运行所有测试
pytest tests/ -v
# 生成覆盖率报告
pytest tests/ -v --cov=. --cov-report=html
```
## 🔌 API 接口列表
| 接口 | 方法 | 路径 | 描述 |
|------|------|------|------|
| 1 | POST | `/account/common/getToken` | 获取 Token |
| 2 | POST | `/watson/api/project/remoteUploadSplitFile` | 上传文件 |
| 3 | POST | `/watson/api/project/getJZFileOrZjrcuFile` | 拉取行内流水 |
| 4 | POST | `/watson/api/project/upload/getpendings` | 检查解析状态 |
| 5 | POST | `/watson/api/project/batchDeleteUploadFile` | 删除文件 |
| 6 | POST | `/watson/api/project/getBSByLogId` | 获取银行流水 |
## ⚠️ 错误码列表
| 错误码 | 描述 |
|--------|------|
| 40101 | appId错误 |
| 40102 | appSecretCode错误 |
| 40104 | 可使用项目次数为0无法创建项目 |
| 40105 | 只读模式下无法新建项目 |
| 40106 | 错误的分析类型,不在规定的取值范围内 |
| 40107 | 当前系统不支持的分析类型 |
| 40108 | 当前用户所属行社无权限 |
| 501014 | 无行内流水文件 |
## 🛠️ 开发指南
### 添加新接口
1.`models/request.py``models/response.py` 中添加模型
2.`services/` 中添加服务类
3.`routers/api.py` 中添加路由
4.`config/responses/` 中添加响应模板
5. 编写测试
### 修改响应数据
直接编辑 `config/responses/` 下的 JSON 文件,重启服务即可生效。
## 📝 License
MIT
## 🤝 Contributing
欢迎提交 Issue 和 Pull Request

View File

@@ -0,0 +1,106 @@
{
"success_response": {
"code": "200",
"data": {
"bankStatementList": [
{
"accountId": 0,
"accountMaskNo": "101015251071645",
"accountingDate": "2024-02-01",
"accountingDateId": 20240201,
"archivingFlag": 0,
"attachments": 0,
"balanceAmount": 4814.82,
"bank": "ZJRCU",
"bankComments": "",
"bankStatementId": 12847662,
"bankTrxNumber": "1a10458dd5c3366d7272285812d434fc",
"batchId": 19135,
"cashType": "1",
"commentsNum": 0,
"crAmount": 0,
"cretNo": "230902199012261247",
"currency": "CNY",
"customerAccountMaskNo": "597671502",
"customerBank": "",
"customerId": -1,
"customerName": "小店",
"customerReference": "",
"downPaymentFlag": 0,
"drAmount": 245.8,
"exceptionType": "",
"groupId": 16238,
"internalFlag": 0,
"leId": 16308,
"leName": "张传伟",
"overrideBsId": 0,
"paymentMethod": "",
"sourceCatalogId": 0,
"split": 0,
"subBankstatementId": 0,
"toDoFlag": 0,
"transAmount": 245.8,
"transFlag": "P",
"transTypeId": 0,
"transformAmount": 0,
"transformCrAmount": 0,
"transformDrAmount": 0,
"transfromBalanceAmount": 0,
"trxBalance": 0,
"trxDate": "2024-02-01 10:33:44",
"userMemo": "财付通消费_小店"
},
{
"accountId": 0,
"accountMaskNo": "101015251071645",
"accountingDate": "2024-02-02",
"accountingDateId": 20240202,
"archivingFlag": 0,
"attachments": 0,
"balanceAmount": 5000.00,
"bank": "ZJRCU",
"bankComments": "",
"bankStatementId": 12847663,
"bankTrxNumber": "2b20568ee6d4477e8383396923e545gd",
"batchId": 19135,
"cashType": "1",
"commentsNum": 0,
"crAmount": 185.18,
"cretNo": "230902199012261247",
"currency": "CNY",
"customerAccountMaskNo": "123456789",
"customerBank": "",
"customerId": -1,
"customerName": "支付宝",
"customerReference": "",
"downPaymentFlag": 0,
"drAmount": 0,
"exceptionType": "",
"groupId": 16238,
"internalFlag": 0,
"leId": 16308,
"leName": "张传伟",
"overrideBsId": 0,
"paymentMethod": "",
"sourceCatalogId": 0,
"split": 0,
"subBankstatementId": 0,
"toDoFlag": 0,
"transAmount": 185.18,
"transFlag": "R",
"transTypeId": 0,
"transformAmount": 0,
"transformCrAmount": 0,
"transformDrAmount": 0,
"transfromBalanceAmount": 0,
"trxBalance": 0,
"trxDate": "2024-02-02 14:22:18",
"userMemo": "支付宝转账_支付宝"
}
],
"totalCount": 131
},
"status": "200",
"successResponse": true
}
}

View File

@@ -0,0 +1,41 @@
{
"success_response": {
"code": "200",
"data": {
"parsing": false,
"pendingList": [
{
"accountNoList": [],
"bankName": "ZJRCU",
"dataTypeInfo": ["CSV", ","],
"downloadFileName": "230902199012261247_20260201_20260201_1772096608615.csv",
"enterpriseNameList": [],
"filePackageId": "cde6c7cf5cab48e8892f0c1c36b2aa7d",
"fileSize": 53101,
"fileUploadBy": 448,
"fileUploadByUserName": "admin@support.com",
"fileUploadTime": "2026-02-27 09:50:18",
"isSplit": 0,
"leId": 16210,
"logId": "{log_id}",
"logMeta": "{\"lostHeader\":[],\"balanceAmount\":true}",
"logType": "bankstatement",
"loginLeId": 16210,
"lostHeader": [],
"realBankName": "ZJRCU",
"rows": 0,
"source": "http",
"status": -5,
"templateName": "ZJRCU_T251114",
"totalRecords": 131,
"trxDateEndId": 20240228,
"trxDateStartId": 20240201,
"uploadFileName": "230902199012261247_20260201_20260201_1772096608615.csv",
"uploadStatusDesc": "data.wait.confirm.newaccount"
}
]
},
"status": "200",
"successResponse": true
}
}

View File

@@ -0,0 +1,15 @@
{
"success_response": {
"code": "200",
"data": {
"token": "eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.mock_token_{project_id}",
"projectId": "{project_id}",
"projectNo": "{project_no}",
"entityName": "{entity_name}",
"analysisType": 0
},
"message": "create.token.success",
"status": "200",
"successResponse": true
}
}

View File

@@ -0,0 +1,49 @@
{
"success_response": {
"code": "200",
"data": {
"accountsOfLog": {
"{log_id}": [
{
"bank": "BSX",
"accountName": "测试账户",
"accountNo": "6222021234567890",
"currency": "CNY"
}
]
},
"uploadLogList": [
{
"accountNoList": [],
"bankName": "BSX",
"dataTypeInfo": ["CSV", ","],
"downloadFileName": "测试流水.csv",
"enterpriseNameList": [],
"filePackageId": "14b13103010e4d32b5406c764cfe3644",
"fileSize": 46724,
"fileUploadBy": 448,
"fileUploadByUserName": "admin@support.com",
"fileUploadTime": "{upload_time}",
"leId": 10724,
"logId": "{log_id}",
"logMeta": "{\"lostHeader\":[],\"balanceAmount\":true}",
"logType": "bankstatement",
"loginLeId": 10724,
"realBankName": "BSX",
"rows": 0,
"source": "http",
"status": -5,
"templateName": "BSX_T240925",
"totalRecords": 280,
"trxDateEndId": 20240905,
"trxDateStartId": 20230914,
"uploadFileName": "测试流水.csv",
"uploadStatusDesc": "data.wait.confirm.newaccount"
}
],
"uploadStatus": 1
},
"status": "200",
"successResponse": true
}
}

View File

@@ -0,0 +1,30 @@
from pydantic_settings import BaseSettings
from typing import Optional
class Settings(BaseSettings):
    """Global configuration, overridable via environment variables / .env
    (standard pydantic BaseSettings behavior)."""
    # Application identity
    APP_NAME: str = "流水分析Mock服务"
    APP_VERSION: str = "1.0.0"
    DEBUG: bool = True
    # Server bind address / port
    HOST: str = "0.0.0.0"
    PORT: int = 8000
    # Simulation tuning
    PARSE_DELAY_SECONDS: int = 4  # seconds before a simulated file parse completes
    MAX_FILE_SIZE: int = 10485760  # 10MB upload cap
    # Seed values for generated test identifiers
    INITIAL_PROJECT_ID: int = 1000
    INITIAL_LOG_ID: int = 10000
    class Config:
        # Read overrides from a local .env file
        env_file = ".env"
        env_file_encoding = "utf-8"
# Module-level singleton imported by the rest of the application
settings = Settings()

80
lsfx-mock-server/main.py Normal file
View File

@@ -0,0 +1,80 @@
"""
流水分析Mock服务器 - 主应用入口
基于 FastAPI 实现的 Mock 服务器,用于模拟流水分析平台的 7 个核心接口
"""
from fastapi import FastAPI
from routers import api
from config.settings import settings
# Create the FastAPI application instance (interactive docs at /docs and /redoc)
app = FastAPI(
    title=settings.APP_NAME,
    description="""
## 流水分析 Mock 服务器
模拟流水分析平台的 7 个核心接口,用于开发和测试。
### 主要功能
- **Token管理** - 创建项目并获取访问Token
- **文件上传** - 上传流水文件支持异步解析4秒延迟
- **行内流水** - 拉取行内流水数据
- **解析状态** - 轮询检查文件解析状态
- **文件删除** - 批量删除上传的文件
- **流水查询** - 分页获取银行流水数据
### 错误模拟
在请求参数中包含 `error_XXXX` 标记可触发对应的错误响应。
例如:`projectNo: "test_error_40101"` 将返回 40101 错误。
### 使用方式
1. 获取Token: POST /account/common/getToken
2. 上传文件: POST /watson/api/project/remoteUploadSplitFile
3. 轮询解析状态: POST /watson/api/project/upload/getpendings
4. 获取流水: POST /watson/api/project/getBSByLogId
""",
    version=settings.APP_VERSION,
    docs_url="/docs",
    redoc_url="/redoc",
)
# Mount the mock API routes
app.include_router(api.router, tags=["流水分析接口"])
@app.get("/", summary="服务根路径")
async def root():
    """Root endpoint: basic service metadata plus links to the API docs."""
    return dict(
        service=settings.APP_NAME,
        version=settings.APP_VERSION,
        swagger_docs="/docs",
        redoc="/redoc",
        status="running",
    )
@app.get("/health", summary="健康检查")
async def health_check():
    """Liveness probe: always reports the service as healthy."""
    return dict(
        status="healthy",
        service=settings.APP_NAME,
        version=settings.APP_VERSION,
    )
if __name__ == "__main__":
    import uvicorn
    # Run the development server using the configured bind address/port;
    # verbose logging only when DEBUG is enabled.
    uvicorn.run(
        app,
        host=settings.HOST,
        port=settings.PORT,
        log_level="debug" if settings.DEBUG else "info",
    )

View File

@@ -0,0 +1 @@
# Models package

View File

@@ -0,0 +1,53 @@
from pydantic import BaseModel, Field
from typing import Optional, List
class GetTokenRequest(BaseModel):
    """Request model for token creation (POST /account/common/getToken)."""
    projectNo: str = Field(..., description="项目编号格式902000_当前时间戳")
    entityName: str = Field(..., description="项目名称")
    userId: str = Field(..., description="操作人员编号,固定值")
    userName: str = Field(..., description="操作人员姓名,固定值")
    appId: str = Field("remote_app", description="应用ID固定值")
    appSecretCode: str = Field(..., description="安全码md5(projectNo + '_' + entityName + '_' + dXj6eHRmPv)")
    role: str = Field("VIEWER", description="角色,固定值")
    orgCode: str = Field(..., description="行社机构号,固定值")
    entityId: Optional[str] = Field(None, description="企业统信码或个人身份证号")
    xdRelatedPersons: Optional[str] = Field(None, description="信贷关联人信息")
    jzDataDateId: Optional[str] = Field("0", description="拉取指定日期推送过来的金综链流水")
    innerBSStartDateId: Optional[str] = Field("0", description="拉取行内流水开始日期")
    innerBSEndDateId: Optional[str] = Field("0", description="拉取行内流水结束日期")
    analysisType: str = Field("-1", description="分析类型,固定值")
    departmentCode: str = Field(..., description="客户经理所属营业部/分理处的机构编码")
class FetchInnerFlowRequest(BaseModel):
    """Request model for fetching in-bank statement data."""
    groupId: int = Field(..., description="项目id")
    customerNo: str = Field(..., description="客户身份证号")
    dataChannelCode: str = Field(..., description="校验码")
    requestDateId: int = Field(..., description="发起请求的时间")
    dataStartDateId: int = Field(..., description="拉取开始日期")
    dataEndDateId: int = Field(..., description="拉取结束日期")
    uploadUserId: int = Field(..., description="柜员号")
class CheckParseStatusRequest(BaseModel):
    """Request model for polling file-parse status."""
    groupId: int = Field(..., description="项目id")
    inprogressList: str = Field(..., description="文件id列表逗号分隔")
class GetBankStatementRequest(BaseModel):
    """Request model for paged bank-statement retrieval."""
    groupId: int = Field(..., description="项目id")
    logId: int = Field(..., description="文件id")
    pageNow: int = Field(..., description="当前页码")
    pageSize: int = Field(..., description="查询条数")
class DeleteFilesRequest(BaseModel):
    """Request model for batch file deletion."""
    groupId: int = Field(..., description="项目id")
    logIds: List[int] = Field(..., description="文件id数组")
    userId: int = Field(..., description="用户柜员号")

View File

@@ -0,0 +1,187 @@
from pydantic import BaseModel, Field
from typing import Optional, List, Dict, Any
# ==================== Token models ====================
class TokenData(BaseModel):
    """Payload of a successful token response."""
    token: str = Field(..., description="token")
    projectId: int = Field(..., description="见知项目Id")
    projectNo: str = Field(..., description="项目编号")
    entityName: str = Field(..., description="项目名称")
    analysisType: int = Field(0, description="分析类型")
class GetTokenResponse(BaseModel):
    """Response envelope for token creation."""
    code: str = Field("200", description="返回码")
    data: Optional[TokenData] = Field(None, description="返回数据")
    message: str = Field("create.token.success", description="返回消息")
    status: str = Field("200", description="状态")
    successResponse: bool = Field(True, description="是否成功响应")
# ==================== File-upload models ====================
class AccountInfo(BaseModel):
    """Bank-account descriptor."""
    bank: str = Field(..., description="银行")
    accountName: str = Field(..., description="账户名称")
    accountNo: str = Field(..., description="账号")
    currency: str = Field(..., description="币种")
class UploadLogItem(BaseModel):
    """Single upload-log entry."""
    accountNoList: List[str] = Field(default=[], description="账号列表")
    bankName: str = Field(..., description="银行名称")
    dataTypeInfo: List[str] = Field(default=[], description="数据类型信息")
    downloadFileName: str = Field(..., description="下载文件名")
    enterpriseNameList: List[str] = Field(default=[], description="企业名称列表")
    filePackageId: str = Field(..., description="文件包ID")
    fileSize: int = Field(..., description="文件大小")
    fileUploadBy: int = Field(..., description="上传者ID")
    fileUploadByUserName: str = Field(..., description="上传者用户名")
    fileUploadTime: str = Field(..., description="上传时间")
    leId: int = Field(..., description="企业ID")
    logId: int = Field(..., description="日志ID")
    logMeta: str = Field(..., description="日志元数据")
    logType: str = Field(..., description="日志类型")
    loginLeId: int = Field(..., description="登录企业ID")
    realBankName: str = Field(..., description="真实银行名称")
    rows: int = Field(0, description="行数")
    source: str = Field(..., description="来源")
    status: int = Field(-5, description="状态值")
    templateName: str = Field(..., description="模板名称")
    totalRecords: int = Field(0, description="总记录数")
    trxDateEndId: int = Field(..., description="交易结束日期ID")
    trxDateStartId: int = Field(..., description="交易开始日期ID")
    uploadFileName: str = Field(..., description="上传文件名")
    uploadStatusDesc: str = Field(..., description="上传状态描述")
class UploadFileResponse(BaseModel):
    """Response envelope for file upload."""
    code: str = Field("200", description="返回码")
    data: Optional[Dict[str, Any]] = Field(None, description="返回数据")
    status: str = Field("200", description="状态")
    successResponse: bool = Field(True, description="是否成功响应")
# ==================== Parse-status models ====================
class PendingItem(BaseModel):
    """Pending (not yet confirmed) upload entry returned by the status poll."""
    accountNoList: List[str] = Field(default=[], description="账号列表")
    bankName: str = Field(..., description="银行名称")
    dataTypeInfo: List[str] = Field(default=[], description="数据类型信息")
    downloadFileName: str = Field(..., description="下载文件名")
    enterpriseNameList: List[str] = Field(default=[], description="企业名称列表")
    filePackageId: str = Field(..., description="文件包ID")
    fileSize: int = Field(..., description="文件大小")
    fileUploadBy: int = Field(..., description="上传者ID")
    fileUploadByUserName: str = Field(..., description="上传者用户名")
    fileUploadTime: str = Field(..., description="上传时间")
    isSplit: int = Field(0, description="是否分割")
    leId: int = Field(..., description="企业ID")
    logId: int = Field(..., description="日志ID")
    logMeta: str = Field(..., description="日志元数据")
    logType: str = Field(..., description="日志类型")
    loginLeId: int = Field(..., description="登录企业ID")
    lostHeader: List[str] = Field(default=[], description="丢失的头部")
    realBankName: str = Field(..., description="真实银行名称")
    rows: int = Field(0, description="行数")
    source: str = Field(..., description="来源")
    status: int = Field(-5, description="状态值")
    templateName: str = Field(..., description="模板名称")
    totalRecords: int = Field(0, description="总记录数")
    trxDateEndId: int = Field(..., description="交易结束日期ID")
    trxDateStartId: int = Field(..., description="交易开始日期ID")
    uploadFileName: str = Field(..., description="上传文件名")
    uploadStatusDesc: str = Field(..., description="上传状态描述")
class CheckParseStatusResponse(BaseModel):
    """Response envelope for the parse-status poll."""
    code: str = Field("200", description="返回码")
    data: Optional[Dict[str, Any]] = Field(None, description="返回数据包含parsing和pendingList")
    status: str = Field("200", description="状态")
    successResponse: bool = Field(True, description="是否成功响应")
# ==================== Bank-statement models ====================
class BankStatementItem(BaseModel):
    """Single bank-statement row."""
    accountId: int = Field(0, description="账号ID")
    accountMaskNo: str = Field(..., description="账号")
    accountingDate: str = Field(..., description="记账日期")
    accountingDateId: int = Field(..., description="记账日期ID")
    archivingFlag: int = Field(0, description="归档标志")
    attachments: int = Field(0, description="附件数")
    balanceAmount: float = Field(..., description="余额")
    bank: str = Field(..., description="银行")
    bankComments: str = Field("", description="银行注释")
    bankStatementId: int = Field(..., description="流水ID")
    bankTrxNumber: str = Field(..., description="银行交易号")
    batchId: int = Field(..., description="批次ID")
    cashType: str = Field("1", description="现金类型")
    commentsNum: int = Field(0, description="评论数")
    crAmount: float = Field(0, description="贷方金额")
    cretNo: str = Field(..., description="证件号")
    currency: str = Field("CNY", description="币种")
    customerAccountMaskNo: str = Field(..., description="客户账号")
    customerBank: str = Field("", description="客户银行")
    customerId: int = Field(-1, description="客户ID")
    customerName: str = Field(..., description="客户名称")
    customerReference: str = Field("", description="客户参考")
    downPaymentFlag: int = Field(0, description="首付标志")
    drAmount: float = Field(0, description="借方金额")
    exceptionType: str = Field("", description="异常类型")
    groupId: int = Field(0, description="项目ID")
    internalFlag: int = Field(0, description="内部标志")
    leId: int = Field(..., description="企业ID")
    leName: str = Field(..., description="企业名称")
    overrideBsId: int = Field(0, description="覆盖流水ID")
    paymentMethod: str = Field("", description="支付方式")
    sourceCatalogId: int = Field(0, description="来源目录ID")
    split: int = Field(0, description="分割")
    subBankstatementId: int = Field(0, description="子流水ID")
    toDoFlag: int = Field(0, description="待办标志")
    transAmount: float = Field(..., description="交易金额")
    transFlag: str = Field("P", description="交易标志")
    transTypeId: int = Field(0, description="交易类型ID")
    transformAmount: int = Field(0, description="转换金额")
    transformCrAmount: int = Field(0, description="转换贷方金额")
    transformDrAmount: int = Field(0, description="转换借方金额")
    # NOTE(review): "transfrom..." spelling matches the upstream API / JSON
    # templates — do not "fix" the field name.
    transfromBalanceAmount: int = Field(0, description="转换余额")
    trxBalance: int = Field(0, description="交易余额")
    trxDate: str = Field(..., description="交易日期")
    userMemo: str = Field(..., description="用户备注")
class GetBankStatementResponse(BaseModel):
    """Response envelope for paged bank-statement retrieval."""
    code: str = Field("200", description="返回码")
    data: Optional[Dict[str, Any]] = Field(None, description="返回数据包含bankStatementList和totalCount")
    status: str = Field("200", description="状态")
    successResponse: bool = Field(True, description="是否成功响应")
# ==================== Other response models ====================
class FetchInnerFlowResponse(BaseModel):
    """Response envelope for in-bank statement fetch."""
    code: str = Field("200", description="返回码")
    data: Optional[Dict[str, Any]] = Field(None, description="返回数据")
    status: str = Field("200", description="状态")
    successResponse: bool = Field(True, description="是否成功响应")
class DeleteFilesResponse(BaseModel):
    """Response envelope for batch file deletion."""
    code: str = Field("200", description="返回码")
    data: Optional[Dict[str, str]] = Field(None, description="返回数据")
    status: str = Field("200", description="状态")
    successResponse: bool = Field(True, description="是否成功响应")

View File

@@ -0,0 +1,8 @@
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
pydantic-settings==2.1.0
python-multipart==0.0.6
pytest>=7.0.0
pytest-cov>=4.0.0
httpx==0.27.2

View File

@@ -0,0 +1 @@
# Routers package

View File

@@ -0,0 +1,165 @@
from fastapi import APIRouter, BackgroundTasks, UploadFile, File, Form
from services.token_service import TokenService
from services.file_service import FileService
from services.statement_service import StatementService
from utils.error_simulator import ErrorSimulator
from typing import List, Optional
# Shared API router mounted by main.py
router = APIRouter()
# Module-level service singletons: all in-memory state (tokens, upload
# records) lives for the lifetime of the process.
token_service = TokenService()
file_service = FileService()
statement_service = StatementService()
# ==================== API 1: get token ====================
@router.post("/account/common/getToken")
async def get_token(
    projectNo: str = Form(..., description="项目编号格式902000_当前时间戳"),
    entityName: str = Form(..., description="项目名称"),
    userId: str = Form(..., description="操作人员编号,固定值"),
    userName: str = Form(..., description="操作人员姓名,固定值"),
    appId: str = Form("remote_app", description="应用ID固定值"),
    appSecretCode: str = Form(..., description="安全码"),
    role: str = Form("VIEWER", description="角色,固定值"),
    orgCode: str = Form(..., description="行社机构号,固定值"),
    entityId: Optional[str] = Form(None, description="企业统信码或个人身份证号"),
    xdRelatedPersons: Optional[str] = Form(None, description="信贷关联人信息"),
    jzDataDateId: str = Form("0", description="拉取指定日期推送过来的金综链流水"),
    innerBSStartDateId: str = Form("0", description="拉取行内流水开始日期"),
    innerBSEndDateId: str = Form("0", description="拉取行内流水结束日期"),
    analysisType: str = Form("-1", description="分析类型,固定值"),
    departmentCode: str = Form(..., description="客户经理所属营业部/分理处的机构编码"),
):
    """Create a project and return an access token.

    If projectNo contains an `error_XXXX` marker, the matching simulated
    error response is returned instead of a token.
    """
    # Error-marker detection (test hook)
    error_code = ErrorSimulator.detect_error_marker(projectNo)
    if error_code:
        return ErrorSimulator.build_error_response(error_code)
    # Collect the form fields into a plain dict for the service layer
    request_data = {
        "projectNo": projectNo,
        "entityName": entityName,
        "userId": userId,
        "userName": userName,
        "appId": appId,
        "appSecretCode": appSecretCode,
        "role": role,
        "orgCode": orgCode,
        "entityId": entityId,
        "xdRelatedPersons": xdRelatedPersons,
        "jzDataDateId": jzDataDateId,
        "innerBSStartDateId": innerBSStartDateId,
        "innerBSEndDateId": innerBSEndDateId,
        "analysisType": analysisType,
        "departmentCode": departmentCode,
    }
    # Normal flow
    return token_service.create_token(request_data)
# ==================== API 2: upload file ====================
@router.post("/watson/api/project/remoteUploadSplitFile")
async def upload_file(
    background_tasks: BackgroundTasks,
    groupId: int = Form(..., description="项目ID"),
    file: UploadFile = File(..., description="流水文件"),
):
    """Upload a statement file.

    Returns immediately; a background task marks the simulated parse as
    finished after settings.PARSE_DELAY_SECONDS (see FileService).
    """
    return await file_service.upload_file(groupId, file, background_tasks)
# ==================== API 3: fetch in-bank statements ====================
@router.post("/watson/api/project/getJZFileOrZjrcuFile")
async def fetch_inner_flow(
    groupId: int = Form(..., description="项目id"),
    customerNo: str = Form(..., description="客户身份证号"),
    dataChannelCode: str = Form(..., description="校验码"),
    requestDateId: int = Form(..., description="发起请求的时间"),
    dataStartDateId: int = Form(..., description="拉取开始日期"),
    dataEndDateId: int = Form(..., description="拉取结束日期"),
    uploadUserId: int = Form(..., description="柜员号"),
):
    """Fetch in-bank statement data.

    If customerNo contains an `error_XXXX` marker, the matching simulated
    error response is returned instead.
    """
    # Error-marker detection (test hook)
    error_code = ErrorSimulator.detect_error_marker(customerNo)
    if error_code:
        return ErrorSimulator.build_error_response(error_code)
    # Collect the form fields into a plain dict for the service layer
    request_data = {
        "groupId": groupId,
        "customerNo": customerNo,
        "dataChannelCode": dataChannelCode,
        "requestDateId": requestDateId,
        "dataStartDateId": dataStartDateId,
        "dataEndDateId": dataEndDateId,
        "uploadUserId": uploadUserId,
    }
    # Normal flow (always simulates the "no data" scenario — see FileService)
    return file_service.fetch_inner_flow(request_data)
# ==================== API 4: check parse status ====================
@router.post("/watson/api/project/upload/getpendings")
async def check_parse_status(
    groupId: int = Form(..., description="项目id"),
    inprogressList: str = Form(..., description="文件id列表逗号分隔"),
):
    """Check whether the listed files are still parsing (`parsing` flag in
    the response data)."""
    return file_service.check_parse_status(groupId, inprogressList)
# ==================== API 5: delete files ====================
@router.post("/watson/api/project/batchDeleteUploadFile")
async def delete_files(
    groupId: int = Form(..., description="项目id"),
    logIds: str = Form(..., description="文件id数组逗号分隔如: 10001,10002"),
    userId: int = Form(..., description="用户柜员号"),
):
    """Batch-delete uploaded files identified by the comma-separated *logIds*.

    Robustness fix: empty segments (trailing comma, doubled comma, or an
    empty string) are skipped instead of raising ValueError from int("");
    also avoids shadowing the builtin `id`.
    """
    log_id_list = [
        int(part) for part in (seg.strip() for seg in logIds.split(",")) if part
    ]
    return file_service.delete_files(groupId, log_id_list, userId)
# ==================== API 6: get bank statements ====================
@router.post("/watson/api/project/getBSByLogId")
async def get_bank_statement(
    groupId: int = Form(..., description="项目id"),
    logId: int = Form(..., description="文件id"),
    pageNow: int = Form(..., description="当前页码"),
    pageSize: int = Form(..., description="查询条数"),
):
    """Return one page of bank statements (pageNow / pageSize pagination)."""
    # Pass the form fields straight through to the service layer as a dict
    return statement_service.get_bank_statement(
        {
            "groupId": groupId,
            "logId": logId,
            "pageNow": pageNow,
            "pageSize": pageSize,
        }
    )

View File

@@ -0,0 +1 @@
# Services package

View File

@@ -0,0 +1,150 @@
from fastapi import BackgroundTasks, UploadFile
from utils.response_builder import ResponseBuilder
from config.settings import settings
from typing import Dict, List, Union
import time
from datetime import datetime
class FileService:
    """File upload and parse-simulation service.

    All state is in memory: uploaded-file records plus a per-file
    "is parsing" flag that a background task clears after a fixed delay.
    """
    def __init__(self):
        self.file_records = {}  # logId -> uploaded-file record dict
        self.parsing_status = {}  # logId -> True while the simulated parse runs
        self.log_counter = settings.INITIAL_LOG_ID  # monotonic logId source
    async def upload_file(
        self, group_id: int, file: UploadFile, background_tasks: BackgroundTasks
    ) -> Dict:
        """Accept an upload and schedule the simulated parse.
        Args:
            group_id: project id
            file: uploaded file
            background_tasks: FastAPI background-task queue
        Returns:
            Upload response dict built from the "upload" template.
        """
        # Mint a unique logId
        self.log_counter += 1
        log_id = self.log_counter
        # Current timestamp used as the upload time
        upload_time = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        # Store the record immediately (initial state: parsing)
        self.file_records[log_id] = {
            "logId": log_id,
            "groupId": group_id,
            "status": -5,
            "uploadStatusDesc": "parsing",
            "uploadFileName": file.filename,
            "fileSize": 0,  # simplified: the real size is never computed
            "bankName": "MOCK",
            "uploadTime": upload_time,
        }
        # Mark as parsing
        self.parsing_status[log_id] = True
        # Schedule the delayed "parse" completion in the background
        background_tasks.add_task(
            self._simulate_parsing, log_id, settings.PARSE_DELAY_SECONDS
        )
        # Build the response from the upload template
        response = ResponseBuilder.build_success_response(
            "upload", log_id=log_id, upload_time=upload_time
        )
        return response
    def _simulate_parsing(self, log_id: int, delay_seconds: int):
        """Background task: pretend to parse the file for *delay_seconds*.
        NOTE(review): time.sleep blocks the thread running this background
        task — acceptable for a mock server.
        Args:
            log_id: log id of the record to finish
            delay_seconds: simulated parse duration
        """
        time.sleep(delay_seconds)
        # Parse finished: update the record (unless it was deleted meanwhile)
        if log_id in self.file_records:
            self.file_records[log_id]["uploadStatusDesc"] = (
                "data.wait.confirm.newaccount"
            )
            self.parsing_status[log_id] = False
    def check_parse_status(self, group_id: int, inprogress_list: str) -> Dict:
        """Check whether the given files are still parsing.
        Args:
            group_id: project id
            inprogress_list: comma-separated file ids
        Returns:
            Response dict with the `parsing` flag and `pendingList` records.
        """
        # Parse the logId list; empty segments are skipped
        log_ids = [int(x.strip()) for x in inprogress_list.split(",") if x.strip()]
        # Still parsing if any listed file's flag is set
        is_parsing = any(
            self.parsing_status.get(log_id, False) for log_id in log_ids
        )
        # Collect the known records; unknown ids are silently ignored
        pending_list = [
            self.file_records[log_id]
            for log_id in log_ids
            if log_id in self.file_records
        ]
        return {
            "code": "200",
            "data": {"parsing": is_parsing, "pendingList": pending_list},
            "status": "200",
            "successResponse": True,
        }
    def delete_files(self, group_id: int, log_ids: List[int], user_id: int) -> Dict:
        """Delete uploaded-file records.
        Args:
            group_id: project id (unused; kept for API parity)
            log_ids: file ids to delete
            user_id: teller/user id (unused; kept for API parity)
        Returns:
            Deletion response dict.
        """
        # Drop records and parsing flags; missing ids are ignored
        for log_id in log_ids:
            self.file_records.pop(log_id, None)
            self.parsing_status.pop(log_id, None)
        return {
            "code": "200",
            "data": {"message": "delete.files.success"},
            "status": "200",
            "successResponse": True,
        }
    def fetch_inner_flow(self, request: Union[Dict, object]) -> Dict:
        """Fetch in-bank statements (always simulates the "no data" case).
        Args:
            request: fetch request (dict or object; currently unused)
        Returns:
            Response dict carrying inner code 501014 ("no in-bank file").
        """
        # Simulate the "no in-bank statement file" scenario
        return {
            "code": "200",
            "data": {"code": "501014", "message": "无行内流水文件"},
            "status": "200",
            "successResponse": True,
        }

View File

@@ -0,0 +1,40 @@
from utils.response_builder import ResponseBuilder
from typing import Dict, Union
class StatementService:
    """Bank-statement data service backed by the bank_statement JSON template."""
    def get_bank_statement(self, request: Union[Dict, object]) -> Dict:
        """Return one page of bank statements.

        Args:
            request: dict or object carrying `pageNow` (1-based) and `pageSize`.

        Returns:
            Response dict with `bankStatementList` (the requested page) and
            `totalCount` (length of the whole template data set).
        """
        # Accept either a dict or an object with attributes
        if isinstance(request, dict):
            page_now = request.get("pageNow", 1)
            page_size = request.get("pageSize", 10)
        else:
            page_now = request.pageNow
            page_size = request.pageSize
        # Robustness fix: pageNow < 1 previously produced a negative slice
        # start and returned rows from the wrong end of the list; clamp
        # paging values to sane minimums.
        page_now = max(int(page_now), 1)
        page_size = max(int(page_size), 0)
        # Load the template data set
        template = ResponseBuilder.load_template("bank_statement")
        statements = template["success_response"]["data"]["bankStatementList"]
        total_count = len(statements)
        # Simple in-memory pagination
        start = (page_now - 1) * page_size
        page_data = statements[start:start + page_size]
        return {
            "code": "200",
            "data": {"bankStatementList": page_data, "totalCount": total_count},
            "status": "200",
            "successResponse": True,
        }

View File

@@ -0,0 +1,57 @@
from models.request import GetTokenRequest
from utils.response_builder import ResponseBuilder
from config.settings import settings
from typing import Dict, Union
class TokenService:
    """Token management service: issues mock project tokens and remembers them."""
    def __init__(self):
        # Monotonic counter used to mint unique project ids
        self.project_counter = settings.INITIAL_PROJECT_ID
        # projectId -> token payload (the "data" section of the response)
        self.tokens = {}
    def create_token(self, request: Union[GetTokenRequest, Dict]) -> Dict:
        """Create a token for *request* (dict or GetTokenRequest).

        Returns the full token response dict built from the "token" template.
        """
        is_mapping = isinstance(request, dict)
        project_no = request.get("projectNo") if is_mapping else request.projectNo
        entity_name = request.get("entityName") if is_mapping else request.entityName
        # Mint the next unique project id
        self.project_counter += 1
        new_project_id = self.project_counter
        response = ResponseBuilder.build_success_response(
            "token",
            project_id=new_project_id,
            project_no=project_no,
            entity_name=entity_name,
        )
        # Remember the issued token payload for later lookup
        self.tokens[new_project_id] = response.get("data")
        return response
    def get_project(self, project_id: int) -> Dict:
        """Return the stored token payload for *project_id*, or None if unknown."""
        return self.tokens.get(project_id)

View File

@@ -0,0 +1 @@
# Tests package

View File

@@ -0,0 +1,34 @@
"""
Pytest 配置和共享 fixtures
"""
import pytest
from fastapi.testclient import TestClient
import sys
import os
# Make the project root importable so `main` resolves regardless of where
# pytest is launched from
sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
from main import app
@pytest.fixture
def client():
    """FastAPI TestClient bound to the mock application."""
    return TestClient(app)
@pytest.fixture
def sample_token_request():
    """Sample token request — a form-data style payload dict."""
    return {
        "projectNo": "test_project_001",
        "entityName": "测试企业",
        "userId": "902001",
        "userName": "902001",
        "appId": "remote_app",
        "appSecretCode": "test_secret_code_12345",
        "role": "VIEWER",
        "orgCode": "902000",
        "departmentCode": "902000",
    }

View File

@@ -0,0 +1 @@
# Integration tests package

View File

@@ -0,0 +1,125 @@
"""
集成测试 - 完整的接口调用流程测试
"""
import pytest
import time
def test_complete_workflow(client):
    """End-to-end happy path: token -> parse status -> bank statements."""
    # 1. Get a token
    response = client.post(
        "/account/common/getToken",
        data={
            "projectNo": "integration_test_001",
            "entityName": "集成测试企业",
            "userId": "902001",
            "userName": "902001",
            "appId": "remote_app",
            "appSecretCode": "test_secret_code_12345",
            "role": "VIEWER",
            "orgCode": "902000",
            "departmentCode": "902000",
        },
    )
    assert response.status_code == 200
    token_data = response.json()
    assert token_data["code"] == "200"
    project_id = token_data["data"]["projectId"]
    token = token_data["data"]["token"]
    assert token is not None
    # 2. File upload (simulated)
    # NOTE: actual file upload is skipped here; the remaining endpoints are
    # exercised directly
    # 3. Check parse status
    response = client.post(
        "/watson/api/project/upload/getpendings",
        data={"groupId": project_id, "inprogressList": "10001"},
    )
    assert response.status_code == 200
    status_data = response.json()
    assert "parsing" in status_data["data"]
    # 4. Fetch bank statements
    response = client.post(
        "/watson/api/project/getBSByLogId",
        data={
            "groupId": project_id,
            "logId": 10001,
            "pageNow": 1,
            "pageSize": 10,
        },
    )
    assert response.status_code == 200
    statement_data = response.json()
    assert statement_data["code"] == "200"
    assert "bankStatementList" in statement_data["data"]
    assert "totalCount" in statement_data["data"]
def test_all_error_codes(client):
    """Every `error_XXXX` marker in projectNo must trigger the matching code."""
    base_payload = {
        "entityName": "测试企业",
        "userId": "902001",
        "userName": "902001",
        "appId": "remote_app",
        "appSecretCode": "test_secret_code_12345",
        "role": "VIEWER",
        "orgCode": "902000",
        "departmentCode": "902000",
    }
    for error_code in ("40101", "40102", "40104", "40105", "40106", "40107", "40108"):
        payload = {"projectNo": f"test_error_{error_code}", **base_payload}
        response = client.post("/account/common/getToken", data=payload)
        assert response.status_code == 200
        body = response.json()
        assert body["code"] == error_code, f"错误码 {error_code} 未正确触发"
        assert body["successResponse"] == False
def test_pagination(client):
    """Pagination: two pages of size 1 share totalCount but differ in content."""
    # Get a token first (supplies a valid project id)
    response = client.post(
        "/account/common/getToken",
        data={
            "projectNo": "pagination_test",
            "entityName": "分页测试",
            "userId": "902001",
            "userName": "902001",
            "appId": "remote_app",
            "appSecretCode": "test_secret_code_12345",
            "role": "VIEWER",
            "orgCode": "902000",
            "departmentCode": "902000",
        },
    )
    project_id = response.json()["data"]["projectId"]
    # Page 1
    response = client.post(
        "/watson/api/project/getBSByLogId",
        data={"groupId": project_id, "logId": 10001, "pageNow": 1, "pageSize": 1},
    )
    page1 = response.json()
    # Page 2
    response = client.post(
        "/watson/api/project/getBSByLogId",
        data={"groupId": project_id, "logId": 10001, "pageNow": 2, "pageSize": 1},
    )
    page2 = response.json()
    # totalCount must be identical across pages
    assert page1["data"]["totalCount"] == page2["data"]["totalCount"]
    # Page sizes are as requested (second page may be empty on tiny data sets)
    if page1["data"]["totalCount"] > 1:
        assert len(page1["data"]["bankStatementList"]) == 1
        assert len(page2["data"]["bankStatementList"]) >= 0

View File

@@ -0,0 +1,50 @@
"""
API 端点测试
"""
def test_root_endpoint(client):
    """The root path reports running status and a link to the Swagger docs."""
    response = client.get("/")
    assert response.status_code == 200
    payload = response.json()
    assert payload["status"] == "running"
    assert "swagger_docs" in payload
def test_health_check(client):
    """The health endpoint reports a healthy service."""
    response = client.get("/health")
    assert response.status_code == 200
    payload = response.json()
    assert payload["status"] == "healthy"
def test_get_token_success(client, sample_token_request):
    """A valid form payload yields a token and a project id."""
    response = client.post("/account/common/getToken", data=sample_token_request)
    assert response.status_code == 200
    body = response.json()
    assert body["code"] == "200"
    assert "token" in body["data"]
    assert "projectId" in body["data"]
def test_get_token_error_40101(client):
    """Get token — error scenario 40101, triggered by the error_40101 marker."""
    request_data = {
        "projectNo": "test_error_40101",
        "entityName": "测试企业",
        "userId": "902001",
        "userName": "902001",
        "appId": "remote_app",
        "appSecretCode": "test_secret_code_12345",
        "role": "VIEWER",
        "orgCode": "902000",
        "departmentCode": "902000",
    }
    response = client.post("/account/common/getToken", data=request_data)
    # The mock returns HTTP 200 with the error carried in the body
    assert response.status_code == 200
    data = response.json()
    assert data["code"] == "40101"
    assert data["successResponse"] == False

View File

@@ -0,0 +1 @@
# Utils package

View File

@@ -0,0 +1,49 @@
from typing import Dict, Optional
import re
class ErrorSimulator:
    """Simulates error scenarios triggered by `error_XXXX` markers in request fields."""
    # Error-code table: code -> message fragment used to build the response.
    ERROR_CODES = {
        "40101": {"code": "40101", "message": "appId错误"},
        "40102": {"code": "40102", "message": "appSecretCode错误"},
        "40104": {"code": "40104", "message": "可使用项目次数为0无法创建项目"},
        "40105": {"code": "40105", "message": "只读模式下无法新建项目"},
        "40106": {"code": "40106", "message": "错误的分析类型,不在规定的取值范围内"},
        "40107": {"code": "40107", "message": "当前系统不支持的分析类型"},
        "40108": {"code": "40108", "message": "当前用户所属行社无权限"},
        "501014": {"code": "501014", "message": "无行内流水文件"},
    }
    # Marker pattern, compiled once at class-definition time instead of on
    # every call (hoists the per-request re.search setup cost).
    _MARKER_RE = re.compile(r"error_(\d+)")
    @staticmethod
    def detect_error_marker(value: str) -> Optional[str]:
        """Detect an error marker inside *value*.

        Rule: if the string contains `error_XXXX`, return the digits `XXXX`.
        Examples:
            - "project_error_40101" -> "40101"
            - "test_error_501014" -> "501014"

        Returns None for empty/None input or when no marker is present.
        """
        if not value:
            return None
        match = ErrorSimulator._MARKER_RE.search(value)
        return match.group(1) if match else None
    @staticmethod
    def build_error_response(error_code: str) -> Optional[Dict]:
        """Build the standard error response for *error_code*.

        Returns None when the code is not in ERROR_CODES (single dict
        lookup via .get instead of membership test + index).
        """
        error_info = ErrorSimulator.ERROR_CODES.get(error_code)
        if error_info is None:
            return None
        return {
            "code": error_info["code"],
            "message": error_info["message"],
            "status": error_info["code"],
            "successResponse": False,
        }

View File

@@ -0,0 +1,69 @@
import json
from pathlib import Path
from typing import Dict, Any
import copy
class ResponseBuilder:
    """Builds API responses from JSON templates with `{placeholder}` substitution."""
    # Template directory: <project>/config/responses
    TEMPLATE_DIR = Path(__file__).parent.parent / "config" / "responses"
    @staticmethod
    def load_template(template_name: str) -> Dict:
        """Load the JSON template named *template_name* (no .json suffix)."""
        template_path = ResponseBuilder.TEMPLATE_DIR / f"{template_name}.json"
        with open(template_path, "r", encoding="utf-8") as handle:
            return json.load(handle)
    @staticmethod
    def replace_placeholders(template: Dict, **kwargs) -> Dict:
        """Return a deep copy of *template* where each `{key}` occurrence in
        string values is replaced by str(value) for every keyword argument.

        The input template is never mutated. Note substituted values become
        strings — a `"{project_id}"` placeholder yields e.g. "1001", not 1001.
        """
        def walk(node):
            if isinstance(node, dict):
                return {key: walk(child) for key, child in node.items()}
            if isinstance(node, list):
                return [walk(child) for child in node]
            if isinstance(node, str):
                text = node
                for key, val in kwargs.items():
                    token = "{" + key + "}"
                    if token in text:
                        text = text.replace(token, str(val))
                return text
            return node
        # Deep-copy first so the cached/original template stays pristine
        return walk(copy.deepcopy(template))
    @staticmethod
    def build_success_response(template_name: str, **kwargs) -> Dict:
        """Load *template_name* and substitute placeholders into its
        `success_response` section."""
        template = ResponseBuilder.load_template(template_name)
        return ResponseBuilder.replace_placeholders(
            template["success_response"], **kwargs
        )

View File

@@ -0,0 +1,81 @@
import subprocess
from pathlib import Path
# Repository root: two directory levels above this test file
REPO_ROOT = Path(__file__).resolve().parents[2]
# Deployment artifacts under test
DEPLOY_PS1 = REPO_ROOT / "deploy" / "deploy.ps1"
DEPLOY_BAT = REPO_ROOT / "deploy" / "deploy-to-nas.bat"
DOCKER_COMPOSE = REPO_ROOT / "docker-compose.yml"
def run_powershell(*args):
    """Invoke deploy.ps1 with *args* via powershell and capture its output."""
    command = [
        "powershell",
        "-NoProfile",
        "-ExecutionPolicy",
        "Bypass",
        "-File",
        str(DEPLOY_PS1),
    ]
    command.extend(args)
    return subprocess.run(
        command,
        cwd=REPO_ROOT,
        capture_output=True,
        text=True,
        encoding="utf-8",
    )
def run_bat(*args):
    """Invoke deploy-to-nas.bat with *args* via cmd.exe and capture its output."""
    command = ["cmd.exe", "/c", str(DEPLOY_BAT), *args]
    return subprocess.run(
        command,
        cwd=REPO_ROOT,
        capture_output=True,
        text=True,
        encoding="utf-8",
    )
def test_deploy_ps1_dry_run_prints_target():
    """deploy.ps1 -DryRun must print the default NAS target without deploying."""
    result = run_powershell("-DryRun")
    # Combine both streams: the script may log to either
    output = f"{result.stdout}\n{result.stderr}"
    assert result.returncode == 0
    assert "DryRun" in output
    assert "116.62.17.81" in output
    assert "9444" in output
    assert "/volume1/webapp/ccdi" in output
def test_bat_dry_run_uses_default_nas_target():
    """The batch wrapper with --dry-run must forward the default NAS target."""
    result = run_bat("--dry-run")
    # Combine both streams: the script may log to either
    output = f"{result.stdout}\n{result.stderr}"
    assert result.returncode == 0
    assert "DryRun" in output
    assert "116.62.17.81" in output
    assert "9444" in output
    assert "/volume1/webapp/ccdi" in output
def test_bat_dry_run_accepts_override_arguments():
    """Positional overrides (host, port, user, password, remote root) are honored."""
    result = run_bat("1.2.3.4", "9000", "demo", "demoPwd", "/demo/path", "--dry-run")
    # Combine both streams: the script may log to either
    output = f"{result.stdout}\n{result.stderr}"
    assert result.returncode == 0
    assert "1.2.3.4" in output
    assert "9000" in output
    assert "demo" in output
    assert "/demo/path" in output
def test_compose_exposes_lsfx_mock_port_via_backend_namespace():
compose_text = DOCKER_COMPOSE.read_text(encoding="utf-8")
assert '${LSFX_MOCK_PORT:-62320}:8000' in compose_text
assert 'network_mode: "service:backend"' in compose_text