SparkR の概要

SparkR は、R から Apache Spark を使用するための軽量フロントエンドを提供する R パッケージです。 SparkR では、MLlib を使用した分散型機械学習もサポートされています。

SparkR 関数のリファレンス

SparkR 関数の最新のリファレンスが spark.apache.org にあります。

SparkR パッケージをインポートした後、R ノートブックまたは RStudio で関数のヘルプを表示することもできます。

埋め込み R ドキュメント

ノートブックでの SparkR

  • Spark 2.0 以降では、すべての関数呼び出しに sqlContext オブジェクトを明示的に渡す必要はありません。
  • Spark 2.2 以降では、SparkR 関数は他の一般的なパッケージからの同様の名前の関数と競合しているため、ノートブックでは SparkR が既定でインポートされなくなりました。 SparkR を使用するには、ノートブックで library(SparkR) を呼び出します。 SparkR セッションは既に構成されており、すべての SparkR 関数は、既存のセッションを使用して接続されたクラスターとやり取りします。

spark-submit ジョブでの Spark

軽微なコード変更を行うことで、Azure Databricks で SparkR を使用するスクリプトを spark-submit ジョブとして実行できます。

SparkR DataFrames を作成する

DataFrame は、ローカル R data.frame から、データ ソースから、または Spark SQL クエリを使用して作成できます。

ローカル R data.frame から

DataFrame を作成する最も簡単な方法は、ローカル R data.frameSparkDataFrame に変換することです。 具体的には、createDataFrame を使用してローカル R data.frame を渡し、SparkDataFrame を作成することができます。 他のほとんどの SparkR 関数と同様、createDataFrame 構文は Spark 2.0 で変更されました。 この例は、次のコード スニペットで確認できます。 その他の例については、「createDataFrame」を参照してください。

library(SparkR)
df <- createDataFrame(faithful)

# Displays the content of the DataFrame to stdout
head(df)

データ ソース API の使用

データ ソースから DataFrame を作成するための一般的なメソッドは、read.df です。 このメソッドでは、読み込むファイルのパスとデータ ソースの種類を受け取ります。 SparkR では、CSV、JSON、テキスト、Parquet の各ファイルをネイティブに読み取ることができます。

library(SparkR)
diamondsDF <- read.df("/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv", source = "csv", header="true", inferSchema = "true")
head(diamondsDF)

SparkR では、CSV ファイルからスキーマが自動的に推測されます。

Spark パッケージを使用したデータ ソース コネクタの追加

Spark パッケージを使用して、Avro などの一般的なファイル形式のデータ ソース コネクタを見つけることができます。 例として、spark-avro パッケージを使用して Avro ファイルを読み込みます。 spark-avro パッケージが使用できるかどうかは、クラスターのバージョンによって異なります。 「Avro ファイル」を参照してください。

まず、既存の data.frame を取得し、Spark DataFrame に変換して、Avro ファイルとして保存します。

require(SparkR)
irisDF <- createDataFrame(iris)
write.df(irisDF, source = "com.databricks.spark.avro", path = "dbfs:/tmp/iris.avro", mode = "overwrite")

Avro ファイルが保存されたことを確認するには、次のようにします。

%fs ls /tmp/iris.avro

次に、spark-avro パッケージをもう一度使用して、データを読み取ります。

irisDF2 <- read.df(path = "/tmp/iris.avro", source = "com.databricks.spark.avro")
head(irisDF2)

データ ソース API を使用して、DataFrame を複数のファイル形式に保存することもできます。 たとえば、write.df を使用して、前の例の DataFrame を Parquet ファイルに保存できます。

write.df(irisDF2, path="dbfs:/tmp/iris.parquet", source="parquet", mode="overwrite")
%fs ls dbfs:/tmp/iris.parquet

Spark SQL クエリから

Spark SQL クエリを使用して SparkR DataFrame を作成することもできます。

# Register earlier df as temp view
createOrReplaceTempView(irisDF2, "irisTemp")
# Create a df consisting of only the 'species' column using a Spark SQL query
species <- sql("SELECT species FROM irisTemp")

species は SparkDataFrame です。

DataFrame 操作

Spark DataFrame では、構造化データ処理を行うための多くの関数がサポートされています。 ここでは基本的な例の一部を示します。 完全な一覧については API のドキュメントを参照してください。

行と列の選択

# Import SparkR package if this is a new notebook
require(SparkR)

# Create DataFrame
df <- createDataFrame(faithful)
# Select only the "eruptions" column
head(select(df, df$eruptions))
# You can also pass in column name as strings
head(select(df, "eruptions"))
# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))

グループ化と集計

SparkDataFrame では、グループ化後にデータを集計するために一般的に使用される関数が多数サポートされています。 たとえば、各待機時間が faithful データセットに表示される回数をカウントできます。

head(count(groupBy(df, df$waiting)))
# You can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- count(groupBy(df, df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

列の操作

SparkR には、データ処理と集計のために列に直接適用できる多数の関数が用意されています。 基本的な算術関数の使用例を次に示します。

# Convert waiting time from hours to seconds.
# You can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

機械学習

SparkR では、ほとんどの MLLib アルゴリズムが公開されています。 内部的には、SparkR ではモデルをトレーニングするために MLlib が使用されます。

次の例は、SparkR を使用してガウス GLM モデルを作成する方法を示しています。 線形回帰を実行するには、ファミリを "gaussian" に設定します。 ロジスティック回帰を実行するには、ファミリを "binomial" に設定します。 SparkML GLM を使用するとき、SparkR ではカテゴリの特徴のワンホット エンコードが自動的に実行されることで、手動で行う必要がなくなります。 他の MLlib コンポーネントとの互換性のために、String と Double 型の機能に加えて、MLlib Vector の特徴に合わせることもできます。

# Create the DataFrame
df <- createDataFrame(iris)

# Fit a linear model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")

# Model coefficients are returned in a similar format to R's native glm().
summary(model)

チュートリアルについては、「チュートリアル: glm を使用してデータを分析する」を参照してください。

その他の例については、「R で DataFrame とテーブルを操作する」を参照してください。