Vue d’ensemble de SparkR 1.6

Notes

Pour plus d’informations sur la dernière bibliothèque Sparkr, consultez Azure Databricks pour les développeurs R.

Sparkr est un package R qui fournit un frontend léger à utiliser Apache Spark à partir de R. à partir de Spark 1.5.1, Sparkr fournit une Implémentation de tableau qui prend en charge des opérations telles que la sélection filtrage et agrégation (similaire aux trames de données R et dplyr), mais sur Jeux de données volumineux. Sparkr prend également en charge les Machine Learning distribués à l’aide de MLlib.

Création de trames Sparkr

les Applications peuvent créer des trames à partir d’une trame de données R locale, à partir de sources de données ou à l’aide de requêtes Spark SQL.

La façon la plus simple de créer un tableau consiste à convertir une trame de données R locale en tableau Sparkr. Plus précisément, nous pouvons utiliser créer un tableau et transmettre la trame de données R locale pour créer un tableau Sparkr. Par exemple, la cellule suivante crée un tableau à l’aide du jeu de données fidèle de R.

df <- createDataFrame(sqlContext, faithful)

# Displays the content of the DataFrame to stdout
head(df)

À partir de sources de données à l’aide d’Spark SQL

La méthode générale pour créer des trames à partir de sources de données est Read. DF. Cette méthode prend le SQLContext, le chemin d’accès du fichier à charger et type de source de données. Sparkr prend en charge la lecture des fichiers JSON et parquet les connecteurs de source de données sont disponibles en mode natif et par le biais de packages Spark pour les formats de fichiers populaires tels que CSV et Avro.

%fs rm dbfs:/tmp/people.json
%fs put dbfs:/tmp/people.json
'{"age": 10, "name": "John"}
{"age": 20, "name": "Jane"}
{"age": 30, "name": "Andy"}'
people <- read.df(sqlContext, "dbfs:/tmp/people.json", source="json")

Sparkr déduit automatiquement le schéma à partir du fichier JSON.

printSchema(people)
display(people)

Utilisation de connecteurs de source de données avec des packages Spark

Par exemple, nous allons utiliser le package de volume partagé de cluster Spark pour charger un fichier CSV. Avoir peut trouver une liste de packages Spark par Databricks ici.

diamonds <- read.df(sqlContext, "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv",
                    source = "com.databricks.spark.csv", header="true", inferSchema = "true")
head(diamonds)

L’API de sources de données peut également être utilisée pour enregistrer des trames dans formats de fichiers multiples. Par exemple, nous pouvons enregistrer le tableau à partir du exemple précédent vers un fichier parquet à l’aide de Write. DF

%fs rm -r dbfs:/tmp/people.parquet
write.df(people, path="dbfs:/tmp/people.parquet", source="parquet", mode="overwrite")
%fs ls dbfs:/tmp/people.parquet

à partir de requêtes Spark SQL

vous pouvez également créer des trames sparkr à l’aide de requêtes spark SQL.

# Register earlier df as temp view
createOrReplaceTempView(people, "peopleTemp")
# Create a df consisting of only the 'age' column using a Spark SQL query
age <- sql(sqlContext, "SELECT age FROM peopleTemp")
head(age)
# Resulting df is a SparkR df
str(age)

Opérations tableau

Sparkr trames prend en charge un certain nombre de fonctions pour effectuer des données structurées traitée. Ici, nous incluons des exemples de base et une liste complète peut se trouve dans l' API documents.

Sélection de lignes et de colonnes

# Create DataFrame
df <- createDataFrame(sqlContext, faithful)
df
# Select only the "eruptions" column
head(select(df, df$eruptions))
# You can also pass in column name as strings
head(select(df, "eruptions"))
# Filter the DataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))

Regroupement et agrégation

Sparkr trames prend en charge un certain nombre de fonctions couramment utilisées pour agréger les données après le regroupement. Par exemple, nous pouvons compter le nombre de nombre de fois où chaque temps d’attente s’affiche dans le jeu de données fidèle.

head(count(groupBy(df, df$waiting)))
# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- count(groupBy(df, df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))

Opérations sur les colonnes

Sparkr fournit un certain nombre de fonctions qui peuvent être appliquées directement à colonnes pour le traitement et l’agrégation des données. L’exemple suivant montre le utilisation de fonctions arithmétiques de base.

# Convert waiting time from hours to seconds.
# You can assign this to a new column in the same DataFrame
df$waiting_secs <- df$waiting * 60
head(df)

Apprentissage automatique

À partir de Spark 1,5, Sparkr permet d’ajuster les modèles linéaires généralisés sur Sparkr trames à l’aide de la fonction GLM (). En coulisses, Sparkr utilise MLlib pour effectuer l’apprentissage d’un modèle de la famille spécifiée. Nous prenons en charge un sous-ensemble des opérateurs de formule R disponibles pour le modèle d’ajustement, y compris « ~ », '. ', ' + 'et'-'.

En coulisses, Sparkr effectue automatiquement un encodage à chaud de fonctionnalités catégoriques pour qu’elles n’aient pas besoin d’être effectuées manuellement. Outre les fonctionnalités de chaîne et de type double, il est également possible de faire tenir sur Fonctionnalités vectorielles MLlib, pour la compatibilité avec d’autres composants MLlib.

L’exemple suivant illustre l’utilisation de la création d’un modèle GLM gaussien à l’aide de SparkR. Pour exécuter la régression linéaire, affectez à Family la valeur « gaussienne ». Pour exécuter Régression logistique, définissez la famille sur « binomiale ».

# Create the DataFrame
df <- createDataFrame(sqlContext, iris)

# Fit a linear model over the dataset.
model <- glm(Sepal_Length ~ Sepal_Width + Species, data = df, family = "gaussian")

# Model coefficients are returned in a similar format to R's native glm().
summary(model)

Conversion de trames de données R locales en Spark-trames

Vous pouvez utiliser createDataFrame pour convertir les trames de données R locales en sparkr trames.

# Create SparkR DataFrame using localDF
convertedSparkDF <- createDataFrame(sqlContext, localDF)
str(convertedSparkDF)
# Another example: Create SparkR DataFrame with a local R data frame
anotherSparkDF <- createDataFrame(sqlContext, data.frame(surname = c("Tukey", "Venables", "Tierney", "Ripley", "McNeil"),
                                                         nationality = c("US", "Australia", "US", "UK", "Australia"),
                                                         deceased = c("yes", rep("no", 4))))
count(anotherSparkDF)