CREATE STREAMING TABLE

Se aplica a:casilla marcada como sí Databricks SQL casilla marcada como Sí Databricks Runtime 13.3 LTS y versiones posteriores

Importante

Esta característica está en versión preliminar pública. Para registrarse para obtener acceso, rellene este formulario.

Crea una tabla de flujo de datoses una tabla delta con compatibilidad adicional para el procesamiento de datos incremental o de transmisión.

Las tablas de streaming solo se admiten en Delta Live Tables y en Databricks SQL con el catálogo de Unity. Al ejecutar este comando en el proceso de Databricks Runtime compatible, solo se analiza la sintaxis. Consulte Implementación de una canalización de Delta Live Tables con SQL.

Sintaxis

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( [ column_identifier column_type [ NOT NULL ]
      [ COMMENT column_comment ] [ column_constraint ]
    ] [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ] } [...]

Parámetros

  • REFRESH

    Si se especifica, actualiza la tabla con los datos más recientes disponibles de los orígenes definidos en la consulta. Solo se procesan los nuevos datos que llegan antes de que se inicie la consulta. Los nuevos datos que se agregan a los orígenes durante la ejecución del comando se omiten hasta la siguiente actualización.

  • IF NOT EXISTS

    Si se especifica y ya existe una tabla con el mismo nombre, se omite la instrucción.

    IF NOT EXISTS no se puede usar junto con REFRESH, lo que significa CREATE OR REFRESH TABLE IF NOT EXISTS que no se permite.

  • table_name

    Nombre de la tabla que se va a crear. El nombre no debe incluir una especificación temporal. Si el nombre no está completo, la tabla se crea en el esquema actual.

  • table_specification

    Esta cláusula opcional define la lista de columnas y sus tipos, propiedades, descripciones y restricciones de columnas.

    Si no define columnas en el esquema de la tabla, debe especificar AS query.

    • column_identifier

      Nombre único para la columna.

      • column_type

        Especifica el tipo de datos de la columna.

      • NOT NULL

        Si se especifica, la columna no acepta valores NULL.

      • COMMENT column_comment

        Literal de cadena para describir la columna.

      • column_constraint

        Importante

        Esta característica está en versión preliminar pública.

        Agrega una clave principal o una restricción de clave externa a la columna de una tabla de transmisión. No se admiten restricciones para las tablas del catálogo hive_metastore.

      • CONSTRAINT expectation_name EXPECT (expectation_expr) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]

        Agrega expectativas de calidad de datos a la tabla. Estas expectativas de calidad de los datos se pueden realizar con el tiempo y acceder a ellas a través del registro de eventos de la tabla de streaming. Una FAIL UPDATE expectativa hace que se produzca un error en el procesamiento al crear la tabla, así como al actualizar la tabla. Una DROP ROW expectativa hace que se quite toda la fila si no se cumple la expectativa.

        expectation_expr puede estar compuesto por literales, identificadores de columna dentro de la tabla y funciones u operadores SQL integrados y deterministas, a excepción de lo siguiente:

        expr tampoco debe contener ninguna subconsulta.

      • table_constraint

        Importante

        Esta característica está en versión preliminar pública.

        Agrega una clave principal informativa o restricciones de clave externa informativas a una tabla de transmisión. No se admiten restricciones de clave para las tablas del catálogo hive_metastore.

  • table_clauses

    Opcionalmente, especifique la creación de particiones, los comentarios, las propiedades definidas por el usuario y una programación de actualización para la nueva tabla. Cada subcláusula solo se puede especificar una vez.

    • PARTITIONED BY

      Lista opcional de columnas de la tabla por la que se va a particionar la tabla.

    • COMMENT table_comment

      Una STRING literal para describir la tabla.

    • TBLPROPERTIES

      Este parámetro opcional le permite establecer una o más propiedades que defina el usuario.

    • SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ]

      Si se proporciona, programa la tabla de streaming para actualizar sus datos con la programación quartz cron especificada. Solo se aceptan valores time_zone_values. No se admite AT TIME ZONE LOCAL. Si AT TIME ZONE no está presente, se usa la zona horaria de la sesión. Si AT TIME ZONE no está presente y no se establece la zona horaria de la sesión, se produce un error. SCHEDULE es equivalente semánticamente a SCHEDULE REFRESH.

      No se puede usar la sintaxis SCHEDULE en una definición de canalización de Delta Live Tables.

      No se permite la SCHEDULE cláusula en un CREATE OR REFRESH comando. La programación se puede proporcionar como parte del CREATE comando. Use ALTER STREAMING TABLE para modificar la programación de una tabla de streaming después de la creación.

  • AS query

    Esta cláusula rellena la tabla con los datos de query. Esta consulta debe ser una consulta de streaming. Esto se puede lograr agregando la STREAM palabra clave a cualquier relación que desee procesar de forma incremental. Al especificar un query elemento y un elemento table_specification juntos, el esquema de tabla especificado en table_specification debe contener todas las columnas devueltas por query, de lo contrario, obtendrá un error. Todas las columnas especificadas en table_specification pero no devueltas por query valores devueltos null cuando se consultan.

    Esta cláusula es necesaria para las tablas de streaming creadas en Databricks SQL, pero no necesarias en Delta Live Tables. Si esta cláusula no se proporciona en Delta Live Tables, debe hacer referencia a esta tabla en un APPLY CHANGES comando de la canalización DLT. Vea Captura de datos modificados con SQL en Delta Live Tables.

Diferencias entre tablas de streaming y otras tablas

Las tablas de streaming son tablas con estado, diseñadas para controlar cada fila solo una vez a medida que se procesa un conjunto de datos creciente. Dado que la mayoría de los conjuntos de datos crecen continuamente con el tiempo, las tablas de streaming son adecuadas para la mayoría de las cargas de trabajo de ingesta. Las tablas de streaming son óptimas para las canalizaciones que requieren actualización de datos y baja latencia. Las tablas de streaming también pueden ser útiles para las transformaciones de escala masiva, ya que los resultados se pueden calcular incrementalmente a medida que llegan nuevos datos, manteniendo los resultados actualizados sin necesidad de volver a calcular completamente todos los datos de origen con cada actualización. Las tablas de streaming están diseñadas para orígenes de datos que son de solo anexión.

Las tablas de streaming aceptan comandos adicionales, como REFRESH, que procesa los datos más recientes disponibles en los orígenes proporcionados en la consulta. Los cambios en la consulta proporcionada solo se reflejan en los nuevos datos mediante una llamada a, REFRESHno los datos procesados previamente. Para aplicar también los cambios en los datos existentes, debe ejecutar REFRESH TABLE <table_name> FULL para realizar una FULL REFRESH. Las actualizaciones completas vuelven a procesar todos los datos disponibles en el origen con la definición más reciente. No se recomienda llamar a actualizaciones completas en orígenes que no mantengan todo el historial de los datos o tengan períodos de retención cortos, como Kafka, ya que la actualización completa trunca los datos existentes. Es posible que no pueda recuperar datos antiguos si los datos ya no están disponibles en el origen.

Limitaciones

  • Solo los propietarios de tablas pueden actualizar las tablas de streaming para obtener los datos más recientes.

  • ALTER TABLE Los comandos no se permiten en las tablas de streaming. La definición y las propiedades de la tabla se deben modificar mediante la ALTER STREAMING TABLE instrucción.

  • No se admiten las consultas de viaje de tiempo.

  • No se admite la evolución del esquema de tabla a través de comandos DML como INSERT INTO, y MERGE .

  • Los siguientes comandos no se admiten en tablas de streaming:

    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • No se admite el uso compartido delta.

  • No se admite cambiar el nombre de la tabla ni cambiar el propietario.

  • No se admiten restricciones de tabla como PRIMARY KEY y FOREIGN KEY.

  • No se admiten columnas generadas, columnas de identidad y columnas predeterminadas.

Ejemplos

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE CRON '0 0 * * * ? *'
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');