最佳实践

最近更新时间: 2026-03-13 09:03:00

Hive on Spark任务

set hive.execution.engine=spark;
create database if not exists hive_db_spark;
use hive_db_spark;
drop table hive_table_spark;
create table hive_table_spark (id int, name string, age int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

insert into hive_table_spark values(1,'gtc',25),(2,'tom',35),(3,'danny',24),(4,'hive',26),(5,'test',24);
select * from hive_table_spark;

任务执行过程图

  • 用户的每个Session会创建一个SparkClient,SparkClient会启动RemoteDriver进程,并由RemoteDriver创 建SparkContext。SparkTask执行时,通过Session提交任务,任务的主体就是对应的SparkWork。
  • SparkClient 将任务提交给RemoteDriver,并返回一个SparkJobRef,通过该SparkJobRef,客户端可以监控任务执行进度,进行错误处理, 以及采集统计信息等。由于最终的RDD计算没有返回结果,因此客户端只需要监控执行进度而不需要处理返回值。
  • RemoteDriver通过 SparkListener收集任务级别的统计数据,通过Accumulator收集Operator级别的统计数据(Accumulator被包装为 SparkCounter),并在任务结束时返回给SparkClient。
  • SparkClient 与RemoteDriver之间通过基于Netty的RPC进行通信。除了提交任务,SparkClient还提供了诸如添加JAR包、获取集群信息等接口。如果客户端需要使用更一般的SparkContext的功能,可以自定义一个任务并通过SparkClient发送到RemoteDriver上执行
  • Hive on Spark对于Spark集群的部署方式没有特别的要求,RemoteDriver可以连接到任意的Spark集群来执行任务。在我们的测试中,Hive on Spark在Standalone和Spark on YARN的集群上都能正常工作。

常见任务类型

Hive on MR任务

set hive.execution.engine=mr;
create database if not exists hive_db_mr;
use hive_db_mr;
drop table hive_table_mr;
create table hive_table_mr (id int, name string, age int) ROW FORMAT DELIMITED FIELDS TERMINATED BY ',';

insert into hive_table_mr values(1,'gtc',25),(2,'tom',35),(3,'danny',24),(4,'hive',26),(5,'test',24);
select * from hive_table_mr;

任务执行过程图


Reduce过程分为三个不同步骤:Copy、Sort(实际应当称为Merge)及Reduce。在Copy过程中,Reducer尝试从NodeManagers获取Maps的输出并存储在内存或硬盘中。紧接着进行Shuffle过程(包含Sort及Reduce),这个过程将获取到的Maps输出进行存储并有序地合并然后提供给Reducer。当Job有大量的Maps输出需要处理的时候,Shuffle过程将变得非常耗时。对于一些特定的任务(例如hash join或hash aggregation类型的SQL任务),Shuffle过程中的排序并非必需的。但是Shuffle却默认必须进行排序,所以需要对此处进行改进。

Hive Join数据优化

操作场景

使用Join语句时,如果数据量大,可能造成命令执行速度和查询速度慢,此时可进行Join优化。
Join优化可分为以下方式:

  • Map Join
  • Sort Merge Bucket Map Join
  • Join顺序优化

Map Join

Hive的Map Join适用于能够在内存中存放下的小表(指表大小小于25MB),通过“hive.mapjoin.smalltable.filesize”定义小表的大小,默认为25MB。
Map Join的方法有两种:

  • 使用/*+ MAPJOIN(join_table) */。
  • 执行语句前设置如下参数,当前版本中该值默认为“true”。set hive.auto.convert.join=true;
    使用Map Join时没有Reduce任务,而是在Map任务前起了一个MapReduce Local Task,这个Task通过TableScan读取小表内容到本机,在本机以HashTable的形式保存并写入硬盘上传到HDFS,并在Distributed Cache中保存,在Map Task中从本地磁盘或者Distributed Cache中读取小表内容直接与大表join得到结果并输出。
    使用Map Join时需要注意小表不能过大,如果小表将内存基本用尽,会使整个系统性能下降甚至出现内存溢出的异常。

Sort Merge Bucket Map Join

使用Sort Merge Bucket Map Join必须满足以下2个条件:

  • join的两张表都很大,内存中无法存放。
  • 两张表都按照join key进行分桶(clustered by (column))和排序(sorted by(column)),且两张表的分桶数正好是倍数关系。
    通过如下设置,启用Sort Merge Bucket Map Join:
    set hive.optimize.bucketmapjoin=true;
    set hive.optimize.bucketmapjoin.sortedmerge=true;
    这种Map Join也没有Reduce任务,是在Map任务前启动MapReduce Local Task,将小表内容按桶读取到本地,在本机保存多个桶的HashTable备份并写入HDFS,并保存在Distributed Cache中,在Map Task中从本地磁盘或者Distributed Cache中按桶一个一个读取小表内容,然后与大表做匹配直接得到结果并输出。

Join顺序优化

当有3张及以上的表进行Join时,选择不同的Join顺序,执行时间存在较大差异。使用恰当的Join顺序可以有效缩短任务执行时间。
Join顺序原则:

  • Join出来结果较小的组合,例如表数据量小或两张表Join后产生结果较少,优先执行。
  • Join出来结果大的组合,例如表数据量大或两张表Join后产生结果较多,在后面执行。
    例如,customer表的数据量最多,orders表和lineitem表优先Join可获得较少的中间结果。
    原有的Join语句如下:
select
  l_orderkey,
  sum(l_extendedprice * (1 - l_discount)) as revenue,
  o_orderdate,
  o_shippriority
from
  customer,
  orders,
  lineitem
where
  c_mktsegment = 'BUILDING'
  and c_custkey = o_custkey
  and l_orderkey = o_orderkey
  and o_orderdate < '1995-03-22'
  and l_shipdate > '1995-03-22'
limit 10;

Join顺序优化后如下:

select
  l_orderkey,
  sum(l_extendedprice * (1 - l_discount)) as revenue,
  o_orderdate,
  o_shippriority
from
  orders,
  lineitem,
  customer
where
  c_mktsegment = 'BUILDING'
  and c_custkey = o_custkey
  and l_orderkey = o_orderkey
  and o_orderdate < '1995-03-22'
  and l_shipdate > '1995-03-22'
limit 10;

注意事项

  • Join数据倾斜问题。执行任务的时候,任务进度长时间维持在99%,这种现象叫数据倾斜。
  • 数据倾斜是经常存在的,因为有少量的Reduce任务分配到的数据量和其他Reduce差异过大,导致大部分Reduce都已完成任务,但少量Reduce任务还没完成的情况。
  • 解决数据倾斜的问题,可通过设置“set hive.optimize.skewjoin=true”并调整“hive.skewjoin.key”的大小。“hive.skewjoin.key”是指Reduce端接收到多少个key即认为数据是倾斜的,并自动分发到多个Reduce。

Hive Group By语句优化

操作场景

优化Group by语句,可提升命令执行速度和查询速度。
Group by的时候, Map端会先进行分组, 分组完后分发到Reduce端, Reduce端再进行分组。可采用Map端聚合的方式来进行Group by优化,开启Map端初步聚合,减少Map的输出数据量。

操作步骤

在Hive客户端进行如下设置:
set hive.map.aggr=true;

注意事项

  • Group By数据倾斜Group By也同样存在数据倾斜的问题,设置“hive.groupby.skewindata”为“true”,生成的查询计划会有两个MapReduce Job,第一个Job的Map输出结果会随机的分布到Reduce中,每个Reduce做聚合操作,并输出结果,这样的处理会使相同的Group By Key可能被分发到不同的Reduce中,从而达到负载均衡,第二个Job再根据预处理的结果将Group By Key分发给Reduce任务,以完成最终的聚合操作。
  • Count Distinct聚合问题当使用聚合函数count distinct完成去重计数时,处理值为空的情况会使Reduce产生很严重的数据倾斜,可以将空值单独处理,如果是计算count distinct,可以通过where子句将该值排除掉,并在最后的count distinct结果中加1。如果还有其他计算,可以先将值为空的记录单独处理,再和其他计算结果合并。