適用於 Python 的服務匯流排程式庫Service Bus libraries for Python

Microsoft Azure 服務匯流排支援一組以雲端為基礎、訊息導向的中介軟體技術,包括可靠的訊息佇列和持久的發佈/訂閱訊息。Microsoft Azure Service Bus supports a set of cloud-based, message-oriented middleware technologies including reliable message queuing and durable publish/subscribe messaging.

v0.50.0 的新功能為何?What's new in v0.50.0?

從 0.50.0 版起,新的 AMQP 型 API 可用於傳送及接收訊息。As of version 0.50.0 a new AMQP-based API is available for sending and receiving messages. 此更新涉及重大變更This update involves breaking changes.

請閱讀從 v0.21.1 遷移至 v0.50.0 以判斷您在這個階段是否適合升級。Please read Migration from v0.21.1 to v0.50.0 to determine if upgrading is right for you at this time.

新的 AMQP 型 API 今後會提供改善的訊息傳遞可靠性、效能及擴充功能。The new AMQP-based API offers improved message passing reliability, performance and expanded feature support going forward. 新的 API 也支援非同步作業 (根據 asyncio) 來傳送、接收和處理訊息。The new API also offers support for asynchronous operations (based on asyncio) for sending, receiving and handling messages.

如需舊版 HTTP 型作業的文件,請參閱使用舊版 API 的 HTTP 型作業For documentation on the legacy HTTP-based operations please see Using HTTP-based operations of the legacy API.

必要條件Prerequisites

安裝Installation

pip install azure-servicebus

連線到 Azure 服務匯流排Connect to Azure Service Bus

取得認證Get credentials

使用以下 Azure CLI 程式碼片段,以服務匯流排連接字串填入環境變數 (您也可以在 Azure 入口網站中找到此值)。Use the Azure CLI snippet below to populate an environment variable with the Service Bus connection string (you can also find this value in the Azure portal). 此程式碼片段會針對 Bash Shell 加以格式化。The snippet is formatted for the Bash shell.

RES_GROUP=<resource-group-name>
NAMESPACE=<servicebus-namespace>

export SB_CONN_STR=$(az servicebus namespace authorization-rule keys list \
 --resource-group $RES_GROUP \
 --namespace-name $NAMESPACE \
 --name RootManageSharedAccessKey \
 --query primaryConnectionString \
 --output tsv)

建立用戶端Create client

填入 SB_CONN_STR 環境變數後,即可建立 ServiceBusClient。Once you've populated the SB_CONN_STR environment variable, you can create the ServiceBusClient.

import os
from azure.servicebus import ServiceBusClient

connection_str = os.environ['SB_CONN_STR']

sb_client = ServiceBusClient.from_connection_string(connection_str)

如果想要使用非同步作業,請使用 azure.servicebus.aio 命名空間。If you wish to use asynchronous operations, please use the azure.servicebus.aio namespace.

import os
from azure.servicebus.aio import ServiceBusClient

connection_str = os.environ['SB_CONN_STR']

sb_client = ServiceBusClient.from_connection_string(connection_str)

服務匯流排佇列Service Bus queues

服務匯流排佇列是儲存體佇列的替代方案,其可能適用於使用推送型傳遞 (使用長期輪詢) 而需要更進階傳訊功能的案例 (訊息大小較大、訊息順序、單一作業破壞性讀取、排程的傳遞)。Service Bus queues are an alternative to Storage queues that might be useful in scenarios where more advanced messaging features are needed (larger message sizes, message ordering, single-operation destructive reads, scheduled delivery) using push-style delivery (using long polling).

建立佇列Create queue

這會在服務匯流排命名空間內建立新的佇列。This creates a new queue within the Service Bus namespace. 如果命名空間內已經存在同名的佇列,則會引發錯誤。If a queue of the same name already exists within the namespace an error will be raised.

sb_client.create_queue("MyQueue")

您也可指定用以設定佇列行為的選擇性參數。Optional parameters to configure the queue behavior can also be specified.

sb_client.create_queue(
    "MySessionQueue",
    requires_session=True  # Create a sessionful queue
    max_delivery_count=5  # Max delivery attempts per message
)

取得佇列用戶端Get a queue client

QueueClient 可用來傳送和接收佇列中的訊息,以及其他作業。A QueueClient can be used to send and receive messages from the queue, along with other operations.

queue_client = sb_client.get_queue("MyQueue")

傳送訊息Sending messages

佇列用戶端可以一次傳送一個或多則訊息:The queue client can send one or more messages at a time:

from azure.servicebus import Message

message = Message("Hello World")
queue_client.send(message)

message_one = Message("First")
message_two = Message("Second")
queue_client.send([message_one, message_two])

每次呼叫 QueueClient.send 都會建立新的服務連線。Each call to QueueClient.send will create a new service connection. 若要對多次傳送呼叫重複使用相同的連線,您可以開啟傳送端:To reuse the same connection for multiple send calls, you can open a sender:

message_one = Message("First")
message_two = Message("Second")

with queue_client.get_sender() as sender:
    sender.send(message_one)
    sender.send(message_two)

如果您使用非同步用戶端,則上述作業會使用非同步語法:If you are using an asynchronous client, the above operations will use async syntax:

from azure.servicebus.aio import Message

message = Message("Hello World")
await queue_client.send(message)

message_one = Message("First")
message_two = Message("Second")
async with queue_client.get_sender() as sender:
    await sender.send(message_one)
    await sender.send(message_two)

接收訊息Receiving messages

您可從作為連續迭代器的佇列接收訊息。Messages can be received from a queue as a continuous iterator. 訊息接收的預設模式是 PeekLock,其要求明確完成每則訊息,以便將它從佇列中移除。The default mode for message receiving is PeekLock, which requires each message to be explicitly completed in order that it be removed from the queue.

messages = queue_client.get_receiver()
for message in messages:
    print(message)
    message.complete()

服務連線會針對整個迭代器維持開啟狀態。The service connection will remain open for the entirety of the iterator. 如果您發現自己僅部分逐一查看訊息資料流,則應該在 with 陳述式中執行接收端,確保連線已關閉:If you find yourself only partially iterating the message stream, you should run the receiver in a with statement to ensure the connection is closed:

with queue_client.get_receiver() as messages:
    for message in messages:
        print(message)
        message.complete()
        break

如果您使用非同步用戶端,則上述作業會使用非同步語法:If you are using an asynchronous client, the above operations will use async syntax:

async with queue_client.get_receiver() as messages:
    async for message in messages:
        print(message)
        await message.complete()
        break

服務匯流排主題和訂用帳戶Service Bus topics and subscriptions

服務匯流排主題和訂用帳戶是服務匯流排佇列的抽象概念,其使用發佈/訂閱模式提供一對多的通訊形式。Service Bus topics and subscriptions are an abstraction on top of Service Bus queues that provide a one-to-many form of communication, in a publish/subscribe pattern. 訊息會傳送至主題並傳遞給一或多個相關聯的訂用帳戶,這適合於調整為大量收件者。Messages are sent to a topic and delivered to one or more associated subscriptions, which is useful for scaling to large numbers of recipients.

建立主題Create topic

這會在服務匯流排命名空間內建立新的主題。This creates a new topic within the Service Bus namespace. 如果已經存在同名的主題,則會引發錯誤。If a topic of the same name already exists an error will be raised.

sb_client.create_topic("MyTopic")

取得主題用戶端Get a topic client

TopicClient 可用來將訊息傳送到主題,以及其他作業。A TopicClient can be used to send messages to the topic, along with other operations.

topic_client = sb_client.get_topic("MyTopic")

建立訂用帳戶Create subscription

這會在服務匯流排命名空間內,針對指定的主題建立新訂用帳戶。This creates a new subscription for the specified topic within the Service Bus namespace.

sb_client.create_subscription("MyTopic", "MySubscription")

取得訂用帳戶用戶端Get a subscription client

SubscriptionClient 可用來接收來自主題的訊息,以及其他作業。A SubscriptionClient can be used to receive messages from the topic, along with other operations.

topic_client = sb_client.get_subscription("MyTopic", "MySubscription")

從 v0.21.1 遷移至 v0.50.0Migration from v0.21.1 to v0.50.0

0.50.0 版引進了重大變更。Major breaking changes were introduced in version 0.50.0. 原始 HTTP 型 API 仍適用於 v0.50.0,不過現在存在於新的命名空間之下:azure.servicebus.control_clientThe original HTTP-based API is still available in v0.50.0 - however it now exists under a new namesapce: azure.servicebus.control_client.

我應該升級嗎?Should I upgrade?

新套件 (v0.50.0) 不會提供任何超過 v0.21.1 的 HTTP 型作業改善。The new package (v0.50.0) offers no improvements in HTTP-based operations over v0.21.1. 除了現在存在於新的命名空間之下以外,HTTP 型 API 完全相同。The HTTP-based API is identical except that it now exists under a new namespace. 基於這個理由,如果您只想要使用 HTTP 型作業 (create_queuedelete_queue 等等),在此時升級不會有任何額外的好處。For this reason if you only wish to use HTTP-based operations (create_queue, delete_queue etc) - there will be no additional benefit in upgrading at this time.

如何將我的程式碼遷移至新版本?How do I migrate my code to the new version?

只要變更匯入命名空間,即可將針對 v0.21.0 撰寫的程式碼移植到 0.50.0 版:Code written against v0.21.0 can be ported to version 0.50.0 by simply changing the import namespace:

# from azure.servicebus import ServiceBusService  <- This will now raise an ImportError
from azure.servicebus.control_client import ServiceBusService

key_name = 'RootManageSharedAccessKey' # SharedAccessKeyName from Azure portal
key_value = '' # SharedAccessKey from Azure portal
sbs = ServiceBusService(service_namespace,
                        shared_access_key_name=key_name,
                        shared_access_key_value=key_value)

使用舊版 API 的 HTTP 型作業Using HTTP-based operations of the legacy API

下列文件描述舊版 API,且應適用於希望在不進行任何其他變更的情形下,將現有程式碼移植到 v0.50.0 的使用者。The following documentation describes the legacy API and should be used for those wishing to port existing code to v0.50.0 without making any additional changes. 使用 v0.21.1 的使用者也可以將此參考當作指導方針。This reference can also be used as guidance by those using v0.21.1. 如需撰寫新的程式碼,我們建議使用上述的新 API。For those writing new code, we recommend using the new API described above.

服務匯流排佇列Service Bus queues

共用存取簽章 (SAS) 驗證Shared Access Signature (SAS) authentication

若要使用共用存取簽章驗證,請使用下列程式碼建立服務匯流排服務:To use Shared Access Signature authentication, create the service bus service with:

from azure.servicebus.control_client import ServiceBusService

key_name = 'RootManageSharedAccessKey' # SharedAccessKeyName from Azure portal
key_value = '' # SharedAccessKey from Azure portal
sbs = ServiceBusService(service_namespace,
                        shared_access_key_name=key_name,
                        shared_access_key_value=key_value)

存取控制服務 (ACS) 驗證Access Control Service (ACS) authentication

新服務匯流排命名空間不支援 ACS。ACS is not supported on new Service Bus namespaces. 我們建議將應用程式遷移至 SAS 驗證We recommend migrating applications to SAS authentication. 若要在舊版服務匯流排命名空間內使用 ACS 驗證,請使用下列程式碼建立 ServiceBusService:To use ACS authentication within an older Service Bus namesapce, create the ServiceBusService with:

from azure.servicebus.control_client import ServiceBusService

account_key = '' # DEFAULT KEY from Azure portal
issuer = 'owner' # DEFAULT ISSUER from Azure portal
sbs = ServiceBusService(service_namespace,
                        account_key=account_key,
                        issuer=issuer)

傳送和接收訊息Sending and receiving messages

create_queue 方法可用來確保佇列存在:The create_queue method can be used to ensure a queue exists:

sbs.create_queue('taskqueue')

send_queue_message 方法之後可透過呼叫來對佇列插入訊息:The send_queue_message method can then be called to insert the message into the queue:

from azure.servicebus.control_client import Message

msg = Message('Hello World!')
sbs.send_queue_message('taskqueue', msg)

send_queue_message_batch 方法之後可透過呼叫來一次傳送數則訊息:The send_queue_message_batch method can then be called to send several messages at once:

from azure.servicebus.control_client import Message

msg1 = Message('Hello World!')
msg2 = Message('Hello World again!')
sbs.send_queue_message_batch('taskqueue', [msg1, msg2])

然後,可以呼叫 receive_queue_message 方法對訊息清除佇列。It is then possible to call the receive_queue_message method to dequeue the message.

msg = sbs.receive_queue_message('taskqueue')

服務匯流排主題Service Bus topics

create_topic 方法可用來建立伺服器端主題:The create_topic method can be used to create a server-side topic:

sbs.create_topic('taskdiscussion')

傳送_主題_訊息方法可以用來將訊息傳送至主題:The send_topic_message method can be used to send a message to a topic:

from azure.servicebus.control_client import Message

msg = Message(b'Hello World!')
sbs.send_topic_message('taskdiscussion', msg)

send_topic_message_batch 方法可用來一次傳送數則訊息:The send_topic_message_batch method can be used to send several messages at once:

from azure.servicebus.control_client import Message

msg1 = Message(b'Hello World!')
msg2 = Message(b'Hello World again!')
sbs.send_topic_message_batch('taskdiscussion', [msg1, msg2])

請考慮到在 Python 3 中,str 訊息會以 utf-8 編碼,而且您應該要在 Python 2 中自行管理您的編碼。Please consider that in Python 3 a str message will be utf-8 encoded and you should have to manage your encoding yourself in Python 2.

然後,用戶端可以藉由先後呼叫 create_subscription 方法和 receive_subscription_message 方法,來建立訂用帳戶和開始取用訊息。A client can then create a subscription and start consuming messages by calling the create_subscription method followed by the receive_subscription_message method. 請注意,您不會收到任何在訂用帳戶建立之前所傳送的訊息。Please note that any messages sent before the subscription is created will not be received.

from azure.servicebus.control_client import Message

sbs.create_subscription('taskdiscussion', 'client1')
msg = Message('Hello World!')
sbs.send_topic_message('taskdiscussion', msg)
msg = sbs.receive_subscription_message('taskdiscussion', 'client1')

事件中樞Event Hub

事件中樞可在高輸送量情況下收集來自各種不同裝置與服務的事件資料流。Event Hubs enable the collection of event streams at high throughput, from a diverse set of devices and services.

create_event_hub 方法可用來建立事件中樞:The create_event_hub method can be used to create an event hub:

sbs.create_event_hub('myhub')

若要傳送事件:To send an event:

sbs.send_event('myhub', '{ "DeviceId":"dev-01", "Temperature":"37.0" }')

事件內容是事件訊息或包含多個訊息的 JSON 編碼字串。The event content is the event message or JSON-encoded string that contains multiple messages.

進階功能Advanced features

訊息代理程式屬性和使用者屬性Broker properties and user properties

本節說明如何使用這裡所定義的訊息代理程式和使用者屬性:This section describes how to use Broker and User properties defined here:

sent_msg = Message(b'This is the third message',
                   broker_properties={'Label': 'M3'},
                   custom_properties={'Priority': 'Medium',
                                      'Customer': 'ABC'}
            )

您可以使用日期時間、整數、浮點或布林值You can use datetime, int, float or boolean

props = {'hello': 'world',
         'number': 42,
         'active': True,
         'deceased': False,
         'large': 8555111000,
         'floating': 3.14,
         'dob': datetime(2011, 12, 14),
         'double_quote_message': 'This "should" work fine',
         'quote_message': "This 'should' work fine"}
sent_msg = Message(b'message with properties', custom_properties=props)

為了與此程式庫的舊版本相容,broker_properties 也可定義為 JSON 字串。For compatibility reason with old version of this library, broker_properties could also be defined as a JSON string. 若為此情況,您必須負責撰寫有效的 JSON 字串,Python 在將字串傳送至 RestAPI 之前,不會先進行檢查。If this situation, you're responsible to write a valid JSON string, no check will be made by Python before sending to the RestAPI.

broker_properties = '{"ForcePersistence": false, "Label": "My label"}'
sent_msg = Message(b'receive message',
                   broker_properties = broker_properties
)

後續步驟Next Steps