April 2016

Volume 31 Number 4

[Big Data]

Data Processing and Machine Learning on Spark

By Eugene Chuvyrov

Here’s a question for you: What’s the name of the framework that borrowed heavily from the Microsoft Dryad project, became the most popular open source project of 2015 and also set a data processing record, sorting 100TB of data in just 23 minutes? The answer: Apache Spark.

In this article, I’ll talk about the speed and popularity of Spark and why it’s the clear current winner in the Big Data processing and analytics space. Using your Microsoft Azure subscription, I’ll present examples of solving machine learning (ML) problems with Spark, taking a small step from software engineering into the data science world. But before I dive into data analysis and ML, it’s important to say a few words about various components of the Spark framework and about Spark’s relationship with Azure.

Spark Components

The value of the Spark framework is that it allows for processing of Big Data workloads on the clusters of commodity machines. Spark Core is the engine that makes that processing possible, packaging data queries and seamlessly distributing them across the cluster. Besides Spark Core, there are several additional components to the Spark framework, and each of those components is applicable to a specific problem domain. It’s possible that you would never need to work with any of those components if you’re interested only in manipulating and reporting on large data workloads. However, in this article, I’ll use Spark MLLib to build out an ML model that’ll let you fairly accurately “guess” the digits that’ve been written by hand (a lot more on this later). Other components of the Spark framework allow for processing of streaming data (Spark Streaming), manipulation of graphs and the computation of the famous Page­Rank algorithm (GraphX), and running SQL queries on top of distributed data (Spark SQL).

Running Spark on Azure

There are quite a few options in experimenting with Spark, from using managed services from databricks.com (this company created and continues to enhance Spark), to provisioning a Docker container and grabbing pre-installed Spark images from Docker Hub, to getting the entire source code repo from GitHub (github.com/apache/spark) and building the product yourself. But because this article is about Azure, I’d like to show you how to create Spark clusters on Azure. The reason this option is extremely interesting is because Azure provides enterprise-level guarantees for Spark deployed onto Azure compute clusters. Azure gives a 99.9 percent Microsoft-­backed SLA for all Spark clusters and also offers 24x7 enterprise support and cluster monitoring. These guarantees, coupled with the ease of cluster deployment and a slew of announcements around Spark and Azure during the 2016 Build conference make Microsoft Cloud an excellent environment for your Big Data jobs.

The Pixie Dust

The secret that makes Spark so popular among data scientists today is two-fold: It’s fast and it’s a joy to program on that framework. First, let’s look at what makes Spark much faster than the frameworks that preceded it.

Spark’s predecessor, Hadoop MapReduce, was the workhorse of Big Data analytics space ever since Doug Cutting and Mike Cafarella co-founded the Apache Hadoop project in 2005. MapReduce tools were previously available only inside the Google datacenters and were completely closed sourced. Hadoop worked well for running batch analytics processing on the cluster, but it suffered from extreme rigidity. Map and Reduce operations go together; first you complete the Map task, then you complete the Reduce task. Complex tasks had to combine multiple map and reduce steps. Also, every task had to be decomposed into a map to reduce operations. That took a long time to run these sequential operations and was tedious to program. In other words, this wasn’t real-time analytics.

In contrast, the Spark framework applies intelligence to data analytics tasks at hand. It constructs a Directed Acyclic Graph (DAG) of execution before scheduling tasks, very similar to how SQL Server constructs a query execution plan before executing a data retrieval or manipulation operation. DAGs provide information about transformations that’ll be performed on data and Spark is able to intelligently combine many of these transformations into a single stage and then execute transformations all at once—an idea initially pioneered by Microsoft Research in Project Dryad.

Additionally, Spark is able to intelligently persist data in memory via the constructs called Resilient Distributed Datasets (RDDs)—which I’ll explain later—and share it among DAGs. This sharing of data among DAGs lets jobs complete faster than they would have without that optimization. Figure 1 shows a DAG for the “hello world” of the data science space­—the count of words in a given text file. Notice how several operations, namely reading text file, flatMap and map, are combined into a single stage, allowing for faster execution. The following code shows the actual Scala code (because Spark is written in Scala) performing the word count (even if you’ve never seen a line of Scala code before, I’m willing to bet that you’ll instantly understand how to implement the word count in Spark):

val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word,1))  
  .reduceByKey((a, b) => a + b)

Directed Acyclic Graph (DAG) for Word Count
Figure 1 Directed Acyclic Graph (DAG) for Word Count

The second reason Spark is so popular is because of its programming model. Implementing the word count in Spark (Scala code) is much simpler than implementing word count in Hadoop Map­Reduce. In addition to Scala, you can create Spark applications in Java and Python, which is the language I’m using in this article. Before Spark, with Hadoop MapReduce, data scientists/programmers had to use an unnatural paradigm of breaking down a complex task into a set of maps and reduce operations. With Spark, a functional programming approach familiar to any .NET developer working with LINQ and lambda functions is used to transform and analyze data.

Shortly, you’ll see just how easy, yet powerful, the Spark programming model is. But before you get to write awesome functional code that’ll work equally well on datasets large and small, you need to create a distributed cluster of machines that’ll have all the necessary components of Spark installed and ready to accept programming tasks you submit to it. The creation of a Spark cluster would be absolutely daunting if you had to create and configure the cluster yourself; fortunately, Microsoft Cloud lets you accomplish provisioning in just a few clicks. In the next section, I’ll show you just how to do that.

Deploying a Spark Cluster

Now, let’s create an HDInsight cluster on Azure. Think of “HDInsight” as an umbrella term that includes both Hadoop and Spark technologies; Hadoop HDInsight and Spark HDInsight are two examples of managed Big Data services on Azure.

To provision a Spark cluster, log on to the Azure Portal (portal.azure.com) and click through New | Data + Analytics | HDInsight | Create. Fill out HDInsight Cluster properties, specifying Name, Cluster Type = Spark, set Cluster Operating System as Linux (because Spark is being developed on Linux) and leave the version field unchanged, as shown in Figure 2. Complete the rest of the required information, including specifying the credentials to log onto the cluster and storage account/container name. Then press the Create button. The process of creating a cluster takes 15 to 30 minutes.

Creating a Spark Cluster in Azure
Figure 2 Creating a Spark Cluster in Azure

After the creation process completes, you’ll have a tile in the Azure portal representing the newly created HDInsight cluster. Finally, you get to dive deep into code! Before getting into code, however, let’s review the programming environment and languages available to you with Spark.

There are several ways to program in the Spark environment. First, you can access Spark shell via, intuitively enough, the spark-shell command, explained at bit.ly/1ON5Vy4, where, after establishing an SSH session to the Spark cluster head node, you can write Scala programs in a REPL-like manner and submit programming constructs one at a time (don’t worry if this sentence sounded like it was written in a foreign language, just proceed directly to Option 3). Second, you can run complete Scala applications on Spark (submitting them via the spark-submit command explained at bit.ly/1fqgZHY). Finally, there’s also an option to use Jupyter notebooks (jupyter.org) on top of Spark. If you aren’t familiar with the Jupyter project, Jupyter notebooks provide a visual, Web-based interactive environment in which to run data analytics scripts. These notebooks are my preferred method of data analysis and I’m convinced that, once you try them, they’ll become your preferred method of programming on Spark, too. Azure HDInsight installs the Jupyter notebook environment on top of the cluster for you, making it easy to start using it.

To access Jupyter notebooks, click on the Cluster Dashboards tile as illustrated in Figure 3, then click the Jupyter notebook tile on the slide-out window. Log in using the credentials you specified during cluster creation and you should see the Jupyter environment ready to accept new or edit old notebooks. Now click on the New button in the upper-right corner and select Python 2. Why Python 2? Because while Spark itself is written in Scala and a lot of Spark programming is done in Scala, there’s also a Python bridge available via Pyspark. By the way, there’s a raging debate whether you should code in Scala or Python. Each language has its clear benefits, with Scala being potentially faster, while Python is perhaps more expressive and the most commonly used language for data science (see bit.ly/1WTSemP). This lets you use an expressive, yet concise Python when programming on top of the Spark cluster. Python is also my preferred language for data analysis (along with R) and I can utilize all the powerful Python libraries that I’m used to.

Accessing Jupyter Notebooks in Azure HDInsight Via Cluster Dashboards
Figure 3 Accessing Jupyter Notebooks in Azure HDInsight Via Cluster Dashboards

You’re finally ready to dive deep and perform ML and data analytics tasks inside the Jupyter notebooks.

Machine Learning with Spark

To illustrate ML in Spark, I’ll use a “smallish” data example in the form of classical problems in ML—recognizing handwritten digits, such as the ones that appear in ZIP codes on envelopes. Although this dataset isn’t large by any means, the beauty of this solution is that, should the data increase one-thousand fold, you could add more machines to the cluster and still complete the data analysis in a reasonable amount of time. No changes to the code illustrated here will be necessary—the Spark framework will take care of distributing workloads to individual machines in the cluster. The data file that you’ll use is also a classical one—it’s frequently referred to as MNIST dataset—and it contains 50,000 handwritten digits, ready for you to analyze. Although there are many places online to get the MNIST dataset, the Kaggle Web site gives you convenient access to that data (see bit.ly/1QJN20c).

As a side note, if you’re not familiar with kaggle.com, it hosts ML competitions online, where almost 500,000 data scientists from around the world compete for monetary prizes or a chance to interview at one of the top ML companies. I’ve competed in five Kaggle competitions and, if you’re a competitive person, it’s an extremely addictive experience. And the Kaggle site itself is running on Azure!

Let’s take a moment to understand the contents of train.csv. Each line of that file represents a pixel-by-pixel representation of a 28x28 image containing a handwritten digit, such as the one shown in Figure 4 (the figure shows a zoomed-in representation). The first column contains what the digit really is; the rest of the columns contain pixel intensities, from 0 to 255, of all 784 pixels (28x28).

Zoomed in Sample of the Digit “7” Represented in the MNIST Dataset
Figure 4 Zoomed in Sample of the Digit “7” Represented in the MNIST Dataset

With the new Jupyter notebook open, paste the following code into the first cell:

from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import RandomForest
import time
sc = SparkContext(appName="MNISTDigitsDT")
#TODO: provide your own path to the train.csv in the line(s) below, 
# you can use Azure Storage 
#Explorer to upload files into the cloud and to read their full path
fileNameTrain = 'wasb://datasets@chuvyrov.blob.core.windows.net/trainingsample.csv'
fileNameTest = 'wasb://datasets@chuvyrov.blob.core.windows.net/validationsample.csv'
mnist_train = sc.textFile(fileNameTrain)
mnist_test = sc.textFile(fileNameTest)

This code imports the necessary libraries for doing ML in Spark, then specifies the location of the data files that’ll be used for training and testing the model (note that these files should reside in your storage account, accessible from Spark in Microsoft Cloud via the wasb:// reference). Finally, the last two lines is where the RDD are created from the text files. RDDs are the magic behind Spark—they’re distributed data structures, but the complexity of their implementation is generally hidden from the programmer/user. Additionally, these RDDs are lazily evaluated and are persisted, so in case you needed to use that RDD again, it’s immediately available without re-computation/retrieval. When you manipulate RDDs, it triggers the generation of DAGs and the execution of staged tasks in the Spark cluster, as I touched upon earlier.

Press Shift+Enter inside the Jupyter cell to execute the code you pasted. No news should be good news (if you didn’t get an error message, you’re good), and you should now have RDDs available to you for querying and manipulation. These RDDs contain lines of comma-separated text at the moment, because that’s how your MNIST data came through.

The next thing you’re going to do is define a simple function that’ll help you convert these lines of text into a custom LabeledPoint object. This object is required for the ML algorithm that you’ll use to train and make predictions. In a nutshell, this object contains an array of “features” (sometimes, it’s convenient to think of features as columns in a database table) or characteristics about a single data point, as well as its “label,” or the value you’re trying to learn to predict. If this sounds a bit unclear right now, perhaps looking at the MNIST train.csv file might help. You’ll notice that every line in train.csv has a number in the first column and a set of numbers, from 0 to 255, in all other columns. The first column is called the “label,” because we’re trying to learn how to predict that number. All other columns are “features” and the features taken all together are referred to as a “feature vector.” These features are the intensity of each digitized pixel in the picture of the digit, 0 being black and 255 being white, with many values in between. The pictures are all 28 pixels high and 28 pixels wide, making up 784 columns containing pixel intensities in the train.csv file (28x28=784).

Copy and paste the following function into the new cell of your Jupyter notebook:

def parsePoint(line):
  #Parse a line of text into an MLlib LabeledPoint object
  values = line.split(',')
  values = [0 if e == '' else int(e) for e in values]
  return LabeledPoint(int(values[0]), values[1:])

Press Shift+Enter to execute the code. You have now defined the parsePoint function, which has been evaluated by Spark and it’s available for you to use with the dataset that you just read in. This function takes in a single line of comma-separated text, splits it into individual values and converts these values into the LabeledPoint object.

Next, you perform some basic data cleansing to get it ready for the learning algorithm; unfortunately, the learning algorithm isn’t yet smart enough to know what part of the data has predictive value. So, skip the header of the train.csv file using a hack borrowed from stackoverflow.com; then, you’ll print the first line of the resulting RDD to make sure it’s in the state you expect it to be in:

#skip header
header = mnist_train.first() #extract header
mnist_train = mnist_train.filter(lambda x:x !=header) 
#filter out header using a lambda
print mnist_train.first()

Now, you’re ready to apply a functional programming approach with the .map(parsePoint) operator in the next section to transform the RDD into the format ready for ML algorithms in Spark. This transformation will essentially parse every line inside the mnist_train RDD and convert that RDD to a set of LabeledPoint objects.

RDDs and Interactivity: Main Pillars of Spark’s Power

There are several important issues here. First, you’re working with a data structure distributed across the cluster of machines (the RDD), yet the complexity of distributed computing is almost completely hidden from you. You’re applying functional transforms to the RDD, and Spark optimizes all the processing and heavy lifting across the cluster of available machines behind the scenes for you:

labeledPoints = mnist_train.map(parsePoint)
#Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = labeledPoints.randomSplit([0.7, 0.3])
print mnist_train.first()

Although the last line (with the print statement) might seem trivial, the ability to interactively query large datasets is extremely powerful and was virtually absent from the world of large datasets before Spark. In your data science and large data manipulation projects, it’ll be a very useful technique to verify that the transformations you think are being applied are indeed being applied. This powerful interactive processing is yet another advantage of Spark over other Big Data processing frameworks.

Also notice the splitting of the data into the training and test dataset using the randomSplit function. The idea there is to create an ML model using the data in trainingData RDD and to test the model using the data in testData RDD, as you’ll see in the code shortly.

You’re now ready to apply an ML algorithm to the distributed dataset that you’ve just created (mnist_train). As a quick review, remember that in ML problems, almost in all cases there are two distinct sets of steps that occur: First, you train the model using the known dataset with known conclusions; second, you make predictions based on the model you created, or learned, in the first step. In the following code, you’re using a RandomForest algorithm available within the Spark Machine Learning framework (Spark MLLib) to train the model. RandomForest is one of several distributed algorithms available within Spark MLLib and it’s one of the most powerful. Paste the following contents into the new cell:

depthLevel = 4
treeLevel = 3
#start timer
start_time = time.time()
#this is building a model using the Random Forest algorithm from Spark MLLib
model = RandomForest.trainClassifier(trainingData, numClasses=10, 
  numTrees=treeLevel, featureSubsetStrategy="auto",
  impurity='gini', maxDepth=depthLevel, maxBins=32) 
print("Training time --- %s seconds ---" % (time.time() - start_time))

Note how this code starts to measure the execution time of the algorithms, then sets initial values for some of the parameters expected by the RandomForest algorithm, namely maxDepth and numTrees. Execute that code by pressing Shift+Enter. You might be wondering what this RandomForest thing is and how does it work? RandomForest is an ML algorithm that, at a very high level, works by constructing many decision trees on the data by randomly selecting a variable to split a decision tree on (that is, one tree could be as simple as, “If the pixel in the bottom-right corner is white, it’s probably No. 2”) and then making the final decision after polling all the trees constructed. Fortunately, there’s already a distributed version of the algorithm available to you on Spark. However, nothing stops you from writing your own algorithms should you decide to do it; distributed k-Nearest Neighbors (kNN) algorithm still isn’t present in the Spark framework.

Now, back to the MNIST digits recognition task. If you have an environment similar to mine, you should get execution time of training the algorithm of about 21 seconds. This means that in 21 seconds, you’ve learned—using the RandomForest algorithm—a model that you can use to predict the digits you’re seeing given the features you’ve analyzed. Now you’re ready for the most important part of the ML task—making predictions based on the model you’ve created. In addition, you’re also ready to evaluate the accuracy of these predictions, as shown in Figure 5.

Figure 5 Evaluating the Accuracy of Your Predictions

# Evaluate model on test instances and compute test error
1 #start timer
2 start_time = time.time()
3 #make predictions using the Machine Learning created prior
4 predictions = model.predict(testData.map(lambda x: x.features))
5 #validate predictions using the training set
6 labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
7 testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() /
8   float(testData.count())
9 print('Test Error = ' + str(testErr))
10 print("Prediction time --- %s seconds ---" % (time.time() - start_time))
11 #print('Learned classification tree model:')
12 #print(model.toDebugString())

Note the model.predict construct on line 4 of Figure 5. This is the line that makes the actual prediction based on the model that you’ve built earlier. On the lines after the prediction is made (lines 5-7), you’re using some basic data manipulation strategies to temporary relate—via the zip function—your predicted values to the real values available to you as part of the download. Then, you simply compute the percentage of correct predictions given this data and print the execution time.

The result of this initial classification with the error being so high is slightly disconcerting (that is, does your model work at all with error rates approaching 43 percent?). You can improve the model using the concept called “grid hyperparameter search” where you try a series of values when building out the model, test it right away and eventually converge on the hyperparameter values that give you the best performance overall. In other words, try a bunch of systematic experiments to determine what model parameters have the best predictive value.

The hyperparameters that you’ll apply grid search to will be numTrees and maxDepth; paste the code shown in Figure 6 into the new cell in the notebook.

Figure 6 Iterative “Grid Search” for Optimal Parameters in the RandomForest Algorithm in Spark

1 bestModel = None
2 bestTestErr = 100
3 #Define a range of hyperparameters to try
4 maxDepths = range(4,10)
5 maxTrees = range(3,10)
7 #Loop over parameters for depth and tree level(s)
8 for depthLevel in maxDepths:
9 for treeLevel in maxTrees:
11   #start timer
12   start_time = time.time()
13   #Train RandomForest machine learning classifier
14   model = RandomForest.trainClassifier(trainingData,
15     numClasses=10, categoricalFeaturesInfo={},
16     numTrees=treeLevel, featureSubsetStrategy="auto",
17     impurity='gini', maxDepth=depthLevel, maxBins=32)       
19   #Make predictions using the model created above
20   predictions = model.predict(testData.map(lambda x: x.features))
21   #Join predictions with actual values from the data and determine the error rate
22   labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
23   testErr = labelsAndPredictions.filter(lambda (v, p): v != p)
24     .count() / float(testData.count())
26   #Print information about the model as we proceed with each iteration of the loop
27   print ('\maxDepth = {0:.1f}, trees = {1:.1f}: trainErr = {2:.5f}'
28          .format(depthLevel, treeLevel, testErr))
29   print("Prediction time --- %s seconds ---" % (time.time() - start_time))
30   if (testErr < bestTestErr):
31       bestModel = model
32       bestTestErr = testErr
34 print ('Best Test Error: = {0:.3f}\n'.format(bestTestErr))

Notice how in lines 8-14 you scan through a set of numTrees parameters for the random forest algorithm from 3 to 10, creating models and evaluating their performance. Next, in lines 30-32, you capture the model if it gives you better results than any of the prior models that you’ve tried, or dismiss the model otherwise. Give this loop some time to run; at the end of the run, you should see prediction error values no greater than 10 percent.

Wrapping Up

When I set out to write this article, my main goal was to show through examples how easy it is to program with Spark, especially if you’re a fan of functional programming and Azure. My secondary goal was to demonstrate how you can perform ML tasks on datasets both large and small with the help of the Spark MLLib library. Along the way, I wanted to explain why Spark performs faster on distributed data than its predecessors and share bits of trivia of how we arrived at where we are today in the distributed data analytics space.

Microsoft is investing heavily in the future of Big Data, ML, analytics and, specifically, Spark. This is the right time to learn these technologies to truly take advantage of the opportunities for hyperscale compute and data analytics provided by Microsoft Cloud. Azure makes getting going with Spark fast, easy and ready to scale up to large datasets, all backed by service-level guarantees you can expect only from the best enterprise cloud providers.

Now that you’ve created an ML model and made predictions on the known data, you can also make predictions on the data that doesn’t include its true label; namely, on the test.csv file from Kaggle.com. You can then make a submission to Kaggle.com as part of the digit recognizer competition on that platform. All of the code for this article, as well as the code to write a submission file, is available at GitHub.com/echuvyrov/SparkOnAzure. I’d love to learn about the scores that you get. E-mail me with questions, comments, suggestions and ML achievements at eugene.chuvyrov@microsoft.com.

Eugene Chuvyrov is a cloud solutions architect at Microsoft in the Technical Evangelism and Development team where he helps companies around the San Francisco Bay area take full advantage of the hyper scale afforded by Microsoft Cloud. Although he currently focuses on high-scale data partners, he hasn’t forgotten his roots as a software engineer and enjoys writing cloud-ready code in C#, JavaScript and Python. Follow him on Twitter: @EugeneChuvyrov.

Thanks to the following Microsoft technical expert for reviewing this article: Bruno Terkaly
Bruno Terkaly is a principal software engineer at Microsoft with the objective of enabling development of industry-leading applications and services across devices. He’s responsible for driving the top cloud and mobile opportunities across the United States and beyond from a technology-enablement perspective. He helps partners bring their applications to market by providing architectural guidance and deep technical engagement during the ISV’s evaluation, development and deployment. Terkaly also works closely with the cloud and mobile engineering groups, providing feedback and influencing the roadmap.

Discuss this article in the MSDN Magazine forum