@@ -70,12 +70,12 @@ private NextIterator(NextObserver<T> observer) {
70
70
71
71
@ Override
72
72
public boolean hasNext () {
73
- return !observer .isCompleted ();
73
+ return !observer .isCompleted (false );
74
74
}
75
75
76
76
@ Override
77
77
public T next () {
78
- if (observer .isCompleted ()) {
78
+ if (observer .isCompleted (true )) {
79
79
throw new IllegalStateException ("Observable is completed" );
80
80
}
81
81
@@ -131,14 +131,18 @@ public void await() {
131
131
waiting .set (true );
132
132
}
133
133
134
- public boolean isCompleted () {
134
+ public boolean isCompleted (boolean rethrowExceptionIfExists ) {
135
135
Notification <T > lastItem = buf .peek ();
136
136
if (lastItem == null ) {
137
137
return false ;
138
138
}
139
139
140
140
if (lastItem .isOnError ()) {
141
- throw Exceptions .propagate (lastItem .getException ());
141
+ if (rethrowExceptionIfExists ) {
142
+ throw Exceptions .propagate (lastItem .getException ());
143
+ } else {
144
+ return true ;
145
+ }
142
146
}
143
147
144
148
return lastItem .isOnCompleted ();
@@ -219,6 +223,35 @@ public void testOnError() throws Throwable {
219
223
}
220
224
}
221
225
226
+ @ Test
227
+ public void testOnErrorViaHasNext () throws Throwable {
228
+ Subscription s = mock (Subscription .class );
229
+ final TestObservable obs = new TestObservable (s );
230
+
231
+ Iterator <String > it = next (obs ).iterator ();
232
+
233
+ assertTrue (it .hasNext ());
234
+
235
+ Future <String > next = nextAsync (it );
236
+ Thread .sleep (100 );
237
+ obs .sendOnNext ("one" );
238
+ assertEquals ("one" , next .get ());
239
+
240
+ assertTrue (it .hasNext ());
241
+
242
+ next = nextAsync (it );
243
+ Thread .sleep (100 );
244
+ obs .sendOnError (new TestException ());
245
+
246
+ // this should not throw an exception but instead just return false
247
+ try {
248
+ assertFalse (it .hasNext ());
249
+ } catch (Exception e ) {
250
+ fail ("should not have received exception" );
251
+ e .printStackTrace ();
252
+ }
253
+ }
254
+
222
255
private Future <String > nextAsync (final Iterator <String > it ) throws Exception {
223
256
224
257
return executor .submit (new Callable <String >() {
@@ -250,7 +283,6 @@ public void sendOnNext(String value) {
250
283
}
251
284
252
285
/* used to simulate subscription */
253
- @ SuppressWarnings ("unused" )
254
286
public void sendOnError (Exception e ) {
255
287
observer .onError (e );
256
288
}
0 commit comments