Share via


ServiceBusReceiver 類別

ServiceBusReceiver 類別會定義從Azure 服務匯流排佇列或主題訂用帳戶接收訊息的高階介面。

訊息回條的兩個主要通道是 receive () 來提出訊息的單一要求,以及 接收者中的訊息: 以持續的方式持續接收傳入訊息。

請使用 get_<queue/subscription>_receiver ~azure.servicebus.ServiceBusClient 的 方法來建立 ServiceBusReceiver 實例。

繼承
azure.servicebus._base_handler.BaseHandler
ServiceBusReceiver
azure.servicebus._common.receiver_mixins.ReceiverMixin
ServiceBusReceiver

建構函式

ServiceBusReceiver(fully_qualified_namespace: str, credential: TokenCredential | AzureSasCredential | AzureNamedKeyCredential, *, queue_name: str | None = None, topic_name: str | None = None, subscription_name: str | None = None, receive_mode: ServiceBusReceiveMode | str = ServiceBusReceiveMode.PEEK_LOCK, max_wait_time: float | None = None, auto_lock_renewer: AutoLockRenewer | None = None, prefetch_count: int = 0, **kwargs: Any)

參數

fully_qualified_namespace
str
必要

服務匯流排命名空間的完整主機名稱。 命名空間格式為: .servicebus.windows.net

credential
TokenCredentialAzureSasCredentialAzureNamedKeyCredential
必要

用於驗證的認證物件,會實作用於取得權杖的特定介面。 它接受 azure-identity 程式庫所產生的認證物件,以及實作 *get_token (自我、 範圍) 方法的物件,或者也可以提供 AzureSasCredential。

queue_name
str

用戶端所連線的特定服務匯流排佇列路徑。

topic_name
str

包含用戶端所連線之訂用帳戶的特定服務匯流排主題路徑。

subscription_name
str

用戶端所連接之指定主題下的特定服務匯流排訂用帳戶路徑。

max_wait_time
Optional[float]

接收的訊息之間的逾時,之後接收者會自動停止接收。 預設值為 None,表示沒有逾時。

receive_mode
Union[ServiceBusReceiveMode, str]

從實體擷取訊息的模式。 這兩個選項PEEK_LOCK和RECEIVE_AND_DELETE。 使用 PEEK_LOCK 接收的訊息必須在指定的鎖定期間內解決,才能從佇列中移除訊息。 使用 RECEIVE_AND_DELETE 接收的訊息將會立即從佇列中移除,如果用戶端無法處理訊息,則無法後續放棄或重新接收。 預設模式為 PEEK_LOCK。

logging_enable
bool

是否要將網路追蹤記錄輸出至記錄器。 預設值為 False

transport_type
TransportType

將用於與服務匯流排服務通訊的傳輸通訊協定類型。 預設值為 TransportType.Amqp

http_proxy
Dict

HTTP Proxy 設定。 這必須是具有下列索引鍵的字典: 'proxy_hostname' ( str 值) 和 'proxy_port' (int 值) 。 此外,也可能會出現下列金鑰: 'username'、'password'

user_agent
str

如果指定,這會新增到內建使用者代理程式字串前面。

auto_lock_renewer
Optional[AutoLockRenewer]

您可以提供 ~azure.servicebus.AutoLockRenewer,讓訊息在回條時自動註冊。 如果接收者是會話接收者,則會改為套用至會話。

prefetch_count
int

每個對服務的要求要快取的訊息數目上限。 此設定僅適用于進階效能微調。 增加此值可改善訊息輸送量效能,但如果訊息處理速度不夠快,則會增加訊息在快取時到期的機會。 預設值為 0,這表示訊息會從服務接收,並一次處理一個。 如果prefetch_count為 0,ServiceBusReceiver.receive 會嘗試快取max_message_count (,如果服務的要求內) 。

client_identifier
str

用來唯一識別用戶端實例的字串型識別碼。 服務匯流排會將它與某些錯誤訊息產生關聯,以便更輕鬆地相互關聯錯誤。 如果未指定,則會產生唯一識別碼。

socket_timeout
float

連接上基礎通訊端在傳送和接收資料之前應該等候的時間,以秒為單位。TransportType.Amqp 的預設值為 0.2,而 TransportType.AmqpOverWebsocket 的預設值為 1。 如果因為寫入逾時而發生連線錯誤,可能需要傳入大於預設值。

變數

fully_qualified_namespace
str

服務匯流排命名空間的完整主機名稱。 命名空間格式為: .servicebus.windows.net

entity_path
str

用戶端所連線之實體的路徑。

方法

abandon_message

放棄訊息。

此訊息會傳回至佇列,並可供再次接收。

close
complete_message

完成訊息。

這會從佇列中移除訊息。

dead_letter_message

將訊息移至寄不出的信件佇列。

寄不出的信件佇列是一個子佇列,可用來儲存無法正確處理的訊息,否則需要進一步檢查或處理。 佇列也可以設定為將過期的訊息傳送至寄不出的信件佇列。

defer_message

延遲訊息。

此訊息會保留在佇列中,但必須透過其序號特別要求,才能接收。

next
peek_messages

流覽佇列中目前擱置中的訊息。

查看的訊息不會從佇列中移除,也不會遭到鎖定。 無法完成、延遲或寄不出的信件。

receive_deferred_messages

接收先前已延遲的訊息。

從分割實體接收延後訊息時,所有提供的序號都必須是來自相同資料分割的訊息。

receive_messages

一次接收一批訊息。

如果您想要同時處理多個訊息,或以單一呼叫的形式執行臨機操作接收,此方法是最佳的方法。

請注意,在單一批次中擷取的訊息數目將取決於是否已為接收者設定 prefetch_count 。 如果未為接收者設定 prefetch_count ,接收者會嘗試快取max_message_count (,如果在要求內提供) 訊息給服務。

不論指定的批次大小為何,此呼叫都會優先傳回,因此在收到至少一則訊息時,就會立即傳回,而傳入訊息會有間距。不論指定的批次大小為何。

renew_message_lock

更新訊息鎖定。

這會維護訊息的鎖定,以確保不會傳回要重新處理的佇列。

若要完成 (或解決訊息) ,必須維護鎖定,而且不能已經過期;無法更新過期的鎖定。

透過 RECEIVE_AND_DELETE 模式收到的訊息不會鎖定,因此無法更新。 這項作業也適用于非會話訊息。

abandon_message

放棄訊息。

此訊息會傳回至佇列,並可供再次接收。

abandon_message(message: ServiceBusReceivedMessage) -> None

參數

message
ServiceBusReceivedMessage
必要

要放棄的已接收訊息。

傳回類型

例外狀況

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

範例

放棄收到的訊息。


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.abandon_message(message)

close

close() -> None

例外狀況

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

complete_message

完成訊息。

這會從佇列中移除訊息。

complete_message(message: ServiceBusReceivedMessage) -> None

參數

message
ServiceBusReceivedMessage
必要

要完成的已接收訊息。

傳回類型

例外狀況

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

範例

完成收到的訊息。


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.complete_message(message)

dead_letter_message

將訊息移至寄不出的信件佇列。

寄不出的信件佇列是一個子佇列,可用來儲存無法正確處理的訊息,否則需要進一步檢查或處理。 佇列也可以設定為將過期的訊息傳送至寄不出的信件佇列。

dead_letter_message(message: ServiceBusReceivedMessage, reason: str | None = None, error_description: str | None = None) -> None

參數

message
ServiceBusReceivedMessage
必要

要寄不出的訊息。

reason
Optional[str]
預設值: None

寄不出的信件訊息的原因。

error_description
Optional[str]
預設值: None

寄不出的信件訊息的詳細錯誤描述。

傳回類型

例外狀況

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

範例

已接收的訊息寄不出的信件。


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.dead_letter_message(message)

defer_message

延遲訊息。

此訊息會保留在佇列中,但必須透過其序號特別要求,才能接收。

defer_message(message: ServiceBusReceivedMessage) -> None

參數

message
ServiceBusReceivedMessage
必要

要延遲的接收訊息。

傳回類型

例外狀況

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

範例

延遲收到的訊息。


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.defer_message(message)

next

next()

例外狀況

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

peek_messages

流覽佇列中目前擱置中的訊息。

查看的訊息不會從佇列中移除,也不會遭到鎖定。 無法完成、延遲或寄不出的信件。

peek_messages(max_message_count: int = 1, *, sequence_number: int = 0, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

參數

max_message_count
int
預設值: 1

要嘗試和查看的訊息數目上限。 預設值為 1。

sequence_number
int

要從中開始流覽訊息的訊息序號。

timeout
Optional[float]

作業逾時總計,以秒為單位,包括所有重試。 如果指定,此值必須大於 0。 預設值為 None,表示沒有逾時。

傳回

~azure.servicebus.ServiceBusReceivedMessage 的清單。

傳回類型

例外狀況

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

範例

查看佇列中的擱置訊息。


   with servicebus_receiver:
       messages = servicebus_receiver.peek_messages()
       for message in messages:
           print(str(message))

receive_deferred_messages

接收先前已延遲的訊息。

從分割實體接收延後訊息時,所有提供的序號都必須是來自相同資料分割的訊息。

receive_deferred_messages(sequence_numbers: int | List[int], *, timeout: float | None = None, **kwargs: Any) -> List[ServiceBusReceivedMessage]

參數

sequence_numbers
Union[int,List[int]]
必要

已延遲之訊息的序號清單。

timeout
Optional[float]

作業逾時總計,以秒為單位,包括所有重試。 如果指定,此值必須大於 0。 預設值為 None,表示沒有逾時。

傳回

要求的 ~azure.servicebus.ServiceBusReceivedMessage 實例清單。

傳回類型

例外狀況

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

範例

從 ServiceBus 接收延遲的訊息。


   with servicebus_receiver:
       deferred_sequenced_numbers = []
       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           deferred_sequenced_numbers.append(message.sequence_number)
           print(str(message))
           servicebus_receiver.defer_message(message)

       received_deferred_msg = servicebus_receiver.receive_deferred_messages(
           sequence_numbers=deferred_sequenced_numbers
       )

       for msg in received_deferred_msg:
           servicebus_receiver.complete_message(msg)

receive_messages

一次接收一批訊息。

如果您想要同時處理多個訊息,或以單一呼叫的形式執行臨機操作接收,此方法是最佳的方法。

請注意,在單一批次中擷取的訊息數目將取決於是否已為接收者設定 prefetch_count 。 如果未為接收者設定 prefetch_count ,接收者會嘗試快取max_message_count (,如果在要求內提供) 訊息給服務。

不論指定的批次大小為何,此呼叫都會優先傳回,因此在收到至少一則訊息時,就會立即傳回,而傳入訊息會有間距。不論指定的批次大小為何。

receive_messages(max_message_count: int | None = 1, max_wait_time: float | None = None) -> List[ServiceBusReceivedMessage]

參數

max_message_count
Optional[int]
預設值: 1

批次中的訊息數目上限。 傳回的實際數目取決於prefetch_count和傳入資料流程速率。 將 設定為 [無] 完全取決於預先擷取設定。預設值為 1。

max_wait_time
Optional[float]
預設值: None

等待第一則訊息到達的時間上限,以秒為單位。 如果未傳送任何訊息,而且未指定逾時,此呼叫將不會傳回,直到關閉連線為止。 如果指定,則不會在逾時期間內送達任何訊息,則會傳回空白清單。

傳回

已接收的訊息清單。 如果沒有可用的訊息,這會是空的清單。

傳回類型

例外狀況

azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.SessionLockLostError if session lock has already expired.
azure.servicebus.exceptions.ServiceBusError when errors happen.

範例

從 ServiceBus 接收訊息。


   with servicebus_receiver:
       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           print(str(message))
           servicebus_receiver.complete_message(message)

renew_message_lock

更新訊息鎖定。

這會維護訊息的鎖定,以確保不會傳回要重新處理的佇列。

若要完成 (或解決訊息) ,必須維護鎖定,而且不能已經過期;無法更新過期的鎖定。

透過 RECEIVE_AND_DELETE 模式收到的訊息不會鎖定,因此無法更新。 這項作業也適用于非會話訊息。

renew_message_lock(message: ServiceBusReceivedMessage, *, timeout: float | None = None, **kwargs: Any) -> datetime

參數

message
ServiceBusReceivedMessage
必要

要更新鎖定的訊息。

timeout
Optional[float]

作業逾時總計,以秒為單位,包括所有重試。 如果指定,此值必須大於 0。 預設值為 None,表示沒有逾時。

傳回

鎖定設定為到期的 utc 日期時間。

傳回類型

例外狀況

TypeError if the message is sessionful.
azure.servicebus.exceptions.MessageAlreadySettled if the message has been settled.
azure.servicebus.exceptions.MessageLockLostError if message lock has already expired.

範例

更新所接收訊息的鎖定。


       messages = servicebus_receiver.receive_messages(max_wait_time=5)
       for message in messages:
           servicebus_receiver.renew_message_lock(message)

屬性

client_identifier

取得與接收者實例相關聯的 ServiceBusReceiver client_identifier。

傳回類型

str

session

取得與接收者連結的 ServiceBusSession 物件。 會話僅適用于啟用會話的實體,如果在非會話接收者上呼叫,則會傳回 None。

傳回類型

範例

從接收者取得會話


       with servicebus_client.get_queue_receiver(queue_name=queue_name, session_id=session_id) as receiver:
           session = receiver.session