SparkR 1.6: Übersicht

Hinweis

Informationen zur aktuellen SparkR-Bibliothek finden Sie unter Azure Databricks für R-Entwickler.

SparkR ist ein R-Paket, das ein leichtes Front-End für die Verwendung von Apache Spark von R bietet. Ab Spark 1.5.1 bietet SparkR eine verteilte DataFrame-Implementierung, die Vorgänge wie Auswahl, Filterung und Aggregation unterstützt (ähnlich wie R-Datenrahmen und dplyr), aber für große Datasets. SparkR unterstützt auch verteiltes maschinelles Lernen mithilfe von MLlib.

Erstellen von SparkR-DataFrames

Anwendungen können DataFrames aus einem lokalen R-Datenrahmen, aus Datenquellen oder mithilfe von Spark-SQL erstellen.

Die einfachste Möglichkeit zum Erstellen eines Datenrahmens besteht in der Konvertierung eines lokalen R-Datenrahmens in einen SparkR-Datenrahmen. Insbesondere können wir verwenden, um einen Datenrahmen zu erstellen und den lokalen R-Datenrahmen zu übergeben, um einen SparkR-Datenrahmen zu erstellen. Die folgende Zelle erstellt z. B. einen DataFrame unter Verwendung des entfernten Datasets aus R.

df <- createDataFrame(sqlContext, faithful)

# Displays the content of the DataFrame to stdout
head(df)

Aus Datenquellen, die Spark-SQL

Die allgemeine Methode zum Erstellen von DataFrames aus Datenquellen ist read.df. Diese Methode nimmt den SQLContext, den Pfad für die zu ladende Datei und den Typ der Datenquelle an. SparkR unterstützt das native Lesen von JSON- und Parquet-Dateien. Über Spark-Pakete finden Sie Datenquellenconnectors für gängige Dateiformate wie CSV und Avro.

%fs rm dbfs:/tmp/people.json
%fs put dbfs:/tmp/people.json
'{"age": 10, "name": "John"}
{"age": 20, "name": "Jane"}
{"age": 30, "name": "Andy"}'
people <- read.df(sqlContext, "dbfs:/tmp/people.json", source="json")

SparkR abgeleitet das Schema automatisch aus der JSON-Datei.

printSchema(people)
display(people)

Verwenden von Datenquellenconnectors mit Spark-Paketen

Als Beispiel verwenden wir das Spark-CSV-Paket, um eine CSV-Datei zu laden. Eine Liste der Spark-Pakete von Databricks finden Sie hier.

diamonds <- read.df(sqlContext, "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv",
                    source = "com.databricks.spark.csv", header="true", inferSchema = "true")
head(diamonds)

Die Datenquellen-API kann auch verwendet werden, um DataFrames in mehreren Dateiformaten zu speichern. Beispielsweise können wir den DataFrame aus dem vorherigen Beispiel mit write.df in einer Parquet-Datei speichern.

%fs rm -r dbfs:/tmp/people.parquet
write.df(people, path="dbfs:/tmp/people.parquet", source="parquet", mode="overwrite")
%fs ls dbfs:/tmp/people.parquet

Aus Spark SQL Abfragen

Sie können SparkR-DataFrames auch mithilfe von Spark-SQL erstellen.

# Register earlier df as temp view
createOrReplaceTempView(people, "peopleTemp")
# Create a df consisting of only the 'age' column using a Spark SQL query
age <- sql(sqlContext, "SELECT age FROM peopleTemp")
head(age)
# Resulting df is a SparkR df
str(age)

DataFrame-Vorgänge

SparkR-DataFrames unterstützen eine Reihe von Funktionen für die strukturierte Datenverarbeitung. Hier finden Sie einige grundlegende Beispiele, und eine vollständige Liste finden Sie in der API-Dokumentation.

Auswählen von Zeilen und Spalten

# Create DataFrame
df <- createDataFrame(sqlContext, faithful)
df
# Select only the "eruptions" column
head(select(df, df$eruptions))
# You can also pass in column name as strings
head(select(df, "eruptions"))
# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))

Gruppierung und Aggregation

SparkR-DataFrames unterstützen eine Reihe häufig verwendeter Funktionen zum Aggregieren von Daten nach der Gruppierung. Beispielsweise können wir die Anzahl der Wartezeiten zählen, die im Dataset für die Zählung der Wartezeiten angezeigt werden.

head(count(groupBy(df, df$waiting)))
# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- count(groupBy(df, df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Spaltenvorgänge (Column operations)

SparkR stellt eine Reihe von Funktionen zur Verfügung, die zur Datenverarbeitung und Aggregation direkt auf Spalten angewendet werden können. Das folgende Beispiel zeigt die Verwendung grundlegender arithmetischer Funktionen.

# Convert waiting time from hours to seconds.
# You can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

Machine Learning

Ab Spark 1.5 ermöglicht SparkR die Anpassung generalisierter linearer Modelle über SparkR-DataFrames mithilfe der glm()-Funktion. SparkR verwendet MLlib, um ein Modell der angegebenen Familie zu trainieren. Wir unterstützen eine Teilmenge der verfügbaren R-Formeloperatoren für die Modellanpassung, einschließlich "~", ".", "+" und "-".

Im Lauf der Zeit führt SparkR automatisch eine One-Hot-Codierung kategorischer Features durch, sodass dies nicht manuell erfolgen muss. Neben den Features vom Typ "String" und "Double" ist es auch möglich, MLlib Vector-Features zu überpassen, um die Kompatibilität mit anderen MLlib-Komponenten zu gewährleisten.

Das folgende Beispiel zeigt die Verwendung des Erstellens eines gaußschen GLM-Modells mit SparkR. Legen Sie zum Ausführen der linearen Regression family auf "gassian" fest. Legen Sie zum Ausführen der logistischen Regression family auf "binomial" fest.

# Create the DataFrame
df <- createDataFrame(sqlContext, iris)

# Fit a linear model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")

# Model coefficients are returned in a similar format to R's native glm().
summary(model)

Konvertieren lokaler R-Datenrahmen in SparkR-Datenrahmen

Sie können createDataFrame verwenden, um lokale R-Datenrahmen in SparkR-Datenrahmen zu konvertieren.

# Create SparkR DataFrame using localDF
convertedSparkDF <- createDataFrame(sqlContext, localDF)
str(convertedSparkDF)
# Another example: Create SparkR DataFrame with a local R data frame
anotherSparkDF <- createDataFrame(sqlContext, data.frame(surname = c("Tukey", "Venables", "Tierney", "Ripley", "McNeil"),
                                                         nationality = c("US", "Australia", "US", "UK", "Australia"),
                                                         deceased = c("yes", rep("no", 4))))
count(anotherSparkDF)