CREATE STREAMING TABLE
S’applique à : Databricks SQL Databricks Runtime 13.3 LTS et versions ultérieures
Important
Cette fonctionnalité est disponible en préversion publique.
Crée une table de streaming, une table Delta avec une prise en charge supplémentaire pour le streaming ou le traitement incrémentiel des données.
Les tables de diffusion en continu sont uniquement prises en charge dans Delta Live Tables et sur Databricks SQL avec Unity Catalog. L’exécution de cette commande sur le calcul Databricks Runtime pris en charge analyse uniquement la syntaxe. Consultez Implémenter un pipeline Delta Live Tables avec SQL.
Syntaxe
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( [ column_identifier column_type [ NOT NULL ]
[ COMMENT column_comment ] [ column_constraint ]
] [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
table_clauses
{ PARTITIONED BY (col [, ...]) |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ] } [...]
Paramètres
REFRESH
S’il est spécifié, actualise la table avec les dernières données disponibles à partir des sources définies dans la requête. Seules les nouvelles données qui arrivent avant le début de la requête sont traitées. Les nouvelles données ajoutées aux sources pendant l’exécution de la commande sont ignorées jusqu’à l’actualisation suivante.
IF NOT EXISTS
S’il est spécifié et qu’une table portant le même nom existe déjà, l’instruction est ignorée.
IF NOT EXISTS
ne peut pas être utilisé avecREFRESH
, ce qui signifie queCREATE OR REFRESH TABLE IF NOT EXISTS
n’est pas autorisé.-
Le nom de la table à créer. Le nom ne doit pas inclure une spécification temporelle. Si le nom n’est pas qualifié, la table est créée dans le schéma actuel.
spécification_table
Cette clause facultative définit la liste des colonnes, leurs types, leurs propriétés, leurs descriptions et leurs contraintes de colonne.
Si vous ne définissez pas de colonnes dans le schéma de la table, vous devez spécifier
AS query
.-
Nom unique de la colonne.
-
Spécifie le type de données de la colonne.
NOT NULL
Si elle est spécifiée, la colonne n’accepte
NULL
pas de valeurs.COMMENTAIRE Column_comment
Littéral de chaîne pour décrire la colonne.
-
Important
Cette fonctionnalité est disponible en préversion publique.
Ajoute une clé primaire ou une contrainte de clé étrangère à la colonne d’une table de diffusion en continu. Les contraintes ne sont pas prises en charge pour les tables du catalogue
hive_metastore
. CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]
Ajoute des attentes de qualité des données à la table. Ces attentes de qualité des données peuvent être suivies au fil du temps et accessibles via le journal des événements de la table de streaming. Une attente
FAIL UPDATE
entraîne l’échec du traitement lors de la création de la table et de l’actualisation de la table. Une attenteDROP ROW
entraîne la suppression de la ligne entière si l’attente n’est pas remplie.expectation_expr
peut être composé de littéraux, d’identificateurs de colonnes dans la table, et de fonctions ou d’opérateurs SQL déterministes intégrés, à l’exception de :- Fonctions d’agrégation
- Fonctions de fenêtre analytique
- Les fonctions de classement de fenêtre
- Fonctions du générateur de valeur de table
expr
Ne doit pas non plus contenir de sous-requête.- Fonctions d’agrégation
-
Important
Cette fonctionnalité est disponible en préversion publique.
Ajoute une clé primaire informative ou des contraintes de clés étrangères informatives à une table de diffusion en continu. Les contraintes de clé ne sont pas prises en charge pour les tables du catalogue
hive_metastore
.
-
-
table_clauses
Vous pouvez également spécifier le partitionnement, les commentaires, les propriétés définies par l'utilisateur et un calendrier d'actualisation pour la nouvelle table. Chaque sous-clause ne peut être spécifiée qu’une seule fois.
-
Liste facultative des colonnes de la table sur laquelle partitionner la table.
Table_comment de COMMENTAIRES
Littéral
STRING
pour décrire la colonne.-
(Facultatif) Définit une ou plusieurs propriétés définies par l’utilisateur.
SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ]
Le cas échéant, planifie la table de streaming ou la vue matérialisée pour actualiser ses données avec la planification cron quartz donnée. Seules les time_zone_values sont acceptées. La fonction
AT TIME ZONE LOCAL
n'est pas prise en charge. SiAT TIME ZONE
est absent, le fuseau horaire de session est utilisé. SiAT TIME ZONE
est absent et que le fuseau horaire de session n’est pas défini, une erreur est générée.SCHEDULE
est équivalent sémantiquement àSCHEDULE REFRESH
.Vous ne pouvez pas utiliser la syntaxe
SCHEDULE
dans une définition de pipeline Delta Live Tables.La clause
SCHEDULE
n’est pas autorisée dans une commandeCREATE OR REFRESH
. La planification peut être fournie dans le cadre de la commandeCREATE
. Utilisez ALTER STREAMING TABLE pour modifier la planification d’une table de streaming après sa création.
-
COMMERequête
Cette clause remplit la table à l’aide des données de
query
. Cette requête doit être une requête de streaming . Pour ce faire, ajoutez leSTREAM
mot clé à n’importe quelle relation que vous souhaitez traiter de manière incrémentielle. Lorsque vous spécifiez unquery
et untable_specification
ensemble, le schéma de table spécifié danstable_specification
doit contenir toutes les colonnes retournées par lequery
, sinon vous obtenez une erreur. Toutes les colonnes spécifiées danstable_specification
mais non retournées parquery
des valeurs de retournull
lorsqu’elles sont interrogées.Cette clause est requise pour les tables de streaming créées dans Databricks SQL, mais pas obligatoire dans Delta Live Tables. Si cette clause n’est pas fournie dans Delta Live Tables, vous devez référencer cette table dans une commande
APPLY CHANGES
de votre pipeline DLT. Consultez capture de données modifiées avec SQL dans Delta Live Tables.
Différences entre les tables de diffusion en continu et les autres tables
Les tables de diffusion en continu sont des tables avec état, conçues pour gérer chaque ligne une seule fois lorsque vous traitez un jeu de données croissant. Étant donné que la plupart des jeux de données croissent continuellement au fil du temps, les tables de diffusion en continu conviennent à la plupart des charges de travail d’ingestion. Les tables de diffusion en continu sont optimales pour les pipelines qui nécessitent une actualisation des données et une faible latence. Les tables de diffusion en continu peuvent également être utiles pour les transformations massives à l’échelle, car les résultats peuvent être calculés de manière incrémentielle à mesure que de nouvelles données arrivent, ce qui permet de maintenir les résultats à jour sans avoir à recalculer entièrement toutes les données sources à chaque mise à jour. Les tables de diffusion en continu sont conçues pour les sources de données en ajout uniquement.
Les tables de streaming acceptent des commandes supplémentaires telles que REFRESH
, qui traite les dernières données disponibles dans les sources fournies dans la requête. Les modifications apportées à la requête fournie sont uniquement reflétées sur les nouvelles données en appelant des REFRESH
données non traitées précédemment. Pour appliquer également les modifications sur les données existantes, vous devez exécuter REFRESH TABLE <table_name> FULL
pour effectuer un FULL REFRESH
. Les actualisations complètes re-traitent toutes les données disponibles dans la source avec la dernière définition. Il n’est pas recommandé d’appeler des actualisations complètes sur des sources qui ne conservent pas l’historique complet des données ou qui ont de courtes périodes de rétention, telles que Kafka, car l’actualisation complète tronque les données existantes. Vous ne pourrez peut-être pas récupérer d’anciennes données si les données ne sont plus disponibles dans la source.
Limites
Seuls les propriétaires de tables peuvent actualiser les tables de streaming pour obtenir les données les plus récentes.
ALTER TABLE
les commandes ne sont pas autorisées sur les tables de streaming. La définition et les propriétés de la table doivent être modifiées par le biais de l’instructionALTER STREAMING TABLE
.Les requêtes de voyage dans le temps ne sont pas prises en charge.
L’évolution du schéma de table via des commandes DML telles que
INSERT INTO
etMERGE
n’est pas prise en charge.Les commandes suivantes ne sont pas prises en charge sur les tables de streaming :
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
Le partage Delta n’est pas pris en charge.
Le changement de nom de la table ou la modification du propriétaire n’est pas pris en charge.
Les contraintes de table telles que
PRIMARY KEY
etFOREIGN KEY
ne sont pas prises en charge.Les colonnes générées, les colonnes d’identité et les colonnes par défaut ne sont pas prises en charge.
Exemples
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE CRON '0 0 * * * ? *'
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
Articles connexes
Commentaires
https://aka.ms/ContentUserFeedback.
Bientôt disponible : Tout au long de l’année 2024, nous abandonnerons progressivement le mécanisme de retour d’information GitHub Issues pour le remplacer par un nouveau système de commentaires. Pour plus d’informations, consultez :Soumettre et afficher des commentaires pour