diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index de0d0fe01e..6449d77ff2 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -52,6 +52,7 @@ import rx.operators.OperationOnErrorResumeNextViaObservable; import rx.operators.OperationOnErrorReturn; import rx.operators.OperationOnExceptionResumeNextViaObservable; +import rx.operators.OperationRetry; import rx.operators.OperationSample; import rx.operators.OperationScan; import rx.operators.OperationSkip; @@ -3128,6 +3129,42 @@ public static Observable averageDoubles(Observable source) { public ConnectableObservable replay() { return OperationMulticast.multicast(this, ReplaySubject. create()); } + + /** + * Retry subscription to origin Observable upto given retry count. + *

+ * If {@link Observer#onError} is invoked the source Observable will be re-subscribed to as many times as defined by retryCount. + *

+ * Any {@link Observer#onNext} calls received on each attempt will be emitted and concatenated together. + *

+ * For example, if an Observable fails on first time but emits [1, 2] then succeeds the second time and + * emits [1, 2, 3, 4, 5] then the complete output would be [1, 2, 1, 2, 3, 4, 5, onCompleted]. + * + * @param retryCount + * Number of retry attempts before failing. + * @return Observable with retry logic. + */ + public Observable retry(int retryCount) { + return create(OperationRetry.retry(this, retryCount)); + } + + /** + * Retry subscription to origin Observable whenever onError is called (infinite retry count). + *

+ * If {@link Observer#onError} is invoked the source Observable will be re-subscribed to. + *

+ * Any {@link Observer#onNext} calls received on each attempt will be emitted and concatenated together. + *

+ * For example, if an Observable fails on first time but emits [1, 2] then succeeds the second time and + * emits [1, 2, 3, 4, 5] then the complete output would be [1, 2, 1, 2, 3, 4, 5, onCompleted]. + * + * @param retryCount + * Number of retry attempts before failing. + * @return Observable with retry logic. + */ + public Observable retry() { + return create(OperationRetry.retry(this)); + } /** * This method has similar behavior to {@link #replay} except that this auto-subscribes to diff --git a/rxjava-core/src/main/java/rx/operators/OperationRetry.java b/rxjava-core/src/main/java/rx/operators/OperationRetry.java new file mode 100644 index 0000000000..c7007a00a7 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationRetry.java @@ -0,0 +1,203 @@ +package rx.operators; + +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Subscription; +import rx.concurrency.Schedulers; +import rx.subscriptions.CompositeSubscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; + +public class OperationRetry { + + private static final int INFINITE_RETRY = -1; + + public static OnSubscribeFunc retry(final Observable observable, final int retryCount) { + return new Retry(observable, retryCount); + } + + public static OnSubscribeFunc retry(final Observable observable) { + return new Retry(observable, INFINITE_RETRY); + } + + private static class Retry implements OnSubscribeFunc { + + private final Observable source; + private final int retryCount; + private final AtomicInteger attempts = new AtomicInteger(0); + private final CompositeSubscription subscription = new CompositeSubscription(); + + public Retry(Observable source, int retryCount) { + this.source = source; + this.retryCount = retryCount; + } + + @Override + public Subscription onSubscribe(Observer observer) { + subscription.add(Schedulers.currentThread().schedule(attemptSubscription(observer))); + return subscription; + } + + private Action0 attemptSubscription(final Observer observer) { + return new Action0() { + + @Override + public void call() { + attempts.incrementAndGet(); + source.subscribe(new Observer() { + + @Override + public void onCompleted() { + observer.onCompleted(); + } + + @Override + public void onError(Throwable e) { + if ((retryCount == INFINITE_RETRY || attempts.get() <= retryCount) && !subscription.isUnsubscribed()) { + // retry again + // remove the last subscription since we have completed (so as we retry we don't build up a huge list) + subscription.removeLast(); + // add the new subscription and schedule a retry + subscription.add(Schedulers.currentThread().schedule(attemptSubscription(observer))); + } else { + // give up and pass the failure + observer.onError(e); + } + } + + @Override + public void onNext(T v) { + observer.onNext(v); + } + }); + + } + }; + } + + } + + public static class UnitTest { + + @Test + public void testOriginFails() { + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + Observable origin = Observable.create(new FuncWithErrors(2)); + origin.subscribe(observer); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(1)).onNext("beginningEveryTime"); + inOrder.verify(observer, times(1)).onError(any(RuntimeException.class)); + inOrder.verify(observer, never()).onNext("onSuccessOnly"); + inOrder.verify(observer, never()).onCompleted(); + } + + @Test + public void testRetryFail() { + int NUM_RETRIES = 1; + int NUM_FAILURES = 2; + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + Observable origin = Observable.create(new FuncWithErrors(NUM_FAILURES)); + Observable.create(retry(origin, NUM_RETRIES)).subscribe(observer); + + InOrder inOrder = inOrder(observer); + // should show 2 attempts (first time fail, second time (1st retry) fail) + inOrder.verify(observer, times(1 + NUM_RETRIES)).onNext("beginningEveryTime"); + // should only retry once, fail again and emit onError + inOrder.verify(observer, times(1)).onError(any(RuntimeException.class)); + // no success + inOrder.verify(observer, never()).onNext("onSuccessOnly"); + inOrder.verify(observer, never()).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testRetrySuccess() { + int NUM_RETRIES = 3; + int NUM_FAILURES = 2; + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + Observable origin = Observable.create(new FuncWithErrors(NUM_FAILURES)); + Observable.create(retry(origin, NUM_RETRIES)).subscribe(observer); + + InOrder inOrder = inOrder(observer); + // should show 3 attempts + inOrder.verify(observer, times(1 + NUM_FAILURES)).onNext("beginningEveryTime"); + // should have no errors + inOrder.verify(observer, never()).onError(any(Throwable.class)); + // should have a single success + inOrder.verify(observer, times(1)).onNext("onSuccessOnly"); + // should have a single successful onCompleted + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testInfiniteRetry() { + int NUM_FAILURES = 20; + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + Observable origin = Observable.create(new FuncWithErrors(NUM_FAILURES)); + Observable.create(retry(origin)).subscribe(observer); + + InOrder inOrder = inOrder(observer); + // should show 3 attempts + inOrder.verify(observer, times(1 + NUM_FAILURES)).onNext("beginningEveryTime"); + // should have no errors + inOrder.verify(observer, never()).onError(any(Throwable.class)); + // should have a single success + inOrder.verify(observer, times(1)).onNext("onSuccessOnly"); + // should have a single successful onCompleted + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + public static class FuncWithErrors implements OnSubscribeFunc { + + private final int numFailures; + private final AtomicInteger count = new AtomicInteger(0); + + FuncWithErrors(int count) { + this.numFailures = count; + } + + @Override + public Subscription onSubscribe(Observer o) { + o.onNext("beginningEveryTime"); + if (count.incrementAndGet() <= numFailures) { + o.onError(new RuntimeException("forced failure: " + count.get())); + } else { + o.onNext("onSuccessOnly"); + o.onCompleted(); + } + return Subscriptions.empty(); + } + }; + } +} diff --git a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java index 873176c8d4..330657cd5d 100644 --- a/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java +++ b/rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java @@ -20,7 +20,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.List; -import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.LinkedBlockingDeque; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; @@ -42,7 +42,7 @@ public class CompositeSubscription implements Subscription { * TODO evaluate whether use of synchronized is a performance issue here and if it's worth using an atomic state machine or other non-locking approach */ private AtomicBoolean unsubscribed = new AtomicBoolean(false); - private final ConcurrentLinkedQueue subscriptions = new ConcurrentLinkedQueue(); + private final LinkedBlockingDeque subscriptions = new LinkedBlockingDeque(); public CompositeSubscription(List subscriptions) { this.subscriptions.addAll(subscriptions); @@ -66,6 +66,15 @@ public synchronized void add(Subscription s) { } } + /** + * Remove the last Subscription that was added. + * + * @return Subscription or null if none exists + */ + public synchronized Subscription removeLast() { + return subscriptions.pollLast(); + } + @Override public synchronized void unsubscribe() { if (unsubscribed.compareAndSet(false, true)) {