Skip to content

Commit 23a8865

Browse files
Separating unit tests out due to ReactiveX#466
1 parent ff21cf1 commit 23a8865

File tree

2 files changed

+157
-132
lines changed

2 files changed

+157
-132
lines changed

rxjava-core/src/main/java/rx/operators/OperationAmb.java

Lines changed: 0 additions & 132 deletions
Original file line numberDiff line numberDiff line change
@@ -15,27 +15,15 @@
1515
*/
1616
package rx.operators;
1717

18-
import static org.mockito.Mockito.inOrder;
19-
import static org.mockito.Mockito.mock;
20-
import static org.mockito.Mockito.times;
21-
22-
import java.io.IOException;
2318
import java.util.ArrayList;
2419
import java.util.List;
25-
import java.util.concurrent.TimeUnit;
2620
import java.util.concurrent.atomic.AtomicInteger;
2721

28-
import org.junit.Before;
29-
import org.junit.Test;
30-
import org.mockito.InOrder;
31-
3222
import rx.Observable;
3323
import rx.Observable.OnSubscribeFunc;
3424
import rx.Observer;
3525
import rx.Subscription;
36-
import rx.concurrency.TestScheduler;
3726
import rx.subscriptions.CompositeSubscription;
38-
import rx.util.functions.Action0;
3927

4028
/**
4129
* Propagates the observable sequence that reacts first.
@@ -200,124 +188,4 @@ private boolean isSelected() {
200188
}
201189
}
202190

203-
public static class UnitTest {
204-
private TestScheduler scheduler;
205-
206-
@Before
207-
public void setUp() {
208-
scheduler = new TestScheduler();
209-
}
210-
211-
private Observable<String> createObservable(final String[] values,
212-
final long interval, final Throwable e) {
213-
return Observable.create(new OnSubscribeFunc<String>() {
214-
215-
@Override
216-
public Subscription onSubscribe(
217-
final Observer<? super String> observer) {
218-
CompositeSubscription parentSubscription = new CompositeSubscription();
219-
long delay = interval;
220-
for (final String value : values) {
221-
parentSubscription.add(scheduler.schedule(
222-
new Action0() {
223-
@Override
224-
public void call() {
225-
observer.onNext(value);
226-
}
227-
}, delay, TimeUnit.MILLISECONDS));
228-
delay += interval;
229-
}
230-
parentSubscription.add(scheduler.schedule(new Action0() {
231-
@Override
232-
public void call() {
233-
if (e == null) {
234-
observer.onCompleted();
235-
} else {
236-
observer.onError(e);
237-
}
238-
}
239-
}, delay, TimeUnit.MILLISECONDS));
240-
return parentSubscription;
241-
}
242-
});
243-
}
244-
245-
@Test
246-
public void testAmb() {
247-
Observable<String> observable1 = createObservable(new String[] {
248-
"1", "11", "111", "1111" }, 2000, null);
249-
Observable<String> observable2 = createObservable(new String[] {
250-
"2", "22", "222", "2222" }, 1000, null);
251-
Observable<String> observable3 = createObservable(new String[] {
252-
"3", "33", "333", "3333" }, 3000, null);
253-
254-
Observable<String> o = Observable.create(amb(observable1,
255-
observable2, observable3));
256-
257-
@SuppressWarnings("unchecked")
258-
Observer<String> observer = (Observer<String>) mock(Observer.class);
259-
o.subscribe(observer);
260-
261-
scheduler.advanceTimeBy(100000, TimeUnit.MILLISECONDS);
262-
263-
InOrder inOrder = inOrder(observer);
264-
inOrder.verify(observer, times(1)).onNext("2");
265-
inOrder.verify(observer, times(1)).onNext("22");
266-
inOrder.verify(observer, times(1)).onNext("222");
267-
inOrder.verify(observer, times(1)).onNext("2222");
268-
inOrder.verify(observer, times(1)).onCompleted();
269-
inOrder.verifyNoMoreInteractions();
270-
}
271-
272-
@Test
273-
public void testAmb2() {
274-
IOException needHappenedException = new IOException(
275-
"fake exception");
276-
Observable<String> observable1 = createObservable(new String[] {},
277-
2000, new IOException("fake exception"));
278-
Observable<String> observable2 = createObservable(new String[] {
279-
"2", "22", "222", "2222" }, 1000, needHappenedException);
280-
Observable<String> observable3 = createObservable(new String[] {},
281-
3000, new IOException("fake exception"));
282-
283-
Observable<String> o = Observable.create(amb(observable1,
284-
observable2, observable3));
285-
286-
@SuppressWarnings("unchecked")
287-
Observer<String> observer = (Observer<String>) mock(Observer.class);
288-
o.subscribe(observer);
289-
290-
scheduler.advanceTimeBy(100000, TimeUnit.MILLISECONDS);
291-
292-
InOrder inOrder = inOrder(observer);
293-
inOrder.verify(observer, times(1)).onNext("2");
294-
inOrder.verify(observer, times(1)).onNext("22");
295-
inOrder.verify(observer, times(1)).onNext("222");
296-
inOrder.verify(observer, times(1)).onNext("2222");
297-
inOrder.verify(observer, times(1)).onError(needHappenedException);
298-
inOrder.verifyNoMoreInteractions();
299-
}
300-
301-
@Test
302-
public void testAmb3() {
303-
Observable<String> observable1 = createObservable(new String[] {
304-
"1" }, 2000, null);
305-
Observable<String> observable2 = createObservable(new String[] {},
306-
1000, null);
307-
Observable<String> observable3 = createObservable(new String[] {
308-
"3" }, 3000, null);
309-
310-
Observable<String> o = Observable.create(amb(observable1,
311-
observable2, observable3));
312-
313-
@SuppressWarnings("unchecked")
314-
Observer<String> observer = (Observer<String>) mock(Observer.class);
315-
o.subscribe(observer);
316-
317-
scheduler.advanceTimeBy(100000, TimeUnit.MILLISECONDS);
318-
InOrder inOrder = inOrder(observer);
319-
inOrder.verify(observer, times(1)).onCompleted();
320-
inOrder.verifyNoMoreInteractions();
321-
}
322-
}
323191
}
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.mockito.Mockito.*;
19+
import static rx.operators.OperationAmb.*;
20+
21+
import java.io.IOException;
22+
import java.util.concurrent.TimeUnit;
23+
24+
import org.junit.Before;
25+
import org.junit.Test;
26+
import org.mockito.InOrder;
27+
28+
import rx.Observable;
29+
import rx.Observable.OnSubscribeFunc;
30+
import rx.Observer;
31+
import rx.Subscription;
32+
import rx.concurrency.TestScheduler;
33+
import rx.subscriptions.CompositeSubscription;
34+
import rx.util.functions.Action0;
35+
36+
public class OperationAmbTest {
37+
38+
private TestScheduler scheduler;
39+
40+
@Before
41+
public void setUp() {
42+
scheduler = new TestScheduler();
43+
}
44+
45+
private Observable<String> createObservable(final String[] values,
46+
final long interval, final Throwable e) {
47+
return Observable.create(new OnSubscribeFunc<String>() {
48+
49+
@Override
50+
public Subscription onSubscribe(
51+
final Observer<? super String> observer) {
52+
CompositeSubscription parentSubscription = new CompositeSubscription();
53+
long delay = interval;
54+
for (final String value : values) {
55+
parentSubscription.add(scheduler.schedule(
56+
new Action0() {
57+
@Override
58+
public void call() {
59+
observer.onNext(value);
60+
}
61+
}, delay, TimeUnit.MILLISECONDS));
62+
delay += interval;
63+
}
64+
parentSubscription.add(scheduler.schedule(new Action0() {
65+
@Override
66+
public void call() {
67+
if (e == null) {
68+
observer.onCompleted();
69+
} else {
70+
observer.onError(e);
71+
}
72+
}
73+
}, delay, TimeUnit.MILLISECONDS));
74+
return parentSubscription;
75+
}
76+
});
77+
}
78+
79+
@Test
80+
public void testAmb() {
81+
Observable<String> observable1 = createObservable(new String[] {
82+
"1", "11", "111", "1111" }, 2000, null);
83+
Observable<String> observable2 = createObservable(new String[] {
84+
"2", "22", "222", "2222" }, 1000, null);
85+
Observable<String> observable3 = createObservable(new String[] {
86+
"3", "33", "333", "3333" }, 3000, null);
87+
88+
Observable<String> o = Observable.create(amb(observable1,
89+
observable2, observable3));
90+
91+
@SuppressWarnings("unchecked")
92+
Observer<String> observer = (Observer<String>) mock(Observer.class);
93+
o.subscribe(observer);
94+
95+
scheduler.advanceTimeBy(100000, TimeUnit.MILLISECONDS);
96+
97+
InOrder inOrder = inOrder(observer);
98+
inOrder.verify(observer, times(1)).onNext("2");
99+
inOrder.verify(observer, times(1)).onNext("22");
100+
inOrder.verify(observer, times(1)).onNext("222");
101+
inOrder.verify(observer, times(1)).onNext("2222");
102+
inOrder.verify(observer, times(1)).onCompleted();
103+
inOrder.verifyNoMoreInteractions();
104+
}
105+
106+
@Test
107+
public void testAmb2() {
108+
IOException needHappenedException = new IOException(
109+
"fake exception");
110+
Observable<String> observable1 = createObservable(new String[] {},
111+
2000, new IOException("fake exception"));
112+
Observable<String> observable2 = createObservable(new String[] {
113+
"2", "22", "222", "2222" }, 1000, needHappenedException);
114+
Observable<String> observable3 = createObservable(new String[] {},
115+
3000, new IOException("fake exception"));
116+
117+
Observable<String> o = Observable.create(amb(observable1,
118+
observable2, observable3));
119+
120+
@SuppressWarnings("unchecked")
121+
Observer<String> observer = (Observer<String>) mock(Observer.class);
122+
o.subscribe(observer);
123+
124+
scheduler.advanceTimeBy(100000, TimeUnit.MILLISECONDS);
125+
126+
InOrder inOrder = inOrder(observer);
127+
inOrder.verify(observer, times(1)).onNext("2");
128+
inOrder.verify(observer, times(1)).onNext("22");
129+
inOrder.verify(observer, times(1)).onNext("222");
130+
inOrder.verify(observer, times(1)).onNext("2222");
131+
inOrder.verify(observer, times(1)).onError(needHappenedException);
132+
inOrder.verifyNoMoreInteractions();
133+
}
134+
135+
@Test
136+
public void testAmb3() {
137+
Observable<String> observable1 = createObservable(new String[] {
138+
"1" }, 2000, null);
139+
Observable<String> observable2 = createObservable(new String[] {},
140+
1000, null);
141+
Observable<String> observable3 = createObservable(new String[] {
142+
"3" }, 3000, null);
143+
144+
Observable<String> o = Observable.create(amb(observable1,
145+
observable2, observable3));
146+
147+
@SuppressWarnings("unchecked")
148+
Observer<String> observer = (Observer<String>) mock(Observer.class);
149+
o.subscribe(observer);
150+
151+
scheduler.advanceTimeBy(100000, TimeUnit.MILLISECONDS);
152+
InOrder inOrder = inOrder(observer);
153+
inOrder.verify(observer, times(1)).onCompleted();
154+
inOrder.verifyNoMoreInteractions();
155+
}
156+
157+
}

0 commit comments

Comments
 (0)