Skip to content

Commit 820f914

Browse files
Merge pull request ReactiveX#312 from petermd/fix-errorresumenext
Fix for OperatorOnErrorResumeNextViaObservable and async Resume
2 parents a0aed06 + 8302664 commit 820f914

File tree

1 file changed

+54
-8
lines changed

1 file changed

+54
-8
lines changed

rxjava-core/src/main/java/rx/operators/OperationOnErrorResumeNextViaObservable.java

Lines changed: 54 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,9 @@ public Subscription call(final Observer<T> observer) {
7373
// subscribe to the original Observable and remember the subscription
7474
subscription.wrap(originalSequence.subscribe(new Observer<T>() {
7575
public void onNext(T value) {
76-
// forward the successful calls
77-
observer.onNext(value);
76+
// forward the successful calls unless resumed
77+
if (subscriptionRef.get()==subscription)
78+
observer.onNext(value);
7879
}
7980

8081
/**
@@ -83,8 +84,8 @@ public void onNext(T value) {
8384
public void onError(Exception ex) {
8485
/* remember what the current subscription is so we can determine if someone unsubscribes concurrently */
8586
AtomicObservableSubscription currentSubscription = subscriptionRef.get();
86-
// check that we have not been unsubscribed before we can process the error
87-
if (currentSubscription != null) {
87+
// check that we have not been unsubscribed and not already resumed before we can process the error
88+
if (currentSubscription == subscription) {
8889
/* error occurred, so switch subscription to the 'resumeSequence' */
8990
AtomicObservableSubscription innerSubscription = new AtomicObservableSubscription(resumeSequence.subscribe(observer));
9091
/* we changed the sequence, so also change the subscription to the one of the 'resumeSequence' instead */
@@ -97,8 +98,9 @@ public void onError(Exception ex) {
9798
}
9899

99100
public void onCompleted() {
100-
// forward the successful calls
101-
observer.onCompleted();
101+
// forward the successful calls unless resumed
102+
if (subscriptionRef.get()==subscription)
103+
observer.onCompleted();
102104
}
103105
}));
104106

@@ -119,7 +121,8 @@ public static class UnitTest {
119121
@Test
120122
public void testResumeNext() {
121123
Subscription s = mock(Subscription.class);
122-
TestObservable w = new TestObservable(s, "one");
124+
// Trigger failure on second element
125+
TestObservable w = new TestObservable(s, "one", "fail", "two", "three");
123126
Observable<String> resume = Observable.from("twoResume", "threeResume");
124127
Observable<String> observable = Observable.create(onErrorResumeNextViaObservable(w, resume));
125128

@@ -140,7 +143,46 @@ public void testResumeNext() {
140143
verify(aObserver, Mockito.never()).onNext("three");
141144
verify(aObserver, times(1)).onNext("twoResume");
142145
verify(aObserver, times(1)).onNext("threeResume");
146+
}
147+
148+
@Test
149+
public void testMapResumeAsyncNext() {
150+
Subscription sr = mock(Subscription.class);
151+
// Trigger multiple failures
152+
Observable<String> w = Observable.from("one", "fail", "two", "three", "fail");
153+
// Resume Observable is async
154+
TestObservable resume = new TestObservable(sr, "twoResume", "threeResume");
155+
156+
// Introduce map function that fails intermittently (Map does not prevent this when the observer is a
157+
// rx.operator incl onErrorResumeNextViaObservable)
158+
w = w.map(new Func1<String, String>() {
159+
public String call(String s) {
160+
if ("fail".equals(s))
161+
throw new RuntimeException("Forced Failure");
162+
System.out.println("BadMapper:" + s);
163+
return s;
164+
}
165+
});
166+
167+
Observable<String> observable = Observable.create(onErrorResumeNextViaObservable(w, resume));
168+
169+
@SuppressWarnings("unchecked")
170+
Observer<String> aObserver = mock(Observer.class);
171+
observable.subscribe(aObserver);
172+
173+
try {
174+
resume.t.join();
175+
} catch (InterruptedException e) {
176+
fail(e.getMessage());
177+
}
143178

179+
verify(aObserver, Mockito.never()).onError(any(Exception.class));
180+
verify(aObserver, times(1)).onCompleted();
181+
verify(aObserver, times(1)).onNext("one");
182+
verify(aObserver, Mockito.never()).onNext("two");
183+
verify(aObserver, Mockito.never()).onNext("three");
184+
verify(aObserver, times(1)).onNext("twoResume");
185+
verify(aObserver, times(1)).onNext("threeResume");
144186
}
145187

146188
private static class TestObservable extends Observable<String> {
@@ -164,11 +206,15 @@ public void run() {
164206
try {
165207
System.out.println("running TestObservable thread");
166208
for (String s : values) {
209+
if ("fail".equals(s))
210+
throw new RuntimeException("Forced Failure");
167211
System.out.println("TestObservable onNext: " + s);
168212
observer.onNext(s);
169213
}
170-
throw new RuntimeException("Forced Failure");
214+
System.out.println("TestObservable onCompleted");
215+
observer.onCompleted();
171216
} catch (Exception e) {
217+
System.out.println("TestObservable onError: " + e);
172218
observer.onError(e);
173219
}
174220
}

0 commit comments

Comments
 (0)