Поделиться через


Запрос потоковых данных

С помощью Azure Databricks можно запрашивать источники данных потоковой передачи с помощью структурированной потоковой передачи. Azure Databricks обеспечивает обширную поддержку потоковых рабочих нагрузок в Python и Scala и поддерживает большинство функций структурированной потоковой передачи с помощью SQL.

В следующих примерах показано использование приемника памяти для ручной проверки потоковых данных во время интерактивной разработки в записных книжках. Из-за ограничений вывода строк в пользовательском интерфейсе записной книжки могут не наблюдаться все данные, считываемые потоковыми запросами. В рабочих нагрузках следует активировать только потоковые запросы, записывая их в целевую таблицу или внешнюю систему.

Примечание.

Поддержка SQL для интерактивных запросов на потоковую передачу данных ограничена записными книжками, работающими на всех вычислительных ресурсах. Вы также можете использовать SQL при объявлении потоковых таблиц в Databricks SQL или Delta Live Tables. См. статью "Загрузка данных с помощью потоковых таблиц в Databricks SQL " и "Что такое разностные динамические таблицы?".

Запрос данных из систем потоковой передачи

Azure Databricks предоставляет средства чтения потоковых данных для следующих систем потоковой передачи:

  • Kafka
  • Kinesis
  • PubSub
  • Пульсар

При инициализации запросов к этим системам необходимо указать сведения о конфигурации, которые зависят от настроенной среды и системы, из которой вы выбираете чтение. См. раздел "Настройка источников данных потоковой передачи".

Распространенные рабочие нагрузки, связанные с потоковой передачей, включают прием данных в lakehouse и потоковую обработку для приемника данных во внешних системах. Дополнительные сведения о рабочих нагрузках потоковой передачи см. в статье "Потоковая передача" в Azure Databricks.

В следующих примерах демонстрируется интерактивное потоковое чтение из Kafka:

Python

display(spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "<server:ip>")
  .option("subscribe", "<topic>")
  .option("startingOffsets", "latest")
  .load()
)

SQL

SELECT * FROM STREAM read_kafka(
  bootstrapServers => '<server:ip>',
  subscribe => '<topic>',
  startingOffsets => 'latest'
);

Запрос таблицы в виде потокового чтения

Azure Databricks создает все таблицы с помощью Delta Lake по умолчанию. При выполнении потокового запроса к таблице Delta запрос автоматически выбирает новые записи при фиксации версии таблицы. По умолчанию потоковые запросы ожидают, что исходные таблицы содержат только добавленные записи. Если необходимо работать с потоковыми данными, содержащими обновления и удаления, Databricks рекомендует использовать разностные динамические таблицы и APPLY CHANGES INTO. См . РАЗДЕЛ API APPLY CHANGES: Упрощение записи измененных данных в разностных динамических таблицах.

В следующих примерах показано выполнение интерактивной потоковой передачи из таблицы:

Python

display(spark.readStream.table("table_name"))

SQL

SELECT * FROM STREAM table_name

Запрос данных в облачном хранилище объектов с помощью автозагрузчика

Вы можете передавать данные из облачного хранилища объектов с помощью автозагрузчика, соединителя облачных данных Azure Databricks. Соединитель можно использовать с файлами, хранящимися в томах каталога Unity или других расположениях облачного хранилища объектов. Databricks рекомендует использовать тома для управления доступом к данным в облачном хранилище объектов. Ознакомьтесь с Подключение источниками данных.

Azure Databricks оптимизирует этот соединитель для приема потоковых данных в облачном хранилище объектов, которое хранится в популярных структурированных, полуструктурированных и неструктурированных форматах. Databricks рекомендует хранить прием данных в почти необработанном формате для максимальной пропускной способности и минимизации потенциальной потери данных из-за поврежденных записей или изменений схемы.

Дополнительные рекомендации по приему данных из облачного хранилища объектов см . в разделе "Прием данных" в databricks lakehouse.

В следующих примерах показано интерактивное потоковое чтение из каталога JSON-файлов в томе:

Python

display(spark.readStream.format("cloudFiles").option("cloudFiles.format", "json").load("/Volumes/catalog/schema/volumes/path/to/files"))

SQL

SELECT * FROM STREAM read_files('/Volumes/catalog/schema/volumes/path/to/files', format => 'json')