Python で Service Bus のトピックとサブスクリプションを使用する方法How to use Service Bus topics and subscriptions with Python

この記事では、Service Bus のトピックとサブスクリプションの使用方法について説明します。This article describes how to use Service Bus topics and subscriptions. サンプルは Python で記述され、Azure Python SDK パッケージを使用しています。The samples are written in Python and use the Azure Python SDK package. 紹介するシナリオは次のとおりです。The scenarios covered include:

  • キュー、トピック、およびサブスクリプションを作成するCreating topics and subscriptions
  • サブスクリプション フィルターを作成するCreating subscription filters
  • メッセージをトピックに送信するSending messages to a topic
  • サブスクリプションからメッセージを受信するReceiving messages from a subscription
  • トピックとサブスクリプションを削除するDeleting topics and subscriptions

前提条件Prerequisites

  1. Azure サブスクリプション。An Azure subscription. このチュートリアルを完了するには、Azure アカウントが必要です。To complete this tutorial, you need an Azure account. Visual Studio または MSDN のサブスクライバー特典を有効にするか、無料アカウントにサインアップしてください。You can activate your Visual Studio or MSDN subscriber benefits or sign up for a free account.

  2. Quickstart:Use the Azure portal to create a Service Bus topic and subscriptions to the topic」(クイック スタート: Azure portal を使用して Service Bus トピックとその中に含まれるサブスクリプションを作成する) の手順に従って、Service Bus の名前空間を作成し、接続文字列を取得します。Follow steps in the Quickstart: Use the Azure portal to create a Service Bus topic and subscriptions to the topic to create a Service Bus namespace and get the connection string.

    注意

    このクイック スタートでは、Python を使用して トピックとその中に含まれるサブスクリプションを作成します。You will create a topic and a subscription to the topic by using Python in this quickstart.

  3. Azure Python パッケージをインストールします。Install Azure Python package. Python インストール ガイド」を参照してください。See the Python Installation Guide.

トピックを作成するCreate a topic

ServiceBusService オブジェクトを使用すると、トピックを操作できます。The ServiceBusService object enables you to work with topics. プログラムを使用して Service Bus にアクセスするすべての Python ファイルの先頭付近に次のコードを追加します。Add the following code near the top of any Python file in which you wish to programmatically access Service Bus:

from azure.servicebus.control_client import ServiceBusService, Message, Topic, Rule, DEFAULT_RULE_NAME

次のコードでは、ServiceBusService オブジェクトを作成します。The following code creates a ServiceBusService object. mynamespacesharedaccesskeynamesharedaccesskey の部分は、実際の名前空間、Shared Access Signature (SAS) キー名、キー値に置き換えます。Replace mynamespace, sharedaccesskeyname, and sharedaccesskey with your actual namespace, Shared Access Signature (SAS) key name, and key value.

bus_service = ServiceBusService(
    service_namespace='mynamespace',
    shared_access_key_name='sharedaccesskeyname',
    shared_access_key_value='sharedaccesskey')

SAS キー名とキー値のそれぞれの値は、Azure ポータル から取得できます。You can obtain the values for the SAS key name and value from the Azure portal.

bus_service.create_topic('mytopic')

create_topic メソッドは追加のオプションもサポートしています。これにより、メッセージの有効期間や最大トピック サイズなどの既定のトピックの設定をオーバーライドできます。The create_topic method also supports additional options, which enable you to override default topic settings such as message time to live or maximum topic size. 次の例では、最大トピック サイズを 5 GB に、有効期間 (TTL) を 1 分に設定します。The following example sets the maximum topic size to 5 GB, and a time to live (TTL) value of one minute:

topic_options = Topic()
topic_options.max_size_in_megabytes = '5120'
topic_options.default_message_time_to_live = 'PT1M'

bus_service.create_topic('mytopic', topic_options)

サブスクリプションを作成するCreate subscriptions

トピックのサブスクリプションも ServiceBusService オブジェクトで作成します。Subscriptions to topics are also created with the ServiceBusService object. サブスクリプションを指定し、サブスクリプションの仮想キューに配信するメッセージを制限するフィルターを設定できます。Subscriptions are named and can have an optional filter that restricts the set of messages delivered to the subscription's virtual queue.

注意

既定では、サブスクリプションは永続的であり、サブスクリプション、またはサブスクリプションがサブスクライブされているトピックが削除されるまで存在し続けます。By default, subscriptions are persistent and will continue to exist until either they, or the topic to which they are subscribed, are deleted.

auto_delete_on_idle プロパティを設定することで、サブスクリプションを自動的に削除することができます。You can have the subscriptions automatically deleted by setting the auto_delete_on_idle property.

既定の (MatchAll) フィルターを適用したサブスクリプションの作成Create a subscription with the default (MatchAll) filter

新しいサブスクリプションの作成時にフィルターが指定されていない場合は、MatchAll フィルター (既定) が使用されます。If no filter is specified when a new subscription is created, the MatchAll filter (default) is used. MatchAll フィルターを使用すると、トピックに発行されたすべてのメッセージがサブスクリプションの仮想キューに置かれます。When the MatchAll filter is used, all messages published to the topic are placed in the subscription's virtual queue. 次の例では、AllMessages という名前のサブスクリプションを作成し、既定の MatchAll フィルターを使用します。The following example creates a subscription named AllMessages and uses the default MatchAll filter.

bus_service.create_subscription('mytopic', 'AllMessages')

フィルターを適用したサブスクリプションの作成Create subscriptions with filters

トピックに送信されたメッセージのうち、特定のトピック サブスクリプション内に表示されるメッセージを指定するフィルターを定義することもできます。You can also define filters that enable you to specify which messages sent to a topic should show up within a specific topic subscription.

サブスクリプションでサポートされるフィルターのうち、最も柔軟性の高いものが、SQL92 のサブセットを実装する SqlFilter です。The most flexible type of filter supported by subscriptions is a SqlFilter, which implements a subset of SQL92. SQL フィルターは、トピックに発行されるメッセージのプロパティに対して適用されます。SQL filters operate on the properties of the messages that are published to the topic. SQL フィルターで使用できる式の詳細については、SqlFilter.SqlExpression 構文の説明を参照してください。For more information about the expressions that can be used with a SQL filter, see the SqlFilter.SqlExpression syntax.

ServiceBusService オブジェクトの create_rule メソッドを使用して、サブスクリプションにフィルターを追加できます。You can add filters to a subscription by using the create_rule method of the ServiceBusService object. このメソッドによって、新しいフィルターを既存のサブスクリプションに追加できます。This method allows you to add new filters to an existing subscription.

注意

既定のフィルターはすべての新しいサブスクリプションに自動的に適用されるため、最初に既定のフィルターを削除する必要があります。削除しなければ、指定された他のすべてのフィルターは MatchAll によってオーバーライドされます。Because the default filter is applied automatically to all new subscriptions, you must first remove the default filter or the MatchAll will override any other filters you may specify. 既定のルールを削除するには、ServiceBusService オブジェクトの delete_rule メソッドを使用します。You can remove the default rule by using the delete_rule method of the ServiceBusService object.

次の例では、HighMessages という名前のサブスクリプションを作成し、SqlFilter を適用します。このフィルターでは、カスタム messagenumber プロパティが 3 を超えるメッセージのみが選択されます。The following example creates a subscription named HighMessages with a SqlFilter that only selects messages that have a custom messagenumber property greater than 3:

bus_service.create_subscription('mytopic', 'HighMessages')

rule = Rule()
rule.filter_type = 'SqlFilter'
rule.filter_expression = 'messagenumber > 3'

bus_service.create_rule('mytopic', 'HighMessages', 'HighMessageFilter', rule)
bus_service.delete_rule('mytopic', 'HighMessages', DEFAULT_RULE_NAME)

同様に、次の例では LowMessages という名前のサブスクリプションを作成し、SqlFilter を適用します。このフィルターでは、messagenumber プロパティが 3 以下のメッセージのみが選択されます。Similarly, the following example creates a subscription named LowMessages with a SqlFilter that only selects messages that have a messagenumber property less than or equal to 3:

bus_service.create_subscription('mytopic', 'LowMessages')

rule = Rule()
rule.filter_type = 'SqlFilter'
rule.filter_expression = 'messagenumber <= 3'

bus_service.create_rule('mytopic', 'LowMessages', 'LowMessageFilter', rule)
bus_service.delete_rule('mytopic', 'LowMessages', DEFAULT_RULE_NAME)

これでメッセージが mytopic に送信されると、そのメッセージは AllMessages トピック サブスクリプションをサブスクライブした受信者に必ず配信され、さらにメッセージの内容に応じて、HighMessagesLowMessages トピック サブスクリプションをサブスクライブしている受信者に対して選択的に配信されます。Now, when a message is sent to mytopic it is always delivered to receivers subscribed to the AllMessages topic subscription, and selectively delivered to receivers subscribed to the HighMessages and LowMessages topic subscriptions (depending on the message content).

メッセージをトピックに送信するSend messages to a topic

メッセージを Service Bus トピックに送信するには、アプリケーションで ServiceBusService オブジェクトの send_topic_message メソッドを使用する必要があります。To send a message to a Service Bus topic, your application must use the send_topic_message method of the ServiceBusService object.

次の例では、mytopic トピックに 5 件のテスト メッセージを送信する方法を示しています。The following example demonstrates how to send five test messages to mytopic. 各メッセージの messagenumber プロパティの値がループの反復回数に応じて変化します (これによってメッセージを受信するサブスクリプションが決定されます)。The messagenumber property value of each message varies on the iteration of the loop (this determines which subscriptions receive it):

for i in range(5):
    msg = Message('Msg {0}'.format(i).encode('utf-8'),
                  custom_properties={'messagenumber': i})
    bus_service.send_topic_message('mytopic', msg)

Service Bus トピックでサポートされているメッセージの最大サイズは、Standard レベルでは 256 KB、Premium レベルでは 1 MB です。Service Bus topics support a maximum message size of 256 KB in the Standard tier and 1 MB in the Premium tier. 標準とカスタムのアプリケーション プロパティが含まれるヘッダーの最大サイズは 64 KB です。The header, which includes the standard and custom application properties, can have a maximum size of 64 KB. 1 つのトピックで保持されるメッセージ数に上限はありませんが、1 つのトピックで保持できるメッセージの合計サイズには上限があります。There is no limit on the number of messages held in a topic but there is a cap on the total size of the messages held by a topic. このトピックのサイズはトピックの作成時に定義します。上限は 5 GB です。This topic size is defined at creation time, with an upper limit of 5 GB. クォータの詳細については、「Service Bus のクォータ」を参照してください。For more information about quotas, see Service Bus quotas.

サブスクリプションからメッセージを受信するReceive messages from a subscription

サブスクリプションからメッセージを受信するには、ServiceBusService オブジェクトの receive_subscription_message メソッドを使用します。Messages are received from a subscription using the receive_subscription_message method on the ServiceBusService object:

msg = bus_service.receive_subscription_message(
    'mytopic', 'LowMessages', peek_lock=False)
print(msg.body)

peek_lock パラメーターが False に設定されていると、メッセージが読み取られるときにサブスクリプションから削除されます。Messages are deleted from the subscription as they are read when the parameter peek_lock is set to False. peek_lock パラメーターを True に設定することによって、キューからメッセージを削除せずに、メッセージを読み取って (ピークして) ロックすることができます。You can read (peek) and lock the message without deleting it from the queue by setting the parameter peek_lock to True.

受信操作の過程で行われるメッセージの読み取りと削除の動作は、最もシンプルなモデルであり、障害発生時にアプリケーション側でメッセージを処理しないことを許容できるシナリオに最適です。The behavior of reading and deleting the message as part of the receive operation is the simplest model, and works best for scenarios in which an application can tolerate not processing a message when there is a failure. この動作を理解するために、コンシューマーが受信要求を発行した後で、メッセージを処理する前にクラッシュしたというシナリオを考えてみましょう。To understand this behavior, consider a scenario in which the consumer issues the receive request and then crashes before processing it. Service Bus はメッセージを読み取り済みとしてマークしているため、アプリケーションが再起動してメッセージの読み取りを再開すると、クラッシュ前に読み取られていたメッセージは見落とされます。Because Service Bus has marked the message as being consumed, then when the application restarts and begins consuming messages again, it has missed the message that was consumed prior to the crash.

peek_lock パラメーターが True に設定されている場合、受信処理が 2 段階の動作になり、メッセージが失われることが許容できないアプリケーションに対応することができます。If the peek_lock parameter is set to True, the receive becomes a two stage operation, which makes it possible to support applications that cannot tolerate missing messages. Service Bus は要求を受け取ると、次に読み取られるメッセージを検索して、他のコンシューマーが受信できないようロックしてから、アプリケーションにメッセージを返します。When Service Bus receives a request, it finds the next message to be consumed, locks it to prevent other consumers receiving it, and then returns it to the application. アプリケーションがメッセージの処理を終えた後 (または後で処理するために確実に保存した後)、Message オブジェクトの delete メソッドを呼び出して受信処理の第 2 段階を完了します。After the application finishes processing the message (or stores it reliably for future processing), it completes the second stage of the receive process by calling delete method on the Message object. delete メソッドによって、メッセージが読み取り中としてマークされ、サブスクリプションから削除されます。The delete method marks the message as being consumed and removes it from the subscription.

msg = bus_service.receive_subscription_message('mytopic', 'LowMessages', peek_lock=True)
if msg.body is not None:
print(msg.body)
msg.delete()

アプリケーションのクラッシュと読み取り不能のメッセージを処理する方法How to handle application crashes and unreadable messages

Service Bus には、アプリケーションにエラーが発生した場合や、メッセージの処理に問題がある場合に復旧を支援する機能が備わっています。Service Bus provides functionality to help you gracefully recover from errors in your application or difficulties processing a message. 受信側のアプリケーションがなんらかの理由によってメッセージを処理できない場合には、Message オブジェクトの unlock メソッドを呼び出すことができます。If a receiver application is unable to process the message for some reason, then it can call the unlock method on the Message object. このメソッドが呼び出されると、Service Bus によりサブスクリプション内のメッセージのロックが解除され、メッセージが再度受信できる状態に変わります。このメッセージは、同じコンシューマー側アプリケーションまたは他のコンシューマー側アプリケーションで受信されます。This method causes Service Bus to unlock the message within the subscription and make it available to be received again, either by the same consuming application or by another consuming application.

サブスクリプション内でロックされているメッセージにはタイムアウトも設定されています。アプリケーションがクラッシュした場合など、ロックがタイムアウトになる前にアプリケーションがメッセージの処理に失敗した場合には、Service Bus によりメッセージのロックが自動的に解除され、再度受信できる状態に変わります。There is also a timeout associated with a message locked within the subscription, and if the application fails to process the message before the lock timeout expires (for example, if the application crashes), then Service Bus unlocks the message automatically and makes it available to be received again.

メッセージが処理された後、delete メソッドが呼び出される前にアプリケーションがクラッシュした場合は、アプリケーションが再起動する際にメッセージが再配信されます。In the event that the application crashes after processing the message but before the delete method is called, then the message will be redelivered to the application when it restarts. この動作は、しばしばThis behavior is often called. "1 回以上の処理*" と呼ばれます。つまり、すべてのメッセージが 1 回以上処理されますが、状況によっては、同じメッセージが再配信される可能性があります。At least Once Processing*; that is, each message is processed at least once but in certain situations the same message may be redelivered. 重複処理が許されないシナリオの場合、重複メッセージの配信を扱うロジックをアプリケーションに追加する必要があります。If the scenario cannot tolerate duplicate processing, then application developers should add additional logic to their application to handle duplicate message delivery. これを行うには、メッセージの MessageId プロパティを使用できます。このプロパティは配信が試行された後も同じ値を保持します。To do so, you can use the MessageId property of the message, which remains constant across delivery attempts.

トピックとサブスクリプションを削除するDelete topics and subscriptions

トピックとサブスクリプションは、auto_delete_on_idle プロパティが設定されている場合を除き、永続的です。Topics and subscriptions are persistent unless the auto_delete_on_idle property is set. これらは Azure portal を通じて、またはプログラムで削除できます。They can be deleted either through the Azure portal or programmatically. 次の例では、mytopic という名前のトピックを削除する方法を示しています。The following example shows how to delete the topic named mytopic:

bus_service.delete_topic('mytopic')

トピックを削除すると、そのトピックに登録されたサブスクリプションもすべて削除されます。Deleting a topic also deletes any subscriptions that are registered with the topic. サブスクリプションは、個別に削除することもできます。Subscriptions can also be deleted independently. 次のコードでは、HighMessages という名前のサブスクリプションを mytopic トピックから削除する方法を示しています。The following code shows how to delete a subscription named HighMessages from the mytopic topic:

bus_service.delete_subscription('mytopic', 'HighMessages')

注意

Service Bus リソースは、Service Bus Explorer で管理できます。You can manage Service Bus resources with Service Bus Explorer. Service Bus Explorer を使用すると、ユーザーは Service Bus 名前空間に接続し、簡単な方法でメッセージング エンティティを管理できます。The Service Bus Explorer allows users to connect to a Service Bus namespace and administer messaging entities in an easy manner. このツールには、インポート/エクスポート機能や、トピック、キュー、サブスクリプション、リレー サービス、通知ハブ、イベント ハブをテストする機能などの高度な機能が用意されています。The tool provides advanced features like import/export functionality or the ability to test topic, queues, subscriptions, relay services, notification hubs and events hubs.

次の手順Next steps

これで、サービス バス トピックの基本を学習できました。さらに詳細な情報が必要な場合は、次のリンク先をご覧ください。Now that you've learned the basics of Service Bus topics, follow these links to learn more.