For the Spark, Flink, and Hive engines, TBDS integrates the standard Iceberg 1.6.1 release by default. Trino ships with its own Iceberg connector and supports Iceberg natively.
Spark
The spark-defaults.conf file is preconfigured with the following three Iceberg catalogs, which users can use to operate on Iceberg tables.
| Name | Type | Implementation |
|---|---|---|
| local | hadoop | org.apache.iceberg.spark.SparkCatalog |
| spark_catalog | hive | org.apache.iceberg.spark.SparkSessionCatalog |
| hive_prod | hive | org.apache.iceberg.spark.SparkCatalog |
Kerberos authentication
# List the principals contained in the keytab
klist -kt /path/to/xxx.keytab
# Authenticate with the keytab
kinit -kt /path/to/xxx.keytab hadoop/xxx@TBDS-{ClusterId}
JAR download
- Download the Iceberg runtime JAR matching your Spark version from the Maven repository and upload it to the jars/ directory under the Spark installation on the cluster.

- spark-defaults.conf configuration
Using a Hive catalog:
spark.sql.catalog.hive_prod=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hive_prod.type=hive
spark.sql.catalog.hive_prod.uri=thrift://xx.xx.xx.xx:xx
Using a Hadoop catalog:
spark.sql.catalog.hadoop_prod=org.apache.iceberg.spark.SparkCatalog
spark.sql.catalog.hadoop_prod.type=hadoop
spark.sql.catalog.hadoop_prod.warehouse=hdfs://xx.xx.x.xx:xx/warehouse/spark-iceberg
Add the Iceberg SQL extensions:
spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
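If you prefer not to edit spark-defaults.conf, the same settings can be passed per session on the command line. A minimal sketch, reusing the spark-sql path that appears later in this document (the thrift address is a placeholder):
# Launch spark-sql with the Iceberg catalog configured for this session only
/usr/local/service/spark/bin/spark-sql \
  --conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
  --conf spark.sql.catalog.hive_prod=org.apache.iceberg.spark.SparkCatalog \
  --conf spark.sql.catalog.hive_prod.type=hive \
  --conf spark.sql.catalog.hive_prod.uri=thrift://xx.xx.xx.xx:xx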
DDL
- Create a regular partitioned table
CREATE TABLE hive_prod.default.spark_iceberg1 (
id bigint,
data string,
category string)
USING iceberg
PARTITIONED BY (category);
# Insert data
INSERT INTO hive_prod.default.spark_iceberg1 VALUES (1, 'TOM', 'a');
INSERT INTO hive_prod.default.spark_iceberg1 VALUES (2, 'jelly', 'b');
INSERT INTO hive_prod.default.spark_iceberg1 VALUES (3, 'Danny', 'c');
After the insert, the data is written into one partition per category value (category=a, category=b, category=c).
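The partition layout can be verified through Iceberg's partitions metadata table; a minimal sketch, queried against the table above:
# Inspect partitions via Iceberg's metadata table
SELECT partition, record_count, file_count FROM hive_prod.default.spark_iceberg1.partitions;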
- Create a table with hidden partitioning
CREATE TABLE hive_prod.default.spark_iceberg2 (
id bigint,
data string,
category string,
ts timestamp)
USING iceberg
PARTITIONED BY (days(ts), category);
# Insert data
INSERT INTO hive_prod.default.spark_iceberg2 (id, data, category, ts)
VALUES
(1, 'value1', 'A', CAST('2022-01-01 00:00:00' AS TIMESTAMP)),
(2, 'value2', 'B', CAST('2022-01-02 00:00:00' AS TIMESTAMP)),
(3, 'value3', 'A', CAST('2022-01-03 00:00:00' AS TIMESTAMP));
After the insert, Iceberg derives the partition values from the days(ts) transform (for example ts_day=2022-01-01/category=A); no explicit partition column is stored in the data.
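With hidden partitioning, queries filter on the source column directly and Iceberg prunes the days(ts) partitions automatically. A minimal sketch against the table above:
# A filter on ts is pruned against the days(ts) partitions; no ts_day column is referenced
SELECT * FROM hive_prod.default.spark_iceberg2
WHERE ts >= CAST('2022-01-02 00:00:00' AS TIMESTAMP)
  AND ts < CAST('2022-01-03 00:00:00' AS TIMESTAMP);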
- Query
CREATE TABLE hive_prod.default.spark_iceberg3 (
id INT,
data STRING,
properties MAP<STRING, STRING>)
USING iceberg;
INSERT INTO hive_prod.default.spark_iceberg3 VALUES
(1, 'value1', map('key1', 'valueA', 'key2', 'valueB')),
(2, 'value2', map('key1', 'valueC', 'key2', 'valueD')),
(3, 'value3', map('key1', 'valueA', 'key2', 'valueE'));
SELECT * FROM hive_prod.default.spark_iceberg3 WHERE properties['key1'] = 'valueA';
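Every write also creates a new table snapshot that can be inspected and queried back. A minimal sketch of snapshot inspection and time travel (VERSION AS OF requires Spark 3.3 or later; the snapshot id below is a placeholder):
# List the table's snapshots
SELECT snapshot_id, committed_at, operation FROM hive_prod.default.spark_iceberg3.snapshots;
# Time travel to a specific snapshot (replace with a real snapshot_id)
SELECT * FROM hive_prod.default.spark_iceberg3 VERSION AS OF 1234567890;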
- Other SQL
# merge into
/usr/local/service/spark/bin/spark-sql -e "
MERGE INTO default.test_target_table t
USING (SELECT * FROM default.test_source_table) s
ON t.id = s.id
WHEN MATCHED AND s.category = 'delete' THEN DELETE
WHEN MATCHED AND s.category = 'update' THEN UPDATE SET t.data = s.data, t.ts = s.ts
WHEN NOT MATCHED THEN INSERT *;
"
# Drop an Iceberg table
/usr/local/service/spark/bin/spark-sql -e "
DROP TABLE default.test_source_table;
"
# Drop an Iceberg table and delete its data
/usr/local/service/spark/bin/spark-sql -e "
DROP TABLE default.test_target_table PURGE;
"
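The IcebergSparkSessionExtensions configured earlier also enable row-level UPDATE and DELETE; a minimal sketch using the spark_iceberg1 table created above:
# Row-level update and delete (enabled by the Iceberg SQL extensions)
UPDATE hive_prod.default.spark_iceberg1 SET data = 'Tommy' WHERE id = 1;
DELETE FROM hive_prod.default.spark_iceberg1 WHERE category = 'c';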
Flink
Kerberos authentication (skip if using SIMPLE authentication)
# Kerberos authentication
klist -kt /path/to/xxx.keytab
kinit -kt /path/to/xxx.keytab hadoop/xxx@TBDS-{ClusterId}
JAR preparation and starting Flink
Download the JARs matching your Flink version from the Maven repository: flink-sql-connector-hive-3.1.2_2.12-1.14.0.jar and iceberg-flink-runtime-1.14-1.2.1.jar.
Move the JARs to /path/to/flink/lib (note: in the current TBDS Flink distribution these two JARs are under flink/opt/connect; remove them from flink/lib promptly after testing to avoid conflicts).
Start the SQL client as follows:
/path/to/flink/bin/sql-client.sh embedded
Create and use an Iceberg catalog
CREATE CATALOG iceberg_catalog WITH (
'type'='iceberg',
'catalog-type'='hive',
'uri'='thrift://xx.xx.xx.xx:xx',
'clients'='5',
'property-version'='1',
'warehouse'='hdfs://xx.xx.xx.xx:xx/warehouse/iceberg-hive'
);
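A Hadoop-type catalog can be declared the same way; a minimal sketch (the warehouse path is a placeholder; the examples below continue with the Hive catalog):
CREATE CATALOG hadoop_catalog WITH (
'type'='iceberg',
'catalog-type'='hadoop',
'warehouse'='hdfs://xx.xx.xx.xx:xx/warehouse/iceberg-hadoop',
'property-version'='1'
);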
USE CATALOG iceberg_catalog;
# Create a table
CREATE TABLE `default`.`sample` (
id BIGINT COMMENT 'unique id',
data STRING
);
# Write to the table
INSERT INTO `default`.`sample` VALUES (1, 'a');
# Query the table
SET execution.runtime-mode = batch;
SELECT * FROM `default`.`sample`;
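Beyond batch queries, the Iceberg Flink connector can also tail a table continuously; a minimal sketch using the connector's streaming read options:
# Streaming read: new snapshots are emitted as they are committed
SET execution.runtime-mode = streaming;
SELECT * FROM `default`.`sample` /*+ OPTIONS('streaming'='true', 'monitor-interval'='30s') */;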
Hive
Kerberos authentication
klist -kt /path/to/xxx.keytab
kinit -kt /path/to/xxx.keytab hadoop/xxx@TBDS-{ClusterId}
JAR download
Download the Iceberg JAR matching your Hive version from the Maven repository and upload it to the lib/ directory under the Hive installation on the cluster.
Inserting data into Iceberg tables from Hive also requires libfb303-0.9.3.jar; upload this JAR to the corresponding lib directories on both the Hive server and the client.
Note that third-party JARs dropped directly into ${HIVE_HOME}/lib do not take effect.
The recommended approach is to upload the JARs to ${HIVE_HOME}/auxlib and have Hive load them from that directory at startup.
hive-site.xml configuration and connecting to Hive
# Enable Iceberg support globally in Hive
iceberg.engine.hive.enabled=true
# JAR load path
hive.aux.jars.path=/usr/local/service/hive/auxlib
# With the Tez engine on Hive 3.1.2 or later, Tez must be upgraded to >= 0.10.1 and the following setting added
tez.mrreader.config.update.properties=hive.io.file.readcolumn.names,hive.io.file.readcolumn.ids
# Starting with Iceberg 0.11.0, vectorization must also be disabled when using Hive with Tez
hive.vectorized.execution.enabled=false
- Using a Hive catalog
# Set the catalog type and catalog name
set iceberg.catalog.iceberg_hive.type=hive;
# Set the metastore URI
set iceberg.catalog.iceberg_hive.uri=thrift://xx.xx.xx.xx:xx;
- Using a Hadoop catalog
# Set the catalog type and catalog name
set iceberg.catalog.iceberg_hadoop.type=hadoop;
# Catalog warehouse path
set iceberg.catalog.iceberg_hadoop.warehouse=hdfs://xx.xx.xx.xx:xx/warehouse/iceberg-hadoop;
- Connect to Hive
# Connect to HiveServer2 via beeline
[hadoop@172 ~]$ beeline -u 'jdbc:hive2://xx.xx.xx.xx:xx/;principal=xx@TBDS-{ClusterId}'
DDL
- Create a table under the iceberg_hive catalog:
CREATE TABLE iceberg_test1 (id int, name string)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
TBLPROPERTIES('iceberg.catalog'='iceberg_hive');
INSERT INTO iceberg_test1 VALUES (1, 'Tom');
SELECT * FROM iceberg_test1;
- Create a table under the iceberg_hadoop catalog:
CREATE TABLE iceberg_test2 (id int, name string)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://xx.xx.xx.xx:xx/warehouse/iceberg-hadoop/default/iceberg_test2'
TBLPROPERTIES('iceberg.catalog'='iceberg_hadoop');
INSERT INTO iceberg_test2 VALUES (1, 'TOM');
SELECT * FROM iceberg_test2;
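An Iceberg table created by another engine can also be exposed to Hive by overlaying an external table on its location; a hypothetical sketch (the table name and location are placeholders following the pattern above):
CREATE EXTERNAL TABLE iceberg_test3
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION 'hdfs://xx.xx.xx.xx:xx/warehouse/iceberg-hadoop/default/iceberg_test3'
TBLPROPERTIES('iceberg.catalog'='iceberg_hadoop');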
Trino
Kerberos authentication
klist -kt /path/to/xxx.keytab
kinit -kt /path/to/xxx.keytab hadoop/xxx@TBDS-{ClusterId}
Environment check
- Under /usr/local/service/trino/etc/catalog, add an iceberg.properties file and restart Trino. The file contents are as follows:
connector.name=iceberg
hive.hdfs.impersonation.enabled=true
hive.metastore.thrift.impersonation.enabled=true
hive.metastore.uri=thrift://xx.xx.xx.xx:xx
hive.config.resources=/usr/local/service/hadoop/etc/hadoop/core-site.xml,/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
hive.hdfs.authentication.type=KERBEROS
hive.hdfs.trino.keytab=/path/to/xxx.keytab
hive.hdfs.trino.principal=hadoop/xxx@TBDS-{ClusterId}
hive.metastore.authentication.type=KERBEROS
hive.metastore.client.principal=hadoop/xxx@TBDS-{ClusterId}
hive.metastore.service.principal=hadoop/xxx@TBDS-{ClusterId}
hive.metastore.client.keytab=/path/to/xxx.keytab
- trino-cli
Check whether /usr/local/service/trino/client exists. If it does not, do the following:
- Download the client from https://repo1.maven.org/maven2/io/trino/trino-cli/389/trino-cli-389-executable.jar
- Place the downloaded JAR on any server in the cluster and rename it to trino
# Rename the downloaded client and make it executable
mv trino-cli-389-executable.jar trino
chmod +x trino
# Show version information
./trino --version
# Connect to Trino with the client; replace ${ip} with the IP of the Trino coordinator node
./trino --server ${ip}:9000 --catalog hive --user hadoop --debug
Log in with trino-cli
[hadoop@10 ~]$ /usr/local/service/trino/client/trino-cli --server http://{coordinator_ip}:9000 --user hadoop --catalog iceberg --schema default
DDL
# Example operations
show catalogs;
use iceberg.iceberg_auto_functional_spark_test;
show tables;
select * from iceberg_01_upper;
DESCRIBE iceberg_01_upper;
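The Trino Iceberg connector also supports DDL, writes, and metadata tables; a minimal sketch (the table name is hypothetical; syntax per the Trino Iceberg connector documentation):
# Create a partitioned Iceberg table and write to it
CREATE TABLE iceberg.default.trino_iceberg_demo (
id bigint,
data varchar,
category varchar
)
WITH (partitioning = ARRAY['category']);
INSERT INTO iceberg.default.trino_iceberg_demo VALUES (1, 'value1', 'a');
# Inspect table snapshots via the $snapshots metadata table
SELECT * FROM iceberg.default."trino_iceberg_demo$snapshots";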
For more usage examples, see the official documentation:
- Spark (Getting Started): https://iceberg.apache.org/docs/1.4.3/spark-getting-started/
- Flink (Getting Started): https://iceberg.apache.org/docs/1.4.3/flink/
- Hive: https://iceberg.apache.org/docs/1.4.3/hive/
- Trino: https://trino.io/docs/current/connector/iceberg.html
