开发规范

最近更新时间: 2026-03-13 09:03:00

Hudi表模型设计规范

基本规则

  • Hudi表必须设置合理的主键。Hudi表提供了数据更新和幂等写入能力,该能力要求Hudi表必须设置主键,主键设置不合理会导致数据重复。主键可以为单一主键也可以为复合主键,两种主键类型均要求主键不能有null值和空值,可以参考以下示例设置主键:
    SparkSQL:
  -- 通过primaryKey指定主键,如果是复合主键需要用逗号分隔。
  create table hudi_table (
  id1 int,
  id2 int,
  name string,
  price double)
  using hudi
  options (
  primaryKey = 'id1,id2',
  preCombineField = 'price'
  );

SparkDatasource:

--通过hoodie.datasource.write.recordkey.field指定主键。
df.write.format("hudi")
.option("hoodie.datasource.write.table.type", COPY_ON_WRITE)
.option("hoodie.datasource.write.precombine.field", "price")
.option("hoodie.datasource.write.recordkey.field", "id1,id2").

FlinkSQL:

--通过hoodie.datasource.write.recordkey.field指定主键。
create table hudi_table(
id1 int,
id2 int
,name string,
price double) 
partitioned by (name) 
with ('connector' = 'hudi','hoodie.datasource.write.recordkey.field' = 'id1,id2',
'write.precombine.field' = 'price'
)

Hudi表必须配置precombine字段:
在数据同步过程中不可避免会出现数据重复写入、数据乱序问题,例如:异常数据恢复、写入程序异常重启等场景。通过设置合理precombine字段值可以保证数据的准确性,老数据不会覆盖新数据,也就是幂等写入能力。该字段可用选择的类型包括:业务表中更新时间戳、数据库的提交时间戳等。precombine字段不能有null值和空值,可以参考以下示例设置precombine字段:
SparkSQL:

--通过preCombineField指定precombine字段。
create table hudi_table (
id1 int,
id2 int,
name string,
price double) 
using hudi
options (primaryKey = 'id1,id2',
preCombineField = 'price'
);

SparkDatasource:

--通过hoodie.datasource.write.precombine.field指定precombine字段。
df.write.format("hudi")
.option("hoodie.datasource.write.table.type", COPY_ON_WRITE)
.option("hoodie.datasource.write.precombine.field", "price")
.option("hoodie.datasource.write.recordkey.field", "id1,id2")

Flink:

--通过write.precombine.field指定precombine字段。
create table hudi_table(
id1 int,
id2 int,
name string,
price double) partitioned by (name)
with ('connector' = 'hudi',
'hoodie.datasource.write.recordkey.field' = 'id1,id2',
'write.precombine.field' = 'price')

表类型选择

在建表阶段依据实际应用开发需求选择合适的表类型:MOR或者COW。
流式计算采用MOR表:
流式计算为低时延的实时计算,需要高性能的流式读写能力,在Hudi表中存在的MOR和COW两种模型中,MOR表的流式读写性能相对较好,因此在流式计算场景下采用MOR表模型。关于MOR表在读写性能的对比关系如下:

对比维度 MOR表 COW表
流式写
流式读
批量写
批量读
  • 实时入湖,表模型采用MOR表。
    实时入湖一般的性能要求都在分钟内或者分钟级,结合Hudi两种表模型的对比,因此在实时入湖场景中需要选择MOR表模型。
  • Hudi表名以及列名采用小写字母。
    多引擎读写同一张Hudi表时,为了规避引擎之间大小写的支持不同,统一采用小写字母。

建议

  • Spark批处理场景,对写入时延要求不高的场景,采用COW表。COW表模型中,写入数据存在写放大问题,因此写入速度较慢;但COW具有非常好的读取性能力。而且批量计算对写入时延不是很敏感,因此可以采用COW表。
  • Hudi表的写任务要开启Hive元数据同步功能。SparkSQL天然与Hive集成,无需考虑元数据问题。该条建议针对的是通过Spark Datasource API或者Flink写Hudi表的场景,通过这两种方式写Hudi时需要增加向Hive同步元数据的配置项;该配置的目的是将Hudi表的元数据统一托管到Hive元数据服务中,为后续的跨引擎操作数据以及数据管理提供便利。
  • 针对Spark和Trino要求查询Hudi表数据一致性要求高的场景使用COW表类型。
    此外针对COW和MOR还有如下一些选择建议
    COW
  • 写少读多场景。
  • 写入频率低或者不需要实时写入的场景。
  • 只有插入,无update操作的场景。
  • 针对读取时延要求较高的场景。

MOR

  • 写多读少的场景。
  • 高频的写入和更新,并且对写入性能要求较高的场景。
  • 需要流式写入的场景。

数据类型设计规范

  • 对于整数,小于等于9位的使用int类型,超过9位的使用long类型。
  • 小数均采用decimal(precision,scale)类型。
  • 字符类型字段,统一使用string类型。
  • 数值类型中,建议用0代表值为NULL的场景。

写入模式规范

hudi提供三种写入模式:bulk_insert、insert、upsert。

  • 可容忍主键重复的记录,并且数据量较多的场景,进行初始化时建议使用bulk_insert模式。
  • 可容忍主键重复的记录,日志或者流式追加写的场景。使用insert模式时,为防止造成小文件过多问题,需要设置clustering。
  • MOR表的数据初始化、对已有表数据修正和更新、不容忍主键重复的记录,数据实时更新和插入的场景建议使用upsert模式。

Hudi表索引设计规范

规则

  • 禁止修改表索引类型。Hudi表的索引会决定数据存储方式,随意修改索引类型会导致表中已有的存量数据与新增数据之间出现数据重复和数据准确性问题。常见的索引类型如下:
    • 布隆索引:Spark引擎独有索引,采用bloomfilter机制,将布隆索引内容写入到Parquet文件的footer中。
    • Bucket索引:在写入数据过程中,通过主键进行Hash计算,将数据进行分桶写入;该索引写入速度最快,但是需要合理配置分桶数目;Flink、Spark均支持该索引写入。
    • 状态索引:Flink引擎独有索引,是将行记录的存储位置记录到状态后端的一种索引形式,在作业冷启动过程中会遍历所有数据存储文件生成索引信息。
  • 用Flink状态索引,Flink写入后,不支持Spark继续写入。Flink在写Hudi的MOR表只会生成log文件,后续通过compaction操作,将log文件转为parquet文件。Spark在更新Hudi表时严重依赖parquet文件是否存在,如果当前Hudi表写的是log文件,采用Spark写入就会导致重复数据的产生。在批量初始化阶段 ,先采用Spark批量写入Hudi表,再用Flink基于Flink状态索引写入不会有问题,原因是Flink冷启动的时候会遍历所有的数据文件生成状态索引。
  • 实时入湖场景中,Spark引擎采用Bucket索引,Flink引擎可以用Bucket索引或者状态索引。实时入湖都是需要分钟内或者分钟级的高性能入湖,索引的选择会影响到写Hudi表的性能。在性能方面各个索引的区别如下:
    • Bucket索引优点:写入过程中对主键进行hash分桶写入,性能比较高,不受表的数据量限制。Flink和Spark引擎都支持,Flink和Spark引擎可以实现交叉混写同一张表。缺点:Bucket个数不能动态调整,数据量波动和整表数据量持续上涨会导致单个Bucket数据量过大出现大数据文件。需要结合分区表来进行平衡改善。
    • Flink状态索引优点:主键的索引信息存在状态后端,数据更新只需要点查状态后端即可,速度较快;同时生成的数据文件大小稳定,不会产生小文件、超大文件问题。缺点:该索引为Flink特有索引。在表的总数据行数达到数亿级别,需要优化状态后端参数来保持写入的性能。使用该索引无法支持Flink和Spark交叉混写。
  • 对于数据总量持续上涨的表,采用Bucket索引时,须使用时间分区,分区键采用数据创建时间。参照Flink状态索引的特点,Hudi表超过一定数据量后,Flink作业状态后端压力很大,需要优化状态后端参数才能维持性能;同时由于Flink冷启动的时候需要遍历全表数据,大数据量也会导致Flink作业启动缓慢。因此基于简化使用的角度,针对大数据量的表,可以通过采用Bucket索引来避免状态后端的复杂调优。如果Bucket索引+分区表的模式无法平衡Bucket桶过大的问题,还是可以继续采用Flink状态索引,按照规范去优化对应的配置参数即可。

建议

  • 基于Flink的流式写入的表,在数据量超过2亿条记录,采用Bucket索引,2亿以内可以采用Flink状态索引。参照Flink状态索引的特点,Hudi表超过一定数据量后,Flink作业状态后端压力很大,需要优化状态后端参数才能维持性能;同时由于Flink冷启动的时候需要遍历全表数据,大数据量也会导致Flink作业启动缓慢。因此基于简化使用的角度,针对大数据量的表,可以通过采用Bucket索引来避免状态后端的复杂调优。如果Bucket索引+分区表的模式无法平衡Bucket。桶过大的问题,还是可以继续采用Flink状态索引,按照规范去优化对应的配置参数即可。
  • 基于Bucket索引的表,按照单个Bucket 2GB数据量进行设计。
  • 2GB的数据存储成列存Parquet文件后,大概的数据文件大小是150MB ~ 256MB左右。不同业务数据会有出入。而HDFS单个数据块一般会是128MB,这样可以有效地利用存储空间。
  • 数据读写占用的内存空间都是原始数据大小(包括空值也是会占用内存的),2GB在大数据计算过程中,处于单task读写可接受范围之内。
  • 如果是单个Bucket的数据量超过了该值范围,可能会有什么影响?
  • 读写任务可能会出现OOM的问题,解决方法就是提升单个task的内存占比。
  • 读写性能下降,因为单个task的处理的数据量变大,导致处理耗时变大。
    为什么建议是2GB?
    • 2GB的数据存储成列存Parquet文件后,大概的数据文件大小是150MB ~ 256MB左右。不同业务数据会有出入。而HDFS单个数据块一般会是128MB,这样可以有效地利用存储空间。
    • 数据读写占用的内存空间都是原始数据大小(包括空值也是会占用内存的),2GB在大数据计算过程中,处于单task读写可接受范围之内。
      如果是单个Bucket的数据量超过了该值范围,可能会有什么影响?
    • 读写任务可能会出现OOM的问题,解决方法就是提升单个task的内存占比。
    • 读写性能下降,因为单个task的处理的数据量变大,导致处理耗时变大。

Hudi表分区设计规范

规则

分区键不可以被更新。
Hudi具有主键唯一性机制,但在分区表的场景下通常只能保证分区内主键唯一,因此如果分区键的值发生变更后,会导致相同主键的行记录出现多条的情况。在以日期分区的场景,可采用数据的创建时间为分区字段,切记不要采用数据更新时间做分区。

说明:

当指定Hudi的索引类型为Global索引类型时,Hudi支持跨分区进行数据更新,但Global索引性能较差一般不建议使用。

建议

  • 事实表采用日期分区表,维度表采用非分区或者大颗粒度的日期分区。
    • 事实表:数据总量大,增量大,数据读取多以日期做切分,读取一定时间段的数据。
    • 维度表:总量相对小,增量小,多以更新操作为主,数据读取会是全表读取,或者按照对应业务ID过滤。
  • 基于以上考虑,维度表采用天分区会导致文件数过多,而且是全表读取,会导致所需要的文件读取Task过多,采用大颗粒度的日期分区,例如年分区,可以有效降低分区个数和文件数量;对于增量不是很大的维度表,也可以采用非分区表。如果维度表的总数据量很大或者增量也很大,可以考虑采用某个业务ID进行分区,在大部分数据处理逻辑中针对大维度表,会有一定的业务条件进行过滤来提升处理性能,这类表要结合一定的业务场景来进行优化,无法从单纯的日期分区进行优化。事实表读取方式都会按照时间段切分,近一年、近一个月或者近一天,读取的文件数相对稳定可控,所以事实表优先考虑日期分区表。
  • 分区采用日期字段,分区表粒度,要基于数据更新范围确定,不要过大也不要过小。分区粒度可以采用年、月、日,分区粒度的目标是减少同时写入的文件桶数,尤其是在有数据量更新,且更新数据有一定时间范围规律的,比如:近一个月的数据更新占比最大,可以按照月份创建分区;近一天内的数据更新占比大,可以按照天进行分区。采用Bucket索引,写入是通过主键Hash打散的,数据会均匀的写入到分区下每个桶。因为各个分区的数据量是会有波动的,分区下桶的个数设计一般会按照最大分区数据量计算,这样会出现越细粒度的分区,桶的个数会冗余越多。例如:采用天级分区,平均的日增数据量是3GB,最多一天的日志是8GB,这个会采用Bucket桶数= 8GB/2GB = 4 来创建表;每天的更新数据占比较高,且主要分散到近一个月。这样会导致结果是,每天的数据会写入到全月的Bucket桶中,那就是4*30 = 120个桶。如果采用月分区,分区桶的个数= 3GB * 30 /2GB = 45个桶 ,这样写入的数据桶数减少到了45个桶。在有限的计算资源下,写入的桶数越少,性能越高。
  • 分区层级不宜过多,最好不要超过3层。
  • 建议hudi表与hive采用一致的分区风格,hoodie.datasource.write.hive_style_partitioning=true。
  • 实时表通过记录创建日期分区,维度表创建为非分区,或者采用分区粒度较低的字段进行分区。

Hudi数据表Compaction规范

MOR表更新数据以行存log的形式写入,log读取时需要按主键合并,并且是行存的,导致log读取效率比parquet低很多。为了解决log读取的性能问题,Hudi通过compaction将log压缩成parquet文件,大幅提升读取性能。

规则

  • 有数据持续写入的表,24小时内至少执行一次compaction。对于MOR表,不管是流式写入还是批量写入,需要保证每天至少完成1次Compaction操作。如果长时间不做compaction,Hudi表的log将会越来越大,这必将会出现以下问题:
    • Hudi表读取很慢,且需要很大的资源。 这是由于读MOR表涉及到log合并,大log合并需要消耗大量的资源并且速度很慢。
    • 长时间进行一次Compaction需要耗费很多资源才能完成,且容易出现OOM。
    • 阻塞Clean,如果没有Compaction操作来产生新版本的Parquet文件,那旧版本的文件就不能被Clean清理,增加存储压力。
  • CPU与内存比例为1:41:8。Compaction作业是将存量的parquet文件内的数据与新增的log中的数据进行合并,需要消耗较高的内存资源,按照之前的表设计规范以及实际流量的波动结合考虑,建议Compaction作业CPU与内存的比例按照1:41:8配置,保证Compaction作业稳定运行。当Compaction出现OOM问题,可以通过调大内存占比解决。

建议

  • 通过增加并发数提升Compaction性能。CPU和内存比例配置合理会保证Compaction作业是稳定的,实现单个Compaction task的稳定运行。但是Compaction整体的运行时长取决于本次Compaction处理文件数以及分配的cpu核数(并发能力),因此可以通过增加Compaction作业的CPU核的个数来提升Compaction性能(注意增加cpu也要保证CPU与内存的比例)。
  • Hudi表采用异步Compaction。为了保证流式入库作业的稳定运行,就需要保证流式作业不在实时入库的过程中做其它任务,比如Flink写Hudi的同时会做Compaction。这看似是一个不错的方案,即完成了入库又完成Compaction。但是Compaction操作是非常消耗内存和IO的,它会给流式入库作业带来以下影响:
    • 增加端到端时延:Compaction会放大写入时延,因为Compaction比入库更耗时。
    • 作业不稳定:Compaction会给入库作业带来更多的不稳定性,Compaction OOM将会导致整个作业直接失败。
  • 建议2~4小时进行一次compaction。Compaction是MOR表非常重要且必须执行的维护手段,对于实时任务来说,要求Compaction执行合并的过程必须和实时任务解耦,通过周期调度Spark任务来完成异步Compaction,这个方案的关键之处在于如何合理的设置这个周期,周期如果太短意味着Spark任务可能会空跑,周期如果太长可能会积压太多的Compaction Plan没有去执行而导致Spark任务耗时长并且也会导致下游的读作业时延高。对此场景,在这里给出以下建议:按照集群资源使用情况,可以每2小时或每4个小时去调度执行一次异步Compaction作业,这是一个基本的维护MOR表的方案。
  • 采用Spark异步执行Compaction,不采用Flink进行Compaction。Flink写hudi建议的方案是Flink只负责写数据和生成Compaction计划,由单独的Spark作业异步执行compaction、clean和archive。Compaction计划的生成是轻量级的对Flink写入作业影响可以忽略。上述方案落地的具体步骤参考如下:
    • Flink只负责写数据和生成Compaction计划Flink流任务建表语句中添加如下参数,控制Flink任务写Hudi时只会生成Compaction plan。
  'compaction.async.enabled' = 'false'      -- 关闭Flink 执行Compaction任务
  'compaction.schedule.enabled' = 'true'    -- 开启Compaction计划生成
  'compaction.delta_commits' = '5'          -- MOR表默认5次checkpoint尝试生成compaction plan,该参数需要根据具体业务调整
  'clean.async.enabled' = 'false'           -- 关闭Clean操作
  'hoodie.archive.automatic' = 'false'      -- 关闭Archive操作
  • Spark离线完成Compaction计划的执行,以及Clean和Archive操作。
    在调度平台(可以使用华为的DataArts)运行一个定时调度的离线任务来让Spark完成Hudi表的Compaction计划执行以及Clean和Archive操作。
    set hoodie.archive.automatic = false;
    set hoodie.clean.automatic = false;
    set hoodie.compact.inline = true;
    set hoodie.run.compact.only.inline=true;
    set hoodie.cleaner.commits.retained = 500;  -- clean保留timeline上最新的500个deltacommit对应的数据文件,之前的deltacommit所对应的旧版本文件会被清理。该值需要大于compaction.delta_commits设置的值,需要根据具体业务调整。
    set hoodie.keep.max.commits = 700;  -- timeline最多保留700个deltacommit
    set hoodie.keep.min.commits = 501;  -- timeline最少保留500个deltacommit。该值需要大于hoodie.cleaner.commits.retained设置的值,需要根据具体业务调整。
    run compaction on <database name>. <table name>;   -- 执行Compaction计划run clean on <database name>. <table name>;        -- 执行Clean操作
    run archivelog on <database name>.<table name>;    -- 执行Archive操作
  • 异步Compaction可以将多个表串行到一个作业,资源配置相近的表放到一组,该组作业的资源配置为最大消耗资源的表所需的资源。
    对于在Hudi表采用异步Compaction和采用Spark异步执行Compaction,这里给出以下开发建议:
    • 不需要对每张Hudi表都开发异步Compaction任务,这样会导致作业开发成本高,集群作业爆炸,集群资源不能有效的利用和释放。
    • 异步Compaction任务可以通过执行SparkSQL来完成,多个Hudi表的Compaction、Clean和Archive可以放在同一个任务来执行,比如对table1和table2用同一个任务来执行异步维护操作:
  set hoodie.clean.async = true;
  set hoodie.clean.automatic = false;
  set hoodie.compact.inline = true;
  set hoodie.run.compact.only.inline=true;
  set hoodie.cleaner.commits.retained = 500;
  set hoodie.keep.min.commits = 501;
  set hoodie.keep.max.commits = 700;
  run compaction on <database name>. <table1>;
  run clean on <database name>. <table1>;
  run archivelog on <database name>.<table1>;
  run compaction on <database name>.<table2>;
  run clean on <database name>.<table2>;
  run archivelog on <database name>.<table2>;

Hudi数据表Clean规范

Clean也是Hudi表的维护操作之一,该操作对于MOR表和COW表都需要执行。Clean操作的目的是为了清理旧版本文件(Hudi不再使用的数据文件),这不但可以节省Hudi表List过程的时间,也可以缓解存储压力。

规则

Hudi表必须执行Clean。
对于Hudi的MOR、COW表,都需要开启Clean。

  • Hudi表在写入数据时会自动判断是否需要执行Clean,因为Clean的开关默认打开(hoodie.clean.automatic默认为true)。
  • Clean操作并不是每次写数据时都会触发,至少需要满足两个条件:
  1. Hudi表中需要有旧版本的文件。对于COW表来说,只要保证数据被更新过就一定存在旧版本的文件。对于MOR表来说,要保证数据被更新过并且做过Compaction才能有旧版本的文件。
  2. Hudi表满足hoodie.cleaner.commits.retained设置的阈值。如果是Flink写hudi,则至少提交的checkpoint要超过这个阈值;如果是批写Hudi,则批写次数要超过这个阈值。

建议

  • MOR表下游采用批量读模式,采用clean的版本数为compaction版本数+1。MOR表一定要保证Compaction Plan能够被成功执行,Compaction Plan只是记录了Hudi表中哪些Log文件要和哪些Parquet文件合并,所以最重要的地方在于保证Compaction Plan在被执行的时候它需要合并的文件都存在。而Hudi表中只有Clean操作可以清理文件,所以建议Clean的触发阈值(hoodie.cleaner.commits.retained的值)至少要大于Compaction的触发阈值(对于Flink任务来说就是compaction.delta_commits的值)。
  • MOR表下游采用流式计算,历史版本保留小时级。如果MOR表的下游是流式计算,例如Flink流读,可以按照业务需要保留小时级的历史版本,这样的话近几个小时之内的增量数据可以通过log文件读出,如果保留时长过短,下游flink作业在重启或者异常中断阻塞的情况下,上游增量数据已经Clean掉了,flink需要从parquet文件读增量数据,性能会有下降;如果保留时间过长,会导致log里面的历史数据冗余存储。具体可以按照下面的计算公式来保留2个小时的历史版本数据:版本数设置为3600*2/版本interval时间,版本interval时间来自于flink作业的checkpoint周期,或者上游批量写入的周期。
  • COW表如果业务没有历史版本数据保留的特殊要求,保留版本数设置为1。COW表的每个版本都是表的全量数据,保留几个版本就会冗余多少个版本。因此如果业务无历史数据回溯的需求,保留版本数设置为1,也就是保留当前最新版本。
  • clean作业每天至少执行一次,可以2~4小时执行一次。Hudi的MOR表和COW表都需要保证每天至少1次Clean,MOR表的Clean可以参考2.2.1.6小节和Compaction放在一起异步去执行。COW的Clean可以在写数据时自动判断是否执行。

Hudi数据表Archive规范

Archive(归档)是为了减轻Hudi读写元数据的压力,所有的元数据都存放在这个路径:Hudi表根目录/.hoodie目录,如果.hoodie目录下的文件数量超过10000就会发现Hudi表有非常明显的读写时延。

规则

Hudi表必须执行Archive。
对于Hudi的MOR类型和COW类型的表,都需要开启Archive。

  • Hudi表在写入数据时会自动判断是否需要执行Archive,因为Archive的开关默认打开(hoodie.archive.automatic默认为true)。
  • Archive操作并不是每次写数据时都会触发,至少需要满足以下两个条件:
  1. Hudi表满足hoodie.keep.max.commits设置的阈值。如果是Flink写hudi至少提交的checkpoint要超过这个阈值;如果是Spark写hudi,写Hudi的次数要超过这个阈值。
  2. Hudi表做过Clean,如果没有做过Clean就不会执行Archive。

建议

Archive作业每天至少执行一次,可以2~4小时执行一次。
Hudi的MOR表和COW表都需要保证每天至少1次Archive,MOR表的Archive可以参考2.2.1.6小节和Compaction放在一起异步去执行。COW的Archive可以在写数据时自动判断是否执行。

Spark on Hudi表数据维护规范

SparkSQL建表参数规范

规则

  • 建表必须指定primaryKey和preCombineField。Hudi表提供了数据更新的能力和幂等写入的能力,该能力要求数据记录必须设置主键用来识别重复数据和更新操作。不指定主键会导致表丢失数据更新能力,不指定preCombineField会导致主键重复。
参数名称 参数描述 输入值 说明
primaryKey hudi主键 按需 必须指定,可以是复合主键但是必须全局唯一。
preCombineField 预合并键,相同主键的多条数据按该字段进行合并 按需 必须指定,相同主键的数据会按该字段合并,不能指定多个字段。
  • 禁止建表时将hoodie.datasource.hive_sync.enable指定为false。指定为false将导致新写入的分区无法同步到Hive Metastore中。由于缺失新写入的分区信息,查询引擎读取该时会丢数。
  • 禁止指定Hudi的索引类型为INMEMORY类型。该索引仅是为了测试使用。生产环境上使用该索引将导致数据重复。

建表示例

create table data_partition(id int, comb int, col0 int,yy int, mm int, dd int)
 using hudi                               --指定hudi 数据源
 partitioned by(yy,mm,dd)                 --指定分区, 支持多级分区
 location '/opt/log/data_partition'       --指定路径,如果不指定建表在hive warehouse里
 options(type='mor',                              --表类型 mor 或者 
 cowprimaryKey='id',                         --主键,可以是复合主键但是必须全局唯一
 preCombineField='comb'                   --预合并字段,相同主键的数据会按该字段合并,当前不能指定多个字段  
 )

Spark增量读取Hudi参数规范

规则

增量查询之前必须指定当前表的查询为增量查询模式,并且查询后重写设置表的查询模式。
如果增量查询完,不重新将表查询模式设置回去,将影响后续的实时查询。

示例

set hoodie.tableName.consume.mode=INCREMENTAL;--必须设置当前表读取为增量读取模式。
set hoodie.tableName.consume.start.timestamp=20201227153030;--指定初始增量拉取commit。
set hoodie.tableName.consume.end.timestamp=20210308212318;  --指定增量拉取结束commit,如果不指定的话采用最新的commit。
select * from tableName where `_hoodie_commit_time`>'20201227153030' and `_hoodie_commit_time`<='20210308212318'; --结果必须根据start.timestamp和end.timestamp进行过滤,如果没有指定end.timestamp,则只需要根据start.timestamp进行过滤。
set hoodie.tableName.consume.mode=SNAPSHOT;  --使用完增量模式,必须把查询模式重新设置回来。

Spark异步任务执行表compaction参数设置规范

  • 写作业未停止情况下,禁止手动执行run schedule命令生成compaction计划。
    错误示例:
run schedule on dsrTable

如果还有别的任务在写这张表,执行该操作会导致数据丢失。

  • 执行run compaction命令时,禁止将hoodie.run.compact.only.inline设置成false,该值需要设置成true。
    错误示例:
  set hoodie.run.compact.only.inline=false;
  run compaction on dsrTable;

如果还有别的任务在写这张表,执行上述操作会导致数据丢失。
正确示例:异步Compaction

set hoodie.compact.inline = true;
set hoodie.run.compact.only.inline=true;
run compaction on dsrTable;

Spark on Hudi表数据维护规范

禁止通过Alter命令修改表关键属性信息:type/primaryKey/preCombineField/hoodie.index.type
错误示例,执行如下语句修改表关键属性:

alter table dsrTable set tblproperties('type'='xx');
alter table dsrTable set tblproperties('primaryKey'='xx');
alter table dsrTable set tblproperties('preCombineField'='xx');
alter table dsrTable set tblproperties('hoodie.index.type'='xx');

Hive/trino等引擎可以直接修改表属性,但是这种修改会导致整个Hudi表出现数据重复,甚至数据损坏;因此禁止修改上述属性。

Spark并发写Hudi建议

  • 涉及到并发场景,推荐采用分区间并发写的方式:即不同的写入任务写不同的分区。
  • 分区并发参数控制:
    • SQL方式:
set hoodie.support.partition.lock=true;
  • DataSource API方式:
df.write.format("hudi")
  .options(xxx)
  .option("hoodie.support.partition.lock", "true")
  .mode(xxx)
  .save("/tmp/tablePath")

说明:

所有参与分区间并发写入的任务,都必须配置上述参数。

  • 不建议同分区内并发写,这种并发写入需要开启Hudi OCC方式并发写入,必须严格遵守并发参数配置,否则会出现表数据损坏的问题。
    并发OCC参数控制:
    - SQL方式:
    --开启OCC。
    set hoodie.write.concurrency.mode=optimistic_concurrency_control;
    set hoodie.cleaner.policy.failed.writes=LAZY;--开启并发ZooKeeper锁。
    set hoodie.write.lock.provider=org.apache.hudi.client.transaction.lock.ZookeeperBasedLockProvider; --设置使用ZooKeeper锁。
    set hoodie.write.lock.zookeeper.url=<zookeeper_url>;  --设置使用ZooKeeper地址。
    set hoodie.write.lock.zookeeper.port=<zookeeper_port>; --设置使用ZooKeeper端口。
    set hoodie.write.lock.zookeeper.lock_key=<table_name>;  --设置锁名称。
    set hoodie.write.lock.zookeeper.base_path=<table_path>; --设置zk锁路径。
  • DataSource API方式:
df.write
    .format("hudi")
    .options(xxx)
    .option("hoodie.write.concurrency.mode", "optimistic_concurrency_control")
    .option("hoodie.cleaner.policy.failed.writes", "LAZY")
    .option("hoodie.write.lock.zookeeper.url", "zookeeper_url")
    .option("hoodie.write.lock.zookeeper.port", "zookeeper_port")
    .option("hoodie.write.lock.zookeeper.lock_key", "table_name")
    .option("hoodie.write.lock.zookeeper.base_path", "table_path")
    .mode(xxx)
    .save("/tmp/tablePath")

说明:

  1. 所有参与并发写入的任务,都必须配置上述参数。OCC不会保证所有参与并发写入的任务都执行成功;当出现多个写任务更新同一个文件时,只有一个任务可以成功,其余失败。
  2. 并发场景下,需要设置cleaner policy为Lazy,因此无法自动清理垃圾文件。

Spark读写Hudi资源配置建议

  • Spark读写Hudi任务资源配置规则,内存和CPU核心的比例2:1,堆外内存和CPU核心比例0.5:1;即一个核心,需要2G堆内存,0.5G堆外内存。

    说明:

    Spark初始化入库场景,由于处理的数据量比较大,上述资源配比需要调整,内存和Core的比例推荐4:1,堆外内存和Core的比例1:1。

示例:

spark-submit
--master yarn-cluster
--executor-cores  2                          --核心
--executor-memory 4g                         --堆内存
--conf spark.executor.memoryOverhead=1024    --堆外内存
  • 基于Spark进行ETL计算,CPU核心 :内存比例建议>1:2,推荐1:41:8上一个规则是指纯读写的资源配比,如果Spark的作业除了读写还有业务逻辑计算,该过程会导致需要内存增加,因此建议CPU核心与内存的比例大于1:2,如果逻辑比较复杂适当调大内存,这要基于实际情况进行调整。一般默认推荐配置为1:41:8。
  • 针对bucket表的写入资源配置,建议给的CPU核心数量不小于桶数目 (分区表每次可能写入多个分区,理想情况下建议给的CPU核心数量=写入分区分桶数;实际配置的core小于这个值,写入性能线性下降)。示例:当前表bucket数为3,同时写入分区数为2, 建议入库Spark任务配置的core数量大于等于32。
spark-submit
--master yarn-cluster
--executor-cores 2
--executor-memory 4g
--excutor-num 3

以上配置代表executor-numexecutor-cores=6 >=分区数分桶数=6。

Spark On Hudi性能调优

优化Spark Shuffle参数提升Hudi写入效率

  • 开启spark.shuffle.readHostLocalDisk=true,本地磁盘读取shuffle数据,减少网络传输的开销。
  • 开启spark.io.encryption.enabled=false,关闭shuffle过程写加密磁盘,提升shuffle效率。
  • 开启spark.shuffle.service.enabled=true,启动shuffle服务,提升任务shuffle的稳定性。
配置项 集群默认值 调整后
--conf spark.shuffle.readHostLocalDisk false true
--conf spark.io.encryption.enabled true false
--conf spark.shuffle.service.enabled false true

调整Spark调度参数优化OBS场景下Spark调度时延

  • 开启对于OBS存储,可以关闭Spark的本地性进行优化,尽可能提升Spark调度效率。
配置项 集群默认值 调整后
--conf spark.locality.wait 3s 0s
--conf spark.locality.wait.process 3s 0s
--conf spark.locality.wait.node 3s 0s
--conf spark.locality.wait.rack 3s 0s

优化shuffle并行度,提升Spark加工效率

所谓的shuffle并发度如下图所示:

集群默认是200,作业可以单独设置。如果发现瓶颈stage(执行时间长),且分配给当前作业的核数大于当前的并发数,说明并发度不足。通过以下配置优化。

场景 配置项 集群默认值 调整后
Jar作业 spark.default.parallelism 200 按实际作业可用资源2倍设置
SQL作业 spark.sql.shuffle.partitions 200 按实际作业可用资源2倍设置
hudi入库作业 hoodie.upsert.shuffle.parallelism 200 非bucket表使用,按实际作业可用资源2倍设置

注意:

动态资源调度情况下(spark.dynamicAllocation.enabled= true)时,资源按照spark.dynamicAllocation.maxExecutors评估。

Bucket表,可以开启桶裁剪提升主键点查效率

示例:
业务经常使用主键id作为查询条件,执行点查;比如select xxx where id = idx ... 。
建表时,可以加入如下属性,提升查询效率。默认配置下属性值等于primaryKey,即主键。

hoodie.bucket.index.hash.field=id

初始化Hudi表时,可以使用BulkInsert方式快速写入数据

示例:

set hoodie.combine.before.insert=true;                --入库前去重,如果数据没有重复 该参数无需设置。
set hoodie.datasource.write.operation = bulk_insert;  --指定写入方式为bulk insert方式。
set hoodie.bulkinsert.shuffle.parallelism = 4;        --指定bulk_insert写入时的并行度,等于写入完成后保存的分区parquet文件数。
insert into dsrTable select * from srcTabble

开启log列裁剪,提升MOR表查询效率

mor表读取的时候涉及到Log和Parquet的合并,性能不是很理想。可以开启log列裁剪减少合并时IO读取开销。
SparkSQL执行查询,先执行:

set hoodie.enable.log.column.prune=true;

Spark加工Hudi表时其他参数优化

  • 设置spark.sql.enableToString=false,降低Spark解析复杂SQL时候内存使用,提升解析效率。
  • 设置spark.speculation=false,关闭推测执行,开启该参数会带来额外的cpu消耗,同时Hudi不支持启动该参数,启用该参数写Hudi有概率导致文件损坏。
配置项 集群默认值 调整后
--conf spark.sql.enableToString true false
--conf spark.speculation false false

Bucket调优示例

创建Bucket索引表调优

Bucket索引常用设置参数:

  • Spark:
hoodie.index.type=BUCKET
hoodie.bucket.index.num.buckets=5
  • Flink
index.type=BUCKET
hoodie.bucket.index.num.buckets=5

判断使用分区表还是非分区表

根据表的使用场景一般将表分为事实表和维度表:

  • 事实表通常整表数据规模较大,以新增数据为主,更新数据占比小,且更新数据大多落在近一段时间范围内(年或月或天),下游读取该表进行ETL计算时通常会使用时间范围进行裁剪(例如最近一天、一月、一年),这种表通常可以通过数据的创建时间来做分区以保证最佳读写性能。
  • 维度表数据量一般整表数据规模较小,以更新数据为主,新增较少,表数据量比较稳定,且读取时通常需要全量读取做join之类的ETL计算,因此通常使用非分区表性能更好。
  • 分区表的分区键不允许更新,否则会产生重复数据。
    例外场景:超大维度表和超小事实表
    特殊情况如存在持续大量新增数据的维度表(表数据量在200G以上或日增长量超过60M)或数据量非常小的事实表(表数据量小于10G且未来三至五年增长后也不会超过10G)需要针对具体场景来进行例外处理:
  • 持续大量新增数据的维度表方法一:预留桶数,如使用非分区表则需通过预估较长一段时间内的数据增量来预先增加桶数,缺点是随着数据的增长,文件依然会持续膨胀;方法二:大粒度分区(推荐),如果使用分区表则需要根据数据增长情况来计算,例如使用年分区,这种方式相对麻烦些但是多年后表无需重新导入。方法三:数据老化,按照业务逻辑分析大的维度表是否可以通过数据老化清理无效的维度数据从而降低数据规模。
  • 数据量非常小的事实表这种可以在预估很长一段时间的数据增长量的前提下使用非分区表预留稍宽裕一些的桶数来提升读写性能。
    确认表内桶数
    Hudi表的桶数设置,关系到表的性能,需要格外引起注意。
    以下几点,是设置桶数的关键信息,需要建表前确认。
  • 非分区表
  1. 单表数据总条数 = select count(1) from tablename(入湖时需提供);
  2. 单条数据大小 = 平均 1KB(华为建议通过select * from tablename limit 100将查询结果粘贴在notepad++中得出100条数据的大小再除以100得到单条平均大小)
  3. 单表数据量大小(G) = 单表数据总条数*单表数据大小/1024/1024
  4. 非分区表桶数 = MAX(单表数据量大小(G)/2G*2,再向上取整,4)
  • 分区表
  1. 最近一个月最大数据量分区数据总条数 = 入湖前咨询产品线
  2. 单条数据大小 = 平均 1KB(华为建议通过select * from tablename limit 100将查询结果粘贴在notepad++中得出100条数据的大小再除以100得到单条平均大小)
  3. 单分区数据量大小(G) = 最近一个月最大数据量分区数据总条数*单表数据大小/1024/1024
  4. 分区表桶数 = MAX(单分区数据量大小(G)/2G,再后向上取整,1)

    注意:

    1. 需要使用的是表的总数据大小,而不是压缩以后的文件大小。
    2. 桶的设置以偶数最佳,非分区表最小桶数请设置4个,分区表最小桶数请设置1个。