slipstream对接数据库日志的统一decoder功能

  其他常见问题
内容纲要

概要描述


研发出jar包对cdc的数据库日志提供json解析方法

详细说明


使用方式:

9.0.0及其之后的版本
--mysql表
CREATE TABLE students (
  id int(6) unsigned NOT NULL AUTO_INCREMENT,
  firstname varchar(30) NOT NULL,
  lastname varchar(30) NOT NULL,
  email varchar(50) DEFAULT NULL,
  reg_date timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=latin1;

--slipstream 输入流
CREATE stream students_sync( 
id int,
firstname string,
lastname string,
email string,
reg_date timestamp,
op_type string
) tblproperties (
'kafka.zookeeper'='localhost:2181',
'kafka.broker.list'='localhost:9092', 
'changelog.format'='mysql',
'topic'='dbserver1.inventory.students'
);

--slipstream 输出表
CREATE table students_sync_out(
id int,
firstname string,
lastname string,
email string,
reg_date timestamp
);

--创建普通流任务
create streamjob mysql_changelog_syncjob as ("insert into students_sync_out select id, firstname, lastname, email, reg_date from students_sync");

--创建带有delete_mark的流任务 TODO
create streamjob mysql_changelog_syncjob_with_delete_mark as
("insert  /*+USE_DELETE_MARK*/ into students_sync
select id, firstname, lastname, email, reg_data,
case when op='D' then true else false end as delete_mark
from students_sync_stream");

start streamjob mysql_changelog_syncjob;
--在7.0 - 9.0(不包括9.0)的sql

--mysql表
CREATE TABLE students (
  id int(6) unsigned NOT NULL AUTO_INCREMENT,
  firstname varchar(30) NOT NULL,
  lastname varchar(30) NOT NULL,
  email varchar(50) DEFAULT NULL,
  reg_date timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
  PRIMARY KEY (id)
) ENGINE=InnoDB AUTO_INCREMENT=5 DEFAULT CHARSET=latin1;

-- 创建stream
CREATE stream students_sync_pre(
id int,
firstname string,
lastname string,
email string,
reg_date timestamp,
op_type string
) tblproperties (
'kafka.zookeeper'='localhost:2181',
'kafka.broker.list'='localhost:9092',
'kafka.decoder'='io.transwarp.slipstream.api.decoder.DBZBinlogDecoder',   -- 解析函数, 下面有解析函数表
'kafka.decoder.columns.type'='int/string/string/string/timestamp/string',
'kafka.decoder.columns'='id/firstname/lastname/email/reg_date/op_type',   --不要用","分隔, 因为如果是带精度的数字类型比如decimal(4,2), 程序会难以识别两个逗号
'topic'='dbserver1.inventory.students'
);

--slipstream 输出表
CREATE table students_sync_out(
id int,
firstname string,
lastname string,
email string,
reg_date timestamp
);

--创建普通流任务
create streamjob mysql_changelog_syncjob_pre as ("insert into students_sync_out select id, firstname, lastname, email, reg_date from students_sync_pre");

--创建带有delete_mark的流任务 TODO
create streamjob mysql_changelog_syncjob_with_delete_mark as
("insert  /*+USE_DELETE_MARK*/ into students_sync
select id, firstname, lastname, email, reg_data,
case when op='D' then true else false end as delete_mark
from students_sync_stream");

start streamjob mysql_changelog_syncjob_pre;
**transwarp-5.2 ogg oracle**
create table table_ogg_oracle_decoder (
    EMPNO int,
    ENAME string,
    JOB string,
    MGR int,
    HIREDATE timestamp,
    SAL double,
    COMM double,
    DEPTNO int
)

drop stream stream_ogg_oracle_decoder;
create stream stream_ogg_oracle_decoder( EMPNO int,
    ENAME string,
    JOB string,
    MGR int,
    HIREDATE timestamp,
    SAL double,
    COMM double,
    DEPTNO int,
    op_type string     --op_type字段别忘了
) tblproperties (
"topic"="topic_ogg_oracle",
"kafka.zookeeper"="stream371:2181",
"kafka.broker.list"="stream371:9092",
"kafka.decoder"="io.transwarp.slipstream.api.decoder.OGGOracleDecoder",   --解析函数, 下面有解析函数表对照
"transwarp.consumer.kafka.decoder.delimited"="/",  -- 使用默认的"/"分割coloums和colums.type时可以不写,不要用","分隔, 因为如果是带精度的数字类型比如decimal(4,2), 程序无法识别
"transwarp.consumer.kafka.decoder.columns.type"="int/string/string/int/timestamp/double/double/int/string",  -- 这里的key跟其他版本不一样,注意区分
"transwarp.consumer.kafka.decoder.columns"="EMPNO/ENAME/JOB/MGR/HIREDATE/SAL/COMM/DEPTNO/op_type",
"topic"="topic_ogg_oracle"
);

create streamjob streamjob_ogg_oracle_decoder as ("insert into table_ogg_oracle_decoder select EMPNO ,
    ENAME ,
    JOB ,
    MGR ,
    HIREDATE ,
    SAL ,
    COMM ,
    DEPTNO from stream_ogg_oracle_decoder");

通过add jar的方式使用jar包:https://nj.transwarp.cn:8180/?p=1541
transwarp-5.2 jar包MD5:629ae977122ea4fac4039c521d5ee083
jar包下载链接:
https://nj.transwarp.club:666/main.html?download&weblink=8a4ef0d222abc6cfe481dd1f737c61fa&realfilename=formatter-transwarp-5.2.jar

这篇文章对您有帮助吗?

平均评分 0 / 5. 次数: 0

尚无评价,您可以第一个评哦!

非常抱歉,这篇文章对您没有帮助.

烦请您告诉我们您的建议与意见,以便我们改进,谢谢您。