BehaviorSubject concurrent subscription and sending is broken #1184

Closed
andrask opened this issue May 9, 2014 · 21 comments

@andrask

andrask commented May 9, 2014

BehaviorSubject should ensure that the last notification always reaches the subscriber. When subscription and sending happen concurrently, there is a high probability that this property is broken.

I have prepared a test that easily reproduces the error:
https://gist.github.com/andrask/fc06abfd70daa6f91edb#file-behaviorsubjectsubscribeandsendconcurrently-java

The test involves two threads: 1) one trying to subscribe, 2) one trying to send the next value. These are carefully coordinated to allow real concurrent execution. The test asserts that the subscriber receives the sent value. The issue almost certainly shows up within a few hundred retries.

Note that with a Thread.sleep(1) the issue goes away.

Note that the test is something I distilled from what I saw in my production code. There may be small issues with it, but the concurrency problem certainly exists, as it is reproducible just by stepping through the code.
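
For readers who cannot open the gist: the kind of coordination described above can be sketched roughly as follows. This is not the gist's code. `RaceHarness` and `race` are hypothetical names, and the sketch only shows one common way (a `CyclicBarrier`) to release two threads at nearly the same moment so that a subscribe call and a send call can genuinely overlap.

```java
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.atomic.AtomicInteger;

/** Hypothetical helper (not from the gist): runs two actions on two threads, released together. */
final class RaceHarness {
    private RaceHarness() {}

    /** Starts both actions at (nearly) the same instant and waits for both to finish. */
    static void race(Runnable first, Runnable second) {
        CyclicBarrier startLine = new CyclicBarrier(2);
        Thread t1 = new Thread(() -> { awaitQuietly(startLine); first.run(); });
        Thread t2 = new Thread(() -> { awaitQuietly(startLine); second.run(); });
        t1.start();
        t2.start();
        try {
            t1.join();
            t2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the race to finish", e);
        }
    }

    /** Blocks until both threads have reached the barrier. */
    private static void awaitQuietly(CyclicBarrier barrier) {
        try {
            barrier.await();
        } catch (Exception e) {
            throw new IllegalStateException("start barrier failed", e);
        }
    }

    public static void main(String[] args) {
        // Stand-in actions; in the scenario above these would be
        // "subscribe to the subject" and "call onNext on the subject".
        AtomicInteger ran = new AtomicInteger();
        for (int i = 0; i < 1000; i++) {
            race(ran::incrementAndGet, ran::incrementAndGet);
        }
        System.out.println("actions run: " + ran.get());
    }
}
```

Repeating the race many times matters: a single run rarely hits the narrow window in which the two calls interleave.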

@akarnokd
Member

akarnokd commented May 9, 2014

Thanks for the test case. I'm currently improving the BehaviorSubject and the new code passes this test.

@benjchristensen
Member

@andrask This has been merged to master. Can you validate against that prior to release?

benjchristensen added this to the 0.19 milestone on May 20, 2014

@benjchristensen
Member

The tests in https://gist.github.com/andrask/fc06abfd70daa6f91edb#file-behaviorsubjectsubscribeandsendconcurrently-java are passing for me. Closing this out as it looks fixed. Please re-open if it's not and can be demonstrated. I'd still appreciate confirmation if you can.

@andrask
Author

andrask commented May 20, 2014

@benjchristensen thank you for the notice. I wanted to try today but got overwhelmed by other stuff. I cloned the repo and started installing with gradlew install but the process stopped at a lib it couldn't download and I didn't have more time to debug the reason. I'll try again tomorrow with different network settings. I'm sure the issue is with my setup. Fortunately, removing the current workaround from the code is quite easy so it will be simple to see whether this worked.

Sadly, I use Hystrix as well, which makes it hard to apply the new RxJava version even if it comes out in the near future.

@benjchristensen
Member

Thanks @andrask. Hystrix 1.3.16+ and Hystrix 1.4.+ should both be fully compatible with the current set of changes, as both of those support RxJava 0.18+. Since I work on both projects I'm keeping Hystrix moving forward.

Is there anything specific about Hystrix + Rx that isn't working well that I can solve?

@andrask
Author

andrask commented May 21, 2014

I confirm that the new version works.

The combo works well. I'll check again why I wasn't able to upgrade the last time. I think I only upgraded RX and this is why it failed.

@benjchristensen
Member

Thanks for confirming. Yes, Hystrix had to be upgraded to work. It is no longer using any deprecated Rx code so should be safe as we're nearing the end of changes and almost to 1.0.

@sregg

sregg commented May 24, 2016

I'm still seeing some concurrency issue between the subscription and the emission of the first object.
When those 2 things happen at the very same time, the subscriber doesn't observe the initial emission...
Is there any remaining known issue?

@artem-zinnatullin
Contributor

This is expected behavior.

If you write to the subject from multiple threads and need consistent on*() events, you can convert your subject to a SerializedSubject via subject.toSerialized().

Javadoc of Subject.toSerialized():

/**
 * Wraps a {@link Subject} so that it is safe to call its various {@code on} methods from different threads.
 * <p>
 * When you use an ordinary {@link Subject} as a {@link Subscriber}, you must take care not to call its 
 * {@link Subscriber#onNext} method (or its other {@code on} methods) from multiple threads, as this could 
 * lead to non-serialized calls, which violates
 * <a href="http://reactivex.io/documentation/contract.html">the Observable contract</a> and creates an
 * ambiguity in the resulting Subject.
 * <p>
 * To protect a {@code Subject} from this danger, you can convert it into a {@code SerializedSubject} with
 * code like the following:
 * <p><pre>{@code
 * mySafeSubject = myUnsafeSubject.toSerialized();
 * }</pre>
 * 
 * @return SerializedSubject wrapping the current Subject
 */

@JakeWharton
Contributor

Subscription is not an on* method and therefore this is not expected.

@artem-zinnatullin
Contributor

Well, it's hard to divide subscribe() and setting of initial value (which is ~onNext) for me.

What I see at the moment:

  1. Current implementation of BehaviorSubject.create(defaultValue) sets default value to volatile Object latest field of SubjectSubscriptionManager.
  2. When you call onNext() it also sets new value to that volatile field without any synchronization.
  3. When you subscribe() it won't start with defaultValue that you've passed as initial but it will emit that volatile latest state instead -> that leads to the effect that @sregg sees.

And now I'm not sure that toSerialized() will help with the initial value, because even though it synchronizes onNext, emission of the default value goes through a different lock object and may still happen concurrently with onNext.

@sregg as a temporary solution you can try to create the BehaviorSubject without a default value, convert it with toSerialized(), and set the default value via onNext before handing the subject reference to other threads. This should give you the threading consistency you want to achieve.

Probably we need to define this behavior better, either in the javadoc or by changing the current implementation to emit the default initial value first and only then emit the volatile latest value.

@akarnokd will be great to get your comment here!
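
To make the property under discussion concrete, here is a minimal plain-Java sketch. It is not RxJava's implementation and not a proposed patch: `MiniBehaviorSubject` is a hypothetical stand-in that guards the cached value and the subscriber list with a single lock, so that a subscriber racing with onNext sees the value exactly once, either replayed at subscription or delivered live.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/**
 * Hypothetical, simplified stand-in for a behavior-style subject.
 * NOT RxJava's implementation: one lock guards both the cached value
 * and the subscriber list, so subscribe() and onNext() cannot interleave.
 */
final class MiniBehaviorSubject<T> {
    private final Object lock = new Object();
    private final List<Consumer<T>> subscribers = new ArrayList<>();
    private T latest;          // guarded by lock
    private boolean hasValue;  // guarded by lock

    /** Registers the subscriber and replays the cached value, as one atomic step. */
    void subscribe(Consumer<T> subscriber) {
        synchronized (lock) {
            subscribers.add(subscriber);
            if (hasValue) {
                subscriber.accept(latest);
            }
        }
    }

    /** Caches the value and delivers it to everyone subscribed so far. */
    void onNext(T value) {
        synchronized (lock) {
            latest = value;
            hasValue = true;
            for (Consumer<T> s : subscribers) {
                s.accept(value);
            }
        }
    }

    public static void main(String[] args) {
        MiniBehaviorSubject<String> subject = new MiniBehaviorSubject<>();
        subject.onNext("cached");
        // A late subscriber gets the cached value, then later live values.
        subject.subscribe(v -> System.out.println("received: " + v));
        subject.onNext("live");
    }
}
```

Calling subscribers while holding the lock keeps the sketch short, but it is a simplification: a subscriber that calls back into the subject from its callback would break the iteration, and a slow subscriber blocks every other caller.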

@sregg

sregg commented May 25, 2016

I'm sorry, when I said "initial" value I wanted to say "first" value (first onNext) not the default value you pass in create()

@artem-zinnatullin
Contributor

Oh, that's better, then toSerialized() should be enough for you!

But the problem still remains: we need to define serialized behavior for initial value passed to Subject.create().

@akarnokd / other contributors: maybe create a separate issue for that if you think it's worth it?

@sregg

sregg commented May 25, 2016

also, I'm using asObservable on my BehaviorSubject. Not sure if that changes anything... (and I'm not sure why I'm using asObservable either...)

@sregg

sregg commented May 25, 2016

But I thought toSerialized() only fixed concurrency issues between onNext/onError/onCompleted calls from 2 different threads; it doesn't say anything about concurrency issues between subscription and onNext...

@artem-zinnatullin
Contributor

Using asObservable() doesn't change anything significant; it simply hides the actual type of the Observable from consumers, so they can't just cast your Observable to BehaviorSubject etc.

@akarnokd
Member

akarnokd commented May 25, 2016

> I'm still seeing some concurrency issue between the subscription and the emission of the first object.
> When those 2 things happen at the very same time, the subscriber doesn't observe the initial emission...
> Is there any remaining known issue?

Can you demonstrate this with a unit test?

Currently, there is a bug with BehaviorSubject that when started out empty, a concurrent subscription with an onNext may result in receiving the same value twice - but this doesn't sound like your case.

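import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

import org.junit.Assert;
import org.junit.Test;

import rx.Scheduler;
import rx.observers.TestSubscriber;
import rx.schedulers.Schedulers;
import rx.subjects.BehaviorSubject;

public class BehaviorSubjectSubscribeRaceTest {

    @Test
    public void subscribeRacesWithOnNext() {
        int n = 100000;
        Scheduler.Worker w = Schedulers.computation().createWorker();

        try {
            for (int i = 0; i < n; i++) {

                if ((i % 100) == 0) {
                    System.out.println("Round " + i);
                }

                // Spin "latch": both threads decrement, then wait for zero
                AtomicInteger cdl = new AtomicInteger(2);

                AtomicBoolean done = new AtomicBoolean();

                BehaviorSubject<Integer> bs = BehaviorSubject.create();

                TestSubscriber<Integer> ts = new TestSubscriber<>();

                // Worker thread: emit as soon as both sides are ready
                w.schedule(() -> {
                    cdl.decrementAndGet();
                    while (cdl.get() != 0) ;
                    bs.onNext(1);
                    done.set(true);
                });

                cdl.decrementAndGet();
                while (cdl.get() != 0) ;

                // Test thread: subscribe concurrently with the onNext above
                bs.subscribe(ts);

                // Wait until the emission has finished before checking
                while (!done.get()) ;

                int c = ts.getOnNextEvents().size();
                if (c == 0) {
                    Assert.fail("The TestSubscriber didn't receive any values");
                }
            }
        } finally {
            w.unsubscribe();
        }
    }
}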

Note that if a BehaviorSubject is completed, the cached value, if any, won't be available anymore so you may end up with no values but a terminal event in the subscriber.

@sregg

sregg commented May 25, 2016

Right, that's probably what's happening. I'm calling onCompleted() right after onNext().

Here's my setup:
I wrapped a class that does asynchronous stuff with RxJava.
That class needs a callback listener in its constructor and has a method to trigger an action.
If that method is called, it will do the action in a different thread, and send the result back to me via the callback listener.
I wrapped this asynchronous behavior using a BehaviorSubject. Here's the pseudo code:

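import rx.Observable;
import rx.subjects.BehaviorSubject;

// Wrapped and Result are my own classes; Wrapped runs its action on another
// thread and reports back through Wrapped.Listener.
public class Wrapper implements Wrapped.Listener {
  private final Wrapped wrapped;
  private final BehaviorSubject<Result> behaviorSubject;

  public Wrapper() {
    wrapped = new Wrapped(this);
    behaviorSubject = BehaviorSubject.create();
  }

  public Observable<Result> getWrappedObservable() {
    // The action is triggered here, before anyone has subscribed
    wrapped.doAction();
    return behaviorSubject.asObservable();
  }

  @Override
  public void callback(Result result) {
    // Invoked from Wrapped's thread once the action has finished
    behaviorSubject.onNext(result);
    behaviorSubject.onCompleted();
  }
}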

The problem is that the async action can execute so quickly that it calls the callback right away, sometimes before or during the subscription to the Observable.

I managed to fix this by delaying the doAction() call until subscription, using doOnSubscribe():

  public Observable<Result> getWrappedObservable() {
    return behaviorSubject.asObservable()
        .doOnSubscribe(() -> {
            wrapped.doAction();
        });
  }

Is there a better way to do so?

@akarnokd
Member

If you have only one value to relay, use AsyncSubject.
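
For context on this suggestion: AsyncSubject holds the last value until the source completes and then delivers that value together with the completion signal, including to subscribers that arrive after completion. The plain-Java sketch below illustrates those semantics only. It is not RxJava code, and `MiniObserver` and `MiniAsyncSubject` are hypothetical names introduced for the illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical observer type for this sketch (not an RxJava interface). */
interface MiniObserver<T> {
    void onNext(T value);
    void onCompleted();
}

/**
 * Simplified illustration of AsyncSubject-like semantics, NOT RxJava's code:
 * the last value is held until completion, then delivered together with the
 * completion signal to current subscribers and to any later ones.
 */
final class MiniAsyncSubject<T> {
    private final Object lock = new Object();
    private final List<MiniObserver<T>> observers = new ArrayList<>();
    private T last;             // written under lock, frozen once completed
    private boolean hasValue;   // written under lock, frozen once completed
    private boolean completed;  // guarded by lock

    void subscribe(MiniObserver<T> observer) {
        synchronized (lock) {
            if (!completed) {
                observers.add(observer);  // will be notified by onCompleted()
                return;
            }
        }
        emitTerminal(observer);  // already completed: replay value + completion
    }

    void onNext(T value) {
        synchronized (lock) {
            if (completed) {
                return;
            }
            last = value;  // only remembered, nothing is emitted yet
            hasValue = true;
        }
    }

    void onCompleted() {
        List<MiniObserver<T>> toNotify;
        synchronized (lock) {
            if (completed) {
                return;
            }
            completed = true;
            toNotify = new ArrayList<>(observers);
            observers.clear();
        }
        for (MiniObserver<T> o : toNotify) {
            emitTerminal(o);
        }
    }

    private void emitTerminal(MiniObserver<T> observer) {
        if (hasValue) {
            observer.onNext(last);
        }
        observer.onCompleted();
    }

    public static void main(String[] args) {
        MiniAsyncSubject<String> subject = new MiniAsyncSubject<>();
        subject.onNext("result");
        subject.onCompleted();
        // Subscribing after completion still yields the value, then completion.
        subject.subscribe(new MiniObserver<String>() {
            @Override public void onNext(String value) { System.out.println("onNext: " + value); }
            @Override public void onCompleted() { System.out.println("onCompleted"); }
        });
    }
}
```

The contrast with the BehaviorSubject note above is the point: a subscriber that arrives after onNext plus onCompleted still sees the single result instead of only the terminal event.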

@sregg

sregg commented May 25, 2016

Good to know. But would that fix my concurrency issue?

@sregg

sregg commented May 26, 2016

It seems that my idea of delaying the action in doOnSubscribe() still doesn't fix the issue.
Now I'm thinking there's something wrong between onNext and onCompleted. Sometimes only onCompleted is received...
