Skip to content

Commit ed29fac

Browse files
authored
2.x: Expand the Javadoc of Flowable (#6506)
1 parent c5e1b3a commit ed29fac

File tree

2 files changed

+101
-9
lines changed

2 files changed

+101
-9
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 100 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -36,12 +36,12 @@
3636
import io.reactivex.subscribers.*;
3737

3838
/**
39-
* The Flowable class that implements the Reactive-Streams Pattern and offers factory methods,
40-
* intermediate operators and the ability to consume reactive dataflows.
39+
* The Flowable class that implements the <a href="https://github.com/reactive-streams/reactive-streams-jvm">Reactive Streams</a>
40+
* Pattern and offers factory methods, intermediate operators and the ability to consume reactive dataflows.
4141
* <p>
42-
* Reactive-Streams operates with {@code Publisher}s which {@code Flowable} extends. Many operators
42+
* Reactive Streams operates with {@link Publisher}s which {@code Flowable} extends. Many operators
4343
* therefore accept general {@code Publisher}s directly and allow direct interoperation with other
44-
* Reactive-Streams implementations.
44+
* Reactive Streams implementations.
4545
* <p>
4646
* The Flowable hosts the default buffer size of 128 elements for operators, accessible via {@link #bufferSize()},
4747
* that can be overridden globally via the system parameter {@code rx2.buffer-size}. Most operators, however, have
@@ -51,11 +51,103 @@
5151
* <p>
5252
* <img width="640" height="317" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/legend.png" alt="">
5353
* <p>
54+
* The {@code Flowable} follows the protocol
55+
* <pre><code>
56+
* onSubscribe onNext* (onError | onComplete)?
57+
* </code></pre>
58+
* where the stream can be disposed through the {@link Subscription} instance provided to consumers through
59+
* {@link Subscriber#onSubscribe(Subscription)}.
60+
* Unlike the {@code Observable.subscribe()} of version 1.x, {@link #subscribe(Subscriber)} does not allow external cancellation
61+
* of a subscription and the {@link Subscriber} instance is expected to expose such capability if needed.
62+
* <p>
63+
* Flowables support backpressure and require {@link Subscriber}s to signal demand via {@link Subscription#request(long)}.
64+
* <p>
65+
* Example:
66+
* <pre><code>
67+
* Disposable d = Flowable.just("Hello world!")
68+
* .delay(1, TimeUnit.SECONDS)
69+
* .subscribeWith(new DisposableSubscriber&lt;String&gt;() {
70+
* &#64;Override public void onStart() {
71+
* System.out.println("Start!");
72+
* request(1);
73+
* }
74+
* &#64;Override public void onNext(String t) {
75+
* System.out.println(t);
76+
* request(1);
77+
* }
78+
* &#64;Override public void onError(Throwable t) {
79+
* t.printStackTrace();
80+
* }
81+
* &#64;Override public void onComplete() {
82+
* System.out.println("Done!");
83+
* }
84+
* });
85+
*
86+
* Thread.sleep(500);
87+
* // the sequence can now be cancelled via dispose()
88+
* d.dispose();
89+
* </code></pre>
90+
* <p>
91+
* The Reactive Streams specification is relatively strict when defining interactions between {@code Publisher}s and {@code Subscriber}s, so much so
92+
* that there is a significant performance penalty due certain timing requirements and the need to prepare for invalid
93+
* request amounts via {@link Subscription#request(long)}.
94+
* Therefore, RxJava has introduced the {@link FlowableSubscriber} interface that indicates the consumer can be driven with relaxed rules.
95+
* All RxJava operators are implemented with these relaxed rules in mind.
96+
* If the subscribing {@code Subscriber} does not implement this interface, for example, due to it being from another Reactive Streams compliant
97+
* library, the Flowable will automatically apply a compliance wrapper around it.
98+
* <p>
99+
* {@code Flowable} is an abstract class, but it is not advised to implement sources and custom operators by extending the class directly due
100+
* to the large amounts of <a href="https://github.com/reactive-streams/reactive-streams-jvm#specification">Reactive Streams</a>
101+
* rules to be followed to the letter. See <a href="https://github.com/ReactiveX/RxJava/wiki/Writing-operators-for-2.0">the wiki</a> for
102+
* some guidance if such custom implementations are necessary.
103+
* <p>
104+
* The recommended way of creating custom {@code Flowable}s is by using the {@link #create(FlowableOnSubscribe, BackpressureStrategy)} factory method:
105+
* <pre><code>
106+
* Flowable&lt;String&gt; source = Flowable.create(new FlowableOnSubscribe&lt;String&gt;() {
107+
* &#64;Override
108+
* public void subscribe(FlowableEmitter&lt;String&gt; emitter) throws Exception {
109+
*
110+
* // signal an item
111+
* emitter.onNext("Hello");
112+
*
113+
* // could be some blocking operation
114+
* Thread.sleep(1000);
115+
*
116+
* // the consumer might have cancelled the flow
117+
* if (emitter.isCancelled() {
118+
* return;
119+
* }
120+
*
121+
* emitter.onNext("World");
122+
*
123+
* Thread.sleep(1000);
124+
*
125+
* // the end-of-sequence has to be signaled, otherwise the
126+
* // consumers may never finish
127+
* emitter.onComplete();
128+
* }
129+
* }, BackpressureStrategy.BUFFER);
130+
*
131+
* System.out.println("Subscribe!");
132+
*
133+
* source.subscribe(System.out::println);
134+
*
135+
* System.out.println("Done!");
136+
* </code></pre>
137+
* <p>
138+
* RxJava reactive sources, such as {@code Flowable}, are generally synchronous and sequential in nature. In the ReactiveX design, the location (thread)
139+
* where operators run is <i>orthogonal</i> to when the operators can work with data. This means that asynchrony and parallelism
140+
* has to be explicitly expressed via operators such as {@link #subscribeOn(Scheduler)}, {@link #observeOn(Scheduler)} and {@link #parallel()}. In general,
141+
* operators featuring a {@link Scheduler} parameter are introducing this type of asynchrony into the flow.
142+
* <p>
54143
* For more information see the <a href="http://reactivex.io/documentation/Publisher.html">ReactiveX
55144
* documentation</a>.
56145
*
57146
* @param <T>
58147
* the type of the items emitted by the Flowable
148+
* @see Observable
149+
* @see ParallelFlowable
150+
* @see io.reactivex.subscribers.DisposableSubscriber
59151
*/
60152
public abstract class Flowable<T> implements Publisher<T> {
61153
/** The default buffer size. */
@@ -2199,7 +2291,7 @@ public static <T> Flowable<T> fromIterable(Iterable<? extends T> source) {
21992291
}
22002292

22012293
/**
2202-
* Converts an arbitrary Reactive-Streams Publisher into a Flowable if not already a
2294+
* Converts an arbitrary Reactive Streams Publisher into a Flowable if not already a
22032295
* Flowable.
22042296
* <p>
22052297
* The {@link Publisher} must follow the
@@ -4385,7 +4477,7 @@ public static Flowable<Long> timer(long delay, TimeUnit unit, Scheduler schedule
43854477

43864478
/**
43874479
* Create a Flowable by wrapping a Publisher <em>which has to be implemented according
4388-
* to the Reactive-Streams specification by handling backpressure and
4480+
* to the Reactive Streams specification by handling backpressure and
43894481
* cancellation correctly; no safeguards are provided by the Flowable itself</em>.
43904482
* <dl>
43914483
* <dt><b>Backpressure:</b></dt>
@@ -13569,7 +13661,7 @@ public final Flowable<T> retryWhen(
1356913661
* Subscribes to the current Flowable and wraps the given Subscriber into a SafeSubscriber
1357013662
* (if not already a SafeSubscriber) that
1357113663
* deals with exceptions thrown by a misbehaving Subscriber (that doesn't follow the
13572-
* Reactive-Streams specification).
13664+
* Reactive Streams specification).
1357313665
* <dl>
1357413666
* <dt><b>Backpressure:</b></dt>
1357513667
* <dd>This operator leaves the reactive world and the backpressure behavior depends on the Subscriber's behavior.</dd>
@@ -14792,7 +14884,7 @@ public final void subscribe(Subscriber<? super T> s) {
1479214884
* If the {@link Flowable} rejects the subscription attempt or otherwise fails it will signal
1479314885
* the error via {@link FlowableSubscriber#onError(Throwable)}.
1479414886
* <p>
14795-
* This subscribe method relaxes the following Reactive-Streams rules:
14887+
* This subscribe method relaxes the following Reactive Streams rules:
1479614888
* <ul>
1479714889
* <li>§1.3: onNext should not be called concurrently until onSubscribe returns.
1479814890
* <b>FlowableSubscriber.onSubscribe should make sure a sync or async call triggered by request() is safe.</b></li>

src/main/java/io/reactivex/Observable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@
4242
* Many operators in the class accept {@code ObservableSource}(s), the base reactive interface
4343
* for such non-backpressured flows, which {@code Observable} itself implements as well.
4444
* <p>
45-
* The Observable's operators, by default, run with a buffer size of 128 elements (see {@link Flowable#bufferSize()},
45+
* The Observable's operators, by default, run with a buffer size of 128 elements (see {@link Flowable#bufferSize()}),
4646
* that can be overridden globally via the system parameter {@code rx2.buffer-size}. Most operators, however, have
4747
* overloads that allow setting their internal buffer size explicitly.
4848
* <p>

0 commit comments

Comments
 (0)