Silverlight duplex communication, part 2: Reactive Extensions

 

In part 1 of this series I posted a Silverlight chat application that used duplex communications over HTTP.  Now to me, incoming data or events imply that the Reactive Framework (Rx) is probably a good fit for the application.  And so I decided to refactor the chatroom application to use Rx.

Introduction to Reactive Framework

Other (possibly  better) introductions can be found here, here or here, but I think that I a metaphor described by a colleague of mine is appropriate.  Usage of IEnumerable<T> can be likened to a water well – you pull the water out just as fast as you want it.  On the other hand, usage of IObservable<T> (a key Rx interface) can be likened to a spring of water – it just keeps pumping water at you and you better try and keep up!

On a slightly more detailed note, the main interfaces within Rx are IObservable<T> and IObserver<T>.  An Observable represents an event source while the Observer repesents an event sink.  Of course, an Observer can also be an Observable … think for a moment about an event filter that passes on some events while ignoring others.  Classes that implement IObservable<T> and IObserver<T> are known in Rx as “subjects”.

Interestingly, Rx chooses to represent an “event” as a method call.  So being an Observer (i.e. you receive events) actually just means that you receive method calls on your IObserver<T> interface.  And being an Observable (i.e. you raise events) actually just means that you make method calls to Observers who have registered an interest by subscribing.

Now you may be wondering what the generic type parameter T is used for.  Well that is the type of data that accompanies the event.  Bearing in mind that an event is just a method call, how do can you attach data to this?  Rx assumes that the data is actually the one and only parameter to the method.

This pattern is really at the heart of Rx.  It’s very easy to get lost in the plethora of extension methods and classes, but it’s crucial to always remember and try to apply that pattern.

Reactive Framework Extension Methods

And there are indeed a lot of very useful extension methods provided by the Rx framework.  Some of them perform filtering, some are useful in combining event streams (and so reducing concurrency) and others are useful in splitting event streams (and so increasing concurrency and parallelism). The best resource I’ve found has to be the Rx Wiki.  It describes all the operators and provides one or more example usages for most of them.

Using Rx in Silverlight

And now back to the subject in the title of this post!  :-)  There are a few assemblies which form the core of Rx for Silverlight:

  • System.Observable: contains the definitions for IObserver<T> and IObservable<T>
  • System.Reactive: contains all the extensions that support LINQ over Observers and Observables
  • System.CoreEx: concurrency constructs and other classes that support Rx

Note that on the desktop version of .Net 4.0, IObserver<T> and IObservable<T> are now included in mscorlib, which means that System.Observable is not used on that platform.  Sadly, this is not the case with Silverlight 4.0.

Creating an Observable

I’ll let the code speak for itself here:

public class ChatRoom
{
    // Other code omitted for clarity

    private readonly Subject<ChatData> _messages = new Subject<ChatData>();

    public IObservable<Timestamped<ChatData>> Messages
    {
        get { return _messages.Timestamp().AsObservable(); }
    }

    public void AddReceivedMessage(ChatData chatData)
    {
        _messages.OnNext(chatData);
    }
}

 

As you can see, we create a subject (_messages) and then expose it via the Messages property so that other code can consume the events.  When we are notified of a new message being received off the wire, we simply publish it on the subject.

Note that the code that actually listens to the network and calls the AddReceivedMessage method is included in the download and was explained in part 1 of this series.

We are making use of one interesting Rx feature here … the Timestamp extension method.  It takes an IObservable<T> and returns an IObservable<Timestamped<T>>, which in effect just adorns the object with the a new property holding the current date and time.

Consuming the Observable

Here is an embarrassingly trivial example:

ChatRoom.Instance.Messages.Subscribe(data => messages.Add(data));

This code is listening for events being raised from the subject and, when each one occurs, adding the new message to the “messages” collection which is bound onto the UI.  To be honest, this is not a particularly good illustration of what Rx is capable of, since the line of code here replaced an equally simple line of code that didn’t use Rx!

So let’s suppose that incoming message were extremely high volume, and instead of updating the UI as each message comes in, you decide you want to do it 4 times per second.  Without Rx this involves timers and so on.  With Rx, you replace the above line with the following:

ChatRoom.Instance.Messages
     .BufferWithTime(TimeSpan.FromMilliseconds(250))
     .Subscribe(buffer => buffer.ForEach(msg => messages.Add(msg)));

 

Or perhaps you want to implement some kind of command protocol using the message stream.  Something like this would work:

ChatRoom.Instance.Messages
    .Where(msg => msg.Data.StartsWith("pls.forward:"))                    
    .Subscribe(msg => ForwardMessage(msg));

 

Or (and IMO this is the most impressive aspect of Rx) if you want to do all of the above simultaneously, all you need to include the relevant lines.  You see, all the aspects of message processing here are totally separated and independent of each other.  And to me, that’s sweeeeeet!

Download the code: DuplexChatClient-with-Rx.zip (52.78 kb)
Newer Posts Older Posts