测试和调试可观测序列

测试 Rx 应用程序

如果你有一个可观测的序列,该序列在较长一段时间内发布值,则实时测试它可能是一个延伸。 反应式扩展库提供 TestScheduler 类型,以帮助测试这种与时间相关的代码,而无需实际等待时间通过。 TestScheduler 继承 VirtualScheduler,并允许在模拟时间内创建、发布和订阅序列。 例如,可以将需要 5 天的发布压缩为 2 分钟运行,同时保持正确的缩放比例。 还可以获取过去实际发生的序列 (例如,前一年的股票时钟周期序列) 计算或订阅,就像实时推送新值一样。

工厂方法 Start 将执行所有计划任务,直到队列为空,也可以指定一个时间,以便排队的任务仅执行到指定时间。

以下示例创建具有指定 OnNext 通知的热可观测序列。 然后,它启动测试计划程序,并指定何时订阅和释放热可观测序列。 Start 方法返回 ITestableObserver 的实例,其中包含记录列表中所有通知的 Messages 属性。

序列完成后,我们使用 ReactiveAssert.AreElementEqual 方法来比较 Messages 属性,以及预期值列表,以查看两者是否相同 (项数相同,项是否相等且顺序) 相同。 通过这样做,我们可以确认我们确实收到了预期的通知。 在我们的示例中,由于我们只开始订阅 150,因此将错过值 abc。 但是,当我们比较到目前为止收到的值 400 时,我们注意到,实际上我们在订阅序列后收到了所有已发布的值。 我们还验证通知 OnCompleted 是在正确的时间在 500 触发的。 此外,订阅信息还会由 CreateHotObservable 方法返回的 ITestableObservable 类型捕获。

同样,可以使用 ReactiveAssert.AreElementsEqual 确认订阅确实发生在预期时间。

using System;
using System.Reactive;
using System.Reactive.Linq;
using Microsoft.Reactive.Testing;

class Program : ReactiveTest
{
    static void Main(string[] args)
    {
        var scheduler = new TestScheduler();

        var input = scheduler.CreateHotObservable(
            OnNext(100, "abc"),
            OnNext(200, "def"),
            OnNext(250, "ghi"),
            OnNext(300, "pqr"),
            OnNext(450, "xyz"),
            OnCompleted<string>(500)
            );

        var results = scheduler.Start(
            () => input.Buffer(() => input.Throttle(TimeSpan.FromTicks(100), scheduler))
                       .Select(b => string.Join(",", b)),
            created: 50,
            subscribed: 150,
            disposed: 600);

        ReactiveAssert.AreElementsEqual(results.Messages, new Recorded<Notification<string>>[] {
                OnNext(400, "def,ghi,pqr"),
                OnNext(500, "xyz"),
                OnCompleted<string>(500)
            });

        ReactiveAssert.AreElementsEqual(input.Subscriptions, new Subscription[] {
                Subscribe(150, 500),
                Subscribe(150, 400),
                Subscribe(400, 500)
            });
    }
}

调试 Rx 应用程序

可以使用 Do 运算符调试 Rx 应用程序。 Do 运算符允许您指定要对每个可观测序列 (项执行的各种操作,例如打印或记录项等) 。 当你链接许多运算符,并且想知道在每个级别生成了哪些值时,这尤其有用。

在下面的示例中,我们将重复使用每秒生成整数的 Buffer 示例,同时将它们放入每个可以容纳 5 个项的缓冲区中。 在 使用 LINQ 运算符查询可观测序列 主题中的原始示例中,我们仅订阅当缓冲区已满 (并在) 清空缓冲区之前,最终的可观测 (IList<>) 序列。 但是,在此示例中,我们将使用 Do 运算符在原始序列推送值时输出值, (每秒) 一个整数。 缓冲区已满时,我们使用 Do 运算符打印状态,然后将所有这些作为观察程序订阅的最终序列进行移交。

var seq1 = Observable.Interval(TimeSpan.FromSeconds(1))
           .Do(x => Console.WriteLine(x.ToString()))
           .Buffer(5)
           .Do(x => Console.WriteLine("buffer is full"))
           .Subscribe(x => Console.WriteLine("Sum of the buffer is " + x.Sum()));
Console.ReadKey();

从此示例中可以看到,订阅位于一系列链式可观测序列的接收端。 首先,我们使用 Interval 运算符创建一个可观测的整数序列(以秒为单位)。 然后,我们使用 Buffer 运算符将 5 个项放入缓冲区,仅当缓冲区已满时才将它们作为另一个序列发送出去。 最后,该操作将移交给 Subscribe 运算符。 数据向下传播所有这些中间序列,直到它们被推送到观察者。 同样,订阅以相反的方向传播到源序列。 通过在此类传播中间插入 Do 运算符,可以“监视”此类数据流,就像在 .NET 中使用 Console.WriteLine 或在 C 中使用 printf () 执行调试一样。

还可以使用 Timestamp 运算符来验证项被可观测序列推送出的时间。 这有助于对基于时间的操作进行故障排除,以确保准确性。 回想一下创建 和订阅简单可观测序列 主题中的以下示例,在该主题中,我们将 Timestamp 运算符链接到查询,以便源序列推送出的每个值都将在发布时追加。 通过这样做,当我们订阅此源序列时,我们可以同时接收其值和时间戳。

Console.WriteLine(“Current Time: “ + DateTime.Now);

var source = Observable.Timer(TimeSpan.FromSeconds(5), TimeSpan.FromSeconds(1))
                       .Timestamp();
using (source.Subscribe(x => Console.WriteLine("{0}: {1}", x.Value, x.Timestamp)))
      {
           Console.WriteLine("Press any key to unsubscribe");
           Console.ReadKey();
      }
Console.WriteLine("Press any key to exit");
Console.ReadKey();

输出将类似于下面:

Current Time: 5/31/2011 5:35:08 PM

Press any key to unsubscribe

0: 5/31/2011 5:35:13 PM -07:00

1: 5/31/2011 5:35:14 PM -07:00

2: 5/31/2011 5:35:15 PM -07:00

通过使用 Timestamp 运算符,我们已验证第一项确实在序列后 5 秒外推,并且每个项在 1 秒后发布。

此外,还可以在 lambda 表达式中设置断点以帮助调试。 通常,只能为整个查询设置断点,而不单独显示特定值来查找它。 若要解决此限制,可以在查询中间插入 Select 运算符并设置一个断点,并在 Select 语句中,使用返回语句在其自己的行中将相同的值投影出为其源。 然后,可以在语句行设置断点 return ,并在值通过查询时检查这些值。

var seq = Observable.Interval(TimeSpan.FromSeconds(1))
          .Do(x => Console.WriteLine(x.ToString()))
          .Buffer(5)
          .Select(y => { 
                  return y; }) // set a breakpoint at this line
          .Do(x => Console.WriteLine("buffer is full"))
          .Subscribe(x => Console.WriteLine("Sum of the buffer is " + x.Sum()));
Console.ReadKey();

在此示例中,断点在 行处 return y 设置。 调试到程序中时, y 变量将显示在“ 局部变量 ”窗口中,你可以检查其总计数 (5)。 如果展开 y,还可以检查列表中的每一项,包括其值和类型。

或者,可以将 lambda 表达式转换为语句 lambda 表达式,设置代码格式,使语句位于自己的行上,然后设置断点。

调试完成后,可以删除任何 Do 和 Select 调用。

另请参阅

概念

创建和订阅简单可观测序列
使用 LINQ 运算符查询可观测序列
使用计划程序