feat: 实现流水标签重算服务与协调器
This commit is contained in:
@@ -0,0 +1,30 @@
|
|||||||
|
package com.ruoyi.ccdi.project.config;
|
||||||
|
|
||||||
|
import org.springframework.context.annotation.Bean;
|
||||||
|
import org.springframework.context.annotation.Configuration;
|
||||||
|
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
|
||||||
|
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 流水标签线程池配置
|
||||||
|
*/
|
||||||
|
@Configuration
|
||||||
|
public class BankTagThreadPoolConfig {
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 规则级并行执行线程池
|
||||||
|
*
|
||||||
|
* @return 线程池执行器
|
||||||
|
*/
|
||||||
|
@Bean("tagRuleExecutor")
|
||||||
|
public Executor tagRuleExecutor() {
|
||||||
|
ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
|
||||||
|
executor.setCorePoolSize(4);
|
||||||
|
executor.setMaxPoolSize(8);
|
||||||
|
executor.setQueueCapacity(100);
|
||||||
|
executor.setThreadNamePrefix("bank-tag-rule-");
|
||||||
|
executor.initialize();
|
||||||
|
return executor;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,16 @@
|
|||||||
|
package com.ruoyi.ccdi.project.domain.enums;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 流水标签触发类型
|
||||||
|
*/
|
||||||
|
public enum TriggerType {
|
||||||
|
|
||||||
|
/** 自动批量上传 */
|
||||||
|
AUTO_BATCH_UPLOAD,
|
||||||
|
|
||||||
|
/** 自动拉取本行信息 */
|
||||||
|
AUTO_PULL_BANK_INFO,
|
||||||
|
|
||||||
|
/** 手动触发 */
|
||||||
|
MANUAL
|
||||||
|
}
|
||||||
@@ -0,0 +1,36 @@
|
|||||||
|
package com.ruoyi.ccdi.project.domain.vo;
|
||||||
|
|
||||||
|
import com.ruoyi.ccdi.project.domain.entity.CcdiBankTagRule;
|
||||||
|
import lombok.Data;
|
||||||
|
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 流水标签规则执行配置
|
||||||
|
*/
|
||||||
|
@Data
|
||||||
|
public class BankTagRuleExecutionConfig {
|
||||||
|
|
||||||
|
/** 项目ID */
|
||||||
|
private Long projectId;
|
||||||
|
|
||||||
|
/** 生效参数项目ID */
|
||||||
|
private Long effectiveProjectId;
|
||||||
|
|
||||||
|
/** 规则元数据 */
|
||||||
|
private CcdiBankTagRule ruleMeta;
|
||||||
|
|
||||||
|
/** 阈值配置 */
|
||||||
|
private Map<String, String> thresholdValues = new LinkedHashMap<>();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 获取阈值
|
||||||
|
*
|
||||||
|
* @param paramCode 参数编码
|
||||||
|
* @return 参数值
|
||||||
|
*/
|
||||||
|
public String getThresholdValue(String paramCode) {
|
||||||
|
return thresholdValues.get(paramCode);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -1,6 +1,7 @@
|
|||||||
package com.ruoyi.ccdi.project.service;
|
package com.ruoyi.ccdi.project.service;
|
||||||
|
|
||||||
import com.ruoyi.ccdi.project.domain.dto.CcdiBankTagRebuildDTO;
|
import com.ruoyi.ccdi.project.domain.dto.CcdiBankTagRebuildDTO;
|
||||||
|
import com.ruoyi.ccdi.project.domain.enums.TriggerType;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 流水标签服务接口
|
* 流水标签服务接口
|
||||||
@@ -15,4 +16,12 @@ public interface ICcdiBankTagService {
|
|||||||
* @return 提示信息
|
* @return 提示信息
|
||||||
*/
|
*/
|
||||||
String submitRebuild(CcdiBankTagRebuildDTO dto, String operator);
|
String submitRebuild(CcdiBankTagRebuildDTO dto, String operator);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 提交自动重算
|
||||||
|
*
|
||||||
|
* @param projectId 项目ID
|
||||||
|
* @param triggerType 触发方式
|
||||||
|
*/
|
||||||
|
void submitAutoRebuild(Long projectId, TriggerType triggerType);
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -0,0 +1,77 @@
|
|||||||
|
package com.ruoyi.ccdi.project.service.impl;
|
||||||
|
|
||||||
|
import com.ruoyi.ccdi.project.domain.CcdiModelParam;
|
||||||
|
import com.ruoyi.ccdi.project.domain.CcdiProject;
|
||||||
|
import com.ruoyi.ccdi.project.domain.entity.CcdiBankTagRule;
|
||||||
|
import com.ruoyi.ccdi.project.domain.vo.BankTagRuleExecutionConfig;
|
||||||
|
import com.ruoyi.ccdi.project.mapper.CcdiModelParamMapper;
|
||||||
|
import com.ruoyi.ccdi.project.mapper.CcdiProjectMapper;
|
||||||
|
import com.ruoyi.common.exception.ServiceException;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import jakarta.annotation.Resource;
|
||||||
|
import java.util.LinkedHashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 流水标签规则执行参数解析器
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class BankTagRuleConfigResolver {
|
||||||
|
|
||||||
|
private static final Map<String, Set<String>> RULE_PARAM_MAPPING = Map.of(
|
||||||
|
"SINGLE_LARGE_INCOME", Set.of("SINGLE_TRANSACTION_AMOUNT"),
|
||||||
|
"CUMULATIVE_INCOME", Set.of("CUMULATIVE_TRANSACTION_AMOUNT"),
|
||||||
|
"ANNUAL_TURNOVER", Set.of("annual_turnover"),
|
||||||
|
"LARGE_CASH_DEPOSIT", Set.of("LARGE_CASH_DEPOSIT"),
|
||||||
|
"FREQUENT_CASH_DEPOSIT", Set.of("LARGE_CASH_DEPOSIT", "FREQUENT_CASH_DEPOSIT"),
|
||||||
|
"LARGE_TRANSFER", Set.of("FREQUENT_TRANSFER")
|
||||||
|
);
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private CcdiProjectMapper projectMapper;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private CcdiModelParamMapper modelParamMapper;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 解析规则执行配置
|
||||||
|
*
|
||||||
|
* @param projectId 项目ID
|
||||||
|
* @param ruleMeta 规则元数据
|
||||||
|
* @return 执行配置
|
||||||
|
*/
|
||||||
|
public BankTagRuleExecutionConfig resolve(Long projectId, CcdiBankTagRule ruleMeta) {
|
||||||
|
if (projectId == null) {
|
||||||
|
throw new ServiceException("项目ID不能为空");
|
||||||
|
}
|
||||||
|
if (ruleMeta == null) {
|
||||||
|
throw new ServiceException("规则信息不能为空");
|
||||||
|
}
|
||||||
|
|
||||||
|
CcdiProject project = projectMapper.selectById(projectId);
|
||||||
|
if (project == null) {
|
||||||
|
throw new ServiceException("项目不存在");
|
||||||
|
}
|
||||||
|
|
||||||
|
Long effectiveProjectId = "default".equals(project.getConfigType()) ? 0L : projectId;
|
||||||
|
List<CcdiModelParam> params = modelParamMapper.selectByProjectAndModel(effectiveProjectId, ruleMeta.getModelCode());
|
||||||
|
|
||||||
|
Map<String, String> thresholdValues = new LinkedHashMap<>();
|
||||||
|
Set<String> requiredParamCodes = RULE_PARAM_MAPPING.getOrDefault(ruleMeta.getRuleCode(), Set.of());
|
||||||
|
for (CcdiModelParam param : params) {
|
||||||
|
if (requiredParamCodes.contains(param.getParamCode())) {
|
||||||
|
thresholdValues.put(param.getParamCode(), param.getParamValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
BankTagRuleExecutionConfig config = new BankTagRuleExecutionConfig();
|
||||||
|
config.setProjectId(projectId);
|
||||||
|
config.setEffectiveProjectId(effectiveProjectId);
|
||||||
|
config.setRuleMeta(ruleMeta);
|
||||||
|
config.setThresholdValues(thresholdValues);
|
||||||
|
return config;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,265 @@
|
|||||||
|
package com.ruoyi.ccdi.project.service.impl;
|
||||||
|
|
||||||
|
import com.ruoyi.ccdi.project.domain.dto.CcdiBankTagRebuildDTO;
|
||||||
|
import com.ruoyi.ccdi.project.domain.entity.CcdiBankTagResult;
|
||||||
|
import com.ruoyi.ccdi.project.domain.entity.CcdiBankTagRule;
|
||||||
|
import com.ruoyi.ccdi.project.domain.entity.CcdiBankTagTask;
|
||||||
|
import com.ruoyi.ccdi.project.domain.enums.TriggerType;
|
||||||
|
import com.ruoyi.ccdi.project.domain.vo.BankTagObjectHitVO;
|
||||||
|
import com.ruoyi.ccdi.project.domain.vo.BankTagRuleExecutionConfig;
|
||||||
|
import com.ruoyi.ccdi.project.domain.vo.BankTagStatementHitVO;
|
||||||
|
import com.ruoyi.ccdi.project.mapper.CcdiBankTagAnalysisMapper;
|
||||||
|
import com.ruoyi.ccdi.project.mapper.CcdiBankTagResultMapper;
|
||||||
|
import com.ruoyi.ccdi.project.mapper.CcdiBankTagRuleMapper;
|
||||||
|
import com.ruoyi.ccdi.project.mapper.CcdiBankTagTaskMapper;
|
||||||
|
import com.ruoyi.ccdi.project.service.ICcdiBankTagService;
|
||||||
|
import org.springframework.beans.factory.annotation.Qualifier;
|
||||||
|
import org.springframework.context.annotation.Lazy;
|
||||||
|
import org.springframework.stereotype.Service;
|
||||||
|
|
||||||
|
import jakarta.annotation.Resource;
|
||||||
|
import java.math.BigDecimal;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Date;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CompletableFuture;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 流水标签服务实现
|
||||||
|
*/
|
||||||
|
@Service
|
||||||
|
public class CcdiBankTagServiceImpl implements ICcdiBankTagService {
|
||||||
|
|
||||||
|
private static final String STATUS_RUNNING = "RUNNING";
|
||||||
|
private static final String STATUS_SUCCESS = "SUCCESS";
|
||||||
|
private static final String STATUS_FAILED = "FAILED";
|
||||||
|
private static final String RESULT_TYPE_STATEMENT = "STATEMENT";
|
||||||
|
private static final String OBJECT_TYPE_STAFF_ID_CARD = "STAFF_ID_CARD";
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private CcdiBankTagRuleMapper ruleMapper;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private CcdiBankTagResultMapper resultMapper;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private CcdiBankTagTaskMapper taskMapper;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private CcdiBankTagAnalysisMapper analysisMapper;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private BankTagRuleConfigResolver configResolver;
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
@Qualifier("tagRuleExecutor")
|
||||||
|
private Executor tagRuleExecutor;
|
||||||
|
|
||||||
|
@Lazy
|
||||||
|
@Resource
|
||||||
|
private ProjectBankTagRebuildCoordinator coordinator;
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String submitRebuild(CcdiBankTagRebuildDTO dto, String operator) {
|
||||||
|
coordinator.submitManual(dto.getProjectId(), dto.getModelCode(), operator);
|
||||||
|
return "标签重算任务已提交";
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 提交自动重算
|
||||||
|
*
|
||||||
|
* @param projectId 项目ID
|
||||||
|
* @param triggerType 触发方式
|
||||||
|
*/
|
||||||
|
public void submitAutoRebuild(Long projectId, TriggerType triggerType) {
|
||||||
|
coordinator.submitAuto(projectId, triggerType);
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 执行项目标签重算
|
||||||
|
*
|
||||||
|
* @param projectId 项目ID
|
||||||
|
* @param modelCode 模型编码
|
||||||
|
* @param operator 操作人
|
||||||
|
* @param triggerType 触发方式
|
||||||
|
*/
|
||||||
|
public Long rebuildProject(Long projectId, String modelCode, String operator, TriggerType triggerType) {
|
||||||
|
CcdiBankTagTask task = buildRunningTask(projectId, modelCode, operator, triggerType);
|
||||||
|
taskMapper.insertTask(task);
|
||||||
|
|
||||||
|
try {
|
||||||
|
List<CcdiBankTagRule> rules = ruleMapper.selectEnabledRules(modelCode);
|
||||||
|
resultMapper.deleteByProjectAndModel(projectId, modelCode);
|
||||||
|
|
||||||
|
List<CompletableFuture<List<CcdiBankTagResult>>> futures = rules.stream()
|
||||||
|
.map(rule -> CompletableFuture.supplyAsync(
|
||||||
|
() -> executeRule(projectId, rule, operator),
|
||||||
|
tagRuleExecutor
|
||||||
|
))
|
||||||
|
.toList();
|
||||||
|
|
||||||
|
List<CcdiBankTagResult> allResults = futures.stream()
|
||||||
|
.map(CompletableFuture::join)
|
||||||
|
.flatMap(List::stream)
|
||||||
|
.toList();
|
||||||
|
|
||||||
|
if (!allResults.isEmpty()) {
|
||||||
|
resultMapper.insertBatch(allResults);
|
||||||
|
}
|
||||||
|
|
||||||
|
task.setStatus(STATUS_SUCCESS);
|
||||||
|
task.setSuccessRuleCount(rules.size());
|
||||||
|
task.setFailedRuleCount(0);
|
||||||
|
task.setHitCount(allResults.size());
|
||||||
|
task.setEndTime(new Date());
|
||||||
|
task.setNeedRerun(null);
|
||||||
|
task.setUpdateBy(operator);
|
||||||
|
task.setUpdateTime(new Date());
|
||||||
|
taskMapper.updateTask(task);
|
||||||
|
return task.getId();
|
||||||
|
} catch (Exception ex) {
|
||||||
|
task.setStatus(STATUS_FAILED);
|
||||||
|
task.setErrorMessage(ex.getMessage());
|
||||||
|
task.setEndTime(new Date());
|
||||||
|
task.setNeedRerun(null);
|
||||||
|
task.setUpdateBy(operator);
|
||||||
|
task.setUpdateTime(new Date());
|
||||||
|
taskMapper.updateTask(task);
|
||||||
|
throw ex;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private CcdiBankTagTask buildRunningTask(Long projectId, String modelCode, String operator, TriggerType triggerType) {
|
||||||
|
Date now = new Date();
|
||||||
|
CcdiBankTagTask task = new CcdiBankTagTask();
|
||||||
|
task.setProjectId(projectId);
|
||||||
|
task.setModelCode(modelCode);
|
||||||
|
task.setTriggerType(triggerType.name());
|
||||||
|
task.setStatus(STATUS_RUNNING);
|
||||||
|
task.setNeedRerun(0);
|
||||||
|
task.setSuccessRuleCount(0);
|
||||||
|
task.setFailedRuleCount(0);
|
||||||
|
task.setHitCount(0);
|
||||||
|
task.setStartTime(now);
|
||||||
|
task.setCreateBy(operator);
|
||||||
|
task.setCreateTime(now);
|
||||||
|
task.setUpdateBy(operator);
|
||||||
|
task.setUpdateTime(now);
|
||||||
|
return task;
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<CcdiBankTagResult> executeRule(Long projectId, CcdiBankTagRule rule, String operator) {
|
||||||
|
BankTagRuleExecutionConfig config = configResolver.resolve(projectId, rule);
|
||||||
|
if (RESULT_TYPE_STATEMENT.equals(rule.getResultType())) {
|
||||||
|
List<BankTagStatementHitVO> hits = executeStatementRule(projectId, rule, config);
|
||||||
|
return buildStatementResults(projectId, rule, hits, operator);
|
||||||
|
}
|
||||||
|
|
||||||
|
List<BankTagObjectHitVO> hits = executeObjectRule(projectId, rule, config);
|
||||||
|
return buildObjectResults(projectId, rule, hits, operator);
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<BankTagStatementHitVO> executeStatementRule(Long projectId,
|
||||||
|
CcdiBankTagRule rule,
|
||||||
|
BankTagRuleExecutionConfig config) {
|
||||||
|
return switch (rule.getRuleCode()) {
|
||||||
|
case "HOUSE_OR_CAR_EXPENSE" -> analysisMapper.selectHouseOrCarExpenseStatements(projectId);
|
||||||
|
case "TAX_EXPENSE" -> analysisMapper.selectTaxExpenseStatements(projectId);
|
||||||
|
case "SINGLE_LARGE_INCOME" -> analysisMapper.selectSingleLargeIncomeStatements(
|
||||||
|
projectId, toBigDecimal(config.getThresholdValue("SINGLE_TRANSACTION_AMOUNT"))
|
||||||
|
);
|
||||||
|
case "LARGE_CASH_DEPOSIT" -> analysisMapper.selectLargeCashDepositStatements(
|
||||||
|
projectId, toBigDecimal(config.getThresholdValue("LARGE_CASH_DEPOSIT"))
|
||||||
|
);
|
||||||
|
case "LARGE_TRANSFER" -> analysisMapper.selectLargeTransferStatements(
|
||||||
|
projectId, toBigDecimal(config.getThresholdValue("FREQUENT_TRANSFER"))
|
||||||
|
);
|
||||||
|
default -> List.of();
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<BankTagObjectHitVO> executeObjectRule(Long projectId,
|
||||||
|
CcdiBankTagRule rule,
|
||||||
|
BankTagRuleExecutionConfig config) {
|
||||||
|
return switch (rule.getRuleCode()) {
|
||||||
|
case "CUMULATIVE_INCOME" -> analysisMapper.selectCumulativeIncomeObjects(
|
||||||
|
projectId, toBigDecimal(config.getThresholdValue("CUMULATIVE_TRANSACTION_AMOUNT"))
|
||||||
|
);
|
||||||
|
case "ANNUAL_TURNOVER" -> analysisMapper.selectAnnualTurnoverObjects(
|
||||||
|
projectId, toBigDecimal(config.getThresholdValue("annual_turnover"))
|
||||||
|
);
|
||||||
|
case "FREQUENT_CASH_DEPOSIT" -> analysisMapper.selectFrequentCashDepositObjects(
|
||||||
|
projectId,
|
||||||
|
toBigDecimal(config.getThresholdValue("LARGE_CASH_DEPOSIT")),
|
||||||
|
toInteger(config.getThresholdValue("FREQUENT_CASH_DEPOSIT"))
|
||||||
|
);
|
||||||
|
default -> List.of();
|
||||||
|
};
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<CcdiBankTagResult> buildStatementResults(Long projectId,
|
||||||
|
CcdiBankTagRule rule,
|
||||||
|
List<BankTagStatementHitVO> hits,
|
||||||
|
String operator) {
|
||||||
|
List<CcdiBankTagResult> results = new ArrayList<>();
|
||||||
|
Date now = new Date();
|
||||||
|
for (BankTagStatementHitVO hit : hits) {
|
||||||
|
CcdiBankTagResult result = buildBaseResult(projectId, rule, operator, now);
|
||||||
|
result.setBankStatementId(hit.getBankStatementId());
|
||||||
|
result.setGroupId(hit.getGroupId());
|
||||||
|
result.setLogId(hit.getLogId());
|
||||||
|
result.setReasonDetail(hit.getReasonDetail());
|
||||||
|
results.add(result);
|
||||||
|
}
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
|
private List<CcdiBankTagResult> buildObjectResults(Long projectId,
|
||||||
|
CcdiBankTagRule rule,
|
||||||
|
List<BankTagObjectHitVO> hits,
|
||||||
|
String operator) {
|
||||||
|
List<CcdiBankTagResult> results = new ArrayList<>();
|
||||||
|
Date now = new Date();
|
||||||
|
for (BankTagObjectHitVO hit : hits) {
|
||||||
|
CcdiBankTagResult result = buildBaseResult(projectId, rule, operator, now);
|
||||||
|
result.setObjectType(hit.getObjectType() != null ? hit.getObjectType() : OBJECT_TYPE_STAFF_ID_CARD);
|
||||||
|
result.setObjectKey(hit.getObjectKey());
|
||||||
|
result.setReasonDetail(hit.getReasonDetail());
|
||||||
|
results.add(result);
|
||||||
|
}
|
||||||
|
return results;
|
||||||
|
}
|
||||||
|
|
||||||
|
private CcdiBankTagResult buildBaseResult(Long projectId, CcdiBankTagRule rule, String operator, Date now) {
|
||||||
|
CcdiBankTagResult result = new CcdiBankTagResult();
|
||||||
|
result.setProjectId(projectId);
|
||||||
|
result.setModelCode(rule.getModelCode());
|
||||||
|
result.setModelName(rule.getModelName());
|
||||||
|
result.setRuleCode(rule.getRuleCode());
|
||||||
|
result.setRuleName(rule.getRuleName());
|
||||||
|
result.setIndicatorCode(rule.getIndicatorCode());
|
||||||
|
result.setResultType(rule.getResultType());
|
||||||
|
result.setRiskLevel(rule.getRiskLevel());
|
||||||
|
result.setBusinessCaliberSnapshot(rule.getBusinessCaliber());
|
||||||
|
result.setCreateBy(operator);
|
||||||
|
result.setCreateTime(now);
|
||||||
|
result.setUpdateBy(operator);
|
||||||
|
result.setUpdateTime(now);
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private BigDecimal toBigDecimal(String value) {
|
||||||
|
if (value == null || value.isBlank()) {
|
||||||
|
return BigDecimal.ZERO;
|
||||||
|
}
|
||||||
|
return new BigDecimal(value);
|
||||||
|
}
|
||||||
|
|
||||||
|
private Integer toInteger(String value) {
|
||||||
|
if (value == null || value.isBlank()) {
|
||||||
|
return 0;
|
||||||
|
}
|
||||||
|
return Integer.parseInt(value);
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,100 @@
|
|||||||
|
package com.ruoyi.ccdi.project.service.impl;
|
||||||
|
|
||||||
|
import com.ruoyi.ccdi.project.domain.entity.CcdiBankTagTask;
|
||||||
|
import com.ruoyi.ccdi.project.domain.enums.TriggerType;
|
||||||
|
import com.ruoyi.ccdi.project.mapper.CcdiBankTagTaskMapper;
|
||||||
|
import com.ruoyi.common.exception.ServiceException;
|
||||||
|
import org.springframework.context.annotation.Lazy;
|
||||||
|
import org.springframework.stereotype.Component;
|
||||||
|
|
||||||
|
import jakarta.annotation.Resource;
|
||||||
|
import java.util.concurrent.ConcurrentHashMap;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 项目级流水标签重算协调器
|
||||||
|
*/
|
||||||
|
@Component
|
||||||
|
public class ProjectBankTagRebuildCoordinator {
|
||||||
|
|
||||||
|
private final ConcurrentHashMap<Long, Boolean> runningProjects = new ConcurrentHashMap<>();
|
||||||
|
|
||||||
|
@Resource
|
||||||
|
private CcdiBankTagTaskMapper taskMapper;
|
||||||
|
|
||||||
|
@Lazy
|
||||||
|
@Resource
|
||||||
|
private CcdiBankTagServiceImpl bankTagService;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 提交手动重算
|
||||||
|
*
|
||||||
|
* @param projectId 项目ID
|
||||||
|
* @param modelCode 模型编码
|
||||||
|
* @param operator 操作人
|
||||||
|
*/
|
||||||
|
public void submitManual(Long projectId, String modelCode, String operator) {
|
||||||
|
if (isProjectRunning(projectId)) {
|
||||||
|
throw new ServiceException("当前项目标签正在重算中,请稍后再试");
|
||||||
|
}
|
||||||
|
|
||||||
|
executeWithLock(projectId, () -> bankTagService.rebuildProject(projectId, modelCode, operator, TriggerType.MANUAL));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* 提交自动重算
|
||||||
|
*
|
||||||
|
* @param projectId 项目ID
|
||||||
|
* @param triggerType 触发类型
|
||||||
|
*/
|
||||||
|
public void submitAuto(Long projectId, TriggerType triggerType) {
|
||||||
|
CcdiBankTagTask runningTask = taskMapper.selectRunningTaskByProjectId(projectId);
|
||||||
|
if (runningTask != null || runningProjects.containsKey(projectId)) {
|
||||||
|
markNeedRerun(runningTask);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
executeWithLock(projectId, () -> {
|
||||||
|
boolean needRerun;
|
||||||
|
do {
|
||||||
|
Long taskId = bankTagService.rebuildProject(projectId, null, "system", triggerType);
|
||||||
|
needRerun = taskId != null && consumeNeedRerun(taskId);
|
||||||
|
} while (needRerun);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private void executeWithLock(Long projectId, Runnable action) {
|
||||||
|
if (runningProjects.putIfAbsent(projectId, Boolean.TRUE) != null) {
|
||||||
|
throw new ServiceException("当前项目标签正在重算中,请稍后再试");
|
||||||
|
}
|
||||||
|
try {
|
||||||
|
action.run();
|
||||||
|
} finally {
|
||||||
|
runningProjects.remove(projectId);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean isProjectRunning(Long projectId) {
|
||||||
|
return runningProjects.containsKey(projectId) || taskMapper.selectRunningTaskByProjectId(projectId) != null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void markNeedRerun(CcdiBankTagTask runningTask) {
|
||||||
|
if (runningTask == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
runningTask.setNeedRerun(1);
|
||||||
|
taskMapper.updateTask(runningTask);
|
||||||
|
}
|
||||||
|
|
||||||
|
private boolean consumeNeedRerun(Long taskId) {
|
||||||
|
CcdiBankTagTask finishedTask = taskMapper.selectById(taskId);
|
||||||
|
if (finishedTask == null || finishedTask.getNeedRerun() == null || finishedTask.getNeedRerun() == 0) {
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
|
CcdiBankTagTask update = new CcdiBankTagTask();
|
||||||
|
update.setId(taskId);
|
||||||
|
update.setNeedRerun(0);
|
||||||
|
taskMapper.updateTask(update);
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,60 @@
|
|||||||
|
package com.ruoyi.ccdi.project.service.impl;
|
||||||
|
|
||||||
|
import com.ruoyi.ccdi.project.domain.CcdiModelParam;
|
||||||
|
import com.ruoyi.ccdi.project.domain.CcdiProject;
|
||||||
|
import com.ruoyi.ccdi.project.domain.entity.CcdiBankTagRule;
|
||||||
|
import com.ruoyi.ccdi.project.domain.vo.BankTagRuleExecutionConfig;
|
||||||
|
import com.ruoyi.ccdi.project.mapper.CcdiModelParamMapper;
|
||||||
|
import com.ruoyi.ccdi.project.mapper.CcdiProjectMapper;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.mockito.InjectMocks;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@ExtendWith(MockitoExtension.class)
|
||||||
|
class BankTagRuleConfigResolverTest {
|
||||||
|
|
||||||
|
@InjectMocks
|
||||||
|
private BankTagRuleConfigResolver resolver;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private CcdiProjectMapper projectMapper;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private CcdiModelParamMapper modelParamMapper;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void resolve_shouldReadEffectiveProjectParamsForThresholdRules() {
|
||||||
|
CcdiProject project = new CcdiProject();
|
||||||
|
project.setProjectId(40L);
|
||||||
|
project.setConfigType("default");
|
||||||
|
when(projectMapper.selectById(40L)).thenReturn(project);
|
||||||
|
when(modelParamMapper.selectByProjectAndModel(0L, "LARGE_TRANSACTION")).thenReturn(List.of(
|
||||||
|
buildParam("SINGLE_TRANSACTION_AMOUNT", "1111")
|
||||||
|
));
|
||||||
|
|
||||||
|
CcdiBankTagRule ruleMeta = new CcdiBankTagRule();
|
||||||
|
ruleMeta.setModelCode("LARGE_TRANSACTION");
|
||||||
|
ruleMeta.setRuleCode("SINGLE_LARGE_INCOME");
|
||||||
|
ruleMeta.setIndicatorCode("SINGLE_TRANSACTION_AMOUNT");
|
||||||
|
|
||||||
|
BankTagRuleExecutionConfig config = resolver.resolve(40L, ruleMeta);
|
||||||
|
|
||||||
|
assertEquals("1111", config.getThresholdValue("SINGLE_TRANSACTION_AMOUNT"));
|
||||||
|
}
|
||||||
|
|
||||||
|
private CcdiModelParam buildParam(String paramCode, String paramValue) {
|
||||||
|
CcdiModelParam param = new CcdiModelParam();
|
||||||
|
param.setProjectId(0L);
|
||||||
|
param.setModelCode("LARGE_TRANSACTION");
|
||||||
|
param.setParamCode(paramCode);
|
||||||
|
param.setParamValue(paramValue);
|
||||||
|
return param;
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,84 @@
|
|||||||
|
package com.ruoyi.ccdi.project.service.impl;
|
||||||
|
|
||||||
|
import com.ruoyi.ccdi.project.domain.entity.CcdiBankTagRule;
|
||||||
|
import com.ruoyi.ccdi.project.domain.entity.CcdiBankTagTask;
|
||||||
|
import com.ruoyi.ccdi.project.domain.enums.TriggerType;
|
||||||
|
import com.ruoyi.ccdi.project.domain.vo.BankTagRuleExecutionConfig;
|
||||||
|
import com.ruoyi.ccdi.project.domain.vo.BankTagStatementHitVO;
|
||||||
|
import com.ruoyi.ccdi.project.mapper.CcdiBankTagAnalysisMapper;
|
||||||
|
import com.ruoyi.ccdi.project.mapper.CcdiBankTagResultMapper;
|
||||||
|
import com.ruoyi.ccdi.project.mapper.CcdiBankTagRuleMapper;
|
||||||
|
import com.ruoyi.ccdi.project.mapper.CcdiBankTagTaskMapper;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.mockito.InOrder;
|
||||||
|
import org.mockito.InjectMocks;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
import org.springframework.test.util.ReflectionTestUtils;
|
||||||
|
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.concurrent.Executor;
|
||||||
|
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.ArgumentMatchers.anyList;
|
||||||
|
import static org.mockito.Mockito.inOrder;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@ExtendWith(MockitoExtension.class)
|
||||||
|
class CcdiBankTagServiceImplTest {
|
||||||
|
|
||||||
|
@InjectMocks
|
||||||
|
private CcdiBankTagServiceImpl service;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private CcdiBankTagRuleMapper ruleMapper;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private CcdiBankTagResultMapper resultMapper;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private CcdiBankTagTaskMapper taskMapper;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private CcdiBankTagAnalysisMapper analysisMapper;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private BankTagRuleConfigResolver configResolver;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void rebuildProject_shouldDeleteOldResultsBeforeSubmittingRuleTasks() {
|
||||||
|
ReflectionTestUtils.setField(service, "tagRuleExecutor", (Executor) Runnable::run);
|
||||||
|
|
||||||
|
CcdiBankTagRule rule = new CcdiBankTagRule();
|
||||||
|
rule.setModelCode("LARGE_TRANSACTION");
|
||||||
|
rule.setModelName("大额交易");
|
||||||
|
rule.setRuleCode("HOUSE_OR_CAR_EXPENSE");
|
||||||
|
rule.setRuleName("房车消费支出交易");
|
||||||
|
rule.setResultType("STATEMENT");
|
||||||
|
|
||||||
|
BankTagRuleExecutionConfig config = new BankTagRuleExecutionConfig();
|
||||||
|
config.setProjectId(40L);
|
||||||
|
config.setRuleMeta(rule);
|
||||||
|
|
||||||
|
BankTagStatementHitVO hit = new BankTagStatementHitVO();
|
||||||
|
hit.setBankStatementId(10L);
|
||||||
|
hit.setGroupId(40);
|
||||||
|
hit.setLogId(40001);
|
||||||
|
hit.setReasonDetail("命中房车消费支出");
|
||||||
|
|
||||||
|
when(ruleMapper.selectEnabledRules(null)).thenReturn(List.of(rule));
|
||||||
|
when(configResolver.resolve(40L, rule)).thenReturn(config);
|
||||||
|
when(analysisMapper.selectHouseOrCarExpenseStatements(40L)).thenReturn(List.of(hit));
|
||||||
|
|
||||||
|
service.rebuildProject(40L, null, "admin", TriggerType.MANUAL);
|
||||||
|
|
||||||
|
InOrder inOrder = inOrder(resultMapper);
|
||||||
|
verify(resultMapper).deleteByProjectAndModel(40L, null);
|
||||||
|
verify(resultMapper).insertBatch(anyList());
|
||||||
|
inOrder.verify(resultMapper).deleteByProjectAndModel(40L, null);
|
||||||
|
inOrder.verify(resultMapper).insertBatch(anyList());
|
||||||
|
verify(taskMapper).insertTask(any(CcdiBankTagTask.class));
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -0,0 +1,57 @@
|
|||||||
|
package com.ruoyi.ccdi.project.service.impl;
|
||||||
|
|
||||||
|
import com.ruoyi.ccdi.project.domain.entity.CcdiBankTagTask;
|
||||||
|
import com.ruoyi.ccdi.project.domain.enums.TriggerType;
|
||||||
|
import com.ruoyi.ccdi.project.mapper.CcdiBankTagTaskMapper;
|
||||||
|
import com.ruoyi.common.exception.ServiceException;
|
||||||
|
import org.junit.jupiter.api.Test;
|
||||||
|
import org.junit.jupiter.api.extension.ExtendWith;
|
||||||
|
import org.mockito.InjectMocks;
|
||||||
|
import org.mockito.Mock;
|
||||||
|
import org.mockito.junit.jupiter.MockitoExtension;
|
||||||
|
|
||||||
|
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||||
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
|
import static org.mockito.Mockito.never;
|
||||||
|
import static org.mockito.Mockito.verify;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
@ExtendWith(MockitoExtension.class)
|
||||||
|
class ProjectBankTagRebuildCoordinatorTest {
|
||||||
|
|
||||||
|
@InjectMocks
|
||||||
|
private ProjectBankTagRebuildCoordinator coordinator;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private CcdiBankTagTaskMapper taskMapper;
|
||||||
|
|
||||||
|
@Mock
|
||||||
|
private CcdiBankTagServiceImpl bankTagService;
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void submitManualRebuild_shouldRejectWhenProjectAlreadyRunning() {
|
||||||
|
CcdiBankTagTask runningTask = new CcdiBankTagTask();
|
||||||
|
runningTask.setId(1L);
|
||||||
|
runningTask.setProjectId(40L);
|
||||||
|
runningTask.setStatus("RUNNING");
|
||||||
|
when(taskMapper.selectRunningTaskByProjectId(40L)).thenReturn(runningTask);
|
||||||
|
|
||||||
|
assertThrows(ServiceException.class, () -> coordinator.submitManual(40L, null, "admin"));
|
||||||
|
verify(bankTagService, never()).rebuildProject(40L, null, "admin", TriggerType.MANUAL);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
void submitAutoRebuild_shouldMarkNeedRerunWhenProjectAlreadyRunning() {
|
||||||
|
CcdiBankTagTask runningTask = new CcdiBankTagTask();
|
||||||
|
runningTask.setId(1L);
|
||||||
|
runningTask.setProjectId(40L);
|
||||||
|
runningTask.setStatus("RUNNING");
|
||||||
|
runningTask.setNeedRerun(0);
|
||||||
|
when(taskMapper.selectRunningTaskByProjectId(40L)).thenReturn(runningTask);
|
||||||
|
|
||||||
|
coordinator.submitAuto(40L, TriggerType.AUTO_BATCH_UPLOAD);
|
||||||
|
|
||||||
|
verify(taskMapper).updateTask(any(CcdiBankTagTask.class));
|
||||||
|
verify(bankTagService, never()).rebuildProject(40L, null, "system", TriggerType.AUTO_BATCH_UPLOAD);
|
||||||
|
}
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user