通过 F# 实现 Azure 队列存储入门
Azure 队列存储用于在应用程序组件之间进行云消息传送。 在设计应用程序以实现伸缩性时,通常要将各个应用程序组件分离,使其可以独立地进行伸缩。 队列存储提供的异步消息传送适用于在应用程序组件之间进行通信,无论这些应用程序组件是运行在云中、桌面上、本地服务器上还是移动设备上。 队列存储还支持管理异步任务以及构建过程工作流。
关于本教程
本教程展示了如何针对某些使用 Azure 队列存储的常见任务编写 F# 代码。 涉及的任务包括创建和删除队列、添加、读取和删除队列消息。
有关队列存储的概念性概述,请参阅队列存储的 .NET 指南。
先决条件
若要使用此指南,必须先创建 Azure 存储帐户。 你还需要此帐户的存储访问密钥。
创建 F# 脚本并启动 F# 交互
本文中的示例可用于 F# 应用程序或 F# 脚本。 若要创建 F# 脚本,请在 F# 开发环境中创建一个扩展名为 .fsx
的文件,例如 queues.fsx
。
如何执行脚本
F# 交互窗口 dotnet fsi
可通过交互方式启动,也可以从命令行启动,以运行脚本。 命令行语法是
> dotnet fsi [options] [ script-file [arguments] ]
在脚本中添加包
接下来,使用 #r
nuget:package name
安装 Azure.Storage.Queues
包和 open
命名空间。例如
> #r "nuget: Azure.Storage.Queues"
open Azure.Storage.Queues
添加命名空间声明
将下列 open
语句添加到 queues.fsx
文件顶部:
open Azure.Storage.Queues // Namespace for Queue storage types
open System
open System.Text
获取连接字符串
本教程需要一个 Azure 存储连接字符串。 有关连接字符串的详细信息,请参阅配置存储连接字符串。
在本教程中,你将在脚本中输入连接字符串,如下所示:
let storageConnString = "..." // fill this in from your storage account
创建队列服务客户端
使用 QueueClient
类可以检索存储在队列存储中的队列。 下面是创建客户端的一种方法:
let queueClient = QueueClient(storageConnString, "myqueue")
现在,已准备好编写从队列存储读取数据并将数据写入队列存储的代码。
创建队列
此示例演示如何创建队列(如果队列还不存在):
queueClient.CreateIfNotExists()
在队列中插入消息
要将消息插入到现有队列中,请先创建新的消息。 接下来,调用 SendMessage
方法。 可从字符串(UTF-8 格式)或 byte
数组创建消息:
queueClient.SendMessage("Hello, World") // Insert a String message into a queue
queueClient.SendMessage(BinaryData.FromBytes(Encoding.UTF8.GetBytes("Hello, World"))) // Insert a BinaryData message into a queue
扫视下一条消息
通过调用 PeekMessage
方法,可以速览队列前面的消息,且不会将其从队列中删除。
let peekedMessage = queueClient.PeekMessage()
let messageContents = peekedMessage.Value.Body.ToString()
获取下一条消息进行处理
可以通过调用 ReceiveMessage
方法检索队列前面的消息以进行处理。
let updateMessage = queueClient.ReceiveMessage().Value
稍后使用 DeleteMessage
指示消息处理成功。
更改已排队消息的内容
可以在队列中就地更改检索到的消息的内容。 如果消息表示工作任务,可使用此功能来更新该工作任务的状态。 以下代码使用新内容更新队列消息,并将可见性超时设置为再延长 60 秒。 这会保存与消息关联的工作的状态,并额外为客户端提供一分钟的时间来继续处理消息。 可使用此方法跟踪队列消息上的多步骤工作流,即使处理步骤因硬件或软件故障而失败,也无需从头开始操作。 通常,还可以保留重试计数,如果某条消息的重试次数超过一定次数,你将删除此消息。 这可避免每次处理某条消息时都触发应用程序错误。
queueClient.UpdateMessage(
updateMessage.MessageId,
updateMessage.PopReceipt,
"Updated contents.",
TimeSpan.FromSeconds(60.0))
取消对下一条消息的排队
代码通过两个步骤来取消对队列中某条消息的排队。 调用 ReceiveMessage
时,会获得队列中的下一条消息。 从 ReceiveMessage
返回的消息对于从此队列读取消息的任何其他代码都是不可见的。 默认情况下,此消息持续 30 秒不可见。 若要完成从队列中删除消息,还必须调用 DeleteMessage
。 此删除消息的两步过程可确保,如果代码因硬件或软件故障而无法处理消息,则代码的其他实例可以获取相同消息并重试。 代码在处理消息后会立即调用 DeleteMessage
。
到目前为止,我们展示的所有队列方法都有 Async
个替代方法。
let deleteMessage = queueClient.ReceiveMessage().Value
queueClient.DeleteMessage(deleteMessage.MessageId, deleteMessage.PopReceipt)
对通用队列存储 API 使用异步工作流
此示例展示了如何将异步工作流与常见队列存储 API 配合使用。
async {
let! exists = queueClient.CreateIfNotExistsAsync() |> Async.AwaitTask
let! delAsyncMessage = queueClient.ReceiveMessageAsync() |> Async.AwaitTask
// ... process the message here ...
// Now indicate successful processing:
queueClient.DeleteMessageAsync(delAsyncMessage.Value.MessageId, delAsyncMessage.Value.PopReceipt) |> Async.AwaitTask
}
用于取消对消息进行排队的其他选项
可通过两种方式自定义队列中消息的检索。
首先,可获取一批消息(最多 32 条)。 其次,可以设置更长或更短的不可见超时时间,从而允许代码使用更多或更少时间来完全处理每个消息。 下面的代码示例使用 ReceiveMessages
在一次调用中获取 20 条消息,然后处理每条消息。 它还将每条消息的不可见超时时间设置为 5 分钟。 5 分钟超时时间对于所有消息都是同时开始的,因此在调用 ReceiveMessages
5 分钟后,尚未删除的任何消息都将再次变得可见。
for dequeueMessage in queueClient.ReceiveMessages(20, Nullable(TimeSpan.FromMinutes(5.))).Value do
// Process the message here.
queueClient.DeleteMessage(dequeueMessage.MessageId, dequeueMessage.PopReceipt)
获取队列长度
可以获取队列中消息的估计数。 使用 GetProperties
方法可要求队列服务检索队列属性,包括消息计数。 ApproximateMessagesCount
属性返回 GetProperties
方法检索到的最后一个值。
let properties = queueClient.GetProperties().Value
let count = properties.ApproximateMessagesCount
删除队列
若要删除队列及其包含的所有消息,请对队列对象调用 Delete
方法。
queueClient.DeleteIfExists()
注意
如果要从旧库迁移,则默认情况下,它们采用 Base64 编码的消息,但新库不会,因为它性能更高。 有关如何设置编码的信息,请参阅 MessageEncoding。
另请参阅
反馈
https://aka.ms/ContentUserFeedback。
即将发布:在整个 2024 年,我们将逐步淘汰作为内容反馈机制的“GitHub 问题”,并将其取代为新的反馈系统。 有关详细信息,请参阅:提交和查看相关反馈