Rx: Controlling frequency of events

One of the strengths of the Reactive Framework is the number of LINQ operators that ship with it.  This can also be a weakness though, as you wade through the list of sometimes-strangely named method looking for something in particular.  I recently wanted an operator that would have the following effect on an event stream:

regulate marble diagram

In other words, if events get raised too often then delay them so that the output stream has a defined maximum frequency of events over time. 

My first attempt at an implementation was to use the built-in Throttle operator, although I had a nagging feeling that this one doesn’t quite do what I expect it to.  My nagging sensation was proved correct … Throttle will throw away events if they are too frequent. In the end I resorted to building my own operator, which I call Regulate.  It’s a bit long, as it needs to address some threading considerations, but it’s not that difficult to follow:

using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;

namespace SharpFellows.RxUtils
{
    public static class ObservableExtensions
    {
        public static IObservable<T> Regulate<T>(this IObservable<T> observable, TimeSpan duration)
        {
            return Regulate(observable, duration, Scheduler.TaskPool);
        }

        public static IObservable<T> Regulate<T>(this IObservable<T> observable, TimeSpan duration, IScheduler scheduler)
        {
            var regulator = new ObservableRegulator<T>(duration, scheduler);

            return Observable.Create<T>(observer => observable.Subscribe(obj => regulator.ProcessItem(obj, observer)));
        }

        private class ObservableRegulator<T>
        {
            private DateTimeOffset _lastEntry = DateTimeOffset.MinValue;
            private readonly object _lastEntryLock = new object();

            private readonly TimeSpan _duration;
            private readonly IScheduler _scheduler;

            public ObservableRegulator(TimeSpan duration, IScheduler scheduler)
            {
                _duration = duration;
                _scheduler = scheduler;
            }

            public void ProcessItem(T val, IObserver<T> observer)
            {
                var canBroadcastNow = false;
                var nexEntryTime = DateTimeOffset.MaxValue;
                lock (_lastEntryLock)
                {
                    var now = DateTimeOffset.Now;
                    if (now.Subtract(_lastEntry) > _duration)
                    {
                        _lastEntry = now;
                        canBroadcastNow = true;
                    }
                    else
                    {
                        _lastEntry = _lastEntry.Add(_duration);
                        nexEntryTime = _lastEntry;
                    }
                }

                if (canBroadcastNow)
                {
                    observer.OnNext(val);
                }
                else
                {
                    _scheduler.Schedule(nexEntryTime, () => observer.OnNext(val));
                }

            }
        }
    }
}

The result is that you can specify the minimum time between events in the output stream (and optionally a scheduler):

return service.GetIrregularEvents()
              .Regulate(TimeSpan.FromSeconds(1));

FYI my use of this is to slowly drip feed items onto a data-bound UI.  Each new item triggers an animation and I don’t want 5 items simultaneously starting to animate.  Drop me an email or leave a comment if you find other uses for it!

Ninject: Auto-registration is changing in version 3

When I was using Ninject v2.2 I had this code to do my auto-registration:

using Ninject;
using Ninject.Extensions.Conventions;

public class CommonBootstrapper<TShell>
{
    private StandardKernel _kernel;

    protected override void Configure()
    {
        _kernel = new StandardKernel();
        _kernel.Scan(scanner =>
                         {
                             scanner.FromAssembliesInPath(Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location));
                             scanner.BindWithDefaultConventions();
                             scanner.InSingletonScope();
                         });
    }
}

For various reasons I upgraded to Ninject v3.0 RC3, and I found that there have been a number of breaking in Ninject.Extensions.Conventions.  My code now looks like this:

using Ninject;
using Ninject.Extensions.Conventions;

public class CommonBootstrapper<TShell>
{
    private StandardKernel _kernel;

    protected override void Configure()
    {
        _kernel = new StandardKernel();
        _kernel.Bind(scanner => scanner.FromAssembliesInPath(Path.GetDirectoryName(Assembly.GetExecutingAssembly().Location))
                                    .Select(IsServiceType)
                                    .BindToDefaultInterface()
                                    .Configure(binding => binding.InSingletonScope()));
    }

    private static bool IsServiceType(Type type)
    {
        return type.IsClass && type.GetInterfaces().Any(intface => intface.Name == "I" + type.Name);
    }
}

 

So there is a sweet new fluent interface, but more importantly you have to specify the exact conventions to use.  This makes it a lot easier to have non-standard convention, but it would be great if Ninject had the standard convention built in (IFoo binds to Foo).

For those who are into their own custom binding conventions (like Anthony), you will unfortunately find that the IBindingGenerator interface has changed.  Here is the v2.2 way of implementing a custom binding:

public class ViewModelConventions : IBindingGenerator
{
    public void Process(Type type, Func<IContext, object> scopeCallback, IKernel kernel)
    {
        if(type != null && kernel != null && !type.IsAbstract && type.IsClass && type.Name.EndsWith("Model"))
        {
            kernel.Bind(type)
                  .ToSelf()
                  .InScope(scopeCallback);
        }
    }
}

And here is the same class ported to Ninject v3.0 RC3.

public class ViewModelConventions : IBindingGenerator
{
    public IEnumerable<IBindingWhenInNamedWithOrOnSyntax<object>> CreateBindings(Type type, IBindingRoot bindingRoot)
    {
        if (type != null && !type.IsAbstract && type.IsClass && type.Name.EndsWith("Model"))
        {
            yield return bindingRoot.Bind(type)
                                    .ToSelf();
        }
    }
}

It must be said, however, the goodness of the new fluent interface shown above means that I no longer need a custom binding generator – instead I can simply select the types I want and bind them directly in my bootstrapper.   I suspect that others will find the same!

March 20 2012

Getting values into a string

There are often times where it’s necessary to have some sort of template in a string, with placeholders that will be replaced with data-driven values at runtime.  Examples of this include email templates, mail merges and even integration scenarios.  Now there are many sophisticated and complex approaches to solving this problem, but here is a simple approach that works:

public static class StringExtensions
{
    public static string AddTokens(this string message, object tokenValues)
    {
        return tokenValues.GetType()
                .GetProperties()
                .Select(property => new
                            {
                                Key = property.Name,
                                Value = property.GetValue(tokenValues, new object[0]) as string
                            })
                .Aggregate(message, (current, token) => current.Replace("{" + token.Key + "}", token.Value));

    }

}

And to show you how to use it:

[Test]
public void ShouldReplaceTokens()
{
    var result = "1234{numbers}890".AddTokens(new { numbers = "567" });
    Assert.AreEqual("1234567890", result, "Tokens not added correctly");
}
January 16 2012

FluentValidation and PostSharp for RESTful WCF parameter validation – Part 4

 

This is the last of the series and you can find the previous parts below:

FluentValidation and PostSharp for RESTful WCF parameter validation – Part 1
FluentValidation and PostSharp for RESTful WCF parameter validation – Part 2
FluentValidation and PostSharp for RESTful WCF parameter validation – Part 3

The promised example solution at: https://github.com/naeemkhedarun/WcfValidation 

You will need PostSharp 2.0 installed, the community edition is fine.

This is a functional RESTful WCF service where you can send a product using a JSON request, and have the same product sent back in the response.

[RestArgumentValidation]
public ProductItem Create([ProductItemValidator] ProductItem instance)
{
    return new ProductItem
               {
                   Name = instance.Name
               };
}

On my current project we use a structure similar to the sample project to organise our attributes and validators.

image

To test this service I’m using the REST Console application for Chrome. First we need to set up our target:

image

When we make the request we should get a helpful HTTP Status Code of 400 (Bad Request):

image

We should also get a response body giving us some more detail about the issue with our request.

image

To make a valid request, we simply send the correct request body which looks like:

image

image

Running the request once more we receive the proper response:

image

And there you have it, I hope you’ve found this useful!

Older Posts