Consultez Charger des données à l’aide de tables de streaming dans Databricks SQL.

Important

Cette fonctionnalité est disponible en préversion publique. Pour vous inscrire et y accéder, remplissez ce formulaire.

Databricks recommande d’utiliser des tables de streaming pour ingérer des données à l’aide de Databricks SQL. Une table de diffusion en continu est une table gérée par Unity Catalog avec une prise en charge supplémentaire pour la diffusion en continu ou le traitement incrémentiel des données. Un pipeline DLT est créé automatiquement pour chaque table de streaming. Vous pouvez utiliser des tables de streaming pour le chargement incrémentiel des données à partir de Kafka et du stockage d’objets cloud.

Cet article montre comment utiliser des tables de streaming pour charger des données à partir d’un stockage d’objets cloud configuré en tant que volume Unity Catalog (recommandé) ou emplacement externe.

Remarque

Pour en savoir plus sur comment utiliser des tables Delta Lake comme sources et récepteurs de diffusion en continu, consultez Lectures et écritures en continu de table Delta.

Avant de commencer

Avant de commencer, assurez-vous de disposer des éléments suivants :

  • Un compte Azure Databricks avec serverless activé. Pour plus d’informations, consultez Activer des entrepôts SQL serverless.

  • Un espace de travail avec Unity Catalog activé. Pour plus d’informations, consultez Configurer et gérer Unity Catalog.

  • Un entrepôt SQL qui utilise le canal Current.

  • Pour interroger les tables de diffusion en continu créées par un pipeline Delta Live Tables, vous devez utiliser un calcul partagé à l’aide de Databricks Runtime 13.3 LTS et versions ultérieures ou d’un entrepôt SQL. La diffusion en continu des tables créées dans un pipeline avec Unity Catalog ne peut pas être interrogée à partir de clusters d’isolement affectés ou non.

  • Privilège READ FILES sur un emplacement externe Unity Catalog. Pour plus d’informations, consultez Créer un emplacement externe pour connecter le stockage cloud à Azure Databricks.

  • Privilège USE CATALOG sur le catalogue dans lequel vous créez la table de streaming.

  • Privilège USE SCHEMA sur le schéma dans lequel vous créez la table de streaming.

  • Privilège CREATE TABLE sur le schéma dans lequel vous créez la table de streaming.

  • Chemin d’accès à vos données sources.

    Exemple de chemin d’accès au volume : /Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>

    Exemple de chemin d’accès à l’emplacement externe : abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis

    Remarque

    Cet article suppose que les données que vous souhaitez charger se trouvent dans un emplacement de stockage cloud qui correspond à un volume Unity Catalog ou à un emplacement externe auquel vous avez accès.

Découvrir et afficher un aperçu des données sources

  1. Dans la barre latérale de votre espace de travail, cliquez sur Requêtes, puis sur Créer une requête.

  2. Dans l’éditeur de requête, sélectionnez un entrepôt SQL qui utilise le canal Current dans la liste déroulante.

  3. Collez ce qui suit dans l’éditeur, en remplaçant les valeurs entre crochets (<>) pour les informations identifiant vos données sources, puis cliquez sur Exécuter.

    Remarque

    Vous pouvez rencontrer des erreurs d’inférence de schéma lors de l’exécution de la fonction table read_files si les valeurs par défaut de la fonction ne peuvent pas analyser vos données. Par exemple, vous devrez peut-être configurer le mode multiligne pour les fichiers CSV ou JSON multilignes. Pour obtenir la liste des options de l’analyseur, consultez read_files fonction table.

    /* Discover your data in a volume */
    LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>"
    
    /* Preview your data in a volume */
    SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10
    
    /* Discover your data in an external location */
    LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>"
    
    /* Preview your data */
    SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
    

Charger des données dans une table de streaming

Pour créer une table de streaming à partir de données dans le stockage d’objets cloud, collez ce qui suit dans l’éditeur de requête, puis cliquez sur Exécuter :

/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')

/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')

Actualiser une table de streaming à l’aide d’un pipeline DLT

Cette section décrit les modèles d’actualisation d’une table de streaming avec les dernières données disponibles à partir des sources définies dans la requête.

Les opérations CREATE pour les tables de streaming utilisent un entrepôt Databricks SQL pour la création initiale et le chargement de données dans la table de streaming. Les opérations REFRESH pour les tables de streaming utilisent Delta Live Tables (DLT). Un pipeline DLT est créé automatiquement pour chaque table de streaming. Lorsqu’une table de streaming est actualisée, une mise à jour du pipeline DLT est lancée pour traiter l’actualisation.

Après avoir exécuté la commande REFRESH, le lien de pipeline DLT est retourné. Vous pouvez utiliser le lien de pipeline DLT pour vérifier l’état d’actualisation.

Remarque

Seul le propriétaire de la table peut actualiser une table de diffusion en continu pour obtenir les données les plus récentes. L’utilisateur qui crée la table est le propriétaire et le propriétaire ne peut pas être modifié.

Consultez l’article Qu’est-ce que Delta Live Tables ?.

Ingérer de nouvelles données uniquement

Par défaut, la fonction read_files lit toutes les données existantes dans le répertoire source lors de la création de la table, puis traite les enregistrements nouvellement arrivés à chaque actualisation.

Pour éviter d’ingérer des données qui existent déjà dans le répertoire source au moment de la création de la table, utilisez définissez l’option includeExistingFiles sur false. Cela signifie que seules les données qui arrivent dans le répertoire après la création de la table sont traitées. Par exemple :

CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
  'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
  includeExistingFiles => false)

Actualiser entièrement une table de diffusion en continu

Les actualisations complètes re-traitent toutes les données disponibles dans la source avec la dernière définition. Il n’est pas recommandé d’appeler des actualisations complètes sur des sources qui ne conservent pas l’historique complet des données ou qui ont de courtes périodes de rétention, telles que Kafka, car l’actualisation complète tronque les données existantes. Vous ne pourrez peut-être pas récupérer d’anciennes données si les données ne sont plus disponibles dans la source.

Par exemple :

REFRESH STREAMING TABLE my_bronze_table FULL

Planifier une table de streaming pour l’actualisation automatique

Pour configurer une table de streaming pour l’actualiser automatiquement en fonction d’une planification définie, collez ce qui suit dans l’éditeur de requête, puis cliquez sur Exécuter :

ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
        CRON '<cron-string>'
                [ AT TIME ZONE '<timezone-id>' ]];

Pour obtenir des exemples de requêtes de planification d’actualisation, consultez ALTER STREAMING TABLE.

Suivre l’état de l’opération d’actualisation

Vous pouvez afficher l’état d’une actualisation de table de diffusion en continu en affichant le pipeline qui gère la table de diffusion en continu dans l’interface utilisateur Delta Live Tables ou en affichant les informations d’actualisation retournées par la commande DESCRIBE EXTENDED pour la table de diffusion en continu.

DESCRIBE EXTENDED <table-name>

Ingestion en streaming à partir de Kafka

Pour obtenir un exemple d’ingestion en streaming à partir de Kafka, consultez read_kafka.

Accorder aux utilisateurs l’accès à une table de streaming

Pour accorder aux utilisateurs le privilège SELECT sur la table de diffusion en continu afin qu’ils puissent l’interroger, collez ce qui suit dans l’éditeur de requête, puis cliquez sur Exécuter :

GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>

Pour plus d’informations sur les privilèges Unity Catalog, consultez les privilèges et objets sécurisables Unity Catalog.

Limitations

  • Les tables de diffusion en continu Databricks SQL ne sont pas prises en charge dans les régions USA Centre Sud et USA Ouest 2.

Ressources supplémentaires