BehaviorSubject Race Condition #658

Closed
benjchristensen opened this issue Dec 23, 2013 · 7 comments

@benjchristensen
Member

Need to investigate the BehaviorSubject as it likely has the same problem as Rx.Net as reported here: http://twistedoakstudios.com/blog/Post8424_deadlocks-in-practice-dont-hold-locks-while-notifying

Note that the Subjects were just re-implemented but kept the same behavior. For the BehaviorSubject, the handling of the race between returning the current value and subscribing for subsequent values appears to be wrong.

Use case:

onNext: 1 2 3 4 5 6 7 8 9
Observer: subscribe between 3 & 4 ... should receive 3 4 5 6 7 8 9

The likely bug is that in a race it could receive 3 6 7 8 9
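
The use case above can be reproduced deterministically with a small sketch. This is not RxJava's BehaviorSubject: `NaiveBehaviorSubject`, `RaceDemo`, and the `betweenSteps` hook are hypothetical names, and the hook simply stands in for another thread calling onNext in the gap between "send the current value" and "add the observer".

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Consumer;

// Hypothetical, simplified subject used only to illustrate the gap.
final class NaiveBehaviorSubject {
    private volatile int current;
    private final List<Consumer<Integer>> observers = new CopyOnWriteArrayList<>();

    NaiveBehaviorSubject(int initial) {
        current = initial;
    }

    void onNext(int value) {
        current = value;
        for (Consumer<Integer> o : observers) {
            o.accept(value);
        }
    }

    // betweenSteps simulates what a concurrent producer does in the gap.
    void subscribe(Consumer<Integer> observer, Runnable betweenSteps) {
        observer.accept(current);   // step 1: replay the current value
        betweenSteps.run();         // gap: values emitted here are never seen
        observers.add(observer);    // step 2: register for later values
    }
}

final class RaceDemo {
    static List<Integer> run() {
        NaiveBehaviorSubject subject = new NaiveBehaviorSubject(0);
        List<Integer> received = new ArrayList<>();
        subject.onNext(1);
        subject.onNext(2);
        subject.onNext(3);
        // 4 and 5 are emitted while the subscription is half done.
        subject.subscribe(received::add, () -> {
            subject.onNext(4);
            subject.onNext(5);
        });
        for (int v = 6; v <= 9; v++) {
            subject.onNext(v);
        }
        return received; // [3, 6, 7, 8, 9]
    }
}
```

With 4 and 5 emitted inside the gap, the observer sees 3 6 7 8 9 instead of 3 4 5 6 7 8 9.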

@akarnokd
Member

There are a few suggestions that I didn't include in my PR, which also involves BehaviorSubject. I'll take a look at this as well.

Edit: Yes, this can happen. When an observer subscribes, it immediately gets the current value and is only later added to the state object via addObserver. That add can be delayed, and any values emitted in the meantime are lost.

Edit 2: How to avoid it:

  • One would need to block onNext while a subscription is happening, which would negate the improvements.
  • Include the lastNotification in the state so that a newly observed value triggers a state replacement, similar to when an observer subscribes. However, one would need two states, active and modifying; this avoids using a lock, similar to how AbstractAtomicSubscription allows complex operations in the mutating state.
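
As a baseline for comparison, the first option (blocking onNext while a subscription is in progress) might look roughly like the sketch below. `LockingBehaviorSubject` is a hypothetical class, not the RxJava implementation. It closes the gap by doing both subscription steps under the same lock that onNext takes, at the cost of serializing every emission and of invoking observers while the lock is held, which is the hazard the linked article warns about.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the "block onNext during subscribe" option.
final class LockingBehaviorSubject<T> {
    private final Object lock = new Object();
    private final List<Consumer<T>> observers = new ArrayList<>();
    private T current;

    LockingBehaviorSubject(T initial) {
        current = initial;
    }

    void onNext(T value) {
        synchronized (lock) {
            current = value;
            // Observers are called under the lock; an observer that calls
            // subscribe() from here is not supported by this sketch.
            for (Consumer<T> o : observers) {
                o.accept(value);
            }
        }
    }

    void subscribe(Consumer<T> observer) {
        synchronized (lock) {
            // Replay and registration are atomic with respect to onNext,
            // so no value can slip in between them.
            observer.accept(current);
            observers.add(observer);
        }
    }
}
```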

I thought about the queue/drain suggestion in the article, but I can't see it solving the value-skip problem. (It surely guards against reordering if there happen to be concurrent onNext calls, but by contract we don't allow those, so we ignore that case.)

@Strilanc

The queue/drain approach can be adapted to also ensure you don't miss notifications. All you have to do is queue the subscription as well. (Queue/drain is a fully general synchronization mechanism akin to using an actor.)

Note that if you do this, the initial value may not arrive immediately if the behavior subject is being touched on another thread. That can be fixed by introducing a second queue, specific to the subscription, that is initialized with the latest value while atomically subscribing.

So a no-deadlock no-reorder no-thread-hopping-for-initial-value subscription would look like:

initialValueSent = false
missedDrainCount = 0
create drain queue Q for subscription
sync(this) {
    add observer to {
        enqueue new value onto Q
        sync(Q) {
            // other threads must not push initial value
            if (!initialValueSent) {
                missedDrainCount++
            }
        }
        tryDrain Q
    }

    enqueue current value onto Q
}
// forward value to caller before returning (and perhaps some later values too)
tryDrain Q

// --- allow other threads to forward future values ---
sync(Q) {
    initialValueSent = true;
}
// in case observers were enqueueing while we set the flag
// (pretty sure missedDrainCount is safe to read without locking now...)
// (this can be done without the loop, but requires modifying the Q to drain N)
while (missedDrainCount-- > 0) {
    tryDrain Q
}

I think that's right... just off the top of my head, though.

@Strilanc

If you're willing to allow the initial value to thread-hop, it's a lot simpler of course:

subscribe:
    enqueue {
        add observer
    }
    tryDrain
send:
    enqueue {
        send to all observers
    }
    tryDrain

(This assumes you're using the queue as your only synchronization mechanism. If you're also using locks, you probably need to lock around the enqueues.)

Edit: fixed enqueueing inside add observer instead of inside send.
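
The simpler queue/drain variant can be sketched in Java as follows. This is one possible reading of the pseudocode, not the fix that was eventually merged: `QueueDrainBehaviorSubject` is a hypothetical class, and the sketch assumes that the queued "add observer" action also replays the current value, since that is what a behavior subject needs. Every mutation goes through the queue, and whichever thread moves the work-in-progress counter from 0 to 1 drains it, so the initial value may be delivered on another thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical sketch: all state changes are queued actions run by one drainer.
final class QueueDrainBehaviorSubject<T> {
    private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger();
    // Touched only inside queued actions, i.e. by the current drainer.
    private final List<Consumer<T>> observers = new ArrayList<>();
    private T current;

    QueueDrainBehaviorSubject(T initial) {
        current = initial;
    }

    void subscribe(Consumer<T> observer) {
        queue.offer(() -> {
            observers.add(observer);
            observer.accept(current); // replay happens in queue order
        });
        tryDrain();
    }

    void onNext(T value) {
        queue.offer(() -> {
            current = value;
            for (Consumer<T> o : observers) {
                o.accept(value);
            }
        });
        tryDrain();
    }

    private void tryDrain() {
        // Only the caller that moves wip from 0 to 1 becomes the drainer.
        if (wip.getAndIncrement() != 0) {
            return;
        }
        do {
            // Each increment follows an offer, so poll() is never null here.
            queue.poll().run();
        } while (wip.decrementAndGet() != 0);
    }
}
```

Because the subscription and every emission are ordered by the same queue, an observer that subscribes between 3 and 4 receives 3 followed by 4 through 9, with nothing skipped. No lock is held while observers are notified.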

@akarnokd
Member

akarnokd commented May 8, 2014

I think this is still an unsolved problem. Do we still want to put effort into this?

@benjchristensen
Member Author

I was thinking about trying to solve it similar to the groupBy solution but haven't spent the time to determine if it can be made to work.

@akarnokd
Member

akarnokd commented May 9, 2014

I've implemented a potential fix here but can't create a PR right now due to a GitHub 500 error. In a simple publish benchmark, PublishSubject reaches 83 MOps/s, BehaviorSubject (new) ~47 MOps/s, and BehaviorSubject (master) ~48 MOps/s.

@benjchristensen
Member Author

Fix just merged.
