diff --git a/rxjava-core/src/main/java/rx/operators/OperationAll.java b/rxjava-core/src/main/java/rx/operators/OperationAll.java index 4ce29069a2..db603dc853 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationAll.java +++ b/rxjava-core/src/main/java/rx/operators/OperationAll.java @@ -23,7 +23,6 @@ private static class AllObservable implements Func1, Subscr private final Observable sequence; private final Func1 predicate; - private final AtomicBoolean status = new AtomicBoolean(true); private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); @@ -35,33 +34,45 @@ private AllObservable(Observable sequence, Func1 predicate) { @Override public Subscription call(final Observer observer) { - return subscription.wrap(sequence.subscribe(new Observer() { - @Override - public void onCompleted() { - if (status.get()) { - observer.onNext(true); - observer.onCompleted(); - } - } + return subscription.wrap(sequence.subscribe(new AllObserver(observer))); - @Override - public void onError(Exception e) { - observer.onError(e); - } + } - @Override - public void onNext(T args) { - boolean result = predicate.call(args); - boolean changed = status.compareAndSet(true, result); - - if (changed && !result) { - observer.onNext(false); - observer.onCompleted(); - subscription.unsubscribe(); - } + private class AllObserver implements Observer { + private final Observer underlying; + + private final AtomicBoolean status = new AtomicBoolean(true); + + public AllObserver(Observer underlying) { + this.underlying = underlying; + } + + @Override + public void onCompleted() { + if (status.get()) { + underlying.onNext(true); + underlying.onCompleted(); } - })); + } + + @Override + public void onError(Exception e) { + underlying.onError(e); + } + + @Override + public void onNext(T args) { + boolean result = predicate.call(args); + boolean changed = status.compareAndSet(true, result); + + if (changed && !result) { + underlying.onNext(false); + underlying.onCompleted(); + subscription.unsubscribe(); + } + } } + } public static class UnitTest { diff --git a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java index f3a0b079c0..35123d704e 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java +++ b/rxjava-core/src/main/java/rx/operators/OperationGroupBy.java @@ -56,7 +56,6 @@ public static Func1>, Subscription> grou private static class GroupBy implements Func1>, Subscription> { private final Observable> source; - private final ConcurrentHashMap keys = new ConcurrentHashMap(); private GroupBy(Observable> source) { this.source = source; @@ -64,29 +63,36 @@ private GroupBy(Observable> source) { @Override public Subscription call(final Observer> observer) { + return source.subscribe(new GroupByObserver(observer)); + } - return source.subscribe(new Observer>() { + private class GroupByObserver implements Observer> { + private final Observer> underlying; - @Override - public void onCompleted() { - observer.onCompleted(); - } + private final ConcurrentHashMap keys = new ConcurrentHashMap(); - @Override - public void onError(Exception e) { - observer.onError(e); - } + private GroupByObserver(Observer> underlying) { + this.underlying = underlying; + } - @Override - public void onNext(final KeyValue args) { - K key = args.key; - boolean newGroup = keys.putIfAbsent(key, true) == null; - if (newGroup) { - observer.onNext(buildObservableFor(source, key)); - } - } + @Override + public void onCompleted() { + underlying.onCompleted(); + } + + @Override + public void onError(Exception e) { + underlying.onError(e); + } - }); + @Override + public void onNext(final KeyValue args) { + K key = args.key; + boolean newGroup = keys.putIfAbsent(key, true) == null; + if (newGroup) { + underlying.onNext(buildObservableFor(source, key)); + } + } } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index cf99ba6408..2eebf4c44e 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -68,7 +68,6 @@ public Subscription call(Observer observer) { * @param */ private static class Take implements Func1, Subscription> { - private final AtomicInteger counter = new AtomicInteger(); private final Observable items; private final int num; private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); @@ -108,6 +107,8 @@ public void onNext(T args) private class ItemObserver implements Observer { private final Observer observer; + private final AtomicInteger counter = new AtomicInteger(); + public ItemObserver(Observer observer) { this.observer = observer; } diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java index a6b1cdede2..618e507a44 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java @@ -91,7 +91,6 @@ public Boolean call(T input, Integer index) { * @param */ private static class TakeWhile implements Func1, Subscription> { - private final AtomicInteger counter = new AtomicInteger(); private final Observable items; private final Func2 predicate; private final AtomicObservableSubscription subscription = new AtomicObservableSubscription(); @@ -109,6 +108,8 @@ public Subscription call(Observer observer) { private class ItemObserver implements Observer { private final Observer observer; + private final AtomicInteger counter = new AtomicInteger(); + public ItemObserver(Observer observer) { // Using AtomicObserver because the unsubscribe, onCompleted, onError and error handling behavior // needs "isFinished" logic to not send duplicated events