Översikt över SparkR 1.6

Anteckning

Information om det senaste SparkR-biblioteket finns i Azure Databricks för R-utvecklare.

SparkR är ett R-paket som tillhandahåller en lättvikts-frontend för användning av Apache Spark från R. Från och med Spark 1.5.1 tillhandahåller SparkR en distribuerad DataFrame-implementering som stöder åtgärder som urval, filtrering och aggregering (liknar R-dataramar och dplyr) men på stora datauppsättningar. SparkR stöder också distribuerad maskininlärning med hjälp av MLlib.

Skapa SparkR DataFrames

Program kan skapa DataFrames från en lokal R-dataram, från datakällor eller använda Spark SQL frågor.

Det enklaste sättet att skapa en DataFrame är att konvertera en lokal R-dataram till en SparkR DataFrame. Mer specifikt kan vi använda skapa en DataFrame och skicka in den lokala R-dataramen för att skapa en SparkR DataFrame. Till exempel skapar följande cell en DataFrame med hjälp av den grafiska datauppsättningen från R.

df <- createDataFrame(sqlContext, faithful)

# Displays the content of the DataFrame to stdout
head(df)

Från datakällor som använder Spark SQL

Den allmänna metoden för att skapa DataFrames från datakällor är read.df. Den här metoden använder SQLContext, sökvägen för filen som ska läsas in och typen av datakälla. SparkR stöder läsning av JSON- och Parquet-filer inbyggt och via Spark-paket kan du hitta anslutningsappar för datakällor för populära filformat som CSV och Avro.

%fs rm dbfs:/tmp/people.json
%fs put dbfs:/tmp/people.json
'{"age": 10, "name": "John"}
{"age": 20, "name": "Jane"}
{"age": 30, "name": "Andy"}'
people <- read.df(sqlContext, "dbfs:/tmp/people.json", source="json")

SparkR härför automatiskt schemat från JSON-filen.

printSchema(people)
display(people)

Använda anslutningsappar för datakällor med Spark-paket

Vi använder till exempel Spark CSV-paketet för att läsa in en CSV-fil. Du hittar en lista över Spark-paket från Databricks här.

diamonds <- read.df(sqlContext, "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv",
                    source = "com.databricks.spark.csv", header="true", inferSchema = "true")
head(diamonds)

API:et för datakällor kan också användas för att spara DataFrames i flera filformat. Vi kan till exempel spara DataFrame från föregående exempel till en Parquet-fil med write.df

%fs rm -r dbfs:/tmp/people.parquet
write.df(people, path="dbfs:/tmp/people.parquet", source="parquet", mode="overwrite")
%fs ls dbfs:/tmp/people.parquet

Från Spark SQL frågor

Du kan också skapa SparkR DataFrames med Spark SQL frågor.

# Register earlier df as temp view
createOrReplaceTempView(people, "peopleTemp")
# Create a df consisting of only the 'age' column using a Spark SQL query
age <- sql(sqlContext, "SELECT age FROM peopleTemp")
head(age)
# Resulting df is a SparkR df
str(age)

DataFrame-åtgärder

SparkR DataFrames stöder ett antal funktioner för strukturerad databearbetning. Här inkluderar vi några grundläggande exempel och en fullständig lista finns i API-dokumentationen.

Välja rader och kolumner

# Create DataFrame
df <- createDataFrame(sqlContext, faithful)
df
# Select only the "eruptions" column
head(select(df, df$eruptions))
# You can also pass in column name as strings
head(select(df, "eruptions"))
# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))

Gruppering och aggregering

SparkR DataFrames stöder ett antal vanliga funktioner för att aggregera data efter gruppering. Vi kan till exempel räkna antalet gånger som varje väntetid visas i den grafiska datamängden.

head(count(groupBy(df, df$waiting)))
# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- count(groupBy(df, df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Kolumnåtgärder

SparkR innehåller ett antal funktioner som kan tillämpas direkt på kolumner för databearbetning och aggregering. I följande exempel visas användningen av grundläggande aritmetiska funktioner.

# Convert waiting time from hours to seconds.
# You can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

Maskininlärning

Från och med Spark 1.5 tillåter SparkR anpassning av generaliserade linjära modeller över SparkR DataFrames med hjälp av funktionen glm(). Under huven använder SparkR MLlib för att träna en modell av den angivna familjen. Vi stöder en delmängd av de tillgängliga R-formeloperatorer för modellkoppling, inklusive "~", ".", "+" och "-".

Under huven utför SparkR automatiskt one-hot-kodning av kategoriska funktioner så att det inte behöver göras manuellt. Utöver funktionerna för Sträng- och Double-typ är det också möjligt att passa över MLlib Vector-funktioner för kompatibilitet med andra MLlib-komponenter.

I följande exempel visas användningen av att skapa en gaussisk GLM-modell med SparkR. Om du vill köra linjär regression anger du familj till "gaussian". Om du vill köra Logistic Regression anger du familj till "binomial".

# Create the DataFrame
df <- createDataFrame(sqlContext, iris)

# Fit a linear model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")

# Model coefficients are returned in a similar format to R's native glm().
summary(model)

Konvertera lokala R-dataramar till SparkR DataFrames

Du kan använda för createDataFrame att konvertera lokala R-dataramar till SparkR DataFrames.

# Create SparkR DataFrame using localDF
convertedSparkDF <- createDataFrame(sqlContext, localDF)
str(convertedSparkDF)
# Another example: Create SparkR DataFrame with a local R data frame
anotherSparkDF <- createDataFrame(sqlContext, data.frame(surname = c("Tukey", "Venables", "Tierney", "Ripley", "McNeil"),
                                                         nationality = c("US", "Australia", "US", "UK", "Australia"),
                                                         deceased = c("yes", rep("no", 4))))
count(anotherSparkDF)