Share via


EventHubConsumerClient Klasse

Die EventHubConsumerClient-Klasse definiert eine allgemeine Schnittstelle zum Empfangen von Ereignissen vom Azure Event Hubs-Dienst.

Das Standard Ziel von EventHubConsumerClient besteht darin, Ereignisse von allen Partitionen eines EventHub mit Lastenausgleich und Prüfpunkting zu empfangen.

Wenn mehrere EventHubConsumerClient-Instanzen für denselben Event Hub, dieselbe Consumergruppe und denselben Prüfpunktstandort ausgeführt werden, werden die Partitionen gleichmäßig unter ihnen verteilt.

Um den Lastenausgleich und persistente Prüfpunkte zu aktivieren, müssen beim Erstellen des EventHubConsumerClients checkpoint_store festgelegt werden. Wenn kein Prüfpunktspeicher bereitgestellt wird, wird der Prüfpunkt intern im Arbeitsspeicher verwaltet.

Ein EventHubConsumerClient kann auch von einer bestimmten Partition empfangen werden, wenn Sie die Methode receive() oder receive_batch() aufrufen und die partition_id angeben. Der Lastenausgleich funktioniert nicht im Einzelpartitionsmodus. Benutzer können jedoch weiterhin Prüfpunkte speichern, wenn die checkpoint_store festgelegt ist.

Vererbung
azure.eventhub._client_base.ClientBase
EventHubConsumerClient

Konstruktor

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

Parameter

fully_qualified_namespace
str
Erforderlich

Der vollqualifizierte Hostname für den Event Hubs-Namespace. Das Namespaceformat ist . servicebus.windows.net.

eventhub_name
str
Erforderlich

Der Pfad des spezifischen Event Hubs, mit dem der Client verbunden werden soll.

consumer_group
str
Erforderlich

Empfangen von Ereignissen vom Event Hub für diese Consumergruppe.

credential
TokenCredential oder AzureSasCredential oder AzureNamedKeyCredential
Erforderlich

Das für die Authentifizierung verwendete Anmeldeinformationsobjekt, das eine bestimmte Schnittstelle zum Abrufen von Token implementiert. Es akzeptiert EventHubSharedKeyCredential, oder Anmeldeinformationsobjekte, die von der azure-identity-Bibliothek generiert werden, und Objekte, die die *get_token(self, scopes) -Methode implementieren.

logging_enable
bool

Gibt an, ob Netzwerkablaufverfolgungsprotokolle an die Protokollierung ausgegeben werden sollen. Der Standardwert ist False.

auth_timeout
float

Die Zeit in Sekunden, um auf die Autorisierung eines Tokens durch den Dienst zu warten. Der Standardwert beträgt 60 Sekunden. Wenn dieser Wert auf 0 festgelegt ist, wird kein Timeout vom Client erzwungen.

user_agent
str

Falls angegeben, wird dies vor der Zeichenfolge des Benutzer-Agents hinzugefügt.

retry_total
int

Die Gesamtanzahl der Versuche, einen fehlgeschlagenen Vorgang zu wiederholen, wenn ein Fehler auftritt. Der Standardwert ist 3. Der Kontext der retry_total beim Empfangen ist besonders: Die Empfangsmethode wird in jeder Iteration durch eine While-Schleife implementiert, die die interne Empfangsmethode aufruft. Im Empfangsfall gibt retry_total die Anzahl der Wiederholungsversuche nach einem Fehler an, der von der internen Empfangsmethode in der while-Schleife ausgelöst wurde. Wenn Wiederholungsversuche erschöpft sind, wird der on_error Rückruf (sofern angegeben) mit den Fehlerinformationen aufgerufen. Der fehlerhafte interne Partitionsconsumer wird geschlossen (on_partition_close wird aufgerufen, wenn angegeben), und ein neuer interner Partitionsconsumer wird erstellt (on_partition_initialize wird aufgerufen, wenn angegeben), um den Empfang fortzusetzen.

retry_backoff_factor
float

Ein Backofffaktor, der zwischen den Versuchen nach dem zweiten Versuch angewendet werden soll (die meisten Fehler werden sofort durch einen zweiten Versuch ohne Verzögerung behoben). Im festen Modus wird die Wiederholungsrichtlinie für {Backoff-Faktor} immer in den Ruhezustand versetzt. Im Modus "exponentiell" wird die Wiederholungsrichtlinie für: {backoff factor} * (2 ** ({number of total retries} - 1)) sekunden in den Ruhezustand versetzt. Wenn der backoff_factor 0,1 ist, wird der Wiederholungsversuch für [0.0s, 0.2s, 0.4s, ...] zwischen Wiederholungen in den Ruhezustand versetzt. Der Standardwert ist 0,8.

retry_backoff_max
float

Die maximale Backoffzeit. Der Standardwert ist 120 Sekunden (2 Minuten).

retry_mode
str

Das Verzögerungsverhalten zwischen Wiederholungsversuchen. Unterstützte Werte sind "fixed" oder "exponential", wobei der Standardwert "exponentiell" ist.

idle_timeout
float

Timeout in Sekunden, nach dem dieser Client die zugrunde liegende Verbindung schließt, wenn keine weitere Aktivität erfolgt. Standardmäßig ist der Wert None, was bedeutet, dass der Client aufgrund von Inaktivität nicht heruntergefahren wird, es sei denn, der Dienst wird initiiert.

transport_type
TransportType

Der Typ des Transportprotokolls, das für die Kommunikation mit dem Event Hubs-Dienst verwendet wird. Der Standardwert ist TransportType.Amqp , in diesem Fall wird Port 5671 verwendet. Wenn der Port 5671 in der Netzwerkumgebung nicht verfügbar/blockiert ist, kann stattdessen TransportType.AmqpOverWebsocket verwendet werden, wobei Port 443 für die Kommunikation verwendet wird.

http_proxy
dict[str, str oder int]

HTTP-Proxyeinstellungen. Dies muss ein Wörterbuch mit den folgenden Schlüsseln sein: "proxy_hostname" (str-Wert) und "proxy_port" (int-Wert). Darüber hinaus können auch die folgenden Schlüssel vorhanden sein: "Benutzername", "Kennwort".

checkpoint_store
CheckpointStore oder None

Ein Manager, der die Partitionslastenausgleichs- und Prüfpunktdaten beim Empfangen von Ereignissen speichert. Der Prüfpunktspeicher wird in beiden Fällen verwendet, in denen alle Partitionen oder eine einzelne Partition empfangen werden. Im letzteren Fall gilt der Lastenausgleich nicht. Wenn kein Prüfpunktspeicher bereitgestellt wird, wird der Prüfpunkt intern im Arbeitsspeicher verwaltet, und der EventHubConsumerClient-instance empfängt Ereignisse ohne Lastenausgleich.

load_balancing_interval
float

Wenn der Lastenausgleich einsetzt. Dies ist das Intervall in Sekunden zwischen zwei Lastenausgleichsauswertungen. Der Standardwert ist „30 Sekunden“.

partition_ownership_expiration_interval
float

Ein Partitionsbesitz läuft nach dieser Anzahl von Sekunden ab. Bei jeder Lastenausgleichsauswertung wird die Ablaufzeit des Besitzes automatisch verlängert. Der Standardwert ist 6 * load_balancing_interval, d. h. 180 Sekunden, wenn die Standard-load_balancing_interval von 30 Sekunden verwendet wird.

load_balancing_strategy
str oder LoadBalancingStrategy

Wenn der Lastenausgleich einsetzt, wird diese Strategie verwendet, um den Partitionsbesitz zu beanspruchen und auszugleichen. Verwenden Sie "gierig" oder LoadBalancingStrategy.GREEDY für die gierige Strategie, die für jede Lastenausgleichsauswertung so viele nicht beanspruchte Partitionen aufnimmt, die zum Lastenausgleich erforderlich sind. Verwenden Sie "balanced" oder LoadBalancingStrategy.BALANCED für die Ausgewogene Strategie, die für jede Lastenausgleichsauswertung nur eine Partition beansprucht, die nicht von einem anderen EventHubConsumerClient beansprucht wird. Wenn alle Partitionen eines EventHubs von einem anderen EventHubConsumerClient beansprucht werden und dieser Client zu wenige Partitionen beansprucht hat, stiehlt dieser Client eine Partition von anderen Clients für jede Lastenausgleichsauswertung, unabhängig von der Lastenausgleichsstrategie. Die Gierstrategie wird standardmäßig verwendet.

custom_endpoint_address
str oder None

Die benutzerdefinierte Endpunktadresse, die zum Herstellen einer Verbindung mit dem Event Hubs-Dienst verwendet werden soll, sodass Netzwerkanforderungen über alle Anwendungsgateways oder anderen Pfade weitergeleitet werden können, die für die Hostumgebung erforderlich sind. Der Standardwert ist None. Das Format wäre wie "sb://< custom_endpoint_hostname>:<custom_endpoint_port>". Wenn port nicht im custom_endpoint_address angegeben ist, wird standardmäßig Port 443 verwendet.

connection_verify
str oder None

Pfad zur benutzerdefinierten CA_BUNDLE Datei des SSL-Zertifikats, das zum Authentifizieren der Identität des Verbindungsendpunkts verwendet wird. Der Standardwert ist None. In diesem Fall wird certifi.where() verwendet.

uamqp_transport
bool

Gibt an, ob die uamqp-Bibliothek als zugrunde liegender Transport verwendet werden soll. Der Standardwert ist False, und die Pure Python AMQP-Bibliothek wird als zugrunde liegender Transport verwendet.

socket_timeout
float

Die Zeit in Sekunden, die der zugrunde liegende Socket für die Verbindung beim Senden und Empfangen von Daten warten soll, bevor ein Timeout auftritt. Der Standardwert ist 0,2 für TransportType.Amqp und 1 für TransportType.AmqpOverWebsocket. Wenn EventHubsConnectionError-Fehler aufgrund eines Schreibzeitlimits auftreten, muss möglicherweise ein größer als der Standardwert übergeben werden. Dies ist für erweiterte Verwendungsszenarien vorgesehen, und normalerweise sollte der Standardwert ausreichend sein.

Beispiele

Erstellen Sie eine neue instance von EventHubConsumerClient.


   import os
   from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
   consumer = EventHubConsumerClient(
       fully_qualified_namespace=fully_qualified_namespace,
       eventhub_name=eventhub_name,
       consumer_group='$Default',
       credential=credential)

Methoden

close

Beenden Sie das Abrufen von Ereignissen aus dem Event Hub, und schließen Sie die zugrunde liegende AMQP-Verbindung und -Links.

from_connection_string

Erstellen Sie einen EventHubConsumerClient aus einer Verbindungszeichenfolge.

get_eventhub_properties

Ruft Die Eigenschaften des Event Hubs ab.

Zu den Schlüsseln im zurückgegebenen Wörterbuch gehören:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Abrufen von Partitions-IDs des Event Hubs.

get_partition_properties

Rufen Sie die Eigenschaften der angegebenen Partition ab.

Zu den Schlüsseln im Eigenschaftenwörterbuch gehören:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

receive

Empfangen von Ereignissen von Partitionen mit optionalem Lastenausgleich und Prüfpunkten.

receive_batch

Empfangen von Ereignissen von Partitionen mit optionalem Lastenausgleich und Prüfpunkten.

close

Beenden Sie das Abrufen von Ereignissen aus dem Event Hub, und schließen Sie die zugrunde liegende AMQP-Verbindung und -Links.

close() -> None

Rückgabetyp

Beispiele

Schließen Sie den Client.


   import os
   import threading

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   from azure.eventhub import EventHubConsumerClient
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group="$Default",
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

   logger = logging.getLogger("azure.eventhub")

   def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, multi-thread will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   # The 'receive' method is a blocking call, it can be executed in a thread for
   # non-blocking behavior, and combined with the 'close' method.

   worker = threading.Thread(
       target=consumer.receive,
       kwargs={
           "on_event": on_event,
           "starting_position": "-1",  # "-1" is from the beginning of the partition.
       }
   )
   worker.start()
   time.sleep(10)  # Keep receiving for 10s then close.
   # Close down the consumer handler explicitly.
   consumer.close()

from_connection_string

Erstellen Sie einen EventHubConsumerClient aus einer Verbindungszeichenfolge.

from_connection_string(conn_str: str, consumer_group: str, **kwargs: Any) -> EventHubConsumerClient

Parameter

conn_str
str
Erforderlich

Die Verbindungszeichenfolge eines Event Hubs.

consumer_group
str
Erforderlich

Empfangen von Ereignissen vom Event Hub für diese Consumergruppe.

eventhub_name
str

Der Pfad des spezifischen Event Hubs, mit dem der Client verbunden werden soll.

logging_enable
bool

Gibt an, ob Netzwerkablaufverfolgungsprotokolle an die Protokollierung ausgegeben werden sollen. Der Standardwert ist False.

auth_timeout
float

Die Zeit in Sekunden, um auf die Autorisierung eines Tokens durch den Dienst zu warten. Der Standardwert beträgt 60 Sekunden. Wenn dieser Wert auf 0 festgelegt ist, wird kein Timeout vom Client erzwungen.

user_agent
str

Falls angegeben, wird dies vor der Zeichenfolge des Benutzer-Agents hinzugefügt.

retry_total
int

Die Gesamtanzahl der Versuche, einen fehlgeschlagenen Vorgang zu wiederholen, wenn ein Fehler auftritt. Der Standardwert ist 3. Der Kontext der retry_total beim Empfangen ist besonders: Die Empfangsmethode wird in jeder Iteration durch eine While-Schleife implementiert, die die interne Empfangsmethode aufruft. Im Empfangsfall gibt retry_total die Anzahl der Wiederholungsversuche nach einem Fehler an, der von der internen Empfangsmethode in der while-Schleife ausgelöst wurde. Wenn Wiederholungsversuche erschöpft sind, wird der on_error Rückruf (sofern angegeben) mit den Fehlerinformationen aufgerufen. Der fehlerhafte interne Partitionsconsumer wird geschlossen (on_partition_close wird aufgerufen, wenn angegeben), und ein neuer interner Partitionsconsumer wird erstellt (on_partition_initialize wird aufgerufen, wenn angegeben), um den Empfang fortzusetzen.

retry_backoff_factor
float

Ein Backofffaktor, der zwischen den Versuchen nach dem zweiten Versuch angewendet werden soll (die meisten Fehler werden sofort durch einen zweiten Versuch ohne Verzögerung behoben). Im festen Modus wird die Wiederholungsrichtlinie für {Backoff-Faktor} immer in den Ruhezustand versetzt. Im Modus "exponentiell" wird die Wiederholungsrichtlinie für: {backoff factor} * (2 ** ({number of total retries} - 1)) sekunden in den Ruhezustand versetzt. Wenn der backoff_factor 0,1 ist, wird der Wiederholungsversuch für [0.0s, 0.2s, 0.4s, ...] zwischen Wiederholungen ruhen. Der Standardwert ist 0,8.

retry_backoff_max
float

Die maximale Backoffzeit. Der Standardwert ist 120 Sekunden (2 Minuten).

retry_mode
str

Das Verzögerungsverhalten zwischen Wiederholungsversuchen. Unterstützte Werte sind "fixed" oder "exponential", wobei der Standardwert "exponentiell" ist.

idle_timeout
float

Timeout in Sekunden, nach dem dieser Client die zugrunde liegende Verbindung schließt, wenn keine furthur-Aktivität vorhanden ist. Standardmäßig ist der Wert None, was bedeutet, dass der Client nicht aufgrund von Inaktivität heruntergefahren wird, es sei denn, der Dienst initiiert.

transport_type
TransportType

Der Typ des Transportprotokolls, das für die Kommunikation mit dem Event Hubs-Dienst verwendet wird. Der Standardwert ist TransportType.Amqp , in diesem Fall wird Port 5671 verwendet. Wenn port 5671 in der Netzwerkumgebung nicht verfügbar/blockiert ist, könnte stattdessen TransportType.AmqpOverWebsocket verwendet werden, wobei Port 443 für die Kommunikation verwendet wird.

http_proxy
dict

HTTP-Proxyeinstellungen. Dies muss ein Wörterbuch mit den folgenden Schlüsseln sein: "proxy_hostname" (Str-Wert) und "proxy_port" (int-Wert). Zusätzlich können die folgenden Schlüssel vorhanden sein: "Benutzername", "Kennwort".

checkpoint_store
CheckpointStore oder None

Ein Manager, der die Partitionslastenausgleichs- und Prüfpunktdaten beim Empfangen von Ereignissen speichert. Der Prüfpunktspeicher wird in beiden Fällen des Empfangens von allen Partitionen oder einer einzelnen Partition verwendet. Im letzteren Fall gilt der Lastenausgleich nicht. Wenn kein Prüfpunktspeicher bereitgestellt wird, wird der Prüfpunkt intern im Arbeitsspeicher verwaltet, und der EventHubConsumerClient-instance empfängt Ereignisse ohne Lastenausgleich.

load_balancing_interval
float

Wenn der Lastenausgleich einsetzt. Dies ist das Intervall in Sekunden zwischen zwei Lastenausgleichsauswertungen. Der Standardwert ist 10 Sekunden.

partition_ownership_expiration_interval
float

Ein Partitionsbesitz läuft nach dieser Anzahl von Sekunden ab. Jede Lastenausgleichsauswertung verlängert automatisch die Ablaufzeit des Besitzes. Der Standardwert ist 6 * load_balancing_interval, d. h. 60 Sekunden bei Verwendung des Standard-load_balancing_interval von 30 Sekunden.

load_balancing_strategy
str oder LoadBalancingStrategy

Wenn der Lastenausgleich einsetzt, wird diese Strategie verwendet, um den Partitionsbesitz zu beanspruchen und auszugleichen. Verwenden Sie "greedy" oder LoadBalancingStrategy.GREEDY für die gierige Strategie, die für jede Lastenausgleichsauswertung so viele nicht beanspruchte Partitionen erfasst, die zum Lastenausgleich erforderlich sind. Verwenden Sie "balanced" oder LoadBalancingStrategy.BALANCED für die balanced-Strategie, die für jede Lastenausgleichsauswertung nur eine Partition beansprucht, die nicht von einem anderen EventHubConsumerClient beansprucht wird. Wenn alle Partitionen eines EventHubs von einem anderen EventHubConsumerClient beansprucht werden und dieser Client zu wenige Partitionen beansprucht hat, stiehlt dieser Client eine Partition von anderen Clients für jede Lastenausgleichsauswertung, unabhängig von der Lastenausgleichsstrategie. Die Greedy-Strategie wird standardmäßig verwendet.

custom_endpoint_address
str oder None

Die benutzerdefinierte Endpunktadresse, die zum Herstellen einer Verbindung mit dem Event Hubs-Dienst verwendet werden soll, sodass Netzwerkanforderungen über alle Anwendungsgateways oder andere Pfade weitergeleitet werden können, die für die Hostumgebung erforderlich sind. Der Standardwert ist None. Das Format würde wie "sb://< custom_endpoint_hostname>:<custom_endpoint_port>" aussehen. Wenn port im custom_endpoint_address nicht angegeben ist, wird standardmäßig Port 443 verwendet.

connection_verify
str oder None

Pfad zur benutzerdefinierten CA_BUNDLE Datei des SSL-Zertifikats, das zum Authentifizieren der Identität des Verbindungsendpunkts verwendet wird. Der Standardwert ist None. In diesem Fall wird certifi.where() verwendet.

uamqp_transport
bool

Gibt an, ob die uamqp-Bibliothek als zugrunde liegender Transport verwendet werden soll. Der Standardwert ist False, und die Pure Python AMQP-Bibliothek wird als zugrunde liegender Transport verwendet.

Rückgabetyp

Beispiele

Erstellen Sie eine neue instance von EventHubConsumerClient aus Verbindungszeichenfolge.


   import os
   from azure.eventhub import EventHubConsumerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

Ruft Die Eigenschaften des Event Hubs ab.

Zu den Schlüsseln im zurückgegebenen Wörterbuch gehören:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_eventhub_properties() -> Dict[str, Any]

Gibt zurück

Ein Wörterbuch, das Informationen zum Event Hub enthält.

Rückgabetyp

Ausnahmen

get_partition_ids

Abrufen von Partitions-IDs des Event Hubs.

get_partition_ids() -> List[str]

Gibt zurück

Eine Liste von Partitions-IDs.

Rückgabetyp

Ausnahmen

get_partition_properties

Rufen Sie die Eigenschaften der angegebenen Partition ab.

Zu den Schlüsseln im Eigenschaftenwörterbuch gehören:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

get_partition_properties(partition_id: str) -> Dict[str, Any]

Parameter

partition_id
str
Erforderlich

Die Zielpartitions-ID.

Gibt zurück

Ein Wörterbuch, das Partitionseigenschaften enthält.

Rückgabetyp

Ausnahmen

receive

Empfangen von Ereignissen von Partitionen mit optionalem Lastenausgleich und Prüfpunkten.

receive(on_event: Callable[[PartitionContext, EventData | None], None], **kwargs: Any) -> None

Parameter

on_event
callable[PartitionContext, EventData oder None]
Erforderlich

Die Rückruffunktion für die Behandlung eines empfangenen Ereignisses. Der Rückruf benötigt zwei Parameter: partition_context , das den Partitionskontext und das Ereignis enthält, das das empfangene Ereignis ist. Die Rückruffunktion sollte wie folgt definiert werden: on_event(partition_context, Ereignis). Ausführliche Informationen zum PartitionContextPartitionskontext finden Sie unter .

max_wait_time
float

Das maximale Intervall in Sekunden, das der Ereignisprozessor wartet, bevor der Rückruf aufgerufen wird. Wenn innerhalb dieses Intervalls keine Ereignisse empfangen werden, wird der on_event Rückruf mit None aufgerufen. Wenn dieser Wert auf Keine oder 0 (Standardeinstellung) festgelegt ist, wird der Rückruf erst aufgerufen, wenn ein Ereignis empfangen wird.

partition_id
str

Falls angegeben, empfängt der Client nur von dieser Partition. Andernfalls empfängt der Client von allen Partitionen.

owner_level
int

Die Priorität für einen exklusiven Verbraucher. Ein exklusiver Consumer wird erstellt, wenn owner_level festgelegt ist. Ein Verbraucher mit einem höheren owner_level hat eine höhere exklusive Priorität. Die Besitzerebene wird auch als "Epochenwert" des Consumers bezeichnet.

prefetch
int

Die Anzahl der Ereignisse, die vom Dienst für die Verarbeitung vorab ausgelöst werden sollen. Der Standardwert ist 300.

track_last_enqueued_event_properties
bool

Gibt an, ob der Consumer Informationen zum zuletzt in die Warteschlange gestellten Ereignis auf der zugeordneten Partition anfordern und diese Informationen nachverfolgen soll, wenn Ereignisse empfangen werden. Wenn Informationen zum zuletzt in Warteschlange gestellten Partitionsereignis nachverfolgt werden, enthält jedes Ereignis, das vom Event Hubs-Dienst empfangen wird, Metadaten über die Partition. Dies führt zu einem geringen Zusätzlichen Netzwerkbandbreitenverbrauch, der im Allgemeinen ein günstiger Kompromiss ist, wenn er bei regelmäßigen Anforderungen für Partitionseigenschaften mit dem Event Hub-Client berücksichtigt wird. Sie ist standardmäßig auf False festgelegt.

starting_position
str, int, datetime oder dict[str,any]

Beginnen Sie mit dem Empfang von dieser Ereignisposition, wenn keine Prüfpunktdaten für eine Partition vorhanden sind. Prüfpunktdaten werden verwendet, sofern verfügbar. Dies kann ein Dict mit partitions-ID als Schlüssel und Position als Wert für einzelne Partitionen oder ein einzelner Wert für alle Partitionen sein. Der Werttyp kann str, int oder datetime.datetime sein. Außerdem werden die Werte "-1" für den Empfang vom Anfang des Datenstroms und "@latest" für den Empfang nur neuer Ereignisse unterstützt. Der Standardwert ist "@latest".

starting_position_inclusive
bool oder dict[str,bool]

Bestimmen Sie, ob der angegebene starting_position inklusive(>=) ist oder nicht (>). True für inklusive und False für exklusiv. Dies kann ein Dict mit der Partitions-ID als Schlüssel und bool als Wert sein, der angibt, ob der starting_position für eine bestimmte Partition inklusive ist oder nicht. Dies kann auch ein einzelner Bool-Wert für alle starting_position sein. Der Standardwert ist False.

on_error
callable[[PartitionContext, Exception]]

Die Rückruffunktion, die aufgerufen wird, wenn ein Fehler beim Empfang ausgelöst wird, nachdem Wiederholungsversuche erschöpft sind, oder während des Lastenausgleichs. Der Rückruf benötigt zwei Parameter: partition_context der Partitionsinformationen enthält, und fehler als Ausnahme. partition_context kann Keine sein, wenn der Fehler während des Lastenausgleichs ausgelöst wird. Der Rückruf sollte wie folgt definiert werden: on_error(partition_context, Error). Der on_error Rückrufs wird auch aufgerufen, wenn während des on_event Rückrufs eine nicht behandelte Ausnahme ausgelöst wird.

on_partition_initialize
callable[[PartitionContext]]

Die Rückruffunktion, die aufgerufen wird, nachdem ein Consumer für eine bestimmte Partition aufgerufen wird, hat die Initialisierung abgeschlossen. Es wird auch aufgerufen, wenn ein neuer interner Partitionsverbraucher erstellt wird, um den Empfangensprozess für einen fehlerhaften und geschlossenen internen Partitionsverbraucher zu übernehmen. Der Rückruf benötigt einen einzigen Parameter: partition_context der die Partitionsinformationen enthält. Der Rückruf sollte wie folgt definiert werden: on_partition_initialize(partition_context).

on_partition_close
callable[[PartitionContext, CloseReason]]

Die Rückruffunktion, die nach einem Consumer für eine bestimmte Partition aufgerufen wird, ist geschlossen. Es wird auch aufgerufen, wenn während des Empfangens ein Fehler ausgelöst wird, nachdem Wiederholungsversuche erschöpft sind. Der Rückruf benötigt zwei Parameter: partition_context der Partitionsinformationen und grund für das Schließen enthält. Der Rückruf sollte wie folgt definiert werden: on_partition_close(partition_context, Reason). Bitte beachten Sie die CloseReason verschiedenen Schließgründe.

Rückgabetyp

Beispiele

Empfangen von Ereignissen vom EventHub.


       logger = logging.getLogger("azure.eventhub")

       def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, multi-thread will have better performance.
           logger.info("Received event from partition: {}".format(partition_context.partition_id))

       with consumer:
           consumer.receive(on_event=on_event)

receive_batch

Empfangen von Ereignissen von Partitionen mit optionalem Lastenausgleich und Prüfpunkten.

receive_batch(on_event_batch: Callable[[PartitionContext, List[EventData]], None], **kwargs: Any) -> None

Parameter

on_event_batch
callable[PartitionContext, list[EventData]]
Erforderlich

Die Rückruffunktion für die Behandlung eines Batches empfangener Ereignisse. Der Rückruf benötigt zwei Parameter: partition_context , der den Partitionskontext enthält, und event_batch, bei dem es sich um die empfangenen Ereignisse handelt. Die Rückruffunktion sollte wie folgt definiert werden: on_event_batch(partition_context, event_batch). event_batch kann eine leere Liste sein, wenn max_wait_time nicht None oder 0 ist und nach max_wait_time kein Ereignis empfangen wird. Ausführliche Informationen zum PartitionContextPartitionskontext finden Sie unter .

max_batch_size
int

Die maximale Anzahl von Ereignissen in einem Batch, die an den Rückruf übergeben on_event_batch. Wenn die tatsächliche empfangene Anzahl von Ereignissen größer als max_batch_size ist, werden die empfangenen Ereignisse in Batches unterteilt und rufen den Rückruf für jeden Batch mit bis zu max_batch_size Ereignissen auf.

max_wait_time
float

Das maximale Intervall in Sekunden, das der Ereignisprozessor wartet, bevor der Rückruf aufgerufen wird. Wenn innerhalb dieses Intervalls keine Ereignisse empfangen werden, wird der on_event_batch Rückruf mit einer leeren Liste aufgerufen.

partition_id
str

Falls angegeben, empfängt der Client nur von dieser Partition. Andernfalls empfängt der Client von allen Partitionen.

owner_level
int

Die Priorität für einen exklusiven Verbraucher. Ein exklusiver Consumer wird erstellt, wenn owner_level festgelegt ist. Ein Verbraucher mit einem höheren owner_level hat eine höhere exklusive Priorität. Die Besitzerebene wird auch als "Epochenwert" des Consumers bezeichnet.

prefetch
int

Die Anzahl der Ereignisse, die vom Dienst für die Verarbeitung vorab ausgelöst werden sollen. Der Standardwert ist 300.

track_last_enqueued_event_properties
bool

Gibt an, ob der Consumer Informationen zum zuletzt in die Warteschlange gestellten Ereignis auf der zugeordneten Partition anfordern und diese Informationen nachverfolgen soll, wenn Ereignisse empfangen werden. Wenn Informationen zum zuletzt in Warteschlange gestellten Partitionsereignis nachverfolgt werden, enthält jedes Ereignis, das vom Event Hubs-Dienst empfangen wird, Metadaten über die Partition. Dies führt zu einem geringen Zusätzlichen Netzwerkbandbreitenverbrauch, der im Allgemeinen ein günstiger Kompromiss ist, wenn er bei regelmäßigen Anforderungen für Partitionseigenschaften mit dem Event Hub-Client berücksichtigt wird. Sie ist standardmäßig auf False festgelegt.

starting_position
str, int, datetime oder dict[str,any]

Beginnen Sie mit dem Empfang von dieser Ereignisposition, wenn keine Prüfpunktdaten für eine Partition vorhanden sind. Prüfpunktdaten werden verwendet, sofern verfügbar. Dies kann ein Dict mit partitions-ID als Schlüssel und Position als Wert für einzelne Partitionen oder ein einzelner Wert für alle Partitionen sein. Der Werttyp kann str, int oder datetime.datetime sein. Außerdem werden die Werte "-1" für den Empfang vom Anfang des Datenstroms und "@latest" für den Empfang nur neuer Ereignisse unterstützt. Der Standardwert ist "@latest".

starting_position_inclusive
bool oder dict[str,bool]

Bestimmen Sie, ob der angegebene starting_position inklusive(>=) ist oder nicht (>). True für inklusive und False für exklusiv. Dies kann ein Dict mit der Partitions-ID als Schlüssel und bool als Wert sein, der angibt, ob der starting_position für eine bestimmte Partition inklusive ist oder nicht. Dies kann auch ein einzelner Bool-Wert für alle starting_position sein. Der Standardwert ist False.

on_error
callable[[PartitionContext, Exception]]

Die Rückruffunktion, die aufgerufen wird, wenn ein Fehler beim Empfang ausgelöst wird, nachdem Wiederholungsversuche erschöpft sind, oder während des Lastenausgleichs. Der Rückruf benötigt zwei Parameter: partition_context der Partitionsinformationen enthält, und fehler als Ausnahme. partition_context kann Keine sein, wenn der Fehler während des Lastenausgleichs ausgelöst wird. Der Rückruf sollte wie folgt definiert werden: on_error(partition_context, Error). Der on_error Rückrufs wird auch aufgerufen, wenn während des on_event Rückrufs eine nicht behandelte Ausnahme ausgelöst wird.

on_partition_initialize
callable[[PartitionContext]]

Die Rückruffunktion, die aufgerufen wird, nachdem ein Consumer für eine bestimmte Partition aufgerufen wird, hat die Initialisierung abgeschlossen. Es wird auch aufgerufen, wenn ein neuer interner Partitionsverbraucher erstellt wird, um den Empfangensprozess für einen fehlerhaften und geschlossenen internen Partitionsverbraucher zu übernehmen. Der Rückruf benötigt einen einzigen Parameter: partition_context der die Partitionsinformationen enthält. Der Rückruf sollte wie folgt definiert werden: on_partition_initialize(partition_context).

on_partition_close
callable[[PartitionContext, CloseReason]]

Die Rückruffunktion, die nach einem Consumer für eine bestimmte Partition aufgerufen wird, ist geschlossen. Es wird auch aufgerufen, wenn während des Empfangens ein Fehler ausgelöst wird, nachdem Wiederholungsversuche erschöpft sind. Der Rückruf benötigt zwei Parameter: partition_context der Partitionsinformationen und grund für das Schließen enthält. Der Rückruf sollte wie folgt definiert werden: on_partition_close(partition_context, Reason). Bitte beachten Sie die CloseReason verschiedenen Schließgründe.

Rückgabetyp

Beispiele

Empfangen von Ereignissen in Batches vom EventHub.


       logger = logging.getLogger("azure.eventhub")

       def on_event_batch(partition_context, event_batch):
           # Put your code here.
           # If the operation is i/o intensive, multi-thread will have better performance.
           logger.info("Received events from partition: {}".format(partition_context.partition_id))

       with consumer:
           consumer.receive_batch(on_event_batch=on_event_batch)