RBF组件示例

最近更新时间: 2026-03-13 09:03:00

TBDS的RBF NameService默认名称为nsfed,要使用RBF,就需要组件使用nsfed来读写数据。

Router示例

  1. 任意挂载nameservice 路径到全局路径。
    hdfs dfsrouteradmin -add /xxx NameService /path/
  2. hdfs dfs -ls hdfs://nsfed/xxx
    hdfs dfs -ls hdfs://NameService/path/
  3. 访问Router webui

MR示例

  1. 提交yarn任务,输入输出均为NameService的hdfs。
yarn jar /usr/local/service/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar wordcount hdfs://NameService1/xxx hdfs://NameService1/NS/yarn_wc_output1/ 
yarn jar /usr/local/service/hadoop/share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar wordcount hdfs://nsfed/xxx hdfs://NameService1/NS/yarn_wc_output1/ 

Hive示例

进入beeline,参考:

/usr/local/service/hive/bin/beeline -u "jdbc:hive2://{ip}:7001/;principal=hadoop/{ip}@{realm}"
  1. hive on tez(tez未使用可忽略)
set hive.execution.engine=tez;
create database if not exists nsfed_db;
use nsfed_db;
drop table if exists nsfed_db.hive_hdfs_nsfed_tez;
create table hive_hdfs_nsfed_tez( id bigint, name string, price double) LOCATION 'hdfs://nsfed/path/to/directory';
insert into hive_hdfs_nsfed_tez values(100, '草莓', 80.5), (101, '石榴', 90); 
select * from hive_hdfs_nsfed_tez order by id ASC;
  2. hive on mr指定nsfed读写表
set hive.execution.engine=mr;
create database if not exists nsfed_db;
use nsfed_db;
drop table if exists nsfed_db.hive_hdfs_nsfed_tez;
create table hive_hdfs_nsfed_tez( id bigint, name string, price double) LOCATION 'hdfs://nsfed/path/to/directory';
insert into hive_hdfs_nsfed_tez values(100, '草莓', 80.5), (101, '石榴', 90); 
select * from hive_hdfs_nsfed_tez order by id ASC;

Spark 示例

spark-sql 进入,参考:

./bin/spark-sql --master yarn --principal hadoop/{ip}@{realm} --keytab /var/krb5kdc/emr.keytab
  1. spark-sql创建iceberg表指定nsfed某路径
create database if not exists nsfed_db;
use nsfed_db;
drop table if exists nsfed_db.sparksql_hdfs_nsfed;
create table sparksql_hdfs_nsfed( id bigint, name string, price double) USING iceberg LOCATION 'hdfs://nsfed/path/to/directory';
insert into sparksql_hdfs_nsfed values(100, '草莓', 80.5), (101, '石榴', 90); 
select * from sparksql_hdfs_nsfed order by id ASC;
  2. spark-sql创建hive表指定nsfed某路径
create database if not exists nsfed_db;
use nsfed_db;
drop table if exists nsfed_db.sparksql_hdfs_nsfed;
create table sparksql_hdfs_nsfed( id bigint, name string, price double) LOCATION 'hdfs://nsfed/path/to/directory';
insert into sparksql_hdfs_nsfed values(100, '草莓', 80.5), (101, '石榴', 90); 
select * from sparksql_hdfs_nsfed order by id ASC;
  3. spark-submit基本操作指定nsfed某路径
1、准备输入数据并上传至hdfs
vim wordcount.txt
···
hello world
hello spark
hello hadoop
scala java
java Kyuubi
···
hdfs dfs -mkdir hdfs://nsfed/spark/resource
hdfs dfs -put ./wordcount.txt hdfs://nsfed/spark/resource
2、bin/spark-submit --master yarn --deploy-mode client --class org.apache.spark.examples.JavaWordCount /usr/local/service/spark/examples/jars/spark-examples*.jar hdfs://nsfed/spark/resource/wordcount.txt
(--conf spark.kerberos.access.hadoopFileSystems=hdfs://nsfed/   该配置可加可不加)

Flink示例

准备:

#进入 Flink 目录
cd /usr/local/service/flink

#先修改 conf/flink-conf.yaml 配置,增加 tm 的内存和 slot 数量
#taskmanager.heap.size: 1024m  -> 3072m
#taskmanager.numberOfTaskSlots: 1   ->  10

#将 planner & hive connector 移入 lib 目录
mv ./opt/connector/flink-sql-connector-hive-3.1.2_2.12-1.16.1-TBDS-5.3.1_2023p4-SNAPSHOT.jar ./lib
mv ./opt/flink-table-planner_2.12-1.16.1-TBDS-5.3.1_2023p4-SNAPSHOT.jar ./lib/
mv ./lib/flink-table-planner-loader-1.16.1-TBDS-5.3.1_2023p4-SNAPSHOT.jar ./opt/

#清理 yarn-session 模式残留 (如果存在残留文件会自动往 yarn-session 提交)
rm -rf /tmp/.yarn-properties-root

进入flink参考:./sql-client.sh embedded

  1. flink创建hive表指定nsfed某路径
1、CREATE CATALOG hive_catalog WITH ('type'='hive','default-database'='default','hive-conf-dir'='/usr/local/service/hive/conf/', 'hive-version'='3.1.2');

#设置 checkpoint
set 'execution.runtime-mode' = 'streaming';
set 'execution.checkpointing.interval'='10000';

#设置 hive 方言
SET table.sql-dialect = hive;

#创建数据库
CREATE DATABASE hive_catalog.hive_db;

drop table hive_catalog.hive_db.t1;

-- 创建hive表
CREATE TABLE  hive_catalog.hive_db.t1 (uuid string, name string, age int, ts TIMESTAMP) 
PARTITIONED BY (pt string) 
LOCATION 'hdfs://nsfed/tmp/hudi_flink/t1' 
TBLPROPERTIES (
    'sink.partition-commit.policy.kind'='metastore'
);
2、插入数据:
INSERT INTO hive_catalog.hive_db.t1 PARTITION (pt = 'par1')  select 'id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01';
SELECT * from hive_catalog.hive_db.t1;
3、启动flink集群
/usr/local/service/flink/bin/start-cluster.sh
#执行 kinit 认证
kinit -kt /var/krb5kdc/emr.keytab hadoop/*.*.*.*@TBDS-*******
开启flink 客户端
/usr/local/service/flink/bin/sql-client.sh
4、/usr/local/service/flink/bin/stop-cluster.sh
关闭flink集群
  2. flink创建hudi表指定nsfed某路径
    1、创建 catalog 信息

CREATE CATALOG hudi_catalog WITH ('type' = 'hudi','mode' = 'hms','default-database' = 'default','hive.conf.dir' = '/usr/local/service/hive/conf','table.external' = 'true');

CREATE DATABASE hudi_catalog.hudi_db;
-- sets up the result mode to tableau to show the results directly in the CLI
set execution.result-mode=tableau;

CREATE TABLE hudi_catalog.hudi_db.t2 (
  id int PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  price int,
  ts int,
  dt VARCHAR(10)
)
PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://nsfed/tmp/hudi/t2',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hive_sync.conf.dir'='/usr/local/service/hive/conf',
  'spark.version'='spark3.4.2',
  'hoodie.datasource.write.precombine.field'='ts'
);

2、插入数据:

INSERT INTO hudi_catalog.hudi_db.t2 VALUES(1,'Danny',23,15,'par1');
SELECT * from hudi_catalog.hudi_db.t2;

3、启动flink集群

/usr/local/service/flink/bin/start-cluster.sh
#执行 kinit 认证
kinit -kt /var/krb5kdc/emr.keytab hadoop/*.*.*.*@TBDS-*******
#开启flink 客户端
/usr/local/service/flink/bin/sql-client.sh

4、关闭flink集群


/usr/local/service/flink/bin/stop-cluster.sh

Trino示例

进入Trino 交互参考:

/usr/local/service/trino/client/trino-cli --server https://{ip}:9443 --user hadoop --krb5-config-path /etc/krb5.conf --krb5-principal hadoop/{ip}@{realm} --krb5-keytab-path /var/krb5kdc/emr.keytab --krb5-disable-remote-service-hostname-canonicalization --krb5-remote-service-name hadoop --insecure
  1. Trino基本操作指定nsfed某路径
#1、库支持nsfed:
create SCHEMA if not exists hive.nsfed_db with (LOCATION='hdfs://nsfed/path/to/directory');
use hive.nsfed_db;
drop table if exists hive.nsfed_db.trino_hdfs_nsfed;
create table trino_hdfs_nsfed( id bigint, name varchar, price double);
insert into trino_hdfs_nsfed values(100, '草莓', 80.5), (101, '石榴', 90); 
select * from trino_hdfs_nsfed order by id ASC;

hdfs dfs -ls hdfs://nsfed/path/to/directory/trino_hdfs_nsfed

#2、表支持nsfed:
# ps:该表为外表,不可通过 Trino插入
create SCHEMA if not exists hive.nsfed_db_common;
use hive.nsfed_db_common;
drop table if exists trino_hdfs_nsfed;
create table trino_hdfs_nsfed( id bigint, name varchar, price double) with (EXTERNAL_LOCATION='hdfs://nsfed/path/to/directory');
SHOW CREATE TABLE trino_hdfs_nsfed;
  2. Trino操作iceberg指定联邦某路径
#1、库支持nsfed:
create SCHEMA if not exists iceberg.nsfed_db with (LOCATION='hdfs://nsfed/path/to/directory');
use iceberg.nsfed_db;
drop table if exists iceberg.nsfed_db.trino_hdfs_nsfed;
create table trino_hdfs_nsfed( id bigint, name varchar, price double);
insert into trino_hdfs_nsfed values(100, '草莓', 80.5), (101, '石榴', 90);
select * from trino_hdfs_nsfed order by id ASC;

hdfs dfs -ls hdfs://nsfed/path/to/directory/xx/xx
#2、表支持nsfed:
# ps:该表为外表,不可通过 Trino插入
create SCHEMA if not exists iceberg.nsfed_db_common;
use iceberg.nsfed_db_common;
drop table if exists trino_hdfs_nsfed;
create table trino_hdfs_nsfed( id bigint, name varchar, price double) WITH (EXTERNAL_LOCATION='hdfs://nsfed/path/to/directory');
SHOW CREATE TABLE trino_hdfs_nsfed;

Impala示例

进入Impala参考:

 /usr/local/service/impala/shell/impala-shell -i {ip}:27009 -s hadoop

INVALIDATE METADATA;
  1. Impala基本操作指定nsfed某路径
create database if not exists nsfed_db;
use nsfed_db;
drop table if exists nsfed_db.sparksql_hdfs_nsfed;
create table sparksql_hdfs_nsfed( id bigint, name string, price double) stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'hdfs://nsfed/path/to/directory';
spark-sql:
insert into sparksql_hdfs_nsfed values(100, '草莓', 80.5), (101, '石榴', 90); 
select * from sparksql_hdfs_nsfed order by id ASC;

Kyuubi示例

进入Kyuubi交互参考:

spark:
bin/beeline -u "jdbc:hive2://{ip}:10009/default;principal=hadoop/{ip}@{realm};#Kyuubi.engine.type=SPARK_SQL;Kyuubi.engine.share.level=CONNECTION"

Trino:
bin/beeline -u "jdbc:hive2://{ip}:10009/default;principal=hadoop/{ip}@{realm};#Kyuubi.engine.type=TRINO;Kyuubi.engine.share.level=CONNECTION"
  1. Kyuubi on spark基本操作指定nsfed某路径
create database if not exists nsfed_db;
use nsfed_db;
drop table if exists nsfed_db.sparksql_hdfs_nsfed;
create table sparksql_hdfs_nsfed( id bigint, name string, price double) stored by 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' LOCATION 'hdfs://nsfed/path/to/directory';
spark-sql:
insert into sparksql_hdfs_nsfed values(100, '草莓', 80.5), (101, '石榴', 90); 
select * from sparksql_hdfs_nsfed order by id ASC;
  2. Kyuubi on Trino基本操作iceberg指定nsfed某路径
create database if not exists nsfed_db;
use nsfed_db;
drop table if exists nsfed_db.trino_hdfs_nsfed;
create table trino_hdfs_nsfed( id bigint, name varchar, price double) USING iceberg LOCATION 'hdfs://nsfed/path/to/directory';
insert into trino_hdfs_nsfed values(100, '草莓', 80.5), (101, '石榴', 90); 
select * from trino_hdfs_nsfed order by id ASC;

Hue示例

打开hue原生页面,在hue页面操作Trino。

create database if not exists nsfed_db;
use nsfed_db;
drop table if exists nsfed_db.sparksql_hdfs_nsfed;
create table sparksql_hdfs_nsfed( id bigint, name string, price double) LOCATION 'hdfs://nsfed/path/to/directory';
insert into sparksql_hdfs_nsfed values(100, '草莓', 80.5), (101, '石榴', 90);
select * from sparksql_hdfs_nsfed order by id ASC;

Amoro示例

  1. Amoro任务提交指定nsfed路径
#1、创建nsfed下的hive表并插入数据
set hive.execution.engine=tez;
create database if not exists nsfed_db;
use nsfed_db;
drop table if exists nsfed_db.hive_hdfs_nsfed_tez;
create table hive_hdfs_nsfed_tez( id bigint, name string, price double) LOCATION 'hdfs://nsfed/path/to/directory';
insert into hive_hdfs_nsfed_tez values(100, '草莓', 80.5), (101, '石榴', 90), (102, '石榴x', 91), (103, '石榴1', 92); 
#2、Amoro页面对该表进行优化任务设置,
yarn页面任务运行正常
#3、创建新增NameService下的hive表并插入数据
set hive.execution.engine=tez;
create database if not exists nsfed_db;
use nsfed_db;
drop table if exists nsfed_db.hive_hdfs_nsfed_tez;
create table hive_hdfs_nsfed_tez( id bigint, name string, price double) LOCATION 'hdfs://NameService/path/to/directory';
insert into hive_hdfs_nsfed_tez values(100, '草莓', 80.5), (101, '石榴', 90), (102, '石榴x', 91), (103, '石榴1', 92); 
#4、Amoro页面对该表进行优化任务设置,
yarn页面任务运行正常
#5、Trino创建nsfed下的iceberg表并插入数据
create SCHEMA if not exists iceberg.nsfed_db with (LOCATION='hdfs://nsfed/path/to/directory');
use iceberg.nsfed_db;
drop table if exists iceberg.nsfed_db.trino_hdfs_nsfed;
create table trino_hdfs_nsfed( id bigint, name varchar, price double);
insert into trino_hdfs_nsfed values(100, '草莓', 80.5), (101, '石榴', 90);
#6、Amoro页面对该表进行优化任务设置,
yarn页面任务运行正常
#7、trino创建新增NameService下的iceberg表并插入数据
create SCHEMA if not exists iceberg.nsfed_db with (LOCATION='hdfs://NameService/path/to/directory');
use iceberg.nsfed_db;
drop table if exists iceberg.nsfed_db.trino_hdfs_nsfed;
create table trino_hdfs_nsfed( id bigint, name varchar, price double);
insert into trino_hdfs_nsfed values(100, '草莓', 80.5), (101, '石榴', 90);
#8、Amoro页面对该表进行优化任务设置,
yarn页面任务运行正常

页面操作可参考:
第一步:找到 Amoro 安装节点的外网 IP。

第二步:外网ip:1630 访问 Amoro webui。

第三步:优化器组扩展一个优化器。

第四步:建 iceberg 表,不停插入数据。

第五步: Amoro 监测到有小文件,自动开启优化。