Skip to content

ScheduledObserver Ordering #234

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 4 commits into from
Apr 18, 2013
Merged

Conversation

mairbek
Copy link
Contributor

@mairbek mairbek commented Apr 16, 2013

Fixing issue #233.
Blocking solution for now, thinking about non-blocking alternatives.

@cloudbees-pull-request-builder

RxJava-pull-requests #90 FAILURE
Looks like there's a problem with this pull request

}

if (count > 0) {
scheduler.schedule(this);
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this might grow the stack size, should be reimplemented.

@cloudbees-pull-request-builder

RxJava-pull-requests #91 FAILURE
Looks like there's a problem with this pull request

@cloudbees-pull-request-builder

RxJava-pull-requests #92 ABORTED

@cloudbees-pull-request-builder

RxJava-pull-requests #93 SUCCESS
This pull request looks good

@benjchristensen
Copy link
Member

This looks like an elegant solution @mairbek

I can't quite figure out if there's a chance for a race condition on the counter but am merging it for now since I can't prove one way or another and this is way better than what we have.

I'm manually merging via #238 since I added some unit tests.

@benjchristensen benjchristensen merged commit 35f2b2f into ReactiveX:master Apr 18, 2013
@mairbek
Copy link
Contributor Author

mairbek commented Apr 19, 2013

I'm not sure if this is a really working solution as well, looks too simple.
Need to think about use cases to break this code.

@benjchristensen
Copy link
Member

Thanks, I'd appreciate that as I haven't had time to think through those.

@mairbek
Copy link
Contributor Author

mairbek commented Apr 22, 2013

Okay, this seems to be working.

Assuming that underlying observable implemented correctly, this means that OnNext, OnCompleted and OnError events will be called sequentially. The solution itself is inspired by the way a CurrentThreadScheduler works except that it relies on concurrent primitives instead of ThreadLocal.

I can see 4 cases:

  1. Enqueue when queue was empty. Counter is set to 1 and queue processing is started. Since counter is atomic two processes cannot start.
  2. Enqueue when queue is not empty. Counter is set to 1, queue processing is already going on. Queue processing cannot stop since counter is atomic.
  3. Dequeue when queue has more than one element. This would recursively schedule queue processing.
  4. Dequeue when queue has one item. When item is processed counter would be set to 0. Queue processing stopped. Next enqueue will schedule out new queue processing.

@benjchristensen what do you think?


}

int count = counter.decrementAndGet();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think that as long as nothing makes any calls to the underlying observer after this line it should be ok.

jihoonson pushed a commit to jihoonson/RxJava that referenced this pull request Mar 6, 2020
…or_rate_limiter

Non-blocking API for RateLimiter
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

4 participants