Omówienie usługi SparkR 1.6

Uwaga

Aby uzyskać informacje o najnowszej bibliotece platformy SparkR, zapoznaj się z tematem Azure Databricks dla deweloperów języka R.

SparkR to pakiet R, który zapewnia lekki frontony do korzystania z platformy Apache Spark z oprogramowania R. Począwszy od platformy Spark 1.5.1, platforma SparkR zapewnia rozproszoną implementację ramki danych, która obsługuje operacje takie jak wybór, filtrowanie i agregacja (podobnie jak w przypadku ramek danych i dplyr W R), ale w dużych zestawach danych. Platforma SparkR obsługuje również rozproszone uczenie maszynowe przy użyciu MLlib.

Tworzenie ramek danych platformy SparkR

Aplikacje mogą tworzyć ramki danych z lokalnej ramki danych R, ze źródeł danych lub przy użyciu zapytań SQL Spark.

Najprostszym sposobem utworzenia ramki danych jest przekonwertowanie lokalnej ramki danych R na ramkę danych platformy SparkR. W szczególności możemy użyć funkcji tworzenia ramki danych i przekazania lokalnej ramki danych R w celu utworzenia ramki danych platformy SparkR. Na przykład następująca komórka tworzy ramce danych przy użyciu zestawu danych z zestawu danych z R.

df <- createDataFrame(sqlContext, faithful)

# Displays the content of the DataFrame to stdout
head(df)

Ze źródeł danych korzystających z platformy Spark SQL

Ogólną metodą tworzenia ramek danych ze źródeł danych jest read.df. Ta metoda przyjmuje sqlcontext, ścieżkę do załadowania pliku i typ źródła danych. Platforma SparkR obsługuje odczytywanie plików JSON i Parquet natywnie, a za pomocą pakietów Spark można znaleźć łączniki źródła danych dla popularnych formatów plików, takich jak CSV i Avro.

%fs rm dbfs:/tmp/people.json
%fs put dbfs:/tmp/people.json
'{"age": 10, "name": "John"}
{"age": 20, "name": "Jane"}
{"age": 30, "name": "Andy"}'
people <- read.df(sqlContext, "dbfs:/tmp/people.json", source="json")

Platforma SparkR automatycznie wywnioskuje schemat z pliku JSON.

printSchema(people)
display(people)

Używanie łączników źródła danych z pakietami Spark

Na przykład użyjemy pakietu CSV platformy Spark do załadowania pliku CSV. Listę pakietów Platformy Spark dla usług Databricksmożna znaleźć tutaj.

diamonds <- read.df(sqlContext, "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv",
                    source = "com.databricks.spark.csv", header="true", inferSchema = "true")
head(diamonds)

Interfejs API źródeł danych może również służyć do zapisywania ramek danych w wielu formatach plików. Na przykład możemy zapisać ramce danych z poprzedniego przykładu do pliku Parquet przy użyciu pliku write.df

%fs rm -r dbfs:/tmp/people.parquet
write.df(people, path="dbfs:/tmp/people.parquet", source="parquet", mode="overwrite")
%fs ls dbfs:/tmp/people.parquet

Zapytania SQL Spark

Możesz również tworzyć ramki danych platformy SparkR przy użyciu zapytań SQL Spark.

# Register earlier df as temp view
createOrReplaceTempView(people, "peopleTemp")
# Create a df consisting of only the 'age' column using a Spark SQL query
age <- sql(sqlContext, "SELECT age FROM peopleTemp")
head(age)
# Resulting df is a SparkR df
str(age)

Operacje na ramce danych

Ramki danych platformy SparkR obsługują wiele funkcji do przetwarzania danych strukturalnych. W tym miejscu dołączymy kilka podstawowych przykładów, a pełną listę można znaleźć w dokumentów interfejsu API.

Wybieranie wierszy i kolumn

# Create DataFrame
df <- createDataFrame(sqlContext, faithful)
df
# Select only the "eruptions" column
head(select(df, df$eruptions))
# You can also pass in column name as strings
head(select(df, "eruptions"))
# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))

Grupowanie i agregacja

Ramki danych platformy SparkR obsługują wiele często używanych funkcji do agregowania danych po grupowania. Na przykład możemy zliczyć, ile razy każdy czas oczekiwania pojawia się w zestawie danych z chłoniaka.

head(count(groupBy(df, df$waiting)))
# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- count(groupBy(df, df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Operacje na kolumnach

Platforma SparkR udostępnia wiele funkcji, które można bezpośrednio zastosować do kolumn w celu przetwarzania i agregacji danych. W poniższym przykładzie pokazano użycie podstawowych funkcji arytmetycznych.

# Convert waiting time from hours to seconds.
# You can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

Uczenie maszynowe

Od platformy Spark 1.5 platforma SparkR umożliwia dopasowywanie uogólnionych modeli liniowych do ramek danych platformy SparkR przy użyciu funkcji glm(). Pod maską platforma SparkR używa MLlib do trenowania modelu określonej rodziny. Obsługujemy podzbiór dostępnych operatorów formuł r do dopasowywania modelu, w tym "~", ".", "+" i "-".

Pod maską platforma SparkR automatycznie wykonuje kodowanie cech kategorii z gorącą, dzięki czemu nie trzeba tego robić ręcznie. Poza funkcjami typu String i Double można również dopasować funkcje MLlib Vector, aby zapewnić zgodność z innymi składnikami MLlib.

W poniższym przykładzie pokazano użycie tworzenia gaussowego modelu GLM przy użyciu platformy SparkR. Aby uruchomić regresję liniową, ustaw dla rodziny wartość "gaussian". Aby uruchomić regresję logistyczną, ustaw dla rodziny wartość "binomial".

# Create the DataFrame
df <- createDataFrame(sqlContext, iris)

# Fit a linear model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")

# Model coefficients are returned in a similar format to R's native glm().
summary(model)

Konwertowanie lokalnych ramek danych r na ramki danych platformy SparkR

Za pomocą funkcji można konwertować lokalne ramki danych r na ramki danych platformy createDataFrame SparkR.

# Create SparkR DataFrame using localDF
convertedSparkDF <- createDataFrame(sqlContext, localDF)
str(convertedSparkDF)
# Another example: Create SparkR DataFrame with a local R data frame
anotherSparkDF <- createDataFrame(sqlContext, data.frame(surname = c("Tukey", "Venables", "Tierney", "Ripley", "McNeil"),
                                                         nationality = c("US", "Australia", "US", "UK", "Australia"),
                                                         deceased = c("yes", rep("no", 4))))
count(anotherSparkDF)