Introducción a SparkR 1.6

Nota

Para más información sobre la biblioteca más reciente de SparkR, consulte el artículo acerca de Azure Databricks para desarrolladores de R.

Sparkr es un paquete de R que proporciona un front-end ligero para usar Apache Spark de R. a partir de Spark 1.5.1, Sparkr proporciona una implementación de trama de datos distribuida que admite operaciones como selección, filtrado y agregación (similar a tramas de datos de R y dplyr), pero en grandes conjuntos de datos. Sparkr también admite el aprendizaje automático distribuido mediante MLlib.

Creación de tramas de marcos de Sparkr

Las aplicaciones pueden crear tramas de datos a partir de una trama de datos local de R, de orígenes de datos o mediante consultas SQL de Spark.

La manera más sencilla de crear una trama de datos es convertir una trama de datos de R local en una trama de datos de Sparkr. En concreto, se puede usar crear una trama de datos y pasar la trama de datos local de R para crear una trama de datos de Sparkr. Por ejemplo, la celda siguiente crea una trama de imágenes con el conjunto de elementos fiel desde R.

df <- createDataFrame(sqlContext, faithful)

# Displays the content of the DataFrame to stdout
head(df)

De orígenes de datos con Spark SQL

El método general para crear tramas de datos a partir de orígenes de datos es Read. DF. Este método toma SQLContext, la ruta de acceso del archivo que se va a cargar y el tipo de origen de datos. Sparkr admite la lectura de archivos JSON y parquet de forma nativa y a través de paquetes de Spark. puede encontrar conectores de origen de datos para formatos de archivo populares como CSV y Avro.

%fs rm dbfs:/tmp/people.json
%fs put dbfs:/tmp/people.json
'{"age": 10, "name": "John"}
{"age": 20, "name": "Jane"}
{"age": 30, "name": "Andy"}'
people <- read.df(sqlContext, "dbfs:/tmp/people.json", source="json")

Sparkr infiere automáticamente el esquema a partir del archivo JSON.

printSchema(people)
display(people)

Uso de conectores de origen de datos con paquetes Spark

Como ejemplo, usaremos el paquete CSV de Spark para cargar un archivo CSV. Aquí puede encontrar una lista de paquetes de Spark por los bricks delos mismos.

diamonds <- read.df(sqlContext, "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv",
                    source = "com.databricks.spark.csv", header="true", inferSchema = "true")
head(diamonds)

La API de orígenes de datos también se puede usar para guardar tramas de datos en varios formatos de archivo. Por ejemplo, podemos guardar la trama de archivos del ejemplo anterior en un archivo parquet mediante Write. DF

%fs rm -r dbfs:/tmp/people.parquet
write.df(people, path="dbfs:/tmp/people.parquet", source="parquet", mode="overwrite")
%fs ls dbfs:/tmp/people.parquet

Desde consultas de Spark SQL

También puede crear tramas de marcos de Sparkr mediante consultas de Spark SQL.

# Register earlier df as temp view
createOrReplaceTempView(people, "peopleTemp")
# Create a df consisting of only the 'age' column using a Spark SQL query
age <- sql(sqlContext, "SELECT age FROM peopleTemp")
head(age)
# Resulting df is a SparkR df
str(age)

Operaciones de trama de fotogramas

Las tramas de datos de sparkr admiten una serie de funciones para realizar el procesamiento de datos estructurados. Aquí incluimos algunos ejemplos básicos y se puede encontrar una lista completa en los documentosde la API.

Seleccionar filas y columnas

# Create DataFrame
df <- createDataFrame(sqlContext, faithful)
df
# Select only the "eruptions" column
head(select(df, df$eruptions))
# You can also pass in column name as strings
head(select(df, "eruptions"))
# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))

Agrupación y agregación

Las tramas de datos de sparkr admiten varias funciones de uso frecuente para agregar datos después de la agrupación. Por ejemplo, podemos contar el número de veces que cada tiempo de espera aparece en el conjunto de

head(count(groupBy(df, df$waiting)))
# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- count(groupBy(df, df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Operaciones de columna

Sparkr proporciona varias funciones que se pueden aplicar directamente a las columnas para el procesamiento y la agregación de datos. En el ejemplo siguiente se muestra el uso de las funciones aritméticas básicas.

# Convert waiting time from hours to seconds.
# You can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

Machine Learning

A partir de Spark 1,5, Sparkr permite la instalación de modelos lineales generalizados en tramas de interconexión de Sparkr mediante la función GLM (). En el capó, Sparkr usa MLlib para entrenar un modelo de la familia especificada. Se admite un subconjunto de los operadores de fórmulas de R disponibles para la instalación de modelos, incluidos ' ~ ', '. ', ' + ' y '-'.

En el capó, Sparkr realiza automáticamente una codificación única de las características de categorías para que no sea necesario hacerlo manualmente. Además de las características de tipos de cadena y tipo Double, también es posible ajustarse a las características del vector MLlib, por motivos de compatibilidad con otros componentes de MLlib.

En el ejemplo siguiente se muestra el uso de la creación de un modelo de GLM Gaussiano con Sparkr. Para ejecutar la regresión lineal, establezca Family en "gaussiano". Para ejecutar la regresión logística, establezca Family en "binomial".

# Create the DataFrame
df <- createDataFrame(sqlContext, iris)

# Fit a linear model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")

# Model coefficients are returned in a similar format to R's native glm().
summary(model)

Conversión de tramas de datos de R locales en tramas de datos de Sparkr

Puede usar createDataFrame para convertir tramas de datos locales de R en tramas de datos de sparkr.

# Create SparkR DataFrame using localDF
convertedSparkDF <- createDataFrame(sqlContext, localDF)
str(convertedSparkDF)
# Another example: Create SparkR DataFrame with a local R data frame
anotherSparkDF <- createDataFrame(sqlContext, data.frame(surname = c("Tukey", "Venables", "Tierney", "Ripley", "McNeil"),
                                                         nationality = c("US", "Australia", "US", "UK", "Australia"),
                                                         deceased = c("yes", rep("no", 4))))
count(anotherSparkDF)