分享方式:


使用 Apache Spark MLlib 建置機器學習模型

在本文中,您將瞭解如何使用 Apache Spark MLlib 建立機器學習應用程式,以在 Azure 開放數據集上進行簡單的預測性分析。 Spark 提供內建的機器學習連結庫。 此範例會透過羅吉斯回歸使用 分類

SparkML 和 MLlib 是核心 Spark 連結庫,可提供許多適用於機器學習工作的公用程式,包括適合下列專案的公用程式:

  • 分類
  • 迴歸
  • 叢集
  • 主題模型化
  • 單一值分解 (SVD) 和主體元件分析 (PCA)
  • 假設測試和計算樣本統計數據

了解分類和羅吉斯回歸

分類是熱門的機器學習工作,是將輸入數據排序為類別的程式。 分類演算法的工作是瞭解如何將標籤指派給您提供的輸入數據。 例如,您可以將接受股票資訊的機器學習演算法視為輸入,並將股票分成兩個類別:您應該出售的股票,以及您應該保留的股票。

羅吉斯回歸 是一種演算法,可用於分類。 Spark 的羅吉斯回歸 API 適用於 二元分類,或將輸入數據分類為兩個群組的其中一個。 如需羅吉斯回歸的詳細資訊,請參閱 維琪百科

總而言之,羅吉斯回歸的程式會產生 羅吉斯函 式,可用來預測輸入向量屬於某個群組或其他群組的機率。

NYC 計程車數據的預測性分析範例

若要開始使用,請安裝 azureml-opendatasets。 數據可透過 Azure 開放資料集取得。 此數據集子集包含黃色計程車車程的相關信息,包括開始和結束時間和位置、成本和其他屬性。

%pip install azureml-opendatasets

在本文中的其餘部分,我們將使用 Apache Spark 對 NYC 計程車車程小費數據執行一些分析,然後開發模型來預測特定車程是否包含小費。

建立 Apache Spark 機器學習模型

  1. 建立 PySpark 筆記本。 如需指示,請參閱 建立筆記本

  2. 匯入此筆記本所需的類型。

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. 我們將使用 MLflow 來追蹤機器學習實驗和對應的執行。 如果已啟用 Microsoft Fabric 自動記錄功能,則會自動擷取對應的計量和參數。

    import mlflow
    

建構輸入DataFrame

在此範例中,我們會將數據載入 Pandas 數據框架,然後將它轉換成 Apache Spark 數據框架。 使用此格式,我們可以套用其他 Apache Spark 作業來清除和篩選數據集。

  1. 執行下列幾行,將程式代碼貼入新單元格,以建立Spark DataFrame。 此步驟會透過開放式數據集 API 擷取數據。 我們可以篩選此數據,查看特定數據視窗。 下列程式代碼範例會使用 start_dateend_date 來套用傳回單一月份數據的篩選。

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. 下列程式代碼會將數據集縮減為大約 10,000 個數據列。 為了加快開發和定型,我們現在會取樣數據集。

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. 接下來,我們想要使用內 display() 建命令來查看數據。 這可讓我們輕鬆地檢視數據的範例,或以圖形方式探索數據的趨勢。

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

準備資料

數據準備是機器學習程式中的重要步驟。 它牽涉到清理、轉換及組織原始數據,使其適合分析和模型化。 在下列程式代碼中,您會執行數個數據準備步驟:

  • 藉由篩選數據集來移除極端值和不正確的值
  • 拿掉模型定型不需要的數據行
  • 從原始數據建立新的數據行
  • 產生標籤,以判斷指定的計程車車程是否有小費
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

然後,我們將進行第二次傳遞數據,以新增最終功能。

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

建立羅吉斯迴歸模型

最後一項工作是將標記的數據轉換成可透過羅吉斯回歸分析的格式。 羅吉斯回歸演算法的輸入必須是一組卷標/特徵向量,其中特徵向量是代表輸入點的數位向量

因此,您需要將類別數據行轉換成數位。 具體而言,您必須將 和 weekdayString 資料行轉換成trafficTimeBins整數表示法。 執行轉換的方法有很多種。 下列範例會 OneHotEncoder 採用方法。

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

此動作會產生具有正確格式之所有數據行的新 DataFrame,以定型模型。

定型羅吉斯回歸模型

第一項工作是將數據集分割成定型集和測試或驗證集。

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

現在有兩個 DataFrame,下一個工作是建立模型公式,並針對定型 DataFrame 執行它。 然後,您可以針對測試數據框架進行驗證。 試驗不同版本的模型公式,以查看不同組合的影響。

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

此儲存格輸出為:

Area under ROC = 0.9749430523917996

建立預測的視覺表示法

您現在可以建構最終視覺效果來解譯模型結果。 ROC 曲線是檢閱結果的一種方式。

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Graph that shows the ROC curve for logistic regression in the tip model.