完成拉取本行信息后端实现与校验

This commit is contained in:
wkc
2026-03-11 17:16:34 +08:00
parent 52a5056a70
commit 9dd12d9ef0
9 changed files with 627 additions and 81 deletions

View File

@@ -2,8 +2,10 @@ package com.ruoyi.ccdi.project.controller;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.ruoyi.ccdi.project.domain.dto.CcdiFileUploadQueryDTO;
import com.ruoyi.ccdi.project.domain.dto.CcdiPullBankInfoSubmitDTO;
import com.ruoyi.ccdi.project.domain.entity.CcdiFileUploadRecord;
import com.ruoyi.ccdi.project.domain.vo.CcdiFileUploadStatisticsVO;
import com.ruoyi.ccdi.project.domain.vo.CcdiIdCardParseVO;
import com.ruoyi.ccdi.project.service.ICcdiFileUploadService;
import com.ruoyi.common.core.controller.BaseController;
import com.ruoyi.common.core.domain.AjaxResult;
@@ -15,9 +17,12 @@ import io.swagger.v3.oas.annotations.Operation;
import io.swagger.v3.oas.annotations.tags.Tag;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.util.CollectionUtils;
import org.springframework.util.StringUtils;
import org.springframework.web.bind.annotation.*;
import org.springframework.web.multipart.MultipartFile;
import java.util.List;
import java.util.concurrent.RejectedExecutionException;
/**
@@ -85,6 +90,48 @@ public class CcdiFileUploadController extends BaseController {
}
}
/**
* 解析身份证文件
*/
@PostMapping("/parse-id-card-file")
@Operation(summary = "解析身份证文件", description = "解析首个sheet第一列的身份证号")
public AjaxResult parseIdCardFile(@RequestParam MultipartFile file) {
if (file == null || file.isEmpty()) {
return AjaxResult.error("身份证文件不能为空");
}
List<String> idCards = fileUploadService.parseIdCardFile(file);
return AjaxResult.success("解析成功", new CcdiIdCardParseVO(idCards, idCards.size()));
}
/**
* 提交拉取本行信息任务
*/
@PostMapping("/pull-bank-info")
@Operation(summary = "拉取本行信息", description = "按身份证号批量提交拉取本行信息任务")
public AjaxResult pullBankInfo(@RequestBody CcdiPullBankInfoSubmitDTO dto) {
if (dto == null || dto.getProjectId() == null) {
return AjaxResult.error("项目ID不能为空");
}
if (CollectionUtils.isEmpty(dto.getIdCards())) {
return AjaxResult.error("身份证号不能为空");
}
if (!StringUtils.hasText(dto.getStartDate()) || !StringUtils.hasText(dto.getEndDate())) {
return AjaxResult.error("开始日期和结束日期不能为空");
}
Long userId = SecurityUtils.getUserId();
String username = SecurityUtils.getUsername();
String batchId = fileUploadService.submitPullBankInfo(
dto.getProjectId(),
dto.getIdCards(),
dto.getStartDate(),
dto.getEndDate(),
userId,
username
);
return AjaxResult.success("拉取任务已提交", batchId);
}
/**
* 查询上传记录列表
*/

View File

@@ -0,0 +1,24 @@
package com.ruoyi.ccdi.project.domain.dto;
import lombok.Data;
import java.util.List;
/**
* 拉取本行信息提交参数
*/
@Data
public class CcdiPullBankInfoSubmitDTO {
/** 项目ID */
private Long projectId;
/** 身份证号列表 */
private List<String> idCards;
/** 开始日期 */
private String startDate;
/** 结束日期 */
private String endDate;
}

View File

@@ -0,0 +1,14 @@
package com.ruoyi.ccdi.project.domain.excel;
import com.alibaba.excel.annotation.ExcelProperty;
import lombok.Data;
/**
* 身份证导入行
*/
@Data
public class CcdiIdCardExcelRow {
@ExcelProperty(index = 0)
private String idCard;
}

View File

@@ -0,0 +1,22 @@
package com.ruoyi.ccdi.project.domain.vo;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
import java.util.List;
/**
* 身份证文件解析结果
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class CcdiIdCardParseVO {
/** 解析到的身份证列表 */
private List<String> idCards;
/** 数量 */
private Integer count;
}

View File

@@ -6,6 +6,8 @@ import com.ruoyi.ccdi.project.domain.entity.CcdiFileUploadRecord;
import com.ruoyi.ccdi.project.domain.vo.CcdiFileUploadStatisticsVO;
import org.springframework.web.multipart.MultipartFile;
import java.util.List;
/**
* 文件上传服务接口
*
@@ -24,6 +26,32 @@ public interface ICcdiFileUploadService {
*/
String batchUploadFiles(Long projectId, MultipartFile[] files, String username);
/**
* 解析身份证文件
*
* @param file Excel 文件
* @return 身份证号列表
*/
List<String> parseIdCardFile(MultipartFile file);
/**
* 提交拉取本行信息任务
*
* @param projectId 项目ID
* @param idCards 身份证号列表
* @param startDate 开始日期
* @param endDate 结束日期
* @param userId 当前登录用户ID
* @param username 当前登录用户名
* @return 批次ID
*/
String submitPullBankInfo(Long projectId,
List<String> idCards,
String startDate,
String endDate,
Long userId,
String username);
/**
* 查询上传记录列表
*

View File

@@ -1,17 +1,20 @@
package com.ruoyi.ccdi.project.service.impl;
import com.alibaba.excel.EasyExcel;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.ruoyi.ccdi.project.domain.CcdiProject;
import com.ruoyi.ccdi.project.domain.dto.CcdiFileUploadQueryDTO;
import com.ruoyi.ccdi.project.domain.entity.CcdiBankStatement;
import com.ruoyi.ccdi.project.domain.entity.CcdiFileUploadRecord;
import com.ruoyi.ccdi.project.domain.excel.CcdiIdCardExcelRow;
import com.ruoyi.ccdi.project.domain.vo.CcdiFileUploadStatisticsVO;
import com.ruoyi.ccdi.project.mapper.CcdiBankStatementMapper;
import com.ruoyi.ccdi.project.mapper.CcdiFileUploadRecordMapper;
import com.ruoyi.ccdi.project.mapper.CcdiProjectMapper;
import com.ruoyi.ccdi.project.service.ICcdiFileUploadService;
import com.ruoyi.lsfx.client.LsfxAnalysisClient;
import com.ruoyi.lsfx.domain.request.FetchInnerFlowRequest;
import com.ruoyi.lsfx.domain.request.GetBankStatementRequest;
import com.ruoyi.lsfx.domain.request.GetFileUploadStatusRequest;
import com.ruoyi.lsfx.domain.response.*;
@@ -34,10 +37,13 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardCopyOption;
import java.time.LocalDate;
import java.time.format.DateTimeFormatter;
import java.util.*;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.regex.Pattern;
/**
* 文件上传服务实现
@@ -50,6 +56,8 @@ import java.util.concurrent.RejectedExecutionException;
public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
private static final int MAX_ERROR_MESSAGE_LENGTH = 2000;
private static final Pattern ID_CARD_PATTERN =
Pattern.compile("^[1-9]\\d{5}(18|19|20)\\d{2}(0[1-9]|1[0-2])([0-2]\\d|3[01])\\d{3}[0-9Xx]$");
@Data
private static class FetchBankStatementResult {
@@ -88,6 +96,116 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
return uploadPath + File.separator + "temp";
}
@Override
public List<String> parseIdCardFile(MultipartFile file) {
if (file == null || file.isEmpty()) {
throw new RuntimeException("身份证文件不能为空");
}
try {
List<CcdiIdCardExcelRow> rows = EasyExcel.read(file.getInputStream())
.head(CcdiIdCardExcelRow.class)
.sheet(0)
.headRowNumber(1)
.doReadSync();
LinkedHashSet<String> idCards = new LinkedHashSet<>();
for (CcdiIdCardExcelRow row : rows) {
if (row == null) {
continue;
}
String idCard = row.getIdCard();
if (!StringUtils.hasText(idCard)) {
continue;
}
String normalized = idCard.trim();
if (!ID_CARD_PATTERN.matcher(normalized).matches()) {
continue;
}
idCards.add(normalized);
}
if (idCards.isEmpty()) {
throw new RuntimeException("首个sheet第一列未解析到有效身份证号");
}
return new ArrayList<>(idCards);
} catch (IOException e) {
throw new RuntimeException("读取身份证文件失败: " + e.getMessage(), e);
}
}
@Transactional
@Override
public String submitPullBankInfo(Long projectId,
List<String> idCards,
String startDate,
String endDate,
Long userId,
String username) {
if (projectId == null) {
throw new IllegalArgumentException("项目ID不能为空");
}
if (!StringUtils.hasText(startDate) || !StringUtils.hasText(endDate)) {
throw new IllegalArgumentException("开始日期和结束日期不能为空");
}
if (idCards == null || idCards.isEmpty()) {
throw new IllegalArgumentException("身份证号不能为空");
}
LocalDate start = LocalDate.parse(startDate);
LocalDate end = LocalDate.parse(endDate);
if (start.isAfter(end)) {
throw new IllegalArgumentException("开始日期不能晚于结束日期");
}
CcdiProject project = projectMapper.selectById(projectId);
if (project == null) {
throw new IllegalArgumentException("项目不存在: projectId=" + projectId);
}
Integer lsfxProjectId = project.getLsfxProjectId();
if (lsfxProjectId == null) {
throw new IllegalStateException("项目未关联流水分析平台: projectId=" + projectId);
}
String batchId = UUID.randomUUID().toString().replace("-", "");
Date now = new Date();
List<CcdiFileUploadRecord> records = new ArrayList<>();
List<String> normalizedIdCards = new ArrayList<>();
for (String idCard : idCards) {
if (!StringUtils.hasText(idCard)) {
continue;
}
String normalized = idCard.trim();
normalizedIdCards.add(normalized);
CcdiFileUploadRecord record = new CcdiFileUploadRecord();
record.setProjectId(projectId);
record.setLsfxProjectId(lsfxProjectId);
record.setFileName(normalized);
record.setFileSize(0L);
record.setFileStatus("uploading");
record.setAccountNos(normalized);
record.setUploadTime(now);
record.setUploadUser(username);
records.add(record);
}
if (records.isEmpty()) {
throw new IllegalArgumentException("身份证号不能为空");
}
recordMapper.insertBatch(records);
TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() {
@Override
public void afterCommit() {
CompletableFuture.runAsync(() -> submitPullBankInfoTasks(
projectId, lsfxProjectId, records, normalizedIdCards, startDate, endDate, userId, batchId
));
}
});
return batchId;
}
@Override
public Page<CcdiFileUploadRecord> selectPage(Page<CcdiFileUploadRecord> page,
CcdiFileUploadQueryDTO queryDTO) {
@@ -351,6 +469,90 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
statement.setLeAccountNo(trimAccountNo(statement.getLeAccountNo()));
}
private void submitPullBankInfoTasks(Long projectId,
Integer lsfxProjectId,
List<CcdiFileUploadRecord> records,
List<String> idCards,
String startDate,
String endDate,
Long userId,
String batchId) {
log.info("【拉取本行信息】调度线程启动: projectId={}, batchId={}", projectId, batchId);
for (int i = 0; i < records.size(); i++) {
if (Thread.currentThread().isInterrupted()) {
log.warn("【拉取本行信息】调度线程被中断,停止提交剩余任务");
break;
}
CcdiFileUploadRecord record = records.get(i);
String idCard = idCards.get(i);
boolean submitted = false;
int retryCount = 0;
while (!submitted && retryCount < 2) {
try {
CompletableFuture.runAsync(
() -> processPullBankInfoAsync(projectId, lsfxProjectId, record, idCard, startDate, endDate, userId),
fileUploadExecutor
);
submitted = true;
log.info("【拉取本行信息】任务提交成功: idCard={}, recordId={}", idCard, record.getId());
} catch (RejectedExecutionException e) {
retryCount++;
if (retryCount == 1) {
log.warn("【拉取本行信息】线程池已满,等待30秒后重试: idCard={}", idCard);
try {
Thread.sleep(30000);
} catch (InterruptedException interruptedException) {
Thread.currentThread().interrupt();
log.error("【拉取本行信息】等待被中断: idCard={}", idCard);
updateRecordStatus(record.getId(), "parsed_failed", "任务提交被中断");
break;
}
} else {
log.error("【拉取本行信息】重试失败,放弃任务: idCard={}", idCard);
updateRecordStatus(record.getId(), "parsed_failed", "系统繁忙,请稍后重试");
}
}
}
}
}
public void processPullBankInfoAsync(Long projectId,
Integer lsfxProjectId,
CcdiFileUploadRecord record,
String idCard,
String startDate,
String endDate,
Long userId) {
try {
FetchInnerFlowRequest request = new FetchInnerFlowRequest();
request.setGroupId(lsfxProjectId);
request.setCustomerNo(idCard);
request.setDataChannelCode("ZJRCU");
request.setRequestDateId(Integer.parseInt(LocalDate.now().format(DateTimeFormatter.BASIC_ISO_DATE)));
request.setDataStartDateId(Integer.parseInt(startDate.replace("-", "")));
request.setDataEndDateId(Integer.parseInt(endDate.replace("-", "")));
request.setUploadUserId(toUploadUserId(userId));
FetchInnerFlowResponse response = lsfxClient.fetchInnerFlow(request);
if (response == null || response.getData() == null || response.getData().isEmpty()) {
throw new RuntimeException("拉取本行信息失败: 未返回logId");
}
Integer logId = response.getData().get(0);
if (logId == null) {
throw new RuntimeException("拉取本行信息失败: 未返回logId");
}
processRecordAfterLogIdReady(projectId, lsfxProjectId, record, logId);
} catch (Exception e) {
log.error("【拉取本行信息】处理失败: idCard={}, recordId={}", idCard, record.getId(), e);
updateFailedRecord(record, e.getMessage());
}
}
/**
* 异步处理单个文件的完整流程
* 包含:上传 → 轮询解析状态 → 获取结果 → 保存流水数据
@@ -399,84 +601,7 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
}
log.info("【文件上传】文件上传成功: logId={}", logId);
// 步骤3:更新状态为 parsing
log.info("【文件上传】步骤3: 更新状态为解析中, logId={}", logId);
record.setLogId(logId);
record.setFileStatus("parsing");
recordMapper.updateById(record);
// 步骤4:轮询解析状态(最多300次,间隔2秒)
log.info("【文件上传】步骤4: 开始轮询解析状态");
boolean parsingComplete = waitForParsingComplete(lsfxProjectId, logId.toString());
if (!parsingComplete) {
throw new RuntimeException("解析超时(超过10分钟),请检查文件格式是否正确");
}
// 步骤5:获取文件上传状态
log.info("【文件上传】步骤5: 获取文件上传状态: logId={}", logId);
GetFileUploadStatusRequest statusRequest = new GetFileUploadStatusRequest();
statusRequest.setGroupId(lsfxProjectId);
statusRequest.setLogId(logId);
GetFileUploadStatusResponse statusResponse = lsfxClient.getFileUploadStatus(statusRequest);
if (statusResponse == null || statusResponse.getData() == null
|| statusResponse.getData().getLogs() == null
|| statusResponse.getData().getLogs().isEmpty()) {
throw new RuntimeException("获取文件上传状态失败: 响应数据为空");
}
// 获取第一个log项因为我们传了logId应该只返回一个
GetFileUploadStatusResponse.LogItem logItem = statusResponse.getData().getLogs().get(0);
Integer status = logItem.getStatus();
String uploadStatusDesc = logItem.getUploadStatusDesc();
log.info("【文件上传】文件状态: status={}, uploadStatusDesc={}", status, uploadStatusDesc);
// 步骤6:判断解析结果
// status=-5 且 uploadStatusDesc="data.wait.confirm.newaccount" 表示解析成功
boolean parseSuccess = status != null && status == -5
&& "data.wait.confirm.newaccount".equals(uploadStatusDesc);
if (parseSuccess) {
// 解析成功
log.info("【文件上传】步骤6: 解析成功,保存主体信息");
// 提取主体名称和账号
List<String> enterpriseNames = logItem.getEnterpriseNameList();
List<String> accountNos = logItem.getAccountNoList();
String enterpriseNamesStr = enterpriseNames != null ? String.join(",", enterpriseNames) : "";
String accountNosStr = accountNos != null ? String.join(",", accountNos) : "";
log.info("【文件上传】主体信息已保存: enterpriseNames={}, accountNos={}",
enterpriseNamesStr, accountNosStr);
// 步骤7:获取流水数据并保存
log.info("【文件上传】步骤7: 获取流水数据");
FetchBankStatementResult fetchResult =
fetchAndSaveBankStatements(projectId, lsfxProjectId, logId);
if (!fetchResult.isSuccess()) {
updateFailedRecord(record, fetchResult.getErrorMessage());
return;
}
record.setFileStatus("parsed_success");
record.setEnterpriseNames(enterpriseNamesStr);
record.setAccountNos(accountNosStr);
record.setErrorMessage(null);
recordMapper.updateById(record);
} else {
// 解析失败
log.warn("【文件上传】步骤6: 解析失败: status={}, desc={}", status, uploadStatusDesc);
updateFailedRecord(record, "解析失败: " + uploadStatusDesc);
}
processRecordAfterLogIdReady(projectId, lsfxProjectId, record, logId);
log.info("【文件上传】处理完成: fileName={}", record.getFileName());
} catch (Exception e) {
@@ -496,6 +621,74 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
}
}
private void processRecordAfterLogIdReady(Long projectId,
Integer lsfxProjectId,
CcdiFileUploadRecord record,
Integer logId) {
log.info("【文件上传】步骤3: 更新状态为解析中, logId={}", logId);
record.setLogId(logId);
record.setFileStatus("parsing");
recordMapper.updateById(record);
log.info("【文件上传】步骤4: 开始轮询解析状态");
boolean parsingComplete = waitForParsingComplete(lsfxProjectId, logId.toString());
if (!parsingComplete) {
throw new RuntimeException("解析超时(超过10分钟),请检查文件格式是否正确");
}
log.info("【文件上传】步骤5: 获取文件上传状态: logId={}", logId);
GetFileUploadStatusRequest statusRequest = new GetFileUploadStatusRequest();
statusRequest.setGroupId(lsfxProjectId);
statusRequest.setLogId(logId);
GetFileUploadStatusResponse statusResponse = lsfxClient.getFileUploadStatus(statusRequest);
if (statusResponse == null || statusResponse.getData() == null
|| statusResponse.getData().getLogs() == null
|| statusResponse.getData().getLogs().isEmpty()) {
throw new RuntimeException("获取文件上传状态失败: 响应数据为空");
}
GetFileUploadStatusResponse.LogItem logItem = statusResponse.getData().getLogs().get(0);
Integer status = logItem.getStatus();
String uploadStatusDesc = logItem.getUploadStatusDesc();
String fileName = StringUtils.hasText(logItem.getUploadFileName())
? logItem.getUploadFileName()
: logItem.getDownloadFileName();
if (StringUtils.hasText(fileName)) {
record.setFileName(fileName);
}
log.info("【文件上传】文件状态: status={}, uploadStatusDesc={}", status, uploadStatusDesc);
boolean parseSuccess = status != null && status == -5
&& "data.wait.confirm.newaccount".equals(uploadStatusDesc);
if (!parseSuccess) {
log.warn("【文件上传】步骤6: 解析失败: status={}, desc={}", status, uploadStatusDesc);
updateFailedRecord(record, "解析失败: " + uploadStatusDesc);
return;
}
log.info("【文件上传】步骤6: 解析成功,保存主体信息");
List<String> enterpriseNames = logItem.getEnterpriseNameList();
List<String> accountNos = logItem.getAccountNoList();
String enterpriseNamesStr = enterpriseNames != null ? String.join(",", enterpriseNames) : "";
String accountNosStr = accountNos != null ? String.join(",", accountNos) : "";
log.info("【文件上传】主体信息已保存: enterpriseNames={}, accountNos={}",
enterpriseNamesStr, accountNosStr);
log.info("【文件上传】步骤7: 获取流水数据");
FetchBankStatementResult fetchResult = fetchAndSaveBankStatements(projectId, lsfxProjectId, logId);
if (!fetchResult.isSuccess()) {
updateFailedRecord(record, fetchResult.getErrorMessage());
return;
}
record.setFileStatus("parsed_success");
record.setEnterpriseNames(enterpriseNamesStr);
record.setAccountNos(accountNosStr);
record.setErrorMessage(null);
recordMapper.updateById(record);
}
/**
* 轮询解析状态固定间隔2秒最多300次
*
@@ -665,4 +858,11 @@ public class CcdiFileUploadServiceImpl implements ICcdiFileUploadService {
private void cleanupBankStatements(Long projectId, Integer logId) {
bankStatementMapper.deleteByProjectIdAndBatchId(projectId, logId);
}
private Integer toUploadUserId(Long userId) {
if (userId == null) {
throw new IllegalArgumentException("当前登录用户ID不能为空");
}
return Math.toIntExact(userId);
}
}

View File

@@ -30,13 +30,13 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<insert id="insertBatch" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id">
insert into ccdi_file_upload_record (
project_id, lsfx_project_id, file_name, file_size, file_status,
upload_time, upload_user
enterprise_names, account_nos, upload_time, upload_user
) values
<foreach collection="list" item="item" separator=",">
(
#{item.projectId}, #{item.lsfxProjectId}, #{item.fileName},
#{item.fileSize}, #{item.fileStatus}, #{item.uploadTime},
#{item.uploadUser}
#{item.fileSize}, #{item.fileStatus}, #{item.enterpriseNames},
#{item.accountNos}, #{item.uploadTime}, #{item.uploadUser}
)
</foreach>
</insert>