Skip to content

Commit a7254ce

Browse files
Merge pull request #215 from benjchristensen/pull-212-manual-merge
Manual Merge of Pull Request #212
2 parents f1c54b5 + 63cef64 commit a7254ce

File tree

4 files changed

+664
-134
lines changed

4 files changed

+664
-134
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 8 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,8 @@
4141
import rx.operators.OperationDefer;
4242
import rx.operators.OperationDematerialize;
4343
import rx.operators.OperationFilter;
44+
import rx.operators.OperationTake;
45+
import rx.operators.OperationTakeWhile;
4446
import rx.operators.OperationWhere;
4547
import rx.operators.OperationMap;
4648
import rx.operators.OperationMaterialize;
@@ -54,7 +56,6 @@
5456
import rx.operators.OperationScan;
5557
import rx.operators.OperationSkip;
5658
import rx.operators.OperationSynchronize;
57-
import rx.operators.OperationTake;
5859
import rx.operators.OperationTakeLast;
5960
import rx.operators.OperationToObservableFuture;
6061
import rx.operators.OperationToObservableIterable;
@@ -1779,7 +1780,7 @@ public static <T> Observable<T> takeLast(final Observable<T> items, final int co
17791780
* @return
17801781
*/
17811782
public static <T> Observable<T> takeWhile(final Observable<T> items, Func1<T, Boolean> predicate) {
1782-
return create(OperationTake.takeWhile(items, predicate));
1783+
return create(OperationTakeWhile.takeWhile(items, predicate));
17831784
}
17841785

17851786
/**
@@ -1811,16 +1812,18 @@ public Boolean call(T t) {
18111812
* @return
18121813
*/
18131814
public static <T> Observable<T> takeWhileWithIndex(final Observable<T> items, Func2<T, Integer, Boolean> predicate) {
1814-
return create(OperationTake.takeWhileWithIndex(items, predicate));
1815+
return create(OperationTakeWhile.takeWhileWithIndex(items, predicate));
18151816
}
18161817

18171818
public static <T> Observable<T> takeWhileWithIndex(final Observable<T> items, Object predicate) {
18181819
@SuppressWarnings("rawtypes")
18191820
final FuncN _f = Functions.from(predicate);
18201821

1821-
return create(OperationTake.takeWhileWithIndex(items, new Func2<T, Integer, Boolean>() {
1822+
return create(OperationTakeWhile.takeWhileWithIndex(items, new Func2<T, Integer, Boolean>()
1823+
{
18221824
@Override
1823-
public Boolean call(T t, Integer integer) {
1825+
public Boolean call(T t, Integer integer)
1826+
{
18241827
return (Boolean) _f.call(t, integer);
18251828
}
18261829
}));
Lines changed: 256 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,256 @@
1+
package rx.operators;
2+
3+
import static org.junit.Assert.*;
4+
5+
import java.lang.Thread.UncaughtExceptionHandler;
6+
import java.util.ArrayList;
7+
import java.util.List;
8+
import java.util.concurrent.atomic.AtomicBoolean;
9+
import java.util.concurrent.atomic.AtomicReference;
10+
11+
import org.junit.Test;
12+
13+
import rx.Observable;
14+
import rx.Observer;
15+
import rx.Subscription;
16+
import rx.subscriptions.Subscriptions;
17+
import rx.util.functions.Func1;
18+
19+
/**
20+
* Common utility functions for operator implementations and tests.
21+
*/
22+
/* package */class AbstractOperation
23+
{
24+
private AbstractOperation() {
25+
}
26+
27+
public static class UnitTest {
28+
29+
public static <T> Func1<Observer<T>, Subscription> assertTrustedObservable(final Func1<Observer<T>, Subscription> source)
30+
{
31+
return new Func1<Observer<T>, Subscription>()
32+
{
33+
@Override
34+
public Subscription call(Observer<T> observer)
35+
{
36+
return source.call(new TestingObserver<T>(observer));
37+
}
38+
};
39+
}
40+
41+
public static class TestingObserver<T> implements Observer<T> {
42+
43+
private final Observer<T> actual;
44+
private final AtomicBoolean isFinished = new AtomicBoolean(false);
45+
private final AtomicBoolean isInCallback = new AtomicBoolean(false);
46+
47+
public TestingObserver(Observer<T> actual) {
48+
this.actual = actual;
49+
}
50+
51+
@Override
52+
public void onCompleted() {
53+
assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true));
54+
assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true));
55+
actual.onCompleted();
56+
isInCallback.set(false);
57+
}
58+
59+
@Override
60+
public void onError(Exception e) {
61+
assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true));
62+
assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true));
63+
actual.onError(e);
64+
isInCallback.set(false);
65+
}
66+
67+
@Override
68+
public void onNext(T args) {
69+
assertFalse("previous call to onCompleted() or onError()", isFinished.get());
70+
assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true));
71+
actual.onNext(args);
72+
isInCallback.set(false);
73+
}
74+
75+
}
76+
77+
@Test(expected = AssertionError.class)
78+
public void testDoubleCompleted() {
79+
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
80+
{
81+
@Override
82+
public Subscription call(Observer<String> observer)
83+
{
84+
observer.onCompleted();
85+
observer.onCompleted();
86+
return Subscriptions.empty();
87+
}
88+
})).lastOrDefault("end");
89+
90+
}
91+
92+
@Test(expected = AssertionError.class)
93+
public void testCompletedError() {
94+
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
95+
{
96+
@Override
97+
public Subscription call(Observer<String> observer)
98+
{
99+
observer.onCompleted();
100+
observer.onError(new Exception());
101+
return Subscriptions.empty();
102+
}
103+
})).lastOrDefault("end");
104+
}
105+
106+
@Test(expected = AssertionError.class)
107+
public void testCompletedNext() {
108+
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
109+
{
110+
@Override
111+
public Subscription call(Observer<String> observer)
112+
{
113+
observer.onCompleted();
114+
observer.onNext("one");
115+
return Subscriptions.empty();
116+
}
117+
})).lastOrDefault("end");
118+
}
119+
120+
@Test(expected = AssertionError.class)
121+
public void testErrorCompleted() {
122+
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
123+
{
124+
@Override
125+
public Subscription call(Observer<String> observer)
126+
{
127+
observer.onError(new Exception());
128+
observer.onCompleted();
129+
return Subscriptions.empty();
130+
}
131+
})).lastOrDefault("end");
132+
}
133+
134+
@Test(expected = AssertionError.class)
135+
public void testDoubleError() {
136+
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
137+
{
138+
@Override
139+
public Subscription call(Observer<String> observer)
140+
{
141+
observer.onError(new Exception());
142+
observer.onError(new Exception());
143+
return Subscriptions.empty();
144+
}
145+
})).lastOrDefault("end");
146+
}
147+
148+
@Test(expected = AssertionError.class)
149+
public void testErrorNext() {
150+
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
151+
{
152+
@Override
153+
public Subscription call(Observer<String> observer)
154+
{
155+
observer.onError(new Exception());
156+
observer.onNext("one");
157+
return Subscriptions.empty();
158+
}
159+
})).lastOrDefault("end");
160+
}
161+
162+
@Test
163+
public void testNextCompleted() {
164+
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
165+
{
166+
@Override
167+
public Subscription call(Observer<String> observer)
168+
{
169+
observer.onNext("one");
170+
observer.onCompleted();
171+
return Subscriptions.empty();
172+
}
173+
})).lastOrDefault("end");
174+
}
175+
176+
@Test
177+
public void testConcurrentNextNext() {
178+
final List<Thread> threads = new ArrayList<Thread>();
179+
final AtomicReference<Throwable> threadFailure = new AtomicReference<Throwable>();
180+
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
181+
{
182+
@Override
183+
public Subscription call(final Observer<String> observer)
184+
{
185+
threads.add(new Thread(new Runnable()
186+
{
187+
@Override
188+
public void run()
189+
{
190+
observer.onNext("one");
191+
}
192+
}));
193+
threads.add(new Thread(new Runnable()
194+
{
195+
@Override
196+
public void run()
197+
{
198+
observer.onNext("two");
199+
}
200+
}));
201+
return Subscriptions.empty();
202+
}
203+
})).subscribe(new SlowObserver());
204+
for (Thread thread : threads) {
205+
thread.setUncaughtExceptionHandler(new UncaughtExceptionHandler()
206+
{
207+
@Override
208+
public void uncaughtException(Thread thread, Throwable throwable)
209+
{
210+
threadFailure.set(throwable);
211+
}
212+
});
213+
thread.start();
214+
}
215+
for (Thread thread : threads) {
216+
try {
217+
thread.join();
218+
} catch (InterruptedException ignored) {
219+
}
220+
}
221+
// Junit seems pretty bad about exposing test failures inside of created threads.
222+
assertNotNull("exception thrown by thread", threadFailure.get());
223+
assertEquals("class of exception thrown by thread", AssertionError.class, threadFailure.get().getClass());
224+
}
225+
226+
private static class SlowObserver implements Observer<String>
227+
{
228+
@Override
229+
public void onCompleted()
230+
{
231+
try {
232+
Thread.sleep(10);
233+
} catch (InterruptedException ignored) {
234+
}
235+
}
236+
237+
@Override
238+
public void onError(Exception e)
239+
{
240+
try {
241+
Thread.sleep(10);
242+
} catch (InterruptedException ignored) {
243+
}
244+
}
245+
246+
@Override
247+
public void onNext(String args)
248+
{
249+
try {
250+
Thread.sleep(10);
251+
} catch (InterruptedException ignored) {
252+
}
253+
}
254+
}
255+
}
256+
}

0 commit comments

Comments
 (0)