Skip to content

Commit 63cef64

Browse files
Small reorganization of code for OperationTake and TrustedObservableTester
- removed rx.testing package (if that's going to exist that means it's bleeding into something that should live in /src/test and beyond what works well for inner class testing) - made TrustedObservableTester part of rx.operation package and an inner UnitTest class so it doesn't become part of the public API
1 parent a5c28cf commit 63cef64

File tree

2 files changed

+65
-65
lines changed

2 files changed

+65
-65
lines changed

rxjava-core/src/main/java/rx/testing/TrustedObservableTester.java renamed to rxjava-core/src/main/java/rx/operators/AbstractOperation.java

Lines changed: 56 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -1,75 +1,79 @@
1-
package rx.testing;
1+
package rx.operators;
22

3-
import org.junit.Test;
4-
import rx.Observable;
5-
import rx.Observer;
6-
import rx.Subscription;
7-
import rx.subscriptions.Subscriptions;
8-
import rx.util.functions.Func1;
3+
import static org.junit.Assert.*;
94

105
import java.lang.Thread.UncaughtExceptionHandler;
116
import java.util.ArrayList;
127
import java.util.List;
138
import java.util.concurrent.atomic.AtomicBoolean;
149
import java.util.concurrent.atomic.AtomicReference;
1510

16-
import static org.junit.Assert.assertEquals;
17-
import static org.junit.Assert.assertFalse;
18-
import static org.junit.Assert.assertNotNull;
11+
import org.junit.Test;
1912

20-
public class TrustedObservableTester
13+
import rx.Observable;
14+
import rx.Observer;
15+
import rx.Subscription;
16+
import rx.subscriptions.Subscriptions;
17+
import rx.util.functions.Func1;
18+
19+
/**
20+
* Common utility functions for operator implementations and tests.
21+
*/
22+
/* package */class AbstractOperation
2123
{
22-
private TrustedObservableTester() {}
24+
private AbstractOperation() {
25+
}
2326

24-
public static <T> Func1<Observer<T>, Subscription> assertTrustedObservable(final Func1<Observer<T>, Subscription> source)
25-
{
26-
return new Func1<Observer<T>, Subscription>()
27+
public static class UnitTest {
28+
29+
public static <T> Func1<Observer<T>, Subscription> assertTrustedObservable(final Func1<Observer<T>, Subscription> source)
2730
{
28-
@Override
29-
public Subscription call(Observer<T> observer)
31+
return new Func1<Observer<T>, Subscription>()
3032
{
31-
return source.call(new TestingObserver<T>(observer));
32-
}
33-
};
34-
}
33+
@Override
34+
public Subscription call(Observer<T> observer)
35+
{
36+
return source.call(new TestingObserver<T>(observer));
37+
}
38+
};
39+
}
3540

36-
public static class TestingObserver<T> implements Observer<T> {
41+
public static class TestingObserver<T> implements Observer<T> {
3742

38-
private final Observer<T> actual;
39-
private final AtomicBoolean isFinished = new AtomicBoolean(false);
40-
private final AtomicBoolean isInCallback = new AtomicBoolean(false);
43+
private final Observer<T> actual;
44+
private final AtomicBoolean isFinished = new AtomicBoolean(false);
45+
private final AtomicBoolean isInCallback = new AtomicBoolean(false);
4146

42-
public TestingObserver(Observer<T> actual) {
43-
this.actual = actual;
44-
}
47+
public TestingObserver(Observer<T> actual) {
48+
this.actual = actual;
49+
}
4550

46-
@Override
47-
public void onCompleted() {
48-
assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true));
49-
assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true));
50-
actual.onCompleted();
51-
isInCallback.set(false);
52-
}
51+
@Override
52+
public void onCompleted() {
53+
assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true));
54+
assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true));
55+
actual.onCompleted();
56+
isInCallback.set(false);
57+
}
5358

54-
@Override
55-
public void onError(Exception e) {
56-
assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true));
57-
assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true));
58-
actual.onError(e);
59-
isInCallback.set(false);
60-
}
59+
@Override
60+
public void onError(Exception e) {
61+
assertFalse("previous call to onCompleted() or onError()", !isFinished.compareAndSet(false, true));
62+
assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true));
63+
actual.onError(e);
64+
isInCallback.set(false);
65+
}
6166

62-
@Override
63-
public void onNext(T args) {
64-
assertFalse("previous call to onCompleted() or onError()", isFinished.get());
65-
assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true));
66-
actual.onNext(args);
67-
isInCallback.set(false);
68-
}
67+
@Override
68+
public void onNext(T args) {
69+
assertFalse("previous call to onCompleted() or onError()", isFinished.get());
70+
assertFalse("concurrent callback pending", !isInCallback.compareAndSet(false, true));
71+
actual.onNext(args);
72+
isInCallback.set(false);
73+
}
6974

70-
}
75+
}
7176

72-
public static class UnitTest {
7377
@Test(expected = AssertionError.class)
7478
public void testDoubleCompleted() {
7579
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
@@ -141,7 +145,6 @@ public Subscription call(Observer<String> observer)
141145
})).lastOrDefault("end");
142146
}
143147

144-
145148
@Test(expected = AssertionError.class)
146149
public void testErrorNext() {
147150
Observable.create(assertTrustedObservable(new Func1<Observer<String>, Subscription>()
@@ -250,4 +253,4 @@ public void onNext(String args)
250253
}
251254
}
252255
}
253-
}
256+
}

rxjava-core/src/main/java/rx/operators/OperationTake.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -15,26 +15,23 @@
1515
*/
1616
package rx.operators;
1717

18+
import static org.junit.Assert.*;
19+
import static org.mockito.Matchers.*;
20+
import static org.mockito.Mockito.*;
21+
import static rx.operators.AbstractOperation.UnitTest.*;
22+
23+
import java.util.concurrent.atomic.AtomicBoolean;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
1826
import org.junit.Test;
27+
1928
import rx.Observable;
2029
import rx.Observer;
2130
import rx.Subscription;
2231
import rx.subscriptions.Subscriptions;
2332
import rx.util.AtomicObservableSubscription;
2433
import rx.util.functions.Func1;
2534

26-
import java.util.concurrent.atomic.AtomicBoolean;
27-
import java.util.concurrent.atomic.AtomicInteger;
28-
29-
import static org.junit.Assert.assertTrue;
30-
import static org.junit.Assert.fail;
31-
import static org.mockito.Matchers.any;
32-
import static org.mockito.Mockito.mock;
33-
import static org.mockito.Mockito.never;
34-
import static org.mockito.Mockito.times;
35-
import static org.mockito.Mockito.verify;
36-
import static rx.testing.TrustedObservableTester.assertTrustedObservable;
37-
3835
/**
3936
* Returns a specified number of contiguous values from the start of an observable sequence.
4037
*/

0 commit comments

Comments
 (0)