Aracılığıyla paylaş


Python için Azure EventHubs Checkpoint Store istemci kitaplığı - sürüm 1.1.4

Depolama Bloblarını kullanma

Azure EventHubs Denetim Noktası Deposu, Azure Event Hubs olayları işlenirken denetim noktalarını depolamak için kullanılır. Bu Checkpoint Store paketi, için EventHubConsumerClientbir eklenti paketi olarak çalışır. Denetim noktalarını ve bölüm sahipliği bilgilerini korumak için kalıcı depo olarak Azure Depolama Blobunu kullanır.

Bunun zaman uyumsuz bir kitaplık olduğunu unutmayın. Azure EventHubs Checkpoint Store istemci kitaplığının eşitleme sürümü için lütfen azure-eventhub-checkpointstoreblob konusuna bakın.

Kaynak kodu | Paket (PyPi) | API başvuru belgeleri | Azure Eventhubs belgeleri | Azure Depolama belgeleri

Başlarken

Önkoşullar

  • Python 3.6 veya üzeri.

  • Microsoft Azure Aboneliği: Azure Event Hubs dahil olmak üzere Azure hizmetlerini kullanmak için bir aboneliğe ihtiyacınız vardır. Mevcut bir Azure hesabınız yoksa ücretsiz deneme sürümüne kaydolabilir veya hesap oluştururken MSDN abone avantajlarınızı kullanabilirsiniz.

  • Olay Hub'ı ile Event Hubs ad alanı: Azure Event Hubs etkileşime geçmek için bir ad alanı ve Olay Hub'ına da sahip olmanız gerekir. Azure kaynakları oluşturma hakkında bilginiz yoksa, Azure portal kullanarak Olay Hub'ı oluşturmak için adım adım kılavuzu izlemek isteyebilirsiniz. Burada, Olay Hub'ı oluşturmak için Azure CLI, Azure PowerShell veya Azure Resource Manager (ARM) şablonlarını kullanmaya yönelik ayrıntılı yönergeleri de bulabilirsiniz.

  • Azure Depolama Hesabı: Denetim noktası verilerini bloblarla depolamak için bir Azure Depolama Hesabınız olması ve bir Azure Blob Depolama Engelleme Kapsayıcısı oluşturmanız gerekir. Azure Blok Blob Depolama Hesabı oluşturma kılavuzunu izleyebilirsiniz.

Paketi yükleme

$ pip install azure-eventhub-checkpointstoreblob-aio

Önemli kavramlar

Denetim noktası oluşturma

Denetim noktası oluşturma, okuyucuların bir bölüm olay dizisindeki konumlarını işaretledikleri veya uyguladıkları bir işlemdir. Denetim noktası oluşturma, tüketicinin sorumluluğundadır ve bir tüketici grubunda bölüm başına temelinde gerçekleşir. Bu sorumluluk, her bir tüketici grubu için her bölüm okuyucusunun geçerli konumunu olay akışında izlemesi gerektiği ve veri akışının tamamlandığını düşündüğünde hizmeti bilgilendirebileceği anlamına gelir. Bir okuyucunun bölüm bağlantısı kesilirse yeniden bağlandığında ilgili tüketici grubundaki o bölümün son okuyucusu tarafından daha önce gönderilen denetim noktasında okumaya başlar. Okuyucu bağlandığında, okumaya başlayacağı konumu belirtmek için uzaklığı olay hub'ına geçirir. Bu şekilde, denetim noktası oluşturma özelliğini hem aşağı akış uygulamaları ile olayları "tamamlandı" olarak işaretlemek hem de farklı makinelerde çalışan okuyucular arasında bir yük devretme oluşması durumunda esneklik sağlamak amacıyla kullanabilirsiniz. Bu denetim noktası oluşturma işleminden daha düşük bir uzaklık belirterek daha eski verilere geri dönülebilir. Bu mekanizmayla denetim noktası oluşturma özelliği hem yük devretme esnekliği hem de olay akışı yeniden yürütmesi sağlar.

Sıra numaralarını uzaklıklar &

Her iki uzaklık & dizisi numarası da bir bölümün içindeki bir olayın konumuna başvurur. Bunları istemci tarafı imleç olarak düşünebilirsiniz. Uzaklık, olayın bayt cinsinden numaralandırılmasıdır. Uzaklık/sıra numarası, olay tüketicisinin (okuyucu) olay akışında olayları okumaya başlamak istediği bir nokta belirtmesini sağlar. Yalnızca verilen zaman damgasından sonra sıralanan olayları alacak şekilde bir zaman damgası belirtebilirsiniz. Tüketiciler, kendi uzaklık değerlerini Event Hubs hizmetinin dışında saklamaktan sorumludur. Bir bölüm içinde her olay bir uzaklık, sıra numarası ve sıralandığı zaman damgasını içerir.

Örnekler

Oluşturma EventHubConsumerClient

Oluşturmanın EventHubConsumerClient en kolay yolu bağlantı dizesi kullanmaktır.

from azure.eventhub.aio import EventHubConsumerClient
eventhub_client = EventHubConsumerClient.from_connection_string("my_eventhub_namespace_connection_string", "my_consumer_group", eventhub_name="my_eventhub")

oluşturmanın EventHubConsumerClientdiğer yolları için daha fazla ayrıntı için EventHubs kitaplığına bakın.

Yapılacaklar denetim noktası kullanarak olayları kullanma BlobCheckpointStore

import asyncio

from azure.eventhub.aio import EventHubConsumerClient
from azure.eventhub.extensions.checkpointstoreblobaio import BlobCheckpointStore

connection_str = '<< CONNECTION STRING FOR THE EVENT HUBS NAMESPACE >>'
consumer_group = '<< CONSUMER GROUP >>'
eventhub_name = '<< NAME OF THE EVENT HUB >>'
storage_connection_str = '<< CONNECTION STRING OF THE STORAGE >>'
container_name = '<< STORAGE CONTAINER NAME>>'

async def on_event(partition_context, event):
    # Put your code here.
    await partition_context.update_checkpoint(event)  # Or update_checkpoint every N events for better performance.

async def main():
    checkpoint_store = BlobCheckpointStore.from_connection_string(
        storage_connection_str,
        container_name
    )
    client = EventHubConsumerClient.from_connection_string(
        connection_str,
        consumer_group,
        eventhub_name=eventhub_name,
        checkpoint_store=checkpoint_store,
    )

    async with client:
        await client.receive(on_event)

if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

Azure Depolama Hizmeti API'sinin farklı bir sürümüyle kullanma BlobCheckpointStore

Bazı ortamlar Azure Depolama Hizmeti API'sinin farklı sürümlerine sahiptir. BlobCheckpointStore varsayılan olarak Depolama Hizmeti API'sinin 2019-07-07 sürümünü kullanır. Bunu farklı bir sürümde kullanmak için, nesneyi ne zaman oluşturduğunuzu BlobCheckpointStore belirtinapi_version.

Sorun giderme

Genel

Günlüğe kaydetmeyi etkinleştirmek, çekimde sorun yaşama konusunda yardımcı olacaktır.

Günlüğe Kaydetme

  • Günlükçü'leri kitaplıktan izlemeleri toplamak için etkinleştirin azure.eventhub.extensions.checkpointstoreblobaio .
  • Ana azure-eventhub kitaplığından izlemeleri toplamak için günlükçü'leri etkinleştirin azure.eventhub .
  • Günlükçü'leri azure depolama blob kitaplığından izlemeleri toplamak için etkinleştirin azure.eventhub.extensions.checkpointstoreblobaio._vendor.storage .
  • Günlükçü'leri temel alınan uAMQP kitaplığından izlemeleri toplamak için etkinleştirin uamqp .
  • İstemciyi oluştururken ayarlayarak logging_enable=True AMQP kare düzeyi izlemesini etkinleştirin.

Sonraki adımlar

Daha fazla örnek kod

EventHubs Checkpoint Store zaman uyumsuz örneklerimizi kullanmaya başlayın.

Belgeler

Başvuru belgelerine buradan ulaşabilirsiniz.

Geri Bildirim Sağlama

Herhangi bir hatayla karşılaşırsanız veya önerileriniz varsa, lütfen projenin Sorunlar bölümünde bir sorun oluşturun.

Katkıda bulunma

Bu proje, katkı ve önerilere açıktır. Çoğu durumda, sağladığınız katkıyı kullanmamız için bize hak tanıma hakkına sahip olduğunuzu ve bu hakkı bize tanıdığınızı bildiren bir Katkıda Bulunan Lisans Sözleşmesi’ni (CLA) kabul etmeniz gerekir. Ayrıntılar için bkz. https://cla.microsoft.com.

Bir çekme isteği gönderdiğinizde, CLA robotu bir CLA sağlamanız gerekip gerekmediğini otomatik olarak belirler ve çekme isteğini uygun şekilde donatır (örn. etiket, açıklama). Robot tarafından sağlanan yönergeleri izlemeniz yeterlidir. Bu işlemi, CLA’mızı kullanarak tüm depolarda yalnızca bir kere yapmanız gerekir.

Bu proje Microsoft Open Source Code of Conduct (Microsoft Açık Kaynak Kullanım Kuralları) belgesinde listelenen kurallara uygundur. Daha fazla bilgi için Kullanım Kuralları SSS bölümüne bakın veya ek sorular veya yorumlarla iletişime geçin opencode@microsoft.com .

İzlenimler