2016 年 4 月

第 31 卷,第 4 期

本文章是由機器翻譯。

巨量資料 - Spark 上的資料處理與機器學習服務

Eugene Chuvyrov

以下是您一個問題 ︰ 架構,借用經常從 Microsoft 樹木專案中,名稱是什麼成為最受歡迎的開放原始碼專案的 2015年,而且也設定為資料處理的記錄,只要 23 分鐘排序 100 TB 的資料? 我的回答是: Apache Spark。

在本文中我將討論的速度和 Spark,以及為什麼不清除目前的優勝者巨量資料處理和分析空間中的熱門程度。使用 Microsoft Azure 訂閱,我將提供範例解決與 Spark 機器學習 (ML) 問題的逐步從軟體工程納入資料科學世界。但我深入探討資料分析和 ML 之前,務必說有關 Spark framework 的各種元件以及 Azure Spark 的關聯性的幾個字。

Spark 元件

Spark 架構的值是讓處理巨量資料的商品機器叢集上的工作負載。Spark Core 是引擎,可讓處理可能、 封裝資料的查詢,並順暢地將它們分散到叢集。除了 Spark Core 有數個其他元件 Spark 架構,而且每個這些元件適用於特定問題領域。很可能您就永遠不需要使用任何這些元件,如果您想要只在管理和報告對大型資料工作負載。不過,在本文中,我將使用 Spark MLLib 擴建相當精確地將讓您將 ML 模型 」 猜測 」 已撰寫以手動方式 (更多稍後) 的數字。Spark 架構的其他元件,讓串流資料處理 (Spark 串流),圖形的操作和著名的 PageRank 演算法 (GraphX) 運算,並執行 SQL 查詢中的分散式資料 (Spark SQL) 之上。

在 Azure 上執行 Spark

有不少選項中使用從受管理的服務在試驗 Spark, databricks.com (此公司建立並持續提升 Spark),佈建 Docker 容器和抓取預先安裝的 Spark 的映像從 Docker 中樞,從 GitHub 取得完整的來源的程式碼儲存機制 (github.com/apache/spark) 和建置您自己的產品。但是,因為這篇文章是關於 Azure 中,我想要告訴您如何在 Azure 上建立 Spark 叢集。這個選項是非常有趣的原因是因為 Azure 提供了企業等級的保證,對於部署到 Azure 的 Spark 計算叢集。Azure 提供 99.9 %microsoft 備份 SLA 的所有 Spark 叢集,而且提供 24x7 企業支援和監視叢集。這些保證,2016年期間結合的叢集部署以及一大堆宣告 Spark 和 Azure 建置會議讓 Microsoft 定域機組極佳的環境,為您巨量資料的工作。

Pixie 灰塵

立即讓 Spark 資料科學家之間這麼受歡迎的密碼是一體兩面 ︰ 很快速而且好置於該架構的程式。首先,讓我們看看有何 Spark 速度比其前方的架構。

Spark 的前身 Hadoop MapReduce 是巨量資料分析空間的主力自從 2005 Doug 剪下和 Mike Cafarella co-founded Apache Hadoop 專案。MapReduce 工具之前可以只在 Google 資料中心內,未完全關閉來源。Hadoop 特別適用於執行批次分析處理在叢集上,但它仍有極大的嚴格度。對應和縮減作業移在一起。您第一次完成對應的工作,然後在您完成減少工作。複雜的工作必須結合多個對應,並減少步驟。此外,每項工作必須能分解成減少作業的對應。長的時間,來執行這些循序作業但程式非常繁瑣。換句話說,這不是即時分析。

相反地,Spark 架構套用於智慧資料分析工作。它會建構導向非循環圖 (DAG) 的排程工作,非常類似於 SQL Server 如何建構查詢執行計畫,然後再執行資料擷取或操作作業之前執行。Dag 提供轉換將資料上執行的相關資訊,Spark 也能夠以智慧方式結合許多這些轉換成單一階段,並再一次執行轉換,了解最初是由 Microsoft Research 專案樹木中。

此外,Spark 也能夠聰明地保留透過稱為 「 彈性分散式資料集 (RDDs) 的建構記憶體中的資料,我會在稍後說明 — 並 Dag 之間共用程式碼。共用 Dag 可讓工作之間的資料比他們的最佳化不會有更快完成。[圖 1 顯示"hello world"的資料科學空間的 DAG ︰ 指定的文字檔案中的字數計數。請注意數個作業,也就讀取文字檔案、 flatMap 和對應,會結合成單一步驟中,允許更快地執行。下列程式碼會顯示實際的 Scala 程式碼 (因為 Spark 撰寫 Scala) 執行字數統計 (即使您從未看到一行 Scala 程式碼之前,我敢打賭,您會立即了解如何實作在 Spark 中的字數) ︰

val wordCounts = textFile.flatMap(line => line.split(" ")).map(word => (word,1))  
  .reduceByKey((a, b) => a + b)

導向非循環圖 (DAG) 的字數統計
[圖 1 導向非循環圖 (DAG) 的字數統計

Spark 是這麼受歡迎的第二個原因是因為其程式設計模型。實作 Spark (Scala 程式碼) 中的字數統計是實作 Hadoop MapReduce 字數統計比簡單多了。除了 Scala,您可以建立 Spark 應用程式中 Java 和 Python,也就是我在本文中使用的語言。之前 Spark 與 Hadoop MapReduce 資料科學家程式設計人員必須使用非自然結合的典範細分成對應的一組複雜的工作並減少作業。Spark,任何使用 LINQ 和 lambda 函式的.NET 開發人員熟悉功能性程式設計方法用來轉換和分析資料。

稍後您會看到如何輕鬆,但強大、 程式設計模型是 Spark。撰寫照樣的優異功能程式碼之前,但在大型與小型資料集,您需要建立分散式的機器會具有所有必要的元件的安裝並準備好接受您送出至它的程式設計工作的 Spark 叢集。建立 Spark 叢集是絕對令人怯步還用來建立及設定叢集自行;所幸,Microsoft 雲端可讓您完成佈建只需幾個點擊動作。下一節,我將示範如何只執行該動作。

部署 Spark 叢集

現在,讓我們在 Azure 上建立 HDInsight 叢集。"HDInsight"視為概括性術語,包括 Hadoop 與 Spark 技術。Hadoop HDInsight 和 Spark HDInsight 的兩個範例說明在 Azure 上受管理的巨量資料服務。

若要佈建 Spark 叢集,請登入 Azure 入口網站 (portal.azure.com),並點選 [新增] |資料 + 分析 |HDInsight |建立。填寫 [HDInsight 叢集的屬性,指定名稱、 叢集類型 = Spark 叢集作業系統設定為 Linux,(因為 Spark 開發在 Linux 上),並保留不變的 [版本] 欄位中所示 [圖 2。完成必要的資訊,包括指定的認證來登入叢集和儲存體帳戶/容器名稱的其餘部分。然後按 [建立] 按鈕。建立叢集的程序需要 15 到 30 分鐘的時間。

在 Azure 中建立 Spark 叢集
[圖 2 在 Azure 中建立 Spark 叢集

建立程序完成之後,您必須並排顯示代表新建立的 HDInsight 叢集在 Azure 入口網站。最後,您可以深入程式碼 ! 程式碼之前,不過,我們要檢閱的程式設計環境和使用 Spark 提供給您的語言。

有數種方式可以在 Spark 環境中的程式。首先,您可以存取透過 Spark 殼層,直覺的方式足夠 spark 殼層命令,說明在 bit.ly/1ON5Vy4, 、 where、 建立 Spark 叢集的前端節點的 SSH 工作階段之後, 您可以撰寫 Scala 程式 REPL 類似的方式和提交程式設計建構一次 (不擔心這一句聽範例以外國語言撰寫,就直接跳到選項 3)。第二,您可以在 Spark 執行完成調整應用程式 (透過提交 spark-submit 命令處予以說明 bit.ly/1fqgZHY)。最後,另外還有一個選項來使用 Jupyter notebook (jupyter.org) Spark 之上。如果您不熟悉 Jupyter 專案,Jupyter notebook 提供視覺化、 以 Web 為基礎的互動式環境中用來執行資料分析指令碼。這些 notebook 我的資料分析的慣用的方法,而且我相信,一旦您請試試看,它們將會有太程式設計上 Spark,慣用的方法。Azure HDInsight 安裝在叢集上的 Jupyter notebook 環境,讓您輕鬆地開始使用它。

若要存取 Jupyter notebook,按一下 [叢集儀表板] 磚中所示 [圖 3, ,然後按一下投影片外視窗上的 Jupyter notebook 磚。使用您在叢集建立期間指定的認證登入,您應該會看到 Jupyter 環境準備好接受新的或編輯舊的筆記。現在按一下右上角的 [新增] 按鈕,然後選取 [Python 2。為什麼 Python 2 嗎? 因為雖然 Spark 本身以 Scala 撰寫和許多程式設計 Spark 係以 Scala,另外還有 Python 橋接器可透過 Pyspark。順便一提,還有狂暴爭論是否以 Scala 或 Python 撰寫程式碼。每個語言與 Scala 正在可能更快,而 Python 可能是多個有明顯的優勢,表達,最常用於資料科學的語言 (請參閱 bit.ly/1WTSemP)。這可讓您使用表達能力,但是簡明的 Python 程式設計 Spark 叢集上方時。Python 也是我慣用的語言 (連同 R) 的資料分析和我可以使用的所有強大 Python 程式庫我習慣。

存取透過叢集儀表板的 Azure HDInsight 中的 Jupyter Notebook
[圖 3 存取透過叢集儀表板的 Azure HDInsight 中的 Jupyter Notebook

您最後準備將深入探索並執行 ML 和資料分析工作內 Jupyter notebook。

機器學習使用 Spark

為了說明 ML Spark 中的,我將使用的典型問題表單中的"微幅 」 的資料範例 ML 中 — 辨識手寫的數字,例如會出現在信封上郵遞區號的項目。雖然此資料集無法透過任何方式大,此解決方案的優點是時間的,資料應該會增加一個千位摺疊,您可以在叢集中增加更多電腦,仍然在合理內完成的資料分析。如下所示的程式碼的任何變更將會需要 — Spark 架構會負責散發叢集中的個別電腦的工作負載。您將使用的資料檔案也是一份古典 — 它經常被稱為 MNIST 資料集 —,其中包含 50000 手寫的數字,準備好要分析。雖然有許多地方線上取得 MNIST 資料集,但 Kaggle 網站可讓您方便存取的資料 (請參閱 bit.ly/1QJN20c)。

附帶一提,如果您不熟悉 kaggle.com, ,它能主控 ML 競賽,其中幾乎 500000 資料科學家從世界各地爭奪貨幣獎品或其中一個最上層的 ML 公司面試機會。五個 Kaggle 競賽中,我已完成,而且如果您是具競爭力的人,它是極 addictive 的經驗。和 Kaggle 站台本身在 Azure 上執行 !

讓我們花一點時間瞭解 train.csv 的內容。該檔案的每一行代表一個包含手寫的數字,例如中所示的 28 x 28 映像的像素 x 像素表示 [圖 4 (此圖顯示放大的表示法)。第一個資料行包含數字真的是什麼。其餘資料行包含像素的強度,從 0 到 255,所有的 784 像素 (28 x 28)。

放大的數字"7"表示 MNIST 資料集範例
[圖 4 放大"7"MNIST 資料集以表示數字的範例

開啟新的 Jupyter notebook 時,將下列程式碼貼到第一個資料格 ︰

from pyspark import SparkContext
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.tree import RandomForest
import time
sc = SparkContext(appName="MNISTDigitsDT")
#TODO: provide your own path to the train.csv in the line(s) below, 
# you can use Azure Storage 
#Explorer to upload files into the cloud and to read their full path
fileNameTrain = 'wasb://datasets@chuvyrov.blob.core.windows.net/trainingsample.csv'
fileNameTest = 'wasb://datasets@chuvyrov.blob.core.windows.net/validationsample.csv'
mnist_train = sc.textFile(fileNameTrain)
mnist_test = sc.textFile(fileNameTest)

此程式碼會執行在 Spark ML 必要的程式庫匯入,然後指定將用於定型和測試模型的資料檔案的位置 (請注意,這些檔案應該位於您的儲存體帳戶,透過從 Microsoft 定域機組中的 Spark wasb: / / 參考)。最後,最後兩行是 RDD 建立文字檔案的位置。RDDs 是 Spark 難 — 它們分散式程式設計人員/使用者通常隱藏的資料結構,但其實作的複雜性。此外,這些 RDDs 延遲評估,而且會保存,因此您需要使用該 RDD 一次,並立即毋需重新 computation/擷取。當您操作 RDDs 時,它會觸發 Dag 產生工作和執行階段式 Spark 叢集,如我稍早觸及時。

按 Shift + Enter 來執行您所貼上程式碼在 Jupyter 資料格中。沒消息應該是好消息 (如果您沒有收到錯誤訊息,您已經準備就緒),而且您現在應該已經 RDDs 提供用來查詢和操作。這些 RDDs 目前,包含以逗號分隔的文字行,因為 MNIST 資料是透過的方式。

接下來您要為幫助您將這幾行的文字轉換成自訂的 LabeledPoint 物件的簡單函式定義。需要的 ML 演算法,可訓練及進行預測,您將使用此物件。簡單的說,這個物件包含一組 「 功能 」 (有時候很方便的功能視為資料庫資料表中的資料行) 或單一資料點,以及為其 「 label 」 或值的相關特性嘗試學習來預測。這聽起來有點不清楚現在,或許查看 MNIST train.csv 檔案可能有助於。您會發現,train.csv 中的每一行會有其他所有資料行的第一個資料行和一組數字,從 0 到 255,數字。第一個資料行稱為 「 label 」,因為我們想要了解如何預測該數字。所有其他資料行都是 「 功能 」,是所有的功能稱為 「 特性向量 」。 這些功能是數字的圖片中的每個數位化像素的濃度,0 表示黑色和 255 是白色與許多值之間。圖片是所有 28 個像素高和 28 個像素寬,784 包含像素強度 train.csv 檔案 (28 x 28 = 784) 中的資料行所組成。

複製並貼入新的儲存格的 Jupyter notebook 中的下列函數 ︰

def parsePoint(line):
  #Parse a line of text into an MLlib LabeledPoint object
  values = line.split(',')
  values = [0 if e == '' else int(e) for e in values]
  return LabeledPoint(int(values[0]), values[1:])

按 Shift + Enter 來執行程式碼。現在您已經定義經 Spark parsePoint 函式,供您使用與您在中讀取的資料集。此函式會接受一行以逗號分隔的文字、 將它分割為個別的值並將這些值轉換成 LabeledPoint 物件。

接下來,您可以執行一些基本的資料清理準備學習演算法。不幸的是,學習演算法尚未聰明地了解資料的哪個部分有預測值。因此,在略過使用取自駭客 train.csv 檔標頭 stackoverflow.com; 然後,您所要列印結果 RDD 以確定它是在您預期它處於狀態的第一行 ︰

#skip header
header = mnist_train.first() #extract header
mnist_train = mnist_train.filter(lambda x:x !=header) 
#filter out header using a lambda
print mnist_train.first()

現在,您已準備好與下一節.map(parsePoint) 運算子,RDD 轉換格式可供在 Spark ML 演算法套用功能性程式設計方法。此轉換基本上會剖析 mnist_train RDD 內的每一行,而且該 RDD 轉換成一組 LabeledPoint 物件。

RDDs 和互動功能 ︰ 四大要件的 Spark 的能力

有幾個重要的問題。首先,您正在使用分散到叢集的機器 (RDD) 的資料結構,但分散式運算環境的複雜度幾乎完全看不見您。功能性轉換套用至 RDD 與 Spark 最佳化所有處理和重責大任,在叢集中可用的電腦在幕後為您 ︰

labeledPoints = mnist_train.map(parsePoint)
#Split the data into training and test sets (30% held out for testing)
(trainingData, testData) = labeledPoints.randomSplit([0.7, 0.3])
print mnist_train.first()

雖然最後一行 (print 陳述式) 可能是似乎微不足道,能夠以互動方式查詢大型資料集的功能極為強大,而且幾乎不存在之前 Spark 的大型資料集的世界。在資料科學和大型資料管理專案中,它將是非常有用的技巧,以確認套用的轉換您的想法確實套用。這個功能強大的互動式處理還透過其他巨量資料處理架構 Spark 的另一個優點。

也請注意,資料分割成定型和測試資料集使用 randomSplit 函式。沒有建立 trainingData RDD 中使用這些資料將 ML 模型和測試使用 testData 中的資料模型的概念 RDD,您會看到程式碼中短時間內。

您現在準備好將 ML 演算法套用至分散式的資料集,您只要建立 (mnist_train)。快速檢閱,記住 ML 問題幾乎在所有情況下有兩個不同的集合,會發生的步驟 ︰ 首先,您使用定型模型已知的資料集與已知的結論。第二,您讓您建立,或在第一個步驟中所學的模型進行預測。下列程式碼中,您使用 Spark 機器學習架構 (Spark MLLib) 中可用的 RandomForest 演算法來定型模型。RandomForest 屬於 Spark MLLib 內提供數個分散式演算法,它是其中最強大的其中一個。將下列內容貼到新的儲存格 ︰

depthLevel = 4
treeLevel = 3
#start timer
start_time = time.time()
#this is building a model using the Random Forest algorithm from Spark MLLib
model = RandomForest.trainClassifier(trainingData, numClasses=10, 
  categoricalFeaturesInfo={},
  numTrees=treeLevel, featureSubsetStrategy="auto",
  impurity='gini', maxDepth=depthLevel, maxBins=32) 
print("Training time --- %s seconds ---" % (time.time() - start_time))

請注意,此程式碼如何開始測量執行時間的演算法,然後設定部分所需的參數 RandomForest 演算法,也就是 maxDepth 和 numTrees 的初始值。按 Shift + Enter,以執行該程式碼。您可能會想知道 RandomForest 什麼,以及如何運作? RandomForest 是,在極高的層級,其運作方式隨機選取的變數上分割決策樹來建構資料的許多決策樹的 ML 演算法 (也就是一個樹狀結構可能一樣簡單,「 如果右下角的像素是白色的就可能 No.2 」),然後後輪詢建構的樹狀結構最後的決定。幸運的是,沒有已演算法的分散式的版本可供您使用 spark。不過,不會阻止您撰寫您自己的演算法,如果您想這麼做;分散式的 k 最近鄰近項目 (kNN) 演算法仍然不存在於 Spark 架構。

現在,回到 MNIST 數字辨識工作。如果您有類似探勘的環境,您應該取得定型大約 21 秒演算法的執行時間。這表示,在 21 秒,您已了解 — 使用 RandomForest 演算法,可用來預測您看見提供的功能已分析的數字的模型。您現在可以準備 ML 工作的最重要的部分 — 進行預測以模型為基礎所建立。此外,就也可以評估這些預測的精確度所示 [圖 5

[圖 5 評估您預測的精確度

# Evaluate model on test instances and compute test error
1 #start timer
2 start_time = time.time()
3 #make predictions using the Machine Learning created prior
4 predictions = model.predict(testData.map(lambda x: x.features))
5 #validate predictions using the training set
6 labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
7 testErr = labelsAndPredictions.filter(lambda (v, p): v != p).count() /
8   float(testData.count())
9 print('Test Error = ' + str(testErr))
10 print("Prediction time --- %s seconds ---" % (time.time() - start_time))
11 #print('Learned classification tree model:')
12 #print(model.toDebugString())

請注意 model.predict 上建構的第 4 行 [圖 5。這是讓實際的預測您稍早建立的模型為基礎的那一行。程式碼行上進行預測 (行 5-7) 之後, 您使用暫存的策略與相關的一些基本的資料操作 — 透過 zip 函式 — 您預測的值,可供您下載的一部分的實際值。然後,您只需計算此資料的正確預測的百分比,並列印執行時間。

過高的錯誤與此初始分類的結果是稍微令人不安 (也就是您模型運作? 根本錯誤率接近 43%)。您可以改善使用稱為 「 方格 hyperparameter 搜尋 」 建立的模型,它就能立即,最後會不斷收斂 hyperparameter 值,讓您獲得最佳的整體效能測試時,會嘗試一系列的值的概念模型。換句話說,試著一堆系統化的實驗,來判斷哪些模型參數有最佳的預測值。

您將會套用超方格搜尋會 numTrees 和 maxDepth;貼上程式碼所示 [圖 6 到 notebook 中新的儲存格。

[圖 6 反覆最佳參數 RandomForest 演算法在 Spark 中的 「 方格搜尋 」

1 bestModel = None
2 bestTestErr = 100
3 #Define a range of hyperparameters to try
4 maxDepths = range(4,10)
5 maxTrees = range(3,10)
6
7 #Loop over parameters for depth and tree level(s)
8 for depthLevel in maxDepths:
9 for treeLevel in maxTrees:
10       
11   #start timer
12   start_time = time.time()
13   #Train RandomForest machine learning classifier
14   model = RandomForest.trainClassifier(trainingData,
15     numClasses=10, categoricalFeaturesInfo={},
16     numTrees=treeLevel, featureSubsetStrategy="auto",
17     impurity='gini', maxDepth=depthLevel, maxBins=32)       
18              
19   #Make predictions using the model created above
20   predictions = model.predict(testData.map(lambda x: x.features))
21   #Join predictions with actual values from the data and determine the error rate
22   labelsAndPredictions = testData.map(lambda lp: lp.label).zip(predictions)
23   testErr = labelsAndPredictions.filter(lambda (v, p): v != p)
24     .count() / float(testData.count())
25       
26   #Print information about the model as we proceed with each iteration of the loop
27   print ('\maxDepth = {0:.1f}, trees = {1:.1f}: trainErr = {2:.5f}'
28          .format(depthLevel, treeLevel, testErr))
29   print("Prediction time --- %s seconds ---" % (time.time() - start_time))
30   if (testErr < bestTestErr):
31       bestModel = model
32       bestTestErr = testErr
33           
34 print ('Best Test Error: = {0:.3f}\n'.format(bestTestErr))

請注意如何行 8 到 14 掃描透過一組 numTrees 參數,10,3 隨機樹系演算法建立模型,並評估其效能。接下來,在 30-32 您擷取模型,如果它能提供更好的結果,其中任何一個先前您嘗試過的模型比或否則關閉模型。提供此迴圈一些時間來執行。在執行結束時,您應該會看到不超過 10%的預測錯誤值。

總結

當我著手寫這篇文章時,我的主要目標是透過範例顯示使用 Spark,尤其是程式設計是多麼如果你的功能性程式設計與 Azure。我第二個目標是要示範如何執行 ML 工作上的資料集大型和小型 Spark MLLib 程式庫的協助。過程中,我想說明為什麼 Spark 執行速度比分散式資料共用的瑣事如何抵達,我們是現今在分散式的資料分析空間的位元。

Microsoft 大量投資未來的巨量資料,ML、 分析和明確地說,Spark。這是正確的時間,若要了解這些技術,以充分發揮超大規模一計算和資料分析 Microsoft 雲端所提供的機會。Azure 會取得要延展至大型資料集,所有備份服務等級的保證,您可以預期只能從最佳的企業雲端提供者所使用 Spark 快速、 輕鬆且準備好。

既然您已建立 ML 模型,並在已知的資料進行預測,您也可以進行預測的資料不包含其真正的標籤。也就是在 test.csv 檔案從 Kaggle.com。您接著可以提交給 Kaggle.com 做為該平台上的數字辨識器競爭的一部分。所有的本文中的程式碼,以及撰寫提交檔案的程式碼將會位於 GitHub.com/echuvyrov/SparkOnAzure。我很希望能了解您所取得的分數。電子郵件給我的問題、 意見、 建議和 ML 成就,在 eugene.chuvyrov@microsoft.com


Eugene Chuvyrov是定域機組解決方案架構設計人員在 Microsoft 技術推廣和開發小組他可協助公司 San Francisco 灣區中的充分利用超小數位數,並提供 Microsoft 定域機組。雖然他目前著重於高階資料合作夥伴,他還擔任軟體工程師忘記他的根項目以外,而 C#、 JavaScript 和 Python 中撰寫可供定域機組的程式碼。在 Twitter 上追蹤他 ︰ @EugeneChuvyrov

感謝以下的微軟技術專家對本文的審閱: Bruno Terkaly
Bruno Terkaly 是 Microsoft 的主要軟體工程師其目的是為了啟用跨裝置開發領先業界的應用程式和服務。他負責推動前定域機組和行動裝置的機會在美國及技術啟用觀點的進階功能。他負責協助市場他們的應用程式將藉由提供架構指導方針和 ISV 的評估、 開發和部署期間的深入技術參與的夥伴。Terkaly 也與密切的雲端和行動的工程群組,提供意見反應,並且影響藍圖。