Параллельное выполнение кода R с gapply

Параллелизации кода R сложно, так как код R выполняется на драйвере и в данных R. кадры не распределяются. Часто существует код R, который выполняется локально и преобразуется для выполнения на Apache Spark. В других случаях некоторые функции Spark, используемые для расширенного статистического анализа и приемов машинного обучения, могут не поддерживать распределенные вычисления. В таких случаях для распределения необходимой рабочей нагрузки в кластере можно использовать API UDF для Spark.

Пример варианта использования: необходимо обучить модель машинного обучения на поднаборах набора данных, сгруппированных по ключу. Если подмножества данных помещаются на рабочих процессах, для одновременного обучения нескольких моделей может быть эффективнее использовать API UDF для Spark.

gapplyФункции и gapplyCollect применяют функцию к каждой группе в кадре данных Spark. Для каждой группы в кадре данных Spark:

  1. Собирайте каждую группу как R Data. Frame.
  2. Отправьте функцию в рабочую роль и выполните.
  3. Верните результат в драйвер, как указано в схеме.

Примечание

При вызове gapply необходимо указать выходную схему. В gapplyCollect Результаты собираются в драйвер с помощью данных R. Frame для выходных данных.

В следующем примере отдельная модель аппаратного вектора поддержки подходит для airquality данных за каждый месяц. Выходные данные — это Data. Frame с результирующим редактором MSE для каждого месяца, показанный как с помощью, так и без указания схемы.

df <- createDataFrame(na.omit(airquality))

schema <- structType(
  structField("Month", "MSE"),
  structField("integer", "Number"))

result <- gapply(df, c("Month"), function(key, x) {
  library(e1071)
  data.frame(month = key, mse = svm(Ozone ~ ., x, cross = 3)$tot.MSE)
}, schema)
df <- createDataFrame(na.omit(airquality))

gapplyCollect(df, c("Month"), function(key, x) {
  library(e1071)
  y <- data.frame(month = key, mse = svm(Ozone ~ ., x, cross = 3)$tot.MSE)
 names(y) <- c("Month", "MSE")
  y
})

Примечание

Начните с кадра данных Spark и установите пакеты на всех рабочих процессах.