A Brief Introduction to the Reactive Extensions for .NET, Rx

Wow, it has been a long time since I have blogged.  Yesterday, we made the first official public release of Rx on devlabs.  And that means that I can now talk about what has been on my mind for the past while: Rx.  What is Rx?  Rx is short for the Reactive Extensions for .NET.  Think of it as a prescription strength library for writing reactive programs in your favorite .NET language.

Today, I want to give you a brief introduction to Rx that complements my screencast.  In subsequent posts, I will dive deep into the semantics of Rx providing detailed explanations and examples.  Hopefully, this will positively impact your day to day work and allow you to have a lot of fun in the process.  Please feel free to ask any questions that you may have or provide feedback about Rx.  This first release is a sort of preview of the technology.  We will provide regular updates that both extend and refine the library and we want you to be a part of that process.

First Steps

Rx is currently available for .NET Framework 3.5 SP1, .NET Framework 4 Beta 2, and Silverlight 3.  Download and install one of these versions and then open Visual Studio.


Create a WPF Application or a Silverlight Application.



The Chrome

When the designer opens, replace the existing Grid control with the following XAML.

Text="Rx Rocks!" />
Canvas.Top="25" />

Then switch to the code view.


Starting with Rx

To begin using Rx, first add a reference to System.Reactive and System.CoreEx.  The two other assemblies included with this release are System.Threading (task parallel library) and System.Interactive (more enumerable goodness).




Once you have added a reference to System.Reactive and System.CoreEx, then you can use the reactive operators found in Observable.

Observable Sequences

Observables, like enumerables, are sequences.  The only difference is that the latter is pull-based and the former is push-based.  Both sequences are a possibly empty sequence of values optionally followed by either normal termination or abnormal termination.

When you want to enumerate an enumerable:

1.  Get the enumerator from the enumerable

2.  Call MoveNext and call Current on the enumerator for each value in the sequence

3.  The sequence ends when MoveNext returns false (normal termination) or MoveNext or Current throws an exception (abnormal termination)

The user writes a foreach statement over an enumerable sequence.

foreach (var x in xs)

With enumerables, each value is pulled out of the source and bound to the iteration variable before executing the user code.  Note that the consumer controls this process.

When you want to observe an observable sequence:

1.  Subscribe an observer to the observable

2.  OnNext is called on the observer for each value in the sequence

3.  The sequence ends when OnCompleted (normal termination) or OnError (abnormal termination) is called on the observer

The user writes a subscribe statement over an observable sequence.

xs.Subscribe(x => F(x));

With observables, each value is pushed out of the source and bound to the handler variable before executing the user code.  Note that the producer controls this process.

Hello, World!

Let’s write our first Rx application, the obligatory “Hello, World!” application.

Remember all those times that you wrote UI applications with asynchronous operations and got the dreaded Cross-Thread operation exception.


Well Rx, makes this simpler despite the fact that it has a lot of asynchronous and event-based operations going on at once.

The user may specify the default SynchronizationContext to which to send Rx messages.  By default, it uses a reactive event loop except in Silverlight where it uses the dispatcher by default.  The user can change this by setting Observable.Context to the appropriate SynchronizationContext.  Rx provides a few commonly used synchronization contexts in the class SynchronizationContexts.

Since we are using WPF, we should set the default context to the dispatcher context.

Observable.Context = SynchronizationContexts.CurrentDispatcher;

Now, we can use the UI without getting cross-thread exceptions.

Rx includes a number of basic operations for constructing observable sequences.  For example, Return creates a singleton sequence that contains an OnNext message with the specified value followed by an OnCompleted message.  We can then subscribe to the observable using a value handler which specifies what to do with each OnNext message from the observable sequence.

var helloWorld = Observable.Return("Hello, World!");

helloWorld.Subscribe(value => textBlock.Text = value);

When we run the application, we see that the text is set to “Hello, World!”.


A Second Step

Another basic operation for constructing observable sequences, is the Interval operator.  It creates an observable sequence that repeatedly sends an OnNext message after the specified time interval.  For example, we can create an observable sequence that sends a message after every second.

var timer = Observable.Interval(TimeSpan.FromSeconds(1));

timer.Subscribe(value => textBlock.Text = value.ToString());


Wrapping .NET Events

Another source of observable sequences is existing .NET events and asynchronous computations.  We can bring these existing reactive sources into Rx by using conversions.  For example, we can convert .NET events to observable sequences using the FromEvent operator.  The user must specify the type of EventArgs for the event, the target, and the event name.  We can use this to convert the MouseDown event on the image to an observable sequence of events

var mouseDown = Observable.FromEvent<MouseButtonEventArgs>(image, "MouseDown");

LINQ to Rx

Observable contains an implementation of the standard query operators for observable sequences and that means that we can use LINQ to write queries over observable sequences.  For example, we can modify our mouseDown observable sequence to contain a sequence of position values by projecting each event to the current mouse position relative to the image.

var mouseDown = from evt in Observable.FromEvent<MouseButtonEventArgs>(image, "MouseDown")
select evt.EventArgs.GetPosition(image);


Rx also contains a number of operators to compose together observable sequences in a way very similar to how Linq to Objects composes together enumerable sequences. For example, the operator Until takes all of the messages from the left source until an OnNext message occurs on the right source at which time it will unsubscribe from the left source and the right source.

var q = timer.Until(mouseDown);

q.Subscribe(value => textBlock.Text = value.ToString());

Drag and Drop

We can put together these simple pieces and build drag and drop.  In the screencast, I wrote drag and drop using the mouse deltas, here I will write it using the mouse offset to show another way to do the same thing.  As you begin to use Rx, you will find that often you can simplify your queries repeatedly and if you do it right then at the end you will get a query that looks remarkably like the specification of the problem.

What is the specification of drag and drop?  Every time the user clicks down on the image, we want to move the image relative to where she clicked down until the mouse button goes up.

Therefore, we need three events: the mouse down on the image, the mouse move on the window, and the mouse up on the window.  For the mouse down, we need the position relative to the image, for the mouse move, we need the position relative to the window, and for the mouse up we do not need the position.

var mouseDown = from evt in Observable.FromEvent<MouseButtonEventArgs>(image, "MouseDown")
select evt.EventArgs.GetPosition(image);

var mouseUp = Observable.FromEvent<MouseButtonEventArgs>(image, "MouseUp");

var mouseMove = from evt in Observable.FromEvent<MouseEventArgs>(image, "MouseMove")
select evt.EventArgs.GetPosition(this);

Now, we can write a query that expresses our intent.  When the mouse goes down grab the offset of the mouse relative to the image, and then listen to the mouse moves relative to the window until the mouse goes up.  For each mouse move during this time, compute the image position as the mouse position subtracted by the original offset relative to the image.

var q = from imageOffset in mouseDown
from pos in mouseMove.Until(mouseUp)
select new { X = pos.X - imageOffset.X, Y = pos.Y - imageOffset.Y };

Finally, we can subscribe to the observable sequence of positions and update the position of the image.

q.Subscribe(value =>
Canvas.SetLeft(image, value.X);
Canvas.SetTop(image, value.Y);


Looking Forward

Drag and drop is fun, but it is only one small example of the power of Rx.  Next time, we will start our deep dive into the semantics of Rx.  Until then take the time to download it and take it for a test drive.