Hızlı Başlangıç: Go kullanarak Event Hubs'a olay gönderme veya olay alma

Azure Event Hubs saniyede milyonlarca olay alıp işleme kapasitesine sahip olan bir Büyük Veri akış platformu ve olay alma hizmetidir. Event Hubs dağıtılan yazılımlar ve cihazlar tarafından oluşturulan olayları, verileri ve telemetrileri işleyebilir ve depolayabilir. Bir olay hub’ına gönderilen veriler, herhangi bir gerçek zamanlı analiz sağlayıcısı ve işlem grubu oluşturma/depolama bağdaştırıcıları kullanılarak dönüştürülüp depolanabilir. Event Hubs’a ayrıntılı bir genel bakış için bkz. Event Hubs'a genel bakış ve Event Hubs özellikleri.

Bu hızlı başlangıçta, olay hub'ına olay göndermek veya olay hub'ından olay almak için Go uygulamalarının nasıl yazıldığı açıklanmaktadır.

Not

Bu hızlı başlangıçta GitHub'daki örnekler temel alınmaktadır https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs. Gönder örneği example_producing_events_test.go örneğini, alma örneği ise example_processor_test.go örneğini temel alır. Kod hızlı başlangıç için basitleştirilmiştir ve tüm ayrıntılı açıklamalar kaldırılır, bu nedenle daha fazla ayrıntı ve açıklama için örneklere bakın.

Önkoşullar

Bu hızlı başlangıcı tamamlamak için aşağıdaki önkoşullara ihtiyacınız vardır:

Olayları gönderme

Bu bölümde, bir olay hub'ına olay göndermek için bir Go uygulaması oluşturma gösterilmektedir.

Go paketini yükleme

Aşağıdaki örnekte gösterildiği gibi Event Hubs için Go paketini alın.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Olay hub'ına olay gönderme kodu

Olay hub'ına olay gönderme kodu aşağıdadır. Koddaki ana adımlar şunlardır:

  1. Event Hubs ad alanına ve olay hub'ı adına bağlantı dizesi kullanarak bir Event Hubs üretici istemcisi oluşturun.
  2. Bir batch nesnesi oluşturun ve toplu işleme örnek olaylar ekleyin.
  3. Olay toplu işlemini th olaylara gönderin.

Önemli

değerini Event Hubs ad alanınızın bağlantı dizesi ve EVENT HUB NAME örnek koddaki olay hub'ı adıyla değiştirinNAMESPACE CONNECTION STRING.

package main

import (
    "context"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
)

func main() {

    // create an Event Hubs producer client using a connection string to the namespace and the event hub
    producerClient, err := azeventhubs.NewProducerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", nil)

    if err != nil {
        panic(err)
    }

    defer producerClient.Close(context.TODO())

    // create sample events
    events := createEventsForSample()

    // create a batch object and add sample events to the batch
    newBatchOptions := &azeventhubs.EventDataBatchOptions{}

    batch, err := producerClient.NewEventDataBatch(context.TODO(), newBatchOptions)

    for i := 0; i < len(events); i++ {
        err = batch.AddEventData(events[i], nil)
    }

    // send the batch of events to the event hub
    producerClient.SendEventDataBatch(context.TODO(), batch, nil)
}

func createEventsForSample() []*azeventhubs.EventData {
    return []*azeventhubs.EventData{
        {
            Body: []byte("hello"),
        },
        {
            Body: []byte("world"),
        },
    }
}

Uygulamayı henüz çalıştırmayın. Önce alıcı uygulamasını ve ardından gönderen uygulamasını çalıştırmanız gerekir.

Olayları alma

Depolama hesabı ve kapsayıcı oluşturma

Olay akışındaki bölümlerdeki kiralar ve denetim noktaları gibi durumlar, Azure Depolama kapsayıcısı kullanılarak alıcılar arasında paylaşılır. Go SDK'sı ile bir depolama hesabı ve kapsayıcı oluşturabilirsiniz, ancak Azure depolama hesapları hakkında başlığındaki yönergeleri izleyerek de bir depolama hesabı oluşturabilirsiniz.

Azure Blob Depolama denetim noktası deposu olarak kullanırken şu önerileri izleyin:

  • Her tüketici grubu için ayrı bir kapsayıcı kullanın. Aynı depolama hesabını kullanabilirsiniz, ancak her grup için bir kapsayıcı kullanabilirsiniz.
  • Kapsayıcıyı başka hiçbir şey için kullanmayın ve depolama hesabını başka hiçbir şey için kullanmayın.
  • Depolama hesabın dağıtılan uygulamanın bulunduğu bölgede olması gerekir. Uygulama şirket içindeyse, mümkün olan en yakın bölgeyi seçmeyi deneyin.

Azure portalındaki Depolama hesabı sayfasında, Blob hizmeti bölümünde aşağıdaki ayarların devre dışı bırakıldığından emin olun.

  • Hiyerarşik ad alanı
  • Blob geçici silme
  • Sürüm oluşturma

Go paketleri

İletileri almak için, aşağıdaki örnekte gösterildiği gibi Event Hubs için Go paketlerini alın.

go get github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs

Olay hub'ından olay alma kodu

Olay hub'ından olay almaya yönelik kod aşağıdadır. Koddaki ana adımlar şunlardır:

  1. Denetim noktası oluşturma için olay hub'ı tarafından kullanılan Azure Blob Depolama temsil eden bir denetim noktası deposu nesnesini denetleyin.
  2. Event Hubs ad alanına ve olay hub'ı adına bağlantı dizesi kullanarak bir Event Hubs tüketici istemcisi oluşturun.
  3. İstemci nesnesini ve denetim noktası deposu nesnesini kullanarak bir olay işlemcisi oluşturun. İşlemci olayları alır ve işler.
  4. Olay hub'ında her bölüm için olayları işlemek için işlev olarak processEvents ile bir bölüm istemcisi oluşturun.
  5. Olayları almak ve işlemek için tüm bölüm istemcilerini çalıştırın.

Önemli

Aşağıdaki yer tutucu değerleri gerçek değerlerle değiştirin:

  • AZURE STORAGE CONNECTION STRINGAzure depolama hesabınızın bağlantı dizesi
  • BLOB CONTAINER NAME depolama hesabında oluşturduğunuz blob kapsayıcısının adıyla
  • NAMESPACE CONNECTION STRINGEvent Hubs ad alanınızın bağlantı dizesi ile
  • EVENT HUB NAME örnek kodda olay hub'ı adıyla.
package main

import (
    "context"
    "errors"
    "fmt"
    "time"

    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs"
    "github.com/Azure/azure-sdk-for-go/sdk/messaging/azeventhubs/checkpoints"
)

func main() {

    // create a container client using a connection string and container name
    checkClient, err := container.NewClientFromConnectionString("AZURE STORAGE CONNECTION STRING", "CONTAINER NAME", nil)
    
    // create a checkpoint store that will be used by the event hub
    checkpointStore, err := checkpoints.NewBlobStore(checkClient, nil)

    if err != nil {
        panic(err)
    }

    // create a consumer client using a connection string to the namespace and the event hub
    consumerClient, err := azeventhubs.NewConsumerClientFromConnectionString("NAMESPACE CONNECTION STRING", "EVENT HUB NAME", azeventhubs.DefaultConsumerGroup, nil)

    if err != nil {
        panic(err)
    }

    defer consumerClient.Close(context.TODO())

    // create a processor to receive and process events
    processor, err := azeventhubs.NewProcessor(consumerClient, checkpointStore, nil)

    if err != nil {
        panic(err)
    }

    //  for each partition in the event hub, create a partition client with processEvents as the function to process events
    dispatchPartitionClients := func() {
        for {
            partitionClient := processor.NextPartitionClient(context.TODO())

            if partitionClient == nil {
                break
            }

            go func() {
                if err := processEvents(partitionClient); err != nil {
                    panic(err)
                }
            }()
        }
    }

    // run all partition clients
    go dispatchPartitionClients()

    processorCtx, processorCancel := context.WithCancel(context.TODO())
    defer processorCancel()

    if err := processor.Run(processorCtx); err != nil {
        panic(err)
    }
}

func processEvents(partitionClient *azeventhubs.ProcessorPartitionClient) error {
    defer closePartitionResources(partitionClient)
    for {
        receiveCtx, receiveCtxCancel := context.WithTimeout(context.TODO(), time.Minute)
        events, err := partitionClient.ReceiveEvents(receiveCtx, 100, nil)
        receiveCtxCancel()

        if err != nil && !errors.Is(err, context.DeadlineExceeded) {
            return err
        }

        fmt.Printf("Processing %d event(s)\n", len(events))

        for _, event := range events {
            fmt.Printf("Event received with body %v\n", string(event.Body))
        }

        if len(events) != 0 {
            if err := partitionClient.UpdateCheckpoint(context.TODO(), events[len(events)-1]); err != nil {
                return err
            }
        }
    }
}

func closePartitionResources(partitionClient *azeventhubs.ProcessorPartitionClient) {
    defer partitionClient.Close(context.TODO())
}

Alıcı ve gönderen uygulamalarını çalıştırma

  1. Önce alıcı uygulamasını çalıştırın.

  2. Gönderen uygulamasını çalıştırın.

  3. Alıcı penceresinde aşağıdaki çıkışı görmek için bir dakika bekleyin.

    Processing 2 event(s)
    Event received with body hello
    Event received with body world
    

Sonraki adımlar

GitHub'daki örneklere bakın.https://github.com/Azure/azure-sdk-for-go/tree/main/sdk/messaging/azeventhubs