Центры событий Azure

Центры событий Azure — это служба обработки данных телеметрии с высокой степенью масштабируемости. Она собирает, преобразовывает и хранит миллионы событий. Эта распределенная платформа для потоковой передачи данных с низкой задержкой и настраиваемым временем хранения позволяет принимать большие объемы данных телеметрии в облаке, а также считывать данные из нескольких приложений на основе семантики публикации и подписки.

В этой статье описано, как использовать структурированную потоковую передачу с использованием Центров событий Azure и кластеров Azure Databricks.

Примечание.

Центры событий Azure предоставляет конечную точку, совместимую с Apache Kafka, которую можно использовать с помощью Структурированный соединитель Stream Kafka, доступный в Databricks Runtime, для обработки сообщений из Центры событий Azure. Databricks рекомендует использовать соединитель Структурированной потоковой передачи Kafka для обработки сообщений из Центры событий Azure.

Requirements

Информацию о поддержке текущих выпусков см. в разделе с описанием последних выпусков в файле README проекта Spark Connector Центров событий Azure.

  1. Создайте библиотеку в рабочей области Azure Databricks, используя координату com.microsoft.azure:azure-eventhubs-spark_2.11:2.3.17 Maven.

    Примечание.

    Этот соединитель регулярно обновляется, и может быть доступна более новая версия. Мы рекомендуем вам получить последнюю версию соединителя из репозитория Maven.

  2. Установите созданную библиотеку в кластере.

Схема

Схема записей:

Column Тип
body binary
partition строка
offset строка
sequenceNumber длинный
enqueuedTime TIMESTAMP
publisher строка
partitionKey строка
properties map[string,json]

body всегда предоставляется в виде массива байтов. Используйте cast("string") для явной десериализации столбца body.

Краткая инструкция

Начнем с краткого примера: WordCount. Следующая записная книжка — это все, что нужно для запуска WordCount с использованием структурированной потоковой передачи с помощью Центров событий Azure.

Выполнение WordCount с использованием Центров событий Azure с записной книжкой структурированной потоковой передачи

Получить записную книжку

Настройка

В этом разделе обсуждаются параметры конфигурации, необходимые для работы с Центрами событий.

Подробные инструкции по настройке структурированной потоковой передачи с использованием Центров событий Azure см. в статье Руководство по интеграции структурированной потоковой передачи и Центров событий Azure от корпорации Майкрософт.

Подробные инструкции по использованию структурированной потоковой передачи см. в статье "Потоковая передача" в Azure Databricks.

Connection string

Строка подключения к Центрам событий требуется для подключения к службе Центров событий. Строку подключения для экземпляра Центров событий можно получить на портале Azure или с помощью ConnectionStringBuilder в библиотеке.

Портал Azure

Когда вы получаете строку подключения с портала Azure, в ней может быть или не быть ключ EntityPath. Необходимо учесть следующие моменты.

  // Without an entity path
val without = "Endpoint=<endpoint>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>"

// With an entity path
val with = "Endpoint=sb://<sample>;SharedAccessKeyName=<key-name>;SharedAccessKey=<key>;EntityPath=<eventhub-name>"

Чтобы подключиться к Центрам событий, требуется присутствие EntityPath. Если в строке подключения этого нет, не беспокойтесь. Этот вопрос будет решен за вас:

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder(without)   // defined in the previous code block
  .setEventHubName("<eventhub-name>")
  .build

ConnectionStringBuilder

Кроме того, можно использовать ConnectionStringBuilder для создания строки подключения.

import org.apache.spark.eventhubs.ConnectionStringBuilder

val connectionString = ConnectionStringBuilder()
  .setNamespaceName("<namespace-name>")
  .setEventHubName("<eventhub-name>")
  .setSasKeyName("<key-name>")
  .setSasKey("<key>")
  .build

EventHubsConf

Вся конфигурация, связанная с Центрами событий, определяется в EventHubsConf. Чтобы создать EventHubsConf, необходимо передать строку подключения.

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)

Дополнительные сведения о получении допустимой строки подключения см. в разделе Строка подключения.

Полный список конфигураций см. в разделе EventHubsConf. Вот набор конфигураций, которые помогут вам приступить к работе:

Вариант Значение По умолчанию Тип запроса Description
consumerGroup Строка $Default Потоковая передача и пакетная обработка Группа потребителей — это представление всего концентратора событий. Группы потребителей обеспечивают каждому из нескольких потребляющих приложений отдельное представление потока событий, а также возможность считывания потока независимо друг от друга в своем темпе и с собственными смещениями. Дополнительные сведения см. в документации Майкрософт.
startingPosition EventPosition Начало потоковой передачи Потоковая передача и пакетная обработка Начальная позиция для задания структурированной потоковой передачи. Дополнительные сведения о порядке, в котором считываются параметры, см. в разделе startingPositions.
maxEventsPerTrigger длинный partitionCount

* 1000
Запрос потоковой передачи Ограничение скорости для максимального количества событий, обрабатываемых за интервал триггера. Указанное общее количество событий будет пропорционально распределено между секциями разного объема.

Для каждого варианта существует соответствующий параметр в EventHubsConf. Например:

import org.apache.spark.eventhubs.

val cs = "<your-connection-string>"
val eventHubsConf = EventHubsConf(cs)
  .setConsumerGroup("sample-cg")
  .setMaxEventsPerTrigger(10000)

EventPosition

EventHubsConf позволяет пользователям указывать начальную (и конечную) позиции в классеEventPosition. EventPosition определяет позицию события в секции концентратора событий. Позицией может быть время постановки в очередь, смещение, порядковый номер, а также начало или конец потока.

import org.apache.spark.eventhubs._

EventPosition.fromOffset("246812")          // Specifies offset 246812
EventPosition.fromSequenceNumber(100L)      // Specifies sequence number 100
EventPosition.fromEnqueuedTime(Instant.now) // Any event after the current time
EventPosition.fromStartOfStream             // Specifies from start of stream
EventPosition.fromEndOfStream               // Specifies from end of stream

Если вы хотите выполнить запуск (или остановку) в определенной позиции, просто создайте правильный класс EventPosition и настройте его в EventHubsConf.

val connectionString = "<event-hub-connection-string>"
val eventHubsConf = EventHubsConf(connectionString)
  .setStartingPosition(EventPosition.fromEndOfStream)

Структурированная потоковая передача с помощью Центров событий Azure в рабочей среде

Когда вы запускаете потоковые запросы в рабочей среде, вам, вероятно, нужны более надежные гарантии надежности и времени безотказной работы, чем если бы вы просто подключали записную книжку к кластеру и выполняли свои потоковые запросы в интерактивном режиме. Импортируйте и запустите следующую записную книжку, чтобы узнать, как настроить и запустить структурированную потоковую передачу в рабочей среде с помощью Центров событий Azure и Azure Databricks.

Дополнительные сведения см. в рекомендациях по рабочей среде для структурированной потоковой передачи.

Структурированная потоковая передача с помощью записной книжки Центров событий Azure в рабочей среде

Получить записную книжку