Создание конвейера машинного обучения Apache Spark

Масштабируемая библиотека машинного обучения (MLlib) Apache Spark предоставляет возможности моделирования для распределенной среды. Пакет Spark spark.ml — это набор API высокого уровня, созданных на основе кадров данных. Эти API позволяют создавать и настраивать практические конвейеры машинного обучения. Машинное обучение Spark использует этот API на основе кадров данных библиотеки машинного обучения, а не более старый API конвейера на основе RDD.

Конвейер машинного обучения — это полный рабочий процесс, объединяющий несколько алгоритмов машинного обучения. Обработка и анализ данных могут включать в себя множество обязательных шагов, требующих определенной последовательности алгоритмов. Конвейеры определяют этапы и порядок процесса машинного обучения. В библиотеке машинного обучения этапы конвейера представлены определенной последовательностью PipelineStages, в которой преобразователь и оценщик выполняют задачи.

Преобразователь — это алгоритм, который преобразует одну таблицу данных в другую с помощью метода transform(). Например, преобразователь признаков может считать один столбец таблицы данных, сопоставить его с другим столбцом и создать новую таблицу данных, добавив в нее сопоставленный столбец.

Оценщик — это абстракция алгоритмов обучения, отвечающая за подгонку или обучение по набору данных для создания преобразователя. Оценщик реализует метод с именем fit(), который принимает таблицу данных и создает новую таблицу данных, которая является преобразователем.

Каждый экземпляр преобразователя или оценщика без учета состояния имеет собственный уникальный идентификатор, который используется при указании параметров. Они оба используют универсальный API для указания этих параметров.

Пример конвейера

Чтобы продемонстрировать практическое применение конвейера машинного обучения, в этом примере используется образец файла данных HVAC.csv, который уже предварительно загружен в хранилище по умолчанию (службу хранилища Azure или Data Lake Storage) кластера HDInsight. Чтобы просмотреть содержимое файла, перейдите в каталог /HdiSamples/HdiSamples/SensorSampleData/hvac. HVAC.csv содержит набор значений времени с целевой и фактической температурами для систем HVAC (отопление, вентиляция и кондиционирование) в различных зданиях. Цель — обучить модель на данных и спрогнозировать температуру для данного здания.

В приведенном ниже коде

  1. Определяет LabeledDocument, в котором содержатся значения BuildingID, SystemInfo (идентификатор и время существования системы) и label (1.0 — если температура в здании слишком высокая; в противном случае — 0.0).
  2. Создает настраиваемую функцию синтаксического анализа parseDocument, которая принимает строку данных (запись) и определяет, высокая ли температура в здании, путем сравнения целевой и фактической температур.
  3. Применяет средство синтаксического анализа при извлечении источника данных.
  4. Создает данные для обучения.
from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import HashingTF, Tokenizer
from pyspark.sql import Row

# The data structure (column meanings) of the data array:
# 0 Date
# 1 Time
# 2 TargetTemp
# 3 ActualTemp
# 4 System
# 5 SystemAge
# 6 BuildingID

LabeledDocument = Row("BuildingID", "SystemInfo", "label")

# Define a function that parses the raw CSV file and returns an object of type LabeledDocument


def parseDocument(line):
    values = [str(x) for x in line.split(',')]
    if (values[3] > values[2]):
        hot = 1.0
    else:
        hot = 0.0

    textValue = str(values[4]) + " " + str(values[5])

    return LabeledDocument((values[6]), textValue, hot)


# Load the raw HVAC.csv file, parse it using the function
data = sc.textFile(
    "wasbs:///HdiSamples/HdiSamples/SensorSampleData/hvac/HVAC.csv")

documents = data.filter(lambda s: "Date" not in s).map(parseDocument)
training = documents.toDF()

В этом примере в конвейере имеется три этапа: Tokenizer, HashingTF (оба являются преобразователями) и Logistic Regression (оценщик). Извлеченные и проанализированные данные в таблице данных training проходят через конвейер при вызове pipeline.fit(training).

  1. Первый этап (Tokenizer) разбивает входной столбец SystemInfo (состоящий из значений идентификатора и времени существования системы) для получения выходного столбца words. Этот новый столбец words добавляется в таблицу данных.
  2. Второй этап (HashingTF) преобразует новый столбец words в векторы признаков. Этот новый столбец features добавляется в таблицу данных. Первые два этапа являются преобразователями.
  3. Третий этап (LogisticRegression) — оценщик. На этом этапе конвейер вызывает метод LogisticRegression.fit(), чтобы создать LogisticRegressionModel.
tokenizer = Tokenizer(inputCol="SystemInfo", outputCol="words")
hashingTF = HashingTF(inputCol=tokenizer.getOutputCol(), outputCol="features")
lr = LogisticRegression(maxIter=10, regParam=0.01)

# Build the pipeline with our tokenizer, hashingTF, and logistic regression stages
pipeline = Pipeline(stages=[tokenizer, hashingTF, lr])

model = pipeline.fit(training)

Чтобы увидеть новые столбцы words и features, добавленные преобразователями Tokenizer и HashingTF, а также пример оценщика LogisticRegression, выполните метод PipelineModel.transform() для исходной таблицы данных. Следующее действие в рабочем коде — передать тестовую таблицу данных для проверки эффективности обучения.

peek = model.transform(training)
peek.show()

# Outputs the following:
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
|BuildingID|SystemInfo|label|   words|            features|       rawPrediction|         probability|prediction|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+
|         4|     13 20|  0.0|[13, 20]|(262144,[250802,2...|[0.11943986671420...|[0.52982451901740...|       0.0|
|        17|      3 20|  0.0| [3, 20]|(262144,[89074,25...|[0.17511205617446...|[0.54366648775222...|       0.0|
|        18|     17 20|  1.0|[17, 20]|(262144,[64358,25...|[0.14620993833623...|[0.53648750722548...|       0.0|
|        15|      2 23|  0.0| [2, 23]|(262144,[31351,21...|[-0.0361327091023...|[0.49096780538523...|       1.0|
|         3|      16 9|  1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...|       1.0|
|         4|     13 28|  0.0|[13, 28]|(262144,[69821,25...|[0.14630166986618...|[0.53651031790592...|       0.0|
|         2|     12 24|  0.0|[12, 24]|(262144,[187043,2...|[-0.0509556393066...|[0.48726384581522...|       1.0|
|        16|     20 26|  1.0|[20, 26]|(262144,[128319,2...|[0.33829638728900...|[0.58377663577684...|       0.0|
|         9|      16 9|  1.0| [16, 9]|(262144,[153779,1...|[-0.0853679939336...|[0.47867095324139...|       1.0|
|        12|       6 5|  0.0|  [6, 5]|(262144,[18659,89...|[0.07513008136562...|[0.51877369045183...|       0.0|
|        15|     10 17|  1.0|[10, 17]|(262144,[64358,25...|[-0.0291988646553...|[0.49270080242078...|       1.0|
|         7|      2 11|  0.0| [2, 11]|(262144,[212053,2...|[0.03678030020834...|[0.50919403860812...|       0.0|
|        15|      14 2|  1.0| [14, 2]|(262144,[109681,2...|[0.06216423725633...|[0.51553605651806...|       0.0|
|         6|       3 2|  0.0|  [3, 2]|(262144,[89074,21...|[0.00565582077537...|[0.50141395142468...|       0.0|
|        20|     19 22|  0.0|[19, 22]|(262144,[139093,2...|[-0.0769288695989...|[0.48077726176073...|       1.0|
|         8|     19 11|  0.0|[19, 11]|(262144,[139093,2...|[0.04988910033929...|[0.51246968885151...|       0.0|
|         6|      15 7|  0.0| [15, 7]|(262144,[77099,20...|[0.14854929135994...|[0.53706918109610...|       0.0|
|        13|      12 5|  0.0| [12, 5]|(262144,[89689,25...|[-0.0519932532562...|[0.48700461408785...|       1.0|
|         4|      8 22|  0.0| [8, 22]|(262144,[98962,21...|[-0.0120753606650...|[0.49698119651572...|       1.0|
|         7|      17 5|  0.0| [17, 5]|(262144,[64358,89...|[-0.0721054054871...|[0.48198145477106...|       1.0|
+----------+----------+-----+--------+--------------------+--------------------+--------------------+----------+

only showing top 20 rows

Теперь объект model можно использовать для создания прогнозов. Полный пример этого приложения машинного обучения и пошаговые инструкции по его выполнению см. в статье Создание приложений машинного обучения Apache Spark в Azure HDInsight.

См. также раздел