Отправлять события или получения событий из концентраторов событий с помощью GoSend events to or receive events from Event Hubs using Go

Центры событий Azure — это платформа потоковой передачи больших данных и служба приема событий, принимающая и обрабатывающая миллионы событий в секунду.Azure Event Hubs is a Big Data streaming platform and event ingestion service, capable of receiving and processing millions of events per second. Центры событий могут обрабатывать и сохранять события, данные и телеметрию, созданные распределенным программным обеспечением и устройствами.Event Hubs can process and store events, data, or telemetry produced by distributed software and devices. Данные, отправляемые в концентратор событий, можно преобразовывать и сохранять с помощью любого поставщика аналитики в реальном времени, а также с помощью адаптеров пакетной обработки или хранения.Data sent to an event hub can be transformed and stored using any real-time analytics provider or batching/storage adapters. Подробный обзор Центров событий см. в статьях Что такое Центры событий Azure? и Обзор функций Центров событий.For detailed overview of Event Hubs, see Event Hubs overview and Event Hubs features.

Это руководство содержит инструкции для написания приложений Go для отправки событий или получать события из концентратора событий.This tutorial describes how to write Go applications to send events to or receive events from an event hub.

Примечание

Вы можете скачать это краткое руководство в качестве примера с сайта GitHub, заменить строки EventHubConnectionString и EventHubName значениями для своего концентратора событий и выполнить этот пример.You can download this quickstart as a sample from the GitHub, replace EventHubConnectionString and EventHubName strings with your event hub values, and run it. Или следуйте инструкциям из этого руководства, чтобы создать собственное решение.Alternatively, you can follow the steps in this tutorial to create your own.

Технические условияPrerequisites

Для работы с данным руководством вам потребуется:To complete this tutorial, you need the following prerequisites:

  • Локально установленный Go.Go installed locally. При необходимости выполните следующие инструкции.Follow these instructions if necessary.
  • Активная учетная запись Azure.An active Azure account. Если у вас еще нет подписки Azure, создайте бесплатную учетную запись Azure, прежде чем начинать работу.If you don't have an Azure subscription, create a free account before you begin.
  • Создать пространство имен концентраторов событий и концентратора событий.Create an Event Hubs namespace and an event hub. Используйте портала Azure создать пространство имен типа концентраторов событий и получение учетных данных управления, необходимых приложению для взаимодействия с концентратором событий.Use the Azure portal to create a namespace of type Event Hubs, and obtain the management credentials your application needs to communicate with the event hub. Чтобы создать пространство имен и концентратор событий, выполните инструкции из этой статьи.To create a namespace and an event hub, follow the procedure in this article.

Отправка событийSend events

В этом разделе показано, как создать приложение Go для отправки событий в концентратор событий.This section shows you how to create a Go application to send events to an event hub.

Установка пакета GoInstall Go package

Получить пакет Go для Центров событий с помощью go get или dep.Get the Go package for Event Hubs with go get or dep. Пример:For example:

go get -u github.com/Azure/azure-event-hubs-go
go get -u github.com/Azure/azure-amqp-common-go/...

# or

dep ensure -add github.com/Azure/azure-event-hubs-go
dep ensure -add github.com/Azure/azure-amqp-common-go

Импорт пакетов в файл кодаImport packages in your code file

Для импорта пакетов Go используйте следующий пример кода:To import the Go packages, use the following code example:

import (
    aad "github.com/Azure/azure-amqp-common-go/aad"
    eventhubs "github.com/Azure/azure-event-hubs-go"
)

Создание субъекта-службыCreate service principal

Создайте субъект-службу, следуя инструкциям в статье Создание субъекта-службы Azure с помощью Azure CLI 2.0.Create a new service principal by following the instructions in Create an Azure service principal with Azure CLI 2.0. Сохраните предоставленные учетные данные в своей среде со следующими именами.Save the provided credentials in your environment with the following names. Пакеты Azure SDK для Go и Центров событий предварительно настроенные для поиска этих имен переменных:Both the Azure SDK for Go and the Event Hubs packages are preconfigured to look for these variable names:

export AZURE_CLIENT_ID=
export AZURE_CLIENT_SECRET=
export AZURE_TENANT_ID=
export AZURE_SUBSCRIPTION_ID= 

Теперь создайте поставщик авторизации для клиента Центров событий, использующего эти учетные данные:Now, create an authorization provider for your Event Hubs client that uses these credentials:

tokenProvider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())
if err != nil {
    log.Fatalf("failed to configure AAD JWT provider: %s\n", err)
}

Создание клиента Центров событийCreate Event Hubs client

В следующем коде создается клиент Центров событий:The following code creates an Event Hubs client:

hub, err := eventhubs.NewHub("namespaceName", "hubName", tokenProvider)
ctx := context.WithTimeout(context.Background(), 10 * time.Second)
defer hub.Close(ctx)
if err != nil {
    log.Fatalf("failed to get hub %s\n", err)
}

Написание кода для отправки сообщенийWrite code to send messages

В следующем фрагменте кода используйте (1) для отправки сообщений из терминала в интерактивном режиме или (2) для отправки сообщений в рамках программы:In the following snippet, use (1) to send messages interactively from a terminal, or (2) to send messages within your program:

// 1. send messages at the terminal
ctx = context.Background()
reader := bufio.NewReader(os.Stdin)
for {
    fmt.Printf("Input a message to send: ")
    text, _ := reader.ReadString('\n')
    hub.Send(ctx, eventhubs.NewEventFromString(text))
}

// 2. send messages within program
ctx = context.Background()
hub.Send(ctx, eventhubs.NewEventFromString("hello Azure!"))

ДополнительноExtras

Получите идентификаторы каждой секции в концентраторе событий:Get the IDs of the partitions in your event hub:

info, err := hub.GetRuntimeInformation(ctx)
if err != nil {
    log.Fatalf("failed to get runtime info: %s\n", err)
}
log.Printf("got partition IDs: %s\n", info.PartitionIDs)

Запустите приложение и отправьте события в концентратор событий.Run the application to send events to the event hub.

Поздравляем!Congratulations! Теперь вы можете отправлять сообщения в концентратор событий.You have now sent messages to an event hub.

Получение событийReceive events

Создание учетной записи хранения и контейнераCreate a Storage account and container

Такие состояния, как аренда разделов и контрольные точки в потоке событий, совместно используются получателями через контейнер службы хранилища Azure.State such as leases on partitions and checkpoints in the event stream are shared between receivers using an Azure Storage container. Вы можете создать учетную запись хранения и контейнер с помощью пакета SDK для Go, но их также можно создать, следуя инструкциям в статье Об учетных записях хранения Azure.You can create a storage account and container with the Go SDK, but you can also create one by following the instructions in About Azure storage accounts.

Примеры для создания артефактов хранилища с помощью пакета SDK для Go доступны в репозитории примеров Go и в примере, прилагаемом к этому руководству.Samples for creating Storage artifacts with the Go SDK are available in the Go samples repo and in the sample corresponding to this tutorial.

Выберите пакетыGo packages

Чтобы получать сообщения, получите пакеты Go для концентраторов событий с помощью go get или dep:To receive the messages, get the Go packages for Event Hubs with go get or dep:

go get -u github.com/Azure/azure-event-hubs-go/...
go get -u github.com/Azure/azure-amqp-common-go/...
go get -u github.com/Azure/go-autorest/...

# or

dep ensure -add github.com/Azure/azure-event-hubs-go
dep ensure -add github.com/Azure/azure-amqp-common-go
dep ensure -add github.com/Azure/go-autorest

Импорт пакетов в файл кодаImport packages in your code file

Для импорта пакетов Go используйте следующий пример кода:To import the Go packages, use the following code example:

import (
    aad "github.com/Azure/azure-amqp-common-go/aad"
    eventhubs "github.com/Azure/azure-event-hubs-go"
    eph "github.com/Azure/azure-event-hubs-go/eph"
    storageLeaser "github.com/Azure/azure-event-hubs-go/storage"
    azure "github.com/Azure/go-autorest/autorest/azure"
)

Создание субъекта-службыCreate service principal

Создайте субъект-службу, следуя инструкциям в статье Создание субъекта-службы Azure с помощью Azure CLI 2.0.Create a new service principal by following the instructions in Create an Azure service principal with Azure CLI 2.0. Сохраните предоставленные учетные данные в своей среде со следующими именами. Пакет Azure SDK для Go и пакет для службы "Центры событий" предварительно настроены для поиска этих имен переменных.Save the provided credentials in your environment with the following names: Both Azure SDK for Go and Event Hubs package are preconfigured to look for these variable names.

export AZURE_CLIENT_ID=
export AZURE_CLIENT_SECRET=
export AZURE_TENANT_ID=
export AZURE_SUBSCRIPTION_ID= 

Затем создайте поставщика авторизации для вашего клиента концентраторов событий, который использует эти учетные данные:Next, create an authorization provider for your Event Hubs client that uses these credentials:

tokenProvider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())
if err != nil {
    log.Fatalf("failed to configure AAD JWT provider: %s\n", err)
}

Получение структуры метаданныхGet metadata struct

Получите структуру с метаданными о вашей среде Azure, используя пакет Azure SDK для Go.Get a struct with metadata about your Azure environment using the Azure Go SDK. Последующие операции используют эту структуру для поиска нужных конечных точек.Later operations use this struct to find correct endpoints.

azureEnv, err := azure.EnvironmentFromName("AzurePublicCloud")
if err != nil {
    log.Fatalf("could not get azure.Environment struct: %s\n", err)
}

Создание вспомогательного приложения для учетных данныхCreate credential helper

Создайте вспомогательное приложение, которое использует предыдущие учетные данные Azure Active Directory (AAD) для создания подписанного URL-адреса (SAS) для службы хранилища.Create a credential helper that uses the previous Azure Active Directory (AAD) credentials to create a Shared Access Signature (SAS) credential for Storage. Последний параметр предписывает конструктору использовать такие же переменные среды, как и ранее:The last parameter tells this constructor to use the same environment variables as used previously:

cred, err := storageLeaser.NewAADSASCredential(
    subscriptionID,
    resourceGroupName,
    storageAccountName,
    storageContainerName,
    storageLeaser.AADSASCredentialWithEnvironmentVars())
if err != nil {
    log.Fatalf("could not prepare a storage credential: %s\n", err)
}

Создайте указатель проверку и leaserCreate a check pointer and a leaser

Создание leaser, ответственные за предоставление секции для определенного получателя и проверьте указатель, отвечает за запись контрольных точек для потока сообщений, так что можно начать другим получателям чтение из правильного смещения.Create a leaser, responsible for leasing a partition to a particular receiver, and a check pointer, responsible for writing checkpoints for the message stream so that other receivers can begin reading from the correct offset.

В настоящее время доступно одно StorageLeaserCheckpointer. Оно использует один контейнер хранилища для управления арендой и контрольными точками.Currently, a single StorageLeaserCheckpointer is available that uses the same Storage container to manage both leases and checkpoints. Помимо имен учетной записи и контейнера, средству StorageLeaserCheckpointer требуются учетные данные, созданные на предыдущем шаге, и структура среды Azure для правильного доступа к контейнеру.In addition to the storage account and container names, the StorageLeaserCheckpointer needs the credential created in the previous step and the Azure environment struct to correctly access the container.

leaserCheckpointer, err := storageLeaser.NewStorageLeaserCheckpointer(
    cred,
    storageAccountName,
    storageContainerName,
    azureEnv)
if err != nil {
    log.Fatalf("could not prepare a storage leaserCheckpointer: %s\n", err)
}

Создание узла обработчика событийConstruct Event Processor Host

Теперь у вас есть компоненты для создания узла обработчика событий следующим образом.You now have the pieces needed to construct an EventProcessorHost, as follows. Же StorageLeaserCheckpointer используется в качестве leaser и указатель на флажок, как описано ранее:The same StorageLeaserCheckpointer is used as both a leaser and check pointer, as described previously:

ctx := context.Background()
p, err := eph.New(
    ctx,
    nsName,
    hubName,
    tokenProvider,
    leaserCheckpointer,
    leaserCheckpointer)
if err != nil {
    log.Fatalf("failed to create EPH: %s\n", err)
}
defer p.Close(context.Background())

Создание обработчикаCreate handler

Теперь создайте обработчик и зарегистрируйте его в узле обработчика событий.Now create a handler and register it with the Event Processor Host. Когда узел запущен, он применяет его и любые другие указанные обработчики ко входящим сообщениям:When the host is started, it applies this and any other specified handlers to incoming messages:

handler := func(ctx context.Context, event *eventhubs.Event) error {
    fmt.Printf("received: %s\n", string(event.Data))
    return nil
}

// register the handler with the EPH
_, err := p.RegisterHandler(ctx, handler)
if err != nil {
    log.Fatalf("failed to register handler: %s\n", err)
}

Написание кода для получения сообщенийWrite code to receive messages

Когда все будет настроено, можно запустить узел обработчика событий с Start(context), чтобы он постоянно работал, или с StartNonBlocking(context) для работы только до тех пор, пока доступны сообщения.With everything set up, you can start the Event Processor Host with Start(context) to keep it permanently running, or with StartNonBlocking(context) to run only as long as messages are available.

В этом руководстве запуск и выполнение происходит следующим образом. (Ознакомьтесь с примером использования StartNonBlocking на GitHub).This tutorial starts and runs as follows; see the GitHub sample for an example using StartNonBlocking:

ctx := context.Background()
err = p.Start()
if err != nil {
    log.Fatalf("failed to start EPH: %s\n", err)
}

Дальнейшие действияNext steps

Ознакомьтесь со следующими статьями:Read the following articles: