From 9f1d8465a062e165bf0af13ea6d54cbf324be400 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?D=C3=A1vid=20Karnok?= Date: Thu, 8 Mar 2018 22:24:17 +0100 Subject: [PATCH] 2.x: Fix Flowable.singleOrError().toFlowable() not signalling NSEE --- .../operators/flowable/FlowableSingle.java | 20 +++++++++++++++---- .../flowable/FlowableSingleMaybe.java | 2 +- .../flowable/FlowableSingleSingle.java | 2 +- .../flowable/FlowableSingleTest.java | 9 +++++++++ .../observable/ObservableSingleTest.java | 9 +++++++++ 5 files changed, 36 insertions(+), 6 deletions(-) diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingle.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingle.java index a18505404f..70d29e7ad5 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingle.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingle.java @@ -13,6 +13,8 @@ package io.reactivex.internal.operators.flowable; +import java.util.NoSuchElementException; + import org.reactivestreams.*; import io.reactivex.*; @@ -23,14 +25,17 @@ public final class FlowableSingle extends AbstractFlowableWithUpstream final T defaultValue; - public FlowableSingle(Flowable source, T defaultValue) { + final boolean failOnEmpty; + + public FlowableSingle(Flowable source, T defaultValue, boolean failOnEmpty) { super(source); this.defaultValue = defaultValue; + this.failOnEmpty = failOnEmpty; } @Override protected void subscribeActual(Subscriber s) { - source.subscribe(new SingleElementSubscriber(s, defaultValue)); + source.subscribe(new SingleElementSubscriber(s, defaultValue, failOnEmpty)); } static final class SingleElementSubscriber extends DeferredScalarSubscription @@ -40,13 +45,16 @@ static final class SingleElementSubscriber extends DeferredScalarSubscription final T defaultValue; + final boolean failOnEmpty; + Subscription s; boolean done; - SingleElementSubscriber(Subscriber actual, T defaultValue) { + SingleElementSubscriber(Subscriber actual, T defaultValue, boolean failOnEmpty) { super(actual); this.defaultValue = defaultValue; + this.failOnEmpty = failOnEmpty; } @Override @@ -94,7 +102,11 @@ public void onComplete() { v = defaultValue; } if (v == null) { - actual.onComplete(); + if (failOnEmpty) { + actual.onError(new NoSuchElementException()); + } else { + actual.onComplete(); + } } else { complete(v); } diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingleMaybe.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingleMaybe.java index 20608cd430..28ac5bcda1 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingleMaybe.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingleMaybe.java @@ -36,7 +36,7 @@ protected void subscribeActual(MaybeObserver s) { @Override public Flowable fuseToFlowable() { - return RxJavaPlugins.onAssembly(new FlowableSingle(source, null)); + return RxJavaPlugins.onAssembly(new FlowableSingle(source, null, false)); } static final class SingleElementSubscriber diff --git a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingleSingle.java b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingleSingle.java index 44df823067..cb33706362 100644 --- a/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingleSingle.java +++ b/src/main/java/io/reactivex/internal/operators/flowable/FlowableSingleSingle.java @@ -41,7 +41,7 @@ protected void subscribeActual(SingleObserver s) { @Override public Flowable fuseToFlowable() { - return RxJavaPlugins.onAssembly(new FlowableSingle(source, defaultValue)); + return RxJavaPlugins.onAssembly(new FlowableSingle(source, defaultValue, true)); } static final class SingleElementSubscriber diff --git a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSingleTest.java b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSingleTest.java index 2aff36bc6a..8be75e36e1 100644 --- a/src/test/java/io/reactivex/internal/operators/flowable/FlowableSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/flowable/FlowableSingleTest.java @@ -796,4 +796,13 @@ public void cancelAsFlowable() { assertFalse(pp.hasSubscribers()); } + + @Test + public void singleOrError() { + Flowable.empty() + .singleOrError() + .toFlowable() + .test() + .assertFailure(NoSuchElementException.class); + } } diff --git a/src/test/java/io/reactivex/internal/operators/observable/ObservableSingleTest.java b/src/test/java/io/reactivex/internal/operators/observable/ObservableSingleTest.java index 597ac731c2..9407cf4969 100644 --- a/src/test/java/io/reactivex/internal/operators/observable/ObservableSingleTest.java +++ b/src/test/java/io/reactivex/internal/operators/observable/ObservableSingleTest.java @@ -556,4 +556,13 @@ public MaybeSource apply(Observable o) throws Exception { } }); } + + @Test + public void singleOrError() { + Observable.empty() + .singleOrError() + .toObservable() + .test() + .assertFailure(NoSuchElementException.class); + } }