From 4d5962a85ece462f2eefe94e921d654fba8bcfd7 Mon Sep 17 00:00:00 2001 From: Michael de Jong Date: Mon, 6 May 2013 00:44:08 +0200 Subject: [PATCH 1/5] Added support for the switch operation. --- rxjava-core/src/main/java/rx/Observable.java | 27 +- .../java/rx/operators/OperationSwitch.java | 396 ++++++++++++++++++ 2 files changed, 420 insertions(+), 3 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationSwitch.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index a7d8ffe55f..7791601579 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -15,9 +15,16 @@ */ package rx; -import static org.junit.Assert.*; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.util.ArrayList; import java.util.Arrays; @@ -60,6 +67,7 @@ import rx.operators.OperationScan; import rx.operators.OperationSkip; import rx.operators.OperationSubscribeOn; +import rx.operators.OperationSwitch; import rx.operators.OperationSynchronize; import rx.operators.OperationTake; import rx.operators.OperationTakeLast; @@ -1981,6 +1989,19 @@ public static Observable skip(final Observable items, int num) { return create(OperationSkip.skip(items, num)); } + /** + * Accepts an {@link Observable} sequence of {@link Observable} sequences, and transforms it into a single + * {@link Observable} sequence, which publishes the values of the most recently published {@link Observable} sequence. + * + * @param sequenceOfSequences + * the {@link Observable} sequence of {@link Observable} sequences. + * @return an {@link Observable} which publishes only the values of the most recently published + * {@link Observable} sequence. + */ + public static Observable switchDo(Observable> sequenceOfSequences) { + return create(OperationSwitch.switchDo(sequenceOfSequences)); + } + /** * Accepts an Observable and wraps it in another Observable that ensures that the resulting * Observable is chronologically well-behaved. diff --git a/rxjava-core/src/main/java/rx/operators/OperationSwitch.java b/rxjava-core/src/main/java/rx/operators/OperationSwitch.java new file mode 100644 index 0000000000..7138fd00d7 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationSwitch.java @@ -0,0 +1,396 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.concurrency.TestScheduler; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Func1; + + +/** + * This operation transforms an {@link Observable} sequence of {@link Observable} sequences into a single + * {@link Observable} sequence which only produces values from the most recently published {@link Observable} + * sequence in the sequence. + */ +public final class OperationSwitch { + + /** + * This function transforms an {@link Observable} sequence of {@link Observable} sequences into a single + * {@link Observable} sequence which produces values from the most recently published {@link Observable}. + * + * @param sequences The {@link Observable} sequence consisting of {@link Observable} sequences. + * @return A {@link Func1} which does this transformation. + */ + public static Func1, Subscription> switchDo(final Observable> sequences) { + return new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + return new Switch(sequences).call(observer); + } + }; + } + + private static class Switch implements Func1, Subscription> { + + private final Observable> sequences; + + public Switch(Observable> sequences) { + this.sequences = sequences; + } + + @Override + public Subscription call(Observer observer) { + return sequences.subscribe(new SwitchObserver(observer)); + } + } + + private static class SwitchObserver implements Observer> { + + private final AtomicReference subscription = new AtomicReference(); + + private final Observer observer; + + public SwitchObserver(Observer observer) { + this.observer = observer; + } + + @Override + public void onCompleted() { + observer.onCompleted(); + } + + @Override + public void onError(Exception e) { + observer.onError(e); + } + + @Override + public void onNext(Observable args) { + synchronized (subscription) { + Subscription previousSubscription = subscription.get(); + if (previousSubscription != null) { + previousSubscription.unsubscribe(); + } + + subscription.set(args.subscribe(new Observer() { + @Override + public void onCompleted() { + // Do nothing. + } + + @Override + public void onError(Exception e) { + // Do nothing. + } + + @Override + public void onNext(T args) { + observer.onNext(args); + } + })); + } + } + } + + public static class UnitTest { + + private TestScheduler scheduler; + private Observer observer; + + @Before + @SuppressWarnings("unchecked") + public void before() { + scheduler = new TestScheduler(); + observer = mock(Observer.class); + } + + @Test + public void testSwitchWithComplete() { + Observable> source = Observable.create(new Func1>, Subscription>() { + @Override + public Subscription call(Observer> observer) { + publishNext(observer, 50, Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + publishNext(observer, 50, "one"); + publishNext(observer, 100, "two"); + return Subscriptions.empty(); + } + })); + + publishNext(observer, 200, Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + publishNext(observer, 0, "three"); + publishNext(observer, 100, "four"); + return Subscriptions.empty(); + } + })); + + publishCompleted(observer, 250); + + return Subscriptions.empty(); + } + }); + + Observable sampled = Observable.create(OperationSwitch.switchDo(source)); + sampled.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(90, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyString()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(125, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext("one"); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(175, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext("two"); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(225, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext("three"); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(350, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyString()); + verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + } + + @Test + public void testSwitchWithError() { + Observable> source = Observable.create(new Func1>, Subscription>() { + @Override + public Subscription call(Observer> observer) { + publishNext(observer, 50, Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + publishNext(observer, 50, "one"); + publishNext(observer, 100, "two"); + return Subscriptions.empty(); + } + })); + + publishNext(observer, 200, Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + publishNext(observer, 0, "three"); + publishNext(observer, 100, "four"); + return Subscriptions.empty(); + } + })); + + publishError(observer, 250, new TestException()); + + return Subscriptions.empty(); + } + }); + + Observable sampled = Observable.create(OperationSwitch.switchDo(source)); + sampled.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(90, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyString()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(125, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext("one"); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(175, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext("two"); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(225, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext("three"); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(350, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyString()); + verify(observer, never()).onCompleted(); + verify(observer, times(1)).onError(any(TestException.class)); + } + + @Test + public void testSwitchWithSubsequenceComplete() { + Observable> source = Observable.create(new Func1>, Subscription>() { + @Override + public Subscription call(Observer> observer) { + publishNext(observer, 50, Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + publishNext(observer, 50, "one"); + publishNext(observer, 100, "two"); + return Subscriptions.empty(); + } + })); + + publishNext(observer, 130, Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + publishCompleted(observer, 0); + return Subscriptions.empty(); + } + })); + + publishNext(observer, 150, Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + publishNext(observer, 50, "three"); + return Subscriptions.empty(); + } + })); + + return Subscriptions.empty(); + } + }); + + Observable sampled = Observable.create(OperationSwitch.switchDo(source)); + sampled.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(90, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyString()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(125, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext("one"); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(250, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext("two"); + inOrder.verify(observer, times(1)).onNext("three"); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + } + + @Test + public void testSwitchWithSubsequenceError() { + Observable> source = Observable.create(new Func1>, Subscription>() { + @Override + public Subscription call(Observer> observer) { + publishNext(observer, 50, Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + publishNext(observer, 50, "one"); + publishNext(observer, 100, "two"); + return Subscriptions.empty(); + } + })); + + publishNext(observer, 130, Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + publishError(observer, 0, new TestException()); + return Subscriptions.empty(); + } + })); + + publishNext(observer, 150, Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + publishNext(observer, 50, "three"); + return Subscriptions.empty(); + } + })); + + return Subscriptions.empty(); + } + }); + + Observable sampled = Observable.create(OperationSwitch.switchDo(source)); + sampled.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(90, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyString()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(125, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext("one"); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(250, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext("two"); + inOrder.verify(observer, times(1)).onNext("three"); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + } + + private void publishCompleted(final Observer observer, long delay) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onCompleted(); + } + }, delay, TimeUnit.MILLISECONDS); + } + + private void publishError(final Observer observer, long delay, final Exception error) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onError(error); + } + }, delay, TimeUnit.MILLISECONDS); + } + + private void publishNext(final Observer observer, long delay, final T value) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onNext(value); + } + }, delay, TimeUnit.MILLISECONDS); + } + + @SuppressWarnings("serial") + private class TestException extends Exception { } + } +} \ No newline at end of file From 7190d234ba7694a4e92cd11c8d7e8853447bb0f1 Mon Sep 17 00:00:00 2001 From: Michael de Jong Date: Mon, 6 May 2013 00:55:55 +0200 Subject: [PATCH 2/5] Reverted changes to junit and mockito imports in the Observable class. --- rxjava-core/src/main/java/rx/Observable.java | 13 +++---------- 1 file changed, 3 insertions(+), 10 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 7791601579..f81ad6fe37 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -15,16 +15,9 @@ */ package rx; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; import java.util.ArrayList; import java.util.Arrays; From 9869e7875cf052f6c54e4ffedf9fa70f3803e3da Mon Sep 17 00:00:00 2001 From: Michael de Jong Date: Tue, 7 May 2013 00:01:59 +0200 Subject: [PATCH 3/5] Removed synchronized block as per RxJava guidelines (6.7). --- .../java/rx/operators/OperationSwitch.java | 42 +++++++++---------- 1 file changed, 20 insertions(+), 22 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationSwitch.java b/rxjava-core/src/main/java/rx/operators/OperationSwitch.java index 7138fd00d7..44c88a0dcb 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSwitch.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSwitch.java @@ -93,29 +93,27 @@ public void onError(Exception e) { @Override public void onNext(Observable args) { - synchronized (subscription) { - Subscription previousSubscription = subscription.get(); - if (previousSubscription != null) { - previousSubscription.unsubscribe(); - } - - subscription.set(args.subscribe(new Observer() { - @Override - public void onCompleted() { - // Do nothing. - } - - @Override - public void onError(Exception e) { - // Do nothing. - } - - @Override - public void onNext(T args) { - observer.onNext(args); - } - })); + Subscription previousSubscription = subscription.get(); + if (previousSubscription != null) { + previousSubscription.unsubscribe(); } + + subscription.set(args.subscribe(new Observer() { + @Override + public void onCompleted() { + // Do nothing. + } + + @Override + public void onError(Exception e) { + // Do nothing. + } + + @Override + public void onNext(T args) { + observer.onNext(args); + } + })); } } From 58cc48d0c621c1b66d490c2135c53778ce32f0fb Mon Sep 17 00:00:00 2001 From: Michael de Jong Date: Wed, 15 May 2013 19:09:46 +0200 Subject: [PATCH 4/5] Switch operator should now propagate onError in subsequences. --- .../java/rx/operators/OperationSwitch.java | 53 ++++++++++++------- 1 file changed, 34 insertions(+), 19 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationSwitch.java b/rxjava-core/src/main/java/rx/operators/OperationSwitch.java index 44c88a0dcb..ec4336866b 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSwitch.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSwitch.java @@ -15,9 +15,6 @@ */ package rx.operators; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; - import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; @@ -30,9 +27,19 @@ import rx.Subscription; import rx.concurrency.TestScheduler; import rx.subscriptions.Subscriptions; +import rx.util.AtomicObservableSubscription; import rx.util.functions.Action0; import rx.util.functions.Func1; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; + /** * This operation transforms an {@link Observable} sequence of {@link Observable} sequences into a single @@ -67,54 +74,64 @@ public Switch(Observable> sequences) { @Override public Subscription call(Observer observer) { - return sequences.subscribe(new SwitchObserver(observer)); + AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + subscription.wrap(sequences.subscribe(new SwitchObserver(observer, subscription))); + return subscription; } } private static class SwitchObserver implements Observer> { - private final AtomicReference subscription = new AtomicReference(); - private final Observer observer; + private final AtomicObservableSubscription parent; + private final AtomicReference subsequence = new AtomicReference(); - public SwitchObserver(Observer observer) { + public SwitchObserver(Observer observer, AtomicObservableSubscription parent) { this.observer = observer; + this.parent = parent; } @Override public void onCompleted() { + unsubscribeFromSubSequence(); observer.onCompleted(); } @Override public void onError(Exception e) { + unsubscribeFromSubSequence(); observer.onError(e); } @Override public void onNext(Observable args) { - Subscription previousSubscription = subscription.get(); - if (previousSubscription != null) { - previousSubscription.unsubscribe(); - } + unsubscribeFromSubSequence(); - subscription.set(args.subscribe(new Observer() { + subsequence.set(args.subscribe(new Observer() { @Override public void onCompleted() { - // Do nothing. + // Do nothing. } @Override public void onError(Exception e) { - // Do nothing. + parent.unsubscribe(); + observer.onError(e); } - @Override + @Override public void onNext(T args) { observer.onNext(args); } })); } + + private void unsubscribeFromSubSequence() { + Subscription previousSubscription = subsequence.get(); + if (previousSubscription != null) { + previousSubscription.unsubscribe(); + } + } } public static class UnitTest { @@ -299,7 +316,6 @@ public Subscription call(Observer observer) { verify(observer, never()).onError(any(Exception.class)); scheduler.advanceTimeTo(250, TimeUnit.MILLISECONDS); - inOrder.verify(observer, times(1)).onNext("two"); inOrder.verify(observer, times(1)).onNext("three"); verify(observer, never()).onCompleted(); verify(observer, never()).onError(any(Exception.class)); @@ -355,10 +371,9 @@ public Subscription call(Observer observer) { verify(observer, never()).onError(any(Exception.class)); scheduler.advanceTimeTo(250, TimeUnit.MILLISECONDS); - inOrder.verify(observer, times(1)).onNext("two"); - inOrder.verify(observer, times(1)).onNext("three"); + inOrder.verify(observer, never()).onNext("three"); verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); + verify(observer, times(1)).onError(any(TestException.class)); } private void publishCompleted(final Observer observer, long delay) { From 587f5e61ac811a7e5d75a3113be7cb04079bbcd5 Mon Sep 17 00:00:00 2001 From: Michael de Jong Date: Wed, 15 May 2013 19:24:53 +0200 Subject: [PATCH 5/5] Replaced tabs with spaces. --- .../java/rx/operators/OperationSwitch.java | 31 +++++++++---------- 1 file changed, 15 insertions(+), 16 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationSwitch.java b/rxjava-core/src/main/java/rx/operators/OperationSwitch.java index ec4336866b..6995e934d2 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSwitch.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSwitch.java @@ -40,7 +40,6 @@ import static org.mockito.Matchers.any; import static org.mockito.Matchers.anyString; - /** * This operation transforms an {@link Observable} sequence of {@link Observable} sequences into a single * {@link Observable} sequence which only produces values from the most recently published {@link Observable} @@ -74,32 +73,32 @@ public Switch(Observable> sequences) { @Override public Subscription call(Observer observer) { - AtomicObservableSubscription subscription = new AtomicObservableSubscription(); - subscription.wrap(sequences.subscribe(new SwitchObserver(observer, subscription))); - return subscription; + AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + subscription.wrap(sequences.subscribe(new SwitchObserver(observer, subscription))); + return subscription; } } private static class SwitchObserver implements Observer> { private final Observer observer; - private final AtomicObservableSubscription parent; - private final AtomicReference subsequence = new AtomicReference(); + private final AtomicObservableSubscription parent; + private final AtomicReference subsequence = new AtomicReference(); public SwitchObserver(Observer observer, AtomicObservableSubscription parent) { this.observer = observer; - this.parent = parent; + this.parent = parent; } @Override public void onCompleted() { - unsubscribeFromSubSequence(); + unsubscribeFromSubSequence(); observer.onCompleted(); } @Override public void onError(Exception e) { - unsubscribeFromSubSequence(); + unsubscribeFromSubSequence(); observer.onError(e); } @@ -110,28 +109,28 @@ public void onNext(Observable args) { subsequence.set(args.subscribe(new Observer() { @Override public void onCompleted() { - // Do nothing. + // Do nothing. } @Override public void onError(Exception e) { - parent.unsubscribe(); - observer.onError(e); + parent.unsubscribe(); + observer.onError(e); } - @Override + @Override public void onNext(T args) { observer.onNext(args); } })); } - private void unsubscribeFromSubSequence() { - Subscription previousSubscription = subsequence.get(); + private void unsubscribeFromSubSequence() { + Subscription previousSubscription = subsequence.get(); if (previousSubscription != null) { previousSubscription.unsubscribe(); } - } + } } public static class UnitTest {