Quick Start

Last updated: 2026-03-13 09:03:00

For the Spark, Flink, and Hive engines, TBDS integrates the standard Iceberg 1.6.1 release by default. Trino ships with its own iceberg connector and supports Iceberg natively.

Spark

The spark-defaults.conf file pre-configures the following three Iceberg catalogs, which you can use to work with Iceberg tables.

Name           Type     Implementation
local          hadoop   org.apache.iceberg.spark.SparkCatalog
spark_catalog  hive     org.apache.iceberg.spark.SparkSessionCatalog
hive_prod      hive     org.apache.iceberg.spark.SparkCatalog
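
Tables are addressed through one of these catalogs by fully qualifying the table name with the catalog name; a minimal sketch, assuming the default catalogs above (db.demo_table is a hypothetical namespace and table):

  -- create and query a table through the preconfigured hadoop catalog
  CREATE TABLE local.db.demo_table (id bigint) USING iceberg;
  SELECT * FROM local.db.demo_table;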

Kerberos authentication

  # list the principals in the keytab
  klist -kt /path/to/xxx.keytab
  # obtain a Kerberos ticket with the keytab
  kinit -kt /path/to/xxx.keytab hadoop/xxx@TBDS-{ClusterId}

Downloading the JAR

  • Download the JAR for your Spark version from the Maven repository and upload it to the jars/ directory of the cluster's Spark installation.
  • Configure spark-defaults.conf.
    Using a Hive catalog
  spark.sql.catalog.hive_prod=org.apache.iceberg.spark.SparkCatalog
  spark.sql.catalog.hive_prod.type=hive
  spark.sql.catalog.hive_prod.uri=thrift://xx.xx.xx.xx:xx

Using a Hadoop catalog

  spark.sql.catalog.hadoop_prod=org.apache.iceberg.spark.SparkCatalog
  spark.sql.catalog.hadoop_prod.type=hadoop
  spark.sql.catalog.hadoop_prod.warehouse=hdfs://xx.xx.x.xx:xx/warehouse/spark-iceberg

Adding the extension parameter

  spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions
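
With this extension enabled, Spark exposes Iceberg's additional SQL surface, including stored procedures (CALL) and the row-level commands used later in this section (MERGE INTO, DELETE, UPDATE). A minimal sketch of a maintenance procedure call, assuming the hive_prod catalog configured above and the spark_iceberg1 table created in the DDL section below:

  -- expire snapshots older than a cutoff; requires spark.sql.extensions as set above
  CALL hive_prod.system.expire_snapshots(
    table => 'default.spark_iceberg1',
    older_than => TIMESTAMP '2022-01-01 00:00:00'
  );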

DDL

  • Create a regular partitioned table
  CREATE TABLE hive_prod.default.spark_iceberg1 (
      id bigint,
      data string,
      category string)
  USING iceberg
  PARTITIONED BY (category);
  
  -- insert data
  insert into hive_prod.default.spark_iceberg1 values (1, 'TOM', 'a');
  insert into hive_prod.default.spark_iceberg1 values (1, 'jelly', 'b');
  insert into hive_prod.default.spark_iceberg1 values (1, 'Danny', 'c');

After the inserts, the table contains one partition per category value (category=a, category=b, category=c).
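
The partition layout can also be inspected from SQL through Iceberg's partitions metadata table; a minimal sketch against the table above:

  -- one row per partition, with record and file counts
  SELECT * FROM hive_prod.default.spark_iceberg1.partitions;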

  • Create a table with hidden partitioning
  CREATE TABLE hive_prod.default.spark_iceberg2 (
    id bigint,
    data string,
    category string,
    ts timestamp)
  USING iceberg
  PARTITIONED BY (days(ts), category);

  -- insert data
  INSERT INTO hive_prod.default.spark_iceberg2 (id, data, category, ts)
  VALUES
    (1, 'value1', 'A', CAST('2022-01-01 00:00:00' AS TIMESTAMP)),
    (2, 'value2', 'B', CAST('2022-01-02 00:00:00' AS TIMESTAMP)),
    (3, 'value3', 'A', CAST('2022-01-03 00:00:00' AS TIMESTAMP));

After the inserts, the data is laid out in hidden partitions derived from days(ts) and category (for example, ts_day=2022-01-01/category=A), even though no partition column appears in the schema.
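
The hidden partitions can be verified through the files metadata table; a minimal sketch:

  -- each row describes one data file, including the partition tuple it belongs to
  SELECT * FROM hive_prod.default.spark_iceberg2.files;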

  • Query
  CREATE TABLE hive_prod.default.spark_iceberg3 (
    id INT,
    data STRING,
    properties MAP<STRING, STRING>)
  USING iceberg;
  
  INSERT INTO hive_prod.default.spark_iceberg3 VALUES
    (1, 'value1', map('key1', 'valueA', 'key2', 'valueB')),
    (2, 'value2', map('key1', 'valueC', 'key2', 'valueD')),
    (3, 'value3', map('key1', 'valueA', 'key2', 'valueE'));
  
  SELECT * FROM hive_prod.default.spark_iceberg3 WHERE properties['key1'] = 'valueA';
  • Other SQL (see the time-travel sketch after this block)
  # merge into
  /usr/local/service/spark/bin/spark-sql -e "
  MERGE INTO default.test_target_table t
  USING (SELECT * FROM default.test_source_table) s
  ON t.id = s.id
  WHEN MATCHED AND s.category = 'delete' THEN DELETE
  WHEN MATCHED AND s.category = 'update' THEN UPDATE SET t.data = s.data, t.ts = s.ts
  WHEN NOT MATCHED THEN INSERT *;
  "
  # drop an Iceberg table
  /usr/local/service/spark/bin/spark-sql -e "
  DROP TABLE default.test_source_table;
  "
  # drop an Iceberg table and delete its data
  /usr/local/service/spark/bin/spark-sql -e "
  DROP TABLE default.test_target_table PURGE;
  "
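
Iceberg also supports time travel over table snapshots; a minimal sketch, assuming the spark_iceberg1 table above (the snapshot ID is a placeholder taken from the snapshots metadata table):

  -- list the table's snapshots
  SELECT snapshot_id, committed_at FROM hive_prod.default.spark_iceberg1.snapshots;
  -- read the table as of a specific snapshot (Spark 3.3+ syntax)
  SELECT * FROM hive_prod.default.spark_iceberg1 VERSION AS OF 123456789;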

Flink

Kerberos authentication (can be skipped under SIMPLE authentication)

  # Kerberos authentication
  klist -kt /path/to/xxx.keytab
  kinit -kt /path/to/xxx.keytab hadoop/xxx@TBDS-{ClusterId}

JAR preparation and starting Flink

Download the JARs matching your Flink version from the Maven repository: flink-sql-connector-hive-3.1.2_2.12-1.14.0.jar and iceberg-flink-runtime-1.14-1.2.1.jar.
Move the JARs to /path/to/flink/lib (note: the current TBDS Flink ships these two JARs under flink/opt/connect; after testing, promptly remove them from flink/lib to avoid conflicts).
The startup command is as follows:

  /path/to/flink/bin/sql-client.sh embedded

Creating and using an Iceberg catalog

  CREATE CATALOG iceberg_catalog WITH (
    'type'='iceberg',
    'catalog-type'='hive',
    'uri'='thrift://xx.xx.xx.xx:xx',
    'clients'='5',
    'property-version'='1',
    'warehouse'='hdfs://xx.xx.xx.xx:xx/warehouse/iceberg-hive'
  );
  
  USE CATALOG iceberg_catalog;
  -- create a table
  CREATE TABLE `default`.`sample` (
      id BIGINT COMMENT 'unique id',
      data STRING
  );
  
  -- write to the table
  INSERT INTO `default`.`sample` VALUES (1, 'a');
  
  -- query the table
  SET execution.runtime-mode = batch;
  SELECT * FROM `default`.`sample`;
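
A Flink SQL query can also follow an Iceberg table incrementally; a minimal sketch, assuming the sample table above (the monitor interval is an arbitrary choice):

  -- allow the /*+ OPTIONS(...) */ hint (required on some Flink versions)
  SET table.dynamic-table-options.enabled = true;
  SET execution.runtime-mode = streaming;
  -- continuously emit rows from newly committed snapshots
  SELECT * FROM `default`.`sample` /*+ OPTIONS('streaming'='true', 'monitor-interval'='30s') */;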

Hive

Kerberos authentication

  klist -kt /path/to/xxx.keytab
  kinit -kt /path/to/xxx.keytab hadoop/xxx@TBDS-{ClusterId}

Downloading the JARs

Download the JAR matching your Hive version from the Maven repository and upload it to the lib/ directory of the cluster's Hive installation.
Inserting data into Iceberg-format tables from Hive additionally requires libfb303-0.9.3.jar; upload it to the corresponding lib directories on both the Hive server and the client.
Note that simply dropping third-party JARs into ${HIVE_HOME}/lib does not take effect.
The recommended approach is to upload the JARs to ${HIVE_HOME}/auxlib and have Hive load that directory at startup (see hive.aux.jars.path below).
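
As a session-scoped alternative to auxlib, the runtime JAR can also be added from within a Hive session; a minimal sketch with a hypothetical path and JAR name:

  -- load the Iceberg Hive runtime for the current session only
  ADD JAR /usr/local/service/hive/auxlib/iceberg-hive-runtime-1.6.1.jar;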

hive-site.xml configuration and connecting to Hive

  # enable Hive support globally
  iceberg.engine.hive.enabled=true
  # JAR load path
  hive.aux.jars.path=/usr/local/service/hive/auxlib
  # when using the Tez engine on Hive 3.1.2 or later, Tez must be upgraded to >= 0.10.1 and the following setting added
  tez.mrreader.config.update.properties=hive.io.file.readcolumn.names,hive.io.file.readcolumn.ids
  # since Iceberg 0.11.0, vectorization must also be disabled when using Hive with Tez
  hive.vectorized.execution.enabled=false
  • Using a Hive catalog
  -- set the catalog type for a catalog named iceberg_hive
  set iceberg.catalog.iceberg_hive.type=hive;
  -- set the metastore URI
  set iceberg.catalog.iceberg_hive.uri=thrift://xx.xx.xx.xx:xx;
  • Using a Hadoop catalog
  -- set the catalog type for a catalog named iceberg_hadoop
  set iceberg.catalog.iceberg_hadoop.type=hadoop;
  -- catalog warehouse path
  set iceberg.catalog.iceberg_hadoop.warehouse=hdfs://xx.xx.xx.xx:xx/warehouse/iceberg-hadoop;
  • Connecting to Hive
  # connect to HiveServer2 via beeline
  [hadoop@172 ~] beeline -u 'jdbc:hive2://xx.xx.xx.xx:xx/;principal=xx@TBDS-{ClusterId}'

DDL

  • Create a table under the iceberg_hive catalog:
  CREATE TABLE iceberg_test1 (id int, name string)
  STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
  TBLPROPERTIES('iceberg.catalog'='iceberg_hive');
  insert into iceberg_test1 values (1, 'Tom');
  select * from iceberg_test1;
  • Create a table under the iceberg_hadoop catalog:
  CREATE TABLE iceberg_test2 (id int, name string)
  STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
  LOCATION 'hdfs://xx.xx.xx.xx:xx/warehouse/iceberg-hadoop/default/iceberg_test2'
  TBLPROPERTIES('iceberg.catalog'='iceberg_hadoop');
  insert into iceberg_test2 values (1, 'TOM');
  select * from iceberg_test2;
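
To double-check which catalog a table was created against, its storage handler and table properties can be inspected; a minimal sketch:

  -- shows the storage handler and TBLPROPERTIES, including iceberg.catalog
  DESCRIBE FORMATTED iceberg_test1;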

Trino

Kerberos authentication

  klist -kt /path/to/xxx.keytab
  kinit -kt /path/to/xxx.keytab hadoop/xxx@TBDS-{ClusterId}

Environment setup

  • Under /usr/local/service/trino/etc/catalog, add an iceberg.properties file and restart Trino. The file content is as follows:
  connector.name=iceberg
  hive.hdfs.impersonation.enabled=true
  hive.metastore.thrift.impersonation.enabled=true
  
  hive.metastore.uri=thrift://xx.xx.xx.xx:xx
  hive.config.resources=/usr/local/service/hadoop/etc/hadoop/core-site.xml,/usr/local/service/hadoop/etc/hadoop/hdfs-site.xml
  hive.hdfs.authentication.type=KERBEROS
  hive.hdfs.trino.keytab=/path/to/xxx.keytab
  hive.hdfs.trino.principal=hadoop/xxx@TBDS-{ClusterId}
  hive.metastore.authentication.type=KERBEROS
  hive.metastore.client.principal=hadoop/xxx@TBDS-{ClusterId}
  hive.metastore.service.principal=hadoop/xxx@TBDS-{ClusterId}
  hive.metastore.client.keytab=/path/to/xxx.keytab
  # rename the downloaded client and make it executable
  mv trino-cli-389-executable.jar trino
  chmod +x trino
  # check the version
  ./trino --version
  # connect to Trino via the client; replace ${ip} with the Trino coordinator node IP
  ./trino --server ${ip}:9000 --catalog iceberg --user hadoop --debug

Logging in with the Trino CLI

  [hadoop@10 ~]# /usr/local/service/trino/client/trino-cli --server http://{coordinator_ip}:9000 --user hadoop --catalog iceberg --schema default

DDL

  -- some example operations
  show catalogs;
  
  use iceberg.iceberg_auto_functional_spark_test;
  
  show tables;
  
  select * from iceberg_01_upper;
  
  DESCRIBE iceberg_01_upper;
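
Tables can also be created and inspected directly from Trino; a minimal sketch, assuming the iceberg catalog configured above and a hypothetical table name:

  -- create a partitioned Iceberg table through Trino
  CREATE TABLE iceberg.default.trino_iceberg1 (
    id bigint,
    data varchar,
    category varchar
  ) WITH (partitioning = ARRAY['category']);
  
  -- list the table's snapshots through the $snapshots metadata table
  SELECT snapshot_id, committed_at FROM iceberg.default."trino_iceberg1$snapshots";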

For more usage examples, see the official documentation:
Spark: Getting Started (https://iceberg.apache.org/docs/1.4.3/spark-getting-started/)
Flink: Flink Getting Started (https://iceberg.apache.org/docs/1.4.3/flink/)
Hive: Hive (https://iceberg.apache.org/docs/1.4.3/hive/)
Trino: Trino (https://trino.io/docs/current/connector/iceberg.html)