使用 Apache Spark MLlib 建置機器學習應用程式及分析資料集

了解如何使用 Apache Spark MLlib 建立機器學習應用程式。 應用程式會對開啟的數據集執行預測性分析。 從 Spark 的內建機器學習程式庫,此範例會透過羅吉斯迴歸使用「分類」

MLlib 為核心 Spark 程式庫,其提供許多可用於機器學習工作的工具,例如:

  • 分類
  • 迴歸
  • 叢集
  • 建立模型
  • 奇異值分解 (SVD) 和主體元件分析 (PCA)
  • 假設測試和計算範例統計資料

了解分類和羅吉斯迴歸

分類是常見的機器學習工作,是指將輸入資料依類別排序的程序。 這是以分類演算法指出如何為您所提供的輸入資料指派「標籤」的作業。 例如,您可以將機器學習演算法視為接受股票資訊作為輸入。 然後將股票分為兩種類別:您應該賣出和您應該保留的股票。

羅吉斯迴歸是您用於分類的演算法。 Spark 的羅吉斯迴歸 API 可用於二元分類,或用來將輸入資料歸類到兩個群組之一。 如需羅吉斯迴歸的詳細資訊,請參閱 Wikipedia

總而言之,羅吉斯迴歸的流程會產生「羅吉斯函式」。 使用此函式來預測輸入向量屬於一個群組或另一個群組的可能性。

食品檢查數據的預測性分析範例

在此範例中,您會使用 Spark 來執行一些食物檢查資料 (Food_Inspections1.csv) 的預測分析。 透過芝加哥市資料入口網站取得的資料。 此資料集包含有關在芝加哥執行的食物機構檢查資訊。 資訊包括每一個機構、發現的違規事項 (若有) 和檢查結果。 與叢集相關聯的儲存體帳戶已有 CSV 資料檔,此叢集位於 /HdiSamples/HdiSamples/FoodInspectionData/Food_Inspections1.csv

在下列步驟中,您會開發模型,以查看食品檢查需要什麼才能通過或失敗。

建立 Apache Spark MLlib 機器學習應用程式

  1. 使用 PySpark 核心建立 Jupyter Notebook。 如需指示,請參閱建立 Jupyter Notebook 檔案

  2. 匯入此應用程式所需的類型。 複製以下程式碼並貼入空白儲存格,然後按 Shift + Enter

    from pyspark.ml import Pipeline
    from pyspark.ml.classification import LogisticRegression
    from pyspark.ml.feature import HashingTF, Tokenizer
    from pyspark.sql import Row
    from pyspark.sql.functions import UserDefinedFunction
    from pyspark.sql.types import *
    

    由於使用 PySpark 核心,因此不需要明確建立任何內容。 當您執行第一個程式碼儲存格時,系統會自動為您建立 Spark 和 Hive 內容。

建構輸入資料框架

使用 Spark 內容,將原始 CSV 資料提取到記憶體中作為非結構化文字。 然後使用 Python 的 CSV 程式庫來剖析每一行資料。

  1. 請執行下列幾行,透過匯入和剖析輸入資料來建立具有恢復功能的分散式資料集 (RDD)。

    def csvParse(s):
        import csv
        from io import StringIO
        sio = StringIO(s)
        value = next(csv.reader(sio))
        sio.close()
        return value
    
    inspections = sc.textFile('/HdiSamples/HdiSamples/FoodInspectionData/Food_Inspections1.csv')\
                    .map(csvParse)
    
  2. 執行下列程式碼,可從 RDD 擷取一個資料列,因此您可以查看資料結構描述:

    inspections.take(1)
    

    輸出如下:

    [['413707',
        'LUNA PARK INC',
        'LUNA PARK  DAY CARE',
        '2049789',
        "Children's Services Facility",
        'Risk 1 (High)',
        '3250 W FOSTER AVE ',
        'CHICAGO',
        'IL',
        '60625',
        '09/21/2010',
        'License-Task Force',
        'Fail',
        '24. DISH WASHING FACILITIES: PROPERLY DESIGNED, CONSTRUCTED, MAINTAINED, INSTALLED, LOCATED AND OPERATED - Comments: All dishwashing machines must be of a type that complies with all requirements of the plumbing section of the Municipal Code of Chicago and Rules and Regulation of the Board of Health. OBSEVERD THE 3 COMPARTMENT SINK BACKING UP INTO THE 1ST AND 2ND COMPARTMENT WITH CLEAR WATER AND SLOWLY DRAINING OUT. INST NEED HAVE IT REPAIR. CITATION ISSUED, SERIOUS VIOLATION 7-38-030 H000062369-10 COURT DATE 10-28-10 TIME 1 P.M. ROOM 107 400 W. SURPERIOR. | 36. LIGHTING: REQUIRED MINIMUM FOOT-CANDLES OF LIGHT PROVIDED, FIXTURES SHIELDED - Comments: Shielding to protect against broken glass falling into food shall be provided for all artificial lighting sources in preparation, service, and display facilities. LIGHT SHIELD ARE MISSING UNDER HOOD OF  COOKING EQUIPMENT AND NEED TO REPLACE LIGHT UNDER UNIT. 4 LIGHTS ARE OUT IN THE REAR CHILDREN AREA,IN THE KINDERGARDEN CLASS ROOM. 2 LIGHT ARE OUT EAST REAR, LIGHT FRONT WEST ROOM. NEED TO REPLACE ALL LIGHT THAT ARE NOT WORKING. | 35. WALLS, CEILINGS, ATTACHED EQUIPMENT CONSTRUCTED PER CODE: GOOD REPAIR, SURFACES CLEAN AND DUST-LESS CLEANING METHODS - Comments: The walls and ceilings shall be in good repair and easily cleaned. MISSING CEILING TILES WITH STAINS IN WEST,EAST, IN FRONT AREA WEST, AND BY THE 15MOS AREA. NEED TO BE REPLACED. | 32. FOOD AND NON-FOOD CONTACT SURFACES PROPERLY DESIGNED, CONSTRUCTED AND MAINTAINED - Comments: All food and non-food contact equipment and utensils shall be smooth, easily cleanable, and durable, and shall be in good repair. SPLASH GUARDED ARE NEEDED BY THE EXPOSED HAND SINK IN THE KITCHEN AREA | 34. FLOORS: CONSTRUCTED PER CODE, CLEANED, GOOD REPAIR, COVING INSTALLED, DUST-LESS CLEANING METHODS USED - Comments: The floors shall be constructed per code, be smooth and easily cleaned, and be kept clean and in good repair. INST NEED TO ELEVATE ALL FOOD ITEMS 6INCH OFF THE FLOOR 6 INCH AWAY FORM WALL.  ',
        '41.97583445690982',
        '-87.7107455232781',
        '(41.97583445690982, -87.7107455232781)']]
    

    輸出可讓您了解輸入檔案的結構描述。 這包含了每一個機構的名稱和機構類型。 以及地址、檢查的資料,還有位置和其他。

  3. 執行下列程式碼來建立資料框架 (df) 和暫存資料表 (CountResults),其中具有一些可用於預測分析的資料行。 您可以使用 sqlContext 對結構化資料執行轉換。

    schema = StructType([
    StructField("id", IntegerType(), False),
    StructField("name", StringType(), False),
    StructField("results", StringType(), False),
    StructField("violations", StringType(), True)])
    
    df = spark.createDataFrame(inspections.map(lambda l: (int(l[0]), l[1], l[12], l[13])) , schema)
    df.registerTempTable('CountResults')
    

    資料框架中的四個相關資料行為識別碼名稱結果違規事項

  4. 執行下列程式碼,可取得一小部分的資料範例:

    df.show(5)
    

    輸出如下:

    +------+--------------------+-------+--------------------+
    |    id|                name|results|          violations|
    +------+--------------------+-------+--------------------+
    |413707|       LUNA PARK INC|   Fail|24. DISH WASHING ...|
    |391234|       CAFE SELMARIE|   Fail|2. FACILITIES TO ...|
    |413751|          MANCHU WOK|   Pass|33. FOOD AND NON-...|
    |413708|BENCHMARK HOSPITA...|   Pass|                    |
    |413722|           JJ BURGER|   Pass|                    |
    +------+--------------------+-------+--------------------+
    

了解資料

我們要著手了解資料集的內容為何。

  1. 執行下列程式碼,可顯示結果 (results) 資料行中的相異值:

    df.select('results').distinct().show()
    

    輸出如下:

    +--------------------+
    |             results|
    +--------------------+
    |                Fail|
    |Business Not Located|
    |                Pass|
    |  Pass w/ Conditions|
    |     Out of Business|
    +--------------------+
    
  2. 執行下列程式碼,可將這些結果的分佈視覺化:

    %%sql -o countResultsdf
    SELECT COUNT(results) AS cnt, results FROM CountResults GROUP BY results
    

    %%sql magic 後面緊接著 -o countResultsdf 可確保查詢的輸出會保存在 Jupyter 伺服器的本機上 (通常是叢集的前端節點)。 輸出會使用指定的名稱 countResultsdf ,當做 Pandas資料框架保存。 如需 %%sql magic 及 PySpark 核心提供之其他 magic 的詳細資訊,請參閱 使用 Apache Spark HDInsight 叢集之 Jupyter Notebook 上可用的核心

    輸出如下:

    SQL 查詢輸出。

  3. 您也可以使用 Matplotlib (用於建構資料視覺效果的程式庫) 建立繪圖。 因為必須從保存在本機上的 countResultsdf 資料框架建立繪圖,所以程式碼片段的開頭必須為 %%local magic。 這個動作可確保程式碼是在 Jupyter 伺服器的本機上執行。

    %%local
    %matplotlib inline
    import matplotlib.pyplot as plt
    
    labels = countResultsdf['results']
    sizes = countResultsdf['cnt']
    colors = ['turquoise', 'seagreen', 'mediumslateblue', 'palegreen', 'coral']
    plt.pie(sizes, labels=labels, autopct='%1.1f%%', colors=colors)
    plt.axis('equal')
    

    若要預測食物檢查結果,您需要根據違規事項來開發模型。 由於羅吉斯迴歸是一種二元分類方法,因此將結果資料分成兩個類別,是很合理的:不合格通過

    • 通過

      • 通過
      • 有條件通過
    • 失敗

      • 失敗
    • 捨棄

      • 找不到該業者
      • 已結束營業

      其他結果的資料 (「找不到該業者」或「已結束營業」) 並不是很有用處,而且這些資料在結果中所佔的比例非常小。

  4. 執行下列程式碼,可將現有資料框架 (df) 轉換為新的資料框架,其中每項檢查都以一組「標籤-違規」來表示。 在此案例中,0.0 標籤代表失敗,1.0 標籤代表成功,-1.0 標籤代表此二者以外的某種結果。

    def labelForResults(s):
        if s == 'Fail':
            return 0.0
        elif s == 'Pass w/ Conditions' or s == 'Pass':
            return 1.0
        else:
            return -1.0
    label = UserDefinedFunction(labelForResults, DoubleType())
    labeledData = df.select(label(df.results).alias('label'), df.violations).where('label >= 0')
    
  5. 執行下列程式碼,可顯示一個加上標籤的資料列:

    labeledData.take(1)
    

    輸出如下:

    [Row(label=0.0, violations=u"41. PREMISES MAINTAINED FREE OF LITTER, UNNECESSARY ARTICLES, CLEANING  EQUIPMENT PROPERLY STORED - Comments: All parts of the food establishment and all parts of the property used in connection with the operation of the establishment shall be kept neat and clean and should not produce any offensive odors.  REMOVE MATTRESS FROM SMALL DUMPSTER. | 35. WALLS, CEILINGS, ATTACHED EQUIPMENT CONSTRUCTED PER CODE: GOOD REPAIR, SURFACES CLEAN AND DUST-LESS CLEANING METHODS - Comments: The walls and ceilings shall be in good repair and easily cleaned.  REPAIR MISALIGNED DOORS AND DOOR NEAR ELEVATOR.  DETAIL CLEAN BLACK MOLD LIKE SUBSTANCE FROM WALLS BY BOTH DISH MACHINES.  REPAIR OR REMOVE BASEBOARD UNDER DISH MACHINE (LEFT REAR KITCHEN). SEAL ALL GAPS.  REPLACE MILK CRATES USED IN WALK IN COOLERS AND STORAGE AREAS WITH PROPER SHELVING AT LEAST 6' OFF THE FLOOR.  | 38. VENTILATION: ROOMS AND EQUIPMENT VENTED AS REQUIRED: PLUMBING: INSTALLED AND MAINTAINED - Comments: The flow of air discharged from kitchen fans shall always be through a duct to a point above the roofline.  REPAIR BROKEN VENTILATION IN MEN'S AND WOMEN'S WASHROOMS NEXT TO DINING AREA. | 32. FOOD AND NON-FOOD CONTACT SURFACES PROPERLY DESIGNED, CONSTRUCTED AND MAINTAINED - Comments: All food and non-food contact equipment and utensils shall be smooth, easily cleanable, and durable, and shall be in good repair.  REPAIR DAMAGED PLUG ON LEFT SIDE OF 2 COMPARTMENT SINK.  REPAIR SELF CLOSER ON BOTTOM LEFT DOOR OF 4 DOOR PREP UNIT NEXT TO OFFICE.")]
    

從輸入資料框架建立羅吉斯迴歸模型

最後一項工作是轉換加上標籤的資料。 將數據轉換成羅吉斯回歸所分析的格式。 輸入至羅吉斯迴歸演算法需要一組「標籤-特徵向量組」。 其中「特徵向量」是代表輸入點的數字向量。 因此,您需要轉換半結構化且包含許多自然語言註解的「違規事項」資料行。 將資料行轉換成機器可輕易辨識的實數陣列。

處理自然語言的標準機器學習方法之一是為每個不同的單字指派索引。 然後將向量傳遞至機器學習演算法。 如此一來,每個索引的值都會包含文字字串中該字組的相對頻率。

MLlib 可提供簡單的方法來執行此作業。 首先,將每個違規情事字串語彙基元化,以取得字串中的個別字。 然後,使用 HashingTF 將每組語彙基元轉換為特性向量,接著可以將它傳遞給羅吉斯迴歸演算法以構建模型。 您可以使用管線依序執行上述所有步驟。

tokenizer = Tokenizer(inputCol="violations", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(labeledData)

使用另一個資料集來評估模型

您可以使用您稍早建立的模型來 預測 新檢查的結果。 此預測是根據觀察到的違規事項。 您已在資料集 Food_Inspections1.csv 上訓練此模型。 您可以使用第二個資料集 Food_Inspections2.csv,評估此模型對於新資料的強度。 第二個資料集 (Food_Inspections2.csv) 在與叢集相關聯的預設儲存體容器中。

  1. 執行下列程式碼,可建立新的資料框架 predictionsDf,其中包含模型所產生的預測。 該程式碼片段也會根據資料框架,建立暫存資料表 Predictions

    testData = sc.textFile('wasbs:///HdiSamples/HdiSamples/FoodInspectionData/Food_Inspections2.csv')\
                .map(csvParse) \
                .map(lambda l: (int(l[0]), l[1], l[12], l[13]))
    testDf = spark.createDataFrame(testData, schema).where("results = 'Fail' OR results = 'Pass' OR results = 'Pass w/ Conditions'")
    predictionsDf = model.transform(testDf)
    predictionsDf.registerTempTable('Predictions')
    predictionsDf.columns
    

    您應該會看到如下文字的輸出:

    ['id',
        'name',
        'results',
        'violations',
        'words',
        'features',
        'rawPrediction',
        'probability',
        'prediction']
    
  2. 請看其中一個預測。 執行此程式碼片段:

    predictionsDf.take(1)
    

    針對測試資料集中的第一個項目有一個預測。

  3. model.transform() 方法會將相同的轉換套用至具有相同結構描述的任何新資料,並做出關於如何分類資料的預測。 您可以做一些簡單的統計,了解一下預測的外觀:

    numSuccesses = predictionsDf.where("""(prediction = 0 AND results = 'Fail') OR
                                            (prediction = 1 AND (results = 'Pass' OR
                                                                results = 'Pass w/ Conditions'))""").count()
    numInspections = predictionsDf.count()
    
    print ("There were", numInspections, "inspections and there were", numSuccesses, "successful predictions")
    print ("This is a", str((float(numSuccesses) / float(numInspections)) * 100) + "%", "success rate")
    

    輸出看起來會如同下列文字:

    There were 9315 inspections and there were 8087 successful predictions
    This is a 86.8169618894% success rate
    

    將羅吉斯迴歸搭配 Spark 使用,能用英文提供您違規事項之間關係的描述模型。 以及所指定企業的食物檢查會通過或失敗。

建立預測的視覺表示法

您現在可以建構最終的視覺效果,以利研判此測試的結果。

  1. 您可以從擷取稍早建立的 Predictions 暫存資料表中不同的預測和結果開始。 下列查詢會將輸出分隔為 true_positivefalse_positivetrue_negativefalse_negative。 在下面的查詢中,可以使用 -q 關閉視覺效果,並 (使用 -o) 將輸出儲存為可和 %%local magic 搭配使用的資料框架。

    %%sql -q -o true_positive
    SELECT count(*) AS cnt FROM Predictions WHERE prediction = 0 AND results = 'Fail'
    
    %%sql -q -o false_positive
    SELECT count(*) AS cnt FROM Predictions WHERE prediction = 0 AND (results = 'Pass' OR results = 'Pass w/ Conditions')
    
    %%sql -q -o true_negative
    SELECT count(*) AS cnt FROM Predictions WHERE prediction = 1 AND results = 'Fail'
    
    %%sql -q -o false_negative
    SELECT count(*) AS cnt FROM Predictions WHERE prediction = 1 AND (results = 'Pass' OR results = 'Pass w/ Conditions')
    
  2. 最後,使用 Matplotlib用下列程式碼片段產生繪圖。

    %%local
    %matplotlib inline
    import matplotlib.pyplot as plt
    
    labels = ['True positive', 'False positive', 'True negative', 'False negative']
    sizes = [true_positive['cnt'], false_positive['cnt'], false_negative['cnt'], true_negative['cnt']]
    colors = ['turquoise', 'seagreen', 'mediumslateblue', 'palegreen', 'coral']
    plt.pie(sizes, labels=labels, autopct='%1.1f%%', colors=colors)
    plt.axis('equal')
    

    您應該會看見下列輸出:

    Spark 機器學習應用程式輸出 - 餅圖失敗食品檢查的百分比。

    在此圖中,「肯定」結果是指未通過的食品檢查,否定結果則是指通過的檢查。

關閉 Notebook

執行應用程式之後,您應該關閉筆記本以釋放資源。 若要這麼做,請從 Notebook 的 [檔案] 功能表中,選取 [關閉並終止]。 此動作會關機並且關閉 Notebook。

下一步