内容纲要
概要描述
使用 Flink 批处理写入 ArgoDB 5.2 的操作步骤,这里需要注意 2 点:
- Flink 版本:Flink 1.17 及以上
- ArgoDB 版本:ArgoDB 5.2 及以上,且已打最新的 Patch
详细说明
- 准备 Flink On YARN 环境
- 准备 sql
准备 Flink On YARN 环境
先加载 TDH-Client 环境,再启动 Flink Session:
# Load the TDH client environment; the trailing arguments are passed to init.sh unchanged.
source /root/TDH-Client/init.sh y y hadoop3
# Flink needs the Hadoop jars on its classpath, so export the OUTPUT of
# `hadoop classpath` (command substitution), not the literal words.
export HADOOP_CLASSPATH="$(hadoop classpath)"
export HADOOP_CONF_DIR=/etc/yarn1/conf/
# Start a Flink session on YARN in detached mode (-d).
./flink-1.17.2/bin/yarn-session.sh -d
准备 sql 文件
如果 sql 文件比较多,建议创建一个 sql 目录来存放 sql 文件,例如 sql/flink-read.sql 和 sql/flink-write.sql。
Quark SQL,在 ArgoDB 中创建表
-- Create the Holodesk table on Quark that the Flink read example queries.
create table argodb_read_test_001
(
    c0  tinyint,
    c1  smallint,
    c2  int,
    c3  bigint,
    c4  float,
    c5  double,
    c6  decimal(30, 17),
    c7  string,
    c8  varchar(11),
    c9  boolean,
    c10 timestamp
)
stored as holodesk;
-- Create the Holodesk table on Quark that the Flink write example inserts into.
-- `user` is back-quoted because it is a reserved word in many SQL dialects;
-- the column name itself is unchanged.
create table argodb_write_test_001
(
    `user`  string,
    message string,
    num     decimal(22,2)
)
stored as holodesk;
Flink SQL
-- Session options, written in the quoted SET 'key' = 'value' form.
set 'sql-client.execution.result-mode' = 'TABLEAU';
set 'parallelism.default' = '2';
-- Flink table mapped onto the Holodesk table default.argodb_read_test_001.
-- The addresses and credentials below are example values; replace them with
-- the ones for your own cluster.
create table flink_read_test_001
(
    c0  tinyint,
    c1  smallint,
    c2  int,
    c3  bigint,
    c4  float,
    c5  double,
    c6  decimal(30, 17),
    c7  string,
    c8  varchar(11),
    c9  boolean,
    c10 timestamp
) with (
    'connector' = 'argodb',
    'master.group' = '172.22.37.171:9630,172.22.37.172:9630,172.22.37.173:9630',
    'table.name' = 'default.argodb_read_test_001',
    'shiva2.enable' = 'true',
    'metastore.url' = 'jdbc:hive2://172.22.37.172:10000/default',
    'username' = 'hive',
    'password' = '123456'
);
-- Sample two rows from the ArgoDB-backed table.
-- NOTE(review): c3 is declared on the table but is not in this select list;
-- confirm whether that is intentional.
select
    c0,
    c1,
    c2,
    c4,
    c5,
    c6,
    c7,
    c8,
    c9,
    c10
from flink_read_test_001
limit 2;
提交 flink-read.sql
# Authenticate as the hive user
kinit hive
# Submit the SQL script through the Flink SQL client
./bin/sql-client.sh -f sql/flink-read.sql
Flink 写 ArgoDB 的 Flink SQL 实现
-- Session option, written in the quoted SET 'key' = 'value' form.
set 'sql-client.execution.result-mode' = 'TABLEAU';
-- Flink table mapped onto the Holodesk table default.argodb_write_test_001.
-- `user` is a reserved keyword in Flink SQL, so it must be back-quoted to be
-- used as a column name.
-- The addresses and credentials below are example values; replace them with
-- the ones for your own cluster.
create table flink_write_test_001
(
    `user`  string,
    message string,
    num     decimal(22,2)
) with (
    'connector' = 'argodb',
    'master.group' = '172.22.37.171:9630,172.22.37.172:9630,172.22.37.173:9630',
    'table.name' = 'default.argodb_write_test_001',
    'shiva2.enable' = 'true',
    'compression' = 'snappy',
    'use.external.address' = 'true',
    'metastore.url' = 'jdbc:hive2://172.22.37.172:10000/default',
    'username' = 'hive',
    'password' = '123456'
);
-- Write one sample row. Columns are listed explicitly so the statement does
-- not depend on the table's column order; `user` is back-quoted because it is
-- a reserved keyword in Flink SQL.
insert into flink_write_test_001 (`user`, message, num)
values ('星环科技', '688031', 99.56);
-- Read back the written rows. `user` is back-quoted so that it refers to the
-- column and is not parsed as the reserved USER keyword.
select
    `user`,
    message,
    num
from flink_write_test_001
limit 20;
提交 flink-write.sql
# Authenticate as the hive user
kinit hive
# Submit the SQL script through the Flink SQL client
./bin/sql-client.sh -f sql/flink-write.sql