基于 Spark SQL 的 Iceberg 表结构,上层 SQL 无需变动即可直接使用。
以Spark SQL 3.x 读写Iceberg操作
# 启动spark-sql 客户端:
cd /usr/hdp/2.2.0.0-2041/spark
export SPARK_HOME=/path/to/spark/home/
export HADOOP_CONF_DIR=/path/to/hdfs/conf/
export SPARK_CONF_DIR=/path/to/spark/conf/
export HIVE_CONF_DIR=/path/to/hive/conf/
export HADOOP_USER_NAME=tbds
./bin/spark-sql --conf spark.sql.adaptive.enabled=true \
--conf spark.sql.catalog.local=org.apache.iceberg.spark.SparkCatalog \
--conf spark.sql.catalog.local.type=hadoop \
--conf spark.sql.catalog.local.warehouse=/tmp/hive/warehouse \
--conf spark.sql.catalog.spark_catalog=org.apache.iceberg.spark.SparkSessionCatalog \
--conf spark.sql.catalog.spark_catalog.type=hive \
--conf spark.sql.extensions=org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions \
--conf spark.sql.warehouse.dir=/apps/hive/warehouse
# 建库语句:
CREATE DATABASE if not exists iceberg_db_check ;
# 建表语句:
drop table if exists iceberg_db_check.tb_spark_check1;
CREATE TABLE iceberg_db_check.tb_spark_check1 (
id bigint,
data string,
category string)
USING iceberg;
#插入数据
INSERT INTO iceberg_db_check.tb_spark_check1 VALUES (1,'1','1'),(2,'2','2');
#修改字段为分区
ALTER TABLE iceberg_db_check.tb_spark_check1 ADD PARTITION FIELD category;
#插入数据
INSERT INTO iceberg_db_check.tb_spark_check1 VALUES (3,'3','3'),(4,'4','4');
#删除字段分区
ALTER TABLE iceberg_db_check.tb_spark_check1 DROP PARTITION FIELD category;
#插入数据
INSERT INTO iceberg_db_check.tb_spark_check1 VALUES (5,'5','5'),(6,'6','6');
#查询
select * from iceberg_db_check.tb_spark_check1;
Spark Structured Streaming实时写入Iceberg
使用 Structured Streaming 从 Kafka 中实时读取数据,然后将结果实时写入到 Iceberg 中(注:Structured Streaming 只支持实时向 Iceberg 中写入数据,不支持实时从 Iceberg 中读取数据)。
创建Kafka topic
启动Kafka集群,创建“kafka-iceberg-topic”
./kafka-topics.sh --zookeeper node3:2181,node4:2181,node5:2181 \
--create --topic kafka-iceberg-topic --partitions 3 --replication-factor 3
编写向Kafka生产数据代码
/**
 * Continuously produces synthetic tab-separated user-log records to the
 * Kafka topic "kafka-iceberg-topic", pausing 5 seconds after every 100
 * records. Runs until the JVM is terminated.
 */
object WriteDataToKafka {
  def main(args: Array[String]): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "node1:9092,node2:9092,node3:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    // The send loop below never exits normally, so a close() placed after it
    // is unreachable dead code. Close the producer from a JVM shutdown hook
    // instead, so buffered records are flushed on Ctrl-C / SIGTERM.
    sys.addShutdownHook(producer.close())
    var counter = 0
    var keyFlag = 0
    while (true) {
      counter += 1
      keyFlag += 1
      val content: String = userlogs()
      producer.send(new ProducerRecord[String, String]("kafka-iceberg-topic", content))
      //producer.send(new ProducerRecord[String, String]("kafka-iceberg-topic", s"key-$keyFlag", content))
      // Throttle: after every 100 records, pause for 5 seconds.
      if (0 == counter % 100) {
        counter = 0
        Thread.sleep(5000)
      }
    }
  }

  /**
   * Builds one synthetic log line with fields joined by tabs:
   * date(yyyy-MM-dd) \t epoch-millis \t userID \t pageID \t channel \t action
   * The line is also echoed to stdout for visibility.
   */
  def userlogs() = {
    val timestamp = new Date().getTime
    // Randomly generated user and page ids, both in [0, 2000)
    val userID = Random.nextInt(2000)
    val pageID = Random.nextInt(2000)
    // Randomly chosen channel name
    val channelNames = Array[String]("Spark","Scala","Kafka","Flink","Hadoop","Storm","Hive","Impala","HBase","ML")
    val channel = channelNames(Random.nextInt(10))
    // Randomly chosen user action
    val actionNames = Array[String]("View", "Register")
    val action = actionNames(Random.nextInt(2))
    val dateToday = new SimpleDateFormat("yyyy-MM-dd").format(new Date())
    val line = Seq(dateToday, timestamp, userID, pageID, channel, action).mkString("\t")
    System.out.println(line)
    line
  }
}
编写 Structured Streaming 读取 Kafka 数据实时写入 Iceberg
/**
 * Structured Streaming job: reads tab-separated user-log records from Kafka
 * and appends them to an Iceberg table registered under a Hadoop catalog.
 * Field layout of each Kafka record value (tab-separated):
 * date \t timestamp \t user_id \t page_id \t channel \t action
 */
object StructuredStreamingSinkIceberg {
def main(args: Array[String]): Unit = {
//1. Build the SparkSession
val spark: SparkSession = SparkSession.builder().master("local").appName("StructuredSinkIceberg")
//Register a Hadoop-type Iceberg catalog named "hadoop_prod"; its warehouse is an HDFS path
.config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hadoop_prod.type", "hadoop")
.config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://hdfsCluster/structuredstreaming")
.getOrCreate()
// spark.sparkContext.setLogLevel("Error")
//2. Create the target Iceberg table if it does not already exist
spark.sql(
"""
|create table if not exists hadoop_prod.iceberg_db.iceberg_table (
| current_day string,
| user_id string,
| page_id string,
| channel string,
| action string
|) using iceberg
""".stripMargin)
val checkpointPath = "hdfs://hdfsCluster/iceberg_table_checkpoint"
val bootstrapServers = "node1:9092,node2:9092,node3:9092"
//Multiple topics may be given here, comma-separated
val topic = "kafka-iceberg-topic"
//3. Read the stream from Kafka
val df = spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", bootstrapServers)
.option("auto.offset.reset", "latest")
.option("group.id", "iceberg-kafka")
.option("subscribe", topic)
.load()
import spark.implicits._
import org.apache.spark.sql.functions._
// Cast raw Kafka key/value bytes to strings; only "data" (the value) is used below
val resDF = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.as[(String, String)].toDF("id", "data")
// Split the tab-separated value into columns; the "ts" field (index 1) is
// parsed but deliberately dropped by the final select
val transDF: DataFrame = resDF.withColumn("current_day", split(col("data"), "\t")(0))
.withColumn("ts", split(col("data"), "\t")(1))
.withColumn("user_id", split(col("data"), "\t")(2))
.withColumn("page_id", split(col("data"), "\t")(3))
.withColumn("channel", split(col("data"), "\t")(4))
.withColumn("action", split(col("data"), "\t")(5))
.select("current_day", "user_id", "page_id", "channel", "action")
//Debug alternative: print results to the console. Default trigger (runs micro-batch as soon as it can)
// val query: StreamingQuery = transDF.writeStream
// .outputMode("append")
// .format("console")
// .start()
//4. Stream the result into the Iceberg table
val query = transDF.writeStream
.format("iceberg")
.outputMode("append")
//Trigger once per minute: Trigger.ProcessingTime(1, TimeUnit.MINUTES)
//Trigger every 10 seconds: Trigger.ProcessingTime(10, TimeUnit.SECONDS)
.trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
//"path" may be an Iceberg table name (table must already exist) or an HDFS path
.option("path", "hadoop_prod.iceberg_db.iceberg_table")
.option("fanout-enabled", "true")
.option("checkpointLocation", checkpointPath)
.start()
query.awaitTermination()
}
}
Structured Streaming 向 Iceberg 实时写入数据有以下几个注意点:
- 写 Iceberg 表写出数据支持两种模式:append 和 complete,append 是将每个微批数据行追加到表中。complete 是替换每个微批数据内容。
- 向 Iceberg 中写出数据时指定的 path 可以是 HDFS 路径,可以是 Iceberg 表名,如果是表名,要预先创建好 Iceberg 表。
- 写出参数 fanout-enabled 指的是如果 Iceberg 写出的表是分区表,在向表中写数据之前要求 Spark 每个分区的数据必须排序,但这样会带来数据延迟,为了避免这个延迟,可以设置“fanout-enabled”参数为 true,可以针对每个 Spark 分区打开一个文件,直到当前 task 批次数据写完,这个文件再关闭。
- 实时向 Iceberg 表中写数据时,建议 trigger 设置至少为1分钟提交一次,因为每次提交都会产生一个新的数据文件和元数据文件,这样可以减少一些小文件。为了进一步减少数据文件,建议定期合并“data files”和删除旧的快照。
查看 Iceberg 中数据结果
启动向 Kafka 生产数据代码,启动向 Iceberg 中写入数据的 Structured Streaming 程序,执行以下代码来查看对应的 Iceberg 结果:
//1. Build the SparkSession with the same hadoop_prod catalog the streaming job writes to
val spark: SparkSession = SparkSession.builder().master("local").appName("StructuredSinkIceberg")
//Register a Hadoop-type Iceberg catalog named "hadoop_prod"
.config("spark.sql.catalog.hadoop_prod", "org.apache.iceberg.spark.SparkCatalog")
.config("spark.sql.catalog.hadoop_prod.type", "hadoop")
.config("spark.sql.catalog.hadoop_prod.warehouse", "hdfs://hdfsCluster/structuredstreaming")
.getOrCreate()
//2. Read back and display the rows written to the Iceberg table
spark.sql(
"""
|select * from hadoop_prod.iceberg_db.iceberg_table
""".stripMargin).show()
flink 结合 kafka 实时写到 iceberg 表
使用 HDFS 的一个路径作为 iceberg 的结果表,使用 Flink 实时消费 kafka 中的数据并写入 iceberg 表,并且使用 Hive 作为客户端实时读取。
因为 iceberg 强大的读写分离特性,新写入的数据几乎可以实时读取。
创建 Hadoop Catalog 的 Iceberg 表
// create hadoop catalog
// create hadoop catalog: an Iceberg catalog backed by an HDFS warehouse path
tenv.executeSql("CREATE CATALOG hadoop_catalog WITH (\n" +
        "  'type'='iceberg',\n" +
        "  'catalog-type'='hadoop',\n" +
        "  'warehouse'='hdfs://hdfsCluster/tmp',\n" +
        "  'property-version'='1'\n" +
        ")");
// change catalog
tenv.useCatalog("hadoop_catalog");
tenv.executeSql("CREATE DATABASE if not exists iceberg_hadoop_db");
tenv.useDatabase("iceberg_hadoop_db");
// create iceberg result table; drop any previous copy first.
// IF EXISTS keeps the very first run from failing when the table
// does not exist yet (a bare DROP TABLE would throw).
tenv.executeSql("drop table if exists hadoop_catalog.iceberg_hadoop_db.iceberg_002");
tenv.executeSql("CREATE TABLE hadoop_catalog.iceberg_hadoop_db.iceberg_002 (\n" +
        "  user_id STRING COMMENT 'user_id',\n" +
        "  order_amount DOUBLE COMMENT 'order_amount',\n" +
        "  log_ts STRING\n" +
        ")");
使用Hive Catalog创建Kafka流表
// Register a Hive catalog so the Kafka source table's metadata is stored in the Hive metastore
String HIVE_CATALOG = "myhive";
String DEFAULT_DATABASE = "tmp";
String HIVE_CONF_DIR = "/data/bigdata/tbds/etc/hive/conf/";
Catalog catalog = new HiveCatalog(HIVE_CATALOG, DEFAULT_DATABASE, HIVE_CONF_DIR);
tenv.registerCatalog(HIVE_CATALOG, catalog);
tenv.useCatalog("myhive");
// create kafka stream table: JSON records from topic t_kafka_03, consumed
// from the latest offset, with a 5-second watermark on log_ts for event time
tenv.executeSql("DROP TABLE IF EXISTS ods_k_2_iceberg");
tenv.executeSql("CREATE TABLE ods_k_2_iceberg (\n" +
" user_id STRING,\n" +
" order_amount DOUBLE,\n" +
" log_ts TIMESTAMP(3),\n" +
" WATERMARK FOR log_ts AS log_ts - INTERVAL '5' SECOND\n" +
") WITH (\n" +
" 'connector'='kafka',\n" +
" 'topic'='t_kafka_03',\n" +
" 'scan.startup.mode'='latest-offset',\n" +
" 'properties.bootstrap.servers'='xx:9092',\n" +
" 'properties.group.id' = 'testGroup_01',\n" +
" 'format'='json'\n" +
")");
使用SQL连接kafka流表和iceberg 目标表
System.out.println("---> 3. insert into iceberg table from kafka stream table .... ");
// Continuous INSERT: streams rows from the Kafka source table into the
// Iceberg result table, formatting log_ts down to a yyyy-MM-dd string
tenv.executeSql("INSERT INTO hadoop_catalog.iceberg_hadoop_db.iceberg_002 " +
" SELECT user_id, order_amount, DATE_FORMAT(log_ts, 'yyyy-MM-dd') FROM myhive.tmp.ods_k_2_iceberg");
数据验证
bin/kafka-console-producer.sh --broker-list xx:9092 --topic t_kafka_03
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:12:12"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:15:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:20:00"}
{"user_id":"a1111","order_amount":11.0,"log_ts":"2020-06-29 12:30:00"}
{"user_id":"a1111","order_amount":13.0,"log_ts":"2020-06-29 12:32:00"}
{"user_id":"a1112","order_amount":15.0,"log_ts":"2020-11-26 12:12:12"}
hive> add jar /data/bigdata/tbds/usr/local/cluster-shim/lite/500/lib/hive/lib/iceberg-hive-runtime-0.14.1.jar;
hive> CREATE EXTERNAL TABLE tmp.iceberg_002(user_id STRING,order_amount DOUBLE,log_ts STRING)
STORED BY 'org.apache.iceberg.mr.hive.HiveIcebergStorageHandler'
LOCATION '/tmp/iceberg_hadoop_db/iceberg_002';
hive> select * from tmp.iceberg_002 limit 5;
a1111 11.0 2020-06-29
a1111 11.0 2020-06-29
a1111 11.0 2020-06-29
a1111 11.0 2020-06-29
a1111 13.0 2020-06-29
Time taken: 0.108 seconds, Fetched: 5 row(s)
使用Flink Table API 消费kafka并实时写入基于HDFS Hadoop Catalog的iceberg 结果表中
trino查看数据历史版本以及回滚到某个历史版本
# 启动trino客户端(先进入镜像,PRESTO服务PRESTO_COORDINATOR节点):
docker ps -a
docker exec -it presto-server bash
/root/trino-cli-360-executable.jar --server localhost:18089 --catalog iceberg --user hive --debug
#建库
create schema if not exists iceberg.iceberg_db_check;
#建表
DROP TABLE if exists iceberg.iceberg_db_check.tb_trino_check1;
CREATE TABLE iceberg.iceberg_db_check.tb_trino_check1 (
c1 integer,
c2 varchar,
c3 varchar)
WITH (
format = 'PARQUET');
#分批次插入数据
insert into iceberg.iceberg_db_check.tb_trino_check1 values (1,'1','1'),(2,'2','2');
insert into iceberg.iceberg_db_check.tb_trino_check1 values (3,'3','3'),(4,'4','4');
insert into iceberg.iceberg_db_check.tb_trino_check1 values (5,'5','5'),(6,'6','6');
#查询历史版本commit的一些记录
SELECT * FROM iceberg.iceberg_db_check."tb_trino_check1$snapshots" ORDER BY committed_at DESC;
#选取上面查询的历史中的某个commit的 snapshot_id 回滚 eg:snapshot_id=4304372289675937535
CALL iceberg.system.rollback_to_snapshot('iceberg_db_check', 'tb_trino_check1', 4304372289675937535);
#再查询,就是对应历史版本的数据了
select * from iceberg.iceberg_db_check.tb_trino_check1;