Skip to content

2.0 Design: Bounded Buffers #3213

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
benjchristensen opened this issue Aug 27, 2015 · 4 comments
Closed

2.0 Design: Bounded Buffers #3213

benjchristensen opened this issue Aug 27, 2015 · 4 comments
Milestone

Comments

@benjchristensen
Copy link
Member

A discussion about bounded and unbounded buffers evolved out of a naming discussion in #2787 (comment) I want to make it a top-level discussion as it is important.

The question is whether we should support a type in v2 that does not support backpressure and has unbounded buffers in merge/flatMap/observeOn/zip. Effectively this would mean having a type like RxJava 0.19 and earlier, before backpressure was added.

If we did have both types, we would have something like Observable (without backpressure) and Flowable (with backpressure).

My perspective is that RxJava should never automatically use an unbounded buffer. It should only happen when a user opts-in, such as through use of onBackpressureBuffer or toList. Use of observeOn, merge/flatMap, and zip should not permit unbounded buffer growth, as that is not the use case any of those operators is asking for.

In a production application, unbounded buffer growth equates to unbounded latency and memory growth. Additionally, unbounded buffers are an illusion anyways, as everything is always bounded at some point, in time or space.

The argument for unbounded buffers typically is something like "mouse events can't be backpressured". However, this is not quite correct. Even mouse events already have flow control being applied to them (backpressure is just one form of flow control) by the system through sampling resolution.

For a "hot" data source, such as mouse events, or system events (such as occur in our server-side apps as side effects of processing user traffic), we can't "stop" the flow of events, but buffers must be bounded, and we must apply a strategy for how to deal with overflow.

The "backpressure signal", if unhandled should cause a failure, not result in unbounded latency and memory growth. The "backpressure signal" can in turn be used to trigger strategies such as sampling, dropping, throttling, debouncing, and-the-like.

Thus, my proposal is that RxJava v2 stick to the decisions made in v1 to provide a single stream type, Observable, that supports backpressure, and not use unbounded buffers unless they are explicitly asked for.

The "complexity" concerns of modeling "hot" sources (those I sometimes hear called "truly reactive") should be solved by a better Observable.create API that allows simple onNext behavior with a choice of strategy for handling the backpressure signal.

@benjchristensen
Copy link
Member Author

@ReactiveX/rxjava-committers please weigh in on this

@akarnokd
Copy link
Member

I think it would be quite confusing to have two fluent base classes and having the Observable not use backpressure after 1.x.

I've been implementing operators such as merge/zip with a bufferSize parameter that let's it tune the internal buffers on a per-chain basis. It is possible to add an extra parameter unbounded which instructs the operator to use an unbounded SpscLinkedArrayQueue with island size of bufferSize.

@stealthcode
Copy link

If we decide to not have any unbounded buffers then I like the idea of having 2 types that fluently convert between each other. Otherwise it's not worth the confusion of having 2 types. Also a developer should be able to make an intelligent decision about what the buffer size should be for almost all context. For instance, an observable that consumes off a hot source and uses observeOn(Schedulers.computation()) should be bounded by the size of the computation scheduler thread pool.

An observable stream that windows once per some time interval could also be used to estimate an approximate necessary buffer size. For instance if the developer estimates that a windowed observable should be consumed and unsubscribed before the window emits another one then the developer might safely say that their bounds are 1.

Also in both context they should be able to say what they want to do in the case that the buffer capacity is exceeded. Similar to onBackpressureBuffer or onBackpressureDrop the bounds of Observable<Observable<T>> needs to have some concept of behavior.

IFF it's decided that we will not allow unbounded buffers then I propose the following:

  • Hot observables have static methods that define a type that's assumed to be non-compliant with buffer bounds
  • The hot observable type would have every non-buffering operator and overloads of the buffering operators that take a bounds and overflow strategy.
  • Setting the bounds on a hot observable should return a bounded observable

The implementation and maintenance of having two different wrapper types (something like BoundedObservable and HotObservable) would be relatively trivial since they would delegate to the same underlying operators and native observable semantics (lift, subscribe, x, compose, etc). However the users would have to learn that they are working with a different type in different contexts. I think that this is a net positive in the sense that performance and efficiency concerns will be first class in the minds of the users. Also this would be useful for interpreting the code and understanding the intention.

@akarnokd akarnokd added this to the 2.0 RC 1 milestone Jun 22, 2016
@akarnokd
Copy link
Member

I think all of the discussion points are now resolved:

  • new base reactive type io.reactivex.Observable relative to 1.x: does not do backpressure but supports synchronous cancellation
  • io.reactivex.Flowable.create(FlowableOnSubscribe) supports push-like sources with safe backpressure and cancellation support
  • io.reactivex.Observable.create(ObservableOnSubscribe) supports push-like sources with safe cancellation support.
  • Buffers of the Flowable operators are generally bounded and adjustable via overload.
  • Buffers of the Observable operators are unbounded and operators have capacity hints to limit internal buffer churn

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

No branches or pull requests

3 participants