Best Practices

Last updated: 2026-03-13 09:03:00

Hudi Data Re-sorting

If your workload frequently filters a table on certain columns, re-sorting (clustering) the Hudi table's data on those columns can significantly improve query performance. More than one sort column can be specified, but a query whose filter uses only the second sort column will not benefit from the re-ordering; a query that filters on the first sort column, or on the first and second columns together, will.
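
To illustrate (a sketch; the table and column names below are hypothetical, assuming the data is clustered with sort columns col_a, col_b):

-- benefits: the filter includes the first sort column
select * from t where col_a = 'x';
select * from t where col_a = 'x' and col_b = 'y';
-- does not benefit: the filter uses only the second sort column
select * from t where col_b = 'y';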

Prepare the Test Data

Create the Hive Table

CREATE TABLE sample_data_partitioned (
  id INT,
  name STRING,
  age INT,
  city STRING,
  date_str STRING
)
PARTITIONED BY (event_date STRING)
ROW FORMAT DELIMITED
FIELDS TERMINATED BY ','
STORED AS TEXTFILE;

Load Data into the Hive Table

The Hive table's data file is a CSV whose columns match the table schema (id, name, age, city, date_str):
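
The rows below are made up purely to illustrate the expected format (the actual file uses randomly generated name strings, such as the value filtered on in the test query later):

1,HGvCqKEhDMMtLrPMhmWs,25,CityA,2023-11-01
2,aGpLqXwRtYuIoPnMbVcZ,31,CityB,2023-11-01
3,kDseRvTbNwQxLzPoMjUa,42,CityC,2023-11-01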

1. Upload the above data file to the HDFS cluster at the path /sample_data_partitioned.csv.
2. Load the data into the Hive table with the following command (an optional sanity check is sketched after this list): LOAD DATA INPATH '/sample_data_partitioned.csv' INTO TABLE sample_data_partitioned PARTITION(event_date='2023-11-01');
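
An optional sanity check after the load (a minimal sketch):

select count(*) from sample_data_partitioned where event_date='2023-11-01';
select * from sample_data_partitioned limit 10;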

Create the Hudi Table

create table small_file_hudi_cow (
  id int,
  name string,
  age int,
  city STRING,
  date_str STRING
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'id'
)
partitioned by (date_str);

Load Data into the Hudi Table

1. Start the spark-sql client, loading the Hudi bundle jar and related configuration:
spark-sql --master yarn \
--num-executors 2 \
--executor-memory 3g \
--executor-cores 2 \
--jars /usr/local/service/spark/jars/hudi-spark3.2-bundle_2.12-0.12.0.jar \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'

2. Run the following statement to load the data into the Hudi table:

INSERT INTO small_file_hudi_cow SELECT id, name, age, city, event_date FROM sample_data_partitioned where event_date='2023-11-01';

Query Performance

1. Query statement: select * from small_file_hudi_cow where name = 'HGvCqKEhDMMtLrPMhmWs';

2. To reduce the impact of cached or preloaded data, restart the spark-sql client before each test run.

Original Query Time

Query Time with Data Re-sorting

Re-sort the Data

Run data re-sorting and small-file merging asynchronously:
 
 spark-submit \
  --jars '/xxx/hudi-utilities-bundle_2.12-*.jar' \
  --class 'org.apache.hudi.utilities.HoodieClusteringJob' \
  /xxx/hudi-utilities-bundle_2.12-*.jar \
  --spark-memory '2g' \
  --mode 'scheduleAndExecute' \
  --base-path "/usr/hive/warehouse/small_file_hudi_cow/" \
  --table-name "small_file_hudi_cow" \
  --hoodie-conf "hoodie.clustering.async.enabled=true" \
  --hoodie-conf "hoodie.clustering.async.max.commits=1" \
  --hoodie-conf "hoodie.clustering.plan.strategy.sort.columns=name" \
  --hoodie-conf "hoodie.clustering.plan.strategy.small.file.limit=314572800" \
  --hoodie-conf "hoodie.clustering.plan.strategy.target.file.max.bytes=1073741824"
  

Query Time After Re-sorting

For detailed usage, also see:
https://cloud.tencent.com/developer/article/1928686
https://hudi.apache.org/docs/clustering#execution-strategy

Hudi Savepoint Operations

A savepoint saves a chosen version of the data so that it can be restored later. (MoR tables do not support savepoints yet.)
A Hudi savepoint pins selected commits so that the cleaner will not remove them; the table can later be rolled back to a savepoint to restore that data.
Savepoints are managed through Spark SQL.
Examples:

  • Create a savepoint
call create_savepoint(table => 'hudi_cow_nonpcf_tbl', commit_time => '20240906173805505');
  • List existing savepoints
call show_savepoints(table => 'hudi_cow_nonpcf_tbl');
  • Roll back to a savepoint
call rollback_to_savepoint(table => 'hudi_cow_nonpcf_tbl', instant_time => '20240906173805505');
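
When a savepoint is no longer needed, it can be deleted so that the cleaner may reclaim the pinned file versions (a sketch, assuming the delete_savepoint procedure available in recent Hudi releases):

call delete_savepoint(table => 'hudi_cow_nonpcf_tbl', instant_time => '20240906173805505');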

HoodieDeltaStreamer in hudi-utilities

Tool Overview

The HoodieDeltaStreamer utility (part of hudi-utilities-bundle) provides ways to ingest data from different sources such as DFS or Kafka, with the following capabilities:

  • Exactly-once ingestion of new events from Kafka, and incremental imports from Sqoop or HiveIncrementalPuller output, or from files under a DFS folder.
  • Support for JSON, Avro, or custom record types for the incoming data.
  • Checkpoint management, rollback, and recovery.
  • Avro schemas taken from DFS or the Confluent schema registry.
  • Support for plugging in custom transformations.
[hadoop@10 spark]$ spark-submit \
> --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer \
> --jars /usr/local/service/spark/jars/hudi-spark3.4-bundle_2.12-0.14.1.jar \
> /usr/local/service/spark/jars/hudi-utilities-slim-bundle_2.12-0.14.1.jar --help
Usage: <main class> [options]
...

Note: the hudi-utilities-slim-bundle is used by default, so when running this tool the hudi-spark3.4-bundle jar must be provided alongside it (via --jars, as in the command above).

SqlSource

Create the Hive Source Table

create database hive_test location '/hive_test';
create table hive_test.test_source (
  id int,
  name string,
  price double,
  dt string,
  ts bigint
);
insert into hive_test.test_source values (105,'hudi', 10.0,'2024-07-17',100);

Create the Hudi Target Table with Spark SQL

create database hudi location '/hudi';
create table hudi.test_hudi_target (
  id int,
  name string,
  price double,
  ts long,
  dt string
) using hudi
partitioned by (dt)
options (
  primaryKey = 'id',
  preCombineField = 'ts',
  type = 'cow'
);

Configuration Files

common.properties

hoodie.datasource.write.hive_style_partitioning=true
hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.SimpleKeyGenerator
hoodie.datasource.hive_sync.use_jdbc=false
hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.MultiPartKeysValueExtractor

sql_source.properties

include=common.properties
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=dt
# For a non-partitioned table, leave this empty: hoodie.datasource.write.partitionpath.field=
hoodie.deltastreamer.source.sql.sql.query = select * from hive_test.test_source
# Hive sync related settings
hoodie.datasource.hive_sync.table=test_hudi_target
hoodie.datasource.hive_sync.database=hudi
## Not required for non-partitioned tables
hoodie.datasource.hive_sync.partition_fields=dt
## Create a managed (internal) table; the default is an external table
hoodie.datasource.hive_sync.create_managed_table = true
hoodie.datasource.hive_sync.serde_properties = primaryKey=id

Launch Command

spark-submit --conf "spark.sql.catalogImplementation=hive" \
--master yarn --deploy-mode client --executor-memory 2G --num-executors 3 --executor-cores 2 --driver-memory 4G --driver-cores 2 \
--jars /usr/local/service/spark/jars/hudi-spark3.4-bundle_2.12-0.14.1.jar \
--principal hadoop/10.206.16.129@TBDS-MWY6XL24 --keytab /var/krb5kdc/emr.keytab \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /usr/local/service/spark/jars/hudi-utilities-slim-bundle_2.12-0.14.1.jar \
--props file:///usr/local/service/spark/sql_source.properties  \
--target-base-path /hudi/test_hudi_target \
--target-table test_hudi_target \
--op BULK_INSERT \
--table-type COPY_ON_WRITE \
--source-ordering-field ts \
--source-class org.apache.hudi.utilities.sources.SqlSource \
--enable-sync  \
--checkpoint earliest \
--hoodie-conf 'hoodie.datasource.hive_sync.create_managed_table = true' \
--hoodie-conf 'hoodie.datasource.hive_sync.serde_properties = primaryKey=id'

Note: if the job fails with: Caused by: org.apache.avro.SchemaParseException: Illegal character in: test_source_json.id
set the following in the Hive configuration (hive-site.xml): hive.resultset.use.unique.column.names=false
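
A minimal hive-site.xml snippet for this setting:

<property>
  <name>hive.resultset.use.unique.column.names</name>
  <value>false</value>
</property>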

KafkaSource

Create the Kafka client authentication file for the Kerberos environment: kafka-client.properties

security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=hadoop
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="./admin.keytab" principal="admin@TBDS-ME7KG01C";

Create Kafka Test Data

# Create the topic
/usr/local/service/kafka/bin/kafka-topics.sh --bootstrap-server 10.206.16.81:9092 --create --topic hudi_test --command-config /tmp/kafka-client.properties

# Produce test data
/usr/local/service/kafka/bin/kafka-console-producer.sh --topic hudi_test --bootstrap-server  10.206.16.81:9092 --producer.config /tmp/kafka-client.properties

{"id":1,"name":"hudi","price":11.0,"ts":100,"dt":"2024-07-17"}
{"id":2,"name":"hudi","price":12.0,"ts":100,"dt":"2024-07-17"}
{"id":3,"name":"hudi","price":13.0,"ts":100,"dt":"2024-07-17"}

# Consume the topic to verify that the test records were written successfully
/usr/local/service/kafka/bin/kafka-console-consumer.sh --topic hudi_test --from-beginning --bootstrap-server 10.206.16.81:9092 --group test_hudi_group  --consumer.config /tmp/kafka-client.properties

{"id":1,"name":"hudi","price":11.0,"ts":100,"dt":"2024-07-17"}
{"id":2,"name":"hudi","price":12.0,"ts":100,"dt":"2024-07-17"}
{"id":3,"name":"hudi","price":13.0,"ts":100,"dt":"2024-07-17"}

Hudi Configuration Files

kafka_source.properties

include=common.properties
  
hoodie.datasource.write.recordkey.field=id
hoodie.datasource.write.partitionpath.field=dt

hoodie.streamer.source.kafka.topic=hudi_test
bootstrap.servers=10.206.16.74:9092
auto.offset.reset=earliest
group.id=test_hudi_group_2
security.protocol=SASL_PLAINTEXT
sasl.mechanism=GSSAPI
sasl.kerberos.service.name=hadoop
sasl.jaas.config=com.sun.security.auth.module.Krb5LoginModule required useKeyTab=true storeKey=true keyTab="./admin.keytab" principal="admin@TBDS-MWY6XL24";

hoodie.deltastreamer.schemaprovider.source.schema.file=hdfs://HDFS78000026/tmp/hudi/spark/source-schema-json.avsc
hoodie.deltastreamer.schemaprovider.target.schema.file=hdfs://HDFS78000026/tmp/hudi/spark/target-schema-json.avsc

source-schema-json.avsc

{
  "type": "record",
  "name": "Profiles",
  "fields": [
    {
      "name": "id",
      "type": [ "null", "int" ],
      "default": null
    },
    {
      "name": "name",
      "type": [ "null", "string" ],
      "default": null
    },
    {
      "name": "price",
      "type": [ "null", "double" ],
      "default": null
    },
    {
      "name": "ts",
      "type": [ "null", "long" ],
      "default": null
    },
    {
      "name": "dt",
      "type": [ "null", "string" ],
      "default": null
    }
  ]
}
# Create the target schema (identical to the source schema in this example)
cp source-schema-json.avsc target-schema-json.avsc
Upload both schema files to HDFS.
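
For example, using the HDFS paths configured in kafka_source.properties above:

hdfs dfs -mkdir -p hdfs://HDFS78000026/tmp/hudi/spark
hdfs dfs -put source-schema-json.avsc target-schema-json.avsc hdfs://HDFS78000026/tmp/hudi/spark/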

Launch Command

spark-submit --principal admin@TBDS-MWY6XL24 --keytab ./admin.keytab \
--deploy-mode client \
--files ./admin.keytab \
--jars /usr/local/service/spark/jars/hudi-spark3.4-bundle_2.12-0.14.1.jar \
--conf "spark.executor.extraJavaOptions=-Dsun.security.krb5.debug=true" \
--conf "spark.driver.extraJavaOptions=-Dsun.security.krb5.debug=true" \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  \
/usr/local/service/spark/jars/hudi-utilities-slim-bundle_2.12-0.14.1.jar  \
--props file:///usr/local/service/spark/kafka_source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider  \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource  \
--source-ordering-field id \
--target-base-path hdfs://HDFS78000026/tmp/hudi/hudi_test  \
--target-table hudi_test \
--op BULK_INSERT \
--table-type MERGE_ON_READ

The commands above perform a one-off read and conversion. Kafka can also be consumed in continuous mode to pick up incremental data by adding the --continuous flag:

spark-submit --principal admin@TBDS-MWY6XL24 --keytab ./admin.keytab \
--deploy-mode client \
--files ./admin.keytab \
--jars /usr/local/service/spark/jars/hudi-spark3.4-bundle_2.12-0.14.1.jar \
--conf "spark.executor.extraJavaOptions=-Dsun.security.krb5.debug=true" \
--conf "spark.driver.extraJavaOptions=-Dsun.security.krb5.debug=true" \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer  \
/usr/local/service/spark/jars/hudi-utilities-slim-bundle_2.12-0.14.1.jar  \
--props file:///usr/local/service/spark/kafka_source.properties \
--schemaprovider-class org.apache.hudi.utilities.schema.FilebasedSchemaProvider  \
--source-class org.apache.hudi.utilities.sources.JsonKafkaSource  \
--source-ordering-field id \
--target-base-path hdfs://HDFS78000026/tmp/hudi/hudi_test  \
--target-table hudi_test \
--op BULK_INSERT \
--table-type MERGE_ON_READ \
--continuous

In continuous mode the default sync interval is 0 seconds, i.e. the job keeps checking the checkpoint against the Kafka offsets for new data without pausing. The interval can be changed with --min-sync-interval-seconds, for example --min-sync-interval-seconds 60 to check once every 60 seconds. You can write a few more JSON records to the Kafka topic to verify that incremental data is picked up correctly, as sketched below.
Note: admin.keytab and admin@TBDS-MWY6XL24 are the user's Kerberos credentials; the corresponding YARN queue permissions and Kafka topic consume permissions must be granted in Ranger before use.
The credentials in the sasl.jaas.config entry of kafka_source.properties must also match them.
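
For example, produce one more record with the console producer used earlier (the record below is made up) and then query the Hudi table again:

/usr/local/service/kafka/bin/kafka-console-producer.sh --topic hudi_test --bootstrap-server 10.206.16.81:9092 --producer.config /tmp/kafka-client.properties
{"id":4,"name":"hudi","price":14.0,"ts":100,"dt":"2024-07-17"}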

Check the Import Results

Start Spark SQL

bin/spark-sql --master yarn --deploy-mode client --num-executors 2 --executor-memory 1g --executor-cores 2 --jars /usr/local/service/spark/jars/hudi-spark3.4-bundle_2.12-0.14.1.jar --conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' --conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' --conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar'

Create the Hudi Table with a Specified Location

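In the spark-sql session, register a Hudi table over the path that HoodieDeltaStreamer wrote to (a minimal sketch; the location is the --target-base-path from the launch command above):

create table hudi_test using hudi location 'hdfs://HDFS78000026/tmp/hudi/hudi_test';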

Query the Hudi Table

select * from hudi_test;

Note: to insert into or update a Spark-created Hudi table from Flink SQL, the table must be created in Spark with the following Hive properties:

  'hive_sync.metastore.uris' = 'thrift://10.4.4.11:7004',
  'hive_sync.conf.dir'='/usr/local/service/hive/conf'
For example:
 create table hudi_mor_tbl (
  id int,
  name string,
  price double,
  ts bigint
) using hudi
tblproperties (
  type = 'mor',
  primaryKey = 'id',
  preCombineField = 'ts',
  'hive_sync.enable' = 'true',
  'hive_sync.metastore.uris' = 'thrift://10.4.4.11:7004',
  'hive_sync.conf.dir'='/usr/local/service/hive/conf'
);