Basic Hudi Usage with Spark SQL
spark-sql --master yarn \
--deploy-mode client \
--num-executors 2 \
--executor-memory 1g \
--executor-cores 2 \
--jars /usr/local/service/spark/jars/hudi-spark3.4-bundle_2.12-*.jar \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'
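Note that the bundle jar must match the Spark and Scala versions (hudi-spark3.4-bundle_2.12 targets Spark 3.4 with Scala 2.12). Once the session starts, a quick smoke test can confirm that the Hudi extension and catalog are active; a minimal sketch (the table name is illustrative):
-- Hypothetical smoke test: create, inspect, and drop a throwaway Hudi table
create table hudi_smoke_test (id int, ts bigint) using hudi
tblproperties (primaryKey = 'id', preCombineField = 'ts');
show create table hudi_smoke_test;
drop table hudi_smoke_test;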
Creating Tables
Table creation parameters
| Parameter | Default | Description |
|---|---|---|
| primaryKey | uuid | Primary key field(s) of the table; separate multiple field names with commas. Same as hoodie.datasource.write.recordkey.field |
| preCombineField | | Pre-combine field of the table. Same as hoodie.datasource.write.precombine.field |
| type | cow | Type of table to create: type = 'cow' or type = 'mor'. Same as hoodie.datasource.write.table.type |
Creating a non-partitioned table
-- Create a COW table with primaryKey 'id' and a preCombineField
create table hudi_cow_tbl (
id int,
name string,
price double,
ts bigint
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
);
-- Create a non-partitioned MOR table
create table hudi_mor_tbl (
id int,
name string,
price double,
ts bigint
) using hudi
tblproperties (
type = 'mor',
primaryKey = 'id',
preCombineField = 'ts'
);
Creating a partitioned table
-- Create a partitioned external COW table, specifying primaryKey and preCombineField
create table hudi_cow_pt_tbl (
id bigint,
name string,
ts bigint,
dt string,
hh string
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
)
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl';
Creating a table with CTAS (Create Table As Select)
Load data from another table via CTAS:
-- Create a managed table over the existing parquet files
create table parquet_mngd using parquet location 'hdfs:///tmp/parquet_dataset/*.parquet';
-- Load data via CTAS
create table hudi_ctas_cow_pt_tbl2 using hudi location 'hdfs:///tmp/hudi/hudi_tbl/' options (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
)
partitioned by (datestr) as select * from parquet_mngd;
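CTAS works for non-partitioned tables as well; a minimal sketch using an inline query as the source:
-- Create a non-partitioned COW table directly from a query
create table hudi_ctas_cow_nonpcf_tbl
using hudi
tblproperties (primaryKey = 'id')
as select 1 as id, 'a1' as name, 10 as price;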
Inserting Data
By default, if a preCombineKey is provided, insert into performs an upsert write operation; otherwise it performs a plain insert.
Insert into a non-partitioned table
insert into hudi_cow_tbl select 1, 'a1', 20, 1000;
insert into hudi_mor_tbl select 1, 'a1', 20, 1000;
Insert into a partitioned table with dynamic partitioning
insert into hudi_cow_pt_tbl partition (dt, hh)
select 1 as id, 'a1' as name, 1000 as ts, '2024-07-17' as dt, '10' as hh;
Insert into a partitioned table with static partitioning
insert into hudi_cow_pt_tbl partition(dt = '2024-07-07', hh='11') select 2, 'a2', 1000;
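The insert can be verified with a partition-filtered query:
select id, name, ts, dt, hh from hudi_cow_pt_tbl where dt = '2024-07-07' and hh = '11';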
Insert data using bulk_insert
-- Inserting into a table that has a preCombineKey performs an upsert
insert into hudi_mor_tbl select 1, 'a1_1', 20, 1001;
select id, name, price, ts from hudi_mor_tbl;
1 a1_1 20.0 1001
-- Insert into a table that has a preCombineKey, forcing the write operation to bulk_insert
set hoodie.sql.bulk.insert.enable=true;
set hoodie.sql.insert.mode=non-strict;
insert into hudi_mor_tbl select 1, 'a1_2', 20, 1002;
select id, name, price, ts from hudi_mor_tbl;
1 a1_2 20.0 1002
1 a1_1 20.0 1001
Querying Data
Regular query
select id, name, price, ts from hudi_mor_tbl where price > 10.0;
Time travel query
-- Disable the bulk_insert enabled above
set hoodie.sql.bulk.insert.enable=false;
create table hudi_cow_pt_tbl1 (
id bigint,
name string,
ts bigint,
dt string,
hh string
) using hudi
tblproperties (
type = 'cow',
primaryKey = 'id',
preCombineField = 'ts'
)
partitioned by (dt, hh)
location '/tmp/hudi/hudi_cow_pt_tbl1';
-- Insert a record with id 1
insert into hudi_cow_pt_tbl1 select 1, 'a0', 1000, '2024-07-17', '10';
select * from hudi_cow_pt_tbl1;
-- Update the record with id 1
insert into hudi_cow_pt_tbl1 select 1, 'a1', 1001, '2024-07-17', '10';
select * from hudi_cow_pt_tbl1;
-- Time travel based on the first commit time
select * from hudi_cow_pt_tbl1 timestamp as of '20240724154822829' where id = 1;
-- Time travel syntax for other timestamp formats
select * from hudi_cow_pt_tbl1 timestamp as of '2024-07-24 15:48:22.829' where id = 1;
select * from hudi_cow_pt_tbl1 timestamp as of '2024-07-24' where id = 1;
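To find commit times to travel to, recent commits can be listed with Hudi's Spark SQL call procedures, assuming the bundled Hudi version ships them:
-- List recent commits; their commit times can be used in TIMESTAMP AS OF
call show_commits(table => 'hudi_cow_pt_tbl1', limit => 10);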
Updating Data
Update
-- Syntax
UPDATE tableIdentifier SET column = EXPRESSION(,column = EXPRESSION) [ WHERE boolExpression]
-- Run the updates
update hudi_mor_tbl set price = price * 2, ts = 1111 where id = 1;
update hudi_cow_pt_tbl1 set name = 'a1_1', ts = 1001 where id = 1;
-- update using non-PK field
update hudi_cow_pt_tbl1 set ts = 1111 where name = 'a1_1';
MergeInto
-- Syntax
MERGE INTO tableIdentifier AS target_alias
USING (sub_query | tableIdentifier) AS source_alias
ON <merge_condition>
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN MATCHED [ AND <condition> ] THEN <matched_action> ]
[ WHEN NOT MATCHED [ AND <condition> ] THEN <not_matched_action> ]
<merge_condition> = A boolean equality condition
<matched_action> =
DELETE |
UPDATE SET * |
UPDATE SET column1 = expression1 [, column2 = expression2 ...]
<not_matched_action> =
INSERT * |
INSERT (column1 [, column2 ...]) VALUES (value1 [, value2 ...])
-- Examples
-- 1. Prepare the source table: a non-partitioned Hudi table, and insert data
create table merge_source (id int, name string, price double, ts bigint) using hudi
tblproperties (primaryKey = 'id', preCombineField = 'ts');
insert into merge_source values (1, "old_a1", 22.22, 2900), (2, "new_a2", 33.33, 2000), (3, "new_a3", 44.44, 2000);
merge into hudi_mor_tbl as target
using merge_source as source
on target.id = source.id
when matched then update set *
when not matched then insert *
;
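A quick check of the merge result (id 1 is updated from the source; ids 2 and 3 are newly inserted):
select id, name, price, ts from hudi_mor_tbl;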
-- 2. Prepare the source table: a partitioned Parquet table, and insert data
create table merge_source2 (id int, name string, flag string, dt string, hh string) using parquet;
insert into merge_source2 values (1, "new_a1", 'update', '2024-07-17', '10'), (2, "new_a2", 'delete', '2024-07-17', '11'), (3, "new_a3", 'insert', '2024-07-17', '12');
merge into hudi_cow_pt_tbl1 as target
using (
select id, name, '2000' as ts, flag, dt, hh from merge_source2
) source
on target.id = source.id
when matched and flag != 'delete' then
update set id = source.id, name = source.name, ts = source.ts, dt = source.dt, hh = source.hh
when matched and flag = 'delete' then delete
when not matched then
insert (id, name, ts, dt, hh) values(source.id, source.name, source.ts, source.dt, source.hh)
;
Deleting Data
Syntax
DELETE FROM tableIdentifier [ WHERE BOOL_EXPRESSION]
Usage
delete from hudi_cow_tbl where id = 1;
delete from hudi_mor_tbl where id % 2 = 0;
-- Delete using a non-primary-key field
delete from hudi_cow_pt_tbl1 where name = 'a1_1';
Using Hudi with Flink
YARN-Session Mode
Start the cluster
bin/yarn-session.sh -s 2 -jm 4096 -tm 4096 -d
Start the SQL client
bin/sql-client.sh embedded -s yarn-session
Create tables and insert data
set sql-client.execution.result-mode=tableau;
set execution.checkpointing.interval=1000;
-- Create a Hudi table
CREATE TABLE t1(
uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20)
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://HDFS78000435/tmp/hudi_flink/t1',
'table.type' = 'MERGE_ON_READ' -- default is COPY_ON_WRITE
);
Or, equivalently:
CREATE TABLE flink_hudi_mor(
uuid VARCHAR(20),
name VARCHAR(10),
age INT,
ts TIMESTAMP(3),
`partition` VARCHAR(20),
PRIMARY KEY(uuid) NOT ENFORCED
)
PARTITIONED BY (`partition`)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://HDFS78000435/tmp/hudi_flink/flink_hudi_mor',
'table.type' = 'MERGE_ON_READ'
);
-- Insert data
INSERT INTO t1 VALUES
('id1','Danny',23,TIMESTAMP '1970-01-01 00:00:01','par1'),
('id2','Stephen',33,TIMESTAMP '1970-01-01 00:00:02','par1'),
('id3','Julian',53,TIMESTAMP '1970-01-01 00:00:03','par2'),
('id4','Fabian',31,TIMESTAMP '1970-01-01 00:00:04','par2'),
('id5','Sophia',18,TIMESTAMP '1970-01-01 00:00:05','par3'),
('id6','Emma',20,TIMESTAMP '1970-01-01 00:00:06','par3'),
('id7','Bob',44,TIMESTAMP '1970-01-01 00:00:07','par4'),
('id8','Han',56,TIMESTAMP '1970-01-01 00:00:08','par4');
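After the insert, the table can be read in batch mode, or incrementally as a stream via Flink dynamic table options; a sketch (the start-commit value is illustrative):
-- Batch (snapshot) read
select * from t1;
-- Streaming read starting from a given commit (commit time is illustrative)
select * from t1 /*+ OPTIONS('read.streaming.enabled'='true', 'read.start-commit'='20240717000000') */;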
Flink SQL: reading/writing Hudi via the Hudi HMS Catalog with Hive table sync (recommended)
This approach lets you create tables directly in Hive, just as Spark SQL does, with a schema compatible with Hive SQL and Spark SQL. A table created in the Flink Hudi HMS Catalog can therefore be queried with Flink SQL, Hive SQL, Spark SQL, and Trino, and written with both Flink SQL and Spark SQL. This differs from the path-based approach shown above, where a Hudi table written by Flink SQL cannot be used by Hive/Spark directly and is only exposed by syncing the table. In addition, like Spark SQL, the Flink Hudi HMS Catalog enables Hive sync by default: for a MOR table the corresponding _ro and _rt tables are created automatically, while for a COW table the synced table name is the same as the created table, so reads and writes go through a single table. In short, table creation and read/write behavior are consistent with Spark SQL.
If the Hudi table type is MERGE_ON_READ, it maps to two Hive tables: one with the suffix _rt (real-time table) and one with the suffix _ro (read-optimized table). The _rt Hive table exposes the base files (Parquet) merged with the log files (Avro), i.e. the full data; the _ro Hive table exposes only the data in the base files.
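For example, for the MOR table test_hudi_flink_mor created below, the synced Hive tables can be queried as follows (illustrative):
-- Read-optimized view: base files only
select * from test_hudi_flink_mor_ro;
-- Real-time view: base files merged with log files
select * from test_hudi_flink_mor_rt;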
set sql-client.execution.result-mode=tableau;
set execution.checkpointing.interval=1000;
-- Create the catalog
CREATE CATALOG hudi_catalog WITH (
'type' = 'hudi',
'mode' = 'hms',
'default-database' = 'default',
'hive.conf.dir' = '/usr/local/service/hive/conf',
'table.external' = 'true'
);
use catalog hudi_catalog;
use hudi;
CREATE TABLE test_hudi_flink_mor (
id int PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
price int,
ts int,
dt VARCHAR(10)
)
PARTITIONED BY (dt)
WITH (
'connector' = 'hudi',
'path' = 'hdfs://HDFS78000007/tmp/hudi/test_hudi_flink_mor',
'table.type' = 'MERGE_ON_READ',
'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
'hoodie.datasource.write.recordkey.field' = 'id',
'hoodie.datasource.write.hive_style_partitioning' = 'true',
'hive_sync.conf.dir'='/usr/local/service/hive/conf',
'spark.version'='spark3.4.2',
'hoodie.datasource.write.precombine.field'='ts'
);
insert into test_hudi_flink_mor values (1,'hudi',10,100,'2024-07-17'),(2,'hudi',10,100,'2024-07-17');
CREATE TABLE test_hudi_flink_cow (
id int PRIMARY KEY NOT ENFORCED,
name VARCHAR(10),
price int,
ts int,
dt VARCHAR(10)
)
PARTITIONED BY (dt)
WITH (
'connector' = 'hudi',
'hoodie.datasource.write.keygenerator.class' = 'org.apache.hudi.keygen.ComplexAvroKeyGenerator',
'hoodie.datasource.write.recordkey.field' = 'id',
'hoodie.datasource.write.hive_style_partitioning' = 'true',
'hive_sync.conf.dir'='/usr/local/service/hive/conf',
'hoodie.datasource.write.precombine.field'='ts'
);
insert into test_hudi_flink_cow values (1,'hudi',10,100,'2024-07-17'),(2,'hudi',10,100,'2024-07-17');
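Because these tables are registered in the HMS, the same table names are directly visible to Spark SQL and Hive; an illustrative cross-engine check:
-- From Spark SQL or Hive
select * from test_hudi_flink_cow;
select * from test_hudi_flink_mor_ro;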
Using Hudi with Trino
Connect to Trino through trino-cli
client/trino-cli --server https://10.206.16.246:9443 --catalog hudi --schema default --user hadoop --password --insecure
The password is the LDAP password of the hadoop user, which defaults to the machine password.
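Once connected, Hudi tables registered in the HMS can be queried directly; an illustrative session (the table name assumes the Flink example above):
show tables;
select * from test_hudi_flink_mor_ro limit 10;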

Using Hudi with Hive
beeline -u 'jdbc:hive2://x.x.x.x:2181,x.x.x.x:2181,x.x.x.x:2181/default;serviceDiscoveryMode=zooKeeper;zooKeeperNamespace=hiveserver2' -nhive -pxxx
Note: Hive cannot read a Flink SQL MOR table directly; it can only read the _rt and _ro tables synced to Hive, and data becomes readable only after the Flink SQL MOR table has triggered compaction and produced .parquet files. The same applies when Trino reads Flink Hudi tables.
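An illustrative Hive session against the synced tables (names assume the Flink example above; data appears only after compaction has produced parquet base files):
select * from test_hudi_flink_mor_ro;
select * from test_hudi_flink_mor_rt;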