One of the strengths of the Reactive Framework is the number of LINQ operators that ship with it. This can also be a weakness though, as you wade through the list of sometimes-strangely named method looking for something in particular. I recently wanted an operator that would have the following effect on an event stream:

In other words, if events get raised too often then delay them so that the output stream has a defined maximum frequency of events over time.
My first attempt at an implementation was to use the built-in Throttle operator, although I had a nagging feeling that this one doesn’t quite do what I expect it to. My nagging sensation was proved correct … Throttle will throw away events if they are too frequent. In the end I resorted to building my own operator, which I call Regulate. It’s a bit long, as it needs to address some threading considerations, but it’s not that difficult to follow:
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
namespace SharpFellows.RxUtils
{
public static class ObservableExtensions
{
public static IObservable<T> Regulate<T>(this IObservable<T> observable, TimeSpan duration)
{
return Regulate(observable, duration, Scheduler.TaskPool);
}
public static IObservable<T> Regulate<T>(this IObservable<T> observable, TimeSpan duration, IScheduler scheduler)
{
var regulator = new ObservableRegulator<T>(duration, scheduler);
return Observable.Create<T>(observer => observable.Subscribe(obj => regulator.ProcessItem(obj, observer)));
}
private class ObservableRegulator<T>
{
private DateTimeOffset _lastEntry = DateTimeOffset.MinValue;
private readonly object _lastEntryLock = new object();
private readonly TimeSpan _duration;
private readonly IScheduler _scheduler;
public ObservableRegulator(TimeSpan duration, IScheduler scheduler)
{
_duration = duration;
_scheduler = scheduler;
}
public void ProcessItem(T val, IObserver<T> observer)
{
var canBroadcastNow = false;
var nexEntryTime = DateTimeOffset.MaxValue;
lock (_lastEntryLock)
{
var now = DateTimeOffset.Now;
if (now.Subtract(_lastEntry) > _duration)
{
_lastEntry = now;
canBroadcastNow = true;
}
else
{
_lastEntry = _lastEntry.Add(_duration);
nexEntryTime = _lastEntry;
}
}
if (canBroadcastNow)
{
observer.OnNext(val);
}
else
{
_scheduler.Schedule(nexEntryTime, () => observer.OnNext(val));
}
}
}
}
}
The result is that you can specify the minimum time between events in the output stream (and optionally a scheduler):
return service.GetIrregularEvents()
.Regulate(TimeSpan.FromSeconds(1));
FYI my use of this is to slowly drip feed items onto a data-bound UI. Each new item triggers an animation and I don’t want 5 items simultaneously starting to animate. Drop me an email or leave a comment if you find other uses for it!