
Optimize Apache Spark jobs in HDInsight

Learn how to optimize Apache Spark cluster configuration for your particular workload. The most common challenge is memory pressure, because of improper configurations (particularly wrong-sized executors), long-running operations, and tasks that result in Cartesian operations. You can speed up jobs with appropriate caching, and by allowing for data skew. For the best performance, monitor and review long-running and resource-consuming Spark job executions. For information on getting started with Apache Spark on HDInsight, see Create Apache Spark cluster using Azure portal.

The following sections describe common Spark job optimizations and recommendations.

Choose the data abstraction

Earlier Spark versions use RDDs to abstract data; Spark 1.3 and 1.6 introduced DataFrames and DataSets, respectively. Consider the following relative merits:

  • DataFrames
    • Best choice in most situations.
    • Provides query optimization through Catalyst.
    • Whole-stage code generation.
    • Direct memory access.
    • Low garbage collection (GC) overhead.
    • Not as developer-friendly as DataSets, as there are no compile-time checks or domain object programming.
  • DataSets
    • Good in complex ETL pipelines where the performance impact is acceptable.
    • Not good in aggregations where the performance impact can be considerable.
    • Provides query optimization through Catalyst.
    • Developer-friendly by providing domain object programming and compile-time checks.
    • Adds serialization/deserialization overhead.
    • High GC overhead.
    • Breaks whole-stage code generation.
  • RDDs
    • You don't need to use RDDs, unless you need to build a new custom RDD.
    • No query optimization through Catalyst.
    • No whole-stage code generation.
    • High GC overhead.
    • Must use Spark 1.x legacy APIs.

Use optimal data format

Spark supports many formats, such as csv, json, xml, parquet, orc, and avro. Spark can be extended to support many more formats with external data sources - for more information, see Apache Spark packages.

The best format for performance is parquet with snappy compression, which is the default in Spark 2.x. Parquet stores data in columnar format, and is highly optimized in Spark.
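
As a minimal sketch, the following writes and re-reads a DataFrame as snappy-compressed Parquet; the storage paths and the header option are hypothetical placeholders for your own data.

// Read a hypothetical CSV source.
val rawSales = spark.read.option("header", "true").csv("wasbs://data@myaccount.blob.core.windows.net/raw/sales")

// Write it back as Parquet. Snappy is already the default codec in Spark 2.x,
// so the option below only makes the choice explicit.
rawSales.write
  .mode("overwrite")
  .option("compression", "snappy")
  .parquet("wasbs://data@myaccount.blob.core.windows.net/curated/sales")

// Later reads benefit from columnar storage and predicate pushdown.
val sales = spark.read.parquet("wasbs://data@myaccount.blob.core.windows.net/curated/sales")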

Select default storage

When you create a new Spark cluster, you can select Azure Blob Storage or Azure Data Lake Storage as your cluster's default storage. Both options give you the benefit of long-term storage for transient clusters, so your data doesn't get automatically deleted when you delete your cluster. You can recreate a transient cluster and still access your data.

Store Type | File System | Speed | Transient | Use Cases
Azure Blob Storage | wasb://url/ | Standard | Yes | Transient cluster
Azure Blob Storage (secure) | wasbs://url/ | Standard | Yes | Transient cluster
Azure Data Lake Storage Gen 2 | abfs://url/ | Faster | Yes | Transient cluster
Azure Data Lake Storage Gen 1 | adl://url/ | Faster | Yes | Transient cluster
Local HDFS | hdfs://url/ | Fastest | No | Interactive 24/7 cluster

For a full description of the storage options available for HDInsight clusters, see Compare storage options for use with Azure HDInsight clusters.

Use the cache

Spark provides its own native caching mechanisms, which can be used through different methods such as .persist(), .cache(), and CACHE TABLE. This native caching is effective with small data sets as well as in ETL pipelines where you need to cache intermediate results. However, Spark native caching currently doesn't work well with partitioning, since a cached table doesn't keep the partitioning data. A more generic and reliable caching technique is storage layer caching. A minimal native-caching sketch follows the list below.

  • Native Spark caching (not recommended)

    • Good for small datasets.
    • Doesn't work with partitioning, which may change in future Spark releases.
  • Storage level caching (recommended)

    • Can be implemented on HDInsight using the IO Cache feature.
    • Uses in-memory and SSD caching.
  • Local HDFS (recommended)

    • hdfs://mycluster path.
    • Uses SSD caching.
    • Cached data will be lost when you delete the cluster, requiring a cache rebuild.
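
A minimal sketch of native caching from the spark-shell, assuming a hypothetical hivesampletable (and its state column) registered in the metastore:

val hiveSample = spark.sql("SELECT * FROM hivesampletable")
hiveSample.cache()                              // or hiveSample.persist(StorageLevel.MEMORY_AND_DISK)
hiveSample.count()                              // an action materializes the cache
hiveSample.groupBy("state").count().show()      // subsequent queries reuse the cached data
hiveSample.unpersist()                          // release the cache when finished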

Use memory efficiently

Spark operates by placing data in memory, so managing memory resources is a key aspect of optimizing the execution of Spark jobs. There are several techniques you can apply to use your cluster's memory efficiently.

  • Prefer smaller data partitions and account for data size, types, and distribution in your partitioning strategy.
  • Consider the newer, more efficient Kryo data serialization, rather than the default Java serialization.
  • Prefer using YARN, as it separates spark-submit by batch.
  • Monitor and tune Spark configuration settings.

For your reference, the Spark memory structure and some key executor memory parameters are shown in the next image.

Spark memory considerations

If you're using Apache Hadoop YARN, then YARN controls the maximum sum of memory used by all containers on each Spark node. The following diagram shows the key objects and their relationships.

[Diagram: YARN Spark memory management]

To address 'out of memory' messages, try:

  • Review DAG Management Shuffles. Reduce by map-side reducing, pre-partition (or bucketize) source data, maximize single shuffles, and reduce the amount of data sent.
  • Prefer ReduceByKey with its fixed memory limit to GroupByKey, which provides aggregations, windowing, and other functions but has an unbounded memory limit (see the sketch after this list).
  • Prefer TreeReduce, which does more work on the executors or partitions, to Reduce, which does all work on the driver.
  • Leverage DataFrames rather than the lower-level RDD objects.
  • Create ComplexTypes that encapsulate actions, such as "Top N", various aggregations, or windowing operations.
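
The sketch below contrasts GroupByKey with ReduceByKey on a small, hypothetical pair RDD; it assumes a spark-shell session where sc is the SparkContext.

val purchases = sc.parallelize(Seq(("apple", 2), ("pear", 1), ("apple", 5), ("pear", 3)))

// groupByKey ships every value across the network and buffers all values per key in memory.
val totalsViaGroup = purchases.groupByKey().mapValues(_.sum)

// reduceByKey combines values map-side first, so far less data is shuffled and buffered.
val totalsViaReduce = purchases.reduceByKey(_ + _)

totalsViaReduce.collect().foreach(println)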

For additional troubleshooting steps, see OutOfMemoryError exceptions for Apache Spark in Azure HDInsight.

Optimize data serialization

Spark jobs are distributed, so appropriate data serialization is important for the best performance. There are two serialization options for Spark:

  • Java serialization is the default.
  • Kryo serialization is a newer format and can result in faster and more compact serialization than Java. Kryo requires that you register the classes in your program, and it doesn't yet support all Serializable types.
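
A minimal sketch of enabling Kryo and registering application classes; the SalesRecord case class is hypothetical, and the configuration must be set before the SparkSession is created.

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

case class SalesRecord(id: Long, amount: Double)

val conf = new SparkConf()
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(classOf[SalesRecord]))   // register the classes you serialize most

val spark = SparkSession.builder().config(conf).getOrCreate()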

Use bucketing

Bucketing is similar to data partitioning, but each bucket can hold a set of column values rather than just one. Bucketing works well for partitioning on large (in the millions or more) numbers of values, such as product identifiers. A bucket is determined by hashing the bucket key of the row. Bucketed tables offer unique optimizations because they store metadata about how they were bucketed and sorted.

Some advanced bucketing features are:

  • Query optimization based on bucketing meta-information.
  • Optimized aggregations.
  • Optimized joins.

You can use partitioning and bucketing at the same time.
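
A minimal sketch of writing a bucketed, sorted table; the source path, table names, and product_id column are hypothetical.

val sales = spark.read.parquet("/example/data/sales")

sales.write
  .bucketBy(200, "product_id")       // hash product_id into 200 buckets
  .sortBy("product_id")
  .saveAsTable("sales_bucketed")     // bucketing metadata is stored in the metastore

// Tables bucketed and sorted on the same key can be joined without a full shuffle and sort.
spark.table("sales_bucketed")
  .join(spark.table("returns_bucketed"), Seq("product_id"))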

Optimize joins and shuffles

If you have slow jobs on a Join or Shuffle, the cause is probably data skew, which is asymmetry in your job data. For example, a map job may take 20 seconds, but running a job where the data is joined or shuffled takes hours. To fix data skew, you should salt the entire key, or use an isolated salt for only some subset of keys. If you're using an isolated salt, you should further filter to isolate your subset of salted keys in map joins. Another option is to introduce a bucket column and pre-aggregate in buckets first.
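
The sketch below illustrates salting a skewed join key; the FactSales and DimProduct tables and the join_key column are hypothetical, and both sides are assumed to share join_key.

import org.apache.spark.sql.functions._

val saltBuckets = 16
val factDf = spark.table("FactSales")      // large, skewed side
val dimDf = spark.table("DimProduct")      // small side

// Spread hot keys on the large side across several artificial sub-keys.
val saltedFact = factDf.withColumn("salt", (rand() * saltBuckets).cast("int"))

// Replicate the small side once per salt value so every sub-key still finds its match.
val saltValues = spark.range(saltBuckets).select(col("id").cast("int").as("salt"))
val saltedDim = dimDf.crossJoin(saltValues)

val joined = saltedFact.join(saltedDim, Seq("join_key", "salt"))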

Another factor causing slow joins could be the join type. By default, Spark uses the SortMerge join type. This type of join is best suited for large data sets, but is otherwise computationally expensive because it must first sort the left and right sides of data before merging them.

A Broadcast join is best suited for smaller data sets, or where one side of the join is much smaller than the other side. This type of join broadcasts one side to all executors, and so requires more memory for broadcasts in general.

You can change the join type in your configuration by setting spark.sql.autoBroadcastJoinThreshold, or you can set a join hint using the DataFrame APIs (dataframe.join(broadcast(df2))).

// Option 1: raise the size threshold below which Spark automatically broadcasts the smaller side.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1 * 1024 * 1024 * 1024)

// Option 2: give an explicit broadcast hint through the DataFrame API.
import org.apache.spark.sql.functions.broadcast

val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK")).
    createOrReplaceTempView("V_JOIN")

spark.sql("SELECT col1, col2 FROM V_JOIN")

If you're using bucketed tables, then you have a third join type, the Merge join. A correctly pre-partitioned and pre-sorted dataset will skip the expensive sort phase from a SortMerge join.

The order of joins matters, particularly in more complex queries. Start with the most selective joins. Also, move joins that increase the number of rows after aggregations when possible.

To manage parallelism for Cartesian joins, you can add nested structures, windowing, and perhaps skip one or more steps in your Spark job.

Customize cluster configuration

Depending on your Spark cluster workload, you may determine that a non-default Spark configuration would result in more optimized Spark job execution. Perform benchmark testing with sample workloads to validate any non-default cluster configurations.

Here are some common parameters you can adjust:

  • --num-executors sets the appropriate number of executors.
  • --executor-cores sets the number of cores for each executor. Typically you should have middle-sized executors, as other processes consume some of the available memory.
  • --executor-memory sets the memory size for each executor, which controls the heap size on YARN. You should leave some memory for execution overhead.
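
These flags are passed on the spark-submit command line. If you build the session in code instead, the equivalent properties can be set when the SparkSession is created, as in the hedged sketch below; the values shown are illustrative, not recommendations, and executor settings only take effect if applied before the executors launch.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("tuned-job")
  .config("spark.executor.instances", "6")    // corresponds to --num-executors
  .config("spark.executor.cores", "4")        // corresponds to --executor-cores
  .config("spark.executor.memory", "24g")     // corresponds to --executor-memory (heap per executor on YARN)
  .getOrCreate()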

Select the correct executor size

When deciding your executor configuration, consider the Java garbage collection (GC) overhead.

  • Factors to reduce executor size:

    1. Reduce heap size below 32 GB to keep GC overhead < 10%.
    2. Reduce the number of cores to keep GC overhead < 10%.
  • Factors to increase executor size:

    1. Reduce communication overhead between executors.
    2. Reduce the number of open connections between executors (N^2) on larger clusters (> 100 executors).
    3. Increase heap size to accommodate memory-intensive tasks.
    4. Optional: Reduce per-executor memory overhead.
    5. Optional: Increase utilization and concurrency by oversubscribing CPU.

As a general rule of thumb when selecting the executor size:

  1. Start with 30 GB per executor and distribute available machine cores.
  2. Increase the number of executor cores for larger clusters (> 100 executors).
  3. Modify size based both on trial runs and on the preceding factors such as GC overhead.

When running concurrent queries, consider the following:

  1. Start with 30 GB per executor and all machine cores.
  2. Create multiple parallel Spark applications by oversubscribing CPU (around 30% latency improvement).
  3. Distribute queries across parallel applications.
  4. Modify size based both on trial runs and on the preceding factors such as GC overhead.

For more information on using Ambari to configure executors, see Apache Spark settings - Spark executors.

Monitor your query performance for outliers or other performance issues, by looking at the timeline view, SQL graph, job statistics, and so forth. For information on debugging Spark jobs using YARN and the Spark History Server, see Debug Apache Spark jobs running on Azure HDInsight. For tips on using YARN Timeline Server, see Access Apache Hadoop YARN application logs.

Sometimes one or a few of the executors are slower than the others, and tasks take much longer to execute. This frequently happens on larger clusters (> 30 nodes). In this case, divide the work into a larger number of tasks so the scheduler can compensate for slow tasks. For example, have at least twice as many tasks as the number of executor cores in the application. You can also enable speculative execution of tasks with conf: spark.speculation = true.
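
As a rough sketch, the following repartitions a DataFrame to roughly twice the number of executor cores; the executor counts and the FactTableA table are hypothetical, and spark.speculation is normally passed at submit time rather than set in code.

val numExecutors = 6
val coresPerExecutor = 4
val targetPartitions = 2 * numExecutors * coresPerExecutor

val balanced = spark.table("FactTableA").repartition(targetPartitions)

// Speculative execution is typically enabled when the job is submitted, for example:
//   spark-submit --conf spark.speculation=true ...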

Optimize job execution

  • Cache as necessary, for example if you use the data twice, then cache it.
  • Broadcast variables to all executors. The variables are only serialized once, resulting in faster lookups.
  • Use the thread pool on the driver, which results in faster operation for many tasks.
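
A minimal sketch of a broadcast variable used as a lookup table on the executors; the country-code data is hypothetical.

val countryNames = Map("US" -> "United States", "CN" -> "China", "DE" -> "Germany")
val countryLookup = spark.sparkContext.broadcast(countryNames)   // serialized once, shipped to every executor

val codes = spark.sparkContext.parallelize(Seq("US", "DE", "US"))
val resolved = codes.map(code => countryLookup.value.getOrElse(code, "Unknown"))
resolved.collect().foreach(println)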

Monitor your running jobs regularly for performance issues. If you need more insight into certain issues, consider one of the following performance profiling tools:

Key to Spark 2.x query performance is the Tungsten engine, which depends on whole-stage code generation. In some cases, whole-stage code generation may be disabled. For example, if you use a non-mutable type (string) in the aggregation expression, SortAggregate appears instead of HashAggregate. In this case, for better performance, try the following and then re-enable code generation:

MAX(AMOUNT) -> MAX(cast(AMOUNT as DOUBLE))

Next steps