Skip to content

Commit e077ed8

Browse files
authored
3.x: Add Completable.onErrorReturn[Item] (#6886)
1 parent f86753a commit e077ed8

File tree

5 files changed

+224
-10
lines changed

5 files changed

+224
-10
lines changed

src/main/java/io/reactivex/rxjava3/core/Completable.java

Lines changed: 58 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2323,6 +2323,64 @@ public final Completable onErrorResumeWith(@NonNull CompletableSource fallback)
23232323
return onErrorResumeNext(Functions.justFunction(fallback));
23242324
}
23252325

2326+
/**
2327+
* Ends the flow with a success item returned by a function for the {@link Throwable} error signaled by the current
2328+
* {@code Completable} instead of signaling the error via {@code onError}.
2329+
* <p>
2330+
* <img width="640" height="567" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.onErrorReturn.png" alt="">
2331+
* <p>
2332+
* You can use this to prevent errors from propagating or to supply fallback data should errors be
2333+
* encountered.
2334+
* <dl>
2335+
* <dt><b>Scheduler:</b></dt>
2336+
* <dd>{@code onErrorReturn} does not operate by default on a particular {@link Scheduler}.</dd>
2337+
* </dl>
2338+
*
2339+
* @param <T> the item type to return on error
2340+
* @param itemSupplier
2341+
* a function that returns a single value that will be emitted as success value
2342+
* the current {@code Completable} signals an {@code onError} event
2343+
* @return the new {@link Maybe} instance
2344+
* @throws NullPointerException if {@code itemSupplier} is {@code null}
2345+
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
2346+
* @since 3.0.0
2347+
*/
2348+
@CheckReturnValue
2349+
@NonNull
2350+
@SchedulerSupport(SchedulerSupport.NONE)
2351+
public final <T> Maybe<T> onErrorReturn(@NonNull Function<? super Throwable, ? extends T> itemSupplier) {
2352+
Objects.requireNonNull(itemSupplier, "itemSupplier is null");
2353+
return RxJavaPlugins.onAssembly(new CompletableOnErrorReturn<>(this, itemSupplier));
2354+
}
2355+
2356+
/**
2357+
* Ends the flow with the given success item when the current {@code Completable}
2358+
* fails instead of signaling the error via {@code onError}.
2359+
* <p>
2360+
* <img width="640" height="567" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.onErrorReturnItem.png" alt="">
2361+
* <p>
2362+
* You can use this to prevent errors from propagating or to supply fallback data should errors be
2363+
* encountered.
2364+
* <dl>
2365+
* <dt><b>Scheduler:</b></dt>
2366+
* <dd>{@code onErrorReturnItem} does not operate by default on a particular {@link Scheduler}.</dd>
2367+
* </dl>
2368+
*
2369+
* @param <T> the item type to return on error
2370+
* @param item
2371+
* the value that is emitted as {@code onSuccess} in case the current {@code Completable} signals an {@code onError}
2372+
* @return the new {@link Maybe} instance
2373+
* @throws NullPointerException if {@code item} is {@code null}
2374+
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
2375+
*/
2376+
@CheckReturnValue
2377+
@NonNull
2378+
@SchedulerSupport(SchedulerSupport.NONE)
2379+
public final <T> Maybe<T> onErrorReturnItem(@NonNull T item) {
2380+
Objects.requireNonNull(item, "item is null");
2381+
return onErrorReturn(Functions.justFunction(item));
2382+
}
2383+
23262384
/**
23272385
* Nulls out references to the upstream producer and downstream {@link CompletableObserver} if
23282386
* the sequence is terminated or downstream calls {@code dispose()}.

src/main/java/io/reactivex/rxjava3/core/Maybe.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4467,7 +4467,7 @@ public final Maybe<T> onErrorResumeNext(@NonNull Function<? super Throwable, ? e
44674467
* Ends the flow with a success item returned by a function for the {@link Throwable} error signaled by the current
44684468
* {@code Maybe} instead of signaling the error via {@code onError}.
44694469
* <p>
4470-
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/onErrorReturn.png" alt="">
4470+
* <img width="640" height="377" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.onErrorReturn.png" alt="">
44714471
* <p>
44724472
* You can use this to prevent errors from propagating or to supply fallback data should errors be
44734473
* encountered.
@@ -4494,7 +4494,7 @@ public final Maybe<T> onErrorReturn(@NonNull Function<? super Throwable, ? exten
44944494
/**
44954495
* Ends the flow with the given success item when the current {@code Maybe} fails instead of signaling the error via {@code onError}.
44964496
* <p>
4497-
* <img width="640" height="310" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/onErrorReturn.png" alt="">
4497+
* <img width="640" height="377" src="https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Maybe.onErrorReturnItem.png" alt="">
44984498
* <p>
44994499
* You can use this to prevent errors from propagating or to supply fallback data should errors be
45004500
* encountered.
@@ -4504,7 +4504,7 @@ public final Maybe<T> onErrorReturn(@NonNull Function<? super Throwable, ? exten
45044504
* </dl>
45054505
*
45064506
* @param item
4507-
* the value that is emitted as {@code onSuccess} in case this {@code Maybe} signals an {@code onError}
4507+
* the value that is emitted as {@code onSuccess} in case the current {@code Maybe} signals an {@code onError}
45084508
* @return the new {@code Maybe} instance
45094509
* @throws NullPointerException if {@code item} is {@code null}
45104510
* @see <a href="http://reactivex.io/documentation/operators/catch.html">ReactiveX operators documentation: Catch</a>
Lines changed: 99 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
/**
2+
* Copyright (c) 2016-present, RxJava Contributors.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in
5+
* compliance with the License. You may obtain a copy of the License at
6+
*
7+
* http://www.apache.org/licenses/LICENSE-2.0
8+
*
9+
* Unless required by applicable law or agreed to in writing, software distributed under the License is
10+
* distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See
11+
* the License for the specific language governing permissions and limitations under the License.
12+
*/
13+
14+
package io.reactivex.rxjava3.internal.operators.completable;
15+
16+
import io.reactivex.rxjava3.core.*;
17+
import io.reactivex.rxjava3.disposables.Disposable;
18+
import io.reactivex.rxjava3.exceptions.*;
19+
import io.reactivex.rxjava3.functions.Function;
20+
import io.reactivex.rxjava3.internal.disposables.DisposableHelper;
21+
22+
import java.util.Objects;
23+
24+
/**
25+
* Returns a value generated via a function if the main source signals an onError.
26+
* @param <T> the value type
27+
* @since 3.0.0
28+
*/
29+
public final class CompletableOnErrorReturn<T> extends Maybe<T> {
30+
31+
final CompletableSource source;
32+
33+
final Function<? super Throwable, ? extends T> valueSupplier;
34+
35+
public CompletableOnErrorReturn(CompletableSource source,
36+
Function<? super Throwable, ? extends T> valueSupplier) {
37+
this.source = source;
38+
this.valueSupplier = valueSupplier;
39+
}
40+
41+
@Override
42+
protected void subscribeActual(MaybeObserver<? super T> observer) {
43+
source.subscribe(new OnErrorReturnMaybeObserver<>(observer, valueSupplier));
44+
}
45+
46+
static final class OnErrorReturnMaybeObserver<T> implements CompletableObserver, Disposable {
47+
48+
final MaybeObserver<? super T> downstream;
49+
50+
final Function<? super Throwable, ? extends T> itemSupplier;
51+
52+
Disposable upstream;
53+
54+
OnErrorReturnMaybeObserver(MaybeObserver<? super T> actual,
55+
Function<? super Throwable, ? extends T> itemSupplier) {
56+
this.downstream = actual;
57+
this.itemSupplier = itemSupplier;
58+
}
59+
60+
@Override
61+
public void dispose() {
62+
upstream.dispose();
63+
}
64+
65+
@Override
66+
public boolean isDisposed() {
67+
return upstream.isDisposed();
68+
}
69+
70+
@Override
71+
public void onSubscribe(Disposable d) {
72+
if (DisposableHelper.validate(this.upstream, d)) {
73+
this.upstream = d;
74+
75+
downstream.onSubscribe(this);
76+
}
77+
}
78+
79+
@Override
80+
public void onError(Throwable e) {
81+
T v;
82+
83+
try {
84+
v = Objects.requireNonNull(itemSupplier.apply(e), "The itemSupplier returned a null value");
85+
} catch (Throwable ex) {
86+
Exceptions.throwIfFatal(ex);
87+
downstream.onError(new CompositeException(e, ex));
88+
return;
89+
}
90+
91+
downstream.onSuccess(v);
92+
}
93+
94+
@Override
95+
public void onComplete() {
96+
downstream.onComplete();
97+
}
98+
}
99+
}

src/main/java/io/reactivex/rxjava3/internal/operators/maybe/MaybeOnErrorReturn.java

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,31 +27,31 @@
2727
*/
2828
public final class MaybeOnErrorReturn<T> extends AbstractMaybeWithUpstream<T, T> {
2929

30-
final Function<? super Throwable, ? extends T> valueSupplier;
30+
final Function<? super Throwable, ? extends T> itemSupplier;
3131

3232
public MaybeOnErrorReturn(MaybeSource<T> source,
33-
Function<? super Throwable, ? extends T> valueSupplier) {
33+
Function<? super Throwable, ? extends T> itemSupplier) {
3434
super(source);
35-
this.valueSupplier = valueSupplier;
35+
this.itemSupplier = itemSupplier;
3636
}
3737

3838
@Override
3939
protected void subscribeActual(MaybeObserver<? super T> observer) {
40-
source.subscribe(new OnErrorReturnMaybeObserver<>(observer, valueSupplier));
40+
source.subscribe(new OnErrorReturnMaybeObserver<>(observer, itemSupplier));
4141
}
4242

4343
static final class OnErrorReturnMaybeObserver<T> implements MaybeObserver<T>, Disposable {
4444

4545
final MaybeObserver<? super T> downstream;
4646

47-
final Function<? super Throwable, ? extends T> valueSupplier;
47+
final Function<? super Throwable, ? extends T> itemSupplier;
4848

4949
Disposable upstream;
5050

5151
OnErrorReturnMaybeObserver(MaybeObserver<? super T> actual,
5252
Function<? super Throwable, ? extends T> valueSupplier) {
5353
this.downstream = actual;
54-
this.valueSupplier = valueSupplier;
54+
this.itemSupplier = valueSupplier;
5555
}
5656

5757
@Override
@@ -83,7 +83,7 @@ public void onError(Throwable e) {
8383
T v;
8484

8585
try {
86-
v = Objects.requireNonNull(valueSupplier.apply(e), "The valueSupplier returned a null value");
86+
v = Objects.requireNonNull(itemSupplier.apply(e), "The itemSupplier returned a null value");
8787
} catch (Throwable ex) {
8888
Exceptions.throwIfFatal(ex);
8989
downstream.onError(new CompositeException(e, ex));

src/test/java/io/reactivex/rxjava3/internal/operators/completable/CompletableOnErrorXTest.java

Lines changed: 57 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,16 @@
1515

1616
import static org.junit.Assert.assertEquals;
1717

18+
import java.io.IOException;
19+
1820
import org.junit.Test;
1921

2022
import io.reactivex.rxjava3.core.*;
23+
import io.reactivex.rxjava3.exceptions.TestException;
2124
import io.reactivex.rxjava3.functions.Function;
25+
import io.reactivex.rxjava3.internal.functions.Functions;
26+
import io.reactivex.rxjava3.subjects.CompletableSubject;
27+
import io.reactivex.rxjava3.testsupport.TestHelper;
2228

2329
public class CompletableOnErrorXTest extends RxJavaTest {
2430

@@ -46,4 +52,55 @@ public CompletableSource apply(Throwable e) throws Exception {
4652

4753
assertEquals(0, call[0]);
4854
}
55+
56+
@Test
57+
public void onErrorReturnConst() {
58+
Completable.error(new TestException())
59+
.onErrorReturnItem(1)
60+
.test()
61+
.assertResult(1);
62+
}
63+
64+
@Test
65+
public void onErrorReturn() {
66+
Completable.error(new TestException())
67+
.onErrorReturn(Functions.justFunction(1))
68+
.test()
69+
.assertResult(1);
70+
}
71+
72+
@Test
73+
public void onErrorReturnFunctionThrows() {
74+
TestHelper.assertCompositeExceptions(Completable.error(new TestException())
75+
.onErrorReturn(new Function<Throwable, Object>() {
76+
@Override
77+
public Object apply(Throwable v) throws Exception {
78+
throw new IOException();
79+
}
80+
})
81+
.to(TestHelper.testConsumer()), TestException.class, IOException.class);
82+
}
83+
84+
@Test
85+
public void onErrorReturnEmpty() {
86+
Completable.complete()
87+
.onErrorReturnItem(2)
88+
.test()
89+
.assertResult();
90+
}
91+
92+
@Test
93+
public void onErrorReturnDispose() {
94+
TestHelper.checkDisposed(CompletableSubject.create().onErrorReturnItem(1));
95+
}
96+
97+
@Test
98+
public void onErrorReturnDoubleOnSubscribe() {
99+
TestHelper.checkDoubleOnSubscribeCompletableToMaybe(new Function<Completable, MaybeSource<Object>>() {
100+
@Override
101+
public MaybeSource<Object> apply(Completable v) throws Exception {
102+
return v.onErrorReturnItem(1);
103+
}
104+
});
105+
}
49106
}

0 commit comments

Comments
 (0)