How to parallelize R code with gapply

Parallelizing R code is difficult because R code runs on the driver and R data.frames are not distributed. Often, existing R code that runs locally has to be converted to run on Apache Spark. In other cases, some SparkR functions used for advanced statistical analysis and machine learning may not support distributed computing. In such cases, you can use the SparkR UDF API to distribute the workload across a cluster.

Example use case: you want to train a machine learning model on subsets of a data set, grouped by a key. If each subset of the data fits on a worker, it may be more efficient to use the SparkR UDF API to train multiple models at once.

The gapply and gapplyCollect functions apply a function to each group in a Spark DataFrame. For each group in a Spark DataFrame:

  1. Collect each group as an R data.frame.
  2. Send the function to the worker and execute it.
  3. Return the result to the driver as specified by the schema.
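
The three steps above can be sketched locally in base R. This is only a single-machine analogy of the split, apply, and combine pattern, not SparkR itself: `local_gapply` is a hypothetical helper introduced for illustration, and a group mean stands in for model training.

```r
# Local, single-machine analogy of the gapply steps (base R only, no Spark).
# local_gapply is a hypothetical helper, not part of SparkR.
local_gapply <- function(df, col, func) {
  # Step 1: split the data into one R data.frame per group.
  groups <- split(df, df[[col]])
  # Step 2: run the function on each group (SparkR would do this on workers).
  pieces <- lapply(names(groups), function(k) func(k, groups[[k]]))
  # Step 3: combine the per-group results into a single data.frame.
  do.call(rbind, pieces)
}

air <- na.omit(airquality)
result <- local_gapply(air, "Month", function(key, x) {
  # key arrives as a character group label here, so convert it back.
  data.frame(Month = as.integer(key), MeanOzone = mean(x$Ozone))
})
print(result)
```

With gapply, the per-group function runs on the workers instead of in a local lapply loop, and the combined result is a Spark DataFrame rather than an R data.frame.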

Note

When you call gapply, you must specify the output schema. With gapplyCollect, no schema is needed: the result is collected to the driver as an R data.frame.

In the following example, a separate support vector machine model is fit on the airquality data for each month. The output is a data.frame with the resulting MSE for each month, shown both with and without specifying the schema.

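# With a schema: gapply returns a Spark DataFrame.
df <- createDataFrame(na.omit(airquality))

# Each structField takes a column name followed by its type.
schema <- structType(
  structField("Month", "integer"),
  structField("MSE", "double"))

result <- gapply(df, c("Month"), function(key, x) {
  library(e1071)  # must be installed on every worker
  # Fit an SVM with 3-fold cross-validation and report the total MSE.
  data.frame(month = key, mse = svm(Ozone ~ ., x, cross = 3)$tot.MSE)
}, schema)

# Without a schema: gapplyCollect returns an R data.frame on the driver.
df <- createDataFrame(na.omit(airquality))

gapplyCollect(df, c("Month"), function(key, x) {
  library(e1071)
  y <- data.frame(month = key, mse = svm(Ozone ~ ., x, cross = 3)$tot.MSE)
  # No schema is supplied, so set the output column names explicitly.
  names(y) <- c("Month", "MSE")
  y
})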

Note

Start with a Spark DataFrame, and install the packages that the function uses (e1071 in this example) on all workers.
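
A package missing on one worker can be hard to diagnose from the driver. One possible safeguard, sketched here in base R, is to check for the package at the top of the function passed to gapply and stop with a message that names the host. `require_on_worker` is a hypothetical helper, not part of SparkR, and the sketch assumes the helper is defined inside (or otherwise shipped with) the function that runs on the workers.

```r
# require_on_worker is a hypothetical helper, not part of SparkR.
# It stops with a descriptive error if a package cannot be loaded
# on the machine where the code is currently running.
require_on_worker <- function(pkg) {
  if (!requireNamespace(pkg, quietly = TRUE)) {
    stop(sprintf("Package '%s' is not installed on host %s",
                 pkg, Sys.info()[["nodename"]]), call. = FALSE)
  }
  invisible(TRUE)
}

# Local demonstration with a package that ships with R.
ok <- require_on_worker("stats")
```

Inside the function passed to gapply, a call such as require_on_worker("e1071") before library(e1071) would then surface the missing installation together with the worker's host name.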