Flink 1.17 Batch Reading from and Writing to ArgoDB 5.2


Overview

This article describes how to read from and write to ArgoDB 5.2 with Flink batch jobs. Note two requirements:

  1. Flink version: Flink 1.17 or later
  2. ArgoDB version: ArgoDB 5.2 or later, with the latest patch applied

Details

  1. Prepare the Flink on YARN environment
  2. Prepare the SQL files

Prepare the Flink on YARN Environment

Flink 1.17 runs in YARN session mode on YARN 9.3.

Then start the Flink session:

# Initialize the TDH client environment (Hadoop 3)
source /root/TDH-Client/init.sh y y hadoop3
# Put the Hadoop classpath and YARN config on Flink's path
export HADOOP_CLASSPATH=$(hadoop classpath)
export HADOOP_CONF_DIR=/etc/yarn1/conf/
# Start a detached Flink YARN session
./flink-1.17.2/bin/yarn-session.sh -d

Prepare the SQL Files

If you have several SQL files, it is best to create a sql directory to hold them, e.g. sql/flink-read.sql and sql/flink-write.sql as used below.


Quark SQL: Create the Tables in ArgoDB
-- Create the Holodesk tables in Quark
create table argodb_read_test_001
(
    c0  tinyint,
    c1  smallint,
    c2  int,
    c3  bigint,
    c4  float,
    c5  double,
    c6  decimal(30, 17),
    c7  string,
    c8  varchar(11),
    c9  boolean,
    c10 timestamp
) stored as holodesk;

create table argodb_write_test_001
(
    user string,
    message string,
    num decimal(22,2)
) stored as holodesk;
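The read test later in this article only returns rows if argodb_read_test_001 contains data. As a minimal sketch, you can seed it from Quark SQL with a single row; the literal values below are placeholders, not part of the original setup:

-- Seed the read-test table with one row of placeholder values
insert into argodb_read_test_001 values
(1, 2, 3, 4, 1.5, 2.5, 3.14159, 'transwarp', 'argodb', true, '2024-01-01 00:00:00');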
Flink SQL: Create the Table Mappings in Flink
-- Create Flink table mappings over the Holodesk tables
SET 'sql-client.execution.result-mode' = 'tableau';
SET 'parallelism.default' = '2';
create table flink_read_test_001
(
    c0  tinyint,
    c1  smallint,
    c2  int,
    c3  bigint,
    c4  float,
    c5  double,
    c6  decimal(30, 17),
    c7  string,
    c8  varchar(11),
    c9  boolean,
    c10 timestamp
) WITH (
      'connector' = 'argodb',
      'master.group' = '172.22.37.171:9630,172.22.37.172:9630,172.22.37.173:9630',
      'table.name' = 'default.argodb_read_test_001',
      'shiva2.enable' = 'true',
      'metastore.url' = 'jdbc:hive2://172.22.37.172:10000/default',
      'username' = 'hive',
      'password' = '123456'
      );
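Before querying, the mapping can be sanity-checked with standard Flink SQL statements (nothing ArgoDB-specific is assumed here):

-- Confirm the table is registered and its schema matches the Quark DDL
SHOW TABLES;
DESCRIBE flink_read_test_001;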

select c0,
       c1,
       c2,
       c3,
       c4,
       c5,
       c6,
       c7,
       c8,
       c9,
       c10
from flink_read_test_001 limit 2;
Submit flink-read.sql
# Authenticate the user (Kerberos)
kinit hive

# Submit the SQL file
./bin/sql-client.sh -f sql/flink-read.sql
Flink SQL for Writing to ArgoDB
set 'sql-client.execution.result-mode' = 'tableau';

create table flink_write_test_001 (
    `user` string,
    message string,
    num decimal(22,2)
) WITH (
    'connector' = 'argodb',
    'master.group' = '172.22.37.171:9630,172.22.37.172:9630,172.22.37.173:9630',
    'table.name' = 'default.argodb_write_test_001',
    'shiva2.enable' = 'true',
    'compression' = 'snappy',
    'use.external.address' = 'true',
    'metastore.url' = 'jdbc:hive2://172.22.37.172:10000/default',
    'username' = 'hive',
    'password' = '123456'
);

insert into flink_write_test_001 values ('星环科技','688031',99.56);

select `user`,
       message,
       num
from flink_write_test_001 limit 20;
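Beyond inserting literals, a batch job can populate one Holodesk table from another through the two mappings defined above. The column mapping below is illustrative (c7 -> user, c8 -> message, c6 cast down to num), not part of the original example. Note that when a script writes and then reads the same table, setting 'table.dml-sync' = 'true' makes the client wait for each INSERT job to finish before running the next statement:

-- Wait for INSERT jobs to complete before the next statement runs
SET 'table.dml-sync' = 'true';

-- Batch-copy selected columns from the read mapping into the write mapping
insert into flink_write_test_001
select c7, c8, cast(c6 as decimal(22, 2))
from flink_read_test_001;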
Submit flink-write.sql
# Authenticate the user (Kerberos)
kinit hive

# Submit the SQL file
./bin/sql-client.sh -f sql/flink-write.sql
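After the job completes, the result can also be verified from the ArgoDB side. A quick check in Quark SQL (for example via beeline):

-- Confirm the rows written by Flink landed in the Holodesk table
select * from argodb_write_test_001;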
