Tutoriel : charger et transformer des données en utilisant des DataFrames Apache Spark
Ce tutoriel vous montre comment charger et transformer des données en utilisant l’API Apache Spark Python (PySpark) DataFrame, l’API Apache Spark Scala DataFrame et l’API SparkR SparkDataFrame dans Azure Databricks.
À la fin de ce tutoriel, vous saurez ce qu’est un DataFrame et serez en mesure d’effectuer les tâches suivantes :
Python
- Définir des variables et copier des données publiques dans un volume Unity Catalog
- Créer un DataFrame avec Python
- Charger des données dans un DataFrame à partir d’un fichier CSV
- Afficher un DataFrame et interagir avec celui-ci
- Enregistrer le DataFrame
- Exécuter des requêtes SQL dans PySpark
Consultez également la référence de l’API PySpark Apache Spark.
Scala
- Définir des variables et copier des données publiques dans un volume Unity Catalog
- Créer un DataFrame avec Scala
- Charger des données dans un DataFrame à partir d’un fichier CSV
- Afficher un DataFrame et interagir avec celui-ci
- Enregistrer le DataFrame
- Exécuter des requêtes SQL dans Apache Spark
Consultez également Référence de l’API Scala Apache Spark.
R
- Définir des variables et copier des données publiques dans un volume Unity Catalog
- Créer un SparkDataFrame SparkR
- Charger des données dans un DataFrame à partir d’un fichier CSV
- Afficher un DataFrame et interagir avec celui-ci
- Enregistrer le DataFrame
- Exécuter des requêtes SQL dans SparkR
Consultez également les informations de référence sur l’API Apache SparkR.
Qu’est-ce qu’un DataFrame ?
Un DataFrame est une structure de données étiquetée à deux dimensions avec des colonnes de types potentiellement différents. Vous pouvez considérer un DataFrame comme une feuille de calcul, une table SQL ou un dictionnaire d’objets de série. Les DataFrames Apache Spark offre un ensemble complet de fonctions (select columns, filter, join, aggregate, etc.) qui vous permettent de résoudre efficacement les problèmes courants d’analyse des données.
Les DataFrames Apache Spark sont une abstraction basée sur des jeux de données distribués résilients (RDD). Les DataFrames Spark et Spark SQL utilisent un moteur de planification et d’optimisation unifié, ce qui vous permet d’obtenir des performances presque identiques dans tous les langages pris en charge sur Azure Databricks (Python, SQL, Scala et R).
Spécifications
Pour suivre le tutoriel ci-après, les conditions suivantes doivent être remplies :
Pour utiliser les exemples de ce tutoriel, Unity Catalog doit être activé dans votre espace de travail.
Les exemples de ce tutoriel utilisent un volume Unity Catalog pour stocker des exemples de données. Pour utiliser ces exemples, créez un volume et utilisez les noms de catalogue, de schéma et de volume associés à ce volume afin de définir le chemin d’accès au volume utilisé par les exemples.
Vous devez disposer des autorisations suivantes dans Unity Catalog :
READ VOLUME
etWRITE VOLUME
, ouALL PRIVILEGES
pour le volume utilisé dans ce tutoriel.USE SCHEMA
ouALL PRIVILEGES
pour le schéma utilisé dans ce tutoriel.USE CATALOG
ouALL PRIVILEGES
pour le catalogue utilisé dans ce tutoriel.
Pour définir ces autorisations, contactez votre administrateur Databricks ou consultez Privilèges et objets sécurisables dans Unity Catalog.
Conseil
Pour découvrir un notebook terminé pour cet article, voir Notebook du tutoriel sur les DataFrames.
Étape 1 : définir des variables et charger un fichier CSV
Cette étape définit les variables à utiliser dans ce tutoriel, puis charge un fichier CSV de noms de bébé provenant de health.data.ny.gov dans votre volume Unity Catalog.
Ouvrez un nouveau notebook en cliquant sur l’icône . Pour savoir comment naviguer dans les notebooks Azure Databricks, consultez Interface et contrôles du notebook Databricks.
Copiez et collez le code suivant dans la nouvelle cellule de notebook vide. Remplacez
<catalog-name>
,<schema-name>
et<volume-name>
par les noms de catalogue, de schéma et de volume d’un volume Unity Catalog. Remplacez<table_name>
par le nom de table de votre choix. Plus loin dans ce tutoriel, vous chargerez des noms de bébé dans cette table.Appuyez sur
Shift+Enter
pour exécuter la cellule et créer une cellule vide.Python
catalog = "<catalog_name>" schema = "<schema_name>" volume = "<volume_name>" download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name = "rows.csv" table_name = "<table_name>" path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume path_tables = catalog + "." + schema print(path_tables) # Show the complete path print(path_volume) # Show the complete path
Scala
val catalog = "<catalog_name>" val schema = "<schema_name>" val volume = "<volume_name>" val download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" val file_name = "rows.csv" val table_name = "<table_name>" val path_volume = s"/Volumes/$catalog/$schema/$volume" val path_tables = s"$catalog.$schema.$table_name" print(path_volume) // Show the complete path print(path_tables) // Show the complete path
R
catalog <- "<catalog_name>" schema <- "<schema_name>" volume <- "<volume_name>" download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv" file_name <- "rows.csv" table_name <- "<table_name>" path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "") path_tables <- paste(catalog, ".", schema, sep = "") print(path_volume) # Show the complete path print(path_tables) # Show the complete path
Copiez et collez le code suivant dans la nouvelle cellule de notebook vide. Ce code copie le fichier
rows.csv
de health.data.ny.gov dans votre volume Unity Catalog à l’aide de la commande Databricks dbutuils.Appuyez sur
Shift+Enter
pour exécuter la cellule et passer à la cellule suivante.Python
dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")
Scala
dbutils.fs.cp(download_url, s"$path_volume/$file_name")
R
dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
Étape 2 : créer un DataFrame
Cette étape crée un DataFrame nommé df1
avec des données de test, puis affiche son contenu.
Copiez et collez le code suivant dans la nouvelle cellule de notebook vide. Ce code crée le Dataframe avec des données de test, puis affiche le contenu et le schéma du DataFrame.
Appuyez sur
Shift+Enter
pour exécuter la cellule et passer à la cellule suivante.Python
data = [[2021, "test", "Albany", "M", 42]] columns = ["Year", "First_Name", "County", "Sex", "Count"] df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int") display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
Scala
val data = Seq((2021, "test", "Albany", "M", 42)) val columns = Seq("Year", "First_Name", "County", "Sex", "Count") val df1 = data.toDF(columns: _*) display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization. // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
R
# Load the SparkR package that is already preinstalled on the cluster. library(SparkR) data <- data.frame( Year = c(2021), First_Name = c("test"), County = c("Albany"), Sex = c("M"), Count = c(42) ) df1 <- createDataFrame(data) display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization. # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
Étape 3 : charger des données dans un DataFrame à partir d’un fichier CSV
Cette étape crée un DataFrame nommé df_csv
à partir du fichier CSV précédemment chargé dans votre volume Unity Catalog. Consultez spark.read.csv.
Copiez et collez le code suivant dans la nouvelle cellule de notebook vide. Ce code charge les noms de bébé dans le DataFrame
df_csv
à partir du fichier CSV, puis affiche le contenu du DataFrame.Appuyez sur
Shift+Enter
pour exécuter la cellule et passer à la cellule suivante.Python
df_csv = spark.read.csv(f"{path_volume}/{file_name}", header=True, inferSchema=True, sep=",") display(df_csv)
Scala
val df_csv = spark.read .option("header", "true") .option("inferSchema", "true") .option("delimiter", ",") .csv(s"$path_volume/$file_name") display(df_csv)
R
df_csv <- read.df(paste(path_volume, "/", file_name, sep=""), source="csv", header = TRUE, inferSchema = TRUE, delimiter = ",") display(df_csv)
Vous pouvez charger des données à partir de nombreux formats de fichiers pris en charge.
Étape 4 : afficher votre DataFrame et interagir avec celui-ci
Utilisez les méthodes suivantes pour afficher vos DataFrames de noms de bébé et interagir avec eux.
Imprimer le schéma DataFrame
Découvrez comment afficher le schéma d’un DataFrame Apache Spark. Apache Spark utilise le terme schéma pour désigner les noms et les types de données des colonnes du DataFrame.
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code montre le schéma de vos DataFrames avec la méthode .printSchema()
pour afficher les schémas des deux DataFrames et préparer l’union des deux DataFrames.
Python
df_csv.printSchema()
df1.printSchema()
Scala
df_csv.printSchema()
df1.printSchema()
R
printSchema(df_csv)
printSchema(df1)
Remarque
Azure Databricks utilise également le terme schéma pour décrire une collection de tables inscrites dans un catalogue.
Renommer une colonne dans le DataFrame
Découvrez comment renommer une colonne dans un DataFrame.
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code renomme une colonne dans le DataFrame df1_csv
pour qu’elle reflète la colonne correspondante du DataFrame df1
. Ce code utilise la méthode Apache Spark withColumnRenamed()
.
Python
df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema
Scala
val df_csvRenamed = df_csv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
df_csv_renamed.printSchema()
R
df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)
Combiner des DataFrames
Découvrez comment créer un DataFrame qui ajoute les lignes d’un DataFrame à un autre.
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code utilise la méthode Apache Spark union()
pour combiner le contenu de votre premier DataFrame df
avec le DataFrame df_csv
contenant les noms de bébé chargés à partir du fichier CSV.
Python
df = df1.union(df_csv)
display(df)
Scala
val df = df1.union(df_csv_renamed)
display(df)
R
display(df <- union(df1, df_csv))
Filtrer des lignes dans un DataFrame
Découvrez les noms de bébé les plus populaires dans votre jeu de données en filtrant les lignes à l’aide des méthodes Apache Spark .filter()
ou .where()
. Utilisez le filtrage pour sélectionner un sous-ensemble de lignes à retourner ou modifier dans un DataFrame. Il n’existe aucune différence de performances ou de syntaxe, comme indiqué dans les exemples suivants.
Utilisation de la méthode .filter()
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code utilise la méthode Apache Spark .filter()
pour afficher les lignes du DataFrame avec un nombre supérieur à 50.
Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))
Utilisation de la méthode .where()
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code utilise la méthode Apache Spark .where()
pour afficher les lignes du DataFrame avec un nombre supérieur à 50.
Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))
Sélectionner les colonnes d’un DataFrame et les classer par fréquence
Découvrez la fréquence des noms de bébé avec la méthode select()
pour spécifier les colonnes du DataFrame à retourner. Utilisez les fonctions Apache Spark orderby
et desc
pour trier les résultats.
Le module pyspark.sql pour Apache Spark prend en charge les fonctions SQL. Parmi les fonctions que nous utilisons dans ce tutoriel, citons les fonctions Apache Spark orderBy()
, desc()
et expr()
. Vous activez l’utilisation de ces fonctions en les important dans votre session si nécessaire.
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code importe la fonction desc()
, puis utilise la méthode Apache Spark select()
et les fonctions Apache Spark orderBy()
et desc()
pour afficher les noms les plus courants et leur nombre par ordre décroissant.
Python
from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
Scala
import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))
R
display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))
Créer un sous-ensemble DataFrame
Découvrez comment créer un DataFrame de sous-ensemble à partir d’un DataFrame existant.
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code utilise la méthode Apache Spark filter
pour créer un DataFrame limitant les données par année, nombre et sexe. Il utilise la méthode Apache Spark select()
pour limiter les colonnes. Il utilise également les fonctions Apache Spark orderBy()
et desc()
pour trier le nouveau DataFrame par nombre.
Python
subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)
Scala
val subsetDF = df.filter((df("Year") == 2009) && (df("Count") > 100) && (df("Sex") == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)
R
subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)
Étape 5 : enregistrer le DataFrame
Découvrez comment enregistrer un DataFrame. Vous pouvez enregistrer votre DataFrame dans une table ou écrire le DataFrame dans un ou plusieurs fichiers.
Enregistrer le DataFrame dans une table
Azure Databricks utilise par défaut le format Delta Lake pour toutes les tables. Pour enregistrer votre DataFrame, vous devez avoir des privilèges de table CREATE
sur le catalogue et le schéma.
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code enregistre le contenu du DataFrame dans une table à l’aide de la variable définie au début de ce tutoriel.
Python
df.write.saveAsTable(f"{path_tables}" + "." + f"{table_name}")
# To overwrite an existing table, use the following code:
# df.write.mode("overwrite").saveAsTable(f"{path_tables}" + "." + f"{table_name}")
Scala
df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")
// To overwrite an existing table, use the following code:
// df.write.mode("overwrite").saveAsTable(s"$tables" + "." + s"$table_name")
R
saveAsTable(df, paste(path_tables, ".", table_name))
# To overwrite an existing table, use the following code:
# saveAsTable(df, paste(path_tables, ".", table_name), mode = "overwrite")
La plupart des applications Apache Spark fonctionnent sur des jeux de données volumineux de manière distribuée. Apache Spark écrit un répertoire de fichiers plutôt qu’un fichier unique. Delta Lake fractionne les dossiers et fichiers Parquet. De nombreux systèmes de données peuvent lire ces répertoires de fichiers. Azure Databricks recommande l’utilisation de tables plutôt que des chemins d’accès aux fichiers pour la plupart des applications.
Enregistrer le DataFrame dans des fichiers JSON
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code enregistre le DataFrame dans un répertoire de fichiers JSON.
Python
df.write.format("json").save("/tmp/json_data")
# To overwrite an existing file, use the following code:
# df.write.format("json").mode("overwrite").save("/tmp/json_data")
Scala
df.write.format("json").save("/tmp/json_data")
// To overwrite an existing file, use the following code:
// df.write.format("json").mode("overwrite").save("/tmp/json_data")
R
write.df(df, path = "/tmp/json_data", source = "json")
# To overwrite an existing file, use the following code:
# write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")
Lire le DataFrame à partir d’un fichier JSON
Découvrez comment utiliser la méthode Apache Spark spark.read.format()
pour lire les données JSON à partir d’un répertoire dans un DataFrame.
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code affiche les fichiers JSON enregistrés dans l’exemple précédent.
Python
display(spark.read.format("json").json("/tmp/json_data"))
Scala
display(spark.read.format("json").json("/tmp/json_data"))
R
display(read.json("/tmp/json_data"))
Tâches supplémentaires : exécuter des requêtes SQL dans PySpark, Scala et R
Les DataFrames Apache Spark fournissent les options suivantes pour combiner SQL avec PySpark, Scala et R. Vous pouvez exécuter le code suivant dans le même notebook que celui que vous avez créé pour ce tutoriel.
Spécifier une colonne comme requête SQL
Découvrez comment utiliser la méthode Apache Spark selectExpr()
. Il s’agit d’une variante de la méthode select()
qui accepte les expressions SQL et retourne un DataFrame mis à jour. Cette méthode vous permet d’utiliser une expression SQL, telle que upper
.
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code utilise la méthode Apache Spark selectExpr()
et l’expression SQL upper
pour convertir une colonne de chaîne en majuscules (et renommer la colonne).
Python
display(df.selectExpr("Count", "upper(County) as big_name"))
Scala
display(df.selectExpr("Count", "upper(County) as big_name"))
R
display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))
Utiliser expr()
pour utiliser la syntaxe SQL d’une colonne
Découvrez comment importer et utiliser la fonction Apache Spark expr()
pour utiliser la syntaxe SQL partout où une colonne peut être spécifiée.
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code importe la fonction expr()
, puis utilise la fonction Apache Spark expr()
et l’expression SQL lower
pour convertir une colonne de chaîne en minuscules (et renommer la colonne).
Python
from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))
Scala
import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function
display(df.select(col("Count"), expr("lower(County) as little_name")))
R
display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality
Exécuter une requête SQL arbitraire à l’aide de la fonction spark.sql()
Découvrez comment utiliser la fonction Apache Spark spark.sql()
pour exécuter des requêtes SQL arbitraires.
Copiez et collez le code suivant dans une cellule de notebook vide. Ce code utilise la fonction Apache Spark spark.sql()
pour interroger une table SQL en utilisant la syntaxe SQL.
Python
display(spark.sql(f"SELECT * FROM {path_tables}" + "." + f"{table_name}"))
Scala
display(spark.sql(s"SELECT * FROM $path_tables.$table_name"))
R
display(sql(paste("SELECT * FROM", path_tables, ".", table_name)))
Notebook du tutoriel sur les DataFrames
Le notebook suivant inclut les exemples de requêtes de ce tutoriel.
Python
Tutoriel DataFrames en utilisant un notebook Python
Scala
Tutoriel DataFrames en utilisant un notebook Scala
R
Tutoriel DataFrames en utilisant un notebook R
Ressources supplémentaires
Commentaires
https://aka.ms/ContentUserFeedback.
Bientôt disponible : Tout au long de 2024, nous allons supprimer progressivement GitHub Issues comme mécanisme de commentaires pour le contenu et le remplacer par un nouveau système de commentaires. Pour plus d’informations, consultezEnvoyer et afficher des commentaires pour