Tutoriel : charger et transformer des données en utilisant des DataFrames Apache Spark

Ce tutoriel vous montre comment charger et transformer des données en utilisant l’API Apache Spark Python (PySpark) DataFrame, l’API Apache Spark Scala DataFrame et l’API SparkR SparkDataFrame dans Azure Databricks.

À la fin de ce tutoriel, vous saurez ce qu’est un DataFrame et serez en mesure d’effectuer les tâches suivantes :

Python

Consultez également la référence de l’API PySpark Apache Spark.

Scala

Consultez également Référence de l’API Scala Apache Spark.

R

Consultez également les informations de référence sur l’API Apache SparkR.

Qu’est-ce qu’un DataFrame ?

Un DataFrame est une structure de données étiquetée à deux dimensions avec des colonnes de types potentiellement différents. Vous pouvez considérer un DataFrame comme une feuille de calcul, une table SQL ou un dictionnaire d’objets de série. Les DataFrames Apache Spark offre un ensemble complet de fonctions (select columns, filter, join, aggregate, etc.) qui vous permettent de résoudre efficacement les problèmes courants d’analyse des données.

Les DataFrames Apache Spark sont une abstraction basée sur des jeux de données distribués résilients (RDD). Les DataFrames Spark et Spark SQL utilisent un moteur de planification et d’optimisation unifié, ce qui vous permet d’obtenir des performances presque identiques dans tous les langages pris en charge sur Azure Databricks (Python, SQL, Scala et R).

Spécifications

Pour suivre le tutoriel ci-après, les conditions suivantes doivent être remplies :

  • Pour utiliser les exemples de ce tutoriel, Unity Catalog doit être activé dans votre espace de travail.

  • Les exemples de ce tutoriel utilisent un volume Unity Catalog pour stocker des exemples de données. Pour utiliser ces exemples, créez un volume et utilisez les noms de catalogue, de schéma et de volume associés à ce volume afin de définir le chemin d’accès au volume utilisé par les exemples.

  • Vous devez disposer des autorisations suivantes dans Unity Catalog :

    • READ VOLUME et WRITE VOLUME, ou ALL PRIVILEGES pour le volume utilisé dans ce tutoriel.
    • USE SCHEMA ou ALL PRIVILEGES pour le schéma utilisé dans ce tutoriel.
    • USE CATALOG ou ALL PRIVILEGES pour le catalogue utilisé dans ce tutoriel.

    Pour définir ces autorisations, contactez votre administrateur Databricks ou consultez Privilèges et objets sécurisables dans Unity Catalog.

Conseil

Pour découvrir un notebook terminé pour cet article, voir Notebook du tutoriel sur les DataFrames.

Étape 1 : définir des variables et charger un fichier CSV

Cette étape définit les variables à utiliser dans ce tutoriel, puis charge un fichier CSV de noms de bébé provenant de health.data.ny.gov dans votre volume Unity Catalog.

  1. Ouvrez un nouveau notebook en cliquant sur l’icône Nouvelle icône. Pour savoir comment naviguer dans les notebooks Azure Databricks, consultez Interface et contrôles du notebook Databricks.

  2. Copiez et collez le code suivant dans la nouvelle cellule de notebook vide. Remplacez <catalog-name>, <schema-name> et <volume-name> par les noms de catalogue, de schéma et de volume d’un volume Unity Catalog. Remplacez <table_name> par le nom de table de votre choix. Plus loin dans ce tutoriel, vous chargerez des noms de bébé dans cette table.

  3. Appuyez sur Shift+Enter pour exécuter la cellule et créer une cellule vide.

    Python

    catalog = "<catalog_name>"
    schema = "<schema_name>"
    volume = "<volume_name>"
    download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name = "rows.csv"
    table_name = "<table_name>"
    path_volume = "/Volumes/" + catalog + "/" + schema + "/" + volume
    path_tables = catalog + "." + schema
    print(path_tables) # Show the complete path
    print(path_volume) # Show the complete path
    

    Scala

    val catalog = "<catalog_name>"
    val schema = "<schema_name>"
    val volume = "<volume_name>"
    val download_url = "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    val file_name = "rows.csv"
    val table_name = "<table_name>"
    val path_volume = s"/Volumes/$catalog/$schema/$volume"
    val path_tables = s"$catalog.$schema.$table_name"
    print(path_volume) // Show the complete path
    print(path_tables) // Show the complete path
    

    R

    catalog <- "<catalog_name>"
    schema <- "<schema_name>"
    volume <- "<volume_name>"
    download_url <- "https://health.data.ny.gov/api/views/jxy9-yhdk/rows.csv"
    file_name <- "rows.csv"
    table_name <- "<table_name>"
    path_volume <- paste("/Volumes/", catalog, "/", schema, "/", volume, sep = "")
    path_tables <- paste(catalog, ".", schema, sep = "")
    print(path_volume) # Show the complete path
    print(path_tables) # Show the complete path
    
  4. Copiez et collez le code suivant dans la nouvelle cellule de notebook vide. Ce code copie le fichier rows.csv de health.data.ny.gov dans votre volume Unity Catalog à l’aide de la commande Databricks dbutuils.

  5. Appuyez sur Shift+Enter pour exécuter la cellule et passer à la cellule suivante.

    Python

    dbutils.fs.cp(f"{download_url}", f"{path_volume}" + "/" + f"{file_name}")
    

    Scala

    dbutils.fs.cp(download_url, s"$path_volume/$file_name")
    

    R

    dbutils.fs.cp(download_url, paste(path_volume, "/", file_name, sep = ""))
    

Étape 2 : créer un DataFrame

Cette étape crée un DataFrame nommé df1 avec des données de test, puis affiche son contenu.

  1. Copiez et collez le code suivant dans la nouvelle cellule de notebook vide. Ce code crée le Dataframe avec des données de test, puis affiche le contenu et le schéma du DataFrame.

  2. Appuyez sur Shift+Enter pour exécuter la cellule et passer à la cellule suivante.

    Python

    data = [[2021, "test", "Albany", "M", 42]]
    columns = ["Year", "First_Name", "County", "Sex", "Count"]
    
    df1 = spark.createDataFrame(data, schema="Year int, First_Name STRING, County STRING, Sex STRING, Count int")
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    Scala

    val data = Seq((2021, "test", "Albany", "M", 42))
    val columns = Seq("Year", "First_Name", "County", "Sex", "Count")
    
    val df1 = data.toDF(columns: _*)
    display(df1) // The display() method is specific to Databricks notebooks and provides a richer visualization.
    // df1.show() The show() method is a part of the Apache Spark DataFrame API and provides basic visualization.
    

    R

    # Load the SparkR package that is already preinstalled on the cluster.
    library(SparkR)
    
    data <- data.frame(
      Year = c(2021),
      First_Name = c("test"),
      County = c("Albany"),
      Sex = c("M"),
      Count = c(42)
    )
    
    df1 <- createDataFrame(data)
    display(df1) # The display() method is specific to Databricks notebooks and provides a richer visualization.
    # head(df1) The head() method is a part of the Apache SparkR DataFrame API and provides basic visualization.
    

Étape 3 : charger des données dans un DataFrame à partir d’un fichier CSV

Cette étape crée un DataFrame nommé df_csv à partir du fichier CSV précédemment chargé dans votre volume Unity Catalog. Consultez spark.read.csv.

  1. Copiez et collez le code suivant dans la nouvelle cellule de notebook vide. Ce code charge les noms de bébé dans le DataFrame df_csv à partir du fichier CSV, puis affiche le contenu du DataFrame.

  2. Appuyez sur Shift+Enter pour exécuter la cellule et passer à la cellule suivante.

    Python

    df_csv = spark.read.csv(f"{path_volume}/{file_name}",
      header=True,
      inferSchema=True,
      sep=",")
    display(df_csv)
    

    Scala

    val df_csv = spark.read
      .option("header", "true")
      .option("inferSchema", "true")
      .option("delimiter", ",")
      .csv(s"$path_volume/$file_name")
    
    display(df_csv)
    

    R

    df_csv <- read.df(paste(path_volume, "/", file_name, sep=""),
      source="csv",
      header = TRUE,
      inferSchema = TRUE,
      delimiter = ",")
    
    display(df_csv)
    

Vous pouvez charger des données à partir de nombreux formats de fichiers pris en charge.

Étape 4 : afficher votre DataFrame et interagir avec celui-ci

Utilisez les méthodes suivantes pour afficher vos DataFrames de noms de bébé et interagir avec eux.

Découvrez comment afficher le schéma d’un DataFrame Apache Spark. Apache Spark utilise le terme schéma pour désigner les noms et les types de données des colonnes du DataFrame.

Copiez et collez le code suivant dans une cellule de notebook vide. Ce code montre le schéma de vos DataFrames avec la méthode .printSchema() pour afficher les schémas des deux DataFrames et préparer l’union des deux DataFrames.

Python

df_csv.printSchema()
df1.printSchema()

Scala

df_csv.printSchema()
df1.printSchema()

R

printSchema(df_csv)
printSchema(df1)

Remarque

Azure Databricks utilise également le terme schéma pour décrire une collection de tables inscrites dans un catalogue.

Renommer une colonne dans le DataFrame

Découvrez comment renommer une colonne dans un DataFrame.

Copiez et collez le code suivant dans une cellule de notebook vide. Ce code renomme une colonne dans le DataFrame df1_csv pour qu’elle reflète la colonne correspondante du DataFrame df1. Ce code utilise la méthode Apache Spark withColumnRenamed().

Python

df_csv = df_csv.withColumnRenamed("First Name", "First_Name")
df_csv.printSchema

Scala

val df_csvRenamed = df_csv.withColumnRenamed("First Name", "First_Name")
// when modifying a DataFrame in Scala, you must assign it to a new variable
df_csv_renamed.printSchema()

R

df_csv <- withColumnRenamed(df_csv, "First Name", "First_Name")
printSchema(df_csv)

Combiner des DataFrames

Découvrez comment créer un DataFrame qui ajoute les lignes d’un DataFrame à un autre.

Copiez et collez le code suivant dans une cellule de notebook vide. Ce code utilise la méthode Apache Spark union() pour combiner le contenu de votre premier DataFrame df avec le DataFrame df_csv contenant les noms de bébé chargés à partir du fichier CSV.

Python

df = df1.union(df_csv)
display(df)

Scala

val df = df1.union(df_csv_renamed)
display(df)

R

display(df <- union(df1, df_csv))

Filtrer des lignes dans un DataFrame

Découvrez les noms de bébé les plus populaires dans votre jeu de données en filtrant les lignes à l’aide des méthodes Apache Spark .filter() ou .where(). Utilisez le filtrage pour sélectionner un sous-ensemble de lignes à retourner ou modifier dans un DataFrame. Il n’existe aucune différence de performances ou de syntaxe, comme indiqué dans les exemples suivants.

Utilisation de la méthode .filter()

Copiez et collez le code suivant dans une cellule de notebook vide. Ce code utilise la méthode Apache Spark .filter() pour afficher les lignes du DataFrame avec un nombre supérieur à 50.

Python
display(df.filter(df["Count"] > 50))
Scala
display(df.filter(df("Count") > 50))
R
display(filteredDF <- filter(df, df$Count > 50))

Utilisation de la méthode .where()

Copiez et collez le code suivant dans une cellule de notebook vide. Ce code utilise la méthode Apache Spark .where() pour afficher les lignes du DataFrame avec un nombre supérieur à 50.

Python
display(df.where(df["Count"] > 50))
Scala
display(df.where(df("Count") > 50))
R
display(filtered_df <- where(df, df$Count > 50))

Sélectionner les colonnes d’un DataFrame et les classer par fréquence

Découvrez la fréquence des noms de bébé avec la méthode select() pour spécifier les colonnes du DataFrame à retourner. Utilisez les fonctions Apache Spark orderby et desc pour trier les résultats.

Le module pyspark.sql pour Apache Spark prend en charge les fonctions SQL. Parmi les fonctions que nous utilisons dans ce tutoriel, citons les fonctions Apache Spark orderBy(), desc() et expr(). Vous activez l’utilisation de ces fonctions en les important dans votre session si nécessaire.

Copiez et collez le code suivant dans une cellule de notebook vide. Ce code importe la fonction desc(), puis utilise la méthode Apache Spark select() et les fonctions Apache Spark orderBy() et desc() pour afficher les noms les plus courants et leur nombre par ordre décroissant.

Python

from pyspark.sql.functions import desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

Scala

import org.apache.spark.sql.functions.desc
display(df.select("First_Name", "Count").orderBy(desc("Count")))

R

display(arrange(select(df, df$First_Name, df$Count), desc(df$Count)))

Créer un sous-ensemble DataFrame

Découvrez comment créer un DataFrame de sous-ensemble à partir d’un DataFrame existant.

Copiez et collez le code suivant dans une cellule de notebook vide. Ce code utilise la méthode Apache Spark filter pour créer un DataFrame limitant les données par année, nombre et sexe. Il utilise la méthode Apache Spark select() pour limiter les colonnes. Il utilise également les fonctions Apache Spark orderBy() et desc() pour trier le nouveau DataFrame par nombre.

Python

subsetDF = df.filter((df["Year"] == 2009) & (df["Count"] > 100) & (df["Sex"] == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))
display(subsetDF)

Scala

val subsetDF = df.filter((df("Year") == 2009) && (df("Count") > 100) && (df("Sex") == "F")).select("First_Name", "County", "Count").orderBy(desc("Count"))

display(subsetDF)

R

subsetDF <- select(filter(df, (df$Count > 100) & (df$year == 2009) & df["Sex"] == "F")), "First_Name", "County", "Count")
display(subsetDF)

Étape 5 : enregistrer le DataFrame

Découvrez comment enregistrer un DataFrame. Vous pouvez enregistrer votre DataFrame dans une table ou écrire le DataFrame dans un ou plusieurs fichiers.

Enregistrer le DataFrame dans une table

Azure Databricks utilise par défaut le format Delta Lake pour toutes les tables. Pour enregistrer votre DataFrame, vous devez avoir des privilèges de table CREATE sur le catalogue et le schéma.

Copiez et collez le code suivant dans une cellule de notebook vide. Ce code enregistre le contenu du DataFrame dans une table à l’aide de la variable définie au début de ce tutoriel.

Python

df.write.saveAsTable(f"{path_tables}" + "." + f"{table_name}")

# To overwrite an existing table, use the following code:
# df.write.mode("overwrite").saveAsTable(f"{path_tables}" + "." + f"{table_name}")

Scala

df.write.saveAsTable(s"$path_tables" + "." + s"$table_name")

// To overwrite an existing table, use the following code:
// df.write.mode("overwrite").saveAsTable(s"$tables" + "." + s"$table_name")

R

saveAsTable(df, paste(path_tables, ".", table_name))
# To overwrite an existing table, use the following code:
# saveAsTable(df, paste(path_tables, ".", table_name), mode = "overwrite")

La plupart des applications Apache Spark fonctionnent sur des jeux de données volumineux de manière distribuée. Apache Spark écrit un répertoire de fichiers plutôt qu’un fichier unique. Delta Lake fractionne les dossiers et fichiers Parquet. De nombreux systèmes de données peuvent lire ces répertoires de fichiers. Azure Databricks recommande l’utilisation de tables plutôt que des chemins d’accès aux fichiers pour la plupart des applications.

Enregistrer le DataFrame dans des fichiers JSON

Copiez et collez le code suivant dans une cellule de notebook vide. Ce code enregistre le DataFrame dans un répertoire de fichiers JSON.

Python

df.write.format("json").save("/tmp/json_data")

# To overwrite an existing file, use the following code:
# df.write.format("json").mode("overwrite").save("/tmp/json_data")

Scala

df.write.format("json").save("/tmp/json_data")

// To overwrite an existing file, use the following code:
// df.write.format("json").mode("overwrite").save("/tmp/json_data")

R

write.df(df, path = "/tmp/json_data", source = "json")
# To overwrite an existing file, use the following code:
# write.df(df, path = "/tmp/json_data", source = "json", mode = "overwrite")

Lire le DataFrame à partir d’un fichier JSON

Découvrez comment utiliser la méthode Apache Spark spark.read.format() pour lire les données JSON à partir d’un répertoire dans un DataFrame.

Copiez et collez le code suivant dans une cellule de notebook vide. Ce code affiche les fichiers JSON enregistrés dans l’exemple précédent.

Python

display(spark.read.format("json").json("/tmp/json_data"))

Scala

display(spark.read.format("json").json("/tmp/json_data"))

R

display(read.json("/tmp/json_data"))

Tâches supplémentaires : exécuter des requêtes SQL dans PySpark, Scala et R

Les DataFrames Apache Spark fournissent les options suivantes pour combiner SQL avec PySpark, Scala et R. Vous pouvez exécuter le code suivant dans le même notebook que celui que vous avez créé pour ce tutoriel.

Spécifier une colonne comme requête SQL

Découvrez comment utiliser la méthode Apache Spark selectExpr(). Il s’agit d’une variante de la méthode select() qui accepte les expressions SQL et retourne un DataFrame mis à jour. Cette méthode vous permet d’utiliser une expression SQL, telle que upper.

Copiez et collez le code suivant dans une cellule de notebook vide. Ce code utilise la méthode Apache Spark selectExpr() et l’expression SQL upper pour convertir une colonne de chaîne en majuscules (et renommer la colonne).

Python

display(df.selectExpr("Count", "upper(County) as big_name"))

Scala

display(df.selectExpr("Count", "upper(County) as big_name"))

R

display(df_selected <- selectExpr(df, "Count", "upper(County) as big_name"))

Utiliser expr() pour utiliser la syntaxe SQL d’une colonne

Découvrez comment importer et utiliser la fonction Apache Spark expr() pour utiliser la syntaxe SQL partout où une colonne peut être spécifiée.

Copiez et collez le code suivant dans une cellule de notebook vide. Ce code importe la fonction expr(), puis utilise la fonction Apache Spark expr() et l’expression SQL lower pour convertir une colonne de chaîne en minuscules (et renommer la colonne).

Python

from pyspark.sql.functions import expr
display(df.select("Count", expr("lower(County) as little_name")))

Scala

import org.apache.spark.sql.functions.{col, expr} // Scala requires us to import the col() function as well as the expr() function

display(df.select(col("Count"), expr("lower(County) as little_name")))

R

display(df_selected <- selectExpr(df, "Count", "lower(County) as little_name"))
# expr() function is not supported in R, selectExpr in SparkR replicates this functionality

Exécuter une requête SQL arbitraire à l’aide de la fonction spark.sql()

Découvrez comment utiliser la fonction Apache Spark spark.sql() pour exécuter des requêtes SQL arbitraires.

Copiez et collez le code suivant dans une cellule de notebook vide. Ce code utilise la fonction Apache Spark spark.sql() pour interroger une table SQL en utilisant la syntaxe SQL.

Python

display(spark.sql(f"SELECT * FROM {path_tables}" + "." + f"{table_name}"))

Scala

display(spark.sql(s"SELECT * FROM $path_tables.$table_name"))

R

display(sql(paste("SELECT * FROM", path_tables, ".", table_name)))

Notebook du tutoriel sur les DataFrames

Le notebook suivant inclut les exemples de requêtes de ce tutoriel.

Python

Tutoriel DataFrames en utilisant un notebook Python

Obtenir le notebook

Scala

Tutoriel DataFrames en utilisant un notebook Scala

Obtenir le notebook

R

Tutoriel DataFrames en utilisant un notebook R

Obtenir le notebook

Ressources supplémentaires