From d7b5fc2d5efbb527be90c9afc17498ad5cd18244 Mon Sep 17 00:00:00 2001 From: Zac Sweers Date: Thu, 16 Nov 2017 02:16:29 -0800 Subject: [PATCH 01/16] Implement Observable.as() --- src/main/java/io/reactivex/Observable.java | 25 ++++++++++++++ .../io/reactivex/ObservableConverter.java | 34 +++++++++++++++++++ .../observable/ObservableNullTests.java | 5 +++ .../reactivex/observable/ObservableTest.java | 28 +++++++++++++++ 4 files changed, 92 insertions(+) create mode 100644 src/main/java/io/reactivex/ObservableConverter.java diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index a124159e07..cd507b7361 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -4800,6 +4800,31 @@ public final Single any(Predicate predicate) { return RxJavaPlugins.onAssembly(new ObservableAnySingle(this, predicate)); } + /** + * Calls the specified converter function during assembly time and returns its resulting value. + *

+ * This allows fluent conversion to any other type. + *

+ *
Scheduler:
+ *
{@code to} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the resulting object type + * @param converter the function that receives the current Observable instance and returns a value + * @return the converted value + * @throws NullPointerException if converter is null + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + public final R as(ObservableConverter converter) { + try { + return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + throw ExceptionHelper.wrapOrThrow(ex); + } + } + /** * Returns the first item emitted by this {@code Observable}, or throws * {@code NoSuchElementException} if it emits no items. diff --git a/src/main/java/io/reactivex/ObservableConverter.java b/src/main/java/io/reactivex/ObservableConverter.java new file mode 100644 index 0000000000..bdee0a9373 --- /dev/null +++ b/src/main/java/io/reactivex/ObservableConverter.java @@ -0,0 +1,34 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import io.reactivex.annotations.NonNull; + +/** + * Convenience interface and callback used by the {@link Observable#as} operator to turn an Observable into another + * value fluently. + * + * @param the upstream type + * @param the output type + */ +public interface ObservableConverter { + /** + * Applies a function to the upstream Observable and returns a converted value of type . + * + * @param upstream the upstream Observable instance + * @return the converted value + */ + @NonNull + R apply(@NonNull Observable upstream) throws Exception; +} diff --git a/src/test/java/io/reactivex/observable/ObservableNullTests.java b/src/test/java/io/reactivex/observable/ObservableNullTests.java index 7f58c9a057..8816bcfe10 100644 --- a/src/test/java/io/reactivex/observable/ObservableNullTests.java +++ b/src/test/java/io/reactivex/observable/ObservableNullTests.java @@ -2408,6 +2408,11 @@ public void toNull() { just1.to(null); } + @Test(expected = NullPointerException.class) + public void asNull() { + just1.as(null); + } + @Test(expected = NullPointerException.class) public void toListNull() { just1.toList(null); diff --git a/src/test/java/io/reactivex/observable/ObservableTest.java b/src/test/java/io/reactivex/observable/ObservableTest.java index 24653d1ff3..30dfd0c398 100644 --- a/src/test/java/io/reactivex/observable/ObservableTest.java +++ b/src/test/java/io/reactivex/observable/ObservableTest.java @@ -1165,6 +1165,34 @@ public Object apply(Observable onSubscribe) { }); } + @Test + public void testAsExtend() { + final TestObserver subscriber = new TestObserver(); + final Object value = new Object(); + Observable.just(value).as(new ObservableConverter() { + @Override + public Object apply(Observable onSubscribe) { + onSubscribe.subscribe(subscriber); + subscriber.assertNoErrors(); + subscriber.assertComplete(); + subscriber.assertValue(value); + return subscriber.values().get(0); + } + }); + } + + @Test + public void as() { + Observable.just(1).as(new ObservableConverter>() { + @Override + public Flowable apply(Observable v) throws Exception { + return v.toFlowable(BackpressureStrategy.MISSING); + } + }) + .test() + .assertResult(1); + } + @Test public void testFlatMap() { List list = Observable.range(1, 5).flatMap(new Function>() { From eb276a61e75645973e18af12f6e52c822a3050ff Mon Sep 17 00:00:00 2001 From: Zac Sweers Date: Thu, 16 Nov 2017 02:19:47 -0800 Subject: [PATCH 02/16] Implement Single.as() --- src/main/java/io/reactivex/Single.java | 25 ++++++++++++++ .../java/io/reactivex/SingleConverter.java | 34 +++++++++++++++++++ .../io/reactivex/single/SingleNullTests.java | 5 +++ .../java/io/reactivex/single/SingleTest.java | 12 +++++++ 4 files changed, 76 insertions(+) create mode 100644 src/main/java/io/reactivex/SingleConverter.java diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index 03b659a71a..c17b5773e2 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -1522,6 +1522,31 @@ public final Single ambWith(SingleSource other) { return ambArray(this, other); } + /** + * Calls the specified converter function during assembly time and returns its resulting value. + *

+ * This allows fluent conversion to any other type. + *

+ *
Scheduler:
+ *
{@code to} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the resulting object type + * @param converter the function that receives the current Single instance and returns a value + * @return the converted value + * @throws NullPointerException if converter is null + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + public final R as(SingleConverter converter) { + try { + return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + throw ExceptionHelper.wrapOrThrow(ex); + } + } + /** * Hides the identity of the current Single, including the Disposable that is sent * to the downstream via {@code onSubscribe()}. diff --git a/src/main/java/io/reactivex/SingleConverter.java b/src/main/java/io/reactivex/SingleConverter.java new file mode 100644 index 0000000000..383b501dfe --- /dev/null +++ b/src/main/java/io/reactivex/SingleConverter.java @@ -0,0 +1,34 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import io.reactivex.annotations.NonNull; + +/** + * Convenience interface and callback used by the {@link Single#as} operator to turn an Single into another + * value fluently. + * + * @param the upstream type + * @param the output type + */ +public interface SingleConverter { + /** + * Applies a function to the upstream Single and returns a converted value of type . + * + * @param upstream the upstream Single instance + * @return the converted value + */ + @NonNull + R apply(@NonNull Single upstream) throws Exception; +} diff --git a/src/test/java/io/reactivex/single/SingleNullTests.java b/src/test/java/io/reactivex/single/SingleNullTests.java index 405054243c..374d7ee313 100644 --- a/src/test/java/io/reactivex/single/SingleNullTests.java +++ b/src/test/java/io/reactivex/single/SingleNullTests.java @@ -844,6 +844,11 @@ public void toNull() { just1.to(null); } + @Test(expected = NullPointerException.class) + public void asNull() { + just1.as(null); + } + @Test(expected = NullPointerException.class) public void zipWithNull() { just1.zipWith(null, new BiFunction() { diff --git a/src/test/java/io/reactivex/single/SingleTest.java b/src/test/java/io/reactivex/single/SingleTest.java index 3fc650d276..d3f5334750 100644 --- a/src/test/java/io/reactivex/single/SingleTest.java +++ b/src/test/java/io/reactivex/single/SingleTest.java @@ -543,6 +543,18 @@ public Integer apply(Single v) throws Exception { }).intValue()); } + @Test + public void as() { + Single.just(1).as(new SingleConverter>() { + @Override + public Flowable apply(Single v) throws Exception { + return v.toFlowable(); + } + }) + .test() + .assertResult(1); + } + @Test(expected = NullPointerException.class) public void fromObservableNull() { Single.fromObservable(null); From a8470330fe2a09544d3667708ebe6f4921bd741b Mon Sep 17 00:00:00 2001 From: Zac Sweers Date: Thu, 16 Nov 2017 02:23:40 -0800 Subject: [PATCH 03/16] Implement Maybe.as() --- src/main/java/io/reactivex/Maybe.java | 25 ++++++++++++++ .../java/io/reactivex/MaybeConverter.java | 34 +++++++++++++++++++ .../java/io/reactivex/maybe/MaybeTest.java | 17 ++++++++++ 3 files changed, 76 insertions(+) create mode 100644 src/main/java/io/reactivex/MaybeConverter.java diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index 13922ed8d3..d2e6e0dbb2 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -1989,6 +1989,31 @@ public final Maybe ambWith(MaybeSource other) { return ambArray(this, other); } + /** + * Calls the specified converter function during assembly time and returns its resulting value. + *

+ * This allows fluent conversion to any other type. + *

+ *
Scheduler:
+ *
{@code to} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the resulting object type + * @param converter the function that receives the current Maybe instance and returns a value + * @return the converted value + * @throws NullPointerException if converter is null + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + public final R as(MaybeConverter converter) { + try { + return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + throw ExceptionHelper.wrapOrThrow(ex); + } + } + /** * Waits in a blocking fashion until the current Maybe signals a success value (which is returned), * null if completed or an exception (which is propagated). diff --git a/src/main/java/io/reactivex/MaybeConverter.java b/src/main/java/io/reactivex/MaybeConverter.java new file mode 100644 index 0000000000..b18ae39703 --- /dev/null +++ b/src/main/java/io/reactivex/MaybeConverter.java @@ -0,0 +1,34 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import io.reactivex.annotations.NonNull; + +/** + * Convenience interface and callback used by the {@link Maybe#as} operator to turn a Maybe into another + * value fluently. + * + * @param the upstream type + * @param the output type + */ +public interface MaybeConverter { + /** + * Applies a function to the upstream Maybe and returns a converted value of type . + * + * @param upstream the upstream Maybe instance + * @return the converted value + */ + @NonNull + R apply(@NonNull Maybe upstream) throws Exception; +} diff --git a/src/test/java/io/reactivex/maybe/MaybeTest.java b/src/test/java/io/reactivex/maybe/MaybeTest.java index e715b37c1e..945648a010 100644 --- a/src/test/java/io/reactivex/maybe/MaybeTest.java +++ b/src/test/java/io/reactivex/maybe/MaybeTest.java @@ -381,11 +381,28 @@ public Flowable apply(Maybe v) throws Exception { .assertResult(1); } + @Test + public void as() { + Maybe.just(1).as(new MaybeConverter>() { + @Override + public Flowable apply(Maybe v) throws Exception { + return v.toFlowable(); + } + }) + .test() + .assertResult(1); + } + @Test(expected = NullPointerException.class) public void toNull() { Maybe.just(1).to(null); } + @Test(expected = NullPointerException.class) + public void asNull() { + Maybe.just(1).as(null); + } + @Test public void compose() { Maybe.just(1).compose(new MaybeTransformer() { From d82c4ee0ba88c6684eda29653346c73ce2810326 Mon Sep 17 00:00:00 2001 From: Zac Sweers Date: Thu, 16 Nov 2017 02:28:49 -0800 Subject: [PATCH 04/16] Implement Flowable.as() --- src/main/java/io/reactivex/Flowable.java | 28 +++++++++++++++ .../java/io/reactivex/FlowableConverter.java | 34 +++++++++++++++++++ .../reactivex/flowable/FlowableNullTests.java | 5 +++ .../io/reactivex/flowable/FlowableTests.java | 29 ++++++++++++++++ 4 files changed, 96 insertions(+) create mode 100644 src/main/java/io/reactivex/FlowableConverter.java diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 7bc45def95..943f13dc20 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -5237,6 +5237,34 @@ public final Single any(Predicate predicate) { return RxJavaPlugins.onAssembly(new FlowableAnySingle(this, predicate)); } + /** + * Calls the specified converter function during assembly time and returns its resulting value. + *

+ * This allows fluent conversion to any other type. + *

+ *
Backpressure:
+ *
The backpressure behavior depends on what happens in the {@code converter} function.
+ *
Scheduler:
+ *
{@code to} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the resulting object type + * @param converter the function that receives the current Flowable instance and returns a value + * @return the converted value + * @throws NullPointerException if converter is null + */ + @CheckReturnValue + @BackpressureSupport(BackpressureKind.SPECIAL) + @SchedulerSupport(SchedulerSupport.NONE) + public final R as(FlowableConverter converter) { + try { + return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + throw ExceptionHelper.wrapOrThrow(ex); + } + } + /** * Returns the first item emitted by this {@code Flowable}, or throws * {@code NoSuchElementException} if it emits no items. diff --git a/src/main/java/io/reactivex/FlowableConverter.java b/src/main/java/io/reactivex/FlowableConverter.java new file mode 100644 index 0000000000..7c42a39c67 --- /dev/null +++ b/src/main/java/io/reactivex/FlowableConverter.java @@ -0,0 +1,34 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import io.reactivex.annotations.NonNull; + +/** + * Convenience interface and callback used by the {@link Flowable#as} operator to turn a Flowable into another + * value fluently. + * + * @param the upstream type + * @param the output type + */ +public interface FlowableConverter { + /** + * Applies a function to the upstream Flowable and returns a converted value of type . + * + * @param upstream the upstream Flowable instance + * @return the converted value + */ + @NonNull + R apply(@NonNull Flowable upstream) throws Exception; +} diff --git a/src/test/java/io/reactivex/flowable/FlowableNullTests.java b/src/test/java/io/reactivex/flowable/FlowableNullTests.java index 72fc7289d6..fdba369d99 100644 --- a/src/test/java/io/reactivex/flowable/FlowableNullTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableNullTests.java @@ -2351,6 +2351,11 @@ public void toNull() { just1.to(null); } + @Test(expected = NullPointerException.class) + public void asNull() { + just1.as(null); + } + @Test(expected = NullPointerException.class) public void toListNull() { just1.toList(null); diff --git a/src/test/java/io/reactivex/flowable/FlowableTests.java b/src/test/java/io/reactivex/flowable/FlowableTests.java index 29e0d6974a..3042e22d9d 100644 --- a/src/test/java/io/reactivex/flowable/FlowableTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableTests.java @@ -20,6 +20,7 @@ import java.util.concurrent.*; import java.util.concurrent.atomic.*; +import io.reactivex.Observable; import org.junit.*; import org.mockito.InOrder; import org.reactivestreams.*; @@ -1125,6 +1126,34 @@ public Object apply(Flowable onSubscribe) { }); } + @Test + public void testAsExtend() { + final TestSubscriber subscriber = new TestSubscriber(); + final Object value = new Object(); + Flowable.just(value).as(new FlowableConverter() { + @Override + public Object apply(Flowable onSubscribe) { + onSubscribe.subscribe(subscriber); + subscriber.assertNoErrors(); + subscriber.assertComplete(); + subscriber.assertValue(value); + return subscriber.values().get(0); + } + }); + } + + @Test + public void as() { + Flowable.just(1).as(new FlowableConverter>() { + @Override + public Observable apply(Flowable v) throws Exception { + return v.toObservable(); + } + }) + .test() + .assertResult(1); + } + @Test public void toObservableEmpty() { Flowable.empty().toObservable().test().assertResult(); From 9bebf29aea26dfcda1b26a5f8827e2614369bf2d Mon Sep 17 00:00:00 2001 From: Zac Sweers Date: Thu, 16 Nov 2017 02:32:59 -0800 Subject: [PATCH 05/16] Implement Completable.as() --- src/main/java/io/reactivex/Completable.java | 25 ++++++++++++++ .../io/reactivex/CompletableConverter.java | 33 +++++++++++++++++++ .../completable/CompletableTest.java | 32 ++++++++++++++++++ 3 files changed, 90 insertions(+) create mode 100644 src/main/java/io/reactivex/CompletableConverter.java diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index 3c3c4040ca..1937c93d2b 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -908,6 +908,31 @@ public final Completable andThen(CompletableSource next) { return concatWith(next); } + /** + * Calls the specified converter function during assembly time and returns its resulting value. + *

+ * This allows fluent conversion to any other type. + *

+ *
Scheduler:
+ *
{@code to} does not operate by default on a particular {@link Scheduler}.
+ *
+ * + * @param the resulting object type + * @param converter the function that receives the current Completable instance and returns a value + * @return the converted value + * @throws NullPointerException if converter is null + */ + @CheckReturnValue + @SchedulerSupport(SchedulerSupport.NONE) + public final R as(CompletableConverter converter) { + try { + return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + throw ExceptionHelper.wrapOrThrow(ex); + } + } + /** * Subscribes to and awaits the termination of this Completable instance in a blocking manner and * rethrows any exception emitted. diff --git a/src/main/java/io/reactivex/CompletableConverter.java b/src/main/java/io/reactivex/CompletableConverter.java new file mode 100644 index 0000000000..b4a4ba0d47 --- /dev/null +++ b/src/main/java/io/reactivex/CompletableConverter.java @@ -0,0 +1,33 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex; + +import io.reactivex.annotations.NonNull; + +/** + * Convenience interface and callback used by the {@link Completable#as} operator to turn a Completable into another + * value fluently. + * + * @param the output type + */ +public interface CompletableConverter { + /** + * Applies a function to the upstream Completable and returns a converted value of type . + * + * @param upstream the upstream Completable instance + * @return the converted value + */ + @NonNull + R apply(@NonNull Completable upstream) throws Exception; +} diff --git a/src/test/java/io/reactivex/completable/CompletableTest.java b/src/test/java/io/reactivex/completable/CompletableTest.java index a764d682f1..8db90b885b 100644 --- a/src/test/java/io/reactivex/completable/CompletableTest.java +++ b/src/test/java/io/reactivex/completable/CompletableTest.java @@ -2796,11 +2796,43 @@ public void accept(Object e) { } }); } + @Test(timeout = 5000) + public void asNormal() { + Flowable flow = normal.completable.as(new CompletableConverter>() { + @Override + public Flowable apply(Completable c) { + return c.toFlowable(); + } + }); + + flow.blockingForEach(new Consumer() { + @Override + public void accept(Object e) { } + }); + } + + @Test + public void as() { + Completable.complete().as(new CompletableConverter>() { + @Override + public Flowable apply(Completable v) throws Exception { + return v.toFlowable(); + } + }) + .test() + .assertComplete(); + } + @Test(expected = NullPointerException.class) public void toNull() { normal.completable.to(null); } + @Test(expected = NullPointerException.class) + public void asNull() { + normal.completable.as(null); + } + @Test(timeout = 5000) public void toFlowableNormal() { normal.completable.toFlowable().blockingForEach(Functions.emptyConsumer()); From 1aa5208ffe22e6859074352b4e87dffc6e01ef40 Mon Sep 17 00:00:00 2001 From: Zac Sweers Date: Thu, 16 Nov 2017 02:44:11 -0800 Subject: [PATCH 06/16] Add Experimental annotations --- src/main/java/io/reactivex/Completable.java | 1 + src/main/java/io/reactivex/CompletableConverter.java | 3 ++- src/main/java/io/reactivex/Flowable.java | 1 + src/main/java/io/reactivex/FlowableConverter.java | 3 ++- src/main/java/io/reactivex/Maybe.java | 1 + src/main/java/io/reactivex/MaybeConverter.java | 3 ++- src/main/java/io/reactivex/Observable.java | 1 + src/main/java/io/reactivex/ObservableConverter.java | 3 ++- src/main/java/io/reactivex/Single.java | 1 + src/main/java/io/reactivex/SingleConverter.java | 3 ++- 10 files changed, 15 insertions(+), 5 deletions(-) diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index 1937c93d2b..93bc7d2cd0 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -922,6 +922,7 @@ public final Completable andThen(CompletableSource next) { * @return the converted value * @throws NullPointerException if converter is null */ + @Experimental @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final R as(CompletableConverter converter) { diff --git a/src/main/java/io/reactivex/CompletableConverter.java b/src/main/java/io/reactivex/CompletableConverter.java index b4a4ba0d47..e283723410 100644 --- a/src/main/java/io/reactivex/CompletableConverter.java +++ b/src/main/java/io/reactivex/CompletableConverter.java @@ -13,7 +13,7 @@ package io.reactivex; -import io.reactivex.annotations.NonNull; +import io.reactivex.annotations.*; /** * Convenience interface and callback used by the {@link Completable#as} operator to turn a Completable into another @@ -21,6 +21,7 @@ * * @param the output type */ +@Experimental public interface CompletableConverter { /** * Applies a function to the upstream Completable and returns a converted value of type . diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 943f13dc20..36f7ce6f5a 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -5253,6 +5253,7 @@ public final Single any(Predicate predicate) { * @return the converted value * @throws NullPointerException if converter is null */ + @Experimental @CheckReturnValue @BackpressureSupport(BackpressureKind.SPECIAL) @SchedulerSupport(SchedulerSupport.NONE) diff --git a/src/main/java/io/reactivex/FlowableConverter.java b/src/main/java/io/reactivex/FlowableConverter.java index 7c42a39c67..3d273e6145 100644 --- a/src/main/java/io/reactivex/FlowableConverter.java +++ b/src/main/java/io/reactivex/FlowableConverter.java @@ -13,7 +13,7 @@ package io.reactivex; -import io.reactivex.annotations.NonNull; +import io.reactivex.annotations.*; /** * Convenience interface and callback used by the {@link Flowable#as} operator to turn a Flowable into another @@ -22,6 +22,7 @@ * @param the upstream type * @param the output type */ +@Experimental public interface FlowableConverter { /** * Applies a function to the upstream Flowable and returns a converted value of type . diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index d2e6e0dbb2..80a103caf9 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -2003,6 +2003,7 @@ public final Maybe ambWith(MaybeSource other) { * @return the converted value * @throws NullPointerException if converter is null */ + @Experimental @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final R as(MaybeConverter converter) { diff --git a/src/main/java/io/reactivex/MaybeConverter.java b/src/main/java/io/reactivex/MaybeConverter.java index b18ae39703..b202b87353 100644 --- a/src/main/java/io/reactivex/MaybeConverter.java +++ b/src/main/java/io/reactivex/MaybeConverter.java @@ -13,7 +13,7 @@ package io.reactivex; -import io.reactivex.annotations.NonNull; +import io.reactivex.annotations.*; /** * Convenience interface and callback used by the {@link Maybe#as} operator to turn a Maybe into another @@ -22,6 +22,7 @@ * @param the upstream type * @param the output type */ +@Experimental public interface MaybeConverter { /** * Applies a function to the upstream Maybe and returns a converted value of type . diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index cd507b7361..1e8d18dda2 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -4814,6 +4814,7 @@ public final Single any(Predicate predicate) { * @return the converted value * @throws NullPointerException if converter is null */ + @Experimental @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final R as(ObservableConverter converter) { diff --git a/src/main/java/io/reactivex/ObservableConverter.java b/src/main/java/io/reactivex/ObservableConverter.java index bdee0a9373..84e00e71b1 100644 --- a/src/main/java/io/reactivex/ObservableConverter.java +++ b/src/main/java/io/reactivex/ObservableConverter.java @@ -13,7 +13,7 @@ package io.reactivex; -import io.reactivex.annotations.NonNull; +import io.reactivex.annotations.*; /** * Convenience interface and callback used by the {@link Observable#as} operator to turn an Observable into another @@ -22,6 +22,7 @@ * @param the upstream type * @param the output type */ +@Experimental public interface ObservableConverter { /** * Applies a function to the upstream Observable and returns a converted value of type . diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index c17b5773e2..09e5a8b302 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -1536,6 +1536,7 @@ public final Single ambWith(SingleSource other) { * @return the converted value * @throws NullPointerException if converter is null */ + @Experimental @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final R as(SingleConverter converter) { diff --git a/src/main/java/io/reactivex/SingleConverter.java b/src/main/java/io/reactivex/SingleConverter.java index 383b501dfe..fce5c3830e 100644 --- a/src/main/java/io/reactivex/SingleConverter.java +++ b/src/main/java/io/reactivex/SingleConverter.java @@ -13,7 +13,7 @@ package io.reactivex; -import io.reactivex.annotations.NonNull; +import io.reactivex.annotations.*; /** * Convenience interface and callback used by the {@link Single#as} operator to turn an Single into another @@ -22,6 +22,7 @@ * @param the upstream type * @param the output type */ +@Experimental public interface SingleConverter { /** * Applies a function to the upstream Single and returns a converted value of type . From 68e2109fd8f7e0d628579d59fd4b302bdca9ab1a Mon Sep 17 00:00:00 2001 From: Zac Sweers Date: Thu, 16 Nov 2017 02:46:33 -0800 Subject: [PATCH 07/16] Add throws doc --- src/main/java/io/reactivex/CompletableConverter.java | 1 + src/main/java/io/reactivex/FlowableConverter.java | 1 + src/main/java/io/reactivex/MaybeConverter.java | 1 + src/main/java/io/reactivex/ObservableConverter.java | 1 + src/main/java/io/reactivex/SingleConverter.java | 1 + 5 files changed, 5 insertions(+) diff --git a/src/main/java/io/reactivex/CompletableConverter.java b/src/main/java/io/reactivex/CompletableConverter.java index e283723410..e72584a3e4 100644 --- a/src/main/java/io/reactivex/CompletableConverter.java +++ b/src/main/java/io/reactivex/CompletableConverter.java @@ -28,6 +28,7 @@ public interface CompletableConverter { * * @param upstream the upstream Completable instance * @return the converted value + * @throws Exception on error */ @NonNull R apply(@NonNull Completable upstream) throws Exception; diff --git a/src/main/java/io/reactivex/FlowableConverter.java b/src/main/java/io/reactivex/FlowableConverter.java index 3d273e6145..8b8051ae4d 100644 --- a/src/main/java/io/reactivex/FlowableConverter.java +++ b/src/main/java/io/reactivex/FlowableConverter.java @@ -29,6 +29,7 @@ public interface FlowableConverter { * * @param upstream the upstream Flowable instance * @return the converted value + * @throws Exception on error */ @NonNull R apply(@NonNull Flowable upstream) throws Exception; diff --git a/src/main/java/io/reactivex/MaybeConverter.java b/src/main/java/io/reactivex/MaybeConverter.java index b202b87353..9026025fde 100644 --- a/src/main/java/io/reactivex/MaybeConverter.java +++ b/src/main/java/io/reactivex/MaybeConverter.java @@ -29,6 +29,7 @@ public interface MaybeConverter { * * @param upstream the upstream Maybe instance * @return the converted value + * @throws Exception on error */ @NonNull R apply(@NonNull Maybe upstream) throws Exception; diff --git a/src/main/java/io/reactivex/ObservableConverter.java b/src/main/java/io/reactivex/ObservableConverter.java index 84e00e71b1..ff10564ec5 100644 --- a/src/main/java/io/reactivex/ObservableConverter.java +++ b/src/main/java/io/reactivex/ObservableConverter.java @@ -29,6 +29,7 @@ public interface ObservableConverter { * * @param upstream the upstream Observable instance * @return the converted value + * @throws Exception on error */ @NonNull R apply(@NonNull Observable upstream) throws Exception; diff --git a/src/main/java/io/reactivex/SingleConverter.java b/src/main/java/io/reactivex/SingleConverter.java index fce5c3830e..42a02e8c69 100644 --- a/src/main/java/io/reactivex/SingleConverter.java +++ b/src/main/java/io/reactivex/SingleConverter.java @@ -29,6 +29,7 @@ public interface SingleConverter { * * @param upstream the upstream Single instance * @return the converted value + * @throws Exception on error */ @NonNull R apply(@NonNull Single upstream) throws Exception; From 8b67bbf0b9ab2d1d7bbb805f3e9c198113b6a394 Mon Sep 17 00:00:00 2001 From: Zac Sweers Date: Thu, 16 Nov 2017 03:02:27 -0800 Subject: [PATCH 08/16] Fix docs and validation errors --- src/main/java/io/reactivex/Completable.java | 4 ++-- src/main/java/io/reactivex/CompletableConverter.java | 2 +- src/main/java/io/reactivex/Flowable.java | 4 ++-- src/main/java/io/reactivex/FlowableConverter.java | 2 +- src/main/java/io/reactivex/Maybe.java | 4 ++-- src/main/java/io/reactivex/MaybeConverter.java | 2 +- src/main/java/io/reactivex/Observable.java | 4 ++-- src/main/java/io/reactivex/ObservableConverter.java | 2 +- src/main/java/io/reactivex/Single.java | 4 ++-- src/main/java/io/reactivex/SingleConverter.java | 4 ++-- 10 files changed, 16 insertions(+), 16 deletions(-) diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index 93bc7d2cd0..c2c1f625a2 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -914,7 +914,7 @@ public final Completable andThen(CompletableSource next) { * This allows fluent conversion to any other type. *
*
Scheduler:
- *
{@code to} does not operate by default on a particular {@link Scheduler}.
+ *
{@code as} does not operate by default on a particular {@link Scheduler}.
*
* * @param the resulting object type @@ -925,7 +925,7 @@ public final Completable andThen(CompletableSource next) { @Experimental @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) - public final R as(CompletableConverter converter) { + public final R as(@NonNull CompletableConverter converter) { try { return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); } catch (Throwable ex) { diff --git a/src/main/java/io/reactivex/CompletableConverter.java b/src/main/java/io/reactivex/CompletableConverter.java index e72584a3e4..af03b4e9b9 100644 --- a/src/main/java/io/reactivex/CompletableConverter.java +++ b/src/main/java/io/reactivex/CompletableConverter.java @@ -24,7 +24,7 @@ @Experimental public interface CompletableConverter { /** - * Applies a function to the upstream Completable and returns a converted value of type . + * Applies a function to the upstream Completable and returns a converted value of type {@code R}. * * @param upstream the upstream Completable instance * @return the converted value diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 36f7ce6f5a..0f3ed1921a 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -5245,7 +5245,7 @@ public final Single any(Predicate predicate) { *
Backpressure:
*
The backpressure behavior depends on what happens in the {@code converter} function.
*
Scheduler:
- *
{@code to} does not operate by default on a particular {@link Scheduler}.
+ *
{@code as} does not operate by default on a particular {@link Scheduler}.
* * * @param the resulting object type @@ -5257,7 +5257,7 @@ public final Single any(Predicate predicate) { @CheckReturnValue @BackpressureSupport(BackpressureKind.SPECIAL) @SchedulerSupport(SchedulerSupport.NONE) - public final R as(FlowableConverter converter) { + public final R as(@NonNull FlowableConverter converter) { try { return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); } catch (Throwable ex) { diff --git a/src/main/java/io/reactivex/FlowableConverter.java b/src/main/java/io/reactivex/FlowableConverter.java index 8b8051ae4d..7a601cc8d7 100644 --- a/src/main/java/io/reactivex/FlowableConverter.java +++ b/src/main/java/io/reactivex/FlowableConverter.java @@ -25,7 +25,7 @@ @Experimental public interface FlowableConverter { /** - * Applies a function to the upstream Flowable and returns a converted value of type . + * Applies a function to the upstream Flowable and returns a converted value of type {@code R}. * * @param upstream the upstream Flowable instance * @return the converted value diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index 80a103caf9..e3d2c8d420 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -1995,7 +1995,7 @@ public final Maybe ambWith(MaybeSource other) { * This allows fluent conversion to any other type. *
*
Scheduler:
- *
{@code to} does not operate by default on a particular {@link Scheduler}.
+ *
{@code as} does not operate by default on a particular {@link Scheduler}.
*
* * @param the resulting object type @@ -2006,7 +2006,7 @@ public final Maybe ambWith(MaybeSource other) { @Experimental @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) - public final R as(MaybeConverter converter) { + public final R as(@NonNull MaybeConverter converter) { try { return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); } catch (Throwable ex) { diff --git a/src/main/java/io/reactivex/MaybeConverter.java b/src/main/java/io/reactivex/MaybeConverter.java index 9026025fde..0b6f33c385 100644 --- a/src/main/java/io/reactivex/MaybeConverter.java +++ b/src/main/java/io/reactivex/MaybeConverter.java @@ -25,7 +25,7 @@ @Experimental public interface MaybeConverter { /** - * Applies a function to the upstream Maybe and returns a converted value of type . + * Applies a function to the upstream Maybe and returns a converted value of type {@code R}. * * @param upstream the upstream Maybe instance * @return the converted value diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 1e8d18dda2..31dd33d27b 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -4806,7 +4806,7 @@ public final Single any(Predicate predicate) { * This allows fluent conversion to any other type. *
*
Scheduler:
- *
{@code to} does not operate by default on a particular {@link Scheduler}.
+ *
{@code as} does not operate by default on a particular {@link Scheduler}.
*
* * @param the resulting object type @@ -4817,7 +4817,7 @@ public final Single any(Predicate predicate) { @Experimental @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) - public final R as(ObservableConverter converter) { + public final R as(@NonNull ObservableConverter converter) { try { return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); } catch (Throwable ex) { diff --git a/src/main/java/io/reactivex/ObservableConverter.java b/src/main/java/io/reactivex/ObservableConverter.java index ff10564ec5..fbbe74308b 100644 --- a/src/main/java/io/reactivex/ObservableConverter.java +++ b/src/main/java/io/reactivex/ObservableConverter.java @@ -25,7 +25,7 @@ @Experimental public interface ObservableConverter { /** - * Applies a function to the upstream Observable and returns a converted value of type . + * Applies a function to the upstream Observable and returns a converted value of type {@code R}. * * @param upstream the upstream Observable instance * @return the converted value diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index 09e5a8b302..f181705246 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -1528,7 +1528,7 @@ public final Single ambWith(SingleSource other) { * This allows fluent conversion to any other type. *
*
Scheduler:
- *
{@code to} does not operate by default on a particular {@link Scheduler}.
+ *
{@code as} does not operate by default on a particular {@link Scheduler}.
*
* * @param the resulting object type @@ -1539,7 +1539,7 @@ public final Single ambWith(SingleSource other) { @Experimental @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) - public final R as(SingleConverter converter) { + public final R as(@NonNull SingleConverter converter) { try { return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); } catch (Throwable ex) { diff --git a/src/main/java/io/reactivex/SingleConverter.java b/src/main/java/io/reactivex/SingleConverter.java index 42a02e8c69..bce9d7e6fe 100644 --- a/src/main/java/io/reactivex/SingleConverter.java +++ b/src/main/java/io/reactivex/SingleConverter.java @@ -16,7 +16,7 @@ import io.reactivex.annotations.*; /** - * Convenience interface and callback used by the {@link Single#as} operator to turn an Single into another + * Convenience interface and callback used by the {@link Single#as} operator to turn a Single into another * value fluently. * * @param the upstream type @@ -25,7 +25,7 @@ @Experimental public interface SingleConverter { /** - * Applies a function to the upstream Single and returns a converted value of type . + * Applies a function to the upstream Single and returns a converted value of type {@code R}. * * @param upstream the upstream Single instance * @return the converted value From 1914776f2e832c24d5c7c611bec76ca34af73334 Mon Sep 17 00:00:00 2001 From: Zac Sweers Date: Thu, 16 Nov 2017 04:08:05 -0800 Subject: [PATCH 09/16] Add @since 2.1.7 - experimental --- src/main/java/io/reactivex/Completable.java | 1 + src/main/java/io/reactivex/CompletableConverter.java | 1 + src/main/java/io/reactivex/Flowable.java | 1 + src/main/java/io/reactivex/FlowableConverter.java | 1 + src/main/java/io/reactivex/Maybe.java | 1 + src/main/java/io/reactivex/MaybeConverter.java | 1 + src/main/java/io/reactivex/Observable.java | 1 + src/main/java/io/reactivex/ObservableConverter.java | 1 + src/main/java/io/reactivex/Single.java | 1 + src/main/java/io/reactivex/SingleConverter.java | 1 + 10 files changed, 10 insertions(+) diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index c2c1f625a2..181062736d 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -921,6 +921,7 @@ public final Completable andThen(CompletableSource next) { * @param converter the function that receives the current Completable instance and returns a value * @return the converted value * @throws NullPointerException if converter is null + * @since 2.1.7 - experimental */ @Experimental @CheckReturnValue diff --git a/src/main/java/io/reactivex/CompletableConverter.java b/src/main/java/io/reactivex/CompletableConverter.java index af03b4e9b9..73a99bdc7b 100644 --- a/src/main/java/io/reactivex/CompletableConverter.java +++ b/src/main/java/io/reactivex/CompletableConverter.java @@ -20,6 +20,7 @@ * value fluently. * * @param the output type + * @since 2.1.7 - experimental */ @Experimental public interface CompletableConverter { diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 0f3ed1921a..9345ce680b 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -5252,6 +5252,7 @@ public final Single any(Predicate predicate) { * @param converter the function that receives the current Flowable instance and returns a value * @return the converted value * @throws NullPointerException if converter is null + * @since 2.1.7 - experimental */ @Experimental @CheckReturnValue diff --git a/src/main/java/io/reactivex/FlowableConverter.java b/src/main/java/io/reactivex/FlowableConverter.java index 7a601cc8d7..cc2691bae2 100644 --- a/src/main/java/io/reactivex/FlowableConverter.java +++ b/src/main/java/io/reactivex/FlowableConverter.java @@ -21,6 +21,7 @@ * * @param the upstream type * @param the output type + * @since 2.1.7 - experimental */ @Experimental public interface FlowableConverter { diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index e3d2c8d420..e94651e4da 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -2002,6 +2002,7 @@ public final Maybe ambWith(MaybeSource other) { * @param converter the function that receives the current Maybe instance and returns a value * @return the converted value * @throws NullPointerException if converter is null + * @since 2.1.7 - experimental */ @Experimental @CheckReturnValue diff --git a/src/main/java/io/reactivex/MaybeConverter.java b/src/main/java/io/reactivex/MaybeConverter.java index 0b6f33c385..4cc6419451 100644 --- a/src/main/java/io/reactivex/MaybeConverter.java +++ b/src/main/java/io/reactivex/MaybeConverter.java @@ -21,6 +21,7 @@ * * @param the upstream type * @param the output type + * @since 2.1.7 - experimental */ @Experimental public interface MaybeConverter { diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index 31dd33d27b..bfa2b5a882 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -4813,6 +4813,7 @@ public final Single any(Predicate predicate) { * @param converter the function that receives the current Observable instance and returns a value * @return the converted value * @throws NullPointerException if converter is null + * @since 2.1.7 - experimental */ @Experimental @CheckReturnValue diff --git a/src/main/java/io/reactivex/ObservableConverter.java b/src/main/java/io/reactivex/ObservableConverter.java index fbbe74308b..2b5f917a5f 100644 --- a/src/main/java/io/reactivex/ObservableConverter.java +++ b/src/main/java/io/reactivex/ObservableConverter.java @@ -21,6 +21,7 @@ * * @param the upstream type * @param the output type + * @since 2.1.7 - experimental */ @Experimental public interface ObservableConverter { diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index f181705246..5245dacf54 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -1535,6 +1535,7 @@ public final Single ambWith(SingleSource other) { * @param converter the function that receives the current Single instance and returns a value * @return the converted value * @throws NullPointerException if converter is null + * @since 2.1.7 - experimental */ @Experimental @CheckReturnValue diff --git a/src/main/java/io/reactivex/SingleConverter.java b/src/main/java/io/reactivex/SingleConverter.java index bce9d7e6fe..68c1207a0b 100644 --- a/src/main/java/io/reactivex/SingleConverter.java +++ b/src/main/java/io/reactivex/SingleConverter.java @@ -21,6 +21,7 @@ * * @param the upstream type * @param the output type + * @since 2.1.7 - experimental */ @Experimental public interface SingleConverter { From 8adb583c2d945eee7ed1156fe3211a287d01e103 Mon Sep 17 00:00:00 2001 From: Zac Sweers Date: Thu, 16 Nov 2017 04:14:41 -0800 Subject: [PATCH 10/16] ParallelFlowable.as() --- .../reactivex/parallel/ParallelFlowable.java | 23 ++++++++++++ .../parallel/ParallelFlowableConverter.java | 37 +++++++++++++++++++ .../parallel/ParallelFlowableTest.java | 26 +++++++++++++ 3 files changed, 86 insertions(+) create mode 100644 src/main/java/io/reactivex/parallel/ParallelFlowableConverter.java diff --git a/src/main/java/io/reactivex/parallel/ParallelFlowable.java b/src/main/java/io/reactivex/parallel/ParallelFlowable.java index b1f6d60322..9c1ec8d28b 100644 --- a/src/main/java/io/reactivex/parallel/ParallelFlowable.java +++ b/src/main/java/io/reactivex/parallel/ParallelFlowable.java @@ -122,6 +122,29 @@ public static ParallelFlowable from(@NonNull Publisher sourc return RxJavaPlugins.onAssembly(new ParallelFromPublisher(source, parallelism, prefetch)); } + /** + * Calls the specified converter function during assembly time and returns its resulting value. + *

+ * This allows fluent conversion to any other type. + * + * @param the resulting object type + * @param converter the function that receives the current ParallelFlowable instance and returns a value + * @return the converted value + * @throws NullPointerException if converter is null + * @since 2.1.7 - experimental + */ + @Experimental + @CheckReturnValue + @NonNull + public final R as(@NonNull ParallelFlowableConverter converter) { + try { + return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); + } catch (Throwable ex) { + Exceptions.throwIfFatal(ex); + throw ExceptionHelper.wrapOrThrow(ex); + } + } + /** * Maps the source values on each 'rail' to another value. *

diff --git a/src/main/java/io/reactivex/parallel/ParallelFlowableConverter.java b/src/main/java/io/reactivex/parallel/ParallelFlowableConverter.java new file mode 100644 index 0000000000..86f1eaea23 --- /dev/null +++ b/src/main/java/io/reactivex/parallel/ParallelFlowableConverter.java @@ -0,0 +1,37 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + +package io.reactivex.parallel; + +import io.reactivex.annotations.*; + +/** + * Convenience interface and callback used by the {@link ParallelFlowable#as} operator to turn a ParallelFlowable into + * another value fluently. + * + * @param the upstream type + * @param the output type + * @since 2.1.7 - experimental + */ +@Experimental +public interface ParallelFlowableConverter { + /** + * Applies a function to the upstream ParallelFlowable and returns a converted value of type {@code R}. + * + * @param upstream the upstream ParallelFlowable instance + * @return the converted value + * @throws Exception on error + */ + @NonNull + R apply(@NonNull ParallelFlowable upstream) throws Exception; +} diff --git a/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java b/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java index 4cda73adbc..f0b903af2c 100644 --- a/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java +++ b/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java @@ -1100,6 +1100,20 @@ public Flowable apply(ParallelFlowable pf) throws Exception { .assertResult(1, 2, 3, 4, 5); } + @Test + public void as() { + Flowable.range(1, 5) + .parallel() + .as(new ParallelFlowableConverter>() { + @Override + public Flowable apply(ParallelFlowable pf) throws Exception { + return pf.sequential(); + } + }) + .test() + .assertResult(1, 2, 3, 4, 5); + } + @Test(expected = TestException.class) public void toThrows() { Flowable.range(1, 5) @@ -1112,6 +1126,18 @@ public Flowable apply(ParallelFlowable pf) throws Exception { }); } + @Test(expected = TestException.class) + public void asThrows() { + Flowable.range(1, 5) + .parallel() + .as(new ParallelFlowableConverter>() { + @Override + public Flowable apply(ParallelFlowable pf) throws Exception { + throw new TestException(); + } + }); + } + @Test public void compose() { Flowable.range(1, 5) From c1f26ee03c826ff8d6996669f0e2045d73dd4bfd Mon Sep 17 00:00:00 2001 From: Zac Sweers Date: Thu, 16 Nov 2017 04:30:12 -0800 Subject: [PATCH 11/16] Start ConverterTest --- src/test/java/io/reactivex/ConverterTest.java | 218 ++++++++++++++++++ 1 file changed, 218 insertions(+) create mode 100644 src/test/java/io/reactivex/ConverterTest.java diff --git a/src/test/java/io/reactivex/ConverterTest.java b/src/test/java/io/reactivex/ConverterTest.java new file mode 100644 index 0000000000..ce6f0de34f --- /dev/null +++ b/src/test/java/io/reactivex/ConverterTest.java @@ -0,0 +1,218 @@ +package io.reactivex; + +import io.reactivex.exceptions.TestException; +import org.junit.Test; + +import static org.junit.Assert.*; + +public final class ConverterTest { + + @Test + public void flowableConverterThrows() { + try { + Flowable.just(1).as(new FlowableConverter() { + @Override + public Integer apply(Flowable v) { + throw new TestException("Forced failure"); + } + }); + fail("Should have thrown!"); + } catch (TestException ex) { + assertEquals("Forced failure", ex.getMessage()); + } + } + + @Test + public void observableConverterThrows() { + try { + Observable.just(1).as(new ObservableConverter() { + @Override + public Integer apply(Observable v) { + throw new TestException("Forced failure"); + } + }); + fail("Should have thrown!"); + } catch (TestException ex) { + assertEquals("Forced failure", ex.getMessage()); + } + } + + @Test + public void singleConverterThrows() { + try { + Single.just(1).as(new SingleConverter() { + @Override + public Integer apply(Single v) { + throw new TestException("Forced failure"); + } + }); + fail("Should have thrown!"); + } catch (TestException ex) { + assertEquals("Forced failure", ex.getMessage()); + } + } + + @Test + public void maybeConverterThrows() { + try { + Maybe.just(1).as(new MaybeConverter() { + @Override + public Integer apply(Maybe v) { + throw new TestException("Forced failure"); + } + }); + fail("Should have thrown!"); + } catch (TestException ex) { + assertEquals("Forced failure", ex.getMessage()); + } + } + + @Test + public void completableConverterThrows() { + try { + Completable.complete().as(new CompletableConverter() { + @Override + public Completable apply(Completable v) { + throw new TestException("Forced failure"); + } + }); + fail("Should have thrown!"); + } catch (TestException ex) { + assertEquals("Forced failure", ex.getMessage()); + } + } + + // Test demos for signature generics in compose() methods. Just needs to compile. + + @Test + public void observableGenericsSignatureTest() { + A a = new A() { }; + + Observable.just(a).as(ConverterTest.testObservableConverterCreator()); + } + + @Test + public void singleGenericsSignatureTest() { + A a = new A() { }; + + Single.just(a).as(ConverterTest.testSingleConverterCreator()); + } + + @Test + public void maybeGenericsSignatureTest() { + A a = new A() { }; + + Maybe.just(a).as(ConverterTest.testMaybeConverterCreator()); + } + + @Test + public void flowableGenericsSignatureTest() { + A a = new A() { }; + + Flowable.just(a).as(ConverterTest.testFlowableConverterCreator()); + } + + @Test + public void compositeTest() { + CompositeConverter converter = new CompositeConverter(); + + Flowable.just(1) + .as(converter) + .test() + .assertValue(1); + + Observable.just(1) + .as(converter) + .test() + .assertValue(1); + + Maybe.just(1) + .as(converter) + .test() + .assertValue(1); + + Single.just(1) + .as(converter) + .test() + .assertValue(1); + + Completable.complete() + .as(converter) + .test() + .assertComplete(); + } + + interface A { } + interface B { } + + private static ObservableConverter, B> testObservableConverterCreator() { + return new ObservableConverter, B>() { + @Override + public B apply(Observable> a) { + return new B() { + }; + } + }; + } + + private static SingleConverter, B> testSingleConverterCreator() { + return new SingleConverter, B>() { + @Override + public B apply(Single> a) { + return new B() { + }; + } + }; + } + + private static MaybeConverter, B> testMaybeConverterCreator() { + return new MaybeConverter, B>() { + @Override + public B apply(Maybe> a) { + return new B() { + }; + } + }; + } + + private static FlowableConverter, B> testFlowableConverterCreator() { + return new FlowableConverter, B>() { + @Override + public B apply(Flowable> a) { + return new B() { + }; + } + }; + } + + private static class CompositeConverter implements ObservableConverter>, + FlowableConverter>, + MaybeConverter>, + SingleConverter>, + CompletableConverter> { + @Override + public Flowable apply(Completable upstream) throws Exception { + return upstream.toFlowable(); + } + + @Override + public Observable apply(Flowable upstream) throws Exception { + return upstream.toObservable(); + } + + @Override + public Flowable apply(Maybe upstream) throws Exception { + return upstream.toFlowable(); + } + + @Override + public Flowable apply(Observable upstream) throws Exception { + return upstream.toFlowable(BackpressureStrategy.MISSING); + } + + @Override + public Flowable apply(Single upstream) throws Exception { + return upstream.toFlowable(); + } + } +} From e13a9785e89495f5731a75e4db63c45784da40ef Mon Sep 17 00:00:00 2001 From: akarnokd Date: Thu, 16 Nov 2017 15:21:21 +0100 Subject: [PATCH 12/16] Fix tests and update validator --- src/test/java/io/reactivex/ConverterTest.java | 66 ++++++++++++++++--- .../reactivex/ParamValidationCheckerTest.java | 40 +++++++++++ 2 files changed, 98 insertions(+), 8 deletions(-) diff --git a/src/test/java/io/reactivex/ConverterTest.java b/src/test/java/io/reactivex/ConverterTest.java index ce6f0de34f..e038bc0218 100644 --- a/src/test/java/io/reactivex/ConverterTest.java +++ b/src/test/java/io/reactivex/ConverterTest.java @@ -1,9 +1,24 @@ +/** + * Copyright (c) 2016-present, RxJava Contributors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in + * compliance with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is + * distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See + * the License for the specific language governing permissions and limitations under the License. + */ + package io.reactivex; -import io.reactivex.exceptions.TestException; +import static org.junit.Assert.*; + import org.junit.Test; -import static org.junit.Assert.*; +import io.reactivex.exceptions.TestException; +import io.reactivex.parallel.*; public final class ConverterTest { @@ -70,7 +85,7 @@ public Integer apply(Maybe v) { @Test public void completableConverterThrows() { try { - Completable.complete().as(new CompletableConverter() { + Completable.complete().as(new CompletableConverter() { @Override public Completable apply(Completable v) { throw new TestException("Forced failure"); @@ -84,32 +99,44 @@ public Completable apply(Completable v) { // Test demos for signature generics in compose() methods. Just needs to compile. + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test public void observableGenericsSignatureTest() { A a = new A() { }; - Observable.just(a).as(ConverterTest.testObservableConverterCreator()); + Observable.just(a).as((ObservableConverter)ConverterTest.testObservableConverterCreator()); } + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test public void singleGenericsSignatureTest() { A a = new A() { }; - Single.just(a).as(ConverterTest.testSingleConverterCreator()); + Single.just(a).as((SingleConverter)ConverterTest.testSingleConverterCreator()); } + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test public void maybeGenericsSignatureTest() { A a = new A() { }; - Maybe.just(a).as(ConverterTest.testMaybeConverterCreator()); + Maybe.just(a).as((MaybeConverter)ConverterTest.testMaybeConverterCreator()); } + @SuppressWarnings({ "rawtypes", "unchecked" }) @Test public void flowableGenericsSignatureTest() { A a = new A() { }; - Flowable.just(a).as(ConverterTest.testFlowableConverterCreator()); + Flowable.just(a).as((FlowableConverter)ConverterTest.testFlowableConverterCreator()); + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) + @Test + public void parallelFlowableGenericsSignatureTest() { + A a = new A() { }; + + Flowable.just(a).parallel().as((ParallelFlowableConverter)ConverterTest.testParallelFlowableConverterCreator()); } @Test @@ -140,6 +167,12 @@ public void compositeTest() { .as(converter) .test() .assertComplete(); + + Flowable.just(1) + .parallel() + .as(converter) + .test() + .assertValue(1); } interface A { } @@ -185,11 +218,28 @@ public B apply(Flowable> a) { }; } - private static class CompositeConverter implements ObservableConverter>, + private static ParallelFlowableConverter, B> testParallelFlowableConverterCreator() { + return new ParallelFlowableConverter, B>() { + @Override + public B apply(ParallelFlowable> a) { + return new B() { + }; + } + }; + } + + static class CompositeConverter + implements ObservableConverter>, + ParallelFlowableConverter>, FlowableConverter>, MaybeConverter>, SingleConverter>, CompletableConverter> { + @Override + public Flowable apply(ParallelFlowable upstream) throws Exception { + return upstream.sequential(); + } + @Override public Flowable apply(Completable upstream) throws Exception { return upstream.toFlowable(); diff --git a/src/test/java/io/reactivex/ParamValidationCheckerTest.java b/src/test/java/io/reactivex/ParamValidationCheckerTest.java index 6fef3b0273..b52d4b7eb7 100644 --- a/src/test/java/io/reactivex/ParamValidationCheckerTest.java +++ b/src/test/java/io/reactivex/ParamValidationCheckerTest.java @@ -560,6 +560,46 @@ public void checkParallelFlowable() { defaultValues.put(ParallelFailureHandling.class, ParallelFailureHandling.ERROR); + @SuppressWarnings("rawtypes") + class MixedConverters implements FlowableConverter, ObservableConverter, SingleConverter, + MaybeConverter, CompletableConverter, ParallelFlowableConverter { + + @Override + public Object apply(ParallelFlowable upstream) throws Exception { + return upstream; + } + + @Override + public Object apply(Completable upstream) throws Exception { + return upstream; + } + + @Override + public Object apply(Maybe upstream) throws Exception { + return upstream; + } + + @Override + public Object apply(Single upstream) throws Exception { + return upstream; + } + + @Override + public Object apply(Observable upstream) throws Exception { + return upstream; + } + + @Override + public Object apply(Flowable upstream) throws Exception { + return upstream; + } + } + + MixedConverters mc = new MixedConverters(); + for (Class c : MixedConverters.class.getInterfaces()) { + defaultValues.put(c, mc); + } + // ----------------------------------------------------------------------------------- defaultInstances = new HashMap, List>(); From 5c0dec58bd0c45333b99dfa2dd7e3c1f4a560f83 Mon Sep 17 00:00:00 2001 From: Zac Sweers Date: Thu, 16 Nov 2017 17:10:38 -0800 Subject: [PATCH 13/16] Remove exceptions from signatures --- src/main/java/io/reactivex/Completable.java | 7 +------ src/main/java/io/reactivex/CompletableConverter.java | 3 +-- src/main/java/io/reactivex/Flowable.java | 7 +------ src/main/java/io/reactivex/FlowableConverter.java | 3 +-- src/main/java/io/reactivex/Maybe.java | 7 +------ src/main/java/io/reactivex/MaybeConverter.java | 3 +-- src/main/java/io/reactivex/Observable.java | 7 +------ src/main/java/io/reactivex/ObservableConverter.java | 3 +-- src/main/java/io/reactivex/Single.java | 7 +------ src/main/java/io/reactivex/SingleConverter.java | 3 +-- src/main/java/io/reactivex/parallel/ParallelFlowable.java | 7 +------ .../io/reactivex/parallel/ParallelFlowableConverter.java | 3 +-- 12 files changed, 12 insertions(+), 48 deletions(-) diff --git a/src/main/java/io/reactivex/Completable.java b/src/main/java/io/reactivex/Completable.java index 181062736d..34008f1a0c 100644 --- a/src/main/java/io/reactivex/Completable.java +++ b/src/main/java/io/reactivex/Completable.java @@ -927,12 +927,7 @@ public final Completable andThen(CompletableSource next) { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final R as(@NonNull CompletableConverter converter) { - try { - return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - throw ExceptionHelper.wrapOrThrow(ex); - } + return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); } /** diff --git a/src/main/java/io/reactivex/CompletableConverter.java b/src/main/java/io/reactivex/CompletableConverter.java index 73a99bdc7b..39ec9b452b 100644 --- a/src/main/java/io/reactivex/CompletableConverter.java +++ b/src/main/java/io/reactivex/CompletableConverter.java @@ -29,8 +29,7 @@ public interface CompletableConverter { * * @param upstream the upstream Completable instance * @return the converted value - * @throws Exception on error */ @NonNull - R apply(@NonNull Completable upstream) throws Exception; + R apply(@NonNull Completable upstream); } diff --git a/src/main/java/io/reactivex/Flowable.java b/src/main/java/io/reactivex/Flowable.java index 9345ce680b..b2da55c6fa 100644 --- a/src/main/java/io/reactivex/Flowable.java +++ b/src/main/java/io/reactivex/Flowable.java @@ -5259,12 +5259,7 @@ public final Single any(Predicate predicate) { @BackpressureSupport(BackpressureKind.SPECIAL) @SchedulerSupport(SchedulerSupport.NONE) public final R as(@NonNull FlowableConverter converter) { - try { - return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - throw ExceptionHelper.wrapOrThrow(ex); - } + return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); } /** diff --git a/src/main/java/io/reactivex/FlowableConverter.java b/src/main/java/io/reactivex/FlowableConverter.java index cc2691bae2..541e335bcd 100644 --- a/src/main/java/io/reactivex/FlowableConverter.java +++ b/src/main/java/io/reactivex/FlowableConverter.java @@ -30,8 +30,7 @@ public interface FlowableConverter { * * @param upstream the upstream Flowable instance * @return the converted value - * @throws Exception on error */ @NonNull - R apply(@NonNull Flowable upstream) throws Exception; + R apply(@NonNull Flowable upstream); } diff --git a/src/main/java/io/reactivex/Maybe.java b/src/main/java/io/reactivex/Maybe.java index e94651e4da..c2a88aacb3 100644 --- a/src/main/java/io/reactivex/Maybe.java +++ b/src/main/java/io/reactivex/Maybe.java @@ -2008,12 +2008,7 @@ public final Maybe ambWith(MaybeSource other) { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final R as(@NonNull MaybeConverter converter) { - try { - return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - throw ExceptionHelper.wrapOrThrow(ex); - } + return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); } /** diff --git a/src/main/java/io/reactivex/MaybeConverter.java b/src/main/java/io/reactivex/MaybeConverter.java index 4cc6419451..e156ed5944 100644 --- a/src/main/java/io/reactivex/MaybeConverter.java +++ b/src/main/java/io/reactivex/MaybeConverter.java @@ -30,8 +30,7 @@ public interface MaybeConverter { * * @param upstream the upstream Maybe instance * @return the converted value - * @throws Exception on error */ @NonNull - R apply(@NonNull Maybe upstream) throws Exception; + R apply(@NonNull Maybe upstream); } diff --git a/src/main/java/io/reactivex/Observable.java b/src/main/java/io/reactivex/Observable.java index bfa2b5a882..bfa3a19e4e 100644 --- a/src/main/java/io/reactivex/Observable.java +++ b/src/main/java/io/reactivex/Observable.java @@ -4819,12 +4819,7 @@ public final Single any(Predicate predicate) { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final R as(@NonNull ObservableConverter converter) { - try { - return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - throw ExceptionHelper.wrapOrThrow(ex); - } + return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); } /** diff --git a/src/main/java/io/reactivex/ObservableConverter.java b/src/main/java/io/reactivex/ObservableConverter.java index 2b5f917a5f..b413de69de 100644 --- a/src/main/java/io/reactivex/ObservableConverter.java +++ b/src/main/java/io/reactivex/ObservableConverter.java @@ -30,8 +30,7 @@ public interface ObservableConverter { * * @param upstream the upstream Observable instance * @return the converted value - * @throws Exception on error */ @NonNull - R apply(@NonNull Observable upstream) throws Exception; + R apply(@NonNull Observable upstream); } diff --git a/src/main/java/io/reactivex/Single.java b/src/main/java/io/reactivex/Single.java index 5245dacf54..72ee2872c6 100644 --- a/src/main/java/io/reactivex/Single.java +++ b/src/main/java/io/reactivex/Single.java @@ -1541,12 +1541,7 @@ public final Single ambWith(SingleSource other) { @CheckReturnValue @SchedulerSupport(SchedulerSupport.NONE) public final R as(@NonNull SingleConverter converter) { - try { - return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - throw ExceptionHelper.wrapOrThrow(ex); - } + return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); } /** diff --git a/src/main/java/io/reactivex/SingleConverter.java b/src/main/java/io/reactivex/SingleConverter.java index 68c1207a0b..9938b22cc7 100644 --- a/src/main/java/io/reactivex/SingleConverter.java +++ b/src/main/java/io/reactivex/SingleConverter.java @@ -30,8 +30,7 @@ public interface SingleConverter { * * @param upstream the upstream Single instance * @return the converted value - * @throws Exception on error */ @NonNull - R apply(@NonNull Single upstream) throws Exception; + R apply(@NonNull Single upstream); } diff --git a/src/main/java/io/reactivex/parallel/ParallelFlowable.java b/src/main/java/io/reactivex/parallel/ParallelFlowable.java index 9c1ec8d28b..4dd6dfd93d 100644 --- a/src/main/java/io/reactivex/parallel/ParallelFlowable.java +++ b/src/main/java/io/reactivex/parallel/ParallelFlowable.java @@ -137,12 +137,7 @@ public static ParallelFlowable from(@NonNull Publisher sourc @CheckReturnValue @NonNull public final R as(@NonNull ParallelFlowableConverter converter) { - try { - return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); - } catch (Throwable ex) { - Exceptions.throwIfFatal(ex); - throw ExceptionHelper.wrapOrThrow(ex); - } + return ObjectHelper.requireNonNull(converter, "converter is null").apply(this); } /** diff --git a/src/main/java/io/reactivex/parallel/ParallelFlowableConverter.java b/src/main/java/io/reactivex/parallel/ParallelFlowableConverter.java index 86f1eaea23..f782eb7bb0 100644 --- a/src/main/java/io/reactivex/parallel/ParallelFlowableConverter.java +++ b/src/main/java/io/reactivex/parallel/ParallelFlowableConverter.java @@ -30,8 +30,7 @@ public interface ParallelFlowableConverter { * * @param upstream the upstream ParallelFlowable instance * @return the converted value - * @throws Exception on error */ @NonNull - R apply(@NonNull ParallelFlowable upstream) throws Exception; + R apply(@NonNull ParallelFlowable upstream); } From c4b0c260336f35c3bf5d1792bed442ecc3e70d42 Mon Sep 17 00:00:00 2001 From: Zac Sweers Date: Thu, 16 Nov 2017 17:19:18 -0800 Subject: [PATCH 14/16] Remove exception signature from implementations --- src/test/java/io/reactivex/ConverterTest.java | 12 ++++++------ .../io/reactivex/ParamValidationCheckerTest.java | 12 ++++++------ .../io/reactivex/completable/CompletableTest.java | 2 +- .../java/io/reactivex/flowable/FlowableTests.java | 2 +- src/test/java/io/reactivex/maybe/MaybeTest.java | 2 +- .../java/io/reactivex/observable/ObservableTest.java | 2 +- .../io/reactivex/parallel/ParallelFlowableTest.java | 4 ++-- src/test/java/io/reactivex/single/SingleTest.java | 2 +- 8 files changed, 19 insertions(+), 19 deletions(-) diff --git a/src/test/java/io/reactivex/ConverterTest.java b/src/test/java/io/reactivex/ConverterTest.java index e038bc0218..d2f174cd2f 100644 --- a/src/test/java/io/reactivex/ConverterTest.java +++ b/src/test/java/io/reactivex/ConverterTest.java @@ -236,32 +236,32 @@ static class CompositeConverter SingleConverter>, CompletableConverter> { @Override - public Flowable apply(ParallelFlowable upstream) throws Exception { + public Flowable apply(ParallelFlowable upstream) { return upstream.sequential(); } @Override - public Flowable apply(Completable upstream) throws Exception { + public Flowable apply(Completable upstream) { return upstream.toFlowable(); } @Override - public Observable apply(Flowable upstream) throws Exception { + public Observable apply(Flowable upstream) { return upstream.toObservable(); } @Override - public Flowable apply(Maybe upstream) throws Exception { + public Flowable apply(Maybe upstream) { return upstream.toFlowable(); } @Override - public Flowable apply(Observable upstream) throws Exception { + public Flowable apply(Observable upstream) { return upstream.toFlowable(BackpressureStrategy.MISSING); } @Override - public Flowable apply(Single upstream) throws Exception { + public Flowable apply(Single upstream) { return upstream.toFlowable(); } } diff --git a/src/test/java/io/reactivex/ParamValidationCheckerTest.java b/src/test/java/io/reactivex/ParamValidationCheckerTest.java index b52d4b7eb7..5ad5afbd96 100644 --- a/src/test/java/io/reactivex/ParamValidationCheckerTest.java +++ b/src/test/java/io/reactivex/ParamValidationCheckerTest.java @@ -565,32 +565,32 @@ class MixedConverters implements FlowableConverter, ObservableConverter, SingleC MaybeConverter, CompletableConverter, ParallelFlowableConverter { @Override - public Object apply(ParallelFlowable upstream) throws Exception { + public Object apply(ParallelFlowable upstream) { return upstream; } @Override - public Object apply(Completable upstream) throws Exception { + public Object apply(Completable upstream) { return upstream; } @Override - public Object apply(Maybe upstream) throws Exception { + public Object apply(Maybe upstream) { return upstream; } @Override - public Object apply(Single upstream) throws Exception { + public Object apply(Single upstream) { return upstream; } @Override - public Object apply(Observable upstream) throws Exception { + public Object apply(Observable upstream) { return upstream; } @Override - public Object apply(Flowable upstream) throws Exception { + public Object apply(Flowable upstream) { return upstream; } } diff --git a/src/test/java/io/reactivex/completable/CompletableTest.java b/src/test/java/io/reactivex/completable/CompletableTest.java index 8db90b885b..dd2ef4f658 100644 --- a/src/test/java/io/reactivex/completable/CompletableTest.java +++ b/src/test/java/io/reactivex/completable/CompletableTest.java @@ -2815,7 +2815,7 @@ public void accept(Object e) { } public void as() { Completable.complete().as(new CompletableConverter>() { @Override - public Flowable apply(Completable v) throws Exception { + public Flowable apply(Completable v) { return v.toFlowable(); } }) diff --git a/src/test/java/io/reactivex/flowable/FlowableTests.java b/src/test/java/io/reactivex/flowable/FlowableTests.java index 3042e22d9d..d0ac63f762 100644 --- a/src/test/java/io/reactivex/flowable/FlowableTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableTests.java @@ -1146,7 +1146,7 @@ public Object apply(Flowable onSubscribe) { public void as() { Flowable.just(1).as(new FlowableConverter>() { @Override - public Observable apply(Flowable v) throws Exception { + public Observable apply(Flowable v) { return v.toObservable(); } }) diff --git a/src/test/java/io/reactivex/maybe/MaybeTest.java b/src/test/java/io/reactivex/maybe/MaybeTest.java index 945648a010..c493f956b1 100644 --- a/src/test/java/io/reactivex/maybe/MaybeTest.java +++ b/src/test/java/io/reactivex/maybe/MaybeTest.java @@ -385,7 +385,7 @@ public Flowable apply(Maybe v) throws Exception { public void as() { Maybe.just(1).as(new MaybeConverter>() { @Override - public Flowable apply(Maybe v) throws Exception { + public Flowable apply(Maybe v) { return v.toFlowable(); } }) diff --git a/src/test/java/io/reactivex/observable/ObservableTest.java b/src/test/java/io/reactivex/observable/ObservableTest.java index 30dfd0c398..69c7147bd2 100644 --- a/src/test/java/io/reactivex/observable/ObservableTest.java +++ b/src/test/java/io/reactivex/observable/ObservableTest.java @@ -1185,7 +1185,7 @@ public Object apply(Observable onSubscribe) { public void as() { Observable.just(1).as(new ObservableConverter>() { @Override - public Flowable apply(Observable v) throws Exception { + public Flowable apply(Observable v) { return v.toFlowable(BackpressureStrategy.MISSING); } }) diff --git a/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java b/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java index f0b903af2c..577d4be1cd 100644 --- a/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java +++ b/src/test/java/io/reactivex/parallel/ParallelFlowableTest.java @@ -1106,7 +1106,7 @@ public void as() { .parallel() .as(new ParallelFlowableConverter>() { @Override - public Flowable apply(ParallelFlowable pf) throws Exception { + public Flowable apply(ParallelFlowable pf) { return pf.sequential(); } }) @@ -1132,7 +1132,7 @@ public void asThrows() { .parallel() .as(new ParallelFlowableConverter>() { @Override - public Flowable apply(ParallelFlowable pf) throws Exception { + public Flowable apply(ParallelFlowable pf) { throw new TestException(); } }); diff --git a/src/test/java/io/reactivex/single/SingleTest.java b/src/test/java/io/reactivex/single/SingleTest.java index d3f5334750..ac9df2fafb 100644 --- a/src/test/java/io/reactivex/single/SingleTest.java +++ b/src/test/java/io/reactivex/single/SingleTest.java @@ -547,7 +547,7 @@ public Integer apply(Single v) throws Exception { public void as() { Single.just(1).as(new SingleConverter>() { @Override - public Flowable apply(Single v) throws Exception { + public Flowable apply(Single v) { return v.toFlowable(); } }) From 5c3bdf37cd587df0ed4be769ede29e81f7b17d2f Mon Sep 17 00:00:00 2001 From: Zac Sweers Date: Fri, 17 Nov 2017 17:30:40 -0800 Subject: [PATCH 15/16] Assert the full execution of extend() tests --- src/test/java/io/reactivex/flowable/FlowableTests.java | 6 ++++-- src/test/java/io/reactivex/observable/ObservableTest.java | 6 ++++-- 2 files changed, 8 insertions(+), 4 deletions(-) diff --git a/src/test/java/io/reactivex/flowable/FlowableTests.java b/src/test/java/io/reactivex/flowable/FlowableTests.java index d0ac63f762..84c86a8504 100644 --- a/src/test/java/io/reactivex/flowable/FlowableTests.java +++ b/src/test/java/io/reactivex/flowable/FlowableTests.java @@ -1114,7 +1114,7 @@ public void testForEachWithNull() { public void testExtend() { final TestSubscriber subscriber = new TestSubscriber(); final Object value = new Object(); - Flowable.just(value).to(new Function, Object>() { + Object returned = Flowable.just(value).to(new Function, Object>() { @Override public Object apply(Flowable onSubscribe) { onSubscribe.subscribe(subscriber); @@ -1124,13 +1124,14 @@ public Object apply(Flowable onSubscribe) { return subscriber.values().get(0); } }); + assertSame(returned, value); } @Test public void testAsExtend() { final TestSubscriber subscriber = new TestSubscriber(); final Object value = new Object(); - Flowable.just(value).as(new FlowableConverter() { + Object returned = Flowable.just(value).as(new FlowableConverter() { @Override public Object apply(Flowable onSubscribe) { onSubscribe.subscribe(subscriber); @@ -1140,6 +1141,7 @@ public Object apply(Flowable onSubscribe) { return subscriber.values().get(0); } }); + assertSame(returned, value); } @Test diff --git a/src/test/java/io/reactivex/observable/ObservableTest.java b/src/test/java/io/reactivex/observable/ObservableTest.java index 69c7147bd2..fc79edaa9c 100644 --- a/src/test/java/io/reactivex/observable/ObservableTest.java +++ b/src/test/java/io/reactivex/observable/ObservableTest.java @@ -1153,7 +1153,7 @@ public void testForEachWithNull() { public void testExtend() { final TestObserver subscriber = new TestObserver(); final Object value = new Object(); - Observable.just(value).to(new Function, Object>() { + Object returned = Observable.just(value).to(new Function, Object>() { @Override public Object apply(Observable onSubscribe) { onSubscribe.subscribe(subscriber); @@ -1163,13 +1163,14 @@ public Object apply(Observable onSubscribe) { return subscriber.values().get(0); } }); + assertSame(returned, value); } @Test public void testAsExtend() { final TestObserver subscriber = new TestObserver(); final Object value = new Object(); - Observable.just(value).as(new ObservableConverter() { + Object returned = Observable.just(value).as(new ObservableConverter() { @Override public Object apply(Observable onSubscribe) { onSubscribe.subscribe(subscriber); @@ -1179,6 +1180,7 @@ public Object apply(Observable onSubscribe) { return subscriber.values().get(0); } }); + assertSame(returned, value); } @Test From e0d793faa79ed43e7409bb9c81712474f3daf85e Mon Sep 17 00:00:00 2001 From: Zac Sweers Date: Sun, 19 Nov 2017 02:22:13 -0800 Subject: [PATCH 16/16] Use test() helpers --- .../completable/CompletableTest.java | 42 +++++++++---------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/src/test/java/io/reactivex/completable/CompletableTest.java b/src/test/java/io/reactivex/completable/CompletableTest.java index dd2ef4f658..9b0b914a66 100644 --- a/src/test/java/io/reactivex/completable/CompletableTest.java +++ b/src/test/java/io/reactivex/completable/CompletableTest.java @@ -2783,32 +2783,30 @@ public void timeoutOtherNull() { @Test(timeout = 5000) public void toNormal() { - Flowable flow = normal.completable.to(new Function>() { - @Override - public Flowable apply(Completable c) { - return c.toFlowable(); - } - }); - - flow.blockingForEach(new Consumer() { - @Override - public void accept(Object e) { } - }); + normal.completable + .to(new Function>() { + @Override + public Flowable apply(Completable c) { + return c.toFlowable(); + } + }) + .test() + .assertComplete() + .assertNoValues(); } @Test(timeout = 5000) public void asNormal() { - Flowable flow = normal.completable.as(new CompletableConverter>() { - @Override - public Flowable apply(Completable c) { - return c.toFlowable(); - } - }); - - flow.blockingForEach(new Consumer() { - @Override - public void accept(Object e) { } - }); + normal.completable + .as(new CompletableConverter>() { + @Override + public Flowable apply(Completable c) { + return c.toFlowable(); + } + }) + .test() + .assertComplete() + .assertNoValues(); } @Test