快速使用

最近更新时间: 2026-03-13 09:03:00

Spark SQL-Hudi基本使用

spark-sql --master yarn \
--deploy-mode client \
--num-executors 2 \
--executor-memory 1g \
--executor-cores 2 \
--jars /usr/local/service/spark/jars/hudi-spark3.4-bundle_2.12-*.jar \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'

建表

建表参数

参数名 默认值 说明
primaryKey uuid 表的主键名,多个字段用逗号分隔。 同 hoodie.datasource.write.recordkey.field
preCombineField 表的预合并字段。 同 hoodie.datasource.write.precombine.field
type COW 创建的表类型: type = 'COW' type = 'MOR' 同hoodie.datasource.write.table.type

创建非分区表

-- 创建一个cow表,primaryKey 'id',并提供preCombineField
create table hudi_cow_tbl (
  id int,
  name string,
  price double,
  ts bigint
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
);

-- 创建一个mor非分区表
create table hudi_mor_tbl (
  id int,
  name string,
  price double,
  ts bigint
) using hudi
tblproperties (
  type = 'mor',
  primaryKey = 'id',
  preCombineField = 'ts'
);

创建分区表

-- 创建一个cow分区外部表,指定primaryKey和preCombineField
create table hudi_cow_pt_tbl (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 )
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl';

通过CTAS (Create Table As Select)建表


通过CTAS从其他表加载数据

-- 创建内部表
create table parquet_mngd using parquet location 'hdfs:///tmp/parquet_dataset/*.parquet';

-- 通过CTAS加载数据
create table hudi_ctas_cow_pt_tbl2 using hudi location 'hdfs:/tmp/hudi/hudi_tbl/' options (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 )
partitioned by (datestr) as select * from parquet_mngd;

插入数据

默认情况下,如果提供了preCombineKey,则insert into的写操作类型为upsert,否则使用insert。

向非分区表插入数据

insert into hudi_cow_tbl select 1, 'a1', 20;insert into hudi_mor_tbl select 1, 'a1', 20, 1000;

向分区表动态分区插入数据

insert into hudi_cow_pt_tbl partition (dt, hh)select 1 as id, 'a1' as name, 1000 as ts, '2024-07-17' as dt, '10' as hh;

向分区表静态分区插入数据

insert into hudi_cow_pt_tbl partition(dt = '2024-07-07', hh='11') select 2, 'a2', 1000;

使用bulk_insert插入数据

-- 向指定preCombineKey的表插入数据,则写操作为upsert
insert into hudi_mor_tbl select 1, 'a1_1', 20, 1001;
select id, name, price, ts from hudi_mor_tbl;
1   a1_1    20.0    1001

-- 向指定preCombineKey的表插入数据,指定写操作为bulk_insert 
set hoodie.sql.bulk.insert.enable=true;
set hoodie.sql.insert.mode=non-strict;

insert into hudi_mor_tbl select 1, 'a1_2', 20, 1002;
select id, name, price, ts from hudi_mor_tbl;
1       a1_2    20.0    1002
1       a1_1    20.0    1001

查询数据

普通查询

select fare, begin_lon, begin_lat, ts from  hudi_trips_snapshot where fare > 20.0

时间旅行查询

-- 关闭前面开启的bulk_insert
set hoodie.sql.bulk.insert.enable=false;

create table hudi_cow_pt_tbl1 (
  id bigint,
  name string,
  ts bigint,
  dt string,
  hh string
) using hudi
tblproperties (
  type = 'cow',
  primaryKey = 'id',
  preCombineField = 'ts'
 )
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl1';

-- 插入一条id为1的数据
insert into hudi_cow_pt_tbl1 select 1, 'a0', 1000, '2024-07-17', '10';
select * from hudi_cow_pt_tbl1;

-- 修改id为1的数据
insert into hudi_cow_pt_tbl1 select 1, 'a1', 1001, '2024-07-17', '10';
select * from hudi_cow_pt_tbl1;

-- 基于第一次提交时间进行时间旅行
select * from hudi_cow_pt_tbl1 timestamp as of '20240724154822829' where id = 1;

-- 其他时间格式的时间旅行写法
select * from hudi_cow_pt_tbl1 timestamp as of '2024-07-24 15:48:22.829' where id = 1;

select * from hudi_cow_pt_tbl1 timestamp as of '2024-07-24' where id = 1;

更新数据

update

-- 语法
UPDATE tableIdentifier SET column = EXPRESSION(,column = EXPRESSION) [ WHERE boolExpression]

-- 执行更新
update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1;

update hudi_cow_pt_tbl1 set name = 'a1_1', ts = 1001 where id = 1;

-- update using non-PK field
update hudi_cow_pt_tbl1 set ts = 1111 where name = 'a1_1';

MergeInto

-- 语法
MERGE INTO tableIdentifier AS target_alias
USING (sub_query | tableIdentifier) AS source_alias
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ]  THEN <not_matched_action> ]

<merge_condition> =A equal bool condition 
<matched_action>  =
  DELETE  |
  UPDATE SET *  |
  UPDATE SET column1 = expression1 [, column2 = expression2 ...]
<not_matched_action>  =
  INSERT *  |
  INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])

-- 执行案例
-- 1、准备source表:非分区的hudi表,插入数据
create table merge_source (id int, name string, price double, ts bigint) using hudi
tblproperties (primaryKey = 'id', preCombineField = 'ts');
insert into merge_source values (1, "old_a1", 22.22, 2900), (2, "new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000);

merge into hudi_mor_tbl as target
using merge_source as source
on target.id = source.id
when matched then update set *
when not matched then insert *
;

-- 2、准备source表:分区的parquet表,插入数据
create table merge_source2 (id int, name string, flag string, dt string, hh string) using parquet;
insert into merge_source2 values (1, "new_a1", 'update', '2024-07-17', '10'), (2, "new_a2", 'delete', '2024-07-17', '11'), (3, "new_a3", 'insert', '2024-07-17', '12');

merge into hudi_cow_pt_tbl1 as target
using (
  select id, name, '2000' as ts, flag, dt, hh from merge_source2
) source
on target.id = source.id
when matched and flag != 'delete' then
 update set id = source.id, name = source.name, ts = source.ts, dt = source.dt, hh = source.hh
when matched and flag = 'delete' then delete
when not matched then
 insert (id, name, ts, dt, hh) values(source.id, source.name, source.ts, source.dt, source.hh)
;

删除数据

语法

DELETE FROM tableIdentifier [ WHERE BOOL_EXPRESSION]

用法

delete from hudi_cow_nonpcf_tbl where uuid = 1;

delete from hudi_mor_tbl where id % 2 = 0;

-- 使用非主键字段删除
delete from hudi_cow_pt_tbl1 where name = 'a1_1';

Flink使用Hudi

YARN-session模式

Yarn-Session模式
启动集群
bin/yarn-session.sh -s 2 -jm 4096 -tm 4096 -d

启动sql client
bin/sql-client.sh embedded -s yarn-session

建表及插入数据

set sql-client.execution.result-mode=tableau;
set execution.checkpointing.interval=1000;

-- 创建hudi表
CREATE TABLE t1(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://HDFS78000435/tmp/hudi_flink/t1',
  'table.type' = 'MERGE_ON_READ' –- 默认是COW
);
或如下写法
CREATE TABLE flink_hudi_mor(
  uuid VARCHAR(20),
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20),
  PRIMARY KEY(uuid) NOT ENFORCED
)
PARTITIONED BY (`partition`)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://HDFS78000435/tmp/hudi_flink/flink_hudi_mor',
  'table.type' = 'MERGE_ON_READ'
);

-- 插入数据
INSERT INTO t1 VALUES
  ('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
  ('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
  ('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
  ('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
  ('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
  ('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
  ('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
  ('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
  
  

Flink SQL通过Hudi HMS Catalog读写Hudi并同步Hive表(推荐这种方式)

它可以像Spark SQL创建表一样,直接将表建立在Hive中,并且表结构与Hive SQL和Spark SQL兼容,也就是Flink Hudi HMS Catalog中创建的表,可以同时使用Flink SQL、Hive SQL、Spark SQL、Trino查询,也可以同时使用Flink SQL、Spark SQL写Hudi。不像方式二中介绍的方式,Flink SQL写Hudi的表不能被Hive/Spark使用,只能通过同步表的方式。另外在Flink Hudi HMS Catalog中和Spark SQL一样默认开启同步Hive,也就是对于MOR表默认会同步创建对应的_ro表和_rt表,至于COW表因为同步的表名和创建的表名一样,所以读写是同一张表。总之和Spark SQL创建表、读写一致。
如果Hudi表类型是MERGE_ON_READ模式,那么映射的Hive表将会有2张,一张后缀为_rt(实时表),另一张表后缀为_ro(读优化表)。后缀 _rt 对应的Hive表中存储的是Base文件Parquet格式数据+log Avro格式数据,也就是全量数据。后缀为 _ro Hive表中存储的是Base文件对应的数据。


set sql-client.execution.result-mode=tableau;
set execution.checkpointing.interval=1000;
-- 创建 catalog 信息
CREATE CATALOG hudi_catalog WITH (
    'type' = 'hudi',
    'mode' = 'hms',
    'default-database' = 'default',
    'hive.conf.dir' = '/usr/local/service/hive/conf',
    'table.external' = 'true'
);

use catalog hudi_catalog;
use hudi;

-- sets up the result mode to tableau to show the results directly in the CLI
set execution.result-mode=tableau;

CREATE TABLE test_hudi_flink_mor (
  id int PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  price int,
  ts int,
  dt VARCHAR(10)
)
PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://HDFS78000007/tmp/hudi/test_hudi_flink_mor',
  'table.type' = 'MERGE_ON_READ',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hive_sync.conf.dir'='/usr/local/service/hive/conf',
  'spark.version'='spark3.4.2',
  'hoodie.datasource.write.precombine.field'='ts'
);

insert into test_hudi_flink_mor values (1,'hudi',10,100,'2024-07-17'),(2,'hudi',10,100,'2024-07-17');

CREATE TABLE test_hudi_flink_cow (
  id int PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  price int,
  ts int,
  dt VARCHAR(10)
)
PARTITIONED BY (dt)
WITH (
  'connector' = 'hudi',
  'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
  'hoodie.datasource.write.recordkey.field' = 'id',
  'hoodie.datasource.write.hive_style_partitioning' = 'true',
  'hive_sync.conf.dir'='/usr/local/service/hive/conf',
  'hoodie.datasource.write.precombine.field'='ts'
);

insert into test_hudi_flink_cow values (1,'hudi',10,100,'2024-07-17'),(2,'hudi',10,100,'2024-07-17');

Trino使用Hudi

通过Trino-cli 连接trino

client/trino-cli --server https://10.206.16.246:9443 --catalog hudi --schema default --user hadoop --password --insecure
密码为hadoop对应的Ldap密码,默认为机器密码

Hive使用Hudi

beeline -u 'jdbc:hive2://x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2' -nhive -pxxx

注:hive不支持直接读取Flink SQL的MOR表,只支持读取同步到hive生成的_rt和_ro表,并且需要flink-sql mor表触发了文件合并生成.parquet文件后才能读取到数据,trino读flink hudi表也是一样的。