Hudi Data Re-sorting
If queries frequently filter on certain columns of a table, re-sorting the Hudi table's data on those columns can significantly improve query performance. Multiple sort columns may be specified, but note: a filter that only uses the second sort column gains no benefit, while a filter that uses the first and second sort columns together (or the first alone) does.
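Why only a leading prefix of the sort key helps can be illustrated with a toy simulation of per-file min/max column statistics pruning. This is a simplified sketch of the idea, not Hudi's actual implementation:

```python
# Toy model of file-level min/max pruning after sorting rows by (city, name).
# Not Hudi's code -- just an illustration of why a filter on the second
# sort column alone cannot prune files.
rows = sorted((c, n) for c in "ABCD" for n in "wxyz")  # 16 (city, name) pairs
files = [rows[i:i + 4] for i in range(0, len(rows), 4)]  # 4 "data files"

def candidate_files(col, value):
    """Files whose [min, max] range for `col` may contain `value`."""
    idx = 0 if col == "city" else 1
    return [
        f for f in files
        if min(r[idx] for r in f) <= value <= max(r[idx] for r in f)
    ]

# A filter on the first sort column prunes down to a single file...
print(len(candidate_files("city", "B")))  # -> 1
# ...but every file spans the full range of the second column, so a
# filter on `name` alone must still scan all files.
print(len(candidate_files("name", "x")))  # -> 4
```

Because the data is sorted lexicographically, each file covers a narrow range of the first column but the full range of the second, so only filters touching the leading sort column can skip files.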
Prepare test data
Create the Hive table
CREATE TABLE sample_data_partitioned (
id INT,
name STRING,
age INT,
city STRING,
date_str STRING
)
PARTITIONED BY (event_date STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;
Load data into the Hive table
The data file for the Hive table:
1. Upload the data file to the HDFS cluster at /sample_data_partitioned.csv.
2. Load it into the Hive table with: LOAD DATA INPATH '/sample_data_partitioned.csv' INTO TABLE sample_data_partitioned PARTITION(event_date='2023-11-01');
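The contents of the data file are not reproduced here; the following is a hypothetical generator for a file matching the table schema. The city list, age range, and row count are invented for illustration:

```python
# Hypothetical generator for sample_data_partitioned.csv; column values,
# the city list, and the row count are made up to match the table schema
# (id, name, age, city, date_str).
import csv
import random
import string

def make_rows(n, date_str="2023-11-01"):
    cities = ["Beijing", "Shanghai", "Shenzhen", "Guangzhou"]
    for i in range(1, n + 1):
        # Random 20-character names, similar in shape to the value
        # queried later (e.g. 'HGvCqKEhDMMtLrPMhmWs')
        name = "".join(random.choices(string.ascii_letters, k=20))
        yield [i, name, random.randint(18, 65), random.choice(cities), date_str]

with open("sample_data_partitioned.csv", "w", newline="") as f:
    csv.writer(f).writerows(make_rows(100000))
```

The resulting file can then be uploaded with hdfs dfs -put sample_data_partitioned.csv / to match the path used in step 1.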
Create the Hudi table
create table small_file_hudi_cow (
id int,
name string,
age int,
city STRING,
date_str STRING
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'id',
preCombineField = 'id'
)
partitioned by (date_str);
Load data into the Hudi table
1. Start the Spark SQL client, loading the Hudi bundle and related configuration:
spark-sql --master yarn \
--num-executors 2 \
--executor-memory 3g \
--executor-cores 2 \
--jars /usr/local/service/spark/jars/hudi-spark3.2-bundle_2.12-0.12.0.jar \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
2. Run the following statement to load the data into the Hudi table:
INSERT INTO small_file_hudi_cow SELECT id, name, age, city, event_date FROM sample_data_partitioned where event_date='2023-11-01';
Query performance
1. Test query: select * from small_file_hudi_cow where name = 'HGvCqKEhDMMtLrPMhmWs';
2. To minimize the effect of data preloading/caching, restart the Spark SQL client for each test run.
Query time before re-sorting

Query time with data re-sorting
Re-sort the data
Run data re-sorting and small-file merging asynchronously:
spark-submit \
--jars '/xxx/hudi-utilities-bundle_2.12-*.jar' \
--class 'org.apache.hudi.utilities.HoodieClusteringJob' \
/xxx/hudi-utilities-bundle_2.12-*.jar \
--spark-memory '2g' \
--mode 'scheduleAndExecute' \
--base-path "/usr/hive/warehouse/small_file_hudi_cow/" \
--table-name "small_file_hudi_cow" \
--hoodie-conf "hoodie.clustering.async.enabled=true" \
--hoodie-conf "hoodie.clustering.async.max.commits=1" \
--hoodie-conf "hoodie.clustering.plan.strategy.sort.columns=name" \
--hoodie-conf "hoodie.clustering.plan.strategy.small.file.limit=314572800" \
--hoodie-conf "hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824"
Query time after re-sorting

For more details, see:
https://cloud.tencent.com/developer/article/1928686
https://hudi.apache.org/docs/clustering#execution-strategy
Hudi Savepoint Operations
Savepoints save a chosen version of the data so it can be restored later. (MoR tables do not currently support savepoints.)
A Hudi savepoint pins a given commit so that the cleaner will not delete it; the table can later be restored to that commit with a rollback.
Savepoints are managed with Spark SQL. Examples:
- Create a savepoint
call create_savepoint(table => 'hudi_cow_nonpcf_tbl', commit_time => '20240906173805505');
- List all existing savepoints
call show_savepoints(table => 'hudi_cow_nonpcf_tbl');
- Roll back to a savepoint
call rollback_to_savepoint(table => 'hudi_cow_nonpcf_tbl', instant_time => '20240906173805505');
hudi-utilities: HoodieDeltaStreamer
Overview
The HoodieDeltaStreamer utility (part of hudi-utilities-bundle) provides ways to ingest data from different sources such as DFS or Kafka, with the following capabilities:
- Exactly-once ingestion of new events from Kafka, and incremental imports from the output of Sqoop or HiveIncrementalPuller, or from files under a DFS folder.
- Support for ingesting JSON, Avro, or custom record types.
- Checkpoint management, rollback, and recovery.
- Avro schemas from DFS or the Confluent schema registry.
- Support for plugging in custom transformations.
[hadoop@10 spark]$ spark-submit \
> --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
> --jars /usr/local/service/spark/jars/hudi-spark3.4-bundle_2.12-0.14.1.jar \
> /usr/local/service/spark/jars/hudi-utilities-slim-bundle_2.12-0.14.1.jar --help
Usage: <main class> [options]
...
**Note:** hudi-utilities-slim-bundle is used by default, so when using this tool you must also include the hudi-spark3.4-bundle jar.
SqlSource
Create the Hive source table
create database hive_test location '/hive_test';
create table hive_test.test_source (
id int,
name string,
price double,
dt string,
ts bigint
);
insert into hive_test.test_source values (105,'hudi', 10.0,'2024-07-17',100);
Create the Hudi target table with Spark SQL
create database hudi location '/hudi';
create table hudi.test_hudi_target (
id int,
name string,
price double,
ts long,
dt string
) using hudi
partitioned by (dt)
options (
primaryKey = 'id',
preCombineField = 'ts',
type = 'cow'
);
Configuration files
common.properties
hoodie.datasource.write.hive_style_partitioning=true
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
hoodie.datasource.hive_sync.use_jdbc=false
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor
sql_source.properties
include=common.properties
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=dt
# For a non-partitioned table, set hoodie.datasource.write.partitionpath.field=
hoodie.deltastreamer.source.sql.sql.query = select * from hive_test.test_source
# Hive sync settings
hoodie.datasource.hive_sync.table=test_hudi_target
hoodie.datasource.hive_sync.database=hudi
## Not required for non-partitioned tables
hoodie.datasource.hive_sync.partition_fields=dt
## Create a managed table (defaults to external)
hoodie.datasource.hive_sync.create_managed_table = true
hoodie.datasource.hive_sync.serde_properties = primaryKey=id
Launch command
spark-submit --conf "spark.sql.catalogImplementation=hive" \
--master yarn --deploy-mode client --executor-memory 2G --num-executors 3 --executor-cores 2 --driver-memory 4G --driver-cores 2 \
--jars /usr/local/service/spark/jars/hudi-spark3.4-bundle_2.12-0.14.1.jar \
--principal hadoop/10.206.16.129@TBDS-MWY6XL24 --keytab /var/krb5kdc/emr.keytab \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/local/service/spark/jars/hudi-utilities-slim-bundle_2.12-0.14.1.jar \
--props file:///usr/local/service/spark/sql_source.properties \
--target-base-path /hudi/test_hudi_target \
--target-table test_hudi_target \
--op BULK_INSERT \
--table-type COPY_ON_WRITE \
--source-ordering-field ts \
--source-class org.apache.hudi.utilities.sources.SqlSource \
--enable-sync \
--checkpoint earliest \
--hoodie-conf 'hoodie.datasource.hive_sync.create_managed_table = true' \
--hoodie-conf 'hoodie.datasource.hive_sync.serde_properties = primaryKey=id'
Note: if the job fails with an error like Caused by: org.apache.avro.SchemaParseException: Illegal character in: test_source_json.id,
set the following in the Hive configuration (hive-site.xml): hive.resultset.use.unique.column.names=false
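In hive-site.xml, that setting is a property entry like the following (configuration fragment):

```xml
<property>
  <name>hive.resultset.use.unique.column.names</name>
  <value>false</value>
</property>
```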
KafkaSource
Create the Kafka client authentication file for a Kerberos environment: kafka-client.properties
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=hadoop
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="./admin.keytab" principal="admin@TBDS-ME7KG01C";
Create Kafka test data
# Create the topic
/usr/local/service/kafka/bin/kafka-topics.sh --bootstrap-server 10.206.16.81:9092 --create --topic hudi_test --command-config /tmp/kafka-client.properties
# Produce test data
/usr/local/service/kafka/bin/kafka-console-producer.sh --topic hudi_test --bootstrap-server 10.206.16.81:9092 --producer.config /tmp/kafka-client.properties
{"id":1,"name":"hudi","price":11.0,"ts":100,"dt":"2024-07-17"}
{"id":2,"name":"hudi","price":12.0,"ts":100,"dt":"2024-07-17"}
{"id":3,"name":"hudi","price":13.0,"ts":100,"dt":"2024-07-17"}
# Consume the topic to verify the data was written successfully
/usr/local/service/kafka/bin/kafka-console-consumer.sh --topic hudi_test --from-beginning --bootstrap-server 10.206.16.81:9092 --group test_hudi_group --consumer.config /tmp/kafka-client.properties
{"id":1,"name":"hudi","price":11.0,"ts":100,"dt":"2024-07-17"}
{"id":2,"name":"hudi","price":12.0,"ts":100,"dt":"2024-07-17"}
{"id":3,"name":"hudi","price":13.0,"ts":100,"dt":"2024-07-17"}
Hudi configuration file
kafka_source.properties
include=common.properties
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=dt
hoodie.streamer.source.kafka.topic=hudi_test
bootstrap.servers=10.206.16.74:9092
auto.offset.reset=earliest
group.id=test_hudi_group_2
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=hadoop
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="./admin.keytab" principal="admin@TBDS-MWY6XL24";
hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://HDFS78000026/tmp/hudi/spark/source-schema-json.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://HDFS78000026/tmp/hudi/spark/target-schema-json.avsc
source-schema-json.avsc
{
  "type": "record",
  "name": "Profiles",
  "fields": [
    {"name": "id",    "type": ["null", "int"],    "default": null},
    {"name": "name",  "type": ["null", "string"], "default": null},
    {"name": "price", "type": ["null", "double"], "default": null},
    {"name": "ts",    "type": ["null", "long"],   "default": null},
    {"name": "dt",    "type": ["null", "string"], "default": null}
  ]
}
# Create the target schema
cp source-schema-json.avsc target-schema-json.avsc
Upload both files to HDFS under /tmp/hudi/spark/, matching the schema file paths configured above.
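Before uploading, the schema files can be sanity-checked locally. The helper below is an illustrative sketch using only the standard library: it verifies the JSON parses and that every field name is a legal Avro name ([A-Za-z_][A-Za-z0-9_]*), the rule behind the SchemaParseException noted in the SqlSource section:

```python
# Sanity-check an Avro schema document: valid JSON, and every field name
# is a legal Avro name. Illegal characters such as '.' in a field name
# (e.g. 'test_source_json.id') trigger SchemaParseException at runtime.
import json
import re

AVRO_NAME = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")

def field_names(schema_json):
    """Return the record's field names, raising on illegal Avro names."""
    schema = json.loads(schema_json)
    names = [f["name"] for f in schema.get("fields", [])]
    illegal = [n for n in names if not AVRO_NAME.match(n)]
    if illegal:
        raise ValueError(f"illegal Avro field names: {illegal}")
    return names

ok = '{"type":"record","name":"Profiles","fields":[{"name":"id","type":["null","int"]}]}'
print(field_names(ok))  # -> ['id']

bad = '{"type":"record","name":"Profiles","fields":[{"name":"test_source_json.id","type":["null","int"]}]}'
try:
    field_names(bad)
except ValueError as e:
    print(e)
```

In practice you would run a check like this against source-schema-json.avsc and target-schema-json.avsc before the HDFS upload.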
Launch command
spark-submit --principal admin@TBDS-MWY6XL24 --keytab ./admin.keytab \
--deploy-mode client \
--files ./admin.keytab \
--jars /usr/local/service/spark/jars/hudi-spark3.4-bundle_2.12-0.14.1.jar \
--conf "spark.executor.extraJavaOptions=-Dsun.security.krb5.debug=true" \
--conf "spark.driver.extraJavaOptions=-Dsun.security.krb5.debug=true" \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
/usr/local/service/spark/jars/hudi-utilities-slim-bundle_2.12-0.14.1.jar \
--props file:///usr/local/service/spark/kafka_source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field id \
--target-base-path hdfs://HDFS78000026/tmp/hudi/hudi_test \
--target-table hudi_test \
--op BULK_INSERT \
--table-type MERGE_ON_READ
The commands above do a one-shot read and conversion. Kafka can also be read in continuous mode to pick up incremental data, via the --continuous flag:
spark-submit --principal admin@TBDS-MWY6XL24 --keytab ./admin.keytab \
--deploy-mode client \
--files ./admin.keytab \
--jars /usr/local/service/spark/jars/hudi-spark3.4-bundle_2.12-0.14.1.jar \
--conf "spark.executor.extraJavaOptions=-Dsun.security.krb5.debug=true" \
--conf "spark.driver.extraJavaOptions=-Dsun.security.krb5.debug=true" \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
/usr/local/service/spark/jars/hudi-utilities-slim-bundle_2.12-0.14.1.jar \
--props file:///usr/local/service/spark/kafka_source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource \
--source-ordering-field id \
--target-base-path hdfs://HDFS78000026/tmp/hudi/hudi_test \
--target-table hudi_test \
--op BULK_INSERT \
--table-type MERGE_ON_READ \
--continuous
In continuous mode the default sync interval is 0 s, i.e. the job continuously compares its checkpoint against the Kafka offsets to check for new data. The interval can be changed with --min-sync-interval-seconds, e.g. --min-sync-interval-seconds 60 to poll every 60 seconds. To verify that incremental data is picked up correctly, produce a few more JSON records into the Kafka topic.
Note: admin.keytab and admin@TBDS-MWY6XL24 are the user's Kerberos credentials; before use, grant the corresponding YARN queue and Kafka topic consume permissions in Ranger.
The credentials in the sasl.jaas.config entry of kafka_source.properties must also match them.
Verify the imported data
Start Spark SQL
bin/spark-sql --master yarn --deploy-mode client \
--num-executors 2 --executor-memory 1g --executor-cores 2 \
--jars /usr/local/service/spark/jars/hudi-spark3.4-bundle_2.12-0.14.1.jar \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'
Create the Hudi table against an explicit location
The DeltaStreamer job above wrote the table to hdfs://HDFS78000026/tmp/hudi/hudi_test without Hive sync, so register it by pointing a table at that path:
create table hudi_test using hudi location 'hdfs://HDFS78000026/tmp/hudi/hudi_test';
Query the Hudi table
select * from hudi_test;
Note: to insert into or update a Spark-created Hudi table through Flink SQL, Spark must set the Hive sync properties when creating the table:
'hive_sync.metastore.uris' = 'thrift://10.4.4.11:7004',
'hive_sync.conf.dir'='/usr/local/service/hive/conf'
For example:
create table hudi_mor_tbl (
id int,
name string,
price double,
ts bigint
) using hudi
tblproperties (
type = 'mor',
primaryKey = 'id',
preCombineField = 'ts',
'hive_sync.enable' = 'true',
'hive_sync.metastore.uris' = 'thrift://10.4.4.11:7004',
'hive_sync.conf.dir'='/usr/local/service/hive/conf'
);