diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index e332098ba2..95e3830979 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -27,6 +27,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicReference; import org.junit.Before; @@ -41,9 +42,6 @@ import rx.operators.OperationDefer; import rx.operators.OperationDematerialize; import rx.operators.OperationFilter; -import rx.operators.OperationTake; -import rx.operators.OperationTakeWhile; -import rx.operators.OperationWhere; import rx.operators.OperationMap; import rx.operators.OperationMaterialize; import rx.operators.OperationMerge; @@ -56,17 +54,21 @@ import rx.operators.OperationScan; import rx.operators.OperationSkip; import rx.operators.OperationSynchronize; +import rx.operators.OperationTake; import rx.operators.OperationTakeLast; +import rx.operators.OperationTakeWhile; import rx.operators.OperationToObservableFuture; import rx.operators.OperationToObservableIterable; import rx.operators.OperationToObservableList; import rx.operators.OperationToObservableSortedList; +import rx.operators.OperationWhere; import rx.operators.OperationZip; import rx.operators.OperatorGroupBy; import rx.operators.OperatorTakeUntil; import rx.operators.OperatorToIterator; import rx.plugins.RxJavaErrorHandler; import rx.plugins.RxJavaPlugins; +import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; import rx.util.AtomicObservableSubscription; import rx.util.AtomicObserver; @@ -79,6 +81,7 @@ import rx.util.functions.Func3; import rx.util.functions.Func4; import rx.util.functions.FuncN; +import rx.util.functions.Function; import rx.util.functions.FunctionLanguageAdaptor; import rx.util.functions.Functions; @@ -99,10 +102,9 @@ public class Observable { private final Func1, Subscription> onSubscribe; - private final boolean isTrusted; protected Observable() { - this(null, false); + this(null); } /** @@ -114,18 +116,7 @@ protected Observable() { * {@link Func1} to be executed when {@link #subscribe(Observer)} is called. */ protected Observable(Func1, Subscription> onSubscribe) { - this(onSubscribe, false); - } - - /** - * @param onSubscribe - * {@link Func1} to be executed when {@link #subscribe(Observer)} is called. - * @param isTrusted - * boolean true if the onSubscribe function is guaranteed to conform to the correct contract and thus shortcuts can be taken. - */ - private Observable(Func1, Subscription> onSubscribe, boolean isTrusted) { this.onSubscribe = onSubscribe; - this.isTrusted = isTrusted; } /** @@ -159,7 +150,10 @@ public Subscription subscribe(Observer observer) { // the subscribe function can also be overridden but generally that's not the appropriate approach so I won't mention that in the exception } try { - if (isTrusted) { + /** + * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" + */ + if (isInternalImplementation(observer)) { Subscription s = onSubscribe.call(observer); if (s == null) { // this generally shouldn't be the case on a 'trusted' onSubscribe but in case it happens @@ -185,6 +179,16 @@ public Subscription subscribe(Observer observer) { } } + /** + * Used for protecting against errors being thrown from Observer implementations and ensuring onNext/onError/onCompleted contract compliance. + *

+ * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" + */ + private Subscription protectivelyWrapAndSubscribe(Observer o) { + AtomicObservableSubscription subscription = new AtomicObservableSubscription(); + return subscription.wrap(subscribe(new AtomicObserver(subscription, o))); + } + @SuppressWarnings({ "rawtypes", "unchecked" }) public Subscription subscribe(final Map callbacks) { // lookup and memoize onNext @@ -194,7 +198,12 @@ public Subscription subscribe(final Map callbacks) { } final FuncN onNext = Functions.from(_onNext); - return subscribe(new Observer() { + /** + * Wrapping since raw functions provided by the user are being invoked. + * + * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" + */ + return protectivelyWrapAndSubscribe(new Observer() { public void onCompleted() { Object onComplete = callbacks.get("onCompleted"); @@ -231,7 +240,12 @@ public Subscription subscribe(final Object o) { } final FuncN onNext = Functions.from(o); - return subscribe(new Observer() { + /** + * Wrapping since raw functions provided by the user are being invoked. + * + * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" + */ + return protectivelyWrapAndSubscribe(new Observer() { public void onCompleted() { // do nothing @@ -251,7 +265,12 @@ public void onNext(Object args) { public Subscription subscribe(final Action1 onNext) { - return subscribe(new Observer() { + /** + * Wrapping since raw functions provided by the user are being invoked. + * + * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" + */ + return protectivelyWrapAndSubscribe(new Observer() { public void onCompleted() { // do nothing @@ -280,7 +299,12 @@ public Subscription subscribe(final Object onNext, final Object onError) { } final FuncN onNextFunction = Functions.from(onNext); - return subscribe(new Observer() { + /** + * Wrapping since raw functions provided by the user are being invoked. + * + * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" + */ + return protectivelyWrapAndSubscribe(new Observer() { public void onCompleted() { // do nothing @@ -302,7 +326,12 @@ public void onNext(Object args) { public Subscription subscribe(final Action1 onNext, final Action1 onError) { - return subscribe(new Observer() { + /** + * Wrapping since raw functions provided by the user are being invoked. + * + * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" + */ + return protectivelyWrapAndSubscribe(new Observer() { public void onCompleted() { // do nothing @@ -333,7 +362,12 @@ public Subscription subscribe(final Object onNext, final Object onError, final O } final FuncN onNextFunction = Functions.from(onNext); - return subscribe(new Observer() { + /** + * Wrapping since raw functions provided by the user are being invoked. + * + * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" + */ + return protectivelyWrapAndSubscribe(new Observer() { public void onCompleted() { if (onComplete != null) { @@ -357,7 +391,12 @@ public void onNext(Object args) { public Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete) { - return subscribe(new Observer() { + /** + * Wrapping since raw functions provided by the user are being invoked. + * + * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" + */ + return protectivelyWrapAndSubscribe(new Observer() { public void onCompleted() { onComplete.call(); @@ -396,7 +435,12 @@ public void forEach(final Action1 onNext) { final CountDownLatch latch = new CountDownLatch(1); final AtomicReference exceptionFromOnError = new AtomicReference(); - subscribe(new Observer() { + /** + * Wrapping since raw functions provided by the user are being invoked. + * + * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" + */ + protectivelyWrapAndSubscribe(new Observer() { public void onCompleted() { latch.countDown(); } @@ -565,7 +609,7 @@ public Subscription call(Observer t1) { return Subscriptions.empty(); } - }, true); + }); } } @@ -593,7 +637,7 @@ public Subscription call(Observer observer) { return Subscriptions.empty(); } - }, true); + }); } } @@ -619,13 +663,6 @@ public static Observable create(Func1, Subscription> func) { return new Observable(func); } - /* - * Private version that creates a 'trusted' Observable to allow performance optimizations. - */ - private static Observable _create(Func1, Subscription> func) { - return new Observable(func, true); - } - /** * Creates an Observable that will execute the given function when a {@link Observer} subscribes to it. *

@@ -697,7 +734,7 @@ public static Observable error(Exception exception) { * @return an Observable that emits only those items in the original Observable that the filter evaluates as true */ public static Observable filter(Observable that, Func1 predicate) { - return _create(OperationFilter.filter(that, predicate)); + return create(OperationFilter.filter(that, predicate)); } /** @@ -729,7 +766,7 @@ public Boolean call(T t1) { * Filters an Observable by discarding any of its emissions that do not meet some test. *

* - * + * * @param that * the Observable to filter * @param predicate @@ -737,7 +774,7 @@ public Boolean call(T t1) { * @return an Observable that emits only those items in the original Observable that the filter evaluates as true */ public static Observable where(Observable that, Func1 predicate) { - return _create(OperationWhere.where(that, predicate)); + return create(OperationWhere.where(that, predicate)); } /** @@ -797,7 +834,7 @@ public static Observable range(int start, int count) { * @return the observable sequence whose observers trigger an invocation of the given observable factory function. */ public static Observable defer(Func0> observableFactory) { - return _create(OperationDefer.defer(observableFactory)); + return create(OperationDefer.defer(observableFactory)); } /** @@ -816,7 +853,7 @@ public static Observable defer(Object observableFactory) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(observableFactory); - return _create(OperationDefer.defer(new Func0>() { + return create(OperationDefer.defer(new Func0>() { @Override @SuppressWarnings("unchecked") @@ -980,7 +1017,7 @@ public Boolean call(T args) { * in the sequence emitted by the source Observable */ public static Observable map(Observable sequence, Func1 func) { - return _create(OperationMap.map(sequence, func)); + return create(OperationMap.map(sequence, func)); } /** @@ -1036,7 +1073,7 @@ public R call(T t1) { * the Observables obtained from this transformation */ public static Observable mapMany(Observable sequence, Func1> func) { - return _create(OperationMap.mapMany(sequence, func)); + return create(OperationMap.mapMany(sequence, func)); } /** @@ -1085,7 +1122,7 @@ public R call(T t1) { * @see MSDN: Observable.Materialize */ public static Observable> materialize(final Observable sequence) { - return _create(OperationMaterialize.materialize(sequence)); + return create(OperationMaterialize.materialize(sequence)); } /** @@ -1097,7 +1134,7 @@ public static Observable> materialize(final Observable se * @see MSDN: Observable.Dematerialize */ public static Observable dematerialize(final Observable> sequence) { - return _create(OperationDematerialize.dematerialize(sequence)); + return create(OperationDematerialize.dematerialize(sequence)); } /** @@ -1114,7 +1151,7 @@ public static Observable dematerialize(final Observable> * @see MSDN: Observable.Merge */ public static Observable merge(List> source) { - return _create(OperationMerge.merge(source)); + return create(OperationMerge.merge(source)); } /** @@ -1131,7 +1168,7 @@ public static Observable merge(List> source) { * @see MSDN: Observable.Merge Method */ public static Observable merge(Observable> source) { - return _create(OperationMerge.merge(source)); + return create(OperationMerge.merge(source)); } /** @@ -1148,7 +1185,7 @@ public static Observable merge(Observable> source) { * @see MSDN: Observable.Merge Method */ public static Observable merge(Observable... source) { - return _create(OperationMerge.merge(source)); + return create(OperationMerge.merge(source)); } /** @@ -1181,7 +1218,7 @@ public static Observable takeUntil(final Observable source, final O * @see MSDN: Observable.Concat Method */ public static Observable concat(Observable... source) { - return _create(OperationConcat.concat(source)); + return create(OperationConcat.concat(source)); } /** @@ -1202,7 +1239,7 @@ public static Observable concat(Observable... source) { * @return an observable of observable groups, each of which corresponds to a unique key value, containing all elements that share that same key value. */ public static Observable> groupBy(Observable source, final Func1 keySelector, final Func1 elementSelector) { - return _create(OperatorGroupBy.groupBy(source, keySelector, elementSelector)); + return create(OperatorGroupBy.groupBy(source, keySelector, elementSelector)); } /** @@ -1219,7 +1256,7 @@ public static Observable> groupBy(Observable Observable> groupBy(Observable source, final Func1 keySelector) { - return _create(OperatorGroupBy.groupBy(source, keySelector)); + return create(OperatorGroupBy.groupBy(source, keySelector)); } /** @@ -1238,7 +1275,7 @@ public static Observable> groupBy(Observable s * @see MSDN: Observable.Merge Method */ public static Observable mergeDelayError(List> source) { - return _create(OperationMergeDelayError.mergeDelayError(source)); + return create(OperationMergeDelayError.mergeDelayError(source)); } /** @@ -1257,7 +1294,7 @@ public static Observable mergeDelayError(List> source) { * @see MSDN: Observable.Merge Method */ public static Observable mergeDelayError(Observable> source) { - return _create(OperationMergeDelayError.mergeDelayError(source)); + return create(OperationMergeDelayError.mergeDelayError(source)); } /** @@ -1276,7 +1313,7 @@ public static Observable mergeDelayError(Observable> source * @see MSDN: Observable.Merge Method */ public static Observable mergeDelayError(Observable... source) { - return _create(OperationMergeDelayError.mergeDelayError(source)); + return create(OperationMergeDelayError.mergeDelayError(source)); } /** @@ -1317,7 +1354,7 @@ public static Observable never() { * @return the source Observable, with its behavior modified as described */ public static Observable onErrorResumeNext(final Observable that, final Func1> resumeFunction) { - return _create(OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(that, resumeFunction)); + return create(OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(that, resumeFunction)); } /** @@ -1381,7 +1418,7 @@ public Observable call(Exception e) { * @return the source Observable, with its behavior modified as described */ public static Observable onErrorResumeNext(final Observable that, final Observable resumeSequence) { - return _create(OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable(that, resumeSequence)); + return create(OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable(that, resumeSequence)); } /** @@ -1405,7 +1442,7 @@ public static Observable onErrorResumeNext(final Observable that, fina * @return the source Observable, with its behavior modified as described */ public static Observable onErrorReturn(final Observable that, Func1 resumeFunction) { - return _create(OperationOnErrorReturn.onErrorReturn(that, resumeFunction)); + return create(OperationOnErrorReturn.onErrorReturn(that, resumeFunction)); } /** @@ -1435,7 +1472,7 @@ public static Observable onErrorReturn(final Observable that, Func1Wikipedia: Fold (higher-order function) */ public static Observable reduce(Observable sequence, Func2 accumulator) { - return takeLast(_create(OperationScan.scan(sequence, accumulator)), 1); + return takeLast(create(OperationScan.scan(sequence, accumulator)), 1); } /** @@ -1507,7 +1544,7 @@ public T call(T t1, T t2) { * @see Wikipedia: Fold (higher-order function) */ public static Observable reduce(Observable sequence, T initialValue, Func2 accumulator) { - return takeLast(_create(OperationScan.scan(sequence, initialValue, accumulator)), 1); + return takeLast(create(OperationScan.scan(sequence, initialValue, accumulator)), 1); } /** @@ -1571,7 +1608,7 @@ public T call(T t1, T t2) { * @see MSDN: Observable.Scan */ public static Observable scan(Observable sequence, Func2 accumulator) { - return _create(OperationScan.scan(sequence, accumulator)); + return create(OperationScan.scan(sequence, accumulator)); } /** @@ -1629,7 +1666,7 @@ public T call(T t1, T t2) { * @see MSDN: Observable.Scan */ public static Observable scan(Observable sequence, T initialValue, Func2 accumulator) { - return _create(OperationScan.scan(sequence, initialValue, accumulator)); + return create(OperationScan.scan(sequence, initialValue, accumulator)); } /** @@ -1669,20 +1706,28 @@ public T call(T t1, T t2) { /** * Determines whether all elements of an observable sequence satisfies a condition. - * @param sequence an observable sequence whose elements to apply the predicate to. - * @param predicate a function to test each element for a condition. - * @param the type of observable. + * + * @param sequence + * an observable sequence whose elements to apply the predicate to. + * @param predicate + * a function to test each element for a condition. + * @param + * the type of observable. * @return true if all elements of an observable sequence satisfies a condition; otherwise, false. */ public static Observable all(final Observable sequence, final Func1 predicate) { - return _create(OperationAll.all(sequence, predicate)); + return create(OperationAll.all(sequence, predicate)); } /** * Determines whether all elements of an observable sequence satisfies a condition. - * @param sequence an observable sequence whose elements to apply the predicate to. - * @param predicate a function to test each element for a condition. - * @param the type of observable. + * + * @param sequence + * an observable sequence whose elements to apply the predicate to. + * @param predicate + * a function to test each element for a condition. + * @param + * the type of observable. * @return true if all elements of an observable sequence satisfies a condition; otherwise, false. */ public static Observable all(final Observable sequence, Object predicate) { @@ -1712,7 +1757,7 @@ public Boolean call(T t) { * @see MSDN: Observable.Skip Method */ public static Observable skip(final Observable items, int num) { - return _create(OperationSkip.skip(items, num)); + return create(OperationSkip.skip(items, num)); } /** @@ -1730,7 +1775,7 @@ public static Observable skip(final Observable items, int num) { * @return an Observable that is a chronologically well-behaved version of the source Observable */ public static Observable synchronize(Observable observable) { - return _create(OperationSynchronize.synchronize(observable)); + return create(OperationSynchronize.synchronize(observable)); } /** @@ -1752,7 +1797,7 @@ public static Observable synchronize(Observable observable) { * Observable */ public static Observable take(final Observable items, final int num) { - return _create(OperationTake.take(items, num)); + return create(OperationTake.take(items, num)); } /** @@ -1768,7 +1813,7 @@ public static Observable take(final Observable items, final int num) { * Observable */ public static Observable takeLast(final Observable items, final int count) { - return _create(OperationTakeLast.takeLast(items, count)); + return create(OperationTakeLast.takeLast(items, count)); } /** @@ -1849,7 +1894,7 @@ public Boolean call(T t, Integer integer) * items emitted by the source Observable */ public static Observable> toList(final Observable that) { - return _create(OperationToObservableList.toObservableList(that)); + return create(OperationToObservableList.toObservableList(that)); } /** @@ -2047,7 +2092,7 @@ private static T singleOrDefault(Observable that, boolean hasDefault, T d * @return an Observable that emits each item in the source Iterable sequence */ public static Observable toObservable(Iterable iterable) { - return _create(OperationToObservableIterable.toObservableIterable(iterable)); + return create(OperationToObservableIterable.toObservableIterable(iterable)); } /** @@ -2066,7 +2111,7 @@ public static Observable toObservable(Iterable iterable) { * @return an Observable that emits the item from the source Future */ public static Observable toObservable(Future future) { - return _create(OperationToObservableFuture.toObservableFuture(future)); + return create(OperationToObservableFuture.toObservableFuture(future)); } /** @@ -2090,7 +2135,7 @@ public static Observable toObservable(Future future) { * @return an Observable that emits the item from the source Future */ public static Observable toObservable(Future future, long time, TimeUnit unit) { - return _create(OperationToObservableFuture.toObservableFuture(future, time, unit)); + return create(OperationToObservableFuture.toObservableFuture(future, time, unit)); } /** @@ -2123,7 +2168,7 @@ public static Observable toObservable(T... items) { * @return */ public static Observable> toSortedList(Observable sequence) { - return _create(OperationToObservableSortedList.toSortedList(sequence)); + return create(OperationToObservableSortedList.toSortedList(sequence)); } /** @@ -2136,7 +2181,7 @@ public static Observable> toSortedList(Observable sequence) { * @return */ public static Observable> toSortedList(Observable sequence, Func2 sortFunction) { - return _create(OperationToObservableSortedList.toSortedList(sequence, sortFunction)); + return create(OperationToObservableSortedList.toSortedList(sequence, sortFunction)); } /** @@ -2151,7 +2196,7 @@ public static Observable> toSortedList(Observable sequence, Func2 public static Observable> toSortedList(Observable sequence, final Object sortFunction) { @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(sortFunction); - return _create(OperationToObservableSortedList.toSortedList(sequence, new Func2() { + return create(OperationToObservableSortedList.toSortedList(sequence, new Func2() { @Override public Integer call(T t1, T t2) { @@ -2186,7 +2231,7 @@ public Integer call(T t1, T t2) { * @return an Observable that emits the zipped results */ public static Observable zip(Observable w0, Observable w1, Func2 reduceFunction) { - return _create(OperationZip.zip(w0, w1, reduceFunction)); + return create(OperationZip.zip(w0, w1, reduceFunction)); } /** @@ -2310,7 +2355,7 @@ public R call(T0 t0, T1 t1) { * @return an Observable that emits the zipped results */ public static Observable zip(Observable w0, Observable w1, Observable w2, Func3 function) { - return _create(OperationZip.zip(w0, w1, w2, function)); + return create(OperationZip.zip(w0, w1, w2, function)); } /** @@ -2385,7 +2430,7 @@ public R call(T0 t0, T1 t1, T2 t2) { * @return an Observable that emits the zipped results */ public static Observable zip(Observable w0, Observable w1, Observable w2, Observable w3, Func4 reduceFunction) { - return _create(OperationZip.zip(w0, w1, w2, w3, reduceFunction)); + return create(OperationZip.zip(w0, w1, w2, w3, reduceFunction)); } /** @@ -2472,7 +2517,7 @@ public Boolean call(T t1) { * Filters an Observable by discarding any of its emissions that do not meet some test. *

* - * + * * @param predicate * a function that evaluates the items emitted by the source Observable, returning * true if they pass the filter @@ -2663,7 +2708,7 @@ public Observable> materialize() { */ @SuppressWarnings("unchecked") public Observable dematerialize() { - return dematerialize((Observable>)this); + return dematerialize((Observable>) this); } /** @@ -3008,7 +3053,9 @@ public Observable scan(final T initialValue, final Object accumulator) { /** * Determines whether all elements of an observable sequence satisfies a condition. - * @param predicate a function to test each element for a condition. + * + * @param predicate + * a function to test each element for a condition. * @return true if all elements of an observable sequence satisfies a condition; otherwise, false. */ public Observable all(Func1 predicate) { @@ -3017,7 +3064,9 @@ public Observable all(Func1 predicate) { /** * Determines whether all elements of an observable sequence satisfies a condition. - * @param predicate a function to test each element for a condition. + * + * @param predicate + * a function to test each element for a condition. * @return true if all elements of an observable sequence satisfies a condition; otherwise, false. */ public Observable all(Object predicate) { @@ -3262,6 +3311,27 @@ public Iterable mostRecent(T initialValue) { return mostRecent(this, initialValue); } + /** + * Whether a given {@link Function} is an internal implementation inside rx.* packages or not. + *

+ * For why this is being used see https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator" + * + * NOTE: If strong reasons for not depending on package names comes up then the implementation of this method can change to looking for a marker interface. + * + * @param f + * @return + */ + private boolean isInternalImplementation(Object o) { + if (o == null) { + return true; + } + // prevent double-wrapping (yeah it happens) + if (o instanceof AtomicObserver) + return true; + // we treat the following package as "internal" and don't wrap it + return o.getClass().getPackage().getName().startsWith("rx.operators"); + } + public static class UnitTest { @Mock @@ -3556,6 +3626,204 @@ public void testMaterializeDematerializeChaining() { verify(observer, times(0)).onError(any(Exception.class)); } + /** + * The error from the user provided Observer is not handled by the subscribe method try/catch. + * + * It is handled by the AtomicObserver that wraps the provided Observer. + * + * Result: Passes (if AtomicObserver functionality exists) + */ + @Test + public void testCustomObservableWithErrorInObserverAsynchronous() throws InterruptedException { + final CountDownLatch latch = new CountDownLatch(1); + final AtomicInteger count = new AtomicInteger(); + final AtomicReference error = new AtomicReference(); + Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(final Observer observer) { + final BooleanSubscription s = new BooleanSubscription(); + new Thread(new Runnable() { + + @Override + public void run() { + try { + if (!s.isUnsubscribed()) { + observer.onNext("1"); + observer.onNext("2"); + observer.onNext("three"); + observer.onNext("4"); + observer.onCompleted(); + } + } finally { + latch.countDown(); + } + } + }).start(); + return s; + } + }).subscribe(new Observer() { + @Override + public void onCompleted() { + System.out.println("completed"); + } + + @Override + public void onError(Exception e) { + error.set(e); + System.out.println("error"); + e.printStackTrace(); + } + + @Override + public void onNext(String v) { + int num = Integer.parseInt(v); + System.out.println(num); + // doSomething(num); + count.incrementAndGet(); + } + + }); + + // wait for async sequence to complete + latch.await(); + + assertEquals(2, count.get()); + assertNotNull(error.get()); + if (!(error.get() instanceof NumberFormatException)) { + fail("It should be a NumberFormatException"); + } + } + + /** + * The error from the user provided Observer is handled by the subscribe try/catch because this is synchronous + * + * Result: Passes + */ + @Test + public void testCustomObservableWithErrorInObserverSynchronous() { + final AtomicInteger count = new AtomicInteger(); + final AtomicReference error = new AtomicReference(); + Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(Observer observer) { + observer.onNext("1"); + observer.onNext("2"); + observer.onNext("three"); + observer.onNext("4"); + observer.onCompleted(); + return Subscriptions.empty(); + } + }).subscribe(new Observer() { + + @Override + public void onCompleted() { + System.out.println("completed"); + } + + @Override + public void onError(Exception e) { + error.set(e); + System.out.println("error"); + e.printStackTrace(); + } + + @Override + public void onNext(String v) { + int num = Integer.parseInt(v); + System.out.println(num); + // doSomething(num); + count.incrementAndGet(); + } + + }); + assertEquals(2, count.get()); + assertNotNull(error.get()); + if (!(error.get() instanceof NumberFormatException)) { + fail("It should be a NumberFormatException"); + } + } + + /** + * The error from the user provided Observable is handled by the subscribe try/catch because this is synchronous + * + * + * Result: Passes + */ + @Test + public void testCustomObservableWithErrorInObservableSynchronous() { + final AtomicInteger count = new AtomicInteger(); + final AtomicReference error = new AtomicReference(); + Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(Observer observer) { + observer.onNext("1"); + observer.onNext("2"); + throw new NumberFormatException(); + } + }).subscribe(new Observer() { + + @Override + public void onCompleted() { + System.out.println("completed"); + } + + @Override + public void onError(Exception e) { + error.set(e); + System.out.println("error"); + e.printStackTrace(); + } + + @Override + public void onNext(String v) { + System.out.println(v); + count.incrementAndGet(); + } + + }); + assertEquals(2, count.get()); + assertNotNull(error.get()); + if (!(error.get() instanceof NumberFormatException)) { + fail("It should be a NumberFormatException"); + } + } + + @Test + public void testForEachWithError() { + try { + Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(final Observer observer) { + final BooleanSubscription subscription = new BooleanSubscription(); + new Thread(new Runnable() { + + @Override + public void run() { + observer.onNext("one"); + observer.onNext("two"); + observer.onNext("three"); + observer.onCompleted(); + } + }).start(); + return subscription; + } + }).forEach(new Action1() { + + @Override + public void call(String t1) { + throw new RuntimeException("fail"); + } + }); + fail("we expect an exception to be thrown"); + } catch (Exception e) { + // do nothing as we expect this + } + } + private static class TestException extends RuntimeException { private static final long serialVersionUID = 1L; } diff --git a/rxjava-core/src/main/java/rx/operators/OperationTake.java b/rxjava-core/src/main/java/rx/operators/OperationTake.java index 5ea6b627e4..a4b1922e22 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTake.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTake.java @@ -18,7 +18,7 @@ import static org.junit.Assert.*; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; -import static rx.operators.AbstractOperation.UnitTest.*; +import static rx.operators.Tester.UnitTest.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -281,6 +281,7 @@ public void run() { } } + } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java index 1bad2d36e5..d00d66c3aa 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java @@ -26,11 +26,13 @@ import rx.Observable; import rx.Observer; import rx.Subscription; +import rx.subjects.Subject; import rx.subscriptions.Subscriptions; import rx.util.AtomicObservableSubscription; +import rx.util.AtomicObserver; import rx.util.functions.Func1; import rx.util.functions.Func2; -import rx.subjects.Subject; + /** * Returns values from an observable sequence as long as a specified condition is true, and then skips the remaining values. */ @@ -45,7 +47,7 @@ public final class OperationTakeWhile { * @return */ public static Func1, Subscription> takeWhile(final Observable items, final Func1 predicate) { - return takeWhileWithIndex(items, OperationTakeWhile.skipIndex(predicate)); + return takeWhileWithIndex(items, OperationTakeWhile. skipIndex(predicate)); } /** @@ -108,7 +110,10 @@ private class ItemObserver implements Observer { private final Observer observer; public ItemObserver(Observer observer) { - this.observer = observer; + // Using AtomicObserver because the unsubscribe, onCompleted, onError and error handling behavior + // needs "isFinished" logic to not send duplicated events + // The 'testTakeWhile1' and 'testTakeWhile2' tests fail without this. + this.observer = new AtomicObserver(subscription, observer); } @Override @@ -126,8 +131,7 @@ public void onNext(T args) { Boolean isSelected; try { isSelected = predicate.call(args, counter.getAndIncrement()); - } - catch (Exception e) { + } catch (Exception e) { observer.onError(e); return; } @@ -171,7 +175,7 @@ public Boolean call(Integer input) @Test public void testTakeWhileOnSubject1() { Subject s = Subject.create(); - Observable w = (Observable)s; + Observable w = (Observable) s; Observable take = Observable.create(takeWhile(w, new Func1() { @Override diff --git a/rxjava-core/src/main/java/rx/operators/AbstractOperation.java b/rxjava-core/src/main/java/rx/operators/Tester.java similarity index 95% rename from rxjava-core/src/main/java/rx/operators/AbstractOperation.java rename to rxjava-core/src/main/java/rx/operators/Tester.java index dd71bd837a..9692323015 100644 --- a/rxjava-core/src/main/java/rx/operators/AbstractOperation.java +++ b/rxjava-core/src/main/java/rx/operators/Tester.java @@ -17,11 +17,16 @@ import rx.util.functions.Func1; /** - * Common utility functions for operator implementations and tests. + * Common utility functions for testing operator implementations. */ -/* package */class AbstractOperation -{ - private AbstractOperation() { +/* package */class Tester { + /* + * This is purposefully package-only so it does not leak into the public API outside of this package. + * + * This package is implementation details and not part of the Javadocs and thus can change without breaking backwards compatibility. + */ + + private Tester() { } public static class UnitTest { diff --git a/rxjava-core/src/main/java/rx/util/AtomicObserver.java b/rxjava-core/src/main/java/rx/util/AtomicObserver.java index b3e18f5b3e..24519dc276 100644 --- a/rxjava-core/src/main/java/rx/util/AtomicObserver.java +++ b/rxjava-core/src/main/java/rx/util/AtomicObserver.java @@ -1,8 +1,10 @@ package rx.util; +import java.util.Arrays; import java.util.concurrent.atomic.AtomicBoolean; import rx.Observer; +import rx.plugins.RxJavaPlugins; /** * Wrapper around Observer to ensure compliance with Rx contract. @@ -32,7 +34,7 @@ *

  • When onError or onComplete occur it will unsubscribe from the Observable (if executing asynchronously).
  • * *

    - * It will not synchronized onNext execution. Use the {@link SynchronizedObserver} to do that. + * It will not synchronize onNext execution. Use the {@link SynchronizedObserver} to do that. * * @param */ @@ -50,7 +52,12 @@ public AtomicObserver(AtomicObservableSubscription subscription, Observer act @Override public void onCompleted() { if (isFinished.compareAndSet(false, true)) { - actual.onCompleted(); + try { + actual.onCompleted(); + } catch (Exception e) { + // handle errors if the onCompleted implementation fails, not just if the Observable fails + onError(e); + } // auto-unsubscribe subscription.unsubscribe(); } @@ -59,7 +66,17 @@ public void onCompleted() { @Override public void onError(Exception e) { if (isFinished.compareAndSet(false, true)) { - actual.onError(e); + try { + actual.onError(e); + } catch (Exception e2) { + // if the onError itself fails then pass to the plugin + // see https://github.com/Netflix/RxJava/issues/216 for further discussion + RxJavaPlugins.getInstance().getErrorHandler().handleError(e); + RxJavaPlugins.getInstance().getErrorHandler().handleError(e2); + // and throw exception despite that not being proper for Rx + // https://github.com/Netflix/RxJava/issues/198 + throw new RuntimeException("Error occurred when trying to propagate error to Observer.onError", new CompositeException(Arrays.asList(e, e2))); + } // auto-unsubscribe subscription.unsubscribe(); }