EventHubConsumerClient Klas

De klasse EventHubConsumerClient definieert een interface op hoog niveau voor het ontvangen van gebeurtenissen van de Azure Event Hubs-service.

Het belangrijkste doel van EventHubConsumerClient is het ontvangen van gebeurtenissen van alle partities van een EventHub met taakverdeling en controlepunten.

Wanneer meerdere EventHubConsumerClient-exemplaren worden uitgevoerd op dezelfde Event Hub, consumentengroep en controlepuntlocatie, worden de partities gelijkmatig over hen verdeeld.

Als u taakverdeling en permanente controlepunten wilt inschakelen, moet checkpoint_store worden ingesteld bij het maken van de EventHubConsumerClient. Als er geen controlepuntarchief wordt opgegeven, wordt het controlepunt intern in het geheugen bewaard.

Een EventHubConsumerClient kan ook ontvangen van een specifieke partitie wanneer u de methode receive() of receive_batch() aanroept en de partition_id opgeeft. Taakverdeling werkt niet in de modus met één partitie. Maar gebruikers kunnen nog steeds controlepunten opslaan als de checkpoint_store is ingesteld.

Overname
azure.eventhub._client_base.ClientBase
EventHubConsumerClient

Constructor

EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)

Parameters

fully_qualified_namespace
str
Vereist

De volledig gekwalificeerde hostnaam voor de Event Hubs-naamruimte. De indeling van de naamruimte is : .servicebus.windows.net.

eventhub_name
str
Vereist

Het pad van de specifieke Event Hub waarmee de client verbinding moet maken.

consumer_group
str
Vereist

Gebeurtenissen ontvangen van de Event Hub voor deze consumentengroep.

credential
TokenCredential of AzureSasCredential of AzureNamedKeyCredential
Vereist

Het referentieobject dat wordt gebruikt voor verificatie, waarmee een bepaalde interface wordt geïmplementeerd voor het ophalen van tokens. Het accepteert , of referentieobjecten EventHubSharedKeyCredentialdie zijn gegenereerd door de azure-identity-bibliotheek en objecten die de methode *get_token(zelf, bereiken) implementeren.

logging_enable
bool

Of netwerktraceringslogboeken moeten worden uitgevoerd naar de logboekregistratie. De standaardwaarde is False.

auth_timeout
float

De tijd in seconden om te wachten tot een token is geautoriseerd door de service. De standaardwaarde is 60 seconden. Als dit is ingesteld op 0, wordt er geen time-out afgedwongen door de client.

user_agent
str

Indien opgegeven, wordt deze toegevoegd vóór de tekenreeks van de gebruikersagent.

retry_total
int

Het totale aantal pogingen om een mislukte bewerking opnieuw uit te voeren wanneer er een fout optreedt. De standaardwaarde is 3. De context van retry_total bij het ontvangen is speciaal: de ontvangstmethode wordt geïmplementeerd door een while-loop-aanroepende interne ontvangstmethode in elke iteratie. In het ontvangstscenariogeeft retry_total het aantal nieuwe pogingen na een fout op die is gegenereerd door de interne ontvangstmethode in de while-lus. Als nieuwe pogingen zijn uitgeput, wordt de on_error callback aangeroepen (indien opgegeven) met de foutinformatie. De mislukte interne partitieconsumer wordt gesloten (on_partition_close wordt aangeroepen indien opgegeven) en er wordt een nieuwe interne partitieconsumer gemaakt (on_partition_initialize wordt aangeroepen indien opgegeven) om de ontvangst te hervatten.

retry_backoff_factor
float

Een uitstelfactor die moet worden toegepast tussen pogingen na de tweede poging (de meeste fouten worden onmiddellijk opgelost door een tweede poging zonder vertraging). In de vaste modus wordt beleid voor opnieuw proberen altijd in de slaapstand gezet voor {backoff factor}. In de exponentiële modus wordt het beleid voor opnieuw proberen in de slaapstand gezet voor: {uitstelfactor} * (2 ** ({aantal totale nieuwe pogingen} - 1)) seconden. Als de backoff_factor 0,1 is, wordt de nieuwe poging in de slaapstand gezet voor [0.0s, 0.2s, 0.4s, ...] tussen nieuwe pogingen. De standaardwaarde is 0,8.

retry_backoff_max
float

De maximale verloftijd. De standaardwaarde is 120 seconden (2 minuten).

retry_mode
str

Het vertragingsgedrag tussen nieuwe pogingen. Ondersteunde waarden zijn 'vast' of 'exponentieel', waarbij de standaardwaarde 'exponentieel' is.

idle_timeout
float

Time-out, in seconden, waarna deze client de onderliggende verbinding sluit als er geen verdere activiteit is. Standaard is de waarde Geen, wat betekent dat de client niet wordt afgesloten vanwege inactiviteit, tenzij gestart door de service.

transport_type
TransportType

Het type transportprotocol dat wordt gebruikt voor de communicatie met de Event Hubs-service. De standaardwaarde is TransportType.Amqp . In dat geval wordt poort 5671 gebruikt. Als poort 5671 niet beschikbaar/geblokkeerd is in de netwerkomgeving, kan TransportType.AmqpOverWebsocket worden gebruikt in plaats daarvan, waarbij poort 443 wordt gebruikt voor communicatie.

http_proxy
dict[str, str of int]

HTTP-proxyinstellingen. Dit moet een woordenlijst zijn met de volgende sleutels: 'proxy_hostname' (str-waarde) en 'proxy_port' (int-waarde). Daarnaast kunnen de volgende sleutels ook aanwezig zijn: 'gebruikersnaam', 'wachtwoord'.

checkpoint_store
CheckpointStore of None

Een manager die de partitietaakverdeling en controlepuntgegevens opslaat bij het ontvangen van gebeurtenissen. Het controlepuntarchief wordt gebruikt in beide gevallen van ontvangst van alle partities of één partitie. In het laatste geval is taakverdeling niet van toepassing. Als er geen controlepuntarchief wordt opgegeven, wordt het controlepunt intern in het geheugen bewaard en ontvangt het EventHubConsumerClient-exemplaar gebeurtenissen zonder taakverdeling.

load_balancing_interval
float

Wanneer taakverdeling wordt gestart. Dit is het interval in seconden tussen twee taakverdelingsevaluaties. De standaardwaarde is 30 seconden.

partition_ownership_expiration_interval
float

Het eigendom van een partitie verloopt na dit aantal seconden. Bij elke taakverdelingsevaluatie wordt de verlooptijd van het eigendom automatisch verlengd. De standaardwaarde is 6 * load_balancing_interval, dat wil zeggen 180 seconden bij gebruik van de standaard load_balancing_interval van 30 seconden.

load_balancing_strategy
str of LoadBalancingStrategy

Wanneer taakverdeling wordt gestart, wordt deze strategie gebruikt om het eigendom van de partitie te claimen en te verdelen. Gebruik 'greedy' of LoadBalancingStrategy.GREEDY voor de greedy-strategie, die voor elke taakverdelingsevaluatie zoveel niet-geclaimde partities opneemt die nodig zijn om de taak te verdelen. Gebruik 'balanced' of LoadBalancingStrategy.BALANCED voor de gebalanceerde strategie, die voor elke taakverdelingsevaluatie slechts één partitie claimt die niet wordt geclaimd door andere EventHubConsumerClient. Als alle partities van een EventHub worden geclaimd door andere EventHubConsumerClient en deze client te weinig partities heeft geclaimd, steelt deze client één partitie van andere clients voor elke taakverdelingsevaluatie, ongeacht de taakverdelingsstrategie. Greedy-strategie wordt standaard gebruikt.

custom_endpoint_address
str of None

Het aangepaste eindpuntadres dat moet worden gebruikt voor het tot stand brengen van een verbinding met de Event Hubs-service, zodat netwerkaanvragen kunnen worden gerouteerd via toepassingsgateways of andere paden die nodig zijn voor de hostomgeving. De standaardwaarde is Geen. De indeling ziet er als volgt uit: 'sb://< custom_endpoint_hostname>:<custom_endpoint_port>'. Als poort niet is opgegeven in de custom_endpoint_address, wordt standaard poort 443 gebruikt.

connection_verify
str of None

Pad naar het aangepaste CA_BUNDLE-bestand van het SSL-certificaat dat wordt gebruikt om de identiteit van het verbindingseindpunt te verifiëren. De standaardwaarde is Geen, in welk geval certifi.where() wordt gebruikt.

uamqp_transport
bool

Of u de uamqp-bibliotheek als het onderliggende transport wilt gebruiken. De standaardwaarde is False en de Pure Python AMQP-bibliotheek wordt gebruikt als het onderliggende transport.

socket_timeout
float

De tijd in seconden dat de onderliggende socket op de verbinding moet wachten bij het verzenden en ontvangen van gegevens voordat er een time-out optreedt. De standaardwaarde is 0,2 voor TransportType.Amqp en 1 voor TransportType.AmqpOverWebsocket. Als er EventHubsConnectionError-fouten optreden vanwege een time-out van schrijfbewerkingen, moet mogelijk een grotere waarde dan de standaardwaarde worden doorgegeven. Dit is voor geavanceerde gebruiksscenario's en normaal gesproken moet de standaardwaarde voldoende zijn.

Voorbeelden

Maak een nieuw exemplaar van de EventHubConsumerClient.


   import os
   from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential

   fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
   shared_access_key = os.environ['EVENT_HUB_SAS_KEY']

   credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
   consumer = EventHubConsumerClient(
       fully_qualified_namespace=fully_qualified_namespace,
       eventhub_name=eventhub_name,
       consumer_group='$Default',
       credential=credential)

Methoden

close

Stop met het ophalen van gebeurtenissen uit de Event Hub en sluit de onderliggende AMQP-verbinding en koppelingen.

from_connection_string

Maak een EventHubConsumerClient op basis van een verbindingsreeks.

get_eventhub_properties

Eigenschappen van de Event Hub ophalen.

Sleutels in de geretourneerde woordenlijst zijn onder andere:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_partition_ids

Partitie-id's van de Event Hub ophalen.

get_partition_properties

Eigenschappen van de opgegeven partitie ophalen.

Sleutels in de eigenschappenwoordenlijst zijn onder andere:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

receive

Gebeurtenissen ontvangen van partities, met optionele taakverdeling en controlepunten.

receive_batch

Gebeurtenissen ontvangen van partities, met optionele taakverdeling en controlepunten.

close

Stop met het ophalen van gebeurtenissen uit de Event Hub en sluit de onderliggende AMQP-verbinding en koppelingen.

close() -> None

Retourtype

Voorbeelden

Sluit de client af.


   import os
   import threading

   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']

   from azure.eventhub import EventHubConsumerClient
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group="$Default",
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

   logger = logging.getLogger("azure.eventhub")

   def on_event(partition_context, event):
       # Put your code here.
       # If the operation is i/o intensive, multi-thread will have better performance.
       logger.info("Received event from partition: {}".format(partition_context.partition_id))

   # The 'receive' method is a blocking call, it can be executed in a thread for
   # non-blocking behavior, and combined with the 'close' method.

   worker = threading.Thread(
       target=consumer.receive,
       kwargs={
           "on_event": on_event,
           "starting_position": "-1",  # "-1" is from the beginning of the partition.
       }
   )
   worker.start()
   time.sleep(10)  # Keep receiving for 10s then close.
   # Close down the consumer handler explicitly.
   consumer.close()

from_connection_string

Maak een EventHubConsumerClient op basis van een verbindingsreeks.

from_connection_string(conn_str: str, consumer_group: str, **kwargs: Any) -> EventHubConsumerClient

Parameters

conn_str
str
Vereist

De verbindingsreeks van een Event Hub.

consumer_group
str
Vereist

Gebeurtenissen ontvangen van de Event Hub voor deze consumentengroep.

eventhub_name
str

Het pad van de specifieke Event Hub waarmee de client verbinding moet maken.

logging_enable
bool

Of netwerktraceringslogboeken moeten worden uitgevoerd naar de logboekregistratie. De standaardwaarde is False.

auth_timeout
float

De tijd in seconden om te wachten tot een token is geautoriseerd door de service. De standaardwaarde is 60 seconden. Als dit is ingesteld op 0, wordt er geen time-out afgedwongen door de client.

user_agent
str

Indien opgegeven, wordt deze toegevoegd vóór de tekenreeks van de gebruikersagent.

retry_total
int

Het totale aantal pogingen om een mislukte bewerking opnieuw uit te voeren wanneer er een fout optreedt. De standaardwaarde is 3. De context van retry_total bij het ontvangen is speciaal: de ontvangstmethode wordt geïmplementeerd door een while-loop-aanroepende interne ontvangstmethode in elke iteratie. In het ontvangstscenariogeeft retry_total het aantal nieuwe pogingen na een fout op die is gegenereerd door de interne ontvangstmethode in de while-lus. Als nieuwe pogingen zijn uitgeput, wordt de on_error callback aangeroepen (indien opgegeven) met de foutinformatie. De mislukte interne partitieconsumer wordt gesloten (on_partition_close wordt aangeroepen indien opgegeven) en er wordt een nieuwe interne partitieconsumer gemaakt (on_partition_initialize wordt aangeroepen indien opgegeven) om de ontvangst te hervatten.

retry_backoff_factor
float

Een uitstelfactor die moet worden toegepast tussen pogingen na de tweede poging (de meeste fouten worden onmiddellijk opgelost door een tweede poging zonder vertraging). In de vaste modus wordt beleid voor opnieuw proberen altijd in de slaapstand gezet voor {backoff factor}. In de exponentiële modus wordt het beleid voor opnieuw proberen in de slaapstand gezet voor: {uitstelfactor} * (2 ** ({aantal totale nieuwe pogingen} - 1)) seconden. Als de backoff_factor 0,1 is, wordt de nieuwe poging in de slaapstand gezet voor [0.0s, 0.2s, 0.4s, ...] tussen nieuwe pogingen. De standaardwaarde is 0,8.

retry_backoff_max
float

De maximale verloftijd. De standaardwaarde is 120 seconden (2 minuten).

retry_mode
str

Het vertragingsgedrag tussen nieuwe pogingen. Ondersteunde waarden zijn 'vast' of 'exponentieel', waarbij de standaardwaarde 'exponentieel' is.

idle_timeout
float

Time-out, in seconden, waarna deze client de onderliggende verbinding sluit als er geen furthur-activiteit is. Standaard is de waarde Geen, wat betekent dat de client niet wordt afgesloten vanwege inactiviteit, tenzij gestart door de service.

transport_type
TransportType

Het type transportprotocol dat wordt gebruikt voor de communicatie met de Event Hubs-service. De standaardwaarde is TransportType.Amqp . In dat geval wordt poort 5671 gebruikt. Als poort 5671 niet beschikbaar/geblokkeerd is in de netwerkomgeving, kan TransportType.AmqpOverWebsocket worden gebruikt in plaats daarvan, waarbij poort 443 wordt gebruikt voor communicatie.

http_proxy
dict

HTTP-proxyinstellingen. Dit moet een woordenlijst zijn met de volgende sleutels: 'proxy_hostname' (str-waarde) en 'proxy_port' (int-waarde). Daarnaast kunnen de volgende sleutels ook aanwezig zijn: 'gebruikersnaam', 'wachtwoord'.

checkpoint_store
CheckpointStore of None

Een manager die de partitietaakverdeling en controlepuntgegevens opslaat bij het ontvangen van gebeurtenissen. Het controlepuntarchief wordt gebruikt in beide gevallen van ontvangst van alle partities of één partitie. In het laatste geval is taakverdeling niet van toepassing. Als er geen controlepuntarchief wordt opgegeven, wordt het controlepunt intern in het geheugen bewaard en ontvangt het EventHubConsumerClient-exemplaar gebeurtenissen zonder taakverdeling.

load_balancing_interval
float

Wanneer taakverdeling wordt gestart. Dit is het interval in seconden tussen twee taakverdelingsevaluaties. De standaardwaarde is 10 seconden.

partition_ownership_expiration_interval
float

Het eigendom van een partitie verloopt na dit aantal seconden. Bij elke taakverdelingsevaluatie wordt de verlooptijd van het eigendom automatisch verlengd. De standaardwaarde is 6 * load_balancing_interval, dat wil zeggen 60 seconden bij gebruik van de standaard load_balancing_interval van 30 seconden.

load_balancing_strategy
str of LoadBalancingStrategy

Wanneer taakverdeling wordt gestart, wordt deze strategie gebruikt om het eigendom van de partitie te claimen en te verdelen. Gebruik 'greedy' of LoadBalancingStrategy.GREEDY voor de greedy-strategie, die voor elke taakverdelingsevaluatie zoveel niet-geclaimde partities opneemt die nodig zijn om de taak te verdelen. Gebruik 'balanced' of LoadBalancingStrategy.BALANCED voor de gebalanceerde strategie, die voor elke taakverdelingsevaluatie slechts één partitie claimt die niet wordt geclaimd door andere EventHubConsumerClient. Als alle partities van een EventHub worden geclaimd door andere EventHubConsumerClient en deze client te weinig partities heeft geclaimd, steelt deze client één partitie van andere clients voor elke taakverdelingsevaluatie, ongeacht de taakverdelingsstrategie. Greedy-strategie wordt standaard gebruikt.

custom_endpoint_address
str of None

Het aangepaste eindpuntadres dat moet worden gebruikt voor het tot stand brengen van een verbinding met de Event Hubs-service, zodat netwerkaanvragen kunnen worden gerouteerd via toepassingsgateways of andere paden die nodig zijn voor de hostomgeving. De standaardwaarde is Geen. De indeling ziet er als volgt uit: 'sb://< custom_endpoint_hostname>:<custom_endpoint_port>'. Als poort niet is opgegeven in de custom_endpoint_address, wordt standaard poort 443 gebruikt.

connection_verify
str of None

Pad naar het aangepaste CA_BUNDLE-bestand van het SSL-certificaat dat wordt gebruikt om de identiteit van het verbindingseindpunt te verifiëren. De standaardwaarde is Geen, in welk geval certifi.where() wordt gebruikt.

uamqp_transport
bool

Of u de uamqp-bibliotheek als het onderliggende transport wilt gebruiken. De standaardwaarde is False en de Pure Python AMQP-bibliotheek wordt gebruikt als het onderliggende transport.

Retourtype

Voorbeelden

Maak een nieuw exemplaar van de EventHubConsumerClient vanuit verbindingsreeks.


   import os
   from azure.eventhub import EventHubConsumerClient
   event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
   eventhub_name = os.environ['EVENT_HUB_NAME']
   consumer = EventHubConsumerClient.from_connection_string(
       conn_str=event_hub_connection_str,
       consumer_group='$Default',
       eventhub_name=eventhub_name  # EventHub name should be specified if it doesn't show up in connection string.
   )

get_eventhub_properties

Eigenschappen van de Event Hub ophalen.

Sleutels in de geretourneerde woordenlijst zijn onder andere:

  • eventhub_name (str)

  • created_at (UTC datetime.datetime)

  • partition_ids (list[str])

get_eventhub_properties() -> Dict[str, Any]

Retouren

Een woordenlijst met informatie over de Event Hub.

Retourtype

Uitzonderingen

get_partition_ids

Partitie-id's van de Event Hub ophalen.

get_partition_ids() -> List[str]

Retouren

Een lijst met partitie-id's.

Retourtype

Uitzonderingen

get_partition_properties

Eigenschappen van de opgegeven partitie ophalen.

Sleutels in de eigenschappenwoordenlijst zijn onder andere:

  • eventhub_name (str)

  • id (str)

  • beginning_sequence_number (int)

  • last_enqueued_sequence_number (int)

  • last_enqueued_offset (str)

  • last_enqueued_time_utc (UTC datetime.datetime)

  • is_empty (bool)

get_partition_properties(partition_id: str) -> Dict[str, Any]

Parameters

partition_id
str
Vereist

De doelpartitie-id.

Retouren

Een woordenlijst met partitie-eigenschappen.

Retourtype

Uitzonderingen

receive

Gebeurtenissen ontvangen van partities, met optionele taakverdeling en controlepunten.

receive(on_event: Callable[[PartitionContext, EventData | None], None], **kwargs: Any) -> None

Parameters

on_event
callable[PartitionContext, EventData of None]
Vereist

De callback-functie voor het verwerken van een ontvangen gebeurtenis. De callback heeft twee parameters: partition_context die partitiecontext bevat en gebeurtenis die de ontvangen gebeurtenis is. De callback-functie moet als volgt worden gedefinieerd: on_event(partition_context, gebeurtenis). Raadpleeg PartitionContextvoor gedetailleerde informatie over de partitiecontext.

max_wait_time
float

Het maximale interval in seconden dat de gebeurtenisprocessor wacht voordat de callback wordt aangeroepen. Als er binnen dit interval geen gebeurtenissen worden ontvangen, wordt de on_event callback aangeroepen met Geen. Als deze waarde is ingesteld op Geen of 0 (de standaardinstelling), wordt de callback pas aangeroepen als er een gebeurtenis is ontvangen.

partition_id
str

Indien opgegeven, ontvangt de client alleen van deze partitie. Anders ontvangt de client van alle partities.

owner_level
int

De prioriteit voor een exclusieve consument. Er wordt een exclusieve consument gemaakt als owner_level is ingesteld. Een consument met een hogere owner_level heeft een hogere exclusieve prioriteit. Het eigenaarsniveau wordt ook wel de 'epoch-waarde' van de consument genoemd.

prefetch
int

Het aantal gebeurtenissen dat vooraf moet worden opgehaald uit de service voor verwerking. De standaardwaarde is 300.

track_last_enqueued_event_properties
bool

Hiermee wordt aangegeven of de consument informatie moet aanvragen over de laatste enqueued-gebeurtenis op de bijbehorende partitie en die informatie moet bijhouden wanneer gebeurtenissen worden ontvangen. Wanneer informatie over de laatste enqueued-gebeurtenis van de partities wordt bijgehouden, bevat elke gebeurtenis die van de Event Hubs-service wordt ontvangen metagegevens over de partitie. Dit resulteert in een klein extra verbruik van netwerkbandbreedte. Dit is over het algemeen een gunstige afweging wanneer wordt overwogen tegen het periodiek indienen van aanvragen voor partitie-eigenschappen met behulp van de Event Hub-client. Deze is standaard ingesteld op False .

starting_position
str, int, datetime of dict[str,any]

Begin met ontvangen van deze gebeurtenispositie als er geen controlepuntgegevens voor een partitie zijn. Controlepuntgegevens worden gebruikt, indien beschikbaar. Dit kan een dict zijn met partitie-id als de sleutel en positie als de waarde voor afzonderlijke partities, of één waarde voor alle partities. Het waardetype kan str, int of datetime.datetime zijn. Ook worden de waarden '-1' ondersteund voor het ontvangen vanaf het begin van de stream en '@latest' voor het ontvangen van alleen nieuwe gebeurtenissen. De standaardwaarde is '@latest'.

starting_position_inclusive
bool of dict[str,bool]

Bepaal of de opgegeven starting_position inclusief(>=) of niet () is.> Waar voor inclusief en Onwaar voor exclusief. Dit kan een dict zijn met partitie-id als de sleutel en bool als de waarde die aangeeft of de starting_position voor een specifieke partitie inclusief is of niet. Dit kan ook één boolwaarde zijn voor alle starting_position. De standaardwaarde is False.

on_error
callable[[PartitionContext, Exception]]

De callback-functie die wordt aangeroepen wanneer er een fout optreedt tijdens het ontvangen nadat nieuwe pogingen zijn uitgeput of tijdens het taakverdelingsproces. De callback heeft twee parameters: partition_context met partitiegegevens en fout als uitzondering. partition_context kan Geen zijn als de fout optreedt tijdens het taakverdelingsproces. De callback moet als volgt worden gedefinieerd: on_error(partition_context, fout). De on_error callback wordt ook aangeroepen als er een onverwerkte uitzondering wordt gegenereerd tijdens de on_event callback.

on_partition_initialize
callable[[PartitionContext]]

De callback-functie die wordt aangeroepen nadat een consument voor een bepaalde partitie de initialisatie heeft voltooid. Deze wordt ook aangeroepen wanneer een nieuwe interne partitieconsumer wordt gemaakt om het ontvangende proces over te nemen voor een gebruiker van een mislukte en gesloten interne partitie. De callback neemt één parameter: partition_context die de partitiegegevens bevat. De callback moet als volgt worden gedefinieerd: on_partition_initialize(partition_context).

on_partition_close
callable[[PartitionContext, CloseReason]]

De callback-functie die wordt aangeroepen nadat een consument voor een bepaalde partitie is gesloten. Het wordt ook aangeroepen wanneer er een fout optreedt tijdens het ontvangen nadat nieuwe pogingen zijn uitgeput. De callback heeft twee parameters: partition_context die partitiegegevens en reden voor de sluiting bevat. De callback moet als volgt worden gedefinieerd: on_partition_close(partition_context, reden). Raadpleeg CloseReason voor de verschillende afsluitende redenen.

Retourtype

Voorbeelden

Gebeurtenissen ontvangen van de EventHub.


       logger = logging.getLogger("azure.eventhub")

       def on_event(partition_context, event):
           # Put your code here.
           # If the operation is i/o intensive, multi-thread will have better performance.
           logger.info("Received event from partition: {}".format(partition_context.partition_id))

       with consumer:
           consumer.receive(on_event=on_event)

receive_batch

Gebeurtenissen ontvangen van partities, met optionele taakverdeling en controlepunten.

receive_batch(on_event_batch: Callable[[PartitionContext, List[EventData]], None], **kwargs: Any) -> None

Parameters

on_event_batch
callable[PartitionContext, list[EventData]]
Vereist

De callback-functie voor het verwerken van een batch met ontvangen gebeurtenissen. De callback heeft twee parameters: partition_context die partitiecontext bevat en event_batch, de ontvangen gebeurtenissen. De callback-functie moet als volgt worden gedefinieerd: on_event_batch(partition_context, event_batch). event_batch kan een lege lijst zijn als max_wait_time geen of geen 0 is en er geen gebeurtenis wordt ontvangen na max_wait_time. Raadpleeg PartitionContextvoor gedetailleerde informatie over de partitiecontext.

max_batch_size
int

Het maximum aantal gebeurtenissen in een batch dat is doorgegeven aan callback on_event_batch. Als het werkelijke aantal ontvangen gebeurtenissen groter is dan max_batch_size, worden de ontvangen gebeurtenissen onderverdeeld in batches en wordt de callback voor elke batch aangeroepen met maximaal max_batch_size gebeurtenissen.

max_wait_time
float

Het maximale interval in seconden dat de gebeurtenisprocessor wacht voordat de callback wordt aangeroepen. Als er binnen dit interval geen gebeurtenissen worden ontvangen, wordt de on_event_batch callback aangeroepen met een lege lijst.

partition_id
str

Indien opgegeven, ontvangt de client alleen van deze partitie. Anders ontvangt de client van alle partities.

owner_level
int

De prioriteit voor een exclusieve consument. Er wordt een exclusieve consument gemaakt als owner_level is ingesteld. Een consument met een hogere owner_level heeft een hogere exclusieve prioriteit. Het eigenaarsniveau wordt ook wel de 'epoch-waarde' van de consument genoemd.

prefetch
int

Het aantal gebeurtenissen dat vooraf moet worden opgehaald uit de service voor verwerking. De standaardwaarde is 300.

track_last_enqueued_event_properties
bool

Hiermee wordt aangegeven of de consument informatie moet aanvragen over de laatste enqueued-gebeurtenis op de bijbehorende partitie en die informatie moet bijhouden wanneer gebeurtenissen worden ontvangen. Wanneer informatie over de laatste enqueued-gebeurtenis van de partities wordt bijgehouden, bevat elke gebeurtenis die van de Event Hubs-service wordt ontvangen metagegevens over de partitie. Dit resulteert in een klein extra verbruik van netwerkbandbreedte. Dit is over het algemeen een gunstige afweging wanneer wordt overwogen tegen het periodiek indienen van aanvragen voor partitie-eigenschappen met behulp van de Event Hub-client. Deze is standaard ingesteld op False .

starting_position
str, int, datetime of dict[str,any]

Begin met ontvangen van deze gebeurtenispositie als er geen controlepuntgegevens voor een partitie zijn. Controlepuntgegevens worden gebruikt, indien beschikbaar. Dit kan een dict zijn met partitie-id als de sleutel en positie als de waarde voor afzonderlijke partities, of één waarde voor alle partities. Het waardetype kan str, int of datetime.datetime zijn. Ook worden de waarden '-1' ondersteund voor het ontvangen vanaf het begin van de stream en '@latest' voor het ontvangen van alleen nieuwe gebeurtenissen. De standaardwaarde is '@latest'.

starting_position_inclusive
bool of dict[str,bool]

Bepaal of de opgegeven starting_position inclusief(>=) of niet () is.> Waar voor inclusief en Onwaar voor exclusief. Dit kan een dict zijn met partitie-id als de sleutel en bool als de waarde die aangeeft of de starting_position voor een specifieke partitie inclusief is of niet. Dit kan ook één boolwaarde zijn voor alle starting_position. De standaardwaarde is False.

on_error
callable[[PartitionContext, Exception]]

De callback-functie die wordt aangeroepen wanneer er een fout optreedt tijdens het ontvangen nadat nieuwe pogingen zijn uitgeput of tijdens het taakverdelingsproces. De callback heeft twee parameters: partition_context met partitiegegevens en fout als uitzondering. partition_context kan Geen zijn als de fout optreedt tijdens het taakverdelingsproces. De callback moet als volgt worden gedefinieerd: on_error(partition_context, fout). De on_error callback wordt ook aangeroepen als er een onverwerkte uitzondering wordt gegenereerd tijdens de on_event callback.

on_partition_initialize
callable[[PartitionContext]]

De callback-functie die wordt aangeroepen nadat een consument voor een bepaalde partitie de initialisatie heeft voltooid. Deze wordt ook aangeroepen wanneer een nieuwe interne partitieconsumer wordt gemaakt om het ontvangende proces over te nemen voor een gebruiker van een mislukte en gesloten interne partitie. De callback neemt één parameter: partition_context die de partitiegegevens bevat. De callback moet als volgt worden gedefinieerd: on_partition_initialize(partition_context).

on_partition_close
callable[[PartitionContext, CloseReason]]

De callback-functie die wordt aangeroepen nadat een consument voor een bepaalde partitie is gesloten. Het wordt ook aangeroepen wanneer er een fout optreedt tijdens het ontvangen nadat nieuwe pogingen zijn uitgeput. De callback heeft twee parameters: partition_context die partitiegegevens en reden voor de sluiting bevat. De callback moet als volgt worden gedefinieerd: on_partition_close(partition_context, reden). Raadpleeg CloseReason voor de verschillende afsluitende redenen.

Retourtype

Voorbeelden

Gebeurtenissen in batches ontvangen van de EventHub.


       logger = logging.getLogger("azure.eventhub")

       def on_event_batch(partition_context, event_batch):
           # Put your code here.
           # If the operation is i/o intensive, multi-thread will have better performance.
           logger.info("Received events from partition: {}".format(partition_context.partition_id))

       with consumer:
           consumer.receive_batch(on_event_batch=on_event_batch)