Rychlý start: Odesílání nebo příjem událostí z Event Hubs Go
Azure Event Hubs je platforma pro streamování velkých objemů dat a služba pro ingestování událostí, která je schopná přijmout a zpracovat miliony událostí za sekundu. Služba Event Hubs dokáže zpracovávat a ukládat události, data nebo telemetrické údaje produkované distribuovaným softwarem a zařízeními. Data odeslaná do centra událostí je možné transformovat a uložit pomocí libovolného poskytovatele analýz v reálném čase nebo adaptérů pro dávkové zpracování a ukládání. Podrobnější přehled služby Event Hubs najdete v tématech Přehled služby Event Hubs a Funkce služby Event Hubs.
Tento kurz popisuje, jak psát aplikace Go pro odesílání událostí nebo příjem událostí z centra událostí.
Poznámka
Tento rychlý start si můžete stáhnout jako ukázku z GitHubu, nahradit řetězce EventHubConnectionString a EventHubName, hodnotami pro vaše centrum událostí a spustit. Alternativně můžete vytvořit vlastní řešení podle kroků v tomto kurzu.
Požadavky
Pro absolvování tohoto kurzu musí být splněné následující požadavky:
- Přejděte do místní instalace. V případě potřeby postupujte podle těchto pokynů.
- Aktivní účet Azure. Pokud ještě nemáte předplatné Azure, vytvořte si napřed bezplatný účet.
- Vytvořte Event Hubs oboru názvů a centrum událostí. Pomocí Azure Portal vytvořte obor názvů typu Event Hubs a získejte přihlašovací údaje pro správu, které vaše aplikace potřebuje ke komunikaci s centrem událostí. Pokud chcete vytvořit obor názvů a centrum událostí, postupujte podle pokynů v tomto článku.
Odesílání událostí
V této části si ukážeme, jak vytvořit aplikaci Go pro odesílání událostí do centra událostí.
Instalace balíčku Go
Získejte balíček Go pro Event Hubs pomocí go get nebo dep . Například:
go get -u github.com/Azure/azure-event-hubs-go
go get -u github.com/Azure/azure-amqp-common-go/...
# or
dep ensure -add github.com/Azure/azure-event-hubs-go
dep ensure -add github.com/Azure/azure-amqp-common-go
Import balíčků do souboru kódu
Pokud chcete importovat balíčky Go, použijte následující příklad kódu:
import (
aad "github.com/Azure/azure-amqp-common-go/aad"
eventhubs "github.com/Azure/azure-event-hubs-go"
)
Vytvoření instančního objektu
Vytvořte nový instanční objekt podle pokynů v tématu Vytvoření instančního objektu Azure pomocí Azure CLI 2.0. Zadané přihlašovací údaje uložte ve svém prostředí s následujícími názvy. Balíčky Azure SDK pro Go i Event Hubs jsou předem nakonfigurované tak, aby hledejy tyto názvy proměnných:
export AZURE_CLIENT_ID=
export AZURE_CLIENT_SECRET=
export AZURE_TENANT_ID=
export AZURE_SUBSCRIPTION_ID=
Teď vytvořte zprostředkovatele autorizace pro vašeho Event Hubs, který používá tyto přihlašovací údaje:
tokenProvider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())
if err != nil {
log.Fatalf("failed to configure AAD JWT provider: %s\n", err)
}
Vytvoření Event Hubs klienta
Následující kód vytvoří Event Hubs klienta:
hub, err := eventhubs.NewHub("namespaceName", "hubName", tokenProvider)
ctx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)
defer hub.Close(ctx)
if err != nil {
log.Fatalf("failed to get hub %s\n", err)
}
Napsání kódu pro odesílání zpráv
V následujícím fragmentu kódu použijte (1) k interaktivnímu odesílání zpráv z terminálu nebo (2) k odesílání zpráv v rámci programu:
// 1. send messages at the terminal
ctx = context.Background()
reader := bufio.NewReader(os.Stdin)
for {
fmt.Printf("Input a message to send: ")
text, _ := reader.ReadString('\n')
hub.Send(ctx, eventhubs.NewEventFromString(text))
}
// 2. send messages within program
ctx = context.Background()
hub.Send(ctx, eventhubs.NewEventFromString("hello Azure!"))
Extra
Získejte ID oddílů v centru událostí:
info, err := hub.GetRuntimeInformation(ctx)
if err != nil {
log.Fatalf("failed to get runtime info: %s\n", err)
}
log.Printf("got partition IDs: %s\n", info.PartitionIDs)
Spusťte aplikaci pro odesílání událostí do centra událostí.
Gratulujeme! Nyní jste odeslali zprávy do centra událostí.
Příjem událostí
Vytvoření Storage a kontejneru
Stav, jako je zapůjčení oddílů a kontrolních bodů ve streamu událostí, se sdílí mezi příjemci pomocí Azure Storage kontejneru. Účet úložiště a kontejner můžete vytvořit pomocí sady Go SDK, ale můžete si ho také vytvořit podle pokynů v tématu Informace o účtech úložiště Azure.
Ukázky pro Storage artefaktů pomocí sady Go SDK jsou k dispozici v úložiště ukázek Go a v ukázce odpovídající tomuto kurzu.
Balíčky Go
Pokud chcete zprávy přijímat, získejte balíčky Go pro Event Hubs pomocí go get nebo dep :
go get -u github.com/Azure/azure-event-hubs-go/...
go get -u github.com/Azure/azure-amqp-common-go/...
go get -u github.com/Azure/go-autorest/...
# or
dep ensure -add github.com/Azure/azure-event-hubs-go
dep ensure -add github.com/Azure/azure-amqp-common-go
dep ensure -add github.com/Azure/go-autorest
Import balíčků do souboru kódu
Pokud chcete importovat balíčky Go, použijte následující příklad kódu:
import (
aad "github.com/Azure/azure-amqp-common-go/aad"
eventhubs "github.com/Azure/azure-event-hubs-go"
eph "github.com/Azure/azure-event-hubs-go/eph"
storageLeaser "github.com/Azure/azure-event-hubs-go/storage"
azure "github.com/Azure/go-autorest/autorest/azure"
)
Vytvoření instančního objektu
Vytvořte nový instanční objekt podle pokynů v tématu Vytvoření instančního objektu Azure pomocí Azure CLI 2.0. Uložte zadané přihlašovací údaje ve vašem prostředí s následujícími názvy: Jak balíček Azure SDK pro Go, tak Event Hubs jsou předem nakonfigurované tak, aby tyto názvy proměnných vypadaly.
export AZURE_CLIENT_ID=
export AZURE_CLIENT_SECRET=
export AZURE_TENANT_ID=
export AZURE_SUBSCRIPTION_ID=
Dále vytvořte zprostředkovatele autorizace pro vašeho klienta Event Hubs, který používá tyto přihlašovací údaje:
tokenProvider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())
if err != nil {
log.Fatalf("failed to configure AAD JWT provider: %s\n", err)
}
Získání metadata – struktura
Získejte struktury s metadaty o prostředí Azure pomocí sady Azure Go SDK. Při pozdějších operacích se tato struktura používá k vyhledání správných koncových bodů.
azureEnv, err := azure.EnvironmentFromName("AzurePublicCloud")
if err != nil {
log.Fatalf("could not get azure.Environment struct: %s\n", err)
}
Vytvoření pomocníka pro přihlašovací údaje
Vytvořte pomocná služba přihlašovacích údajů, která používá předchozí přihlašovací údaje Azure Active Directory (AAD) k vytvoření přihlašovacích údajů sdíleného přístupového podpisu (SAS) pro Storage. Poslední parametr říká tomuto konstruktoru, aby používal stejné proměnné prostředí jako dříve:
cred, err := storageLeaser.NewAADSASCredential(
subscriptionID,
resourceGroupName,
storageAccountName,
storageContainerName,
storageLeaser.AADSASCredentialWithEnvironmentVars())
if err != nil {
log.Fatalf("could not prepare a storage credential: %s\n", err)
}
Vytvoření kontrolního ukazatele a zapůjčení
Vytvořte zapůjčení, které zodpovídá za pronájem oddílu konkrétnímu příjemci, a kontrolní ukazatel , který zodpovídá za zápis kontrolních bodů pro stream zpráv, aby ostatní příjemci mohli začít číst ze správného posunu.
V současné době je k dispozici jeden StorageLeaserCheckpointer, který používá stejný kontejner Storage ke správě zapůjčení i kontrolních bodů. Kromě názvů účtů úložiště a kontejnerů potřebuje StorageLeaserCheckpointer přihlašovací údaje vytvořené v předchozím kroku a struktury prostředí Azure pro správný přístup ke kontejneru.
leaserCheckpointer, err := storageLeaser.NewStorageLeaserCheckpointer(
cred,
storageAccountName,
storageContainerName,
azureEnv)
if err != nil {
log.Fatalf("could not prepare a storage leaserCheckpointer: %s\n", err)
}
Vytvoření hostitele procesoru událostí
Teď máte součásti potřebné k vytvoření třídy EventProcessorHost následujícím způsobem. Stejný StorageLeaserCheckpointer se používá jako ukazatel zapůjčení i kontrolní ukazatel, jak je popsáno výše:
ctx := context.Background()
p, err := eph.New(
ctx,
nsName,
hubName,
tokenProvider,
leaserCheckpointer,
leaserCheckpointer)
if err != nil {
log.Fatalf("failed to create EPH: %s\n", err)
}
defer p.Close(context.Background())
Vytvoření obslužné rutiny
Teď vytvořte obslužnou rutinu a zaregistrujte ji do třídy Event Processor Host. Při spuštění hostitele se použije toto a všechny ostatní zadané obslužné rutiny na příchozí zprávy:
handler := func(ctx context.Context, event *eventhubs.Event) error {
fmt.Printf("received: %s\n", string(event.Data))
return nil
}
// register the handler with the EPH
_, err := p.RegisterHandler(ctx, handler)
if err != nil {
log.Fatalf("failed to register handler: %s\n", err)
}
Napsání kódu pro přijímání zpráv
Když máte všechno nastavené, můžete spustit Event Processor Host pomocí , aby byl trvale spuštěný, nebo příkazem , který se spustí jenom v případě, že Start(context) StartNonBlocking(context) jsou k dispozici zprávy.
Tento kurz se spustí a spustí se následujícím způsobem. Příklad použití GitHub ukázku najdete v následující StartNonBlocking ukázce:
ctx := context.Background()
err = p.Start()
if err != nil {
log.Fatalf("failed to start EPH: %s\n", err)
}
Další kroky
Přečtěte si následující články: