Skip to content

ScheduledObserver doesn't Guaranty Ordering #233

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
mairbek opened this issue Apr 16, 2013 · 9 comments
Closed

ScheduledObserver doesn't Guaranty Ordering #233

mairbek opened this issue Apr 16, 2013 · 9 comments

Comments

@mairbek
Copy link
Contributor

mairbek commented Apr 16, 2013

ScheduledObserver should be reimplemented the way that it guaranties ordering of the events.

Right now this test prints values in different ordering.

    @Test
    public void testToIteratorObserveOn() {
        Observable<String> obs = Observable.from("one", "two", "three", "four");

        Iterable<String> it = obs.observeOn(Schedulers.threadPoolForComputation()).toIterable();

        for (String s : it) {
            System.out.println(s);
        }

    }
@benjchristensen
Copy link
Member

This is used as a pass-thru for ObserveOn to a Scheduler so why does this need to ensure ordering? That should be up to the scheduler implementation should it not?

For example, if I need to ensure ordering then I probably don't sent it to a thread-pool, but send it to a single thread (such as the UI thread).

If I choose to send something to a thread-pool for execution on multiple threads why should I expect the events to all be processed and returned in order when I am specifically injecting concurrency.

I can't find anything in the Rx Design Guidelines that dictates that order should be retained and it's not at all guaranteed by all operators (for example merge definitely does not retain order whereas concat does).

@mairbek
Copy link
Contributor Author

mairbek commented Apr 16, 2013

Quote from the 4.2. Assume observer instances are called in a serialized fashion

Consumers of observables can safely assume that messages arrive in a serialized fashion.

var count = 0;
xs.Subscribe(v =>
{
count++;
Console.WriteLine("OnNext has been called {0} times.", count);
});

In this sample, no locking or interlocking is required to read and write to count as only one call to OnNext can be in-flight at any time.

If ordering is not guarantied we might observe onComplete event before onNext. In this case operators like toIterator doesn't work correctly.

@benjchristensen
Copy link
Member

But I don't read that to mean ordered. We must ensure the contract of onNext|onCompleted|onError but I don't see how we need to or should ensure that execution of onNext events arrive in the same order once they've been thrown off on a thread-pool and multiple threads.

Do you think otherwise?

@benjchristensen
Copy link
Member

You can take a look at an Observer wrapper we used in production in our internal version before open-sourcing that allows concurrent onNext executions but ensures the onCompleted/onError contract: https://gist.github.com/benjchristensen/5400653

We were more lenient in our internal version about allowing concurrent execution of onNext since we forced everything to be functional and without state (which I still actually prefer and think onNext should be allowed to be concurrent) but when we open sourced I removed this since it is against the contract of official Rx and do not allow concurrent onNext execution.

However, it shows the principle of how onNext calls could interleave (be out of order) but still be serialized while ensuring onCompleted/onError occur only at the end after all onNext calls are completed.

@benjchristensen
Copy link
Member

Thinking about this more ... the ordering still isn't the part that concerns me (though maybe it should) but the real issue here is that each onNext/onCompleted/onError could be triggered on a separate thread.

We need to not only ensure only one of those is running at a time but then we ensure visibility/memory consistency. This means we are basically forced into wrapping this in SynchronizedObserver so only a single onNext/onCompleted/onError call is happening at any given time, correct?

Without that I can't see ObserveOn complying with the Rx contract.

@mairbek
Copy link
Contributor Author

mairbek commented Apr 17, 2013

I thought if onNext calls are sequential it makes sense to make them sequencial in the order they have appeared. I've tried to implement non-blocking algorithm to make it work #234.

@mairbek
Copy link
Contributor Author

mairbek commented Apr 17, 2013

We need to not only ensure only one of those is running at a time but then we ensure visibility/memory consistency. This means we are basically forced into wrapping this in SynchronizedObserver so only a single onNext/onCompleted/onError call is happening at any given time, correct?

Using only SynchronizedObserver itself wouldn't be helpful, since it doesn't guaranty the ordering of the events. OnCompleted could be called before OnNext.

It is a good question whether memory consistency/visibility should be responsibility of the library or of the observer.

@benjchristensen
Copy link
Member

It would guarantee ordering if the synchronization happens BEFORE putting the event on to the Scheduler since they would enter and leave the Scheduler in the same order they hit the SynchronizedObserver.

It should only be permitted to schedule one event at a time. It can't schedule more than one at a time because then you can no longer prevent them from running concurrently.

@benjchristensen
Copy link
Member

I believe this was fixed in f1be5f4 and 071b894

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

2 participants