# 项目流水标签功能设计 ## 背景 当前项目已经具备两条银行流水入库链路: - 批量上传他行流水文件:`/ccdi/file-upload/batch` - 拉取本行信息:`/ccdi/file-upload/pull-bank-info` 两条链路最终都会把流水写入 `ccdi_bank_statement`,但系统还没有对项目内流水执行统一打标,也没有独立的手动重算能力。 本次需求要求: - 在异步批量上传流水文件全部处理完成后,自动对项目内流水进行打标 - 在拉取本行信息任务全部处理完成后,也自动对项目内流水进行打标 - 提供手动接口,支持重新计算项目内标签 - 根据 [assets/大额交易.csv](../../assets/大额交易.csv) 中的模型信息实现第一批规则 - 避免重复打标 - 同时支持两类结果: - 单条流水异常:对具体流水打标 - 对象区间异常:对项目中的对象打标 - 打标结果中保留足够的异常原因信息,便于后续页面清晰展示 ## 目标 - 为项目建立可扩展的流水标签能力,而不是只为“大额交易”写一次性逻辑 - 支持自动触发和手动触发两种标签重算方式 - 用代码中的 Mapper XML 固化技术口径 SQL,不做运行时 SQL 配置化 - 通过统一结果表保存标签命中结果和异常原因快照 - 保证同一项目不会同时触发多个重算任务 - 使用线程池并行执行规则,提高重算效率 ## 非目标 - 本期不新增前端页面展示 - 本期不实现动态 SQL 配置平台 - 本期不把标签计算失败与流水导入失败绑定为同一事务 - 本期不实现除大额交易外的其他模型 - 本期不实现对象级结果到具体来源批次的多来源追溯 ## 现状分析 ### 流水导入现状 批量上传主链路位于: - `ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java` 当前关键行为: - 每个文件独立异步上传、轮询解析、拉取流水、写入 `ccdi_bank_statement` - 文件全部提交后,调度线程立即结束 - 没有“当前批次所有异步任务完成后的收尾回调” 拉取本行信息主链路同样位于 `CcdiFileUploadServiceImpl`,每个身份证号会生成一条上传记录并异步拉取流水,但也没有“全部任务完成后”的统一收尾逻辑。 ### 参数配置现状 模型参数位于: - `ccdi_model_param` - `CcdiModelParamServiceImpl` 系统已经支持: - 项目默认参数与自定义参数切换 - 通过 `project_id = 0` 读取系统默认参数 - 通过 `project_id = 当前项目` 读取项目自定义参数 因此阈值类规则可以直接复用现有参数体系。 ### 流水查询现状 流水明细能力位于: - `CcdiBankStatementMapper.xml` - `CcdiBankStatementController` - `CcdiBankStatementServiceImpl` 当前流水表中已经具备标签计算所需的一部分字段: - `project_id` - `group_id` - `batch_id`,其业务语义可作为 `log_id` - `bank_statement_id` - `cret_no` ## 方案对比 ### 方案一:把每个规则直接写死在 Service 中 优点: - 开发速度快 - 初期实现简单 缺点: - 第二个模型接入时会迅速膨胀为大段 `if/else` - SQL、规则元数据、结果写入逻辑会散落在多个类里 - 不利于后续维护和测试 ### 方案二:规则元数据入库,SQL 写在 Mapper XML 中 优点: - 规则展示信息可配置 - 技术口径 SQL 保持在代码中,便于评审和测试 - 后续新增模型的扩展路径清晰 缺点: - 第一版需要同时补规则表、结果表、任务表和执行框架 ### 方案三:规则元数据和 SQL 都配置化 优点: - 表面上扩展灵活 缺点: - 动态 SQL 的安全性、可测试性、可维护性都较差 - 与当前项目代码式 MyBatis 规则体系不一致 - 不符合本次“SQL 用 Mapper + XML 实现”的约束 ## 最终方案 采用方案二: - 规则元数据入库 - 技术口径 SQL 固化在 `Mapper + XML` - 统一结果表保存标签命中 - 统一任务调度器管理自动触发、手动触发和项目级互斥 - 规则级任务在线程池中并行执行 ## 详细设计 ### 一、数据模型设计 #### 1. 标签规则定义表 建议新增:`ccdi_bank_tag_rule` 建议字段: - `id` - `model_code` - `model_name` - `rule_code` - `rule_name` - `indicator_code` - `result_type`:`STATEMENT` / `OBJECT` - `risk_level` - `business_caliber` - `enabled` - `sort_order` - `create_by` - `create_time` - `update_by` - `update_time` - `remark` 用途: - 保存展示和业务元数据 - 不保存技术 SQL - 作为结果表中的规则元信息来源 第一版初始化“大额交易” 8 条规则元数据: - 房车消费支出交易 - 税务支出交易 - 大额单笔收入 - 累计收入超限 - 年流水交易额超限 - 大额存现交易 - 短时间多次存现 - 大额转账交易 #### 2. 标签结果表 建议新增:`ccdi_bank_statement_tag_result` 建议字段: - `id` - `project_id` - `model_code` - `model_name` - `rule_code` - `rule_name` - `indicator_code` - `result_type` - `risk_level` - `bank_statement_id` - `object_type` - `object_key` - `group_id` - `log_id` - `reason_detail` - `business_caliber_snapshot` - `hit_value_snapshot` - `create_by` - `create_time` - `update_by` - `update_time` - `remark` 字段约束: - 流水级结果必须保存: - `bank_statement_id` - `group_id` - `log_id` - 对象级结果必须保存: - `object_type` - `object_key` - 对象级结果不写: - `group_id` - `log_id` - `bank_statement_id` 唯一约束建议拆分为两组: - 流水级唯一键:`project_id + rule_code + bank_statement_id` - 对象级唯一键:`project_id + rule_code + object_type + object_key` 用途: - 防止重复打标 - 保留足够的异常原因快照,供后续页面展示 #### 3. 标签任务执行表 建议新增:`ccdi_bank_tag_task` 建议字段: - `id` - `project_id` - `trigger_type`:`AUTO_BATCH_UPLOAD` / `AUTO_PULL_BANK_INFO` / `MANUAL` - `model_code` - `status`:`RUNNING` / `SUCCESS` / `PARTIAL_FAILED` / `FAILED` - `need_rerun` - `success_rule_count` - `failed_rule_count` - `hit_count` - `error_message` - `start_time` - `end_time` - `create_by` - `create_time` - `update_by` - `update_time` 用途: - 记录当前项目是否存在正在执行的重算任务 - 承接“自动触发补跑”标记 - 为后续前端展示最近一次打标状态预留基础 ### 二、规则分型设计 第一版大额交易 8 条规则按结果类型分为两类。 #### 1. 流水级规则 返回具体流水 `bank_statement_id`,并从流水表带出 `group_id`、`batch_id(log_id)`。 包括: - 房车消费支出交易 - 税务支出交易 - 大额单笔收入 - 大额存现交易 - 大额转账交易 #### 2. 对象级规则 返回对象主键和原因摘要,不绑定单条流水。 包括: - 累计收入超限 - 年流水交易额超限 - 短时间多次存现 对象级结果建议使用: - `object_type = STAFF_ID_CARD` - `object_key = 身份证号` 后续如果出现亲属、账号、企业等对象级命中,可以继续扩展 `object_type`。 ### 三、技术实现设计 #### 1. SQL 落点 所有规则 SQL 均通过 `Mapper + XML` 实现。 建议新增: - `CcdiBankTagRuleMapper` - `CcdiBankTagResultMapper` - `CcdiBankTagTaskMapper` - `CcdiBankTagAnalysisMapper` - 对应 XML 其中规则筛选 Mapper 方法按规则拆分,例如: - `selectHouseOrCarExpenseStatements` - `selectTaxExpenseStatements` - `selectSingleLargeIncomeStatements` - `selectCumulativeIncomeObjects` - `selectAnnualTurnoverObjects` - `selectLargeCashDepositStatements` - `selectFrequentCashDepositObjects` - `selectLargeTransferStatements` #### 2. 参数读取 阈值类规则通过现有 `ccdi_model_param` 获取项目当前有效参数: - 若项目 `config_type = default`,读取 `project_id = 0` - 若项目 `config_type = custom`,读取当前 `project_id` 第一版涉及参数包括: - `SINGLE_TRANSACTION_AMOUNT` - `CUMULATIVE_TRANSACTION_AMOUNT` - `annual_turnover` - `LARGE_CASH_DEPOSIT` - `FREQUENT_CASH_DEPOSIT` - `FREQUENT_TRANSFER` #### 3. 结果写入 重算开始前,先按范围删除旧结果: - 手动全量:删除当前项目全部结果 - 手动按模型:删除当前项目指定模型结果 - 自动触发:按当前项目全部结果重算,保持结果与最新流水一致 删除完成后,再按规则并行写入结果。 每条规则线程内部只做: - 查询命中结果 - 构建结果实体 - 批量插入结果表 规则线程内部不允许再删旧结果,避免并发线程相互覆盖。 ### 四、触发流程设计 #### 1. 批量上传自动触发 触发入口: - `CcdiFileUploadServiceImpl.batchUploadFiles` 改造方向: - 当前 `submitTasksAsync` 需要改为收集每个文件的 `CompletableFuture` - 使用 `CompletableFuture.allOf(...)` 在所有文件处理完成后统一执行批次收尾 - 若本批次至少有一份文件成功落库流水,则申请一次项目级标签重算 #### 2. 拉取本行信息自动触发 触发入口: - `CcdiFileUploadServiceImpl.submitPullBankInfo` 改造方向: - 当前 `submitPullBankInfoTasks` 同样收集每个身份证任务的 `CompletableFuture` - 所有任务处理完成后统一执行批次收尾 - 若本批次至少有一条成功入库流水,则申请一次项目级标签重算 #### 3. 手动触发 新增接口: - `POST /ccdi/project/tags/rebuild` 请求体: - `projectId` 必填 - `modelCode` 选填 规则: - 未传 `modelCode`:重算当前项目全部模型 - 传 `modelCode`:只重算指定模型 ### 五、并发控制设计 #### 1. 项目级互斥 要求: - 同一项目任意时刻只允许一个标签重算任务运行 实现建议: - 新增 `ProjectTagRebuildCoordinator` - 以 `projectId` 作为互斥粒度 - 结合任务表 `RUNNING` 状态和进程内互斥结构控制 行为规则: - 自动触发时若已有任务在运行: - 不启动新的重算 - 仅把当前运行任务标记为 `need_rerun = true` - 手动触发时若已有任务在运行: - 直接返回“当前项目标签正在重算中,请稍后再试” #### 2. 自动补跑 若自动触发期间项目已有重算任务在运行: - 当前任务结束后检查 `need_rerun` - 若为 `true`,自动再执行一轮项目级全量重算 - 补跑结束后清空 `need_rerun` 这样可以同时满足: - 防止并发重算 - 不丢失导入完成后的自动重算机会 ### 六、线程池设计 采用两层调度模型。 #### 1. 项目级任务 每个项目的重算由一个总任务负责: - 控制任务生命周期 - 统一删旧结果 - 提交规则级任务 - 汇总执行结果 #### 2. 规则级任务 每条规则使用线程池并行处理。 建议新增独立线程池: - `tagRuleExecutor` 规则级线程任务职责: - 读取规则元数据 - 查询阈值参数 - 执行 Mapper SQL - 批量写入结果表 - 返回命中数和执行状态 优点: - 多条规则之间互不阻塞 - 后续新增模型时可复用同一套并行框架 ### 七、失败处理设计 #### 1. 自动触发失败 - 不影响流水已成功入库的数据 - 标签任务状态标记为 `FAILED` 或 `PARTIAL_FAILED` - 记录失败规则和错误信息 - 不回滚已经成功写入的流水 #### 2. 手动触发失败 - 接口直接返回失败 - 任务状态记录失败原因 - 已成功完成的规则结果保留 #### 3. 单规则失败 - 不中断其他规则执行 - 整个任务按“部分失败”汇总 ### 八、异常原因快照设计 结果表中的 `reason_detail` 必须能直接解释标签来源。 建议格式: - 流水级: - `摘要命中“购买房产首付款”,对手方“杭州贝壳房地产经纪有限公司”,支出金额 680000.00 元` - 对象级: - `同一交易对手累计流入 60300000.00 元,超过阈值 50000001.00 元,对手方:浙江远望贸易有限公司` 额外保存: - `business_caliber_snapshot` - `hit_value_snapshot` 用于后续页面直接展示“规则口径 + 命中值”。 ## 接口设计 ### 1. 手动重算接口 - 路径:`POST /ccdi/project/tags/rebuild` - 请求体: - `projectId` - `modelCode` 可选 - 返回: - 成功:`AjaxResult.success("标签重算任务已提交")` - 失败:`AjaxResult.error("当前项目标签正在重算中,请稍后再试")` ### 2. 结果查询接口 本期不做前端展示,结果查询接口可暂缓;但后续建议预留: - 项目标签汇总查询 - 项目标签明细查询 - 项目标签任务状态查询 ## 测试设计 后端至少覆盖以下场景: - 批量上传所有文件完成后会申请一次项目级重算 - 拉取本行信息所有任务完成后会申请一次项目级重算 - 手动接口支持全量重算 - 手动接口支持按模型重算 - 同一项目正在重算时,手动接口会被拒绝 - 同一项目自动触发期间再次触发时会标记补跑 - 流水级结果写入时会保存 `group_id + log_id` - 对象级结果写入时不会保存 `group_id + log_id` - 重复重算不会出现重复结果 - 参数修改后再次重算,结果会随阈值变化 - 单条规则失败时任务状态为 `PARTIAL_FAILED` ## 风险与缓解 ### 风险一:同项目高频导入导致连续补跑 缓解: - 自动触发期间只维护一个 `need_rerun` 标记,避免无限累积队列 ### 风险二:规则级并行导致数据库写入压力上升 缓解: - 为规则线程池设置合理核心线程数和队列长度 - 批量插入结果,避免逐条写入 ### 风险三:对象级结果原因信息不足 缓解: - 在 XML 查询阶段直接拼出可展示的原因摘要字段 ## 后续扩展方向 - 新增项目标签汇总页和明细页 - 结合 `ccdi_project` 回写项目风险人数统计 - 支持更多模型接入 - 支持对象级结果下钻到关联流水集合