概要描述
在TDT批量数据流中,有3种同步方式可选,包括通过日志
,通过时间戳和标识列
,通过新增与更新时间戳
。本文主要介绍通过新增与更新时间戳
在包含物理删除的场景下是如何实现同步的,希望能够方便大家理解并熟悉该功能点。
功能适用场景:
- 适合表中有记录插入时间和更新时间的列,这张表会进行物理删除
- 存在一张删除记录表,记录被删除的数据(必需有被删除数据的主键)
- 目前只支持单一主键源表
该方法的弊端:
- 需要借助删除记录表每次在ORCTransactionSync_Writer阶段做一次delete操作,该表数据量很大的话,性能会很受影响,相当一做了一次join关联
举例说明:
数据表test_time2,这张表也有插入时间列create_time和更新时间列update_time,但是这张表会进行物理删除,另有删除记录表table_delete_op
id | username | create_time | update_time |
---|---|---|---|
1 | a | 2020-09-23 12:25:24 | 2020-09-23 12:30:24 |
3 | c | 2020-09-23 12:26:24 | 2020-09-23 12:35:24 |
table_delete_op,删除记录表,record_id就是被删除数据的主键,delete_time是删除时间。
id | record_id | delete_time |
---|---|---|
1 | 2 | 2020-09-23 12:31:24 |
2 | 4 | 2020-09-23 12:36:24 |
大致的测试步骤如下:
- 新增测试数据
INSERT INTO test_time2(username,create_time)VALUES('zhangsan',NOW()); INSERT INTO test_time2(username,create_time)VALUES('lisi',NOW()); INSERT INTO test_time2(username,create_time)VALUES('wangwu',NOW());
- update一条数据
UPDATE test_time2 set username='new_lisi' where id=2;
- 物理删除,同时会在删除记录表中记录该操作
delete from test_time2 where id=3; insert into table_delete_op(record_id) values(3);
生成的数据流结构如下:
SyncTimestamp_Reader –> ColumnMap_Transformer –> ORCTransactionSync_Writer
示例
step1:源端创建测试表
--mysql
DROP TABLE IF EXISTS test_time2;
CREATE TABLE test_time2
(
id
int(11) NOT NULL AUTO_INCREMENT,
username
varchar(20) DEFAULT NULL,
create_time
datetime DEFAULT NULL,
update_time
timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id
)
) ENGINE=MyISAM AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
DROP TABLE IF EXISTS table_delete_op;
CREATE TABLE table_delete_op
(
id
int(11) NOT NULL AUTO_INCREMENT,
record_id
int(11) NOT NULL,
delete_time
timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (id
)
) ENGINE=MyISAM AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
--插入3条初始数据
INSERT INTO test_time2(username,create_time)VALUES('zhangsan',NOW());
INSERT INTO test_time2(username,create_time)VALUES('lisi',NOW());
INSERT INTO test_time2(username,create_time)VALUES('wangwu',NOW());
step2:创建TDT批量同步数据流
源端选择:
新建批量数据流,选择同步
# 填写规范如下:
含物理删除操作:这里勾选
主键列:这里选取源表中的PK列ID
新增时间列:这里选取源表中的create_time字段
更新时间列:这里选取源表中的update_time字段
自定义sql:这里填写select record_id as id, delete_time from lkw.table_delete_op
,注意表名必须包含database,且不能有换行,结尾不能有分号
规则配置: 这里不做配置介绍
目标端选择:
列表展示:
点击确定之后,批量数据流创建完成,点击到内部可以看到生成的数据流如下:
SyncJDBC_Reader:
ORCTransactionSync_Writer:
这里我们看下右上角初始的参数配置情况,tdt.job.record.last.original.value
和 tdt.job.record.last.value
均为空
step3:数据流调试(insert)
进入调试模式,点击调试按钮
首次调试,我们简单解析一下执行日志:
SyncJDBC_Reader阶段,通过jdbc读取源表,注意其中的mapreduce.jdbc.input.query
,在第一次执行同步数据流时,由于参数中没有记录tdt.job.record.last.original.value
和 tdt.job.record.last.value
,所以这里选取源数据中UPDATE_TIME列<最大值的部分,相当于第一次做了一次全量导数。
CREATE TABLE default
.TDT__INTERNAL__fe6fd609424942e481d7bc01db4fe566
(
id
int,username
string,create_time
timestamp,update_time
timestamp
) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.tdt.JDBCSerde'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.tdt.JDBCDBInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
TBLPROPERTIES(
'mapreduce.jdbc.driver.class'='com.mysql.jdbc.Driver',
'mapreduce.jdbc.url'='jdbc:mysql://172.22.23.9:3306',
'mapreduce.jdbc.input.query'='select * from lkw
.test_time2
where 1=1 and update_time
<= \'2021-12-16 15:46:05.0\'','mapreduce.jdbc.input.count.query'='select count(*) from (select * from lkw
.test_time2
where 1=1 and update_time
<= \'2021-12-16 15:46:05.0\') tdt_sq_alias',
'mapreduce.jdbc.splits.number'='1',
'mapreduce.jdbc.username'='root',
'mapreduce.jdbc.password'='123456',
'mapreduce.jdbc.driver.file'='/tmp/transporter1/DRIVERS/mysql-connector-java-5.1.39_admin1638152300550.jar')
--目标ORC事务表和源表table_data结构一致
CREATE TABLE IF NOT EXISTS default
.test_time2
(
id
int,username
string,create_time
timestamp,update_time
timestamp
)
CLUSTERED BY
(id
)
INTO
3 BUCKETS
STORED AS ORC TBLPROPERTIES ('transactional'='true')
这里我们看下参数变化,tdt.job.record.last.original.value
和 tdt.job.record.last.value
发生改变,值更新为2021-12-16 15:46:05.0
和 时间戳 1639640765
inceptor中当前数据:
step4:数据流调试(执行update)
在mysql中模拟update测试数据
UPDATE test_time2 set username='new_lisi' where id=2;
这里username和update_time均发生改变。
再次调试,分析一下日志:
SyncJDBC_Reader阶段,通过jdbc读取源表,注意其中的mapreduce.jdbc.input.query
,update_time范围要求在tdt.job.record.last.original.value
和 当前源表最大值之间。
CREATE TABLE default
.TDT__INTERNAL__23b38f3b5d044bd08644f503c31f82ce
(
id
int,username
string,create_time
timestamp,update_time
timestamp
) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.tdt.JDBCSerde'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.tdt.JDBCDBInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
TBLPROPERTIES(
'mapreduce.jdbc.driver.class'='com.mysql.jdbc.Driver',
'mapreduce.jdbc.url'='jdbc:mysql://172.22.23.9:3306',
'mapreduce.jdbc.input.query'='select * from lkw
.test_time2
where 1=1 and update_time
<= \'2021-12-16 15:53:26.0\' and update_time
> \'2021-12-16 15:46:05.0\'',
'mapreduce.jdbc.input.count.query'='select count(*) from (select * from lkw
.test_time2
where 1=1 and update_time
<= \'2021-12-16 15:53:26.0\' and update_time
> \'2021-12-16 15:46:05.0\') tdt_sq_alias',
'mapreduce.jdbc.splits.number'='1',
'mapreduce.jdbc.username'='root',
'mapreduce.jdbc.password'='123456',
'mapreduce.jdbc.driver.file'='/tmp/transporter1/DRIVERS/mysql-connector-java-5.1.39_admin1638152300550.jar')
ORCTransactionSync_Writer阶段,获取lkw.test_time2
表中哪些数据需要做了update
,然后执行根据这部分的ID做删除操作**TDT__VIEW_FINAL_DELETE_RECORD。**
DELETE default
.test_time2
WHERE
(
id
) IN (
SELECT
id
FROM
default
.TDT__VIEW_FINAL_DELETE_RECORD__ab24496e157044ababf7d0080e224da8
)
然后再去判断获取lkw.table_delete_op
表中哪些数据需要做了delete
,然后执行根据这部分的ID做删除操作(**TDT__VIEW_EXTRA_DELETE_RECORD)。**
DELETE default
.test_time2
WHERE
(
id
) IN (
SELECT
id
FROM
default
.TDT__VIEW_EXTRA_DELETE_RECORD__ab24496e157044ababf7d0080e224da8
)
最后,将这部分update
的真正数据insert到目标表
INSERT
INTO
`
default.
test_time2` (
id
,username
,create_time
,update_time
) select
id
,
username
,
create_time
,
update_time
from
default
.TDT__VIEW__INSERT__ab24496e157044ababf7d0080e224da8
inceptor中当前数据:
step5:数据流调试(执行delete)
在mysql中模拟物理删除数据
delete from test_time2 where id=3;
insert into table_delete_op(record_id) values(3);
再次调试,分析一下日志:
SyncJDBC_Reader阶段,通过jdbc读取源表,注意其中的mapreduce.jdbc.input.query
,UPDATE_TIME范围要求在tdt.job.record.last.original.value
和 最大值之间。
CREATE TABLE default
.TDT__INTERNAL__0c2b6ac722bc4a2b9f9cde3a844e9ab8
(
id
int,username
string,create_time
timestamp,update_time
timestamp
) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.tdt.JDBCSerde'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.tdt.JDBCDBInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
TBLPROPERTIES(
'mapreduce.jdbc.driver.class'='com.mysql.jdbc.Driver',
'mapreduce.jdbc.url'='jdbc:mysql://172.22.23.9:3306',
'mapreduce.jdbc.input.query'='select * from lkw
.test_time2
where 1=1 and update_time
<= \'2021-12-16 15:53:26.0\' and update_time
> \'2021-12-16 15:53:26.0\'',
'mapreduce.jdbc.input.count.query'='select count(*) from (select * from lkw
.test_time2
where 1=1 and update_time
<= \'2021-12-16 15:53:26.0\' and update_time
> \'2021-12-16 15:53:26.0\') tdt_sq_alias',
'mapreduce.jdbc.splits.number'='1',
'mapreduce.jdbc.username'='root',
'mapreduce.jdbc.password'='123456',
'mapreduce.jdbc.driver.file'='/tmp/transporter1/DRIVERS/mysql-connector-java-5.1.39_admin1638152300550.jar')
ORCTransactionSync_Writer阶段,获取lkw.table_delete_op
表中哪些数据需要delete
,然后执行删除操作。
CREATE TABLE default
.TDT__VIEW_EXTRA_DELETE_RECORD__0f6a8faf9b1b4e5eade19a0870f7d7aa
(
id
int,delete_time
timestamp
) ROW FORMAT SERDE 'org.apache.hadoop.hive.ql.io.tdt.JDBCSerde'
STORED AS INPUTFORMAT 'org.apache.hadoop.hive.ql.io.tdt.JDBCDBInputFormat'
OUTPUTFORMAT 'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
TBLPROPERTIES(
'mapreduce.jdbc.driver.class'='com.mysql.jdbc.Driver',
'mapreduce.jdbc.url'='jdbc:mysql://172.22.23.9:3306',
'mapreduce.jdbc.input.query'='select * from (select record_id as id, delete_time from lkw.table_delete_op) tdt_sql_alias where 1=1 and delete_time
<= \'2021-12-16 15:53:26.0\'',
'mapreduce.jdbc.input.count.query'='select count(*) from (select * from (select record_id as id, delete_time from lkw.table_delete_op) tdt_sql_alias where 1=1 and delete_time
<= \'2021-12-16 15:53:26.0\') tdt_sq_alias',
'mapreduce.jdbc.splits.number'='1',
'mapreduce.jdbc.username'='root', 'mapreduce.jdbc.password'='123456',
'mapreduce.jdbc.driver.file'='/tmp/transporter7/DRIVERS/mysql-connector-java-5.1.46_admin1641814032446.jar')
DELETE default
.test_time2
WHERE
(
id
) IN (
SELECT
id
FROM
default
.TDT__VIEW_EXTRA_DELETE_RECORD__0f6a8faf9b1b4e5eade19a0870f7d7aa
)
inceptor中当前数据: