9 Commits

Author SHA1 Message Date
wkc
a7cf67e6e4 http请求 2026-03-05 15:01:33 +08:00
wkc
2b5582ddcc docs: 添加文件格式变更说明文档 2026-03-05 14:41:50 +08:00
wkc
9b5c4f8854 feat: 修改流水文件上传支持PDF/CSV/Excel格式
- 文件格式限制从仅Excel改为支持PDF/CSV/XLSX/XLS
- 更新前端校验逻辑
- 更新用户提示信息
- 添加accept属性限制文件选择器
2026-03-05 14:41:11 +08:00
wkc
b52d6c6e7a feat: 实现异步文件上传前端功能
- 添加批量上传API接口
- 扩展UploadData组件,添加批量上传弹窗
- 添加统计卡片展示(上传中、解析中、成功、失败)
- 添加文件上传记录列表
- 实现轮询机制自动刷新状态
- 支持文件数量、格式、大小校验
- 支持手动刷新和状态筛选
- 添加响应式布局支持
2026-03-05 14:21:33 +08:00
wkc
1a9ca2a05f test: 添加异步文件上传功能集成测试脚本 2026-03-05 14:06:29 +08:00
wkc
756129b913 fix: 修复tempFilePaths和records对应关系的潜在bug
问题:
- 原代码中保存临时文件和创建记录使用两个独立的循环
- 无法保证两个列表的索引严格一一对应
- 如果中间出现异常或跳过,会导致对应关系错乱

修复:
- 将两个循环合并为一个,在同一个循环中处理
- 使用相同的索引i创建tempFilePaths[i]和records[i]
- 添加数量一致性验证
- 临时文件名中加入索引i,避免文件名冲突
- 日志中记录索引i便于调试

影响:
- 确保临时文件和数据库记录严格一一对应
- 避免异步处理时出现文件与记录不匹配的问题
2026-03-05 13:47:39 +08:00
wkc
d8d60f9103 feat: 实现CcdiFileUploadServiceImpl所有TODO
完整实现异步文件上传服务的所有TODO方法:

1. 新增批次日志管理器
   - 为每个批次创建独立日志文件
   - 路径: {ruoyi.profile}/logs/file-upload/{projectId}/{timestamp}.log
   - 支持ThreadLocal隔离

2. 完善CcdiFileUploadServiceImpl
   - 注入LsfxAnalysisClient和CcdiBankStatementMapper
   - 实现processFileAsync: 文件上传到流水分析平台
   - 实现waitForParsingComplete: 固定间隔轮询(300次×2秒)
   - 实现获取解析结果: status=-5判断成功
   - 实现fetchAndSaveBankStatements: 分页获取(每页1000条)+批量插入(每批1000条)
   - 集成批次日志管理

3. 关键特性
   - 完整的流水分析平台集成
   - 固定间隔轮询策略
   - 大批量分页获取(每页1000条)
   - 批量插入优化(每批1000条)
   - 严格失败策略: 任何异常直接标记为parsed_failed
   - 完善的日志记录

4. 测试验证
   - 编译通过,无错误
   - 所有TODO已实现
2026-03-05 13:40:29 +08:00
wkc
388c70ce04 docs: 添加异步文件上传服务实现设计文档
- 完整设计 CcdiFileUploadServiceImpl 所有 TODO 实现方案
- 包含依赖注入、文件上传、轮询解析、批量保存等详细设计
- 确定设计决策:固定间隔轮询、大批量分页、严格失败策略
- 实现批次日志管理器 FileUploadLogAppender
- 包含完整的测试策略和部署注意事项
2026-03-05 12:39:58 +08:00
wkc
f1c43589d4 refactor: 修改uploadFile方法参数类型为File
- 将LsfxAnalysisClient.uploadFile方法参数从MultipartFile改为File
- 在LsfxTestController中添加MultipartFile到File的转换逻辑
- 使用临时文件处理转换,并在finally块中自动清理
2026-03-05 12:01:16 +08:00
10 changed files with 1883 additions and 103 deletions

View File

@@ -15,8 +15,8 @@ import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.springframework.web.multipart.MultipartFile;
import java.io.File;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
@@ -109,8 +109,8 @@ public class LsfxAnalysisClient {
/**
* 上传文件
*/
public UploadFileResponse uploadFile(Integer groupId, MultipartFile file) {
log.info("【流水分析】上传文件请求: groupId={}, fileName={}", groupId, file.getOriginalFilename());
public UploadFileResponse uploadFile(Integer groupId, File file) {
log.info("【流水分析】上传文件请求: groupId={}, fileName={}", groupId, file.getName());
long startTime = System.currentTimeMillis();
try {

View File

@@ -18,6 +18,12 @@ import jakarta.annotation.Resource;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
/**
* 流水分析平台接口测试控制器
*/
@@ -76,8 +82,28 @@ public class LsfxTestController {
return AjaxResult.error("文件大小超过限制最大10MB");
}
UploadFileResponse response = lsfxAnalysisClient.uploadFile(groupId, file);
return AjaxResult.success(response);
// 将 MultipartFile 转换为 File
Path tempFile = null;
try {
// 创建临时文件
tempFile = Files.createTempFile("upload_", "_" + file.getOriginalFilename());
Files.copy(file.getInputStream(), tempFile, StandardCopyOption.REPLACE_EXISTING);
File convertedFile = tempFile.toFile();
UploadFileResponse response = lsfxAnalysisClient.uploadFile(groupId, convertedFile);
return AjaxResult.success(response);
} catch (IOException e) {
return AjaxResult.error("文件转换失败:" + e.getMessage());
} finally {
// 删除临时文件
if (tempFile != null) {
try {
Files.deleteIfExists(tempFile);
} catch (IOException e) {
// 忽略删除失败
}
}
}
}
@Operation(summary = "拉取行内流水", description = "从数仓拉取行内流水数据")

View File

@@ -1,10 +1,11 @@
package com.ruoyi.lsfx.util;
import com.ruoyi.lsfx.exception.LsfxApiException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.ruoyi.lsfx.exception.LsfxApiException;
import jakarta.annotation.Resource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.core.io.FileSystemResource;
import org.springframework.http.*;
import org.springframework.stereotype.Component;
import org.springframework.util.LinkedMultiValueMap;
@@ -13,6 +14,7 @@ import org.springframework.web.client.RestClientException;
import org.springframework.web.client.RestTemplate;
import org.springframework.web.util.UriComponentsBuilder;
import java.io.File;
import java.util.Map;
/**
@@ -200,7 +202,15 @@ public class HttpUtil {
MultiValueMap<String, Object> body = new LinkedMultiValueMap<>();
if (params != null) {
params.forEach(body::add);
params.forEach((key, value) -> {
// 如果是File对象包装为FileSystemResource
if (value instanceof File) {
File file = (File) value;
body.add(key, new FileSystemResource(file));
} else {
body.add(key, value);
}
});
}
HttpEntity<MultiValueMap<String, Object>> requestEntity = new HttpEntity<>(body, httpHeaders);

View File

@@ -0,0 +1,103 @@
package com.ruoyi.ccdi.project.log;
import ch.qos.logback.classic.PatternLayout;
import ch.qos.logback.classic.spi.ILoggingEvent;
import ch.qos.logback.core.FileAppender;
import ch.qos.logback.core.UnsynchronizedAppenderBase;
import lombok.extern.slf4j.Slf4j;
import java.io.File;
import java.text.SimpleDateFormat;
import java.util.Date;
/**
* 文件上传批次日志Appender
* 为每个批次创建独立的日志文件
*
* @author ruoyi
* @date 2026-03-05
*/
@Slf4j
public class FileUploadLogAppender extends UnsynchronizedAppenderBase<ILoggingEvent> {
private static final ThreadLocal<FileAppender<ILoggingEvent>> currentAppender = new ThreadLocal<>();
private PatternLayout layout;
@Override
public void start() {
// 初始化日志格式
this.layout = new PatternLayout();
this.layout.setPattern("%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n");
this.layout.setContext(getContext());
this.layout.start();
super.start();
log.info("【文件上传日志】FileUploadLogAppender已启动");
}
@Override
protected void append(ILoggingEvent event) {
FileAppender<ILoggingEvent> appender = currentAppender.get();
if (appender != null) {
appender.doAppend(event);
}
}
/**
* 为指定批次创建独立的日志文件
*
* @param uploadPath ruoyi.profile配置的上传路径
* @param projectId 项目ID
* @param batchId 批次ID
*/
public static void createBatchLogFile(String uploadPath, Long projectId, String batchId) {
try {
// 构建日志文件路径: {ruoyi.profile}/logs/file-upload/{projectId}/{timestamp}.log
String timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date());
String logDirPath = uploadPath + File.separator + "logs" + File.separator
+ "file-upload" + File.separator + projectId;
// 确保目录存在
File logDir = new File(logDirPath);
if (!logDir.exists()) {
logDir.mkdirs();
}
String logFilePath = logDirPath + File.separator + timestamp + ".log";
// 创建FileAppender
FileAppender<ILoggingEvent> appender = new FileAppender<>();
appender.setFile(logFilePath);
PatternLayout layout = new PatternLayout();
layout.setPattern("%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n");
layout.setContext(appender.getContext());
layout.start();
appender.setLayout(layout);
appender.setAppend(true);
appender.setContext(appender.getContext());
appender.start();
currentAppender.set(appender);
log.info("【文件上传日志】创建批次日志文件: path={}, batchId={}", logFilePath, batchId);
} catch (Exception e) {
log.error("【文件上传日志】创建批次日志文件失败: projectId={}, batchId={}", projectId, batchId, e);
}
}
/**
* 关闭当前批次的日志文件
*/
public static void closeBatchLogFile() {
FileAppender<ILoggingEvent> appender = currentAppender.get();
if (appender != null) {
appender.stop();
currentAppender.remove();
log.info("【文件上传日志】关闭批次日志文件");
}
}
}

View File

@@ -4,11 +4,18 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.ruoyi.ccdi.project.domain.CcdiProject;
import com.ruoyi.ccdi.project.domain.dto.CcdiFileUploadQueryDTO;
import com.ruoyi.ccdi.project.domain.entity.CcdiBankStatement;
import com.ruoyi.ccdi.project.domain.entity.CcdiFileUploadRecord;
import com.ruoyi.ccdi.project.domain.vo.CcdiFileUploadStatisticsVO;
import com.ruoyi.ccdi.project.log.FileUploadLogAppender;
import com.ruoyi.ccdi.project.mapper.CcdiBankStatementMapper;
import com.ruoyi.ccdi.project.mapper.CcdiFileUploadRecordMapper;
import com.ruoyi.ccdi.project.mapper.CcdiProjectMapper;
import com.ruoyi.ccdi.project.service.ICcdiFileUploadService;
import com.ruoyi.lsfx.client.LsfxAnalysisClient;
import com.ruoyi.lsfx.domain.request.GetBankStatementRequest;
import com.ruoyi.lsfx.domain.request.GetFileUploadStatusRequest;
import com.ruoyi.lsfx.domain.response.*;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
@@ -58,6 +65,12 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
@Qualifier("fileUploadExecutor")
private Executor fileUploadExecutor;
@Resource
private LsfxAnalysisClient lsfxClient;
@Resource
private CcdiBankStatementMapper bankStatementMapper;
/**
* 获取临时文件存储目录
*/
@@ -153,8 +166,11 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
log.info("【文件上传】项目信息验证通过: projectId={}, lsfxProjectId={}", projectId, lsfxProjectId);
// Critical Fix #2: 保存MultipartFile到临时存储,避免异步处理时文件已被清理
// Critical Fix #2 & #4: 保存临时文件和创建记录在同一个循环中,确保一一对应
List<String> tempFilePaths = new ArrayList<>();
List<CcdiFileUploadRecord> records = new ArrayList<>();
Date now = new Date();
try {
// 确保临时目录存在
Path tempDir = Paths.get(getTempFileDir());
@@ -162,39 +178,41 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
Files.createDirectories(tempDir);
}
// 保存所有文件到临时目录
for (MultipartFile file : files) {
// 同一个循环中保存临时文件和创建记录,确保索引一一对应
for (int i = 0; i < files.length; i++) {
MultipartFile file = files[i];
// 1. 保存临时文件
String originalFilename = file.getOriginalFilename();
String tempFileName = batchId + "_" + System.currentTimeMillis() + "_" + originalFilename;
String tempFileName = batchId + "_" + i + "_" + System.currentTimeMillis() + "_" + originalFilename;
Path tempFilePath = tempDir.resolve(tempFileName);
// 将MultipartFile内容复制到临时文件
Files.copy(file.getInputStream(), tempFilePath, StandardCopyOption.REPLACE_EXISTING);
tempFilePaths.add(tempFilePath.toString());
log.debug("【文件上传】保存临时文件: originalName={}, tempPath={}",
originalFilename, tempFilePath);
log.debug("【文件上传】保存临时文件[{}]: originalName={}, tempPath={}",
i, originalFilename, tempFilePath);
// 2. 创建记录使用相同的索引i
CcdiFileUploadRecord record = new CcdiFileUploadRecord();
record.setProjectId(projectId);
record.setLsfxProjectId(lsfxProjectId);
record.setFileName(originalFilename);
record.setFileSize(file.getSize());
record.setFileStatus("uploading");
record.setUploadTime(now);
record.setUploadUser(username);
records.add(record);
}
} catch (IOException e) {
log.error("【文件上传】保存临时文件失败", e);
throw new RuntimeException("保存临时文件失败: " + e.getMessage(), e);
}
// 3. 批量插入文件记录(status=uploading)
List<CcdiFileUploadRecord> records = new ArrayList<>();
Date now = new Date();
for (int i = 0; i < files.length; i++) {
MultipartFile file = files[i];
CcdiFileUploadRecord record = new CcdiFileUploadRecord();
record.setProjectId(projectId);
record.setLsfxProjectId(lsfxProjectId);
record.setFileName(file.getOriginalFilename());
record.setFileSize(file.getSize());
record.setFileStatus("uploading");
record.setUploadTime(now);
record.setUploadUser(username);
records.add(record);
// 验证数量一致性
if (tempFilePaths.size() != records.size()) {
throw new RuntimeException(String.format(
"临时文件数量(%d)与记录数量(%d)不一致", tempFilePaths.size(), records.size()));
}
recordMapper.insertBatch(records);
@@ -240,52 +258,60 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
String batchId) {
log.info("【文件上传】调度线程启动: projectId={}, batchId={}", projectId, batchId);
// 循环提交任务
for (int i = 0; i < tempFilePaths.size(); i++) {
// Critical Fix #6: 检查线程中断状态
if (Thread.currentThread().isInterrupted()) {
log.warn("【文件上传】调度线程被中断,停止提交剩余任务");
break;
}
// 创建批次日志文件
FileUploadLogAppender.createBatchLogFile(uploadPath, projectId, batchId);
String tempFilePath = tempFilePaths.get(i);
CcdiFileUploadRecord record = records.get(i);
try {
// 循环提交任务
for (int i = 0; i < tempFilePaths.size(); i++) {
// Critical Fix #6: 检查线程中断状态
if (Thread.currentThread().isInterrupted()) {
log.warn("【文件上传】调度线程被中断,停止提交剩余任务");
break;
}
boolean submitted = false;
int retryCount = 0;
String tempFilePath = tempFilePaths.get(i);
CcdiFileUploadRecord record = records.get(i);
while (!submitted && retryCount < 2) {
try {
// 尝试提交异步任务
CompletableFuture.runAsync(
() -> processFileAsync(projectId, lsfxProjectId, tempFilePath, record.getId(), batchId, record),
fileUploadExecutor
);
submitted = true;
log.info("【文件上传】任务提交成功: fileName={}, recordId={}",
record.getFileName(), record.getId());
} catch (RejectedExecutionException e) {
retryCount++;
if (retryCount == 1) {
log.warn("【文件上传】线程池已满,等待30秒后重试: fileName={}",
record.getFileName());
try {
Thread.sleep(30000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.error("【文件上传】等待被中断: fileName={}", record.getFileName());
updateRecordStatus(record.getId(), "parsed_failed", "任务提交被中断");
break;
boolean submitted = false;
int retryCount = 0;
while (!submitted && retryCount < 2) {
try {
// 尝试提交异步任务
CompletableFuture.runAsync(
() -> processFileAsync(projectId, lsfxProjectId, tempFilePath, record.getId(), batchId, record),
fileUploadExecutor
);
submitted = true;
log.info("【文件上传】任务提交成功: fileName={}, recordId={}",
record.getFileName(), record.getId());
} catch (RejectedExecutionException e) {
retryCount++;
if (retryCount == 1) {
log.warn("【文件上传】线程池已满,等待30秒后重试: fileName={}",
record.getFileName());
try {
Thread.sleep(30000);
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
log.error("【文件上传】等待被中断: fileName={}", record.getFileName());
updateRecordStatus(record.getId(), "parsed_failed", "任务提交被中断");
break;
}
} else {
log.error("【文件上传】重试失败,放弃任务: fileName={}", record.getFileName());
updateRecordStatus(record.getId(), "parsed_failed", "系统繁忙,请稍后重试");
}
} else {
log.error("【文件上传】重试失败,放弃任务: fileName={}", record.getFileName());
updateRecordStatus(record.getId(), "parsed_failed", "系统繁忙,请稍后重试");
}
}
}
}
log.info("【文件上传】调度线程完成: projectId={}, batchId={}", projectId, batchId);
log.info("【文件上传】调度线程完成: projectId={}, batchId={}", projectId, batchId);
} finally {
// 关闭批次日志文件
FileUploadLogAppender.closeBatchLogFile();
}
}
/**
@@ -326,14 +352,27 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
}
// 步骤2:上传文件到流水分析平台
log.info("【文件上传】步骤2: 上传文件到流水分析平台");
// TODO: 调用 lsfxClient.uploadFile()
// 需要将临时文件转换为MultipartFile或直接使用文件路径
// UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, filePath.toFile());
// Integer logId = uploadResponse.getData().getLogId();
log.info("【文件上传】步骤2: 上传文件到流水分析平台, tempPath={}", tempFilePath);
// 临时模拟 logId
Integer logId = (int) (System.currentTimeMillis() % 1000000);
File file = filePath.toFile();
if (!file.exists()) {
throw new RuntimeException("临时文件不存在: " + tempFilePath);
}
UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, file);
if (uploadResponse == null || uploadResponse.getData() == null
|| uploadResponse.getData().getUploadLogList() == null
|| uploadResponse.getData().getUploadLogList().isEmpty()) {
throw new RuntimeException("上传文件失败: 响应数据为空");
}
// 从 uploadLogList 中获取第一个 logId
Integer logId = uploadResponse.getData().getUploadLogList().get(0).getLogId();
if (logId == null) {
throw new RuntimeException("上传文件失败: 未返回logId");
}
log.info("【文件上传】文件上传成功: logId={}", logId);
// 步骤3:更新状态为 parsing
log.info("【文件上传】步骤3: 更新状态为解析中, logId={}", logId);
@@ -343,42 +382,67 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
// 步骤4:轮询解析状态(最多300次,间隔2秒)
log.info("【文件上传】步骤4: 开始轮询解析状态");
// TODO: 实现真实的轮询逻辑
// boolean parsingComplete = waitForParsingComplete(lsfxProjectId, logId.toString());
boolean parsingComplete = true; // 临时模拟
boolean parsingComplete = waitForParsingComplete(lsfxProjectId, logId.toString());
if (!parsingComplete) {
throw new RuntimeException("解析超时(超过10分钟),请检查文件格式是否正确");
}
// 步骤5:获取文件上传状态
log.info("【文件上传】步骤5: 获取文件上传状态");
// TODO: 调用 lsfxClient.getFileUploadStatus()
// GetFileUploadStatusResponse statusResponse = lsfxClient.getFileUploadStatus(...);
log.info("【文件上传】步骤5: 获取文件上传状态: logId={}", logId);
GetFileUploadStatusRequest statusRequest = new GetFileUploadStatusRequest();
statusRequest.setGroupId(lsfxProjectId);
statusRequest.setLogId(logId);
GetFileUploadStatusResponse statusResponse = lsfxClient.getFileUploadStatus(statusRequest);
if (statusResponse == null || statusResponse.getData() == null
|| statusResponse.getData().getLogs() == null
|| statusResponse.getData().getLogs().isEmpty()) {
throw new RuntimeException("获取文件上传状态失败: 响应数据为空");
}
// 获取第一个log项因为我们传了logId应该只返回一个
GetFileUploadStatusResponse.LogItem logItem = statusResponse.getData().getLogs().get(0);
Integer status = logItem.getStatus();
String uploadStatusDesc = logItem.getUploadStatusDesc();
log.info("【文件上传】文件状态: status={}, uploadStatusDesc={}", status, uploadStatusDesc);
// 步骤6:判断解析结果
// TODO: 实现真实的判断逻辑
boolean parseSuccess = true; // 临时模拟
// status=-5 且 uploadStatusDesc="data.wait.confirm.newaccount" 表示解析成功
boolean parseSuccess = status != null && status == -5
&& "data.wait.confirm.newaccount".equals(uploadStatusDesc);
if (parseSuccess) {
// 解析成功
log.info("【文件上传】步骤6: 解析成功,保存主体信息");
// 提取主体名称和账号
List<String> enterpriseNames = logItem.getEnterpriseNameList();
List<String> accountNos = logItem.getAccountNoList();
String enterpriseNamesStr = enterpriseNames != null ? String.join(",", enterpriseNames) : "";
String accountNosStr = accountNos != null ? String.join(",", accountNos) : "";
record.setFileStatus("parsed_success");
// TODO: 从实际的解析结果中获取
record.setEnterpriseNames("测试主体1,测试主体2");
record.setAccountNos("622xxx,623xxx");
record.setEnterpriseNames(enterpriseNamesStr);
record.setAccountNos(accountNosStr);
recordMapper.updateById(record);
log.info("【文件上传】主体信息已保存: enterpriseNames={}, accountNos={}",
enterpriseNamesStr, accountNosStr);
// 步骤7:获取流水数据并保存
log.info("【文件上传】步骤7: 获取流水数据");
// TODO: 实现 fetchAndSaveBankStatements
// fetchAndSaveBankStatements(projectId, lsfxProjectId, logId, totalCount);
fetchAndSaveBankStatements(projectId, lsfxProjectId, logId);
} else {
// 解析失败
log.warn("【文件上传】步骤6: 解析失败");
log.warn("【文件上传】步骤6: 解析失败: status={}, desc={}", status, uploadStatusDesc);
record.setFileStatus("parsed_failed");
record.setErrorMessage("解析失败:文件格式错误");
record.setErrorMessage("解析失败: " + uploadStatusDesc);
recordMapper.updateById(record);
}
@@ -402,21 +466,155 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
}
/**
* 轮询解析状态
* TODO: 实现真实逻辑
* 轮询解析状态固定间隔2秒最多300次
*
* @param groupId 项目ID
* @param logId 文件ID
* @return true=解析完成false=超时未完成
*/
private boolean waitForParsingComplete(Integer groupId, String logId) {
// TODO: 调用 lsfxClient.checkParseStatus() 轮询
return true;
log.info("【文件上传】开始轮询解析状态: groupId={}, logId={}", groupId, logId);
int maxRetries = 300;
int intervalSeconds = 2;
for (int i = 1; i <= maxRetries; i++) {
try {
// 调用检查解析状态接口
CheckParseStatusResponse response = lsfxClient.checkParseStatus(groupId, logId);
if (response == null || response.getData() == null) {
log.warn("【文件上传】轮询第{}次: 响应数据为空", i);
Thread.sleep(intervalSeconds * 1000L);
continue;
}
Boolean parsing = response.getData().getParsing();
log.debug("【文件上传】轮询第{}次: parsing={}", i, parsing);
// parsing=false 表示解析完成
if (Boolean.FALSE.equals(parsing)) {
log.info("【文件上传】解析完成: logId={}, 轮询次数={}", logId, i);
return true;
}
// 未完成,等待后继续
if (i < maxRetries) {
Thread.sleep(intervalSeconds * 1000L);
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
log.error("【文件上传】轮询被中断: logId={}", logId, e);
return false;
} catch (Exception e) {
log.error("【文件上传】轮询异常: logId={}, 次数={}", logId, i, e);
// 继续轮询,不中断
}
}
log.warn("【文件上传】轮询超时: logId={}, 已轮询{}次", logId, maxRetries);
return false;
}
/**
* 获取并保存流水数据
* TODO: 实现真实逻辑
* 获取并保存流水数据每页1000条批量插入每批1000条
*
* @param projectId 项目ID业务字段
* @param groupId 流水分析平台项目ID
* @param logId 文件ID
*/
private void fetchAndSaveBankStatements(Long projectId, Integer groupId,
Integer logId, int totalCount) {
// TODO: 调用 lsfxClient.getBankStatement() 获取流水
// TODO: 批量插入到 ccdi_bank_statement
private void fetchAndSaveBankStatements(Long projectId, Integer groupId, Integer logId) {
log.info("【文件上传】开始获取流水数据: projectId={}, groupId={}, logId={}",
projectId, groupId, logId);
// 步骤1: 先调用一次接口获取 totalCount
GetBankStatementRequest firstRequest = new GetBankStatementRequest();
firstRequest.setGroupId(groupId);
firstRequest.setLogId(logId);
firstRequest.setPageNow(1);
firstRequest.setPageSize(1); // 只获取1条用于获取总数
GetBankStatementResponse firstResponse = lsfxClient.getBankStatement(firstRequest);
if (firstResponse == null || firstResponse.getData() == null) {
log.warn("【文件上传】获取流水数据失败: 响应数据为空");
return;
}
Integer totalCount = firstResponse.getData().getTotalCount();
if (totalCount == null || totalCount <= 0) {
log.warn("【文件上传】无流水数据需要保存: totalCount={}", totalCount);
return;
}
log.info("【文件上传】获取到总数: totalCount={}", totalCount);
// 步骤2: 计算分页信息
int pageSize = 1000; // 每页1000条
int batchSize = 1000; // 批量插入每批1000条与pageSize保持一致
int totalPages = (int) Math.ceil((double) totalCount / pageSize);
log.info("【文件上传】分页信息: 每页{}条, 共{}页", pageSize, totalPages);
List<CcdiBankStatement> batchList = new ArrayList<>(batchSize);
int totalSaved = 0;
// 步骤3: 循环分页获取所有数据
for (int pageNow = 1; pageNow <= totalPages; pageNow++) {
try {
// 构建请求参数
GetBankStatementRequest request = new GetBankStatementRequest();
request.setGroupId(groupId);
request.setLogId(logId);
request.setPageNow(pageNow);
request.setPageSize(pageSize);
// 获取流水数据
GetBankStatementResponse response = lsfxClient.getBankStatement(request);
if (response == null || response.getData() == null
|| response.getData().getBankStatementList() == null) {
log.warn("【文件上传】获取流水数据为空: pageNow={}", pageNow);
continue;
}
List<GetBankStatementResponse.BankStatementItem> items =
response.getData().getBankStatementList();
log.debug("【文件上传】获取第{}页数据: {}条", pageNow, items.size());
// 转换并收集到批量列表
for (GetBankStatementResponse.BankStatementItem item : items) {
CcdiBankStatement statement = CcdiBankStatement.fromResponse(item);
if (statement != null) {
statement.setProjectId(projectId); // 设置业务项目ID
batchList.add(statement);
// 达到批量插入阈值1000条执行插入
if (batchList.size() >= batchSize) {
bankStatementMapper.insertBatch(batchList);
totalSaved += batchList.size();
log.debug("【文件上传】批量插入流水: {}条, 累计{}条",
batchList.size(), totalSaved);
batchList.clear();
}
}
}
} catch (Exception e) {
log.error("【文件上传】获取或保存流水数据失败: pageNow={}", pageNow, e);
// 继续处理下一页,不中断整个流程
}
}
// 步骤4: 保存剩余的数据
if (!batchList.isEmpty()) {
bankStatementMapper.insertBatch(batchList);
totalSaved += batchList.size();
log.debug("【文件上传】批量插入剩余流水: {}条", batchList.size());
}
log.info("【文件上传】流水数据保存完成: 总共保存{}条", totalSaved);
}
}

View File

@@ -0,0 +1,98 @@
# 异步文件上传功能 - 前端设计更新
## 文档信息
- **更新日期**: 2026-03-05
- **版本**: v1.1
- **变更说明**: 修改文件格式限制
## 变更内容
### 文件格式限制变更
**原限制**
- 仅支持 Excel 文件(.xlsx, .xls
**新限制**
- 支持 PDF 文件(.pdf
- 支持 CSV 文件(.csv
- 支持 Excel 文件(.xlsx, .xls
### 修改点
#### 1. 前端校验逻辑
```javascript
// 修改前
const validTypes = ['.xlsx', '.xls'];
// 修改后
const validTypes = ['.pdf', '.csv', '.xlsx', '.xls'];
```
#### 2. 错误提示
```
修改前: "仅支持 .xlsx, .xls 格式文件"
修改后: "仅支持 PDF、CSV、Excel 格式文件"
```
#### 3. 上传卡片描述
```
修改前: "支持 Excel、PDF 格式文件上传"
修改后: "支持 PDF、CSV、Excel 格式文件上传"
```
#### 4. 批量上传弹窗提示
```
修改前: "支持 .xlsx, .xls 格式文件最多上传100个文件"
修改后: "支持 PDF、CSV、Excel 格式文件最多100个文件单个文件不超过50MB"
```
#### 5. accept属性
```html
<!-- 新增 -->
<el-upload accept=".pdf,.csv,.xlsx,.xls" ...>
```
## 后端接口变更要求
后端Controller接口需要同步修改文件格式校验逻辑
```java
// CcdiFileUploadController.java
// 修改文件格式校验部分
// 修改前
if (!fileName.endsWith(".xlsx") && !fileName.endsWith(".xls")) {
return AjaxResult.error("文件 " + fileName + " 格式不支持仅支持Excel文件");
}
// 修改后
String lowerFileName = fileName.toLowerCase();
if (!lowerFileName.endsWith(".pdf") && !lowerFileName.endsWith(".csv")
&& !lowerFileName.endsWith(".xlsx") && !lowerFileName.endsWith(".xls")) {
return AjaxResult.error("文件 " + fileName + " 格式不支持仅支持PDF、CSV、Excel文件");
}
```
## 测试变更
### 测试文件格式
需要测试以下格式:
- ✅ PDF 文件
- ✅ CSV 文件
- ✅ XLSX 文件
- ✅ XLS 文件
- ❌ 其他格式(应被拒绝)
### 测试用例
1. 上传PDF文件 → 应成功
2. 上传CSV文件 → 应成功
3. 上传XLSX文件 → 应成功
4. 上传XLS文件 → 应成功
5. 上传TXT文件 → 应提示"格式不支持"
6. 上传DOC文件 → 应提示"格式不支持"
---
**文档结束**

View File

@@ -0,0 +1,544 @@
# 异步文件上传服务实现设计文档
## 文档信息
- **创建日期**: 2026-03-05
- **版本**: v1.0
- **作者**: Claude
- **状态**: 已批准
## 1. 概述
### 1.1 功能描述
实现 `CcdiFileUploadServiceImpl` 中所有 TODO 方法,完成项目流水文件的异步批量上传功能的端到端流程。
### 1.2 核心需求
- 集成流水分析平台客户端LsfxAnalysisClient
- 实现文件上传到流水分析平台
- 实现轮询解析状态(固定间隔策略)
- 获取并判断解析结果
- 批量获取并保存流水数据到本地数据库
- 实现批次日志管理
### 1.3 技术栈
- Spring @Async 异步处理
- ThreadPoolTaskExecutor 线程池
- MyBatis Plus 批量操作
- Logback 自定义日志
- 流水分析平台 API
## 2. 设计决策
### 2.1 轮询策略
**决策**: 固定间隔策略
- 轮询次数: 300次
- 间隔时间: 2秒
- 最长等待: 10分钟
- **理由**: 简单可靠,符合设计文档要求
### 2.2 分页获取策略
**决策**: 大批量分页
- 每页数量: 1000条
- 批量插入: 每批1000条
- 先调用一次获取 totalCount
- **理由**: 性能与内存占用的平衡
### 2.3 错误处理策略
**决策**: 严格失败策略
- 任何异常直接标记为 `parsed_failed`
- 记录详细的错误信息到 `error_message` 字段
- 不进行额外重试(线程池层面已有重试机制)
- **理由**: 简单明了,便于排查问题
### 2.4 日志管理策略
**决策**: 完整实现批次日志
- 实现自定义 `FileUploadLogAppender`
- 每个批次生成独立日志文件
- 路径基于 `ruoyi.profile` 配置
- **理由**: 便于运维排查问题
## 3. 详细设计
### 3.1 依赖注入
```java
@Slf4j
@Service
public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
@Value("${ruoyi.profile}")
private String uploadPath;
@Resource
private CcdiFileUploadRecordMapper recordMapper;
@Resource
private CcdiProjectMapper projectMapper;
@Resource
@Qualifier("fileUploadExecutor")
private Executor fileUploadExecutor;
@Resource
private LsfxAnalysisClient lsfxClient; // 新增
@Resource
private CcdiBankStatementMapper bankStatementMapper; // 新增
```
### 3.2 文件上传逻辑processFileAsync 第329-333行
**核心流程**:
1. 将临时文件路径转换为 File 对象
2. 验证文件存在性
3. 调用 `lsfxClient.uploadFile(lsfxProjectId, file)`
4. 提取并验证返回的 logId
**关键代码**:
```java
File file = filePath.toFile();
if (!file.exists()) {
throw new RuntimeException("临时文件不存在: " + tempFilePath);
}
UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, file);
if (uploadResponse == null || uploadResponse.getData() == null) {
throw new RuntimeException("上传文件失败: 响应数据为空");
}
Integer logId = uploadResponse.getData().getLogId();
if (logId == null) {
throw new RuntimeException("上传文件失败: 未返回logId");
}
```
### 3.3 轮询解析状态逻辑waitForParsingComplete
**核心流程**:
1. 调用 `checkParseStatus(groupId, logId)`
2. 检查 `parsing` 字段
3. `parsing=false` 表示解析完成
4. 固定间隔2秒最多300次
**关键代码**:
```java
for (int i = 1; i <= maxRetries; i++) {
CheckParseStatusResponse response = lsfxClient.checkParseStatus(groupId, logId);
if (response == null || response.getData() == null) {
log.warn("【文件上传】轮询第{}次: 响应数据为空", i);
Thread.sleep(intervalSeconds * 1000L);
continue;
}
Boolean parsing = response.getData().getParsing();
// parsing=false 表示解析完成
if (Boolean.FALSE.equals(parsing)) {
log.info("【文件上传】解析完成: logId={}, 轮询次数={}", logId, i);
return true;
}
if (i < maxRetries) {
Thread.sleep(intervalSeconds * 1000L);
}
}
```
**异常处理**:
- `InterruptedException`: 恢复中断状态,返回 false
- 其他异常: 记录日志,继续轮询
### 3.4 获取解析结果逻辑processFileAsync 第355-383行
**核心流程**:
1. 调用 `getFileUploadStatus(groupId, logId)`
2. 判断 `status == -5 && uploadStatusDesc == "data.wait.confirm.newaccount"`
3. 提取 `enterpriseNameList``accountNoList`
4. 解析成功则调用 `fetchAndSaveBankStatements()`
**关键代码**:
```java
GetFileUploadStatusRequest statusRequest = new GetFileUploadStatusRequest();
statusRequest.setGroupId(lsfxProjectId);
statusRequest.setLogId(logId);
GetFileUploadStatusResponse statusResponse = lsfxClient.getFileUploadStatus(statusRequest);
GetFileUploadStatusResponse.LogItem logItem = statusResponse.getData().getLogs().get(0);
Integer status = logItem.getStatus();
String uploadStatusDesc = logItem.getUploadStatusDesc();
// 判断解析结果
boolean parseSuccess = status != null && status == -5
&& "data.wait.confirm.newaccount".equals(uploadStatusDesc);
if (parseSuccess) {
// 提取主体信息
List<String> enterpriseNames = logItem.getEnterpriseNameList();
List<String> accountNos = logItem.getAccountNoList();
String enterpriseNamesStr = enterpriseNames != null ? String.join(",", enterpriseNames) : "";
String accountNosStr = accountNos != null ? String.join(",", accountNos) : "";
record.setFileStatus("parsed_success");
record.setEnterpriseNames(enterpriseNamesStr);
record.setAccountNos(accountNosStr);
recordMapper.updateById(record);
// 获取流水数据
fetchAndSaveBankStatements(projectId, lsfxProjectId, logId);
} else {
record.setFileStatus("parsed_failed");
record.setErrorMessage("解析失败: " + uploadStatusDesc);
recordMapper.updateById(record);
}
```
### 3.5 批量保存流水数据逻辑fetchAndSaveBankStatements
**核心流程**:
1. 先调用一次接口获取 totalCountpageSize=1, pageNow=1
2. 计算分页信息每页1000条
3. 循环分页获取所有数据
4. 每累积1000条批量插入一次
5. 设置 projectId 到每条流水记录
**关键代码**:
```java
// 步骤1: 先调用一次接口获取 totalCount
GetBankStatementRequest firstRequest = new GetBankStatementRequest();
firstRequest.setGroupId(groupId);
firstRequest.setLogId(logId);
firstRequest.setPageNow(1);
firstRequest.setPageSize(1);
GetBankStatementResponse firstResponse = lsfxClient.getBankStatement(firstRequest);
Integer totalCount = firstResponse.getData().getTotalCount();
// 步骤2: 计算分页信息
int pageSize = 1000;
int batchSize = 1000;
int totalPages = (int) Math.ceil((double) totalCount / pageSize);
List<CcdiBankStatement> batchList = new ArrayList<>(batchSize);
// 步骤3: 循环分页获取
for (int pageNow = 1; pageNow <= totalPages; pageNow++) {
GetBankStatementRequest request = new GetBankStatementRequest();
request.setGroupId(groupId);
request.setLogId(logId);
request.setPageNow(pageNow);
request.setPageSize(pageSize);
GetBankStatementResponse response = lsfxClient.getBankStatement(request);
for (GetBankStatementResponse.BankStatementItem item : items) {
CcdiBankStatement statement = CcdiBankStatement.fromResponse(item);
statement.setProjectId(projectId); // 设置业务项目ID
batchList.add(statement);
// 达到批量插入阈值1000条
if (batchList.size() >= batchSize) {
bankStatementMapper.insertBatch(batchList);
batchList.clear();
}
}
}
// 步骤4: 保存剩余的数据
if (!batchList.isEmpty()) {
bankStatementMapper.insertBatch(batchList);
}
```
**性能优化**:
- 每页1000条减少请求次数
- 批量插入1000条提高数据库性能
- 异常不中断,继续处理下一页
### 3.6 批次日志管理FileUploadLogAppender
**核心功能**:
1. 继承 `UnsynchronizedAppenderBase<ILoggingEvent>`
2. 使用 `ThreadLocal` 存储当前批次的 FileAppender
3. 为每个批次创建独立的日志文件
**日志文件路径**:
```
{ruoyi.profile}/logs/file-upload/{projectId}/{timestamp}.log
```
**示例**:
- Windows: `D:/ruoyi/uploadPath/logs/file-upload/123/20260305-103025.log`
- Linux: `/var/ruoyi/logs/file-upload/123/20260305-103025.log`
**关键方法**:
```java
/**
* 为指定批次创建独立的日志文件
*/
public static void createBatchLogFile(String uploadPath, Long projectId, String batchId) {
String timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date());
String logDirPath = uploadPath + File.separator + "logs" + File.separator
+ "file-upload" + File.separator + projectId;
File logDir = new File(logDirPath);
if (!logDir.exists()) {
logDir.mkdirs();
}
String logFilePath = logDirPath + File.separator + timestamp + ".log";
FileAppender<ILoggingEvent> appender = new FileAppender<>();
appender.setFile(logFilePath);
PatternLayout layout = new PatternLayout();
layout.setPattern("%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n");
layout.start();
appender.setLayout(layout);
appender.start();
currentAppender.set(appender);
}
/**
* 关闭当前批次的日志文件
*/
public static void closeBatchLogFile() {
FileAppender<ILoggingEvent> appender = currentAppender.get();
if (appender != null) {
appender.stop();
currentAppender.remove();
}
}
```
**使用方式**:
```java
private void submitTasksAsync(...) {
// 创建批次日志文件
FileUploadLogAppender.createBatchLogFile(uploadPath, projectId, batchId);
try {
// 任务提交逻辑
} finally {
// 关闭日志文件
FileUploadLogAppender.closeBatchLogFile();
}
}
```
## 4. 实现细节
### 4.1 文件上传完整流程
```java
@Async("fileUploadExecutor")
public void processFileAsync(Long projectId, Integer lsfxProjectId, String tempFilePath,
Long recordId, String batchId, CcdiFileUploadRecord record) {
try {
// 步骤1: 状态已是uploading记录已存在
Path filePath = Paths.get(tempFilePath);
if (!Files.exists(filePath)) {
throw new RuntimeException("临时文件不存在: " + tempFilePath);
}
// 步骤2: 上传文件到流水分析平台
File file = filePath.toFile();
UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, file);
Integer logId = uploadResponse.getData().getLogId();
// 步骤3: 更新状态为 parsing
record.setLogId(logId);
record.setFileStatus("parsing");
recordMapper.updateById(record);
// 步骤4: 轮询解析状态最多300次间隔2秒
boolean parsingComplete = waitForParsingComplete(lsfxProjectId, logId.toString());
if (!parsingComplete) {
throw new RuntimeException("解析超时(超过10分钟)");
}
// 步骤5: 获取文件上传状态
GetFileUploadStatusRequest statusRequest = new GetFileUploadStatusRequest();
statusRequest.setGroupId(lsfxProjectId);
statusRequest.setLogId(logId);
GetFileUploadStatusResponse statusResponse = lsfxClient.getFileUploadStatus(statusRequest);
GetFileUploadStatusResponse.LogItem logItem = statusResponse.getData().getLogs().get(0);
Integer status = logItem.getStatus();
String uploadStatusDesc = logItem.getUploadStatusDesc();
// 步骤6: 判断解析结果
boolean parseSuccess = status != null && status == -5
&& "data.wait.confirm.newaccount".equals(uploadStatusDesc);
if (parseSuccess) {
// 解析成功
List<String> enterpriseNames = logItem.getEnterpriseNameList();
List<String> accountNos = logItem.getAccountNoList();
record.setFileStatus("parsed_success");
record.setEnterpriseNames(enterpriseNames != null ? String.join(",", enterpriseNames) : "");
record.setAccountNos(accountNos != null ? String.join(",", accountNos) : "");
recordMapper.updateById(record);
// 步骤7: 获取流水数据并保存
fetchAndSaveBankStatements(projectId, lsfxProjectId, logId);
} else {
// 解析失败
record.setFileStatus("parsed_failed");
record.setErrorMessage("解析失败: " + uploadStatusDesc);
recordMapper.updateById(record);
}
} catch (Exception e) {
log.error("【文件上传】处理失败: fileName={}", record.getFileName(), e);
updateRecordStatus(recordId, "parsed_failed", e.getMessage());
} finally {
// 清理临时文件
try {
Path filePath = Paths.get(tempFilePath);
if (Files.exists(filePath)) {
Files.delete(filePath);
}
} catch (IOException e) {
log.warn("【文件上传】清理临时文件失败: {}", tempFilePath, e);
}
}
}
```
### 4.2 错误处理规范
**异常分类**:
1. **文件异常**: 临时文件不存在、文件转换失败
2. **网络异常**: 流水分析平台接口调用失败
3. **业务异常**: 解析失败、解析超时
4. **数据库异常**: 批量插入失败
**处理策略**:
- 所有异常统一捕获,记录详细日志
- 直接标记为 `parsed_failed`
- 记录错误信息到 `error_message` 字段
- finally 块确保临时文件被清理
### 4.3 日志记录规范
**日志级别**:
- `INFO`: 关键步骤(开始上传、上传成功、解析完成、保存成功)
- `DEBUG`: 详细信息(轮询次数、每页数据量)
- `WARN`: 警告信息(响应数据为空、清理失败)
- `ERROR`: 错误信息(处理失败、异常)
**日志格式**:
```
【文件上传】{步骤描述}: {关键参数}={值}
```
**示例**:
```
【文件上传】开始处理文件: fileName=流水1.xlsx, recordId=123
【文件上传】文件上传成功: logId=456789
【文件上传】解析完成: logId=456789, 轮询次数=15
【文件上传】流水数据保存完成: 总共保存5000条
```
## 5. 文件清单
### 5.1 需要修改的文件
| 文件路径 | 修改内容 |
|---------|---------|
| `CcdiFileUploadServiceImpl.java` | 实现 processFileAsync、waitForParsingComplete、fetchAndSaveBankStatements 中的 TODO |
### 5.2 需要新增的文件
| 文件路径 | 说明 |
|---------|------|
| `ccdi-project/src/main/java/com/ruoyi/ccdi/project/log/FileUploadLogAppender.java` | 批次日志管理器 |
## 6. 测试策略
### 6.1 单元测试
**测试用例**:
1. `waitForParsingComplete` - 正常轮询成功
2. `waitForParsingComplete` - 轮询超时
3. `waitForParsingComplete` - 轮询被中断
4. `fetchAndSaveBankStatements` - 无数据
5. `fetchAndSaveBankStatements` - 单页数据
6. `fetchAndSaveBankStatements` - 多页数据
7. `fetchAndSaveBankStatements` - 异常处理
### 6.2 集成测试
**测试场景**:
1. 完整流程测试(单个文件,正常场景)
2. 大文件测试50MB
3. 批量文件测试10个文件
4. 解析失败场景
5. 网络异常场景
6. 线程池满载场景
### 6.3 性能测试
**测试指标**:
- 单个文件处理时长: 3-15分钟
- 100个文件并发处理
- 数据库批量插入性能
- 内存占用情况
## 7. 部署注意事项
### 7.1 配置检查
- [ ] `ruoyi.profile` 配置正确且目录有写权限
- [ ] 线程池容量配置默认100
- [ ] 流水分析平台地址配置正确
- [ ] 应用认证信息配置正确
### 7.2 监控指标
- 线程池活跃线程数
- 文件上传成功率
- 平均处理时长
- 批量插入性能
- 日志文件大小和数量
### 7.3 运维建议
- 定期清理30天前的日志文件
- 监控线程池状态
- 关注数据库连接池使用情况
- 流水分析平台接口调用成功率监控
## 8. 风险与缓解
### 8.1 风险识别
| 风险 | 影响 | 概率 | 缓解措施 |
|------|------|------|----------|
| 流水分析平台不稳定 | 高 | 中 | 异常捕获,标记失败,详细日志 |
| 大文件内存溢出 | 高 | 低 | 批量插入,及时清理临时文件 |
| 线程池满载 | 中 | 中 | 重试机制,提示系统繁忙 |
| 日志文件过大 | 低 | 中 | 按批次分离,定期清理 |
### 8.2 回滚方案
如遇严重问题,可以:
1. 禁用异步上传功能
2. 回退到同步上传方式
3. 暂停新的上传任务
## 9. 参考资料
- [项目异步文件上传功能设计文档](../../design/2026-03-05-async-file-upload-design.md)
- [项目异步文件上传需求](../../assets/项目异步文件上传/task.md)
- [流水分析平台接口文档](../2026-03-02-lsfx-integration-design.md)
- [银行流水实体设计](../2026-03-04-bank-statement-entity-design.md)
---
**文档结束**

View File

@@ -0,0 +1,113 @@
#!/ 异步文件上传功能集成测试脚本
# 测试说明
# 本脚本用于测试异步文件上传功能的完整流程
# 包括: 文件上传、轮询状态、 数据保存
# 测试环境
BASE_URL="http://localhost:8080"
TOKEN=""
# 颜色输出
RED='\033[0;31m'
GREEN='\033[1;32m'
YELLOW='\033[1;33m'
NC='\033[0m'
# 获取 Token
echo -e "${YELLOW}开始获取 Token...${NC}"
TOKEN_RESPONSE=$(curl -s -X POST "${BASE_URL}/login/test?username=admin&password=admin123")
TOKEN=$(echo "$TOKEN_RESPONSE" | grep -o '"token":"[^"]*' | sed 's/.*:\([^"]*\).*/\1/')
if [ -z "$TOKEN" ]; then
echo -e "${RED}获取 Token 失败${NC}"
exit 1
fi
echo -e "${GREEN}Token 获取成功${NC}"
# 准备测试数据
echo -e "${YELLOW}准备测试项目...${NC}"
# 创建测试项目
PROJECT_DATA=$(cat <<EOF
{
"projectName": "测试项目-$(date +%Y%m%d)",
"projectStatus": "进行中"
}
EOF
)
CREATE_RESPONSE=$(curl -s -X POST "${BASE_URL}/ccdi/project" \
-H "Authorization: Bearer ${TOKEN}" \
-H "Content-Type: application/json" \
-d "$PROJECT_DATA")
PROJECT_ID=$(echo "$CREATE_RESPONSE" | grep -o '"projectId":[^,]*' | sed 's/.*:\([^"]*\).*/\1/')
if [ -z "$PROJECT_ID" ]; then
echo -e "${RED}创建项目失败${NC}"
exit 1
fi
echo -e "${GREEN}项目创建成功: ID=${PROJECT_ID}${NC}"
# 创建测试文件
TEST_FILE="/tmp/test_bank_statement_$(date +%s).xlsx"
echo "账号,日期,金额,摘要" > "$TEST_FILE"
echo "622xxx,2024-01-01,1000.00,测试交易1" >> "$TEST_FILE"
echo "623xxx,2024-01-02,2000.00,测试交易2" >> "$TEST_FILE"
echo "622xxx,2024-01-03,3000.00,测试交易3" >> "$TEST_FILE"
# 测试文件上传
echo -e "${YELLOW}测试文件上传...${NC}"
UPLOAD_RESPONSE=$(curl -s -X POST "${BASE_URL}/ccdi/file-upload/batch" \
-H "Authorization: Bearer ${TOKEN}" \
-F "projectId=${PROJECT_ID}" \
-F "files[]=@${TEST_FILE};type=text/plain")
BATCH_ID=$(echo "$UPLOAD_RESPONSE" | grep -o '"data":"[^"]*' | sed 's/.*:\([^"]*\).*/\1/')
if [ -z "$BATCH_ID" ]; then
echo -e "${RED}文件上传失败${NC}"
exit 1
fi
echo -e "${GREEN}文件上传成功: Batch ID=${BATCH_ID}${NC}"
# 等待处理完成
echo -e "${YELLOW}等待文件处理...${NC}"
sleep 10
# 查询上传记录
RECORDS_RESPONSE=$(curl -s -X GET "${BASE_URL}/ccdi/file-upload/list?projectId=${PROJECT_ID}" \
-H "Authorization: Bearer ${TOKEN}")
RECORDS=$(echo "$RECORDS_RESPONSE" | grep -o '"rows"' | sed 's/.*:\(\[.*\]\).*/\1/')
if [ -z "$RECORDS" ] || [ "$RECORDS" = "[]" ]; then
echo -e "${RED}未找到上传记录${NC}"
exit 1
fi
echo -e "${GREEN}查询到 ${#RECORDS[@]} 条记录${NC}"
# 验证记录状态
for RECORD in $RECORDS; do
STATUS=$(echo "$RECORD" | grep -o '"fileStatus"' | sed 's/.*:\([^"]*\).*/\1/')
if [ "$STATUS" = "\"parsed_success\"" ]; then
echo -e "${GREEN}文件解析成功${NC}"
elif [ "$STATUS" = "\"parsed_failed\"" ]; then
ERROR=$(echo "$RECORD" | grep -o '"errorMessage"' | sed 's/.*:\([^"]*\).*/\1/')
echo -e "${RED}文件解析失败: ${ERROR}${NC}"
else
echo -e "${YELLOW}文件状态: ${STATUS}${NC}"
fi
done
# 清理测试数据
echo -e "${YELLOW}清理测试数据...${NC}"
curl -s -X DELETE "${BASE_URL}/ccdi/project/${PROJECT_ID}" \
-H "Authorization: Bearer ${TOKEN}"
rm -f "$TEST_FILE"
echo -e "${GREEN}测试完成${NC}"

View File

@@ -79,3 +79,63 @@ export function getImportStatus(taskId) {
method: 'get'
})
}
// ========== 批量文件上传相关接口 ==========
/**
* 批量上传文件
* @param {Number} projectId 项目ID
* @param {Array<File>} files 文件数组
* @returns {Promise} 返回 batchId
*/
export function batchUploadFiles(projectId, files) {
const formData = new FormData()
files.forEach(file => {
formData.append('files', file)
})
formData.append('projectId', projectId)
return request({
url: '/ccdi/file-upload/batch',
method: 'post',
data: formData,
headers: {
'Content-Type': 'multipart/form-data'
},
timeout: 300000 // 5分钟超时
})
}
/**
* 查询文件上传记录列表
* @param {Object} params 查询参数
*/
export function getFileUploadList(params) {
return request({
url: '/ccdi/file-upload/list',
method: 'get',
params
})
}
/**
* 查询文件上传统计
* @param {Number} projectId 项目ID
*/
export function getFileUploadStatistics(projectId) {
return request({
url: `/ccdi/file-upload/statistics/${projectId}`,
method: 'get'
})
}
/**
* 查询文件上传详情
* @param {Number} id 记录ID
*/
export function getFileUploadDetail(id) {
return request({
url: `/ccdi/file-upload/detail/${id}`,
method: 'get'
})
}

View File

@@ -46,6 +46,126 @@
</div>
</div>
<!-- 统计卡片区域 -->
<div class="statistics-section">
<div class="stat-card" @click="handleStatusFilter('uploading')">
<div class="stat-icon uploading">
<i class="el-icon-upload"></i>
</div>
<div class="stat-content">
<div class="stat-label">上传中</div>
<div class="stat-value">{{ statistics.uploading }}</div>
</div>
</div>
<div class="stat-card" @click="handleStatusFilter('parsing')">
<div class="stat-icon parsing">
<i class="el-icon-loading"></i>
</div>
<div class="stat-content">
<div class="stat-label">解析中</div>
<div class="stat-value">{{ statistics.parsing }}</div>
</div>
</div>
<div class="stat-card" @click="handleStatusFilter('parsed_success')">
<div class="stat-icon success">
<i class="el-icon-success"></i>
</div>
<div class="stat-content">
<div class="stat-label">解析成功</div>
<div class="stat-value">{{ statistics.parsed_success }}</div>
</div>
</div>
<div class="stat-card" @click="handleStatusFilter('parsed_failed')">
<div class="stat-icon failed">
<i class="el-icon-error"></i>
</div>
<div class="stat-content">
<div class="stat-label">解析失败</div>
<div class="stat-value">{{ statistics.parsed_failed }}</div>
</div>
</div>
</div>
<!-- 文件上传记录列表 -->
<div class="file-list-section">
<div class="list-toolbar">
<div class="filter-group">
<el-select
v-model="queryParams.fileStatus"
placeholder="文件状态"
clearable
@change="loadFileList"
style="width: 150px"
>
<el-option label="上传中" value="uploading"></el-option>
<el-option label="解析中" value="parsing"></el-option>
<el-option label="解析成功" value="parsed_success"></el-option>
<el-option label="解析失败" value="parsed_failed"></el-option>
</el-select>
</div>
<el-button icon="el-icon-refresh" @click="handleManualRefresh">刷新</el-button>
</div>
<el-table :data="fileUploadList" v-loading="listLoading" stripe border>
<el-table-column prop="fileName" label="文件名" min-width="200"></el-table-column>
<el-table-column prop="fileSize" label="文件大小" width="120">
<template slot-scope="scope">
{{ formatFileSize(scope.row.fileSize) }}
</template>
</el-table-column>
<el-table-column prop="fileStatus" label="状态" width="120">
<template slot-scope="scope">
<el-tag :type="getStatusType(scope.row.fileStatus)" size="small">
{{ getStatusText(scope.row.fileStatus) }}
</el-tag>
</template>
</el-table-column>
<el-table-column prop="enterpriseNames" label="主体名称" min-width="150">
<template slot-scope="scope">
{{ scope.row.enterpriseNames || '-' }}
</template>
</el-table-column>
<el-table-column prop="uploadTime" label="上传时间" width="160"></el-table-column>
<el-table-column prop="uploadUser" label="上传人" width="100"></el-table-column>
<el-table-column label="操作" width="120" fixed="right">
<template slot-scope="scope">
<el-button
v-if="scope.row.fileStatus === 'parsed_success'"
type="text"
size="small"
@click="handleViewFlow(scope.row)"
>
查看流水
</el-button>
<el-button
v-if="scope.row.fileStatus === 'parsed_failed'"
type="text"
size="small"
@click="handleViewError(scope.row)"
>
查看错误
</el-button>
<span v-if="scope.row.fileStatus === 'uploading' || scope.row.fileStatus === 'parsing'">
-
</span>
</template>
</el-table-column>
</el-table>
<el-pagination
@current-change="handlePageChange"
:current-page="queryParams.pageNum"
:page-size="queryParams.pageSize"
:total="total"
layout="total, prev, pager, next, jumper"
style="margin-top: 16px; text-align: right"
></el-pagination>
</div>
<!-- 数据质量检查
<div class="quality-check-section">
<div class="section-header">
@@ -120,6 +240,7 @@
>
</span>
</el-dialog>
accept=".pdf,.csv,.xlsx,.xls"
<!-- 名单选择弹窗 -->
<el-dialog
@@ -149,6 +270,63 @@
>
</span>
</el-dialog>
<!-- 批量上传弹窗 -->
<el-dialog
title="批量上传流水文件"
:visible.sync="batchUploadDialogVisible"
:close-on-click-modal="false"
width="700px"
>
<el-upload
class="batch-upload-area"
drag
action="#"
multiple
:auto-upload="false"
:on-change="handleBatchFileChange"
:file-list="selectedFiles"
>
<i class="el-icon-upload"></i>
<div class="el-upload__text">将文件拖到此处<em>点击上传</em></div>
<div class="el-upload__tip" slot="tip">
支持 PDFCSVExcel 格式文件最多100个文件单个文件不超过50MB
</div>
</el-upload>
<div v-if="selectedFiles.length > 0" class="selected-files">
<div class="files-header">
<span>已选择 {{ selectedFiles.length }} 个文件</span>
</div>
<div class="files-list">
<div
v-for="(file, index) in selectedFiles"
:key="index"
class="file-item"
>
<i class="el-icon-document"></i>
<span class="file-name">{{ file.name }}</span>
<span class="file-size">{{ formatFileSize(file.size) }}</span>
<el-button
type="text"
icon="el-icon-close"
@click="handleRemoveFile(index)"
></el-button>
</div>
</div>
</div>
<span slot="footer">
<el-button @click="batchUploadDialogVisible = false">取消</el-button>
<el-button
type="primary"
:loading="uploadLoading"
:disabled="selectedFiles.length === 0"
@click="handleBatchUpload"
>开始上传</el-button
>
</span>
</el-dialog>
</div>
</template>
@@ -160,6 +338,9 @@ import {
pullBankInfo,
updateNameListSelection,
uploadFile,
batchUploadFiles,
getFileUploadList,
getFileUploadStatistics,
} from "@/api/ccdiProjectUpload";
export default {
@@ -218,7 +399,7 @@ export default {
{
key: "transaction",
title: "流水导入",
desc: "支持 Excel、PDF 格式文件上传",
desc: "支持 PDF、CSV、Excel 格式文件上传",
icon: "el-icon-document",
btnText: "上传流水",
uploaded: false,
@@ -261,6 +442,34 @@ export default {
level: "info",
},
],
// === 批量上传相关 ===
batchUploadDialogVisible: false,
selectedFiles: [],
uploadLoading: false,
// === 统计数据 ===
statistics: {
uploading: 0,
parsing: 0,
parsed_success: 0,
parsed_failed: 0,
},
// === 文件列表相关 ===
fileUploadList: [],
listLoading: false,
queryParams: {
projectId: null,
fileStatus: null,
pageNum: 1,
pageSize: 20,
},
total: 0,
// === 轮询相关 ===
pollingTimer: null,
pollingEnabled: false,
pollingInterval: 5000,
};
},
created() {
@@ -272,6 +481,20 @@ export default {
mounted() {
// 组件挂载后监听项目ID变化
this.$watch("projectId", this.loadInitialData);
// 加载统计数据和文件列表
this.loadStatistics();
this.loadFileList();
// 检查是否需要启动轮询
this.$nextTick(() => {
if (this.statistics.uploading > 0 || this.statistics.parsing > 0) {
this.startPolling();
}
});
},
beforeDestroy() {
this.stopPolling();
},
methods: {
/** 加载初始数据 */
@@ -361,13 +584,19 @@ export default {
const card = this.uploadCards.find((c) => c.key === key);
if (!card) return;
if (key === "namelist") {
this.showNameListDialog = true;
} else {
if (key === "transaction") {
// 流水导入 - 打开批量上传弹窗
this.batchUploadDialogVisible = true;
this.selectedFiles = [];
} else if (key === "credit") {
// 征信导入 - 保持现有逻辑
this.uploadFileType = key;
this.uploadDialogTitle = `上传${card.title}`;
this.uploadFileTypes = card.desc.replace(/.*支持|上传/g, "").trim();
this.showUploadDialog = true;
} else if (key === "namelist") {
// 名单库选择 - 保持现有逻辑
this.showNameListDialog = true;
}
},
/** 文件选择变化 */
@@ -601,6 +830,223 @@ export default {
};
return statusMap[status] || "未知";
},
// === 批量上传相关方法 ===
/** 批量上传的文件选择变化 */
handleBatchFileChange(file, fileList) {
if (fileList.length > 100) {
this.$message.warning("最多上传100个文件");
fileList = fileList.slice(0, 100);
}
const validTypes = ['.pdf', '.csv', '.xlsx', '.xls'];
const invalidFiles = fileList.filter((f) => {
const ext = f.name.substring(f.name.lastIndexOf(".")).toLowerCase();
return !validTypes.includes(ext);
});
if (invalidFiles.length > 0) {
this.$message.error("仅支持 PDF、CSV、Excel 格式文件");
return;
}
const oversizedFiles = fileList.filter((f) => f.size > 50 * 1024 * 1024);
if (oversizedFiles.length > 0) {
this.$message.error("单个文件不能超过50MB");
return;
}
this.selectedFiles = fileList;
},
/** 删除已选文件 */
handleRemoveFile(index) {
this.selectedFiles.splice(index, 1);
},
/** 开始批量上传 */
async handleBatchUpload() {
if (this.selectedFiles.length === 0) {
this.$message.warning("请选择要上传的文件");
return;
}
this.uploadLoading = true;
try {
const res = await batchUploadFiles(
this.projectId,
this.selectedFiles.map((f) => f.raw)
);
this.uploadLoading = false;
this.batchUploadDialogVisible = false;
this.$message.success("上传任务已提交,请查看处理进度");
// 刷新数据并启动轮询
await Promise.all([this.loadStatistics(), this.loadFileList()]);
this.startPolling();
} catch (error) {
this.uploadLoading = false;
this.$message.error("上传失败:" + (error.msg || "未知错误"));
}
},
// === 统计和列表相关方法 ===
/** 加载统计数据 */
async loadStatistics() {
try {
const res = await getFileUploadStatistics(this.projectId);
this.statistics = res.data || {
uploading: 0,
parsing: 0,
parsed_success: 0,
parsed_failed: 0,
};
} catch (error) {
console.error("加载统计数据失败:", error);
}
},
/** 加载文件列表 */
async loadFileList() {
this.listLoading = true;
try {
const params = {
projectId: this.projectId,
fileStatus: this.queryParams.fileStatus,
pageNum: this.queryParams.pageNum,
pageSize: this.queryParams.pageSize,
};
const res = await getFileUploadList(params);
this.fileUploadList = res.rows || [];
this.total = res.total || 0;
} catch (error) {
this.$message.error("加载文件列表失败");
console.error(error);
} finally {
this.listLoading = false;
}
},
// === 轮询相关方法 ===
/** 启动轮询 */
startPolling() {
if (this.pollingEnabled) return;
this.pollingEnabled = true;
const poll = () => {
if (!this.pollingEnabled) return;
Promise.all([this.loadStatistics(), this.loadFileList()])
.then(() => {
if (
this.statistics.uploading === 0 &&
this.statistics.parsing === 0
) {
this.stopPolling();
return;
}
this.pollingTimer = setTimeout(poll, this.pollingInterval);
})
.catch((error) => {
console.error("轮询失败:", error);
this.pollingTimer = setTimeout(poll, this.pollingInterval);
});
};
poll();
},
/** 停止轮询 */
stopPolling() {
this.pollingEnabled = false;
if (this.pollingTimer) {
clearTimeout(this.pollingTimer);
this.pollingTimer = null;
}
},
/** 手动刷新 */
async handleManualRefresh() {
await Promise.all([this.loadStatistics(), this.loadFileList()]);
this.$message.success("刷新成功");
if (this.statistics.uploading > 0 || this.statistics.parsing > 0) {
this.startPolling();
}
},
// === 辅助方法 ===
/** 状态筛选 */
handleStatusFilter(status) {
this.queryParams.fileStatus = status;
this.queryParams.pageNum = 1;
this.loadFileList();
},
/** 分页变化 */
handlePageChange(pageNum) {
this.queryParams.pageNum = pageNum;
this.loadFileList();
},
/** 查看流水 */
handleViewFlow(record) {
this.$emit("menu-change", {
key: "detail",
route: "detail",
params: { logId: record.logId },
});
},
/** 查看错误 */
handleViewError(record) {
this.$alert(record.errorMessage || "未知错误", "错误信息", {
confirmButtonText: "确定",
type: "error",
});
},
/** 状态文本映射 */
getStatusText(status) {
const map = {
uploading: "上传中",
parsing: "解析中",
parsed_success: "解析成功",
parsed_failed: "解析失败",
};
return map[status] || status;
},
/** 状态标签类型映射 */
getStatusType(status) {
const map = {
uploading: "primary",
parsing: "warning",
parsed_success: "success",
parsed_failed: "danger",
};
return map[status] || "info";
},
/** 格式化文件大小 */
formatFileSize(bytes) {
if (bytes === 0) return "0 B";
const k = 1024;
const sizes = ["B", "KB", "MB", "GB"];
const i = Math.floor(Math.log(bytes) / Math.log(k));
return parseFloat((bytes / Math.pow(k, i)).toFixed(2)) + " " + sizes[i];
},
},
};
</script>
@@ -876,6 +1322,97 @@ export default {
}
}
// 统计卡片区域
.statistics-section {
display: grid;
grid-template-columns: repeat(4, 1fr);
gap: 16px;
margin-bottom: 16px;
.stat-card {
background: #fff;
border-radius: 4px;
padding: 20px;
display: flex;
align-items: center;
gap: 16px;
box-shadow: 0 2px 8px rgba(0, 0, 0, 0.05);
cursor: pointer;
transition: all 0.3s;
&:hover {
transform: translateY(-2px);
box-shadow: 0 4px 12px rgba(0, 0, 0, 0.1);
}
.stat-icon {
width: 48px;
height: 48px;
border-radius: 50%;
display: flex;
align-items: center;
justify-content: center;
font-size: 24px;
&.uploading {
background: rgba(64, 158, 255, 0.1);
color: #409eff;
}
&.parsing {
background: rgba(230, 162, 60, 0.1);
color: #e6a23c;
}
&.success {
background: rgba(103, 194, 58, 0.1);
color: #67c23a;
}
&.failed {
background: rgba(245, 108, 108, 0.1);
color: #f56c6c;
}
}
.stat-content {
flex: 1;
.stat-label {
font-size: 14px;
color: #909399;
margin-bottom: 4px;
}
.stat-value {
font-size: 24px;
font-weight: 600;
color: #303133;
}
}
}
}
// 文件列表区域
.file-list-section {
background: #fff;
border-radius: 4px;
padding: 20px;
box-shadow: 0 2px 8px rgba(0, 0, 0, 0.05);
.list-toolbar {
display: flex;
justify-content: space-between;
align-items: center;
margin-bottom: 16px;
.filter-group {
display: flex;
gap: 12px;
}
}
}
// 上传弹窗样式
::v-deep .el-dialog__wrapper {
.upload-area {
@@ -898,6 +1435,83 @@ export default {
}
}
// 批量上传弹窗样式
.batch-upload-area {
width: 100%;
::v-deep .el-upload {
width: 100%;
.el-upload-dragger {
width: 100%;
height: 180px;
}
}
}
.selected-files {
margin-top: 16px;
border: 1px solid #ebeef5;
border-radius: 4px;
max-height: 300px;
overflow-y: auto;
.files-header {
padding: 12px 16px;
background: #f5f7fa;
border-bottom: 1px solid #ebeef5;
font-size: 14px;
font-weight: 500;
color: #606266;
}
.files-list {
padding: 8px;
.file-item {
display: flex;
align-items: center;
padding: 8px 12px;
border-radius: 4px;
transition: background 0.3s;
&:hover {
background: #f5f7fa;
}
i {
font-size: 18px;
color: #1890ff;
margin-right: 8px;
}
.file-name {
flex: 1;
font-size: 14px;
color: #303133;
overflow: hidden;
text-overflow: ellipsis;
white-space: nowrap;
}
.file-size {
font-size: 12px;
color: #909399;
margin: 0 12px;
}
.el-button {
padding: 4px;
color: #909399;
&:hover {
color: #f56c6c;
}
}
}
}
}
// 响应式
@media (max-width: 1200px) {
.upload-section .upload-cards {
@@ -908,6 +1522,10 @@ export default {
grid-template-columns: repeat(3, 1fr);
gap: 16px;
}
.statistics-section {
grid-template-columns: repeat(2, 1fr);
}
}
@media (max-width: 768px) {
@@ -932,5 +1550,15 @@ export default {
.quality-check-section .metrics {
grid-template-columns: 1fr;
}
.statistics-section {
grid-template-columns: 1fr;
}
.file-list-section .list-toolbar {
flex-direction: column;
align-items: flex-start;
gap: 12px;
}
}
</style>