Skip to content

Commit 24619f0

Browse files
committed
Implemented the 'Amb' operator
1 parent bcf6eb2 commit 24619f0

File tree

2 files changed

+274
-0
lines changed

2 files changed

+274
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import rx.observables.ConnectableObservable;
3131
import rx.observables.GroupedObservable;
3232
import rx.operators.OperationAll;
33+
import rx.operators.OperationAmb;
3334
import rx.operators.OperationAny;
3435
import rx.operators.OperationAverage;
3536
import rx.operators.OperationBuffer;
@@ -4576,6 +4577,34 @@ public Observable<TimeInterval<T>> timeInterval(Scheduler scheduler) {
45764577
return create(OperationTimeInterval.timeInterval(this, scheduler));
45774578
}
45784579

4580+
/**
4581+
* Propagates the observable sequence that reacts first.
4582+
*
4583+
* @param sources
4584+
* observable sources competing to react first.
4585+
*
4586+
* @return
4587+
* an observable sequence that surfaces any of the given sequences, whichever reacted first.
4588+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229733(v=vs.103).aspx">MSDN: Observable.Amb</a>
4589+
*/
4590+
public static <T> Observable<T> amb(Observable<? extends T>... sources) {
4591+
return create(OperationAmb.amb(sources));
4592+
}
4593+
4594+
/**
4595+
* Propagates the observable sequence that reacts first.
4596+
*
4597+
* @param sources
4598+
* observable sources competing to react first.
4599+
*
4600+
* @return
4601+
* an observable sequence that surfaces any of the given sequences, whichever reacted first.
4602+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229115(v=vs.103).aspx">MSDN: Observable.Amb</a>
4603+
*/
4604+
public static <T> Observable<T> amb(Iterable<? extends Observable<? extends T>> sources) {
4605+
return create(OperationAmb.amb(sources));
4606+
}
4607+
45794608
/**
45804609
* Whether a given {@link Function} is an internal implementation inside rx.* packages or not.
45814610
* <p>
Lines changed: 245 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,245 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.mockito.Mockito.inOrder;
19+
import static org.mockito.Mockito.mock;
20+
import static org.mockito.Mockito.times;
21+
22+
import java.io.IOException;
23+
import java.util.Arrays;
24+
import java.util.concurrent.TimeUnit;
25+
import java.util.concurrent.atomic.AtomicInteger;
26+
27+
import org.junit.Before;
28+
import org.junit.Test;
29+
import org.mockito.InOrder;
30+
31+
import rx.Observable;
32+
import rx.Observable.OnSubscribeFunc;
33+
import rx.Observer;
34+
import rx.Subscription;
35+
import rx.concurrency.TestScheduler;
36+
import rx.subscriptions.CompositeSubscription;
37+
import rx.util.functions.Action0;
38+
39+
/**
40+
* Propagates the observable sequence that reacts first.
41+
*/
42+
public class OperationAmb {
43+
44+
public static <T> OnSubscribeFunc<T> amb(Observable<? extends T>... sources) {
45+
return amb(Arrays.asList(sources));
46+
}
47+
48+
public static <T> OnSubscribeFunc<T> amb(
49+
final Iterable<? extends Observable<? extends T>> sources) {
50+
return new OnSubscribeFunc<T>() {
51+
52+
@Override
53+
public Subscription onSubscribe(final Observer<? super T> observer) {
54+
AtomicInteger choice = new AtomicInteger(AmbObserver.NONE);
55+
int index = 0;
56+
CompositeSubscription parentSubscription = new CompositeSubscription();
57+
for (Observable<? extends T> source : sources) {
58+
SafeObservableSubscription subscription = new SafeObservableSubscription();
59+
AmbObserver<T> ambObserver = new AmbObserver<T>(
60+
subscription, observer, index, choice);
61+
parentSubscription.add(subscription.wrap(source
62+
.subscribe(ambObserver)));
63+
index++;
64+
}
65+
return parentSubscription;
66+
}
67+
};
68+
}
69+
70+
private static class AmbObserver<T> implements Observer<T> {
71+
72+
private static final int NONE = -1;
73+
74+
private Subscription subscription;
75+
private Observer<? super T> observer;
76+
private int index;
77+
private AtomicInteger choice;
78+
79+
private AmbObserver(Subscription subscription,
80+
Observer<? super T> observer, int index, AtomicInteger choice) {
81+
this.subscription = subscription;
82+
this.observer = observer;
83+
this.choice = choice;
84+
this.index = index;
85+
}
86+
87+
@Override
88+
public void onNext(T args) {
89+
if (!isSelected()) {
90+
subscription.unsubscribe();
91+
return;
92+
}
93+
observer.onNext(args);
94+
}
95+
96+
@Override
97+
public void onCompleted() {
98+
if (!isSelected()) {
99+
subscription.unsubscribe();
100+
return;
101+
}
102+
observer.onCompleted();
103+
}
104+
105+
@Override
106+
public void onError(Throwable e) {
107+
if (!isSelected()) {
108+
subscription.unsubscribe();
109+
return;
110+
}
111+
observer.onError(e);
112+
}
113+
114+
private boolean isSelected() {
115+
if (choice.get() == NONE) {
116+
return choice.compareAndSet(NONE, index);
117+
}
118+
return choice.get() == index;
119+
}
120+
}
121+
122+
public static class UnitTest {
123+
private TestScheduler scheduler;
124+
125+
@Before
126+
public void setUp() {
127+
scheduler = new TestScheduler();
128+
}
129+
130+
private Observable<String> createObservable(final String[] values,
131+
final long interval, final Throwable e) {
132+
return Observable.create(new OnSubscribeFunc<String>() {
133+
134+
@Override
135+
public Subscription onSubscribe(
136+
final Observer<? super String> observer) {
137+
CompositeSubscription parentSubscription = new CompositeSubscription();
138+
long delay = interval;
139+
for (final String value : values) {
140+
parentSubscription.add(scheduler.schedule(
141+
new Action0() {
142+
@Override
143+
public void call() {
144+
observer.onNext(value);
145+
}
146+
}, delay, TimeUnit.MILLISECONDS));
147+
delay += interval;
148+
}
149+
parentSubscription.add(scheduler.schedule(new Action0() {
150+
@Override
151+
public void call() {
152+
if (e == null) {
153+
observer.onCompleted();
154+
} else {
155+
observer.onError(e);
156+
}
157+
}
158+
}, delay, TimeUnit.MILLISECONDS));
159+
return parentSubscription;
160+
}
161+
});
162+
}
163+
164+
@Test
165+
public void testAmb() {
166+
Observable<String> observable1 = createObservable(new String[] {
167+
"1", "11", "111", "1111" }, 2000, null);
168+
Observable<String> observable2 = createObservable(new String[] {
169+
"2", "22", "222", "2222" }, 1000, null);
170+
Observable<String> observable3 = createObservable(new String[] {
171+
"3", "33", "333", "3333" }, 3000, null);
172+
173+
@SuppressWarnings("unchecked")
174+
Observable<String> o = Observable.create(amb(observable1,
175+
observable2, observable3));
176+
177+
@SuppressWarnings("unchecked")
178+
Observer<String> observer = (Observer<String>) mock(Observer.class);
179+
o.subscribe(observer);
180+
181+
scheduler.advanceTimeBy(100000, TimeUnit.MILLISECONDS);
182+
183+
InOrder inOrder = inOrder(observer);
184+
inOrder.verify(observer, times(1)).onNext("2");
185+
inOrder.verify(observer, times(1)).onNext("22");
186+
inOrder.verify(observer, times(1)).onNext("222");
187+
inOrder.verify(observer, times(1)).onNext("2222");
188+
inOrder.verify(observer, times(1)).onCompleted();
189+
inOrder.verifyNoMoreInteractions();
190+
}
191+
192+
@Test
193+
public void testAmb2() {
194+
IOException needHappenedException = new IOException(
195+
"fake exception");
196+
Observable<String> observable1 = createObservable(new String[] {},
197+
2000, new IOException("fake exception"));
198+
Observable<String> observable2 = createObservable(new String[] {
199+
"2", "22", "222", "2222" }, 1000, needHappenedException);
200+
Observable<String> observable3 = createObservable(new String[] {},
201+
3000, new IOException("fake exception"));
202+
203+
@SuppressWarnings("unchecked")
204+
Observable<String> o = Observable.create(amb(observable1,
205+
observable2, observable3));
206+
207+
@SuppressWarnings("unchecked")
208+
Observer<String> observer = (Observer<String>) mock(Observer.class);
209+
o.subscribe(observer);
210+
211+
scheduler.advanceTimeBy(100000, TimeUnit.MILLISECONDS);
212+
213+
InOrder inOrder = inOrder(observer);
214+
inOrder.verify(observer, times(1)).onNext("2");
215+
inOrder.verify(observer, times(1)).onNext("22");
216+
inOrder.verify(observer, times(1)).onNext("222");
217+
inOrder.verify(observer, times(1)).onNext("2222");
218+
inOrder.verify(observer, times(1)).onError(needHappenedException);
219+
inOrder.verifyNoMoreInteractions();
220+
}
221+
222+
@Test
223+
public void testAmb3() {
224+
Observable<String> observable1 = createObservable(new String[] {
225+
"1" }, 2000, null);
226+
Observable<String> observable2 = createObservable(new String[] {},
227+
1000, null);
228+
Observable<String> observable3 = createObservable(new String[] {
229+
"3" }, 3000, null);
230+
231+
@SuppressWarnings("unchecked")
232+
Observable<String> o = Observable.create(amb(observable1,
233+
observable2, observable3));
234+
235+
@SuppressWarnings("unchecked")
236+
Observer<String> observer = (Observer<String>) mock(Observer.class);
237+
o.subscribe(observer);
238+
239+
scheduler.advanceTimeBy(100000, TimeUnit.MILLISECONDS);
240+
InOrder inOrder = inOrder(observer);
241+
inOrder.verify(observer, times(1)).onCompleted();
242+
inOrder.verifyNoMoreInteractions();
243+
}
244+
}
245+
}

0 commit comments

Comments
 (0)