Rx: Controlling frequency of events

One of the strengths of the Reactive Framework is the number of LINQ operators that ship with it.  This can also be a weakness though, as you wade through the list of sometimes-strangely named method looking for something in particular.  I recently wanted an operator that would have the following effect on an event stream:

regulate marble diagram

In other words, if events get raised too often then delay them so that the output stream has a defined maximum frequency of events over time. 

My first attempt at an implementation was to use the built-in Throttle operator, although I had a nagging feeling that this one doesn’t quite do what I expect it to.  My nagging sensation was proved correct … Throttle will throw away events if they are too frequent. In the end I resorted to building my own operator, which I call Regulate.  It’s a bit long, as it needs to address some threading considerations, but it’s not that difficult to follow:

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

namespace SharpFellows.RxUtils
{
    public static class ObservableExtensions
    {
        public static IObservable<T> Regulate<T>(this IObservable<T> observable, TimeSpan duration)
        {
            return Regulate(observable, duration, Scheduler.TaskPool);
        }

        public static IObservable<T> Regulate<T>(this IObservable<T> observable, TimeSpan duration, IScheduler scheduler)
        {
            var regulator = new ObservableRegulator<T>(duration, scheduler);

            return Observable.Create<T>(observer => observable.Subscribe(obj => regulator.ProcessItem(obj, observer)));
        }

        private class ObservableRegulator<T>
        {
            private DateTimeOffset _lastEntry = DateTimeOffset.MinValue;
            private readonly object _lastEntryLock = new object();

            private readonly TimeSpan _duration;
            private readonly IScheduler _scheduler;

            public ObservableRegulator(TimeSpan duration, IScheduler scheduler)
            {
                _duration = duration;
                _scheduler = scheduler;
            }

            public void ProcessItem(T val, IObserver<T> observer)
            {
                var canBroadcastNow = false;
                var nexEntryTime = DateTimeOffset.MaxValue;
                lock (_lastEntryLock)
                {
                    var now = DateTimeOffset.Now;
                    if (now.Subtract(_lastEntry) > _duration)
                    {
                        _lastEntry = now;
                        canBroadcastNow = true;
                    }
                    else
                    {
                        _lastEntry = _lastEntry.Add(_duration);
                        nexEntryTime = _lastEntry;
                    }
                }

                if (canBroadcastNow)
                {
                    observer.OnNext(val);
                }
                else
                {
                    _scheduler.Schedule(nexEntryTime, () => observer.OnNext(val));
                }

            }
        }
    }
}

The result is that you can specify the minimum time between events in the output stream (and optionally a scheduler):

return service.GetIrregularEvents()
              .Regulate(TimeSpan.FromSeconds(1));

FYI my use of this is to slowly drip feed items onto a data-bound UI.  Each new item triggers an animation and I don’t want 5 items simultaneously starting to animate.  Drop me an email or leave a comment if you find other uses for it!

A random walk in Rx

A random walk can be defined as “a stochastic process consisting of a sequence of changes each of whose characteristics (as magnitude or direction) is determined by chance”.  Or to put it another way, from a starting point, keep adding (or subtracting) random numbers … the resulting stream of numbers is a random walk.

Now I recently needed to demonstrate some graphing capability and decided that I would plot a continuous random walk.  Here was my first attempt:

public class RandomTickDataService : IRandomTickDataService
{
    private readonly IObservable<double> _randomWalk;

    public RandomTickDataService()
    {
        var rnd = new Random();
        _randomWalk = Observable.Interval(TimeSpan.FromMilliseconds(500))
                                .Select(l => (rnd.NextDouble() - 0.5)*2)
                                .Scan((d, d1) => d + d1);
    }
    public IObservable<double> Tick
    {
        get { return _randomWalk; }
    }
}

This did OK until I added a second graph and noticed something odd.  What was happening was that each graph was different even though my IOC (MEF) was configured to create this service as a singleton (CreationPolicy.Shared).  And then I realised that the two graphs subscribing to the Tick property were each generating their own stream of data, since the Observable.Interval does not start until someone subscribes to it (and then it starts again when someone else subscribes to it).  So my second attempt hinged on the Publish method, which allows an observable stream to be reused:

public class RandomTickDataService : IRandomTickDataService
{
    private readonly IConnectableObservable<double> _randomWalk;
    private IDisposable _connection;

    public RandomTickDataService()
    {
        var rnd = new Random();
        _randomWalk = Observable.Interval(TimeSpan.FromMilliseconds(500))
                                .Select(l => (rnd.NextDouble() - 0.5)*2)
                                .Scan((d, d1) => d + d1)
                                .Publish();
    }
    public IObservable<double> Tick
    {
        get
        {
            if(_connection == null)
                _connection = _randomWalk.Connect();
            return _randomWalk;
        }
    }
}

The Tick property has gotten a bit ugly here and I wasn’t quite sure why I had this disposable instance kicking around.  It turned out that while my two charts now plotted the same stream of numbers, the numbers didn’t stop even when I closed all my charts.  #Fellow Andy pointed me in the direction of the RefCount method and so here is my final code (which is neater works nicely):

public class RandomTickDataService : IRandomTickDataService
{
    private readonly IObservable<double> _randomWalk;

    public RandomTickDataService()
    {
        var rnd = new Random();
        _randomWalk = Observable.Interval(TimeSpan.FromMilliseconds(500))
                                .Select(l => (rnd.NextDouble() - 0.5)*2)
                                .Scan((d, d1) => d + d1)
                                .Publish()
                                .RefCount();
    }

    public IObservable<double> Tick
    {
        get { return _randomWalk; }
    }
}

RX: Detecting the end of a rapid series of events

Storing application state and user preferences is a pretty common problem.  And knowing when to save them can be a tricky problem in a distributed application (such as a Silverlight client) … save too often and you will use too much bandwidth just for saving preferences, save too infrequently or irregularly and users might not get their preferences saved at all.

Now I have a service for holding and saving user preferences, which is a pretty common situation.  It is notified when the saved preferences are dirty (e.g. as the user changes values on a form, say) and it has to decide when to actually collect and save all the values.  Now my service uses the Reactive Framework (RX) internally and Andy (a fellow RX junkie) insisted I post this:

public class UserPreferenceService : IUserPreferenceService
{
    private readonly TimeSpan _cooloffPeriod = TimeSpan.FromSeconds(10);
    private ISubject<string> _changeNotification;
    
    public void PreferencesAreStale()
    {
        _changeNotification.OnNext( null );
    }

    private void BeginListeningToChangeNotifications()
    {
        _changeNotification = new Subject<string>();

        // We apply a timestamp to all changes.  Then we delay these changes and recombine them with the latest changes.
        //      Comparing these timestamps allows us to work out if there have been any subsequent changes during the 
        //      delay period.  We allow some leeway (50ms) in the timestamps, since the two observable stream are delayed
        //      execution and so can get timestamped slightly differently.

        var timestampedChanges = _changeNotification.Timestamp();
        var delayedChanges = timestampedChanges.Delay( _cooloffPeriod );

        timestampedChanges
            .CombineLatest( delayedChanges, ( latest, delayed ) => latest.Timestamp.Subtract( delayed.Timestamp ) )
            .Where( timeDifference => timeDifference.TotalMilliseconds < 50 )
            .Subscribe( _ => PersistPreferences() );
    }

    private void PersistPreferences()
    {
        // Collects and saves preferences
    }
}

 

I think the comments are pretty clear with regard to the implementation details, but the overall result is that preferences get saved 10 seconds after the last update in a rapid series and no sooner.  Updates coming in more frequently than one every 10 seconds do not trigger a save operation.

November 21 2010

Silverlight duplex communication, part 2: Reactive Extensions

 

In part 1 of this series I posted a Silverlight chat application that used duplex communications over HTTP.  Now to me, incoming data or events imply that the Reactive Framework (Rx) is probably a good fit for the application.  And so I decided to refactor the chatroom application to use Rx.

Introduction to Reactive Framework

Other (possibly  better) introductions can be found here, here or here, but I think that I a metaphor described by a colleague of mine is appropriate.  Usage of IEnumerable<T> can be likened to a water well – you pull the water out just as fast as you want it.  On the other hand, usage of IObservable<T> (a key Rx interface) can be likened to a spring of water – it just keeps pumping water at you and you better try and keep up!

On a slightly more detailed note, the main interfaces within Rx are IObservable<T> and IObserver<T>.  An Observable represents an event source while the Observer repesents an event sink.  Of course, an Observer can also be an Observable … think for a moment about an event filter that passes on some events while ignoring others.  Classes that implement IObservable<T> and IObserver<T> are known in Rx as “subjects”.

Interestingly, Rx chooses to represent an “event” as a method call.  So being an Observer (i.e. you receive events) actually just means that you receive method calls on your IObserver<T> interface.  And being an Observable (i.e. you raise events) actually just means that you make method calls to Observers who have registered an interest by subscribing.

Now you may be wondering what the generic type parameter T is used for.  Well that is the type of data that accompanies the event.  Bearing in mind that an event is just a method call, how do can you attach data to this?  Rx assumes that the data is actually the one and only parameter to the method.

This pattern is really at the heart of Rx.  It’s very easy to get lost in the plethora of extension methods and classes, but it’s crucial to always remember and try to apply that pattern.

Reactive Framework Extension Methods

And there are indeed a lot of very useful extension methods provided by the Rx framework.  Some of them perform filtering, some are useful in combining event streams (and so reducing concurrency) and others are useful in splitting event streams (and so increasing concurrency and parallelism). The best resource I’ve found has to be the Rx Wiki.  It describes all the operators and provides one or more example usages for most of them.

Using Rx in Silverlight

And now back to the subject in the title of this post!  :-)  There are a few assemblies which form the core of Rx for Silverlight:

  • System.Observable: contains the definitions for IObserver<T> and IObservable<T>
  • System.Reactive: contains all the extensions that support LINQ over Observers and Observables
  • System.CoreEx: concurrency constructs and other classes that support Rx

Note that on the desktop version of .Net 4.0, IObserver<T> and IObservable<T> are now included in mscorlib, which means that System.Observable is not used on that platform.  Sadly, this is not the case with Silverlight 4.0.

Creating an Observable

I’ll let the code speak for itself here:

public class ChatRoom
{
    // Other code omitted for clarity

    private readonly Subject<ChatData> _messages = new Subject<ChatData>();

    public IObservable<Timestamped<ChatData>> Messages
    {
        get { return _messages.Timestamp().AsObservable(); }
    }

    public void AddReceivedMessage(ChatData chatData)
    {
        _messages.OnNext(chatData);
    }
}

 

As you can see, we create a subject (_messages) and then expose it via the Messages property so that other code can consume the events.  When we are notified of a new message being received off the wire, we simply publish it on the subject.

Note that the code that actually listens to the network and calls the AddReceivedMessage method is included in the download and was explained in part 1 of this series.

We are making use of one interesting Rx feature here … the Timestamp extension method.  It takes an IObservable<T> and returns an IObservable<Timestamped<T>>, which in effect just adorns the object with the a new property holding the current date and time.

Consuming the Observable

Here is an embarrassingly trivial example:

ChatRoom.Instance.Messages.Subscribe(data => messages.Add(data));

This code is listening for events being raised from the subject and, when each one occurs, adding the new message to the “messages” collection which is bound onto the UI.  To be honest, this is not a particularly good illustration of what Rx is capable of, since the line of code here replaced an equally simple line of code that didn’t use Rx!

So let’s suppose that incoming message were extremely high volume, and instead of updating the UI as each message comes in, you decide you want to do it 4 times per second.  Without Rx this involves timers and so on.  With Rx, you replace the above line with the following:

ChatRoom.Instance.Messages
     .BufferWithTime(TimeSpan.FromMilliseconds(250))
     .Subscribe(buffer => buffer.ForEach(msg => messages.Add(msg)));

 

Or perhaps you want to implement some kind of command protocol using the message stream.  Something like this would work:

ChatRoom.Instance.Messages
    .Where(msg => msg.Data.StartsWith("pls.forward:"))                    
    .Subscribe(msg => ForwardMessage(msg));

 

Or (and IMO this is the most impressive aspect of Rx) if you want to do all of the above simultaneously, all you need to include the relevant lines.  You see, all the aspects of message processing here are totally separated and independent of each other.  And to me, that’s sweeeeeet!

Download the code: DuplexChatClient-with-Rx.zip (52.78 kb)
Older Posts