Skip to content

Commit 9d48f99

Browse files
Refactoring for consistent implementation approach.
Combined ObservableExtensions into Observable to match how Rx works (Observable.* methods)
1 parent 2a4122c commit 9d48f99

31 files changed

+2384
-2336
lines changed

rxjava-core/src/main/java/org/rx/operations/AtomicWatchableSubscription.java renamed to rxjava-core/src/main/java/org/rx/operations/AtomicObservableSubscription.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@
1111
* Thread-safe wrapper around ObservableSubscription that ensures unsubscribe can be called only once.
1212
*/
1313
@ThreadSafe
14-
/* package */class AtomicObservableSubscription implements Subscription {
14+
/* package */final class AtomicObservableSubscription implements Subscription {
1515

1616
private AtomicReference<Subscription> actualSubscription = new AtomicReference<Subscription>();
1717
private AtomicBoolean unsubscribed = new AtomicBoolean(false);

rxjava-core/src/main/java/org/rx/operations/AtomicWatcher.java renamed to rxjava-core/src/main/java/org/rx/operations/AtomicObserver.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@
2929
* @param <T>
3030
*/
3131
@ThreadSafe
32-
/* package */class AtomicObserver<T> implements Observer<T> {
32+
/* package */final class AtomicObserver<T> implements Observer<T> {
3333

3434
/** Allow changing between forcing single or allowing multi-threaded execution of onNext */
3535
private static boolean allowMultiThreaded = true;

rxjava-core/src/main/java/org/rx/operations/AtomicWatcherMultiThreaded.java renamed to rxjava-core/src/main/java/org/rx/operations/AtomicObserverMultiThreaded.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@
3737
* @param <T>
3838
*/
3939
@ThreadSafe
40-
/* package */class AtomicObserverMultiThreaded<T> implements Observer<T> {
40+
/* package */final class AtomicObserverMultiThreaded<T> implements Observer<T> {
4141

4242
private final Observer<T> Observer;
4343
private final AtomicObservableSubscription subscription;

rxjava-core/src/main/java/org/rx/operations/AtomicWatcherSingleThreaded.java renamed to rxjava-core/src/main/java/org/rx/operations/AtomicObserverSingleThreaded.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@
3434
* @param <T>
3535
*/
3636
@ThreadSafe
37-
/* package */class AtomicObserverSingleThreaded<T> implements Observer<T> {
37+
/* package */final class AtomicObserverSingleThreaded<T> implements Observer<T> {
3838

3939
/**
4040
* Intrinsic synchronized locking with double-check short-circuiting was chosen after testing several other implementations.

rxjava-core/src/main/java/org/rx/operations/ObservableExtensions.java

Lines changed: 0 additions & 1459 deletions
This file was deleted.

rxjava-core/src/main/java/org/rx/operations/OperationCombineLatest.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import org.rx.reactive.Observer;
2323
import org.rx.reactive.Subscription;
2424

25-
class OperationCombineLatest {
25+
public class OperationCombineLatest {
2626

2727
public static <R, T0, T1> Observable<R> combineLatest(Observable<T0> w0, Observable<T1> w1, Func2<R, T0, T1> combineLatestFunction) {
2828
Aggregator<R> a = new Aggregator<R>(Functions.fromFunc(combineLatestFunction));
@@ -652,7 +652,7 @@ public void testCombineLatest2Types() {
652652
/* define a Observer to receive aggregated events */
653653
Observer<String> aObserver = mock(Observer.class);
654654

655-
Observable<String> w = combineLatest(ObservableExtensions.toObservable("one", "two"), ObservableExtensions.toObservable(2, 3, 4), combineLatestFunction);
655+
Observable<String> w = combineLatest(Observable.toObservable("one", "two"), Observable.toObservable(2, 3, 4), combineLatestFunction);
656656
w.subscribe(aObserver);
657657

658658
verify(aObserver, never()).onError(any(Exception.class));
@@ -671,7 +671,7 @@ public void testCombineLatest3TypesA() {
671671
/* define a Observer to receive aggregated events */
672672
Observer<String> aObserver = mock(Observer.class);
673673

674-
Observable<String> w = combineLatest(ObservableExtensions.toObservable("one", "two"), ObservableExtensions.toObservable(2), ObservableExtensions.toObservable(new int[] { 4, 5, 6 }), combineLatestFunction);
674+
Observable<String> w = combineLatest(Observable.toObservable("one", "two"), Observable.toObservable(2), Observable.toObservable(new int[] { 4, 5, 6 }), combineLatestFunction);
675675
w.subscribe(aObserver);
676676

677677
verify(aObserver, never()).onError(any(Exception.class));
@@ -688,7 +688,7 @@ public void testCombineLatest3TypesB() {
688688
/* define a Observer to receive aggregated events */
689689
Observer<String> aObserver = mock(Observer.class);
690690

691-
Observable<String> w = combineLatest(ObservableExtensions.toObservable("one"), ObservableExtensions.toObservable(2), ObservableExtensions.toObservable(new int[] { 4, 5, 6 }, new int[] { 7, 8 }), combineLatestFunction);
691+
Observable<String> w = combineLatest(Observable.toObservable("one"), Observable.toObservable(2), Observable.toObservable(new int[] { 4, 5, 6 }, new int[] { 7, 8 }), combineLatestFunction);
692692
w.subscribe(aObserver);
693693

694694
verify(aObserver, never()).onError(any(Exception.class));
@@ -781,7 +781,7 @@ private static class TestObservable extends Observable<String> {
781781
public Subscription subscribe(Observer<String> Observer) {
782782
// just store the variable where it can be accessed so we can manually trigger it
783783
this.Observer = Observer;
784-
return ObservableExtensions.noOpSubscription();
784+
return Observable.noOpSubscription();
785785
}
786786

787787
}

rxjava-core/src/main/java/org/rx/operations/OperationFilter.java

Lines changed: 40 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -4,54 +4,63 @@
44
import static org.mockito.Mockito.*;
55

66
import org.junit.Test;
7+
import org.mockito.Mockito;
78
import org.rx.functions.Func1;
89
import org.rx.reactive.Observable;
910
import org.rx.reactive.Observer;
1011
import org.rx.reactive.Subscription;
1112

12-
/* package */final class OperationFilter<T> extends Observable<T> {
13-
private final Observable<T> that;
14-
private final Func1<Boolean, T> predicate;
13+
public final class OperationFilter<T> {
1514

16-
OperationFilter(Observable<T> that, Func1<Boolean, T> predicate) {
17-
this.that = that;
18-
this.predicate = predicate;
15+
public static <T> Observable<T> filter(Observable<T> that, Func1<Boolean, T> predicate) {
16+
return new Filter<T>(that, predicate);
1917
}
2018

21-
public Subscription subscribe(Observer<T> Observer) {
22-
final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
23-
final Observer<T> observer = new AtomicObserver<T>(Observer, subscription);
19+
private static class Filter<T> extends Observable<T> {
2420

25-
subscription.setActual(that.subscribe(new Observer<T>() {
26-
public void onNext(T value) {
27-
try {
28-
if ((boolean) predicate.call(value)) {
29-
observer.onNext(value);
21+
private final Observable<T> that;
22+
private final Func1<Boolean, T> predicate;
23+
24+
public Filter(Observable<T> that, Func1<Boolean, T> predicate) {
25+
this.that = that;
26+
this.predicate = predicate;
27+
}
28+
29+
public Subscription subscribe(Observer<T> Observer) {
30+
final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
31+
final Observer<T> observer = new AtomicObserver<T>(Observer, subscription);
32+
33+
subscription.setActual(that.subscribe(new Observer<T>() {
34+
public void onNext(T value) {
35+
try {
36+
if ((boolean) predicate.call(value)) {
37+
observer.onNext(value);
38+
}
39+
} catch (Exception ex) {
40+
observer.onError(ex);
41+
subscription.unsubscribe();
3042
}
31-
} catch (Exception ex) {
32-
observer.onError(ex);
33-
subscription.unsubscribe();
3443
}
35-
}
3644

37-
public void onError(Exception ex) {
38-
observer.onError(ex);
39-
}
45+
public void onError(Exception ex) {
46+
observer.onError(ex);
47+
}
4048

41-
public void onCompleted() {
42-
observer.onCompleted();
43-
}
44-
}));
49+
public void onCompleted() {
50+
observer.onCompleted();
51+
}
52+
}));
4553

46-
return subscription;
54+
return subscription;
55+
}
4756
}
4857

4958
public static class UnitTest {
5059

5160
@Test
5261
public void testFilter() {
53-
Observable<String> w = ObservableExtensions.toObservable("one", "two", "three");
54-
Observable<String> Observable = new OperationFilter<String>(w, new Func1<Boolean, String>() {
62+
Observable<String> w = Observable.toObservable("one", "two", "three");
63+
Observable<String> Observable = filter(w, new Func1<Boolean, String>() {
5564

5665
@Override
5766
public Boolean call(String t1) {
@@ -65,10 +74,10 @@ public Boolean call(String t1) {
6574
@SuppressWarnings("unchecked")
6675
Observer<String> aObserver = mock(Observer.class);
6776
Observable.subscribe(aObserver);
68-
verify(aObserver, never()).onNext("one");
77+
verify(aObserver, Mockito.never()).onNext("one");
6978
verify(aObserver, times(1)).onNext("two");
70-
verify(aObserver, never()).onNext("three");
71-
verify(aObserver, never()).onError(any(Exception.class));
79+
verify(aObserver, Mockito.never()).onNext("three");
80+
verify(aObserver, Mockito.never()).onError(any(Exception.class));
7281
verify(aObserver, times(1)).onCompleted();
7382
}
7483
}

rxjava-core/src/main/java/org/rx/operations/OperationLast.java

Lines changed: 38 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
import java.util.concurrent.atomic.AtomicReference;
88

99
import org.junit.Test;
10+
import org.mockito.Mockito;
1011
import org.rx.reactive.Observable;
1112
import org.rx.reactive.Observer;
1213
import org.rx.reactive.Subscription;
@@ -16,54 +17,62 @@
1617
*
1718
* @param <T>
1819
*/
19-
/* package */final class OperationLast<T> extends Observable<T> {
20-
private final AtomicReference<T> lastValue = new AtomicReference<T>();
21-
private final Observable<T> that;
22-
private final AtomicBoolean onNextCalled = new AtomicBoolean(false);
20+
public final class OperationLast<T> {
2321

24-
OperationLast(Observable<T> that) {
25-
this.that = that;
22+
public static <T> Observable<T> last(Observable<T> observable) {
23+
return new Last<T>(observable);
2624
}
2725

28-
public Subscription subscribe(final Observer<T> Observer) {
29-
final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
30-
final Observer<T> observer = new AtomicObserver<T>(Observer, subscription);
26+
private static class Last<T> extends Observable<T> {
3127

32-
subscription.setActual(that.subscribe(new Observer<T>() {
33-
public void onNext(T value) {
34-
onNextCalled.set(true);
35-
lastValue.set(value);
36-
}
28+
private final AtomicReference<T> lastValue = new AtomicReference<T>();
29+
private final Observable<T> that;
30+
private final AtomicBoolean onNextCalled = new AtomicBoolean(false);
3731

38-
public void onError(Exception ex) {
39-
observer.onError(ex);
40-
}
32+
public Last(Observable<T> that) {
33+
this.that = that;
34+
}
35+
36+
public Subscription subscribe(final Observer<T> Observer) {
37+
final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
38+
final Observer<T> observer = new AtomicObserver<T>(Observer, subscription);
39+
40+
subscription.setActual(that.subscribe(new Observer<T>() {
41+
public void onNext(T value) {
42+
onNextCalled.set(true);
43+
lastValue.set(value);
44+
}
4145

42-
public void onCompleted() {
43-
if (onNextCalled.get()) {
44-
observer.onNext(lastValue.get());
46+
public void onError(Exception ex) {
47+
observer.onError(ex);
4548
}
46-
observer.onCompleted();
47-
}
48-
}));
4949

50-
return subscription;
50+
public void onCompleted() {
51+
if (onNextCalled.get()) {
52+
observer.onNext(lastValue.get());
53+
}
54+
observer.onCompleted();
55+
}
56+
}));
57+
58+
return subscription;
59+
}
5160
}
5261

5362
public static class UnitTest {
5463

5564
@Test
5665
public void testLast() {
57-
Observable<String> w = ObservableExtensions.toObservable("one", "two", "three");
58-
Observable<String> Observable = new OperationLast<String>(w);
66+
Observable<String> w = Observable.toObservable("one", "two", "three");
67+
Observable<String> Observable = last(w);
5968

6069
@SuppressWarnings("unchecked")
6170
Observer<String> aObserver = mock(Observer.class);
6271
Observable.subscribe(aObserver);
63-
verify(aObserver, never()).onNext("one");
64-
verify(aObserver, never()).onNext("two");
72+
verify(aObserver, Mockito.never()).onNext("one");
73+
verify(aObserver, Mockito.never()).onNext("two");
6574
verify(aObserver, times(1)).onNext("three");
66-
verify(aObserver, never()).onError(any(Exception.class));
75+
verify(aObserver, Mockito.never()).onError(any(Exception.class));
6776
verify(aObserver, times(1)).onCompleted();
6877
}
6978
}

rxjava-core/src/main/java/org/rx/operations/OperationMap.java

Lines changed: 8 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
import org.rx.reactive.Observer;
1616
import org.rx.reactive.Subscription;
1717

18-
/* package */class OperationMap {
18+
public final class OperationMap {
1919

2020
/**
2121
* Accepts a sequence and a transformation function. Returns a sequence that is the result of
@@ -130,7 +130,7 @@ public void testMap() {
130130
Map<String, String> m1 = getMap("One");
131131
Map<String, String> m2 = getMap("Two");
132132
@SuppressWarnings("unchecked")
133-
Observable<Map<String, String>> observable = ObservableExtensions.toObservable(m1, m2);
133+
Observable<Map<String, String>> observable = Observable.toObservable(m1, m2);
134134

135135
Observable<String> m = map(observable, new Func1<String, Map<String, String>>() {
136136

@@ -152,7 +152,7 @@ public String call(Map<String, String> map) {
152152
@Test
153153
public void testMapMany() {
154154
/* simulate a top-level async call which returns IDs */
155-
Observable<Integer> ids = ObservableExtensions.toObservable(1, 2);
155+
Observable<Integer> ids = Observable.toObservable(1, 2);
156156

157157
/* now simulate the behavior to take those IDs and perform nested async calls based on them */
158158
Observable<String> m = mapMany(ids, new Func1<Observable<String>, Integer>() {
@@ -165,11 +165,11 @@ public Observable<String> call(Integer id) {
165165
if (id == 1) {
166166
Map<String, String> m1 = getMap("One");
167167
Map<String, String> m2 = getMap("Two");
168-
subObservable = ObservableExtensions.toObservable(m1, m2);
168+
subObservable = Observable.toObservable(m1, m2);
169169
} else {
170170
Map<String, String> m3 = getMap("Three");
171171
Map<String, String> m4 = getMap("Four");
172-
subObservable = ObservableExtensions.toObservable(m3, m4);
172+
subObservable = Observable.toObservable(m3, m4);
173173
}
174174

175175
/* simulate kicking off the async call and performing a select on it to transform the data */
@@ -197,15 +197,15 @@ public void testMapMany2() {
197197
Map<String, String> m1 = getMap("One");
198198
Map<String, String> m2 = getMap("Two");
199199
@SuppressWarnings("unchecked")
200-
Observable<Map<String, String>> observable1 = ObservableExtensions.toObservable(m1, m2);
200+
Observable<Map<String, String>> observable1 = Observable.toObservable(m1, m2);
201201

202202
Map<String, String> m3 = getMap("Three");
203203
Map<String, String> m4 = getMap("Four");
204204
@SuppressWarnings("unchecked")
205-
Observable<Map<String, String>> observable2 = ObservableExtensions.toObservable(m3, m4);
205+
Observable<Map<String, String>> observable2 = Observable.toObservable(m3, m4);
206206

207207
@SuppressWarnings("unchecked")
208-
Observable<Observable<Map<String, String>>> observable = ObservableExtensions.toObservable(observable1, observable2);
208+
Observable<Observable<Map<String, String>>> observable = Observable.toObservable(observable1, observable2);
209209

210210
Observable<String> m = mapMany(observable, new Func1<Observable<String>, Observable<Map<String, String>>>() {
211211

rxjava-core/src/main/java/org/rx/operations/OperationMaterialize.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
* <p>
1919
* See http://msdn.microsoft.com/en-us/library/hh229453(v=VS.103).aspx for the Microsoft Rx equivalent.
2020
*/
21-
public class OperationMaterialize {
21+
public final class OperationMaterialize {
2222

2323
/**
2424
* Materializes the implicit notifications of an observable sequence as explicit notification values.

rxjava-core/src/main/java/org/rx/operations/OperationMerge.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
import org.rx.reactive.Observer;
1818
import org.rx.reactive.Subscription;
1919

20-
/* package */class OperationMerge {
20+
public final class OperationMerge {
2121

2222
/**
2323
* Flattens the observable sequences from the list of Observables into one observable sequence without any transformation.

rxjava-core/src/main/java/org/rx/operations/OperationMergeDelayError.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@
2727
* <p>
2828
* NOTE: If this is used on an infinite stream it will never call onError and effectively will swallow errors.
2929
*/
30-
/* package */class OperationMergeDelayError {
30+
public final class OperationMergeDelayError {
3131

3232
/**
3333
* Flattens the observable sequences from the list of Observables into one observable sequence without any transformation and delays any onError calls until after all sequences have called

0 commit comments

Comments
 (0)