Python (Azure-eventhub) kullanarak Olay Hub 'larına olay gönderme veya olayları alma
Bu hızlı başlangıçta, Azure-eventhub Python paketini kullanarak Olay Hub 'ından olay gönderme ve olayları alma işlemlerinin nasıl yapılacağı gösterilir.
Önkoşullar
Azure Event Hubs yeni başladıysanız, bu hızlı başlangıcı uygulamadan önce Event Hubs genel bakış bölümüne bakın.
Bu hızlı başlangıcı tamamlayabilmeniz için aşağıdaki önkoşullara sahip olmanız gerekir:
Microsoft Azure aboneliği. Azure Event Hubs dahil olmak üzere Azure hizmetlerini kullanmak için bir aboneliğiniz olması gerekir. Mevcut bir Azure hesabınız yoksa, ücretsiz deneme için kaydolabilir veya BIR hesap oluştururkenMSDN abonesi avantajlarınızı kullanabilirsiniz.
PıP yüklü ve güncelleştirilmiş Python 2,7 veya 3,6 ya da üzeri.
Event Hubs için Python paketi.
Paketi yüklemek için bu komutu, yolunda Python içeren bir komut isteminde çalıştırın:
pip install azure-eventhubDenetim noktası deposu olarak Azure Blob depolamayı kullanarak olayları almak için aşağıdaki paketi yükleyeceksiniz:
pip install azure-eventhub-checkpointstoreblob-aioEvent Hubs bir ad alanı ve bir olay hub 'ı oluşturun. İlk adım, Event Hubs türünde bir ad alanı oluşturmak için Azure Portal ve uygulamanızın Olay Hub 'ı ile iletişim kurması için gereken yönetim kimlik bilgilerini elde etmek için kullanılır. Bir ad alanı ve Olay Hub 'ı oluşturmak için Bu makaledekiyordamı izleyin. Ardından, makalenin yönergelerini izleyerek Event Hubs ad alanı için bağlantı dizesini alın: bağlantı dizesi al. Bağlantı dizesini daha sonra bu hızlı başlangıçta kullanacaksınız.
Olayları gönderme
Bu bölümde, daha önce oluşturduğunuz Olay Hub 'ına olayları göndermek için bir Python betiği oluşturacaksınız.
Visual Studio Codegibi en sevdiğiniz Python düzenleyicisini açın.
Send.py adlı bir komut dosyası oluşturun. Bu betik, daha önce oluşturduğunuz Olay Hub 'ına bir olay toplu işi gönderir.
Aşağıdaki kodu Send.py' ye yapıştırın:
import asyncio from azure.eventhub.aio import EventHubProducerClient from azure.eventhub import EventData async def run(): # Create a producer client to send messages to the event hub. # Specify a connection string to your event hubs namespace and # the event hub name. producer = EventHubProducerClient.from_connection_string(conn_str="EVENT HUBS NAMESPACE - CONNECTION STRING", eventhub_name="EVENT HUB NAME") async with producer: # Create a batch. event_data_batch = await producer.create_batch() # Add events to the batch. event_data_batch.add(EventData('First event ')) event_data_batch.add(EventData('Second event')) event_data_batch.add(EventData('Third event')) # Send the batch of events to the event hub. await producer.send_batch(event_data_batch) loop = asyncio.get_event_loop() loop.run_until_complete(run())Not
bilgilendirici açıklamalar da dahil olmak üzere, tüm kaynak kodu için GitHub send_async. kopyala sayfasınagidin.
Olayları alma
Bu hızlı başlangıç, bir denetim noktası deposu olarak Azure Blob depolamayı kullanır. Denetim noktası deposu, kontrol noktalarını kalıcı hale getirmek için kullanılır (yani, son okuma konumları).
Uyarı
bu kodu Azure Stack Hub 'da çalıştırırsanız, belirli bir Depolama apı sürümünü hedefetmediğiniz takdirde çalışma zamanı hatalarıyla karşılaşırsınız. bunun nedeni, Event Hubs SDK 'sının azure 'da kullanılabilen ve Azure Stack Hub platformunda kullanılamayan en son azure Depolama apı 'sini kullanması nedeniyle oluşur. Azure Stack Hub, Azure 'da genel kullanıma sunulan Depolama Blob SDK 'sının farklı bir sürümünü destekleyebilir. bir denetim noktası deposu olarak azure Blog Depolama kullanıyorsanız, Azure Stack Hub derlemeniz için desteklenen Azure Depolama apı sürümünü denetleyin ve kodunuzda bu sürümü hedefleyin.
örneğin, Azure Stack Hub sürüm 2005 üzerinde çalıştırıyorsanız, Depolama hizmeti için kullanılabilen en yüksek sürüm 2019-02-02 ' dir. Event Hubs SDK istemci kitaplığı, varsayılan olarak Azure 'da kullanılabilen en yüksek sürümü (SDK 'nın sürümü sırasında 2019-07-07) kullanır. bu durumda, bu bölümdeki adımların yanı sıra, Depolama service apı sürüm 2019-02-02 ' i hedeflemek için de kod eklemeniz gerekecektir. belirli bir Depolama apı sürümünün nasıl hedeflenecek hakkında bir örnek için, GitHub üzerinde zaman uyumlu ve zaman uyumsuz örneklere bakın.
Azure depolama hesabı ve BLOB kapsayıcısı oluşturma
Aşağıdaki adımları uygulayarak bir Azure depolama hesabı ve içinde bir blob kapsayıcısı oluşturun:
Alma kodunda daha sonra kullanmak için bağlantı dizesini ve kapsayıcı adını kaydettiğinizden emin olun.
Olayları almak için bir Python betiği oluşturma
Bu bölümde, Olay Hub 'ınızdan olayları almak için bir Python betiği oluşturursunuz:
Visual Studio Codegibi en sevdiğiniz Python düzenleyicisini açın.
Recv.py adlı bir komut dosyası oluşturun.
Aşağıdaki kodu recv.py' ye yapıştırın:
import asyncio from azure.eventhub.aio import EventHubConsumerClient from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore async def on_event(partition_context, event): # Print the event data. print("Received the event: \"{}\" from the partition with ID: \"{}\"".format(event.body_as_str(encoding='UTF-8'), partition_context.partition_id)) # Update the checkpoint so that the program doesn't read the events # that it has already read when you run it next time. await partition_context.update_checkpoint(event) async def main(): # Create an Azure blob checkpoint store to store the checkpoints. checkpoint_store = BlobCheckpointStore.from_connection_string("AZURE STORAGE CONNECTION STRING", "BLOB CONTAINER NAME") # Create a consumer client for the event hub. client = EventHubConsumerClient.from_connection_string("EVENT HUBS NAMESPACE CONNECTION STRING", consumer_group="$Default", eventhub_name="EVENT HUB NAME", checkpoint_store=checkpoint_store) async with client: # Call the receive method. Read from the beginning of the partition (starting_position: "-1") await client.receive(on_event=on_event, starting_position="-1") if __name__ == '__main__': loop = asyncio.get_event_loop() # Run the main method. loop.run_until_complete(main())Not
ek bilgilendirme açıklamaları dahil olmak üzere, tüm kaynak kodu için GitHub recv_with_checkpoint_store_async. kopyala sayfasınagidin.
Alıcı uygulamasını çalıştırma
Betiği çalıştırmak için, yolunda Python 'a sahip bir komut istemi açın ve ardından şu komutu çalıştırın:
python recv.py
Gönderen uygulamasını çalıştırma
Betiği çalıştırmak için, yolunda Python 'a sahip bir komut istemi açın ve ardından şu komutu çalıştırın:
python send.py
Alıcı penceresi, Olay Hub 'ına gönderilen iletileri görüntülemelidir.
Sonraki adımlar
Bu hızlı başlangıçta olayları zaman uyumsuz olarak gönderdiniz ve aldınız. olayları zaman uyumlu olarak gönderme ve alma hakkında bilgi edinmek için GitHub sync_samples sayfasınagidin.
GitHub tüm örnekler (hem zaman uyumlu hem de zaman uyumsuz) için, Python örnekleri için Azure Event Hubs istemci kitaplığı'na gidin.