Overzicht van SparkR 1.6

Notitie

Zie de Azure Databricks voor R-ontwikkelaarsvoor meer informatie over de meest recente SparkR-bibliotheek.

SparkR is een R-pakket dat een lichtgewicht front-end biedt voor het gebruik van Apache Spark van R. Vanaf Spark 1.5.1 biedt SparkR een gedistribueerde DataFrame-implementatie die ondersteuning biedt voor bewerkingen zoals selectie, filtering en aggregatie (vergelijkbaar met R-gegevensframes en dplyr), maar op grote gegevenssets. SparkR ondersteunt ook gedistribueerde machine learning MLlib.

SparkR DataFrames maken

Toepassingen kunnen DataFrames maken op basis van een lokaal R-gegevensframe, vanuit gegevensbronnen of met behulp van Spark SQL query's.

De eenvoudigste manier om een DataFrame te maken, is door een lokaal R-gegevensframe te converteren naar een SparkR DataFrame. We kunnen specifiek een DataFrame maken en het lokale R-gegevensframe doorgeven om een SparkR DataFrame te maken. De volgende cel maakt bijvoorbeeld een DataFrame met behulp van de gegevensset van R.

df <- createDataFrame(sqlContext, faithful)

# Displays the content of the DataFrame to stdout
head(df)

Uit gegevensbronnen die Spark SQL

De algemene methode voor het maken van DataFrames op basis van gegevensbronnen is read.df. Bij deze methode worden de SQLContext, het pad voor het bestand dat moet worden geladen en het type gegevensbron gebruikt. SparkR ondersteunt het native lezen van JSON- en Parquet-bestanden en via Spark-pakketten kunt u gegevensbronconnectoren vinden voor populaire bestandsindelingen zoals CSV en Avro.

%fs rm dbfs:/tmp/people.json
%fs put dbfs:/tmp/people.json
'{"age": 10, "name": "John"}
{"age": 20, "name": "Jane"}
{"age": 30, "name": "Andy"}'
people <- read.df(sqlContext, "dbfs:/tmp/people.json", source="json")

SparkR afleiden het schema automatisch uit het JSON-bestand.

printSchema(people)
display(people)

Gegevensbronconnectoren gebruiken met Spark-pakketten

Als voorbeeld gebruiken we het Spark CSV-pakket om een CSV-bestand te laden. U vindt hier een lijst met Spark-pakketten van Databricks.

diamonds <- read.df(sqlContext, "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv",
                    source = "com.databricks.spark.csv", header="true", inferSchema = "true")
head(diamonds)

De GEGEVENSBRONNEN-API kan ook worden gebruikt om DataFrames op te slaan in meerdere bestandsindelingen. We kunnen bijvoorbeeld het DataFrame uit het vorige voorbeeld opslaan in een Parquet-bestand met behulp van write.df

%fs rm -r dbfs:/tmp/people.parquet
write.df(people, path="dbfs:/tmp/people.parquet", source="parquet", mode="overwrite")
%fs ls dbfs:/tmp/people.parquet

Vanuit Spark SQL query's

U kunt ook SparkR DataFrames maken met behulp van Spark SQL query's.

# Register earlier df as temp view
createOrReplaceTempView(people, "peopleTemp")
# Create a df consisting of only the 'age' column using a Spark SQL query
age <- sql(sqlContext, "SELECT age FROM peopleTemp")
head(age)
# Resulting df is a SparkR df
str(age)

DataFrame-bewerkingen

SparkR DataFrames ondersteunen een aantal functies voor gestructureerde gegevensverwerking. Hier vindt u enkele basisvoorbeelden en een volledige lijst in de API-documenten.

Rijen en kolommen selecteren

# Create DataFrame
df <- createDataFrame(sqlContext, faithful)
df
# Select only the "eruptions" column
head(select(df, df$eruptions))
# You can also pass in column name as strings
head(select(df, "eruptions"))
# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))

Groeperen en aggregeren

SparkR DataFrames ondersteunen een aantal veelgebruikte functies voor het samenvoegen van gegevens na het groeperen. We kunnen bijvoorbeeld het aantal keren tellen dat elke wachttijd wordt weergegeven in de gegevensset.

head(count(groupBy(df, df$waiting)))
# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- count(groupBy(df, df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Kolombewerkingen

SparkR biedt een aantal functies die rechtstreeks kunnen worden toegepast op kolommen voor gegevensverwerking en aggregatie. In het volgende voorbeeld ziet u het gebruik van eenvoudige rekenkundige functies.

# Convert waiting time from hours to seconds.
# You can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

Machine learning

Vanaf Spark 1.5 kan SparkR ge generaliseerde lineaire modellen aanpassen via SparkR DataFrames met behulp van de functie glm(). Achter de komma gebruikt SparkR MLlib om een model van de opgegeven familie te trainen. We ondersteunen een subset van de beschikbare R-formuleoperators voor modelfitting, waaronder ~, ., +en -.

Achter de hand voert SparkR automatisch one-hot codering van categorische functies uit, zodat dit niet handmatig hoeft te worden gedaan. Naast de functies Tekenreeks en Dubbel type is het ook mogelijk om over MLlib Vector-functies te passen voor compatibiliteit met andere MLlib-onderdelen.

In het volgende voorbeeld ziet u het gebruik van het bouwen van een gaussisch GLM-model met behulp van SparkR. Als u lineaire regressie wilt uitvoeren, stelt u family in op 'gaussian'. Als u logistieke regressie wilt uitvoeren, stelt u family in op 'binomial'.

# Create the DataFrame
df <- createDataFrame(sqlContext, iris)

# Fit a linear model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")

# Model coefficients are returned in a similar format to R's native glm().
summary(model)

Lokale R-gegevensframes converteren naar SparkR DataFrames

U kunt gebruiken om createDataFrame lokale R-gegevensframes te converteren naar SparkR DataFrames.

# Create SparkR DataFrame using localDF
convertedSparkDF <- createDataFrame(sqlContext, localDF)
str(convertedSparkDF)
# Another example: Create SparkR DataFrame with a local R data frame
anotherSparkDF <- createDataFrame(sqlContext, data.frame(surname = c("Tukey", "Venables", "Tierney", "Ripley", "McNeil"),
                                                         nationality = c("US", "Australia", "US", "UK", "Australia"),
                                                         deceased = c("yes", rep("no", 4))))
count(anotherSparkDF)