Home > Blockchain >  In Rx, is it a responsibility of the consumer (IObserver) to deal with thread safety?
In Rx, is it a responsibility of the consumer (IObserver) to deal with thread safety?

Time:06-02

In ReactiveX paradigm, Is it a responsibility of the consumer (IObserver) to deal with thread safety?

E.g., if OnCompleted call comes along when OnNext is still executing on another thread?

It looks like it from Rx .NET sources but the docs are somewhat vague.

CodePudding user response:

Since I initially asked this question in a tweet, I believe I've now found an authoritative answer.

It appears I was wrong in my assumption that thread-safe serialization is the consumer's responsibility (IObserver).

According to the original Rx Design Guidelines document (a best-kept secret as it seems :)

4.2. Assume observer instances are called in a serialized fashion

As Rx uses a push model and .NET supports multithreading, it is possible for different messages to arrive different execution contexts at the same time. If consumers of observable sequences would have to deal with this in every place, their code would need to perform a lot of housekeeping to avoid common concurrency problems. Code written in this fashion would be harder to maintain and potentially suffer from performance issues.

Further:

6.7. Serialize calls to IObserver methods within observable sequence implementations Rx is a composable API, many operators can play together. If all operators had to deal with concurrency the individual operators would become very complex. Next to this, concurrency is best controlled at the place it first occurs. Finally, Consuming the Rx API would become harder if each usage of Rx would have to deal with concurrency.

And finally:

6.8. Avoid serializing operators As all Rx operators are bound to guideline 6.7, operators can safely assume that their inputs are serialized. Adding too much synchronization would clutter the code and can lead to performance degradation. If an observable sequence is not following the Rx contract (see chapter 0), it is up to the developer writing the end-user application to fix the observable sequence by calling the Synchronize operator at the first place the developer gets a hold of the observable sequence. This way the scope of additional synchronization is limited to where it is needed.

My personal take from this: if an original sequence producing IObservable can introduce parallelism when it calls OnNext, OnError, OnComplete (or when Dispose is called on its subscription), it should take care to serialize these calls properly.

  • Related