Обработка событий Центров событий в Apache Kafka с использованием Stream AnalyticsProcess Apache Kafka for Event Hubs events using Stream analytics

В этой статье показано, как выполняется потоковая передача данных в Центры событий с поддержкой Kafka и обработка с помощью Azure Stream Analytics.This article shows how to stream data into Kafka-enabled Event Hubs and process it with Azure Stream Analytics. Здесь подробно описаны следующие действия:It walks you through the following steps:

  1. создание пространства имен Центров событий с поддержкой Kafka;Create a Kafka enabled Event Hubs namespace.
  2. создание клиента Kafka, который отправляет сообщения в концентратор событий;Create a Kafka client that sends messages to the event hub.
  3. создание задания Stream Analytics, которое копирует данные из концентратора событий в хранилище BLOB-объектов Azure.Create a Stream Analytics job that copies data from the event hub into an Azure blob storage.

Вам не обязательно изменять протокол клиентов или запускать собственные кластеры при использовании конечной точки Kafka, предоставленной концентратором событий.You do not need to change your protocol clients or run your own clusters when you use the Kafka endpoint exposed by an event hub. Центры событий Azure поддерживают Apache Kafka 1.0.Azure Event Hubs supports Apache Kafka version 1.0. и более поздние версии.and above.

предварительные требованияPrerequisites

Ниже указаны требования для работы с этим кратким руководством.To complete this quickstart, make sure you have the following prerequisites:

Создание пространства имен Центров событий с поддержкой KafkaCreate a Kafka enabled Event Hubs namespace

  1. Войдите на портал Azure и щелкните Создать ресурс в левой верхней части экрана.Sign in to the Azure portal, and click Create a resource at the top left of the screen.

  2. Найдите Центры событий и выберите параметры, показанные ниже.Search for Event Hubs and select the options shown here:

    Поиск Центров событий на портале

  3. На странице Центры событий выберите Создать.On the Event Hubs page, select Create.

  4. На странице Создание пространства имен выполните следующие действия.On the Create Namespace page, do the following actions:

    1. Присвойте пространству имен уникальное имя.Provide a unique name for the namespace.

    2. Выберите ценовую категорию.Select a pricing tier.

    3. Выберите Включить Kafka.Select Enable Kafka. Это действие является важным шагом.This step is an important step.

    4. Выберите подписку, в которой необходимо создать пространство имен концентратора событий.Select your subscription in which you want the event hub namespace to be created.

    5. Создайте новую группу ресурсов или выберите имеющуюся.Create a new resource group or select an existing resource group.

    6. Выберите расположение.Select a location.

    7. Нажмите кнопку Создать.Click Create.

      Создание пространства имен

  5. В сообщении с уведомлением выберите имя группы ресурсов.In the notification message, select the resource group name.

    Создание пространства имен

  6. Выберите Пространство имен концентратора событий в группе ресурсов.Select the event hub namespace in the resource group.

  7. После создания пространства имен, выберите Политики общего доступа в разделе Параметры.Once the namespace is created, select Shared access policies under SETTINGS.

    Выбор политик общего доступа

  8. Вы можете выбрать значение по умолчанию RootManageSharedAccessKey или добавить новую политику.You can choose the default RootManageSharedAccessKey, or add a new policy. Щелкните имя политики и скопируйте строку подключения.Click the policy name and copy the connection string. Строка подключения используется для настройки клиента Kafka.You use the connection string to configure the Kafka client.

    Выбор политики

Теперь можно выполнять потоковую передачу событий из приложений, использующих протокол Kafka, в Центры событий.You can now stream events from your applications that use the Kafka protocol into Event Hubs.

Отправка сообщений с использованием Kafka в Центрах событийSend messages with Kafka in Event Hubs

  1. Клонируйте репозиторий Центров событий Azure для Kafka на компьютер.Clone the Azure Event Hubs for Kafka repository to your machine.

  2. Перейдите в папку azure-event-hubs-for-kafka/quickstart/java/producer.Navigate to the folder: azure-event-hubs-for-kafka/quickstart/java/producer.

  3. Обновите сведения о конфигурации для отправителя в файле по адресу src/main/resources/producer.config.Update the configuration details for the producer in src/main/resources/producer.config. Укажите имя и строку подключения для пространства имен концентратора событий.Specify the name and connection string for the event hub namespace.

    bootstrap.servers={EVENT HUB NAMESPACE}.servicebus.windows.net:9093
    security.protocol=SASL_SSL
    sasl.mechanism=PLAIN
    sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="{CONNECTION STRING for EVENT HUB NAMESPACE}";
    
  4. Перейдите в azure-event-hubs-for-kafka/quickstart/java/producer/src/main/java/com/example/app и откройте файл TestDataReporter.java в любом редакторе по своему усмотрению.Navigate to azure-event-hubs-for-kafka/quickstart/java/producer/src/main/java/com/example/app, and open TestDataReporter.java file in an editor of your choice.

  5. Закомментируйте следующую строку кода.Comment out the following line of code:

                //final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "Test Data " + i);
    
  6. Добавьте следующую строку кода вместо закомментированного кода.Add the following line of code in place of the commented code:

                final ProducerRecord<Long, String> record = new ProducerRecord<Long, String>(TOPIC, time, "{ \"eventData\": \"Test Data " + i + "\" }");            
    

    Этот код отправляет данные события в формате JSON.This code sends the event data in JSON format. При настройке входных данных для задания Stream Analytics необходимо указать JSON как формат входных данных.When you configure input for a Stream Analytics job, you specify JSON as the format for the input data.

  7. Запустите отправителя и потоковую передачу данных в Центры событий с поддержкой Kafka.Run the producer and stream into Kafka-enabled Event Hubs. При использовании командной строки Node.js перед выполнением этих команд на компьютере Windows переключитесь на папку azure-event-hubs-for-kafka/quickstart/java/producer.On a Windows machine, when using a Node.js command prompt, switch to the azure-event-hubs-for-kafka/quickstart/java/producer folder before running these commands.

    mvn clean package
    mvn exec:java -Dexec.mainClass="TestProducer"                                    
    

Проверка получения данных концентратором событийVerify that event hub receives the data

  1. Выберите Центры событий в разделе Сущности.Select Event Hubs under ENTITIES. Убедитесь, что вы видите концентратор событий с именем test.Confirm that you see an event hub named test.

    Концентратор событий. Тестирование

  2. Убедитесь, что вы видите сообщения, входящие в концентратор событий.Confirm that you see messages coming in to the event hub.

    Концентратор событий. Сообщения

Обработка данных событий с помощью задания Stream AnalyticsProcess event data using a Stream Analytics job

В этом разделе будет создано задание Azure Stream Analytics.In this section, you create an Azure Stream Analytics job. Клиент Kafka отправит события в концентратор событий.The Kafka client sends events to the event hub. Вы создадите задание Stream Analytics, которое принимает данные о событиях в качестве входных данных и выводит их в хранилище BLOB-объектов Azure.You create a Stream Analytics job that takes event data as input and outputs it to an Azure blob storage. Если у вас нет учетной записи службы хранилища Azure, то потребуется ее создать.If you don't have an Azure Storage account, create one.

Запрос в задании Stream Analytics проходит через данные без выполнения какой-либо аналитики.The query in the Stream Analytics job passes through the data without performing any analytics. Можно создать запрос, преобразующий входные данные в выходные данные в другом формате или с полученными аналитическими сведениями.You can create a query that transforms the input data to produce output data in a different format or with gained insights.

Создание задания Stream AnalyticsCreate a Stream Analytics job

  1. Выберите + Создать ресурс на портале Azure.Select + Create a resource in the Azure portal.
  2. Выберите Analytics в меню Azure Marketplace, и затем выберите задание Stream Analytics.Select Analytics in the Azure Marketplace menu, and select Stream Analytics job.
  3. На странице Новое задание Stream Analytics выполните следующие действия.On the New Stream Analytics page, do the following actions:
    1. Введите имя для задания.Enter a name for the job.

    2. Выберите свою подписку.Select your subscription.

    3. Выберите Создать новую для группы ресурсов и введите имя.Select Create new for the resource group and enter the name. Вы также можете использовать существующую группу ресурсов.You can also use an existing resource group.

    4. Выберите расположение для задания.Select a location for the job.

    5. Щелкните Создать, чтобы создать задание.Select Create to create the job.

      Новое задание Stream Analytics

Настройка входных данных для заданияConfigure job input

  1. В сообщение уведомления, выберите перейти к ресурсу для см. в разделе задания Stream Analytics страницы.In the notification message, select Go to resource to see the Stream Analytics job page.

  2. Выберите Входные данные в разделе Топология задания в меню слева.Select Inputs in the JOB TOPOLOGY section on the left menu.

  3. Выберите Добавить потоковый вход, а затем выберите Концентратор событий.Select Add stream input, and then select Event Hub.

    Добавление концентратора событий в качестве входных данных

  4. На странице конфигурации входных данных концентратора событий выполните следующие действия.On the Event Hub input configuration page, do the following actions:

    1. Укажите псевдоним для входных данных.Specify an alias for the input.

    2. Выберите свою подписку Azure.Select your Azure subscription.

    3. Выберите пространство имен концентратора событий созданное ранее.Select the event hub namespace your created earlier.

    4. Выберите тестирование для концентратора событий.Select test for the event hub.

    5. Щелкните Сохранить.Select Save.

      Конфигурация входных данных концентратора событий

Настройка выходных данных для заданияConfigure job output

  1. Выберите Выходные данные в меню раздела Топология задания.Select Outputs in the JOB TOPOLOGY section on the menu.
  2. Выберите + Добавить на панели инструментов и выберите Хранилище BLOB-объектовSelect + Add on the toolbar, and select Blob storage
  3. На странице параметров выходных данных хранилища BLOB-объектов выполните следующие действия.On the Blob storage output settings page, do the following actions:
    1. Укажите псевдоним для выходных данных.Specify an alias for the output.

    2. Выберите свою подписку Azure.Select your Azure subscription.

    3. Выберите свою учетную запись службы хранилища Azure.Select your Azure Storage account.

    4. Введите имя для контейнера, в котором хранятся выходные данные из запроса Stream Analytics.Enter a name for the container that stores the output data from the Stream Analytics query.

    5. Щелкните Сохранить.Select Save.

      Конфигурация выходных данных хранилища BLOB-объектов

Определение запросаDefine a query

Настроив задание Stream Analytics для считывания входящего потока данных, необходимо создать преобразование, которое анализирует данные в реальном времени.After you have a Stream Analytics job setup to read an incoming data stream, the next step is to create a transformation that analyzes data in real time. Определите запрос преобразования с помощью языка запросов Stream Analytics.You define the transformation query by using Stream Analytics Query Language. В этом пошаговом руководстве необходимо определить запрос, который проходит через данные без выполнения какого-либо преобразования.In this walkthrough, you define a query that passes through the data without performing any transformation.

  1. Выберите Запрос.Select Query.

  2. В окне запроса замените [YourOutputAlias] псевдонимом выходных данных, созданным ранее.In the query window, replace [YourOutputAlias] with the output alias you created earlier.

  3. Замените [YourInputAlias] псевдонимом входных данных, созданным ранее.Replace [YourInputAlias] with the input alias you created earlier.

  4. На панели инструментов щелкните Сохранить.Select Save on the toolbar.

    Запрос

Выполнение задания Stream AnalyticsRun the Stream Analytics job

  1. В меню слева выберите Обзор.Select Overview on the left menu.

  2. Щелкните Запуск.Select Start.

    Меню "Пуск"

  3. На странице Запуск задания выберите Запуск.On the Start job page, select Start.

    Страница запуска задания

  4. Подождите, пока состояние задания не изменится с запуска на выполняется.Wait until the status of the job changes from Starting to running.

    Состояние задания. Выполняется

Тестирование сценарияTest the scenario

  1. Запустите еще раз отправитель Kafka, чтобы отправить события в концентратор событий.Run the Kafka producer again to send events to the event hub.

    mvn exec:java -Dexec.mainClass="TestProducer"                                    
    
  2. Убедитесь, что вы видите выходные данные сгенерированные в хранилище BLOB-объектов Azure.Confirm that you see output data is generated in the Azure blob storage. Вы увидите JSON-файл в контейнере с сотней строк, которые выглядят следующим образом.You see a JSON file in the container with 100 rows that look like the following sample rows:

    {"eventData":"Test Data 0","EventProcessedUtcTime":"2018-08-30T03:27:23.1592910Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    {"eventData":"Test Data 1","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    {"eventData":"Test Data 2","EventProcessedUtcTime":"2018-08-30T03:27:23.3936511Z","PartitionId":0,"EventEnqueuedUtcTime":"2018-08-30T03:27:22.9220000Z"}
    

    Задание Azure Stream Analytics получило входные данные из концентратора событий и в этом сценарии сохранило их в хранилище BLOB-объектов Azure.The Azure Stream Analytics job received input data from the event hub and stored it in the Azure blob storage in this scenario.

Следующие шагиNext steps

В этой статье вы узнаете, как выполнять потоковую передачу данных в Центры событий с поддержкой Kafka без необходимости менять клиенты протоколов или запускать собственные кластеры.In this article, you learned how to stream into Kafka-enabled Event Hubs without changing your protocol clients or running your own clusters. Дополнительные сведения о Центрах событий и Центрах событий для Kafka см. в статьях:To learn more about Event Hubs and Event Hubs for Kafka, see the following topic: