Built-in JAR Jobs
- Kerberos authentication
# The current Flink configuration already ships with the hadoop user's principal, so no extra authentication step is needed
# To authenticate as a different user, change the following entries in flink-conf.yaml to the desired credentials (a quick keytab check is sketched after the config)
security.kerberos.login.conf: /etc/krb5.conf
security.kerberos.login.keytab: /var/krb5kdc/emr.keytab
security.kerberos.login.principal: hadoop/172.16.0.32@TBDS-6QAEUIB9
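Before pointing the configuration at a different user, it can help to confirm that the keytab really contains the intended principal. A minimal sketch using the standard Kerberos CLI tools, assuming the keytab path and principal shown above; adjust them to your environment:
# List the principals stored in the keytab
klist -kt /var/krb5kdc/emr.keytab
# Optionally obtain a ticket manually to confirm the keytab works, then check the ticket cache
kinit -kt /var/krb5kdc/emr.keytab hadoop/172.16.0.32@TBDS-6QAEUIB9
klist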
- Submit a streaming job to YARN (a detached-mode variant is sketched below)
/usr/local/service/flink/bin/flink run -m yarn-cluster ./examples/streaming/TopSpeedWindowing.jar
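A common variant is to submit in detached mode with an explicit parallelism and application name. A sketch; the flag values here are illustrative:
# Detached submission; -p sets the parallelism and -ynm the YARN application name
/usr/local/service/flink/bin/flink run -m yarn-cluster -d -p 2 -ynm TopSpeedDemo ./examples/streaming/TopSpeedWindowing.jar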
- Other operation commands
# Check the job status
/usr/local/service/flink/bin/flink list -m yarn-cluster -yid $YarnAppID
# Trigger a savepoint for the job; when it completes, the savepoint path is printed, e.g. Savepoint completed. Path: hdfs:///flink/savepoints/$SavePath
/usr/local/service/flink/bin/flink savepoint $JobID hdfs:///flink/savepoints -yid $YarnAppID
# Cancel the job (a single-step stop-with-savepoint alternative is sketched after this block)
/usr/local/service/flink/bin/flink cancel $JobID -yid $YarnAppID
# Resume from the savepoint
/usr/local/service/flink/bin/flink run -m yarn-cluster -s hdfs:///flink/savepoints/$SavePath ./examples/streaming/TopSpeedWindowing.jar
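As an alternative to the separate savepoint + cancel steps above, flink stop triggers a savepoint and then shuts the job down in one step. A sketch using the same placeholders as above; the -p target directory is illustrative:
# Stop with a savepoint in a single step
/usr/local/service/flink/bin/flink stop -p hdfs:///flink/savepoints $JobID -yid $YarnAppID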
Built-in SQL Jobs
Before running a SQL job, start a local cluster or a yarn-session first. The rest of this section uses the local cluster as an example; a yarn-session sketch follows below.
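If you would rather run against YARN than a local cluster, start a yarn-session first; SQL jobs are then picked up by that session via the /tmp/.yarn-properties-<user> file mentioned later. A sketch with illustrative resource values:
# Start a detached yarn-session: 1 GB JobManager, 3 GB TaskManagers, 10 slots each
/usr/local/service/flink/bin/yarn-session.sh -d -jm 1024 -tm 3072 -s 10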
- Kerberos authentication
# The current Flink configuration already ships with the hadoop user's principal, so no extra authentication step is needed
# To authenticate as a different user, change the following entries in flink-conf.yaml
security.kerberos.login.conf: /etc/krb5.conf
security.kerberos.login.keytab: /var/krb5kdc/emr.keytab
security.kerberos.login.principal: hadoop/172.16.0.32@TBDS-6QAEUIB9
- Edit conf/flink-conf.yaml to increase the TaskManager memory and the number of task slots (the resulting entries are sketched after the two changes below).
taskmanager.heap.size: 1024m -> 3072m
taskmanager.numberOfTaskSlots: 1 -> 10
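After the edit, the relevant lines in conf/flink-conf.yaml should look like the sketch below; all other entries stay unchanged:
taskmanager.heap.size: 3072m
taskmanager.numberOfTaskSlots: 10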
Hive Tables
- Configure the Hadoop-related settings and start a local Flink cluster; an optional jar check is sketched after the commands below.
# Move the planner & hive connector jars into the lib directory
mv ./opt/connector/flink-sql-connector-hive-3.1.2_2.12-1.16.1-TBDS-5.3.1.2.jar ./lib
mv ./opt/flink-table-planner_2.12-1.16.1-TBDS-5.3.1.2.jar ./lib/
mv ./lib/flink-table-planner-loader-1.16.1-TBDS-5.3.1.2.jar ./opt/
# Clean up leftover yarn-session metadata (if this file exists, jobs are automatically submitted to that yarn-session instead of the local cluster)
rm -rf /tmp/.yarn-properties-root
# Start the local cluster
bin/start-cluster.sh
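An optional sanity check before opening the SQL client: confirm that the planner and Hive connector jars actually ended up in lib/, since a missing jar typically only surfaces later as a class-loading error. A sketch:
# Both jars should be listed; the planner-loader jar should no longer be in lib/
ls ./lib | grep -E 'table-planner|sql-connector-hive'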
- Start the Flink SQL client
/usr/local/service/flink/bin/sql-client.sh embedded
- SQL operations
# Create a Hive catalog
CREATE CATALOG hive_catalog WITH ('type'='hive','default-database'='default','hive-conf-dir'='/usr/local/service/hive/conf/');
# Set streaming execution mode and the checkpoint interval
set 'execution.runtime-mode' = 'streaming';
set 'execution.checkpointing.interval'='10000';
# Switch to the Hive dialect
SET 'table.sql-dialect' = 'hive';
# Use the Hive catalog
USE CATALOG hive_catalog;
CREATE DATABASE hive_catalog.hive_db;
CREATE TABLE hive_catalog.hive_db.t1 (id BIGINT COMMENT 'unique id',data STRING);
INSERT INTO hive_catalog.hive_db.t1 values(1, 'tom');
SELECT * from hive_catalog.hive_db.t1;
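The SELECT above is a one-off bounded scan. The Hive connector also supports continuously monitoring the table via dynamic table options; a sketch below, assuming the default dialect for the hint syntax, with an illustrative monitor interval (new rows only appear as new files or partitions are committed to the table):
-- Streaming read of the Hive table (illustrative options)
SET 'table.sql-dialect' = 'default';
SELECT * FROM hive_catalog.hive_db.t1 /*+ OPTIONS('streaming-source.enable'='true', 'streaming-source.monitor-interval'='1m') */;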
- Restore the environment
# Stop the cluster after exiting the SQL client
./bin/stop-cluster.sh
# Move the dependencies back
mv ./lib/flink-sql-connector-hive-3.1.2_2.12-1.16.1-TBDS-5.3.1.2.jar ./opt/connector/
mv ./lib/flink-table-planner_2.12-1.16.1-TBDS-5.3.1.2.jar ./opt/
mv ./opt/flink-table-planner-loader-1.16.1-TBDS-5.3.1.2.jar ./lib/
Iceberg Tables
- Configure the Hadoop-related settings and start a local Flink cluster.
# Move the iceberg & hive connector jars into the lib directory
mv ./opt/connector/iceberg-flink-runtime-1.16-1.4.2.jar ./lib
mv ./opt/connector/flink-sql-connector-hive-3.1.2_2.12-1.16.1-TBDS-5.3.1.2.jar ./lib
# Clean up leftover yarn-session metadata (if this file exists, jobs are automatically submitted to that yarn-session instead of the local cluster)
rm -rf /tmp/.yarn-properties-root
# Start the local cluster
./bin/start-cluster.sh
- Start the Flink SQL client
/usr/local/service/flink/bin/sql-client.sh embedded
- SQL operations
# Create the Iceberg catalog
CREATE CATALOG iceberg_catalog WITH ('type'='iceberg','catalog-type'='hive','uri'='thrift://172.16.0.86:7004,thrift://172.16.0.132:7004','clients'='5','property-version'='1','warehouse'='hdfs:///usr/hive/warehouse/');
# Set streaming execution mode and the checkpoint interval
set 'execution.runtime-mode' = 'streaming';
set 'execution.checkpointing.interval'='10000';
CREATE DATABASE iceberg_catalog.iceberg_db;
CREATE TABLE iceberg_catalog.iceberg_db.t1 (id BIGINT COMMENT 'unique id',data STRING);
INSERT INTO iceberg_catalog.iceberg_db.t1 values(1, 'tom');
SELECT count(*) from iceberg_catalog.iceberg_db.t1;
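Similarly, the Iceberg connector can keep reading new snapshots as they are committed instead of returning a single bounded result. A sketch using dynamic table options with illustrative values:
-- Incremental/streaming read of the Iceberg table (illustrative options)
SELECT * FROM iceberg_catalog.iceberg_db.t1 /*+ OPTIONS('streaming'='true', 'monitor-interval'='30s') */;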
- Restore the environment
# Stop the cluster after exiting the SQL client
./bin/stop-cluster.sh
# Move the dependencies back
mv ./lib/iceberg-flink-runtime-1.16-1.4.2.jar ./opt/connector
mv ./lib/flink-sql-connector-hive-3.1.2_2.12-1.16.1-TBDS-5.3.1.2.jar ./opt/connector
Common Commands
# Run the following commands from the Flink installation directory
cd /usr/local/service/flink
# Start a standalone cluster
./bin/start-cluster.sh
# Submit a job to the standalone cluster
./bin/flink run ./examples/streaming/TopSpeedWindowing.jar
# Check the job status
./bin/flink list
# Cancel the job (no savepoint is taken)
./bin/flink cancel $JobID
# Stop the job (a savepoint is taken by default)
./bin/flink stop $JobID
# Trigger a savepoint for the job
./bin/flink savepoint $JobID /tmp/flink-savepoints
# Submit the job to the YARN cluster
# The job-management commands above cannot be used as-is for jobs submitted to YARN (YARN variants are sketched after the command below)
./bin/flink run -m yarn-cluster -d examples/streaming/TopSpeedWindowing.jar
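For jobs running on YARN, pass the YARN application ID explicitly, as in the JAR job section above. A sketch:
# Check / cancel / savepoint a job running on YARN
./bin/flink list -m yarn-cluster -yid $YarnAppID
./bin/flink cancel $JobID -yid $YarnAppID
./bin/flink savepoint $JobID hdfs:///flink/savepoints -yid $YarnAppID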