Reactive Extensions and ObservableCollection

 

I was disappointed when I noticed ObservableCollection wasn’t using the new IObservable interface or publishing any Subjects to listen to collection changed events on. So to make working with it easier I created some extensions methods.

Initially I read the Rx 101 and created an extension method which looks something like this:

public static IObservable<IEvent<NotifyCollectionChangedEventArgs>> FromCollectionChangedEvent<T>(this ObservableCollection<T> observableCollection)
{
    return Observable
        .FromEvent<NotifyCollectionChangedEventHandler, NotifyCollectionChangedEventArgs>(
            handler => new NotifyCollectionChangedEventHandler(handler),
            handler => observableCollection.CollectionChanged += handler,
            handler => observableCollection.CollectionChanged -= handler);
}

Great, now I can leverage Linq against my IObservable and take advantage of IDisposable to clean up my events. However it’s still not that practical as we still need to do some legwork to subscribe to the individual events we are interested, so there’s a pair of extension methods to make that a little easier…

public static IObservable<List<T>> AddEventListener<T>(this ObservableCollection<T> observableCollection)
{
    var collectionChangeEventNotification = observableCollection.FromCollectionChangedEvent();

    return
        from anEvent in collectionChangeEventNotification 
        where anEvent.EventArgs.Action == NotifyCollectionChangedAction.Add
        select anEvent.EventArgs.NewItems.Cast<T>().ToList();
}

public static IObservable<List<T>> RemoveEventListener<T>(this ObservableCollection<T> observableCollection)
{
    var collectionChangeEventNotification = observableCollection.FromCollectionChangedEvent();

    return
        from anEvent in collectionChangeEventNotification
        where anEvent.EventArgs.Action == NotifyCollectionChangedAction.Remove
        select anEvent.EventArgs.OldItems.Cast<T>().ToList();
}

 

Great but I still want to actually perform an action when one of those events are raised…

public static IDisposable SubscribeToAddEvent<T>(this ObservableCollection<T> observableCollection, Action<T> onAddAction)
{
    return SubscribeToAddEvent(observableCollection, onAddAction, Scheduler.Dispatcher);
}

public static IDisposable SubscribeToRemoveEvent<T>(this ObservableCollection<T> observableCollection, Action<T> onRemoveAction)
{
    return SubscribeToRemoveEvent(observableCollection, onRemoveAction, Scheduler.Dispatcher);
}

 

We now finally have something useful, when the required event is detected it will call our lamda, this makes our code a little neater:

_addItemsSubscription = sessions.SubscribeToAddEvent(session => 
    this.ActiveSessions.Add(this.ActiveSessionMapper.MapFrom(session)), synchronizationContext);

 

This is in my view model, and it can now react to the observable collection neatly using Rx. Of course don’t forget to take advantage of the IDisposable it gives you and clean up after yourself:

public override void Shutdown()
{
    _addItemsSubscription.Dispose();
    _removeItemsSubscription.Dispose();
}

 

You can easily download the code from below, there are overloads which allow you to pass the synchronisation context, letting you run threaded at runtime and synchronous in unit tests, courtesy of John!

http://bitbucket.org/naeem.khedarun/warp.profiler/src/tip/trunk/Solutions/Warp.Profiler.Framework/Extensions/ObservableCollectionExtensions.cs

blog comments powered by Disqus