Python (Azure-eventhub) kullanarak Olay Hub 'larına olay gönderme veya olayları alma

Bu hızlı başlangıçta, Azure-eventhub Python paketini kullanarak Olay Hub 'ından olay gönderme ve olayları alma işlemlerinin nasıl yapılacağı gösterilir.

Önkoşullar

Azure Event Hubs yeni başladıysanız, bu hızlı başlangıcı uygulamadan önce Event Hubs genel bakış bölümüne bakın.

Bu hızlı başlangıcı tamamlayabilmeniz için aşağıdaki önkoşullara sahip olmanız gerekir:

  • Microsoft Azure aboneliği. Azure Event Hubs dahil olmak üzere Azure hizmetlerini kullanmak için bir aboneliğiniz olması gerekir. Mevcut bir Azure hesabınız yoksa, ücretsiz deneme için kaydolabilir veya BIR hesap oluştururkenMSDN abonesi avantajlarınızı kullanabilirsiniz.

  • PıP yüklü ve güncelleştirilmiş Python 2,7 veya 3,6 ya da üzeri.

  • Event Hubs için Python paketi.

    Paketi yüklemek için bu komutu, yolunda Python içeren bir komut isteminde çalıştırın:

    pip install azure-eventhub
    

    Denetim noktası deposu olarak Azure Blob depolamayı kullanarak olayları almak için aşağıdaki paketi yükleyeceksiniz:

    pip install azure-eventhub-checkpointstoreblob-aio
    
  • Event Hubs bir ad alanı ve bir olay hub 'ı oluşturun. İlk adım, Event Hubs türünde bir ad alanı oluşturmak için Azure Portal ve uygulamanızın Olay Hub 'ı ile iletişim kurması için gereken yönetim kimlik bilgilerini elde etmek için kullanılır. Bir ad alanı ve Olay Hub 'ı oluşturmak için Bu makaledekiyordamı izleyin. Ardından, makalenin yönergelerini izleyerek Event Hubs ad alanı için bağlantı dizesini alın: bağlantı dizesi al. Bağlantı dizesini daha sonra bu hızlı başlangıçta kullanacaksınız.

Olayları gönderme

Bu bölümde, daha önce oluşturduğunuz Olay Hub 'ına olayları göndermek için bir Python betiği oluşturacaksınız.

  1. Visual Studio Codegibi en sevdiğiniz Python düzenleyicisini açın.

  2. Send.py adlı bir komut dosyası oluşturun. Bu betik, daha önce oluşturduğunuz Olay Hub 'ına bir olay toplu işi gönderir.

  3. Aşağıdaki kodu Send.py' ye yapıştırın:

    import asyncio
    from azure.eventhub.aio import EventHubProducerClient
    from azure.eventhub import EventData
    
    async def run():
        # Create a producer client to send messages to the event hub.
        # Specify a connection string to your event hubs namespace and
        # the event hub name.
        producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESPACE - CONNECTION STRING", eventhub_name="EVENT HUB NAME")
        async with producer:
            # Create a batch.
            event_data_batch = await producer.create_batch()
    
            # Add events to the batch.
            event_data_batch.add(EventData('First event '))
            event_data_batch.add(EventData('Second event'))
            event_data_batch.add(EventData('Third event'))
    
            # Send the batch of events to the event hub.
            await producer.send_batch(event_data_batch)
    
    loop = asyncio.get_event_loop()
    loop.run_until_complete(run())
    
    

    Not

    bilgilendirici açıklamalar da dahil olmak üzere, tüm kaynak kodu için GitHub send_async. kopyala sayfasınagidin.

Olayları alma

Bu hızlı başlangıç, bir denetim noktası deposu olarak Azure Blob depolamayı kullanır. Denetim noktası deposu, kontrol noktalarını kalıcı hale getirmek için kullanılır (yani, son okuma konumları).

Uyarı

bu kodu Azure Stack Hub 'da çalıştırırsanız, belirli bir Depolama apı sürümünü hedefetmediğiniz takdirde çalışma zamanı hatalarıyla karşılaşırsınız. bunun nedeni, Event Hubs SDK 'sının azure 'da kullanılabilen ve Azure Stack Hub platformunda kullanılamayan en son azure Depolama apı 'sini kullanması nedeniyle oluşur. Azure Stack Hub, Azure 'da genel kullanıma sunulan Depolama Blob SDK 'sının farklı bir sürümünü destekleyebilir. bir denetim noktası deposu olarak azure Blog Depolama kullanıyorsanız, Azure Stack Hub derlemeniz için desteklenen Azure Depolama apı sürümünü denetleyin ve kodunuzda bu sürümü hedefleyin.

örneğin, Azure Stack Hub sürüm 2005 üzerinde çalıştırıyorsanız, Depolama hizmeti için kullanılabilen en yüksek sürüm 2019-02-02 ' dir. Event Hubs SDK istemci kitaplığı, varsayılan olarak Azure 'da kullanılabilen en yüksek sürümü (SDK 'nın sürümü sırasında 2019-07-07) kullanır. bu durumda, bu bölümdeki adımların yanı sıra, Depolama service apı sürüm 2019-02-02 ' i hedeflemek için de kod eklemeniz gerekecektir. belirli bir Depolama apı sürümünün nasıl hedeflenecek hakkında bir örnek için, GitHub üzerinde zaman uyumlu ve zaman uyumsuz örneklere bakın.

Azure depolama hesabı ve BLOB kapsayıcısı oluşturma

Aşağıdaki adımları uygulayarak bir Azure depolama hesabı ve içinde bir blob kapsayıcısı oluşturun:

  1. Azure Depolama hesabı oluşturma
  2. Blob kapsayıcısı oluşturma
  3. Bağlantı dizesini depolama hesabına al

Alma kodunda daha sonra kullanmak için bağlantı dizesini ve kapsayıcı adını kaydettiğinizden emin olun.

Olayları almak için bir Python betiği oluşturma

Bu bölümde, Olay Hub 'ınızdan olayları almak için bir Python betiği oluşturursunuz:

  1. Visual Studio Codegibi en sevdiğiniz Python düzenleyicisini açın.

  2. Recv.py adlı bir komut dosyası oluşturun.

  3. Aşağıdaki kodu recv.py' ye yapıştırın:

    import asyncio
    from azure.eventhub.aio import EventHubConsumerClient
    from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore
    
    
    async def on_event(partition_context, event):
        # Print the event data.
        print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id))
    
        # Update the checkpoint so that the program doesn't read the events
        # that it has already read when you run it next time.
        await partition_context.update_checkpoint(event)
    
    async def main():
        # Create an Azure blob checkpoint store to store the checkpoints.
        checkpoint_store = BlobCheckpointStore.from_connection_string("AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME")
    
        # Create a consumer client for the event hub.
        client = EventHubConsumerClient.from_connection_string("EVENT HUBS NAMESPACE CONNECTION STRING", consumer_group="$Default", eventhub_name="EVENT HUB NAME", checkpoint_store=checkpoint_store)
        async with client:
            # Call the receive method. Read from the beginning of the partition (starting_position: "-1")
            await client.receive(on_event=on_event,  starting_position="-1")
    
    if __name__ == '__main__':
        loop = asyncio.get_event_loop()
        # Run the main method.
        loop.run_until_complete(main())    
    

    Not

    ek bilgilendirme açıklamaları dahil olmak üzere, tüm kaynak kodu için GitHub recv_with_checkpoint_store_async. kopyala sayfasınagidin.

Alıcı uygulamasını çalıştırma

Betiği çalıştırmak için, yolunda Python 'a sahip bir komut istemi açın ve ardından şu komutu çalıştırın:

python recv.py

Gönderen uygulamasını çalıştırma

Betiği çalıştırmak için, yolunda Python 'a sahip bir komut istemi açın ve ardından şu komutu çalıştırın:

python send.py

Alıcı penceresi, Olay Hub 'ına gönderilen iletileri görüntülemelidir.

Sonraki adımlar

Bu hızlı başlangıçta olayları zaman uyumsuz olarak gönderdiniz ve aldınız. olayları zaman uyumlu olarak gönderme ve alma hakkında bilgi edinmek için GitHub sync_samples sayfasınagidin.

GitHub tüm örnekler (hem zaman uyumlu hem de zaman uyumsuz) için, Python örnekleri için Azure Event Hubs istemci kitaplığı'na gidin.