Skip to content

Commit f2c53cc

Browse files
Merge pull request #245 from mairbek/multisubscribe
Moved state from Observable to Observer
2 parents fdfc935 + 0c4f59a commit f2c53cc

File tree

4 files changed

+64
-45
lines changed

4 files changed

+64
-45
lines changed

rxjava-core/src/main/java/rx/operators/OperationAll.java

Lines changed: 35 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,6 @@ private static class AllObservable<T> implements Func1<Observer<Boolean>, Subscr
2323
private final Observable<T> sequence;
2424
private final Func1<T, Boolean> predicate;
2525

26-
private final AtomicBoolean status = new AtomicBoolean(true);
2726
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
2827

2928

@@ -35,33 +34,45 @@ private AllObservable(Observable<T> sequence, Func1<T, Boolean> predicate) {
3534

3635
@Override
3736
public Subscription call(final Observer<Boolean> observer) {
38-
return subscription.wrap(sequence.subscribe(new Observer<T>() {
39-
@Override
40-
public void onCompleted() {
41-
if (status.get()) {
42-
observer.onNext(true);
43-
observer.onCompleted();
44-
}
45-
}
37+
return subscription.wrap(sequence.subscribe(new AllObserver(observer)));
4638

47-
@Override
48-
public void onError(Exception e) {
49-
observer.onError(e);
50-
}
39+
}
5140

52-
@Override
53-
public void onNext(T args) {
54-
boolean result = predicate.call(args);
55-
boolean changed = status.compareAndSet(true, result);
56-
57-
if (changed && !result) {
58-
observer.onNext(false);
59-
observer.onCompleted();
60-
subscription.unsubscribe();
61-
}
41+
private class AllObserver implements Observer<T> {
42+
private final Observer<Boolean> underlying;
43+
44+
private final AtomicBoolean status = new AtomicBoolean(true);
45+
46+
public AllObserver(Observer<Boolean> underlying) {
47+
this.underlying = underlying;
48+
}
49+
50+
@Override
51+
public void onCompleted() {
52+
if (status.get()) {
53+
underlying.onNext(true);
54+
underlying.onCompleted();
6255
}
63-
}));
56+
}
57+
58+
@Override
59+
public void onError(Exception e) {
60+
underlying.onError(e);
61+
}
62+
63+
@Override
64+
public void onNext(T args) {
65+
boolean result = predicate.call(args);
66+
boolean changed = status.compareAndSet(true, result);
67+
68+
if (changed && !result) {
69+
underlying.onNext(false);
70+
underlying.onCompleted();
71+
subscription.unsubscribe();
72+
}
73+
}
6474
}
75+
6576
}
6677

6778
public static class UnitTest {

rxjava-core/src/main/java/rx/operators/OperationGroupBy.java

Lines changed: 25 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -56,37 +56,43 @@ public static <K, T> Func1<Observer<GroupedObservable<K, T>>, Subscription> grou
5656

5757
private static class GroupBy<K, V> implements Func1<Observer<GroupedObservable<K, V>>, Subscription> {
5858
private final Observable<KeyValue<K, V>> source;
59-
private final ConcurrentHashMap<K, Boolean> keys = new ConcurrentHashMap<K, Boolean>();
6059

6160
private GroupBy(Observable<KeyValue<K, V>> source) {
6261
this.source = source;
6362
}
6463

6564
@Override
6665
public Subscription call(final Observer<GroupedObservable<K, V>> observer) {
66+
return source.subscribe(new GroupByObserver(observer));
67+
}
6768

68-
return source.subscribe(new Observer<KeyValue<K, V>>() {
69+
private class GroupByObserver implements Observer<KeyValue<K, V>> {
70+
private final Observer<GroupedObservable<K, V>> underlying;
6971

70-
@Override
71-
public void onCompleted() {
72-
observer.onCompleted();
73-
}
72+
private final ConcurrentHashMap<K, Boolean> keys = new ConcurrentHashMap<K, Boolean>();
7473

75-
@Override
76-
public void onError(Exception e) {
77-
observer.onError(e);
78-
}
74+
private GroupByObserver(Observer<GroupedObservable<K, V>> underlying) {
75+
this.underlying = underlying;
76+
}
7977

80-
@Override
81-
public void onNext(final KeyValue<K, V> args) {
82-
K key = args.key;
83-
boolean newGroup = keys.putIfAbsent(key, true) == null;
84-
if (newGroup) {
85-
observer.onNext(buildObservableFor(source, key));
86-
}
87-
}
78+
@Override
79+
public void onCompleted() {
80+
underlying.onCompleted();
81+
}
82+
83+
@Override
84+
public void onError(Exception e) {
85+
underlying.onError(e);
86+
}
8887

89-
});
88+
@Override
89+
public void onNext(final KeyValue<K, V> args) {
90+
K key = args.key;
91+
boolean newGroup = keys.putIfAbsent(key, true) == null;
92+
if (newGroup) {
93+
underlying.onNext(buildObservableFor(source, key));
94+
}
95+
}
9096
}
9197
}
9298

rxjava-core/src/main/java/rx/operators/OperationTake.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ public Subscription call(Observer<T> observer) {
6868
* @param <T>
6969
*/
7070
private static class Take<T> implements Func1<Observer<T>, Subscription> {
71-
private final AtomicInteger counter = new AtomicInteger();
7271
private final Observable<T> items;
7372
private final int num;
7473
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
@@ -108,6 +107,8 @@ public void onNext(T args)
108107
private class ItemObserver implements Observer<T> {
109108
private final Observer<T> observer;
110109

110+
private final AtomicInteger counter = new AtomicInteger();
111+
111112
public ItemObserver(Observer<T> observer) {
112113
this.observer = observer;
113114
}

rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,6 @@ public Boolean call(T input, Integer index) {
9191
* @param <T>
9292
*/
9393
private static class TakeWhile<T> implements Func1<Observer<T>, Subscription> {
94-
private final AtomicInteger counter = new AtomicInteger();
9594
private final Observable<T> items;
9695
private final Func2<T, Integer, Boolean> predicate;
9796
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
@@ -109,6 +108,8 @@ public Subscription call(Observer<T> observer) {
109108
private class ItemObserver implements Observer<T> {
110109
private final Observer<T> observer;
111110

111+
private final AtomicInteger counter = new AtomicInteger();
112+
112113
public ItemObserver(Observer<T> observer) {
113114
// Using AtomicObserver because the unsubscribe, onCompleted, onError and error handling behavior
114115
// needs "isFinished" logic to not send duplicated events

0 commit comments

Comments
 (0)