CREATE STREAMING TABLE
Se aplica a: Databricks SQL Databricks Runtime 13.3 LTS y versiones posteriores
Importante
Esta característica está en versión preliminar pública. Para registrarse para obtener acceso, rellene este formulario.
Crea una tabla de flujo de datoses una tabla delta con compatibilidad adicional para el procesamiento de datos incremental o de transmisión.
Las tablas de streaming solo se admiten en Delta Live Tables y en Databricks SQL con el catálogo de Unity. Al ejecutar este comando en el proceso de Databricks Runtime compatible, solo se analiza la sintaxis. Consulte Implementación de una canalización de Delta Live Tables con SQL.
Sintaxis
{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
table_name
[ table_specification ]
[ table_clauses ]
[ AS query ]
table_specification
( [ column_identifier column_type [ NOT NULL ]
[ COMMENT column_comment ] [ column_constraint ]
] [, ...]
[ CONSTRAINT expectation_name EXPECT (expectation_expr)
[ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
[ , table_constraint ] [...] )
table_clauses
{ PARTITIONED BY (col [, ...]) |
COMMENT table_comment |
TBLPROPERTIES clause |
SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ] } [...]
Parámetros
REFRESH
Si se especifica, actualiza la tabla con los datos más recientes disponibles de los orígenes definidos en la consulta. Solo se procesan los nuevos datos que llegan antes de que se inicie la consulta. Los nuevos datos que se agregan a los orígenes durante la ejecución del comando se omiten hasta la siguiente actualización.
IF NOT EXISTS
Si se especifica y ya existe una tabla con el mismo nombre, se omite la instrucción.
IF NOT EXISTS
no se puede usar junto conREFRESH
, lo que significaCREATE OR REFRESH TABLE IF NOT EXISTS
que no se permite.-
Nombre de la tabla que se va a crear. El nombre no debe incluir una especificación temporal. Si el nombre no está completo, la tabla se crea en el esquema actual.
table_specification
Esta cláusula opcional define la lista de columnas y sus tipos, propiedades, descripciones y restricciones de columnas.
Si no define columnas en el esquema de la tabla, debe especificar
AS query
.-
Nombre único para la columna.
-
Especifica el tipo de datos de la columna.
NOT NULL
Si se especifica, la columna no acepta valores
NULL
.COMMENT column_comment
Literal de cadena para describir la columna.
-
Importante
Esta característica está en versión preliminar pública.
Agrega una clave principal o una restricción de clave externa a la columna de una tabla de transmisión. No se admiten restricciones para las tablas del catálogo
hive_metastore
. CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]
Agrega expectativas de calidad de datos a la tabla. Estas expectativas de calidad de los datos se pueden realizar con el tiempo y acceder a ellas a través del registro de eventos de la tabla de streaming. Una
FAIL UPDATE
expectativa hace que se produzca un error en el procesamiento al crear la tabla, así como al actualizar la tabla. UnaDROP ROW
expectativa hace que se quite toda la fila si no se cumple la expectativa.expectation_expr
puede estar compuesto por literales, identificadores de columna dentro de la tabla y funciones u operadores SQL integrados y deterministas, a excepción de lo siguiente:- Funciones de agregado
- Funciones de ventana analítica
- Funciones de ventana de categoría
- Funciones de generador con valores de tabla
expr
tampoco debe contener ninguna subconsulta.- Funciones de agregado
-
Importante
Esta característica está en versión preliminar pública.
Agrega una clave principal informativa o restricciones de clave externa informativas a una tabla de transmisión. No se admiten restricciones de clave para las tablas del catálogo
hive_metastore
.
-
-
table_clauses
Opcionalmente, especifique la creación de particiones, los comentarios, las propiedades definidas por el usuario y una programación de actualización para la nueva tabla. Cada subcláusula solo se puede especificar una vez.
-
Lista opcional de columnas de la tabla por la que se va a particionar la tabla.
COMMENT table_comment
Una
STRING
literal para describir la tabla.-
Este parámetro opcional le permite establecer una o más propiedades que defina el usuario.
SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ]
Si se proporciona, programa la tabla de streaming para actualizar sus datos con la programación quartz cron especificada. Solo se aceptan valores time_zone_values. No se admite
AT TIME ZONE LOCAL
. SiAT TIME ZONE
no está presente, se usa la zona horaria de la sesión. SiAT TIME ZONE
no está presente y no se establece la zona horaria de la sesión, se produce un error.SCHEDULE
es equivalente semánticamente aSCHEDULE REFRESH
.No se puede usar la sintaxis
SCHEDULE
en una definición de canalización de Delta Live Tables.No se permite la
SCHEDULE
cláusula en unCREATE OR REFRESH
comando. La programación se puede proporcionar como parte delCREATE
comando. Use ALTER STREAMING TABLE para modificar la programación de una tabla de streaming después de la creación.
-
AS query
Esta cláusula rellena la tabla con los datos de
query
. Esta consulta debe ser una consulta de streaming. Esto se puede lograr agregando laSTREAM
palabra clave a cualquier relación que desee procesar de forma incremental. Al especificar unquery
elemento y un elementotable_specification
juntos, el esquema de tabla especificado entable_specification
debe contener todas las columnas devueltas porquery
, de lo contrario, obtendrá un error. Todas las columnas especificadas entable_specification
pero no devueltas porquery
valores devueltosnull
cuando se consultan.Esta cláusula es necesaria para las tablas de streaming creadas en Databricks SQL, pero no necesarias en Delta Live Tables. Si esta cláusula no se proporciona en Delta Live Tables, debe hacer referencia a esta tabla en un
APPLY CHANGES
comando de la canalización DLT. Vea Captura de datos modificados con SQL en Delta Live Tables.
Diferencias entre tablas de streaming y otras tablas
Las tablas de streaming son tablas con estado, diseñadas para controlar cada fila solo una vez a medida que se procesa un conjunto de datos creciente. Dado que la mayoría de los conjuntos de datos crecen continuamente con el tiempo, las tablas de streaming son adecuadas para la mayoría de las cargas de trabajo de ingesta. Las tablas de streaming son óptimas para las canalizaciones que requieren actualización de datos y baja latencia. Las tablas de streaming también pueden ser útiles para las transformaciones de escala masiva, ya que los resultados se pueden calcular incrementalmente a medida que llegan nuevos datos, manteniendo los resultados actualizados sin necesidad de volver a calcular completamente todos los datos de origen con cada actualización. Las tablas de streaming están diseñadas para orígenes de datos que son de solo anexión.
Las tablas de streaming aceptan comandos adicionales, como REFRESH
, que procesa los datos más recientes disponibles en los orígenes proporcionados en la consulta. Los cambios en la consulta proporcionada solo se reflejan en los nuevos datos mediante una llamada a, REFRESH
no los datos procesados previamente. Para aplicar también los cambios en los datos existentes, debe ejecutar REFRESH TABLE <table_name> FULL
para realizar una FULL REFRESH
. Las actualizaciones completas vuelven a procesar todos los datos disponibles en el origen con la definición más reciente. No se recomienda llamar a actualizaciones completas en orígenes que no mantengan todo el historial de los datos o tengan períodos de retención cortos, como Kafka, ya que la actualización completa trunca los datos existentes. Es posible que no pueda recuperar datos antiguos si los datos ya no están disponibles en el origen.
Limitaciones
Solo los propietarios de tablas pueden actualizar las tablas de streaming para obtener los datos más recientes.
ALTER TABLE
Los comandos no se permiten en las tablas de streaming. La definición y las propiedades de la tabla se deben modificar mediante laALTER STREAMING TABLE
instrucción.No se admiten las consultas de viaje de tiempo.
No se admite la evolución del esquema de tabla a través de comandos DML como
INSERT INTO
, yMERGE
.Los siguientes comandos no se admiten en tablas de streaming:
CREATE TABLE ... CLONE <streaming_table>
COPY INTO
ANALYZE TABLE
RESTORE
TRUNCATE
GENERATE MANIFEST
[CREATE OR] REPLACE TABLE
No se admite el uso compartido delta.
No se admite cambiar el nombre de la tabla ni cambiar el propietario.
No se admiten restricciones de tabla como
PRIMARY KEY
yFOREIGN KEY
.No se admiten columnas generadas, columnas de identidad y columnas predeterminadas.
Ejemplos
-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');
-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
CONSTRAINT date_parsing (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
)
AS SELECT *
FROM STREAM read_files('gs://my-bucket/avroData');
-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
COMMENT 'Stores the raw data from Kafka'
TBLPROPERTIES ('delta.appendOnly' = 'true')
AS SELECT
value raw_data,
offset,
timestamp,
timestampType
FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');
-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
SCHEDULE CRON '0 0 * * * ? *'
AS SELECT
from_json(raw_data, 'schema_string') data,
* EXCEPT (raw_data)
FROM STREAM firehose_raw;
-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int PRIMARY KEY,
ts timestamp,
event string
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');
-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
id int,
ts timestamp,
event string,
CONSTRAINT pk_id PRIMARY KEY (id)
)
AS SELECT *
FROM STREAM read_files(
's3://bucket/path',
format => 'csv',
schema => 'id int, ts timestamp, event string');