Κοινή χρήση μέσω


Δημιουργία μοντέλου εκμάθησης μηχανής με το Apache Spark MLlib

Σε αυτό το άρθρο, θα μάθετε πώς μπορείτε να χρησιμοποιήσετε το Apache Spark MLlib για να δημιουργήσετε μια εφαρμογή εκμάθησης μηχανής που κάνει απλή προγνωστική ανάλυση σε ένα ανοικτό σύνολο δεδομένων Azure. Το Spark παρέχει ενσωματωμένες βιβλιοθήκες εκμάθησης μηχανής. Αυτό το παράδειγμα χρησιμοποιεί ταξινόμηση μέσω λογιστικής παλινδρόμησης.

Τα SparkML και MLlib είναι βασικές βιβλιοθήκες Spark που παρέχουν πολλά βοηθητικά προγράμματα που είναι χρήσιμα για εργασίες εκμάθησης μηχανής, συμπεριλαμβανομένων βοηθητικών προγραμμάτων που είναι κατάλληλα για:

  • Ταξινόμηση
  • Παλινδρόμησης
  • Συμπλέγματος
  • Μοντελοποίηση θέματος
  • Αποδόμηση μοναδικής τιμής (SVD) και ανάλυση κύριων στοιχείων (PCA)
  • Δοκιμή υποθέσεων και υπολογισμός δειγμάτων στατιστικών στοιχείων

Κατανόηση της ταξινόμησης και της λογιστικής παλινδρόμησης

Η ταξινόμηση, μια δημοφιλής εργασία εκμάθησης μηχανής, είναι η διαδικασία ταξινόμησης δεδομένων εισόδου σε κατηγορίες. Είναι δουλειά ενός αλγόριθμου ταξινόμησης να καταλάβουμε πώς μπορείτε να αντιστοιχίσετε ετικέτες για την εισαγωγή δεδομένων που παρέχετε. Για παράδειγμα, μπορείτε να θεωρήσετε έναν αλγόριθμο εκμάθησης μηχανής που αποδέχεται πληροφορίες μετοχών ως είσοδο και διαιρεί το απόθεμα σε δύο κατηγορίες: μετοχές που θα πρέπει να πουλήσετε και μετοχές που θα πρέπει να διατηρήσετε.

Η λογιστική παλινδρόμηση είναι ένας αλγόριθμος που μπορείτε να χρησιμοποιήσετε για ταξινόμηση. Το API λογιστικής παλινδρόμησης του Spark είναι χρήσιμο για τη δυαδική ταξινόμηση ή την ταξινόμηση δεδομένων εισόδου σε μία από τις δύο ομάδες. Για περισσότερες πληροφορίες σχετικά με την λογιστική παλινδρόμηση, ανατρέξτε στην Wikipedia.

Συνοπτικά, η διαδικασία λογιστικής παλινδρόμησης παράγει μια λογιστική συνάρτηση που μπορείτε να χρησιμοποιήσετε για να προβλέψετε την πιθανότητα ότι ένα διάνυσμα εισόδου ανήκει σε μία ομάδα ή στην άλλη.

Παράδειγμα προγνωστικής ανάλυσης για δεδομένα ταξί NYC

Για να ξεκινήσετε, εγκαταστήστε azureml-opendatasetsτο . Τα δεδομένα είναι διαθέσιμα μέσω του Azure Open Datasets. Αυτό το υποσύνολο του συνόλου δεδομένων περιέχει πληροφορίες σχετικά με διαδρομές με κίτρινα ταξί, όπως την ώρα έναρξης και λήξης και τοποθεσίες, το κόστος και άλλα χαρακτηριστικά.

%pip install azureml-opendatasets

Στο υπόλοιπο άρθρο, θα χρησιμοποιήσουμε το Apache Spark για να εκτελέσουμε κάποια ανάλυση στα δεδομένα συμβουλής διαδρομής ταξί της Νέας Υόρκης και, στη συνέχεια, θα αναπτύξουμε ένα μοντέλο για να προβλέψουμε εάν μια συγκεκριμένη διαδρομή περιλαμβάνει μια συμβουλή ή όχι.

Δημιουργία μοντέλου εκμάθησης μηχανής Apache Spark

  1. Δημιουργήστε ένα σημειωματάριο PySpark. Για οδηγίες, ανατρέξτε στο θέμα Δημιουργία σημειωματάριου.

  2. Εισαγάγετε τους τύπους που απαιτούνται για αυτό το σημειωματάριο.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    
  3. Θα χρησιμοποιήσουμε τη MLflow για να παρακολουθούμε τα πειράματα εκμάθησης μηχανής και τις αντίστοιχες εκτελέσεις. Εάν είναι ενεργοποιημένη η αυτόματη καταγραφή microsoft Fabric, τα αντίστοιχα μετρικά και παράμετροι καταγράφονται αυτόματα.

    import mlflow
    

Κατασκευή του dataFrame εισόδου

Σε αυτό το παράδειγμα, θα φορτώσουμε τα δεδομένα σε ένα πλαίσιο δεδομένων Pandas και, στη συνέχεια, θα τα μετατρέψουμε σε ένα dataframe Apache Spark. Χρησιμοποιώντας αυτήν τη μορφή, μπορούμε να εφαρμόσουμε άλλες λειτουργίες Apache Spark για εκκαθάριση και φιλτράρισμα του συνόλου δεδομένων.

  1. Εκτελέστε τις ακόλουθες γραμμές για να δημιουργήσετε ένα Spark DataFrame επικολλώντας τον κώδικα σε ένα νέο κελί. Αυτό το βήμα ανακτά τα δεδομένα μέσω του API Ανοικτών συνόλων δεδομένων. Μπορούμε να φιλτράρουμε αυτά τα δεδομένα για να εξετάσουμε ένα συγκεκριμένο παράθυρο δεδομένων. Το παρακάτω παράδειγμα start_date κώδικα χρησιμοποιεί το και end_date για να εφαρμόσει ένα φίλτρο που επιστρέφει έναν μήνα δεδομένων.

    from azureml.opendatasets import NycTlcYellow
    
    end_date = parser.parse('2018-06-06')
    start_date = parser.parse('2018-05-01')
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    nyc_tlc_pd = nyc_tlc.to_pandas_dataframe()
    
    nyc_tlc_df = spark.createDataFrame(nyc_tlc_pd).repartition(20)
    
    
  2. Ο ακόλουθος κώδικας μειώνει το σύνολο δεδομένων σε περίπου 10.000 γραμμές. Για να επιταχύνουμε την ανάπτυξη και την εκπαίδευση, θα κάνουμε δειγματοληψία προς το παρόν του συνόλου δεδομένων μας.

    # To make development easier, faster, and less expensive, sample down for now
    sampled_taxi_df = nyc_tlc_df.sample(True, 0.001, seed=1234)
    
  3. Στη συνέχεια, θέλουμε να ρίξουμε μια ματιά στα δεδομένα μας χρησιμοποιώντας την ενσωματωμένη display() εντολή. Αυτό μας επιτρέπει να προβάλετε εύκολα ένα δείγμα των δεδομένων ή να εξερευνήσουμε τις τάσεις στα δεδομένα με γραφική απεικόνιση.

    #sampled_taxi_df.show(10)
    display(sampled_taxi_df.limit(10))    
    

Προετοιμασία των δεδομένων

Η προετοιμασία δεδομένων είναι ένα κρίσιμο βήμα στη διαδικασία εκμάθησης μηχανής. Περιλαμβάνει την εκκαθάριση, τον μετασχηματισμό και την οργάνωση ανεπεξέργαστων δεδομένων για να τα κάνετε κατάλληλα για ανάλυση και μοντελοποίηση. Στον παρακάτω κώδικα, εκτελείτε διάφορα βήματα προετοιμασίας δεδομένων:

  • Κατάργηση έκτοπων και λανθασμένων τιμών με φιλτράρισμα του συνόλου δεδομένων
  • Κατάργηση στηλών που δεν είναι απαραίτητες για την εκπαίδευση μοντέλου
  • Δημιουργία νέων στηλών από τα ανεπεξέργαστα δεδομένα
  • Δημιουργήστε μια ετικέτα για να προσδιορίσετε αν θα υπάρχει συμβουλή ή όχι για τη συγκεκριμένη διαδρομή με ταξί
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                        , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                        , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                        , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                        , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                        , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                        )\
                .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                        & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                        & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                        & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                        & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                        & (sampled_taxi_df.rateCodeId <= 5)
                        & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                        )

Στη συνέχεια, θα κάνουμε μια δεύτερη μεταβίβαση των δεδομένων για να προσθέσουμε τις τελικές δυνατότητες.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Δημιουργία μοντέλου λογιστικής παλινδρόμησης

Η τελική εργασία είναι να μετατρέψετε τα δεδομένα με ετικέτα σε μια μορφή που μπορεί να αναλυθεί μέσω λογιστικής παλινδρόμησης. Η είσοδος σε έναν αλγόριθμο λογιστικής παλινδρόμησης πρέπει να είναι ένα σύνολο ζευγών διανύσματος ετικέτας/δυνατότητας, όπου το διάνυσμα δυνατότητας είναι ένα διάνυσμα αριθμών που αντιπροσωπεύουν το σημείο εισόδου.

Επομένως, πρέπει να μετατρέψετε τις κατηγορικές στήλες σε αριθμούς. Συγκεκριμένα, πρέπει να μετατρέψετε τις trafficTimeBins στήλες και weekdayString σε ακέραιες αναπαραστάσεις. Υπάρχουν πολλές προσεγγίσεις για την εκτέλεση της μετατροπής. Το παρακάτω παράδειγμα χρησιμοποιεί την OneHotEncoder προσέγγιση.

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Αυτή η ενέργεια έχει ως αποτέλεσμα ένα νέο DataFrame με όλες τις στήλες στη σωστή μορφή για την εκπαίδευση ενός μοντέλου.

Εκπαίδευση μοντέλου λογιστικής παλινδρόμησης

Η πρώτη εργασία είναι να διαιρέσετε το σύνολο δεδομένων σε ένα σύνολο εκπαίδευσης και ένα σύνολο δοκιμών ή επικύρωσης.

# Decide on the split between training and test data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Τώρα που υπάρχουν δύο DataFrame, η επόμενη εργασία είναι να δημιουργήσετε τον τύπο μοντέλου και να τον εκτελέσετε στο εκπαιδευτικό DataFrame. Στη συνέχεια, μπορείτε να κάνετε επικύρωση σε σχέση με το test dataFrame. Πειραματιστείτε με διαφορετικές εκδόσεις του τύπου μοντέλου για να δείτε την επίδραση διαφορετικών συνδυασμών.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

Η έξοδος από αυτό το κελί είναι:

Area under ROC = 0.9749430523917996

Δημιουργία οπτικής αναπαράστασης της πρόβλεψης

Τώρα, μπορείτε να δημιουργήσετε μια τελική απεικόνιση για να ερμηνεύσετε τα αποτελέσματα του μοντέλου. Μια καμπύλη ROC είναι ένας τρόπος για να εξετάσετε το αποτέλεσμα.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Graph that shows the ROC curve for logistic regression in the tip model.