diff --git a/ccdi-project/src/main/java/com/ruoyi/ccdi/project/log/FileUploadLogAppender.java b/ccdi-project/src/main/java/com/ruoyi/ccdi/project/log/FileUploadLogAppender.java new file mode 100644 index 0000000..f37b758 --- /dev/null +++ b/ccdi-project/src/main/java/com/ruoyi/ccdi/project/log/FileUploadLogAppender.java @@ -0,0 +1,103 @@ +package com.ruoyi.ccdi.project.log; + +import ch.qos.logback.classic.PatternLayout; +import ch.qos.logback.classic.spi.ILoggingEvent; +import ch.qos.logback.core.FileAppender; +import ch.qos.logback.core.UnsynchronizedAppenderBase; +import lombok.extern.slf4j.Slf4j; + +import java.io.File; +import java.text.SimpleDateFormat; +import java.util.Date; + +/** + * 文件上传批次日志Appender + * 为每个批次创建独立的日志文件 + * + * @author ruoyi + * @date 2026-03-05 + */ +@Slf4j +public class FileUploadLogAppender extends UnsynchronizedAppenderBase { + + private static final ThreadLocal> currentAppender = new ThreadLocal<>(); + + private PatternLayout layout; + + @Override + public void start() { + // 初始化日志格式 + this.layout = new PatternLayout(); + this.layout.setPattern("%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"); + this.layout.setContext(getContext()); + this.layout.start(); + + super.start(); + log.info("【文件上传日志】FileUploadLogAppender已启动"); + } + + @Override + protected void append(ILoggingEvent event) { + FileAppender appender = currentAppender.get(); + if (appender != null) { + appender.doAppend(event); + } + } + + /** + * 为指定批次创建独立的日志文件 + * + * @param uploadPath ruoyi.profile配置的上传路径 + * @param projectId 项目ID + * @param batchId 批次ID + */ + public static void createBatchLogFile(String uploadPath, Long projectId, String batchId) { + try { + // 构建日志文件路径: {ruoyi.profile}/logs/file-upload/{projectId}/{timestamp}.log + String timestamp = new SimpleDateFormat("yyyyMMdd-HHmmss").format(new Date()); + String logDirPath = uploadPath + File.separator + "logs" + File.separator + + "file-upload" + File.separator + projectId; + + // 确保目录存在 + File logDir = new File(logDirPath); + if (!logDir.exists()) { + logDir.mkdirs(); + } + + String logFilePath = logDirPath + File.separator + timestamp + ".log"; + + // 创建FileAppender + FileAppender appender = new FileAppender<>(); + appender.setFile(logFilePath); + + PatternLayout layout = new PatternLayout(); + layout.setPattern("%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n"); + layout.setContext(appender.getContext()); + layout.start(); + + appender.setLayout(layout); + appender.setAppend(true); + appender.setContext(appender.getContext()); + appender.start(); + + currentAppender.set(appender); + + log.info("【文件上传日志】创建批次日志文件: path={}, batchId={}", logFilePath, batchId); + + } catch (Exception e) { + log.error("【文件上传日志】创建批次日志文件失败: projectId={}, batchId={}", projectId, batchId, e); + } + } + + /** + * 关闭当前批次的日志文件 + */ + public static void closeBatchLogFile() { + FileAppender appender = currentAppender.get(); + if (appender != null) { + appender.stop(); + currentAppender.remove(); + log.info("【文件上传日志】关闭批次日志文件"); + } + } +} diff --git a/ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java b/ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java index c911c83..b288ef4 100644 --- a/ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java +++ b/ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java @@ -4,11 +4,18 @@ import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper; import com.baomidou.mybatisplus.extension.plugins.pagination.Page; import com.ruoyi.ccdi.project.domain.CcdiProject; import com.ruoyi.ccdi.project.domain.dto.CcdiFileUploadQueryDTO; +import com.ruoyi.ccdi.project.domain.entity.CcdiBankStatement; import com.ruoyi.ccdi.project.domain.entity.CcdiFileUploadRecord; import com.ruoyi.ccdi.project.domain.vo.CcdiFileUploadStatisticsVO; +import com.ruoyi.ccdi.project.log.FileUploadLogAppender; +import com.ruoyi.ccdi.project.mapper.CcdiBankStatementMapper; import com.ruoyi.ccdi.project.mapper.CcdiFileUploadRecordMapper; import com.ruoyi.ccdi.project.mapper.CcdiProjectMapper; import com.ruoyi.ccdi.project.service.ICcdiFileUploadService; +import com.ruoyi.lsfx.client.LsfxAnalysisClient; +import com.ruoyi.lsfx.domain.request.GetBankStatementRequest; +import com.ruoyi.lsfx.domain.request.GetFileUploadStatusRequest; +import com.ruoyi.lsfx.domain.response.*; import jakarta.annotation.Resource; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Qualifier; @@ -58,6 +65,12 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { @Qualifier("fileUploadExecutor") private Executor fileUploadExecutor; + @Resource + private LsfxAnalysisClient lsfxClient; + + @Resource + private CcdiBankStatementMapper bankStatementMapper; + /** * 获取临时文件存储目录 */ @@ -240,52 +253,60 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { String batchId) { log.info("【文件上传】调度线程启动: projectId={}, batchId={}", projectId, batchId); - // 循环提交任务 - for (int i = 0; i < tempFilePaths.size(); i++) { - // Critical Fix #6: 检查线程中断状态 - if (Thread.currentThread().isInterrupted()) { - log.warn("【文件上传】调度线程被中断,停止提交剩余任务"); - break; - } + // 创建批次日志文件 + FileUploadLogAppender.createBatchLogFile(uploadPath, projectId, batchId); - String tempFilePath = tempFilePaths.get(i); - CcdiFileUploadRecord record = records.get(i); + try { + // 循环提交任务 + for (int i = 0; i < tempFilePaths.size(); i++) { + // Critical Fix #6: 检查线程中断状态 + if (Thread.currentThread().isInterrupted()) { + log.warn("【文件上传】调度线程被中断,停止提交剩余任务"); + break; + } - boolean submitted = false; - int retryCount = 0; + String tempFilePath = tempFilePaths.get(i); + CcdiFileUploadRecord record = records.get(i); - while (!submitted && retryCount < 2) { - try { - // 尝试提交异步任务 - CompletableFuture.runAsync( - () -> processFileAsync(projectId, lsfxProjectId, tempFilePath, record.getId(), batchId, record), - fileUploadExecutor - ); - submitted = true; - log.info("【文件上传】任务提交成功: fileName={}, recordId={}", - record.getFileName(), record.getId()); - } catch (RejectedExecutionException e) { - retryCount++; - if (retryCount == 1) { - log.warn("【文件上传】线程池已满,等待30秒后重试: fileName={}", - record.getFileName()); - try { - Thread.sleep(30000); - } catch (InterruptedException ie) { - Thread.currentThread().interrupt(); - log.error("【文件上传】等待被中断: fileName={}", record.getFileName()); - updateRecordStatus(record.getId(), "parsed_failed", "任务提交被中断"); - break; + boolean submitted = false; + int retryCount = 0; + + while (!submitted && retryCount < 2) { + try { + // 尝试提交异步任务 + CompletableFuture.runAsync( + () -> processFileAsync(projectId, lsfxProjectId, tempFilePath, record.getId(), batchId, record), + fileUploadExecutor + ); + submitted = true; + log.info("【文件上传】任务提交成功: fileName={}, recordId={}", + record.getFileName(), record.getId()); + } catch (RejectedExecutionException e) { + retryCount++; + if (retryCount == 1) { + log.warn("【文件上传】线程池已满,等待30秒后重试: fileName={}", + record.getFileName()); + try { + Thread.sleep(30000); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + log.error("【文件上传】等待被中断: fileName={}", record.getFileName()); + updateRecordStatus(record.getId(), "parsed_failed", "任务提交被中断"); + break; + } + } else { + log.error("【文件上传】重试失败,放弃任务: fileName={}", record.getFileName()); + updateRecordStatus(record.getId(), "parsed_failed", "系统繁忙,请稍后重试"); } - } else { - log.error("【文件上传】重试失败,放弃任务: fileName={}", record.getFileName()); - updateRecordStatus(record.getId(), "parsed_failed", "系统繁忙,请稍后重试"); } } } - } - log.info("【文件上传】调度线程完成: projectId={}, batchId={}", projectId, batchId); + log.info("【文件上传】调度线程完成: projectId={}, batchId={}", projectId, batchId); + } finally { + // 关闭批次日志文件 + FileUploadLogAppender.closeBatchLogFile(); + } } /** @@ -326,14 +347,27 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { } // 步骤2:上传文件到流水分析平台 - log.info("【文件上传】步骤2: 上传文件到流水分析平台"); - // TODO: 调用 lsfxClient.uploadFile() - // 需要将临时文件转换为MultipartFile或直接使用文件路径 - // UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, filePath.toFile()); - // Integer logId = uploadResponse.getData().getLogId(); + log.info("【文件上传】步骤2: 上传文件到流水分析平台, tempPath={}", tempFilePath); - // 临时模拟 logId - Integer logId = (int) (System.currentTimeMillis() % 1000000); + File file = filePath.toFile(); + if (!file.exists()) { + throw new RuntimeException("临时文件不存在: " + tempFilePath); + } + + UploadFileResponse uploadResponse = lsfxClient.uploadFile(lsfxProjectId, file); + if (uploadResponse == null || uploadResponse.getData() == null + || uploadResponse.getData().getUploadLogList() == null + || uploadResponse.getData().getUploadLogList().isEmpty()) { + throw new RuntimeException("上传文件失败: 响应数据为空"); + } + + // 从 uploadLogList 中获取第一个 logId + Integer logId = uploadResponse.getData().getUploadLogList().get(0).getLogId(); + if (logId == null) { + throw new RuntimeException("上传文件失败: 未返回logId"); + } + + log.info("【文件上传】文件上传成功: logId={}", logId); // 步骤3:更新状态为 parsing log.info("【文件上传】步骤3: 更新状态为解析中, logId={}", logId); @@ -343,42 +377,67 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { // 步骤4:轮询解析状态(最多300次,间隔2秒) log.info("【文件上传】步骤4: 开始轮询解析状态"); - // TODO: 实现真实的轮询逻辑 - // boolean parsingComplete = waitForParsingComplete(lsfxProjectId, logId.toString()); - boolean parsingComplete = true; // 临时模拟 + boolean parsingComplete = waitForParsingComplete(lsfxProjectId, logId.toString()); if (!parsingComplete) { throw new RuntimeException("解析超时(超过10分钟),请检查文件格式是否正确"); } // 步骤5:获取文件上传状态 - log.info("【文件上传】步骤5: 获取文件上传状态"); - // TODO: 调用 lsfxClient.getFileUploadStatus() - // GetFileUploadStatusResponse statusResponse = lsfxClient.getFileUploadStatus(...); + log.info("【文件上传】步骤5: 获取文件上传状态: logId={}", logId); + + GetFileUploadStatusRequest statusRequest = new GetFileUploadStatusRequest(); + statusRequest.setGroupId(lsfxProjectId); + statusRequest.setLogId(logId); + + GetFileUploadStatusResponse statusResponse = lsfxClient.getFileUploadStatus(statusRequest); + + if (statusResponse == null || statusResponse.getData() == null + || statusResponse.getData().getLogs() == null + || statusResponse.getData().getLogs().isEmpty()) { + throw new RuntimeException("获取文件上传状态失败: 响应数据为空"); + } + + // 获取第一个log项(因为我们传了logId,应该只返回一个) + GetFileUploadStatusResponse.LogItem logItem = statusResponse.getData().getLogs().get(0); + Integer status = logItem.getStatus(); + String uploadStatusDesc = logItem.getUploadStatusDesc(); + + log.info("【文件上传】文件状态: status={}, uploadStatusDesc={}", status, uploadStatusDesc); // 步骤6:判断解析结果 - // TODO: 实现真实的判断逻辑 - boolean parseSuccess = true; // 临时模拟 + // status=-5 且 uploadStatusDesc="data.wait.confirm.newaccount" 表示解析成功 + boolean parseSuccess = status != null && status == -5 + && "data.wait.confirm.newaccount".equals(uploadStatusDesc); if (parseSuccess) { // 解析成功 log.info("【文件上传】步骤6: 解析成功,保存主体信息"); + + // 提取主体名称和账号 + List enterpriseNames = logItem.getEnterpriseNameList(); + List accountNos = logItem.getAccountNoList(); + + String enterpriseNamesStr = enterpriseNames != null ? String.join(",", enterpriseNames) : ""; + String accountNosStr = accountNos != null ? String.join(",", accountNos) : ""; + record.setFileStatus("parsed_success"); - // TODO: 从实际的解析结果中获取 - record.setEnterpriseNames("测试主体1,测试主体2"); - record.setAccountNos("622xxx,623xxx"); + record.setEnterpriseNames(enterpriseNamesStr); + record.setAccountNos(accountNosStr); recordMapper.updateById(record); + log.info("【文件上传】主体信息已保存: enterpriseNames={}, accountNos={}", + enterpriseNamesStr, accountNosStr); + // 步骤7:获取流水数据并保存 log.info("【文件上传】步骤7: 获取流水数据"); - // TODO: 实现 fetchAndSaveBankStatements - // fetchAndSaveBankStatements(projectId, lsfxProjectId, logId, totalCount); + fetchAndSaveBankStatements(projectId, lsfxProjectId, logId); } else { // 解析失败 - log.warn("【文件上传】步骤6: 解析失败"); + log.warn("【文件上传】步骤6: 解析失败: status={}, desc={}", status, uploadStatusDesc); record.setFileStatus("parsed_failed"); - record.setErrorMessage("解析失败:文件格式错误"); + record.setErrorMessage("解析失败: " + uploadStatusDesc); recordMapper.updateById(record); } @@ -402,21 +461,155 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService { } /** - * 轮询解析状态 - * TODO: 实现真实逻辑 + * 轮询解析状态(固定间隔2秒,最多300次) + * + * @param groupId 项目ID + * @param logId 文件ID + * @return true=解析完成,false=超时未完成 */ private boolean waitForParsingComplete(Integer groupId, String logId) { - // TODO: 调用 lsfxClient.checkParseStatus() 轮询 - return true; + log.info("【文件上传】开始轮询解析状态: groupId={}, logId={}", groupId, logId); + + int maxRetries = 300; + int intervalSeconds = 2; + + for (int i = 1; i <= maxRetries; i++) { + try { + // 调用检查解析状态接口 + CheckParseStatusResponse response = lsfxClient.checkParseStatus(groupId, logId); + + if (response == null || response.getData() == null) { + log.warn("【文件上传】轮询第{}次: 响应数据为空", i); + Thread.sleep(intervalSeconds * 1000L); + continue; + } + + Boolean parsing = response.getData().getParsing(); + log.debug("【文件上传】轮询第{}次: parsing={}", i, parsing); + + // parsing=false 表示解析完成 + if (Boolean.FALSE.equals(parsing)) { + log.info("【文件上传】解析完成: logId={}, 轮询次数={}", logId, i); + return true; + } + + // 未完成,等待后继续 + if (i < maxRetries) { + Thread.sleep(intervalSeconds * 1000L); + } + + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + log.error("【文件上传】轮询被中断: logId={}", logId, e); + return false; + } catch (Exception e) { + log.error("【文件上传】轮询异常: logId={}, 次数={}", logId, i, e); + // 继续轮询,不中断 + } + } + + log.warn("【文件上传】轮询超时: logId={}, 已轮询{}次", logId, maxRetries); + return false; } /** - * 获取并保存流水数据 - * TODO: 实现真实逻辑 + * 获取并保存流水数据(每页1000条,批量插入每批1000条) + * + * @param projectId 项目ID(业务字段) + * @param groupId 流水分析平台项目ID + * @param logId 文件ID */ - private void fetchAndSaveBankStatements(Long projectId, Integer groupId, - Integer logId, int totalCount) { - // TODO: 调用 lsfxClient.getBankStatement() 获取流水 - // TODO: 批量插入到 ccdi_bank_statement + private void fetchAndSaveBankStatements(Long projectId, Integer groupId, Integer logId) { + log.info("【文件上传】开始获取流水数据: projectId={}, groupId={}, logId={}", + projectId, groupId, logId); + + // 步骤1: 先调用一次接口获取 totalCount + GetBankStatementRequest firstRequest = new GetBankStatementRequest(); + firstRequest.setGroupId(groupId); + firstRequest.setLogId(logId); + firstRequest.setPageNow(1); + firstRequest.setPageSize(1); // 只获取1条,用于获取总数 + + GetBankStatementResponse firstResponse = lsfxClient.getBankStatement(firstRequest); + + if (firstResponse == null || firstResponse.getData() == null) { + log.warn("【文件上传】获取流水数据失败: 响应数据为空"); + return; + } + + Integer totalCount = firstResponse.getData().getTotalCount(); + if (totalCount == null || totalCount <= 0) { + log.warn("【文件上传】无流水数据需要保存: totalCount={}", totalCount); + return; + } + + log.info("【文件上传】获取到总数: totalCount={}", totalCount); + + // 步骤2: 计算分页信息 + int pageSize = 1000; // 每页1000条 + int batchSize = 1000; // 批量插入每批1000条(与pageSize保持一致) + int totalPages = (int) Math.ceil((double) totalCount / pageSize); + + log.info("【文件上传】分页信息: 每页{}条, 共{}页", pageSize, totalPages); + + List batchList = new ArrayList<>(batchSize); + int totalSaved = 0; + + // 步骤3: 循环分页获取所有数据 + for (int pageNow = 1; pageNow <= totalPages; pageNow++) { + try { + // 构建请求参数 + GetBankStatementRequest request = new GetBankStatementRequest(); + request.setGroupId(groupId); + request.setLogId(logId); + request.setPageNow(pageNow); + request.setPageSize(pageSize); + + // 获取流水数据 + GetBankStatementResponse response = lsfxClient.getBankStatement(request); + + if (response == null || response.getData() == null + || response.getData().getBankStatementList() == null) { + log.warn("【文件上传】获取流水数据为空: pageNow={}", pageNow); + continue; + } + + List items = + response.getData().getBankStatementList(); + + log.debug("【文件上传】获取第{}页数据: {}条", pageNow, items.size()); + + // 转换并收集到批量列表 + for (GetBankStatementResponse.BankStatementItem item : items) { + CcdiBankStatement statement = CcdiBankStatement.fromResponse(item); + if (statement != null) { + statement.setProjectId(projectId); // 设置业务项目ID + batchList.add(statement); + + // 达到批量插入阈值(1000条),执行插入 + if (batchList.size() >= batchSize) { + bankStatementMapper.insertBatch(batchList); + totalSaved += batchList.size(); + log.debug("【文件上传】批量插入流水: {}条, 累计{}条", + batchList.size(), totalSaved); + batchList.clear(); + } + } + } + + } catch (Exception e) { + log.error("【文件上传】获取或保存流水数据失败: pageNow={}", pageNow, e); + // 继续处理下一页,不中断整个流程 + } + } + + // 步骤4: 保存剩余的数据 + if (!batchList.isEmpty()) { + bankStatementMapper.insertBatch(batchList); + totalSaved += batchList.size(); + log.debug("【文件上传】批量插入剩余流水: {}条", batchList.size()); + } + + log.info("【文件上传】流水数据保存完成: 总共保存{}条", totalSaved); } }