Skip to content

Commit 352683c

Browse files
Merge pull request #688 from benjchristensen/SafeObserver-errorHandling
Fix SafeObserver handling of onComplete errors
2 parents f1b4de7 + f667642 commit 352683c

File tree

2 files changed

+247
-32
lines changed

2 files changed

+247
-32
lines changed

rxjava-core/src/main/java/rx/operators/SafeObserver.java

Lines changed: 50 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,9 @@
1919
import java.util.concurrent.atomic.AtomicBoolean;
2020

2121
import rx.Observer;
22+
import rx.Subscription;
2223
import rx.plugins.RxJavaPlugins;
24+
import rx.subscriptions.Subscriptions;
2325
import rx.util.CompositeException;
2426
import rx.util.OnErrorNotImplementedException;
2527

@@ -59,7 +61,12 @@ public class SafeObserver<T> implements Observer<T> {
5961

6062
private final Observer<? super T> actual;
6163
private final AtomicBoolean isFinished = new AtomicBoolean(false);
62-
private final SafeObservableSubscription subscription;
64+
private final Subscription subscription;
65+
66+
public SafeObserver(Observer<? super T> actual) {
67+
this.subscription = Subscriptions.empty();
68+
this.actual = actual;
69+
}
6370

6471
public SafeObserver(SafeObservableSubscription subscription, Observer<? super T> actual) {
6572
this.subscription = subscription;
@@ -73,44 +80,18 @@ public void onCompleted() {
7380
actual.onCompleted();
7481
} catch (Throwable e) {
7582
// handle errors if the onCompleted implementation fails, not just if the Observable fails
76-
onError(e);
83+
_onError(e);
84+
} finally {
85+
// auto-unsubscribe
86+
subscription.unsubscribe();
7787
}
78-
// auto-unsubscribe
79-
subscription.unsubscribe();
8088
}
8189
}
8290

8391
@Override
8492
public void onError(Throwable e) {
8593
if (isFinished.compareAndSet(false, true)) {
86-
try {
87-
actual.onError(e);
88-
} catch (Throwable e2) {
89-
if (e2 instanceof OnErrorNotImplementedException) {
90-
/**
91-
* onError isn't implemented so throw
92-
*
93-
* https://github.com/Netflix/RxJava/issues/198
94-
*
95-
* Rx Design Guidelines 5.2
96-
*
97-
* "when calling the Subscribe method that only has an onNext argument, the OnError behavior will be
98-
* to rethrow the exception on the thread that the message comes out from the observable sequence.
99-
* The OnCompleted behavior in this case is to do nothing."
100-
*/
101-
throw (OnErrorNotImplementedException) e2;
102-
} else {
103-
// if the onError itself fails then pass to the plugin
104-
// see https://github.com/Netflix/RxJava/issues/216 for further discussion
105-
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
106-
RxJavaPlugins.getInstance().getErrorHandler().handleError(e2);
107-
// and throw exception despite that not being proper for Rx
108-
// https://github.com/Netflix/RxJava/issues/198
109-
throw new RuntimeException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));
110-
}
111-
}
112-
// auto-unsubscribe
113-
subscription.unsubscribe();
94+
_onError(e);
11495
}
11596
}
11697

@@ -126,4 +107,41 @@ public void onNext(T args) {
126107
}
127108
}
128109

110+
/*
111+
* The logic for `onError` without the `isFinished` check so it can be called from within `onCompleted`.
112+
*
113+
* See https://github.com/Netflix/RxJava/issues/630 for the report of this bug.
114+
*/
115+
protected void _onError(Throwable e) {
116+
try {
117+
actual.onError(e);
118+
} catch (Throwable e2) {
119+
if (e2 instanceof OnErrorNotImplementedException) {
120+
/**
121+
* onError isn't implemented so throw
122+
*
123+
* https://github.com/Netflix/RxJava/issues/198
124+
*
125+
* Rx Design Guidelines 5.2
126+
*
127+
* "when calling the Subscribe method that only has an onNext argument, the OnError behavior will be
128+
* to rethrow the exception on the thread that the message comes out from the observable sequence.
129+
* The OnCompleted behavior in this case is to do nothing."
130+
*/
131+
throw (OnErrorNotImplementedException) e2;
132+
} else {
133+
// if the onError itself fails then pass to the plugin
134+
// see https://github.com/Netflix/RxJava/issues/216 for further discussion
135+
RxJavaPlugins.getInstance().getErrorHandler().handleError(e);
136+
RxJavaPlugins.getInstance().getErrorHandler().handleError(e2);
137+
// and throw exception despite that not being proper for Rx
138+
// https://github.com/Netflix/RxJava/issues/198
139+
throw new RuntimeException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2)));
140+
}
141+
} finally {
142+
// auto-unsubscribe
143+
subscription.unsubscribe();
144+
}
145+
}
146+
129147
}
Lines changed: 197 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.junit.Assert.*;
19+
20+
import java.util.concurrent.atomic.AtomicReference;
21+
22+
import org.junit.Test;
23+
24+
import rx.Observer;
25+
26+
public class SafeObserverTest {
27+
28+
@Test
29+
public void onNextFailure() {
30+
AtomicReference<Throwable> onError = new AtomicReference<Throwable>();
31+
try {
32+
OBSERVER_ONNEXT_FAIL(onError).onNext("one");
33+
fail("expects exception to be thrown");
34+
} catch (Exception e) {
35+
// expected
36+
assertNull(onError.get());
37+
}
38+
}
39+
40+
@Test
41+
public void onNextFailureSafe() {
42+
AtomicReference<Throwable> onError = new AtomicReference<Throwable>();
43+
try {
44+
new SafeObserver<String>(OBSERVER_ONNEXT_FAIL(onError)).onNext("one");
45+
assertNotNull(onError.get());
46+
} catch (Exception e) {
47+
fail("expects exception to be passed to onError");
48+
}
49+
}
50+
51+
@Test
52+
public void onCompletedFailure() {
53+
AtomicReference<Throwable> onError = new AtomicReference<Throwable>();
54+
try {
55+
OBSERVER_ONCOMPLETED_FAIL(onError).onCompleted();
56+
fail("expects exception to be thrown");
57+
} catch (Exception e) {
58+
// expected
59+
assertNull(onError.get());
60+
}
61+
}
62+
63+
@Test
64+
public void onCompletedFailureSafe() {
65+
AtomicReference<Throwable> onError = new AtomicReference<Throwable>();
66+
try {
67+
new SafeObserver<String>(OBSERVER_ONCOMPLETED_FAIL(onError)).onCompleted();
68+
assertNotNull(onError.get());
69+
} catch (Exception e) {
70+
fail("expects exception to be passed to onError");
71+
}
72+
}
73+
74+
@Test
75+
public void onErrorFailure() {
76+
try {
77+
OBSERVER_ONERROR_FAIL().onError(new RuntimeException("error!"));
78+
fail("expects exception to be thrown");
79+
} catch (Exception e) {
80+
// expected
81+
}
82+
}
83+
84+
@Test
85+
public void onErrorFailureSafe() {
86+
try {
87+
new SafeObserver<String>(OBSERVER_ONERROR_FAIL()).onError(new RuntimeException("error!"));
88+
fail("expects exception to be thrown");
89+
} catch (Exception e) {
90+
// expected since onError fails so SafeObserver can't help
91+
}
92+
}
93+
94+
@Test
95+
public void onNextOnErrorFailure() {
96+
try {
97+
OBSERVER_ONNEXT_ONERROR_FAIL().onError(new RuntimeException("error!"));
98+
fail("expects exception to be thrown");
99+
} catch (Exception e) {
100+
// expected
101+
}
102+
}
103+
104+
@Test
105+
public void onNextOnErrorFailureSafe() {
106+
try {
107+
new SafeObserver<String>(OBSERVER_ONNEXT_ONERROR_FAIL()).onError(new RuntimeException("error!"));
108+
fail("expects exception to be thrown");
109+
} catch (Exception e) {
110+
// expected since onError fails so SafeObserver can't help
111+
}
112+
}
113+
114+
private static Observer<String> OBSERVER_ONNEXT_FAIL(final AtomicReference<Throwable> onError) {
115+
return new Observer<String>() {
116+
117+
@Override
118+
public void onCompleted() {
119+
120+
}
121+
122+
@Override
123+
public void onError(Throwable e) {
124+
onError.set(e);
125+
}
126+
127+
@Override
128+
public void onNext(String args) {
129+
throw new RuntimeException("onNextFail");
130+
}
131+
};
132+
133+
}
134+
135+
private static Observer<String> OBSERVER_ONNEXT_ONERROR_FAIL() {
136+
return new Observer<String>() {
137+
138+
@Override
139+
public void onCompleted() {
140+
141+
}
142+
143+
@Override
144+
public void onError(Throwable e) {
145+
throw new RuntimeException("onErrortFail");
146+
}
147+
148+
@Override
149+
public void onNext(String args) {
150+
throw new RuntimeException("onNextFail");
151+
}
152+
153+
};
154+
}
155+
156+
private static Observer<String> OBSERVER_ONERROR_FAIL() {
157+
return new Observer<String>() {
158+
159+
@Override
160+
public void onCompleted() {
161+
162+
}
163+
164+
@Override
165+
public void onError(Throwable e) {
166+
throw new RuntimeException("onErrorFail");
167+
}
168+
169+
@Override
170+
public void onNext(String args) {
171+
172+
}
173+
174+
};
175+
}
176+
177+
private static Observer<String> OBSERVER_ONCOMPLETED_FAIL(final AtomicReference<Throwable> onError) {
178+
return new Observer<String>() {
179+
180+
@Override
181+
public void onCompleted() {
182+
throw new RuntimeException("onCompletedFail");
183+
}
184+
185+
@Override
186+
public void onError(Throwable e) {
187+
onError.set(e);
188+
}
189+
190+
@Override
191+
public void onNext(String args) {
192+
193+
}
194+
195+
};
196+
}
197+
}

0 commit comments

Comments
 (0)