Rx: Controlling frequency of events

One of the strengths of the Reactive Framework is the number of LINQ operators that ship with it.  This can also be a weakness though, as you wade through the list of sometimes strangely named methods looking for something in particular.  I recently wanted an operator that would have the following effect on an event stream:

[Figure: marble diagram of the Regulate operator]

In other words, if events are raised too often, delay them so that the output stream never exceeds a defined maximum frequency of events.
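To make the intended timing concrete, here is a minimal sketch of the rule in plain C#, with no Rx involved. The class `RegulateTiming` and the method `ReleaseTimes` are hypothetical names used only for this illustration, and times are expressed as milliseconds for simplicity.

```csharp
using System;
using System.Collections.Generic;

public static class RegulateTiming
{
    // Hypothetical helper, for illustration only: given arrival times in
    // milliseconds (ascending) and a minimum gap, return the time at which
    // each event is released so that outputs are at least minGapMs apart.
    public static List<long> ReleaseTimes(IEnumerable<long> arrivalsMs, long minGapMs)
    {
        var result = new List<long>();
        long? lastRelease = null;

        foreach (var arrival in arrivalsMs)
        {
            // Release on arrival if enough time has passed; otherwise wait
            // until minGapMs after the previous release.
            var release = lastRelease.HasValue
                ? Math.Max(arrival, lastRelease.Value + minGapMs)
                : arrival;

            result.Add(release);
            lastRelease = release;
        }

        return result;
    }
}
```

With arrivals at 0, 100, 200 and 5000 ms and a gap of 1000 ms, the release times come out as 0, 1000, 2000 and 5000 ms: the burst is spread out, nothing is dropped, and the late event passes straight through.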

My first attempt at an implementation used the built-in Throttle operator, although I had a nagging feeling that it wouldn’t quite do what I expected.  That feeling proved correct: Throttle discards events that arrive too close together, passing on only the last one once the stream has gone quiet for the specified duration.  In the end I resorted to building my own operator, which I call Regulate.  It’s a bit long, as it needs to address some threading considerations, but it’s not that difficult to follow:

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

namespace SharpFellows.RxUtils
{
    public static class ObservableExtensions
    {
        public static IObservable<T> Regulate<T>(this IObservable<T> observable, TimeSpan duration)
        {
            return Regulate(observable, duration, Scheduler.TaskPool);
        }

        public static IObservable<T> Regulate<T>(this IObservable<T> observable, TimeSpan duration, IScheduler scheduler)
        {
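            // Create the regulator per subscription so that subscribers do not share timing state.
            // Only OnNext is relayed to the observer.
            return Observable.Create<T>(observer =>
            {
                var regulator = new ObservableRegulator<T>(duration, scheduler);
                return observable.Subscribe(obj => regulator.ProcessItem(obj, observer));
            });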
        }

        private class ObservableRegulator<T>
        {
            private DateTimeOffset _lastEntry = DateTimeOffset.MinValue;
            private readonly object _lastEntryLock = new object();

            private readonly TimeSpan _duration;
            private readonly IScheduler _scheduler;

            public ObservableRegulator(TimeSpan duration, IScheduler scheduler)
            {
                _duration = duration;
                _scheduler = scheduler;
            }

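            public void ProcessItem(T val, IObserver<T> observer)
            {
                var canBroadcastNow = false;
                var nextEntryTime = DateTimeOffset.MaxValue;
                lock (_lastEntryLock)
                {
                    // Read the clock from the scheduler so that this comparison and
                    // the scheduled due time below share the same notion of "now".
                    var now = _scheduler.Now;
                    if (now.Subtract(_lastEntry) > _duration)
                    {
                        // Enough time has passed: this item can go out immediately.
                        _lastEntry = now;
                        canBroadcastNow = true;
                    }
                    else
                    {
                        // Too soon: reserve the next free slot, one duration after the last.
                        _lastEntry = _lastEntry.Add(_duration);
                        nextEntryTime = _lastEntry;
                    }
                }

                if (canBroadcastNow)
                {
                    observer.OnNext(val);
                }
                else
                {
                    _scheduler.Schedule(nextEntryTime, () => observer.OnNext(val));
                }
            }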
        }
    }
}

The result is that you can specify the minimum time between events in the output stream (and optionally a scheduler):

return service.GetIrregularEvents()
              .Regulate(TimeSpan.FromSeconds(1));
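For comparison, here is a small sketch of the Throttle behaviour that prompted all this. It assumes the Rx assemblies are referenced; `ThrottleDemo` and `Run` are hypothetical names for this illustration, and the 200 ms window and one-second wait are arbitrary values chosen to leave a comfortable margin.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

public static class ThrottleDemo
{
    // Pushes a burst of three values through Throttle and returns
    // whatever made it out the other side.
    public static List<int> Run()
    {
        var received = new List<int>();
        var subject = new Subject<int>();

        using (subject.Throttle(TimeSpan.FromMilliseconds(200))
                      .Subscribe(x => { lock (received) { received.Add(x); } }))
        {
            // Three values arrive back to back, well inside the 200 ms window.
            subject.OnNext(1);
            subject.OnNext(2);
            subject.OnNext(3);

            // Wait long enough for the quiet period to elapse.
            Thread.Sleep(1000);
        }

        return received;
    }

    public static void Main()
    {
        Console.WriteLine(string.Join(",", Run()));
    }
}
```

Only the last value of the burst should come out, with the first two dropped. Regulate would instead deliver all three, spaced apart by the requested duration.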

FYI my use of this is to slowly drip feed items onto a data-bound UI.  Each new item triggers an animation and I don’t want 5 items simultaneously starting to animate.  Drop me an email or leave a comment if you find other uses for it!
