From 24619f055076a5b5d0ee8e582fd6325b146f352a Mon Sep 17 00:00:00 2001 From: zsxwing Date: Fri, 1 Nov 2013 20:33:43 +0800 Subject: [PATCH 1/2] Implemented the 'Amb' operator --- rxjava-core/src/main/java/rx/Observable.java | 29 +++ .../main/java/rx/operators/OperationAmb.java | 245 ++++++++++++++++++ 2 files changed, 274 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationAmb.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 8cf4c951ed..76171ac761 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -30,6 +30,7 @@ import rx.observables.ConnectableObservable; import rx.observables.GroupedObservable; import rx.operators.OperationAll; +import rx.operators.OperationAmb; import rx.operators.OperationAny; import rx.operators.OperationAverage; import rx.operators.OperationBuffer; @@ -4576,6 +4577,34 @@ public Observable> timeInterval(Scheduler scheduler) { return create(OperationTimeInterval.timeInterval(this, scheduler)); } + /** + * Propagates the observable sequence that reacts first. + * + * @param sources + * observable sources competing to react first. + * + * @return + * an observable sequence that surfaces any of the given sequences, whichever reacted first. + * @see MSDN: Observable.Amb + */ + public static Observable amb(Observable... sources) { + return create(OperationAmb.amb(sources)); + } + + /** + * Propagates the observable sequence that reacts first. + * + * @param sources + * observable sources competing to react first. + * + * @return + * an observable sequence that surfaces any of the given sequences, whichever reacted first. + * @see MSDN: Observable.Amb + */ + public static Observable amb(Iterable> sources) { + return create(OperationAmb.amb(sources)); + } + /** * Whether a given {@link Function} is an internal implementation inside rx.* packages or not. *

diff --git a/rxjava-core/src/main/java/rx/operators/OperationAmb.java b/rxjava-core/src/main/java/rx/operators/OperationAmb.java new file mode 100644 index 0000000000..505814acf3 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationAmb.java @@ -0,0 +1,245 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; + +import java.io.IOException; +import java.util.Arrays; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; +import rx.concurrency.TestScheduler; +import rx.subscriptions.CompositeSubscription; +import rx.util.functions.Action0; + +/** + * Propagates the observable sequence that reacts first. + */ +public class OperationAmb { + + public static OnSubscribeFunc amb(Observable... sources) { + return amb(Arrays.asList(sources)); + } + + public static OnSubscribeFunc amb( + final Iterable> sources) { + return new OnSubscribeFunc() { + + @Override + public Subscription onSubscribe(final Observer observer) { + AtomicInteger choice = new AtomicInteger(AmbObserver.NONE); + int index = 0; + CompositeSubscription parentSubscription = new CompositeSubscription(); + for (Observable source : sources) { + SafeObservableSubscription subscription = new SafeObservableSubscription(); + AmbObserver ambObserver = new AmbObserver( + subscription, observer, index, choice); + parentSubscription.add(subscription.wrap(source + .subscribe(ambObserver))); + index++; + } + return parentSubscription; + } + }; + } + + private static class AmbObserver implements Observer { + + private static final int NONE = -1; + + private Subscription subscription; + private Observer observer; + private int index; + private AtomicInteger choice; + + private AmbObserver(Subscription subscription, + Observer observer, int index, AtomicInteger choice) { + this.subscription = subscription; + this.observer = observer; + this.choice = choice; + this.index = index; + } + + @Override + public void onNext(T args) { + if (!isSelected()) { + subscription.unsubscribe(); + return; + } + observer.onNext(args); + } + + @Override + public void onCompleted() { + if (!isSelected()) { + subscription.unsubscribe(); + return; + } + observer.onCompleted(); + } + + @Override + public void onError(Throwable e) { + if (!isSelected()) { + subscription.unsubscribe(); + return; + } + observer.onError(e); + } + + private boolean isSelected() { + if (choice.get() == NONE) { + return choice.compareAndSet(NONE, index); + } + return choice.get() == index; + } + } + + public static class UnitTest { + private TestScheduler scheduler; + + @Before + public void setUp() { + scheduler = new TestScheduler(); + } + + private Observable createObservable(final String[] values, + final long interval, final Throwable e) { + return Observable.create(new OnSubscribeFunc() { + + @Override + public Subscription onSubscribe( + final Observer observer) { + CompositeSubscription parentSubscription = new CompositeSubscription(); + long delay = interval; + for (final String value : values) { + parentSubscription.add(scheduler.schedule( + new Action0() { + @Override + public void call() { + observer.onNext(value); + } + }, delay, TimeUnit.MILLISECONDS)); + delay += interval; + } + parentSubscription.add(scheduler.schedule(new Action0() { + @Override + public void call() { + if (e == null) { + observer.onCompleted(); + } else { + observer.onError(e); + } + } + }, delay, TimeUnit.MILLISECONDS)); + return parentSubscription; + } + }); + } + + @Test + public void testAmb() { + Observable observable1 = createObservable(new String[] { + "1", "11", "111", "1111" }, 2000, null); + Observable observable2 = createObservable(new String[] { + "2", "22", "222", "2222" }, 1000, null); + Observable observable3 = createObservable(new String[] { + "3", "33", "333", "3333" }, 3000, null); + + @SuppressWarnings("unchecked") + Observable o = Observable.create(amb(observable1, + observable2, observable3)); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + o.subscribe(observer); + + scheduler.advanceTimeBy(100000, TimeUnit.MILLISECONDS); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext("2"); + inOrder.verify(observer, times(1)).onNext("22"); + inOrder.verify(observer, times(1)).onNext("222"); + inOrder.verify(observer, times(1)).onNext("2222"); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testAmb2() { + IOException needHappenedException = new IOException( + "fake exception"); + Observable observable1 = createObservable(new String[] {}, + 2000, new IOException("fake exception")); + Observable observable2 = createObservable(new String[] { + "2", "22", "222", "2222" }, 1000, needHappenedException); + Observable observable3 = createObservable(new String[] {}, + 3000, new IOException("fake exception")); + + @SuppressWarnings("unchecked") + Observable o = Observable.create(amb(observable1, + observable2, observable3)); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + o.subscribe(observer); + + scheduler.advanceTimeBy(100000, TimeUnit.MILLISECONDS); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext("2"); + inOrder.verify(observer, times(1)).onNext("22"); + inOrder.verify(observer, times(1)).onNext("222"); + inOrder.verify(observer, times(1)).onNext("2222"); + inOrder.verify(observer, times(1)).onError(needHappenedException); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testAmb3() { + Observable observable1 = createObservable(new String[] { + "1" }, 2000, null); + Observable observable2 = createObservable(new String[] {}, + 1000, null); + Observable observable3 = createObservable(new String[] { + "3" }, 3000, null); + + @SuppressWarnings("unchecked") + Observable o = Observable.create(amb(observable1, + observable2, observable3)); + + @SuppressWarnings("unchecked") + Observer observer = (Observer) mock(Observer.class); + o.subscribe(observer); + + scheduler.advanceTimeBy(100000, TimeUnit.MILLISECONDS); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + } +} From 50b04ec8301b11f321e7d9547ba076f6b365bcf4 Mon Sep 17 00:00:00 2001 From: zsxwing Date: Sun, 3 Nov 2013 16:39:08 +0800 Subject: [PATCH 2/2] Removed the 'vararg' overload and added 2-9 args overloads --- rxjava-core/src/main/java/rx/Observable.java | 170 +++++++++++++++++- .../main/java/rx/operators/OperationAmb.java | 90 +++++++++- 2 files changed, 250 insertions(+), 10 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 76171ac761..a6f5f0f7ce 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -4580,15 +4580,177 @@ public Observable> timeInterval(Scheduler scheduler) { /** * Propagates the observable sequence that reacts first. * - * @param sources - * observable sources competing to react first. + * @param o1 + * an observable competing to react first. + * @param o2 + * an observable competing to react first. + * @return + * an observable sequence that surfaces any of the given sequences, whichever reacted first. + * @see MSDN: Observable.Amb + */ + public static Observable amb(Observable o1, Observable o2) { + return create(OperationAmb.amb(o1, o2)); + } + + /** + * Propagates the observable sequence that reacts first. * + * @param o1 + * an observable competing to react first. + * @param o2 + * an observable competing to react first. + * @param o3 + * an observable competing to react first. * @return * an observable sequence that surfaces any of the given sequences, whichever reacted first. * @see MSDN: Observable.Amb */ - public static Observable amb(Observable... sources) { - return create(OperationAmb.amb(sources)); + public static Observable amb(Observable o1, Observable o2, Observable o3) { + return create(OperationAmb.amb(o1, o2, o3)); + } + + /** + * Propagates the observable sequence that reacts first. + * + * @param o1 + * an observable competing to react first. + * @param o2 + * an observable competing to react first. + * @param o3 + * an observable competing to react first. + * @param o4 + * an observable competing to react first. + * @return + * an observable sequence that surfaces any of the given sequences, whichever reacted first. + * @see MSDN: Observable.Amb + */ + public static Observable amb(Observable o1, Observable o2, Observable o3, Observable o4) { + return create(OperationAmb.amb(o1, o2, o3, o4)); + } + + /** + * Propagates the observable sequence that reacts first. + * + * @param o1 + * an observable competing to react first. + * @param o2 + * an observable competing to react first. + * @param o3 + * an observable competing to react first. + * @param o4 + * an observable competing to react first. + * @param o5 + * an observable competing to react first. + * @return + * an observable sequence that surfaces any of the given sequences, whichever reacted first. + * @see MSDN: Observable.Amb + */ + public static Observable amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5) { + return create(OperationAmb.amb(o1, o2, o3, o4, o5)); + } + + /** + * Propagates the observable sequence that reacts first. + * + * @param o1 + * an observable competing to react first. + * @param o2 + * an observable competing to react first. + * @param o3 + * an observable competing to react first. + * @param o4 + * an observable competing to react first. + * @param o5 + * an observable competing to react first. + * @param o6 + * an observable competing to react first. + * @return + * an observable sequence that surfaces any of the given sequences, whichever reacted first. + * @see MSDN: Observable.Amb + */ + public static Observable amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6) { + return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6)); + } + + /** + * Propagates the observable sequence that reacts first. + * + * @param o1 + * an observable competing to react first. + * @param o2 + * an observable competing to react first. + * @param o3 + * an observable competing to react first. + * @param o4 + * an observable competing to react first. + * @param o5 + * an observable competing to react first. + * @param o6 + * an observable competing to react first. + * @param o7 + * an observable competing to react first. + * @return + * an observable sequence that surfaces any of the given sequences, whichever reacted first. + * @see MSDN: Observable.Amb + */ + public static Observable amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7) { + return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6, o7)); + } + + /** + * Propagates the observable sequence that reacts first. + * + * @param o1 + * an observable competing to react first. + * @param o2 + * an observable competing to react first. + * @param o3 + * an observable competing to react first. + * @param o4 + * an observable competing to react first. + * @param o5 + * an observable competing to react first. + * @param o6 + * an observable competing to react first. + * @param o7 + * an observable competing to react first. + * @param o8 + * an observable competing to react first. + * @return + * an observable sequence that surfaces any of the given sequences, whichever reacted first. + * @see MSDN: Observable.Amb + */ + public static Observable amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8) { + return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6, o7, o8)); + } + + /** + * Propagates the observable sequence that reacts first. + * + * @param o1 + * an observable competing to react first. + * @param o2 + * an observable competing to react first. + * @param o3 + * an observable competing to react first. + * @param o4 + * an observable competing to react first. + * @param o5 + * an observable competing to react first. + * @param o6 + * an observable competing to react first. + * @param o7 + * an observable competing to react first. + * @param o8 + * an observable competing to react first. + * @param o9 + * an observable competing to react first. + * @return + * an observable sequence that surfaces any of the given sequences, whichever reacted first. + * @see MSDN: Observable.Amb + */ + public static Observable amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8, Observable o9) { + return create(OperationAmb.amb(o1, o2, o3, o4, o5, o6, o7, o8, o9)); } /** diff --git a/rxjava-core/src/main/java/rx/operators/OperationAmb.java b/rxjava-core/src/main/java/rx/operators/OperationAmb.java index 505814acf3..3074b4d3e9 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationAmb.java +++ b/rxjava-core/src/main/java/rx/operators/OperationAmb.java @@ -20,7 +20,8 @@ import static org.mockito.Mockito.times; import java.io.IOException; -import java.util.Arrays; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; @@ -41,8 +42,88 @@ */ public class OperationAmb { - public static OnSubscribeFunc amb(Observable... sources) { - return amb(Arrays.asList(sources)); + public static OnSubscribeFunc amb(Observable o1, Observable o2) { + List> sources = new ArrayList>(); + sources.add(o1); + sources.add(o2); + return amb(sources); + } + + public static OnSubscribeFunc amb(Observable o1, Observable o2, Observable o3) { + List> sources = new ArrayList>(); + sources.add(o1); + sources.add(o2); + sources.add(o3); + return amb(sources); + } + + public static OnSubscribeFunc amb(Observable o1, Observable o2, Observable o3, Observable o4) { + List> sources = new ArrayList>(); + sources.add(o1); + sources.add(o2); + sources.add(o3); + sources.add(o4); + return amb(sources); + } + + public static OnSubscribeFunc amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5) { + List> sources = new ArrayList>(); + sources.add(o1); + sources.add(o2); + sources.add(o3); + sources.add(o4); + sources.add(o5); + return amb(sources); + } + + public static OnSubscribeFunc amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6) { + List> sources = new ArrayList>(); + sources.add(o1); + sources.add(o2); + sources.add(o3); + sources.add(o4); + sources.add(o5); + sources.add(o6); + return amb(sources); + } + + public static OnSubscribeFunc amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7) { + List> sources = new ArrayList>(); + sources.add(o1); + sources.add(o2); + sources.add(o3); + sources.add(o4); + sources.add(o5); + sources.add(o6); + sources.add(o7); + return amb(sources); + } + + public static OnSubscribeFunc amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8) { + List> sources = new ArrayList>(); + sources.add(o1); + sources.add(o2); + sources.add(o3); + sources.add(o4); + sources.add(o5); + sources.add(o6); + sources.add(o7); + sources.add(o8); + return amb(sources); + } + + public static OnSubscribeFunc amb(Observable o1, Observable o2, Observable o3, Observable o4, Observable o5, Observable o6, Observable o7, Observable o8, Observable o9) { + List> sources = new ArrayList>(); + sources.add(o1); + sources.add(o2); + sources.add(o3); + sources.add(o4); + sources.add(o5); + sources.add(o6); + sources.add(o7); + sources.add(o8); + sources.add(o9); + return amb(sources); } public static OnSubscribeFunc amb( @@ -170,7 +251,6 @@ public void testAmb() { Observable observable3 = createObservable(new String[] { "3", "33", "333", "3333" }, 3000, null); - @SuppressWarnings("unchecked") Observable o = Observable.create(amb(observable1, observable2, observable3)); @@ -200,7 +280,6 @@ public void testAmb2() { Observable observable3 = createObservable(new String[] {}, 3000, new IOException("fake exception")); - @SuppressWarnings("unchecked") Observable o = Observable.create(amb(observable1, observable2, observable3)); @@ -228,7 +307,6 @@ public void testAmb3() { Observable observable3 = createObservable(new String[] { "3" }, 3000, null); - @SuppressWarnings("unchecked") Observable o = Observable.create(amb(observable1, observable2, observable3));