Apache Spark makine öğrenimi işlem hattı oluşturma

Apache Spark'ın ölçeklenebilir makine öğrenmesi kitaplığı (MLlib), dağıtılmış bir ortama modelleme özellikleri getirir. Spark paketi spark.ml , DataFrame'ler üzerinde oluşturulmuş üst düzey API'lerden oluşan bir kümedir. Bu API'ler pratik makine öğrenmesi işlem hatları oluşturmanıza ve ayarlamanıza yardımcı olur. Spark makine öğrenmesi , eski RDD tabanlı işlem hattı API'sini değil, bu MLlib DataFrame tabanlı API'yi ifade eder.

Makine öğrenmesi (ML) işlem hattı, birden çok makine öğrenmesi algoritmasını bir araya getiren eksiksiz bir iş akışıdır. Verilerin işlenmesi ve verilerden öğrenilmesi için gerekli olan ve bir dizi algoritma gerektiren birçok adım olabilir. İşlem hatları, makine öğrenmesi sürecinin aşamalarını ve sıralamasını tanımlar. MLlib'de bir işlem hattının aşamaları, bir Transformer ve Bir Tahmin Aracı'nın görevleri gerçekleştirdiği belirli bir PipelineStages dizisiyle temsil edilir.

Transformer, yöntemini kullanarak bir DataFrame'i başka bir DataFrame'e dönüştüren bir algoritmadır transform() . Örneğin, bir özellik transformatörü DataFrame'in bir sütununu okuyabilir, başka bir sütuna eşler ve eşlenmiş sütunun eklendiği yeni bir DataFrame çıkışı verebilir.

Tahmin Aracı, öğrenme algoritmalarının soyutlamasıdır ve Transformer üretmek için bir veri kümesine sığdırma veya eğitme sorumluluğundadır. Tahmin Aracı, bir DataFrame kabul eden ve Transformer olan bir DataFrame üreten adlı fit()bir yöntem uygular.

Transformer veya Estimator'ın durum bilgisi olmayan her örneğin, parametreleri belirtirken kullanılan kendi benzersiz tanımlayıcısı vardır. Her ikisi de bu parametreleri belirtmek için tekdüzen bir API kullanır.

İşlem hattı örneği

Bir ML işlem hattının pratik kullanımını göstermek için bu örnekte, HDInsight kümenizin varsayılan depolama alanına önceden yüklenmiş olarak gelen azure depolama veya Data Lake Storage örnek HVAC.csv veri dosyası kullanılmaktadır. Dosyanın içeriğini görüntülemek için dizinine /HdiSamples/HdiSamples/SensorSampleData/hvac gidin. HVAC.csv çeşitli binalarda HVAC (ısıtma, havalandırma ve klima) sistemleri için hem hedef hem de gerçek sıcaklıklarla bir dizi zaman içerir. Hedef, modeli veriler üzerinde eğitmek ve belirli bir bina için tahmin sıcaklığı üretmektir.

Aşağıdaki kod:

  1. , (sistemin tanımlayıcısı ve yaşı) depolayan BuildingIDSystemInfo bir LabeledDocumentlabel ve (bina çok sıcaksa 1,0, aksi takdirde 0,0) öğesini tanımlar.
  2. Bir veri satırı (satırı) alan ve hedef sıcaklığı gerçek sıcaklıkla karşılaştırarak binanın "sıcak" olup olmadığını belirleyen özel bir ayrıştırıcı işlevi parseDocument oluşturur.
  3. Kaynak verileri ayıklarken ayrıştırıcıyı uygular.
  4. Eğitim verileri oluşturur.
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row

# The data structure (column meanings) of the data array:
# 0 Date
# 1 Time
# 2 TargetTemp
# 3 ActualTemp
# 4 System
# 5 SystemAge
# 6 BuildingID

LabeledDocument = Row("BuildingID", "SystemInfo", "label")

# Define a function that parses the raw CSV file and returns an object of type LabeledDocument


def parseDocument(line):
    values = [str(x) for x in line.split(',')]
    if (values[3] > values[2]):
        hot = 1.0
    else:
        hot = 0.0

    textValue = str(values[4]) + " " + str(values[5])

    return LabeledDocument((values[6]), textValue, hot)


# Load the raw HVAC.csv file, parse it using the function
data = sc.textFile(
    "wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")

documents = data.filter(lambda s: "Date" not in s).map(parseDocument)
training = documents.toDF()

Bu örnek işlem hattının üç aşaması vardır: Tokenizer ve HashingTF (her ikisi de Transformers) ve Logistic Regression (Tahmin Aracı). DataFrame'de training ayıklanan ve ayrıştırılan veriler çağrıldığında pipeline.fit(training) işlem hattı üzerinden akar.

  1. İlk aşama olan Tokenizer, giriş sütununu SystemInfo (sistem tanımlayıcısı ve yaş değerlerinden oluşan) bir words çıkış sütununa böler. Bu yeni words sütun DataFrame'e eklenir.
  2. İkinci aşama olan HashingTF, yeni words sütunu özellik vektörlerine dönüştürür. Bu yeni features sütun DataFrame'e eklenir. Bu ilk iki aşama Transformatörlerdir.
  3. Üçüncü aşama LogisticRegressionolan , bir Tahmin Aracı'dır ve işlem hattı bir oluşturmak LogisticRegressionModeliçin yöntemini çağırırLogisticRegression.fit().
tokenizer = Tokenizer(inputCol="SystemInfo", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)

# Build the pipeline with our tokenizer, hashingTF, and logistic regression stages
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(training)

ve transformatörleri tarafından eklenen yeni words ve featuresHashingTF sütunları ve tahmin aracının LogisticRegression bir örneğini görmek için özgün DataFrame'de bir PipelineModel.transform() yöntem çalıştırın.Tokenizer Üretim kodunda bir sonraki adım, eğitimi doğrulamak için bir test DataFrame geçirmek olacaktır.

peek = model.transform(training)
peek.show()

# Outputs the following:
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
|BuildingID|SystemInfo|label|   words|            features|       rawPrediction|         probability|prediction|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
|         4|     13 20|  0.0|[13, 20]|(262144,[250802,2...|[0.11943986671420...|[0.52982451901740...|       0.0|
|        17|      3 20|  0.0| [3, 20]|(262144,[89074,25...|[0.17511205617446...|[0.54366648775222...|       0.0|
|        18|     17 20|  1.0|[17, 20]|(262144,[64358,25...|[0.14620993833623...|[0.53648750722548...|       0.0|
|        15|      2 23|  0.0| [2, 23]|(262144,[31351,21...|[-0.0361327091023...|[0.49096780538523...|       1.0|
|         3|      16 9|  1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...|       1.0|
|         4|     13 28|  0.0|[13, 28]|(262144,[69821,25...|[0.14630166986618...|[0.53651031790592...|       0.0|
|         2|     12 24|  0.0|[12, 24]|(262144,[187043,2...|[-0.0509556393066...|[0.48726384581522...|       1.0|
|        16|     20 26|  1.0|[20, 26]|(262144,[128319,2...|[0.33829638728900...|[0.58377663577684...|       0.0|
|         9|      16 9|  1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...|       1.0|
|        12|       6 5|  0.0|  [6, 5]|(262144,[18659,89...|[0.07513008136562...|[0.51877369045183...|       0.0|
|        15|     10 17|  1.0|[10, 17]|(262144,[64358,25...|[-0.0291988646553...|[0.49270080242078...|       1.0|
|         7|      2 11|  0.0| [2, 11]|(262144,[212053,2...|[0.03678030020834...|[0.50919403860812...|       0.0|
|        15|      14 2|  1.0| [14, 2]|(262144,[109681,2...|[0.06216423725633...|[0.51553605651806...|       0.0|
|         6|       3 2|  0.0|  [3, 2]|(262144,[89074,21...|[0.00565582077537...|[0.50141395142468...|       0.0|
|        20|     19 22|  0.0|[19, 22]|(262144,[139093,2...|[-0.0769288695989...|[0.48077726176073...|       1.0|
|         8|     19 11|  0.0|[19, 11]|(262144,[139093,2...|[0.04988910033929...|[0.51246968885151...|       0.0|
|         6|      15 7|  0.0| [15, 7]|(262144,[77099,20...|[0.14854929135994...|[0.53706918109610...|       0.0|
|        13|      12 5|  0.0| [12, 5]|(262144,[89689,25...|[-0.0519932532562...|[0.48700461408785...|       1.0|
|         4|      8 22|  0.0| [8, 22]|(262144,[98962,21...|[-0.0120753606650...|[0.49698119651572...|       1.0|
|         7|      17 5|  0.0| [17, 5]|(262144,[64358,89...|[-0.0721054054871...|[0.48198145477106...|       1.0|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+

only showing top 20 rows

Nesnesi model artık tahminlerde bulunmak için kullanılabilir. Bu makine öğrenmesi uygulamasının tam örneği ve çalıştırmaya yönelik adım adım yönergeler için bkz. Azure HDInsight'ta Apache Spark makine öğrenmesi uygulamaları derleme.

Ayrıca bkz.