Skip to content

Commit 7f8415c

Browse files
Merge pull request ReactiveX#355 from jmhofer/skip-while
implemented skipWhile and skipWhileWithIndex (ReactiveX#80)
2 parents 1903caa + a179865 commit 7f8415c

File tree

2 files changed

+222
-0
lines changed

2 files changed

+222
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
import rx.operators.OperationSample;
5151
import rx.operators.OperationScan;
5252
import rx.operators.OperationSkip;
53+
import rx.operators.OperationSkipWhile;
5354
import rx.operators.OperationSubscribeOn;
5455
import rx.operators.OperationSum;
5556
import rx.operators.OperationSwitch;
@@ -2431,6 +2432,34 @@ public <E> Observable<T> takeUntil(Observable<? extends E> other) {
24312432
return OperationTakeUntil.takeUntil(this, other);
24322433
}
24332434

2435+
/**
2436+
* Returns an Observable that bypasses all items from the source Observable as long as the specified
2437+
* condition holds true. Emits all further source items as soon as the condition becomes false.
2438+
* @param predicate
2439+
* A function to test each item emitted from the source Observable for a condition.
2440+
* It receives the emitted item as first parameter and the index of the emitted item as
2441+
* second parameter.
2442+
* @return an Observable that emits all items from the source Observable as soon as the condition
2443+
* becomes false.
2444+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211631%28v=vs.103%29.aspx">MSDN: Observable.SkipWhile</a>
2445+
*/
2446+
public Observable<T> skipWhileWithIndex(Func2<? super T, Integer, Boolean> predicate) {
2447+
return create(OperationSkipWhile.skipWhileWithIndex(this, predicate));
2448+
}
2449+
2450+
/**
2451+
* Returns an Observable that bypasses all items from the source Observable as long as the specified
2452+
* condition holds true. Emits all further source items as soon as the condition becomes false.
2453+
* @param predicate
2454+
* A function to test each item emitted from the source Observable for a condition.
2455+
* @return an Observable that emits all items from the source Observable as soon as the condition
2456+
* becomes false.
2457+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229685%28v=vs.103%29.aspx">MSDN: Observable.SkipWhile</a>
2458+
*/
2459+
public Observable<T> skipWhile(Func1<? super T, Boolean> predicate) {
2460+
return create(OperationSkipWhile.skipWhile(this, predicate));
2461+
}
2462+
24342463
/**
24352464
* Returns an Observable that emits a single item, a list composed of all the items emitted by
24362465
* the source Observable.
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.mockito.Matchers.*;
19+
import static org.mockito.Mockito.*;
20+
import static rx.Observable.create;
21+
22+
import java.util.concurrent.atomic.AtomicBoolean;
23+
import java.util.concurrent.atomic.AtomicInteger;
24+
25+
import org.junit.Test;
26+
import org.mockito.InOrder;
27+
28+
import rx.Observable;
29+
import rx.Observable.OnSubscribeFunc;
30+
import rx.Observer;
31+
import rx.Subscription;
32+
import rx.util.functions.Func1;
33+
import rx.util.functions.Func2;
34+
35+
/**
36+
* Skips any emitted source items as long as the specified condition holds true. Emits all further source items
37+
* as soon as the condition becomes false.
38+
*/
39+
public final class OperationSkipWhile {
40+
public static <T> OnSubscribeFunc<T> skipWhileWithIndex(Observable<? extends T> source, Func2<? super T, Integer, Boolean> predicate) {
41+
return new SkipWhile<T>(source, predicate);
42+
}
43+
44+
public static <T> OnSubscribeFunc<T> skipWhile(Observable<? extends T> source, final Func1<? super T, Boolean> predicate) {
45+
return new SkipWhile<T>(source, new Func2<T, Integer, Boolean>() {
46+
@Override
47+
public Boolean call(T value, Integer index) {
48+
return predicate.call(value);
49+
}
50+
});
51+
}
52+
53+
private static class SkipWhile<T> implements OnSubscribeFunc<T> {
54+
private final Observable<? extends T> source;
55+
private final Func2<? super T, Integer, Boolean> predicate;
56+
private final AtomicBoolean skipping = new AtomicBoolean(true);
57+
private final AtomicInteger index = new AtomicInteger(0);
58+
59+
SkipWhile(Observable<? extends T> source, Func2<? super T, Integer, Boolean> pred) {
60+
this.source = source;
61+
this.predicate = pred;
62+
}
63+
64+
public Subscription onSubscribe(Observer<? super T> observer) {
65+
return source.subscribe(new SkipWhileObserver(observer));
66+
}
67+
68+
private class SkipWhileObserver implements Observer<T> {
69+
private final Observer<? super T> observer;
70+
71+
public SkipWhileObserver(Observer<? super T> observer) {
72+
this.observer = observer;
73+
}
74+
75+
@Override
76+
public void onCompleted() {
77+
observer.onCompleted();
78+
}
79+
80+
@Override
81+
public void onError(Throwable e) {
82+
observer.onError(e);
83+
}
84+
85+
@Override
86+
public void onNext(T next) {
87+
if (!skipping.get()) {
88+
observer.onNext(next);
89+
} else {
90+
try {
91+
if (!predicate.call(next, index.getAndIncrement())) {
92+
skipping.set(false);
93+
observer.onNext(next);
94+
} else {
95+
}
96+
} catch(Throwable t) {
97+
observer.onError(t);
98+
}
99+
}
100+
}
101+
102+
}
103+
104+
}
105+
106+
public static class UnitTest {
107+
@SuppressWarnings("unchecked")
108+
Observer<Integer> w = mock(Observer.class);
109+
110+
private static final Func1<Integer, Boolean> LESS_THAN_FIVE = new Func1<Integer, Boolean>() {
111+
@Override
112+
public Boolean call(Integer v) {
113+
if (v == 42) throw new RuntimeException("that's not the answer to everything!");
114+
return v < 5;
115+
}
116+
};
117+
118+
private static final Func2<Integer, Integer, Boolean> INDEX_LESS_THAN_THREE = new Func2<Integer, Integer, Boolean>() {
119+
@Override
120+
public Boolean call(Integer value, Integer index) {
121+
return index < 3;
122+
}
123+
};
124+
125+
@Test
126+
public void testSkipWithIndex() {
127+
Observable<Integer> src = Observable.from(1, 2, 3, 4, 5);
128+
create(skipWhileWithIndex(src, INDEX_LESS_THAN_THREE)).subscribe(w);
129+
130+
InOrder inOrder = inOrder(w);
131+
inOrder.verify(w, times(1)).onNext(4);
132+
inOrder.verify(w, times(1)).onNext(5);
133+
inOrder.verify(w, times(1)).onCompleted();
134+
inOrder.verify(w, never()).onError(any(Throwable.class));
135+
}
136+
137+
@Test
138+
public void testSkipEmpty() {
139+
Observable<Integer> src = Observable.empty();
140+
create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w);
141+
verify(w, never()).onNext(anyInt());
142+
verify(w, never()).onError(any(Throwable.class));
143+
verify(w, times(1)).onCompleted();
144+
}
145+
146+
@Test
147+
public void testSkipEverything() {
148+
Observable<Integer> src = Observable.from(1, 2, 3, 4, 3, 2, 1);
149+
create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w);
150+
verify(w, never()).onNext(anyInt());
151+
verify(w, never()).onError(any(Throwable.class));
152+
verify(w, times(1)).onCompleted();
153+
}
154+
155+
@Test
156+
public void testSkipNothing() {
157+
Observable<Integer> src = Observable.from(5, 3, 1);
158+
create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w);
159+
160+
InOrder inOrder = inOrder(w);
161+
inOrder.verify(w, times(1)).onNext(5);
162+
inOrder.verify(w, times(1)).onNext(3);
163+
inOrder.verify(w, times(1)).onNext(1);
164+
inOrder.verify(w, times(1)).onCompleted();
165+
inOrder.verify(w, never()).onError(any(Throwable.class));
166+
}
167+
168+
@Test
169+
public void testSkipSome() {
170+
Observable<Integer> src = Observable.from(1, 2, 3, 4, 5, 3, 1, 5);
171+
create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w);
172+
173+
InOrder inOrder = inOrder(w);
174+
inOrder.verify(w, times(1)).onNext(5);
175+
inOrder.verify(w, times(1)).onNext(3);
176+
inOrder.verify(w, times(1)).onNext(1);
177+
inOrder.verify(w, times(1)).onNext(5);
178+
inOrder.verify(w, times(1)).onCompleted();
179+
inOrder.verify(w, never()).onError(any(Throwable.class));
180+
}
181+
182+
@Test
183+
public void testSkipError() {
184+
Observable<Integer> src = Observable.from(1, 2, 42, 5, 3, 1);
185+
create(skipWhile(src, LESS_THAN_FIVE)).subscribe(w);
186+
187+
InOrder inOrder = inOrder(w);
188+
inOrder.verify(w, never()).onNext(anyInt());
189+
inOrder.verify(w, never()).onCompleted();
190+
inOrder.verify(w, times(1)).onError(any(RuntimeException.class));
191+
}
192+
}
193+
}

0 commit comments

Comments
 (0)