新增图谱功能及验收清单

This commit is contained in:
wjj
2026-05-29 18:09:32 +08:00
parent 180a892275
commit 26cd049991
49 changed files with 8220 additions and 83 deletions

View File

@@ -0,0 +1,120 @@
-- ============================================================
-- 资金流图谱 MySQL 结果表
-- 说明:
-- 1. 表名和核心字段保持图谱平台 ODPS SQL 的必要结果表一致。
-- 2. 先从 ODPS 一次性同步已验证的行内流水图谱基座。
-- 3. 后续在纪检 MySQL 内从 ccdi_bank_statement 做增量去重插入。
-- 4. 不增加 project_id资金流图谱按全局 cret_no / 姓名查询。
-- 5. 只保留资金流图谱页面必需表,不依赖 lx_fund_flow_sum_edge。
-- 6. 真实资金边由后端基于 lx_fund_flow_detail_edge.trx_date 实时聚合。
-- 7. 手工资金流向作为主体级汇总边单独存储,不写入真实流水明细表。
-- 8. nullable 扩展字段只服务家庭关系、手工分析和增量来源。
-- ============================================================
CREATE TABLE IF NOT EXISTS lx_fund_flow_subject_node (
object_key VARCHAR(64) NOT NULL COMMENT '主体节点唯一标识通常为证件号MD5或代理主体MD5',
idnocfno VARCHAR(64) NULL COMMENT '证件号/身份证号',
name VARCHAR(255) NULL COMMENT '主体名称',
cinocsno VARCHAR(64) NULL COMMENT '客户内部号',
idno_type VARCHAR(64) NULL COMMENT '主体类型,如个人、企业、代理主体',
staff_id VARCHAR(64) NULL COMMENT '员工标识',
source_type VARCHAR(64) NULL COMMENT '来源类型',
created_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (object_key),
KEY idx_lx_fund_flow_subject_idnocfno (idnocfno),
KEY idx_lx_fund_flow_subject_name (name),
KEY idx_lx_fund_flow_subject_source_type (source_type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='资金流图谱主体节点表';
CREATE TABLE IF NOT EXISTS lx_fund_flow_account_node (
object_key VARCHAR(64) NOT NULL COMMENT '账户节点唯一标识',
acc_no VARCHAR(128) NULL COMMENT '账号',
acc_name VARCHAR(255) NULL COMMENT '账户名称',
cinocsno VARCHAR(64) NULL COMMENT '客户内部号',
source VARCHAR(128) NULL COMMENT '来源',
acc_type VARCHAR(64) NULL COMMENT '账户类型如INTERNAL/EXTERNAL',
acc_idno VARCHAR(64) NULL COMMENT '开户证件号',
acc_status VARCHAR(64) NULL COMMENT '账户状态',
acc_date VARCHAR(32) NULL COMMENT '开户日期',
created_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (object_key),
KEY idx_lx_fund_flow_account_acc_no (acc_no),
KEY idx_lx_fund_flow_account_acc_name (acc_name),
KEY idx_lx_fund_flow_account_acc_idno (acc_idno)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='资金流图谱账户节点表';
CREATE TABLE IF NOT EXISTS lx_fund_flow_own_account_edge (
object_key VARCHAR(128) NOT NULL COMMENT '主体持有账户边唯一标识',
from_key VARCHAR(128) NOT NULL COMMENT '主体节点key格式 idno_node/{object_key}',
to_key VARCHAR(128) NOT NULL COMMENT '账户节点key格式 account_node/{object_key}',
acc_name VARCHAR(255) NULL COMMENT '账户名称',
acc_no VARCHAR(128) NULL COMMENT '账号',
created_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (object_key),
KEY idx_lx_fund_flow_own_from_key (from_key),
KEY idx_lx_fund_flow_own_to_key (to_key),
KEY idx_lx_fund_flow_own_acc_no (acc_no)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='资金流图谱主体持有账户边表';
CREATE TABLE IF NOT EXISTS lx_fund_flow_detail_edge (
object_key VARCHAR(64) NOT NULL COMMENT '账户层交易明细边唯一标识',
le_account_name VARCHAR(255) NULL COMMENT '本方账户名称',
le_account_no VARCHAR(128) NULL COMMENT '本方账号',
customer_account_no VARCHAR(128) NULL COMMENT '对手方账号',
customer_account_name VARCHAR(255) NULL COMMENT '对手方账户名称',
trx_date VARCHAR(32) NULL COMMENT '交易日期',
user_memo VARCHAR(1000) NULL COMMENT '交易摘要',
cash_type VARCHAR(255) NULL COMMENT '交易类型',
amount DECIMAL(19, 2) NULL COMMENT '交易金额',
flag VARCHAR(8) NULL COMMENT '收支方向1支出2收入',
amount_balance DECIMAL(19, 2) NULL COMMENT '交易后余额',
currency VARCHAR(32) NULL COMMENT '币种',
bank VARCHAR(255) NULL COMMENT '银行',
from_key VARCHAR(128) NOT NULL COMMENT '账户层起点key格式 account_node/{object_key}',
to_key VARCHAR(128) NOT NULL COMMENT '账户层终点key格式 account_node/{object_key}',
family_relation_type VARCHAR(64) NULL COMMENT '家庭关系类型,如配偶、父母、子女',
source_table VARCHAR(32) NULL COMMENT '流水来源基座可为空增量可为CCDI_BANK_STATEMENT/FIRST/LEVEL1',
penetrate_level INT NOT NULL DEFAULT 0 COMMENT '穿透层级一期为0',
bank_statement_id BIGINT NULL COMMENT '增量来源ccdi_bank_statement主键',
bank_trx_number VARCHAR(128) NULL COMMENT '银行流水号/交易流水号,如来源存在则记录',
created_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (object_key),
KEY idx_lx_fund_flow_detail_from_to (from_key, to_key),
KEY idx_lx_fund_flow_detail_from_date (from_key, trx_date),
KEY idx_lx_fund_flow_detail_to_date (to_key, trx_date),
KEY idx_lx_fund_flow_detail_le_acc (le_account_no),
KEY idx_lx_fund_flow_detail_cp_acc (customer_account_no),
KEY idx_lx_fund_flow_detail_trx_date (trx_date),
KEY idx_lx_fund_flow_detail_statement (bank_statement_id),
KEY idx_lx_fund_flow_detail_family (family_relation_type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='资金流图谱账户层交易明细边表';
CREATE TABLE IF NOT EXISTS lx_fund_flow_manual_edge (
object_key VARCHAR(64) NOT NULL COMMENT '手工资金流向汇总边唯一标识',
from_object_key VARCHAR(64) NOT NULL COMMENT '起点主体object_key身份证MD5或手工主体MD5',
to_object_key VARCHAR(64) NOT NULL COMMENT '终点主体object_key身份证MD5或手工主体MD5',
from_name VARCHAR(255) NULL COMMENT '起点主体名称冗余',
to_name VARCHAR(255) NULL COMMENT '终点主体名称冗余',
amount DECIMAL(19, 2) NULL COMMENT '手工录入汇总金额',
transaction_count INT NOT NULL DEFAULT 1 COMMENT '手工录入笔数',
direction VARCHAR(8) NOT NULL COMMENT '资金方向1支出2收入',
relation_desc VARCHAR(255) NULL COMMENT '资金流向关系说明',
source_desc VARCHAR(500) NULL COMMENT '手工边来源说明',
remark VARCHAR(1000) NULL COMMENT '分析备注',
source_type VARCHAR(64) NOT NULL DEFAULT 'MANUAL' COMMENT '来源类型固定为MANUAL',
created_by VARCHAR(64) NULL COMMENT '创建人',
created_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_by VARCHAR(64) NULL COMMENT '更新人',
updated_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (object_key),
KEY idx_lx_fund_flow_manual_from (from_object_key),
KEY idx_lx_fund_flow_manual_to (to_object_key),
KEY idx_lx_fund_flow_manual_pair_direction (from_object_key, to_object_key, direction),
KEY idx_lx_fund_flow_manual_source_type (source_type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='资金流图谱手工资金流向汇总边表';

View File

@@ -0,0 +1,553 @@
-- ============================================================
-- 资金流图谱测试数据
-- 说明:
-- 1. 仅用于开发联调,来源为当前库内 ccdi_bank_statement 与 ccdi_staff_fmy_relation。
-- 2. 不修改 ccdi_bank_statement 原始流水。
-- 3. 只写入资金流图谱必要四表:
-- lx_fund_flow_subject_node
-- lx_fund_flow_account_node
-- lx_fund_flow_own_account_edge
-- lx_fund_flow_detail_edge
-- 4. 测试数据 source_table/source_type/source 标记为 GRAPH_TEST可重复执行。
-- 5. 默认模拟“全部流水”查询,同时覆盖日期筛选、家庭关系标签和节点穿透。
-- ============================================================
-- 清理上一轮 GRAPH_TEST 测试数据。只清理测试标记数据,不影响 ODPS 基座。
DROP TEMPORARY TABLE IF EXISTS graph_test_old_node_key;
CREATE TEMPORARY TABLE graph_test_old_node_key AS
SELECT object_key
FROM lx_fund_flow_subject_node
WHERE source_type LIKE 'GRAPH_TEST%';
DROP TEMPORARY TABLE IF EXISTS graph_test_old_account_key;
CREATE TEMPORARY TABLE graph_test_old_account_key AS
SELECT object_key
FROM lx_fund_flow_account_node
WHERE source LIKE 'GRAPH_TEST%';
DELETE FROM lx_fund_flow_detail_edge
WHERE source_table = 'GRAPH_TEST';
DELETE FROM lx_fund_flow_own_account_edge
WHERE from_key IN (
SELECT CONCAT('idno_node/', object_key)
FROM graph_test_old_node_key
)
OR to_key IN (
SELECT CONCAT('account_node/', object_key)
FROM graph_test_old_account_key
);
DELETE FROM lx_fund_flow_account_node
WHERE object_key IN (
SELECT object_key
FROM graph_test_old_account_key
);
DELETE FROM lx_fund_flow_subject_node
WHERE object_key IN (
SELECT object_key
FROM graph_test_old_node_key
);
-- 选择一个同时具备流水和家庭关系的员工作为中心主体。
DROP TEMPORARY TABLE IF EXISTS graph_seed_subject;
CREATE TEMPORARY TABLE graph_seed_subject AS
SELECT
bs.cret_no AS person_id,
COALESCE(NULLIF(MAX(TRIM(staff.name)), ''), NULLIF(MAX(TRIM(bs.LE_ACCOUNT_NAME)), ''), '测试主体') AS person_name,
COUNT(1) AS trx_cnt
FROM ccdi_bank_statement bs
INNER JOIN ccdi_base_staff staff
ON staff.id_card = bs.cret_no
INNER JOIN ccdi_staff_fmy_relation r
ON r.person_id = bs.cret_no
AND r.status = 1
AND r.relation_cert_no IS NOT NULL
AND TRIM(r.relation_cert_no) <> ''
WHERE bs.cret_no IS NOT NULL
AND TRIM(bs.cret_no) <> ''
AND bs.LE_ACCOUNT_NO IS NOT NULL
AND TRIM(bs.LE_ACCOUNT_NO) <> ''
AND bs.CUSTOMER_ACCOUNT_NAME IS NOT NULL
AND TRIM(bs.CUSTOMER_ACCOUNT_NAME) NOT IN ('', '0')
AND (bs.AMOUNT_DR > 0 OR bs.AMOUNT_CR > 0)
AND TRIM(bs.cret_no) = '617673198109148314'
GROUP BY bs.cret_no
ORDER BY trx_cnt DESC
LIMIT 1;
-- 选 3 个家庭关系人,优先配偶。
DROP TEMPORARY TABLE IF EXISTS graph_seed_counterparty;
CREATE TEMPORARY TABLE graph_seed_counterparty (
cp_no INT NOT NULL AUTO_INCREMENT PRIMARY KEY,
person_id VARCHAR(100) NOT NULL,
counterparty_type VARCHAR(32) NOT NULL,
relation_type VARCHAR(50) NULL,
counterparty_name VARCHAR(100) NOT NULL,
counterparty_cert_no VARCHAR(50) NULL,
counterparty_account_no VARCHAR(128) NOT NULL,
counterparty_subject_key VARCHAR(64) NOT NULL,
counterparty_account_key VARCHAR(64) NOT NULL,
can_expand TINYINT NOT NULL DEFAULT 0
);
INSERT INTO graph_seed_counterparty (
person_id,
counterparty_type,
relation_type,
counterparty_name,
counterparty_cert_no,
counterparty_account_no,
counterparty_subject_key,
counterparty_account_key,
can_expand
)
SELECT
person_id,
'FAMILY' AS counterparty_type,
relation_type,
relation_name AS counterparty_name,
relation_cert_no AS counterparty_cert_no,
CONCAT('FMY_', RIGHT(relation_cert_no, 12)) AS counterparty_account_no,
MD5(TRIM(relation_cert_no)) AS counterparty_subject_key,
MD5(CONCAT('FAMILY_ACCOUNT|', TRIM(relation_cert_no), '|', relation_name)) AS counterparty_account_key,
1 AS can_expand
FROM (
SELECT
r.person_id,
TRIM(r.relation_type) AS relation_type,
TRIM(r.relation_name) AS relation_name,
TRIM(r.relation_cert_no) AS relation_cert_no,
ROW_NUMBER() OVER (
PARTITION BY r.person_id
ORDER BY
CASE WHEN r.relation_type = '配偶' THEN 0 ELSE 1 END,
r.id
) AS rn
FROM ccdi_staff_fmy_relation r
INNER JOIN graph_seed_subject s
ON s.person_id = r.person_id
WHERE r.status = 1
AND r.relation_name IS NOT NULL
AND TRIM(r.relation_name) <> ''
AND r.relation_cert_no IS NOT NULL
AND TRIM(r.relation_cert_no) <> ''
) x
WHERE rn <= 3;
-- 再选 6 个普通对手方,模拟资金图里非家庭关系节点。
DROP TEMPORARY TABLE IF EXISTS graph_seed_family_name;
CREATE TEMPORARY TABLE graph_seed_family_name AS
SELECT DISTINCT counterparty_name
FROM graph_seed_counterparty;
INSERT INTO graph_seed_counterparty (
person_id,
counterparty_type,
relation_type,
counterparty_name,
counterparty_cert_no,
counterparty_account_no,
counterparty_subject_key,
counterparty_account_key,
can_expand
)
SELECT
person_id,
'NORMAL' AS counterparty_type,
NULL AS relation_type,
counterparty_name,
NULL AS counterparty_cert_no,
counterparty_account_no,
MD5(CONCAT('NAME_PROXY|', person_id, '|', counterparty_name)) AS counterparty_subject_key,
MD5(CONCAT('NAME_ONLY_ACC|', person_id, '|', counterparty_name, '|', counterparty_account_no)) AS counterparty_account_key,
0 AS can_expand
FROM (
SELECT
y.person_id,
y.counterparty_name,
COALESCE(y.raw_account_no, CONCAT('NAME_ONLY_', y.rn)) AS counterparty_account_no,
y.rn
FROM (
SELECT
s.person_id,
TRIM(bs.CUSTOMER_ACCOUNT_NAME) AS counterparty_name,
MAX(NULLIF(TRIM(bs.CUSTOMER_ACCOUNT_NO), '')) AS raw_account_no,
ROW_NUMBER() OVER (
ORDER BY COUNT(1) DESC, MAX(bs.bank_statement_id)
) AS rn
FROM graph_seed_subject s
INNER JOIN ccdi_bank_statement bs
ON bs.cret_no = s.person_id
LEFT JOIN graph_seed_family_name f
ON f.counterparty_name = TRIM(bs.CUSTOMER_ACCOUNT_NAME)
WHERE bs.CUSTOMER_ACCOUNT_NAME IS NOT NULL
AND TRIM(bs.CUSTOMER_ACCOUNT_NAME) NOT IN ('', '0')
AND f.counterparty_name IS NULL
AND (bs.AMOUNT_DR > 0 OR bs.AMOUNT_CR > 0)
GROUP BY
s.person_id,
TRIM(bs.CUSTOMER_ACCOUNT_NAME)
) y
) x
WHERE rn <= 6;
-- 取中心员工真实流水作为金额、方向、日期来源,并映射到家庭/普通对手方。
DROP TEMPORARY TABLE IF EXISTS graph_seed_statement;
CREATE TEMPORARY TABLE graph_seed_statement AS
SELECT *
FROM (
SELECT
y.*,
ROW_NUMBER() OVER (
PARTITION BY y.cp_no
ORDER BY y.flag, y.trx_date, y.bank_statement_id
) AS rn
FROM (
SELECT
x.*,
ROW_NUMBER() OVER (
PARTITION BY x.cp_no, x.flag
ORDER BY x.trx_date, x.bank_statement_id
) AS direction_rn
FROM (
SELECT
cp.cp_no,
bs.bank_statement_id,
bs.cret_no,
COALESCE(NULLIF(TRIM(subj.person_name), ''), NULLIF(TRIM(bs.LE_ACCOUNT_NAME), ''), '彭静勇') AS le_account_name,
TRIM(bs.LE_ACCOUNT_NO) AS le_account_no,
cp.counterparty_type,
cp.relation_type,
cp.counterparty_name AS customer_account_name,
cp.counterparty_account_no AS customer_account_no,
cp.counterparty_cert_no,
cp.counterparty_subject_key,
cp.counterparty_account_key,
cp.can_expand,
bs.TRX_DATE AS trx_date,
bs.USER_MEMO AS user_memo,
bs.CASH_TYPE AS cash_type,
CASE
WHEN bs.AMOUNT_DR > 0 THEN bs.AMOUNT_DR
WHEN bs.AMOUNT_CR > 0 THEN bs.AMOUNT_CR
ELSE 0
END AS amount,
CASE
WHEN bs.AMOUNT_DR > 0 THEN '1'
WHEN bs.AMOUNT_CR > 0 THEN '2'
ELSE NULL
END AS flag,
bs.AMOUNT_BALANCE AS amount_balance,
bs.CURRENCY AS currency,
bs.BANK AS bank,
bs.BANK_TRX_NUMBER AS bank_trx_number
FROM graph_seed_counterparty cp
INNER JOIN graph_seed_subject subj
ON subj.person_id = cp.person_id
INNER JOIN ccdi_bank_statement bs
ON bs.cret_no = cp.person_id
WHERE bs.cret_no IS NOT NULL
AND TRIM(bs.cret_no) <> ''
AND bs.LE_ACCOUNT_NO IS NOT NULL
AND TRIM(bs.LE_ACCOUNT_NO) <> ''
AND (bs.AMOUNT_DR > 0 OR bs.AMOUNT_CR > 0)
) x
WHERE x.flag IS NOT NULL
) y
WHERE (y.flag = '1' AND y.direction_rn <= 5)
OR (y.flag = '2' AND y.direction_rn <= 3)
) x
WHERE x.rn <= 8;
-- 主体点:员工本人。
INSERT IGNORE INTO lx_fund_flow_subject_node (
object_key,
idnocfno,
name,
cinocsno,
idno_type,
staff_id,
source_type
)
SELECT
MD5(TRIM(s.person_id)) AS object_key,
TRIM(s.person_id) AS idnocfno,
s.person_name AS name,
NULL AS cinocsno,
'个人' AS idno_type,
NULL AS staff_id,
'GRAPH_TEST_EMPLOYEE' AS source_type
FROM graph_seed_subject s;
-- 主体点:家庭关系人和普通对手方。
INSERT IGNORE INTO lx_fund_flow_subject_node (
object_key,
idnocfno,
name,
cinocsno,
idno_type,
staff_id,
source_type
)
SELECT DISTINCT
cp.counterparty_subject_key AS object_key,
cp.counterparty_cert_no AS idnocfno,
cp.counterparty_name AS name,
NULL AS cinocsno,
CASE WHEN cp.counterparty_cert_no IS NOT NULL THEN '个人' ELSE 'NAME_PROXY' END AS idno_type,
NULL AS staff_id,
CASE
WHEN cp.counterparty_type = 'FAMILY' THEN CONCAT('GRAPH_TEST_FAMILY_', cp.relation_type)
ELSE 'GRAPH_TEST_COUNTERPARTY'
END AS source_type
FROM graph_seed_counterparty cp;
-- 账户点:员工真实账户。
INSERT IGNORE INTO lx_fund_flow_account_node (
object_key,
acc_no,
acc_name,
cinocsno,
source,
acc_type,
acc_idno,
acc_status,
acc_date
)
SELECT DISTINCT
MD5(CONCAT(TRIM(s.le_account_no), TRIM(s.le_account_name))) AS object_key,
TRIM(s.le_account_no) AS acc_no,
TRIM(s.le_account_name) AS acc_name,
NULL AS cinocsno,
'GRAPH_TEST_BANK_STATEMENT' AS source,
'INTERNAL' AS acc_type,
TRIM(s.cret_no) AS acc_idno,
NULL AS acc_status,
NULL AS acc_date
FROM graph_seed_statement s;
-- 账户点:对手方账户。
INSERT IGNORE INTO lx_fund_flow_account_node (
object_key,
acc_no,
acc_name,
cinocsno,
source,
acc_type,
acc_idno,
acc_status,
acc_date
)
SELECT DISTINCT
s.counterparty_account_key AS object_key,
s.customer_account_no AS acc_no,
s.customer_account_name AS acc_name,
NULL AS cinocsno,
CASE
WHEN s.counterparty_type = 'FAMILY' THEN 'GRAPH_TEST_FAMILY_RELATION'
ELSE 'GRAPH_TEST_COUNTERPARTY'
END AS source,
'EXTERNAL' AS acc_type,
s.counterparty_cert_no AS acc_idno,
NULL AS acc_status,
NULL AS acc_date
FROM graph_seed_statement s;
-- 持有边:员工主体 -> 员工账户。
INSERT IGNORE INTO lx_fund_flow_own_account_edge (
object_key,
from_key,
to_key,
acc_name,
acc_no
)
SELECT DISTINCT
MD5(CONCAT(MD5(TRIM(s.cret_no)), '|', MD5(CONCAT(TRIM(s.le_account_no), TRIM(s.le_account_name))))) AS object_key,
CONCAT('idno_node/', MD5(TRIM(s.cret_no))) AS from_key,
CONCAT('account_node/', MD5(CONCAT(TRIM(s.le_account_no), TRIM(s.le_account_name)))) AS to_key,
TRIM(s.le_account_name) AS acc_name,
TRIM(s.le_account_no) AS acc_no
FROM graph_seed_statement s;
-- 持有边:对手方主体 -> 对手方账户。
INSERT IGNORE INTO lx_fund_flow_own_account_edge (
object_key,
from_key,
to_key,
acc_name,
acc_no
)
SELECT DISTINCT
MD5(CONCAT(s.counterparty_subject_key, '|', s.counterparty_account_key)) AS object_key,
CONCAT('idno_node/', s.counterparty_subject_key) AS from_key,
CONCAT('account_node/', s.counterparty_account_key) AS to_key,
s.customer_account_name AS acc_name,
s.customer_account_no AS acc_no
FROM graph_seed_statement s;
-- 账户层明细边:按流水方向生成 from_key / to_key并写入家庭关系类型。
INSERT IGNORE INTO lx_fund_flow_detail_edge (
object_key,
le_account_name,
le_account_no,
customer_account_no,
customer_account_name,
trx_date,
user_memo,
cash_type,
amount,
flag,
amount_balance,
currency,
bank,
from_key,
to_key,
family_relation_type,
source_table,
penetrate_level,
bank_statement_id,
bank_trx_number
)
SELECT
MD5(CONCAT('GRAPH_TEST|', s.bank_statement_id, '|', s.counterparty_subject_key, '|', s.rn)) AS object_key,
s.le_account_name,
s.le_account_no,
s.customer_account_no,
s.customer_account_name,
SUBSTRING(s.trx_date, 1, 19) AS trx_date,
s.user_memo,
s.cash_type,
s.amount,
s.flag,
s.amount_balance,
s.currency,
s.bank,
CASE
WHEN s.flag = '1'
THEN CONCAT('account_node/', MD5(CONCAT(TRIM(s.le_account_no), TRIM(s.le_account_name))))
ELSE CONCAT('account_node/', s.counterparty_account_key)
END AS from_key,
CASE
WHEN s.flag = '1'
THEN CONCAT('account_node/', s.counterparty_account_key)
ELSE CONCAT('account_node/', MD5(CONCAT(TRIM(s.le_account_no), TRIM(s.le_account_name))))
END AS to_key,
s.relation_type AS family_relation_type,
'GRAPH_TEST' AS source_table,
0 AS penetrate_level,
s.bank_statement_id,
s.bank_trx_number
FROM graph_seed_statement s
WHERE s.flag IS NOT NULL
AND s.amount > 0;
-- 一层穿透演示数据:用于验证点击“淘宝”后继续展开到商户节点。
SELECT object_key
INTO @taobao_node
FROM lx_fund_flow_subject_node
WHERE name = ('淘宝' COLLATE utf8mb4_general_ci)
ORDER BY updated_time DESC
LIMIT 1;
SELECT SUBSTRING_INDEX(own.to_key, '/', -1)
INTO @taobao_account_key
FROM lx_fund_flow_own_account_edge own
WHERE own.from_key = (CONCAT('idno_node/', @taobao_node) COLLATE utf8mb4_general_ci)
LIMIT 1;
SET @merchant_a_node := MD5('GRAPH_TEST_MERCHANT_A');
SET @merchant_b_node := MD5('GRAPH_TEST_MERCHANT_B');
SET @merchant_a_acc := MD5('GRAPH_TEST_MERCHANT_A_ACC');
SET @merchant_b_acc := MD5('GRAPH_TEST_MERCHANT_B_ACC');
INSERT INTO lx_fund_flow_subject_node (object_key, idnocfno, name, cinocsno, idno_type, staff_id, source_type)
VALUES
(@merchant_a_node, NULL, '商户A', NULL, 'NAME_PROXY', NULL, 'GRAPH_TEST_TRACE_COUNTERPARTY'),
(@merchant_b_node, NULL, '商户B', NULL, 'NAME_PROXY', NULL, 'GRAPH_TEST_TRACE_COUNTERPARTY')
ON DUPLICATE KEY UPDATE name = VALUES(name), source_type = VALUES(source_type), updated_time = CURRENT_TIMESTAMP;
INSERT INTO lx_fund_flow_account_node (object_key, acc_no, acc_name, source, acc_type)
VALUES
(@merchant_a_acc, 'TRACE-A-001', '商户A', 'GRAPH_TEST', 'EXTERNAL'),
(@merchant_b_acc, 'TRACE-B-001', '商户B', 'GRAPH_TEST', 'EXTERNAL')
ON DUPLICATE KEY UPDATE acc_name = VALUES(acc_name), source = VALUES(source), updated_time = CURRENT_TIMESTAMP;
INSERT INTO lx_fund_flow_own_account_edge (object_key, from_key, to_key, acc_name, acc_no)
VALUES
(MD5(CONCAT(@merchant_a_node, '|', @merchant_a_acc)), CONCAT('idno_node/', @merchant_a_node), CONCAT('account_node/', @merchant_a_acc), '商户A', 'TRACE-A-001'),
(MD5(CONCAT(@merchant_b_node, '|', @merchant_b_acc)), CONCAT('idno_node/', @merchant_b_node), CONCAT('account_node/', @merchant_b_acc), '商户B', 'TRACE-B-001')
ON DUPLICATE KEY UPDATE acc_name = VALUES(acc_name), acc_no = VALUES(acc_no), updated_time = CURRENT_TIMESTAMP;
INSERT INTO lx_fund_flow_detail_edge (
object_key,
le_account_name,
le_account_no,
customer_account_no,
customer_account_name,
trx_date,
user_memo,
cash_type,
amount,
flag,
amount_balance,
currency,
bank,
from_key,
to_key,
family_relation_type,
source_table,
penetrate_level,
bank_statement_id,
bank_trx_number
)
VALUES
(MD5('GRAPH_TEST_TAOBAO_TRACE_001'), '淘宝', '淘宝账户', 'TRACE-A-001', '商户A', '2025-03-20 09:18:00', '货款结算', '转账', 12880.00, '1', 0, 'CNY', '测试银行', CONCAT('account_node/', @taobao_account_key), CONCAT('account_node/', @merchant_a_acc), NULL, 'GRAPH_TEST', 1, NULL, 'TRACE001'),
(MD5('GRAPH_TEST_TAOBAO_TRACE_002'), '商户A', 'TRACE-A-001', '淘宝账户', '淘宝', '2025-03-20 16:42:00', '退款冲正', '退款', 3660.50, '2', 0, 'CNY', '测试银行', CONCAT('account_node/', @merchant_a_acc), CONCAT('account_node/', @taobao_account_key), NULL, 'GRAPH_TEST', 1, NULL, 'TRACE002'),
(MD5('GRAPH_TEST_TAOBAO_TRACE_003'), '淘宝', '淘宝账户', 'TRACE-B-001', '商户B', '2025-03-21 11:25:00', '服务费结算', '转账', 8290.30, '1', 0, 'CNY', '测试银行', CONCAT('account_node/', @taobao_account_key), CONCAT('account_node/', @merchant_b_acc), NULL, 'GRAPH_TEST', 1, NULL, 'TRACE003')
ON DUPLICATE KEY UPDATE
le_account_name = VALUES(le_account_name),
le_account_no = VALUES(le_account_no),
customer_account_no = VALUES(customer_account_no),
customer_account_name = VALUES(customer_account_name),
trx_date = VALUES(trx_date),
user_memo = VALUES(user_memo),
cash_type = VALUES(cash_type),
amount = VALUES(amount),
flag = VALUES(flag),
from_key = VALUES(from_key),
to_key = VALUES(to_key),
source_table = VALUES(source_table),
penetrate_level = VALUES(penetrate_level),
bank_trx_number = VALUES(bank_trx_number),
updated_time = CURRENT_TIMESTAMP;
SELECT 'lx_fund_flow_subject_node' AS table_name, COUNT(1) AS graph_test_rows
FROM lx_fund_flow_subject_node
WHERE source_type LIKE 'GRAPH_TEST%'
UNION ALL
SELECT 'lx_fund_flow_account_node' AS table_name, COUNT(1) AS graph_test_rows
FROM lx_fund_flow_account_node
WHERE source LIKE 'GRAPH_TEST%'
UNION ALL
SELECT 'lx_fund_flow_own_account_edge' AS table_name, COUNT(1) AS graph_test_rows
FROM lx_fund_flow_own_account_edge e
WHERE e.from_key IN (
SELECT CONCAT('idno_node/', object_key)
FROM lx_fund_flow_subject_node
WHERE source_type LIKE 'GRAPH_TEST%'
)
OR e.to_key IN (
SELECT CONCAT('account_node/', object_key)
FROM lx_fund_flow_account_node
WHERE source LIKE 'GRAPH_TEST%'
)
UNION ALL
SELECT 'lx_fund_flow_detail_edge' AS table_name, COUNT(1) AS graph_test_rows
FROM lx_fund_flow_detail_edge
WHERE source_table = 'GRAPH_TEST';
SELECT
s.person_id AS test_person_id,
s.person_name AS test_person_name,
MD5(TRIM(s.person_id)) AS test_object_key
FROM graph_seed_subject s;

View File

@@ -0,0 +1,129 @@
-- ============================================================
-- 关系图谱 MySQL 结果表
-- 说明:
-- 1. 表名和核心字段保持关系图谱平台 SQL 口径。
-- 2. lx_rel_node 为统一点表,家庭、工商等来源都落同一张点表。
-- 3. 边表按关系类型拆分,页面查询时由后端统一读取并合并展示。
-- 4. 家庭关系个人点 object_key 使用身份证号 MD5便于和资金图谱主体口径对齐。
-- 5. 家庭边按两端 object_key 无向去重,避免夫妻双方都是员工时出现两条边。
-- ============================================================
CREATE TABLE IF NOT EXISTS lx_rel_node (
object_key VARCHAR(128) NOT NULL COMMENT '节点唯一标识家庭个人为身份证MD5工商企业/股东/法人沿用图谱源主键',
node_name VARCHAR(255) NULL COMMENT '节点名称',
id_number VARCHAR(128) NULL COMMENT '证件号或统一社会信用代码',
subject_type VARCHAR(32) NULL COMMENT '主体类型,如个人、企业',
source_type VARCHAR(255) NULL COMMENT '来源类型,可聚合多个来源值',
detail_ref_type VARCHAR(64) NULL COMMENT '详情来源类型,如 STAFF_FAMILY、ENTERPRISE_BASE、ECI_COMPANY',
detail_ref_key VARCHAR(128) NULL COMMENT '详情来源主键如身份证号、统一社会信用代码、company_id',
created_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (object_key),
KEY idx_lx_rel_node_name (node_name),
KEY idx_lx_rel_node_id_number (id_number),
KEY idx_lx_rel_node_subject_type (subject_type),
KEY idx_lx_rel_node_source_type (source_type),
KEY idx_lx_rel_node_detail_ref (detail_ref_type, detail_ref_key)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='关系图谱统一节点表';
-- 兼容已提前建过旧版 lx_rel_node 的生产环境IF NOT EXISTS 不会补新增字段。
SET @current_schema = DATABASE();
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_rel_node' AND COLUMN_NAME = 'detail_ref_type') = 0,
'ALTER TABLE lx_rel_node ADD COLUMN detail_ref_type VARCHAR(64) NULL COMMENT ''详情来源类型,如 STAFF_FAMILY、ENTERPRISE_BASE、ECI_COMPANY'' AFTER source_type',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_rel_node' AND COLUMN_NAME = 'detail_ref_key') = 0,
'ALTER TABLE lx_rel_node ADD COLUMN detail_ref_key VARCHAR(128) NULL COMMENT ''详情来源主键如身份证号、统一社会信用代码、company_id'' AFTER detail_ref_type',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_rel_node' AND INDEX_NAME = 'idx_lx_rel_node_detail_ref') = 0,
'ALTER TABLE lx_rel_node ADD KEY idx_lx_rel_node_detail_ref (detail_ref_type, detail_ref_key)',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
CREATE TABLE IF NOT EXISTS lx_rel_family_edge (
object_key VARCHAR(64) NOT NULL COMMENT '家庭关系边唯一标识按两端节点无向MD5生成',
from_key VARCHAR(160) NOT NULL COMMENT '起点,格式 rel_node/{object_key}',
to_key VARCHAR(160) NOT NULL COMMENT '终点,格式 rel_node/{object_key}',
person_id VARCHAR(255) NULL COMMENT '员工身份证号;双向归并时可为多个证件号拼接',
relation_name VARCHAR(255) NULL COMMENT '关系人姓名;双向归并时可为多个姓名拼接',
relation_type VARCHAR(255) NULL COMMENT '关系类型;双向归并时可为多个关系拼接',
relation_cert_no VARCHAR(255) NULL COMMENT '关系人证件号;双向归并时可为多个证件号拼接',
gender VARCHAR(32) NULL COMMENT '性别',
birth_date VARCHAR(32) NULL COMMENT '出生日期',
relation_cert_type VARCHAR(64) NULL COMMENT '关系人证件类型',
mobile_phone1 VARCHAR(64) NULL COMMENT '手机号码1',
mobile_phone2 VARCHAR(64) NULL COMMENT '手机号码2',
wechat_no1 VARCHAR(128) NULL COMMENT '微信名称1',
wechat_no2 VARCHAR(128) NULL COMMENT '微信名称2',
wechat_no3 VARCHAR(128) NULL COMMENT '微信名称3',
contact_address VARCHAR(500) NULL COMMENT '联系地址',
annual_income DECIMAL(15,2) NULL COMMENT '家庭成员年收入(元/年)',
relation_desc VARCHAR(500) NULL COMMENT '关系详细描述',
status VARCHAR(16) NULL COMMENT '状态',
effective_date VARCHAR(32) NULL COMMENT '关系生效日期',
invalid_date VARCHAR(32) NULL COMMENT '关系失效日期',
remark TEXT NULL COMMENT '备注',
data_source VARCHAR(64) NULL COMMENT '数据来源',
created_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (object_key),
KEY idx_lx_rel_family_from_key (from_key),
KEY idx_lx_rel_family_to_key (to_key),
KEY idx_lx_rel_family_person_id (person_id),
KEY idx_lx_rel_family_relation_cert_no (relation_cert_no),
KEY idx_lx_rel_family_relation_type (relation_type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='关系图谱家庭关系边表';
CREATE TABLE IF NOT EXISTS lx_rel_stock_edge (
object_key VARCHAR(191) NOT NULL COMMENT '股东关系边唯一标识沿用图谱SQL的 company_id + p_key_no + stock_type 口径',
from_key VARCHAR(160) NOT NULL COMMENT '起点股东节点key格式 rel_node/{p_key_no}',
to_key VARCHAR(160) NOT NULL COMMENT '终点企业节点key格式 rel_node/{company_id}',
company_name VARCHAR(255) NULL COMMENT '企业名称',
stock_name VARCHAR(255) NULL COMMENT '股东',
stock_type VARCHAR(64) NULL COMMENT '股东类型',
stock_percent VARCHAR(64) NULL COMMENT '持股比例',
should_capi VARCHAR(128) NULL COMMENT '认缴出资额',
should_capi_value VARCHAR(128) NULL COMMENT '认缴出资额数值',
should_capi_unit VARCHAR(64) NULL COMMENT '认缴出资额单位',
shoud_date VARCHAR(64) NULL COMMENT '认缴出资日期',
p_key_no VARCHAR(128) NULL COMMENT '股东keyno',
created_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (object_key),
KEY idx_lx_rel_stock_from_key (from_key),
KEY idx_lx_rel_stock_to_key (to_key),
KEY idx_lx_rel_stock_p_key_no (p_key_no),
KEY idx_lx_rel_stock_type (stock_type)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='关系图谱股东持股边表';
CREATE TABLE IF NOT EXISTS lx_rel_represent_edge (
object_key VARCHAR(191) NOT NULL COMMENT '法人关系边唯一标识沿用图谱SQL的 company_id + oper_key_no 口径',
from_key VARCHAR(160) NOT NULL COMMENT '起点法人节点key格式 rel_node/{oper_key_no}',
to_key VARCHAR(160) NOT NULL COMMENT '终点企业节点key格式 rel_node/{company_id}',
oper_name VARCHAR(255) NULL COMMENT '法定代表人姓名',
oper_key_no VARCHAR(128) NULL COMMENT '法定代表人keyno',
created_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (object_key),
KEY idx_lx_rel_represent_from_key (from_key),
KEY idx_lx_rel_represent_to_key (to_key),
KEY idx_lx_rel_represent_oper_key_no (oper_key_no)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='关系图谱法定代表人边表';

View File

@@ -0,0 +1,476 @@
-- ============================================================
-- 关系图谱 MySQL 构建脚本
-- 说明:
-- 1. 执行前先执行 03_lx_relation_graph_mysql_ddl.sql。
-- 2. 本脚本优先写入家庭关系点边,再写入工商股东/法人点边。
-- 3. 点表和边表通过主键与聚合逻辑保证去重,可重复执行。
-- 4. 家庭个人点 object_key = MD5(身份证号)。
-- 5. 家庭边按两端 object_key 无向归并,夫妻双方都是员工时只保留一条边。
-- 6. 工商源表沿用现有关系图谱 SQLsjfx_pro.t_eci_company_orc / sjfx_pro.t_eci_partner_orc。
-- ============================================================
SET SESSION group_concat_max_len = 8192;
-- ============================================================
-- Step 1. 家庭关系去重
-- 仅有证件号的家庭成员入图;无证件号无法生成身份证 MD5本轮不入图。
-- ============================================================
DROP TEMPORARY TABLE IF EXISTS tmp_rel_family_dedup;
CREATE TEMPORARY TABLE tmp_rel_family_dedup AS
SELECT
id,
TRIM(person_id) AS person_id,
TRIM(relation_type) AS relation_type,
TRIM(relation_name) AS relation_name,
TRIM(gender) AS gender,
DATE_FORMAT(birth_date, '%Y-%m-%d') AS birth_date,
TRIM(relation_cert_type) AS relation_cert_type,
TRIM(relation_cert_no) AS relation_cert_no,
TRIM(mobile_phone1) AS mobile_phone1,
TRIM(mobile_phone2) AS mobile_phone2,
TRIM(wechat_no1) AS wechat_no1,
TRIM(wechat_no2) AS wechat_no2,
TRIM(wechat_no3) AS wechat_no3,
TRIM(contact_address) AS contact_address,
annual_income,
TRIM(relation_desc) AS relation_desc,
status,
DATE_FORMAT(effective_date, '%Y-%m-%d %H:%i:%s') AS effective_date,
DATE_FORMAT(invalid_date, '%Y-%m-%d %H:%i:%s') AS invalid_date,
TRIM(remark) AS remark,
TRIM(data_source) AS data_source,
create_time,
update_time
FROM (
SELECT
r.*,
ROW_NUMBER() OVER (
PARTITION BY
TRIM(r.person_id),
TRIM(r.relation_type),
TRIM(r.relation_name),
TRIM(r.relation_cert_no)
ORDER BY r.update_time DESC, r.create_time DESC, r.id DESC
) AS rn
FROM ccdi_staff_fmy_relation r
WHERE r.status = 1
AND r.person_id IS NOT NULL
AND TRIM(r.person_id) <> ''
AND r.relation_type IS NOT NULL
AND TRIM(r.relation_type) <> ''
AND r.relation_name IS NOT NULL
AND TRIM(r.relation_name) <> ''
AND r.relation_cert_no IS NOT NULL
AND TRIM(r.relation_cert_no) <> ''
) x
WHERE x.rn = 1;
-- ============================================================
-- Step 2. 家庭关系点:员工本人优先入图
-- ============================================================
INSERT INTO lx_rel_node (
object_key,
node_name,
id_number,
subject_type,
source_type,
detail_ref_type,
detail_ref_key
)
SELECT
MD5(f.person_id) AS object_key,
COALESCE(NULLIF(MAX(TRIM(staff.name)), ''), f.person_id) AS node_name,
f.person_id AS id_number,
'个人' AS subject_type,
'员工家庭关系-员工' AS source_type,
'STAFF_FAMILY' AS detail_ref_type,
f.person_id AS detail_ref_key
FROM tmp_rel_family_dedup f
LEFT JOIN ccdi_base_staff staff
ON staff.id_card = f.person_id
GROUP BY f.person_id
ON DUPLICATE KEY UPDATE
node_name = COALESCE(NULLIF(VALUES(node_name), ''), lx_rel_node.node_name),
id_number = COALESCE(NULLIF(VALUES(id_number), ''), lx_rel_node.id_number),
subject_type = COALESCE(NULLIF(lx_rel_node.subject_type, ''), VALUES(subject_type)),
source_type = CASE
WHEN lx_rel_node.source_type IS NULL OR lx_rel_node.source_type = '' THEN VALUES(source_type)
WHEN FIND_IN_SET(VALUES(source_type), lx_rel_node.source_type) > 0 THEN lx_rel_node.source_type
ELSE CONCAT(lx_rel_node.source_type, ',', VALUES(source_type))
END,
detail_ref_type = COALESCE(NULLIF(lx_rel_node.detail_ref_type, ''), VALUES(detail_ref_type)),
detail_ref_key = COALESCE(NULLIF(lx_rel_node.detail_ref_key, ''), VALUES(detail_ref_key)),
updated_time = CURRENT_TIMESTAMP;
-- ============================================================
-- Step 3. 家庭关系点:家属入图
-- 如果家属同时也是员工主键一致source_type 会合并。
-- ============================================================
INSERT INTO lx_rel_node (
object_key,
node_name,
id_number,
subject_type,
source_type,
detail_ref_type,
detail_ref_key
)
SELECT
MD5(f.relation_cert_no) AS object_key,
COALESCE(NULLIF(MAX(TRIM(staff.name)), ''), MAX(f.relation_name)) AS node_name,
f.relation_cert_no AS id_number,
'个人' AS subject_type,
'员工家庭关系-成员' AS source_type,
'STAFF_FAMILY' AS detail_ref_type,
f.relation_cert_no AS detail_ref_key
FROM tmp_rel_family_dedup f
LEFT JOIN ccdi_base_staff staff
ON staff.id_card = f.relation_cert_no
GROUP BY f.relation_cert_no
ON DUPLICATE KEY UPDATE
node_name = COALESCE(NULLIF(VALUES(node_name), ''), lx_rel_node.node_name),
id_number = COALESCE(NULLIF(VALUES(id_number), ''), lx_rel_node.id_number),
subject_type = COALESCE(NULLIF(lx_rel_node.subject_type, ''), VALUES(subject_type)),
source_type = CASE
WHEN lx_rel_node.source_type IS NULL OR lx_rel_node.source_type = '' THEN VALUES(source_type)
WHEN FIND_IN_SET(VALUES(source_type), lx_rel_node.source_type) > 0 THEN lx_rel_node.source_type
ELSE CONCAT(lx_rel_node.source_type, ',', VALUES(source_type))
END,
detail_ref_type = COALESCE(NULLIF(lx_rel_node.detail_ref_type, ''), VALUES(detail_ref_type)),
detail_ref_key = COALESCE(NULLIF(lx_rel_node.detail_ref_key, ''), VALUES(detail_ref_key)),
updated_time = CURRENT_TIMESTAMP;
-- ============================================================
-- Step 4. 家庭关系边:按两端 object_key 无向归并
-- ============================================================
DROP TEMPORARY TABLE IF EXISTS tmp_rel_family_edge_rows;
CREATE TEMPORARY TABLE tmp_rel_family_edge_rows AS
SELECT
MD5(f.person_id) AS person_object_key,
MD5(f.relation_cert_no) AS relation_object_key,
LEAST(MD5(f.person_id), MD5(f.relation_cert_no)) AS from_object_key,
GREATEST(MD5(f.person_id), MD5(f.relation_cert_no)) AS to_object_key,
f.*
FROM tmp_rel_family_dedup f
WHERE f.person_id <> f.relation_cert_no;
INSERT INTO lx_rel_family_edge (
object_key,
from_key,
to_key,
person_id,
relation_name,
relation_type,
relation_cert_no,
gender,
birth_date,
relation_cert_type,
mobile_phone1,
mobile_phone2,
wechat_no1,
wechat_no2,
wechat_no3,
contact_address,
annual_income,
relation_desc,
status,
effective_date,
invalid_date,
remark,
data_source
)
SELECT
MD5(CONCAT('family|', from_object_key, '|', to_object_key)) AS object_key,
CONCAT('rel_node/', from_object_key) AS from_key,
CONCAT('rel_node/', to_object_key) AS to_key,
GROUP_CONCAT(DISTINCT person_id ORDER BY person_id SEPARATOR '/') AS person_id,
GROUP_CONCAT(DISTINCT relation_name ORDER BY relation_name SEPARATOR '/') AS relation_name,
GROUP_CONCAT(DISTINCT relation_type ORDER BY relation_type SEPARATOR '/') AS relation_type,
GROUP_CONCAT(DISTINCT relation_cert_no ORDER BY relation_cert_no SEPARATOR '/') AS relation_cert_no,
GROUP_CONCAT(DISTINCT NULLIF(gender, '') ORDER BY gender SEPARATOR '/') AS gender,
GROUP_CONCAT(DISTINCT NULLIF(birth_date, '') ORDER BY birth_date SEPARATOR '/') AS birth_date,
GROUP_CONCAT(DISTINCT NULLIF(relation_cert_type, '') ORDER BY relation_cert_type SEPARATOR '/') AS relation_cert_type,
GROUP_CONCAT(DISTINCT NULLIF(mobile_phone1, '') ORDER BY mobile_phone1 SEPARATOR '/') AS mobile_phone1,
GROUP_CONCAT(DISTINCT NULLIF(mobile_phone2, '') ORDER BY mobile_phone2 SEPARATOR '/') AS mobile_phone2,
GROUP_CONCAT(DISTINCT NULLIF(wechat_no1, '') ORDER BY wechat_no1 SEPARATOR '/') AS wechat_no1,
GROUP_CONCAT(DISTINCT NULLIF(wechat_no2, '') ORDER BY wechat_no2 SEPARATOR '/') AS wechat_no2,
GROUP_CONCAT(DISTINCT NULLIF(wechat_no3, '') ORDER BY wechat_no3 SEPARATOR '/') AS wechat_no3,
GROUP_CONCAT(DISTINCT NULLIF(contact_address, '') ORDER BY contact_address SEPARATOR '/') AS contact_address,
MAX(annual_income) AS annual_income,
GROUP_CONCAT(DISTINCT NULLIF(relation_desc, '') ORDER BY relation_desc SEPARATOR '/') AS relation_desc,
GROUP_CONCAT(DISTINCT CAST(status AS CHAR) ORDER BY status SEPARATOR '/') AS status,
GROUP_CONCAT(DISTINCT NULLIF(effective_date, '') ORDER BY effective_date SEPARATOR '/') AS effective_date,
GROUP_CONCAT(DISTINCT NULLIF(invalid_date, '') ORDER BY invalid_date SEPARATOR '/') AS invalid_date,
GROUP_CONCAT(DISTINCT NULLIF(remark, '') ORDER BY remark SEPARATOR '/') AS remark,
GROUP_CONCAT(DISTINCT NULLIF(data_source, '') ORDER BY data_source SEPARATOR '/') AS data_source
FROM tmp_rel_family_edge_rows
GROUP BY from_object_key, to_object_key
ON DUPLICATE KEY UPDATE
from_key = VALUES(from_key),
to_key = VALUES(to_key),
person_id = VALUES(person_id),
relation_name = VALUES(relation_name),
relation_type = VALUES(relation_type),
relation_cert_no = VALUES(relation_cert_no),
gender = VALUES(gender),
birth_date = VALUES(birth_date),
relation_cert_type = VALUES(relation_cert_type),
mobile_phone1 = VALUES(mobile_phone1),
mobile_phone2 = VALUES(mobile_phone2),
wechat_no1 = VALUES(wechat_no1),
wechat_no2 = VALUES(wechat_no2),
wechat_no3 = VALUES(wechat_no3),
contact_address = VALUES(contact_address),
annual_income = VALUES(annual_income),
relation_desc = VALUES(relation_desc),
status = VALUES(status),
effective_date = VALUES(effective_date),
invalid_date = VALUES(invalid_date),
remark = VALUES(remark),
data_source = VALUES(data_source),
updated_time = CURRENT_TIMESTAMP;
-- ============================================================
-- Step 5. 工商公司最新快照
-- 如果生产 MySQL 未同步 sjfx_pro.t_eci_company_orc / t_eci_partner_orc可只执行 Step 1-4。
-- ============================================================
DROP TEMPORARY TABLE IF EXISTS tmp_rel_company_latest;
CREATE TEMPORARY TABLE tmp_rel_company_latest AS
SELECT
TRIM(key_no) AS key_no,
TRIM(company_id) AS company_id,
TRIM(company_name) AS company_name,
TRIM(credit_code) AS credit_code,
TRIM(oper_key_no) AS oper_key_no,
TRIM(oper_name) AS oper_name,
check_date
FROM (
SELECT
c.*,
ROW_NUMBER() OVER (
PARTITION BY TRIM(c.credit_code)
ORDER BY SUBSTR(c.check_date, 1, 10) DESC, c.updated_date DESC
) AS rn
FROM sjfx_pro.t_eci_company_orc c
WHERE LENGTH(TRIM(c.credit_code)) = 18
AND c.isadd <> '-1'
AND c.company_id IS NOT NULL
AND TRIM(c.company_id) <> ''
AND c.key_no IS NOT NULL
AND TRIM(c.key_no) <> ''
AND c.company_name IS NOT NULL
AND TRIM(c.company_name) <> ''
) x
WHERE x.rn = 1;
DROP TEMPORARY TABLE IF EXISTS tmp_rel_partner_dedup;
CREATE TEMPORARY TABLE tmp_rel_partner_dedup AS
SELECT
TRIM(key_no) AS key_no,
TRIM(company_id) AS company_id,
TRIM(company_name) AS company_name,
TRIM(stock_name) AS stock_name,
TRIM(stock_type) AS stock_type,
TRIM(stock_percent) AS stock_percent,
TRIM(should_capi) AS should_capi,
TRIM(should_capi_value) AS should_capi_value,
TRIM(should_capi_unit) AS should_capi_unit,
TRIM(shoud_date) AS shoud_date,
TRIM(p_key_no) AS p_key_no,
dates
FROM (
SELECT
p.*,
ROW_NUMBER() OVER (
PARTITION BY TRIM(p.company_id), TRIM(p.p_key_no), TRIM(p.stock_type)
ORDER BY p.dates DESC, p.capi_date DESC, p.shoud_date DESC
) AS rn
FROM sjfx_pro.t_eci_partner_orc p
WHERE p.isadd <> '-1'
AND p.company_id IS NOT NULL
AND TRIM(p.company_id) <> ''
AND p.key_no IS NOT NULL
AND TRIM(p.key_no) <> ''
AND p.p_key_no IS NOT NULL
AND TRIM(p.p_key_no) <> ''
AND p.stock_name IS NOT NULL
AND TRIM(p.stock_name) <> ''
AND p.stock_type IN ('企业股东', '自然人股东')
) x
WHERE x.rn = 1;
-- ============================================================
-- Step 6. 工商点入 lx_rel_node
-- ============================================================
INSERT INTO lx_rel_node (
object_key,
node_name,
id_number,
subject_type,
source_type,
detail_ref_type,
detail_ref_key
)
SELECT
node_key AS object_key,
MAX(node_name) AS node_name,
MAX(id_number) AS id_number,
MAX(subject_type) AS subject_type,
GROUP_CONCAT(DISTINCT source_type ORDER BY source_type SEPARATOR ',') AS source_type,
MAX(detail_ref_type) AS detail_ref_type,
MAX(detail_ref_key) AS detail_ref_key
FROM (
SELECT
c.company_id AS node_key,
c.company_name AS node_name,
c.credit_code AS id_number,
'企业' AS subject_type,
'工商企业' AS source_type,
'ECI_COMPANY' AS detail_ref_type,
c.company_id AS detail_ref_key
FROM tmp_rel_company_latest c
UNION ALL
SELECT
p.p_key_no AS node_key,
p.stock_name AS node_name,
NULL AS id_number,
'企业' AS subject_type,
'企业股东' AS source_type,
'ECI_PARTNER' AS detail_ref_type,
p.p_key_no AS detail_ref_key
FROM tmp_rel_partner_dedup p
WHERE p.stock_type = '企业股东'
UNION ALL
SELECT
p.p_key_no AS node_key,
p.stock_name AS node_name,
NULL AS id_number,
'个人' AS subject_type,
'自然人股东' AS source_type,
'ECI_PARTNER' AS detail_ref_type,
p.p_key_no AS detail_ref_key
FROM tmp_rel_partner_dedup p
WHERE p.stock_type = '自然人股东'
UNION ALL
SELECT
c.oper_key_no AS node_key,
c.oper_name AS node_name,
NULL AS id_number,
'个人' AS subject_type,
'法定代表人' AS source_type,
'ECI_COMPANY_OPERATOR' AS detail_ref_type,
c.oper_key_no AS detail_ref_key
FROM tmp_rel_company_latest c
WHERE c.oper_key_no IS NOT NULL
AND c.oper_key_no <> ''
AND c.oper_name IS NOT NULL
AND c.oper_name <> ''
) n
WHERE node_key IS NOT NULL
AND node_key <> ''
GROUP BY node_key
ON DUPLICATE KEY UPDATE
node_name = COALESCE(NULLIF(VALUES(node_name), ''), lx_rel_node.node_name),
id_number = COALESCE(NULLIF(VALUES(id_number), ''), lx_rel_node.id_number),
subject_type = COALESCE(NULLIF(VALUES(subject_type), ''), lx_rel_node.subject_type),
source_type = CASE
WHEN lx_rel_node.source_type IS NULL OR lx_rel_node.source_type = '' THEN VALUES(source_type)
WHEN VALUES(source_type) IS NULL OR VALUES(source_type) = '' THEN lx_rel_node.source_type
WHEN LOCATE(VALUES(source_type), lx_rel_node.source_type) > 0 THEN lx_rel_node.source_type
ELSE CONCAT_WS(',', lx_rel_node.source_type, VALUES(source_type))
END,
detail_ref_type = COALESCE(NULLIF(lx_rel_node.detail_ref_type, ''), VALUES(detail_ref_type)),
detail_ref_key = COALESCE(NULLIF(lx_rel_node.detail_ref_key, ''), VALUES(detail_ref_key)),
updated_time = CURRENT_TIMESTAMP;
-- ============================================================
-- Step 7. 工商股东边入 lx_rel_stock_edge
-- ============================================================
INSERT INTO lx_rel_stock_edge (
object_key,
from_key,
to_key,
company_name,
stock_name,
stock_type,
stock_percent,
should_capi,
should_capi_value,
should_capi_unit,
shoud_date,
p_key_no
)
SELECT
CONCAT(c.company_id, p.p_key_no, p.stock_type) AS object_key,
CONCAT('rel_node/', p.p_key_no) AS from_key,
CONCAT('rel_node/', c.company_id) AS to_key,
p.company_name,
p.stock_name,
p.stock_type,
p.stock_percent,
p.should_capi,
p.should_capi_value,
p.should_capi_unit,
p.shoud_date,
p.p_key_no
FROM tmp_rel_partner_dedup p
INNER JOIN tmp_rel_company_latest c
ON c.company_id = p.company_id
ON DUPLICATE KEY UPDATE
from_key = VALUES(from_key),
to_key = VALUES(to_key),
company_name = VALUES(company_name),
stock_name = VALUES(stock_name),
stock_type = VALUES(stock_type),
stock_percent = VALUES(stock_percent),
should_capi = VALUES(should_capi),
should_capi_value = VALUES(should_capi_value),
should_capi_unit = VALUES(should_capi_unit),
shoud_date = VALUES(shoud_date),
p_key_no = VALUES(p_key_no),
updated_time = CURRENT_TIMESTAMP;
-- ============================================================
-- Step 8. 工商法人边入 lx_rel_represent_edge
-- ============================================================
INSERT INTO lx_rel_represent_edge (
object_key,
from_key,
to_key,
oper_name,
oper_key_no
)
SELECT
CONCAT(c.company_id, c.oper_key_no) AS object_key,
CONCAT('rel_node/', c.oper_key_no) AS from_key,
CONCAT('rel_node/', c.company_id) AS to_key,
c.oper_name,
c.oper_key_no
FROM tmp_rel_company_latest c
WHERE c.oper_key_no IS NOT NULL
AND c.oper_key_no <> ''
AND c.oper_name IS NOT NULL
AND c.oper_name <> ''
ON DUPLICATE KEY UPDATE
from_key = VALUES(from_key),
to_key = VALUES(to_key),
oper_name = VALUES(oper_name),
oper_key_no = VALUES(oper_key_no),
updated_time = CURRENT_TIMESTAMP;
-- ============================================================
-- Step 9. 结果检查
-- ============================================================
SELECT 'lx_rel_node' AS table_name, COUNT(1) AS row_cnt FROM lx_rel_node
UNION ALL
SELECT 'lx_rel_family_edge' AS table_name, COUNT(1) AS row_cnt FROM lx_rel_family_edge
UNION ALL
SELECT 'lx_rel_stock_edge' AS table_name, COUNT(1) AS row_cnt FROM lx_rel_stock_edge
UNION ALL
SELECT 'lx_rel_represent_edge' AS table_name, COUNT(1) AS row_cnt FROM lx_rel_represent_edge;

View File

@@ -0,0 +1,288 @@
-- ============================================================
-- 关系图谱固定测试数据
-- 说明:
-- 1. 仅用于开发联调,执行前先执行 03_lx_relation_graph_mysql_ddl.sql。
-- 2. 只写入 lx_rel_node / lx_rel_family_edge / lx_rel_stock_edge / lx_rel_represent_edge。
-- 3. 测试数据以 GRAPH_TEST 标记,可重复执行。
-- 4. 覆盖家庭关系、股东持股、法定代表人三类关系。
-- 5. 家庭关系按身份证 MD5 成点,夫妻双方只保留一条配偶边。
-- ============================================================
SET @staff_a_id := '330101198001010011';
SET @staff_b_id := '330101198202020022';
SET @child_id := '330101201206010033';
SET @staff_a_key := MD5(@staff_a_id);
SET @staff_b_key := MD5(@staff_b_id);
SET @child_key := MD5(@child_id);
SET @family_spouse_from := LEAST(@staff_a_key, @staff_b_key);
SET @family_spouse_to := GREATEST(@staff_a_key, @staff_b_key);
SET @family_child_from := LEAST(@staff_a_key, @child_key);
SET @family_child_to := GREATEST(@staff_a_key, @child_key);
SET @company_key := 'GRAPH_TEST_COMPANY_001';
SET @company_credit_code := '91330100GRAPH00001';
SET @person_holder_key := 'GRAPH_TEST_PERSON_HOLDER_001';
SET @spouse_holder_key := 'GRAPH_TEST_PERSON_SPOUSE_HOLDER_001';
SET @enterprise_holder_key := 'GRAPH_TEST_ENTERPRISE_HOLDER_001';
SET @legal_key := 'GRAPH_TEST_LEGAL_001';
-- 清理上一轮 GRAPH_TEST 数据。只清理测试标记数据,不影响生产/基座数据。
DELETE FROM lx_rel_family_edge
WHERE data_source = 'GRAPH_TEST';
DELETE FROM lx_rel_stock_edge
WHERE object_key LIKE 'GRAPH_TEST_%';
DELETE FROM lx_rel_represent_edge
WHERE object_key LIKE 'GRAPH_TEST_%';
DELETE FROM lx_rel_node
WHERE source_type LIKE 'GRAPH_TEST%';
-- 测试点:家庭关系个人。
INSERT INTO lx_rel_node (
object_key,
node_name,
id_number,
subject_type,
source_type,
detail_ref_type,
detail_ref_key
)
VALUES
(@staff_a_key, '关系图谱测试员工甲', @staff_a_id, '个人', 'GRAPH_TEST_员工家庭关系-员工', 'STAFF_FAMILY', @staff_a_id),
(@staff_b_key, '关系图谱测试员工乙', @staff_b_id, '个人', 'GRAPH_TEST_员工家庭关系-员工,GRAPH_TEST_员工家庭关系-成员', 'STAFF_FAMILY', @staff_b_id),
(@child_key, '关系图谱测试子女', @child_id, '个人', 'GRAPH_TEST_员工家庭关系-成员', 'STAFF_FAMILY', @child_id)
ON DUPLICATE KEY UPDATE
node_name = VALUES(node_name),
id_number = VALUES(id_number),
subject_type = VALUES(subject_type),
source_type = VALUES(source_type),
detail_ref_type = VALUES(detail_ref_type),
detail_ref_key = VALUES(detail_ref_key),
updated_time = CURRENT_TIMESTAMP;
-- 测试边:家庭关系,夫妻双方只落一条边。
INSERT INTO lx_rel_family_edge (
object_key,
from_key,
to_key,
person_id,
relation_name,
relation_type,
relation_cert_no,
gender,
birth_date,
relation_cert_type,
mobile_phone1,
mobile_phone2,
wechat_no1,
wechat_no2,
wechat_no3,
contact_address,
annual_income,
relation_desc,
status,
effective_date,
invalid_date,
remark,
data_source
)
VALUES
(MD5(CONCAT('family|', @family_spouse_from, '|', @family_spouse_to)),
CONCAT('rel_node/', @family_spouse_from),
CONCAT('rel_node/', @family_spouse_to),
CONCAT(@staff_a_id, '/', @staff_b_id),
'关系图谱测试员工甲/关系图谱测试员工乙',
'配偶',
CONCAT(@staff_a_id, '/', @staff_b_id),
'F/M',
'1980-01-01/1982-02-02',
'身份证',
'13800000001/13800000002',
NULL,
NULL,
NULL,
NULL,
'杭州市测试区家庭路1号',
320000.00,
'夫妻双方都是员工,归并为一条家庭边',
'1',
'2024-01-01 00:00:00',
NULL,
'GRAPH_TEST',
'GRAPH_TEST'),
(MD5(CONCAT('family|', @family_child_from, '|', @family_child_to)),
CONCAT('rel_node/', @family_child_from),
CONCAT('rel_node/', @family_child_to),
@staff_a_id,
'关系图谱测试子女',
'子女',
@child_id,
'M',
'2012-06-01',
'身份证',
'13800000003',
NULL,
NULL,
NULL,
NULL,
'杭州市测试区家庭路1号',
NULL,
'员工甲与子女关系',
'1',
'2024-01-01 00:00:00',
NULL,
'GRAPH_TEST',
'GRAPH_TEST')
ON DUPLICATE KEY UPDATE
from_key = VALUES(from_key),
to_key = VALUES(to_key),
person_id = VALUES(person_id),
relation_name = VALUES(relation_name),
relation_type = VALUES(relation_type),
relation_cert_no = VALUES(relation_cert_no),
gender = VALUES(gender),
birth_date = VALUES(birth_date),
relation_cert_type = VALUES(relation_cert_type),
mobile_phone1 = VALUES(mobile_phone1),
contact_address = VALUES(contact_address),
annual_income = VALUES(annual_income),
relation_desc = VALUES(relation_desc),
status = VALUES(status),
effective_date = VALUES(effective_date),
remark = VALUES(remark),
data_source = VALUES(data_source),
updated_time = CURRENT_TIMESTAMP;
-- 测试点:工商企业、股东、法人。
INSERT INTO lx_rel_node (
object_key,
node_name,
id_number,
subject_type,
source_type,
detail_ref_type,
detail_ref_key
)
VALUES
(@company_key, '关系图谱测试科技有限公司', @company_credit_code, '企业', 'GRAPH_TEST_工商企业', 'ECI_COMPANY', @company_key),
(@person_holder_key, '关系图谱测试自然人股东', NULL, '个人', 'GRAPH_TEST_自然人股东', 'ECI_PARTNER', @person_holder_key),
(@enterprise_holder_key, '关系图谱测试企业股东有限公司', '91330100GRAPH00002', '企业', 'GRAPH_TEST_企业股东', 'ECI_PARTNER', @enterprise_holder_key),
(@legal_key, '关系图谱测试法定代表人', NULL, '个人', 'GRAPH_TEST_法定代表人', 'ECI_COMPANY_OPERATOR', @legal_key)
ON DUPLICATE KEY UPDATE
node_name = VALUES(node_name),
id_number = VALUES(id_number),
subject_type = VALUES(subject_type),
source_type = VALUES(source_type),
detail_ref_type = VALUES(detail_ref_type),
detail_ref_key = VALUES(detail_ref_key),
updated_time = CURRENT_TIMESTAMP;
-- 测试边:股东持股关系。
INSERT INTO lx_rel_stock_edge (
object_key,
from_key,
to_key,
company_name,
stock_name,
stock_type,
stock_percent,
should_capi,
should_capi_value,
should_capi_unit,
shoud_date,
p_key_no
)
VALUES
('GRAPH_TEST_STOCK_PERSON_001',
CONCAT('rel_node/', @person_holder_key),
CONCAT('rel_node/', @company_key),
'关系图谱测试科技有限公司',
'关系图谱测试自然人股东',
'自然人股东',
'60%',
'600',
'600',
'万元人民币',
'2024-05-01',
@person_holder_key),
('GRAPH_TEST_STOCK_SPOUSE_CANDIDATE_001',
CONCAT('rel_node/', @spouse_holder_key),
CONCAT('rel_node/', @company_key),
'关系图谱测试科技有限公司',
'关系图谱测试员工乙',
'自然人股东',
'15%',
'150',
'150',
'万元人民币',
'2024-05-01',
@spouse_holder_key),
('GRAPH_TEST_STOCK_ENTERPRISE_001',
CONCAT('rel_node/', @enterprise_holder_key),
CONCAT('rel_node/', @company_key),
'关系图谱测试科技有限公司',
'关系图谱测试企业股东有限公司',
'企业股东',
'40%',
'400',
'400',
'万元人民币',
'2024-05-01',
@enterprise_holder_key)
ON DUPLICATE KEY UPDATE
from_key = VALUES(from_key),
to_key = VALUES(to_key),
company_name = VALUES(company_name),
stock_name = VALUES(stock_name),
stock_type = VALUES(stock_type),
stock_percent = VALUES(stock_percent),
should_capi = VALUES(should_capi),
should_capi_value = VALUES(should_capi_value),
should_capi_unit = VALUES(should_capi_unit),
shoud_date = VALUES(shoud_date),
p_key_no = VALUES(p_key_no),
updated_time = CURRENT_TIMESTAMP;
-- 测试边:法定代表人关系。
INSERT INTO lx_rel_represent_edge (
object_key,
from_key,
to_key,
oper_name,
oper_key_no
)
VALUES
('GRAPH_TEST_REPRESENT_001',
CONCAT('rel_node/', @legal_key),
CONCAT('rel_node/', @company_key),
'关系图谱测试法定代表人',
@legal_key)
ON DUPLICATE KEY UPDATE
from_key = VALUES(from_key),
to_key = VALUES(to_key),
oper_name = VALUES(oper_name),
oper_key_no = VALUES(oper_key_no),
updated_time = CURRENT_TIMESTAMP;
SELECT 'lx_rel_node' AS table_name, COUNT(1) AS graph_test_rows
FROM lx_rel_node
WHERE source_type LIKE 'GRAPH_TEST%'
UNION ALL
SELECT 'lx_rel_family_edge' AS table_name, COUNT(1) AS graph_test_rows
FROM lx_rel_family_edge
WHERE data_source = 'GRAPH_TEST'
UNION ALL
SELECT 'lx_rel_stock_edge' AS table_name, COUNT(1) AS graph_test_rows
FROM lx_rel_stock_edge
WHERE object_key LIKE 'GRAPH_TEST_%'
UNION ALL
SELECT 'lx_rel_represent_edge' AS table_name, COUNT(1) AS graph_test_rows
FROM lx_rel_represent_edge
WHERE object_key LIKE 'GRAPH_TEST_%';
SELECT
@staff_a_id AS test_person_id,
@staff_a_key AS test_object_key,
@company_key AS test_company_object_key;

View File

@@ -0,0 +1,490 @@
-- ============================================================
-- 资金流图谱已建表环境补充脚本
-- 说明:
-- 1. 本脚本用于目标库已存在 lx_fund_flow_* 表时补齐新增字段和索引。
-- 2. 不删除、不重建已有图谱表,不清空 ODPS 基座或生产数据。
-- 3. 新环境初始化仍可参考 01_lx_fund_graph_mysql_ddl.sql。
-- 4. 执行前需人工确认当前库为目标库;涉及中文注释时使用 bin/mysql_utf8_exec.sh 执行。
-- ============================================================
SET @current_schema = DATABASE();
-- ============================================================
-- lx_fund_flow_subject_node 补充字段和索引
-- ============================================================
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_subject_node' AND COLUMN_NAME = 'cinocsno') = 0,
'ALTER TABLE lx_fund_flow_subject_node ADD COLUMN cinocsno VARCHAR(64) NULL COMMENT ''客户内部号'' AFTER name',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_subject_node' AND COLUMN_NAME = 'idno_type') = 0,
'ALTER TABLE lx_fund_flow_subject_node ADD COLUMN idno_type VARCHAR(64) NULL COMMENT ''主体类型,如个人、企业、代理主体'' AFTER cinocsno',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_subject_node' AND COLUMN_NAME = 'staff_id') = 0,
'ALTER TABLE lx_fund_flow_subject_node ADD COLUMN staff_id VARCHAR(64) NULL COMMENT ''员工标识'' AFTER idno_type',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_subject_node' AND COLUMN_NAME = 'source_type') = 0,
'ALTER TABLE lx_fund_flow_subject_node ADD COLUMN source_type VARCHAR(64) NULL COMMENT ''来源类型'' AFTER staff_id',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_subject_node' AND COLUMN_NAME = 'created_time') = 0,
'ALTER TABLE lx_fund_flow_subject_node ADD COLUMN created_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT ''创建时间'' AFTER source_type',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_subject_node' AND COLUMN_NAME = 'updated_time') = 0,
'ALTER TABLE lx_fund_flow_subject_node ADD COLUMN updated_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT ''更新时间'' AFTER created_time',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_subject_node' AND INDEX_NAME = 'idx_lx_fund_flow_subject_idnocfno') = 0,
'ALTER TABLE lx_fund_flow_subject_node ADD KEY idx_lx_fund_flow_subject_idnocfno (idnocfno)',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_subject_node' AND INDEX_NAME = 'idx_lx_fund_flow_subject_name') = 0,
'ALTER TABLE lx_fund_flow_subject_node ADD KEY idx_lx_fund_flow_subject_name (name)',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_subject_node' AND INDEX_NAME = 'idx_lx_fund_flow_subject_source_type') = 0,
'ALTER TABLE lx_fund_flow_subject_node ADD KEY idx_lx_fund_flow_subject_source_type (source_type)',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
-- ============================================================
-- lx_fund_flow_account_node 补充字段和索引
-- ============================================================
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_account_node' AND COLUMN_NAME = 'source') = 0,
'ALTER TABLE lx_fund_flow_account_node ADD COLUMN source VARCHAR(128) NULL COMMENT ''来源'' AFTER cinocsno',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_account_node' AND COLUMN_NAME = 'acc_type') = 0,
'ALTER TABLE lx_fund_flow_account_node ADD COLUMN acc_type VARCHAR(64) NULL COMMENT ''账户类型如INTERNAL/EXTERNAL'' AFTER source',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_account_node' AND COLUMN_NAME = 'acc_idno') = 0,
'ALTER TABLE lx_fund_flow_account_node ADD COLUMN acc_idno VARCHAR(64) NULL COMMENT ''开户证件号'' AFTER acc_type',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_account_node' AND COLUMN_NAME = 'acc_status') = 0,
'ALTER TABLE lx_fund_flow_account_node ADD COLUMN acc_status VARCHAR(64) NULL COMMENT ''账户状态'' AFTER acc_idno',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_account_node' AND COLUMN_NAME = 'acc_date') = 0,
'ALTER TABLE lx_fund_flow_account_node ADD COLUMN acc_date VARCHAR(32) NULL COMMENT ''开户日期'' AFTER acc_status',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_account_node' AND COLUMN_NAME = 'created_time') = 0,
'ALTER TABLE lx_fund_flow_account_node ADD COLUMN created_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT ''创建时间'' AFTER acc_date',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_account_node' AND COLUMN_NAME = 'updated_time') = 0,
'ALTER TABLE lx_fund_flow_account_node ADD COLUMN updated_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT ''更新时间'' AFTER created_time',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_account_node' AND INDEX_NAME = 'idx_lx_fund_flow_account_acc_no') = 0,
'ALTER TABLE lx_fund_flow_account_node ADD KEY idx_lx_fund_flow_account_acc_no (acc_no)',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_account_node' AND INDEX_NAME = 'idx_lx_fund_flow_account_acc_name') = 0,
'ALTER TABLE lx_fund_flow_account_node ADD KEY idx_lx_fund_flow_account_acc_name (acc_name)',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_account_node' AND INDEX_NAME = 'idx_lx_fund_flow_account_acc_idno') = 0,
'ALTER TABLE lx_fund_flow_account_node ADD KEY idx_lx_fund_flow_account_acc_idno (acc_idno)',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
-- ============================================================
-- lx_fund_flow_own_account_edge 补充字段和索引
-- ============================================================
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_own_account_edge' AND COLUMN_NAME = 'acc_name') = 0,
'ALTER TABLE lx_fund_flow_own_account_edge ADD COLUMN acc_name VARCHAR(255) NULL COMMENT ''账户名称'' AFTER to_key',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_own_account_edge' AND COLUMN_NAME = 'acc_no') = 0,
'ALTER TABLE lx_fund_flow_own_account_edge ADD COLUMN acc_no VARCHAR(128) NULL COMMENT ''账号'' AFTER acc_name',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_own_account_edge' AND COLUMN_NAME = 'created_time') = 0,
'ALTER TABLE lx_fund_flow_own_account_edge ADD COLUMN created_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT ''创建时间'' AFTER acc_no',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_own_account_edge' AND COLUMN_NAME = 'updated_time') = 0,
'ALTER TABLE lx_fund_flow_own_account_edge ADD COLUMN updated_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT ''更新时间'' AFTER created_time',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_own_account_edge' AND INDEX_NAME = 'idx_lx_fund_flow_own_from_key') = 0,
'ALTER TABLE lx_fund_flow_own_account_edge ADD KEY idx_lx_fund_flow_own_from_key (from_key)',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_own_account_edge' AND INDEX_NAME = 'idx_lx_fund_flow_own_to_key') = 0,
'ALTER TABLE lx_fund_flow_own_account_edge ADD KEY idx_lx_fund_flow_own_to_key (to_key)',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_own_account_edge' AND INDEX_NAME = 'idx_lx_fund_flow_own_acc_no') = 0,
'ALTER TABLE lx_fund_flow_own_account_edge ADD KEY idx_lx_fund_flow_own_acc_no (acc_no)',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
-- ============================================================
-- lx_fund_flow_detail_edge 补充字段和索引
-- ============================================================
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_detail_edge' AND COLUMN_NAME = 'family_relation_type') = 0,
'ALTER TABLE lx_fund_flow_detail_edge ADD COLUMN family_relation_type VARCHAR(64) NULL COMMENT ''家庭关系类型,如配偶、父母、子女'' AFTER to_key',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_detail_edge' AND COLUMN_NAME = 'source_table') = 0,
'ALTER TABLE lx_fund_flow_detail_edge ADD COLUMN source_table VARCHAR(32) NULL COMMENT ''流水来源基座可为空增量可为CCDI_BANK_STATEMENT/FIRST/LEVEL1'' AFTER family_relation_type',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_detail_edge' AND COLUMN_NAME = 'penetrate_level') = 0,
'ALTER TABLE lx_fund_flow_detail_edge ADD COLUMN penetrate_level INT NOT NULL DEFAULT 0 COMMENT ''穿透层级一期为0'' AFTER source_table',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_detail_edge' AND COLUMN_NAME = 'bank_statement_id') = 0,
'ALTER TABLE lx_fund_flow_detail_edge ADD COLUMN bank_statement_id BIGINT NULL COMMENT ''增量来源ccdi_bank_statement主键'' AFTER penetrate_level',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_detail_edge' AND COLUMN_NAME = 'bank_trx_number') = 0,
'ALTER TABLE lx_fund_flow_detail_edge ADD COLUMN bank_trx_number VARCHAR(128) NULL COMMENT ''银行流水号/交易流水号,如来源存在则记录'' AFTER bank_statement_id',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_detail_edge' AND COLUMN_NAME = 'created_time') = 0,
'ALTER TABLE lx_fund_flow_detail_edge ADD COLUMN created_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT ''创建时间'' AFTER bank_trx_number',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.COLUMNS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_detail_edge' AND COLUMN_NAME = 'updated_time') = 0,
'ALTER TABLE lx_fund_flow_detail_edge ADD COLUMN updated_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT ''更新时间'' AFTER created_time',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_detail_edge' AND INDEX_NAME = 'idx_lx_fund_flow_detail_from_to') = 0,
'ALTER TABLE lx_fund_flow_detail_edge ADD KEY idx_lx_fund_flow_detail_from_to (from_key, to_key)',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_detail_edge' AND INDEX_NAME = 'idx_lx_fund_flow_detail_from_date') = 0,
'ALTER TABLE lx_fund_flow_detail_edge ADD KEY idx_lx_fund_flow_detail_from_date (from_key, trx_date)',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_detail_edge' AND INDEX_NAME = 'idx_lx_fund_flow_detail_to_date') = 0,
'ALTER TABLE lx_fund_flow_detail_edge ADD KEY idx_lx_fund_flow_detail_to_date (to_key, trx_date)',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_detail_edge' AND INDEX_NAME = 'idx_lx_fund_flow_detail_trx_date') = 0,
'ALTER TABLE lx_fund_flow_detail_edge ADD KEY idx_lx_fund_flow_detail_trx_date (trx_date)',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_detail_edge' AND INDEX_NAME = 'idx_lx_fund_flow_detail_statement') = 0,
'ALTER TABLE lx_fund_flow_detail_edge ADD KEY idx_lx_fund_flow_detail_statement (bank_statement_id)',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_detail_edge' AND INDEX_NAME = 'idx_lx_fund_flow_detail_family') = 0,
'ALTER TABLE lx_fund_flow_detail_edge ADD KEY idx_lx_fund_flow_detail_family (family_relation_type)',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
-- ============================================================
-- lx_fund_flow_manual_edge 如不存在则补建;已存在则补索引
-- ============================================================
CREATE TABLE IF NOT EXISTS lx_fund_flow_manual_edge (
object_key VARCHAR(64) NOT NULL COMMENT '手工资金流向汇总边唯一标识',
from_object_key VARCHAR(64) NOT NULL COMMENT '起点主体object_key身份证MD5或手工主体MD5',
to_object_key VARCHAR(64) NOT NULL COMMENT '终点主体object_key身份证MD5或手工主体MD5',
from_name VARCHAR(255) NULL COMMENT '起点主体名称冗余',
to_name VARCHAR(255) NULL COMMENT '终点主体名称冗余',
amount DECIMAL(19, 2) NULL COMMENT '手工录入汇总金额',
transaction_count INT NOT NULL DEFAULT 1 COMMENT '手工录入笔数',
direction VARCHAR(8) NOT NULL COMMENT '资金方向1支出2收入',
relation_desc VARCHAR(255) NULL COMMENT '资金流向关系说明',
source_desc VARCHAR(500) NULL COMMENT '手工边来源说明',
remark VARCHAR(1000) NULL COMMENT '分析备注',
source_type VARCHAR(64) NOT NULL DEFAULT 'MANUAL' COMMENT '来源类型固定为MANUAL',
created_by VARCHAR(64) NULL COMMENT '创建人',
created_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
updated_by VARCHAR(64) NULL COMMENT '更新人',
updated_time DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP COMMENT '更新时间',
PRIMARY KEY (object_key)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_general_ci COMMENT='资金流图谱手工资金流向汇总边表';
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_manual_edge' AND INDEX_NAME = 'idx_lx_fund_flow_manual_from') = 0,
'ALTER TABLE lx_fund_flow_manual_edge ADD KEY idx_lx_fund_flow_manual_from (from_object_key)',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_manual_edge' AND INDEX_NAME = 'idx_lx_fund_flow_manual_to') = 0,
'ALTER TABLE lx_fund_flow_manual_edge ADD KEY idx_lx_fund_flow_manual_to (to_object_key)',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_manual_edge' AND INDEX_NAME = 'idx_lx_fund_flow_manual_pair_direction') = 0,
'ALTER TABLE lx_fund_flow_manual_edge ADD KEY idx_lx_fund_flow_manual_pair_direction (from_object_key, to_object_key, direction)',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
SET @ddl_sql = IF(
(SELECT COUNT(1) FROM INFORMATION_SCHEMA.STATISTICS
WHERE TABLE_SCHEMA = @current_schema AND TABLE_NAME = 'lx_fund_flow_manual_edge' AND INDEX_NAME = 'idx_lx_fund_flow_manual_source_type') = 0,
'ALTER TABLE lx_fund_flow_manual_edge ADD KEY idx_lx_fund_flow_manual_source_type (source_type)',
'SELECT 1'
);
PREPARE stmt FROM @ddl_sql;
EXECUTE stmt;
DEALLOCATE PREPARE stmt;
-- ============================================================
-- 核对结果
-- ============================================================
SELECT TABLE_NAME, TABLE_COLLATION
FROM INFORMATION_SCHEMA.TABLES
WHERE TABLE_SCHEMA = @current_schema
AND TABLE_NAME IN (
'lx_fund_flow_subject_node',
'lx_fund_flow_account_node',
'lx_fund_flow_own_account_edge',
'lx_fund_flow_detail_edge',
'lx_fund_flow_manual_edge'
)
ORDER BY TABLE_NAME;