实现历史项目流水复制后端逻辑

This commit is contained in:
wkc
2026-03-29 09:54:05 +08:00
parent b098d4eed1
commit 46d190aa74
8 changed files with 412 additions and 7 deletions

View File

@@ -30,6 +30,11 @@ public interface CcdiBankStatementMapper extends BaseMapper<CcdiBankStatement> {
int deleteByProjectIdAndBatchId(@Param("projectId") Long projectId,
@Param("batchId") Integer batchId);
List<CcdiBankStatement> selectStatementsForHistoryImport(@Param("projectId") Long projectId,
@Param("batchId") Integer batchId,
@Param("startDate") String startDate,
@Param("endDate") String endDate);
Page<CcdiBankStatementListVO> selectStatementPage(Page<CcdiBankStatementListVO> page,
@Param("query") CcdiBankStatementQueryDTO query);

View File

@@ -25,6 +25,14 @@ public interface CcdiFileUploadRecordMapper extends BaseMapper<CcdiFileUploadRec
*/
int insertBatch(@Param("list") List<CcdiFileUploadRecord> records);
/**
* 查询来源项目中解析成功且具备logId的文件记录
*
* @param projectIds 来源项目ID列表
* @return 文件记录列表
*/
List<CcdiFileUploadRecord> selectSuccessfulRecordsByProjectIds(@Param("projectIds") List<Long> projectIds);
/**
* 统计各状态文件数量
*

View File

@@ -1,11 +1,27 @@
package com.ruoyi.ccdi.project.service.impl;
import com.ruoyi.ccdi.project.domain.CcdiProject;
import com.ruoyi.ccdi.project.domain.dto.CcdiProjectImportHistoryDTO;
import com.ruoyi.ccdi.project.domain.entity.CcdiBankStatement;
import com.ruoyi.ccdi.project.domain.entity.CcdiFileUploadRecord;
import com.ruoyi.ccdi.project.mapper.CcdiBankStatementMapper;
import com.ruoyi.ccdi.project.mapper.CcdiFileUploadRecordMapper;
import com.ruoyi.ccdi.project.mapper.CcdiProjectMapper;
import com.ruoyi.ccdi.project.service.ICcdiProjectHistoryImportService;
import jakarta.annotation.Resource;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
import java.util.ArrayList;
import java.util.Date;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
/**
* 历史项目导入服务实现
@@ -16,12 +32,154 @@ import java.util.concurrent.CompletableFuture;
@Service
public class CcdiProjectHistoryImportServiceImpl implements ICcdiProjectHistoryImportService {
private static final AtomicInteger HISTORY_IMPORT_BATCH_ID = new AtomicInteger(1_000_000);
@Resource
private CcdiBankStatementMapper bankStatementMapper;
@Resource
private CcdiFileUploadRecordMapper recordMapper;
@Resource
private CcdiProjectMapper projectMapper;
@Resource
@Qualifier("fileUploadExecutor")
private Executor fileUploadExecutor;
@Override
public void submitImport(Long targetProjectId, Integer targetLsfxProjectId,
CcdiProjectImportHistoryDTO dto, String operator) {
CompletableFuture.runAsync(() -> log.info(
"【项目历史导入】任务已提交: projectId={}, lsfxProjectId={}, sourceProjectIds={}, operator={}",
targetProjectId, targetLsfxProjectId, dto.getSourceProjectIds(), operator
));
fileUploadExecutor.execute(() -> executeImport(targetProjectId, targetLsfxProjectId, dto, operator));
}
private void executeImport(Long targetProjectId, Integer targetLsfxProjectId,
CcdiProjectImportHistoryDTO dto, String operator) {
List<CcdiFileUploadRecord> sourceRecords = recordMapper.selectSuccessfulRecordsByProjectIds(dto.getSourceProjectIds());
if (sourceRecords == null || sourceRecords.isEmpty()) {
log.info("【项目历史导入】无可复制的来源批次: projectId={}, sourceProjectIds={}",
targetProjectId, dto.getSourceProjectIds());
return;
}
List<CcdiBankStatement> statementsToInsert = new ArrayList<>();
List<CcdiFileUploadRecord> recordsToInsert = new ArrayList<>();
Set<String> dedupKeys = new HashSet<>();
for (CcdiFileUploadRecord sourceRecord : sourceRecords) {
List<CcdiBankStatement> sourceStatements = bankStatementMapper.selectStatementsForHistoryImport(
sourceRecord.getProjectId(), sourceRecord.getLogId(), dto.getStartDate(), dto.getEndDate()
);
if (sourceStatements == null || sourceStatements.isEmpty()) {
continue;
}
Integer newBatchId = HISTORY_IMPORT_BATCH_ID.incrementAndGet();
int sizeBefore = statementsToInsert.size();
for (CcdiBankStatement sourceStatement : sourceStatements) {
CcdiBankStatement targetStatement = copyStatement(sourceStatement, targetProjectId, targetLsfxProjectId, newBatchId);
if (dedupKeys.add(buildDedupKey(targetStatement))) {
statementsToInsert.add(targetStatement);
}
}
if (statementsToInsert.size() > sizeBefore) {
recordsToInsert.add(buildHistoryImportRecord(
sourceRecord, targetProjectId, targetLsfxProjectId, newBatchId, resolveSourceProjectName(sourceRecord.getProjectId()), operator
));
}
}
if (!statementsToInsert.isEmpty()) {
bankStatementMapper.insertBatch(statementsToInsert);
}
if (!recordsToInsert.isEmpty()) {
recordMapper.insertBatch(recordsToInsert);
}
}
private CcdiBankStatement copyStatement(CcdiBankStatement sourceStatement, Long targetProjectId,
Integer targetLsfxProjectId, Integer newBatchId) {
CcdiBankStatement targetStatement = new CcdiBankStatement();
BeanUtils.copyProperties(sourceStatement, targetStatement);
targetStatement.setBankStatementId(null);
targetStatement.setProjectId(targetProjectId);
targetStatement.setGroupId(targetLsfxProjectId);
targetStatement.setBatchId(newBatchId);
normalizeDedupFields(targetStatement);
return targetStatement;
}
private CcdiFileUploadRecord buildHistoryImportRecord(CcdiFileUploadRecord sourceRecord, Long targetProjectId,
Integer targetLsfxProjectId, Integer newBatchId,
String sourceProjectName, String operator) {
CcdiFileUploadRecord record = new CcdiFileUploadRecord();
record.setProjectId(targetProjectId);
record.setLsfxProjectId(targetLsfxProjectId);
record.setLogId(newBatchId);
record.setFileName(sourceRecord.getFileName());
record.setFileSize(sourceRecord.getFileSize());
record.setFileStatus("parsed_success");
record.setEnterpriseNames(sourceRecord.getEnterpriseNames());
record.setAccountNos(sourceRecord.getAccountNos());
record.setUploadTime(new Date());
record.setUploadUser(operator);
record.setSourceType("HISTORY_IMPORT");
record.setSourceProjectId(sourceRecord.getProjectId());
record.setSourceProjectName(sourceProjectName);
return record;
}
private String resolveSourceProjectName(Long sourceProjectId) {
CcdiProject sourceProject = projectMapper.selectById(sourceProjectId);
return sourceProject == null ? null : sourceProject.getProjectName();
}
private void normalizeDedupFields(CcdiBankStatement statement) {
statement.setLeAccountNo(trimToNull(statement.getLeAccountNo()));
}
private String trimToNull(String value) {
if (value == null) {
return null;
}
String trimmed = value.trim();
return trimmed.isEmpty() ? null : trimmed;
}
private String buildDedupKey(CcdiBankStatement statement) {
return String.join("|",
valueOf(statement.getTrxDate()),
valueOf(statement.getCurrency()),
valueOf(statement.getLeAccountNo()),
valueOf(statement.getLeAccountName()),
valueOf(statement.getAccountId()),
valueOf(statement.getCustomerAccountName()),
valueOf(statement.getCustomerAccountNo()),
valueOf(statement.getCustomerBank()),
valueOf(statement.getCustomerReference()),
valueOf(statement.getCustomerCertNo()),
valueOf(statement.getCustomerSocialCreditCode()),
valueOf(statement.getAmountDr()),
valueOf(statement.getAmountCr()),
valueOf(statement.getAmountBalance()),
valueOf(statement.getCashType()),
valueOf(statement.getUserMemo()),
valueOf(statement.getBankComments()),
valueOf(statement.getBankTrxNumber()),
valueOf(statement.getBank()),
valueOf(statement.getTrxFlag()),
valueOf(statement.getTrxType()),
valueOf(statement.getExceptionType()),
valueOf(statement.getInternalFlag()),
valueOf(statement.getCreateDate()),
valueOf(statement.getPaymentMethod()),
valueOf(statement.getCretNo())
);
}
private String valueOf(Object value) {
return Objects.toString(value, "");
}
}

View File

@@ -299,6 +299,33 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<include refid="statementOrderBy"/>
</select>
<!-- History-import read-only query: loads every statement of one source batch
     (project_id + batch_id), optionally narrowed to a transaction-date range.
     startDate/endDate accept either 'yyyy-MM-dd' (expanded to whole-day bounds)
     or a full 'yyyy-MM-dd HH:mm:ss' timestamp. -->
<select id="selectStatementsForHistoryImport" resultMap="CcdiBankStatementResult">
SELECT
<include refid="selectCcdiBankStatementVo"/>
FROM ccdi_bank_statement bs
<where>
bs.project_id = #{projectId}
AND bs.batch_id = #{batchId}
<!-- Lower bound: a bare date becomes 00:00:00 of that day. -->
<if test="startDate != null and startDate != ''">
AND (<include refid="parsedTrxDateExpr"/>) <![CDATA[ >= ]]>
CASE
WHEN LENGTH(TRIM(#{startDate})) = 10
THEN STR_TO_DATE(CONCAT(TRIM(#{startDate}), ' 00:00:00'), '%Y-%m-%d %H:%i:%s')
ELSE STR_TO_DATE(TRIM(#{startDate}), '%Y-%m-%d %H:%i:%s')
END
</if>
<!-- Upper bound: a bare date becomes 23:59:59 of that day (inclusive). -->
<if test="endDate != null and endDate != ''">
AND (<include refid="parsedTrxDateExpr"/>) <![CDATA[ <= ]]>
CASE
WHEN LENGTH(TRIM(#{endDate})) = 10
THEN STR_TO_DATE(CONCAT(TRIM(#{endDate}), ' 23:59:59'), '%Y-%m-%d %H:%i:%s')
ELSE STR_TO_DATE(TRIM(#{endDate}), '%Y-%m-%d %H:%i:%s')
END
</if>
</where>
<!-- Stable order so copied rows keep the source batch sequence. -->
ORDER BY bs.batch_sequence ASC, bs.bank_statement_id ASC
</select>
<select id="selectStatementDetailById" resultMap="CcdiBankStatementDetailVOResultMap">
SELECT
bs.bank_statement_id AS bankStatementId,

View File

@@ -32,13 +32,13 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
<!-- 批量插入 -->
<insert id="insertBatch" parameterType="java.util.List" useGeneratedKeys="true" keyProperty="id">
insert into ccdi_file_upload_record (
project_id, lsfx_project_id, file_name, file_size, file_status,
project_id, lsfx_project_id, log_id, file_name, file_size, file_status,
source_type, source_project_id, source_project_name,
enterprise_names, account_nos, upload_time, upload_user
) values
<foreach collection="list" item="item" separator=",">
(
#{item.projectId}, #{item.lsfxProjectId}, #{item.fileName},
#{item.projectId}, #{item.lsfxProjectId}, #{item.logId}, #{item.fileName},
#{item.fileSize}, #{item.fileStatus}, #{item.sourceType},
#{item.sourceProjectId}, #{item.sourceProjectName},
#{item.enterpriseNames}, #{item.accountNos}, #{item.uploadTime}, #{item.uploadUser}
@@ -46,6 +46,17 @@ PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
</foreach>
</insert>
<!-- Source batches eligible for history import: only successfully parsed
     records that carry a batch id (log_id); ordered deterministically so the
     import walks batches in a stable sequence. -->
<select id="selectSuccessfulRecordsByProjectIds" resultMap="CcdiFileUploadRecordResult">
<include refid="selectCcdiFileUploadRecordVo"/>
where project_id in
<foreach collection="projectIds" item="projectId" open="(" separator="," close=")">
#{projectId}
</foreach>
and file_status = 'parsed_success'
and log_id is not null
order by project_id asc, log_id asc, id asc
</select>
<!-- 统计各状态文件数量 -->
<select id="countByStatus" resultType="java.util.Map">
select file_status as `status`, count(*) as count

View File

@@ -150,6 +150,28 @@ class CcdiBankStatementMapperXmlTest {
}
}
@Test
void historyImportQueries_shouldFilterSuccessfulSourceBatchesAndDateRange() throws Exception {
// Assert the upload-record mapper XML exposes the history-import source query
// and keeps its success-status / log_id filters.
try (InputStream inputStream = getClass().getClassLoader()
.getResourceAsStream("mapper/ccdi/project/CcdiFileUploadRecordMapper.xml")) {
String xml = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
assertTrue(xml.contains("selectSuccessfulRecordsByProjectIds"), xml);
assertTrue(xml.contains("file_status = 'parsed_success'"), xml);
assertTrue(xml.contains("log_id is not null"), xml);
}
// Assert the statement mapper XML keeps batch scoping and both date parameters.
try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream(RESOURCE)) {
String xml = new String(inputStream.readAllBytes(), StandardCharsets.UTF_8);
assertTrue(xml.contains("selectStatementsForHistoryImport"), xml);
assertTrue(xml.contains("bs.project_id = #{projectId}"), xml);
assertTrue(xml.contains("bs.batch_id = #{batchId}"), xml);
assertTrue(xml.contains("#{startDate}"), xml);
assertTrue(xml.contains("#{endDate}"), xml);
}
}
@Test
void insertBatch_shouldAvoidUpdatingAutoIncrementPrimaryKeyInDuplicateBranch() throws Exception {
try (InputStream inputStream = getClass().getClassLoader().getResourceAsStream(RESOURCE)) {

View File

@@ -0,0 +1,166 @@
package com.ruoyi.ccdi.project.service.impl;
import com.ruoyi.ccdi.project.domain.CcdiProject;
import com.ruoyi.ccdi.project.domain.dto.CcdiProjectImportHistoryDTO;
import com.ruoyi.ccdi.project.domain.entity.CcdiBankStatement;
import com.ruoyi.ccdi.project.domain.entity.CcdiFileUploadRecord;
import com.ruoyi.ccdi.project.mapper.CcdiBankStatementMapper;
import com.ruoyi.ccdi.project.mapper.CcdiFileUploadRecordMapper;
import com.ruoyi.ccdi.project.mapper.CcdiProjectMapper;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.mockito.InjectMocks;
import org.mockito.Mock;
import org.mockito.junit.jupiter.MockitoExtension;
import java.math.BigDecimal;
import java.util.List;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicReference;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyList;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
/**
 * Unit tests for CcdiProjectHistoryImportServiceImpl: verify date-range
 * propagation to the mappers, cross-source deduplication, fresh batch-id
 * generation and HISTORY_IMPORT file-record creation.
 */
@ExtendWith(MockitoExtension.class)
class CcdiProjectHistoryImportServiceImplTest {
@InjectMocks
private CcdiProjectHistoryImportServiceImpl service;
@Mock
private CcdiBankStatementMapper bankStatementMapper;
@Mock
private CcdiFileUploadRecordMapper recordMapper;
@Mock
private CcdiProjectMapper projectMapper;
// Mocked so each test can run the submitted task inline (see doAnswer below).
@Mock
private Executor fileUploadExecutor;
@Test
void shouldFilterStatementsByTrxDateAndDeduplicateAcrossSourceProjects() {
CcdiProjectImportHistoryDTO dto = buildImportDto();
when(recordMapper.selectSuccessfulRecordsByProjectIds(dto.getSourceProjectIds()))
.thenReturn(List.of(buildSourceRecord(11L, 101, "批次A"), buildSourceRecord(12L, 202, "批次B")));
when(bankStatementMapper.selectStatementsForHistoryImport(11L, 101, "2026-01-01", "2026-01-31"))
.thenReturn(List.of(buildStatement("2026-01-10", "6222", "100.00", "备注A")));
// The second source batch repeats the first statement — it must be deduplicated.
when(bankStatementMapper.selectStatementsForHistoryImport(12L, 202, "2026-01-01", "2026-01-31"))
.thenReturn(List.of(
buildStatement("2026-01-10", "6222", "100.00", "备注A"),
buildStatement("2026-01-11", "6333", "200.00", "备注B")
));
// Run the "async" task inline so assertions observe results synchronously.
doAnswer(invocation -> {
Runnable runnable = invocation.getArgument(0);
runnable.run();
return null;
}).when(fileUploadExecutor).execute(any(Runnable.class));
// Capture exactly what the service hands to insertBatch.
AtomicReference<List<CcdiBankStatement>> insertedStatements = new AtomicReference<>();
doAnswer(invocation -> {
insertedStatements.set(List.copyOf(invocation.getArgument(0)));
return insertedStatements.get().size();
}).when(bankStatementMapper).insertBatch(anyList());
when(projectMapper.selectById(11L)).thenReturn(buildProject(11L, "历史项目A"));
when(projectMapper.selectById(12L)).thenReturn(buildProject(12L, "历史项目B"));
service.submitImport(90L, 3001, dto, "tester");
// 3 source rows minus 1 duplicate = 2 inserts, all rebound to the target project/group.
assertEquals(2, insertedStatements.get().size());
assertTrue(insertedStatements.get().stream().allMatch(item -> Long.valueOf(90L).equals(item.getProjectId())));
assertTrue(insertedStatements.get().stream().allMatch(item -> Integer.valueOf(3001).equals(item.getGroupId())));
verify(bankStatementMapper).selectStatementsForHistoryImport(11L, 101, "2026-01-01", "2026-01-31");
verify(bankStatementMapper).selectStatementsForHistoryImport(12L, 202, "2026-01-01", "2026-01-31");
}
@Test
void shouldGenerateNewBatchIdsAndHistoryImportFileRecordsOnlyForSuccessfulSourceBatches() {
CcdiProjectImportHistoryDTO dto = buildImportDto();
CcdiFileUploadRecord sourceRecord = buildSourceRecord(11L, 101, "批次A");
when(recordMapper.selectSuccessfulRecordsByProjectIds(dto.getSourceProjectIds()))
.thenReturn(List.of(sourceRecord, buildSourceRecord(12L, 202, "批次B")));
when(bankStatementMapper.selectStatementsForHistoryImport(11L, 101, "2026-01-01", "2026-01-31"))
.thenReturn(List.of(buildStatement("2026-01-12", "6444", "300.00", "备注C")));
// The second batch yields no statements — no file record may be created for it.
when(bankStatementMapper.selectStatementsForHistoryImport(12L, 202, "2026-01-01", "2026-01-31"))
.thenReturn(List.of());
// Run the "async" task inline so assertions observe results synchronously.
doAnswer(invocation -> {
Runnable runnable = invocation.getArgument(0);
runnable.run();
return null;
}).when(fileUploadExecutor).execute(any(Runnable.class));
AtomicReference<List<CcdiBankStatement>> insertedStatements = new AtomicReference<>();
doAnswer(invocation -> {
insertedStatements.set(List.copyOf(invocation.getArgument(0)));
return insertedStatements.get().size();
}).when(bankStatementMapper).insertBatch(anyList());
AtomicReference<List<CcdiFileUploadRecord>> insertedRecords = new AtomicReference<>();
doAnswer(invocation -> {
insertedRecords.set(List.copyOf(invocation.getArgument(0)));
return insertedRecords.get().size();
}).when(recordMapper).insertBatch(anyList());
when(projectMapper.selectById(11L)).thenReturn(buildProject(11L, "历史项目A"));
service.submitImport(90L, 3001, dto, "tester");
assertEquals(1, insertedStatements.get().size());
// The copy must NOT reuse the source batch id.
assertNotEquals(101, insertedStatements.get().get(0).getBatchId());
assertEquals(1, insertedRecords.get().size());
assertEquals("HISTORY_IMPORT", insertedRecords.get().get(0).getSourceType());
assertEquals(11L, insertedRecords.get().get(0).getSourceProjectId());
assertEquals("历史项目A", insertedRecords.get().get(0).getSourceProjectName());
assertEquals("parsed_success", insertedRecords.get().get(0).getFileStatus());
assertNotEquals(sourceRecord.getLogId(), insertedRecords.get().get(0).getLogId());
}
// Fixed DTO fixture: two source projects and a January-2026 date window.
private CcdiProjectImportHistoryDTO buildImportDto() {
CcdiProjectImportHistoryDTO dto = new CcdiProjectImportHistoryDTO();
dto.setProjectName("新项目");
dto.setDescription("从历史复制");
dto.setSourceProjectIds(List.of(11L, 12L));
dto.setStartDate("2026-01-01");
dto.setEndDate("2026-01-31");
return dto;
}
// Successfully-parsed source batch record carrying the given batch id (logId).
private CcdiFileUploadRecord buildSourceRecord(Long sourceProjectId, Integer logId, String fileName) {
CcdiFileUploadRecord record = new CcdiFileUploadRecord();
record.setProjectId(sourceProjectId);
record.setLogId(logId);
record.setFileName(fileName);
record.setFileStatus("parsed_success");
return record;
}
// Minimal project entity used when resolving source project names.
private CcdiProject buildProject(Long projectId, String projectName) {
CcdiProject project = new CcdiProject();
project.setProjectId(projectId);
project.setProjectName(projectName);
return project;
}
// Statement fixture populated with the fields the dedup key relies on.
private CcdiBankStatement buildStatement(String trxDate, String accountNo, String amountCr, String memo) {
CcdiBankStatement statement = new CcdiBankStatement();
statement.setTrxDate(trxDate);
statement.setLeAccountNo(accountNo);
statement.setAmountCr(new BigDecimal(amountCr));
statement.setUserMemo(memo);
statement.setCustomerAccountName("对手方");
statement.setCustomerAccountNo("7000");
statement.setBatchSequence(1);
return statement;
}
}

View File

@@ -42,3 +42,11 @@
- `CcdiProjectServiceImpl#importFromHistory` 改为“创建项目后注册 `afterCommit` 回调”,事务提交后再调用 `historyImportService.submitImport(...)`
- `CcdiProjectServiceImplTest` 中新增历史项目列表查询与事务后提交流程测试
- 验证命令:`mvn -pl ccdi-project -am -Dtest=CcdiProjectServiceImplTest,CcdiProjectControllerTest -Dsurefire.failIfNoSpecifiedTests=false test`

### 2026-03-29 Task 4 后端流水复制与来源记录生成

- `CcdiBankStatementMapper`/XML 增加历史导入只读查询,按 `projectId + batchId + trxDate` 范围读取来源流水
- `CcdiFileUploadRecordMapper`/XML 增加来源项目成功批次查询,并在批量插入时补齐 `log_id`
- `CcdiProjectHistoryImportServiceImpl` 扩展为真实异步复制链路:读取成功批次、按新批次号重建目标流水、按来源字段生成历史导入文件记录,并在导入前做内存去重
- 新增 `CcdiProjectHistoryImportServiceImplTest` 与 XML 断言,验证日期范围传递、跨来源批次去重、新批次号生成及来源标识写入
- 验证命令:`mvn -pl ccdi-project -am -Dtest=CcdiProjectHistoryImportServiceImplTest,CcdiFileUploadServiceImplTest,CcdiBankStatementMapperXmlTest -Dsurefire.failIfNoSpecifiedTests=false test`