内容纲要
概要描述
开发了一个 jar 包，为 CDC 采集到的数据库变更日志（JSON 格式）提供解析方法。
详细说明
使用方式:
适用于 9.0.0 及之后版本的 SQL：
-- MySQL source table (the CDC source whose change log is consumed below).
CREATE TABLE students (
    id        INT(6) UNSIGNED NOT NULL AUTO_INCREMENT,
    firstname VARCHAR(30)     NOT NULL,
    lastname  VARCHAR(30)     NOT NULL,
    email     VARCHAR(50)     DEFAULT NULL,
    reg_date  TIMESTAMP       NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=latin1;
-- Slipstream input stream over the CDC Kafka topic (9.0.0+).
-- The change-log format is declared via 'changelog.format' rather than an
-- explicit decoder class and column list.
-- NOTE(review): op_type presumably carries the change operation type;
-- confirm the emitted values against the Slipstream docs.
CREATE STREAM students_sync (
    id        INT,
    firstname STRING,
    lastname  STRING,
    email     STRING,
    reg_date  TIMESTAMP,
    op_type   STRING
)
TBLPROPERTIES (
    'kafka.zookeeper'   = 'localhost:2181',
    'kafka.broker.list' = 'localhost:9092',
    'changelog.format'  = 'mysql',
    'topic'             = 'dbserver1.inventory.students'
);
-- Slipstream output (sink) table that receives the synced rows.
CREATE TABLE students_sync_out (
    id        INT,
    firstname STRING,
    lastname  STRING,
    email     STRING,
    reg_date  TIMESTAMP
);
-- Plain streaming job: copy every record from the input stream into the
-- output table (op_type is not carried over).
CREATE STREAMJOB mysql_changelog_syncjob AS
("insert into students_sync_out select id, firstname, lastname, email, reg_date from students_sync");
-- Streaming job with a delete mark (TODO: not yet verified end to end).
-- Reads the input stream students_sync and writes to the sink table
-- students_sync_out.
-- The trailing boolean column delete_mark is derived from op_type.
-- NOTE(review): the USE_DELETE_MARK hint presumably treats the trailing
-- boolean column as "delete this row". Confirm the required sink table type,
-- and confirm the exact op_type value emitted for deletes ('D' here vs.
-- Debezium's lowercase 'd'), against the Slipstream docs.
CREATE STREAMJOB mysql_changelog_syncjob_with_delete_mark AS
("insert /*+USE_DELETE_MARK*/ into students_sync_out
select id, firstname, lastname, email, reg_date,
case when op_type='D' then true else false end as delete_mark
from students_sync");
-- Launch the plain sync job (the delete-mark job above is not started here).
START STREAMJOB mysql_changelog_syncjob;
--适用于 7.0 至 9.0（不含 9.0）版本的 SQL
-- MySQL source table (the CDC source whose change log is consumed below).
CREATE TABLE students (
    id        INT(6) UNSIGNED NOT NULL AUTO_INCREMENT,
    firstname VARCHAR(30)     NOT NULL,
    lastname  VARCHAR(30)     NOT NULL,
    email     VARCHAR(50)     DEFAULT NULL,
    reg_date  TIMESTAMP       NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
    PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=latin1;
-- Input stream for versions 7.0 to 9.0 (exclusive).
-- The decoder class and the decoded column names/types are declared
-- explicitly in TBLPROPERTIES.
CREATE STREAM students_sync_pre (
    id        INT,
    firstname STRING,
    lastname  STRING,
    email     STRING,
    reg_date  TIMESTAMP,
    op_type   STRING
)
TBLPROPERTIES (
    'kafka.zookeeper'            = 'localhost:2181',
    'kafka.broker.list'          = 'localhost:9092',
    -- Decoder class; see the decoder table later in this document.
    'kafka.decoder'              = 'io.transwarp.slipstream.api.decoder.DBZBinlogDecoder',
    -- Types and names are '/'-separated and listed in the same order as the
    -- stream columns.
    -- Do not use ',' as the separator: precision types such as decimal(4,2)
    -- contain commas and would be hard for the decoder to split.
    'kafka.decoder.columns.type' = 'int/string/string/string/timestamp/string',
    'kafka.decoder.columns'      = 'id/firstname/lastname/email/reg_date/op_type',
    'topic'                      = 'dbserver1.inventory.students'
);
-- Slipstream output (sink) table that receives the synced rows.
CREATE TABLE students_sync_out (
    id        INT,
    firstname STRING,
    lastname  STRING,
    email     STRING,
    reg_date  TIMESTAMP
);
-- Plain streaming job: copy every record from the input stream into the
-- output table (op_type is not carried over).
CREATE STREAMJOB mysql_changelog_syncjob_pre AS
("insert into students_sync_out select id, firstname, lastname, email, reg_date from students_sync_pre");
-- Streaming job with a delete mark (TODO: not yet verified end to end).
-- Reads the 7.0-9.0 input stream students_sync_pre and writes to the sink
-- table students_sync_out.
-- The trailing boolean column delete_mark is derived from op_type.
-- NOTE(review): the USE_DELETE_MARK hint presumably treats the trailing
-- boolean column as "delete this row". Confirm the required sink table type,
-- and confirm the exact op_type value DBZBinlogDecoder emits for deletes
-- ('D' here vs. Debezium's lowercase 'd'), against the Slipstream docs.
CREATE STREAMJOB mysql_changelog_syncjob_with_delete_mark AS
("insert /*+USE_DELETE_MARK*/ into students_sync_out
select id, firstname, lastname, email, reg_date,
case when op_type='D' then true else false end as delete_mark
from students_sync_pre");
-- Launch the plain sync job (the delete-mark job above is not started here).
START STREAMJOB mysql_changelog_syncjob_pre;
**transwarp-5.2 版本：解析 OGG 输出的 Oracle 变更日志**
-- Transwarp 5.2: sink table for the Oracle change records decoded by
-- OGGOracleDecoder. Columns mirror the decoded stream below, minus op_type.
-- NOTE(review): SAL/COMM look like monetary columns; DOUBLE is kept to match
-- the decoder type list ("double") — consider DECIMAL if the decoder supports it.
CREATE TABLE table_ogg_oracle_decoder (
    EMPNO    INT,
    ENAME    STRING,
    JOB      STRING,
    MGR      INT,
    HIREDATE TIMESTAMP,
    SAL      DOUBLE,
    COMM     DOUBLE,
    DEPTNO   INT
);
-- Remove any previous definition of the stream before recreating it.
-- NOTE(review): without IF EXISTS this presumably errors on a first run;
-- confirm whether Slipstream 5.2 supports DROP STREAM IF EXISTS.
DROP STREAM stream_ogg_oracle_decoder;
-- Transwarp 5.2 input stream over the OGG (Oracle) Kafka topic.
-- Each property is declared exactly once. The original repeated "topic"
-- with the same value.
CREATE STREAM stream_ogg_oracle_decoder (
    EMPNO    INT,
    ENAME    STRING,
    JOB      STRING,
    MGR      INT,
    HIREDATE TIMESTAMP,
    SAL      DOUBLE,
    COMM     DOUBLE,
    DEPTNO   INT,
    -- Do not forget op_type: it is also the last entry of the decoder columns.
    op_type  STRING
)
TBLPROPERTIES (
    "topic"             = "topic_ogg_oracle",
    "kafka.zookeeper"   = "stream371:2181",
    "kafka.broker.list" = "stream371:9092",
    -- Decoder class; see the decoder table later in this document.
    "kafka.decoder"     = "io.transwarp.slipstream.api.decoder.OGGOracleDecoder",
    -- Separator for columns / columns.type. It may be omitted when the default
    -- "/" is used.
    -- Do not use ",": precision types such as decimal(4,2) contain commas and
    -- cannot be split correctly.
    "transwarp.consumer.kafka.decoder.delimited"    = "/",
    -- In 5.2 these keys carry the "transwarp.consumer." prefix, unlike the
    -- "kafka.decoder.columns*" keys used by later versions.
    "transwarp.consumer.kafka.decoder.columns.type" = "int/string/string/int/timestamp/double/double/int/string",
    "transwarp.consumer.kafka.decoder.columns"      = "EMPNO/ENAME/JOB/MGR/HIREDATE/SAL/COMM/DEPTNO/op_type"
);
-- Streaming job: copy every decoded record (without op_type) into the sink
-- table.
CREATE STREAMJOB streamjob_ogg_oracle_decoder AS ("insert into table_ogg_oracle_decoder select EMPNO ,
ENAME ,
JOB ,
MGR ,
HIREDATE ,
SAL ,
COMM ,
DEPTNO from stream_ogg_oracle_decoder");
-- Start the job, following the create-then-start pattern of the other
-- sections.
START STREAMJOB streamjob_ogg_oracle_decoder;
通过 add jar 的方式使用 jar 包，参见：https://nj.transwarp.cn:8180/?p=1541
transwarp-5.2 jar包MD5:629ae977122ea4fac4039c521d5ee083
jar包下载链接:
https://nj.transwarp.club:666/main.html?download&weblink=8a4ef0d222abc6cfe481dd1f737c61fa&realfilename=formatter-transwarp-5.2.jar