如何通过 PHP 使用队列存储

提示

试用 Microsoft Azure 存储资源管理器

Microsoft Azure 存储资源管理器是 Microsoft 免费提供的独立应用,适用于在 Windows、macOS 和 Linux 上以可视方式处理 Azure 存储数据。

本指南展示了如何使用 Azure 队列存储服务执行常见方案。 这些示例是通过适用于 PHP 的 Azure 存储客户端库中的类编写的。 介绍的方案包括插入、扫视、获取和删除队列消息以及创建和删除队列。

什么是队列存储?

Azure 队列存储是一项可存储大量消息的服务,用户可以通过经验证的呼叫,使用 HTTP 或 HTTPS 从世界任何地方访问这些消息。 一条队列消息的大小最多可为 64 KB,一个队列中可以包含数百万条消息,直至达到存储帐户的总容量限值。 队列存储通常用于创建要异步处理的积压工作 (backlog)。

队列服务概念

Azure 队列服务包含以下组件:

Azure 队列服务组件

  • 存储帐户: 对 Azure 存储进行的所有访问都要通过存储帐户完成。 有关存储帐户的详细信息,请参阅存储帐户概述

  • 队列:一个队列包含一组消息。 所有消息必须位于相应的队列中。 请注意,队列名称必须全部小写。 有关命名队列的信息,请参阅 命名队列和元数据

  • 消息: 一条消息(无论哪种格式)的最大大小为 64 KB。 消息可以保留在队列中的最长时间为 7 天。 在 2017-07-29 或更高版本中,最大生存时间可以是任何正数,或者是 -1(表示消息不会过期)。 如果省略此参数,则默认的生存时间为 7 天。

  • URL 格式:使用以下 URL 格式对队列进行寻址:http://<storage account>.queue.core.windows.net/<queue>

    可使用以下 URL 访问示意图中的某个队列:

    http://myaccount.queue.core.windows.net/incoming-orders

创建 Azure 存储帐户

创建第一个 Azure 存储帐户的最简单方法是使用 Azure 门户。 若要了解更多信息,请参阅 创建存储帐户

还可使用 Azure PowerShellAzure CLI适用于 .NET 的 Azure 存储资源提供程序创建 Azure 存储帐户。

如果暂时不想在 Azure 中创建存储帐户,也可以使用 Azurite 存储模拟器在本地环境中运行和测试代码。 有关详细信息,请参阅使用 Azurite 模拟器进行本地 Azure 存储开发

创建 PHP 应用程序

创建用于访问 Azure 队列存储的 PHP 应用程序的唯一要求是从代码中引用适用于 PHP 的 Azure 存储客户端库中的类。 可以使用任何开发工具(包括“记事本”)创建应用程序。

在本指南中,我们使用队列存储服务功能。这些功能可在 PHP 应用程序中本地调用,或通过在 Azure 的 Web 应用程序中运行的代码调用。

获取 Azure 客户端库

通过 composer 安装

  1. 在项目的根目录中创建一个名为 composer.json 的文件并向其添加以下代码:

    {
      "require": {
        "microsoft/azure-storage-queue": "*"
      }
    }
    
  2. composer.phar 下载到你的项目根目录中。

  3. 打开命令提示符并在项目根目录中运行以下命令:

    php composer.phar install
    

或者转到 GitHub 上的 Azure 存储 PHP 客户端库,以克隆源代码。

配置应用程序以访问队列存储

若要使用 Azure 队列存储 API,需执行以下操作:

  1. 使用 require_once 语句引用自动换带机文件。
  2. 引用可使用的所有类。

下面的示例演示了如何包括 autoloader 文件并引用 QueueRestProxy 类。

require_once 'vendor/autoload.php';
use MicrosoftAzure\Storage\Queue\QueueRestProxy;

以下示例中,会始终显示 require_once 语句,但只会引用运行示例所需的类。

设置 Azure 存储连接

若要实例化 Azure 队列存储客户端,首先必须有有效的连接字符串。 队列存储连接字符串的格式如下。

若要访问实时服务:

DefaultEndpointsProtocol=[http|https];AccountName=[yourAccount];AccountKey=[yourKey]

若要访问模拟器存储:

UseDevelopmentStorage=true

若要创建 Azure 队列存储客户端,需要使用 QueueRestProxy 类。 可使用以下方法之一:

  • 将连接字符串直接传递给它。
  • 在 Web 应用中使用环境变量来存储连接字符串。 要配置连接字符串,请参阅 Azure Web 应用配置设置文档。

在此处列出的示例中,将直接传递连接字符串。

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>";
$queueClient = QueueRestProxy::createQueueService($connectionString);

创建队列

QueueRestProxy 对象允许你使用 CreateQueue 方法创建队列。 创建队列时,可以在该队列上设置选项,但此操作不是必需的。 此示例展示了如何在队列上设置元数据。

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;
use MicrosoftAzure\Storage\Queue\Models\CreateQueueOptions;

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>";

// Create queue REST proxy.
$queueClient = QueueRestProxy::createQueueService($connectionString);

// OPTIONAL: Set queue metadata.
$createQueueOptions = new CreateQueueOptions();
$createQueueOptions->addMetaData("key1", "value1");
$createQueueOptions->addMetaData("key2", "value2");

try    {
    // Create queue.
    $queueClient->createQueue("myqueue", $createQueueOptions);
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // https://msdn.microsoft.com/library/azure/dd179446.aspx
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

注意

不应依赖元数据密钥区分大小写的性质。 以小写形式从服务中读取所有密钥。

向队列添加消息

若要向队列添加消息,请使用 QueueRestProxy->createMessage。 此方法接受队列名称、消息文本和消息选项(这些都是可选的)。

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;
use MicrosoftAzure\Storage\Queue\Models\CreateMessageOptions;

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>";

// Create queue REST proxy.
$queueClient = QueueRestProxy::createQueueService($connectionString);

try    {
    // Create message.
    $queueClient->createMessage("myqueue", "Hello, World");
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // https://msdn.microsoft.com/library/azure/dd179446.aspx
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

扫视下一条消息

通过调用 QueueRestProxy->peekMessages,可以速览队列前面的一条或多条消息,不将其从队列中删除。 默认情况下,peekMessage 方法返回单条消息,但你可以使用 PeekMessagesOptions->setNumberOfMessages 方法更改该值。

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;
use MicrosoftAzure\Storage\Queue\Models\PeekMessagesOptions;

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>";

// Create queue REST proxy.
$queueClient = QueueRestProxy::createQueueService($connectionString);

// OPTIONAL: Set peek message options.
$message_options = new PeekMessagesOptions();
$message_options->setNumberOfMessages(1); // Default value is 1.

try    {
    $peekMessagesResult = $queueClient->peekMessages("myqueue", $message_options);
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // https://msdn.microsoft.com/library/azure/dd179446.aspx
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

$messages = $peekMessagesResult->getQueueMessages();

// View messages.
$messageCount = count($messages);
if($messageCount <= 0){
    echo "There are no messages.<br />";
}
else{
    foreach($messages as $message)    {
        echo "Peeked message:<br />";
        echo "Message Id: ".$message->getMessageId()."<br />";
        echo "Date: ".date_format($message->getInsertionDate(), 'Y-m-d')."<br />";
        echo "Message text: ".$message->getMessageText()."<br /><br />";
    }
}

取消对下一条消息的排队

代码分两步从队列中删除消息。 首先,调用 QueueRestProxy->listMessages,这将使消息对从队列进行读取的任何其他代码不可见。 默认情况下,此消息持续 30 秒不可见。 (如果在此时段内未删除该消息,它会在队列上再次可见。)若要完成从队列中删除消息的操作,必须调用 QueueRestProxy->deleteMessage。 此删除消息的两步过程可确保当代码因硬件或软件故障而无法处理消息时,其他代码实例可以获取同一消息并重试。 代码在处理消息后会立即调用 deleteMessage

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>";

// Create queue REST proxy.
$queueClient = QueueRestProxy::createQueueService($connectionString);

// Get message.
$listMessagesResult = $queueClient->listMessages("myqueue");
$messages = $listMessagesResult->getQueueMessages();
$message = $messages[0];

/* ---------------------
    Process message.
   --------------------- */

// Get message ID and pop receipt.
$messageId = $message->getMessageId();
$popReceipt = $message->getPopReceipt();

try    {
    // Delete message.
    $queueClient->deleteMessage("myqueue", $messageId, $popReceipt);
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // https://msdn.microsoft.com/library/azure/dd179446.aspx
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

更改已排队消息的内容

可以通过调用 QueueRestProxy->updateMessage 在队列中就地更改消息的内容。 如果消息表示工作任务,可使用此功能来更新该工作任务的状态。 以下代码使用新内容更新队列消息,并将可见性超时设置为再延长 60 秒。 这会保存与消息关联的工作的状态,并额外为客户端提供一分钟的时间来继续处理消息。 可使用此方法跟踪队列消息上的多步骤工作流,即使处理步骤因硬件或软件故障而失败,也无需从头开始操作。 通常同时保留重试计数,当消息重试次数超过 n 时再删除该消息。 这可避免每次处理某条消息时都触发应用程序错误。

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;

// Create queue REST proxy.
$queueClient = QueueRestProxy::createQueueService($connectionString);

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>";

// Get message.
$listMessagesResult = $queueClient->listMessages("myqueue");
$messages = $listMessagesResult->getQueueMessages();
$message = $messages[0];

// Define new message properties.
$new_message_text = "New message text.";
$new_visibility_timeout = 5; // Measured in seconds.

// Get message ID and pop receipt.
$messageId = $message->getMessageId();
$popReceipt = $message->getPopReceipt();

try    {
    // Update message.
    $queueClient->updateMessage("myqueue",
                                $messageId,
                                $popReceipt,
                                $new_message_text,
                                $new_visibility_timeout);
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // https://msdn.microsoft.com/library/azure/dd179446.aspx
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

用于取消对消息进行排队的其他选项

可通过两种方式自定义队列中的消息检索。 首先,可获取一批消息(最多 32 条)。 其次,可设置更长或更短的可见超时时间,允许代码使用更长或更短的时间来彻底处理每条消息。 以下代码示例使用 getMessages 方法在一次调用中获取 16 条消息。 然后,它使用 for 循环处理每条消息。 它还将每条消息的不可见超时时间设置为 5 分钟。

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;
use MicrosoftAzure\Storage\Queue\Models\ListMessagesOptions;

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>";

// Create queue REST proxy.
$queueClient = QueueRestProxy::createQueueService($connectionString);

// Set list message options.
$message_options = new ListMessagesOptions();
$message_options->setVisibilityTimeoutInSeconds(300);
$message_options->setNumberOfMessages(16);

// Get messages.
try{
    $listMessagesResult = $queueClient->listMessages("myqueue",
                                                     $message_options);
    $messages = $listMessagesResult->getQueueMessages();

    foreach($messages as $message){

        /* ---------------------
            Process message.
        --------------------- */

        // Get message Id and pop receipt.
        $messageId = $message->getMessageId();
        $popReceipt = $message->getPopReceipt();

        // Delete message.
        $queueClient->deleteMessage("myqueue", $messageId, $popReceipt);
    }
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // https://msdn.microsoft.com/library/azure/dd179446.aspx
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

获取队列长度

可以获取队列中消息的估计数。 QueueRestProxy->getQueueMetadata 方法检索有关队列的元数据。 对返回的对象调用 getApproximateMessageCount 方法时,系统会提供队列中消息的计数。 此计数只是一个近似值,因为在队列存储响应你的请求后,可能会添加或删除消息。

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>";

// Create queue REST proxy.
$queueClient = QueueRestProxy::createQueueService($connectionString);

try    {
    // Get queue metadata.
    $queue_metadata = $queueClient->getQueueMetadata("myqueue");
    $approx_msg_count = $queue_metadata->getApproximateMessageCount();
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // https://msdn.microsoft.com/library/azure/dd179446.aspx
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

echo $approx_msg_count;

删除队列

若要删除某个队列以及其中的所有消息,请调用 QueueRestProxy->deleteQueue 方法。

require_once 'vendor/autoload.php';

use MicrosoftAzure\Storage\Queue\QueueRestProxy;
use MicrosoftAzure\Storage\Common\Exceptions\ServiceException;

$connectionString = "DefaultEndpointsProtocol=http;AccountName=<accountNameHere>;AccountKey=<accountKeyHere>";

// Create queue REST proxy.
$queueClient = QueueRestProxy::createQueueService($connectionString);

try    {
    // Delete queue.
    $queueClient->deleteQueue("myqueue");
}
catch(ServiceException $e){
    // Handle exception based on error codes and messages.
    // Error codes and messages are here:
    // https://msdn.microsoft.com/library/azure/dd179446.aspx
    $code = $e->getCode();
    $error_message = $e->getMessage();
    echo $code.": ".$error_message."<br />";
}

后续步骤

现在,你已了解了有关 Azure 队列存储的基础知识,请单击以下链接来了解更复杂的存储任务:

有关详细信息,请参阅 PHP 开发人员中心