Skip to content

Commit 0ed6a79

Browse files
Merge pull request ReactiveX#220 from johngmyers/takewhile-predicate
TakeWhile protect calls to predicate
2 parents ce51735 + da27629 commit 0ed6a79

File tree

1 file changed

+38
-1
lines changed

1 file changed

+38
-1
lines changed

rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java

Lines changed: 38 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,15 @@ public void onError(Exception e) {
123123

124124
@Override
125125
public void onNext(T args) {
126-
if (predicate.call(args, counter.getAndIncrement())) {
126+
Boolean isSelected;
127+
try {
128+
isSelected = predicate.call(args, counter.getAndIncrement());
129+
}
130+
catch (Exception e) {
131+
observer.onError(e);
132+
return;
133+
}
134+
if (isSelected) {
127135
observer.onNext(args);
128136
} else {
129137
observer.onCompleted();
@@ -238,6 +246,35 @@ public Boolean call(String s)
238246
})).last();
239247
}
240248

249+
@Test
250+
public void testTakeWhileProtectsPredicateCall() {
251+
TestObservable source = new TestObservable(mock(Subscription.class), "one");
252+
final RuntimeException testException = new RuntimeException("test exception");
253+
254+
@SuppressWarnings("unchecked")
255+
Observer<String> aObserver = mock(Observer.class);
256+
Observable<String> take = Observable.create(takeWhile(source, new Func1<String, Boolean>()
257+
{
258+
@Override
259+
public Boolean call(String s)
260+
{
261+
throw testException;
262+
}
263+
}));
264+
take.subscribe(aObserver);
265+
266+
// wait for the Observable to complete
267+
try {
268+
source.t.join();
269+
} catch (Exception e) {
270+
e.printStackTrace();
271+
fail(e.getMessage());
272+
}
273+
274+
verify(aObserver, never()).onNext(any(String.class));
275+
verify(aObserver, times(1)).onError(testException);
276+
}
277+
241278
@Test
242279
public void testUnsubscribeAfterTake() {
243280
Subscription s = mock(Subscription.class);

0 commit comments

Comments
 (0)