Was .NET’s IObserver intended for subscribing to multiple IObservables

netobserver-pattern

There are IObservable and IObserver interfaces in .NET (also here and here). Interestingly, the concrete implementation of the IObserver does not hold a direct reference to the IObservable. It doesn't know who it's subscribed to. It can only invoke the unsubscriber. "Please pull the pin to unsubscribe."

edit: The unsubscriber implements the IDisposable. I think, this scheme was employed to prevent the lapsed listener problem.

Two things are not entirely clear to me, though.

  1. Does the inner Unsubscriber class provide the subscribe-and-forget behavior? Who (and when exactly) calls IDisposable.Dispose() on the Unsubscriber? Garbage collector (GC) is not deterministic.
    [Disclaimer: overall, I've spent more time with C and C++ than with C# .]
  2. What should happen if I want to subscribe an observer K to an observable L1 and the observer is already subscribed to some other observable L2?

    K.Subscribe(L1);
    K.Subscribe(L2);
    K.Unsubscribe();
    L1.PublishObservation(1003);
    L2.PublishObservation(1004);
    

    When I ran this test code against MSDN's example, the observer remained subscribed to L1. This would be peculiar in real development. Potentially, there are 3 avenues to improve this:

    • If the observer already has an unsubscriber instance (i.e. it’s already subscribed), then it quietly unsubscribes from the original provider before subscribing to a new one. This approach hides the fact that it’s no longer subscribed to the original provider, which may become a surprise later.
    • If the observer already has an unsubscriber instance, then is throws an exception. A well-behaved calling code has to unsubscribe the observer explicitly.
    • Observer subscribes to multiple providers. This is the most intriguing option, but can this be implemented with IObservable and IObserver? Let’s see. It is possible for the observer to keep a list of unsubscriber objects: one for each source. Unfortunately, IObserver.OnComplete() does not provide a reference back to the provider who have sent it. So, the IObserver implementation with multiple providers would not be able to determine which one to unsubscribe from.
  3. Was .NET's IObserver intended for subscribing to multiple IObservables?
    Does the textbook definition of the observer pattern require that one observer has to be able to subscribe to multiple providers? Or is it optional and implementation-dependent?

Best Answer

The two interfaces are actually part of Reactive Extensions (Rx for short), you should use that library pretty much whenever you want to use them.

The interfaces are technically in mscrolib, not in any of the Rx assemblies. I think this is to ease interoperability: this way, libraries like TPL Dataflow can provide members that work with those interfaces, without actually referencing Rx.

If you use Rx's Subject as your implementation of IObservable, Subscribe will return an IDisposable that can be used for unsubscribing:

var observable = new Subject<int>();

var unsubscriber =
    observable.Subscribe(Observer.Create<int>(i => Console.WriteLine("1: {0}", i)));
observable.Subscribe(Observer.Create<int>(i => Console.WriteLine("2: {0}", i)));

unsubscriber.Dispose();

observable.OnNext(1003);
observable.OnNext(1004);
Related Topic