The default RBF NameService in TBDS is named nsfed; to use RBF, components must read and write data through nsfed.
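For reference, clients reach nsfed through the Routers rather than the NameNodes. A minimal client-side hdfs-site.xml sketch, assuming two Router hosts (router1/router2 are placeholders) and the stock Router RPC port 8888; keep any existing nameservices alongside nsfed in dfs.nameservices:
<property><name>dfs.nameservices</name><value>nsfed</value></property>
<property><name>dfs.ha.namenodes.nsfed</name><value>r1,r2</value></property>
<property><name>dfs.namenode.rpc-address.nsfed.r1</name><value>router1:8888</value></property>
<property><name>dfs.namenode.rpc-address.nsfed.r2</name><value>router2:8888</value></property>
<property><name>dfs.client.failover.proxy.provider.nsfed</name><value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value></property>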
Router Example
- Mount any nameservice path into the global namespace:
hdfs dfsrouteradmin -add /xxx NameService /path/
- List the path through the federated namespace and through the underlying nameservice:
hdfs dfs -ls hdfs://nsfed/xxx
hdfs dfs -ls hdfs://NameService/path/
- Access the Router web UI (see the check below).
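To verify the mount entry and the Router itself (the web port below is the stock dfs.federation.router.http-address default, 50071; adjust if your deployment overrides it):
hdfs dfsrouteradmin -ls /xxx
# Router web UI:
http://{router-ip}:50071/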
MR Example
- Submit a YARN job whose input and output are both on NameService HDFS:
yarn jar /usr/local/service/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar wordcount hdfs://NameService1/xxx hdfs://NameService1/NS/yarn_wc_output1/
yarn jar /usr/local/service/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar wordcount hdfs://nsfed/xxx hdfs://NameService1/NS/yarn_wc_output1/
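To verify the job output (paths as in the commands above):
hdfs dfs -ls hdfs://NameService1/NS/yarn_wc_output1/
hdfs dfs -cat hdfs://NameService1/NS/yarn_wc_output1/part-r-*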
Hive Example
Enter beeline, for reference:
/usr/local/service/hive/bin/beeline -u "jdbc:hive2://{ip}:7001/;principal=hadoop/{ip}@{realm}"
- Hive on Tez (skip if Tez is not in use)
set hive.execution.engine=tez;
create database if not exists nsfed_db;
use nsfed_db;
drop table if exists nsfed_db.hive_hdfs_nsfed_tez;
create table hive_hdfs_nsfed_tez( id bigint, name string, price double) LOCATION 'hdfs://nsfed/path/to/directory';
insert into hive_hdfs_nsfed_tez values(100, '草莓', 80.5), (101, '石榴', 90);
select * from hive_hdfs_nsfed_tez order by id ASC;
- Hive on MR: read and write a table on nsfed
set hive.execution.engine=mr;
create database if not exists nsfed_db;
use nsfed_db;
drop table if exists nsfed_db.hive_hdfs_nsfed_mr;
create table hive_hdfs_nsfed_mr( id bigint, name string, price double) LOCATION 'hdfs://nsfed/path/to/directory';
insert into hive_hdfs_nsfed_mr values(100, '草莓', 80.5), (101, '石榴', 90);
select * from hive_hdfs_nsfed_mr order by id ASC;
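To confirm the tables resolve through nsfed, check the Location field and list the directory (a quick sanity check; the same pattern applies to the MR table):
-- in beeline
describe formatted nsfed_db.hive_hdfs_nsfed_tez;
# from a shell
hdfs dfs -ls hdfs://nsfed/path/to/directory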
Spark Example
Enter spark-sql, for reference:
./bin/spark-sql --master yarn --principal hadoop/{ip}@{realm} --keytab /var/krb5kdc/emr.keytab
- spark-sql: create an Iceberg table at a path on nsfed
create database if not exists nsfed_db;
use nsfed_db;
drop table if exists nsfed_db.sparksql_hdfs_nsfed;
create table sparksql_hdfs_nsfed( id bigint, name string, price double) USING iceberg LOCATION 'hdfs://nsfed/path/to/directory';
insert into sparksql_hdfs_nsfed values(100, '草莓', 80.5), (101, '石榴', 90);
select * from sparksql_hdfs_nsfed order by id ASC;
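An Iceberg table keeps metadata and data directories under its LOCATION, so a quick layout check on nsfed (placeholder path as above):
hdfs dfs -ls hdfs://nsfed/path/to/directory/metadata
hdfs dfs -ls hdfs://nsfed/path/to/directory/data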
- spark-sql: create a Hive table at a path on nsfed
create database if not exists nsfed_db;
use nsfed_db;
drop table if exists nsfed_db.sparksql_hdfs_nsfed;
create table sparksql_hdfs_nsfed( id bigint, name string, price double) LOCATION 'hdfs://nsfed/path/to/directory';
insert into sparksql_hdfs_nsfed values(100, '草莓', 80.5), (101, '石榴', 90);
select * from sparksql_hdfs_nsfed order by id ASC;
- spark-submit basic operations at a path on nsfed
1、Prepare input data and upload it to HDFS:
vim wordcount.txt
···
hello world
hello spark
hello hadoop
scala java
java Kyuubi
···
hdfs dfs -mkdir hdfs://nsfed/spark/resource
hdfs dfs -put ./wordcount.txt hdfs://nsfed/spark/resource
2、bin/spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.JavaWordCount /usr/local/service/spark/examples/jars/spark-examples*.jar hdfs://nsfed/spark/resource/wordcount.txt
(the --conf spark.kerberos.access.hadoopFileSystems=hdfs://nsfed/ option is optional here)
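If nsfed is not the cluster's fs.defaultFS, Spark needs that option to fetch a delegation token for it; a cluster-mode sketch (same jar and paths as above):
bin/spark-submit --master yarn --deploy-mode cluster --conf spark.kerberos.access.hadoopFileSystems=hdfs://nsfed/ --class org.apache.spark.examples.JavaWordCount /usr/local/service/spark/examples/jars/spark-examples*.jar hdfs://nsfed/spark/resource/wordcount.txt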
Flink Example
Preparation:
# Enter the Flink directory
cd /usr/local/service/flink
# First edit conf/flink-conf.yaml to increase TaskManager memory and slot count:
#taskmanager.heap.size: 1024m -> 3072m
#taskmanager.numberOfTaskSlots: 1 -> 10
# Move the planner & hive connector jars into the lib directory
mv ./opt/connector/flink-sql-connector-hive-3.1.2_2.12-1.16.1-TBDS-5.3.1_2023p4-SNAPSHOT.jar ./lib
mv ./opt/flink-table-planner_2.12-1.16.1-TBDS-5.3.1_2023p4-SNAPSHOT.jar ./lib/
mv ./lib/flink-table-planner-loader-1.16.1-TBDS-5.3.1_2023p4-SNAPSHOT.jar ./opt/
# Clean up yarn-session-mode residue (if a residue file exists, jobs are automatically submitted to the yarn-session)
rm -rf /tmp/.yarn-properties-root
Enter the Flink SQL client, for reference: ./sql-client.sh embedded
- Flink: create a Hive table at a path on nsfed
1、Create the Hive catalog:
CREATE CATALOG hive_catalog WITH ('type'='hive','default-database'='default','hive-conf-dir'='/usr/local/service/hive/conf/', 'hive-version'='3.1.2');
# Set checkpointing
set 'execution.runtime-mode' = 'streaming';
set 'execution.checkpointing.interval'='10000';
# Set the Hive dialect
SET 'table.sql-dialect' = 'hive';
# Create the database
CREATE DATABASE hive_catalog.hive_db;
drop table if exists hive_catalog.hive_db.t1;
-- Create the Hive table
CREATE TABLE hive_catalog.hive_db.t1 (uuid string, name string, age int, ts TIMESTAMP)
PARTITIONED BY (pt string)
LOCATION 'hdfs://nsfed/tmp/hudi_flink/t1'
TBLPROPERTIES (
'sink.partition-commit.policy.kind'='metastore'
);
2、Insert data (see the check after this block):
INSERT INTO hive_catalog.hive_db.t1 PARTITION (pt = 'par1') select 'id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01';
SELECT * from hive_catalog.hive_db.t1;
3、Start the Flink cluster:
/usr/local/service/flink/bin/start-cluster.sh
# Run kinit for authentication
kinit -kt /var/krb5kdc/emr.keytab hadoop/*.*.*.*@TBDS-*******
# Start the Flink SQL client
/usr/local/service/flink/bin/sql-client.sh
4、Stop the Flink cluster:
/usr/local/service/flink/bin/stop-cluster.sh
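After the insert, the partition should be visible under the table LOCATION on nsfed (hive-style layout; path as in the DDL above):
hdfs dfs -ls hdfs://nsfed/tmp/hudi_flink/t1/pt=par1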
- Flink: create a Hudi table at a path on nsfed
1、Create the Hudi catalog:
CREATE CATALOG hudi_catalog WITH ('type' = 'hudi','mode' = 'hms','default-database' = 'default','hive.conf.dir' = '/usr/local/service/hive/conf','table.external' = 'true');
CREATE DATABASE hudi_catalog.hudi_db;
-- sets up the result mode to tableau to show the results directly in the CLI
set 'sql-client.execution.result-mode' = 'tableau';
CREATE TABLE hudi_catalog.hudi_db.t2 (
id int PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
price int,
ts int,
dt VARCHAR(10)
)
PARTITIONED BY (dt)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://nsfed/tmp/hudi/t2',
'table.type' = 'MERGE_ON_READ',
'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
'hoodie.datasource.write.recordkey.field' = 'id',
'hoodie.datasource.write.hive_style_partitioning' = 'true',
'hive_sync.conf.dir'='/usr/local/service/hive/conf',
'spark.version'='spark3.4.2',
'hoodie.datasource.write.precombine.field'='ts'
);
2、Insert data (see the check after this block):
INSERT INTO hudi_catalog.hudi_db.t2 VALUES(1,'Danny',23,15,'par1');
SELECT * from hudi_catalog.hudi_db.t2;
3、Start the Flink cluster:
/usr/local/service/flink/bin/start-cluster.sh
# Run kinit for authentication
kinit -kt /var/krb5kdc/emr.keytab hadoop/*.*.*.*@TBDS-*******
# Start the Flink SQL client
/usr/local/service/flink/bin/sql-client.sh
4、Stop the Flink cluster:
/usr/local/service/flink/bin/stop-cluster.sh
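With hive-style partitioning enabled, the Hudi files land under dt=par1; a quick check (path as in the 'path' option above):
hdfs dfs -ls hdfs://nsfed/tmp/hudi/t2/dt=par1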
Trino Example
Enter the Trino CLI, for reference:
/usr/local/service/trino/client/trino-cli --server https://{ip}:9443 --user hadoop --krb5-config-path /etc/krb5.conf --krb5-principal hadoop/{ip}@{realm} --krb5-keytab-path /var/krb5kdc/emr.keytab --krb5-disable-remote-service-hostname-canonicalization --krb5-remote-service-name hadoop --insecure
- Trino basic operations at a path on nsfed
#1、nsfed at the schema level:
create SCHEMA if not exists hive.nsfed_db with (LOCATION='hdfs://nsfed/path/to/directory');
use hive.nsfed_db;
drop table if exists hive.nsfed_db.trino_hdfs_nsfed;
create table trino_hdfs_nsfed( id bigint, name varchar, price double);
insert into trino_hdfs_nsfed values(100, '草莓', 80.5), (101, '石榴', 90);
select * from trino_hdfs_nsfed order by id ASC;
hdfs dfs -ls hdfs://nsfed/path/to/directory/trino_hdfs_nsfed
#2、nsfed at the table level:
# Note: this is an external table and cannot be written to through Trino (see the read check after this block)
create SCHEMA if not exists hive.nsfed_db_common;
use hive.nsfed_db_common;
drop table if exists trino_hdfs_nsfed;
create table trino_hdfs_nsfed( id bigint, name varchar, price double) with (EXTERNAL_LOCATION='hdfs://nsfed/path/to/directory');
SHOW CREATE TABLE trino_hdfs_nsfed;
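Because Trino cannot write this table, it only returns rows once files in the table's storage format already exist at the external location; a quick read check (placeholder path as above):
hdfs dfs -ls hdfs://nsfed/path/to/directory
select count(*) from trino_hdfs_nsfed;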
- Trino Iceberg operations at a federated path
#1、nsfed at the schema level:
create SCHEMA if not exists iceberg.nsfed_db with (LOCATION='hdfs://nsfed/path/to/directory');
use iceberg.nsfed_db;
drop table if exists iceberg.nsfed_db.trino_hdfs_nsfed;
create table trino_hdfs_nsfed( id bigint, name varchar, price double);
insert into trino_hdfs_nsfed values(100, '草莓', 80.5), (101, '石榴', 90);
select * from trino_hdfs_nsfed order by id ASC;
hdfs dfs -ls hdfs://nsfed/path/to/directory/xx/xx
#2、nsfed at the table level:
# Note: this is an external table and cannot be written to through Trino
create SCHEMA if not exists iceberg.nsfed_db_common;
use iceberg.nsfed_db_common;
drop table if exists trino_hdfs_nsfed;
create table trino_hdfs_nsfed( id bigint, name varchar, price double) WITH (EXTERNAL_LOCATION='hdfs://nsfed/path/to/directory');
SHOW CREATE TABLE trino_hdfs_nsfed;
Impala Example
Enter Impala, for reference:
/usr/local/service/impala/shell/impala-shell -i {ip}:27009 -s hadoop
INVALIDATE METADATA;
- Impala basic operations at a path on nsfed
create database if not exists nsfed_db;
use nsfed_db;
drop table if exists nsfed_db.sparksql_hdfs_nsfed;
create table sparksql_hdfs_nsfed( id bigint, name string, price double) stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'hdfs://nsfed/path/to/directory';
Then, in spark-sql:
insert into sparksql_hdfs_nsfed values(100, '草莓', 80.5), (101, '石榴', 90);
select * from sparksql_hdfs_nsfed order by id ASC;
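Because the rows were written through spark-sql rather than Impala, refresh Impala's metadata before querying from impala-shell (standard Impala statements):
REFRESH nsfed_db.sparksql_hdfs_nsfed;
select * from sparksql_hdfs_nsfed order by id ASC;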
Kyuubi Example
Enter the Kyuubi beeline, for reference:
Spark:
bin/beeline -u "jdbc:hive2://{ip}:10009/default;principal=hadoop/{ip}@{realm};#kyuubi.engine.type=SPARK_SQL;kyuubi.engine.share.level=CONNECTION"
Trino:
bin/beeline -u "jdbc:hive2://{ip}:10009/default;principal=hadoop/{ip}@{realm};#kyuubi.engine.type=TRINO;kyuubi.engine.share.level=CONNECTION"
- Kyuubi on Spark basic operations at a path on nsfed
create database if not exists nsfed_db;
use nsfed_db;
drop table if exists nsfed_db.sparksql_hdfs_nsfed;
create table sparksql_hdfs_nsfed( id bigint, name string, price double) stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'hdfs://nsfed/path/to/directory';
Then, in spark-sql:
insert into sparksql_hdfs_nsfed values(100, '草莓', 80.5), (101, '石榴', 90);
select * from sparksql_hdfs_nsfed order by id ASC;
- Kyuubi on Trino: Iceberg basic operations at a path on nsfed
create database if not exists nsfed_db;
use nsfed_db;
drop table if exists nsfed_db.trino_hdfs_nsfed;
create table trino_hdfs_nsfed( id bigint, name varchar, price double) USING iceberg LOCATION 'hdfs://nsfed/path/to/directory';
insert into trino_hdfs_nsfed values(100, '草莓', 80.5), (101, '石榴', 90);
select * from trino_hdfs_nsfed order by id ASC;
Hue Example
Open the native Hue page and run the statements below against Trino from the Hue editor.
create database if not exists nsfed_db;
use nsfed_db;
drop table if exists nsfed_db.sparksql_hdfs_nsfed;
create table sparksql_hdfs_nsfed( id bigint, name string, price double) LOCATION 'hdfs://nsfed/path/to/directory';
insert into sparksql_hdfs_nsfed values(100, '草莓', 80.5), (101, '石榴', 90);
select * from sparksql_hdfs_nsfed order by id ASC;
Amoro Example
- Amoro task submission with nsfed paths
#1、Create a Hive table under nsfed and insert data:
set hive.execution.engine=tez;
create database if not exists nsfed_db;
use nsfed_db;
drop table if exists nsfed_db.hive_hdfs_nsfed_tez;
create table hive_hdfs_nsfed_tez( id bigint, name string, price double) LOCATION 'hdfs://nsfed/path/to/directory';
insert into hive_hdfs_nsfed_tez values(100, '草莓', 80.5), (101, '石榴', 90), (102, '石榴x', 91), (103, '石榴1', 92);
#2、Set up an optimizing task for this table on the Amoro page; the task should run normally on the YARN page
#3、Create a Hive table under the newly added NameService and insert data:
set hive.execution.engine=tez;
create database if not exists nsfed_db;
use nsfed_db;
drop table if exists nsfed_db.hive_hdfs_nsfed_tez;
create table hive_hdfs_nsfed_tez( id bigint, name string, price double) LOCATION 'hdfs://NameService/path/to/directory';
insert into hive_hdfs_nsfed_tez values(100, '草莓', 80.5), (101, '石榴', 90), (102, '石榴x', 91), (103, '石榴1', 92);
#4、Set up an optimizing task for this table on the Amoro page; the task should run normally on the YARN page
#5、In Trino, create an Iceberg table under nsfed and insert data:
create SCHEMA if not exists iceberg.nsfed_db with (LOCATION='hdfs://nsfed/path/to/directory');
use iceberg.nsfed_db;
drop table if exists iceberg.nsfed_db.trino_hdfs_nsfed;
create table trino_hdfs_nsfed( id bigint, name varchar, price double);
insert into trino_hdfs_nsfed values(100, '草莓', 80.5), (101, '石榴', 90);
#6、Set up an optimizing task for this table on the Amoro page; the task should run normally on the YARN page
#7、In Trino, create an Iceberg table under the newly added NameService and insert data (a new schema is used, since CREATE SCHEMA IF NOT EXISTS would silently keep the old location on nsfed_db):
create SCHEMA if not exists iceberg.nsfed_db2 with (LOCATION='hdfs://NameService/path/to/directory');
use iceberg.nsfed_db2;
drop table if exists iceberg.nsfed_db2.trino_hdfs_nsfed;
create table trino_hdfs_nsfed( id bigint, name varchar, price double);
insert into trino_hdfs_nsfed values(100, '草莓', 80.5), (101, '石榴', 90);
#8、Set up an optimizing task for this table on the Amoro page; the task should run normally on the YARN page
For the UI operations, refer to:
Step 1: Find the public IP of the node where Amoro is installed.
Step 2: Access the Amoro web UI at {public IP}:1630.
Step 3: Scale out one optimizer in the optimizer group.
Step 4: Create an Iceberg table and keep inserting data into it (see the sketch after this list).
Step 5: Amoro detects the small files and starts optimization automatically.
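For step 4, a hypothetical loop that keeps producing small files through trino-cli (connection flags reused from the Trino example; table as created in #5 above; each insert commits a new Iceberg snapshot):
for i in $(seq 1 100); do
  /usr/local/service/trino/client/trino-cli --server https://{ip}:9443 --user hadoop --krb5-config-path /etc/krb5.conf --krb5-principal hadoop/{ip}@{realm} --krb5-keytab-path /var/krb5kdc/emr.keytab --krb5-disable-remote-service-hostname-canonicalization --krb5-remote-service-name hadoop --insecure --execute "insert into iceberg.nsfed_db.trino_hdfs_nsfed values($i, 'loop', 1.0)"
  sleep 1
done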