常见问题

最近更新时间: 2026-03-13 09:03:00

Spark on YARN 如何查看日志

登录 TBDS Manager 管控平台,点击“集群列表”,选择 Spark 程序运行所对应的集群。点击“集群服务”,选择 YARN 服务,点击 WebUI 地址跳转至 YARN UI 界面。YARN UI 页面根据 ApplicationID、User、Name 等信息,找到对应的 Spark 任务。Spark 任务的 ApplicationID 在日志级别为 INFO 及以下会输出至控制台,也可以根据任务名等信息进行查找对应的 Spark 任务,然后点击 ApplicationID 链接,最后点击 Logs 查看日志。

Spark on K8s 如何查看日志

如下图所示,Spark History Server 页面 Driver、Executor 将返回以下 4 个日志跳转链接 stdout、stderr、agg_stdout、agg_stderr。其中前两个链接只能查看运行中任务的日志,一旦任务运行结束,再点击查看,将报 500 错误,属于正常现象;此时可以点击后两个链接,下载运行结束后任务的全部日志,同样,若运行过程中点击后两个链接,也可能报 500 错误,属于正常现象。另外,为防止日志文件过大,任务运行过程中,容器中的日志会定期上传。若日志大于 16M,将上传 stdout、stderr 日志至 HDFS 等底层存储,然后清空该日志,因此有时 stdout、stderr 只显示了部分运行中的日志,其余部分日志可点击 agg_stdout、agg_stderr 查看。

Spark 如何减少小文件产生

小文件通常是指文件大小显著小于 HDFS Block 块(默认 128MB)的文件,小文件过多会给 HDFS 带来严重的性能瓶颈(主要表现在 NameNode 节点元数据的管理以及客户端的元数据访问请求),并且对用户作业的稳定性和集群数据的维护也会带来很大的挑战。
对于 Spark 来说,由于静态分区写入,Hive 表分区只有 1 个,而动态分区写入,Hive 表分区字段一般是固定的业务字段,因此减少最后一个 Stage 的 RDD 分区数是 Spark 小文件合并的主要手段。然而,RDD 分区数的减少意味着任务并行度的减少,因此需要权衡小文件的个数与任务的并行度。

  1. 降低并行度:调整 spark.sql.shuffle.partitions 和 spark.default.parallelism,默认值 200,该参数只在 shuffle 时才会生效,需要根据集群大小和数据量来调整,从小文件角度,可设置为输出数据量 / (128M ~ 1G) 作为初始值;从资源利用角度,可设置为核心数的 2-3 倍作为初始值,然后根据实际效果进行调整。

  2. 重分区:Spark RDD 使用 coalesce 与 repartition 算子,两者均是将 RDD 中的数据进行重新分区,repartition(numPartitions) 语义与 coalesce(numPartitions, true) 一致(第二个参数表示是否 shuffle),即 repartition 一定会进行 shuffle。而 coalesce 将相邻的分区直接合并在一起,形成的依赖关系是多对一的窄依赖,不会触发数据的 shuffle,因此性能更好。在不进行 shuffle 的情况下,coalesce 不能将一个分区拆分为多份,且当 RDD 中不同分区中的数据量差别较大时,直接合并容易造成数据倾斜。为了增加分区或解决数据倾斜问题,可以指定 shuffle=true 将数据打乱。
    自 Spark 2.4 开始,Spark SQL 可使用 Coalesce Hints 控制输出文件的数量,就像 Dataset API 中的 coalesce、repartition 和 repartitionByRange 一样,它们可用于性能调整和减少输出文件的数量。其中,COALESCE 只有一个分区数作为参数;REPARTITION 参数包括分区数、列或两者。REPARTITION_BY_RANGE 必须包含列名,分区数是可选参数。

    INSERT ... SELECT /*+ COALESCE(3) */ * FROM t
    INSERT ... SELECT /*+ REPARTITION(3) */ * FROM t
    INSERT ... SELECT /*+ REPARTITION(c) */ * FROM t
    INSERT ... SELECT /*+ REPARTITION(3, c) */ * FROM t
    INSERT ... SELECT /*+ REPARTITION_BY_RANGE(c) */ * FROM t
    INSERT ... SELECT /*+ REPARTITION_BY_RANGE(3, c) */ * FROM t
    
  3. distribute by:通过控制 map 输出结果的分发,相同字段的 map 输出会发到一个 reduce 节点处理,如果字段均匀随机,能保证每个分区的数量基本一致,用法一般为:

    INSERT ... SELECT ... FROM ... DISTRIBUTE BY 分区, ceil(rand() * 15))
    
  4. 自适应查询 AQE:通过设置 Spark shuffle partition 的上限和下限对不同作业不同阶段的 reduce 个数进行动态调整,也可以通过参数对 join 和数据倾斜问题进行优化,针对小文件合并,主要设置以下几个参数。需要注意的是此方法并不会严格保证按每个文件 128M 进行合并,也不能指定最终合成多少个文件。

    -- 是否开启AQE,默认false,Spark 3.2默认true
    set spark.sql.adaptive.enabled=true;
    
    -- 当启用AQE且该值为true时,Spark将根据目标大小(由spark.sql.adaptive.advisoryPartitionSizeInBytes指定)合并连续的Shuffle分区,以避免过多的小任务,默认true
    set spark.sql.adaptive.coalescePartitions.enabled=true;
    
    -- 自适应优化期间Shuffle分区的建议大小,默认64M,Spark 3.0使用spark.sql.adaptive.advisoryPartitionSizeInBytes
    set spark.sql.adaptive.shuffle.targetPostShuffleInputSize=128M;
    set spark.sql.adaptive.advisoryPartitionSizeInBytes=128M;
    
    -- 合并后,建议(但不保证)的Shuffle分区的最小数量,Spark 3.2使用spark.sql.adaptive.coalescePartitions.minPartitionSize,合并后Shuffle分区的最小大小,其值最多为spark.sql.adaptive.advisoryPartitionSizeInBytes的20%,默认1M
    set spark.sql.adaptive.coalescePartitions.minPartitionNum=1;
    set spark.sql.adaptive.coalescePartitions.minPartitionSize=10M;
    
    -- 仅Spark 3.2,该值为true(默认)时,Spark在合并Shuffle分区时会忽略spark.sql.adaptive.advisoryPartitionSizeInBytes指定的目标大小,而只尊重spark.sql.adaptive.coalescePartitions.minPartitionSize指定的最小分区大小,以最大化并行性。这是为了避免在启用AQE执行时出现性能下降
    set spark.sql.adaptive.coalescePartitions.parallelismFirst=false;
    
  5. 集成 Kyuubi 插件:注入自定义规则来添加额外的 Shuffle,然后通过 AQE 合并小文件。需要将 Kyuubi Spark SQL 扩展 kyuubi-extension-spark-*.jar(一般位于 /usr/local/service/kyuubi/extension 目录) 放入 $SPARK_HOME/jars 目录,并在 $SPARK_HOME/conf/spark-defaults.conf 中添加配置 spark.sql.extensions=org.apache.kyuubi.sql.KyuubiSparkSQLExtension(新参数值在原来值的基础上追加,逗号分隔)。如果配置了 spark.yarn.archive 参数,还需要将 $SPARK_HOME/jars 目录重新打包,并上传至 HDFS 对应目录(默认为 /tbds/spark/jars/spark_jars.tar.gz),打包压缩命令为:tar -czf /usr/local/service/spark/spark_jars.tar.gz -C /usr/local/service/spark/jars .

    -- 在查询计划的顶部添加repartition节点,默认true
    set spark.sql.optimizer.insertRepartitionBeforeWrite.enabled=true
    
    -- 当设置为true时,即使原始计划没有Shuffle操作,也会添加repartition,默认false
    set spark.sql.optimizer.insertRepartitionBeforeWriteIfNoShuffle.enabled=true;
    
    -- AQE相关参数
    set spark.sql.adaptive.advisoryPartitionSizeInBytes=128M;
    set spark.sql.adaptive.coalescePartitions.minPartitionSize=10M;
    

Spark 启动时频繁报 HBase错误

由于 Spark 启动时需要向 HBase 获取 Token,因此如果获取 Token 失败(如 HBase服务异常),Spark 将不断尝试,报错信息类似:Call exception, tries=6, retries=16, started=4731 ms ago, cancelled=false, msg=org.apache.hadoop.hbase.ipc.ServerNotRunningYetException。如果 Spark 不需要访问 HBase,可以在提交任务时,设置参数:--conf spark.security.credentials.hbase.enabled=false。

Spark 如何解决内存不足

Spark 任务运行内存不足,常见报错包括:

  1. java.lang.OutOfMemoryError: Java heap space
  2. java.lang.OutOfMemoryError: GC overhead limit exceeded
  3. Cannot allocate memory
  4. The job has been killed by "OOM Killer", please check your job's memory usage
    可尝试调整如下参数:
  5. spark.driver.memory:设置 Driver 进程的堆内存大小,通常与 spark.driver.cores 保持 1:4 设置即可。当使用 collect 等算子将数据收集到 Driver 端,或抛出 java.lang.OutOfMemoryError 异常时,可以适当调大该值。
  6. spark.driver.memoryOverhead:Driver 进程的非堆内存大小,默认大小为 spark.driver.memory * 0.1,最小 384 MB。通常不需要额外设置,当 Driver 日志出现 Cannot allocate memory 报错,可以适当调大该值。
  7. spark.executor.memory:Executor 进程的堆内存大小,通常与 spark.executor.cores 保持 1:4 设置即可。当抛出 java.lang.OutOfMemoryError 异常时,可以适当调大该值。
  8. spark.executor.memoryOverhead:为每个 Executor 分配的额外内存量,主要用于 JVM 自身、字符串、NIO Buffer 等开销。默认大小为spark.executor.memory * 0.1,最小 384 MB。通常不需要额外设置,当 Executor 日志出现 Cannot allocate memory 或 OOM Killer 报错时,可以适当调大该值。

Ranger 策略过多导致 Driver OOM

尝试去掉 spark-defaults.conf 文件中 spark.sql.extensions 配置参数值 org.apache.kyuubi.plugin.spark.authz.ranger.RangerSparkExtension,若 Spark Driver 正常运行,则确认是由于 Ranger 策略过多导致 Driver OOM。
由于 Ranger 插件需要将策略加载到 Driver 内存中,所以当 Ranger 策略数量增大时,需要对应调大 Spark Driver Memory(参数:spark.driver.memory),建议每增加 10w 条策略增加约 500M 内存。

Spark 如何开启审计日志

Spark 审计日志默认关闭,若要开启,将 ranger-spark-audit.xml 配置文件中的参数 xasecure.audit.is.enabled 的对应值修改为 true。

Spark IPv6 运行 Python 报错

若报错信息为:RuntimeError: could not open socket: ["tried to connect to ('127.0.0.1', 44373), but an error occurred: [Errno 111] Connection refused"],可尝试在运行 Python 任务前,导入环境变量:export SPARK_PREFER_IPV6=true

Spark Client 模式提交报错,Cluster 模式正常

若报错信息为:java.net.NoRouteToHostException: No Route to Host(没有到主机的路由),可尝试检查集群外客户端所在节点的防火墙,iptables 防火墙使用命令 service iptables status;firewall 防火墙使用 systemctl status firewalld。由于 Client 模式 Driver 位于客户端所在节点,而 Driver 需要与集群内的 Executor 进行网络通信,因此客户端所在节点需要向集群内所有节点关闭防火墙。

Spark Drop Hive 表不删除底层 HDFS 数据

若要删除底层 HDFS 数据,SQL 需要加上 PURGE 关键字,例如 drop table test_1 purge。或者提交 Spark 任务时指定参数 --conf spark.sql.iceberg.drop-table-rollback=true。否则只会删除元数据,而不会删除底层 HDFS 数据,这是由于将 Iceberg 作为默认 Catalog 导致的。

Spark 如何对接 RSS

若所有 Spark 任务均使用 RSS,则在 TM 页面修改【经典集群】Spark 组件 spark-defaults.conf 配置文件,新增如下参数,无需重启服务。
若只是单个 Spark 任务使用 RSS,可在提交 Spark 任务时,直接通过 --conf 指定如下参数(coordinatorIp 替换为具体 IP),例如:/usr/local/service/spark/bin/spark-sql --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.shuffle.manager=org.apache.spark.shuffle.RssShuffleManager --conf spark.rss.coordinator.quorum=:19999,:19999 --conf spark.shuffle.sort.io.plugin.class=org.apache.spark.shuffle.RssShuffleDataIo 。
更多详情参考:【运维管理】-> 【运维管理指南】->【Uniffle运维

# Uniffle 通过网络传输序列化的洗牌数据,因此应使用支持序列化对象重定位的序列化器(spark-defaults.conf默认已设置,因此可不设置)
spark.serializer org.apache.spark.serializer.KryoSerializer

# 开启 RssShuffleManager
spark.shuffle.manager org.apache.spark.shuffle.RssShuffleManager

# 指定 Coordinator 地址(coordinatorIp替换为具体IP)
spark.rss.coordinator.quorum <coordinatorIp1>:19999,<coordinatorIp2>:19999

# 开启 RSS 时,使用 Spark 动态分配(默认开启),仅 Spark 3.5(Spark 3.4 及以下,需要更新补丁,暂不支持)
spark.shuffle.sort.io.plugin.class org.apache.spark.shuffle.RssShuffleDataIo