Parallelisieren von R-Code mit gapplyHow To parallelize R code with gapply

Die Parallelisierung von r-Code ist schwierig, da r-Code auf dem Treiber und r-Daten ausgeführt wird. Frames werden nicht verteilt.Parallelization of R code is difficult, because R code runs on the driver and R data.frames are not distributed. Häufig gibt es vorhandenen R-Code, der lokal ausgeführt wird und in Apache Spark ausgeführt wird.Often, there is existing R code that is run locally and that is converted to run on Apache Spark. In anderen Fällen unterstützen einige sparkr-Funktionen, die für erweiterte statistische Analysen und maschinelle Lernverfahren verwendet werden, das verteilte Computing möglicherweise nicht.In other cases, some SparkR functions used for advanced statistical analysis and machine learning techniques may not support distributed computing. In solchen Fällen kann die UDF-API sparkr verwendet werden, um die gewünschte Arbeitsauslastung über einen Cluster zu verteilen.In such cases, the SparkR UDF API can be used to distribute the desired workload across a cluster.

Beispiel für einen Anwendungsfall: Sie möchten ein Machine Learning-Modell mit Teilmengen eines Datasets, gruppiert nach einem Schlüssel, trainieren.Example use case: You want to train a machine learning model on subsets of a data set, grouped by a key. Wenn die Teilmengen der Daten in die Worker passen, ist es möglicherweise effizienter, die sparkr-UDF-API zum gleichzeitigen trainieren mehrerer Modelle zu verwenden.If the subsets of the data fit on the workers, it may be more efficient to use the SparkR UDF API to train multiple models at once.

Die gapply gapplyCollect Funktionen und wenden eine Funktion auf jede Gruppe in einem Spark-dataframe an.The gapply and gapplyCollect functions apply a function to each group in a Spark DataFrame. Für jede Gruppe in einem Spark-dataframe:For each group in a Spark DataFrame:

  1. Erfassen Sie jede Gruppe als R Data. Frame.Collect each group as an R data.frame.
  2. Senden Sie die Funktion an den Worker, und führen Sie aus.Send the function to the worker and execute.
  3. Gibt das Ergebnis gemäß dem Schema an den Treiber zurück.Return the result to the driver as specified by the schema.

Hinweis

Wenn Sie aufzurufen gapply , müssen Sie das Ausgabe Schema angeben.When you call gapply, you must specify the output schema. Mit gapplyCollect wird das Ergebnis für den Treiber mithilfe eines R Data. Frames für die Ausgabe gesammelt.With gapplyCollect, the result is collected to the driver using an R data.frame for the output.

Im folgenden Beispiel ist ein separates Support Vector Machine-Modell airquality für die Daten für jeden Monat geeignet.In the following example, a separate support vector machine model is fit on the airquality data for each month. Bei der Ausgabe handelt es sich um einen Data. Frame mit der resultierenden MSE für jeden Monat, der sowohl mit als auch ohne Angabe des Schemas angezeigt wird.The output is a data.frame with the resulting MSE for each month, shown both with and without specifying the schema.

df <- createDataFrame(na.omit(airquality))

schema <- structType(
  structField("Month", "MSE"),
  structField("integer", "Number"))

result <- gapply(df, c("Month"), function(key, x) {
  library(e1071)
  data.frame(month = key, mse = svm(Ozone ~ ., x, cross = 3)$tot.MSE)
}, schema)
df <- createDataFrame(na.omit(airquality))

gapplyCollect(df, c("Month"), function(key, x) {
  library(e1071)
  y <- data.frame(month = key, mse = svm(Ozone ~ ., x, cross = 3)$tot.MSE)
 names(y) <- c("Month", "MSE")
  y
})

Hinweis

Beginnen Sie mit einem Spark-dataframe, und installieren Sie Pakete für alle Worker.Start with a Spark DataFrame and install packages on all workers.