Skip to content

Moved state from Observable to Observer #245

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 3 commits into from
May 1, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 35 additions & 24 deletions rxjava-core/src/main/java/rx/operators/OperationAll.java
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ private static class AllObservable<T> implements Func1<Observer<Boolean>, Subscr
private final Observable<T> sequence;
private final Func1<T, Boolean> predicate;

private final AtomicBoolean status = new AtomicBoolean(true);
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();


Expand All @@ -35,33 +34,45 @@ private AllObservable(Observable<T> sequence, Func1<T, Boolean> predicate) {

@Override
public Subscription call(final Observer<Boolean> observer) {
return subscription.wrap(sequence.subscribe(new Observer<T>() {
@Override
public void onCompleted() {
if (status.get()) {
observer.onNext(true);
observer.onCompleted();
}
}
return subscription.wrap(sequence.subscribe(new AllObserver(observer)));

@Override
public void onError(Exception e) {
observer.onError(e);
}
}

@Override
public void onNext(T args) {
boolean result = predicate.call(args);
boolean changed = status.compareAndSet(true, result);

if (changed && !result) {
observer.onNext(false);
observer.onCompleted();
subscription.unsubscribe();
}
private class AllObserver implements Observer<T> {
private final Observer<Boolean> underlying;

private final AtomicBoolean status = new AtomicBoolean(true);

public AllObserver(Observer<Boolean> underlying) {
this.underlying = underlying;
}

@Override
public void onCompleted() {
if (status.get()) {
underlying.onNext(true);
underlying.onCompleted();
}
}));
}

@Override
public void onError(Exception e) {
underlying.onError(e);
}

@Override
public void onNext(T args) {
boolean result = predicate.call(args);
boolean changed = status.compareAndSet(true, result);

if (changed && !result) {
underlying.onNext(false);
underlying.onCompleted();
subscription.unsubscribe();
}
}
}

}

public static class UnitTest {
Expand Down
44 changes: 25 additions & 19 deletions rxjava-core/src/main/java/rx/operators/OperationGroupBy.java
Original file line number Diff line number Diff line change
Expand Up @@ -56,37 +56,43 @@ public static <K, T> Func1<Observer<GroupedObservable<K, T>>, Subscription> grou

private static class GroupBy<K, V> implements Func1<Observer<GroupedObservable<K, V>>, Subscription> {
private final Observable<KeyValue<K, V>> source;
private final ConcurrentHashMap<K, Boolean> keys = new ConcurrentHashMap<K, Boolean>();

private GroupBy(Observable<KeyValue<K, V>> source) {
this.source = source;
}

@Override
public Subscription call(final Observer<GroupedObservable<K, V>> observer) {
return source.subscribe(new GroupByObserver(observer));
}

return source.subscribe(new Observer<KeyValue<K, V>>() {
private class GroupByObserver implements Observer<KeyValue<K, V>> {
private final Observer<GroupedObservable<K, V>> underlying;

@Override
public void onCompleted() {
observer.onCompleted();
}
private final ConcurrentHashMap<K, Boolean> keys = new ConcurrentHashMap<K, Boolean>();

@Override
public void onError(Exception e) {
observer.onError(e);
}
private GroupByObserver(Observer<GroupedObservable<K, V>> underlying) {
this.underlying = underlying;
}

@Override
public void onNext(final KeyValue<K, V> args) {
K key = args.key;
boolean newGroup = keys.putIfAbsent(key, true) == null;
if (newGroup) {
observer.onNext(buildObservableFor(source, key));
}
}
@Override
public void onCompleted() {
underlying.onCompleted();
}

@Override
public void onError(Exception e) {
underlying.onError(e);
}

});
@Override
public void onNext(final KeyValue<K, V> args) {
K key = args.key;
boolean newGroup = keys.putIfAbsent(key, true) == null;
if (newGroup) {
underlying.onNext(buildObservableFor(source, key));
}
}
}
}

Expand Down
3 changes: 2 additions & 1 deletion rxjava-core/src/main/java/rx/operators/OperationTake.java
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,6 @@ public Subscription call(Observer<T> observer) {
* @param <T>
*/
private static class Take<T> implements Func1<Observer<T>, Subscription> {
private final AtomicInteger counter = new AtomicInteger();
private final Observable<T> items;
private final int num;
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
Expand Down Expand Up @@ -108,6 +107,8 @@ public void onNext(T args)
private class ItemObserver implements Observer<T> {
private final Observer<T> observer;

private final AtomicInteger counter = new AtomicInteger();

public ItemObserver(Observer<T> observer) {
this.observer = observer;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ public Boolean call(T input, Integer index) {
* @param <T>
*/
private static class TakeWhile<T> implements Func1<Observer<T>, Subscription> {
private final AtomicInteger counter = new AtomicInteger();
private final Observable<T> items;
private final Func2<T, Integer, Boolean> predicate;
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
Expand All @@ -109,6 +108,8 @@ public Subscription call(Observer<T> observer) {
private class ItemObserver implements Observer<T> {
private final Observer<T> observer;

private final AtomicInteger counter = new AtomicInteger();

public ItemObserver(Observer<T> observer) {
// Using AtomicObserver because the unsubscribe, onCompleted, onError and error handling behavior
// needs "isFinished" logic to not send duplicated events
Expand Down