HDInsight で Apache Spark ジョブを最適化するOptimize Apache Spark jobs in HDInsight

Apache Spark クラスター構成を特定のワークロード用に最適化する方法を説明します。Learn how to optimize Apache Spark cluster configuration for your particular workload. 最もよくある課題は、不適切な構成 (特に不適切なサイズの実行プログラム)、実行時間の長い操作、およびデカルト演算を生じるタスクが原因の、メモリ不足です。The most common challenge is memory pressure, because of improper configurations (particularly wrong-sized executors), long-running operations, and tasks that result in Cartesian operations. ジョブは、適切なキャッシュを使用し、データ スキューを可能にすることで、高速化することができます。You can speed up jobs with appropriate caching, and by allowing for data skew. 最適なパフォーマンスを得るためには、実行時間が長くリソースを多く消費する Spark ジョブの実行を監視し、確認します。For the best performance, monitor and review long-running and resource-consuming Spark job executions. HDInsight 上の Apache Spark の概要については、Azure portal を使用した Apache Spark クラスターの作成に関する記事を参照してください。For information on getting started with Apache Spark on HDInsight, see Create Apache Spark cluster using Azure portal.

次のセクションでは、一般的な Spark ジョブの最適化と推奨事項について説明します。The following sections describe common Spark job optimizations and recommendations.

データ抽象化の選択Choose the data abstraction

以前のバージョンの Spark では、データの抽象化に RDD が使用されていましたが、Spark 1.3 と 1.6 では、DataFrames と DataSets がそれぞれ導入されました。Earlier Spark versions use RDDs to abstract data, Spark 1.3, and 1.6 introduced DataFrames and DataSets, respectively. 次の相対的な利点を考慮してください。Consider the following relative merits:

  • DataFramesDataFrames
    • ほとんどの状況で最適な選択肢。Best choice in most situations.
    • Catalyst を介してクエリを最適化。Provides query optimization through Catalyst.
    • ステージ全体のコード生成。Whole-stage code generation.
    • ダイレクト メモリ アクセスDirect memory access.
    • ガベージ コレクション (GC) のオーバーヘッドが低い。Low garbage collection (GC) overhead.
    • コンパイル時のチェックやドメイン オブジェクトのプログラミングがないため、開発者にとっては DataSets ほど使いやすくない。Not as developer-friendly as DataSets, as there are no compile-time checks or domain object programming.
  • DataSetsDataSets
    • パフォーマンスへの影響が許容範囲内である複雑な ETL パイプラインに適している。Good in complex ETL pipelines where the performance impact is acceptable.
    • パフォーマンスへの影響が非常に大きい可能性のある集計には適さない。Not good in aggregations where the performance impact can be considerable.
    • Catalyst を介してクエリを最適化。Provides query optimization through Catalyst.
    • ドメイン オブジェクトのプログラミングとコンパイル時のチェックを提供することにより、開発者にっとって使いやすい。Developer-friendly by providing domain object programming and compile-time checks.
    • シリアル化/逆シリアル化のオーバーヘッドを追加。Adds serialization/deserialization overhead.
    • GC のオーバーヘッドが高い。High GC overhead.
    • ステージ全体のコード生成を中断する。Breaks whole-stage code generation.
  • RDDRDDs
    • 新しいカスタム RDD を構築する必要がある場合を除き、RDD を使用する必要がない。You don't need to use RDDs, unless you need to build a new custom RDD.
    • Catalyst を介したクエリ最適化がない。No query optimization through Catalyst.
    • ステージ全体のコード生成がない。No whole-stage code generation.
    • GC のオーバーヘッドが高い。High GC overhead.
    • Spark 1.x のレガシ API を使用する必要がある。Must use Spark 1.x legacy APIs.

最適なデータ形式の使用Use optimal data format

Spark では、csv、json、xml、parquet、orc、avro など、多くの形式がサポートされています。Spark supports many formats, such as csv, json, xml, parquet, orc, and avro. Spark は、外部データ ソースを使用して他の多くの形式をサポートするように拡張できます。詳細については、Apache Spark パッケージを参照してください。Spark can be extended to support many more formats with external data sources - for more information, see Apache Spark packages.

パフォーマンスのために最適な形式は Snappy で圧縮した Parquet であり、これが Spark 2.x の既定値です。The best format for performance is parquet with snappy compression, which is the default in Spark 2.x. Parquet はデータを列形式で格納し、Spark で高度に最適化されています。Parquet stores data in columnar format, and is highly optimized in Spark.

既定のストレージの選択Select default storage

新しい Spark クラスターを作成する際は、クラスターの既定のストレージとして Azure Blob Storage または Azure Data Lake Storage を選択することができます。When you create a new Spark cluster, you can select Azure Blob Storage or Azure Data Lake Storage as your cluster's default storage. どちらのオプションにも一時的なクラスター用の長期ストレージの利点があるため、クラスターを削除しても、データは自動的には削除されません。Both options give you the benefit of long-term storage for transient clusters, so your data doesn't get automatically deleted when you delete your cluster. 一時的なクラスターを再作成して、引き続きデータにアクセスできます。You can recreate a transient cluster and still access your data.

ストアの種類Store Type ファイル システムFile System 速度Speed 一時的Transient Use Cases
Azure Blob StorageAzure Blob Storage wasb: //url/wasb://url/ StandardStandard はいYes 一時的なクラスターTransient cluster
Azure Blob Storage (セキュア)Azure Blob Storage (secure) wasbs: //url/wasbs://url/ StandardStandard はいYes 一時的なクラスターTransient cluster
Azure Data Lake Storage Gen 2Azure Data Lake Storage Gen 2 abfs: //url/abfs://url/ より高速Faster はいYes 一時的なクラスターTransient cluster
Azure Data Lake Storage Gen 1Azure Data Lake Storage Gen 1 adl: //url/adl://url/ より高速Faster はいYes 一時的なクラスターTransient cluster
ローカルの HDFSLocal HDFS hdfs: //url/hdfs://url/ 最も高速Fastest いいえNo 24 時間 365 日の対話型クラスターInteractive 24/7 cluster

HDInsight クラスターに使用できるストレージ オプションの詳細については、「Azure HDInsight クラスターで使用するストレージ オプションを比較する」を参照してください。For a full description of the storage options available for HDInsight clusters, see Compare storage options for use with Azure HDInsight clusters.

キャッシュの使用Use the cache

Spark は、.persist().cache()CACHE TABLE などのさまざまな方法で使用できる、独自のネイティブ キャッシュ メカニズムを備えています。Spark provides its own native caching mechanisms, which can be used through different methods such as .persist(), .cache(), and CACHE TABLE. このネイティブ キャッシュは、小さいデータ セットと、中間結果をキャッシュする必要がある ETL パイプラインで有効です。This native caching is effective with small data sets as well as in ETL pipelines where you need to cache intermediate results. ただし、Spark ネイティブ キャッシュは、現在のところパーティション分割では適切に機能しません。キャッシュされたテーブルがパーティション分割のデータを保持しないためです。However, Spark native caching currently doesn't work well with partitioning, since a cached table doesn't keep the partitioning data. より汎用的で信頼性の高いキャッシュの手法は、ストレージ レイヤーのキャッシュです。A more generic and reliable caching technique is storage layer caching.

  • Spark のネイティブ キャッシュ (非推奨)Native Spark caching (not recommended)

    • 小さいデータセットに適している。Good for small datasets.
    • パーティション分割では機能しない (Spark の将来の リリースで変更の可能性あり)。Doesn't work with partitioning, which may change in future Spark releases.
  • ストレージ レベルのキャッシュ (推奨)Storage level caching (recommended)

    • IO キャッシュ機能を使用して HDInsight に実装できます。Can be implemented on HDInsight using the IO Cache feature.
    • メモリ内と SSD のキャッシュを使用。Uses in-memory and SSD caching.
  • ローカルの HDFS (推奨)Local HDFS (recommended)

    • hdfs://mycluster パス。hdfs://mycluster path.
    • SSD のキャッシュを使用。Uses SSD caching.
    • クラスターを削除するとキャッシュされたデータが失われるため、キャッシュの再構築が必要。Cached data will be lost when you delete the cluster, requiring a cache rebuild.

メモリの効率的な使用Use memory efficiently

Spark はデータをメモリ内に配置することで動作するため、メモリ リソースの管理は Spark ジョブの実行の最適化の重要な側面です。Spark operates by placing data in memory, so managing memory resources is a key aspect of optimizing the execution of Spark jobs. クラスターのメモリを効率的に使用するために適用できる手法がいくつかあります。There are several techniques you can apply to use your cluster's memory efficiently.

  • 小さいデータ パーティションを優先し、パーティション分割戦略でデータのサイズ、種類、および分散を要因とします。Prefer smaller data partitions and account for data size, types, and distribution in your partitioning strategy.
  • 既定の Java シリアル化ではなく、より新しくより効率的な Kryo データ シリアル化を検討します。Consider the newer, more efficient Kryo data serialization, rather than the default Java serialization.
  • YARN は spark-submit をバッチ単位で分割するため、YARN を優先的に使用します。Prefer using YARN, as it separates spark-submit by batch.
  • Spark 構成設定を監視し、チューニングします。Monitor and tune Spark configuration settings.

参考までに、Spark のメモリ構造と一部の重要な実行プログラムのメモリ パラメーターを、次のイメージで示します。For your reference, the Spark memory structure and some key executor memory parameters are shown in the next image.

Spark メモリに関する考慮事項Spark memory considerations

Apache Hadoop YARN を使用する場合は、YARN が Spark の各ノード上のすべてのコンテナーで使用される最大合計メモリを制御します。If you're using Apache Hadoop YARN, then YARN controls the maximum sum of memory used by all containers on each Spark node. 次の図は、重要なオブジェクトとそれらの関連性を示しています。The following diagram shows the key objects and their relationships.

YARN の Spark メモリの管理

''メモリ不足'' のメッセージに対処するには、次を試してください。To address 'out of memory' messages, try:

  • DAG 管理シャッフルを確認します。Review DAG Management Shuffles. マップ側の reduce 処理、ソース データの事前パーティション分割 (またはバケット化)、1 つのシャッフルの最大化、および送信されるデータ量の削減によって削減します。Reduce by map-side reducting, pre-partition (or bucketize) source data, maximize single shuffles, and reduce the amount of data sent.
  • 固定メモリが GroupByKey に制限された ReduceByKey を優先します。これは集計、ウィンドウ化、およびその他の機能を提供しますが、無制限のメモリ制限があります。Prefer ReduceByKey with its fixed memory limit to GroupByKey, which provides aggregations, windowing, and other functions but it has ann unbounded memory limit.
  • 実行プログラムやパーティションでより多くの操作を実行する TreeReduce を、すべての操作をドライバーで実行する Reduce より優先します。Prefer TreeReduce, which does more work on the executors or partitions, to Reduce, which does all work on the driver.
  • 下位レベルの RDD オブジェクトではなく、DataFrames を利用します。Leverage DataFrames rather than the lower-level RDD objects.
  • "上位 N"、各種の集計、ウィンドウ化操作などのアクションをカプセル化する、ComplexTypes を作成します。Create ComplexTypes that encapsulate actions, such as "Top N", various aggregations, or windowing operations.

その他のトラブルシューティングの手順については、「Azure HDInsight での Apache Spark の OutOfMemoryError 例外」を参照してください。For additional troubleshooting steps, see OutOfMemoryError exceptions for Apache Spark in Azure HDInsight.

データのシリアル化の最適化Optimize data serialization

Spark ジョブは分散されるため、パフォーマンスを最適にするためには適切なデータのシリアル化が重要です。Spark jobs are distributed, so appropriate data serialization is important for the best performance. Spark には 2 つのシリアル化のオプションがあります。There are two serialization options for Spark:

  • Java シリアル化は既定値です。Java serialization is the default.
  • Kryo シリアル化は新しい形式であり、シリアル化が Java よりも高速で、よりコンパクトになる可能性があります。Kryo serialization is a newer format and can result in faster and more compact serialization than Java. Kryo ではプログラムでクラスを登録する必要があり、まだシリアル化可能なすべての種類がサポートされているわけではありません。Kryo requires that you register the classes in your program, and it doesn't yet support all Serializable types.

バケットの使用Use bucketing

バケットはデータのパーティション分割に似ていますが、各バケットは、1 つだけではなく一連の列の値を保持できます。Bucketing is similar to data partitioning, but each bucket can hold a set of column values rather than just one. バケットは、製品識別子などの大量 (数百万以上) の値でパーティション分割する場合に適しています。Bucketing works well for partitioning on large (in the millions or more) numbers of values, such as product identifiers. バケットは、行のバケット キーをハッシュすることで決定されます。A bucket is determined by hashing the bucket key of the row. バケット化したテーブルは、バケット化と並べ替えの方法についてのメタデータを格納するため、固有の最適化を提供します。Bucketed tables offer unique optimizations because they store metadata about how they were bucketed and sorted.

いくつかの高度なバケット機能を、次に示します。Some advanced bucketing features are:

  • バケットのメタ情報に基づくクエリの最適化Query optimization based on bucketing meta-information.
  • 最適化された集計。Optimized aggregations.
  • 最適化された結合Optimized joins.

パーティション分割とバケットは同時に使用することができます。You can use partitioning and bucketing at the same time.

最適化された結合とシャッフルOptimize joins and shuffles

結合またはシャッフルで低速のジョブがある場合、その原因はデータ スキュー (ジョブ データの非対称) である可能性があります。If you have slow jobs on a Join or Shuffle, the cause is probably data skew, which is asymmetry in your job data. たとえば、マップ ジョブには 20 秒かかることがありますが、データが結合またはシャッフルされているジョブの実行には何時間もかかります。For example, a map job may take 20 seconds, but running a job where the data is joined or shuffled takes hours. データ スキューを修正するには、キー全体をソルティングするか、キーの一部のサブセットのみに分離したソルトを使用する必要があります。To fix data skew, you should salt the entire key, or use an isolated salt for only some subset of keys. 分離したソルトを使用する場合は、マップの結合でソルティングしたキーのサブセットを分離するため、さらにフィルター処理する必要があります。If you're using an isolated salt, you should further filter to isolate your subset of salted keys in map joins. もう 1 つのオプションは、バケット列を導入し、最初にバケットで事前に集計することです。Another option is to introduce a bucket column and pre-aggregate in buckets first.

結合が遅くなるもう 1 つの要因は、結合タイプである可能性があります。Another factor causing slow joins could be the join type. 既定では、Spark は SortMerge 結合タイプを使用します。By default, Spark uses the SortMerge join type. このタイプの結合は大きいデータ セットには最適ですが、それ以外の場合は、マージする前に最初にデータの左側と右側を並べ替える必要があるため、計算コストが高くなります。This type of join is best suited for large data sets, but is otherwise computationally expensive because it must first sort the left and right sides of data before merging them.

Broadcast 結合は、小さいデータ セット、または結合の一方の側がもう一方よりはるかに小さい場合に最適です。A Broadcast join is best suited for smaller data sets, or where one side of the join is much smaller than the other side. このタイプの結合は、一方の側をすべての実行プログラムにブロードキャストするため、通常はブロードキャストに多くのメモリが必要です。This type of join broadcasts one side to all executors, and so requires more memory for broadcasts in general.

spark.sql.autoBroadcastJoinThreshold を設定することで構成内の結合タイプを変更することも、DataFrame API (dataframe.join(broadcast(df2))) を使用して結合のヒントを設定することもできます。You can change the join type in your configuration by setting spark.sql.autoBroadcastJoinThreshold, or you can set a join hint using the DataFrame APIs (dataframe.join(broadcast(df2))).

// Option 1
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1*1024*1024*1024)

// Option 2
val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK")).
    createOrReplaceTempView("V_JOIN")

sql("SELECT col1, col2 FROM V_JOIN")

バケット化したテーブルを使用する場合は、3 つ目の結合タイプ、Merge 結合があります。If you're using bucketed tables, then you have a third join type, the Merge join. 事前のパーティション分割と事前の並べ替えが適切に行われているデータセットは、SortMerge 結合からコストのかかる並べ替えフェーズを省略します。A correctly pre-partitioned and pre-sorted dataset will skip the expensive sort phase from a SortMerge join.

結合の順序は、より複雑なクエリでは特に重要です。The order of joins matters, particularly in more complex queries. 最も選択的な結合から開始してください。Start with the most selective joins. また、可能な場合は、集計後に行数を増やす結合を移動してください。Also, move joins that increase the number of rows after aggregations when possible.

デカルト結合の並列処理を管理するには、入れ子になった構造やウィンドウ化を追加することができ、場合によっては Spark ジョブの 1 つまたは複数の手順をスキップすることができます。To manage parallelism for Cartesian joins, you can add nested structures, windowing, and perhaps skip one or more steps in your Spark Job.

クラスター構成のカスタマイズCustomize cluster configuration

Spark クラスターのワークロードによっては、既定以外の Spark 構成のほうが結果として Spark ジョブの実行をより最適化することがわかる場合があります。Depending on your Spark cluster workload, you may determine that a non-default Spark configuration would result in more optimized Spark job execution. 既定以外のクラスター構成を検証するには、サンプルのワークロードを使用してベンチマーク テストを実行します。Perform benchmark testing with sample workloads to validate any non-default cluster configurations.

調整可能ないくつかの一般的なパラメーターを、次に示します。Here are some common parameters you can adjust:

  • --num-executors は、適切な数の実行プログラムを設定します。--num-executors sets the appropriate number of executors.
  • --executor-cores は、各実行プログラムのコア数を設定します。--executor-cores sets the number of cores for each executor. 他のプロセスが使用可能なメモリの一部を消費するため、通常は中規模な実行プログラムにしてください。Typically you should have middle-sized executors, as other processes consume some of the available memory.
  • --executor-memory は、YARN のヒープ サイズを制御する、各実行プログラムのメモリ サイズを設定します。--executor-memory sets the memory size for each executor, which controls the heap size on YARN. 実行のオーバーヘッド用のメモリを残しておいてください。You should leave some memory for execution overhead.

実行プログラムの適切なサイズの選択Select the correct executor size

実行プログラムの構成を決定する際は、Java ガベージ コレクション (GC) のオーバーヘッドを考慮してください。When deciding your executor configuration, consider the Java garbage collection (GC) overhead.

  • 実行プログラムのサイズを小さくする要因:Factors to reduce executor size:

    1. GC オーバーヘッドを 10% 未満に保つため、ヒープ サイズを 32 GB より小さくします。Reduce heap size below 32 GB to keep GC overhead < 10%.
    2. GC オーバーヘッドを 10% 未満に保つため、コア数を減らします。Reduce the number of cores to keep GC overhead < 10%.
  • 実行プログラムのサイズを大きくする要因:Factors to increase executor size:

    1. 実行プログラム間の通信オーバーヘッドを削減します。Reduce communication overhead between executors.
    2. 大きいクラスター (実行プログラムが 100 より多い) 上の実行プログラム (N2) の間で開いている接続の数を減らします。Reduce the number of open connections between executors (N2) on larger clusters (>100 executors).
    3. メモリを大量に使用するタスクに合わせて、ヒープ サイズを増やします。Increase heap size to accommodate for memory-intensive tasks.
    4. 省略可能:実行プログラムあたりのメモリ オーバーヘッドを減らします。Optional: Reduce per-executor memory overhead.
    5. 省略可能:CPU をオーバーサブスクライブすることで、使用率とコンカレンシーを増やします。Optional: Increase utilization and concurrency by oversubscribing CPU.

実行プログラムのサイズを選択する際の一般的な目安:As a general rule of thumb when selecting the executor size:

  1. 実行プログラムあたり 30 GB から始めて、使用可能なコンピューターのコアを分散します。Start with 30 GB per executor and distribute available machine cores.
  2. 大きいクラスター (実行プログラムが 100 より多い) の実行プログラムのコアの数を増やします。Increase the number of executor cores for larger clusters (> 100 executors).
  3. 試運転と、GC オーバーヘッドなどの上記の要因の両方に基づいて、サイズを変更します。Modify size based both on trial runs and on the preceding factors such as GC overhead.

同時クエリを実行する場合は、次を考慮してください。When running concurrent queries, consider the following:

  1. 実行プログラムあたり 30 GB と、すべてのコンピューター コアから始めます。Start with 30 GB per executor and all machine cores.
  2. CPU をオーバーサブスクライブすることで、複数の並列の Spark アプリケーションを作成します (約 30% の待機時間の改善)。Create multiple parallel Spark applications by oversubscribing CPU (around 30% latency improvement).
  3. クエリを並列アプリケーション全体に分散します。Distribute queries across parallel applications.
  4. 試運転と、GC オーバーヘッドなどの上記の要因の両方に基づいて、サイズを変更します。Modify size based both on trial runs and on the preceding factors such as GC overhead.

Ambari を使用してエグゼキュータを構成する方法の詳細については、Apache Spark の設定の Spark エグゼキュータに関する記事を参照してください。For more information on using Ambari to configure executors, see Apache Spark settings - Spark executors.

タイムライン ビュー、SQL グラフ、ジョブの統計情報などを調べることで、外れ値やその他のパフォーマンスの問題に対するクエリのパフォーマンスを監視します。Monitor your query performance for outliers or other performance issues, by looking at the timeline view, SQL graph, job statistics, and so forth. YARN と Spark History サーバーを使用した Spark ジョブのデバッグの詳細については、「Azure HDInsight で実行される Apache Spark ジョブのデバッグ」を参照してください。For information on debugging Spark jobs using YARN and the Spark History server, see Debug Apache Spark jobs running on Azure HDInsight. YARN Timeline Server を使う際のヒントについては、Apache Hadoop YARN アプリケーション ログへのアクセスに関する記事を参照してください。For tips on using YARN Timeline Server, see Access Apache Hadoop YARN application logs.

場合によっては、1 つまたは 2、3 の実行プログラムが他よりも遅くなり、タスクの実行にかなり長くかかることがあります。Sometimes one or a few of the executors are slower than the others, and tasks take much longer to execute. これは大きいクラスター (ノードが 30 より多い) で頻繁に発生します。This frequently happens on larger clusters (> 30 nodes). この場合は、スケジューラが低速のタスクを補正できるよう、より多くのタスクに操作を分割します。In this case, divide the work into a larger number of tasks so the scheduler can compensate for slow tasks. たとえば、タスクの数を、少なくとも 2 回、アプリケーション内の実行プログラムのコア数と同数にします。For example, have at least twice as many tasks as the number of executor cores in the application. また、conf: spark.speculation = true を使用してタスクの予測実行を有効にすることもできます。You can also enable speculative execution of tasks with conf: spark.speculation = true.

ジョブ実行の最適化Optimize job execution

  • たとえばデータを 2 回使用してからキャッシュする場合は、必要に応じてキャッシュします。Cache as necessary, for example if you use the data twice, then cache it.
  • 変数はすべての実行プログラムにブロードキャストします。Broadcast variables to all executors. 変数は 1 回シリアル化されるだけなので、結果として検索が高速になります。The variables are only serialized once, resulting in faster lookups.
  • ドライバーでスレッド プールを使用します。これにより、多数のタスクの操作が高速になります。Use the thread pool on the driver, which results in faster operation for many tasks.

パフォーマンスの問題の場合は、実行中のジョブを定期的に監視します。Monitor your running jobs regularly for performance issues. 特定の問題について詳細に知る必要がある場合は、次のパフォーマンスのプロファイル ツールのいずれかを検討してください。If you need more insight into certain issues, consider one of the following performance profiling tools:

Spark 2.x クエリのパフォーマンスに重要なものは、ステージ全体のコード生成に依存する Tungsten エンジンです。Key to Spark 2.x query performance is the Tungsten engine, which depends on whole-stage code generation. 場合によっては、ステージ全体のコード生成を利用できないことがあります。In some cases, whole-stage code generation may be disabled. たとえば、集計式で変更できない型 (string) を使用すると、HashAggregate ではなく SortAggregateが表示されます。For example, if you use a non-mutable type (string) in the aggregation expression, SortAggregate appears instead of HashAggregate. たとえば、パフォーマンスを向上させるために、次を実行してから、コード生成を再度有効にします。For example, for better performance, try the following and then re-enable code generation:

MAX(AMOUNT) -> MAX(cast(AMOUNT as DOUBLE))

次のステップNext steps