June 2018

Volume 33 Number 6

[Azure Databricks]

Monitoring Databricks Jobs with Application Insights

By Joseph Fultz | June 2018

We work on a team that focuses on data and analytics for large companies that want to implement or migrate their solutions to the cloud. These efforts come with the obvious work of optimizing and reengineering applications to various degrees to ensure they take advantage of what the cloud offers. As great as those efforts can be for the application itself, there are additional challenges for organizations just starting their cloud journey, as they must also do all of the work that goes along with extending their operational capabilities to the cloud. And, as new technologies emerge and evolve, these must be folded into the existing operational infrastructure, as well. This is one of the challenges that exists with Spark- and Apache Hadoop-based solutions. Yes, Apache Ambari is there to provide a nice dashboard and has an API to expose metrics, but many organizations already have an investment in and a good understanding of other monitoring and dashboarding solutions, such as Azure Application Insights.

Imagine a WebJob that pulls messages from Azure Event Hubs, does some initial validation, and then drops them into Azure Storage, at which point that data is processed through several Spark jobs, as shown in Figure 1. The goal is to have a single runtime dashboard that can provide information that shows not only what’s happening, but also process- and business-specific information while it’s in flight. Additionally, it would be great to be able to track the flow of that information as a holistic process and see details on its constituent processes.

Figure 1 Single Solution, Separate Processes, Separate Steps

Sure, you can see default metrics for WebJobs in Application Insights and some information from the Spark jobs in Ambari—and roll them all up with Azure Log Analytics for post hoc insights. However, we don’t want to see two separate processes with four steps each. We want to see the process as a whole and we want runtime insights and runtime alerts.

In this article, we’ll walk through considerations and planning for bringing the full project together using Application Insights. Additionally, we’ll be using the Azure Databricks flavor of Spark as it has a nice set of features that help us more easily develop and operationalize our workflow.

Planning for Application Insights Telemetry

We won’t be covering core concepts, but for a primer on these concepts take a look through the online documentation at bit.ly/2FYOCyp. Also, Victor Mushkatin and Sergey Kanzhelev wrote a good article about optimizing telemetry data collection, “Optimize Telemetry with Application Insights” (msdn.com/magazine/mt808502). Here, we’ll focus on organizing our notebooks and jobs to facilitate proper tracking in the form of the operation, event and data we send from our Databricks jobs.

In Databricks, you can define a job as the execution of a notebook with certain parameters. Figure 2 illustrates a couple of basic approaches to organizing work in a Databricks Notebook.

Figure 2 Basic Organization Options for a Databricks Notebook Job

Figure 2 shows two simple possibilities in which one job is defined as a single notebook with a number of code blocks or functions that get called while the other job displays a control notebook that orches­trates the execution of child notebooks, either in sequence or in parallel. This is not, by any means, the only organization that can be used, but it’s enough to help illustrate how to think about correlation. How you go about organizing the notebooks and code is certainly a worthwhile topic and is highly variable depending on the size and nature of the job. For a little more depth on Databricks Notebook Workflow, take a look at the blog post, “Notebook Workflows: The Easiest Way to Implement Apache Spark Pipelines” (bit.ly/2HOqvTj).

Notice that notebook organization has been aligned with discrete operations that can be used to group event reporting in Application Insights. In Application Insights, correlation is accomplished via two properties: Operation Id and Parent Operation Id. As seen in Figure 2, we wish to capture all of the discrete events and metrics within a code block or separate notebook under the context of a single operation, which is done by using a distinct operation Id for each section. Additionally, we’d like to see those separate large operation blocks as part of a whole, which we can do by setting the context’s parent operation Id to the same value for all metrics reporting in each operation. The parent operation Id can also be passed in from an outside trigger for the job, which would then provide a mechanism to link all of the discrete operations from the previous process and the Azure Databricks job as part of a single gestalt operation identified by the parent operation Id in Application Insights.

We’ve depicted a couple scenarios here. The key point is that you should consider how you want to organize your operations, events and metrics as part of the overall job organization.

Adding Application Insights to the Environment

In order to get the environment ready, you need to install the Python Application Insights library on the cluster, grab some configuration settings and add a bit of helper code. You can find Application Insights on pypi (pypi.python.org/pypi/applicationinsights/0.11.5/). To add it to Databricks, simply choose a location in your workspace (we created one named Lib) and right-click and choose Create, then Library. Once there, you can enter the pypi application name and Databricks will download and install the package. The last thing you’ll have to decide is whether or not you want to attach the library to all clusters automatically.

In the attempt to reduce the amount of code to add to each notebook, we’ve added an include file that has a couple of helper functions:

def NewTelemetryClient (applicationId, operationId="",  
  tc = TelemetryClient(instrumentationKey)
  tc.context.application.id = applicationId
  tc.context.application.ver = '0.0.1'
  tc.context.device.id = 'Databricks notebook'
  tc.context.operation.id = operationId
  tc.context.operation.parentId = parentOperationId
  return tc

This code contains a factory function named NewTelemetry­Client to create a telemetry client object, set some of the properties and return the object to the caller. As you can see, it takes a parent operation Id and an operation Id. This initializes the object, but note that if you need to change the operation Id, you’ll have to do it in the job notebook directly. Also worth noting is that the TelemetryClient constructor takes an instrumentation key, which can be found in the properties blade of the Application Insights instance you wish to use. We’ve statically assigned a few values that are needed for the example, but the TelemetryClient context object has many child objects and properties that are available. If you needed to initialize other values, this would be the place to do it. Separating out the factory function keeps the clutter down and also eases implementation for the developer converting the notebook from a sandbox prototype kind of code to an enterprise job kind of implementation. 

With the library added to the cluster and the setup notebook defined, we simply need to add a line at the top of the job notebook to run the setup and then create a starter telemetry object. We’ll issue a %run command at the top of the notebook:

%run ./AppInsightsSetup

In the subsequent cell we’ll simply instantiate a new instance of the TelemetryClient object.

Figure 3 shows the code from the prediction example we created. There are several things to take note of here. First, we’re passing in a number of variables to the notebook that are sent as part of the job initialization, which is done via the dbutils.widgets object provided as part of the Databricks environment. Because we need a couple of IDs for the parent oper­ation and the discrete operation, we’ll go ahead and check those and, if they’re empty, create and assign new UUIDs. Assigning the arbitrary IDs in this case is mostly to make it easier to run interactively. However, other approaches could be taken, such as encapsulating the job notebook’s code into a series of functions and running tests by calling the parent function with a specific ID. Both work sufficiently well for our purposes here. The last thing we assign is an operation name, which eventually shows up in Application Insights as something you can use to view and group by, as seen in Figure 4.

Figure 3 Notebook Initialization Code

baseRatingsFile = dbutils.widgets.get("baseRatingsFile")
newRatingsFile = dbutils.widgets.get("newRatingsFile")
trainOperationId = dbutils.widgets.get("trainOperationId")
parentOperationId = dbutils.widgets.get("parentOperationId")
maxIterations =  int(dbutils.widgets.get("maxIterations"))
numFolds = int(dbutils.widgets.get("numFolds"))
numUserRecommendations = int(
predictionFilePath = dbutils.widgets.get("predictionFilePath")
if trainOperationId == "":
  trainOperationId = NewCorrelationId()
if parentOperationId == "":
  parentOperationId = NewCorrelationId()
#setup other needed variables
telemetryClient = NewTelemetryClient("PredictionExample",
  trainOperationId, parentOperationId)
telemetryClient.context.operation.name = "Train Model"

Looking at Figure 3, you can see that the operation name was assigned the value of Train Model. Figure 4 depicts it in a grid of data after it was chosen as the grouping mechanism for the data. As we run more jobs through and assign differing operation names, we’ll be able to see those show up in the view, as well. With those things in place, we’re in good shape to work on instrumenting our job code to capture events and metrics.

Operation Name in Application Insights

Figure 4 Operation Name in Application Insights

Instrumenting Databricks Job Code

Let's walk through an example that uses Application Insights to monitor a typical data-engineering job in Databricks. In this scenario, we’re using publicly available data from Fannie Mae (bit.ly/­2AhL5sS) and will take raw source data on single-family loan performance and prepare it for reporting and analytics. Several steps are required to properly prepare the data. With each step, we’ll capture information like record count and elapsed time and record these in Application Insights. Figure 5 illustrates the high-level steps in the job. We’ve settled on using the titles across the top of Figure 5 to identify our separate operations.

Figure 5 Data Engineering Job Flow

Additionally, we’ve established a set of measurements with similar names (for example, Write Duration, Read Duration, Record Count) that will be reported in differently named events. This will be important in the analytics as we look at specific metrics and then view them by operation or event. As shown in Figure 5, first we ingest multiple data files, then consolidate and transform them, and finally write to two target locations. The fully prepared data set is persisted to long-term Blob Storage and an aggregated subset is sent to our RDBMS, Azure SQL Database. Of course, within each high-level step there are several sub-steps. Specifically, we import four distinct files, merge them into a single Spark DataFrame, and write the raw, consolidated dataset to Blob Storage. The consolidated data is then read back out of Blob storage into a new DataFrame for cleansing and transformation. To complete the transformation, we subset the DataFrame (that is, narrow it to relevant columns only), rename the columns to meaningful names, and replace null values in the Servicer Name column. The final form of the data is persisted in the Parquet file format. The last step in this example persists the data to an Azure SQL Database.

For this Azure Databricks job example, we've taken the single notebook approach with the steps programmed in separate code cells. One parent operation Id is set for each run of the job. A (child) operation Id applies to each operation within the job, and we’ve defined Acquisition, Transformation and Persistence as these operations. We track the events occurring for each operation, recording timestamp, record count, duration and other parameters in Application Insights at job runtime.

As in the earlier predictions example, we add the Python package “applicationinsights” to the cluster, run the setup notebook, and instantiate a new instance of the TelemetryClient object. This time we'll name the instance DataEngineeringExample and then set the initial operation name to Acquisition, in order to prepare for our first series of steps to acquire source data:

telemetryClient = NewTelemetryClient(
  "DataEngineeringExample", operationId, parentOperationId)
telemetryClient.context.operation.name = "Acquisition"

Next, we capture the current time and track our first event in Application Insights, recording that the job has started:

import datetime
jobStartTime = datetime.datetime.now()
jobStartTimeStr = str(jobStartTime)
telemetryClient.track_event('Start Job', { 'Start Time': jobStartTimeStr,
  'perfDataFilePath':perfDataFilePath, 'perfDataFileNamePrefix' :  
  perfDataFileNamePrefix, 'consolidatedDataPath':consolidatedDataPath, 
  'transformedFilePath' : transformedFilePath, 'perfDBTableName': perfDBTableName})

This is the code to set the current timestamp as the start time for the job, and record it in our first Application Insights event. First, we import the Python library datetime for convenient date and time functions, and then set variable jobStartTime to the current timestamp. It’s worth noting that the signature for the track_event([eventName], [{props}], [{measurements}]) method takes parameters for the event name, dictionary of properties, and a dictionary of measurements. To that end, the timestamp variable needs to be JSON-serializable to include it in the properties of the telemetry event. So, we cast the jobStartTime object as a string and put the value in a new variable jobStartTimeStr. In the next step, we send our initial telemetry event with the track_event method, passing it our custom event name Start Time along with several parameters we selected to capture with this event. We've included parameters for various file paths and the database table name that are used in the job. For example, perfDataFilePath contains the location of the source data files, and perfDBTableName contains the target table name in the Azure SQL Database where we'll persist some of the data. This is helpful information in such cases where we see a 0 record connect or have an alert set; we can take a quick look at the telemetry of the related operation and quickly check the files and/or databases that are being accessed.

Now we can proceed through the command cells in the notebook, adding similar event-tracking code into each step, with a few changes relevant to the inner steps of the job. Because it's often helpful to use record counts throughout a data-engineering job to consider data volume when monitoring performance and resource utilization, we've added a record count measurement to each tracked event.

Figure 6 shows a few basic data transformations, followed by event-tracking for Application Insights. Inside the exception-­handling Try block, we perform three types of transformations at once on the perfTransformDF DataFrame. We subset the DataFrame, keeping only a select group of relevant columns and discarding the rest. We replace nulls in the Servicer Name column with “UNKNOWN.” And, because the original column names were meaningless (for example, “_C0,” “_C1”), we rename the relevant subset of columns to meaningful names like “loan_id” and “loan_age.”

Figure 6 Data Transformation Event-Tracking Code

if notebookError == "":
    perfTransformedDF = perfTransformedDF['_c0','_c1','_C2','_C3','_C4', \
                                          '_C5','_C6','_C7','_C8','_C9', \
                                          '_C10','_C11','_C12','_C13'] \
      .fillna({'_C2':'UNKNOWN'}) \
      .withColumnRenamed("_C0", "loan_id") \
      .withColumnRenamed("_C1", "period") \
      .withColumnRenamed("_C2", "servicer_name") \
      .withColumnRenamed("_C3", "new_int_rt") \
      .withColumnRenamed("_C4", "act_endg_upb") \
      .withColumnRenamed("_C5", "loan_age") \
      .withColumnRenamed("_C6", "mths_remng") \
      .withColumnRenamed("_C7", "aj_mths_remng") \
      .withColumnRenamed("_C8", "dt_matr") \
      .withColumnRenamed("_C9", "cd_msa") \
      .withColumnRenamed("_C10", "delq_sts") \
      .withColumnRenamed("_C11", "flag_mod") \
      .withColumnRenamed("_C12", "cd_zero_bal") \
      .withColumnRenamed("_C13", "dt_zero_bal")
    print("nulls replaced")
    end = datetime.datetime.now()
    rowCount = perfTransformedDF.count()
    duration = round((end - start).total_seconds(), 1)
    telemetryClient.track_event('Transformation Complete', {}, \
                                { 'Records Transformed': rowCount, \
                                 'Transformation Duration':duration })
  except Exception as e:
    notebookError = str(e)
    telemetryClient.track_exception(e,{"action":"column transform"},{})
  print("command skipped due to previous error")

Once the transformations are complete, we capture the current timestamp in variable “end” as the time this step completed; count the rows in the DataFrame; and calculate the step duration based on start and end times. We send that telemetry to Application Insights with the telemetryClient.track_event method using the event name “Transformation Complete,” and we include measure­ments for records transformed and transformation duration.

We add some very basic exception handling into our notebooks purely to illustrate tracking exceptions with Application Insights, as well. Note within the except block in Figure 6 that if we catch an exception, we’re going to call the track_exception method. We pass the exception as the first parameter and the subsequent parameters are the same types as in track_event, allowing you to record as much information around the event as possible. One important note to make here is that there’s currently no exception-handling semantics for inline sql. So, it might be best to skip magics like %sql for production jobs until support for exception handling is added.

The other steps in our data-engineering job, including operations for Acquisition and Persistence, follow the pattern seen in the Transformation code for sending telemetry events with custom measurements to Application Insights.

Configuring Analytics and Alerts

With the code in place to send the telemetry, we turn to configuring Application Insights to create live dashboards, look through event and correlated event details, and set up alerts to inform and possibly take action based on the event trigger.

Figure 7 depicts a few charts that we’ve configured via the Metrics Explorer blade and the Metrics (preview) blade in Application Insights and then pinned to our Azure Portal Dashboard.

Application Insights Charts on Azure Dashboard

Figure 7 Application Insights Charts on Azure Dashboard

Take note of the right two quartiles. The top right shows a grid of durations grouped by the operation name that we reported the telemetry under when we added the tracking calls. The bottom right shows a record count measurement grouped by the event name we used. Sure enough, “Persist to SQL DB” is much lower than the others, as this was the event that wrote only a small, filtered subset of our data to Azure SQL Database. Choosing your operation groupings, operation names, and event names is an important part of planning that pays off at this point as you get to visualize and report on the data in a way that makes sense for how you think about your operations.

The left two quartiles in Figure 7 show charts that were created with the Metrics (preview), which has a nice configuration UI, as well as some additional functionality for splitting the measurements based on another property. In the top left you can see the record count, but we’ve split it so that this is reported by Event Name, giving us a graph and data for the record count for different events. Here we’re comparing record counts taken when the source data was read to record counts taken later when consolidated data was loaded into a DataFrame. This is an important feature since Record Count might be a pretty common measurement across our parent operation, but we’d like to see it at each operation or event.

If you see something in one of the operational graphs that calls for some research, you can search through all the telemetry. Figure 8 depicts the results of a search and a graph showing an occurrence count over time in the left pane. In the right pane you can look at all of the information recorded in the event. Recall the track_event([name], [properties], [measurements]) signature. We’ve pulled up the detail of a Persist to SQL DB event in which you can see the custom properties at the top. In the middle, labeled Custom Data, is where you can find the custom measurements that were sent with the telemetry. At the bottom right are all of the Related Items where you can easily navigate to all of the events that belong to the operation or parent operation. In addition, there’s a line at the bottom to see all available telemetry at the time of the event. If you’ve standardized on Application Insights for your runtime monitoring, this is a great tool for understanding the overall system state and the operational context of an event. Having insight into what’s going on broadly might help explain when record counts are off or duration is askew.

Event Search and Details

Figure 8 Event Search and Details

The last thing we want to cover for Application Insights is the ability to set up an alert. In Figure 9 you can see part of the alert configuration. Like the other elements we looked at, the custom information we sent in the events shows up here for us to choose as criteria for alerting.

Figure 9 Setting Up an Alert on Metric

As you might expect, the alert can send an e-mail. However, it can also call a WebHook, which makes for a nice and easy way to take any other action you might desire. Azure Functions is a perfect fit for this setup and will allow you to create whatever custom action you like. More interestingly, Application Insights is directly integrated with Logic Apps. This enables native capability to integrate and orchestrate actions across a wide variety of integrations and Microsoft Azure. Thus, an Application Insights alert could notify people while starting to take compensating and/or corrective actions via Logic Apps orchestration, including actions and integrations with downstream and upstream systems.

In Closing

We want to make sure we highlight the key bits of information. Application Insights is not a Log Analytics solution. It integrates with Azure Log Analytics, which provides post hoc analysis and long-term log retention. Application Insights is for monitoring and analytics of your runtime operations, giving you information, insights and alerts about what’s happening now. Direct integration with other Azure services and broad availability of platform SDKs makes it a nice fit to help operationalize your Azure Databricks jobs. As a result, monitoring for those jobs isn’t done in a silo, but rather within the context of the full solution architecture.

Joseph Fultz is a cloud solution architect at Microsoft. He works with Microsoft customers developing architectures for solving business problems lev-eraging Microsoft Azure. Formerly, Fultz was responsible for the development and architecture of GM’s car-sharing program (mavendrive.com). Con-tact him on Twitter: @JosephRFultz or via e-mail at jofultz@microsoft.com.

Ryan Murphy is a solution architect living in Saint Louis, Mo. He’s been building and innovating with data for nearly 20 years, including extensive work in the gaming and agriculture industries. Currently, Murphy is helping some of the world’s largest organizations modernize their business with data solutions powered by Microsoft Azure Cloud. Follow him on Twitter: @murphrp.