Utiliser Spark pour travailler avec des fichiers de données

Effectué

L’un des avantages de l’utilisation de Spark est que vous pouvez écrire et exécuter du code dans différents langages de programmation, ce qui vous permet d’utiliser les compétences de programmation que vous avez déjà et d’utiliser le langage le plus approprié pour une tâche donnée. Le langage par défaut dans un nouveau notebook Azure Databricks Spark est PySpark, version optimisée Spark de Python couramment utilisée par les scientifiques et analystes Données en raison de sa forte prise en charge de la manipulation et de la visualisation des données. De plus, vous pouvez utiliser des langages tels que Scala (langage dérivé de Java qui peut être utilisé de manière interactive) et SQL (variante du langage couramment utilisé SQL inclus dans la bibliothèque Spark SQL pour travailler avec des structures de données relationnelles). Les ingénieurs logiciels peuvent également créer des solutions compilées qui s’exécutent sur Spark en utilisant des frameworks tels que Java.

Exploration de données avec des dataframes

En mode natif, Spark utilise une structure de données appelée jeu de données distribué résilient (RDD, resilient distributed dataset). Toutefois, même si vous pouvez écrire du code qui fonctionne directement avec des jeux RDD, la structure de données la plus couramment utilisée pour utiliser des données structurées dans Spark est le dataframe, qui est fourni dans le cadre de la bibliothèque Spark SQL. Les dataframes dans Spark sont similaires à ceux de la bibliothèque Pandas Python omniprésente, mais sont optimisés pour fonctionner dans l’environnement de traitement distribué de Spark.

Notes

En plus de l’API Dataframe, Spark SQL fournit une API Dataset fortement typée qui est prise en charge dans Java et Scala. Dans ce module, nous allons nous concentrer sur l’API Dataframe.

Chargement des données dans un dataframe

Explorons un exemple hypothétique afin de voir comment utiliser un dataframe pour travailler avec des données. Supposons que vous disposiez des données suivantes dans un fichier texte délimité par des virgules appelé products.csv dans le dossier data de votre stockage Databricks File System (DBFS) :

ProductID,ProductName,Category,ListPrice
771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

Dans un notebook Spark, vous pouvez utiliser le code PySpark suivant pour charger les données dans un dataframe et afficher les 10 premières lignes :

%pyspark
df = spark.read.load('/data/products.csv',
    format='csv',
    header=True
)
display(df.limit(10))

La ligne %pyspark au début est appelée magic et indique à Spark que le langage utilisé dans cette cellule est PySpark. Voici le code Scala équivalent pour l’exemple de données des produits :

%spark
val df = spark.read.format("csv").option("header", "true").load("/data/products.csv")
display(df.limit(10))

La commande magic %spark est utilisée pour spécifier Scala.

Conseil

Vous pouvez également sélectionner le langage que vous souhaitez utiliser pour chaque cellule dans l’interface Notebook.

Les deux exemples présentés précédemment produisent une sortie comme suit :

ProductID ProductName Catégorie ListPrice
771 Mountain-100 Silver, 38 VTT 3399.9900
772 Mountain-100 Silver, 42 VTT 3399.9900
773 Mountain-100 Silver, 44 VTT 3399.9900
... ... ... ...

Spécification d’un schéma de dataframe

Dans l’exemple précédent, la première ligne du fichier CSV contenait les noms de colonne, et Spark pouvait déduire le type de données de chaque colonne en se basant sur les données qu’elle contenait. Vous pouvez également spécifier un schéma explicite pour les données, ce qui est utile lorsque les noms de colonne ne sont pas inclus dans le fichier de données, comme cet exemple CSV :

771,"Mountain-100 Silver, 38",Mountain Bikes,3399.9900
772,"Mountain-100 Silver, 42",Mountain Bikes,3399.9900
773,"Mountain-100 Silver, 44",Mountain Bikes,3399.9900
...

L’exemple PySpark suivant montre comment spécifier un schéma pour que le dataframe soit chargé à partir d’un fichier appelé product-data.csv dans ce format :

from pyspark.sql.types import *
from pyspark.sql.functions import *

productSchema = StructType([
    StructField("ProductID", IntegerType()),
    StructField("ProductName", StringType()),
    StructField("Category", StringType()),
    StructField("ListPrice", FloatType())
    ])

df = spark.read.load('/data/product-data.csv',
    format='csv',
    schema=productSchema,
    header=False)
display(df.limit(10))

Les résultats seraient une fois de plus similaires à :

ProductID ProductName Catégorie ListPrice
771 Mountain-100 Silver, 38 VTT 3399.9900
772 Mountain-100 Silver, 42 VTT 3399.9900
773 Mountain-100 Silver, 44 VTT 3399.9900
... ... ... ...

Filtrage et regroupement des dataframes

Vous pouvez utiliser les méthodes de la classe Dataframe pour filtrer, trier, regrouper et manipuler les données qu’elle contient. Par exemple, l’exemple de code suivant utilise la méthode select pour récupérer les colonnes ProductName et ListPrice à partir du dataframe df contenant les données de produit de l’exemple précédent :

pricelist_df = df.select("ProductID", "ListPrice")

Les résultats de cet exemple de code devraient ressembler à ceci :

ProductID ListPrice
771 3399.9900
772 3399.9900
773 3399.9900
... ...

Comme la plupart des méthodes de manipulation de données, select retourne un nouvel objet de dataframe.

Conseil

La sélection d’une partie des colonnes d’un dataframe est une opération courante, qui peut également être réalisée à l’aide de la syntaxe plus courte suivante :

pricelist_df = df["ProductID", "ListPrice"]

Vous pouvez « chaîner » les méthodes ensemble pour effectuer une série de manipulations qui entraînent un dataframe transformé. Par exemple, cet exemple de code chaîne les méthodes select et where pour créer un dataframe contenant les colonnes ProductName et ListPrice des produits avec la catégorie Vélos VTT ou Vélos de route :

bikes_df = df.select("ProductName", "ListPrice").where((df["Category"]=="Mountain Bikes") | (df["Category"]=="Road Bikes"))
display(bikes_df)

Les résultats de cet exemple de code devraient ressembler à ceci :

ProductName ListPrice
Mountain-100 Silver, 38 3399.9900
Road-750 Noir, 52 539.9900
... ...

Pour regrouper et agréger des données, vous pouvez utiliser la méthode groupBy et les fonctions d’agrégation. Par exemple, le code PySpark suivant compte le nombre de produits de chaque catégorie :

counts_df = df.select("ProductID", "Category").groupBy("Category").count()
display(counts_df)

Les résultats de cet exemple de code devraient ressembler à ceci :

Catégorie count
Oreillettes 3
Roues 14
VTT 32
... ...

Utilisation d’expressions SQL dans Spark

L’API Dataframe fait partie d’une bibliothèque Spark appelée Spark SQL, qui permet aux analystes Données d’utiliser des expressions SQL pour interroger et manipuler des données.

Création d’objets de base de données dans le catalogue Spark

Le catalogue Spark est un metastore pour les objets de données relationnelles tels que les vues et les tables. Le runtime Spark peut utiliser le catalogue pour intégrer de façon fluide le code écrit dans n’importe quel langage pris en charge par Spark avec des expressions SQL qui peuvent être plus naturelles pour certains analystes Données ou développeurs.

L’une des méthodes les plus simples pour rendre les données d’un dataframe disponibles pour pouvoir les interroger dans le catalogue Spark consiste à créer une vue temporaire, comme illustré dans l’exemple de code suivant :

df.createOrReplaceTempView("products")

Une vue est temporaire, ce qui signifie qu’elle est automatiquement supprimée à la fin de la session active. Vous pouvez également créer des tables persistantes dans le catalogue pour définir une base de données pouvant être interrogée à l’aide de Spark SQL.

Notes

Nous n’allons pas explorer les tables de catalogue Spark en profondeur dans ce module, mais cela vaut la peine de prendre le temps de mettre en évidence quelques points clés :

  • Vous pouvez créer une table vide à l’aide de la méthode spark.catalog.createTable. Les tables sont des structures de métadonnées qui stockent leurs données sous-jacentes dans l’emplacement de stockage associé au catalogue. La suppression d’une table supprime également ses données sous-jacentes.
  • Vous pouvez enregistrer un dataframe en tant que table en utilisant sa méthode saveAsTable.
  • Vous pouvez créer une table externe en utilisant la méthode spark.catalog.createExternalTable. Les tables externes définissent des métadonnées dans le catalogue, mais obtiennent leurs données sous-jacentes d’un emplacement de stockage externe, généralement un dossier dans un lac de données. La suppression d’une table externe ne supprime pas les données sous-jacentes.

Utilisation de l’API Spark SQL pour interroger des données

Vous pouvez utiliser l’API Spark SQL dans le code écrit dans n’importe quel langage pour interroger les données du catalogue. Par exemple, le code PySpark suivant utilise une requête SQL pour retourner les données de la vue produits en tant que dataframe.

bikes_df = spark.sql("SELECT ProductID, ProductName, ListPrice \
                      FROM products \
                      WHERE Category IN ('Mountain Bikes', 'Road Bikes')")
display(bikes_df)

Les résultats de l’exemple de code ressembleraient au tableau suivant :

ProductName ListPrice
Mountain-100 Silver, 38 3399.9900
Road-750 Noir, 52 539.9900
... ...

Utilisation du code SQL

L’exemple précédent a montré comment utiliser l’API Spark SQL pour incorporer des expressions SQL dans le code Spark. Dans un notebook, vous pouvez également utiliser la commande magic %sql pour exécuter le code SQL qui interroge les objets du catalogue, comme suit :

%sql

SELECT Category, COUNT(ProductID) AS ProductCount
FROM products
GROUP BY Category
ORDER BY Category

L’exemple de code SQL retourne un jeu de résultats qui s’affiche automatiquement dans le notebook sous forme de tableau, comme celui ci-dessous :

Catégorie ProductCount
Cuissards 3
Porte-vélos 1
Supports à vélos 1
... ...