Reactive Extensions and Prism: LINQ to your Event Aggregator

A lot of people have been talking about the Reactive Extensions for .NET, a flash new library from DevLabs that lets you write LINQ queries over .NET events, just as you can over collections. My team at work has started using it pretty heavily, and today I wrote an adapter that allows you to use Rx over events published on the event aggregator in a Prism/Composite WPF application.

Here’s an example of using it in an imaginary chat client, to filter incoming messages from a particular person:

public class ConversationViewModel : IDisposable
{
    public IContact Friend { get; private set; }
    public ObservableCollection<IMessage> Messages { get; private set; }

    public ConversationViewModel(IContact friend, 
        IEventAggregator eventAggregator)
    {
        Friend = friend;
        Messages = new ObservableCollection<IMessage>();

        // Interested only in messages sent by the person we are talking to.
        // MessageReceivedEvent is an assumed CompositePresentationEvent<IMessage> type.
        var messageReceivedEvents = eventAggregator
            .GetEvent<MessageReceivedEvent>()
            .AsObservable()
            .Where(msg => msg.Sender.Equals(Friend))
            .Publish();

        eventSubscription = messageReceivedEvents.Connect();

        messageReceivedEvents.Subscribe(msg => Messages.Add(msg));
    }

    // We will keep receiving messages until the subscription is disposed.
    IDisposable eventSubscription;

    public void Dispose() { eventSubscription.Dispose(); }
}

It was a good exercise that helped me understand a bit more about how Rx works, and it illustrates how easy Rx is to integrate with callback-based event systems (e.g. your favourite message bus).
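To give a rough idea of what such an adapter involves, here is a minimal sketch that uses only the base class library. It is not the contents of CompositePresentationEventExtensions.cs: CallbackEvent<T> is a hypothetical stand-in for a callback-based event like Prism's CompositePresentationEvent<T>, and ActionObserver<T> is a hypothetical helper that lets the sketch run without referencing Rx.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical stand-in for a callback-based event such as Prism's
// CompositePresentationEvent<T>: subscribe with a delegate, get a token back.
public class CallbackEvent<T>
{
    private readonly Dictionary<Guid, Action<T>> handlers = new Dictionary<Guid, Action<T>>();

    public Guid Subscribe(Action<T> handler)
    {
        var token = Guid.NewGuid();
        handlers[token] = handler;
        return token;
    }

    public void Unsubscribe(Guid token) { handlers.Remove(token); }

    public void Publish(T payload)
    {
        // Copy first so handlers may unsubscribe while being invoked.
        foreach (var handler in new List<Action<T>>(handlers.Values)) handler(payload);
    }
}

public static class CallbackEventExtensions
{
    // Expose the event as an IObservable<T>. Each Subscribe call registers the
    // observer's OnNext and returns an IDisposable that unregisters it again.
    public static IObservable<T> AsObservable<T>(this CallbackEvent<T> source)
    {
        return new EventObservable<T>(source);
    }

    private sealed class EventObservable<T> : IObservable<T>
    {
        private readonly CallbackEvent<T> source;
        public EventObservable(CallbackEvent<T> source) { this.source = source; }

        public IDisposable Subscribe(IObserver<T> observer)
        {
            var token = source.Subscribe(observer.OnNext);
            return new Unsubscriber(() => source.Unsubscribe(token));
        }
    }

    private sealed class Unsubscriber : IDisposable
    {
        private Action dispose;
        public Unsubscriber(Action dispose) { this.dispose = dispose; }

        public void Dispose()
        {
            // Run the unsubscribe action at most once.
            var action = dispose;
            dispose = null;
            if (action != null) action();
        }
    }
}

// Minimal observer so the sketch runs without referencing Rx.
public sealed class ActionObserver<T> : IObserver<T>
{
    private readonly Action<T> onNext;
    public ActionObserver(Action<T> onNext) { this.onNext = onNext; }
    public void OnNext(T value) { onNext(value); }
    public void OnError(Exception error) { }
    public void OnCompleted() { }
}

public static class Demo
{
    public static void Main()
    {
        var messages = new CallbackEvent<string>();
        var received = new List<string>();

        IDisposable subscription = messages.AsObservable()
            .Subscribe(new ActionObserver<string>(received.Add));

        messages.Publish("hello");    // delivered
        subscription.Dispose();
        messages.Publish("goodbye");  // not delivered: already unsubscribed

        Console.WriteLine(string.Join(", ", received));  // prints "hello"
    }
}
```

Once an event is exposed as an IObservable<T> like this, the usual Rx operators (Where, Select and so on) should compose over it, and disposing the subscription is the only clean-up needed.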

You can grab the code here: CompositePresentationEventExtensions.cs

Update Jun 16 2010: Added finalizer to CompositePresentationEventSubscription.

Update Feb 9 2012: Fixed bug found by Ariel Ben Horesh where keepSubscriberReferenceAlive needed to be true.

8 thoughts on “Reactive Extensions and Prism: LINQ to your Event Aggregator”

  1. Pingback: DotNetShoutout
  2. A couple of changes I would make…
    In CompositePresentationEventSubscription, use the SubscriptionToken to unsubscribe; it's safer than passing a delegate.

    subscription = @event.Subscribe(observer.OnNext, true);

    The other issue is that if you do not store the IDisposable returned from IObservable.Subscribe, your subscription to the event aggregator event can be garbage collected.

    As above, you can pass true into EventBase.Subscribe to prevent WeakReferences being used. But this may lead to problems; I need to write some tests.

  3. Awesome idea, though the implementation could be a little more Rx-y:

    return Observable.CreateWithDisposable<T>(observer =>
    {
        var token = @event.Subscribe(observer.OnNext);

        return Disposable.Create(() => @event.Unsubscribe(token));
    });

  4. Hey, Richard
    Thanks, great post,
    I’ve tried to use the attached code and I had to make a few changes to make it work.
    More to the point, when subscribing to the event aggregator I had to tell the EA to keep subscriber references alive; otherwise the subscribers get GCed.

  5. Hi Richard,

    Thanks for sharing this great code, it’s just what I needed. Is there a licence and/or copyright associated with it?

    Kind regards,

