Python kullanarak Event Hubs’daki olayları gönderme veya alma

Bu hızlı başlangıçta azure-eventhub Python paketini kullanarak olay hub'ına olay gönderme ve olay hub'ından olay alma adımları gösterilmektedir.

Önkoşullar

Azure Event Hubs'da yeniyseniz bu hızlı başlangıcı gerçekleştirmeden önce event hubs'a genel bakış konusuna bakın.

Bu hızlı başlangıcı tamamlamak için aşağıdaki önkoşullara ihtiyacınız vardır:

  • Microsoft Azure aboneliği. Azure Event Hubs da dahil olmak üzere Azure hizmetlerini kullanmak için bir aboneliğe ihtiyacınız vardır. Mevcut bir Azure hesabınız yoksa ücretsiz deneme sürümüne kaydolun.
  • Python 3.8 veya üzeri, pip yüklü ve güncelleştirilmiş.
  • Visual Studio Code (önerilir) veya diğer tüm tümleşik geliştirme ortamları (IDE).
  • Event Hubs ad alanı ve olay hub'ı oluşturun. İlk adım, Event Hubs ad alanı oluşturmak ve uygulamanızın olay hub'ı ile iletişim kurmak için ihtiyaç duyduğu yönetim kimlik bilgilerini almak için Azure portalını kullanmaktır. Ad alanı ve olay hub'ı oluşturmak için bu makaledeki yordamı izleyin.

Olayları göndermek için paketleri yükleme

Event Hubs için Python paketlerini yüklemek için, yolunda Python bulunan bir komut istemi açın. Dizini, örneklerinizi saklamak istediğiniz klasörle değiştirin.

pip install azure-eventhub
pip install azure-identity
pip install aiohttp

Azure'da uygulamanın kimliğini doğrulama

Bu hızlı başlangıçta Azure Event Hubs'a bağlanmanın iki yolu gösterilmektedir: parolasız ve bağlantı dizesi. İlk seçenek, Bir Event Hubs ad alanına bağlanmak için Microsoft Entra Id ve rol tabanlı erişim denetiminde (RBAC) güvenlik sorumlunuzu nasıl kullanacağınızı gösterir. Kodunuzda, yapılandırma dosyasında veya Azure Key Vault gibi güvenli bir depolama alanında sabit kodlanmış bağlantı dizesi olması konusunda endişelenmeniz gerekmez. İkinci seçenek, event hubs ad alanına bağlanmak için bir bağlantı dizesi nasıl kullanacağınızı gösterir. Azure'da yeniyseniz bağlantı dizesi seçeneğini daha kolay takip edebilirsiniz. Gerçek dünyadaki uygulamalarda ve üretim ortamlarında parolasız seçeneği kullanmanızı öneririz. Daha fazla bilgi için bkz . Kimlik doğrulaması ve yetkilendirme. Ayrıca, genel bakış sayfasında parolasız kimlik doğrulaması hakkında daha fazla bilgi edinebilirsiniz.

Microsoft Entra kullanıcınıza rol atama

Yerel olarak geliştirme yaparken, Azure Event Hubs'a bağlanan kullanıcı hesabının doğru izinlere sahip olduğundan emin olun. İleti gönderip almak için Azure Event Hubs Veri Sahibi rolüne ihtiyacınız vardır. Kendinize bu rolü atamak için Kullanıcı Erişimi Yönetici istrator rolüne veya eylemi içeren Microsoft.Authorization/roleAssignments/write başka bir role ihtiyacınız olacaktır. Azure portalı, Azure CLI veya Azure PowerShell'i kullanarak kullanıcıya Azure RBAC rolleri atayabilirsiniz. Kapsam genel bakış sayfasında rol atamaları için kullanılabilir kapsamlar hakkında daha fazla bilgi edinin.

Aşağıdaki örnekte rol, Azure Event Hubs kaynaklarına tam erişim sağlayan kullanıcı hesabınıza atanır Azure Event Hubs Data Owner . Gerçek bir senaryoda, kullanıcılara yalnızca daha güvenli bir üretim ortamı için gereken minimum izinleri vermek için En Az Ayrıcalık İlkesi'ni izleyin.

Azure Event Hubs için Azure yerleşik rolleri

Azure Event Hubs için, Azure portalı ve Azure kaynak yönetimi API'sini kullanarak ad alanlarının ve tüm ilgili kaynakların yönetimi Azure RBAC modeli kullanılarak zaten korunur. Azure, Event Hubs ad alanına erişim yetkisi vermek için aşağıdaki Azure yerleşik rollerini sağlar:

Özel bir rol oluşturmak istiyorsanız bkz . Event Hubs işlemleri için gereken haklar.

Önemli

Çoğu durumda rol atamasının Azure'a yayılması bir veya iki dakika sürer. Nadir durumlarda, sekiz dakikaya kadar sürebilir. Kodunuzu ilk kez çalıştırdığınızda kimlik doğrulama hataları alıyorsanız, birkaç dakika bekleyin ve yeniden deneyin.

  1. Azure portalında ana arama çubuğunu veya sol gezintiyi kullanarak Event Hubs ad alanınızı bulun.

  2. Genel bakış sayfasında, sol taraftaki menüden Erişim denetimi (IAM) öğesini seçin.

  3. Erişim denetimi (IAM) sayfasında Rol atamaları sekmesini seçin.

  4. Üst menüden + Ekle'yi seçin ve ardından açılan menüden Rol ataması ekle'yi seçin.

    A screenshot showing how to assign a role.

  5. Sonuçları istenen role göre filtrelemek için arama kutusunu kullanın. Bu örnek için eşleşen sonucu arayın Azure Event Hubs Data Owner ve seçin. Ardından İleri'yi seçin.

  6. Erişim ata'nın altında Kullanıcı, grup veya hizmet sorumlusu'na tıklayın ve ardından + Üye seç'e tıklayın.

  7. İletişim kutusunda Microsoft Entra kullanıcı adınızı (genellikle user@domain e-posta adresiniz) arayın ve iletişim kutusunun alt kısmındaki Seç'i seçin.

  8. Son sayfaya gitmek için Gözden geçir + ata'yı seçin ve ardından işlemi tamamlamak için Gözden geçir + yeniden ata'yı seçin.

Olayları gönderme

Bu bölümde, daha önce oluşturduğunuz olay hub'ına olay göndermek için bir Python betiği oluşturun.

  1. Visual Studio Code gibi sık kullandığınız Python düzenleyicisini açın.

  2. send.py adlı bir betik oluşturun. Bu betik, daha önce oluşturduğunuz olay hub'ına bir dizi olay gönderir.

  3. Aşağıdaki kodu send.py yapıştırın:

    Kodda, aşağıdaki yer tutucuları değiştirmek için gerçek değerleri kullanın:

    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub import EventData
    from azure.eventhub.aio import EventHubProducerClient
    from azure.identity.aio import DefaultAzureCredential
    
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a credential that has correct role assigned to access
        # event hubs namespace and the event hub name.
        producer = EventHubProducerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            credential=credential,
        )
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData("First event "))
            event_data_batch.add(EventData("Second event"))
            event_data_batch.add(EventData("Third event"))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
            # Close credential when no longer needed.
            await credential.close()
    
    asyncio.run(run())
    

    Not

    Olay Hub'ına zaman uyumsuz olarak bağlantı dizesi kullanarak olay göndermeye yönelik diğer seçeneklere ilişkin örnekler için GitHub send_async.py sayfasına bakın. Burada gösterilen desenler, olayları parolasız göndermek için de geçerlidir.

Olayları alma

Bu hızlı başlangıçta denetim noktası deposu olarak Azure Blob depolama kullanılır. Denetim noktası deposu, denetim noktalarını (yani son okuma konumlarını) kalıcı hale getirmek için kullanılır.

Azure Blob Depolama denetim noktası deposu olarak kullanırken şu önerileri izleyin:

  • Her tüketici grubu için ayrı bir kapsayıcı kullanın. Aynı depolama hesabını kullanabilirsiniz, ancak her grup için bir kapsayıcı kullanabilirsiniz.
  • Kapsayıcıyı başka hiçbir şey için kullanmayın ve depolama hesabını başka hiçbir şey için kullanmayın.
  • Depolama hesabın dağıtılan uygulamanın bulunduğu bölgede olması gerekir. Uygulama şirket içindeyse, mümkün olan en yakın bölgeyi seçmeyi deneyin.

Azure portalındaki Depolama hesabı sayfasında, Blob hizmeti bölümünde aşağıdaki ayarların devre dışı bırakıldığından emin olun.

  • Hiyerarşik ad alanı
  • Blob geçici silme
  • Sürüm oluşturma

Azure depolama hesabı ve blob kapsayıcısı oluşturma

Aşağıdaki adımları uygulayarak içinde bir Azure depolama hesabı ve blob kapsayıcısı oluşturun:

  1. Azure Depolama hesabı oluşturma
  2. Blob kapsayıcısı oluşturma.
  3. Blob kapsayıcısının kimliğini doğrulama.

Daha sonra alma kodunda kullanmak üzere bağlantı dizesi ve kapsayıcı adını kaydettiğinizden emin olun.

Yerel olarak geliştirme yaparken blob verilerine erişen kullanıcı hesabının doğru izinlere sahip olduğundan emin olun. Blob verilerini okumak ve yazmak için Depolama Blob Veri Katkıda Bulunanı gerekir. Kendinize bu rolü atamak için Kullanıcı Erişimi Yönetici istrator rolüne veya Microsoft.Authorization/roleAssignments/write eylemini içeren başka bir role atanmalısınız. Azure portalı, Azure CLI veya Azure PowerShell'i kullanarak kullanıcıya Azure RBAC rolleri atayabilirsiniz. Rol atamaları için kullanılabilir kapsamlar hakkında daha fazla bilgiyi kapsam genel bakış sayfasından öğrenebilirsiniz.

Bu senaryoda, En Az Ayrıcalık İlkesi'ni izlemek için depolama hesabı kapsamındaki kullanıcı hesabınıza izinler atayacaksınız. Bu uygulama kullanıcılara yalnızca gereken minimum izinleri verir ve daha güvenli üretim ortamları oluşturur.

Aşağıdaki örnek, depolama hesabınızdaki blob verilerine hem okuma hem de yazma erişimi sağlayan Depolama Blob Verileri Katkıda Bulunanı rolünü kullanıcı hesabınıza atar.

Önemli

Çoğu durumda rol atamasının Azure'a yayılması bir veya iki dakika sürer, ancak nadir durumlarda sekiz dakikaya kadar sürebilir. Kodunuzu ilk kez çalıştırdığınızda kimlik doğrulama hataları alıyorsanız, birkaç dakika bekleyin ve yeniden deneyin.

  1. Azure portalında ana arama çubuğunu veya sol gezintiyi kullanarak depolama hesabınızı bulun.

  2. Depolama hesabına genel bakış sayfasında sol taraftaki menüden Erişim denetimi (IAM) öğesini seçin.

  3. Erişim denetimi (IAM) sayfasında Rol atamaları sekmesini seçin.

  4. Üst menüden + Ekle'yi seçin ve ardından açılan menüden Rol ataması ekle'yi seçin.

    A screenshot showing how to assign a storage account role.

  5. Sonuçları istenen role göre filtrelemek için arama kutusunu kullanın. Bu örnek için Depolama Blob Veri Katkıda Bulunanı'nı arayın, eşleşen sonucu seçin ve ardından İleri'yi seçin.

  6. Erişim ata'nın altında Kullanıcı, grup veya hizmet sorumlusu'na tıklayın ve ardından + Üye seç'e tıklayın.

  7. İletişim kutusunda Microsoft Entra kullanıcı adınızı (genellikle user@domain e-posta adresiniz) arayın ve iletişim kutusunun alt kısmındaki Seç'i seçin.

  8. Son sayfaya gitmek için Gözden geçir + ata'yı seçin ve ardından işlemi tamamlamak için Gözden geçir + yeniden ata'yı seçin.

Olayları almak için paketleri yükleme

Alıcı taraf için bir veya daha fazla paket yüklemeniz gerekir. Bu hızlı başlangıçta, programın önceden okuduğu olayları okumaması için denetim noktalarını kalıcı hale getirmek için Azure Blob depolamayı kullanacaksınız. Blobda düzenli aralıklarla alınan iletilerde meta veri denetim noktaları gerçekleştirir. Bu yaklaşım, iletileri daha sonra kaldığınız yerden almaya devam etmenizi kolaylaştırır.

pip install azure-eventhub-checkpointstoreblob-aio
pip install azure-identity

Olayları almak için Python betiği oluşturma

Bu bölümde, olay hub'ınızdan olayları almak için bir Python betiği oluşturacaksınız:

  1. Visual Studio Code gibi sık kullandığınız Python düzenleyicisini açın.

  2. recv.py adlı bir betik oluşturun.

  3. Aşağıdaki kodu recv.py yapıştırın:

    Kodda, aşağıdaki yer tutucuları değiştirmek için gerçek değerleri kullanın:

    • BLOB_STORAGE_ACCOUNT_URL
    • BLOB_CONTAINER_NAME
    • EVENT_HUB_FULLY_QUALIFIED_NAMESPACE
    • EVENT_HUB_NAME
    import asyncio
    
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import (
        BlobCheckpointStore,
    )
    from azure.identity.aio import DefaultAzureCredential
    
    BLOB_STORAGE_ACCOUNT_URL = "BLOB_STORAGE_ACCOUNT_URL"
    BLOB_CONTAINER_NAME = "BLOB_CONTAINER_NAME"
    EVENT_HUB_FULLY_QUALIFIED_NAMESPACE = "EVENT_HUB_FULLY_QUALIFIED_NAMESPACE"
    EVENT_HUB_NAME = "EVENT_HUB_NAME"
    
    credential = DefaultAzureCredential()
    
    async def on_event(partition_context, event):
        # Print the event data.
        print(
            'Received the event: "{}" from the partition with ID: "{}"'.format(
                event.body_as_str(encoding="UTF-8"), partition_context.partition_id
            )
        )
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore(
            blob_account_url=BLOB_STORAGE_ACCOUNT_URL,
            container_name=BLOB_CONTAINER_NAME,
            credential=credential,
        )
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient(
            fully_qualified_namespace=EVENT_HUB_FULLY_QUALIFIED_NAMESPACE,
            eventhub_name=EVENT_HUB_NAME,
            consumer_group="$Default",
            checkpoint_store=checkpoint_store,
            credential=credential,
        )
        async with client:
            # Call the receive method. Read from the beginning of the partition
            # (starting_position: "-1")
            await client.receive(on_event=on_event, starting_position="-1")
    
        # Close credential when no longer needed.
        await credential.close()
    
    if __name__ == "__main__":
        # Run the main method.
        asyncio.run(main())
    

    Not

    Olay Hub'ından zaman uyumsuz olarak bağlantı dizesi kullanarak olay almaya yönelik diğer seçeneklere ilişkin örnekler için GitHub recv_with_checkpoint_store_async.py sayfasına bakın. Burada gösterilen desenler, parolasız olayları almak için de geçerlidir.

Alıcı uygulamasını çalıştırma

Betiği çalıştırmak için, yolunda Python bulunan bir komut istemi açın ve şu komutu çalıştırın:

python recv.py

Gönderen uygulamasını çalıştırma

Betiği çalıştırmak için, yolunda Python bulunan bir komut istemi açın ve şu komutu çalıştırın:

python send.py

Alıcı penceresinde olay hub'ına gönderilen iletiler görüntülenmelidir.

Sorun giderme

Alıcı penceresinde olayları görmüyorsanız veya kod hata bildiriyorsa aşağıdaki sorun giderme ipuçlarını deneyin:

  • recy.py sonuçlarını görmüyorsanız send.py birkaç kez çalıştırın.

  • Parolasız kodu (kimlik bilgileriyle) kullanırken "coroutine" ile ilgili hatalar görürseniz, öğesinden içeri aktarmayı kullandığınızdan azure.identity.aioemin olun.

  • Parolasız kodla (kimlik bilgileriyle) "Kapatılmamış istemci oturumu" görüyorsanız, işiniz bittiğinde kimlik bilgilerini kapattığınıza emin olun. Daha fazla bilgi için bkz . Zaman uyumsuz kimlik bilgileri.

  • Depolamaya erişirken recv.py yetkilendirme hataları görürseniz Azure depolama hesabı ve blob kapsayıcısı oluşturma makalesindeki adımları izlediğinize ve hizmet sorumlusuna Depolama Blob Veri Katkıda Bulunanı rolünü atadığınızdan emin olun.

  • Farklı bölüm kimliklerine sahip olaylar alırsanız bu sonuç beklenir. Bölümler, tüketen uygulamalarda gerekli aşağı akış paralelliğiyle ilişkili bir veri düzenleme mekanizmasıdır. Bir olay hub'ındaki bölüm sayısı, sahip olmayı beklediğiniz eşzamanlı okuyucu sayısıyla doğrudan ilgilidir. Daha fazla bilgi için bkz . Bölümler hakkında daha fazla bilgi edinin.

Sonraki adımlar

Bu hızlı başlangıçta zaman uyumsuz olarak olaylar gönderip aldınız. Olayları zaman uyumlu olarak göndermeyi ve almayı öğrenmek için GitHub sync_samples sayfasına gidin.

GitHub'daki tüm örnekler (zaman uyumlu ve zaman uyumsuz) için Python örnekleri için Azure Event Hubs istemci kitaplığına gidin.