测试和调试可观测序列
测试 Rx 应用程序
如果你有一个可观测的序列,该序列在较长一段时间内发布值,则实时测试它可能是一个延伸。 反应式扩展库提供 TestScheduler 类型,以帮助测试这种与时间相关的代码,而无需实际等待时间通过。 TestScheduler 继承 VirtualScheduler,并允许在模拟时间内创建、发布和订阅序列。 例如,可以将需要 5 天的发布压缩为 2 分钟运行,同时保持正确的缩放比例。 还可以获取过去实际发生的序列 (例如,前一年的股票时钟周期序列) 计算或订阅,就像实时推送新值一样。
工厂方法 Start 将执行所有计划任务,直到队列为空,也可以指定一个时间,以便排队的任务仅执行到指定时间。
以下示例创建具有指定 OnNext 通知的热可观测序列。 然后,它启动测试计划程序,并指定何时订阅和释放热可观测序列。 Start 方法返回 ITestableObserver 的实例,其中包含记录列表中所有通知的 Messages 属性。
序列完成后,我们使用 ReactiveAssert.AreElementEqual 方法来比较 Messages 属性,以及预期值列表,以查看两者是否相同 (项数相同,项是否相等且顺序) 相同。 通过这样做,我们可以确认我们确实收到了预期的通知。 在我们的示例中,由于我们只开始订阅 150,因此将错过值 abc
。 但是,当我们比较到目前为止收到的值 400 时,我们注意到,实际上我们在订阅序列后收到了所有已发布的值。 我们还验证通知 OnCompleted
是在正确的时间在 500 触发的。 此外,订阅信息还会由 CreateHotObservable 方法返回的 ITestableObservable 类型捕获。
同样,可以使用 ReactiveAssert.AreElementsEqual 确认订阅确实发生在预期时间。
using System;
using System.Reactive;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;
class Program : ReactiveTest
{
static void Main(string[] args)
{
var scheduler = new TestScheduler();
var input = scheduler.CreateHotObservable(
OnNext(100, "abc"),
OnNext(200, "def"),
OnNext(250, "ghi"),
OnNext(300, "pqr"),
OnNext(450, "xyz"),
OnCompleted<string>(500)
);
var results = scheduler.Start(
() => input.Buffer(() => input.Throttle(TimeSpan.FromTicks(100), scheduler))
.Select(b => string.Join(",", b)),
created: 50,
subscribed: 150,
disposed: 600);
ReactiveAssert.AreElementsEqual(results.Messages, new Recorded<Notification<string>>[] {
OnNext(400, "def,ghi,pqr"),
OnNext(500, "xyz"),
OnCompleted<string>(500)
});
ReactiveAssert.AreElementsEqual(input.Subscriptions, new Subscription[] {
Subscribe(150, 500),
Subscribe(150, 400),
Subscribe(400, 500)
});
}
}
调试 Rx 应用程序
可以使用 Do 运算符调试 Rx 应用程序。 Do 运算符允许您指定要对每个可观测序列 (项执行的各种操作,例如打印或记录项等) 。 当你链接许多运算符,并且想知道在每个级别生成了哪些值时,这尤其有用。
在下面的示例中,我们将重复使用每秒生成整数的 Buffer 示例,同时将它们放入每个可以容纳 5 个项的缓冲区中。 在 使用 LINQ 运算符查询可观测序列 主题中的原始示例中,我们仅订阅当缓冲区已满 (并在) 清空缓冲区之前,最终的可观测 (IList<>) 序列。 但是,在此示例中,我们将使用 Do 运算符在原始序列推送值时输出值, (每秒) 一个整数。 缓冲区已满时,我们使用 Do 运算符打印状态,然后将所有这些作为观察程序订阅的最终序列进行移交。
var seq1 = Observable.Interval(TimeSpan.FromSeconds(1))
.Do(x => Console.WriteLine(x.ToString()))
.Buffer(5)
.Do(x => Console.WriteLine("buffer is full"))
.Subscribe(x => Console.WriteLine("Sum of the buffer is " + x.Sum()));
Console.ReadKey();
从此示例中可以看到,订阅位于一系列链式可观测序列的接收端。 首先,我们使用 Interval 运算符创建一个可观测的整数序列(以秒为单位)。 然后,我们使用 Buffer 运算符将 5 个项放入缓冲区,仅当缓冲区已满时才将它们作为另一个序列发送出去。 最后,该操作将移交给 Subscribe 运算符。 数据向下传播所有这些中间序列,直到它们被推送到观察者。 同样,订阅以相反的方向传播到源序列。 通过在此类传播中间插入 Do 运算符,可以“监视”此类数据流,就像在 .NET 中使用 Console.WriteLine 或在 C 中使用 printf () 执行调试一样。
还可以使用 Timestamp 运算符来验证项被可观测序列推送出的时间。 这有助于对基于时间的操作进行故障排除,以确保准确性。 回想一下创建 和订阅简单可观测序列 主题中的以下示例,在该主题中,我们将 Timestamp 运算符链接到查询,以便源序列推送出的每个值都将在发布时追加。 通过这样做,当我们订阅此源序列时,我们可以同时接收其值和时间戳。
Console.WriteLine(“Current Time: “ + DateTime.Now);
var source = Observable.Timer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1))
.Timestamp();
using (source.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
{
Console.WriteLine("Press any key to unsubscribe");
Console.ReadKey();
}
Console.WriteLine("Press any key to exit");
Console.ReadKey();
输出将类似于下面:
Current Time: 5/31/2011 5:35:08 PM
Press any key to unsubscribe
0: 5/31/2011 5:35:13 PM -07:00
1: 5/31/2011 5:35:14 PM -07:00
2: 5/31/2011 5:35:15 PM -07:00
通过使用 Timestamp 运算符,我们已验证第一项确实在序列后 5 秒外推,并且每个项在 1 秒后发布。
此外,还可以在 lambda 表达式中设置断点以帮助调试。 通常,只能为整个查询设置断点,而不单独显示特定值来查找它。 若要解决此限制,可以在查询中间插入 Select 运算符并设置一个断点,并在 Select 语句中,使用返回语句在其自己的行中将相同的值投影出为其源。 然后,可以在语句行设置断点 return
,并在值通过查询时检查这些值。
var seq = Observable.Interval(TimeSpan.FromSeconds(1))
.Do(x => Console.WriteLine(x.ToString()))
.Buffer(5)
.Select(y => {
return y; }) // set a breakpoint at this line
.Do(x => Console.WriteLine("buffer is full"))
.Subscribe(x => Console.WriteLine("Sum of the buffer is " + x.Sum()));
Console.ReadKey();
在此示例中,断点在 行处 return y
设置。 调试到程序中时, y
变量将显示在“ 局部变量 ”窗口中,你可以检查其总计数 (5)
。 如果展开 y
,还可以检查列表中的每一项,包括其值和类型。
或者,可以将 lambda 表达式转换为语句 lambda 表达式,设置代码格式,使语句位于自己的行上,然后设置断点。
调试完成后,可以删除任何 Do 和 Select 调用。