gapply を使用して R コードを並列化する方法How To parallelize R code with gapply

R コードの並列化は、ドライバーと R データに対して実行されるため、困難です。フレームは配布されません。Parallelization of R code is difficult, because R code runs on the driver and R data.frames are not distributed. 多くの場合、ローカルで実行され、Apache Spark で実行するように変換される既存の R コードがあります。Often, there is existing R code that is run locally and that is converted to run on Apache Spark. それ以外の場合、高度な統計分析や機械学習手法で使用される一部の SparkR 関数は、分散コンピューティングをサポートしていない可能性があります。In other cases, some SparkR functions used for advanced statistical analysis and machine learning techniques may not support distributed computing. このような場合は、SparkR UDF API を使用して、クラスター全体に必要なワークロードを分散することができます。In such cases, the SparkR UDF API can be used to distribute the desired workload across a cluster.

ユースケースの例: キーでグループ化されたデータセットのサブセットに対して機械学習モデルをトレーニングします。Example use case: You want to train a machine learning model on subsets of a data set, grouped by a key. データのサブセットがワーカーに適合する場合は、SparkR UDF API を使用して、複数のモデルを一度にトレーニングする方が効率的な場合があります。If the subsets of the data fit on the workers, it may be more efficient to use the SparkR UDF API to train multiple models at once.

関数 gapplygapplyCollect 関数は、Spark データフレームの各グループに関数を適用します。The gapply and gapplyCollect functions apply a function to each group in a Spark DataFrame. Spark データフレームの各グループについて、次のようにします。For each group in a Spark DataFrame:

  1. 各グループを R データフレームとして収集します。Collect each group as an R data.frame.
  2. 関数をワーカーに送信し、を実行します。Send the function to the worker and execute.
  3. スキーマで指定されているように、結果をドライバーに返します。Return the result to the driver as specified by the schema.


を呼び出す場合は gapply 、出力スキーマを指定する必要があります。When you call gapply, you must specify the output schema. では gapplyCollect 、出力の R データフレームを使用して、結果がドライバーに収集されます。With gapplyCollect, the result is collected to the driver using an R data.frame for the output.

次の例では、個別のサポートベクターマシンモデルが各月のデータに適合してい airquality ます。In the following example, a separate support vector machine model is fit on the airquality data for each month. 出力は、各月に対して結果として生成される MSE を持つデータフレームです。スキーマを指定しない場合でも、スキーマを指定しない場合もあります。The output is a data.frame with the resulting MSE for each month, shown both with and without specifying the schema.

df <- createDataFrame(na.omit(airquality))

schema <- structType(
  structField("Month", "MSE"),
  structField("integer", "Number"))

result <- gapply(df, c("Month"), function(key, x) {
  data.frame(month = key, mse = svm(Ozone ~ ., x, cross = 3)$tot.MSE)
}, schema)
df <- createDataFrame(na.omit(airquality))

gapplyCollect(df, c("Month"), function(key, x) {
  y <- data.frame(month = key, mse = svm(Ozone ~ ., x, cross = 3)$tot.MSE)
 names(y) <- c("Month", "MSE")


Spark データフレームから開始し、すべてのワーカーにパッケージをインストールします。Start with a Spark DataFrame and install packages on all workers.