Reactive Extensions and Prism: LINQ to your Event Aggregator

A lot of people have been talking about the Reactive Extensions for .NET, a flash new library from Devlabs which lets you do LINQ to .NET events just like you can with collections. My team at work has started using it pretty heavily, and today I wrote an adapter that allows you to use Rx over events published on the event aggregator in a Prism/Composite WPF application.

Here’s an example of using it in an imaginary chat client, to filter incoming messages from a particular person:

public class ConversationViewModel : IDisposable{    public IContact Friend { get; private set; }    public ObservableCollection<IMessage> Messages { get; private set; }    public ConversationViewModel(IContact friend,         IEventAggregator eventAggregator)    {        // Interested only in messages sent by the person we are talking to.        var messageReceivedEvents = eventAggregator            .GetEvent<MessageReceived>()            .AsObservable()            .Where(msg => msg.Sender.Equals(Friend));        eventSubscription = messageReceivedEvents.Connect();        messageReceivedEvents.Subscribe(msg => Messages.Add(msg));    }        // We will keep receiving messages until the subscription is disposed.    IDisposable eventSubscription;}

It was a good exercise that helped me understand a bit more about how Rx works, and illustrates how easy it is to integrate with callback-based event systems (e.g. your favourite message bus).

You can grab the code here: CompositePresentationEventExtensions.cs

Update Jun 16 2010: Added finalizer to CompositePresentationEventSubscription.

Update Feb 9 2012: Fixed bug found by Ariel Ben Horesh where keepSubscriberReferenceAlive needed to be true.