Rx: Controlling frequency of events

One of the strengths of the Reactive Framework is the number of LINQ operators that ship with it.  This can also be a weakness though, as you wade through the list of sometimes strangely named methods looking for something in particular.  I recently wanted an operator that would have the following effect on an event stream:

[Figure: marble diagram of the Regulate operator]

In other words, if events get raised too often then delay them so that the output stream has a defined maximum frequency of events over time. 

My first attempt at an implementation was to use the built-in Throttle operator, although I had a nagging feeling that it doesn’t quite do what I expected.  That feeling proved correct: Throttle throws away events if they are too frequent, rather than delaying them.  In the end I resorted to building my own operator, which I call Regulate.  It’s a bit long, as it needs to address some threading considerations, but it’s not that difficult to follow:

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

namespace SharpFellows.RxUtils
{
    public static class ObservableExtensions
    {
        public static IObservable<T> Regulate<T>(this IObservable<T> observable, TimeSpan duration)
        {
            return Regulate(observable, duration, Scheduler.TaskPool);
        }

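        public static IObservable<T> Regulate<T>(this IObservable<T> observable, TimeSpan duration, IScheduler scheduler)
        {
            // Create the regulator per subscription so that subscribers do not share timing state.
            // Only OnNext is forwarded to the observer.
            return Observable.Create<T>(observer =>
            {
                var regulator = new ObservableRegulator<T>(duration, scheduler);
                return observable.Subscribe(obj => regulator.ProcessItem(obj, observer));
            });
        }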

        private class ObservableRegulator<T>
        {
            private DateTimeOffset _lastEntry = DateTimeOffset.MinValue;
            private readonly object _lastEntryLock = new object();

            private readonly TimeSpan _duration;
            private readonly IScheduler _scheduler;

            public ObservableRegulator(TimeSpan duration, IScheduler scheduler)
            {
                _duration = duration;
                _scheduler = scheduler;
            }

            public void ProcessItem(T val, IObserver<T> observer)
            {
                var canBroadcastNow = false;
                var nextEntryTime = DateTimeOffset.MaxValue;
                lock (_lastEntryLock)
                {
                    // Use the scheduler's clock so that timing agrees with the scheduled callbacks
                    var now = _scheduler.Now;
                    if (now.Subtract(_lastEntry) > _duration)
                    {
                        // Enough time has passed since the last item: emit straight away
                        _lastEntry = now;
                        canBroadcastNow = true;
                    }
                    else
                    {
                        // Too soon: reserve the next free slot, one duration after the last one
                        _lastEntry = _lastEntry.Add(_duration);
                        nextEntryTime = _lastEntry;
                    }
                }

                if (canBroadcastNow)
                {
                    observer.OnNext(val);
                }
                else
                {
                    _scheduler.Schedule(nextEntryTime, () => observer.OnNext(val));
                }
            }
        }
    }
}

The result is that you can specify the minimum time between events in the output stream (and optionally a scheduler):

return service.GetIrregularEvents()
              .Regulate(TimeSpan.FromSeconds(1));
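To make the timing concrete, here is a minimal sketch that mirrors the slot-reservation logic of ProcessItem without using Rx at all.  The RegulateTimeline class and its EmissionTimes helper are hypothetical names made up for this illustration, and the arrival times are invented sample data expressed as offsets from an arbitrary start.

```csharp
using System;
using System.Collections.Generic;

public static class RegulateTimeline
{
    // Hypothetical helper (not part of the operator above): given the times at which
    // items arrive, returns the times at which a regulated stream would emit them.
    public static List<TimeSpan> EmissionTimes(IEnumerable<TimeSpan> arrivals, TimeSpan duration)
    {
        var emissions = new List<TimeSpan>();
        TimeSpan? lastEntry = null;

        foreach (var now in arrivals)
        {
            if (lastEntry == null || now - lastEntry.Value > duration)
            {
                // Quiet for long enough: the item goes out immediately
                lastEntry = now;
            }
            else
            {
                // Too soon: the item takes the next free slot
                lastEntry = lastEntry.Value + duration;
            }
            emissions.Add(lastEntry.Value);
        }

        return emissions;
    }

    public static void Main()
    {
        var arrivals = new[]
        {
            TimeSpan.Zero,
            TimeSpan.FromSeconds(0.1),
            TimeSpan.FromSeconds(0.2),
            TimeSpan.FromSeconds(5)
        };

        // Prints 0, 1, 2 and 5: the burst is spread out, the late item is untouched
        foreach (var time in EmissionTimes(arrivals, TimeSpan.FromSeconds(1)))
        {
            Console.WriteLine(time.TotalSeconds);
        }
    }
}
```

Three items arriving within 0.2 seconds come out one second apart, while the item arriving at 5 seconds passes straight through because the stream has been quiet for long enough.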

FYI my use of this is to slowly drip feed items onto a data-bound UI.  Each new item triggers an animation and I don’t want 5 items simultaneously starting to animate.  Drop me an email or leave a comment if you find other uses for it!

Ninject: Auto-registration is changing in version 3

When I was using Ninject v2.2 I had this code to do my auto-registration:

using System.IO;
using System.Reflection;
using Ninject;
using Ninject.Extensions.Conventions;

public class CommonBootstrapper<TShell>
{
    private StandardKernel _kernel;

    protected override void Configure()
    {
        _kernel = new StandardKernel();
        _kernel.Scan(scanner =>
                         {
                             scanner.FromAssembliesInPath(Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location));
                             scanner.BindWithDefaultConventions();
                             scanner.InSingletonScope();
                         });
    }
}

For various reasons I upgraded to Ninject v3.0 RC3, and I found that there have been a number of breaking changes in Ninject.Extensions.Conventions.  My code now looks like this:

using System;
using System.IO;
using System.Linq;
using System.Reflection;
using Ninject;
using Ninject.Extensions.Conventions;

public class CommonBootstrapper<TShell>
{
    private StandardKernel _kernel;

    protected override void Configure()
    {
        _kernel = new StandardKernel();
        _kernel.Bind(scanner => scanner.FromAssembliesInPath(Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location))
                                    .Select(IsServiceType)
                                    .BindToDefaultInterface()
                                    .Configure(binding => binding.InSingletonScope()));
    }

    private static bool IsServiceType(Type type)
    {
        return type.IsClass && type.GetInterfaces().Any(intface => intface.Name == "I" + type.Name);
    }
}

 

So there is a sweet new fluent interface, but more importantly you have to specify the exact conventions to use.  This makes it a lot easier to have non-standard conventions, but it would be great if Ninject had the standard convention built in (IFoo binds to Foo).
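As a quick illustration of what the IsServiceType filter above selects, here is a small sketch that runs the same predicate against a few throwaway types.  IFoo, Foo, Bar and the ConventionCheck class are hypothetical names invented for this example, and it needs nothing from Ninject.

```csharp
using System;
using System.Linq;

public interface IFoo { }

public class Foo : IFoo { }   // implements an interface called "I" + its own name
public class Bar : IFoo { }   // implements IFoo, but there is no IBar

public static class ConventionCheck
{
    // Same predicate as in the bootstrapper: a class qualifies when it
    // implements an interface named "I" followed by the class name.
    public static bool IsServiceType(Type type)
    {
        return type.IsClass && type.GetInterfaces().Any(intface => intface.Name == "I" + type.Name);
    }

    public static void Main()
    {
        Console.WriteLine(IsServiceType(typeof(Foo)));   // True
        Console.WriteLine(IsServiceType(typeof(Bar)));   // False
        Console.WriteLine(IsServiceType(typeof(IFoo)));  // False, not a class
    }
}
```

Note that the match is on the simple type name only, so namespaces are not taken into account.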

For those who are into their own custom binding conventions (like Anthony), you will unfortunately find that the IBindingGenerator interface has changed.  Here is the v2.2 way of implementing a custom binding:

public class ViewModelConventions : IBindingGenerator
{
    public void Process(Type type, Func<IContext, object> scopeCallback, IKernel kernel)
    {
        if(type != null && kernel != null && !type.IsAbstract && type.IsClass && type.Name.EndsWith("Model"))
        {
            kernel.Bind(type)
                  .ToSelf()
                  .InScope(scopeCallback);
        }
    }
}

And here is the same class ported to Ninject v3.0 RC3.

public class ViewModelConventions : IBindingGenerator
{
    public IEnumerable<IBindingWhenInNamedWithOrOnSyntax<object>> CreateBindings(Type type, IBindingRoot bindingRoot)
    {
        if (type != null && !type.IsAbstract && type.IsClass && type.Name.EndsWith("Model"))
        {
            yield return bindingRoot.Bind(type)
                                    .ToSelf();
        }
    }
}

It must be said, however, that the new fluent interface shown above means I no longer need a custom binding generator.  Instead I can simply select the types I want and bind them directly in my bootstrapper.  I suspect that others will find the same!

March 20 2012

Getting values into a string

It is often necessary to have some sort of template in a string, with placeholders that will be replaced with data-driven values at runtime.  Examples of this include email templates, mail merges and even integration scenarios.  There are many sophisticated and complex approaches to solving this problem, but here is a simple approach that works:

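using System;
using System.Linq;

public static class StringExtensions
{
    // Replaces each "{PropertyName}" placeholder in the message with the value of the
    // matching public property on tokenValues (typically an anonymous object).
    public static string AddTokens(this string message, object tokenValues)
    {
        return tokenValues.GetType()
                .GetProperties()
                .Select(property => new
                            {
                                Key = property.Name,
                                // Convert.ToString copes with non-string values, which "as string" would turn into null
                                Value = Convert.ToString(property.GetValue(tokenValues, null))
                            })
                .Aggregate(message, (current, token) => current.Replace("{" + token.Key + "}", token.Value));
    }
}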

And to show you how to use it:

[Test]
public void ShouldReplaceTokens()
{
    var result = "1234{numbers}890".AddTokens(new { numbers = "567" });
    Assert.AreEqual("1234567890", result, "Tokens not added correctly");
}

January 16 2012

MVC: Handling file uploads

ASP.NET MVC has some great abstractions over HTTP and the various Http classes, but one area that is currently lacking is the processing of file uploads.  Uploaded files are (still) available on the Request.Files collection and some posts suggest that you should go down this route.  And for testing, Scott Hanselman has provided some (fairly scary) mocking code.

Now Phil Haack has suggested a few improvements on this, but it’s still not as easy to test as I’d like (I’m a lazy developer, really) – I like to avoid mocks when I can since they can easily obscure the intention of the test.  Fortunately MVC model binders came to my rescue here.  Note that my web UI is only ever submitting one file.

DTO class:

using System.IO;

public class UploadedFile
{
    public int Length { get; set; }
    public string ContentType { get; set; }
    public string FileName { get; set; }
    public Stream Stream { get; set; }
}

Model binder:

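using System.Web.Mvc;

public class FileUploadBinder : IModelBinder
{
    public object BindModel(ControllerContext controllerContext, ModelBindingContext bindingContext)
    {
        // Guard against a request with no files before indexing into the collection
        var files = controllerContext.HttpContext.Request.Files;
        if (files.Count == 0) return null;

        // The web UI only ever submits one file, so only the first one is bound
        var postedFile = files[0];
        if (postedFile == null) return null;
        return new UploadedFile
                   {
                       Length = postedFile.ContentLength,
                       ContentType = postedFile.ContentType,
                       FileName = postedFile.FileName,
                       Stream = postedFile.InputStream
                   };
    }
}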

Controller action method:

[HttpPost, Authorize]
public ActionResult PeformFullUpload(string dataSourceName, 
            [ModelBinder(typeof(FileUploadBinder))] UploadedFile uploadedFile)
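As an alternative to the [ModelBinder] attribute, the binder could probably be registered once at application start-up, so that every action taking an UploadedFile parameter picks it up.  This is only a sketch: it assumes a conventional Global.asax class (called MvcApplication here, which is an assumption) and reuses the UploadedFile and FileUploadBinder types from above.

```csharp
using System.Web;
using System.Web.Mvc;

// Assumed Global.asax code-behind; the class name is illustrative
public class MvcApplication : HttpApplication
{
    protected void Application_Start()
    {
        // Associate the DTO type with the binder for the whole application
        ModelBinders.Binders.Add(typeof(UploadedFile), new FileUploadBinder());
    }
}
```

With a global registration in place the attribute on the action parameter should no longer be needed, although keeping it does make the binding explicit at the point of use.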

Test code:

[When("the user submits a file")]
public void PerformFileUpload()
{
    // Build the DTO directly; no HttpContext or Request mocking required
    var uploadedFile = new UploadedFile { Stream = _memoryStream };
    _viewResult = _controller.PeformFullUpload(_dataSourceName, uploadedFile) as ViewResult;
}

Now that’s how I like my tests. :)

December 16 2011