You signed in with another tab or window. Reload to refresh your session.You signed out in another tab or window. Reload to refresh your session.You switched accounts on another tab or window. Reload to refresh your session.Dismiss alert
The CompletableObservable (and CompletableFlowable) implementors, publish and replay in RxJava 1.x and 2.x are inconsistent in their terminal behavior.
When publish terminates, its CompletableObservable will appear as fresh to the new subscribers. This has the drawback that such subscribers may hang as connect may be never called again.
In contrast, replay will stay terminated along with any cached items and new subscribers can still get those events. The drawback here is that a new connect will clear the internal storage and start the consumption of the main source while not giving any chance to subscribers to prepare and receive that stream of events from the start if the replay is bounded.
Dealing with this inconsistency currently requires refCount to trigger a reset on an unofficial channel: casting the CompletableObserver into Disposable if possible and disposing it when the count reaches zero again.
Suggested solution
I suggest changing the API to include an explicit reset() method and changing the logic to have 3 states:
In the fresh state, consumers can pile up and be ready to receive events. An atomic state change to running will begin streaming events until a terminal event is reached, at which point the state atomically changes to terminated. Consumers subscribing in this state will always receive the terminal event, and in case of replay, the cached items as well.
A call to reset() will clear the internal storage of the ConnectableObservable and start out as fresh again, allowing new consumers to gather around and get all fresh events from the beginning.
It is possible to support the call to connect in the terminated state to skip the fresh state. Preventing this transition, however, may be more involved as connect() should communicate this to be illegal transition someway as well as the need for a soft way for checking if connect is to succeed or not. Note that calling connect on a running ConnectableObservable is a no-op in 1.x and 2.x.