Snabbstart: Skicka händelser till eller ta emot händelser från Event Hubs med Go
Azure Event Hubs är en strömningstjänst för stordata och händelseinmatningstjänst som kan ta emot och bearbeta flera miljoner händelser per sekund. Event Hubs kan bearbeta och lagra händelser, data eller telemetri som producerats av distribuerade program och enheter. Data som skickas till en händelsehubb kan omvandlas och lagras med valfri provider för realtidsanalys eller batchbearbetnings-/lagringsadapter. En detaljerad översikt över Event Hubs finns i Översikt över Event Hubs och Event Hubs-funktioner.
Den här självstudien beskriver hur du skriver Go-program för att skicka händelser till eller ta emot händelser från en händelsehubb.
Anteckning
Du kan ladda ned den här snabbstarten som ett exempel från GitHub. Ersätt strängarna EventHubConnectionString och EventHubName med värdena för din händelsehubb och kör den. Alternativt kan du följa stegen i den här självstudiekursen och skapa ett eget.
Förutsättningar
För att slutföra den här självstudien, finns följande förhandskrav:
- Installera lokalt. Följ de här instruktionerna om det behövs.
- Ett aktivt Azure-konto. Om du inte har någon Azure-prenumeration kan du skapa ett kostnadsfritt konto innan du börjar.
- Skapa en Event Hubs och en händelsehubb. Använd Azure Portal för att skapa ett namnområde av Event Hubs och hämta de autentiseringsuppgifter för hantering som programmet behöver för att kommunicera med händelsehubben. Om du behöver skapa ett namnområde och en händelsehubb följer du anvisningarna i den här artikeln.
Skicka händelser
Det här avsnittet visar hur du skapar ett Go-program för att skicka händelser till en händelsehubb.
Installera Go-paketet
Hämta Go-paketet för Event Hubs med go get eller dep . Exempel:
go get -u github.com/Azure/azure-event-hubs-go
go get -u github.com/Azure/azure-amqp-common-go/...
# or
dep ensure -add github.com/Azure/azure-event-hubs-go
dep ensure -add github.com/Azure/azure-amqp-common-go
Importera paket i kodfilen
Använd följande kodexempel för att importera Go-paketen:
import (
aad "github.com/Azure/azure-amqp-common-go/aad"
eventhubs "github.com/Azure/azure-event-hubs-go"
)
Skapa tjänstens huvudnamn
Skapa ett nytt huvudnamn för tjänsten genom att följa anvisningarna i Skapa ett huvudnamn för Azure-tjänsten med Azure CLI 2.0. Spara de angivna autentiseringsuppgifterna i din miljö med följande namn. Både Azure SDK för Go och Event Hubs är förkonfigurerade för att leta efter dessa variabelnamn:
export AZURE_CLIENT_ID=
export AZURE_CLIENT_SECRET=
export AZURE_TENANT_ID=
export AZURE_SUBSCRIPTION_ID=
Skapa nu en auktoriseringsprovider för din Event Hubs som använder dessa autentiseringsuppgifter:
tokenProvider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())
if err != nil {
log.Fatalf("failed to configure AAD JWT provider: %s\n", err)
}
Skapa Event Hubs klient
Följande kod skapar en Event Hubs klient:
hub, err := eventhubs.NewHub("namespaceName", "hubName", tokenProvider)
ctx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)
defer hub.Close(ctx)
if err != nil {
log.Fatalf("failed to get hub %s\n", err)
}
Skriv kod för att skicka meddelanden
I följande kodfragment använder du (1) för att skicka meddelanden interaktivt från en terminal eller (2) för att skicka meddelanden i programmet:
// 1. send messages at the terminal
ctx = context.Background()
reader := bufio.NewReader(os.Stdin)
for {
fmt.Printf("Input a message to send: ")
text, _ := reader.ReadString('\n')
hub.Send(ctx, eventhubs.NewEventFromString(text))
}
// 2. send messages within program
ctx = context.Background()
hub.Send(ctx, eventhubs.NewEventFromString("hello Azure!"))
Tillägg
Hämta partitions-ID:erna i händelsehubben:
info, err := hub.GetRuntimeInformation(ctx)
if err != nil {
log.Fatalf("failed to get runtime info: %s\n", err)
}
log.Printf("got partition IDs: %s\n", info.PartitionIDs)
Kör programmet för att skicka händelser till händelsehubben.
Grattis! Du har nu skickat meddelanden till en händelsehubb.
Ta emot händelser
Skapa ett Storage konto och en container
Tillstånd som lån på partitioner och kontrollpunkter i händelseströmmen delas mellan mottagare med hjälp av en Azure Storage container. Du kan skapa ett lagringskonto och en container med Go SDK, men du kan också skapa ett genom att följa anvisningarna i Om Azure Storage-konton.
Exempel för att Storage artefakter med Go SDK finns på lagringsplatsen Go-exempel och i exemplet som motsvarar den här självstudien.
Go-paket
Om du vill ta emot meddelandena hämtar du Go-paketen för Event Hubs go get med eller dep :
go get -u github.com/Azure/azure-event-hubs-go/...
go get -u github.com/Azure/azure-amqp-common-go/...
go get -u github.com/Azure/go-autorest/...
# or
dep ensure -add github.com/Azure/azure-event-hubs-go
dep ensure -add github.com/Azure/azure-amqp-common-go
dep ensure -add github.com/Azure/go-autorest
Importera paket i kodfilen
Använd följande kodexempel för att importera Go-paketen:
import (
aad "github.com/Azure/azure-amqp-common-go/aad"
eventhubs "github.com/Azure/azure-event-hubs-go"
eph "github.com/Azure/azure-event-hubs-go/eph"
storageLeaser "github.com/Azure/azure-event-hubs-go/storage"
azure "github.com/Azure/go-autorest/autorest/azure"
)
Skapa tjänstens huvudnamn
Skapa ett nytt huvudnamn för tjänsten genom att följa anvisningarna i Skapa ett huvudnamn för Azure-tjänsten med Azure CLI 2.0. Spara de angivna autentiseringsuppgifterna i din miljö med följande namn: Både Azure SDK för Go och Event Hubs-paketet är förkonfigurerade för att leta efter dessa variabelnamn.
export AZURE_CLIENT_ID=
export AZURE_CLIENT_SECRET=
export AZURE_TENANT_ID=
export AZURE_SUBSCRIPTION_ID=
Skapa sedan en auktoriseringsprovider för din Event Hubs som använder dessa autentiseringsuppgifter:
tokenProvider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())
if err != nil {
log.Fatalf("failed to configure AAD JWT provider: %s\n", err)
}
Hämta metadata-struct
Hämta en struct med metadata om din Azure-miljö med hjälp av Azure Go SDK. Senare åtgärder använder den här struct-structen för att hitta rätt slutpunkter.
azureEnv, err := azure.EnvironmentFromName("AzurePublicCloud")
if err != nil {
log.Fatalf("could not get azure.Environment struct: %s\n", err)
}
Skapa autentiseringshjälp
Skapa en autentiseringshjälp som använder de tidigare Azure Active Directory-autentiseringsuppgifterna (AAD) för att skapa en SAS-autentiseringsautentisering (signatur för delad åtkomst) för Storage. Den sista parametern anger att konstruktorn ska använda samma miljövariabler som tidigare:
cred, err := storageLeaser.NewAADSASCredential(
subscriptionID,
resourceGroupName,
storageAccountName,
storageContainerName,
storageLeaser.AADSASCredentialWithEnvironmentVars())
if err != nil {
log.Fatalf("could not prepare a storage credential: %s\n", err)
}
Skapa en kontroll pekare och en leaser
Skapa en leaser, som ansvarar för att leasa en partition till en viss mottagare och en check pekare , som ansvarar för att skriva kontrollpunkter för meddelandeströmmen så att andra mottagare kan börja läsa från rätt offset.
För närvarande finns det en enda StorageLeaserCheckpointer som använder samma Storage för att hantera både lån och kontrollpunkter. Förutom lagringskontot och containernamnen behöver StorageLeaserCheckpointer de autentiseringsuppgifter som skapades i föregående steg och struct-struct för Azure-miljön för att få korrekt åtkomst till containern.
leaserCheckpointer, err := storageLeaser.NewStorageLeaserCheckpointer(
cred,
storageAccountName,
storageContainerName,
azureEnv)
if err != nil {
log.Fatalf("could not prepare a storage leaserCheckpointer: %s\n", err)
}
Skapa en värd för händelseprocessorn
Nu har du de delar som behövs för att skapa en EventProcessorHost enligt följande. Samma StorageLeaserCheckpointer används som både leaser och check pointer, enligt beskrivningen ovan:
ctx := context.Background()
p, err := eph.New(
ctx,
nsName,
hubName,
tokenProvider,
leaserCheckpointer,
leaserCheckpointer)
if err != nil {
log.Fatalf("failed to create EPH: %s\n", err)
}
defer p.Close(context.Background())
Skapa hanterare
Skapa nu en hanterare och registrera den med värden för händelseprocessorn. När värden startas tillämpas detta och alla andra angivna hanterare för inkommande meddelanden:
handler := func(ctx context.Context, event *eventhubs.Event) error {
fmt.Printf("received: %s\n", string(event.Data))
return nil
}
// register the handler with the EPH
_, err := p.RegisterHandler(ctx, handler)
if err != nil {
log.Fatalf("failed to register handler: %s\n", err)
}
Skriv kod för att ta emot meddelanden
När allt har ställts in kan du starta värden för händelseprocessorn med för att hålla den permanent igång, eller med så att den bara körs Start(context) så länge meddelanden är StartNonBlocking(context) tillgängliga.
Den här självstudien startar och körs på följande sätt: se GitHub exempel för ett exempel med StartNonBlocking :
ctx := context.Background()
err = p.Start()
if err != nil {
log.Fatalf("failed to start EPH: %s\n", err)
}
Nästa steg
Läs följande artiklar: