Guide pratique pour paralléliser du code R avec gapplyHow To parallelize R code with gapply

La parallélisation du code R est difficile, car le code R s’exécute sur le pilote et les données R. les frames ne sont pas distribués.Parallelization of R code is difficult, because R code runs on the driver and R data.frames are not distributed. Souvent, il existe un code R existant qui est exécuté localement et qui est converti pour s’exécuter sur Apache Spark.Often, there is existing R code that is run locally and that is converted to run on Apache Spark. Dans d’autres cas, certaines fonctions Sparkr utilisées pour l’analyse statistique avancée et les techniques de Machine Learning peuvent ne pas prendre en charge l’informatique distribuée.In other cases, some SparkR functions used for advanced statistical analysis and machine learning techniques may not support distributed computing. Dans ce cas, l’API UDF Spark peut être utilisée pour distribuer la charge de travail souhaitée sur un cluster.In such cases, the SparkR UDF API can be used to distribute the desired workload across a cluster.

Exemple de cas d’usage : vous souhaitez effectuer l’apprentissage d’un modèle de Machine Learning sur des sous-ensembles d’un jeu de données, regroupés par clé.Example use case: You want to train a machine learning model on subsets of a data set, grouped by a key. Si les sous-ensembles de données tiennent sur les threads de travail, il peut être plus efficace d’utiliser l’API UDF Spark pour former plusieurs modèles à la fois.If the subsets of the data fit on the workers, it may be more efficient to use the SparkR UDF API to train multiple models at once.

Les gapply gapplyCollect fonctions et appliquent une fonction à chaque groupe dans un tableau Spark.The gapply and gapplyCollect functions apply a function to each group in a Spark DataFrame. Pour chaque groupe dans un tableau Spark :For each group in a Spark DataFrame:

  1. Collectez chaque groupe en tant que données R. Frame.Collect each group as an R data.frame.
  2. Envoyez la fonction au Worker et exécutez.Send the function to the worker and execute.
  3. Retourne le résultat au pilote comme spécifié par le schéma.Return the result to the driver as specified by the schema.

Notes

Lorsque vous appelez gapply , vous devez spécifier le schéma de sortie.When you call gapply, you must specify the output schema. Avec gapplyCollect , le résultat est collecté au pilote à l’aide d’un frame R Data. Frame pour la sortie.With gapplyCollect, the result is collected to the driver using an R data.frame for the output.

Dans l’exemple suivant, un modèle de machine à vecteurs de support distinct est adapté aux airquality données pour chaque mois.In the following example, a separate support vector machine model is fit on the airquality data for each month. La sortie est un Data. Frame avec la MSE résultante pour chaque mois, indiquée avec et sans spécifier le schéma.The output is a data.frame with the resulting MSE for each month, shown both with and without specifying the schema.

df <- createDataFrame(na.omit(airquality))

schema <- structType(
  structField("Month", "MSE"),
  structField("integer", "Number"))

result <- gapply(df, c("Month"), function(key, x) {
  library(e1071)
  data.frame(month = key, mse = svm(Ozone ~ ., x, cross = 3)$tot.MSE)
}, schema)
df <- createDataFrame(na.omit(airquality))

gapplyCollect(df, c("Month"), function(key, x) {
  library(e1071)
  y <- data.frame(month = key, mse = svm(Ozone ~ ., x, cross = 3)$tot.MSE)
 names(y) <- c("Month", "MSE")
  y
})

Notes

Commencez avec un tableau Spark et installez des packages sur tous les Workers.Start with a Spark DataFrame and install packages on all workers.