Datagranskning och modellering med Spark

Lär dig hur du använder HDInsight Spark för att träna maskininlärningsmodeller för förutsägelser om taxipriser med Spark MLlib.

Det här exemplet visar de olika stegen i Team Data Science Process. En delmängd av datamängden new york taxi trip och fare 2013 används för att läsa in, utforska och förbereda data. Sedan tränas användning av Spark MLlib, binär klassificering och regressionsmodeller för att förutsäga om ett tips kommer att betalas för resan och uppskatta tipsbeloppet.

Förutsättningar

Du behöver ett Azure-konto och ett Spark 1.6(eller Spark 2.0) HDInsight-kluster för att slutföra den här genomgången. Se Översikt över datavetenskap med Spark på Azure HDInsight anvisningar om hur du uppfyller dessa krav. Avsnittet innehåller också en beskrivning av de taxidata från NEWC 2013 som används här och instruktioner om hur du kör kod från en Jupyter Notebook i Spark-klustret.

Spark-kluster och notebook-datorer

Installationssteg och kod finns i den här genomgången för att använda HDInsight Spark 1.6. Men Jupyter Notebooks tillhandahålls för både HDInsight Spark 1.6- och Spark 2.0-kluster. En beskrivning av notebook-filer och länkar till dem finns i Readme.md för GitHub-lagringsplatsen som innehåller dem. Dessutom är koden här och i de länkade notebook-anteckningsböckerna generisk och bör fungera i alla Spark-kluster. Om du inte använder HDInsight Spark kan stegen för klusterkonfiguration och -hantering vara något annorlunda än vad som visas här. För enkelhetens skull finns här länkarna till Jupyter Notebooks för Spark 1.6 (som ska köras i pySpark-kerneln på Jupyter Notebook-servern) och Spark 2.0 (som ska köras i pySpark3-kerneln på Jupyter Notebook-servern):

  • Notebook-datorer för Spark 1.6:Innehåller information om hur du utför datagranskning, modellering och bedömning med flera olika algoritmer.
  • Notebook-datorer för Spark 2.0:Innehåller information om hur du utför regressions- och klassificeringsuppgifter. Datauppsättningar kan variera, men stegen och begreppen gäller för olika datamängder.

Varning

Faktureringen för HDInsight-kluster är prorated per minut, oavsett om du använder dem eller inte. Se till att ta bort klustret när du har använt det. Se hur du tar bort ett HDInsight-kluster.

Anteckning

Beskrivningarna nedan är relaterade till användningen av Spark 1.6. För Spark 2.0-versioner använder du notebook-filarna som beskrivs och länkas ovan.

Installation

Spark kan läsa och skriva till Azure Storage Blob (även kallat WASB). Så alla befintliga data som lagras där kan bearbetas med Spark och resultaten lagras igen i WASB.

Om du vill spara modeller eller filer i WASB måste sökvägen anges korrekt. Standardcontainern som är kopplad till Spark-klustret kan refereras med hjälp av en sökväg som börjar med: "wasb:///". Andra platser refereras till av "wasb://".

Ange katalogsökvägar för lagringsplatser i WASB

Följande kodexempel anger platsen för de data som ska läsas och sökvägen till modellens lagringskatalog som modellutdata sparas till:

# SET PATHS TO FILE LOCATIONS: DATA AND MODEL STORAGE

# LOCATION OF TRAINING DATA
taxi_train_file_loc = "wasb://mllibwalkthroughs@cdspsparksamples.blob.core.windows.net/Data/NYCTaxi/JoinedTaxiTripFare.Point1Pct.Train.tsv";

# SET THE MODEL STORAGE DIRECTORY PATH 
# NOTE THAT THE FINAL BACKSLASH IN THE PATH IS NEEDED.
modelDir = "wasb:///user/remoteuser/NYCTaxi/Models/" 

Importera bibliotek

Du måste också importera nödvändiga bibliotek för att konfigurera. Ange Spark-kontext och importera nödvändiga bibliotek med följande kod:

# IMPORT LIBRARIES
import pyspark
from pyspark import SparkConf
from pyspark import SparkContext
from pyspark.sql import SQLContext
import matplotlib
import matplotlib.pyplot as plt
from pyspark.sql import Row
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import *
import atexit
from numpy import array
import numpy as np
import datetime

Förinställd Spark-kontext och PySpark-magi

PySpark-kernels som medföljer Jupyter Notebooks har en förinställd kontext. Därför behöver du inte ange Spark- eller Hive-kontexterna explicit innan du börjar arbeta med det program som du utvecklar. De här kontexterna är tillgängliga som standard. Dessa kontexter är:

  • sc – för Spark
  • sqlContext – för Hive

PySpark-kerneln innehåller vissa fördefinierade "magics", som är särskilda kommandon som du kan anropa med %%. Det finns två sådana kommandon som används i dessa kodexempel.

  • %%local Anger att koden på efterföljande rader ska köras lokalt. Koden måste vara giltig Python-kod.
  • %%sql -o <variable name> Kör en Hive-fråga mot sqlContext. Om parametern -o skickas bevaras resultatet av frågan i den %%lokala Python-kontexten som en Pandas DataFrame.

Mer information om Jupyter Notebook-kärnor och fördefinierade "magics" finns i Kernels available for Jupyter notebooks with HDInsight Spark Linux clusters on HDInsight(Kernlar som är tillgängliga för Jupyter Notebooks med HDInsight Spark Linux-kluster i HDInsight).

Läsa in data

Det första steget i data science-processen är att mata in data som ska analyseras från källor där finns i din datautforsknings- och modelleringsmiljö. Miljön är Spark i den här genomgången. Det här avsnittet innehåller koden för att slutföra en serie uppgifter:

  • mata in det dataexempel som ska modelleras
  • läsa i indatauppsättningen (lagras som en .tsv-fil)
  • formatera och rensa data
  • skapa och cachelagra objekt (RDD:er eller dataramar) i minnet
  • registrera den som en temp-table i SQL-kontext.

Här är koden för datainmatning.

# INGEST DATA

# RECORD START TIME
timestart = datetime.datetime.now()

# IMPORT FILE FROM PUBLIC BLOB
taxi_train_file = sc.textFile(taxi_train_file_loc)

# GET SCHEMA OF THE FILE FROM HEADER
schema_string = taxi_train_file.first()
fields = [StructField(field_name, StringType(), True) for field_name in schema_string.split('\t')]
fields[7].dataType = IntegerType() #Pickup hour
fields[8].dataType = IntegerType() # Pickup week
fields[9].dataType = IntegerType() # Weekday
fields[10].dataType = IntegerType() # Passenger count
fields[11].dataType = FloatType() # Trip time in secs
fields[12].dataType = FloatType() # Trip distance
fields[19].dataType = FloatType() # Fare amount
fields[20].dataType = FloatType() # Surcharge
fields[21].dataType = FloatType() # Mta_tax
fields[22].dataType = FloatType() # Tip amount
fields[23].dataType = FloatType() # Tolls amount
fields[24].dataType = FloatType() # Total amount
fields[25].dataType = IntegerType() # Tipped or not
fields[26].dataType = IntegerType() # Tip class
taxi_schema = StructType(fields)

# PARSE FIELDS AND CONVERT DATA TYPE FOR SOME FIELDS
taxi_header = taxi_train_file.filter(lambda l: "medallion" in l)
taxi_temp = taxi_train_file.subtract(taxi_header).map(lambda k: k.split("\t"))\
        .map(lambda p: (p[0],p[1],p[2],p[3],p[4],p[5],p[6],int(p[7]),int(p[8]),int(p[9]),int(p[10]),
                        float(p[11]),float(p[12]),p[13],p[14],p[15],p[16],p[17],p[18],float(p[19]),
                        float(p[20]),float(p[21]),float(p[22]),float(p[23]),float(p[24]),int(p[25]),int(p[26])))


# CREATE DATA FRAME
taxi_train_df = sqlContext.createDataFrame(taxi_temp, taxi_schema)

# CREATE A CLEANED DATA-FRAME BY DROPPING SOME UN-NECESSARY COLUMNS & FILTERING FOR UNDESIRED VALUES OR OUTLIERS
taxi_df_train_cleaned = taxi_train_df.drop('medallion').drop('hack_license').drop('store_and_fwd_flag').drop('pickup_datetime')\
    .drop('dropoff_datetime').drop('pickup_longitude').drop('pickup_latitude').drop('dropoff_latitude')\
    .drop('dropoff_longitude').drop('tip_class').drop('total_amount').drop('tolls_amount').drop('mta_tax')\
    .drop('direct_distance').drop('surcharge')\
    .filter("passenger_count > 0 and passenger_count < 8 AND payment_type in ('CSH', 'CRD') AND tip_amount >= 0 AND tip_amount < 30 AND fare_amount >= 1 AND fare_amount < 150 AND trip_distance > 0 AND trip_distance < 100 AND trip_time_in_secs > 30 AND trip_time_in_secs < 7200" )


# CACHE DATA-FRAME IN MEMORY & MATERIALIZE DF IN MEMORY
taxi_df_train_cleaned.cache()
taxi_df_train_cleaned.count()

# REGISTER DATA-FRAME AS A TEMP-TABLE IN SQL-CONTEXT
taxi_df_train_cleaned.registerTempTable("taxi_train")

# PRINT HOW MUCH TIME IT TOOK TO RUN THE CELL
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds";

Produktionen:

Tid det tar att köra ovanstående cell: 51,72 sekunder

Utforska data

När data har förts in i Spark är nästa steg i data science-processen att få en djupare förståelse för data genom utforskning och visualisering. I det här avsnittet undersöker vi taxidata med hjälp av SQL-frågor och ritar ut målvariablerna och potentiella funktioner för visuell kontroll. Mer specifikt visar vi frekvensen för antalet passagerare i taxiresor, frekvensen för tipsbelopp och hur tipsen varierar beroende på betalningsbelopp och typ.

Rita ett histogram med frekvenser för antal passagerare i exemplet på taxiresor

Den här koden och efterföljande kodfragment använder SQL-magic för att fråga exemplet och lokal magic för att rita ut data.

  • SQL Magic ( %%sql ) HDInsight PySpark-kerneln stöder enkla infogade HiveQL-frågor mot sqlContext. Argumentet (-o VARIABLE_NAME) bevarar utdata från SQL-frågan som en Pandas DataFrame på Jupyter-servern. Den här inställningen gör utdata tillgängliga i det lokala läget.
  • Magic %%local används för att köra kod lokalt på Jupyter-servern, vilket är huvudnoden för HDInsight-klustret. Normalt använder du %%local magic tillsammans med magic med %%sql parametern -o. Parametern -o beständiga utdata från SQL-frågan lokalt och sedan utlöser %%local magic nästa uppsättning kodfragment som ska köras lokalt mot utdata från SQL-frågor som finns kvar lokalt

Utdata visualiseras automatiskt när du har kört koden.

Den här frågan hämtar resorna efter antalet passagerare.

# PLOT FREQUENCY OF PASSENGER COUNTS IN TAXI TRIPS

# HIVEQL QUERY AGAINST THE sqlContext
%%sql -q -o sqlResults
SELECT passenger_count, COUNT(*) as trip_counts 
FROM taxi_train 
WHERE passenger_count > 0 and passenger_count < 7 
GROUP BY passenger_count 

Den här koden skapar en lokal dataram från frågeutdata och ritar ut data. Magic %%local skapar en lokal dataram, , som kan användas för att rita med sqlResults matplotlib.

Anteckning

PySpark-magin används flera gånger i den här genomgången. Om mängden data är stor bör du prova att skapa en dataram som får plats i det lokala minnet.

#CREATE LOCAL DATA-FRAME AND USE FOR MATPLOTLIB PLOTTING

# RUN THE CODE LOCALLY ON THE JUPYTER SERVER
%%local

# USE THE JUPYTER AUTO-PLOTTING FEATURE TO CREATE INTERACTIVE FIGURES. 
# CLICK ON THE TYPE OF PLOT TO BE GENERATED (E.G. LINE, AREA, BAR ETC.)
sqlResults

Här är koden för att rita resorna efter antal passagerare

# PLOT PASSENGER NUMBER VS. TRIP COUNTS
%%local
import matplotlib.pyplot as plt
%matplotlib inline

x_labels = sqlResults['passenger_count'].values
fig = sqlResults[['trip_counts']].plot(kind='bar', facecolor='lightblue')
fig.set_xticklabels(x_labels)
fig.set_title('Counts of trips by passenger count')
fig.set_xlabel('Passenger count in trips')
fig.set_ylabel('Trip counts')
plt.show()

Produktionen:

Resefrekvens efter antal passagerare

Du kan välja mellan flera olika typer av visualiseringar (Tabell, Cirkel, Linje, Område eller Stapel) med hjälp av menyknapparna Typ i anteckningsboken. Stapeldiagram visas här.

Rita ett histogram med tipsbelopp och hur tipsmängden varierar beroende på antal passagerare och biljettbelopp.

Använd en SQL-fråga för att exempeldata.

# PLOT HISTOGRAM OF TIP AMOUNTS AND VARIATION BY PASSENGER COUNT AND PAYMENT TYPE

# HIVEQL QUERY AGAINST THE sqlContext
%%sql -q -o sqlResults
SELECT fare_amount, passenger_count, tip_amount, tipped 
FROM taxi_train 
WHERE passenger_count > 0 
AND passenger_count < 7 
AND fare_amount > 0 
AND fare_amount < 200 
AND payment_type in ('CSH', 'CRD') 
AND tip_amount > 0 
AND tip_amount < 25

Den här kodcellen använder SQL-frågan för att skapa tre diagram över data.

# RUN THE CODE LOCALLY ON THE JUPYTER SERVER
%%local

# HISTOGRAM OF TIP AMOUNTS AND PASSENGER COUNT
ax1 = sqlResults[['tip_amount']].plot(kind='hist', bins=25, facecolor='lightblue')
ax1.set_title('Tip amount distribution')
ax1.set_xlabel('Tip Amount ($)')
ax1.set_ylabel('Counts')
plt.suptitle('')
plt.show()

# TIP BY PASSENGER COUNT
ax2 = sqlResults.boxplot(column=['tip_amount'], by=['passenger_count'])
ax2.set_title('Tip amount by Passenger count')
ax2.set_xlabel('Passenger count')
ax2.set_ylabel('Tip Amount ($)')
plt.suptitle('')
plt.show()

# TIP AMOUNT BY FARE AMOUNT, POINTS ARE SCALED BY PASSENGER COUNT
ax = sqlResults.plot(kind='scatter', x= 'fare_amount', y = 'tip_amount', c='blue', alpha = 0.10, s=5*(sqlResults.passenger_count))
ax.set_title('Tip amount by Fare amount')
ax.set_xlabel('Fare Amount ($)')
ax.set_ylabel('Tip Amount ($)')
plt.axis([-2, 100, -2, 20])
plt.show()

Produktionen:

Tipsbeloppsfördelning

Tipsbelopp efter antal passagerare

Tipsbelopp efter prisbelopp

Förbereda data

Det här avsnittet beskriver och tillhandahåller koden för procedurer som används för att förbereda data för användning i ML-modellering. Den visar hur du utför följande uppgifter:

  • Skapa en ny funktion genom att använda flera timmar i trafiktids bucketar
  • Indexera och koda kategoriska funktioner
  • Skapa märkta punktobjekt för indata i ML-funktioner
  • Skapa en slumpmässig delexempel av data och dela upp dem i tränings- och testuppsättningar
  • Funktionsskalning
  • Cacheobjekt i minnet

Skapa en ny funktion genom att använda flera timmar i trafiktids bucketar

Den här koden visar hur du skapar en ny funktion genom att lagra timmar i trafiktids bucketar och sedan hur du cachelagrar den resulterande dataramen i minnet. Om elastiska distribuerade datauppsättningar (RDD:er) och dataramar används upprepade gånger leder cachelagring till förbättrade körningstider. Därför cachelagrar vi RDD:er och dataramar i flera steg i genomgången.

# CREATE FOUR BUCKETS FOR TRAFFIC TIMES
sqlStatement = """
    SELECT *,
    CASE
     WHEN (pickup_hour <= 6 OR pickup_hour >= 20) THEN "Night" 
     WHEN (pickup_hour >= 7 AND pickup_hour <= 10) THEN "AMRush" 
     WHEN (pickup_hour >= 11 AND pickup_hour <= 15) THEN "Afternoon"
     WHEN (pickup_hour >= 16 AND pickup_hour <= 19) THEN "PMRush"
    END as TrafficTimeBins
    FROM taxi_train 
"""
taxi_df_train_with_newFeatures = sqlContext.sql(sqlStatement)

# CACHE DATA-FRAME IN MEMORY & MATERIALIZE DF IN MEMORY
# THE .COUNT() GOES THROUGH THE ENTIRE DATA-FRAME,
# MATERIALIZES IT IN MEMORY, AND GIVES THE COUNT OF ROWS.
taxi_df_train_with_newFeatures.cache()
taxi_df_train_with_newFeatures.count()

Produktionen:

126050

Indexera och koda kategoriska funktioner för indata till modelleringsfunktioner

Det här avsnittet visar hur du indexerar eller kodar kategoriska funktioner för indata till modelleringsfunktionerna. Modellerings- och förutsägelsefunktionerna i MLlib kräver att funktioner med kategoriska indata indexeras eller kodas innan de används. Beroende på modell måste du indexera eller koda dem på olika sätt:

  • Trädbaserad modellering kräver att kategorier kodas som numeriska värden (till exempel kan en funktion med tre kategorier kodas med 0, 1, 2). Den här algoritmen tillhandahålls av Funktionen StringIndexer i MLlib. Den här funktionen kodar en strängkolumn med etiketter till en kolumn med etikettindex som sorteras efter etikettfrekvenser. Även om de indexeras med numeriska värden för indata- och datahantering kan de trädbaserade algoritmerna anges för att behandla dem korrekt som kategorier.
  • Modeller för logistisk och linjär regression kräver one-hot-kodning, där till exempel en funktion med tre kategorier kan utökas till tre egenskapskolumner, där var och en innehåller 0 eller 1 beroende på observationens kategori. MLlib tillhandahåller en OneHotEncoder-funktion för one-hot-kodning. Den här kodaren mappar en kolumn med etikettindex till en kolumn med binära vektorer, med högst ett enda värde. Den här kodningen gör att algoritmer som förväntar sig numeriska värdefunktioner, till exempel logistisk regression, kan tillämpas på kategoriska funktioner.

Här är koden för att indexera och koda kategoriska funktioner:

# INDEX AND ENCODE CATEGORICAL FEATURES

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD PYSPARK LIBRARIES    
from pyspark.ml.feature import OneHotEncoder, StringIndexer, VectorAssembler, VectorIndexer

# INDEX AND ENCODE VENDOR_ID
stringIndexer = StringIndexer(inputCol="vendor_id", outputCol="vendorIndex")
model = stringIndexer.fit(taxi_df_train_with_newFeatures) # Input data-frame is the cleaned one from above
indexed = model.transform(taxi_df_train_with_newFeatures)
encoder = OneHotEncoder(dropLast=False, inputCol="vendorIndex", outputCol="vendorVec")
encoded1 = encoder.transform(indexed)

# INDEX AND ENCODE RATE_CODE
stringIndexer = StringIndexer(inputCol="rate_code", outputCol="rateIndex")
model = stringIndexer.fit(encoded1)
indexed = model.transform(encoded1)
encoder = OneHotEncoder(dropLast=False, inputCol="rateIndex", outputCol="rateVec")
encoded2 = encoder.transform(indexed)

# INDEX AND ENCODE PAYMENT_TYPE
stringIndexer = StringIndexer(inputCol="payment_type", outputCol="paymentIndex")
model = stringIndexer.fit(encoded2)
indexed = model.transform(encoded2)
encoder = OneHotEncoder(dropLast=False, inputCol="paymentIndex", outputCol="paymentVec")
encoded3 = encoder.transform(indexed)

# INDEX AND TRAFFIC TIME BINS
stringIndexer = StringIndexer(inputCol="TrafficTimeBins", outputCol="TrafficTimeBinsIndex")
model = stringIndexer.fit(encoded3)
indexed = model.transform(encoded3)
encoder = OneHotEncoder(dropLast=False, inputCol="TrafficTimeBinsIndex", outputCol="TrafficTimeBinsVec")
encodedFinal = encoder.transform(indexed)

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

Produktionen:

Tid det tar att köra ovanstående cell: 1,28 sekunder

Skapa märkta punktobjekt för indata i ML-funktioner

Det här avsnittet innehåller kod som visar hur du indexerar kategoriska textdata som en etiketterad punktdatatyp och kodar dem så att de kan användas för att träna och testa MLlib Logistic Regression och andra klassificeringsmodeller. Märkta punktobjekt är RESILIENT Distributed Datasets (RDD) formaterade på ett sätt som behövs som indata av de flesta ML-algoritmer i MLlib. En märkt punkt är en lokal vektor, antingen kompakt eller gles, associerad med en etikett/ett svar.

Det här avsnittet innehåller kod som visar hur du indexerar kategoriska textdata som en märkt punktdatatyp och kodar dem så att de kan användas för att träna och testa MLlib Logistic Regression och andra klassificeringsmodeller. Märkta punktobjekt är RESILIENT Distributed Datasets (RDD) som består av en etikett (mål-/svarsvariabel) och funktionsvektor. Det här formatet krävs som indata av många ML-algoritmer i MLlib.

Här är koden för att indexera och koda textfunktioner för binär klassificering.

# FUNCTIONS FOR BINARY CLASSIFICATION

# LOAD LIBRARIES
from pyspark.mllib.regression import LabeledPoint
from numpy import array

# INDEXING CATEGORICAL TEXT FEATURES FOR INPUT INTO TREE-BASED MODELS
def parseRowIndexingBinary(line):
    features = np.array([line.paymentIndex, line.vendorIndex, line.rateIndex, line.TrafficTimeBinsIndex,
                         line.pickup_hour, line.weekday, line.passenger_count, line.trip_time_in_secs, 
                         line.trip_distance, line.fare_amount])
    labPt = LabeledPoint(line.tipped, features)
    return  labPt

# ONE-HOT ENCODING OF CATEGORICAL TEXT FEATURES FOR INPUT INTO LOGISTIC REGRESSION MODELS
def parseRowOneHotBinary(line):
    features = np.concatenate((np.array([line.pickup_hour, line.weekday, line.passenger_count,
                                        line.trip_time_in_secs, line.trip_distance, line.fare_amount]), 
                                        line.vendorVec.toArray(), line.rateVec.toArray(), 
                                        line.paymentVec.toArray(), line.TrafficTimeBinsVec.toArray()), axis=0)
    labPt = LabeledPoint(line.tipped, features)
    return  labPt

Här är koden för att koda och indexera kategoriska textfunktioner för linjär regressionsanalys.

# FUNCTIONS FOR REGRESSION WITH TIP AMOUNT AS TARGET VARIABLE

# ONE-HOT ENCODING OF CATEGORICAL TEXT FEATURES FOR INPUT INTO TREE-BASED MODELS
def parseRowIndexingRegression(line):
    features = np.array([line.paymentIndex, line.vendorIndex, line.rateIndex, line.TrafficTimeBinsIndex, 
                         line.pickup_hour, line.weekday, line.passenger_count, line.trip_time_in_secs, 
                         line.trip_distance, line.fare_amount])

    labPt = LabeledPoint(line.tip_amount, features)
    return  labPt

# INDEXING CATEGORICAL TEXT FEATURES FOR INPUT INTO LINEAR REGRESSION MODELS
def parseRowOneHotRegression(line):
    features = np.concatenate((np.array([line.pickup_hour, line.weekday, line.passenger_count,
                                        line.trip_time_in_secs, line.trip_distance, line.fare_amount]), 
                                        line.vendorVec.toArray(), line.rateVec.toArray(), 
                                        line.paymentVec.toArray(), line.TrafficTimeBinsVec.toArray()), axis=0)
    labPt = LabeledPoint(line.tip_amount, features)
    return  labPt

Skapa en slumpmässig delexempel av data och dela upp dem i tränings- och testuppsättningar

Den här koden skapar en slumpmässig sampling av data (25 % används här). Även om det inte krävs för det här exemplet på grund av datauppsättningens storlek, visar vi hur du kan ta ett exempel här så att du vet hur du använder den för ditt eget problem när det behövs. När exemplen är stora kan sampling spara betydande tid under träningen av modeller. Därefter delade vi upp exemplet i en träningsdel (75 % här) och en testdel (25 % här) som ska användas i klassificerings- och regressionsmodellering.

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD PYSPARK LIBRARIES
from pyspark.sql.functions import rand

# SPECIFY SAMPLING AND SPLITTING FRACTIONS
samplingFraction = 0.25;
trainingFraction = 0.75; testingFraction = (1-trainingFraction);
seed = 1234;
encodedFinalSampled = encodedFinal.sample(False, samplingFraction, seed=seed)

# SPLIT SAMPLED DATA-FRAME INTO TRAIN/TEST
# INCLUDE RAND COLUMN FOR CREATING CROSS-VALIDATION FOLDS (FOR USE LATER IN AN ADVANCED TOPIC)
dfTmpRand = encodedFinalSampled.select("*", rand(0).alias("rand"));
trainData, testData = dfTmpRand.randomSplit([trainingFraction, testingFraction], seed=seed);

# FOR BINARY CLASSIFICATION TRAINING AND TESTING
indexedTRAINbinary = trainData.map(parseRowIndexingBinary)
indexedTESTbinary = testData.map(parseRowIndexingBinary)
oneHotTRAINbinary = trainData.map(parseRowOneHotBinary)
oneHotTESTbinary = testData.map(parseRowOneHotBinary)

# FOR REGRESSION TRAINING AND TESTING
indexedTRAINreg = trainData.map(parseRowIndexingRegression)
indexedTESTreg = testData.map(parseRowIndexingRegression)
oneHotTRAINreg = trainData.map(parseRowOneHotRegression)
oneHotTESTreg = testData.map(parseRowOneHotRegression)

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

Produktionen:

Tid det tar att köra ovanstående cell: 0,24 sekunder

Funktionsskalning

Funktionsskalning, som även kallas för data normalisering, garanterar att funktioner med ofta använda värden inte får en överdriven viktning i målfunktionen. Koden för funktionsskalning använder StandardScaler för att skala funktionerna till enhetsavvikelse. Den tillhandahålls av MLlib för användning i linjär regression med Stochastic Gradient Descent (SGD), en populär algoritm för träning av en mängd andra maskininlärningsmodeller, till exempel regulariserade regressioner eller stödvektormaskiner (SVM).

Anteckning

Vi har upptäckt att LinearRegressionWithSGD-algoritmen är känslig för funktionsskalning.

Här är koden för att skala variabler för användning med den regulariserade linjära SGD-algoritmen.

# FEATURE SCALING

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD PYSPARK LIBRARIES
from pyspark.mllib.regression import LabeledPoint
from pyspark.mllib.linalg import Vectors
from pyspark.mllib.feature import StandardScaler, StandardScalerModel
from pyspark.mllib.util import MLUtils

# SCALE VARIABLES FOR REGULARIZED LINEAR SGD ALGORITHM
label = oneHotTRAINreg.map(lambda x: x.label)
features = oneHotTRAINreg.map(lambda x: x.features)
scaler = StandardScaler(withMean=False, withStd=True).fit(features)
dataTMP = label.zip(scaler.transform(features.map(lambda x: Vectors.dense(x.toArray()))))
oneHotTRAINregScaled = dataTMP.map(lambda x: LabeledPoint(x[0], x[1]))

label = oneHotTESTreg.map(lambda x: x.label)
features = oneHotTESTreg.map(lambda x: x.features)
scaler = StandardScaler(withMean=False, withStd=True).fit(features)
dataTMP = label.zip(scaler.transform(features.map(lambda x: Vectors.dense(x.toArray()))))
oneHotTESTregScaled = dataTMP.map(lambda x: LabeledPoint(x[0], x[1]))

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

Produktionen:

Tid det tar att köra ovanstående cell: 13,17 sekunder

Cacheobjekt i minnet

Den tid det tar att träna och testa ML-algoritmer kan minskas genom att cachelagra de indataramobjekt som används för klassificering, regression och skalade funktioner.

# RECORD START TIME
timestart = datetime.datetime.now()

# FOR BINARY CLASSIFICATION TRAINING AND TESTING
indexedTRAINbinary.cache()
indexedTESTbinary.cache()
oneHotTRAINbinary.cache()
oneHotTESTbinary.cache()

# FOR REGRESSION TRAINING AND TESTING
indexedTRAINreg.cache()
indexedTESTreg.cache()
oneHotTRAINreg.cache()
oneHotTESTreg.cache()

# SCALED FEATURES
oneHotTRAINregScaled.cache()
oneHotTESTregScaled.cache()

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

Produktionen:

Tid det tar att köra ovanstående cell: 0,15 sekunder

Träna en binär klassificeringsmodell

Det här avsnittet visar hur du använder tre modeller för den binära klassificeringsuppgiften för att förutsäga om ett tips betalas för en taxiresa. De modeller som visas är:

  • Regulariserad logistisk regression
  • Slumpmässig skogsmodell
  • Gradient Boost Trees

Varje kodavsnitt för modellskapande delas upp i steg:

  1. Modellträningsdata med en parameteruppsättning
  2. Modellutvärdering på en testdatauppsättning med mått
  3. Spara modellen i blob för framtida användning

Klassificering med logistisk regression

Koden i det här avsnittet visar hur du tränar, utvärderar och sparar en logistisk regressionsmodell med LBFGS som förutsäger om ett tips betalas för en resa i new york-taxiresan och prisdatamängden.

Träna logistisk regressionsmodell med CV och genomsökning av hyperparametrar

# LOGISTIC REGRESSION CLASSIFICATION WITH CV AND HYPERPARAMETER SWEEPING

# GET ACCURACY FOR HYPERPARAMETERS BASED ON CROSS-VALIDATION IN TRAINING DATA-SET

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD LIBRARIES
from pyspark.mllib.classification import LogisticRegressionWithLBFGS 
from sklearn.metrics import roc_curve,auc
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics


# CREATE MODEL WITH ONE SET OF PARAMETERS
logitModel = LogisticRegressionWithLBFGS.train(oneHotTRAINbinary, iterations=20, initialWeights=None, 
                                               regParam=0.01, regType='l2', intercept=True, corrections=10, 
                                               tolerance=0.0001, validateData=True, numClasses=2)

# PRINT COEFFICIENTS AND INTERCEPT OF THE MODEL
# NOTE: There are 20 coefficient terms for the 10 features, 
#       and the different categories for features: vendorVec (2), rateVec, paymentVec (6), TrafficTimeBinsVec (4)
print("Coefficients: " + str(logitModel.weights))
print("Intercept: " + str(logitModel.intercept))

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

Produktionen:

Koefficienter: [0,0082065285375, -0,0223675576104, -0.0183812028036, -3.48124578069e-05, -0.00247646947233, -0.00165897881503, 0.0675394837328, -0.111823113101, -0.324609912762, -0.204549780032, -1.36499216354, 0.591088507921, -0.664263411392, -1.00439726852, 3.46567827545, -3.. 51025855172, -0.0471341112232, -0.04352183294, 0.000243375810385, 0.054518719222]

Skärningspunkt: -0.0111216486893

Tid det tar att köra ovanstående cell: 14,43 sekunder

Utvärdera den binära klassificeringsmodellen med standardmått

#EVALUATE LOGISTIC REGRESSION MODEL WITH LBFGS

# RECORD START TIME
timestart = datetime.datetime.now()

# PREDICT ON TEST DATA WITH MODEL
predictionAndLabels = oneHotTESTbinary.map(lambda lp: (float(logitModel.predict(lp.features)), lp.label))

# INSTANTIATE METRICS OBJECT
metrics = BinaryClassificationMetrics(predictionAndLabels)

# AREA UNDER PRECISION-RECALL CURVE
print("Area under PR = %s" % metrics.areaUnderPR)

# AREA UNDER ROC CURVE
print("Area under ROC = %s" % metrics.areaUnderROC)
metrics = MulticlassMetrics(predictionAndLabels)

# OVERALL STATISTICS
precision = metrics.precision()
recall = metrics.recall()
f1Score = metrics.fMeasure()
print("Summary Stats")
print("Precision = %s" % precision)
print("Recall = %s" % recall)
print("F1 Score = %s" % f1Score)


## SAVE MODEL WITH DATE-STAMP
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
logisticregressionfilename = "LogisticRegressionWithLBFGS_" + datestamp;
dirfilename = modelDir + logisticregressionfilename;
logitModel.save(sc, dirfilename);

# OUTPUT PROBABILITIES AND REGISTER TEMP TABLE
logitModel.clearThreshold(); # This clears threshold for classification (0.5) and outputs probabilities
predictionAndLabelsDF = predictionAndLabels.toDF()
predictionAndLabelsDF.registerTempTable("tmp_results");

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds";

Produktionen:

Område under PR = 0,985297691373

Område under ROC = 0,983714670256

Sammanfattningsstatistik

Precision = 0,984304060189

Recall = 0.984304060189

F1-poäng = 0,984304060189

Tid det tar att köra ovanstående cell: 57,61 sekunder

Rita ROC-kurvan.

PredictionAndLabelsDF registreras som en tabell, tmp_results, i föregående cell. tmp_results kan användas för att köra frågor och mata ut resultat till sqlResults-dataramen för plottning. Här är koden.

# QUERY RESULTS                              
%%sql -q -o sqlResults
SELECT * from tmp_results

Här är koden för att göra förutsägelser och rita ROC-kurvan.

# MAKE PREDICTIONS AND PLOT ROC-CURVE

# RUN THE CODE LOCALLY ON THE JUPYTER SERVER AND IMPORT LIBRARIES
%%local
%matplotlib inline
from sklearn.metrics import roc_curve,auc

# MAKE PREDICTIONS
predictions_pddf = test_predictions.rename(columns={'_1': 'probability', '_2': 'label'})
prob = predictions_pddf["probability"] 
fpr, tpr, thresholds = roc_curve(predictions_pddf['label'], prob, pos_label=1);
roc_auc = auc(fpr, tpr)

# PLOT ROC CURVE
plt.figure(figsize=(5,5))
plt.plot(fpr, tpr, label='ROC curve (area = %0.2f)' % roc_auc)
plt.plot([0, 1], [0, 1], 'k--')
plt.xlim([0.0, 1.0])
plt.ylim([0.0, 1.05])
plt.xlabel('False Positive Rate')
plt.ylabel('True Positive Rate')
plt.title('ROC Curve')
plt.legend(loc="lower right")
plt.show()

Produktionen:

ROC-curve.png

Klassificering av slumpmässig skog

Koden i det här avsnittet visar hur du tränar, utvärderar och sparar en slumpmässig skogsmodell som förutsäger om ett tips betalas för en resa i datamängden taxi och fare i New York.

#PREDICT WHETHER A TIP IS PAID OR NOT USING RANDOM FOREST

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD PYSPARK LIBRARIES
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import BinaryClassificationMetrics
from pyspark.mllib.evaluation import MulticlassMetrics

# SPECIFY NUMBER OF CATEGORIES FOR CATEGORICAL FEATURES. FEATURE #0 HAS 2 CATEGORIES, FEATURE #2 HAS 2 CATEGORIES, AND SO ON
categoricalFeaturesInfo={0:2, 1:2, 2:6, 3:4}

# TRAIN RANDOMFOREST MODEL
rfModel = RandomForest.trainClassifier(indexedTRAINbinary, numClasses=2, 
                                       categoricalFeaturesInfo=categoricalFeaturesInfo,
                                       numTrees=25, featureSubsetStrategy="auto",
                                       impurity='gini', maxDepth=5, maxBins=32)
## UN-COMMENT IF YOU WANT TO PRINT TREES
#print('Learned classification forest model:')
#print(rfModel.toDebugString())

# PREDICT ON TEST DATA AND EVALUATE
predictions = rfModel.predict(indexedTESTbinary.map(lambda x: x.features))
predictionAndLabels = indexedTESTbinary.map(lambda lp: lp.label).zip(predictions)

# AREA UNDER ROC CURVE
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

# PERSIST MODEL IN BLOB
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
rfclassificationfilename = "RandomForestClassification_" + datestamp;
dirfilename = modelDir + rfclassificationfilename;

rfModel.save(sc, dirfilename);

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

Produktionen:

Område under ROC = 0,985297691373

Tid det tar att köra ovanstående cell: 31,09 sekunder

Klassificering av gradienter som ökar träd

Koden i det här avsnittet visar hur du tränar, utvärderar och sparar en modell för gradientförstärkarträd som förutsäger om ett tips betalas för en resa i datamängden taxi och fare i NEWC.

#PREDICT WHETHER A TIP IS PAID OR NOT USING GRADIENT BOOSTING TREES

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD PYSPARK LIBRARIES
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel

# SPECIFY NUMBER OF CATEGORIES FOR CATEGORICAL FEATURES. FEATURE #0 HAS 2 CATEGORIES, FEATURE #2 HAS 2 CATEGORIES, AND SO ON
categoricalFeaturesInfo={0:2, 1:2, 2:6, 3:4}

gbtModel = GradientBoostedTrees.trainClassifier(indexedTRAINbinary, categoricalFeaturesInfo=categoricalFeaturesInfo, numIterations=5)
## UNCOMMENT IF YOU WANT TO PRINT TREE DETAILS
#print('Learned classification GBT model:')
#print(bgtModel.toDebugString())

# PREDICT ON TEST DATA AND EVALUATE
predictions = gbtModel.predict(indexedTESTbinary.map(lambda x: x.features))
predictionAndLabels = indexedTESTbinary.map(lambda lp: lp.label).zip(predictions)

# AREA UNDER ROC CURVE
metrics = BinaryClassificationMetrics(predictionAndLabels)
print("Area under ROC = %s" % metrics.areaUnderROC)

# PERSIST MODEL IN A BLOB
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
btclassificationfilename = "GradientBoostingTreeClassification_" + datestamp;
dirfilename = modelDir + btclassificationfilename;

gbtModel.save(sc, dirfilename)

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

Produktionen:

Område under ROC = 0,985297691373

Tid det tar att köra ovanstående cell: 19,76 sekunder

Träna en regressionsmodell

Det här avsnittet visar hur du använder tre modeller för regressionsuppgiften att förutsäga mängden tips som betalas för en taxiresa baserat på andra tipsfunktioner. De modeller som visas är:

  • Regulariserad linjär regression
  • Slumpmässig skog
  • Gradient Boost Trees

Dessa modeller beskrevs i introduktionen. Varje kodavsnitt för modellskapande delas upp i steg:

  1. Modellträningsdata med en parameteruppsättning
  2. Modellutvärdering på en testdatauppsättning med mått
  3. Spara modellen i blob för framtida användning

Linjär regression med SGD

Koden i det här avsnittet visar hur du använder skalade funktioner för att träna en linjär regression som använder SGD (Stochastic Gradient Descent) för optimering och hur du poängar, utvärderar och sparar modellen i Azure Blob Storage (WASB).

Tips

I vår erfarenhet kan det finnas problem med konvergensen i LinearRegressionWithSGD-modeller, och parametrar måste ändras/optimeras noggrant för att få en giltig modell. Skalning av variabler bidrar avsevärt till konvergens.

#PREDICT TIP AMOUNTS USING LINEAR REGRESSION WITH SGD

# RECORD START TIME
timestart = datetime.datetime.now()

# LOAD LIBRARIES
from pyspark.mllib.regression import LabeledPoint, LinearRegressionWithSGD, LinearRegressionModel
from pyspark.mllib.evaluation import RegressionMetrics
from scipy import stats

# USE SCALED FEATURES TO TRAIN MODEL
linearModel = LinearRegressionWithSGD.train(oneHotTRAINregScaled, iterations=100, step = 0.1, regType='l2', regParam=0.1, intercept = True)

# PRINT COEFFICIENTS AND INTERCEPT OF THE MODEL
# NOTE: There are 20 coefficient terms for the 10 features, 
#       and the different categories for features: vendorVec (2), rateVec, paymentVec (6), TrafficTimeBinsVec (4)
print("Coefficients: " + str(linearModel.weights))
print("Intercept: " + str(linearModel.intercept))

# SCORE ON SCALED TEST DATA-SET & EVALUATE
predictionAndLabels = oneHotTESTregScaled.map(lambda lp: (float(linearModel.predict(lp.features)), lp.label))
testMetrics = RegressionMetrics(predictionAndLabels)

# PRINT TEST METRICS
print("RMSE = %s" % testMetrics.rootMeanSquaredError)
print("R-sqr = %s" % testMetrics.r2)

# SAVE MODEL WITH DATE-STAMP IN THE DEFAULT BLOB FOR THE CLUSTER
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
linearregressionfilename = "LinearRegressionWithSGD_" + datestamp;
dirfilename = modelDir + linearregressionfilename;

linearModel.save(sc, dirfilename)

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

Produktionen:

Koefficienter: [0,00457675809917, -0,0226314167349, -0,0191910355236, 0,246793409578, 0,312047890459, 0.359634405999, 0.00928692253981, -0.000987181489428, -0.088306617845, 0.0569376211553, 0.115519551711, 0.149250164995, -0.00990211159703, -0.00637410344522, 0.545083566179, -00.536756072402, 0.0105762393099, -0.0130117577055, 0.012930473772, -0.00171065945959]

Skärningspunkt: 0,853872718283

RMSE = 1.24190115863

R-sqr = 0,608017146081

Tid det tar att köra ovanstående cell: 58,42 sekunder

Regression av slumpmässig skog

Koden i det här avsnittet visar hur du tränar, utvärderar och sparar en slumpmässig skogs regression som förutsäger tipsmängden för taxiresa i New York.

#PREDICT TIP AMOUNTS USING RANDOM FOREST

# RECORD START TIME
timestart= datetime.datetime.now()

# LOAD PYSPARK LIBRARIES
from pyspark.mllib.tree import RandomForest, RandomForestModel
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import RegressionMetrics


## TRAIN MODEL
categoricalFeaturesInfo={0:2, 1:2, 2:6, 3:4}
rfModel = RandomForest.trainRegressor(indexedTRAINreg, categoricalFeaturesInfo=categoricalFeaturesInfo,
                                    numTrees=25, featureSubsetStrategy="auto",
                                    impurity='variance', maxDepth=10, maxBins=32)
## UN-COMMENT IF YOU WANT TO PRING TREES
#print('Learned classification forest model:')
#print(rfModel.toDebugString())

## PREDICT AND EVALUATE ON TEST DATA-SET
predictions = rfModel.predict(indexedTESTreg.map(lambda x: x.features))
predictionAndLabels = oneHotTESTreg.map(lambda lp: lp.label).zip(predictions)

# TEST METRICS
testMetrics = RegressionMetrics(predictionAndLabels)
print("RMSE = %s" % testMetrics.rootMeanSquaredError)
print("R-sqr = %s" % testMetrics.r2)

# SAVE MODEL IN BLOB
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
rfregressionfilename = "RandomForestRegression_" + datestamp;
dirfilename = modelDir + rfregressionfilename;

rfModel.save(sc, dirfilename);

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

Produktionen:

RMSE = 0,891209218139

R-sqr = 0,759661334921

Tid det tar att köra ovanstående cell: 49,21 sekunder

Regression av gradienter som ökar träd

Koden i det här avsnittet visar hur du tränar, utvärderar och sparar en modell för gradientförstärkarträd som förutsäger tipsmängden för taxiresa i New York.

Träna och utvärdera

#PREDICT TIP AMOUNTS USING GRADIENT BOOSTING TREES

# RECORD START TIME
timestart= datetime.datetime.now()

# LOAD PYSPARK LIBRARIES
from pyspark.mllib.tree import GradientBoostedTrees, GradientBoostedTreesModel
from pyspark.mllib.util import MLUtils

## TRAIN MODEL
categoricalFeaturesInfo={0:2, 1:2, 2:6, 3:4}
gbtModel = GradientBoostedTrees.trainRegressor(indexedTRAINreg, categoricalFeaturesInfo=categoricalFeaturesInfo, 
                                                numIterations=10, maxBins=32, maxDepth = 4, learningRate=0.1)

## EVALUATE A TEST DATA-SET
predictions = gbtModel.predict(indexedTESTreg.map(lambda x: x.features))
predictionAndLabels = indexedTESTreg.map(lambda lp: lp.label).zip(predictions)

# TEST METRICS
testMetrics = RegressionMetrics(predictionAndLabels)
print("RMSE = %s" % testMetrics.rootMeanSquaredError)
print("R-sqr = %s" % testMetrics.r2)

# SAVE MODEL IN BLOB
datestamp = unicode(datetime.datetime.now()).replace(' ','').replace(':','_');
btregressionfilename = "GradientBoostingTreeRegression_" + datestamp;
dirfilename = modelDir + btregressionfilename;
gbtModel.save(sc, dirfilename)

# CONVERT RESULTS TO DF AND REGISTER TEMP TABLE
test_predictions = sqlContext.createDataFrame(predictionAndLabels)
test_predictions.registerTempTable("tmp_results");

# PRINT ELAPSED TIME
timeend = datetime.datetime.now()
timedelta = round((timeend-timestart).total_seconds(), 2) 
print "Time taken to execute above cell: " + str(timedelta) + " seconds"; 

Produktionen:

RMSE = 0,908473148639

R-sqr = 0,753835096681

Tid det tar att köra ovanstående cell: 34,52 sekunder

Tomt

tmp_results registreras som en Hive-tabell i föregående cell. Resultatet från tabellen matas ut till sqlResults-dataramen för ritytan. Här är koden

# PLOT SCATTER-PLOT BETWEEN ACTUAL AND PREDICTED TIP VALUES

# SELECT RESULTS
%%sql -q -o sqlResults
SELECT * from tmp_results

Här är koden för att rita data med Jupyter-servern.

# RUN THE CODE LOCALLY ON THE JUPYTER SERVER AND IMPORT LIBRARIES
%%local
%matplotlib inline
import numpy as np

# PLOT 
ax = test_predictions_pddf.plot(kind='scatter', figsize = (6,6), x='_1', y='_2', color='blue', alpha = 0.25, label='Actual vs. predicted');
fit = np.polyfit(test_predictions_pddf['_1'], test_predictions_pddf['_2'], deg=1)
ax.set_title('Actual vs. Predicted Tip Amounts ($)')
ax.set_xlabel("Actual")
ax.set_ylabel("Predicted")
ax.plot(test_predictions_pddf['_1'], fit[0] * test_predictions_pddf['_1'] + fit[1], color='magenta')
plt.axis([-1, 20, -1, 20])
plt.show(ax)

Produktionen:

Actual-vs-predicted-tip-amounts

Rensa objekt från minnet

Använd unpersist() för att ta bort objekt som cachelagrats i minnet.

# REMOVE ORIGINAL DFs
taxi_df_train_cleaned.unpersist()
taxi_df_train_with_newFeatures.unpersist()

# FOR BINARY CLASSIFICATION TRAINING AND TESTING
indexedTRAINbinary.unpersist()
indexedTESTbinary.unpersist()
oneHotTRAINbinary.unpersist()
oneHotTESTbinary.unpersist()

# FOR REGRESSION TRAINING AND TESTING
indexedTRAINreg.unpersist()
indexedTESTreg.unpersist()
oneHotTRAINreg.unpersist()
oneHotTESTreg.unpersist()

# SCALED FEATURES
oneHotTRAINregScaled.unpersist()
oneHotTESTregScaled.unpersist()

Spara modellerna

Om du vill använda och bedöma en oberoende datauppsättning som beskrivs i avsnittet Poäng och utvärdera Spark-skapade maskininlärningsmodeller måste du kopiera och klistra in dessa filnamn som innehåller de sparade modeller som skapats här i Consumption Jupyter-anteckningsboken. Här är koden för att skriva ut sökvägarna till de modellfiler som du behöver där.

# MODEL FILE LOCATIONS FOR CONSUMPTION
print "logisticRegFileLoc = modelDir + \"" + logisticregressionfilename + "\"";
print "linearRegFileLoc = modelDir + \"" + linearregressionfilename + "\"";
print "randomForestClassificationFileLoc = modelDir + \"" + rfclassificationfilename + "\"";
print "randomForestRegFileLoc = modelDir + \"" + rfregressionfilename + "\"";
print "BoostedTreeClassificationFileLoc = modelDir + \"" + btclassificationfilename + "\"";
print "BoostedTreeRegressionFileLoc = modelDir + \"" + btregressionfilename + "\"";

Produktionen

logisticRegFileLoc = modelDir + "LogisticRegressionWithLBFGS_2016-05-0317_03_23.516568"

linearRegFileLoc = modelDir + "LinearRegressionWithSGD_2016-05-0317_05_21.577773"

randomForestClassificationFileLoc = modelDir + "RandomForestClassification_2016-05-0317_04_11.950206"

randomForestRegFileLoc = modelDir + "RandomForestRegression_2016-05-0317_06_08.723736"

BoostedTreeClassificationFileLoc = modelDir + "GradientBoostingTreeClassification_2016-05-0317_04_36.346583"

BoostedTreeRegressionFileLoc = modelDir + "GradientBoostingTreeRegression_2016-05-0317_06_51.737282"

Nästa steg

Nu när du har skapat regressions- och klassificeringsmodeller med Spark MlLib är du redo att lära dig att bedöma och utvärdera dessa modeller. Notebook-programmet för avancerad datagranskning och modellering fördjupar sig i bland annat korsvalidering, genomsökning av hyperparametrar och modellutvärdering.

Modellförbrukning: Information om hur du poängdömer och utvärderar klassificerings- och regressionsmodellerna som skapats i det här avsnittet finns i Poängklassificera och utvärdera Spark-skapade maskininlärningsmodeller.

Avsökning av korsvalidering och hyperparametrar: Se Avancerad datagranskning och modellering med Spark om hur modeller kan tränas med korsvalidering och genomsökning av hyperparametrar