Optimize Apache Spark jobs in Azure Synapse Analytics

After you review an Apache Spark pool run on the Monitor tab in Azure Synapse Studio and decide that its performance should be improved, there are several areas to consider, including:

  • Choose the data abstraction
  • Use the optimal data format
  • Use the cache option
  • Check the memory efficiency
  • Use bucketing
  • Optimize joins and shuffles if appropriate
  • Optimize job execution

To optimize Apache Spark jobs in Azure Synapse Analytics, you need to consider the cluster configuration for the workload you're running on that cluster. You might run into challenges such as memory pressure (for example, when executors are sized incorrectly), long-running operations, and tasks that result in Cartesian operations.

To speed up jobs, configure appropriate caching and check joins and shuffles for data skew. It's therefore important to monitor and review Apache Spark job executions that are long running or resource consuming. Some recommendations for optimizing Apache Spark jobs include:

Choose the data abstraction

Earlier Apache Spark versions use Resilient Distributed Datasets (RDDs) to abstract data. Apache Spark 1.3 introduced DataFrames, and Apache Spark 1.6 introduced DataSets. The following relative merits might help you choose the data abstraction that fits your scenario:

DataFrames

Using DataFrames is a great place to start. DataFrames provide query optimization through Catalyst, whole-stage code generation, and direct memory access, and they have low garbage collection (GC) overhead. However, DataFrames aren't as developer-friendly as DataSets, because there are no compile-time checks or domain object programming.

DataSets

DataSets are a good choice in complex ETL pipelines where the performance impact is acceptable, but be cautious about using them in aggregations, where the performance impact can be considerable. DataSets provide query optimization through Catalyst and are developer-friendly because they offer domain object programming and compile-time checks.
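
To make the contrast concrete, here's a minimal sketch, assuming a spark session is in scope (as in a Synapse notebook); the SalesRecord case class and the sample values are hypothetical:

case class SalesRecord(productId: Long, amount: Double)

import spark.implicits._

// DataFrame: untyped rows; column names and types are resolved only at runtime.
val salesDF = Seq((1L, 9.99), (2L, 24.50)).toDF("productId", "amount")

// DataSet: typed domain objects, so field access is checked at compile time.
val salesDS = Seq(SalesRecord(1L, 9.99), SalesRecord(2L, 24.50)).toDS()
val bigSales = salesDS.filter(record => record.amount > 10.0)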

RDDs

You don't need to use RDDs unless you need to build a new custom RDD. RDDs offer no query optimization through Catalyst and no whole-stage code generation, and they have high garbage collection (GC) overhead. If you do use RDDs, you must use the Apache Spark 1.x legacy APIs.

Use the optimal data format

Apache Spark supports many data formats, such as CSV, JSON, XML, and Parquet, and it can be extended with other formats through external data sources. A useful tip is to use Parquet with Snappy compression, which is the default in Apache Spark 2.x: Parquet stores data in a columnar format, is highly optimized in Apache Spark, and the compressed files are splittable.
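
For example, a minimal sketch of writing and reading Snappy-compressed Parquet; the salesDF DataFrame and the storage path are hypothetical:

// Snappy is already the default Parquet codec in Apache Spark 2.x, but it can be set explicitly.
salesDF.write
  .option("compression", "snappy")
  .mode("overwrite")
  .parquet("abfss://data@contosostorage.dfs.core.windows.net/sales_parquet")

// Read the columnar, splittable files back as a DataFrame.
val salesParquetDF = spark.read.parquet("abfss://data@contosostorage.dfs.core.windows.net/sales_parquet")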

Use the cache option

Apache Spark provides a native, built-in caching mechanism that you can use through several methods, such as .persist(), .cache(), and CACHE TABLE. Native caching is effective with small datasets.

It's also useful in ETL pipelines where you need to cache intermediate results. However, keep in mind that if you rely on partitioning, the native caching mechanism has a downside: a cached table doesn't keep the partitioning data.
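
As an illustration, a minimal sketch of the three approaches; the salesDF DataFrame and the sales table are hypothetical:

import org.apache.spark.storage.StorageLevel

// Cache with the default storage level (MEMORY_AND_DISK for DataFrames).
salesDF.cache()

// Or pick an explicit storage level with persist().
salesDF.persist(StorageLevel.MEMORY_ONLY)

// Or cache a table through Spark SQL.
spark.sql("CACHE TABLE sales")

// Release the memory once the intermediate result is no longer needed.
salesDF.unpersist()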

Check the memory efficiency

It's also important to understand how to use memory efficiently. Apache Spark operates by placing data in memory, so managing memory resources is a key aspect of optimizing Apache Spark job execution.

One way to manage memory resources is to use smaller data partitions and to account for data size, types, and distribution when you formulate a partitioning strategy. Another is to consider Kryo data serialization instead of the default Java serialization. In either case, keep monitoring and tuning the Apache Spark configuration settings.
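
For example, a minimal sketch of enabling Kryo serialization, assuming you control session creation (in Azure Synapse, these settings are typically supplied through the Apache Spark pool or session configuration instead); the registered class is hypothetical:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("kryo-example")
  // Use Kryo instead of the default Java serialization.
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Optionally register frequently serialized classes for more compact output.
  .config("spark.kryo.classesToRegister", "com.contoso.SalesRecord")
  .getOrCreate()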

Use bucketing

Bucketing is similar to data partitioning, but each bucket holds a set of column values rather than a single one. Bucketing works well when you partition on a large number (millions or more) of values, such as product identifiers. A bucket is determined by hashing the bucket key of a row. Bucketed tables offer unique optimizations because they store metadata about how they were bucketed and sorted.

Some advanced bucketing features are:

  • Query optimization based on bucketing meta-information.
  • Optimized aggregations.
  • Optimized joins.

However, bucketing doesn't exclude partitioning. They go hand in hand, and you can use partitioning and bucketing at the same time.
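
As an illustration, a minimal sketch of writing a bucketed, sorted table through the DataFrame writer; the salesDF DataFrame, the ProductId column, and the bucket count are hypothetical:

// Hash ProductId into 42 buckets and sort within each bucket, then save as a
// table so that the bucketing metadata is kept in the metastore.
salesDF.write
  .bucketBy(42, "ProductId")
  .sortBy("ProductId")
  .mode("overwrite")
  .saveAsTable("sales_bucketed")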

Optimize joins and shuffles

Slow performance in join or shuffle jobs is often caused by data skew, which is asymmetry in how data is distributed across your system. For example, a job might normally take only 20 seconds, but the same job can take hours when the data is joined and shuffled.

To fix data skew, you can salt the entire key, or use an isolated salt for only a subset of keys. Another option to investigate is introducing a bucket column or pre-aggregating the data in buckets.
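
For example, a minimal sketch of salting a skewed join key; the largeDF and smallDF DataFrames, the JoinKey column, and the salt range of 16 are hypothetical:

import org.apache.spark.sql.functions._

val saltBuckets = 16

// Add a random salt to the large, skewed side so that hot keys spread across partitions.
val saltedLarge = largeDF.withColumn("salt", (rand() * saltBuckets).cast("int"))

// Replicate the small side once per salt value so that every salted key still finds a match.
val saltedSmall = smallDF.withColumn("salt", explode(array((0 until saltBuckets).map(i => lit(i)): _*)))

// Join on the original key plus the salt, then drop the helper column.
val joined = saltedLarge.join(saltedSmall, Seq("JoinKey", "salt")).drop("salt")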

Data skew isn't the only cause of slow joins, though; the join type itself might be what's slowing things down.

By default, Apache Spark uses the SortMerge join type. This type of join is best suited for large datasets, but it's computationally expensive because it must first sort the left and right sides of the data before merging them. A Broadcast join is better suited for smaller datasets, or for cases where one side of the join is much smaller than the other.

You can change the join type in your configuration by setting spark.sql.autoBroadcastJoinThreshold, or you can set a join hint using the DataFrame APIs (dataframe.join(broadcast(df2))) as shown in the following code.

// Option 1: raise the automatic broadcast threshold (to 1 GB here) so that smaller tables are broadcast.
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", 1 * 1024 * 1024 * 1024)

// Option 2: give an explicit broadcast hint through the DataFrame API.
import org.apache.spark.sql.functions.broadcast

val df1 = spark.table("FactTableA")
val df2 = spark.table("dimMP")
df1.join(broadcast(df2), Seq("PK"))
   .createOrReplaceTempView("V_JOIN")

spark.sql("SELECT col1, col2 FROM V_JOIN")

If you use bucketed tables, you have a third join type, the Merge join. A correctly pre-partitioned and pre-sorted dataset skips the expensive sort phase of a SortMerge join. Also keep in mind that the order of different types of joins matters, especially in complex queries: start with the most selective joins, and when possible, move joins that increase the number of rows so that they occur after aggregations.

Optimize job execution

When sizing executors to increase the performance of your Apache Spark job, consider the Java garbage collection (GC) overhead and use the following factors to reduce executor size:

  • Reduce heap size below 32 GB to keep GC overhead < 10%.
  • Reduce the number of cores to keep GC overhead < 10%.

Or consider the following factors to increase executor size:

  • Reduce communication overhead between executors.
  • Reduce the number of open connections between executors (N²) on larger clusters (>100 executors).
  • Increase heap size to accommodate for memory-intensive tasks.
  • Reduce per-executor memory overhead.
  • Increase utilization and concurrency by oversubscribing CPU.

As a rule of thumb, when selecting the executor size:

  • Start with 30 GB per executor and distribute available machine cores.
  • Increase the number of executor cores for larger clusters (> 100 executors).
  • Modify size based both on trial runs and on the preceding factors such as GC overhead.
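
As an illustration, a minimal sketch of applying those starting points, assuming you control session creation (in Azure Synapse, executor size usually comes from the Apache Spark pool or session configuration instead); the values are hypothetical starting points, not recommendations for your workload:

import org.apache.spark.sql.SparkSession

// Start with roughly 30 GB per executor and distribute the available machine cores,
// then adjust based on trial runs and GC overhead.
val spark = SparkSession.builder()
  .appName("executor-sizing-example")
  .config("spark.executor.memory", "30g")
  .config("spark.executor.cores", "4")
  .config("spark.executor.instances", "8")
  .getOrCreate()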

When running concurrent queries, consider the following:

  • Start with 30 GB per executor and all machine cores.
  • Create multiple parallel Apache Spark applications by oversubscribing CPU (around 30% latency improvement).
  • Distribute queries across parallel applications.
  • Modify size based both on trial runs and on the preceding factors such as GC overhead.

As stated before, it's important to keep monitoring performance, especially outliers, by using the timeline view, SQL graph, job statistics, and so on. Sometimes one or a few executors are slower than the others, which happens most frequently on large clusters (30+ nodes). In that case, consider dividing the work into a larger number of tasks so that the scheduler can compensate for slow tasks.

When you optimize job execution, also keep caching in mind; for example, if you use the same data twice, cache it. If you broadcast variables to all the executors, the variables are serialized only once, which results in faster lookups.
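
For example, a minimal sketch of a broadcast variable used for a lookup; the countryNames map, the salesDF DataFrame, and the CountryCode column are hypothetical:

import org.apache.spark.sql.functions.{col, udf}

// A small lookup table that every executor needs.
val countryNames = Map("NL" -> "Netherlands", "US" -> "United States")

// Broadcast it once; each executor keeps a local, deserialized copy.
val countryLookup = spark.sparkContext.broadcast(countryNames)

// Use the broadcast value inside a UDF for fast, local lookups.
val toCountryName = udf((code: String) => countryLookup.value.getOrElse(code, "Unknown"))
val withNames = salesDF.withColumn("CountryName", toCountryName(col("CountryCode")))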

In another case, you might use a thread pool that runs on the driver, which can result in faster operation when you run many independent tasks.
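
As an illustration, a minimal sketch of launching independent Spark actions from a driver-side thread pool by using Scala futures; the table names and thread count are hypothetical:

import java.util.concurrent.Executors
import scala.concurrent.{Await, ExecutionContext, Future}
import scala.concurrent.duration._

// A small driver-side thread pool so that independent jobs can be submitted concurrently.
implicit val ec: ExecutionContext = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(4))

val tables = Seq("sales_2021", "sales_2022", "sales_2023")

// Each count is an independent Spark job; the scheduler can run them in parallel.
val counts = Future.sequence(tables.map { name =>
  Future(name -> spark.table(name).count())
})

Await.result(counts, 30.minutes).foreach { case (name, n) => println(s"$name: $n") }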