Hızlı Başlangıç: Go kullanarak olay gönderme veya Event Hubs alma

Azure Event Hubs saniyede milyonlarca olay alıp işleme kapasitesine sahip olan bir Büyük Veri akış platformu ve olay alma hizmetidir. Event Hubs dağıtılan yazılımlar ve cihazlar tarafından oluşturulan olayları, verileri ve telemetrileri işleyebilir ve depolayabilir. Bir olay hub’ına gönderilen veriler, herhangi bir gerçek zamanlı analiz sağlayıcısı ve işlem grubu oluşturma/depolama bağdaştırıcıları kullanılarak dönüştürülüp depolanabilir. Olay Hub’larının ayrıntılı genel bakışı için bkz. Olay Hub’larına genel bakış ve Olay Hub’ları özellikleri.

Bu öğreticide, bir olay hub'ını olaylara göndermek veya olay hub'larından olay almak için Go uygulamalarının nasıl yazacakları açıkmektedir.

Not

Bu hızlı başlangıcı GitHub’dan örnek olarak indirebilir, EventHubConnectionString ve EventHubName dizelerini olay hub’ınızdaki değerlerle değiştirebilir ve çalıştırabilirsiniz. Alternatif olarak bu öğreticideki adımları izleyerek kendi çözümünüzü de oluşturabilirsiniz.

Önkoşullar

Bu öğreticiyi tamamlamak için aşağıdaki önkoşulları karşılamanız gerekir:

  • Go yerel olarak yüklenir. Gerekirse bu yönergeleri izleyin.
  • Etkin bir Azure hesabı. Azure aboneliğiniz yoksa başlamadan önce [ücretsiz bir hesap][] oluşturun.
  • Bir Event Hubs alanı ve olay hub'ı oluşturun. Uygulama Azure portal ad alanı oluşturmak ve Event Hubs olay hub'ı ile iletişim kurması için gereken yönetim kimlik bilgilerini almak için kullanın. Ad alanı ve olay hub'ı oluşturmak için bu makaledeki yordamı izleyin.

Olayları gönderme

Bu bölümde, bir olay hub'sine olay göndermek için bir Go uygulaması oluşturma hakkında size yol gösterir.

Go paketini yükleme

veya ile Event Hubs Go paketini go get elde. dep Örnek:

go get -u github.com/Azure/azure-event-hubs-go
go get -u github.com/Azure/azure-amqp-common-go/...

# or

dep ensure -add github.com/Azure/azure-event-hubs-go
dep ensure -add github.com/Azure/azure-amqp-common-go

Paketleri kod dosyanıza içeri aktarma

Go paketlerini içeri aktarın, aşağıdaki kod örneğini kullanın:

import (
    aad "github.com/Azure/azure-amqp-common-go/aad"
    eventhubs "github.com/Azure/azure-event-hubs-go"
)

Hizmet sorumlusu oluşturma

Azure CLI 2.0ile Azure hizmet sorumlusu oluşturma yönergelerini izleyerek yeni bir hizmet sorumlusu oluşturun. Sağlanan kimlik bilgilerini ortamınıza aşağıdaki adlarla kaydedin. Hem Go için Azure SDK hem de Event Hubs paketleri şu değişken adlarının bakarak önceden yapılandırılmıştır:

export AZURE_CLIENT_ID=
export AZURE_CLIENT_SECRET=
export AZURE_TENANT_ID=
export AZURE_SUBSCRIPTION_ID= 

Şimdi, bu kimlik bilgilerini kullanan Event Hubs için bir yetkilendirme sağlayıcısı oluşturun:

tokenProvider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())
if err != nil {
    log.Fatalf("failed to configure AAD JWT provider: %s\n", err)
}

İstemci Event Hubs oluşturma

Aşağıdaki kod bir istemci Event Hubs oluşturur:

hub, err := eventhubs.NewHub("namespaceName", "hubName", tokenProvider)
ctx, cancel := context.WithTimeout(context.Background(), 10 * time.Second)
defer hub.Close(ctx)
if err != nil {
    log.Fatalf("failed to get hub %s\n", err)
}

İleti göndermek için kod yazma

Aşağıdaki kod parçacığında, terminalden etkileşimli olarak ileti göndermek için (1) veya programınız içinde ileti göndermek için (2) kullanın:

// 1. send messages at the terminal
ctx = context.Background()
reader := bufio.NewReader(os.Stdin)
for {
    fmt.Printf("Input a message to send: ")
    text, _ := reader.ReadString('\n')
    hub.Send(ctx, eventhubs.NewEventFromString(text))
}

// 2. send messages within program
ctx = context.Background()
hub.Send(ctx, eventhubs.NewEventFromString("hello Azure!"))

Ek Özellikler

Olay hub' ınıza bölümlerin kimliklerini elde edin:

info, err := hub.GetRuntimeInformation(ctx)
if err != nil {
    log.Fatalf("failed to get runtime info: %s\n", err)
}
log.Printf("got partition IDs: %s\n", info.PartitionIDs)

Olay hub'a olay göndermek için uygulamayı çalıştırın.

Tebrikler! Bir olay hub'ına ileti gönderdiniz.

Olayları alma

Bir Depolama hesabı ve kapsayıcısı oluşturma

Olay akışındaki bölümler ve denetim noktalarındaki kiralamalar gibi durum, Azure depolama kapsayıcısı kullanılarak alıcılar Depolama paylaşılır. Go SDK ile bir depolama hesabı ve kapsayıcı oluşturabilirsiniz, ancak Azure depolama hesapları hakkında'daki yönergeleri izleyerek de bir tane oluşturabilirsiniz.

Go SDK ile Depolama yapıtları oluşturmaya ait örnekler Go örnekleri reponda ve bu öğreticiye karşılık gelen örnekte mevcuttur.

Go paketleri

İletileri almak için veya ile Event Hubs Go paketlerini go get dep alabilirsiniz:

go get -u github.com/Azure/azure-event-hubs-go/...
go get -u github.com/Azure/azure-amqp-common-go/...
go get -u github.com/Azure/go-autorest/...

# or

dep ensure -add github.com/Azure/azure-event-hubs-go
dep ensure -add github.com/Azure/azure-amqp-common-go
dep ensure -add github.com/Azure/go-autorest

Paketleri kod dosyanıza içeri aktarma

Go paketlerini içeri aktarın, aşağıdaki kod örneğini kullanın:

import (
    aad "github.com/Azure/azure-amqp-common-go/aad"
    eventhubs "github.com/Azure/azure-event-hubs-go"
    eph "github.com/Azure/azure-event-hubs-go/eph"
    storageLeaser "github.com/Azure/azure-event-hubs-go/storage"
    azure "github.com/Azure/go-autorest/autorest/azure"
)

Hizmet sorumlusu oluşturma

Azure CLI 2.0ile Azure hizmet sorumlusu oluşturma yönergelerini izleyerek yeni bir hizmet sorumlusu oluşturun. Sağlanan kimlik bilgilerini ortamınıza şu adlarla kaydedin: Hem Go için Azure SDK hem de Event Hubs paketi bu değişken adlarını kullanmak için önceden yapılandırılmıştır.

export AZURE_CLIENT_ID=
export AZURE_CLIENT_SECRET=
export AZURE_TENANT_ID=
export AZURE_SUBSCRIPTION_ID= 

Ardından, bu kimlik bilgilerini kullanan Event Hubs için bir yetkilendirme sağlayıcısı oluşturun:

tokenProvider, err := aad.NewJWTProvider(aad.JWTProviderWithEnvironmentVars())
if err != nil {
    log.Fatalf("failed to configure AAD JWT provider: %s\n", err)
}

Meta veri yapılarını al

Azure Go SDK'sı kullanarak Azure ortamınız hakkında meta verileri olan bir yapı edinin. Sonraki işlemler, doğru uç noktaları bulmak için bu yapıyı kullanır.

azureEnv, err := azure.EnvironmentFromName("AzurePublicCloud")
if err != nil {
    log.Fatalf("could not get azure.Environment struct: %s\n", err)
}

Kimlik bilgisi yardımcısı oluşturma

Kimlik bilgileri için Paylaşılan Erişim İmzası (SAS) kimlik Azure Active Directory (AAD) kimlik bilgilerini kullanan bir kimlik bilgisi Depolama. Son parametre, bu oluşturucuya daha önce kullanılan ortam değişkenlerini kullanmalarını söyler:

cred, err := storageLeaser.NewAADSASCredential(
    subscriptionID,
    resourceGroupName,
    storageAccountName,
    storageContainerName,
    storageLeaser.AADSASCredentialWithEnvironmentVars())
if err != nil {
    log.Fatalf("could not prepare a storage credential: %s\n", err)
}

Onay işaretçisi ve kirayı oluşturma

Bir bölümü belirli bir alıcıya kiralamadan sorumlu olan bir kiraleyici ve diğer alıcıların doğru uzaklıktan okumaya başlay için ileti akışı için denetim noktaları yazmaktan sorumlu bir onay işaretçisi oluşturun.

Şu anda hem kiralamaları hem de denetim noktalarını yönetmek için aynı Depolama depolama kapsayıcısını kullanan tek bir StorageLeaserCheckpointer kullanılabilir. StorageLeaserCheckpointer, depolama hesabı ve kapsayıcı adlarına ek olarak, önceki adımda oluşturulan kimlik bilgilerine ve kapsayıcıya doğru şekilde erişmek için Azure ortam yapılarına ihtiyaç gösterir.

leaserCheckpointer, err := storageLeaser.NewStorageLeaserCheckpointer(
    cred,
    storageAccountName,
    storageContainerName,
    azureEnv)
if err != nil {
    log.Fatalf("could not prepare a storage leaserCheckpointer: %s\n", err)
}

Olay İşlemcisi Ana Bilgisayarı Oluşturma

Artık bir EventProcessorHost oluşturmak için gereken parçalara aşağıdaki gibi sahipsiniz. Daha önce açıklandığı gibi aynı StorageLeaserCheckpointer hem kiralı hem de denetim işaretçisi olarak kullanılır:

ctx := context.Background()
p, err := eph.New(
    ctx,
    nsName,
    hubName,
    tokenProvider,
    leaserCheckpointer,
    leaserCheckpointer)
if err != nil {
    log.Fatalf("failed to create EPH: %s\n", err)
}
defer p.Close(context.Background())

İşleyici oluşturma

Şimdi bir işleyici oluşturun ve bunu Olay İşleyicisi Ana Bilgisayarı ile kaydedin. Konak başlatıldında, bunu ve belirtilen diğer işleyicileri gelen iletilere uygular:

handler := func(ctx context.Context, event *eventhubs.Event) error {
    fmt.Printf("received: %s\n", string(event.Data))
    return nil
}

// register the handler with the EPH
_, err := p.RegisterHandler(ctx, handler)
if err != nil {
    log.Fatalf("failed to register handler: %s\n", err)
}

İleti almak için kod yazma

Her şey ayarlanmış durumda, ile Olay İşlemcisi Ana Bilgisayarı'nın kalıcı olarak çalışmaya devam etmek veya yalnızca iletiler kullanılabilir olduğu sürece çalıştırmak Start(context) StartNonBlocking(context) için ile başlatabilirsiniz.

Bu öğretici başlar ve aşağıdaki gibi çalışır; kullanarak GitHub örnek için aşağıdaki örnek için StartNonBlocking bkz. :

ctx := context.Background()
err = p.Start()
if err != nil {
    log.Fatalf("failed to start EPH: %s\n", err)
}

Sonraki adımlar

Aşağıdaki makaleleri okuyun: