Reactive Extensions and Prism: LINQ to your Event Aggregator

A lot of people have been talking about the Reactive Extensions for .NET, a flash new library from Devlabs which lets you do LINQ to .NET events just like you can with collections. My team at work has started using it pretty heavily, and today I wrote an adapter that allows you to use Rx over events published on the event aggregator in a Prism/Composite WPF application.

Here’s an example of using it in an imaginary chat client, to filter incoming messages from a particular person:

public class ConversationViewModel : IDisposable
{
    public IContact Friend { get; private set; }
    public ObservableCollection<IMessage> Messages { get; private set; }

    public ConversationViewModel(IContact friend, 
        IEventAggregator eventAggregator)
    {
        // Interested only in messages sent by the person we are talking to.
        var messageReceivedEvents = eventAggregator
            .GetEvent<MessageReceived>()
            .AsObservable()
            .Where(msg => msg.Sender.Equals(Friend));

        eventSubscription = messageReceivedEvents.Connect();

        messageReceivedEvents.Subscribe(msg => Messages.Add(msg));
    }
    
    // We will keep receiving messages until the subscription is disposed.
    IDisposable eventSubscription;
}

It was a good exercise that helped me understand a bit more about how Rx works, and illustrates how easy it is to integrate with callback-based event systems (e.g. your favourite message bus).

You can grab the code here: CompositePresentationEventExtensions.cs

Update Jun 16 2010: Added finalizer to CompositePresentationEventSubscription.

Update Feb 9 2012: Fixed bug found by Ariel Ben Horesh where keepSubscriberReferenceAlive needed to be true.

March 30, 2010

8 Comments

Pingback: DotNetShoutout

Adam Mills on June 16, 2010 at 5:09 pm.

Nice…
I searched for this on google and guess who pops up.
Just what i needed.

Adam Mills on June 16, 2010 at 6:35 pm.

A couple of changes I would make…
in CompositePresentationEventSubscription, you use the SubscriptionToken to unsubscribe, its safer than passing a delegate.

subscription = @event.Subscribe(observer.OnNext, true);
@event.Unsubscribe(subscription);

The other issue is that if you do not store the IDisposable returned from IObservable.Subscribe your subscription to the event aggregator event is recycled.

As above you can pass true into the EventBase.Subscribe to prevent WeakReferences being used. But this may lead to problems, need to write some tests.

Richard Szalay on June 25, 2010 at 3:38 pm.

Awesome idea, though the implementation could be a little more Rx-y:

return Observable.CreateWithDisposable(observer =>
{
var token = @event.Subscribe(observer.OnNext);

return Disposable.Create(() =>
{
@event.Unsubscribe(token);
});
});

Ariel Ben Horesh on January 15, 2012 at 1:54 pm.

Hey, Richard
Thanks, great post,
I’ve tried to use the attached code and I had to make a few changes to make it work.
More to the point, when you subscribe to the event aggregator I had to tell the EA to keep subscribers alive, otherwise they will be GCed.
Ariel

James Doran on July 16, 2012 at 4:10 am.

Hi Richard,

Thanks for sharing this great code, it’s just what I needed. Is there a licence and/or copyright associated with it?

Kind regards,

James

Richard on July 16, 2012 at 8:35 pm.

@James: nope, go crazy :)

James Doran on July 17, 2012 at 11:28 pm.

Fantastic, thanks again Richard

Leave a Reply