A random walk in Rx

A random walk can be defined as “a stochastic process consisting of a sequence of changes each of whose characteristics (as magnitude or direction) is determined by chance”. Put another way: from a starting point, keep adding (or subtracting) random numbers, and the resulting stream of numbers is a random walk.
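To make that definition concrete before Rx enters the picture, here is a minimal sketch of a random walk as a plain running total. The class and method names (RandomWalkSketch, Walk) are made up for illustration; the step size of [-1, 1) is the same one the Rx code in this post uses.

```csharp
using System;
using System.Collections.Generic;

public static class RandomWalkSketch
{
    // Returns the running total of `steps` random increments, each in [-1, 1).
    // Hypothetical helper, not part of the service shown in this post.
    public static List<double> Walk(Random rnd, int steps)
    {
        var walk = new List<double>(steps);
        double position = 0.0; // the starting point

        for (int i = 0; i < steps; i++)
        {
            // NextDouble() is in [0, 1), so this step is in [-1, 1).
            position += (rnd.NextDouble() - 0.5) * 2;
            walk.Add(position);
        }

        return walk;
    }

    public static void Main()
    {
        foreach (var value in Walk(new Random(), 10))
            Console.WriteLine(value);
    }
}
```

The loop body is the same "add the next random step to the running total" operation that Scan performs on a stream.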

I recently needed to demonstrate some graphing capability and decided to plot a continuous random walk. Here was my first attempt:

using System;
using System.Reactive.Linq;

public class RandomTickDataService : IRandomTickDataService
{
    private readonly IObservable<double> _randomWalk;

    public RandomTickDataService()
    {
        var rnd = new Random();
        // Every 500 ms, produce a random step in [-1, 1) ...
        _randomWalk = Observable.Interval(TimeSpan.FromMilliseconds(500))
                                .Select(l => (rnd.NextDouble() - 0.5)*2)
                                // ... and emit the running total of all steps so far.
                                .Scan((d, d1) => d + d1);
    }

    public IObservable<double> Tick
    {
        get { return _randomWalk; }
    }
}

This worked fine until I added a second graph and noticed something odd: each graph was different, even though my IoC container (MEF) was configured to create this service as a singleton (CreationPolicy.Shared). Then I realised that the two graphs subscribing to the Tick property were each generating their own stream of data. Observable.Interval does not start until someone subscribes to it, and it starts afresh for every additional subscriber, so each subscription gets its own timer and its own running total. My second attempt therefore hinged on the Publish method, which allows a single observable stream to be shared between subscribers:

public class RandomTickDataService : IRandomTickDataService
{
    private readonly IConnectableObservable<double> _randomWalk;
    private IDisposable _connection;

    public RandomTickDataService()
    {
        var rnd = new Random();
        _randomWalk = Observable.Interval(TimeSpan.FromMilliseconds(500))
                                .Select(l => (rnd.NextDouble() - 0.5)*2)
                                .Scan((d, d1) => d + d1)
                                // Share one underlying subscription; nothing flows until Connect() is called.
                                .Publish();
    }
    public IObservable<double> Tick
    {
        get
        {
            // Connect lazily, the first time anyone asks for the stream.
            if (_connection == null)
                _connection = _randomWalk.Connect();
            return _randomWalk;
        }
    }
}
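The difference between the first two attempts can be seen without any timers. The sketch below is an illustration only, not part of the service: it assumes the System.Reactive package, swaps Observable.Interval for Observable.Range so that everything runs synchronously, and uses made-up names (ColdVersusPublished, Run). It counts how many random steps are generated when two observers subscribe, with and without Publish.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;

public static class ColdVersusPublished
{
    // Subscribes two observers to a five-step walk and reports how many random
    // steps were generated in total, plus what each observer received.
    public static (int StepsGenerated, List<double> First, List<double> Second) Run(bool publish)
    {
        var rnd = new Random();
        int stepsGenerated = 0;

        // Observable.Range stands in for Observable.Interval (assumption for the demo).
        IObservable<double> walk = Observable.Range(0, 5)
            .Select(_ => { stepsGenerated++; return (rnd.NextDouble() - 0.5) * 2; })
            .Scan((sum, step) => sum + step);

        var first = new List<double>();
        var second = new List<double>();

        if (publish)
        {
            // One shared subscription: both observers attach, then Connect starts it.
            var shared = walk.Publish();
            shared.Subscribe(x => first.Add(x));
            shared.Subscribe(x => second.Add(x));
            shared.Connect();
        }
        else
        {
            // Each Subscribe re-runs the whole pipeline from scratch.
            walk.Subscribe(x => first.Add(x));
            walk.Subscribe(x => second.Add(x));
        }

        return (stepsGenerated, first, second);
    }

    public static void Main()
    {
        var cold = Run(publish: false);
        var hot = Run(publish: true);
        Console.WriteLine("Without Publish: " + cold.StepsGenerated + " steps generated");
        Console.WriteLine("With Publish:    " + hot.StepsGenerated + " steps generated");
        Console.WriteLine("Published observers saw the same walk: " + hot.First.SequenceEqual(hot.Second));
    }
}
```

Without Publish the pipeline runs once per subscriber, so ten steps are generated and the two lists will almost certainly differ. With Publish only five steps are generated and both observers receive the same values.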

The Tick property has become a bit ugly here, and I wasn’t quite sure what to do with the disposable connection I now had kicking around. It turned out that while my two charts now plotted the same stream of numbers, the numbers kept coming even after I closed all my charts, because nothing ever disposed the connection. #Fellow Andy pointed me in the direction of the RefCount method, which connects when the first subscriber arrives and disconnects when the last one unsubscribes, so here is my final code (which is neater and works nicely):

public class RandomTickDataService : IRandomTickDataService
{
    private readonly IObservable<double> _randomWalk;

    public RandomTickDataService()
    {
        var rnd = new Random();
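        _randomWalk = Observable.Interval(TimeSpan.FromMilliseconds(500))
                                .Select(l => (rnd.NextDouble() - 0.5)*2)
                                .Scan((d, d1) => d + d1)
                                // Share a single underlying subscription ...
                                .Publish()
                                // ... that lives only while at least one subscriber is attached.
                                .RefCount();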
    }

    public IObservable<double> Tick
    {
        get { return _randomWalk; }
    }
}
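To check what RefCount is doing with the underlying subscription, here is a small sketch, again assuming the System.Reactive package. The source is a hypothetical stand-in built with Observable.Create that never emits and simply logs when it is subscribed to and disposed; the names RefCountSketch, chart1 and chart2 are made up for the illustration.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Linq;

public static class RefCountSketch
{
    public static List<string> Run()
    {
        var log = new List<string>();

        // Hypothetical stand-in for the Interval-based walk: it emits nothing,
        // it only records when it is subscribed to and when it is disposed.
        IObservable<double> source = Observable.Create<double>(observer =>
        {
            log.Add("source subscribed");
            return Disposable.Create(() => log.Add("source disposed"));
        });

        IObservable<double> shared = source.Publish().RefCount();

        IDisposable chart1 = shared.Subscribe(_ => { }); // first subscriber: connects
        IDisposable chart2 = shared.Subscribe(_ => { }); // second subscriber: reuses the connection
        log.Add("both charts open");

        chart1.Dispose();
        log.Add("first chart closed");

        chart2.Dispose(); // last subscriber gone: the connection is disposed
        log.Add("second chart closed");

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run())
            Console.WriteLine(line);
    }
}
```

The source is subscribed to once, when the first chart subscribes, and is disposed as the last chart unsubscribes. One consequence worth keeping in mind is that a chart opened after all the others have closed triggers a fresh subscription, so the walk starts again rather than resuming.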