Inicio rápido: Envío o recepción de eventos en Event Hubs mediante GoQuickstart: Send events to or receive events from Event Hubs using Go

Azure Event Hubs es una plataforma de streaming de macrodatos y servicio de ingesta de eventos de gran escalabilidad capaz de recibir y procesar millones de eventos por segundo.Azure Event Hubs is a Big Data streaming platform and event ingestion service, capable of receiving and processing millions of events per second. Event Hubs puede procesar y almacenar eventos, datos o telemetría generados por dispositivos y software distribuido.Event Hubs can process and store events, data, or telemetry produced by distributed software and devices. Los datos enviados a un centro de eventos se pueden transformar y almacenar con cualquier proveedor de análisis en tiempo real o adaptadores de procesamiento por lotes y almacenamiento.Data sent to an event hub can be transformed and stored using any real-time analytics provider or batching/storage adapters. Para más información sobre Event Hubs, consulte Introducción a Event Hubs y Características de Event Hubs.For detailed overview of Event Hubs, see Event Hubs overview and Event Hubs features.

En este tutorial se describe cómo escribir aplicaciones de Go para enviar o recibir eventos en un centro de eventos.This tutorial describes how to write Go applications to send events to or receive events from an event hub.

Nota

Puede descargar esta guía de inicio rápido como un ejemplo desde GitHub, reemplazar las cadenas EventHubConnectionString y EventHubName por los valores del centro de eventos, y ejecutarlo.You can download this quickstart as a sample from the GitHub, replace EventHubConnectionString and EventHubName strings with your event hub values, and run it. También puede seguir los pasos de este tutorial para crear el suyo propio.Alternatively, you can follow the steps in this tutorial to create your own.

Requisitos previosPrerequisites

Para completar este tutorial, debe cumplir los siguientes requisitos previos:To complete this tutorial, you need the following prerequisites:

  • Go instalado de forma local.Go installed locally. Siga estas instrucciones si es necesario.Follow these instructions if necessary.
  • Una cuenta de Azure activa.An active Azure account. Si no tiene una suscripción a Azure, cree una cuenta gratuita antes de empezar.If you don't have an Azure subscription, create a free account before you begin.
  • Creación de un espacio de nombres de Event Hubs y un centro de eventos.Create an Event Hubs namespace and an event hub. Use Azure Portal para crear un espacio de nombres de tipo Event Hubs y obtener las credenciales de administración que la aplicación necesita para comunicarse con el centro de eventos.Use the Azure portal to create a namespace of type Event Hubs, and obtain the management credentials your application needs to communicate with the event hub. Para crear un espacio de nombres y un centro de eventos, siga el procedimiento que se indica en este artículo.To create a namespace and an event hub, follow the procedure in this article.

Envío de eventosSend events

En esta sección se muestra cómo crear una aplicación de Go para enviar eventos a un centro de eventos.This section shows you how to create a Go application to send events to an event hub.

Instalación del paquete GoInstall Go package

Consiga el paquete Go para Event Hubs con go get o dep.Get the Go package for Event Hubs with go get or dep. Por ejemplo:For example:

go get -u github.com/Azure/azure-event-hubs-go
go get -u github.com/Azure/azure-amqp-common-go/...

# or

dep ensure -add github.com/Azure/azure-event-hubs-go
dep ensure -add github.com/Azure/azure-amqp-common-go

Importación de paquetes en el archivo de códigoImport packages in your code file

Para importar paquetes de Go, utilice el siguiente ejemplo de código:To import the Go packages, use the following code example:

import (
    aad "github.com/Azure/azure-amqp-common-go/aad"
    eventhubs "github.com/Azure/azure-event-hubs-go"
)

Creación de una entidad de servicioCreate service principal

Cree una nueva entidad de servicio siguiendo las instrucciones que se indican en Creación de una entidad de servicio de Azure con la CLI de Azure 2.0.Create a new service principal by following the instructions in Create an Azure service principal with Azure CLI 2.0. Guarde las credenciales proporcionadas en su entorno con los siguientes nombres.Save the provided credentials in your environment with the following names. Los paquetes de Azure SDK para Go y de Event Hubs se han configurado previamente para buscar estos nombres de variables:Both the Azure SDK for Go and the Event Hubs packages are preconfigured to look for these variable names:

export AZURE_CLIENT_ID=
export AZURE_CLIENT_SECRET=
export AZURE_TENANT_ID=
export AZURE_SUBSCRIPTION_ID= 

Ahora, cree un proveedor de autorización para el cliente de Event Hubs que usa estas credenciales:Now, create an authorization provider for your Event Hubs client that uses these credentials:

tokenProvider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())
if err != nil {
    log.Fatalf("failed to configure AAD JWT provider: %s\n", err)
}

Creación de cliente de Event HubsCreate Event Hubs client

El siguiente código crea un cliente de Event Hubs:The following code creates an Event Hubs client:

hub, err := eventhubs.NewHub("namespaceName", "hubName", tokenProvider)
ctx := context.WithTimeout(context.Background(), 10 * time.Second)
defer hub.Close(ctx)
if err != nil {
    log.Fatalf("failed to get hub %s\n", err)
}

Escritura de código para enviar mensajesWrite code to send messages

En el siguiente fragmento de código, utilice (1) para enviar mensajes de forma interactiva desde un terminal, o (2) para enviar mensajes desde el programa:In the following snippet, use (1) to send messages interactively from a terminal, or (2) to send messages within your program:

// 1. send messages at the terminal
ctx = context.Background()
reader := bufio.NewReader(os.Stdin)
for {
    fmt.Printf("Input a message to send: ")
    text, _ := reader.ReadString('\n')
    hub.Send(ctx, eventhubs.NewEventFromString(text))
}

// 2. send messages within program
ctx = context.Background()
hub.Send(ctx, eventhubs.NewEventFromString("hello Azure!"))

ExtrasExtras

Obtenga los identificadores de las particiones del centro de eventos:Get the IDs of the partitions in your event hub:

info, err := hub.GetRuntimeInformation(ctx)
if err != nil {
    log.Fatalf("failed to get runtime info: %s\n", err)
}
log.Printf("got partition IDs: %s\n", info.PartitionIDs)

Ejecute la aplicación para enviar eventos al centro de eventos.Run the application to send events to the event hub.

Felicidades.Congratulations! Ha enviado mensajes a un centro de eventos.You have now sent messages to an event hub.

Recepción de eventosReceive events

Creación de una cuenta de almacenamiento y un contenedorCreate a Storage account and container

Los estados como, por ejemplo, las concesiones sobre particiones y puntos de comprobación del flujo de eventos se comparten entre receptores mediante un contenedor de Azure Storage.State such as leases on partitions and checkpoints in the event stream are shared between receivers using an Azure Storage container. Puede crear una cuenta de almacenamiento y un contenedor con el SDK para Go, pero también puede crearlos siguiendo las instrucciones de Acerca de las cuentas de almacenamiento de Azure.You can create a storage account and container with the Go SDK, but you can also create one by following the instructions in About Azure storage accounts.

Hay ejemplos disponibles para la creación de artefactos de Storage con el SDK para Go en el repositorio de ejemplos de Go y en el ejemplo correspondiente de este tutorial.Samples for creating Storage artifacts with the Go SDK are available in the Go samples repo and in the sample corresponding to this tutorial.

Paquetes de GoGo packages

Para recibir los mensajes, obtenga los paquetes de Go para Event Hubs con go get o dep:To receive the messages, get the Go packages for Event Hubs with go get or dep:

go get -u github.com/Azure/azure-event-hubs-go/...
go get -u github.com/Azure/azure-amqp-common-go/...
go get -u github.com/Azure/go-autorest/...

# or

dep ensure -add github.com/Azure/azure-event-hubs-go
dep ensure -add github.com/Azure/azure-amqp-common-go
dep ensure -add github.com/Azure/go-autorest

Importación de paquetes en el archivo de códigoImport packages in your code file

Para importar paquetes de Go, utilice el siguiente ejemplo de código:To import the Go packages, use the following code example:

import (
    aad "github.com/Azure/azure-amqp-common-go/aad"
    eventhubs "github.com/Azure/azure-event-hubs-go"
    eph "github.com/Azure/azure-event-hubs-go/eph"
    storageLeaser "github.com/Azure/azure-event-hubs-go/storage"
    azure "github.com/Azure/go-autorest/autorest/azure"
)

Creación de una entidad de servicioCreate service principal

Cree una nueva entidad de servicio siguiendo las instrucciones que se indican en Creación de una entidad de servicio de Azure con la CLI de Azure 2.0.Create a new service principal by following the instructions in Create an Azure service principal with Azure CLI 2.0. Guarde las credenciales proporcionadas en su entorno con los siguientes nombres: Los paquetes de Azure SDK para Go y de Event Hubs se han configurado previamente para buscar estos nombres de variables.Save the provided credentials in your environment with the following names: Both Azure SDK for Go and Event Hubs package are preconfigured to look for these variable names.

export AZURE_CLIENT_ID=
export AZURE_CLIENT_SECRET=
export AZURE_TENANT_ID=
export AZURE_SUBSCRIPTION_ID= 

A continuación, cree un proveedor de autorización para el cliente de Event Hubs que usa estas credenciales:Next, create an authorization provider for your Event Hubs client that uses these credentials:

tokenProvider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())
if err != nil {
    log.Fatalf("failed to configure AAD JWT provider: %s\n", err)
}

Obtención de la estructura de metadatosGet metadata struct

Obtenga una estructura con metadatos sobre el entorno de Azure mediante Azure SDK para Go.Get a struct with metadata about your Azure environment using the Azure Go SDK. Las operaciones posteriores usarán esta estructura para buscar los puntos de conexión correctos.Later operations use this struct to find correct endpoints.

azureEnv, err := azure.EnvironmentFromName("AzurePublicCloud")
if err != nil {
    log.Fatalf("could not get azure.Environment struct: %s\n", err)
}

Creación de un asistente de credencialesCreate credential helper

Cree un asistente de credenciales que use las credenciales anteriores de Azure Active Directory (AAD) para crear una credencial de firma de acceso compartido (SAS) para Storage.Create a credential helper that uses the previous Azure Active Directory (AAD) credentials to create a Shared Access Signature (SAS) credential for Storage. El último parámetro le indica a este constructor que debe usar las mismas variables de entorno que se usaron anteriormente:The last parameter tells this constructor to use the same environment variables as used previously:

cred, err := storageLeaser.NewAADSASCredential(
    subscriptionID,
    resourceGroupName,
    storageAccountName,
    storageContainerName,
    storageLeaser.AADSASCredentialWithEnvironmentVars())
if err != nil {
    log.Fatalf("could not prepare a storage credential: %s\n", err)
}

Creación de un generador de puntos de comprobación y un generador de concesionesCreate a check pointer and a leaser

Cree un generador de concesiones, que se encargue de conceder una partición a un receptor determinado, y un generador de puntos de comprobación, encargado de escribir puntos de comprobación para el flujo de mensajes para que otros receptores puedan empezar a leerlos desde la posición de desplazamiento correcta.Create a leaser, responsible for leasing a partition to a particular receiver, and a check pointer, responsible for writing checkpoints for the message stream so that other receivers can begin reading from the correct offset.

Actualmente, solo hay una instancia de StorageLeaserCheckpointer disponible que usa el mismo contenedor de Storage para administrar las concesiones y los puntos de comprobación.Currently, a single StorageLeaserCheckpointer is available that uses the same Storage container to manage both leases and checkpoints. Además de la cuenta de almacenamiento y de los nombres de contenedor, StorageLeaserCheckpointer necesita las credenciales que se crearon en el paso anterior y la estructura del entorno de Azure para acceder correctamente al contenedor.In addition to the storage account and container names, the StorageLeaserCheckpointer needs the credential created in the previous step and the Azure environment struct to correctly access the container.

leaserCheckpointer, err := storageLeaser.NewStorageLeaserCheckpointer(
    cred,
    storageAccountName,
    storageContainerName,
    azureEnv)
if err != nil {
    log.Fatalf("could not prepare a storage leaserCheckpointer: %s\n", err)
}

Construcción del host del procesador de eventosConstruct Event Processor Host

Ahora tiene los elementos necesarios para construir un host EventProcessorHost, de la forma que se indica a continuación.You now have the pieces needed to construct an EventProcessorHost, as follows. Se usa la misma instancia de StorageLeaserCheckpointer para el generador de concesiones y de puntos de comprobación, como se ha descrito anteriormente:The same StorageLeaserCheckpointer is used as both a leaser and check pointer, as described previously:

ctx := context.Background()
p, err := eph.New(
    ctx,
    nsName,
    hubName,
    tokenProvider,
    leaserCheckpointer,
    leaserCheckpointer)
if err != nil {
    log.Fatalf("failed to create EPH: %s\n", err)
}
defer p.Close(context.Background())

Creación de un controladorCreate handler

Cree un controlador y regístrelo con el host del procesador de eventos.Now create a handler and register it with the Event Processor Host. Cuando se inicie el host, se aplicará este y todos los demás controladores especificados a los mensajes entrantes:When the host is started, it applies this and any other specified handlers to incoming messages:

handler := func(ctx context.Context, event *eventhubs.Event) error {
    fmt.Printf("received: %s\n", string(event.Data))
    return nil
}

// register the handler with the EPH
_, err := p.RegisterHandler(ctx, handler)
if err != nil {
    log.Fatalf("failed to register handler: %s\n", err)
}

Escritura de código para recibir mensajesWrite code to receive messages

Una vez configurado todo, puede iniciar el host del procesador de eventos con Start(context) para mantenerlo constantemente en ejecución, o con StartNonBlocking(context) para que se ejecute solo si los mensajes están disponibles.With everything set up, you can start the Event Processor Host with Start(context) to keep it permanently running, or with StartNonBlocking(context) to run only as long as messages are available.

Este tutorial se inicia y se ejecuta de la siguiente manera. Consulte el ejemplo mediante StartNonBlocking:This tutorial starts and runs as follows; see the GitHub sample for an example using StartNonBlocking:

ctx := context.Background()
err = p.Start()
if err != nil {
    log.Fatalf("failed to start EPH: %s\n", err)
}

Pasos siguientesNext steps

Lea los siguientes artículos:Read the following articles: