Rychlý start: Odesílání nebo příjem událostí z Event Hubs Go

Azure Event Hubs je platforma pro streamování velkých objemů dat a služba pro ingestování událostí, která je schopná přijmout a zpracovat miliony událostí za sekundu. Služba Event Hubs dokáže zpracovávat a ukládat události, data nebo telemetrické údaje produkované distribuovaným softwarem a zařízeními. Data odeslaná do centra událostí je možné transformovat a uložit pomocí libovolného poskytovatele analýz v reálném čase nebo adaptérů pro dávkové zpracování a ukládání. Podrobnější přehled služby Event Hubs najdete v tématech Přehled služby Event Hubs a Funkce služby Event Hubs.

Tento kurz popisuje, jak psát aplikace Go pro odesílání událostí nebo příjem událostí z centra událostí.

Poznámka

Tento rychlý start si můžete stáhnout jako ukázku z GitHubu, nahradit řetězce EventHubConnectionString a EventHubName, hodnotami pro vaše centrum událostí a spustit. Alternativně můžete vytvořit vlastní řešení podle kroků v tomto kurzu.

Požadavky

Pro absolvování tohoto kurzu musí být splněné následující požadavky:

  • Přejděte do místní instalace. V případě potřeby postupujte podle těchto pokynů.
  • Aktivní účet Azure. Pokud ještě nemáte předplatné Azure, vytvořte si napřed bezplatný účet.
  • Vytvořte Event Hubs oboru názvů a centrum událostí. Pomocí Azure Portal vytvořte obor názvů typu Event Hubs a získejte přihlašovací údaje pro správu, které vaše aplikace potřebuje ke komunikaci s centrem událostí. Pokud chcete vytvořit obor názvů a centrum událostí, postupujte podle pokynů v tomto článku.

Odesílání událostí

V této části si ukážeme, jak vytvořit aplikaci Go pro odesílání událostí do centra událostí.

Instalace balíčku Go

Získejte balíček Go pro Event Hubs pomocí go get nebo dep . Například:

go get -u github.com/Azure/azure-event-hubs-go
go get -u github.com/Azure/azure-amqp-common-go/...

# or

dep ensure -add github.com/Azure/azure-event-hubs-go
dep ensure -add github.com/Azure/azure-amqp-common-go

Import balíčků do souboru kódu

Pokud chcete importovat balíčky Go, použijte následující příklad kódu:

import (
    aad "github.com/Azure/azure-amqp-common-go/aad"
    eventhubs "github.com/Azure/azure-event-hubs-go"
)

Vytvoření instančního objektu

Vytvořte nový instanční objekt podle pokynů v tématu Vytvoření instančního objektu Azure pomocí Azure CLI 2.0. Zadané přihlašovací údaje uložte ve svém prostředí s následujícími názvy. Balíčky Azure SDK pro Go i Event Hubs jsou předem nakonfigurované tak, aby hledejy tyto názvy proměnných:

export AZURE_CLIENT_ID=
export AZURE_CLIENT_SECRET=
export AZURE_TENANT_ID=
export AZURE_SUBSCRIPTION_ID= 

Teď vytvořte zprostředkovatele autorizace pro vašeho Event Hubs, který používá tyto přihlašovací údaje:

tokenProvider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())
if err != nil {
    log.Fatalf("failed to configure AAD JWT provider: %s\n", err)
}

Vytvoření Event Hubs klienta

Následující kód vytvoří Event Hubs klienta:

hub, err := eventhubs.NewHub("namespaceName", "hubName", tokenProvider)
ctx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)
defer hub.Close(ctx)
if err != nil {
    log.Fatalf("failed to get hub %s\n", err)
}

Napsání kódu pro odesílání zpráv

V následujícím fragmentu kódu použijte (1) k interaktivnímu odesílání zpráv z terminálu nebo (2) k odesílání zpráv v rámci programu:

// 1. send messages at the terminal
ctx = context.Background()
reader := bufio.NewReader(os.Stdin)
for {
    fmt.Printf("Input a message to send: ")
    text, _ := reader.ReadString('\n')
    hub.Send(ctx, eventhubs.NewEventFromString(text))
}

// 2. send messages within program
ctx = context.Background()
hub.Send(ctx, eventhubs.NewEventFromString("hello Azure!"))

Extra

Získejte ID oddílů v centru událostí:

info, err := hub.GetRuntimeInformation(ctx)
if err != nil {
    log.Fatalf("failed to get runtime info: %s\n", err)
}
log.Printf("got partition IDs: %s\n", info.PartitionIDs)

Spusťte aplikaci pro odesílání událostí do centra událostí.

Gratulujeme! Nyní jste odeslali zprávy do centra událostí.

Příjem událostí

Vytvoření Storage a kontejneru

Stav, jako je zapůjčení oddílů a kontrolních bodů ve streamu událostí, se sdílí mezi příjemci pomocí Azure Storage kontejneru. Účet úložiště a kontejner můžete vytvořit pomocí sady Go SDK, ale můžete si ho také vytvořit podle pokynů v tématu Informace o účtech úložiště Azure.

Ukázky pro Storage artefaktů pomocí sady Go SDK jsou k dispozici v úložiště ukázek Go a v ukázce odpovídající tomuto kurzu.

Balíčky Go

Pokud chcete zprávy přijímat, získejte balíčky Go pro Event Hubs pomocí go get nebo dep :

go get -u github.com/Azure/azure-event-hubs-go/...
go get -u github.com/Azure/azure-amqp-common-go/...
go get -u github.com/Azure/go-autorest/...

# or

dep ensure -add github.com/Azure/azure-event-hubs-go
dep ensure -add github.com/Azure/azure-amqp-common-go
dep ensure -add github.com/Azure/go-autorest

Import balíčků do souboru kódu

Pokud chcete importovat balíčky Go, použijte následující příklad kódu:

import (
    aad "github.com/Azure/azure-amqp-common-go/aad"
    eventhubs "github.com/Azure/azure-event-hubs-go"
    eph "github.com/Azure/azure-event-hubs-go/eph"
    storageLeaser "github.com/Azure/azure-event-hubs-go/storage"
    azure "github.com/Azure/go-autorest/autorest/azure"
)

Vytvoření instančního objektu

Vytvořte nový instanční objekt podle pokynů v tématu Vytvoření instančního objektu Azure pomocí Azure CLI 2.0. Uložte zadané přihlašovací údaje ve vašem prostředí s následujícími názvy: Jak balíček Azure SDK pro Go, tak Event Hubs jsou předem nakonfigurované tak, aby tyto názvy proměnných vypadaly.

export AZURE_CLIENT_ID=
export AZURE_CLIENT_SECRET=
export AZURE_TENANT_ID=
export AZURE_SUBSCRIPTION_ID= 

Dále vytvořte zprostředkovatele autorizace pro vašeho klienta Event Hubs, který používá tyto přihlašovací údaje:

tokenProvider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())
if err != nil {
    log.Fatalf("failed to configure AAD JWT provider: %s\n", err)
}

Získání metadata – struktura

Získejte struktury s metadaty o prostředí Azure pomocí sady Azure Go SDK. Při pozdějších operacích se tato struktura používá k vyhledání správných koncových bodů.

azureEnv, err := azure.EnvironmentFromName("AzurePublicCloud")
if err != nil {
    log.Fatalf("could not get azure.Environment struct: %s\n", err)
}

Vytvoření pomocníka pro přihlašovací údaje

Vytvořte pomocná služba přihlašovacích údajů, která používá předchozí přihlašovací údaje Azure Active Directory (AAD) k vytvoření přihlašovacích údajů sdíleného přístupového podpisu (SAS) pro Storage. Poslední parametr říká tomuto konstruktoru, aby používal stejné proměnné prostředí jako dříve:

cred, err := storageLeaser.NewAADSASCredential(
    subscriptionID,
    resourceGroupName,
    storageAccountName,
    storageContainerName,
    storageLeaser.AADSASCredentialWithEnvironmentVars())
if err != nil {
    log.Fatalf("could not prepare a storage credential: %s\n", err)
}

Vytvoření kontrolního ukazatele a zapůjčení

Vytvořte zapůjčení, které zodpovídá za pronájem oddílu konkrétnímu příjemci, a kontrolní ukazatel , který zodpovídá za zápis kontrolních bodů pro stream zpráv, aby ostatní příjemci mohli začít číst ze správného posunu.

V současné době je k dispozici jeden StorageLeaserCheckpointer, který používá stejný kontejner Storage ke správě zapůjčení i kontrolních bodů. Kromě názvů účtů úložiště a kontejnerů potřebuje StorageLeaserCheckpointer přihlašovací údaje vytvořené v předchozím kroku a struktury prostředí Azure pro správný přístup ke kontejneru.

leaserCheckpointer, err := storageLeaser.NewStorageLeaserCheckpointer(
    cred,
    storageAccountName,
    storageContainerName,
    azureEnv)
if err != nil {
    log.Fatalf("could not prepare a storage leaserCheckpointer: %s\n", err)
}

Vytvoření hostitele procesoru událostí

Teď máte součásti potřebné k vytvoření třídy EventProcessorHost následujícím způsobem. Stejný StorageLeaserCheckpointer se používá jako ukazatel zapůjčení i kontrolní ukazatel, jak je popsáno výše:

ctx := context.Background()
p, err := eph.New(
    ctx,
    nsName,
    hubName,
    tokenProvider,
    leaserCheckpointer,
    leaserCheckpointer)
if err != nil {
    log.Fatalf("failed to create EPH: %s\n", err)
}
defer p.Close(context.Background())

Vytvoření obslužné rutiny

Teď vytvořte obslužnou rutinu a zaregistrujte ji do třídy Event Processor Host. Při spuštění hostitele se použije toto a všechny ostatní zadané obslužné rutiny na příchozí zprávy:

handler := func(ctx context.Context, event *eventhubs.Event) error {
    fmt.Printf("received: %s\n", string(event.Data))
    return nil
}

// register the handler with the EPH
_, err := p.RegisterHandler(ctx, handler)
if err != nil {
    log.Fatalf("failed to register handler: %s\n", err)
}

Napsání kódu pro přijímání zpráv

Když máte všechno nastavené, můžete spustit Event Processor Host pomocí , aby byl trvale spuštěný, nebo příkazem , který se spustí jenom v případě, že Start(context) StartNonBlocking(context) jsou k dispozici zprávy.

Tento kurz se spustí a spustí se následujícím způsobem. Příklad použití GitHub ukázku najdete v následující StartNonBlocking ukázce:

ctx := context.Background()
err = p.Start()
if err != nil {
    log.Fatalf("failed to start EPH: %s\n", err)
}

Další kroky

Přečtěte si následující články: