Skip to content

Commit 603f6c6

Browse files
vanniktechakarnokd
authored andcommitted
2.x: Add singleOrError, firstOrError, lastOrError & elementAtOrError to Observable and Flowable (#4589)
* 2.x: Add singleOrError, firstOrError, lastOrError & elementAtOrError to Observable and Flowable * Address issues
1 parent d923f31 commit 603f6c6

16 files changed

+584
-21
lines changed

src/main/java/io/reactivex/Flowable.java

Lines changed: 100 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7679,6 +7679,37 @@ public final Single<T> elementAt(long index, T defaultItem) {
76797679
return RxJavaPlugins.onAssembly(new FlowableElementAtSingle<T>(this, index, defaultItem));
76807680
}
76817681

7682+
/**
7683+
* Returns a Flowable that emits the item found at a specified index in a sequence of emissions from a
7684+
* source Publisher.
7685+
* If the source Publisher does not contain the item at the specified index a {@link NoSuchElementException} will be thrown.
7686+
* <p>
7687+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/elementAtOrDefault.png" alt="">
7688+
* <dl>
7689+
* <dt><b>Backpressure:</b></dt>
7690+
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an unbounded manner
7691+
* (i.e., no backpressure applied to it).</dd>
7692+
* <dt><b>Scheduler:</b></dt>
7693+
* <dd>{@code elementAtOrError} does not operate by default on a particular {@link Scheduler}.</dd>
7694+
* </dl>
7695+
*
7696+
* @param index
7697+
* the zero-based index of the item to retrieve
7698+
* @return a Flowable that emits the item at the specified position in the sequence emitted by the source
7699+
* Publisher, or the default item if that index is outside the bounds of the source sequence
7700+
* @throws IndexOutOfBoundsException
7701+
* if {@code index} is less than 0
7702+
* @see <a href="http://reactivex.io/documentation/operators/elementat.html">ReactiveX operators documentation: ElementAt</a>
7703+
*/
7704+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
7705+
@SchedulerSupport(SchedulerSupport.NONE)
7706+
public final Single<T> elementAtOrError(long index) {
7707+
if (index < 0) {
7708+
throw new IndexOutOfBoundsException("index >= 0 required but it was " + index);
7709+
}
7710+
return RxJavaPlugins.onAssembly(new FlowableElementAtSingle<T>(this, index, null));
7711+
}
7712+
76827713
/**
76837714
* Filters items emitted by a Publisher by only emitting those that satisfy a specified predicate.
76847715
* <p>
@@ -7753,6 +7784,29 @@ public final Single<T> first(T defaultItem) {
77537784
return elementAt(0, defaultItem);
77547785
}
77557786

7787+
/**
7788+
* Returns a Single that emits only the very first item emitted by the source Publisher, or a default
7789+
* item if the source Publisher completes without emitting anything.
7790+
* If the source Publisher completes without emitting any items a {@link NoSuchElementException} will be thrown.
7791+
* <p>
7792+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/firstOrError.png" alt="">
7793+
* <dl>
7794+
* <dt><b>Backpressure:</b></dt>
7795+
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
7796+
* unbounded manner (i.e., without applying backpressure).</dd>
7797+
* <dt><b>Scheduler:</b></dt>
7798+
* <dd>{@code firstOrError} does not operate by default on a particular {@link Scheduler}.</dd>
7799+
* </dl>
7800+
*
7801+
* @return the new Single instance
7802+
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
7803+
*/
7804+
@BackpressureSupport(BackpressureKind.SPECIAL) // take may trigger UNBOUNDED_IN
7805+
@SchedulerSupport(SchedulerSupport.NONE)
7806+
public final Single<T> firstOrError() {
7807+
return elementAtOrError(0);
7808+
}
7809+
77567810
/**
77577811
* Returns a Flowable that emits items based on applying a function that you supply to each item emitted
77587812
* by the source Publisher, where that function returns a Publisher, and then merging those resulting
@@ -8904,6 +8958,28 @@ public final Single<T> last(T defaultItem) {
89048958
return RxJavaPlugins.onAssembly(new FlowableLastSingle<T>(this, defaultItem));
89058959
}
89068960

8961+
/**
8962+
* Returns a Single that emits only the last item emitted by the source Publisher.
8963+
* If the source Publisher completes without emitting any items a {@link NoSuchElementException} will be thrown.
8964+
* <p>
8965+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/lastOrError.png" alt="">
8966+
* <dl>
8967+
* <dt><b>Backpressure:</b></dt>
8968+
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
8969+
* unbounded manner (i.e., without applying backpressure).</dd>
8970+
* <dt><b>Scheduler:</b></dt>
8971+
* <dd>{@code lastOrError} does not operate by default on a particular {@link Scheduler}.</dd>
8972+
* </dl>
8973+
*
8974+
* @return the new Single instance
8975+
* @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX operators documentation: Last</a>
8976+
*/
8977+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
8978+
@SchedulerSupport(SchedulerSupport.NONE)
8979+
public final Single<T> lastOrError() {
8980+
return RxJavaPlugins.onAssembly(new FlowableLastSingle<T>(this, null));
8981+
}
8982+
89078983
/**
89088984
* <strong>This method requires advanced knowledge about building operators; please consider
89098985
* other standard composition methods first;</strong>
@@ -11211,6 +11287,30 @@ public final Single<T> single(T defaultItem) {
1121111287
return RxJavaPlugins.onAssembly(new FlowableSingleSingle<T>(this, defaultItem));
1121211288
}
1121311289

11290+
/**
11291+
* Returns a Single that emits the single item emitted by the source Publisher, if that Publisher
11292+
* emits only a single item.
11293+
* If the source Publisher completes without emitting any items a {@link NoSuchElementException} will be thrown.
11294+
* If the source Publisher emits more than one item, throw an {@code IllegalArgumentException}.
11295+
* <p>
11296+
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/singleOrError.png" alt="">
11297+
* <dl>
11298+
* <dt><b>Backpressure:</b></dt>
11299+
* <dd>The operator honors backpressure from downstream and consumes the source {@code Publisher} in an
11300+
* unbounded manner (i.e., without applying backpressure).</dd>
11301+
* <dt><b>Scheduler:</b></dt>
11302+
* <dd>{@code singleOrError} does not operate by default on a particular {@link Scheduler}.</dd>
11303+
* </dl>
11304+
*
11305+
* @return the new Single instance
11306+
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
11307+
*/
11308+
@BackpressureSupport(BackpressureKind.UNBOUNDED_IN)
11309+
@SchedulerSupport(SchedulerSupport.NONE)
11310+
public final Single<T> singleOrError() {
11311+
return RxJavaPlugins.onAssembly(new FlowableSingleSingle<T>(this, null));
11312+
}
11313+
1121411314
/**
1121511315
* Returns a Flowable that skips the first {@code count} items emitted by the source Publisher and emits
1121611316
* the remainder.

src/main/java/io/reactivex/Observable.java

Lines changed: 92 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -6657,6 +6657,32 @@ public final Single<T> elementAt(long index, T defaultItem) {
66576657
return RxJavaPlugins.onAssembly(new ObservableElementAtSingle<T>(this, index, defaultItem));
66586658
}
66596659

6660+
/**
6661+
* Returns a Single that emits the item found at a specified index in a sequence of emissions from a source ObservableSource.
6662+
* If the source ObservableSource does not contain the item at the specified index a {@link NoSuchElementException} will be thrown.
6663+
* <p>
6664+
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/elementAtOrError.png" alt="">
6665+
* <dl>
6666+
* <dt><b>Scheduler:</b></dt>
6667+
* <dd>{@code elementAtOrError} does not operate by default on a particular {@link Scheduler}.</dd>
6668+
* </dl>
6669+
*
6670+
* @param index
6671+
* the zero-based index of the item to retrieve
6672+
* @return a Single that emits the item at the specified position in the sequence emitted by the source
6673+
* ObservableSource, or the default item if that index is outside the bounds of the source sequence
6674+
* @throws IndexOutOfBoundsException
6675+
* if {@code index} is less than 0
6676+
* @see <a href="http://reactivex.io/documentation/operators/elementat.html">ReactiveX operators documentation: ElementAt</a>
6677+
*/
6678+
@SchedulerSupport(SchedulerSupport.NONE)
6679+
public final Single<T> elementAtOrError(long index) {
6680+
if (index < 0) {
6681+
throw new IndexOutOfBoundsException("index >= 0 required but it was " + index);
6682+
}
6683+
return RxJavaPlugins.onAssembly(new ObservableElementAtSingle<T>(this, index, null));
6684+
}
6685+
66606686
/**
66616687
* Filters items emitted by an ObservableSource by only emitting those that satisfy a specified predicate.
66626688
* <p>
@@ -6698,8 +6724,8 @@ public final Maybe<T> firstElement() {
66986724
}
66996725

67006726
/**
6701-
* Returns a Single that emits only the very first item emitted by the source ObservableSource, or a default
6702-
* item if the source ObservableSource completes without emitting anything.
6727+
* Returns a Single that emits only the very first item emitted by the source ObservableSource, or a default item
6728+
* if the source ObservableSource completes without emitting any items.
67036729
* <p>
67046730
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/firstOrDefault.png" alt="">
67056731
* <dl>
@@ -6717,6 +6743,24 @@ public final Single<T> first(T defaultItem) {
67176743
return elementAt(0L, defaultItem);
67186744
}
67196745

6746+
/**
6747+
* Returns a Single that emits only the very first item emitted by the source ObservableSource.
6748+
* If the source ObservableSource completes without emitting any items a {@link NoSuchElementException} will be thrown.
6749+
* <p>
6750+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/firstOrError.png" alt="">
6751+
* <dl>
6752+
* <dt><b>Scheduler:</b></dt>
6753+
* <dd>{@code firstOrError} does not operate by default on a particular {@link Scheduler}.</dd>
6754+
* </dl>
6755+
*
6756+
* @return the new Single instance
6757+
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
6758+
*/
6759+
@SchedulerSupport(SchedulerSupport.NONE)
6760+
public final Single<T> firstOrError() {
6761+
return elementAtOrError(0L);
6762+
}
6763+
67206764
/**
67216765
* Returns an Observable that emits items based on applying a function that you supply to each item emitted
67226766
* by the source ObservableSource, where that function returns an ObservableSource, and then merging those resulting
@@ -7660,7 +7704,7 @@ public final Maybe<T> lastElement() {
76607704
}
76617705

76627706
/**
7663-
* Returns an Observable that emits only the last item emitted by the source ObservableSource, or a default item
7707+
* Returns a Single that emits only the last item emitted by the source ObservableSource, or a default item
76647708
* if the source ObservableSource completes without emitting any items.
76657709
* <p>
76667710
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/lastOrDefault.png" alt="">
@@ -7681,6 +7725,25 @@ public final Single<T> last(T defaultItem) {
76817725
return RxJavaPlugins.onAssembly(new ObservableLastSingle<T>(this, defaultItem));
76827726
}
76837727

7728+
/**
7729+
* Returns a Single that emits only the last item emitted by the source ObservableSource.
7730+
* If the source ObservableSource completes without emitting any items a {@link NoSuchElementException} will be thrown.
7731+
* <p>
7732+
* <img width="640" height="305" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/lastOrError.png" alt="">
7733+
* <dl>
7734+
* <dt><b>Scheduler:</b></dt>
7735+
* <dd>{@code lastOrError} does not operate by default on a particular {@link Scheduler}.</dd>
7736+
* </dl>
7737+
*
7738+
* @return a Single that emits only the last item emitted by the source ObservableSource.
7739+
* If the source ObservableSource completes without emitting any items a {@link NoSuchElementException} will be thrown.
7740+
* @see <a href="http://reactivex.io/documentation/operators/last.html">ReactiveX operators documentation: Last</a>
7741+
*/
7742+
@SchedulerSupport(SchedulerSupport.NONE)
7743+
public final Single<T> lastOrError() {
7744+
return RxJavaPlugins.onAssembly(new ObservableLastSingle<T>(this, null));
7745+
}
7746+
76847747
/**
76857748
* <strong>This method requires advanced knowledge about building operators; please consider
76867749
* other standard composition methods first;</strong>
@@ -9333,7 +9396,7 @@ public final Maybe<T> singleElement() {
93339396
}
93349397

93359398
/**
9336-
* Returns an Observable that emits the single item emitted by the source ObservableSource, if that ObservableSource
9399+
* Returns a Single that emits the single item emitted by the source ObservableSource, if that ObservableSource
93379400
* emits only a single item, or a default item if the source ObservableSource emits no items. If the source
93389401
* ObservableSource emits more than one item, throw an {@code IllegalArgumentException}.
93399402
* <p>
@@ -9345,8 +9408,7 @@ public final Maybe<T> singleElement() {
93459408
*
93469409
* @param defaultItem
93479410
* a default value to emit if the source ObservableSource emits no item
9348-
* @return an Observable that emits the single item emitted by the source ObservableSource, or a default item if
9349-
* the source ObservableSource is empty
9411+
* @return the new Single instance
93509412
* @throws IllegalArgumentException
93519413
* if the source ObservableSource emits more than one item
93529414
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
@@ -9357,6 +9419,30 @@ public final Single<T> single(T defaultItem) {
93579419
return RxJavaPlugins.onAssembly(new ObservableSingleSingle<T>(this, defaultItem));
93589420
}
93599421

9422+
/**
9423+
* Returns a Single that emits the single item emitted by the source ObservableSource, if that ObservableSource
9424+
* emits only a single item.
9425+
* If the source ObservableSource completes without emitting any items a {@link NoSuchElementException} will be thrown.
9426+
* If the source ObservableSource emits more than one item, throw an {@code IllegalArgumentException}.
9427+
* <p>
9428+
* <img width="640" height="315" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/singleOrDefault.png" alt="">
9429+
* <dl>
9430+
* <dt><b>Scheduler:</b></dt>
9431+
* <dd>{@code singleOrError} does not operate by default on a particular {@link Scheduler}.</dd>
9432+
* </dl>
9433+
*
9434+
* @return the new Single instance
9435+
* @throws IllegalArgumentException
9436+
* if the source ObservableSource emits more than one item
9437+
* @throws NoSuchElementException
9438+
* if the source ObservableSource completes without emitting any items
9439+
* @see <a href="http://reactivex.io/documentation/operators/first.html">ReactiveX operators documentation: First</a>
9440+
*/
9441+
@SchedulerSupport(SchedulerSupport.NONE)
9442+
public final Single<T> singleOrError() {
9443+
return RxJavaPlugins.onAssembly(new ObservableSingleSingle<T>(this, null));
9444+
}
9445+
93609446
/**
93619447
* Returns an Observable that skips the first {@code count} items emitted by the source ObservableSource and emits
93629448
* the remainder.

src/main/java/io/reactivex/internal/operators/flowable/FlowableElementAtSingle.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.reactivex.internal.operators.flowable;
1515

16+
import java.util.NoSuchElementException;
1617
import org.reactivestreams.*;
1718

1819
import io.reactivex.*;
@@ -104,7 +105,14 @@ public void onComplete() {
104105
s = SubscriptionHelper.CANCELLED;
105106
if (index <= count && !done) {
106107
done = true;
107-
actual.onSuccess(defaultValue);
108+
109+
T v = defaultValue;
110+
111+
if (v != null) {
112+
actual.onSuccess(v);
113+
} else {
114+
actual.onError(new NoSuchElementException());
115+
}
108116
}
109117
}
110118

src/main/java/io/reactivex/internal/operators/flowable/FlowableLastSingle.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.reactivex.internal.operators.flowable;
1515

16+
import java.util.NoSuchElementException;
1617
import org.reactivestreams.*;
1718

1819
import io.reactivex.*;
@@ -100,7 +101,13 @@ public void onComplete() {
100101
item = null;
101102
actual.onSuccess(v);
102103
} else {
103-
actual.onSuccess(defaultItem);
104+
v = defaultItem;
105+
106+
if (v != null) {
107+
actual.onSuccess(v);
108+
} else {
109+
actual.onError(new NoSuchElementException());
110+
}
104111
}
105112
}
106113
}

src/main/java/io/reactivex/internal/operators/flowable/FlowableSingleSingle.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@
1313

1414
package io.reactivex.internal.operators.flowable;
1515

16+
import java.util.NoSuchElementException;
1617
import org.reactivestreams.*;
1718

1819
import io.reactivex.*;
@@ -107,7 +108,12 @@ public void onComplete() {
107108
if (v == null) {
108109
v = defaultValue;
109110
}
110-
actual.onSuccess(v);
111+
112+
if (v != null) {
113+
actual.onSuccess(v);
114+
} else {
115+
actual.onError(new NoSuchElementException());
116+
}
111117
}
112118

113119
@Override

src/main/java/io/reactivex/internal/operators/observable/ObservableElementAtSingle.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,17 +16,20 @@
1616
import io.reactivex.*;
1717
import io.reactivex.disposables.Disposable;
1818
import io.reactivex.internal.disposables.DisposableHelper;
19+
import java.util.NoSuchElementException;
1920
import io.reactivex.plugins.RxJavaPlugins;
2021

2122
public final class ObservableElementAtSingle<T> extends Single<T> {
2223
final ObservableSource<T> source;
2324
final long index;
2425
final T defaultValue;
26+
2527
public ObservableElementAtSingle(ObservableSource<T> source, long index, T defaultValue) {
2628
this.source = source;
2729
this.index = index;
2830
this.defaultValue = defaultValue;
2931
}
32+
3033
@Override
3134
public void subscribeActual(SingleObserver<? super T> t) {
3235
source.subscribe(new ElementAtObserver<T>(t, index, defaultValue));
@@ -98,7 +101,14 @@ public void onError(Throwable t) {
98101
public void onComplete() {
99102
if (index <= count && !done) {
100103
done = true;
101-
actual.onSuccess(defaultValue);
104+
105+
T v = defaultValue;
106+
107+
if (v != null) {
108+
actual.onSuccess(v);
109+
} else {
110+
actual.onError(new NoSuchElementException());
111+
}
102112
}
103113
}
104114
}

0 commit comments

Comments
 (0)