你当前正在访问 Microsoft Azure Global Edition 技术文档网站。 如果需要访问由世纪互联运营的 Microsoft Azure 中国技术文档网站,请访问 https://docs.azure.cn

ServiceBusReceiver 类

ServiceBusReceiver 类定义了一个高级接口,用于从Azure 服务总线队列或主题订阅接收消息。

消息接收的两个主要通道是 receive () ,用于对消息发出单个请求, 以及接收方中的消息: 以持续的方式持续接收传入消息。

请使用 get_<queue/subscription>_receiver ~azure.servicebus.ServiceBusClient 方法创建 ServiceBusReceiver 实例。

继承
azure.servicebus._base_handler.BaseHandler
ServiceBusReceiver
azure.servicebus._common.receiver_mixins.ReceiverMixin
ServiceBusReceiver

构造函数

ServiceBusReceiver(fully_qualified_namespace: str, credential: TokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, queue_name: str | None = None, topic_name: str | None = None, subscription_name: str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any)

参数

fully_qualified_namespace
str
必需

服务总线命名空间的完全限定主机名。 命名空间格式为: .servicebus.windows.net

credential
TokenCredentialAzureSasCredentialAzureNamedKeyCredential
必需

用于身份验证的凭据对象,该对象实现用于获取令牌的特定接口。 它接受 azure 标识库生成的凭据对象,以及实现 *get_token (self、 scopes) 方法的对象,或者也可以提供 AzureSasCredential。

queue_name
str

客户端连接到的特定服务总线队列的路径。

topic_name
str

包含客户端连接到的订阅的特定服务总线主题的路径。

subscription_name
str

客户端连接到的指定主题下的特定服务总线订阅的路径。

max_wait_time
Optional[float]

接收消息之间的超时(以秒为单位),在此之后接收方将自动停止接收。 默认值为 None,表示无超时。

receive_mode
Union[ServiceBusReceiveMode, str]

从实体检索消息的模式。 这两个选项是PEEK_LOCK和RECEIVE_AND_DELETE。 使用 PEEK_LOCK 接收的消息必须在给定的锁定期内解决,然后才能从队列中删除消息。 使用 RECEIVE_AND_DELETE 接收的消息将立即从队列中删除,如果客户端无法处理消息,则无法随后放弃或重新接收消息。 默认模式为PEEK_LOCK。

logging_enable
bool

是否将网络跟踪日志输出到记录器。 默认值为 False。

transport_type
TransportType

将用于与服务总线服务通信的传输协议的类型。 默认值为 TransportType.Amqp

http_proxy
Dict

HTTP 代理设置。 这必须是具有以下键的字典: “proxy_hostname” (str 值) 和 “proxy_port” (int 值) 。 此外,还可能存在以下密钥:“username”、“password”。

user_agent
str

如果指定,则会将其添加到内置用户代理字符串的前面。

auto_lock_renewer
Optional[AutoLockRenewer]

可以提供 ~azure.servicebus.AutoLockRenewer,以便消息在收到时自动注册。 如果接收方是会话接收器,它将改为应用于会话。

prefetch_count
int

每次向服务发出请求时要缓存的最大消息数。 此设置仅用于高级性能优化。 增大此值将提高消息吞吐量性能,但如果处理速度不够快,则会增加消息在缓存时过期的可能性。 默认值为 0,这意味着将从服务接收消息,并一次处理一个消息。 如果prefetch_count为 0,ServiceBusReceiver.receive 将尝试缓存 max_message_count ((如果) 请求中向服务提供)。

client_identifier
str

用于唯一标识客户端实例的基于字符串的标识符。 服务总线会将其与一些错误消息相关联,以便更轻松地关联错误。 如果未指定,将生成唯一 ID。

socket_timeout
float

在超时之前,连接上的基础套接字在发送和接收数据时应等待的时间(以秒为单位)。对于 TransportType.Amqp,默认值为 0.2,对于 TransportType.AmqpOverWebsocket,默认值为 1。 如果由于写入超时而发生连接错误,则可能需要传入大于默认值的值。

变量

fully_qualified_namespace
str

服务总线命名空间的完全限定主机名。 命名空间格式为: .servicebus.windows.net

entity_path
str

客户端连接到的实体的路径。

方法

abandon_message

放弃消息。

此消息将返回到队列,并可供再次接收。

close
complete_message

完成消息。

这会从队列中删除消息。

dead_letter_message

将消息移动到死信队列。

死信队列是一个子队列,可用于存储无法正确处理的消息,或者需要进一步检查或处理的消息。 还可以将队列配置为将过期的消息发送到死信队列。

defer_message

延迟消息。

此消息将保留在队列中,但必须按其序列号专门请求才能接收。

next
peek_messages

浏览队列中当前挂起的消息。

不会从队列中删除速览消息,也不会将其锁定。 它们不能完成、延迟或死信。

receive_deferred_messages

接收以前已延迟的消息。

从分区实体接收延迟的消息时,提供的所有序列号必须是来自同一分区的消息。

receive_messages

一次接收一批消息。

如果要同时处理多条消息,或者以单个调用的形式执行即席接收,则此方法是最佳选择。

请注意,在单个批处理中检索的消息数取决于是否为接收方设置了 prefetch_count 。 如果未为接收方设置 prefetch_count ,则接收方将尝试缓存max_message_count ((如果向服务提供) 请求中的消息)。

此调用优先于满足指定批大小的快速返回,因此在收到至少一条消息并且传入消息中存在间隙时,无论指定的批大小如何,都将立即返回。

renew_message_lock

续订消息锁。

这将保持消息的锁定,以确保不会将其返回到队列中以重新处理。

若要完成 (或以其他方式解决消息) ,必须维护锁,并且不能已过期:无法续订过期的锁。

通过RECEIVE_AND_DELETE模式接收的消息未锁定,因此无法续订。 此操作也仅适用于非会话消息。

abandon_message

放弃消息。

此消息将返回到队列,并可供再次接收。

abandon_message(message: ServiceBusReceivedMessage) -> None

参数

message
ServiceBusReceivedMessage
必需

要放弃的接收消息。

返回类型

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

示例

放弃收到的消息。


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.abandon_message(message)

close

close() -> None

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

complete_message

完成消息。

这会从队列中删除消息。

complete_message(message: ServiceBusReceivedMessage) -> None

参数

message
ServiceBusReceivedMessage
必需

要完成的接收消息。

返回类型

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

示例

完成收到的消息。


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.complete_message(message)

dead_letter_message

将消息移动到死信队列。

死信队列是一个子队列,可用于存储无法正确处理的消息,或者需要进一步检查或处理的消息。 还可以将队列配置为将过期的消息发送到死信队列。

dead_letter_message(message: ServiceBusReceivedMessage, reason: str | None = None, error_description: str | None = None) -> None

参数

message
ServiceBusReceivedMessage
必需

接收的死信消息。

reason
Optional[str]
默认值: None

死信消息的原因。

error_description
Optional[str]
默认值: None

死信消息的详细错误说明。

返回类型

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

示例

收到的消息的死信。


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.dead_letter_message(message)

defer_message

延迟消息。

此消息将保留在队列中,但必须按其序列号专门请求才能接收。

defer_message(message: ServiceBusReceivedMessage) -> None

参数

message
ServiceBusReceivedMessage
必需

要延迟的接收消息。

返回类型

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

示例

延迟收到的消息。


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.defer_message(message)

next

next()

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

peek_messages

浏览队列中当前挂起的消息。

不会从队列中删除速览消息,也不会将其锁定。 它们不能完成、延迟或死信。

peek_messages(max_message_count: int = 1, *, sequence_number: int = 0, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

参数

max_message_count
int
默认值: 1

要尝试查看的最大消息数。 默认值为 1。

sequence_number
int

从中开始浏览消息的消息序列号。

timeout
Optional[float]

总操作超时时间(秒)包括所有重试次数。 如果指定,该值必须大于 0。 默认值为 None,表示无超时。

返回

~azure.servicebus.ServiceBusReceivedMessage 的列表。

返回类型

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

示例

查看队列中的挂起消息。


   with servicebus_receiver:
       messages = servicebus_receiver.peek_messages()
       for message in messages:
           print(str(message))

receive_deferred_messages

接收以前已延迟的消息。

从分区实体接收延迟的消息时,提供的所有序列号必须是来自同一分区的消息。

receive_deferred_messages(sequence_numbers: int | List[int], *, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

参数

sequence_numbers
Union[int,List[int]]
必需

已延迟的消息的序列号列表。

timeout
Optional[float]

总操作超时时间(秒)包括所有重试次数。 如果指定,该值必须大于 0。 默认值为 None,表示无超时。

返回

请求的 ~azure.servicebus.ServiceBusReceivedMessage 实例的列表。

返回类型

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

示例

从 ServiceBus 接收延迟的消息。


   with servicebus_receiver:
       deferred_sequenced_numbers = []
       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           deferred_sequenced_numbers.append(message.sequence_number)
           print(str(message))
           servicebus_receiver.defer_message(message)

       received_deferred_msg = servicebus_receiver.receive_deferred_messages(
           sequence_numbers=deferred_sequenced_numbers
       )

       for msg in received_deferred_msg:
           servicebus_receiver.complete_message(msg)

receive_messages

一次接收一批消息。

如果要同时处理多条消息,或者以单个调用的形式执行即席接收,则此方法是最佳选择。

请注意,在单个批处理中检索的消息数取决于是否为接收方设置了 prefetch_count 。 如果未为接收方设置 prefetch_count ,则接收方将尝试缓存max_message_count ((如果向服务提供) 请求中的消息)。

此调用优先于满足指定批大小的快速返回,因此在收到至少一条消息并且传入消息中存在间隙时,无论指定的批大小如何,都将立即返回。

receive_messages(max_message_count: int | None = 1, max_wait_time: float | None = None) -> List[ServiceBusReceivedMessage]

参数

max_message_count
Optional[int]
默认值: 1

批中的最大消息数。 返回的实际数量取决于prefetch_count和传入流速率。 设置为“无”将完全取决于预提取配置。默认值为 1。

max_wait_time
Optional[float]
默认值: None

等待第一条消息到达的最长时间(以秒为单位)。 如果没有消息到达,并且未指定超时,则此调用在连接关闭之前不会返回。 如果指定,则超时期限内没有消息到达,将返回空列表。

返回

收到的消息列表。 如果没有可用的消息,则此列表为空列表。

返回类型

例外

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

示例

从 ServiceBus 接收消息。


   with servicebus_receiver:
       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           print(str(message))
           servicebus_receiver.complete_message(message)

renew_message_lock

续订消息锁。

这将保持消息的锁定,以确保不会将其返回到队列中以重新处理。

若要完成 (或以其他方式解决消息) ,必须维护锁,并且不能已过期:无法续订过期的锁。

通过RECEIVE_AND_DELETE模式接收的消息未锁定,因此无法续订。 此操作也仅适用于非会话消息。

renew_message_lock(message: ServiceBusReceivedMessage, *, timeout: float | None = None, **kwargs: Any) -> datetime

参数

message
ServiceBusReceivedMessage
必需

要为其续订锁的消息。

timeout
Optional[float]

总操作超时时间(秒)包括所有重试次数。 如果指定,该值必须大于 0。 默认值为 None,表示无超时。

返回

锁定设置为过期的 utc 日期时间。

返回类型

例外

TypeError if the message is sessionful.
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.MessageLockLostError if message lock has already expired.

示例

对收到的消息续订锁。


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.renew_message_lock(message)

属性

client_identifier

获取与接收方实例关联的 ServiceBusReceiver client_identifier。

返回类型

str

session

获取与接收方链接的 ServiceBusSession 对象。 会话仅适用于已启用会话的实体,如果在非会话接收方上调用,它将返回 None。

返回类型

示例

从接收方获取会话


       with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
           session = receiver.session