СОЗДАНИЕ ТАБЛИЦЫ ПОТОКОВОЙ ПЕРЕДАЧИ

Область применения:проверка помечены да Databricks SQL проверка помечены да Databricks Runtime 13.3 LTS и выше

Внимание

Эта функция предоставляется в режиме общедоступной предварительной версии.

Создает таблицу потоковой передачи, таблицу Delta с дополнительной поддержкой потоковой или добавочной обработки данных.

Потоковая передача таблиц поддерживается только в разностных динамических таблицах и в Databricks SQL с каталогом Unity. При выполнении этой команды в поддерживаемой среде выполнения Databricks вычисляется только синтаксический анализ. См. статью "Реализация конвейера разностных динамических таблиц" с помощью SQL.

Синтаксис

{ CREATE OR REFRESH STREAMING TABLE | CREATE STREAMING TABLE [ IF NOT EXISTS ] }
  table_name
  [ table_specification ]
  [ table_clauses ]
  [ AS query ]

table_specification
  ( [ column_identifier column_type [ NOT NULL ]
      [ COMMENT column_comment ] [ column_constraint ]
    ] [, ...]
    [ CONSTRAINT expectation_name EXPECT (expectation_expr)
      [ ON VIOLATION { FAIL UPDATE | DROP ROW } ] ] [, ...]
    [ , table_constraint ] [...] )

table_clauses
  { PARTITIONED BY (col [, ...]) |
    COMMENT table_comment |
    TBLPROPERTIES clause |
    SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ] } [...]

Параметры

  • REFRESH

    При указании обновляет таблицу с последними данными, доступными из источников, определенных в запросе. Обрабатываются только новые данные, поступающие до запуска запроса. Новые данные, добавляемые в источники во время выполнения команды, игнорируются до следующего обновления.

  • IF NOT EXISTS

    Если этот параметр указан и таблица с таким именем уже существует, инструкция игнорируется.

    IF NOT EXISTS нельзя использовать вместе с REFRESHтем, что означает CREATE OR REFRESH TABLE IF NOT EXISTS , что не допускается.

  • table_name

    Имя создаваемой таблицы. Имя не должно содержать временную спецификацию. Если имя не указано полностью, таблица создается в текущей схеме.

  • table_specification

    Это необязательное предложение определяет список столбцов, их типы, свойства, описания и ограничения.

    Если столбцы в схеме таблицы не определены, необходимо указать AS query.

    • column_identifier

      Уникальное имя столбца.

      • column_type

        Указывает тип данных столбца.

      • NOT NULL

        Если указанный столбец не принимает NULL значения.

      • COMMENT column_comment

        Строковый литерал для описания столбца.

      • column_constraint

        Внимание

        Эта функция предоставляется в режиме общедоступной предварительной версии.

        Добавляет ограничение первичного ключа или внешнего ключа в столбец в таблице потоковой передачи. Ограничения не поддерживаются для таблиц в каталоге hive_metastore.

      • ОГРАНИЧЕНИЕ EXPECTATION_NAME ОЖИДАНИЕ (EXPECTATION_EXPR) [ ON VIOLATION { FAIL UPDATE | DROP ROW } ]

        Добавляет ожидания качества данных в таблицу. Эти ожидания качества данных можно отслеживать с течением времени и получать доступ через журнал событий потоковой таблицы. Ожидание FAIL UPDATE приводит к сбою обработки при создании таблицы, а также обновлении таблицы. Ожидание DROP ROW приводит к тому, что вся строка будет удалена, если ожидание не выполнено.

        expectation_expr может состоять из литералов, идентификаторов столбцов в таблице и детерминированных встроенных функций или операторов SQL, кроме:

        Кроме того, expr не должен содержать какой-либо вложенный запрос.

      • table_constraint

        Внимание

        Эта функция предоставляется в режиме общедоступной предварительной версии.

        Добавляет в таблицу потоковой передачи ограничения информационных первичных ключей или информационных внешних ключей. Ограничения ключей не поддерживаются для таблиц в каталоге hive_metastore .

  • table_clauses

    При необходимости укажите секционирование, комментарии, пользовательские свойства и расписание обновления для новой таблицы. Каждое вложенное предложение может быть указано только один раз.

    • PARTITIONED BY

      Необязательный список столбцов таблицы для секционирования таблицы по.

    • COMMENT table_comment

      Литерал STRING для описания таблицы.

    • TBLPROPERTIES

      При необходимости задает одно или несколько свойств, определяемых пользователем.

    • SCHEDULE [ REFRESH ] CRON cron_string [ AT TIME ZONE timezone_id ]

      Если это указано, планирует потоковую таблицу или материализованное представление, чтобы обновить свои данные с заданным расписанием крона . Принимаются только time_zone_values . Функция AT TIME ZONE LOCAL не поддерживается. Если AT TIME ZONE нет, используется часовой пояс сеанса. Если AT TIME ZONE отсутствует и часовой пояс сеанса не задан, возникает ошибка. SCHEDULE семантически эквивалентен SCHEDULE REFRESH.

      Синтаксис нельзя использовать в определении SCHEDULE конвейера Delta Live Table.

      Предложение SCHEDULE не допускается в команде CREATE OR REFRESH . Расписание можно указать как часть CREATE команды. Используйте ALTER STREAMING TABLE , чтобы изменить расписание потоковой таблицы после создания.

  • AS query

    Это предложение заполняет таблицу с помощью данных из query. Этот запрос должен быть потоковым запросом. Это можно достичь, добавив STREAM ключевое слово в любое отношение, которое требуется обработать постепенно. При указании query и вместе схема таблицы, указанная table_specification в table_specification ней, должна содержать все столбцы, возвращаемые параметром query, в противном случае возникает ошибка. Все столбцы, указанныеtable_specification, но не возвращаются возвращаемыми null значениями query при запросе.

    Это предложение требуется для потоковых таблиц, созданных в Databricks SQL, но не требуется в разностных динамических таблицах. Если это предложение не указано в Delta Live Table, необходимо ссылаться на эту таблицу в команде в конвейере APPLY CHANGES DLT. См. сведения об отслеживании измененных данных с помощью SQL в разностных динамических таблицах.

Различия между таблицами потоковой передачи и другими таблицами

Потоковая передача таблиц — это таблицы с отслеживанием состояния, предназначенные для обработки каждой строки только один раз при обработке растущего набора данных. Так как большинство наборов данных постоянно растут с течением времени, потоковые таблицы хорошо подходит для большинства рабочих нагрузок приема. Таблицы потоковой передачи оптимально подходят для конвейеров, требующих свежести данных и низкой задержки. Потоковые таблицы также могут быть полезны для крупномасштабных преобразований, так как результаты могут быть добавочно вычисляются по мере поступления новых данных, сохраняя результаты до актуальности без необходимости полностью перекомпьютировать все исходные данные с каждым обновлением. Потоковые таблицы предназначены для источников данных, доступных только для добавления.

Потоковые таблицы принимают дополнительные команды, такие как REFRESH, которые обрабатывают последние данные, доступные в источниках, предоставленных в запросе. Изменения предоставленного запроса отражаются только на новых данных путем вызова REFRESHне обработанных ранее данных. Чтобы применить изменения к существующим данным, необходимо выполнить REFRESH TABLE <table_name> FULL его FULL REFRESH. Полные обновления повторно обрабатывают все данные, доступные в источнике с помощью последнего определения. Не рекомендуется вызывать полные обновления в источниках, которые не хранят всю историю данных или имеют короткие периоды хранения, например Kafka, так как полное обновление усечено существующих данных. Возможно, вы не сможете восстановить старые данные, если данные больше не доступны в источнике.

Ограничения

  • Только владельцы таблиц могут обновлять потоковые таблицы, чтобы получить последние данные.

  • ALTER TABLE команды запрещены в таблицах потоковой передачи. Определение и свойства таблицы должны быть изменены с помощью инструкции ALTER STREAMING TABLE .

  • Запросы на поездки по времени не поддерживаются.

  • Эволюционирование схемы таблицы с помощью таких команд DML, как INSERT INTOи MERGE не поддерживается.

  • Следующие команды не поддерживаются в таблицах потоковой передачи:

    • CREATE TABLE ... CLONE <streaming_table>
    • COPY INTO
    • ANALYZE TABLE
    • RESTORE
    • TRUNCATE
    • GENERATE MANIFEST
    • [CREATE OR] REPLACE TABLE
  • Разностный общий доступ не поддерживается.

  • Переименование таблицы или изменение владельца не поддерживается.

  • Ограничения таблиц, такие как PRIMARY KEY и FOREIGN KEY не поддерживаются.

  • Созданные столбцы, столбцы удостоверений и столбцы по умолчанию не поддерживаются.

Примеры

-- Creates a streaming table that processes files stored in the given external location with
-- schema inference and evolution.
> CREATE OR REFRESH STREAMING TABLE raw_data
  AS SELECT * FROM STREAM read_files('abfss://container@storageAccount.dfs.core.windows.net/base/path');

-- Creates a streaming table that processes files with a known schema.
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with schema evolution and data quality expectations.
-- The table creation or refresh fails if the data doesn't satisfy the expectation.
> CREATE OR REFRESH STREAMING TABLE avro_data (
    CONSTRAINT date_parsing (to_date(dt) >= '2000-01-01') ON VIOLATION FAIL UPDATE
  )
  AS SELECT *
  FROM STREAM read_files('gs://my-bucket/avroData');

-- Stores the data from Kafka in an append-only streaming table.
> CREATE OR REFRESH STREAMING TABLE firehose_raw
  COMMENT 'Stores the raw data from Kafka'
  TBLPROPERTIES ('delta.appendOnly' = 'true')
  AS SELECT
    value raw_data,
    offset,
    timestamp,
    timestampType
  FROM STREAM read_kafka(bootstrapServers => 'ips', subscribe => 'topic_name');

-- Read data from another streaming table scheduled to run every hour.
> CREATE STREAMING TABLE firehose_bronze
  SCHEDULE CRON '0 0 * * * ? *'
  AS SELECT
    from_json(raw_data, 'schema_string') data,
    * EXCEPT (raw_data)
  FROM STREAM firehose_raw;

-- Creates a streaming table with a column constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int PRIMARY KEY,
    ts timestamp,
    event string
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');

-- Creates a streaming table with a table constraint
> CREATE OR REFRESH STREAMING TABLE csv_data (
    id int,
    ts timestamp,
    event string,
    CONSTRAINT pk_id PRIMARY KEY (id)
  )
  AS SELECT *
  FROM STREAM read_files(
      's3://bucket/path',
      format => 'csv',
      schema => 'id int, ts timestamp, event string');