How to parallelize R code with gapply

Parallelizing R code is difficult because R code runs on the driver and R data.frames are not distributed. Often, existing R code that runs locally has to be converted to run on Apache Spark. In other cases, some SparkR functions used for advanced statistical analysis and machine learning may not support distributed computing. In such cases, the SparkR UDF API can be used to distribute the workload across a cluster.

Example use case: you want to train a machine learning model on subsets of a data set, grouped by a key. If each subset fits in a worker's memory, it may be more efficient to use the SparkR UDF API to train multiple models at once.

The gapply and gapplyCollect functions apply a function to each group in a Spark DataFrame. For each group in the Spark DataFrame, they:

  1. Collect the group as an R data.frame.
  2. Send the function to a worker and execute it.
  3. Return the result to the driver as specified by the schema.
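The three steps above follow the familiar split-apply-combine pattern. The sketch below is only a local, single-machine analogy to build intuition, not how SparkR is implemented: `local_gapply` is a hypothetical helper, and it runs with base R alone on the built-in `airquality` data. Like SparkR, it passes the grouping key to the function as a list.

```r
# Hypothetical helper (not part of SparkR): a local analogy of gapplyCollect
local_gapply <- function(df, cols, func) {
  # 1. Split the data into one data.frame per distinct key
  groups <- split(df, df[cols], drop = TRUE)
  # 2. Apply func to each group, passing the key values as a named list
  results <- lapply(groups, function(x) {
    key <- as.list(x[1, cols, drop = FALSE])
    func(key, x)
  })
  # 3. Combine the per-group data.frames into a single data.frame
  out <- do.call(rbind, results)
  rownames(out) <- NULL
  out
}

# Example: maximum daily temperature per month
max_temp <- local_gapply(airquality, "Month", function(key, x) {
  data.frame(Month = key[[1]], MaxTemp = max(x$Temp))
})
print(max_temp)
```

In Spark, step 2 runs on the workers in parallel, which is what makes the approach worthwhile for expensive per-group work such as model training.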

Note

When you call gapply, you must specify the output schema. gapplyCollect takes no schema: the result is collected to the driver as an R data.frame.
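Because gapply relies on the declared schema, it can help to check locally that the function returns the column names and types you intend to declare, for instance an integer `Month` column and a double `MSE` column. The sketch below assumes the usual SparkR type correspondence (R integer to Spark "integer", R numeric to Spark "double"); `check_output` is a hypothetical helper, not a SparkR function.

```r
# Hypothetical helper (not part of SparkR): compare a data.frame's column
# names and R classes with the ones you plan to declare in structType()
check_output <- function(out, expected) {
  actual <- vapply(out, function(col) class(col)[1], character(1))
  if (!identical(names(actual), names(expected))) {
    stop("column names differ: ", paste(names(actual), collapse = ", "))
  }
  bad <- actual != expected
  if (any(bad)) {
    stop("type mismatch in: ", paste(names(actual)[bad], collapse = ", "))
  }
  invisible(TRUE)
}

# R integer ~ Spark "integer"; R numeric (double) ~ Spark "double"
expected <- c(Month = "integer", MSE = "numeric")
check_output(data.frame(Month = 5L, MSE = 12.3), expected)  # passes silently
```

Returning `5` instead of `5L` would make `Month` a double, which this check reports before the job ever reaches the cluster.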

In the following example, a separate support vector machine model is fit to the airquality data for each month. The output is a data.frame with the resulting MSE for each month, shown both with a schema (gapply) and without one (gapplyCollect).

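# With a schema: gapply returns a Spark DataFrame
df <- createDataFrame(na.omit(airquality))

# Output schema: an integer Month column and a double MSE column
schema <- structType(
  structField("Month", "integer"),
  structField("MSE", "double"))

result <- gapply(df, c("Month"), function(key, x) {
  # e1071 must be installed on every worker
  library(e1071)
  # Fit an SVM with 3-fold cross-validation and keep the total MSE
  data.frame(month = key, mse = svm(Ozone ~ ., x, cross = 3)$tot.MSE)
}, schema)
head(collect(result))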
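# Without a schema: gapplyCollect returns an R data.frame on the driver
df <- createDataFrame(na.omit(airquality))

gapplyCollect(df, c("Month"), function(key, x) {
  library(e1071)
  y <- data.frame(month = key, mse = svm(Ozone ~ ., x, cross = 3)$tot.MSE)
  # Column names come from the function itself, since there is no schema
  names(y) <- c("Month", "MSE")
  y
})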
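Before distributing a per-group function, it can be worth running it locally on each group to catch errors early. The sketch below does this on the driver with base R only. It is an assumption-laden stand-in: `lm()` replaces `e1071::svm()` so no extra package is needed, the metric is in-sample MSE rather than the 3-fold cross-validated MSE that `cross = 3` reports, and `month_mse` is a hypothetical name.

```r
# Hypothetical per-group function with the same (key, x) signature gapply uses
month_mse <- function(key, x) {
  # Month is constant within a group, so drop it from the predictors
  fit <- lm(Ozone ~ . - Month, data = x)
  data.frame(Month = key[[1]], MSE = mean(residuals(fit)^2))
}

aq <- na.omit(airquality)

# Call the function once per month, passing the key as a list like SparkR does
local_results <- do.call(rbind, lapply(split(aq, aq$Month), function(x) {
  month_mse(list(x$Month[1]), x)
}))
print(local_results)
```

Once the function behaves as expected locally, the same body can be passed to gapply or gapplyCollect.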

Note

Both examples start from a Spark DataFrame. Any package used inside the function, such as e1071 here, must be installed on all workers.