EventHubConsumerClient Klas
De klasse EventHubConsumerClient definieert een interface op hoog niveau voor het ontvangen van gebeurtenissen van de Azure Event Hubs-service.
Het belangrijkste doel van EventHubConsumerClient is het ontvangen van gebeurtenissen van alle partities van een EventHub met taakverdeling en controlepunten.
Wanneer meerdere EventHubConsumerClient-exemplaren worden uitgevoerd op dezelfde Event Hub, consumentengroep en controlepuntlocatie, worden de partities gelijkmatig over hen verdeeld.
Als u taakverdeling en permanente controlepunten wilt inschakelen, moet checkpoint_store worden ingesteld bij het maken van de EventHubConsumerClient. Als er geen controlepuntarchief wordt opgegeven, wordt het controlepunt intern in het geheugen bewaard.
Een EventHubConsumerClient kan ook ontvangen van een specifieke partitie wanneer u de methode receive() of receive_batch() aanroept en de partition_id opgeeft. Taakverdeling werkt niet in de modus met één partitie. Maar gebruikers kunnen nog steeds controlepunten opslaan als de checkpoint_store is ingesteld.
- Overname
-
azure.eventhub._client_base.ClientBaseEventHubConsumerClient
Constructor
EventHubConsumerClient(fully_qualified_namespace: str, eventhub_name: str, consumer_group: str, credential: CredentialTypes, **kwargs: Any)
Parameters
- fully_qualified_namespace
- str
De volledig gekwalificeerde hostnaam voor de Event Hubs-naamruimte. De indeling van de naamruimte is : .servicebus.windows.net.
- eventhub_name
- str
Het pad van de specifieke Event Hub waarmee de client verbinding moet maken.
- credential
- TokenCredential of AzureSasCredential of AzureNamedKeyCredential
Het referentieobject dat wordt gebruikt voor verificatie, waarmee een bepaalde interface wordt geïmplementeerd voor het ophalen van tokens. Het accepteert , of referentieobjecten EventHubSharedKeyCredentialdie zijn gegenereerd door de azure-identity-bibliotheek en objecten die de methode *get_token(zelf, bereiken) implementeren.
- logging_enable
- bool
Of netwerktraceringslogboeken moeten worden uitgevoerd naar de logboekregistratie. De standaardwaarde is False.
- auth_timeout
- float
De tijd in seconden om te wachten tot een token is geautoriseerd door de service. De standaardwaarde is 60 seconden. Als dit is ingesteld op 0, wordt er geen time-out afgedwongen door de client.
- user_agent
- str
Indien opgegeven, wordt deze toegevoegd vóór de tekenreeks van de gebruikersagent.
- retry_total
- int
Het totale aantal pogingen om een mislukte bewerking opnieuw uit te voeren wanneer er een fout optreedt. De standaardwaarde is 3. De context van retry_total bij het ontvangen is speciaal: de ontvangstmethode wordt geïmplementeerd door een while-loop-aanroepende interne ontvangstmethode in elke iteratie. In het ontvangstscenariogeeft retry_total het aantal nieuwe pogingen na een fout op die is gegenereerd door de interne ontvangstmethode in de while-lus. Als nieuwe pogingen zijn uitgeput, wordt de on_error callback aangeroepen (indien opgegeven) met de foutinformatie. De mislukte interne partitieconsumer wordt gesloten (on_partition_close wordt aangeroepen indien opgegeven) en er wordt een nieuwe interne partitieconsumer gemaakt (on_partition_initialize wordt aangeroepen indien opgegeven) om de ontvangst te hervatten.
- retry_backoff_factor
- float
Een uitstelfactor die moet worden toegepast tussen pogingen na de tweede poging (de meeste fouten worden onmiddellijk opgelost door een tweede poging zonder vertraging). In de vaste modus wordt beleid voor opnieuw proberen altijd in de slaapstand gezet voor {backoff factor}. In de exponentiële modus wordt het beleid voor opnieuw proberen in de slaapstand gezet voor: {uitstelfactor} * (2 ** ({aantal totale nieuwe pogingen} - 1)) seconden. Als de backoff_factor 0,1 is, wordt de nieuwe poging in de slaapstand gezet voor [0.0s, 0.2s, 0.4s, ...] tussen nieuwe pogingen. De standaardwaarde is 0,8.
- retry_backoff_max
- float
De maximale verloftijd. De standaardwaarde is 120 seconden (2 minuten).
- retry_mode
- str
Het vertragingsgedrag tussen nieuwe pogingen. Ondersteunde waarden zijn 'vast' of 'exponentieel', waarbij de standaardwaarde 'exponentieel' is.
- idle_timeout
- float
Time-out, in seconden, waarna deze client de onderliggende verbinding sluit als er geen verdere activiteit is. Standaard is de waarde Geen, wat betekent dat de client niet wordt afgesloten vanwege inactiviteit, tenzij gestart door de service.
- transport_type
- TransportType
Het type transportprotocol dat wordt gebruikt voor de communicatie met de Event Hubs-service. De standaardwaarde is TransportType.Amqp . In dat geval wordt poort 5671 gebruikt. Als poort 5671 niet beschikbaar/geblokkeerd is in de netwerkomgeving, kan TransportType.AmqpOverWebsocket worden gebruikt in plaats daarvan, waarbij poort 443 wordt gebruikt voor communicatie.
HTTP-proxyinstellingen. Dit moet een woordenlijst zijn met de volgende sleutels: 'proxy_hostname' (str-waarde) en 'proxy_port' (int-waarde). Daarnaast kunnen de volgende sleutels ook aanwezig zijn: 'gebruikersnaam', 'wachtwoord'.
- checkpoint_store
- CheckpointStore of None
Een manager die de partitietaakverdeling en controlepuntgegevens opslaat bij het ontvangen van gebeurtenissen. Het controlepuntarchief wordt gebruikt in beide gevallen van ontvangst van alle partities of één partitie. In het laatste geval is taakverdeling niet van toepassing. Als er geen controlepuntarchief wordt opgegeven, wordt het controlepunt intern in het geheugen bewaard en ontvangt het EventHubConsumerClient-exemplaar gebeurtenissen zonder taakverdeling.
- load_balancing_interval
- float
Wanneer taakverdeling wordt gestart. Dit is het interval in seconden tussen twee taakverdelingsevaluaties. De standaardwaarde is 30 seconden.
- partition_ownership_expiration_interval
- float
Het eigendom van een partitie verloopt na dit aantal seconden. Bij elke taakverdelingsevaluatie wordt de verlooptijd van het eigendom automatisch verlengd. De standaardwaarde is 6 * load_balancing_interval, dat wil zeggen 180 seconden bij gebruik van de standaard load_balancing_interval van 30 seconden.
- load_balancing_strategy
- str of LoadBalancingStrategy
Wanneer taakverdeling wordt gestart, wordt deze strategie gebruikt om het eigendom van de partitie te claimen en te verdelen. Gebruik 'greedy' of LoadBalancingStrategy.GREEDY voor de greedy-strategie, die voor elke taakverdelingsevaluatie zoveel niet-geclaimde partities opneemt die nodig zijn om de taak te verdelen. Gebruik 'balanced' of LoadBalancingStrategy.BALANCED voor de gebalanceerde strategie, die voor elke taakverdelingsevaluatie slechts één partitie claimt die niet wordt geclaimd door andere EventHubConsumerClient. Als alle partities van een EventHub worden geclaimd door andere EventHubConsumerClient en deze client te weinig partities heeft geclaimd, steelt deze client één partitie van andere clients voor elke taakverdelingsevaluatie, ongeacht de taakverdelingsstrategie. Greedy-strategie wordt standaard gebruikt.
Het aangepaste eindpuntadres dat moet worden gebruikt voor het tot stand brengen van een verbinding met de Event Hubs-service, zodat netwerkaanvragen kunnen worden gerouteerd via toepassingsgateways of andere paden die nodig zijn voor de hostomgeving. De standaardwaarde is Geen. De indeling ziet er als volgt uit: 'sb://< custom_endpoint_hostname>:<custom_endpoint_port>'. Als poort niet is opgegeven in de custom_endpoint_address, wordt standaard poort 443 gebruikt.
Pad naar het aangepaste CA_BUNDLE-bestand van het SSL-certificaat dat wordt gebruikt om de identiteit van het verbindingseindpunt te verifiëren. De standaardwaarde is Geen, in welk geval certifi.where() wordt gebruikt.
- uamqp_transport
- bool
Of u de uamqp-bibliotheek als het onderliggende transport wilt gebruiken. De standaardwaarde is False en de Pure Python AMQP-bibliotheek wordt gebruikt als het onderliggende transport.
- socket_timeout
- float
De tijd in seconden dat de onderliggende socket op de verbinding moet wachten bij het verzenden en ontvangen van gegevens voordat er een time-out optreedt. De standaardwaarde is 0,2 voor TransportType.Amqp en 1 voor TransportType.AmqpOverWebsocket. Als er EventHubsConnectionError-fouten optreden vanwege een time-out van schrijfbewerkingen, moet mogelijk een grotere waarde dan de standaardwaarde worden doorgegeven. Dit is voor geavanceerde gebruiksscenario's en normaal gesproken moet de standaardwaarde voldoende zijn.
Voorbeelden
Maak een nieuw exemplaar van de EventHubConsumerClient.
import os
from azure.eventhub import EventHubConsumerClient, EventHubSharedKeyCredential
fully_qualified_namespace = os.environ['EVENT_HUB_HOSTNAME']
eventhub_name = os.environ['EVENT_HUB_NAME']
shared_access_policy = os.environ['EVENT_HUB_SAS_POLICY']
shared_access_key = os.environ['EVENT_HUB_SAS_KEY']
credential = EventHubSharedKeyCredential(shared_access_policy, shared_access_key)
consumer = EventHubConsumerClient(
fully_qualified_namespace=fully_qualified_namespace,
eventhub_name=eventhub_name,
consumer_group='$Default',
credential=credential)
Methoden
close |
Stop met het ophalen van gebeurtenissen uit de Event Hub en sluit de onderliggende AMQP-verbinding en koppelingen. |
from_connection_string |
Maak een EventHubConsumerClient op basis van een verbindingsreeks. |
get_eventhub_properties |
Eigenschappen van de Event Hub ophalen. Sleutels in de geretourneerde woordenlijst zijn onder andere:
|
get_partition_ids |
Partitie-id's van de Event Hub ophalen. |
get_partition_properties |
Eigenschappen van de opgegeven partitie ophalen. Sleutels in de eigenschappenwoordenlijst zijn onder andere:
|
receive |
Gebeurtenissen ontvangen van partities, met optionele taakverdeling en controlepunten. |
receive_batch |
Gebeurtenissen ontvangen van partities, met optionele taakverdeling en controlepunten. |
close
Stop met het ophalen van gebeurtenissen uit de Event Hub en sluit de onderliggende AMQP-verbinding en koppelingen.
close() -> None
Retourtype
Voorbeelden
Sluit de client af.
import os
import threading
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
from azure.eventhub import EventHubConsumerClient
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group="$Default",
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
logger = logging.getLogger("azure.eventhub")
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
# The 'receive' method is a blocking call, it can be executed in a thread for
# non-blocking behavior, and combined with the 'close' method.
worker = threading.Thread(
target=consumer.receive,
kwargs={
"on_event": on_event,
"starting_position": "-1", # "-1" is from the beginning of the partition.
}
)
worker.start()
time.sleep(10) # Keep receiving for 10s then close.
# Close down the consumer handler explicitly.
consumer.close()
from_connection_string
Maak een EventHubConsumerClient op basis van een verbindingsreeks.
from_connection_string(conn_str: str, consumer_group: str, **kwargs: Any) -> EventHubConsumerClient
Parameters
- eventhub_name
- str
Het pad van de specifieke Event Hub waarmee de client verbinding moet maken.
- logging_enable
- bool
Of netwerktraceringslogboeken moeten worden uitgevoerd naar de logboekregistratie. De standaardwaarde is False.
- auth_timeout
- float
De tijd in seconden om te wachten tot een token is geautoriseerd door de service. De standaardwaarde is 60 seconden. Als dit is ingesteld op 0, wordt er geen time-out afgedwongen door de client.
- user_agent
- str
Indien opgegeven, wordt deze toegevoegd vóór de tekenreeks van de gebruikersagent.
- retry_total
- int
Het totale aantal pogingen om een mislukte bewerking opnieuw uit te voeren wanneer er een fout optreedt. De standaardwaarde is 3. De context van retry_total bij het ontvangen is speciaal: de ontvangstmethode wordt geïmplementeerd door een while-loop-aanroepende interne ontvangstmethode in elke iteratie. In het ontvangstscenariogeeft retry_total het aantal nieuwe pogingen na een fout op die is gegenereerd door de interne ontvangstmethode in de while-lus. Als nieuwe pogingen zijn uitgeput, wordt de on_error callback aangeroepen (indien opgegeven) met de foutinformatie. De mislukte interne partitieconsumer wordt gesloten (on_partition_close wordt aangeroepen indien opgegeven) en er wordt een nieuwe interne partitieconsumer gemaakt (on_partition_initialize wordt aangeroepen indien opgegeven) om de ontvangst te hervatten.
- retry_backoff_factor
- float
Een uitstelfactor die moet worden toegepast tussen pogingen na de tweede poging (de meeste fouten worden onmiddellijk opgelost door een tweede poging zonder vertraging). In de vaste modus wordt beleid voor opnieuw proberen altijd in de slaapstand gezet voor {backoff factor}. In de exponentiële modus wordt het beleid voor opnieuw proberen in de slaapstand gezet voor: {uitstelfactor} * (2 ** ({aantal totale nieuwe pogingen} - 1)) seconden. Als de backoff_factor 0,1 is, wordt de nieuwe poging in de slaapstand gezet voor [0.0s, 0.2s, 0.4s, ...] tussen nieuwe pogingen. De standaardwaarde is 0,8.
- retry_backoff_max
- float
De maximale verloftijd. De standaardwaarde is 120 seconden (2 minuten).
- retry_mode
- str
Het vertragingsgedrag tussen nieuwe pogingen. Ondersteunde waarden zijn 'vast' of 'exponentieel', waarbij de standaardwaarde 'exponentieel' is.
- idle_timeout
- float
Time-out, in seconden, waarna deze client de onderliggende verbinding sluit als er geen furthur-activiteit is. Standaard is de waarde Geen, wat betekent dat de client niet wordt afgesloten vanwege inactiviteit, tenzij gestart door de service.
- transport_type
- TransportType
Het type transportprotocol dat wordt gebruikt voor de communicatie met de Event Hubs-service. De standaardwaarde is TransportType.Amqp . In dat geval wordt poort 5671 gebruikt. Als poort 5671 niet beschikbaar/geblokkeerd is in de netwerkomgeving, kan TransportType.AmqpOverWebsocket worden gebruikt in plaats daarvan, waarbij poort 443 wordt gebruikt voor communicatie.
- http_proxy
- dict
HTTP-proxyinstellingen. Dit moet een woordenlijst zijn met de volgende sleutels: 'proxy_hostname' (str-waarde) en 'proxy_port' (int-waarde). Daarnaast kunnen de volgende sleutels ook aanwezig zijn: 'gebruikersnaam', 'wachtwoord'.
- checkpoint_store
- CheckpointStore of None
Een manager die de partitietaakverdeling en controlepuntgegevens opslaat bij het ontvangen van gebeurtenissen. Het controlepuntarchief wordt gebruikt in beide gevallen van ontvangst van alle partities of één partitie. In het laatste geval is taakverdeling niet van toepassing. Als er geen controlepuntarchief wordt opgegeven, wordt het controlepunt intern in het geheugen bewaard en ontvangt het EventHubConsumerClient-exemplaar gebeurtenissen zonder taakverdeling.
- load_balancing_interval
- float
Wanneer taakverdeling wordt gestart. Dit is het interval in seconden tussen twee taakverdelingsevaluaties. De standaardwaarde is 10 seconden.
- partition_ownership_expiration_interval
- float
Het eigendom van een partitie verloopt na dit aantal seconden. Bij elke taakverdelingsevaluatie wordt de verlooptijd van het eigendom automatisch verlengd. De standaardwaarde is 6 * load_balancing_interval, dat wil zeggen 60 seconden bij gebruik van de standaard load_balancing_interval van 30 seconden.
- load_balancing_strategy
- str of LoadBalancingStrategy
Wanneer taakverdeling wordt gestart, wordt deze strategie gebruikt om het eigendom van de partitie te claimen en te verdelen. Gebruik 'greedy' of LoadBalancingStrategy.GREEDY voor de greedy-strategie, die voor elke taakverdelingsevaluatie zoveel niet-geclaimde partities opneemt die nodig zijn om de taak te verdelen. Gebruik 'balanced' of LoadBalancingStrategy.BALANCED voor de gebalanceerde strategie, die voor elke taakverdelingsevaluatie slechts één partitie claimt die niet wordt geclaimd door andere EventHubConsumerClient. Als alle partities van een EventHub worden geclaimd door andere EventHubConsumerClient en deze client te weinig partities heeft geclaimd, steelt deze client één partitie van andere clients voor elke taakverdelingsevaluatie, ongeacht de taakverdelingsstrategie. Greedy-strategie wordt standaard gebruikt.
Het aangepaste eindpuntadres dat moet worden gebruikt voor het tot stand brengen van een verbinding met de Event Hubs-service, zodat netwerkaanvragen kunnen worden gerouteerd via toepassingsgateways of andere paden die nodig zijn voor de hostomgeving. De standaardwaarde is Geen. De indeling ziet er als volgt uit: 'sb://< custom_endpoint_hostname>:<custom_endpoint_port>'. Als poort niet is opgegeven in de custom_endpoint_address, wordt standaard poort 443 gebruikt.
Pad naar het aangepaste CA_BUNDLE-bestand van het SSL-certificaat dat wordt gebruikt om de identiteit van het verbindingseindpunt te verifiëren. De standaardwaarde is Geen, in welk geval certifi.where() wordt gebruikt.
- uamqp_transport
- bool
Of u de uamqp-bibliotheek als het onderliggende transport wilt gebruiken. De standaardwaarde is False en de Pure Python AMQP-bibliotheek wordt gebruikt als het onderliggende transport.
Retourtype
Voorbeelden
Maak een nieuw exemplaar van de EventHubConsumerClient vanuit verbindingsreeks.
import os
from azure.eventhub import EventHubConsumerClient
event_hub_connection_str = os.environ['EVENT_HUB_CONN_STR']
eventhub_name = os.environ['EVENT_HUB_NAME']
consumer = EventHubConsumerClient.from_connection_string(
conn_str=event_hub_connection_str,
consumer_group='$Default',
eventhub_name=eventhub_name # EventHub name should be specified if it doesn't show up in connection string.
)
get_eventhub_properties
Eigenschappen van de Event Hub ophalen.
Sleutels in de geretourneerde woordenlijst zijn onder andere:
eventhub_name (str)
created_at (UTC datetime.datetime)
partition_ids (list[str])
get_eventhub_properties() -> Dict[str, Any]
Retouren
Een woordenlijst met informatie over de Event Hub.
Retourtype
Uitzonderingen
get_partition_ids
Partitie-id's van de Event Hub ophalen.
get_partition_ids() -> List[str]
Retouren
Een lijst met partitie-id's.
Retourtype
Uitzonderingen
get_partition_properties
Eigenschappen van de opgegeven partitie ophalen.
Sleutels in de eigenschappenwoordenlijst zijn onder andere:
eventhub_name (str)
id (str)
beginning_sequence_number (int)
last_enqueued_sequence_number (int)
last_enqueued_offset (str)
last_enqueued_time_utc (UTC datetime.datetime)
is_empty (bool)
get_partition_properties(partition_id: str) -> Dict[str, Any]
Parameters
Retouren
Een woordenlijst met partitie-eigenschappen.
Retourtype
Uitzonderingen
receive
Gebeurtenissen ontvangen van partities, met optionele taakverdeling en controlepunten.
receive(on_event: Callable[[PartitionContext, EventData | None], None], **kwargs: Any) -> None
Parameters
- on_event
- callable[PartitionContext, EventData of None]
De callback-functie voor het verwerken van een ontvangen gebeurtenis. De callback heeft twee parameters: partition_context die partitiecontext bevat en gebeurtenis die de ontvangen gebeurtenis is. De callback-functie moet als volgt worden gedefinieerd: on_event(partition_context, gebeurtenis). Raadpleeg PartitionContextvoor gedetailleerde informatie over de partitiecontext.
- max_wait_time
- float
Het maximale interval in seconden dat de gebeurtenisprocessor wacht voordat de callback wordt aangeroepen. Als er binnen dit interval geen gebeurtenissen worden ontvangen, wordt de on_event callback aangeroepen met Geen. Als deze waarde is ingesteld op Geen of 0 (de standaardinstelling), wordt de callback pas aangeroepen als er een gebeurtenis is ontvangen.
- partition_id
- str
Indien opgegeven, ontvangt de client alleen van deze partitie. Anders ontvangt de client van alle partities.
- owner_level
- int
De prioriteit voor een exclusieve consument. Er wordt een exclusieve consument gemaakt als owner_level is ingesteld. Een consument met een hogere owner_level heeft een hogere exclusieve prioriteit. Het eigenaarsniveau wordt ook wel de 'epoch-waarde' van de consument genoemd.
- prefetch
- int
Het aantal gebeurtenissen dat vooraf moet worden opgehaald uit de service voor verwerking. De standaardwaarde is 300.
- track_last_enqueued_event_properties
- bool
Hiermee wordt aangegeven of de consument informatie moet aanvragen over de laatste enqueued-gebeurtenis op de bijbehorende partitie en die informatie moet bijhouden wanneer gebeurtenissen worden ontvangen. Wanneer informatie over de laatste enqueued-gebeurtenis van de partities wordt bijgehouden, bevat elke gebeurtenis die van de Event Hubs-service wordt ontvangen metagegevens over de partitie. Dit resulteert in een klein extra verbruik van netwerkbandbreedte. Dit is over het algemeen een gunstige afweging wanneer wordt overwogen tegen het periodiek indienen van aanvragen voor partitie-eigenschappen met behulp van de Event Hub-client. Deze is standaard ingesteld op False .
Begin met ontvangen van deze gebeurtenispositie als er geen controlepuntgegevens voor een partitie zijn. Controlepuntgegevens worden gebruikt, indien beschikbaar. Dit kan een dict zijn met partitie-id als de sleutel en positie als de waarde voor afzonderlijke partities, of één waarde voor alle partities. Het waardetype kan str, int of datetime.datetime zijn. Ook worden de waarden '-1' ondersteund voor het ontvangen vanaf het begin van de stream en '@latest' voor het ontvangen van alleen nieuwe gebeurtenissen. De standaardwaarde is '@latest'.
Bepaal of de opgegeven starting_position inclusief(>=) of niet () is.> Waar voor inclusief en Onwaar voor exclusief. Dit kan een dict zijn met partitie-id als de sleutel en bool als de waarde die aangeeft of de starting_position voor een specifieke partitie inclusief is of niet. Dit kan ook één boolwaarde zijn voor alle starting_position. De standaardwaarde is False.
- on_error
- callable[[PartitionContext, Exception]]
De callback-functie die wordt aangeroepen wanneer er een fout optreedt tijdens het ontvangen nadat nieuwe pogingen zijn uitgeput of tijdens het taakverdelingsproces. De callback heeft twee parameters: partition_context met partitiegegevens en fout als uitzondering. partition_context kan Geen zijn als de fout optreedt tijdens het taakverdelingsproces. De callback moet als volgt worden gedefinieerd: on_error(partition_context, fout). De on_error callback wordt ook aangeroepen als er een onverwerkte uitzondering wordt gegenereerd tijdens de on_event callback.
- on_partition_initialize
- callable[[PartitionContext]]
De callback-functie die wordt aangeroepen nadat een consument voor een bepaalde partitie de initialisatie heeft voltooid. Deze wordt ook aangeroepen wanneer een nieuwe interne partitieconsumer wordt gemaakt om het ontvangende proces over te nemen voor een gebruiker van een mislukte en gesloten interne partitie. De callback neemt één parameter: partition_context die de partitiegegevens bevat. De callback moet als volgt worden gedefinieerd: on_partition_initialize(partition_context).
- on_partition_close
- callable[[PartitionContext, CloseReason]]
De callback-functie die wordt aangeroepen nadat een consument voor een bepaalde partitie is gesloten. Het wordt ook aangeroepen wanneer er een fout optreedt tijdens het ontvangen nadat nieuwe pogingen zijn uitgeput. De callback heeft twee parameters: partition_context die partitiegegevens en reden voor de sluiting bevat. De callback moet als volgt worden gedefinieerd: on_partition_close(partition_context, reden). Raadpleeg CloseReason voor de verschillende afsluitende redenen.
Retourtype
Voorbeelden
Gebeurtenissen ontvangen van de EventHub.
logger = logging.getLogger("azure.eventhub")
def on_event(partition_context, event):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received event from partition: {}".format(partition_context.partition_id))
with consumer:
consumer.receive(on_event=on_event)
receive_batch
Gebeurtenissen ontvangen van partities, met optionele taakverdeling en controlepunten.
receive_batch(on_event_batch: Callable[[PartitionContext, List[EventData]], None], **kwargs: Any) -> None
Parameters
- on_event_batch
- callable[PartitionContext, list[EventData]]
De callback-functie voor het verwerken van een batch met ontvangen gebeurtenissen. De callback heeft twee parameters: partition_context die partitiecontext bevat en event_batch, de ontvangen gebeurtenissen. De callback-functie moet als volgt worden gedefinieerd: on_event_batch(partition_context, event_batch). event_batch kan een lege lijst zijn als max_wait_time geen of geen 0 is en er geen gebeurtenis wordt ontvangen na max_wait_time. Raadpleeg PartitionContextvoor gedetailleerde informatie over de partitiecontext.
- max_batch_size
- int
Het maximum aantal gebeurtenissen in een batch dat is doorgegeven aan callback on_event_batch. Als het werkelijke aantal ontvangen gebeurtenissen groter is dan max_batch_size, worden de ontvangen gebeurtenissen onderverdeeld in batches en wordt de callback voor elke batch aangeroepen met maximaal max_batch_size gebeurtenissen.
- max_wait_time
- float
Het maximale interval in seconden dat de gebeurtenisprocessor wacht voordat de callback wordt aangeroepen. Als er binnen dit interval geen gebeurtenissen worden ontvangen, wordt de on_event_batch callback aangeroepen met een lege lijst.
- partition_id
- str
Indien opgegeven, ontvangt de client alleen van deze partitie. Anders ontvangt de client van alle partities.
- owner_level
- int
De prioriteit voor een exclusieve consument. Er wordt een exclusieve consument gemaakt als owner_level is ingesteld. Een consument met een hogere owner_level heeft een hogere exclusieve prioriteit. Het eigenaarsniveau wordt ook wel de 'epoch-waarde' van de consument genoemd.
- prefetch
- int
Het aantal gebeurtenissen dat vooraf moet worden opgehaald uit de service voor verwerking. De standaardwaarde is 300.
- track_last_enqueued_event_properties
- bool
Hiermee wordt aangegeven of de consument informatie moet aanvragen over de laatste enqueued-gebeurtenis op de bijbehorende partitie en die informatie moet bijhouden wanneer gebeurtenissen worden ontvangen. Wanneer informatie over de laatste enqueued-gebeurtenis van de partities wordt bijgehouden, bevat elke gebeurtenis die van de Event Hubs-service wordt ontvangen metagegevens over de partitie. Dit resulteert in een klein extra verbruik van netwerkbandbreedte. Dit is over het algemeen een gunstige afweging wanneer wordt overwogen tegen het periodiek indienen van aanvragen voor partitie-eigenschappen met behulp van de Event Hub-client. Deze is standaard ingesteld op False .
Begin met ontvangen van deze gebeurtenispositie als er geen controlepuntgegevens voor een partitie zijn. Controlepuntgegevens worden gebruikt, indien beschikbaar. Dit kan een dict zijn met partitie-id als de sleutel en positie als de waarde voor afzonderlijke partities, of één waarde voor alle partities. Het waardetype kan str, int of datetime.datetime zijn. Ook worden de waarden '-1' ondersteund voor het ontvangen vanaf het begin van de stream en '@latest' voor het ontvangen van alleen nieuwe gebeurtenissen. De standaardwaarde is '@latest'.
Bepaal of de opgegeven starting_position inclusief(>=) of niet () is.> Waar voor inclusief en Onwaar voor exclusief. Dit kan een dict zijn met partitie-id als de sleutel en bool als de waarde die aangeeft of de starting_position voor een specifieke partitie inclusief is of niet. Dit kan ook één boolwaarde zijn voor alle starting_position. De standaardwaarde is False.
- on_error
- callable[[PartitionContext, Exception]]
De callback-functie die wordt aangeroepen wanneer er een fout optreedt tijdens het ontvangen nadat nieuwe pogingen zijn uitgeput of tijdens het taakverdelingsproces. De callback heeft twee parameters: partition_context met partitiegegevens en fout als uitzondering. partition_context kan Geen zijn als de fout optreedt tijdens het taakverdelingsproces. De callback moet als volgt worden gedefinieerd: on_error(partition_context, fout). De on_error callback wordt ook aangeroepen als er een onverwerkte uitzondering wordt gegenereerd tijdens de on_event callback.
- on_partition_initialize
- callable[[PartitionContext]]
De callback-functie die wordt aangeroepen nadat een consument voor een bepaalde partitie de initialisatie heeft voltooid. Deze wordt ook aangeroepen wanneer een nieuwe interne partitieconsumer wordt gemaakt om het ontvangende proces over te nemen voor een gebruiker van een mislukte en gesloten interne partitie. De callback neemt één parameter: partition_context die de partitiegegevens bevat. De callback moet als volgt worden gedefinieerd: on_partition_initialize(partition_context).
- on_partition_close
- callable[[PartitionContext, CloseReason]]
De callback-functie die wordt aangeroepen nadat een consument voor een bepaalde partitie is gesloten. Het wordt ook aangeroepen wanneer er een fout optreedt tijdens het ontvangen nadat nieuwe pogingen zijn uitgeput. De callback heeft twee parameters: partition_context die partitiegegevens en reden voor de sluiting bevat. De callback moet als volgt worden gedefinieerd: on_partition_close(partition_context, reden). Raadpleeg CloseReason voor de verschillende afsluitende redenen.
Retourtype
Voorbeelden
Gebeurtenissen in batches ontvangen van de EventHub.
logger = logging.getLogger("azure.eventhub")
def on_event_batch(partition_context, event_batch):
# Put your code here.
# If the operation is i/o intensive, multi-thread will have better performance.
logger.info("Received events from partition: {}".format(partition_context.partition_id))
with consumer:
consumer.receive_batch(on_event_batch=on_event_batch)
Azure SDK for Python
Feedback
https://aka.ms/ContentUserFeedback.
Binnenkort beschikbaar: In de loop van 2024 zullen we GitHub-problemen geleidelijk uitfaseren als het feedbackmechanisme voor inhoud en deze vervangen door een nieuw feedbacksysteem. Zie voor meer informatie:Feedback verzenden en weergeven voor