Загрузка данных с помощью таблиц потоковой передачи в Databricks SQL

Внимание

Эта функция предоставляется в режиме общедоступной предварительной версии. Чтобы зарегистрироваться для доступа, заполните эту форму.

Databricks рекомендует использовать потоковые таблицы для приема данных с помощью Databricks SQL. Потоковая таблица — это управляемая таблица каталога Unity с дополнительной поддержкой потоковой или добавочной обработки данных. Конвейер DLT автоматически создается для каждой потоковой таблицы. Таблицы потоковой передачи можно использовать для добавочной загрузки данных из Kafka и облачного хранилища объектов.

В этой статье показано использование потоковых таблиц для загрузки данных из облачного хранилища объектов, настроенного в качестве тома каталога Unity (рекомендуется) или внешнего расположения.

Примечание.

Чтобы узнать, как использовать таблицы Delta Lake в качестве источников потоковой передачи и приемников, см. раздел потоковой передачи и записи в разностной таблице.

Перед началом работы

Прежде чем начать, убедитесь, что у вас есть следующее:

  • Учетная запись Azure Databricks с включенным бессерверным доступом. Дополнительные сведения см. в разделе "Включение бессерверных хранилищ SQL".

  • Рабочая область с включенным каталогом Unity. Дополнительные сведения см. в разделе "Настройка каталога Unity" и управление ими.

  • Хранилище SQL, использующее Current канал.

  • Чтобы запросить потоковые таблицы, созданные конвейером Delta Live Tables, необходимо использовать общие вычислительные ресурсы с помощью Databricks Runtime 13.3 LTS и выше или хранилища SQL. Потоковые таблицы, созданные в конвейере с включенным каталогом Unity, не могут запрашиваться из назначенных или без кластеров изоляции.

  • Привилегия READ FILES во внешнем расположении каталога Unity. Дополнительные сведения см. в статье "Создание внешнего расположения для подключения облачного хранилища к Azure Databricks".

  • Привилегия USE CATALOG каталога, в котором создается потоковая таблица.

  • Привилегия USE SCHEMA схемы, в которой создается потоковая таблица.

  • Привилегия CREATE TABLE схемы, в которой создается потоковая таблица.

  • Путь к исходным данным.

    Пример пути тома: /Volumes/<catalog>/<schema>/<volume>/<path>/<file-name>

    Пример пути к внешнему расположению: abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis

    Примечание.

    В этой статье предполагается, что данные, к которым требуется загрузить, хранятся в облачном хранилище, соответствующем тому каталога Unity или внешнему расположению, к которому у вас есть доступ.

Обнаружение и предварительный просмотр исходных данных

  1. На боковой панели рабочей области щелкните "Запросы" и нажмите кнопку "Создать запрос".

  2. В редакторе запросов выберите хранилище SQL, использующее Current канал из раскрывающегося списка.

  3. Вставьте приведенные ниже значения в редактор, заменив значения в угловых скобках (<>) для сведений, определяющих исходные данные, и нажмите кнопку "Выполнить".

    Примечание.

    При запуске read_files табличной функции могут возникнуть ошибки вывода схемы, если значения по умолчанию для функции не удается проанализировать данные. Например, может потребоваться настроить многострочный режим для файлов CSV или JSON. Список параметров синтаксического анализа см. в разделе read_files табличное значение функции.

    /* Discover your data in a volume */
    LIST "/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>"
    
    /* Preview your data in a volume */
    SELECT * FROM read_files("/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>") LIMIT 10
    
    /* Discover your data in an external location */
    LIST "abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>"
    
    /* Preview your data */
    SELECT * FROM read_files("abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>") LIMIT 10
    

Загрузка данных в потоковую таблицу

Чтобы создать потоковую таблицу из данных в облачном хранилище объектов, вставьте следующую команду в редактор запросов и нажмите кнопку "Выполнить".

/* Load data from a volume */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('/Volumes/<catalog>/<schema>/<volume>/<path>/<folder>')

/* Load data from an external location */
CREATE OR REFRESH STREAMING TABLE <table-name> AS
SELECT * FROM STREAM read_files('abfss://<container>@<storage-account>.dfs.core.windows.net/<path>/<folder>')

Обновление потоковой таблицы с помощью конвейера DLT

В этом разделе описываются шаблоны обновления таблицы потоковой передачи с последними данными, доступными из источников, определенных в запросе.

CREATE операции потоковой передачи таблиц используют хранилище SQL Databricks для первоначального создания и загрузки данных в таблицу потоковой передачи. REFRESH операции потоковой передачи используют разностные динамические таблицы (DLT). Конвейер DLT автоматически создается для каждой потоковой таблицы. При обновлении таблицы потоковой передачи инициируется обновление конвейера DLT для обработки обновления.

После выполнения REFRESH команды возвращается ссылка конвейера DLT. Чтобы проверка состояние обновления, можно использовать ссылку на конвейер DLT.

Примечание.

Только владелец таблицы может обновить потоковую таблицу, чтобы получить последние данные. Пользователь, создающий таблицу, является владельцем, и владелец не может быть изменен.

См. раздел "Что такое разностные динамические таблицы?".

Прием новых данных только

По умолчанию read_files функция считывает все существующие данные в исходном каталоге во время создания таблицы, а затем обрабатывает вновь поступающие записи с каждым обновлением.

Чтобы избежать приема данных, которые уже существуют в исходном каталоге во время создания таблицы, задайте includeExistingFiles для параметра значение false. Это означает, что только данные, поступающие в каталог после создания таблицы, обрабатываются. Например:

CREATE OR REFRESH STREAMING TABLE my_bronze_table
AS SELECT *
FROM STREAM read_files(
  'abfss://myContainer@myStorageAccount.dfs.core.windows.net/analysis/*/*/*.json',
  includeExistingFiles => false)

Полное обновление таблицы потоковой передачи

Полные обновления повторно обрабатывают все данные, доступные в источнике с помощью последнего определения. Не рекомендуется вызывать полные обновления в источниках, которые не хранят всю историю данных или имеют короткие периоды хранения, например Kafka, так как полное обновление усечено существующих данных. Возможно, вы не сможете восстановить старые данные, если данные больше не доступны в источнике.

Например:

REFRESH STREAMING TABLE my_bronze_table FULL

Планирование потоковой таблицы для автоматического обновления

Чтобы настроить потоковую таблицу для автоматического обновления на основе определенного расписания, вставьте следующее в редактор запросов и нажмите кнопку "Выполнить".

ALTER STREAMING TABLE
[[<catalog>.]<database>.]<name>
ADD [SCHEDULE [REFRESH]
        CRON '<cron-string>'
                [ AT TIME ZONE '<timezone-id>' ]];

Например, запросы расписания обновления см. в разделе ALTER STREAMING TABLE.

Отслеживание состояния обновления

Вы можете просмотреть состояние обновления потоковой таблицы, просмотрев конвейер, который управляет потоковой таблицей в пользовательском интерфейсе разностных динамических таблиц или просмотром сведений об обновлении, возвращаемых DESCRIBE EXTENDED командой для потоковой таблицы.

DESCRIBE EXTENDED <table-name>

Потоковая прием из Kafka

Пример приема потоковой передачи из Kafka см. в read_kafka.

Предоставление пользователям доступа к таблице потоковой передачи

Чтобы предоставить пользователям привилегии SELECT в таблице потоковой передачи, чтобы они могли запросить ее, вставьте следующее в редактор запросов и нажмите кнопку "Выполнить".

GRANT SELECT ON TABLE <catalog>.<schema>.<table> TO <user-or-group>

Дополнительные сведения о предоставлении привилегий для защищаемых объектов каталога Unity см. в разделе "Права каталога Unity" и защищаемые объекты.

Ограничения

  • Таблицы потоковой передачи Databricks SQL не поддерживаются в регионах южной части США и западной части США 2.

Дополнительные ресурсы