ServiceBusReceiver Class
The ServiceBusReceiver class defines a high level interface for receiving messages from the Azure Service Bus Queue or Topic Subscription.
The two primary channels for message receipt are receive() to make a single request for messages, and async for message in receiver: to continuously receive incoming messages in an ongoing fashion.
Please use the get_<queue/subscription>_receiver method of ~azure.servicebus.aio.ServiceBusClient to create a
ServiceBusReceiver instance.
- Inheritance
-
ServiceBusReceiverazure.servicebus.aio._base_handler_async.BaseHandlerServiceBusReceiverazure.servicebus._common.receiver_mixins.ReceiverMixinServiceBusReceiver
Constructor
ServiceBusReceiver(fully_qualified_namespace: str, credential: Union[AsyncTokenCredential, azure.core.credentials.AzureSasCredential, azure.core.credentials.AzureNamedKeyCredential], **kwargs: Any)
Parameters
- fully_qualified_namespace
- str
The fully qualified host name for the Service Bus namespace. The namespace format is: .servicebus.windows.net.
- credential
- AsyncTokenCredential or AzureSasCredential or AzureNamedKeyCredential
The credential object used for authentication which implements a particular interface for getting tokens. It accepts credential objects generated by the azure-identity library and objects that implement the *get_token(self, scopes) method, or alternatively, an AzureSasCredential can be provided too.
- queue_name
- str
The path of specific Service Bus Queue the client connects to.
- topic_name
- str
The path of specific Service Bus Topic which contains the Subscription the client connects to.
- subscription_name
- str
The path of specific Service Bus Subscription under the specified Topic the client connects to.
- receive_mode
- <xref:Union>[ServiceBusReceiveMode, str]
The mode with which messages will be retrieved from the entity. The two options are PEEK_LOCK and RECEIVE_AND_DELETE. Messages received with PEEK_LOCK must be settled within a given lock period before they will be removed from the queue. Messages received with RECEIVE_AND_DELETE will be immediately removed from the queue, and cannot be subsequently abandoned or re-received if the client fails to process the message. The default mode is PEEK_LOCK.
- max_wait_time
- <xref:Optional>[float]
The timeout in seconds between received messages after which the receiver will automatically stop receiving. The default value is None, meaning no timeout.
- logging_enable
- bool
Whether to output network trace logs to the logger. Default is False.
- transport_type
- TransportType
The type of transport protocol that will be used for communicating with the Service Bus service. Default is TransportType.Amqp.
- http_proxy
- dict
HTTP proxy settings. This must be a dictionary with the following keys: 'proxy_hostname' (str value) and 'proxy_port' (int value). Additionally the following keys may also be present: 'username', 'password'.
- user_agent
- str
If specified, this will be added in front of the built-in user agent string.
- auto_lock_renewer
- <xref:Optional>[AutoLockRenewer]
An ~azure.servicebus.aio.AutoLockRenewer can be provided such that messages are automatically registered on receipt. If the receiver is a session receiver, it will apply to the session instead.
- prefetch_count
- int
The maximum number of messages to cache with each request to the service. This setting is only for advanced performance tuning. Increasing this value will improve message throughput performance but increase the chance that messages will expire while they are cached if they're not processed fast enough. The default value is 0, meaning messages will be received from the service and processed one at a time. In the case of prefetch_count being 0, ServiceBusReceiver.receive would try to cache max_message_count (if provided) within its request to the service.
Variables
- fully_qualified_namespace
- str
The fully qualified host name for the Service Bus namespace. The namespace format is: .servicebus.windows.net.
- entity_path
- str
The path of the entity that the client connects to.
Methods
| abandon_message |
Abandon the message. This message will be returned to the queue and made available to be received again. |
| close |
Close down the handler connection. If the handler has already closed, this operation will do nothing. An optional exception can be passed in to indicate that the handler was shutdown due to error. |
| complete_message |
Complete the message. This removes the message from the queue. |
| dead_letter_message |
Move the message to the Dead Letter queue. The Dead Letter queue is a sub-queue that can be used to store messages that failed to process correctly, or otherwise require further inspection or processing. The queue can also be configured to send expired messages to the Dead Letter queue. |
| defer_message |
Defers the message. This message will remain in the queue but must be requested specifically by its sequence number in order to be received. |
| peek_messages |
Browse messages currently pending in the queue. Peeked messages are not removed from queue, nor are they locked. They cannot be completed, deferred or dead-lettered. |
| receive_deferred_messages |
Receive messages that have previously been deferred. When receiving deferred messages from a partitioned entity, all of the supplied sequence numbers must be messages from the same partition. |
| receive_messages |
Receive a batch of messages at once. This approach is optimal if you wish to process multiple messages simultaneously, or perform an ad-hoc receive as a single call. Note that the number of messages retrieved in a single batch will be dependent on whether prefetch_count was set for the receiver. If prefetch_count is not set for the receiver, the receiver would try to cache max_message_count (if provided) messages within the request to the service. This call will prioritize returning quickly over meeting a specified batch size, and so will return as soon as at least one message is received and there is a gap in incoming messages regardless of the specified batch size. |
| renew_message_lock |
Renew the message lock. This will maintain the lock on the message to ensure it is not returned to the queue to be reprocessed. In order to complete (or otherwise settle) the message, the lock must be maintained, and cannot already have expired; an expired lock cannot be renewed. Messages received via RECEIVE_AND_DELETE mode are not locked, and therefore cannot be renewed. This operation is only available for non-sessionful messages as well. |
abandon_message
Abandon the message.
This message will be returned to the queue and made available to be received again.
async abandon_message(message)
Parameters
Return type
Exceptions
Examples
Abandon a received message.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.abandon_message(message)
close
Close down the handler connection.
If the handler has already closed, this operation will do nothing. An optional exception can be passed in to indicate that the handler was shutdown due to error.
async close() -> None
Return type
Exceptions
complete_message
Complete the message.
This removes the message from the queue.
async complete_message(message)
Parameters
Return type
Exceptions
Examples
Complete a received message.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.complete_message(message)
dead_letter_message
Move the message to the Dead Letter queue.
The Dead Letter queue is a sub-queue that can be used to store messages that failed to process correctly, or otherwise require further inspection or processing. The queue can also be configured to send expired messages to the Dead Letter queue.
async dead_letter_message(message, reason=None, error_description=None)
Parameters
- error_description
- <xref:Optional>[str]
The detailed error description for dead-lettering the message.
Return type
Exceptions
Examples
Dead letter a received message.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.dead_letter_message(message)
defer_message
Defers the message.
This message will remain in the queue but must be requested specifically by its sequence number in order to be received.
async defer_message(message)
Parameters
Return type
Exceptions
Examples
Defer a received message.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.defer_message(message)
peek_messages
Browse messages currently pending in the queue.
Peeked messages are not removed from queue, nor are they locked. They cannot be completed, deferred or dead-lettered.
async peek_messages(max_message_count: int = 1, **kwargs: Any) -> List[azure.servicebus._common.message.ServiceBusReceivedMessage]
Parameters
- max_message_count
- int
The maximum number of messages to try and peek. The default value is 1.
- sequence_number
- int
A message sequence number from which to start browsing messages.
- timeout
- <xref:Optional>[float]
The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout.
Return type
Exceptions
Examples
Peek messages in the queue.
async with servicebus_receiver:
messages = await servicebus_receiver.peek_messages()
for message in messages:
print(str(message))
receive_deferred_messages
Receive messages that have previously been deferred.
When receiving deferred messages from a partitioned entity, all of the supplied sequence numbers must be messages from the same partition.
async receive_deferred_messages(sequence_numbers: Union[int, List[int]], **kwargs: Any) -> List[azure.servicebus._common.message.ServiceBusReceivedMessage]
Parameters
- list[int]] sequence_numbers
- <xref:Union>[<xref:int,>
A list of the sequence numbers of messages that have been deferred.
- timeout
- <xref:Optional>[float]
The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout.
Return type
Exceptions
Examples
Receive deferred messages from ServiceBus.
async with servicebus_receiver:
deferred_sequenced_numbers = []
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
deferred_sequenced_numbers.append(message.sequence_number)
print(str(message))
await servicebus_receiver.defer_message(message)
received_deferred_msg = await servicebus_receiver.receive_deferred_messages(
sequence_numbers=deferred_sequenced_numbers
)
for message in received_deferred_msg:
await servicebus_receiver.complete_message(message)
receive_messages
Receive a batch of messages at once.
This approach is optimal if you wish to process multiple messages simultaneously, or perform an ad-hoc receive as a single call.
Note that the number of messages retrieved in a single batch will be dependent on whether prefetch_count was set for the receiver. If prefetch_count is not set for the receiver, the receiver would try to cache max_message_count (if provided) messages within the request to the service.
This call will prioritize returning quickly over meeting a specified batch size, and so will return as soon as at least one message is received and there is a gap in incoming messages regardless of the specified batch size.
async receive_messages(max_message_count: Optional[int] = 1, max_wait_time: Optional[float] = None) -> List[azure.servicebus._common.message.ServiceBusReceivedMessage]
Parameters
- max_message_count
- <xref:Optional>[int]
Maximum number of messages in the batch. Actual number returned will depend on prefetch_count size and incoming stream rate. Setting to None will fully depend on the prefetch config. The default value is 1.
- max_wait_time
- <xref:Optional>[float]
Maximum time to wait in seconds for the first message to arrive. If no messages arrive, and no timeout is specified, this call will not return until the connection is closed. If specified, and no messages arrive within the timeout period, an empty list will be returned.
Return type
Exceptions
Examples
Receive messages from ServiceBus.
async with servicebus_receiver:
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
print(str(message))
await servicebus_receiver.complete_message(message)
renew_message_lock
Renew the message lock.
This will maintain the lock on the message to ensure it is not returned to the queue to be reprocessed.
In order to complete (or otherwise settle) the message, the lock must be maintained, and cannot already have expired; an expired lock cannot be renewed.
Messages received via RECEIVE_AND_DELETE mode are not locked, and therefore cannot be renewed. This operation is only available for non-sessionful messages as well.
async renew_message_lock(message: azure.servicebus._common.message.ServiceBusReceivedMessage, **kwargs: Any) -> datetime.datetime
Parameters
- timeout
- <xref:Optional>[float]
The total operation timeout in seconds including all the retries. The value must be greater than 0 if specified. The default value is None, meaning no timeout.
Returns
The utc datetime the lock is set to expire at.
Return type
Exceptions
Examples
Renew the lock on a received message.
messages = await servicebus_receiver.receive_messages(max_wait_time=5)
for message in messages:
await servicebus_receiver.renew_message_lock(message)
Attributes
session
Get the ServiceBusSession object linked with the receiver. Session is only available to session-enabled entities, it would return None if called on a non-sessionful receiver.
Return type
Examples
Get session from a receiver
async with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
session = receiver.session
Saran dan Komentar
Kirim dan lihat umpan balik untuk