Zelfstudie: Een machine learning-app bouwen met Apache Spark MLlib en Azure Synapse Analytics

In dit artikel leert u hoe u Apache Spark MLlib kunt gebruiken om een machine learning-toepassing te maken die eenvoudige voorspellende analyse uitvoert voor Azure Open Datasets. Spark biedt ingebouwde machine learning-bibliotheken. In dit voorbeeld wordt gebruikgemaakt van classificatie via logistieke regressie.

SparkML en MLlib zijn kernbibliotheken van Spark die veel hulpprogramma's bieden die nuttig zijn voor machine learning-taken, waaronder hulpprogramma's die geschikt zijn voor:

  • Classificatie
  • Regressie
  • Clustering
  • Modellering van onderwerpen
  • SVD (Singular Value Decomposition) en PCA (Principal Component Snalysis)
  • Hypothesen voor het testen en berekenen van voorbeeldstatistieken

Classificatie en logistieke regressie begrijpen

Classificatie, een populaire machine learning-taak, is het proces waarbij invoergegevens worden gesorteerd in categorieën. Het is de taak van een classificatie-algoritme om erachter te komen hoe labels kunnen worden toegewezen aan invoergegevens die u opgeeft. U kunt bijvoorbeeld een machine learning-algoritme bedenken dat aandeleninformatie accepteert als invoer en het aandeel opsplitst in twee categorieën: aandelen die u moet verkopen en aandelen die u moet behouden.

Logistieke regressie is een algoritme dat u kunt gebruiken voor classificatie. De logistieke regressie-API van Spark is handig voor binaire classificatie, of voor het classificeren van invoergegevens in één van twee groepen. Zie Wikipedia voor meer informatie over logistieke regressie.

Samengevat produceert het proces van logistieke regressie een logistieke functie die u kunt gebruiken om de waarschijnlijkheid te voorspellen dat een invoervector in de ene groep of de andere hoort.

Voorbeeld van voorspellende analyse over taxigegevens in NYC

In dit voorbeeld gebruikt u Spark om een voorspellende analyse uit te voeren op gegevens van taxirittips uit New York. De gegevens zijn beschikbaar via Azure Open Datasets. Deze subset van de gegevensset bevat informatie over taxiritten, waaronder informatie over elke rit, de begin- en eindtijd en locaties, de kosten, en andere interessante kenmerken.

Belangrijk

Er kunnen extra kosten gelden voor het ophalen van deze gegevens uit de opslaglocatie.

In de volgende stappen ontwikkelt u een model om te voorspellen of voor een bepaalde trip een fooi is betaald of niet.

Een Apache Spark-machine learning-model maken

  1. Maak een notebook met behulp van de PySpark-kernel. Zie Een notebook maken voor instructies.

  2. Importeer de typen die zijn vereist voor deze toepassing. Kopieer en plak de volgende code in een lege cel en druk op Shift+Enter. Of voer de cel uit met behulp van het blauwe afspeelpictogram links van de code.

    import matplotlib.pyplot as plt
    from datetime import datetime
    from dateutil import parser
    from pyspark.sql.functions import unix_timestamp, date_format, col, when
    from pyspark.ml import Pipeline
    from pyspark.ml import PipelineModel
    from pyspark.ml.feature import RFormula
    from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorIndexer
    from pyspark.ml.classification import LogisticRegression
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml.evaluation import BinaryClassificationEvaluator
    

    Vanwege de PySpark-kernel hoeft u niet expliciet contexten te maken. De Spark-context wordt automatisch voor u gemaakt wanneer u de eerste codecel uitvoert.

De invoer-DataFrame maken

Omdat de onbewerkte gegevens een Parquet-indeling hebben, kunt u de Spark-context gebruiken om het bestand rechtstreeks als een DataFrame in het geheugen op te halen. Hoewel de code in de volgende stappen gebruikmaakt van de standaardopties, is het mogelijk om toewijzing van gegevenstypen en andere schemakenmerken af te dwingen, indien nodig.

  1. Voer de volgende regels uit om een Spark DataFrame te maken door de code in een nieuwe cel te plakken. Met deze stap worden de gegevens opgehaald via de Open Datasets-API. Het ophalen van deze gegevens genereert bijna 1,5 miljard rijen.

    Afhankelijk van de grootte van uw serverloze Apache Spark-pool zijn de onbewerkte gegevens mogelijk te groot of duurt het te veel tijd om te werken. U kunt deze gegevens filteren naar een kleinere hoeveelheid. In het volgende codevoorbeeld wordt en end_date gebruikt start_date om een filter toe te passen dat één maand aan gegevens retourneert.

    from azureml.opendatasets import NycTlcYellow
    
    from datetime import datetime
    from dateutil import parser
    
    end_date = parser.parse('2018-05-08 00:00:00')
    start_date = parser.parse('2018-05-01 00:00:00')
    
    nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
    filtered_df = spark.createDataFrame(nyc_tlc.to_pandas_dataframe())
    
    
  2. Het nadeel van eenvoudig filteren is dat het, vanuit een statistisch perspectief, vooroordelen in de gegevens kan veroorzaken. Een andere aanpak is het gebruik van de steekproeven die zijn ingebouwd in Spark.

    Met de volgende code wordt de gegevensset teruggebracht tot ongeveer 2000 rijen, als deze wordt toegepast na de voorgaande code. U kunt deze steekproefstap gebruiken in plaats van het eenvoudige filter of in combinatie met het eenvoudige filter.

    # To make development easier, faster, and less expensive, downsample for now
    sampled_taxi_df = filtered_df.sample(True, 0.001, seed=1234)
    
  3. Het is nu mogelijk om de gegevens te bekijken om te zien wat er is gelezen. Het is normaal gesproken beter om gegevens te controleren met een subset in plaats van de volledige set, afhankelijk van de grootte van de gegevensset.

    De volgende code biedt twee manieren om de gegevens weer te geven. De eerste manier is eenvoudig. De tweede manier biedt een veel rijkere rasterervaring, samen met de mogelijkheid om de gegevens grafisch te visualiseren.

    #sampled_taxi_df.show(5)
    display(sampled_taxi_df)
    
  4. Afhankelijk van de grootte van de gegenereerde gegevensset en uw behoefte om te experimenteren of het notebook meerdere keren uit te voeren, wilt u de gegevensset mogelijk lokaal in de cache opslaan in de werkruimte. Er zijn drie manieren om expliciete caching uit te voeren:

    • Sla het DataFrame lokaal op als een bestand.
    • Sla het DataFrame op als een tijdelijke tabel of weergave.
    • Sla het DataFrame op als een permanente tabel.

De eerste twee van deze benaderingen zijn opgenomen in de volgende codevoorbeelden.

Het maken van een tijdelijke tabel of weergave biedt verschillende toegangspaden tot de gegevens, maar deze duurt alleen voor de duur van de sessie van het Spark-exemplaar.

sampled_taxi_df.createOrReplaceTempView("nytaxi")

De gegevens voorbereiden

De gegevens in de onbewerkte vorm zijn vaak niet geschikt om rechtstreeks aan een model door te geven. U moet een reeks acties uitvoeren op de gegevens om deze in een toestand te krijgen waarin het model deze kan gebruiken.

In de volgende code voert u vier klassen bewerkingen uit:

  • Het verwijderen van uitbijters of onjuiste waarden door filteren.
  • Onnodige kolommen verwijderen.
  • Het maken van nieuwe kolommen die zijn afgeleid van de onbewerkte gegevens om het model effectiever te laten werken. Deze bewerking wordt ook wel featurization genoemd.
  • Labeling. Omdat u binaire classificatie uitvoert (is er een fooi of niet op een bepaalde reis), moet u het fooibedrag converteren naar een waarde van 0 of 1.
taxi_df = sampled_taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'rateCodeId', 'passengerCount'\
                                , 'tripDistance', 'tpepPickupDateTime', 'tpepDropoffDateTime'\
                                , date_format('tpepPickupDateTime', 'hh').alias('pickupHour')\
                                , date_format('tpepPickupDateTime', 'EEEE').alias('weekdayString')\
                                , (unix_timestamp(col('tpepDropoffDateTime')) - unix_timestamp(col('tpepPickupDateTime'))).alias('tripTimeSecs')\
                                , (when(col('tipAmount') > 0, 1).otherwise(0)).alias('tipped')
                                )\
                        .filter((sampled_taxi_df.passengerCount > 0) & (sampled_taxi_df.passengerCount < 8)\
                                & (sampled_taxi_df.tipAmount >= 0) & (sampled_taxi_df.tipAmount <= 25)\
                                & (sampled_taxi_df.fareAmount >= 1) & (sampled_taxi_df.fareAmount <= 250)\
                                & (sampled_taxi_df.tipAmount < sampled_taxi_df.fareAmount)\
                                & (sampled_taxi_df.tripDistance > 0) & (sampled_taxi_df.tripDistance <= 100)\
                                & (sampled_taxi_df.rateCodeId <= 5)
                                & (sampled_taxi_df.paymentType.isin({"1", "2"}))
                                )

Vervolgens voert u een tweede doorgang over de gegevens om de uiteindelijke functies toe te voegen.

taxi_featurised_df = taxi_df.select('totalAmount', 'fareAmount', 'tipAmount', 'paymentType', 'passengerCount'\
                                                , 'tripDistance', 'weekdayString', 'pickupHour','tripTimeSecs','tipped'\
                                                , when((taxi_df.pickupHour <= 6) | (taxi_df.pickupHour >= 20),"Night")\
                                                .when((taxi_df.pickupHour >= 7) & (taxi_df.pickupHour <= 10), "AMRush")\
                                                .when((taxi_df.pickupHour >= 11) & (taxi_df.pickupHour <= 15), "Afternoon")\
                                                .when((taxi_df.pickupHour >= 16) & (taxi_df.pickupHour <= 19), "PMRush")\
                                                .otherwise(0).alias('trafficTimeBins')
                                              )\
                                       .filter((taxi_df.tripTimeSecs >= 30) & (taxi_df.tripTimeSecs <= 7200))

Een logistiek regressiemodel maken

De laatste taak bestaat uit het converteren van de gelabelde gegevens naar een indeling die kan worden geanalyseerd via logistieke regressie. De invoer voor een logistiek regressie-algoritme moet een set label-/functievectorparen zijn, waarbij de functievector een vector is van getallen die het invoerpunt vertegenwoordigen.

U moet de categorische kolommen dus converteren naar getallen. U moet met name de trafficTimeBins kolommen en weekdayString converteren naar weergaven met gehele getallen. Er zijn meerdere benaderingen voor het uitvoeren van de conversie. In het volgende voorbeeld wordt de OneHotEncoder methode gebruikt, die gebruikelijk is.

# Because the sample uses an algorithm that works only with numeric features, convert them so they can be consumed
sI1 = StringIndexer(inputCol="trafficTimeBins", outputCol="trafficTimeBinsIndex")
en1 = OneHotEncoder(dropLast=False, inputCol="trafficTimeBinsIndex", outputCol="trafficTimeBinsVec")
sI2 = StringIndexer(inputCol="weekdayString", outputCol="weekdayIndex")
en2 = OneHotEncoder(dropLast=False, inputCol="weekdayIndex", outputCol="weekdayVec")

# Create a new DataFrame that has had the encodings applied
encoded_final_df = Pipeline(stages=[sI1, en1, sI2, en2]).fit(taxi_featurised_df).transform(taxi_featurised_df)

Deze actie resulteert in een nieuw DataFrame met alle kolommen in de juiste indeling om een model te trainen.

Een logistiek regressiemodel trainen

De eerste taak is het splitsen van de gegevensset in een trainingsset en een test- of validatieset. De splitsing hier is willekeurig. Experimenteer met verschillende gesplitste instellingen om te zien of deze van invloed zijn op het model.

# Decide on the split between training and testing data from the DataFrame
trainingFraction = 0.7
testingFraction = (1-trainingFraction)
seed = 1234

# Split the DataFrame into test and training DataFrames
train_data_df, test_data_df = encoded_final_df.randomSplit([trainingFraction, testingFraction], seed=seed)

Nu er twee DataFrames zijn, is de volgende taak het maken van de modelformule en het uitvoeren ervan op basis van het DataFrame voor training. Vervolgens kunt u valideren op basis van het dataframe dat wordt getest. Experimenteer met verschillende versies van de modelformule om de impact van verschillende combinaties te zien.

Notitie

Als u het model wilt opslaan, wijst u de rol Inzender voor opslagblobgegevens toe aan het resourcebereik van de Azure SQL Database-server. Raadpleeg Azure-rollen toewijzen met de Azure Portal voor meer details. Alleen leden met eigenaarsbevoegdheden kunnen deze stap uitvoeren.

## Create a new logistic regression object for the model
logReg = LogisticRegression(maxIter=10, regParam=0.3, labelCol = 'tipped')

## The formula for the model
classFormula = RFormula(formula="tipped ~ pickupHour + weekdayVec + passengerCount + tripTimeSecs + tripDistance + fareAmount + paymentType+ trafficTimeBinsVec")

## Undertake training and create a logistic regression model
lrModel = Pipeline(stages=[classFormula, logReg]).fit(train_data_df)

## Saving the model is optional, but it's another form of inter-session cache
datestamp = datetime.now().strftime('%m-%d-%Y-%s')
fileName = "lrModel_" + datestamp
logRegDirfilename = fileName
lrModel.save(logRegDirfilename)

## Predict tip 1/0 (yes/no) on the test dataset; evaluation using area under ROC
predictions = lrModel.transform(test_data_df)
predictionAndLabels = predictions.select("label","prediction").rdd
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

De uitvoer van deze cel is:

Area under ROC = 0.9779470729751403

Een visuele weergave van de voorspelling maken

U kunt nu een definitieve visualisatie maken om u te helpen de testresultaten te beoordelen. Een ROC-curve is een manier om het resultaat te controleren.

## Plot the ROC curve; no need for pandas, because this uses the modelSummary object
modelSummary = lrModel.stages[-1].summary

plt.plot([0, 1], [0, 1], 'r--')
plt.plot(modelSummary.roc.select('FPR').collect(),
         modelSummary.roc.select('TPR').collect())
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.show()

Grafiek met de ROC-curve voor logistieke regressie in het tipmodel.

Het Spark-exemplaar afsluiten

Nadat u klaar bent met het uitvoeren van de toepassing, sluit u het notebook af om de resources vrij te geven door het tabblad te sluiten. Of selecteer Sessie beëindigen in het statusvenster onderaan het notitieblok.

Zie ook

Volgende stappen

Notitie

Sommige officiële Apache Spark-documentatie is afhankelijk van het gebruik van de Spark-console, die niet beschikbaar is in Apache Spark in Azure Synapse Analytics. Gebruik in plaats daarvan de ervaring met notebooks of IntelliJ.