Managing multiple web service calls with Rx

Let's say I have a phone app (or any app really) that needs to make several web service calls at once. For example, imagine the user multiselects three items on a page and then taps the “add favorite” button. I then need to make three calls to the “set favorite” service. I'll make them asynchronously so that they can all run simultaneously, but if I make three asynchronous calls to a web API that updates a local collection of resources, I want to be able to:

  1. know which ones failed, to notify the user
  2. know which ones succeeded, to update the local data model
  3. get a notification when all of them are complete so I can turn off the “Is Busy” indicator in the UI

Typically the first two are no problem, because I can run all the calls through the same logic, each with its own completion callback that does the right thing. The trick is aggregating the responses so that I can do #3: turn off the progress control. Normally this would entail creating some state object, tracking how many items were started, identifying them as they finish, and updating the state object, all in a thread-safe manner. It is further complicated by the fact that the requests may (and likely will) complete in a different order from the one in which I started them.
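For comparison, the manual bookkeeping described above might look roughly like the following sketch. The PendingCallTracker class and its Report method are hypothetical names invented for illustration, not code from my app. It counts outstanding calls under a lock and fires a callback once the last one reports in.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not from the original app): tracks N outstanding calls
// and invokes a callback exactly once, when the last call has reported.
public sealed class PendingCallTracker
{
    private readonly object _gate = new object();
    private readonly List<int> _succeeded = new List<int>();
    private readonly List<int> _failed = new List<int>();
    private readonly Action<IList<int>, IList<int>> _allDone;
    private int _remaining;

    public PendingCallTracker(int callCount, Action<IList<int>, IList<int>> allDone)
    {
        _remaining = callCount;
        _allDone = allDone;
    }

    // Each call's completion callback invokes this, possibly from
    // different threads and in any order.
    public void Report(int id, bool success)
    {
        bool finished;
        lock (_gate)
        {
            (success ? _succeeded : _failed).Add(id);
            finished = --_remaining == 0;
        }
        if (finished) _allDone(_succeeded, _failed);
    }
}

public static class TrackerDemo
{
    public static void Main()
    {
        var tracker = new PendingCallTracker(3, (ok, failed) =>
            Console.WriteLine("Done: {0} succeeded, {1} failed", ok.Count, failed.Count));

        // Completion callbacks may arrive in any order.
        tracker.Report(6, true);
        tracker.Report(5, true);
        tracker.Report(7, false); // prints "Done: 2 succeeded, 1 failed"
    }
}
```

It works, but every new requirement (timeouts, cancellation, retries) means more hand-written state, which is the part I would rather leave to Rx.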

Because I like to use Rx to populate my viewmodels, my web requests are IObservables, which means that I can compose them to get the behavior I want without hand-rolling the threading or state management.

Let's say I'm making a call to mark something as a favorite. The service proxy looks like this. On success, the service echoes back the new value that was sent to it.

 public IObservable<string> SetFavorite(int ID, bool isFavorite)

A single call would look something like this:

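private void SingleCall()
{
    FavoritesService svc = new FavoritesService();
    var svcCall = svc.SetFavorite(5, true);

    // string.Format avoids the Debug.WriteLine(message, category) overload,
    // which desktop .NET selects when the second argument is a string.
    svcCall.ObserveOnDispatcher().Subscribe(
        result => Debug.WriteLine(string.Format("Call succeeded = {0}", result)),
        ex => Debug.WriteLine("Call failed: {0}", ex),
        () => Debug.WriteLine("Call completed")
    );
}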

If I need to favorite three items, I just do it three times:

private void IndividualCalls()
{
    FavoritesService svc = new FavoritesService();

    var call1 = svc.SetFavorite(5, true);
    var call2 = svc.SetFavorite(6, true);
    var call3 = svc.SetFavorite(7, true);

    // Each call gets its own subscription, so nothing ties the three together.
    // string.Format avoids the Debug.WriteLine(message, category) overload,
    // which desktop .NET selects when the second argument is a string.
    call1.ObserveOnDispatcher().Subscribe(
        result => Debug.WriteLine(string.Format("Call1 succeeded = {0}", result)),
        ex => Debug.WriteLine("Call1 failed: {0}", ex),
        () => Debug.WriteLine("Call1 completed")
    );

    call2.ObserveOnDispatcher().Subscribe(
        result => Debug.WriteLine(string.Format("Call2 succeeded = {0}", result)),
        ex => Debug.WriteLine("Call2 failed: {0}", ex),
        () => Debug.WriteLine("Call2 completed")
    );

    call3.ObserveOnDispatcher().Subscribe(
        result => Debug.WriteLine(string.Format("Call3 succeeded = {0}", result)),
        ex => Debug.WriteLine("Call3 failed: {0}", ex),
        () => Debug.WriteLine("Call3 completed")
    );
}
  

But how do I know when all three are done? My first thought was to just use Merge:

private void MergedCalls()
{
    FavoritesService svc = new FavoritesService();

    var call1 = svc.SetFavorite(5, true);
    var call2 = svc.SetFavorite(6, true);
    var call3 = svc.SetFavorite(7, true);

    var allCalls = call1.Merge(call2).Merge(call3);

    allCalls.ObserveOnDispatcher().Subscribe(
        // OnNext fires once per successful call, not once for all of them.
        result => Debug.WriteLine(string.Format("Call succeeded = {0}", result)),
        // OnError fires for the first failure and ends the merged sequence.
        ex => Debug.WriteLine("A call failed: {0}", ex),
        () => Debug.WriteLine("All calls completed")
    );
}

But there is a major issue with this. If an exception happens in an observable sequence, the sequence is over. So if any of the calls faults (e.g. because of a network error), the merged sequence ends and I won't receive notifications for any of the calls that haven't yet completed. I tried a number of variations on this until Bart De Smet pointed out the Materialize operator to me.
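To see the early termination in isolation, here is a small sketch that assumes the System.Reactive package is referenced. The three Subject<string> instances are hypothetical stand-ins for in-flight service calls, and MergeErrorDemo is a made-up name. Once the second "call" faults, the result of the third is never observed.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class MergeErrorDemo
{
    // Returns what the merged subscriber saw, in order.
    public static List<string> Run()
    {
        var log = new List<string>();

        // Subjects stand in for three in-flight service calls (illustration only).
        var call1 = new Subject<string>();
        var call2 = new Subject<string>();
        var call3 = new Subject<string>();

        call1.Merge(call2).Merge(call3).Subscribe(
            result => log.Add("succeeded = " + result),
            ex => log.Add("failed: " + ex.Message),
            () => log.Add("all completed"));

        call1.OnNext("True");
        call1.OnCompleted();

        // The fault ends the merged sequence immediately.
        call2.OnError(new InvalidOperationException("400 Bad Request"));

        // Nobody is listening any more, so this result is lost.
        call3.OnNext("True");
        call3.OnCompleted();

        return log;
    }

    public static void Main()
    {
        // Prints "succeeded = True" and then "failed: 400 Bad Request".
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```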

Materialize “materializes the implicit notifications of an observable sequence as explicit notification values.” What this means is that if you materialize a sequence of strings, you get a sequence of Notification<string>. Each Notification<string> object tells you whether it represents a string value, an exception, or a completion. You therefore still get all the information, but the sequence itself no longer faults: the error becomes a value in the sequence, and the materialized sequence then completes normally.
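A minimal sketch of that behavior, again assuming the System.Reactive package. Observable.Return and Observable.Throw are used here as stand-ins for a successful and a failed service call, and MaterializeDemo is a hypothetical name.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive;
using System.Reactive.Linq;

public static class MaterializeDemo
{
    // Records every notification a materialized call produces, plus how
    // the materialized sequence itself terminates.
    public static List<string> Describe(IObservable<string> call)
    {
        var log = new List<string>();
        call.Materialize().Subscribe(
            n => log.Add(n.Kind == NotificationKind.OnNext
                    ? "OnNext(" + n.Value + ")"
                    : n.Kind.ToString()),
            ex => log.Add("sequence faulted"),
            () => log.Add("sequence completed"));
        return log;
    }

    public static void Main()
    {
        var success = Observable.Return("True");
        var failure = Observable.Throw<string>(
            new InvalidOperationException("400 Bad Request"));

        // OnNext(True), OnCompleted, sequence completed
        Console.WriteLine(string.Join(", ", Describe(success)));

        // OnError, sequence completed
        Console.WriteLine(string.Join(", ", Describe(failure)));
    }
}
```

In both cases the outer sequence ends with a normal completion, which is what makes it safe to merge many materialized calls together.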

I can now materialize each of the three calls and then merge them together. If I generalize this to N calls to SetFavorite, it looks something like this:

private void SetFavoritesWithMaterialize2(List<int> IDs)
{
    FavoritesService svc = new FavoritesService();
    var total = IDs.ToObservable()
                    .SelectMany(id =>
                         // For the test run, even IDs are set to true and odd IDs to false.
                         svc.SetFavorite(id, id % 2 == 0)
                            .Materialize()
                            // Attach the ID so the subscriber knows which call this is.
                            .Select(callResult =>
                                     new { Result = callResult, ID = id })
                     );
    total.Subscribe(call =>
            {
                // Each call's own OnCompleted notification is deliberately ignored.
                switch (call.Result.Kind)
                {
                    case NotificationKind.OnNext:
                        Debug.WriteLine("CLIENT {0} succeeded = {1}",
                                           call.ID, call.Result.Value);
                        break;
                    case NotificationKind.OnError:
                        Debug.WriteLine("CLIENT {0} failed", call.ID);
                        break;
                }
            },
            // string.Format avoids the Debug.WriteLine(message, category) overload.
            ex => Debug.WriteLine(string.Format("CLIENT Something failed: {0}", ex.Message)),
            () => Debug.WriteLine("CLIENT All completed")
        );
}

Initially I was foreach’ing over the list of IDs, but ‘duh! it’s a sequence!’, so I turn it into an observable and pipe the IDs into the SetFavorite call. I then materialize each call and use a Select operator to attach the ID of the call, so that later in the Subscribe method I can tell which call succeeded or failed. The result is a sequence of IObservables, so I use SelectMany to flatten them all into a single sequence.

At last, I can subscribe to the sequence, which gives me (1) the ID of each call, (2) the success or failure of that call, and (3) a completion event that is raised only when all of the calls are done.
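If a single summary is more convenient than per-call notifications, one possible variation is to collect the outcomes with ToList and split them afterwards. This is only a sketch assuming the System.Reactive package. CallOutcome, FavoritesSummary and SetAll are hypothetical names, and the SetFavorite below is a fake stand-in for the service proxy that fails for ID 7 instead of calling a server.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive;
using System.Reactive.Linq;

// Hypothetical result type for illustration.
public sealed class CallOutcome
{
    public int ID { get; set; }
    public bool Succeeded { get; set; }
}

public static class FavoritesSummary
{
    // Fake stand-in for the service proxy: fails for ID 7,
    // otherwise echoes the value it was sent.
    public static IObservable<string> SetFavorite(int id, bool isFavorite)
    {
        return id == 7
            ? Observable.Throw<string>(new InvalidOperationException("400 Bad Request"))
            : Observable.Return(isFavorite.ToString());
    }

    // Yields one list of outcomes once every call has finished.
    public static IObservable<IList<CallOutcome>> SetAll(IEnumerable<int> ids)
    {
        return ids.ToObservable()
            .SelectMany(id => SetFavorite(id, true)
                .Materialize()
                // Drop each call's own OnCompleted; only values and errors matter here.
                .Where(n => n.Kind != NotificationKind.OnCompleted)
                .Select(n => new CallOutcome
                {
                    ID = id,
                    Succeeded = n.Kind == NotificationKind.OnNext
                }))
            .ToList();
    }

    public static void Main()
    {
        SetAll(new[] { 5, 6, 7 }).Subscribe(outcomes =>
        {
            var ok = outcomes.Where(o => o.Succeeded).Select(o => o.ID);
            var failed = outcomes.Where(o => !o.Succeeded).Select(o => o.ID);
            Console.WriteLine("Succeeded: " + string.Join(", ", ok));
            Console.WriteLine("Failed: " + string.Join(", ", failed));
            // This is the point at which a busy indicator could be switched off.
        });
    }
}
```

The trade-off is that nothing is reported until the slowest call has finished, so the per-call version above is the better fit when the UI should update as each response arrives.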

Here is a sample trace of my test program. The server is hard-coded to return a “400 Bad Request” on ID 7. Notice that the client receives the responses out of order, handles the error condition, and notifies when everything is done.

 

CLIENT: PUTing https://localhost:48963/api/favorites/1/False : {"value":"False"}

CLIENT: PUTing https://localhost:48963/api/favorites/2/True : {"value":"True"}

CLIENT: PUTing https://localhost:48963/api/favorites/3/False : {"value":"False"}

CLIENT: PUTing https://localhost:48963/api/favorites/4/True : {"value":"True"}

CLIENT: PUTing https://localhost:48963/api/favorites/5/False : {"value":"False"}

CLIENT: PUTing https://localhost:48963/api/favorites/6/True : {"value":"True"}

CLIENT: PUTing https://localhost:48963/api/favorites/7/False : {"value":"False"}

SERVER: FavoritesController.Put(id:1, value:False)

SERVER: FavoritesController.Put(id:7, value:False)

CLIENT: 1 succeeded = "False"

SERVER: FavoritesController.Put(id:2, value:True)

SERVER: FavoritesController.Put(id:5, value:False)

SERVER: FavoritesController.Put(id:6, value:True)

SERVER: FavoritesController.Put(id:3, value:False)

SERVER: FavoritesController.Put(id:4, value:True)

CLIENT: 2 succeeded = "True"

CLIENT: 7 failed

CLIENT: 6 succeeded = "True"

CLIENT: 5 succeeded = "False"

CLIENT: 4 succeeded = "True"

CLIENT: 3 succeeded = "False"

CLIENT: All completed