Files
ccdi/docs/plans/2026-03-16-project-bank-statement-tagging-design.md

14 KiB
Raw Blame History

项目流水标签功能设计

背景

当前项目已经具备两条银行流水入库链路:

  • 批量上传他行流水文件:/ccdi/file-upload/batch
  • 拉取本行信息:/ccdi/file-upload/pull-bank-info

两条链路最终都会把流水写入 ccdi_bank_statement,但系统还没有对项目内流水执行统一打标,也没有独立的手动重算能力。

本次需求要求:

  • 在异步批量上传流水文件全部处理完成后,自动对项目内流水进行打标
  • 在拉取本行信息任务全部处理完成后,也自动对项目内流水进行打标
  • 提供手动接口,支持重新计算项目内标签
  • 根据 assets/大额交易.csv 中的模型信息实现第一批规则
  • 避免重复打标
  • 同时支持两类结果:
    • 单条流水异常:对具体流水打标
    • 对象区间异常:对项目中的对象打标
  • 打标结果中保留足够的异常原因信息,便于后续页面清晰展示

目标

  • 为项目建立可扩展的流水标签能力,而不是只为“大额交易”写一次性逻辑
  • 支持自动触发和手动触发两种标签重算方式
  • 用代码中的 Mapper XML 固化技术口径 SQL不做运行时 SQL 配置化
  • 通过统一结果表保存标签命中结果和异常原因快照
  • 保证同一项目不会同时触发多个重算任务
  • 使用线程池并行执行规则,提高重算效率

非目标

  • 本期不新增前端页面展示
  • 本期不实现动态 SQL 配置平台
  • 本期不把标签计算失败与流水导入失败绑定为同一事务
  • 本期不实现除大额交易外的其他模型
  • 本期不实现对象级结果到具体来源批次的多来源追溯

现状分析

流水导入现状

批量上传主链路位于:

  • ccdi-project/src/main/java/com/ruoyi/ccdi/project/service/impl/CcdiFileUploadServiceImpl.java

当前关键行为:

  • 每个文件独立异步上传、轮询解析、拉取流水、写入 ccdi_bank_statement
  • 文件全部提交后,调度线程立即结束
  • 没有“当前批次所有异步任务完成后的收尾回调”

拉取本行信息主链路同样位于 CcdiFileUploadServiceImpl,每个身份证号会生成一条上传记录并异步拉取流水,但也没有“全部任务完成后”的统一收尾逻辑。

参数配置现状

模型参数位于:

  • ccdi_model_param
  • CcdiModelParamServiceImpl

系统已经支持:

  • 项目默认参数与自定义参数切换
  • 通过 project_id = 0 读取系统默认参数
  • 通过 project_id = 当前项目 读取项目自定义参数

因此阈值类规则可以直接复用现有参数体系。

流水查询现状

流水明细能力位于:

  • CcdiBankStatementMapper.xml
  • CcdiBankStatementController
  • CcdiBankStatementServiceImpl

当前流水表中已经具备标签计算所需的一部分字段:

  • project_id
  • group_id
  • batch_id,其业务语义可作为 log_id
  • bank_statement_id
  • cret_no

方案对比

方案一:把每个规则直接写死在 Service 中

优点:

  • 开发速度快
  • 初期实现简单

缺点:

  • 第二个模型接入时会迅速膨胀为大段 if/else
  • SQL、规则元数据、结果写入逻辑会散落在多个类里
  • 不利于后续维护和测试

方案二规则元数据入库SQL 写在 Mapper XML 中

优点:

  • 规则展示信息可配置
  • 技术口径 SQL 保持在代码中,便于评审和测试
  • 后续新增模型的扩展路径清晰

缺点:

  • 第一版需要同时补规则表、结果表、任务表和执行框架

方案三:规则元数据和 SQL 都配置化

优点:

  • 表面上扩展灵活

缺点:

  • 动态 SQL 的安全性、可测试性、可维护性都较差
  • 与当前项目代码式 MyBatis 规则体系不一致
  • 不符合本次“SQL 用 Mapper + XML 实现”的约束

最终方案

采用方案二:

  • 规则元数据入库
  • 技术口径 SQL 固化在 Mapper + XML
  • 统一结果表保存标签命中
  • 统一任务调度器管理自动触发、手动触发和项目级互斥
  • 规则级任务在线程池中并行执行

详细设计

一、数据模型设计

1. 标签规则定义表

建议新增:ccdi_bank_tag_rule

建议字段:

  • id
  • model_code
  • model_name
  • rule_code
  • rule_name
  • indicator_code
  • result_typeSTATEMENT / OBJECT
  • risk_level
  • business_caliber
  • enabled
  • sort_order
  • create_by
  • create_time
  • update_by
  • update_time
  • remark

用途:

  • 保存展示和业务元数据
  • 不保存技术 SQL
  • 作为结果表中的规则元信息来源

第一版初始化“大额交易” 8 条规则元数据:

  • 房车消费支出交易
  • 税务支出交易
  • 大额单笔收入
  • 累计收入超限
  • 年流水交易额超限
  • 大额存现交易
  • 短时间多次存现
  • 大额转账交易

2. 标签结果表

建议新增:ccdi_bank_statement_tag_result

建议字段:

  • id
  • project_id
  • model_code
  • model_name
  • rule_code
  • rule_name
  • indicator_code
  • result_type
  • risk_level
  • bank_statement_id
  • object_type
  • object_key
  • group_id
  • log_id
  • reason_detail
  • business_caliber_snapshot
  • hit_value_snapshot
  • create_by
  • create_time
  • update_by
  • update_time
  • remark

字段约束:

  • 流水级结果必须保存:
    • bank_statement_id
    • group_id
    • log_id
  • 对象级结果必须保存:
    • object_type
    • object_key
  • 对象级结果不写:
    • group_id
    • log_id
    • bank_statement_id

唯一约束建议拆分为两组:

  • 流水级唯一键:project_id + rule_code + bank_statement_id
  • 对象级唯一键:project_id + rule_code + object_type + object_key

用途:

  • 防止重复打标
  • 保留足够的异常原因快照,供后续页面展示

3. 标签任务执行表

建议新增:ccdi_bank_tag_task

建议字段:

  • id
  • project_id
  • trigger_typeAUTO_BATCH_UPLOAD / AUTO_PULL_BANK_INFO / MANUAL
  • model_code
  • statusRUNNING / SUCCESS / PARTIAL_FAILED / FAILED
  • need_rerun
  • success_rule_count
  • failed_rule_count
  • hit_count
  • error_message
  • start_time
  • end_time
  • create_by
  • create_time
  • update_by
  • update_time

用途:

  • 记录当前项目是否存在正在执行的重算任务
  • 承接“自动触发补跑”标记
  • 为后续前端展示最近一次打标状态预留基础

二、规则分型设计

第一版大额交易 8 条规则按结果类型分为两类。

1. 流水级规则

返回具体流水 bank_statement_id,并从流水表带出 group_idbatch_id(log_id)

包括:

  • 房车消费支出交易
  • 税务支出交易
  • 大额单笔收入
  • 大额存现交易
  • 大额转账交易

2. 对象级规则

返回对象主键和原因摘要,不绑定单条流水。

包括:

  • 累计收入超限
  • 年流水交易额超限
  • 短时间多次存现

对象级结果建议使用:

  • object_type = STAFF_ID_CARD
  • object_key = 身份证号

后续如果出现亲属、账号、企业等对象级命中,可以继续扩展 object_type

三、技术实现设计

1. SQL 落点

所有规则 SQL 均通过 Mapper + XML 实现。

建议新增:

  • CcdiBankTagRuleMapper
  • CcdiBankTagResultMapper
  • CcdiBankTagTaskMapper
  • CcdiBankTagAnalysisMapper
  • 对应 XML

其中规则筛选 Mapper 方法按规则拆分,例如:

  • selectHouseOrCarExpenseStatements
  • selectTaxExpenseStatements
  • selectSingleLargeIncomeStatements
  • selectCumulativeIncomeObjects
  • selectAnnualTurnoverObjects
  • selectLargeCashDepositStatements
  • selectFrequentCashDepositObjects
  • selectLargeTransferStatements

2. 参数读取

阈值类规则通过现有 ccdi_model_param 获取项目当前有效参数:

  • 若项目 config_type = default,读取 project_id = 0
  • 若项目 config_type = custom,读取当前 project_id

第一版涉及参数包括:

  • SINGLE_TRANSACTION_AMOUNT
  • CUMULATIVE_TRANSACTION_AMOUNT
  • annual_turnover
  • LARGE_CASH_DEPOSIT
  • FREQUENT_CASH_DEPOSIT
  • FREQUENT_TRANSFER

3. 结果写入

重算开始前,先按范围删除旧结果:

  • 手动全量:删除当前项目全部结果
  • 手动按模型:删除当前项目指定模型结果
  • 自动触发:按当前项目全部结果重算,保持结果与最新流水一致

删除完成后,再按规则并行写入结果。

每条规则线程内部只做:

  • 查询命中结果
  • 构建结果实体
  • 批量插入结果表

规则线程内部不允许再删旧结果,避免并发线程相互覆盖。

四、触发流程设计

1. 批量上传自动触发

触发入口:

  • CcdiFileUploadServiceImpl.batchUploadFiles

改造方向:

  • 当前 submitTasksAsync 需要改为收集每个文件的 CompletableFuture
  • 使用 CompletableFuture.allOf(...) 在所有文件处理完成后统一执行批次收尾
  • 若本批次至少有一份文件成功落库流水,则申请一次项目级标签重算

2. 拉取本行信息自动触发

触发入口:

  • CcdiFileUploadServiceImpl.submitPullBankInfo

改造方向:

  • 当前 submitPullBankInfoTasks 同样收集每个身份证任务的 CompletableFuture
  • 所有任务处理完成后统一执行批次收尾
  • 若本批次至少有一条成功入库流水,则申请一次项目级标签重算

3. 手动触发

新增接口:

  • POST /ccdi/project/tags/rebuild

请求体:

  • projectId 必填
  • modelCode 选填

规则:

  • 未传 modelCode:重算当前项目全部模型
  • modelCode:只重算指定模型

五、并发控制设计

1. 项目级互斥

要求:

  • 同一项目任意时刻只允许一个标签重算任务运行

实现建议:

  • 新增 ProjectTagRebuildCoordinator
  • projectId 作为互斥粒度
  • 结合任务表 RUNNING 状态和进程内互斥结构控制

行为规则:

  • 自动触发时若已有任务在运行:
    • 不启动新的重算
    • 仅把当前运行任务标记为 need_rerun = true
  • 手动触发时若已有任务在运行:
    • 直接返回“当前项目标签正在重算中,请稍后再试”

2. 自动补跑

若自动触发期间项目已有重算任务在运行:

  • 当前任务结束后检查 need_rerun
  • 若为 true,自动再执行一轮项目级全量重算
  • 补跑结束后清空 need_rerun

这样可以同时满足:

  • 防止并发重算
  • 不丢失导入完成后的自动重算机会

六、线程池设计

采用两层调度模型。

1. 项目级任务

每个项目的重算由一个总任务负责:

  • 控制任务生命周期
  • 统一删旧结果
  • 提交规则级任务
  • 汇总执行结果

2. 规则级任务

每条规则使用线程池并行处理。

建议新增独立线程池:

  • tagRuleExecutor

规则级线程任务职责:

  • 读取规则元数据
  • 查询阈值参数
  • 执行 Mapper SQL
  • 批量写入结果表
  • 返回命中数和执行状态

优点:

  • 多条规则之间互不阻塞
  • 后续新增模型时可复用同一套并行框架

七、失败处理设计

1. 自动触发失败

  • 不影响流水已成功入库的数据
  • 标签任务状态标记为 FAILEDPARTIAL_FAILED
  • 记录失败规则和错误信息
  • 不回滚已经成功写入的流水

2. 手动触发失败

  • 接口直接返回失败
  • 任务状态记录失败原因
  • 已成功完成的规则结果保留

3. 单规则失败

  • 不中断其他规则执行
  • 整个任务按“部分失败”汇总

八、异常原因快照设计

结果表中的 reason_detail 必须能直接解释标签来源。

建议格式:

  • 流水级:
    • 摘要命中“购买房产首付款”,对手方“杭州贝壳房地产经纪有限公司”,支出金额 680000.00 元
  • 对象级:
    • 同一交易对手累计流入 60300000.00 元,超过阈值 50000001.00 元,对手方:浙江远望贸易有限公司

额外保存:

  • business_caliber_snapshot
  • hit_value_snapshot

用于后续页面直接展示“规则口径 + 命中值”。

接口设计

1. 手动重算接口

  • 路径:POST /ccdi/project/tags/rebuild
  • 请求体:
    • projectId
    • modelCode 可选
  • 返回:
    • 成功:AjaxResult.success("标签重算任务已提交")
    • 失败:AjaxResult.error("当前项目标签正在重算中,请稍后再试")

2. 结果查询接口

本期不做前端展示,结果查询接口可暂缓;但后续建议预留:

  • 项目标签汇总查询
  • 项目标签明细查询
  • 项目标签任务状态查询

测试设计

后端至少覆盖以下场景:

  • 批量上传所有文件完成后会申请一次项目级重算
  • 拉取本行信息所有任务完成后会申请一次项目级重算
  • 手动接口支持全量重算
  • 手动接口支持按模型重算
  • 同一项目正在重算时,手动接口会被拒绝
  • 同一项目自动触发期间再次触发时会标记补跑
  • 流水级结果写入时会保存 group_id + log_id
  • 对象级结果写入时不会保存 group_id + log_id
  • 重复重算不会出现重复结果
  • 参数修改后再次重算,结果会随阈值变化
  • 单条规则失败时任务状态为 PARTIAL_FAILED

风险与缓解

风险一:同项目高频导入导致连续补跑

缓解:

  • 自动触发期间只维护一个 need_rerun 标记,避免无限累积队列

风险二:规则级并行导致数据库写入压力上升

缓解:

  • 为规则线程池设置合理核心线程数和队列长度
  • 批量插入结果,避免逐条写入

风险三:对象级结果原因信息不足

缓解:

  • 在 XML 查询阶段直接拼出可展示的原因摘要字段

后续扩展方向

  • 新增项目标签汇总页和明细页
  • 结合 ccdi_project 回写项目风险人数统计
  • 支持更多模型接入
  • 支持对象级结果下钻到关联流水集合