通过 F# 实现 Azure 队列存储入门

Azure 队列存储用于在应用程序组件之间进行云消息传送。 在设计应用程序以实现伸缩性时,通常要将各个应用程序组件分离,使其可以独立地进行伸缩。 队列存储提供的异步消息传送适用于在应用程序组件之间进行通信,无论这些应用程序组件是运行在云中、桌面上、本地服务器上还是移动设备上。 队列存储还支持管理异步任务以及构建过程工作流。

关于本教程

本教程展示了如何针对某些使用 Azure 队列存储的常见任务编写 F# 代码。 涉及的任务包括创建和删除队列、添加、读取和删除队列消息。

有关队列存储的概念性概述,请参阅队列存储的 .NET 指南

先决条件

若要使用此指南,必须先创建 Azure 存储帐户。 你还需要此帐户的存储访问密钥。

创建 F# 脚本并启动 F# 交互

本文中的示例可用于 F# 应用程序或 F# 脚本。 若要创建 F# 脚本,请在 F# 开发环境中创建一个扩展名为 .fsx 的文件,例如 queues.fsx

如何执行脚本

F# 交互窗口 dotnet fsi 可通过交互方式启动,也可以从命令行启动,以运行脚本。 命令行语法是

> dotnet fsi [options] [ script-file [arguments] ]

在脚本中添加包

接下来,使用 #rnuget:package name 安装 Azure.Storage.Queues 包和 open 命名空间。例如

> #r "nuget: Azure.Storage.Queues"
open Azure.Storage.Queues

添加命名空间声明

将下列 open 语句添加到 queues.fsx 文件顶部:

open Azure.Storage.Queues // Namespace for Queue storage types
open System
open System.Text

获取连接字符串

本教程需要一个 Azure 存储连接字符串。 有关连接字符串的详细信息,请参阅配置存储连接字符串

在本教程中,你将在脚本中输入连接字符串,如下所示:

let storageConnString = "..." // fill this in from your storage account

创建队列服务客户端

使用 QueueClient 类可以检索存储在队列存储中的队列。 下面是创建客户端的一种方法:

let queueClient = QueueClient(storageConnString, "myqueue")

现在,已准备好编写从队列存储读取数据并将数据写入队列存储的代码。

创建队列

此示例演示如何创建队列(如果队列还不存在):

queueClient.CreateIfNotExists()

在队列中插入消息

要将消息插入到现有队列中,请先创建新的消息。 接下来,调用 SendMessage 方法。 可从字符串(UTF-8 格式)或 byte 数组创建消息:

queueClient.SendMessage("Hello, World") // Insert a String message into a queue
queueClient.SendMessage(BinaryData.FromBytes(Encoding.UTF8.GetBytes("Hello, World"))) // Insert a BinaryData message into a queue

扫视下一条消息

通过调用 PeekMessage 方法,可以速览队列前面的消息,且不会将其从队列中删除。

let peekedMessage = queueClient.PeekMessage()
let messageContents = peekedMessage.Value.Body.ToString()

获取下一条消息进行处理

可以通过调用 ReceiveMessage 方法检索队列前面的消息以进行处理。

let updateMessage = queueClient.ReceiveMessage().Value

稍后使用 DeleteMessage 指示消息处理成功。

更改已排队消息的内容

可以在队列中就地更改检索到的消息的内容。 如果消息表示工作任务,可使用此功能来更新该工作任务的状态。 以下代码使用新内容更新队列消息,并将可见性超时设置为再延长 60 秒。 这会保存与消息关联的工作的状态,并额外为客户端提供一分钟的时间来继续处理消息。 可使用此方法跟踪队列消息上的多步骤工作流,即使处理步骤因硬件或软件故障而失败,也无需从头开始操作。 通常,还可以保留重试计数,如果某条消息的重试次数超过一定次数,你将删除此消息。 这可避免每次处理某条消息时都触发应用程序错误。

queueClient.UpdateMessage(
    updateMessage.MessageId,
    updateMessage.PopReceipt,
    "Updated contents.",
    TimeSpan.FromSeconds(60.0))

取消对下一条消息的排队

代码通过两个步骤来取消对队列中某条消息的排队。 调用 ReceiveMessage 时,会获得队列中的下一条消息。 从 ReceiveMessage 返回的消息对于从此队列读取消息的任何其他代码都是不可见的。 默认情况下,此消息持续 30 秒不可见。 若要完成从队列中删除消息,还必须调用 DeleteMessage。 此删除消息的两步过程可确保,如果代码因硬件或软件故障而无法处理消息,则代码的其他实例可以获取相同消息并重试。 代码在处理消息后会立即调用 DeleteMessage。 到目前为止,我们展示的所有队列方法都有 Async 个替代方法。

let deleteMessage = queueClient.ReceiveMessage().Value
queueClient.DeleteMessage(deleteMessage.MessageId, deleteMessage.PopReceipt)

对通用队列存储 API 使用异步工作流

此示例展示了如何将异步工作流与常见队列存储 API 配合使用。

async {
    let! exists = queueClient.CreateIfNotExistsAsync() |> Async.AwaitTask

    let! delAsyncMessage = queueClient.ReceiveMessageAsync() |> Async.AwaitTask

    // ... process the message here ...

    // Now indicate successful processing:
    queueClient.DeleteMessageAsync(delAsyncMessage.Value.MessageId, delAsyncMessage.Value.PopReceipt) |> Async.AwaitTask
}

用于取消对消息进行排队的其他选项

可通过两种方式自定义队列中消息的检索。 首先,可获取一批消息(最多 32 条)。 其次,可以设置更长或更短的不可见超时时间,从而允许代码使用更多或更少时间来完全处理每个消息。 下面的代码示例使用 ReceiveMessages 在一次调用中获取 20 条消息,然后处理每条消息。 它还将每条消息的不可见超时时间设置为 5 分钟。 5 分钟超时时间对于所有消息都是同时开始的,因此在调用 ReceiveMessages 5 分钟后,尚未删除的任何消息都将再次变得可见。

for dequeueMessage in queueClient.ReceiveMessages(20, Nullable(TimeSpan.FromMinutes(5.))).Value do
        // Process the message here.
        queueClient.DeleteMessage(dequeueMessage.MessageId, dequeueMessage.PopReceipt)

获取队列长度

可以获取队列中消息的估计数。 使用 GetProperties 方法可要求队列服务检索队列属性,包括消息计数。 ApproximateMessagesCount 属性返回 GetProperties 方法检索到的最后一个值。

let properties = queueClient.GetProperties().Value
let count = properties.ApproximateMessagesCount

删除队列

若要删除队列及其包含的所有消息,请对队列对象调用 Delete 方法。

queueClient.DeleteIfExists()

注意

如果要从旧库迁移,则默认情况下,它们采用 Base64 编码的消息,但新库不会,因为它性能更高。 有关如何设置编码的信息,请参阅 MessageEncoding

另请参阅