Python から Azure Queue Storage を使用する方法

概要

この記事では、Azure Queue Storage サービスを使用した一般的なシナリオについて説明します。 紹介するシナリオには、キュー メッセージの挿入、ピーク、取得、削除が含まれます。 キューを作成および削除するためのコードについても説明します。

この記事のサンプルは Python で作成され、Python パッケージ用 Azure Queue Storage クライアント ライブラリを使用しています。 キューの詳細については、「次のステップ」のセクションを参照してください。

Queue storage とは

Azure キュー ストレージは、HTTP または HTTPS を使用した認証された呼び出しを介して世界中のどこからでもアクセスできる大量のメッセージを格納するためのサービスです。 キューの 1 つのメッセージの最大サイズは 64 KB で、1 つのキューには、ストレージ アカウントの合計容量の上限に達するまで、数百万のメッセージを格納できます。 Queue storage は、多くの場合、非同期的な処理用に作業のバックログを作成するために使用されます。

Queue サービスの概念

Azure Queue サービスには、次のコンポーネントが含まれます。

Azure Queue service components

  • ストレージ アカウント: Azure のストレージにアクセスする場合には必ず、ストレージ アカウントを使用します。 ストレージ アカウントの詳細については、「ストレージ アカウントの概要」を参照してください。

  • キュー: キューは、メッセージのセットを格納します。 すべてのメッセージはキューに 格納されている必要があります。 キュー名は小文字で入力する必要があります。 キューの名前付け規則については、「 Naming Queues and Metadata (キューとメタデータの名前付け規則)」を参照してください。

  • メッセージ: 形式を問わず、メッセージのサイズは最大で 64 KB です。 メッセージをキューで保持できる最長時間は 7 日間です。 バージョン 2017-07-29 以降では、最大有効期間を任意の正の数にすることができます。また、-1 は、メッセージが期限切れにならないことを示します。 このパラメーターを省略すると、既定の有効期間は 7 日になります。

  • URL 形式: キューは次の URL 形式を使って処理できます: http://<storage account>.queue.core.windows.net/<queue>

    次の URL を使用すると、図のいずれかのキューをアドレス指定できます。

    http://myaccount.queue.core.windows.net/incoming-orders

Azure のストレージ アカウントの作成

最初の Azure ストレージ アカウントを作成する最も簡単な方法は、Azure Portal を利用することです。 詳細については、「 ストレージ アカウントの作成」を参照してください。

Azure Storage アカウントは、Azure PowerShellAzure CLI、または .NET 用 Azure ストレージ リソース プロバイダーを使用して作成することもできます。

現時点で Azure にストレージ アカウントを作成しない場合は、Azure ストレージ エミュレーターを使って、ローカル環境でコードの実行とテストを行うこともできます。 詳細については、ローカルでの Azure Storage の開発に Azurite エミュレーターを使用する方法に関するページを参照してください。

Azure Storage SDK for Python をダウンロードしてインストールする

Azure Storage SDK for Python には、Python v2.7、v3.3、またはそれ以降が必要です。

PyPI でインストールする

Python Package Index (PyPI) でインストールするには、次のように入力します。

pip install azure-storage-queue

Note

Azure Storage SDK for Python v0.36 以前からアップグレードする場合は、pip uninstall azure-storage を使用して以前の SDK をアンインストールしてから、最新のパッケージをインストールします。

別のインストール方法については、Azure SDK for Pythonをご覧ください。

Azure Portal で資格情報をコピーする

サンプル アプリケーションから Azure Storage に対して要求を実行するときは、承認されている必要があります。 要求を承認するには、ストレージ アカウントの資格情報を接続文字列としてアプリケーションに追加します。 ストレージ アカウントの資格情報を表示するには、次の手順に従います。

  1. Azure portal にサインインします。

  2. 自分のストレージ アカウントを探します。

  3. ストレージ アカウント メニュー ウィンドウの [セキュリティとネットワーク] で、 [アクセス キー] を選択します。 ここで、アカウント アクセス キーと各キーの完全な接続文字列が確認できます。

    Screenshot that shows where the access key settings are in the Azure portal

  4. [アクセス キー] ペインで、 [キーの表示] を選択します。

  5. [key1] セクションで、 [接続文字列] の値を見つけます。 [クリップボードにコピー] アイコンを選択して、接続文字列をコピーします。 次のセクションで、この接続文字列の値を環境変数に追加します。

    Screenshot showing how to copy a connection string from the Azure portal

ストレージ接続文字列の構成

接続文字列をコピーした後、アプリケーションを実行しているローカル マシンの新しい環境変数にそれを書き込みます。 環境変数を設定するには、コンソール ウィンドウを開いて、お使いのオペレーティング システムの手順に従います。 <yourconnectionstring> は、実際の接続文字列に置き換えてください。

setx AZURE_STORAGE_CONNECTION_STRING "<yourconnectionstring>"

Windows で環境変数を追加した後、コマンド ウィンドウの新しいインスタンスを開始する必要があります。

プログラムの再起動

環境変数を追加した後、環境変数の読み取りを必要とする実行中のプログラムをすべて再起動します。 たとえば、続行する前に、ご使用の開発環境またはエディターを再起動します。

Queue ストレージにアクセスするようにアプリケーションを構成する

QueueClient オブジェクトを使用して、キューを操作できます。 プログラムを使用して Azure キューにアクセスするすべての Python ファイルの先頭付近に次のコードを追加します。

from azure.storage.queue import (
        QueueClient,
        BinaryBase64EncodePolicy,
        BinaryBase64DecodePolicy
)

import os, uuid

os パッケージは、環境変数を取得するためのサポートを提供します。 uuid パッケージは、キュー名の一意の識別子を生成するためのサポートを提供します。

キューを作成する

接続文字列は、前の手順で設定した AZURE_STORAGE_CONNECTION_STRING 環境変数から取得されます。

次のコードは、ストレージ接続文字列を使用して QueueClient オブジェクトを作成します。

# Retrieve the connection string from an environment
# variable named AZURE_STORAGE_CONNECTION_STRING
connect_str = os.getenv("AZURE_STORAGE_CONNECTION_STRING")

# Create a unique name for the queue
q_name = "queue-" + str(uuid.uuid4())

# Instantiate a QueueClient object which will
# be used to create and manipulate the queue
print("Creating queue: " + q_name)
queue_client = QueueClient.from_connection_string(connect_str, q_name)

# Create the queue
queue_client.create_queue()

Azure のキュー メッセージはテキストとして格納されます。 バイナリ データを格納する場合は、キューにメッセージを配置する前に、Base64 エンコードおよびデコード関数を設定します。

クライアント オブジェクトの作成時に Base64 エンコードおよびデコード関数を構成します。

# Setup Base64 encoding and decoding functions
base64_queue_client = QueueClient.from_connection_string(
                            conn_str=connect_str, queue_name=q_name,
                            message_encode_policy = BinaryBase64EncodePolicy(),
                            message_decode_policy = BinaryBase64DecodePolicy()
                        )

メッセージをキューに挿入する

メッセージをキューに挿入するには、send_message メソッドを使用します。

message = u"Hello World"
print("Adding message: " + message)
queue_client.send_message(message)

メッセージをピークする

peek_messages メソッドを呼び出せば、メッセージをキューから削除せずにピークできます。 既定では、このメソッドは 1 つのメッセージを対象としてピークします。

# Peek at the first message
messages = queue_client.peek_messages()

for peeked_message in messages:
    print("Peeked message: " + peeked_message.content)

キューに配置されたメッセージの内容を変更する

キュー内のメッセージの内容をインプレースで変更できます。 メッセージがタスクを表している場合は、この機能を使用して、タスクの状態を更新できます。

次のコードでは、update_message メソッドを使用してメッセージを更新します。 表示のタイムアウトは 0 に設定されています。これは、メッセージが即時に表示され、コンテンツが更新されることを示します。

messages = queue_client.receive_messages()
list_result = next(messages)

message = queue_client.update_message(
        list_result.id, list_result.pop_receipt,
        visibility_timeout=0, content=u'Hello World Again')

print("Updated message to: " + message.content)

キューの長さを取得する

キュー内のメッセージの概数を取得できます。

get_queue_properties メソッドからは、approximate_message_count を含む、キューのプロパティが返されます。

properties = queue_client.get_queue_properties()
count = properties.approximate_message_count
print("Message count: " + str(count))

サービスが要求に応答した後にメッセージが追加または削除される可能性があるため、取得される結果はおおよその数を示しているだけです。

メッセージをデキューする

キューからのメッセージの削除は、2 段階の手順で行います。 コードでのメッセージの処理が失敗した場合でも、この 2 段階のプロセスにより、同じメッセージを取得して再試行できます。 メッセージが正常に処理された後に delete_message を呼び出します。

receive_messages を呼び出すと、既定では、キュー内に次のメッセージが取得されます。 receive_messages から返されたメッセージは、このキューからメッセージを読み取る他のコードから参照できなくなります。 既定では、このメッセージを参照できない状態は 30 秒間続きます。 また、キューからのメッセージの削除を完了するには、delete_message を呼び出す必要があります。

messages = queue_client.receive_messages()

for message in messages:
    print("Dequeueing message: " + message.content)
    queue_client.delete_message(message.id, message.pop_receipt)

キューからのメッセージの取得をカスタマイズする方法は 2 つあります。 1 つ目の方法では、(最大 32 個の) メッセージのバッチを取得できます。 2 つ目の方法では、コードで各メッセージを完全に処理できるように、非表示タイムアウトの設定を長くまたは短くすることができます。

次のコード例では、receive_messages メソッドを使用して、メッセージを一括して取得します。 次に、入れ子になった for ループを使用して、各バッチ内の各メッセージを処理します。 また、各メッセージの非表示タイムアウトを 5 分に設定します。

messages = queue_client.receive_messages(messages_per_page=5, visibility_timeout=5*60)

for msg_batch in messages.by_page():
   for msg in msg_batch:
      print("Batch dequeue message: " + msg.content)
      queue_client.delete_message(msg)

キューを削除する

キューおよびキューに含まれているすべてのメッセージを削除するには、delete_queue メソッドを呼び出します。

print("Deleting queue: " + queue_client.queue_name)
queue_client.delete_queue()

ヒント

Microsoft Azure ストレージ エクスプローラーを試す

Microsoft Azure ストレージ エクスプローラーは、Windows、macOS、Linux で Azure Storage のデータを視覚的に操作できる Microsoft 製の無料のスタンドアロン アプリです。

次のステップ

これで、Queue Storage の基本を学習できました。さらに詳細な情報が必要な場合は、次のリンク先を参照してください。