Skip to content

Commit 1922d7a

Browse files
Merge pull request ReactiveX#203 from mairbek/all-operation
All Operation Implemented
2 parents 7aee2bd + 55d4a62 commit 1922d7a

File tree

3 files changed

+195
-0
lines changed

3 files changed

+195
-0
lines changed

language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -275,6 +275,12 @@ def class ObservableTests {
275275

276276
}
277277

278+
@Test
279+
public void testAll() {
280+
Observable.toObservable(1, 2, 3).all({ x -> x > 0 }).subscribe({ result -> a.received(result) });
281+
verify(a, times(1)).received(true);
282+
}
283+
278284
def class AsyncObservable implements Func1<Observer<Integer>, Subscription> {
279285

280286
public Subscription call(final Observer<Integer> observer) {

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import org.mockito.MockitoAnnotations;
3737

3838
import rx.observables.GroupedObservable;
39+
import rx.operators.OperationAll;
3940
import rx.operators.OperationConcat;
4041
import rx.operators.OperationDefer;
4142
import rx.operators.OperationDematerialize;
@@ -1665,6 +1666,35 @@ public T call(T t1, T t2) {
16651666
});
16661667
}
16671668

1669+
/**
1670+
* Determines whether all elements of an observable sequence satisfies a condition.
1671+
* @param sequence an observable sequence whose elements to apply the predicate to.
1672+
* @param predicate a function to test each element for a condition.
1673+
* @param <T> the type of observable.
1674+
* @return true if all elements of an observable sequence satisfies a condition; otherwise, false.
1675+
*/
1676+
public static <T> Observable<Boolean> all(final Observable<T> sequence, final Func1<T, Boolean> predicate) {
1677+
return _create(OperationAll.all(sequence, predicate));
1678+
}
1679+
1680+
/**
1681+
* Determines whether all elements of an observable sequence satisfies a condition.
1682+
* @param sequence an observable sequence whose elements to apply the predicate to.
1683+
* @param predicate a function to test each element for a condition.
1684+
* @param <T> the type of observable.
1685+
* @return true if all elements of an observable sequence satisfies a condition; otherwise, false.
1686+
*/
1687+
public static <T> Observable<Boolean> all(final Observable<T> sequence, Object predicate) {
1688+
final FuncN _f = Functions.from(predicate);
1689+
1690+
return all(sequence, new Func1<T, Boolean>() {
1691+
@Override
1692+
public Boolean call(T t) {
1693+
return (Boolean) _f.call(t);
1694+
}
1695+
});
1696+
}
1697+
16681698
/**
16691699
* Returns an Observable that skips the first <code>num</code> items emitted by the source
16701700
* Observable. You can ignore the first <code>num</code> items emitted by an Observable and attend
@@ -2973,6 +3003,24 @@ public Observable<T> scan(final T initialValue, final Object accumulator) {
29733003
return scan(this, initialValue, accumulator);
29743004
}
29753005

3006+
/**
3007+
* Determines whether all elements of an observable sequence satisfies a condition.
3008+
* @param predicate a function to test each element for a condition.
3009+
* @return true if all elements of an observable sequence satisfies a condition; otherwise, false.
3010+
*/
3011+
public Observable<Boolean> all(Func1<T, Boolean> predicate) {
3012+
return all(this, predicate);
3013+
}
3014+
3015+
/**
3016+
* Determines whether all elements of an observable sequence satisfies a condition.
3017+
* @param predicate a function to test each element for a condition.
3018+
* @return true if all elements of an observable sequence satisfies a condition; otherwise, false.
3019+
*/
3020+
public Observable<Boolean> all(Object predicate) {
3021+
return all(this, predicate);
3022+
}
3023+
29763024
/**
29773025
* Returns an Observable that skips the first <code>num</code> items emitted by the source
29783026
* Observable.
Lines changed: 141 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,141 @@
1+
package rx.operators;
2+
3+
import org.junit.Test;
4+
import rx.Observable;
5+
import rx.Observer;
6+
import rx.Subscription;
7+
import rx.util.AtomicObservableSubscription;
8+
import rx.util.functions.Func1;
9+
10+
import java.util.concurrent.atomic.AtomicBoolean;
11+
12+
import static org.mockito.Mockito.mock;
13+
import static org.mockito.Mockito.verify;
14+
import static org.mockito.Mockito.verifyNoMoreInteractions;
15+
16+
public class OperationAll {
17+
18+
public static <T> Func1<Observer<Boolean>, Subscription> all(Observable<T> sequence, Func1<T, Boolean> predicate) {
19+
return new AllObservable<T>(sequence, predicate);
20+
}
21+
22+
private static class AllObservable<T> implements Func1<Observer<Boolean>, Subscription> {
23+
private final Observable<T> sequence;
24+
private final Func1<T, Boolean> predicate;
25+
26+
private final AtomicBoolean status = new AtomicBoolean(true);
27+
private final AtomicObservableSubscription subscription = new AtomicObservableSubscription();
28+
29+
30+
private AllObservable(Observable<T> sequence, Func1<T, Boolean> predicate) {
31+
this.sequence = sequence;
32+
this.predicate = predicate;
33+
}
34+
35+
36+
@Override
37+
public Subscription call(final Observer<Boolean> observer) {
38+
return subscription.wrap(sequence.subscribe(new Observer<T>() {
39+
@Override
40+
public void onCompleted() {
41+
if (status.get()) {
42+
observer.onNext(true);
43+
observer.onCompleted();
44+
}
45+
}
46+
47+
@Override
48+
public void onError(Exception e) {
49+
observer.onError(e);
50+
}
51+
52+
@Override
53+
public void onNext(T args) {
54+
boolean result = predicate.call(args);
55+
boolean changed = status.compareAndSet(true, result);
56+
57+
if (changed && !result) {
58+
observer.onNext(false);
59+
observer.onCompleted();
60+
subscription.unsubscribe();
61+
}
62+
}
63+
}));
64+
}
65+
}
66+
67+
public static class UnitTest {
68+
69+
@Test
70+
@SuppressWarnings("unchecked")
71+
public void testAll() {
72+
Observable<String> obs = Observable.from("one", "two", "six");
73+
74+
Observer<Boolean> observer = mock(Observer.class);
75+
Observable.create(all(obs, new Func1<String, Boolean>() {
76+
@Override
77+
public Boolean call(String s) {
78+
return s.length() == 3;
79+
}
80+
})).subscribe(observer);
81+
82+
verify(observer).onNext(true);
83+
verify(observer).onCompleted();
84+
verifyNoMoreInteractions(observer);
85+
}
86+
87+
@Test
88+
@SuppressWarnings("unchecked")
89+
public void testNotAll() {
90+
Observable<String> obs = Observable.from("one", "two", "three", "six");
91+
92+
Observer<Boolean> observer = mock(Observer.class);
93+
Observable.create(all(obs, new Func1<String, Boolean>() {
94+
@Override
95+
public Boolean call(String s) {
96+
return s.length() == 3;
97+
}
98+
})).subscribe(observer);
99+
100+
verify(observer).onNext(false);
101+
verify(observer).onCompleted();
102+
verifyNoMoreInteractions(observer);
103+
}
104+
105+
@Test
106+
@SuppressWarnings("unchecked")
107+
public void testEmpty() {
108+
Observable<String> obs = Observable.empty();
109+
110+
Observer<Boolean> observer = mock(Observer.class);
111+
Observable.create(all(obs, new Func1<String, Boolean>() {
112+
@Override
113+
public Boolean call(String s) {
114+
return s.length() == 3;
115+
}
116+
})).subscribe(observer);
117+
118+
verify(observer).onNext(true);
119+
verify(observer).onCompleted();
120+
verifyNoMoreInteractions(observer);
121+
}
122+
123+
@Test
124+
@SuppressWarnings("unchecked")
125+
public void testError() {
126+
Exception error = new Exception();
127+
Observable<String> obs = Observable.error(error);
128+
129+
Observer<Boolean> observer = mock(Observer.class);
130+
Observable.create(all(obs, new Func1<String, Boolean>() {
131+
@Override
132+
public Boolean call(String s) {
133+
return s.length() == 3;
134+
}
135+
})).subscribe(observer);
136+
137+
verify(observer).onError(error);
138+
verifyNoMoreInteractions(observer);
139+
}
140+
}
141+
}

0 commit comments

Comments
 (0)