Best Practices

Last updated: 2026-03-13 09:03:00

With Iceberg tables managed through Spark SQL, the table structure stays the same, so upper-layer SQL can be used directly without changes.

Reading and writing Iceberg with Spark SQL 3.x

# Start the spark-sql client:
cd /usr/hdp/2.2.0.0-2041/spark
export SPARK_HOME=/path/to/spark/home/
export HADOOP_CONF_DIR=/path/to/hdfs/conf/
export SPARK_CONF_DIR=/path/to/spark/conf/
export HIVE_CONF_DIR=/path/to/hive/conf/
export HADOOP_USER_NAME=tbds
./bin/spark-sql --conf spark.sql.adaptive.enabled=true \
--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.local.type=hadoop \
--conf spark.sql.catalog.local.warehouse=/tmp/hive/warehouse \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.spark_catalog.type=hive \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.warehouse.dir=/apps/hive/warehouse 
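# Note: with this configuration, local is an Iceberg Hadoop catalog rooted at /tmp/hive/warehouse, while spark_catalog wraps the Hive session catalog, so existing Hive tables and Iceberg tables can be used side by side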
# Create the database:
CREATE DATABASE IF NOT EXISTS iceberg_db_check;

# Create the table:
drop table if exists iceberg_db_check.tb_spark_check1;
CREATE TABLE iceberg_db_check.tb_spark_check1 (
    id bigint,
    data string,
    category string)
USING iceberg;

# Insert data
INSERT INTO iceberg_db_check.tb_spark_check1 VALUES (1,'1','1'),(2,'2','2');
# Add the column as a partition field
ALTER TABLE iceberg_db_check.tb_spark_check1 ADD PARTITION FIELD category;
# Insert more data
INSERT INTO iceberg_db_check.tb_spark_check1 VALUES (3,'3','3'),(4,'4','4');
# Drop the partition field
ALTER TABLE iceberg_db_check.tb_spark_check1 DROP PARTITION FIELD category;
# Insert more data
INSERT INTO iceberg_db_check.tb_spark_check1 VALUES (5,'5','5'),(6,'6','6');

# Query
select * from iceberg_db_check.tb_spark_check1;
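# Optional check: the standard Iceberg metadata tables show how each insert was committed and which partition spec its files used
select * from iceberg_db_check.tb_spark_check1.snapshots;
select * from iceberg_db_check.tb_spark_check1.files;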

Writing to Iceberg in real time with Spark Structured Streaming

Use Structured Streaming to read data from Kafka in real time and write the results to Iceberg in real time (note: Structured Streaming only supports writing to Iceberg in real time; it does not support reading from Iceberg in real time).

Create a Kafka topic

Start the Kafka cluster and create the topic "kafka-iceberg-topic":

./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 \
--create --topic kafka-iceberg-topic --partitions 3 --replication-factor 3
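To verify that the topic was created, the same script can describe it (assuming the same ZooKeeper quorum):

./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 --describe --topic kafka-iceberg-topic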

Write the code that produces data to Kafka

import java.text.SimpleDateFormat
import java.util.{Date, Properties}

import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

import scala.util.Random

/**
  * Write data to Kafka
  */
object WriteDataToKafka {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String,String](props)
    var counter = 0
    var keyFlag = 0
    while(true){
      counter +=1
      keyFlag +=1
      val content: String = userlogs()
      producer.send(new ProducerRecord[String, String]("kafka-iceberg-topic", content))
      //producer.send(new ProducerRecord[String, String]("kafka-iceberg-topic", s"key-$keyFlag", content))
      if(0 == counter%100){
        counter = 0
        Thread.sleep(5000)
      }
    }
    producer.close()
  }

  def userlogs()={
    val userLogBuffer = new StringBuffer("")
    val timestamp = new Date().getTime();
    var userID = 0L
    var pageID = 0L

    //Randomly generated user ID
    userID = Random.nextInt(2000)

    //Randomly generated page ID
    pageID =  Random.nextInt(2000);

    //Randomly pick a channel
    val channelNames = Array[String]("Spark","Scala","Kafka","Flink","Hadoop","Storm","Hive","Impala","HBase","ML")
    val channel = channelNames(Random.nextInt(10))

    val actionNames = Array[String]("View", "Register")
    //Randomly pick an action
    val action = actionNames(Random.nextInt(2))

    val dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
    userLogBuffer.append(dateToday)
      .append("\t")
      .append(timestamp)
      .append("\t")
      .append(userID)
      .append("\t")
      .append(pageID)
      .append("\t")
      .append(channel)
      .append("\t")
      .append(action)
    System.out.println(userLogBuffer.toString())
    userLogBuffer.toString()
  }
}
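Each line produced by userlogs() is a tab-separated record of six fields: date, timestamp, user ID, page ID, channel and action, for example (values are illustrative):

2020-06-29	1593403932000	1024	512	Spark	View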

Write a Structured Streaming job that reads from Kafka and writes to Iceberg in real time

import java.util.concurrent.TimeUnit

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

object StructuredStreamingSinkIceberg {
  def main(args: Array[String]): Unit = {
    //1. Build the SparkSession
    val spark: SparkSession = SparkSession.builder().master("local").appName("StructuredSinkIceberg")
      //Configure a Hadoop catalog named hadoop_prod
      .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
      .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
      .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://hdfsCluster/structuredstreaming")
      .getOrCreate()
//    spark.sparkContext.setLogLevel("Error")

    //2. Create the Iceberg table
    spark.sql(
      """
        |create table if not exists hadoop_prod.iceberg_db.iceberg_table (
        | current_day string,
        | user_id string,
        | page_id string,
        | channel string,
        | action string
        |) using iceberg
      """.stripMargin)

    val checkpointPath = "hdfs://hdfsCluster/iceberg_table_checkpoint"
    val bootstrapServers = "node1:9092,node2:9092,node3:9092"
    //Separate multiple topics with commas
    val topic = "kafka-iceberg-topic"

    //3. Read data from Kafka
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("auto.offset.reset", "latest")
      .option("group.id", "iceberg-kafka")
      .option("subscribe", topic)
      .load()

    import spark.implicits._
    import org.apache.spark.sql.functions._

    val resDF = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
      .as[(String, String)].toDF("id", "data")

    val transDF: DataFrame = resDF.withColumn("current_day", split(col("data"), "\t")(0))
      .withColumn("ts", split(col("data"), "\t")(1))
      .withColumn("user_id", split(col("data"), "\t")(2))
      .withColumn("page_id", split(col("data"), "\t")(3))
      .withColumn("channel", split(col("data"), "\t")(4))
      .withColumn("action", split(col("data"), "\t")(5))
      .select("current_day", "user_id", "page_id", "channel", "action")

    //Print the results to the console instead (default trigger runs each micro-batch as soon as it can)
//    val query: StreamingQuery = transDF.writeStream
//      .outputMode("append")
//      .format("console")
//      .start()

    //4. Stream the results into the Iceberg table
    val query = transDF.writeStream
      .format("iceberg")
      .outputMode("append")
      //Trigger every 10 seconds; use Trigger.ProcessingTime(1, TimeUnit.MINUTES) to trigger once per minute
      .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
      .option("path", "hadoop_prod.iceberg_db.iceberg_table")
      .option("fanout-enabled", "true")
      .option("checkpointLocation", checkpointPath)
      .start()
    query.awaitTermination()
  }
}

There are a few points to note when Structured Streaming writes data to Iceberg in real time:

  • Writing to an Iceberg table supports two output modes: append and complete. append appends each micro-batch's rows to the table, while complete replaces the table's contents with each micro-batch.
  • The path specified when writing to Iceberg can be an HDFS path or an Iceberg table name; if a table name is used, the Iceberg table must be created in advance.
  • The fanout-enabled write option applies when the target Iceberg table is partitioned: normally Spark requires the data in each Spark partition to be sorted before it is written, which introduces latency. Setting fanout-enabled to true avoids this by keeping one file open per Spark partition until the current task has finished writing its batch, and only then closing the file.
  • When writing to an Iceberg table in real time, it is recommended to set the trigger interval to at least one minute, because every commit produces a new data file and metadata file, so a longer interval means fewer small files. To reduce the number of data files further, periodically compact the data files and expire old snapshots, as sketched below.
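For the compaction and snapshot expiry mentioned above, Iceberg provides Spark procedures that can be called from Spark SQL when the Iceberg SQL extensions are enabled. A minimal sketch against the hadoop_prod catalog and table used in this example (the older_than timestamp is illustrative):

-- Compact small data files into larger ones
CALL hadoop_prod.system.rewrite_data_files(table => 'iceberg_db.iceberg_table');
-- Expire snapshots older than the given timestamp to clean up old metadata and data files
CALL hadoop_prod.system.expire_snapshots(table => 'iceberg_db.iceberg_table', older_than => TIMESTAMP '2020-06-30 00:00:00');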

View the data in Iceberg

Start the Kafka producer code and the Structured Streaming program that writes to Iceberg, then run the following code to inspect the results in Iceberg:

//1. Build the SparkSession
val spark: SparkSession = SparkSession.builder().master("local").appName("StructuredSinkIceberg")
  //Use the Hadoop catalog named hadoop_prod
  .config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.hadoop_prod.type", "hadoop")
  .config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://hdfsCluster/structuredstreaming")
  .getOrCreate()

//2. Read the results from the Iceberg table
spark.sql(
  """
    |select * from hadoop_prod.iceberg_db.iceberg_table
  """.stripMargin).show()

Writing to an Iceberg table in real time with Flink and Kafka

Use an HDFS path as the Iceberg result table, consume data from Kafka in real time with Flink and write it into the Iceberg table, and read it in near real time with Hive as the client.
Thanks to Iceberg's strong separation of reads and writes, newly written data can be read almost in real time.
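The snippets below assume that a Flink StreamTableEnvironment named tenv has already been created; a minimal sketch of that setup (checkpointing must be enabled because the Iceberg sink commits data on Flink checkpoints):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

// Build the streaming environment and enable checkpointing so Iceberg commits happen periodically
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.enableCheckpointing(60000);
StreamTableEnvironment tenv = StreamTableEnvironment.create(env);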

Create an Iceberg table with a Hadoop Catalog

// create hadoop catalog
tenv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n" +
        "  'type'='iceberg',\n" +
        "  'catalog-type'='hadoop',\n" +
        "  'warehouse'='hdfs://hdfsCluster/tmp',\n" +
        "  'property-version'='1'\n" +
        ")");

// change catalog
tenv.useCatalog("hadoop_catalog");
tenv.executeSql("CREATE DATABASE if not exists iceberg_hadoop_db");
tenv.useDatabase("iceberg_hadoop_db");
// create iceberg result table
tenv.executeSql("drop table hadoop_catalog.iceberg_hadoop_db.iceberg_002"); 
tenv.executeSql("CREATE TABLE  hadoop_catalog.iceberg_hadoop_db.iceberg_002 (\n" +
                "    user_id STRING COMMENT 'user_id',\n" +
                "    order_amount DOUBLE COMMENT 'order_amount',\n" +
                "    log_ts STRING\n" +
                ")");

Create a Kafka stream table with a Hive Catalog

String HIVE_CATALOG = "myhive";
String DEFAULT_DATABASE = "tmp";
String HIVE_CONF_DIR = "/data/bigdata/tbds/etc/hive/conf/";
Catalog catalog = new HiveCatalog(HIVE_CATALOG, DEFAULT_DATABASE, HIVE_CONF_DIR);
tenv.registerCatalog(HIVE_CATALOG, catalog);
tenv.useCatalog("myhive");
// create kafka stream table
tenv.executeSql("DROP TABLE IF EXISTS ods_k_2_iceberg");
tenv.executeSql("CREATE TABLE ods_k_2_iceberg (\n" +
                " user_id STRING,\n" +
                " order_amount DOUBLE,\n" +
                " log_ts TIMESTAMP(3),\n" +
                " WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND\n" +
                ") WITH (\n" +
                "  'connector'='kafka',\n" +
                "  'topic'='t_kafka_03',\n" +
                "  'scan.startup.mode'='latest-offset',\n" +
                "  'properties.bootstrap.servers'='xx:9092',\n" +
                "  'properties.group.id' = 'testGroup_01',\n" +
                "  'format'='json'\n" +
                ")");

Use SQL to connect the Kafka stream table to the Iceberg target table

System.out.println("---> 3. insert into iceberg  table from kafka stream table .... ");
tenv.executeSql("INSERT INTO  hadoop_catalog.iceberg_hadoop_db.iceberg_002 " +
     " SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd') FROM myhive.tmp.ods_k_2_iceberg");

Data verification

bin/kafka-console-producer.sh --broker-list xx:9092 --topic t_kafka_03
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:12:12"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:15:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:20:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:30:00"}
{"user_id":"a1111","order_amount":13.0,"log_ts":"2020-06-29 12:32:00"}
{"user_id":"a1112","order_amount":15.0,"log_ts":"2020-11-26 12:12:12"}
hive> add jar /data/bigdata/tbds/usr/local/cluster-shim/lite/500/lib/hive/lib/iceberg-hive-runtime-0.14.1.jar;
hive> CREATE EXTERNAL TABLE tmp.iceberg_002(user_id STRING,order_amount DOUBLE,log_ts STRING)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler' 
LOCATION '/tmp/iceberg_hadoop_db/iceberg_002';
hive> select * from tmp.iceberg_002  limit 5;
a1111    11.0    2020-06-29
a1111    11.0    2020-06-29
a1111    11.0    2020-06-29
a1111    11.0    2020-06-29
a1111    13.0    2020-06-29
Time taken: 0.108 seconds, Fetched: 5 row(s)

The steps above use the Flink Table API to consume Kafka and write in real time into an Iceberg result table stored under an HDFS-based Hadoop Catalog.

Viewing historical versions of data and rolling back to a historical version with Trino

# Start the Trino client (first enter the container on the PRESTO_COORDINATOR node of the PRESTO service):
docker ps -a
docker exec -it presto-server bash
/root/trino-cli-360-executable.jar --server localhost:18089 --catalog iceberg --user hive --debug
# Create the schema
create schema if not exists iceberg.iceberg_db_check;
# Create the table
DROP TABLE if exists iceberg.iceberg_db_check.tb_trino_check1;
CREATE TABLE iceberg.iceberg_db_check.tb_trino_check1 (
    c1 integer,
    c2 varchar,
    c3 varchar)
WITH (
    format = 'PARQUET');
    
# Insert data in separate batches
insert into iceberg.iceberg_db_check.tb_trino_check1 values (1,'1','1'),(2,'2','2');
insert into iceberg.iceberg_db_check.tb_trino_check1 values (3,'3','3'),(4,'4','4');
insert into iceberg.iceberg_db_check.tb_trino_check1 values (5,'5','5'),(6,'6','6');

# Query the commit (snapshot) history
SELECT * FROM iceberg.iceberg_db_check."tb_trino_check1$snapshots" ORDER BY committed_at DESC;

# Pick the snapshot_id of one of the commits above and roll back to it, e.g. snapshot_id=4304372289675937535
CALL iceberg.system.rollback_to_snapshot('iceberg_db_check', 'tb_trino_check1', 4304372289675937535);

# Query again; the data now reflects that historical version
select * from iceberg.iceberg_db_check.tb_trino_check1;
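# Optional check: Trino's $history metadata table shows when each snapshot became the current one, which confirms the rollback
SELECT * FROM iceberg.iceberg_db_check."tb_trino_check1$history" ORDER BY made_current_at DESC;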