Een machine learning-pijplijn in Apache Spark maken

De schaalbare machine learning-bibliotheek (MLlib) van Apache Spark biedt modelleringsmogelijkheden in een gedistribueerde omgeving. Het Spark-pakket spark.ml is een set API's op hoog niveau die zijn gebouwd op DataFrames. Met deze API's kunt u praktische machine learning-pijplijnen maken en afstemmen. Spark-machine learning verwijst naar deze op MLlib DataFrame gebaseerde API, niet naar de oudere RDD-pijplijn-API.

Een machine learning-pijplijn (ML) is een volledige werkstroom waarin meerdere machine learning-algoritmen worden gecombineerd. Er kunnen veel stappen nodig zijn om gegevens te verwerken en ervan te leren. Hiervoor is een reeks algoritmen vereist. Pijplijnen definiëren de fasen en volgorde van een machine learning-proces. In MLlib worden fasen van een pijplijn vertegenwoordigd door een specifieke reeks PipelineStages, waarbij een transformer en een estimator elk taken uitvoeren.

Een transformer is een algoritme waarmee het ene DataFrame naar het andere wordt getransformeerd met behulp van de transform() -methode. Een functietransformator kan bijvoorbeeld één kolom van een DataFrame lezen, toewijzen aan een andere kolom en een nieuw DataFrame uitvoeren waaraan de toegewezen kolom is toegevoegd.

Een estimator is een abstractie van leeralgoritmen en is verantwoordelijk voor het aanpassen of trainen van een gegevensset om een transformer te produceren. Een estimator implementeert een methode met de naam fit(), die een DataFrame accepteert en een DataFrame produceert, wat een transformer is.

Elk staatloze exemplaar van een transformer of estimator heeft een eigen unieke id, die wordt gebruikt bij het opgeven van parameters. Beide gebruiken een uniforme API voor het opgeven van deze parameters.

Voorbeeld van pijplijn

Ter illustratie van een praktisch gebruik van een ML-pijplijn wordt in dit voorbeeld het voorbeeldgegevensbestand HVAC.csv gebruikt dat vooraf is geladen in de standaardopslag voor uw HDInsight-cluster, azure storage of Data Lake Storage. Als u de inhoud van het bestand wilt weergeven, gaat u naar de /HdiSamples/HdiSamples/SensorSampleData/hvac map. HVAC.csv bevat een reeks tijden met zowel de beoogde als de werkelijke temperaturen voor HVAC-systemen (verwarming, ventilatie en airconditioning) in verschillende gebouwen. Het doel is om het model te trainen op basis van de gegevens en een prognosetemperatuur voor een bepaald gebouw te produceren.

De volgende code:

  1. Definieert een LabeledDocument, waarin de BuildingIDopgeslagen , SystemInfo (de id en leeftijd van een systeem) en een label (1,0 als het gebouw te heet is, 0,0 anders).
  2. Hiermee maakt u een aangepaste parserfunctie parseDocument die een regel (rij) met gegevens gebruikt en bepaalt of het gebouw 'heet' is door de doeltemperatuur te vergelijken met de werkelijke temperatuur.
  3. Past de parser toe bij het extraheren van de brongegevens.
  4. Hiermee maakt u trainingsgegevens.
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row

# The data structure (column meanings) of the data array:
# 0 Date
# 1 Time
# 2 TargetTemp
# 3 ActualTemp
# 4 System
# 5 SystemAge
# 6 BuildingID

LabeledDocument = Row("BuildingID", "SystemInfo", "label")

# Define a function that parses the raw CSV file and returns an object of type LabeledDocument


def parseDocument(line):
    values = [str(x) for x in line.split(',')]
    if (values[3] > values[2]):
        hot = 1.0
    else:
        hot = 0.0

    textValue = str(values[4]) + " " + str(values[5])

    return LabeledDocument((values[6]), textValue, hot)


# Load the raw HVAC.csv file, parse it using the function
data = sc.textFile(
    "wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")

documents = data.filter(lambda s: "Date" not in s).map(parseDocument)
training = documents.toDF()

Deze voorbeeldpijplijn heeft drie fasen: Tokenizer en HashingTF (beide transformatoren) en Logistic Regression (een estimator). De geëxtraheerde en geparseerde gegevens in het training DataFrame stromen door de pijplijn wanneer pipeline.fit(training) wordt aangeroepen.

  1. De eerste fase, Tokenizer, splitst de SystemInfo invoerkolom (bestaande uit de systeem-id en leeftijdswaarden) in een words uitvoerkolom. Deze nieuwe words kolom wordt toegevoegd aan het DataFrame.
  2. De tweede fase, HashingTF, converteert de nieuwe words kolom naar functievectoren. Deze nieuwe features kolom wordt toegevoegd aan het DataFrame. Deze eerste twee fasen zijn Transformers.
  3. De derde fase, LogisticRegression, is een estimator en daarom roept de pijplijn de LogisticRegression.fit() methode aan om een LogisticRegressionModelte produceren.
tokenizer = Tokenizer(inputCol="SystemInfo", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)

# Build the pipeline with our tokenizer, hashingTF, and logistic regression stages
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(training)

Voer een PipelineModel.transform() methode uit op het oorspronkelijke DataFrame om de nieuwe words kolommen en features te zien die zijn toegevoegd door de Tokenizer transformers en HashingTF een voorbeeld van de LogisticRegression estimator. In de productiecode is de volgende stap het doorgeven van een Test DataFrame om de training te valideren.

peek = model.transform(training)
peek.show()

# Outputs the following:
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
|BuildingID|SystemInfo|label|   words|            features|       rawPrediction|         probability|prediction|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
|         4|     13 20|  0.0|[13, 20]|(262144,[250802,2...|[0.11943986671420...|[0.52982451901740...|       0.0|
|        17|      3 20|  0.0| [3, 20]|(262144,[89074,25...|[0.17511205617446...|[0.54366648775222...|       0.0|
|        18|     17 20|  1.0|[17, 20]|(262144,[64358,25...|[0.14620993833623...|[0.53648750722548...|       0.0|
|        15|      2 23|  0.0| [2, 23]|(262144,[31351,21...|[-0.0361327091023...|[0.49096780538523...|       1.0|
|         3|      16 9|  1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...|       1.0|
|         4|     13 28|  0.0|[13, 28]|(262144,[69821,25...|[0.14630166986618...|[0.53651031790592...|       0.0|
|         2|     12 24|  0.0|[12, 24]|(262144,[187043,2...|[-0.0509556393066...|[0.48726384581522...|       1.0|
|        16|     20 26|  1.0|[20, 26]|(262144,[128319,2...|[0.33829638728900...|[0.58377663577684...|       0.0|
|         9|      16 9|  1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...|       1.0|
|        12|       6 5|  0.0|  [6, 5]|(262144,[18659,89...|[0.07513008136562...|[0.51877369045183...|       0.0|
|        15|     10 17|  1.0|[10, 17]|(262144,[64358,25...|[-0.0291988646553...|[0.49270080242078...|       1.0|
|         7|      2 11|  0.0| [2, 11]|(262144,[212053,2...|[0.03678030020834...|[0.50919403860812...|       0.0|
|        15|      14 2|  1.0| [14, 2]|(262144,[109681,2...|[0.06216423725633...|[0.51553605651806...|       0.0|
|         6|       3 2|  0.0|  [3, 2]|(262144,[89074,21...|[0.00565582077537...|[0.50141395142468...|       0.0|
|        20|     19 22|  0.0|[19, 22]|(262144,[139093,2...|[-0.0769288695989...|[0.48077726176073...|       1.0|
|         8|     19 11|  0.0|[19, 11]|(262144,[139093,2...|[0.04988910033929...|[0.51246968885151...|       0.0|
|         6|      15 7|  0.0| [15, 7]|(262144,[77099,20...|[0.14854929135994...|[0.53706918109610...|       0.0|
|        13|      12 5|  0.0| [12, 5]|(262144,[89689,25...|[-0.0519932532562...|[0.48700461408785...|       1.0|
|         4|      8 22|  0.0| [8, 22]|(262144,[98962,21...|[-0.0120753606650...|[0.49698119651572...|       1.0|
|         7|      17 5|  0.0| [17, 5]|(262144,[64358,89...|[-0.0721054054871...|[0.48198145477106...|       1.0|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+

only showing top 20 rows

Het model object kan nu worden gebruikt om voorspellingen te doen. Zie Apache Spark machine learning-toepassingen bouwen in Azure HDInsight voor het volledige voorbeeld van deze machine learning-toepassing en stapsgewijze instructies voor het uitvoeren ervan.

Zie ook