参数保存后异步触发项目流水重打标
This commit is contained in:
@@ -5,6 +5,7 @@ import org.springframework.context.annotation.Configuration;
|
||||
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
import java.util.concurrent.ThreadPoolExecutor;
|
||||
|
||||
/**
|
||||
* 流水标签线程池配置
|
||||
@@ -12,6 +13,23 @@ import java.util.concurrent.Executor;
|
||||
@Configuration
|
||||
public class BankTagThreadPoolConfig {
|
||||
|
||||
/**
|
||||
* 项目级重打标异步调度线程池
|
||||
*
|
||||
* @return 线程池执行器
|
||||
*/
|
||||
@Bean("tagRebuildExecutor")
|
||||
public Executor tagRebuildExecutor() {
|
||||
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||
executor.setCorePoolSize(2);
|
||||
executor.setMaxPoolSize(4);
|
||||
executor.setQueueCapacity(100);
|
||||
executor.setThreadNamePrefix("bank-tag-rebuild-");
|
||||
executor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy());
|
||||
executor.initialize();
|
||||
return executor;
|
||||
}
|
||||
|
||||
/**
|
||||
* 规则级并行执行线程池
|
||||
*
|
||||
|
||||
@@ -11,6 +11,9 @@ public enum TriggerType {
|
||||
/** 自动拉取本行信息 */
|
||||
AUTO_PULL_BANK_INFO,
|
||||
|
||||
/** 自动参数变更 */
|
||||
AUTO_PARAM_CHANGE,
|
||||
|
||||
/** 手动触发 */
|
||||
MANUAL
|
||||
}
|
||||
|
||||
@@ -16,6 +16,8 @@ import com.ruoyi.ccdi.project.domain.vo.ModelParamAllVO;
|
||||
import com.ruoyi.ccdi.project.domain.vo.ModelGroupVO;
|
||||
import com.ruoyi.ccdi.project.mapper.CcdiModelParamMapper;
|
||||
import com.ruoyi.ccdi.project.mapper.CcdiProjectMapper;
|
||||
import com.ruoyi.ccdi.project.domain.enums.TriggerType;
|
||||
import com.ruoyi.ccdi.project.service.ICcdiBankTagService;
|
||||
import com.ruoyi.ccdi.project.service.ICcdiModelParamService;
|
||||
import com.ruoyi.ccdi.project.service.ICcdiProjectService;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
@@ -50,6 +52,9 @@ public class CcdiModelParamServiceImpl implements ICcdiModelParamService {
|
||||
@Resource
|
||||
private ICcdiProjectService projectService;
|
||||
|
||||
@Resource
|
||||
private ICcdiBankTagService bankTagService;
|
||||
|
||||
@Override
|
||||
public List<ModelListVO> selectModelList(Long projectId) {
|
||||
log.info("selectModelList 被调用,projectId={}", projectId);
|
||||
@@ -213,6 +218,11 @@ public class CcdiModelParamServiceImpl implements ICcdiModelParamService {
|
||||
if (!updateList.isEmpty()) {
|
||||
modelParamMapper.batchUpdateParamValues(updateList);
|
||||
log.info("批量更新参数成功, count={}", updateList.size());
|
||||
if (projectId > 0) {
|
||||
bankTagService.submitAutoRebuild(projectId, TriggerType.AUTO_PARAM_CHANGE);
|
||||
log.info("项目参数保存成功,已触发流水自动重打标: projectId={}, updatedCount={}",
|
||||
projectId, updateList.size());
|
||||
}
|
||||
}
|
||||
} catch (ServiceException e) {
|
||||
throw e;
|
||||
|
||||
@@ -5,11 +5,13 @@ import com.ruoyi.ccdi.project.domain.enums.TriggerType;
|
||||
import com.ruoyi.ccdi.project.mapper.CcdiBankTagTaskMapper;
|
||||
import com.ruoyi.ccdi.project.service.ICcdiProjectService;
|
||||
import com.ruoyi.common.exception.ServiceException;
|
||||
import org.springframework.beans.factory.annotation.Qualifier;
|
||||
import org.springframework.context.annotation.Lazy;
|
||||
import org.springframework.stereotype.Component;
|
||||
|
||||
import jakarta.annotation.Resource;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.Executor;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
@@ -20,6 +22,7 @@ import lombok.extern.slf4j.Slf4j;
|
||||
public class ProjectBankTagRebuildCoordinator {
|
||||
|
||||
private final ConcurrentHashMap<Long, Boolean> runningProjects = new ConcurrentHashMap<>();
|
||||
private final ConcurrentHashMap<Long, Boolean> pendingAutoRerunProjects = new ConcurrentHashMap<>();
|
||||
|
||||
@Resource
|
||||
private CcdiBankTagTaskMapper taskMapper;
|
||||
@@ -31,6 +34,10 @@ public class ProjectBankTagRebuildCoordinator {
|
||||
@Resource
|
||||
private ICcdiProjectService projectService;
|
||||
|
||||
@Resource
|
||||
@Qualifier("tagRebuildExecutor")
|
||||
private Executor tagRebuildExecutor;
|
||||
|
||||
/**
|
||||
* 提交手动重算
|
||||
*
|
||||
@@ -62,18 +69,24 @@ public class ProjectBankTagRebuildCoordinator {
|
||||
if (runningTask != null || runningProjects.containsKey(projectId)) {
|
||||
log.warn("【流水标签】项目正在重算,已标记完成后补跑: projectId={}, runningTaskId={}, triggerType={}",
|
||||
projectId, runningTask != null ? runningTask.getId() : null, triggerType);
|
||||
markNeedRerun(runningTask);
|
||||
markNeedRerun(projectId, runningTask);
|
||||
return;
|
||||
}
|
||||
|
||||
executeWithLock(projectId, () -> {
|
||||
projectService.ensureProjectCanStartTagging(projectId);
|
||||
boolean needRerun;
|
||||
do {
|
||||
Long taskId = bankTagService.rebuildProject(projectId, null, "system", triggerType);
|
||||
needRerun = taskId != null && consumeNeedRerun(taskId);
|
||||
} while (needRerun);
|
||||
});
|
||||
if (runningProjects.putIfAbsent(projectId, Boolean.TRUE) != null) {
|
||||
log.warn("【流水标签】项目自动重算已在排队,已标记完成后补跑: projectId={}, triggerType={}",
|
||||
projectId, triggerType);
|
||||
markNeedRerun(projectId, runningTask);
|
||||
return;
|
||||
}
|
||||
|
||||
try {
|
||||
tagRebuildExecutor.execute(() -> executeAutoRebuild(projectId, triggerType));
|
||||
log.info("【流水标签】自动重算任务已异步提交: projectId={}, triggerType={}", projectId, triggerType);
|
||||
} catch (RuntimeException ex) {
|
||||
runningProjects.remove(projectId);
|
||||
throw ex;
|
||||
}
|
||||
}
|
||||
|
||||
private void executeWithLock(Long projectId, Runnable action) {
|
||||
@@ -94,24 +107,46 @@ public class ProjectBankTagRebuildCoordinator {
|
||||
return runningProjects.containsKey(projectId) || taskMapper.selectRunningTaskByProjectId(projectId) != null;
|
||||
}
|
||||
|
||||
private void markNeedRerun(CcdiBankTagTask runningTask) {
|
||||
private void executeAutoRebuild(Long projectId, TriggerType triggerType) {
|
||||
try {
|
||||
projectService.ensureProjectCanStartTagging(projectId);
|
||||
boolean needRerun;
|
||||
do {
|
||||
Long taskId = bankTagService.rebuildProject(projectId, null, "system", triggerType);
|
||||
needRerun = taskId != null && consumeNeedRerun(projectId, taskId);
|
||||
} while (needRerun);
|
||||
} catch (Exception ex) {
|
||||
log.error("【流水标签】自动重算执行失败: projectId={}, triggerType={}, error={}",
|
||||
projectId, triggerType, ex.getMessage(), ex);
|
||||
} finally {
|
||||
runningProjects.remove(projectId);
|
||||
log.info("【流水标签】自动重算任务执行结束: projectId={}, triggerType={}", projectId, triggerType);
|
||||
}
|
||||
}
|
||||
|
||||
private void markNeedRerun(Long projectId, CcdiBankTagTask runningTask) {
|
||||
if (runningTask == null) {
|
||||
pendingAutoRerunProjects.put(projectId, Boolean.TRUE);
|
||||
return;
|
||||
}
|
||||
runningTask.setNeedRerun(1);
|
||||
taskMapper.updateTask(runningTask);
|
||||
}
|
||||
|
||||
private boolean consumeNeedRerun(Long taskId) {
|
||||
private boolean consumeNeedRerun(Long projectId, Long taskId) {
|
||||
CcdiBankTagTask finishedTask = taskMapper.selectById(taskId);
|
||||
if (finishedTask == null || finishedTask.getNeedRerun() == null || finishedTask.getNeedRerun() == 0) {
|
||||
boolean taskNeedRerun = finishedTask != null && finishedTask.getNeedRerun() != null && finishedTask.getNeedRerun() == 1;
|
||||
boolean pendingNeedRerun = pendingAutoRerunProjects.remove(projectId) != null;
|
||||
if (!taskNeedRerun && !pendingNeedRerun) {
|
||||
return false;
|
||||
}
|
||||
|
||||
CcdiBankTagTask update = new CcdiBankTagTask();
|
||||
update.setId(taskId);
|
||||
update.setNeedRerun(0);
|
||||
taskMapper.updateTask(update);
|
||||
if (taskNeedRerun) {
|
||||
CcdiBankTagTask update = new CcdiBankTagTask();
|
||||
update.setId(taskId);
|
||||
update.setNeedRerun(0);
|
||||
taskMapper.updateTask(update);
|
||||
}
|
||||
return true;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -2,6 +2,7 @@ package com.ruoyi.ccdi.project.service.impl;
|
||||
|
||||
import com.ruoyi.ccdi.project.domain.CcdiModelParam;
|
||||
import com.ruoyi.ccdi.project.domain.CcdiProject;
|
||||
import com.ruoyi.ccdi.project.domain.enums.TriggerType;
|
||||
import com.ruoyi.ccdi.project.domain.dto.ModelParamGroupDTO;
|
||||
import com.ruoyi.ccdi.project.domain.dto.ModelParamSaveDTO;
|
||||
import com.ruoyi.ccdi.project.domain.dto.ModelParamSaveAllDTO;
|
||||
@@ -9,6 +10,7 @@ import com.ruoyi.ccdi.project.domain.dto.ParamValueItem;
|
||||
import com.ruoyi.ccdi.project.domain.vo.ModelParamAllVO;
|
||||
import com.ruoyi.ccdi.project.mapper.CcdiModelParamMapper;
|
||||
import com.ruoyi.ccdi.project.mapper.CcdiProjectMapper;
|
||||
import com.ruoyi.ccdi.project.service.ICcdiBankTagService;
|
||||
import com.ruoyi.ccdi.project.service.ICcdiProjectService;
|
||||
import com.ruoyi.common.exception.ServiceException;
|
||||
import com.ruoyi.common.utils.SecurityUtils;
|
||||
@@ -25,9 +27,11 @@ import java.util.List;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
import static org.mockito.ArgumentMatchers.anyList;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.mockStatic;
|
||||
import static org.mockito.Mockito.never;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
@@ -46,6 +50,9 @@ class CcdiModelParamServiceImplTest {
|
||||
@Mock
|
||||
private ICcdiProjectService projectService;
|
||||
|
||||
@Mock
|
||||
private ICcdiBankTagService bankTagService;
|
||||
|
||||
@Test
|
||||
void selectAllParams_shouldReadSystemDefaultsForDefaultProject() {
|
||||
CcdiProject project = new CcdiProject();
|
||||
@@ -128,6 +135,42 @@ class CcdiModelParamServiceImplTest {
|
||||
assertThrows(ServiceException.class, () -> service.saveParams(saveDTO));
|
||||
}
|
||||
|
||||
@Test
|
||||
void saveAllParams_shouldSubmitAutoRebuildAfterProjectParamsUpdated() {
|
||||
CcdiProject project = new CcdiProject();
|
||||
project.setProjectId(40L);
|
||||
project.setConfigType("custom");
|
||||
when(projectMapper.selectById(40L)).thenReturn(project);
|
||||
when(modelParamMapper.selectOne(any())).thenReturn(
|
||||
buildParam(1L, 40L, "LARGE_TRANSACTION", "大额交易模型", "SINGLE_TRANSACTION_AMOUNT", "1000")
|
||||
);
|
||||
|
||||
try (MockedStatic<SecurityUtils> mocked = mockStatic(SecurityUtils.class)) {
|
||||
mocked.when(SecurityUtils::getUsername).thenReturn("admin");
|
||||
|
||||
service.saveAllParams(buildSaveAllDto());
|
||||
}
|
||||
|
||||
verify(bankTagService).submitAutoRebuild(40L, TriggerType.AUTO_PARAM_CHANGE);
|
||||
}
|
||||
|
||||
@Test
|
||||
void saveAllParams_shouldNotSubmitAutoRebuildForGlobalDefaults() {
|
||||
ModelParamSaveAllDTO saveAllDTO = buildSaveAllDto();
|
||||
saveAllDTO.setProjectId(0L);
|
||||
when(modelParamMapper.selectOne(any())).thenReturn(
|
||||
buildParam(1L, 0L, "LARGE_TRANSACTION", "大额交易模型", "SINGLE_TRANSACTION_AMOUNT", "1000")
|
||||
);
|
||||
|
||||
try (MockedStatic<SecurityUtils> mocked = mockStatic(SecurityUtils.class)) {
|
||||
mocked.when(SecurityUtils::getUsername).thenReturn("admin");
|
||||
|
||||
service.saveAllParams(saveAllDTO);
|
||||
}
|
||||
|
||||
verify(bankTagService, never()).submitAutoRebuild(any(), any());
|
||||
}
|
||||
|
||||
private CcdiModelParam buildParam(
|
||||
Long id,
|
||||
Long projectId,
|
||||
|
||||
@@ -15,6 +15,8 @@ import org.mockito.Mock;
|
||||
import org.mockito.junit.jupiter.MockitoExtension;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.Executor;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
import static org.mockito.ArgumentMatchers.any;
|
||||
@@ -37,6 +39,9 @@ class ProjectBankTagRebuildCoordinatorTest {
|
||||
@Mock
|
||||
private ICcdiProjectService projectService;
|
||||
|
||||
@Mock
|
||||
private Executor tagRebuildExecutor;
|
||||
|
||||
@Test
|
||||
void submitManualRebuild_shouldRejectWhenProjectAlreadyRunning() {
|
||||
CcdiBankTagTask runningTask = new CcdiBankTagTask();
|
||||
@@ -122,4 +127,12 @@ class ProjectBankTagRebuildCoordinatorTest {
|
||||
assertThrows(ServiceException.class,
|
||||
() -> coordinator.submitManual(40L, null, "tester"));
|
||||
}
|
||||
|
||||
@Test
|
||||
void submitAuto_shouldDispatchRebuildAsynchronously() {
|
||||
coordinator.submitAuto(40L, TriggerType.AUTO_PARAM_CHANGE);
|
||||
|
||||
verify(tagRebuildExecutor).execute(any(Runnable.class));
|
||||
verify(bankTagService, never()).rebuildProject(40L, null, "system", TriggerType.AUTO_PARAM_CHANGE);
|
||||
}
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user