From dfc784126f259361f01f1927f44f5d1aa4e49a43 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Thu, 14 Mar 2013 14:53:33 +0200 Subject: [PATCH 01/25] Naive schedulers implementation --- rxjava-core/src/main/java/rx/Observable.java | 43 ++++------ rxjava-core/src/main/java/rx/Scheduler.java | 20 +++++ .../rx/concurrency/AbstractScheduler.java | 43 ++++++++++ .../concurrency/CurrentThreadScheduler.java | 83 +++++++++++++++++++ .../java/rx/concurrency/DelayedAction.java | 35 ++++++++ .../rx/concurrency/DiscardableAction.java | 35 ++++++++ .../rx/concurrency/ExecutorScheduler.java | 29 +++++++ .../rx/concurrency/ImmediateScheduler.java | 22 +++++ .../rx/concurrency/NewThreadScheduler.java | 30 +++++++ .../main/java/rx/concurrency/Schedulers.java | 27 ++++++ .../rx/observables/ScheduledObserver.java | 45 ++++++++++ .../java/rx/operators/OperationObserveOn.java | 71 ++++++++++++++++ .../rx/operators/OperationSubscribeOn.java | 56 +++++++++++++ 13 files changed, 513 insertions(+), 26 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/Scheduler.java create mode 100644 rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java create mode 100644 rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java create mode 100644 rxjava-core/src/main/java/rx/concurrency/DelayedAction.java create mode 100644 rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java create mode 100644 rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java create mode 100644 rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java create mode 100644 rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java create mode 100644 rxjava-core/src/main/java/rx/concurrency/Schedulers.java create mode 100644 rxjava-core/src/main/java/rx/observables/ScheduledObserver.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperationObserveOn.java create mode 100644 rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index e9015b0379..8a4a254366 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -36,32 +36,7 @@ import org.mockito.MockitoAnnotations; import rx.observables.GroupedObservable; -import rx.operators.OperationConcat; -import rx.operators.OperationDefer; -import rx.operators.OperationDematerialize; -import rx.operators.OperationFilter; -import rx.operators.OperationMap; -import rx.operators.OperationMaterialize; -import rx.operators.OperationMerge; -import rx.operators.OperationMergeDelayError; -import rx.operators.OperationMostRecent; -import rx.operators.OperationNext; -import rx.operators.OperationOnErrorResumeNextViaFunction; -import rx.operators.OperationOnErrorResumeNextViaObservable; -import rx.operators.OperationOnErrorReturn; -import rx.operators.OperationScan; -import rx.operators.OperationSkip; -import rx.operators.OperationSynchronize; -import rx.operators.OperationTake; -import rx.operators.OperationTakeLast; -import rx.operators.OperationToObservableFuture; -import rx.operators.OperationToObservableIterable; -import rx.operators.OperationToObservableList; -import rx.operators.OperationToObservableSortedList; -import rx.operators.OperationZip; -import rx.operators.OperatorGroupBy; -import rx.operators.OperatorTakeUntil; -import rx.operators.OperatorToIterator; +import rx.operators.*; import rx.plugins.RxJavaErrorHandler; import rx.plugins.RxJavaPlugins; import rx.subscriptions.Subscriptions; @@ -766,6 +741,14 @@ public static Observable range(int start, int count) { return from(Range.createWithCount(start, count)); } + public static Observable subscribeOn(Observable source, Scheduler scheduler) { + return _create(OperationSubscribeOn.subscribeOn(source, scheduler)); + } + + public static Observable observeOn(Observable source, Scheduler scheduler) { + return _create(OperationObserveOn.observeOn(source, scheduler)); + } + /** * Returns an observable sequence that invokes the observable factory whenever a new observer subscribes. * The Defer operator allows you to defer or delay the creation of the sequence until the time when an observer @@ -2589,6 +2572,14 @@ public Observable> materialize() { return materialize(this); } + public Observable subscribeOn(Scheduler scheduler) { + return subscribeOn(this, scheduler); + } + + public Observable observeOn(Scheduler scheduler) { + return observeOn(this, scheduler); + } + /** * Dematerializes the explicit notification values of an observable sequence as implicit notifications. * diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java new file mode 100644 index 0000000000..ea6a234dca --- /dev/null +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -0,0 +1,20 @@ +package rx; + +import rx.util.functions.Action0; +import rx.util.functions.Func0; + +import java.util.concurrent.TimeUnit; + +public interface Scheduler { + + Subscription schedule(Action0 action); + + Subscription schedule(Func0 action); + + Subscription schedule(Action0 action, long timespan, TimeUnit unit); + + Subscription schedule(Func0 action, long timespan, TimeUnit unit); + + long now(); + +} diff --git a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java new file mode 100644 index 0000000000..d4620cb482 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java @@ -0,0 +1,43 @@ +package rx.concurrency; + +import rx.Scheduler; +import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Func0; + +import java.util.concurrent.TimeUnit; + +public abstract class AbstractScheduler implements Scheduler { + + @Override + public Subscription schedule(Action0 action) { + return schedule(asFunc0(action)); + } + + @Override + public Subscription schedule(Action0 action, long timespan, TimeUnit unit) { + return schedule(asFunc0(action), timespan, unit); + } + + @Override + public Subscription schedule(Func0 action, long timespan, TimeUnit unit) { + return schedule(new DelayedAction(action, this, timespan, unit)); + } + + @Override + public long now() { + return System.nanoTime(); + } + + private static Func0 asFunc0(final Action0 action) { + return new Func0() { + @Override + public Subscription call() { + action.call(); + return Subscriptions.empty(); + } + }; + } + +} diff --git a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java new file mode 100644 index 0000000000..e15290e536 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java @@ -0,0 +1,83 @@ +package rx.concurrency; + +import org.junit.Test; +import rx.Scheduler; +import rx.Subscription; +import rx.util.functions.Action0; +import rx.util.functions.Func0; + +import java.util.LinkedList; +import java.util.Queue; +import java.util.concurrent.TimeUnit; + +public class CurrentThreadScheduler extends AbstractScheduler { + private static final CurrentThreadScheduler INSTANCE = new CurrentThreadScheduler(); + public static CurrentThreadScheduler getInstance() { + return INSTANCE; + } + + private static final ThreadLocal> QUEUE = new ThreadLocal>(); + + private CurrentThreadScheduler() { + } + + @Override + public Subscription schedule(Func0 action) { + DiscardableAction discardableAction = new DiscardableAction(action); + enqueue(discardableAction); + return discardableAction; + } + + private void enqueue(DiscardableAction action) { + Queue queue = QUEUE.get(); + boolean exec = false; + + if (queue == null) { + queue = new LinkedList(); + QUEUE.set(queue); + exec = true; + } + + queue.add(action); + + while (exec && !queue.isEmpty()) { + queue.poll().call(); + } + } + + public static class UnitTest { + + @Test + public void testScheduler() { + final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); + + final Action0 firstAction = new Action0() { + @Override + public void call() { + System.out.println("First action start"); + System.out.println("First action end"); + } + }; + final Action0 secondAction = new Action0() { + @Override + public void call() { + System.out.println("Second action start"); + scheduler.schedule(firstAction); + System.out.println("Second action end"); + + } + }; + final Action0 thirdAction = new Action0() { + @Override + public void call() { + System.out.println("Third action start"); + scheduler.schedule(secondAction); + System.out.println("Third action end"); + } + }; + + scheduler.schedule(thirdAction); + } + + } +} diff --git a/rxjava-core/src/main/java/rx/concurrency/DelayedAction.java b/rxjava-core/src/main/java/rx/concurrency/DelayedAction.java new file mode 100644 index 0000000000..d83004ae34 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/DelayedAction.java @@ -0,0 +1,35 @@ +package rx.concurrency; + +import rx.Scheduler; +import rx.Subscription; +import rx.util.functions.Action0; +import rx.util.functions.Func0; + +import java.util.concurrent.TimeUnit; + +public class DelayedAction implements Func0 { + private final Func0 underlying; + private final Scheduler scheduler; + private final long execTime; + + public DelayedAction(Func0 underlying, Scheduler scheduler, long timespan, TimeUnit timeUnit) { + this.underlying = underlying; + this.scheduler = scheduler; + this.execTime = scheduler.now() + timeUnit.toMillis(timespan); + } + + @Override + public Subscription call() { + if (execTime < scheduler.now()) { + try { + Thread.sleep(scheduler.now() - execTime); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + throw new RuntimeException(e); + } + } + + return underlying.call(); + + } +} diff --git a/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java b/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java new file mode 100644 index 0000000000..e9d4ee8379 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java @@ -0,0 +1,35 @@ +package rx.concurrency; + +import rx.Subscription; +import rx.util.AtomicObservableSubscription; +import rx.util.functions.Func0; + +import java.util.concurrent.atomic.AtomicBoolean; + +public class DiscardableAction implements Func0, Subscription { + private final Func0 underlying; + + private final AtomicObservableSubscription wrapper = new AtomicObservableSubscription(); + private final AtomicBoolean ready = new AtomicBoolean(true); + + public DiscardableAction(Func0 underlying) { + this.underlying = underlying; + } + + @Override + public Subscription call() { + if (ready.compareAndSet(true, false)) { + Subscription subscription = underlying.call(); + wrapper.wrap(subscription); + return subscription; + } + return wrapper; + } + + @Override + public void unsubscribe() { + ready.set(false); + wrapper.unsubscribe(); + } +} + diff --git a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java new file mode 100644 index 0000000000..e27d6fea81 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java @@ -0,0 +1,29 @@ +package rx.concurrency; + +import rx.Subscription; +import rx.util.functions.Func0; + +import java.util.concurrent.Executor; + +public class ExecutorScheduler extends AbstractScheduler { + private final Executor executor; + + public ExecutorScheduler(Executor executor) { + this.executor = executor; + } + + @Override + public Subscription schedule(Func0 action) { + final DiscardableAction discardableAction = new DiscardableAction(action); + + executor.execute(new Runnable() { + @Override + public void run() { + discardableAction.call(); + } + }); + + return discardableAction; + + } +} diff --git a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java new file mode 100644 index 0000000000..209bf964fd --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java @@ -0,0 +1,22 @@ +package rx.concurrency; + +import rx.Subscription; +import rx.util.functions.Func0; + +public final class ImmediateScheduler extends AbstractScheduler { + private static final ImmediateScheduler INSTANCE = new ImmediateScheduler(); + + private ImmediateScheduler() { + + } + + public static ImmediateScheduler getInstance() { + return INSTANCE; + } + + @Override + public Subscription schedule(Func0 action) { + return new DiscardableAction(action); + } + +} diff --git a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java new file mode 100644 index 0000000000..deba7dec1e --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java @@ -0,0 +1,30 @@ +package rx.concurrency; + +import rx.Subscription; +import rx.util.functions.Func0; + +public class NewThreadScheduler extends AbstractScheduler { + private static final NewThreadScheduler INSTANCE = new NewThreadScheduler(); + + public static NewThreadScheduler getInstance() { + return INSTANCE; + } + + + @Override + public Subscription schedule(Func0 action) { + final DiscardableAction discardableAction = new DiscardableAction(action); + + Thread t = new Thread(new Runnable() { + @Override + public void run() { + discardableAction.call(); + } + }); + + t.start(); + + return discardableAction; + } + +} diff --git a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java new file mode 100644 index 0000000000..dee9327c04 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java @@ -0,0 +1,27 @@ +package rx.concurrency; + +import rx.Scheduler; + +import java.util.concurrent.Executor; + +public class Schedulers { + private Schedulers() { + + } + + public static Scheduler immediate() { + return ImmediateScheduler.getInstance(); + } + + public static Scheduler currentThread() { + return CurrentThreadScheduler.getInstance(); + } + + public static Scheduler newThread() { + return NewThreadScheduler.getInstance(); + } + + public static Scheduler executor(Executor executor) { + return new ExecutorScheduler(executor); + } +} diff --git a/rxjava-core/src/main/java/rx/observables/ScheduledObserver.java b/rxjava-core/src/main/java/rx/observables/ScheduledObserver.java new file mode 100644 index 0000000000..90025d6db4 --- /dev/null +++ b/rxjava-core/src/main/java/rx/observables/ScheduledObserver.java @@ -0,0 +1,45 @@ +package rx.observables; + +import rx.Observer; +import rx.Scheduler; +import rx.util.functions.Action0; + +public class ScheduledObserver implements Observer { + private final Observer underlying; + private final Scheduler scheduler; + + public ScheduledObserver(Observer underlying, Scheduler scheduler) { + this.underlying = underlying; + this.scheduler = scheduler; + } + + @Override + public void onCompleted() { + scheduler.schedule(new Action0() { + @Override + public void call() { + underlying.onCompleted(); + } + }); + } + + @Override + public void onError(Exception e) { + scheduler.schedule(new Action0() { + @Override + public void call() { + underlying.onCompleted(); + } + }); + } + + @Override + public void onNext(T args) { + scheduler.schedule(new Action0() { + @Override + public void call() { + underlying.onCompleted(); + } + }); + } +} diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java new file mode 100644 index 0000000000..7ec96bc3f1 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java @@ -0,0 +1,71 @@ +package rx.operators; + +import rx.Observable; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.util.functions.Action0; +import rx.util.functions.Func0; +import rx.util.functions.Func1; + +public class OperationObserveOn { + + public static Func1, Subscription> observeOn(Observable source, Scheduler scheduler) { + return new ObserveOn(source, scheduler); + } + + private static class ObserveOn implements Func1, Subscription> { + private final Observable source; + private final Scheduler scheduler; + + public ObserveOn(Observable source, Scheduler scheduler) { + this.source = source; + this.scheduler = scheduler; + } + + @Override + public Subscription call(final Observer observer) { + return source.subscribe(new ScheduledObserver(observer, scheduler)); + } + } + + private static class ScheduledObserver implements Observer { + private final Observer underlying; + private final Scheduler scheduler; + + public ScheduledObserver(Observer underlying, Scheduler scheduler) { + this.underlying = underlying; + this.scheduler = scheduler; + } + + @Override + public void onCompleted() { + scheduler.schedule(new Action0() { + @Override + public void call() { + underlying.onCompleted(); + } + }); + } + + @Override + public void onError(Exception e) { + scheduler.schedule(new Action0() { + @Override + public void call() { + underlying.onCompleted(); + } + }); + } + + @Override + public void onNext(T args) { + scheduler.schedule(new Action0() { + @Override + public void call() { + underlying.onCompleted(); + } + }); + } + } +} diff --git a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java new file mode 100644 index 0000000000..b16c0263d0 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java @@ -0,0 +1,56 @@ +package rx.operators; + +import rx.Observable; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.util.functions.Action0; +import rx.util.functions.Func0; +import rx.util.functions.Func1; + +public class OperationSubscribeOn { + + public static Func1, Subscription> subscribeOn(Observable source, Scheduler scheduler) { + return new SubscribeOn(source, scheduler); + } + + private static class SubscribeOn implements Func1, Subscription> { + private final Observable source; + private final Scheduler scheduler; + + public SubscribeOn(Observable source, Scheduler scheduler) { + this.source = source; + this.scheduler = scheduler; + } + + @Override + public Subscription call(final Observer observer) { + return scheduler.schedule(new Func0() { + @Override + public Subscription call() { + return new ScheduledSubscription(source.subscribe(observer), scheduler); + } + }); + } + } + + private static class ScheduledSubscription implements Subscription { + private final Subscription underlying; + private final Scheduler scheduler; + + private ScheduledSubscription(Subscription underlying, Scheduler scheduler) { + this.underlying = underlying; + this.scheduler = scheduler; + } + + @Override + public void unsubscribe() { + scheduler.schedule(new Action0() { + @Override + public void call() { + underlying.unsubscribe(); + } + }); + } + } +} From 0aa6ca2f4d21eefdcffa17a4da0449744bb14742 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Mon, 18 Mar 2013 17:01:06 +0200 Subject: [PATCH 02/25] Added ScheduledExecutorServiceScheduler --- .../ScheduledExecutorServiceScheduler.java | 34 +++++++++++++++++++ 1 file changed, 34 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java diff --git a/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java new file mode 100644 index 0000000000..1f788e9c8d --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java @@ -0,0 +1,34 @@ +package rx.concurrency; + +import rx.Subscription; +import rx.util.functions.Func0; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + +// TODO [@mairbek] silly name +public class ScheduledExecutorServiceScheduler extends AbstractScheduler { + private final ScheduledExecutorService executorService; + + public ScheduledExecutorServiceScheduler(ScheduledExecutorService executorService) { + this.executorService = executorService; + } + + @Override + public Subscription schedule(Func0 action) { + return schedule(action, 0, TimeUnit.MILLISECONDS); + } + + @Override + public Subscription schedule(Func0 action, long timespan, TimeUnit unit) { + final DiscardableAction discardableAction = new DiscardableAction(action); + executorService.schedule(new Runnable() { + @Override + public void run() { + discardableAction.call(); + } + }, timespan, unit); + return discardableAction; + } + +} From 1896da37a91e4d1c1451847decfa9f3b48cb12bc Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Mon, 18 Mar 2013 17:02:23 +0200 Subject: [PATCH 03/25] Added to Schedulers --- rxjava-core/src/main/java/rx/concurrency/Schedulers.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java index dee9327c04..d198d8cc98 100644 --- a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java +++ b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java @@ -3,6 +3,7 @@ import rx.Scheduler; import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; public class Schedulers { private Schedulers() { @@ -24,4 +25,8 @@ public static Scheduler newThread() { public static Scheduler executor(Executor executor) { return new ExecutorScheduler(executor); } + + public static Scheduler scheduledExecutor(ScheduledExecutorService executor) { + return new ScheduledExecutorServiceScheduler(executor); + } } From 9eb111e1c7bc65c994768a6ed8afd7b9842662bd Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Mon, 18 Mar 2013 17:57:49 +0200 Subject: [PATCH 04/25] More tests --- .../rx/concurrency/AbstractScheduler.java | 2 +- .../concurrency/CurrentThreadScheduler.java | 66 +++++++++++++---- .../rx/concurrency/ImmediateScheduler.java | 73 +++++++++++++++++-- ...DelayedAction.java => SleepingAction.java} | 4 +- 4 files changed, 123 insertions(+), 22 deletions(-) rename rxjava-core/src/main/java/rx/concurrency/{DelayedAction.java => SleepingAction.java} (82%) diff --git a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java index d4620cb482..34ad11ab0e 100644 --- a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java @@ -22,7 +22,7 @@ public Subscription schedule(Action0 action, long timespan, TimeUnit unit) { @Override public Subscription schedule(Func0 action, long timespan, TimeUnit unit) { - return schedule(new DelayedAction(action, this, timespan, unit)); + return schedule(new SleepingAction(action, this, timespan, unit)); } @Override diff --git a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java index e15290e536..be2659d060 100644 --- a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java @@ -1,17 +1,19 @@ package rx.concurrency; import org.junit.Test; -import rx.Scheduler; +import org.mockito.InOrder; import rx.Subscription; import rx.util.functions.Action0; import rx.util.functions.Func0; import java.util.LinkedList; import java.util.Queue; -import java.util.concurrent.TimeUnit; + +import static org.mockito.Mockito.*; public class CurrentThreadScheduler extends AbstractScheduler { private static final CurrentThreadScheduler INSTANCE = new CurrentThreadScheduler(); + public static CurrentThreadScheduler getInstance() { return INSTANCE; } @@ -30,53 +32,89 @@ public Subscription schedule(Func0 action) { private void enqueue(DiscardableAction action) { Queue queue = QUEUE.get(); - boolean exec = false; + boolean exec = queue == null; - if (queue == null) { + if (exec) { queue = new LinkedList(); QUEUE.set(queue); - exec = true; } queue.add(action); - while (exec && !queue.isEmpty()) { - queue.poll().call(); + if (exec) { + while (!queue.isEmpty()) { + queue.poll().call(); + } + + QUEUE.set(null); } } public static class UnitTest { @Test - public void testScheduler() { + public void testOrdering() { final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); + final Action0 firstStepStart = mock(Action0.class); + final Action0 firstStepEnd = mock(Action0.class); + + final Action0 secondStepStart = mock(Action0.class); + final Action0 secondStepEnd = mock(Action0.class); + + final Action0 thirdStepStart = mock(Action0.class); + final Action0 thirdStepEnd = mock(Action0.class); + final Action0 firstAction = new Action0() { @Override public void call() { - System.out.println("First action start"); - System.out.println("First action end"); + firstStepStart.call(); + firstStepEnd.call(); } }; final Action0 secondAction = new Action0() { @Override public void call() { - System.out.println("Second action start"); + secondStepStart.call(); scheduler.schedule(firstAction); - System.out.println("Second action end"); + secondStepEnd.call(); } }; final Action0 thirdAction = new Action0() { @Override public void call() { - System.out.println("Third action start"); + thirdStepStart.call(); scheduler.schedule(secondAction); - System.out.println("Third action end"); + thirdStepEnd.call(); } }; + InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd); + scheduler.schedule(thirdAction); + + inOrder.verify(thirdStepStart, times(1)).call(); + inOrder.verify(thirdStepEnd, times(1)).call(); + inOrder.verify(secondStepStart, times(1)).call(); + inOrder.verify(secondStepEnd, times(1)).call(); + inOrder.verify(firstStepStart, times(1)).call(); + inOrder.verify(firstStepEnd, times(1)).call(); + } + + @Test + public void testSequenceOfActions() { + final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); + + final Action0 first = mock(Action0.class); + final Action0 second = mock(Action0.class); + + scheduler.schedule(first); + scheduler.schedule(second); + + verify(first, times(1)).call(); + verify(second, times(1)).call(); + } } diff --git a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java index 209bf964fd..b49f6b3352 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java @@ -1,22 +1,85 @@ package rx.concurrency; +import org.junit.Test; +import org.mockito.InOrder; import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; import rx.util.functions.Func0; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; + public final class ImmediateScheduler extends AbstractScheduler { private static final ImmediateScheduler INSTANCE = new ImmediateScheduler(); - private ImmediateScheduler() { - - } - public static ImmediateScheduler getInstance() { return INSTANCE; } + private ImmediateScheduler() { + } + @Override public Subscription schedule(Func0 action) { - return new DiscardableAction(action); + action.call(); + return Subscriptions.empty(); } + public static class UnitTest { + + @Test + public void testOrdering() { + final ImmediateScheduler scheduler = new ImmediateScheduler(); + + final Action0 firstStepStart = mock(Action0.class); + final Action0 firstStepEnd = mock(Action0.class); + + final Action0 secondStepStart = mock(Action0.class); + final Action0 secondStepEnd = mock(Action0.class); + + final Action0 thirdStepStart = mock(Action0.class); + final Action0 thirdStepEnd = mock(Action0.class); + + final Action0 firstAction = new Action0() { + @Override + public void call() { + firstStepStart.call(); + firstStepEnd.call(); + } + }; + final Action0 secondAction = new Action0() { + @Override + public void call() { + secondStepStart.call(); + scheduler.schedule(firstAction); + secondStepEnd.call(); + + } + }; + final Action0 thirdAction = new Action0() { + @Override + public void call() { + thirdStepStart.call(); + scheduler.schedule(secondAction); + thirdStepEnd.call(); + } + }; + + InOrder inOrder = inOrder(firstStepStart, firstStepEnd, secondStepStart, secondStepEnd, thirdStepStart, thirdStepEnd); + + scheduler.schedule(thirdAction); + + inOrder.verify(thirdStepStart, times(1)).call(); + inOrder.verify(secondStepStart, times(1)).call(); + inOrder.verify(firstStepStart, times(1)).call(); + inOrder.verify(firstStepEnd, times(1)).call(); + inOrder.verify(secondStepEnd, times(1)).call(); + inOrder.verify(thirdStepEnd, times(1)).call(); + } + + } + + } diff --git a/rxjava-core/src/main/java/rx/concurrency/DelayedAction.java b/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java similarity index 82% rename from rxjava-core/src/main/java/rx/concurrency/DelayedAction.java rename to rxjava-core/src/main/java/rx/concurrency/SleepingAction.java index d83004ae34..ce10fb1903 100644 --- a/rxjava-core/src/main/java/rx/concurrency/DelayedAction.java +++ b/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java @@ -7,12 +7,12 @@ import java.util.concurrent.TimeUnit; -public class DelayedAction implements Func0 { +public class SleepingAction implements Func0 { private final Func0 underlying; private final Scheduler scheduler; private final long execTime; - public DelayedAction(Func0 underlying, Scheduler scheduler, long timespan, TimeUnit timeUnit) { + public SleepingAction(Func0 underlying, Scheduler scheduler, long timespan, TimeUnit timeUnit) { this.underlying = underlying; this.scheduler = scheduler; this.execTime = scheduler.now() + timeUnit.toMillis(timespan); From 86a750c76d98eafb14d8f03734bf719bbb0d95b9 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Mon, 18 Mar 2013 18:31:57 +0200 Subject: [PATCH 05/25] Headers --- rxjava-core/src/main/java/rx/Scheduler.java | 15 +++++++++++++++ .../java/rx/concurrency/AbstractScheduler.java | 15 +++++++++++++++ .../rx/concurrency/CurrentThreadScheduler.java | 15 +++++++++++++++ .../java/rx/concurrency/DiscardableAction.java | 15 +++++++++++++++ .../java/rx/concurrency/ExecutorScheduler.java | 15 +++++++++++++++ .../java/rx/concurrency/ImmediateScheduler.java | 15 +++++++++++++++ .../java/rx/concurrency/NewThreadScheduler.java | 15 +++++++++++++++ .../ScheduledExecutorServiceScheduler.java | 15 +++++++++++++++ .../src/main/java/rx/concurrency/Schedulers.java | 15 +++++++++++++++ .../main/java/rx/concurrency/SleepingAction.java | 15 +++++++++++++++ .../java/rx/operators/OperationObserveOn.java | 15 +++++++++++++++ .../java/rx/operators/OperationSubscribeOn.java | 15 +++++++++++++++ 12 files changed, 180 insertions(+) diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index ea6a234dca..06b69fe2ff 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx; import rx.util.functions.Action0; diff --git a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java index 34ad11ab0e..63ed7a6a60 100644 --- a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.concurrency; import rx.Scheduler; diff --git a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java index be2659d060..b91a22ab53 100644 --- a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.concurrency; import org.junit.Test; diff --git a/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java b/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java index e9d4ee8379..632ec69b1a 100644 --- a/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java +++ b/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.concurrency; import rx.Subscription; diff --git a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java index e27d6fea81..88aabc46eb 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.concurrency; import rx.Subscription; diff --git a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java index b49f6b3352..59908e4e0c 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.concurrency; import org.junit.Test; diff --git a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java index deba7dec1e..a78c9633f1 100644 --- a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.concurrency; import rx.Subscription; diff --git a/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java index 1f788e9c8d..c8a61d7292 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.concurrency; import rx.Subscription; diff --git a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java index d198d8cc98..61b51d070d 100644 --- a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java +++ b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.concurrency; import rx.Scheduler; diff --git a/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java b/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java index ce10fb1903..02823436d4 100644 --- a/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java +++ b/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.concurrency; import rx.Scheduler; diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java index 7ec96bc3f1..85228f2cff 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.operators; import rx.Observable; diff --git a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java index b16c0263d0..b59dd5f37d 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java @@ -1,3 +1,18 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ package rx.operators; import rx.Observable; From df09fcb6a53b70a828698fbad71e681edbcdc7f4 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Mon, 18 Mar 2013 18:52:25 +0200 Subject: [PATCH 06/25] ObserveOn/SubscribeOn unit tests --- .../rx/concurrency/ForwardingScheduler.java | 56 +++++++++++++++++++ .../rx/concurrency/ImmediateScheduler.java | 5 +- .../main/java/rx/concurrency/Schedulers.java | 4 ++ .../java/rx/operators/OperationObserveOn.java | 39 +++++++++++-- .../rx/operators/OperationSubscribeOn.java | 33 ++++++++++- 5 files changed, 129 insertions(+), 8 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java diff --git a/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java new file mode 100644 index 0000000000..2714808766 --- /dev/null +++ b/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java @@ -0,0 +1,56 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import rx.Scheduler; +import rx.Subscription; +import rx.util.functions.Action0; +import rx.util.functions.Func0; + +import java.util.concurrent.TimeUnit; + +public class ForwardingScheduler implements Scheduler { + private final Scheduler underlying; + + public ForwardingScheduler(Scheduler underlying) { + this.underlying = underlying; + } + + @Override + public Subscription schedule(Action0 action) { + return underlying.schedule(action); + } + + @Override + public Subscription schedule(Func0 action) { + return underlying.schedule(action); + } + + @Override + public Subscription schedule(Action0 action, long timespan, TimeUnit unit) { + return underlying.schedule(action, timespan, unit); + } + + @Override + public Subscription schedule(Func0 action, long timespan, TimeUnit unit) { + return underlying.schedule(action, timespan, unit); + } + + @Override + public long now() { + return underlying.now(); + } +} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java index 59908e4e0c..e54d178ae0 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java @@ -38,8 +38,9 @@ private ImmediateScheduler() { @Override public Subscription schedule(Func0 action) { - action.call(); - return Subscriptions.empty(); + DiscardableAction discardableAction = new DiscardableAction(action); + discardableAction.call(); + return discardableAction; } public static class UnitTest { diff --git a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java index 61b51d070d..9f5ff2065d 100644 --- a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java +++ b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java @@ -44,4 +44,8 @@ public static Scheduler executor(Executor executor) { public static Scheduler scheduledExecutor(ScheduledExecutorService executor) { return new ScheduledExecutorServiceScheduler(executor); } + + public static Scheduler forwardingScheduler(Scheduler underlying) { + return new ForwardingScheduler(underlying); + } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java index 85228f2cff..4e6e14bb85 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java @@ -15,14 +15,20 @@ */ package rx.operators; +import org.junit.Test; import rx.Observable; import rx.Observer; import rx.Scheduler; import rx.Subscription; +import rx.concurrency.Schedulers; import rx.util.functions.Action0; -import rx.util.functions.Func0; import rx.util.functions.Func1; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + public class OperationObserveOn { public static Func1, Subscription> observeOn(Observable source, Scheduler scheduler) { @@ -64,23 +70,46 @@ public void call() { } @Override - public void onError(Exception e) { + public void onError(final Exception e) { scheduler.schedule(new Action0() { @Override public void call() { - underlying.onCompleted(); + underlying.onError(e); } }); } @Override - public void onNext(T args) { + public void onNext(final T args) { scheduler.schedule(new Action0() { @Override public void call() { - underlying.onCompleted(); + underlying.onNext(args); } }); } } + + public static class UnitTest { + + @Test + @SuppressWarnings("unchecked") + public void testObserveOn() { + + Scheduler scheduler = spy(Schedulers.forwardingScheduler(Schedulers.immediate())); + + Observer observer = mock(Observer.class); + Observable.create(observeOn(Observable.toObservable(1, 2, 3), scheduler)).subscribe(observer); + + verify(scheduler, times(4)).schedule(any(Action0.class)); + verifyNoMoreInteractions(scheduler); + + verify(observer, times(1)).onNext(1); + verify(observer, times(1)).onNext(2); + verify(observer, times(1)).onNext(3); + verify(observer, times(1)).onCompleted(); + } + + } + } diff --git a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java index b59dd5f37d..104a134657 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java @@ -15,14 +15,19 @@ */ package rx.operators; +import org.junit.Test; import rx.Observable; import rx.Observer; import rx.Scheduler; import rx.Subscription; +import rx.concurrency.Schedulers; import rx.util.functions.Action0; import rx.util.functions.Func0; import rx.util.functions.Func1; +import static org.mockito.Matchers.any; +import static org.mockito.Mockito.*; + public class OperationSubscribeOn { public static Func1, Subscription> subscribeOn(Observable source, Scheduler scheduler) { @@ -68,4 +73,30 @@ public void call() { }); } } -} + + public static class UnitTest { + + @Test + @SuppressWarnings("unchecked") + public void testSubscribeOn() { + Observable w = Observable.toObservable(1, 2, 3); + + Scheduler scheduler = spy(Schedulers.forwardingScheduler(Schedulers.immediate())); + + Observer observer = mock(Observer.class); + Subscription subscription = Observable.create(subscribeOn(w, scheduler)).subscribe(observer); + + verify(scheduler, times(1)).schedule(any(Func0.class)); + subscription.unsubscribe(); + verify(scheduler, times(1)).schedule(any(Action0.class)); + verifyNoMoreInteractions(scheduler); + + verify(observer, times(1)).onNext(1); + verify(observer, times(1)).onNext(2); + verify(observer, times(1)).onNext(3); + verify(observer, times(1)).onCompleted(); + } + + } + +} \ No newline at end of file From 2d1c45da58b09063ce19863bffcf7866e5fb4300 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Mon, 18 Mar 2013 19:41:25 +0200 Subject: [PATCH 07/25] Some documentation --- rxjava-core/src/main/java/rx/Observable.java | 28 +++++++++++++++++++ .../ScheduledExecutorServiceScheduler.java | 1 - 2 files changed, 28 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 8a4a254366..bbbbd10d02 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -741,10 +741,26 @@ public static Observable range(int start, int count) { return from(Range.createWithCount(start, count)); } + /** + * Asynchronously subscribes and unsubscribes observers on the specified scheduler. + * + * @param source the source observable. + * @param scheduler the scheduler to perform subscription and unsubscription actions on. + * @param the type of observable. + * @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler. + */ public static Observable subscribeOn(Observable source, Scheduler scheduler) { return _create(OperationSubscribeOn.subscribeOn(source, scheduler)); } + /** + * Asynchronously notify observers on the specified scheduler. + * + * @param source the source observable. + * @param scheduler the scheduler to notify observers on. + * @param the type of observable. + * @return the source sequence whose observations happen on the specified scheduler. + */ public static Observable observeOn(Observable source, Scheduler scheduler) { return _create(OperationObserveOn.observeOn(source, scheduler)); } @@ -2572,10 +2588,22 @@ public Observable> materialize() { return materialize(this); } + /** + * Asynchronously subscribes and unsubscribes observers on the specified scheduler. + * + * @param scheduler the scheduler to perform subscription and unsubscription actions on. + * @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler. + */ public Observable subscribeOn(Scheduler scheduler) { return subscribeOn(this, scheduler); } + /** + * Asynchronously notify observers on the specified scheduler. + * + * @param scheduler the scheduler to notify observers on. + * @return the source sequence whose observations happen on the specified scheduler. + */ public Observable observeOn(Scheduler scheduler) { return observeOn(this, scheduler); } diff --git a/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java index c8a61d7292..a37f51edac 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java @@ -21,7 +21,6 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; -// TODO [@mairbek] silly name public class ScheduledExecutorServiceScheduler extends AbstractScheduler { private final ScheduledExecutorService executorService; From db9f9a60afdfe3b3a4f8a9e2d0e085b5e9629847 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Tue, 19 Mar 2013 20:09:36 +0200 Subject: [PATCH 08/25] Documenting code --- rxjava-core/src/main/java/rx/Scheduler.java | 38 +++++++++++++++++-- .../rx/concurrency/AbstractScheduler.java | 8 ++-- .../rx/concurrency/ForwardingScheduler.java | 8 ++-- .../ScheduledExecutorServiceScheduler.java | 4 +- 4 files changed, 44 insertions(+), 14 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index 06b69fe2ff..b4d2c5d471 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -20,16 +20,46 @@ import java.util.concurrent.TimeUnit; +/** + * Represents an object that schedules units of work. + */ public interface Scheduler { - Subscription schedule(Action0 action); - + /** + * Schedules a cancelable action to be executed. + * + * @param action action + * @return a subscription to be able to unsubscribe from action. + */ Subscription schedule(Func0 action); - Subscription schedule(Action0 action, long timespan, TimeUnit unit); + /** + * Schedules an action to be executed. + * + * @param action action + * @return a subscription to be able to unsubscribe from action. + */ + Subscription schedule(Action0 action); + + /** + * Schedules an action to be executed in dueTime. + * + * @param action action + * @return a subscription to be able to unsubscribe from action. + */ + Subscription schedule(Action0 action, long dueTime, TimeUnit unit); - Subscription schedule(Func0 action, long timespan, TimeUnit unit); + /** + * Schedules a cancelable action to be executed in dueTime. + * + * @param action action + * @return a subscription to be able to unsubscribe from action. + */ + Subscription schedule(Func0 action, long dueTime, TimeUnit unit); + /** + * Returns the scheduler's notion of current time. + */ long now(); } diff --git a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java index 63ed7a6a60..8ad8b436a0 100644 --- a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java @@ -31,13 +31,13 @@ public Subscription schedule(Action0 action) { } @Override - public Subscription schedule(Action0 action, long timespan, TimeUnit unit) { - return schedule(asFunc0(action), timespan, unit); + public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) { + return schedule(asFunc0(action), dueTime, unit); } @Override - public Subscription schedule(Func0 action, long timespan, TimeUnit unit) { - return schedule(new SleepingAction(action, this, timespan, unit)); + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { + return schedule(new SleepingAction(action, this, dueTime, unit)); } @Override diff --git a/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java index 2714808766..a32f72aa9f 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java @@ -40,13 +40,13 @@ public Subscription schedule(Func0 action) { } @Override - public Subscription schedule(Action0 action, long timespan, TimeUnit unit) { - return underlying.schedule(action, timespan, unit); + public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) { + return underlying.schedule(action, dueTime, unit); } @Override - public Subscription schedule(Func0 action, long timespan, TimeUnit unit) { - return underlying.schedule(action, timespan, unit); + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { + return underlying.schedule(action, dueTime, unit); } @Override diff --git a/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java index a37f51edac..0558d6fa26 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java @@ -34,14 +34,14 @@ public Subscription schedule(Func0 action) { } @Override - public Subscription schedule(Func0 action, long timespan, TimeUnit unit) { + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { final DiscardableAction discardableAction = new DiscardableAction(action); executorService.schedule(new Runnable() { @Override public void run() { discardableAction.call(); } - }, timespan, unit); + }, dueTime, unit); return discardableAction; } From eaa0316231ac0b8233d49f77a7ba4f85ea5d9537 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Tue, 19 Mar 2013 20:10:36 +0200 Subject: [PATCH 09/25] renamed tests --- .../src/main/java/rx/concurrency/CurrentThreadScheduler.java | 2 +- .../src/main/java/rx/concurrency/ImmediateScheduler.java | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java index b91a22ab53..3b60f631ae 100644 --- a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java @@ -68,7 +68,7 @@ private void enqueue(DiscardableAction action) { public static class UnitTest { @Test - public void testOrdering() { + public void testNestedActions() { final CurrentThreadScheduler scheduler = new CurrentThreadScheduler(); final Action0 firstStepStart = mock(Action0.class); diff --git a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java index e54d178ae0..59e52078f3 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java @@ -46,7 +46,7 @@ public Subscription schedule(Func0 action) { public static class UnitTest { @Test - public void testOrdering() { + public void testNestedActions() { final ImmediateScheduler scheduler = new ImmediateScheduler(); final Action0 firstStepStart = mock(Action0.class); From 81ee35dcf7cbbd06c8b5c932543f3d23f407b857 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Tue, 19 Mar 2013 20:21:24 +0200 Subject: [PATCH 10/25] Extracted ScheduledObserver as a separate class --- .../rx/observables/ScheduledObserver.java | 8 ++-- .../java/rx/operators/OperationObserveOn.java | 41 +------------------ 2 files changed, 5 insertions(+), 44 deletions(-) diff --git a/rxjava-core/src/main/java/rx/observables/ScheduledObserver.java b/rxjava-core/src/main/java/rx/observables/ScheduledObserver.java index 90025d6db4..ec38b90d0e 100644 --- a/rxjava-core/src/main/java/rx/observables/ScheduledObserver.java +++ b/rxjava-core/src/main/java/rx/observables/ScheduledObserver.java @@ -24,21 +24,21 @@ public void call() { } @Override - public void onError(Exception e) { + public void onError(final Exception e) { scheduler.schedule(new Action0() { @Override public void call() { - underlying.onCompleted(); + underlying.onError(e); } }); } @Override - public void onNext(T args) { + public void onNext(final T args) { scheduler.schedule(new Action0() { @Override public void call() { - underlying.onCompleted(); + underlying.onNext(args); } }); } diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java index 4e6e14bb85..16eed89270 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java @@ -21,6 +21,7 @@ import rx.Scheduler; import rx.Subscription; import rx.concurrency.Schedulers; +import rx.observables.ScheduledObserver; import rx.util.functions.Action0; import rx.util.functions.Func1; @@ -50,46 +51,6 @@ public Subscription call(final Observer observer) { } } - private static class ScheduledObserver implements Observer { - private final Observer underlying; - private final Scheduler scheduler; - - public ScheduledObserver(Observer underlying, Scheduler scheduler) { - this.underlying = underlying; - this.scheduler = scheduler; - } - - @Override - public void onCompleted() { - scheduler.schedule(new Action0() { - @Override - public void call() { - underlying.onCompleted(); - } - }); - } - - @Override - public void onError(final Exception e) { - scheduler.schedule(new Action0() { - @Override - public void call() { - underlying.onError(e); - } - }); - } - - @Override - public void onNext(final T args) { - scheduler.schedule(new Action0() { - @Override - public void call() { - underlying.onNext(args); - } - }); - } - } - public static class UnitTest { @Test From bd32659611c6546e45e96b2896c99400ffa6a1d3 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Tue, 2 Apr 2013 21:06:08 +0300 Subject: [PATCH 11/25] Simplified ImmediateScheduler --- .../src/main/java/rx/concurrency/ImmediateScheduler.java | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java index 59e52078f3..27f9b88e48 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java @@ -38,9 +38,7 @@ private ImmediateScheduler() { @Override public Subscription schedule(Func0 action) { - DiscardableAction discardableAction = new DiscardableAction(action); - discardableAction.call(); - return discardableAction; + return action.call(); } public static class UnitTest { From b24b42f819b6c11b2d652a985ea8816bdc4a285c Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Thu, 4 Apr 2013 00:11:20 +0300 Subject: [PATCH 12/25] Removed SleepingAction from Abstract scheduler. ExecutorScheduler throws exception for the delayed action. --- .../src/main/java/rx/concurrency/AbstractScheduler.java | 5 ----- .../main/java/rx/concurrency/CurrentThreadScheduler.java | 6 ++++++ .../src/main/java/rx/concurrency/ExecutorScheduler.java | 6 ++++++ .../src/main/java/rx/concurrency/ImmediateScheduler.java | 8 +++++++- .../src/main/java/rx/concurrency/NewThreadScheduler.java | 7 +++++++ rxjava-core/src/main/java/rx/concurrency/Schedulers.java | 2 +- 6 files changed, 27 insertions(+), 7 deletions(-) diff --git a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java index 8ad8b436a0..a056f72152 100644 --- a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java @@ -35,11 +35,6 @@ public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) { return schedule(asFunc0(action), dueTime, unit); } - @Override - public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { - return schedule(new SleepingAction(action, this, dueTime, unit)); - } - @Override public long now() { return System.nanoTime(); diff --git a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java index 3b60f631ae..e1f253d181 100644 --- a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java @@ -23,6 +23,7 @@ import java.util.LinkedList; import java.util.Queue; +import java.util.concurrent.TimeUnit; import static org.mockito.Mockito.*; @@ -45,6 +46,11 @@ public Subscription schedule(Func0 action) { return discardableAction; } + @Override + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { + return schedule(new SleepingAction(action, this, dueTime, unit)); + } + private void enqueue(DiscardableAction action) { Queue queue = QUEUE.get(); boolean exec = queue == null; diff --git a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java index 88aabc46eb..e31535c91a 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java @@ -19,6 +19,7 @@ import rx.util.functions.Func0; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; public class ExecutorScheduler extends AbstractScheduler { private final Executor executor; @@ -41,4 +42,9 @@ public void run() { return discardableAction; } + + @Override + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { + throw new IllegalStateException("Delayed scheduling is not supported"); + } } diff --git a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java index 27f9b88e48..9dcd0d4f65 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java @@ -18,10 +18,11 @@ import org.junit.Test; import org.mockito.InOrder; import rx.Subscription; -import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; import rx.util.functions.Func0; +import java.util.concurrent.TimeUnit; + import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.times; @@ -41,6 +42,11 @@ public Subscription schedule(Func0 action) { return action.call(); } + @Override + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { + return schedule(new SleepingAction(action, this, dueTime, unit)); + } + public static class UnitTest { @Test diff --git a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java index a78c9633f1..c2393fbe95 100644 --- a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java @@ -18,6 +18,8 @@ import rx.Subscription; import rx.util.functions.Func0; +import java.util.concurrent.TimeUnit; + public class NewThreadScheduler extends AbstractScheduler { private static final NewThreadScheduler INSTANCE = new NewThreadScheduler(); @@ -42,4 +44,9 @@ public void run() { return discardableAction; } + @Override + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { + return schedule(new SleepingAction(action, this, dueTime, unit)); + } + } diff --git a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java index 9f5ff2065d..bbcfb6e845 100644 --- a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java +++ b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java @@ -41,7 +41,7 @@ public static Scheduler executor(Executor executor) { return new ExecutorScheduler(executor); } - public static Scheduler scheduledExecutor(ScheduledExecutorService executor) { + public static Scheduler fromScheduledExecutorService(ScheduledExecutorService executor) { return new ScheduledExecutorServiceScheduler(executor); } From 9cfb294362b9f9258a75acef90ef8a1dd3c82ab9 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Thu, 4 Apr 2013 00:27:13 +0300 Subject: [PATCH 13/25] added threadPoolForComputation and threadPoolForIO schedulers --- .../main/java/rx/concurrency/Schedulers.java | 45 ++++++++++++++++++- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java index bbcfb6e845..d5ac25c13a 100644 --- a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java +++ b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java @@ -17,10 +17,15 @@ import rx.Scheduler; -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.*; +import java.util.concurrent.atomic.AtomicInteger; public class Schedulers { + private static final ScheduledExecutorService COMPUTATION_EXECUTOR = createComputationExecutor(); + private static final ScheduledExecutorService IO_EXECUTOR = createIOExecutor(); + private static final int DEFAULT_MAX_IO_THREADS = 10; + private static final int DEFAULT_KEEP_ALIVE_TIME = 10 * 1000; // 10 seconds + private Schedulers() { } @@ -45,7 +50,43 @@ public static Scheduler fromScheduledExecutorService(ScheduledExecutorService ex return new ScheduledExecutorServiceScheduler(executor); } + public static Scheduler threadPoolForComputation() { + return fromScheduledExecutorService(COMPUTATION_EXECUTOR); + } + + public static Scheduler threadPoolForIO() { + return fromScheduledExecutorService(IO_EXECUTOR); + } + public static Scheduler forwardingScheduler(Scheduler underlying) { return new ForwardingScheduler(underlying); } + + private static ScheduledExecutorService createComputationExecutor() { + int cores = Runtime.getRuntime().availableProcessors(); + return Executors.newScheduledThreadPool(cores, new ThreadFactory() { + final AtomicInteger counter = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "RxComputationThreadPool-" + counter.incrementAndGet()); + } + }); + } + + private static ScheduledExecutorService createIOExecutor() { + ScheduledThreadPoolExecutor result = new ScheduledThreadPoolExecutor(DEFAULT_MAX_IO_THREADS, new ThreadFactory() { + final AtomicInteger counter = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "RxIOThreadPool-" + counter.incrementAndGet()); + } + }); + + result.setKeepAliveTime(DEFAULT_KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS); + result.allowCoreThreadTimeOut(true); + + return result; + } } From 4510b6e4e1fd22b914eaf8599c5696bbf1fc364c Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 4 Apr 2013 20:17:25 -0700 Subject: [PATCH 14/25] Keeping ScheduledObserver out of public API For now keeping ScheduledObserver an implementation detail until it's clear we want it part of the long-term public API. --- .../java/rx/operators/OperationObserveOn.java | 1 - .../ScheduledObserver.java | 19 +++++++++++++++++-- 2 files changed, 17 insertions(+), 3 deletions(-) rename rxjava-core/src/main/java/rx/{observables => operators}/ScheduledObserver.java (59%) diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java index 16eed89270..720ca86ed2 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java @@ -21,7 +21,6 @@ import rx.Scheduler; import rx.Subscription; import rx.concurrency.Schedulers; -import rx.observables.ScheduledObserver; import rx.util.functions.Action0; import rx.util.functions.Func1; diff --git a/rxjava-core/src/main/java/rx/observables/ScheduledObserver.java b/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java similarity index 59% rename from rxjava-core/src/main/java/rx/observables/ScheduledObserver.java rename to rxjava-core/src/main/java/rx/operators/ScheduledObserver.java index ec38b90d0e..ab2ba33dbe 100644 --- a/rxjava-core/src/main/java/rx/observables/ScheduledObserver.java +++ b/rxjava-core/src/main/java/rx/operators/ScheduledObserver.java @@ -1,10 +1,25 @@ -package rx.observables; +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; import rx.Observer; import rx.Scheduler; import rx.util.functions.Action0; -public class ScheduledObserver implements Observer { +/* package */class ScheduledObserver implements Observer { private final Observer underlying; private final Scheduler scheduler; From a78d7563ec7dcb328325caf1e06705d1cb3e21eb Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 4 Apr 2013 20:20:03 -0700 Subject: [PATCH 15/25] Formatting to match codebase --- rxjava-core/src/main/java/rx/Scheduler.java | 30 +++++++++++-------- .../rx/concurrency/AbstractScheduler.java | 10 +++---- .../concurrency/CurrentThreadScheduler.java | 6 ++-- .../rx/concurrency/DiscardableAction.java | 7 ++--- .../rx/concurrency/ExecutorScheduler.java | 6 ++-- .../rx/concurrency/ForwardingScheduler.java | 6 ++-- .../rx/concurrency/ImmediateScheduler.java | 7 ++--- .../rx/concurrency/NewThreadScheduler.java | 7 ++--- .../ScheduledExecutorServiceScheduler.java | 6 ++-- .../main/java/rx/concurrency/Schedulers.java | 6 ++-- .../java/rx/concurrency/SleepingAction.java | 6 ++-- 11 files changed, 49 insertions(+), 48 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index b4d2c5d471..74fe274b3a 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,11 +15,11 @@ */ package rx; +import java.util.concurrent.TimeUnit; + import rx.util.functions.Action0; import rx.util.functions.Func0; -import java.util.concurrent.TimeUnit; - /** * Represents an object that schedules units of work. */ @@ -27,32 +27,36 @@ public interface Scheduler { /** * Schedules a cancelable action to be executed. - * - * @param action action + * + * @param action + * action * @return a subscription to be able to unsubscribe from action. */ Subscription schedule(Func0 action); /** * Schedules an action to be executed. - * - * @param action action + * + * @param action + * action * @return a subscription to be able to unsubscribe from action. */ Subscription schedule(Action0 action); /** * Schedules an action to be executed in dueTime. - * - * @param action action + * + * @param action + * action * @return a subscription to be able to unsubscribe from action. */ Subscription schedule(Action0 action, long dueTime, TimeUnit unit); /** * Schedules a cancelable action to be executed in dueTime. - * - * @param action action + * + * @param action + * action * @return a subscription to be able to unsubscribe from action. */ Subscription schedule(Func0 action, long dueTime, TimeUnit unit); diff --git a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java index a056f72152..9dc8542df3 100644 --- a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,14 +15,14 @@ */ package rx.concurrency; +import java.util.concurrent.TimeUnit; + import rx.Scheduler; import rx.Subscription; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; import rx.util.functions.Func0; -import java.util.concurrent.TimeUnit; - public abstract class AbstractScheduler implements Scheduler { @Override diff --git a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java index e1f253d181..ae56293ccd 100644 --- a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java b/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java index 632ec69b1a..8fb6537ef3 100644 --- a/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java +++ b/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -47,4 +47,3 @@ public void unsubscribe() { wrapper.unsubscribe(); } } - diff --git a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java index e31535c91a..3ce79d9ce8 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java index a32f72aa9f..6b9db71831 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java index 9dcd0d4f65..11caaa15f3 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -101,5 +101,4 @@ public void call() { } - } diff --git a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java index c2393fbe95..86b8b49e23 100644 --- a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -27,7 +27,6 @@ public static NewThreadScheduler getInstance() { return INSTANCE; } - @Override public Subscription schedule(Func0 action) { final DiscardableAction discardableAction = new DiscardableAction(action); diff --git a/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java index 0558d6fa26..a6e8710548 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java index d5ac25c13a..145340f09b 100644 --- a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java +++ b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. diff --git a/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java b/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java index 02823436d4..3f44306ed3 100644 --- a/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java +++ b/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. From a6ccf5af39d1b40504d3cde01b2419e03e67b581 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 4 Apr 2013 20:32:48 -0700 Subject: [PATCH 16/25] Consolidating ExecutorScheduler and ScheduledExecutorScheduler ScheduledExecutorScheduler is just an extension of ExecutorScheduler so keeping them together for less surface area on the API. --- .../rx/concurrency/ExecutorScheduler.java | 37 +++++++++++--- .../ScheduledExecutorServiceScheduler.java | 48 ------------------- .../main/java/rx/concurrency/Schedulers.java | 13 +++-- 3 files changed, 39 insertions(+), 59 deletions(-) delete mode 100644 rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java diff --git a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java index 3ce79d9ce8..1fe81f97e8 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java @@ -15,12 +15,19 @@ */ package rx.concurrency; -import rx.Subscription; -import rx.util.functions.Func0; - import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import rx.Scheduler; +import rx.Subscription; +import rx.util.functions.Func0; + +/** + * A {@link Scheduler} implementation that uses an {@link Executor} or {@link ScheduledExecutorService} implementation. + *

+ * Note that if an {@link Executor} implementation is used instead of {@link ScheduledExecutorService} then scheduler events requiring delays will not work and an IllegalStateException be thrown. + */ public class ExecutorScheduler extends AbstractScheduler { private final Executor executor; @@ -28,6 +35,26 @@ public ExecutorScheduler(Executor executor) { this.executor = executor; } + public ExecutorScheduler(ScheduledExecutorService executor) { + this.executor = executor; + } + + @Override + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { + if (executor instanceof ScheduledExecutorService) { + final DiscardableAction discardableAction = new DiscardableAction(action); + ((ScheduledExecutorService) executor).schedule(new Runnable() { + @Override + public void run() { + discardableAction.call(); + } + }, dueTime, unit); + return discardableAction; + } else { + throw new IllegalStateException("Delayed scheduling is not supported with 'Executor' please use 'ScheduledExecutorServiceScheduler'"); + } + } + @Override public Subscription schedule(Func0 action) { final DiscardableAction discardableAction = new DiscardableAction(action); @@ -43,8 +70,4 @@ public void run() { } - @Override - public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { - throw new IllegalStateException("Delayed scheduling is not supported"); - } } diff --git a/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java deleted file mode 100644 index a6e8710548..0000000000 --- a/rxjava-core/src/main/java/rx/concurrency/ScheduledExecutorServiceScheduler.java +++ /dev/null @@ -1,48 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.concurrency; - -import rx.Subscription; -import rx.util.functions.Func0; - -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; - -public class ScheduledExecutorServiceScheduler extends AbstractScheduler { - private final ScheduledExecutorService executorService; - - public ScheduledExecutorServiceScheduler(ScheduledExecutorService executorService) { - this.executorService = executorService; - } - - @Override - public Subscription schedule(Func0 action) { - return schedule(action, 0, TimeUnit.MILLISECONDS); - } - - @Override - public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { - final DiscardableAction discardableAction = new DiscardableAction(action); - executorService.schedule(new Runnable() { - @Override - public void run() { - discardableAction.call(); - } - }, dueTime, unit); - return discardableAction; - } - -} diff --git a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java index 145340f09b..8b570e2972 100644 --- a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java +++ b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java @@ -15,11 +15,16 @@ */ package rx.concurrency; -import rx.Scheduler; - -import java.util.concurrent.*; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.ThreadFactory; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import rx.Scheduler; + public class Schedulers { private static final ScheduledExecutorService COMPUTATION_EXECUTOR = createComputationExecutor(); private static final ScheduledExecutorService IO_EXECUTOR = createIOExecutor(); @@ -47,7 +52,7 @@ public static Scheduler executor(Executor executor) { } public static Scheduler fromScheduledExecutorService(ScheduledExecutorService executor) { - return new ScheduledExecutorServiceScheduler(executor); + return new ExecutorScheduler(executor); } public static Scheduler threadPoolForComputation() { From a8292de5086cc915dc431036b32e0e98a72e8e91 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 4 Apr 2013 20:38:56 -0700 Subject: [PATCH 17/25] Make ForwardingScheduler Internal Until there is a use case other than unit testing I'm moving this to a non-public role so it's not part of the public API. --- .../rx/concurrency/ForwardingScheduler.java | 56 ------------------- .../main/java/rx/concurrency/Schedulers.java | 4 -- .../java/rx/operators/OperationObserveOn.java | 17 +++--- .../rx/operators/OperationSubscribeOn.java | 2 +- .../src/main/java/rx/operators/Tester.java | 47 ++++++++++++++++ 5 files changed, 56 insertions(+), 70 deletions(-) delete mode 100644 rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java diff --git a/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java deleted file mode 100644 index 6b9db71831..0000000000 --- a/rxjava-core/src/main/java/rx/concurrency/ForwardingScheduler.java +++ /dev/null @@ -1,56 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.concurrency; - -import rx.Scheduler; -import rx.Subscription; -import rx.util.functions.Action0; -import rx.util.functions.Func0; - -import java.util.concurrent.TimeUnit; - -public class ForwardingScheduler implements Scheduler { - private final Scheduler underlying; - - public ForwardingScheduler(Scheduler underlying) { - this.underlying = underlying; - } - - @Override - public Subscription schedule(Action0 action) { - return underlying.schedule(action); - } - - @Override - public Subscription schedule(Func0 action) { - return underlying.schedule(action); - } - - @Override - public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) { - return underlying.schedule(action, dueTime, unit); - } - - @Override - public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { - return underlying.schedule(action, dueTime, unit); - } - - @Override - public long now() { - return underlying.now(); - } -} \ No newline at end of file diff --git a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java index 8b570e2972..42ddb29186 100644 --- a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java +++ b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java @@ -63,10 +63,6 @@ public static Scheduler threadPoolForIO() { return fromScheduledExecutorService(IO_EXECUTOR); } - public static Scheduler forwardingScheduler(Scheduler underlying) { - return new ForwardingScheduler(underlying); - } - private static ScheduledExecutorService createComputationExecutor() { int cores = Runtime.getRuntime().availableProcessors(); return Executors.newScheduledThreadPool(cores, new ThreadFactory() { diff --git a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java index 720ca86ed2..7ff2e0b786 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationObserveOn.java @@ -1,12 +1,12 @@ /** * Copyright 2013 Netflix, Inc. - * + * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -15,7 +15,11 @@ */ package rx.operators; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + import org.junit.Test; + import rx.Observable; import rx.Observer; import rx.Scheduler; @@ -24,11 +28,6 @@ import rx.util.functions.Action0; import rx.util.functions.Func1; -import static org.mockito.Matchers.any; -import static org.mockito.Mockito.*; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; - public class OperationObserveOn { public static Func1, Subscription> observeOn(Observable source, Scheduler scheduler) { @@ -56,7 +55,7 @@ public static class UnitTest { @SuppressWarnings("unchecked") public void testObserveOn() { - Scheduler scheduler = spy(Schedulers.forwardingScheduler(Schedulers.immediate())); + Scheduler scheduler = spy(Tester.UnitTest.forwardingScheduler(Schedulers.immediate())); Observer observer = mock(Observer.class); Observable.create(observeOn(Observable.toObservable(1, 2, 3), scheduler)).subscribe(observer); diff --git a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java index 104a134657..5b6368cedc 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java +++ b/rxjava-core/src/main/java/rx/operators/OperationSubscribeOn.java @@ -81,7 +81,7 @@ public static class UnitTest { public void testSubscribeOn() { Observable w = Observable.toObservable(1, 2, 3); - Scheduler scheduler = spy(Schedulers.forwardingScheduler(Schedulers.immediate())); + Scheduler scheduler = spy(Tester.UnitTest.forwardingScheduler(Schedulers.immediate())); Observer observer = mock(Observer.class); Subscription subscription = Observable.create(subscribeOn(w, scheduler)).subscribe(observer); diff --git a/rxjava-core/src/main/java/rx/operators/Tester.java b/rxjava-core/src/main/java/rx/operators/Tester.java index 9692323015..bc04242846 100644 --- a/rxjava-core/src/main/java/rx/operators/Tester.java +++ b/rxjava-core/src/main/java/rx/operators/Tester.java @@ -5,6 +5,7 @@ import java.lang.Thread.UncaughtExceptionHandler; import java.util.ArrayList; import java.util.List; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -12,8 +13,11 @@ import rx.Observable; import rx.Observer; +import rx.Scheduler; import rx.Subscription; import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Func0; import rx.util.functions.Func1; /** @@ -43,6 +47,16 @@ public Subscription call(Observer observer) }; } + /** + * Used for mocking of Schedulers since many Scheduler implementations are static/final. + * + * @param underlying + * @return + */ + public static Scheduler forwardingScheduler(Scheduler underlying) { + return new ForwardingScheduler(underlying); + } + public static class TestingObserver implements Observer { private final Observer actual; @@ -257,5 +271,38 @@ public void onNext(String args) } } } + + public static class ForwardingScheduler implements Scheduler { + private final Scheduler underlying; + + public ForwardingScheduler(Scheduler underlying) { + this.underlying = underlying; + } + + @Override + public Subscription schedule(Action0 action) { + return underlying.schedule(action); + } + + @Override + public Subscription schedule(Func0 action) { + return underlying.schedule(action); + } + + @Override + public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) { + return underlying.schedule(action, dueTime, unit); + } + + @Override + public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { + return underlying.schedule(action, dueTime, unit); + } + + @Override + public long now() { + return underlying.now(); + } + } } } \ No newline at end of file From 7c6a14d2ea06580921b877c531a68746c8df50ce Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 4 Apr 2013 21:09:35 -0700 Subject: [PATCH 18/25] Tweaks to Executor/ExecutorScheduler/IOScheduler and Javadocs - added Javadocs - moved some classes to package-private until they are proven necessary for the public API - made ExecutorScheduler support Executor implementations and still work with time delays by using a system-wide scheduler/timer - made IO thread-pool unbounded with a cached thread pool --- .../rx/concurrency/AbstractScheduler.java | 2 +- .../concurrency/CurrentThreadScheduler.java | 16 +++-- .../rx/concurrency/DiscardableAction.java | 9 ++- .../rx/concurrency/ExecutorScheduler.java | 59 +++++++++++++-- .../rx/concurrency/ImmediateScheduler.java | 14 ++-- .../rx/concurrency/NewThreadScheduler.java | 7 +- .../main/java/rx/concurrency/Schedulers.java | 71 +++++++++++++++---- .../java/rx/concurrency/SleepingAction.java | 7 +- 8 files changed, 146 insertions(+), 39 deletions(-) diff --git a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java index 9dc8542df3..e6fc87ebdb 100644 --- a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java @@ -23,7 +23,7 @@ import rx.util.functions.Action0; import rx.util.functions.Func0; -public abstract class AbstractScheduler implements Scheduler { +/* package */abstract class AbstractScheduler implements Scheduler { @Override public Subscription schedule(Action0 action) { diff --git a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java index ae56293ccd..14a12f8831 100644 --- a/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java @@ -15,18 +15,22 @@ */ package rx.concurrency; -import org.junit.Test; -import org.mockito.InOrder; -import rx.Subscription; -import rx.util.functions.Action0; -import rx.util.functions.Func0; +import static org.mockito.Mockito.*; import java.util.LinkedList; import java.util.Queue; import java.util.concurrent.TimeUnit; -import static org.mockito.Mockito.*; +import org.junit.Test; +import org.mockito.InOrder; +import rx.Subscription; +import rx.util.functions.Action0; +import rx.util.functions.Func0; + +/** + * Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed after the current unit of work is completed. + */ public class CurrentThreadScheduler extends AbstractScheduler { private static final CurrentThreadScheduler INSTANCE = new CurrentThreadScheduler(); diff --git a/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java b/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java index 8fb6537ef3..bf036befdc 100644 --- a/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java +++ b/rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java @@ -15,13 +15,16 @@ */ package rx.concurrency; +import java.util.concurrent.atomic.AtomicBoolean; + import rx.Subscription; import rx.util.AtomicObservableSubscription; import rx.util.functions.Func0; -import java.util.concurrent.atomic.AtomicBoolean; - -public class DiscardableAction implements Func0, Subscription { +/** + * Combines standard {@link Subscription#unsubscribe()} functionality with ability to skip execution if an unsubscribe occurs before the {@link #call()} method is invoked. + */ +/* package */class DiscardableAction implements Func0, Subscription { private final Func0 underlying; private final AtomicObservableSubscription wrapper = new AtomicObservableSubscription(); diff --git a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java index 1fe81f97e8..bed7e5f6c8 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java @@ -16,8 +16,11 @@ package rx.concurrency; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import rx.Scheduler; import rx.Subscription; @@ -26,11 +29,37 @@ /** * A {@link Scheduler} implementation that uses an {@link Executor} or {@link ScheduledExecutorService} implementation. *

- * Note that if an {@link Executor} implementation is used instead of {@link ScheduledExecutorService} then scheduler events requiring delays will not work and an IllegalStateException be thrown. + * Note that if an {@link Executor} implementation is used instead of {@link ScheduledExecutorService} then a system-wide Timer will be used to handle delayed events. */ public class ExecutorScheduler extends AbstractScheduler { private final Executor executor; + /** + * Setup a ScheduledExecutorService that we can use if someone provides an Executor instead of ScheduledExecutorService. + */ + private final static ScheduledExecutorService SYSTEM_SCHEDULED_EXECUTOR; + static { + int count = Runtime.getRuntime().availableProcessors(); + if (count > 8) { + count = count / 2; + } + // we don't need more than 8 to handle just scheduling and doing no work + if (count > 8) { + count = 8; + } + SYSTEM_SCHEDULED_EXECUTOR = Executors.newScheduledThreadPool(count, new ThreadFactory() { + + final AtomicInteger counter = new AtomicInteger(); + + @Override + public Thread newThread(Runnable r) { + return new Thread(r, "RxScheduledExecutorPool-" + counter.incrementAndGet()); + } + + }); + + } + public ExecutorScheduler(Executor executor) { this.executor = executor; } @@ -41,18 +70,40 @@ public ExecutorScheduler(ScheduledExecutorService executor) { @Override public Subscription schedule(Func0 action, long dueTime, TimeUnit unit) { + final DiscardableAction discardableAction = new DiscardableAction(action); + if (executor instanceof ScheduledExecutorService) { - final DiscardableAction discardableAction = new DiscardableAction(action); ((ScheduledExecutorService) executor).schedule(new Runnable() { @Override public void run() { discardableAction.call(); } }, dueTime, unit); - return discardableAction; } else { - throw new IllegalStateException("Delayed scheduling is not supported with 'Executor' please use 'ScheduledExecutorServiceScheduler'"); + if (dueTime == 0) { + // no delay so put on the thread-pool right now + return (schedule(action)); + } else { + // there is a delay and this isn't a ScheduledExecutorService so we'll use a system-wide ScheduledExecutorService + // to handle the scheduling and once it's ready then execute on this Executor + SYSTEM_SCHEDULED_EXECUTOR.schedule(new Runnable() { + + @Override + public void run() { + // now execute on the real Executor + executor.execute(new Runnable() { + + @Override + public void run() { + discardableAction.call(); + } + + }); + } + }, dueTime, unit); + } } + return discardableAction; } @Override diff --git a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java index 11caaa15f3..10a2e33b6f 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ImmediateScheduler.java @@ -15,18 +15,20 @@ */ package rx.concurrency; +import static org.mockito.Mockito.*; + +import java.util.concurrent.TimeUnit; + import org.junit.Test; import org.mockito.InOrder; + import rx.Subscription; import rx.util.functions.Action0; import rx.util.functions.Func0; -import java.util.concurrent.TimeUnit; - -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; - +/** + * Executes work immediately on the current thread. + */ public final class ImmediateScheduler extends AbstractScheduler { private static final ImmediateScheduler INSTANCE = new ImmediateScheduler(); diff --git a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java index 86b8b49e23..3c1844885d 100644 --- a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java @@ -15,11 +15,14 @@ */ package rx.concurrency; +import java.util.concurrent.TimeUnit; + import rx.Subscription; import rx.util.functions.Func0; -import java.util.concurrent.TimeUnit; - +/** + * Schedules work on a new thread. + */ public class NewThreadScheduler extends AbstractScheduler { private static final NewThreadScheduler INSTANCE = new NewThreadScheduler(); diff --git a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java index 42ddb29186..bd35ab58ff 100644 --- a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java +++ b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java @@ -18,49 +18,97 @@ import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.ThreadFactory; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import rx.Scheduler; +/** + * Static factory methods for creating Schedulers. + */ public class Schedulers { private static final ScheduledExecutorService COMPUTATION_EXECUTOR = createComputationExecutor(); - private static final ScheduledExecutorService IO_EXECUTOR = createIOExecutor(); - private static final int DEFAULT_MAX_IO_THREADS = 10; - private static final int DEFAULT_KEEP_ALIVE_TIME = 10 * 1000; // 10 seconds + private static final Executor IO_EXECUTOR = createIOExecutor(); private Schedulers() { } + /** + * {@link Scheduler} that executes work immediately on the current thread. + * + * @return {@link ImmediateScheduler} instance + */ public static Scheduler immediate() { return ImmediateScheduler.getInstance(); } + /** + * {@link Scheduler} that queues work on the current thread to be executed after the current work completes. + * + * @return {@link CurrentThreadScheduler} instance + */ public static Scheduler currentThread() { return CurrentThreadScheduler.getInstance(); } + /** + * {@link Scheduler} that creates a new {@link Thread} for each unit of work. + * + * @return {@link NewThreadScheduler} instance + */ public static Scheduler newThread() { return NewThreadScheduler.getInstance(); } + /** + * {@link Scheduler} that queues work on an {@link Executor}. + *

+ * Note that this does not support scheduled actions with a delay. + * + * @return {@link ExecutorScheduler} instance + */ public static Scheduler executor(Executor executor) { return new ExecutorScheduler(executor); } - public static Scheduler fromScheduledExecutorService(ScheduledExecutorService executor) { + /** + * {@link Scheduler} that queues work on an {@link ScheduledExecutorService}. + * + * @return {@link ExecutorScheduler} instance + */ + public static Scheduler executor(ScheduledExecutorService executor) { return new ExecutorScheduler(executor); } + /** + * {@link Scheduler} intended for computational work. + *

+ * The implementation is backed by a {@link ScheduledExecutorService} thread-pool sized to the number of CPU cores. + *

+ * This can be used for event-loops, processing callbacks and other computational work. + *

+ * Do not perform IO-bound work on this scheduler. Use {@link #threadPoolForComputation()} instead. + * + * @return {@link ExecutorScheduler} for computation-bound work. + */ public static Scheduler threadPoolForComputation() { - return fromScheduledExecutorService(COMPUTATION_EXECUTOR); + return executor(COMPUTATION_EXECUTOR); } + /** + * {@link Scheduler} intended for IO-bound work. + *

+ * The implementation is backed by an {@link Executor} thread-pool that will grow as needed. + *

+ * This can be used for asynchronously performing blocking IO. + *

+ * Do not perform computational work on this scheduler. Use {@link #threadPoolForComputation()} instead. + * + * @return {@link ExecutorScheduler} for IO-bound work. + */ public static Scheduler threadPoolForIO() { - return fromScheduledExecutorService(IO_EXECUTOR); + return executor(IO_EXECUTOR); } private static ScheduledExecutorService createComputationExecutor() { @@ -75,8 +123,8 @@ public Thread newThread(Runnable r) { }); } - private static ScheduledExecutorService createIOExecutor() { - ScheduledThreadPoolExecutor result = new ScheduledThreadPoolExecutor(DEFAULT_MAX_IO_THREADS, new ThreadFactory() { + private static Executor createIOExecutor() { + Executor result = Executors.newCachedThreadPool(new ThreadFactory() { final AtomicInteger counter = new AtomicInteger(); @Override @@ -85,9 +133,6 @@ public Thread newThread(Runnable r) { } }); - result.setKeepAliveTime(DEFAULT_KEEP_ALIVE_TIME, TimeUnit.MILLISECONDS); - result.allowCoreThreadTimeOut(true); - return result; } } diff --git a/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java b/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java index 3f44306ed3..a57fd9046d 100644 --- a/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java +++ b/rxjava-core/src/main/java/rx/concurrency/SleepingAction.java @@ -15,14 +15,13 @@ */ package rx.concurrency; +import java.util.concurrent.TimeUnit; + import rx.Scheduler; import rx.Subscription; -import rx.util.functions.Action0; import rx.util.functions.Func0; -import java.util.concurrent.TimeUnit; - -public class SleepingAction implements Func0 { +/* package */class SleepingAction implements Func0 { private final Func0 underlying; private final Scheduler scheduler; private final long execTime; From 57875b052e7dec48f7702f395ba7f6fe5061834e Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 4 Apr 2013 21:21:30 -0700 Subject: [PATCH 19/25] Basic unit tests - I plan on using this to expand unit testing around various aspects of schedulers - this is not done as an inner-class as it does not correlate with just one class but is cross-functional over many classes thus it fits best here --- .../java/rx/concurrency/TestSchedulers.java | 77 +++++++++++++++++++ 1 file changed, 77 insertions(+) create mode 100644 rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java diff --git a/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java b/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java new file mode 100644 index 0000000000..12ff328e1e --- /dev/null +++ b/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java @@ -0,0 +1,77 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.concurrency; + +import static org.junit.Assert.*; + +import org.junit.Test; + +import rx.Observable; +import rx.util.functions.Action1; +import rx.util.functions.Func1; + +public class TestSchedulers { + + @Test + public void testComputationThreadPool1() { + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + @SuppressWarnings("unchecked") + Observable o = Observable. merge(o1, o2).map(new Func1() { + + @Override + public String call(Integer t) { + assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + } + }); + + o.subscribeOn(Schedulers.threadPoolForComputation()).forEach(new Action1() { + + @Override + public void call(String t) { + System.out.println("t: " + t); + } + }); + + } + + @Test + public void testIOThreadPool1() { + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + @SuppressWarnings("unchecked") + Observable o = Observable. merge(o1, o2).map(new Func1() { + + @Override + public String call(Integer t) { + assertTrue(Thread.currentThread().getName().startsWith("RxIOThreadPool")); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + } + }); + + o.subscribeOn(Schedulers.threadPoolForIO()).forEach(new Action1() { + + @Override + public void call(String t) { + System.out.println("t: " + t); + } + }); + + } +} From 54c1dfd9d9a228f7959ac2eeef91dbafce030805 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Thu, 4 Apr 2013 23:24:11 -0700 Subject: [PATCH 20/25] Scheduler overloads for Subscribe, ToObservable, From, Merge, Empty - the list of operators to add overloads to was derived from the Rx.Net docs at http://msdn.microsoft.com/en-us/library/hh212048(v=vs.103).aspx --- rxjava-core/src/main/java/rx/Observable.java | 258 +++++++++++++++++- .../java/rx/concurrency/TestSchedulers.java | 144 +++++++++- 2 files changed, 398 insertions(+), 4 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 92d4b92852..db6a5cdf5b 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -191,6 +191,37 @@ public Subscription subscribe(Observer observer) { } } + /** + * an {@link Observer} must call an Observable's subscribe method in order to register itself + * to receive push-based notifications from the Observable. A typical implementation of the + * subscribe method does the following: + *

+ * It stores a reference to the Observer in a collection object, such as a List + * object. + *

+ * It returns a reference to the {@link Subscription} interface. This enables + * Observers to unsubscribe (that is, to stop receiving notifications) before the Observable has + * finished sending them and has called the Observer's {@link Observer#onCompleted()} method. + *

+ * At any given time, a particular instance of an Observable implementation is + * responsible for accepting all subscriptions and notifying all subscribers. Unless the + * documentation for a particular Observable implementation indicates otherwise, + * Observers should make no assumptions about the Observable implementation, such + * as the order of notifications that multiple Observers will receive. + *

+ * For more information see the RxJava Wiki + * + * + * @param observer + * @param scheduler + * The {@link Scheduler} that the sequence is subscribed to on. + * @return a {@link Subscription} reference that allows observers + * to stop receiving notifications before the provider has finished sending them + */ + public Subscription subscribe(Observer observer, Scheduler scheduler) { + return subscribeOn(scheduler).subscribe(observer); + } + /** * Used for protecting against errors being thrown from Observer implementations and ensuring onNext/onError/onCompleted contract compliance. *

@@ -239,6 +270,10 @@ public void onNext(Object args) { }); } + public Subscription subscribe(final Map callbacks, Scheduler scheduler) { + return subscribeOn(scheduler).subscribe(callbacks); + } + @SuppressWarnings({ "rawtypes", "unchecked" }) public Subscription subscribe(final Object o) { if (o instanceof Observer) { @@ -275,6 +310,10 @@ public void onNext(Object args) { }); } + public Subscription subscribe(final Object o, Scheduler scheduler) { + return subscribeOn(scheduler).subscribe(o); + } + public Subscription subscribe(final Action1 onNext) { /** @@ -303,6 +342,10 @@ public void onNext(T args) { }); } + public Subscription subscribe(final Action1 onNext, Scheduler scheduler) { + return subscribeOn(scheduler).subscribe(onNext); + } + @SuppressWarnings({ "rawtypes", "unchecked" }) public Subscription subscribe(final Object onNext, final Object onError) { // lookup and memoize onNext @@ -336,6 +379,10 @@ public void onNext(Object args) { }); } + public Subscription subscribe(final Object onNext, final Object onError, Scheduler scheduler) { + return subscribeOn(scheduler).subscribe(onNext, onError); + } + public Subscription subscribe(final Action1 onNext, final Action1 onError) { /** @@ -366,6 +413,10 @@ public void onNext(T args) { }); } + public Subscription subscribe(final Action1 onNext, final Action1 onError, Scheduler scheduler) { + return subscribeOn(scheduler).subscribe(onNext, onError); + } + @SuppressWarnings({ "rawtypes", "unchecked" }) public Subscription subscribe(final Object onNext, final Object onError, final Object onComplete) { // lookup and memoize onNext @@ -401,6 +452,10 @@ public void onNext(Object args) { }); } + public Subscription subscribe(final Object onNext, final Object onError, final Object onComplete, Scheduler scheduler) { + return subscribeOn(scheduler).subscribe(onNext, onError, onComplete); + } + public Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete) { /** @@ -431,6 +486,10 @@ public void onNext(T args) { }); } + public Subscription subscribe(final Action1 onNext, final Action1 onError, final Action0 onComplete, Scheduler scheduler) { + return subscribeOn(scheduler).subscribe(onNext, onError, onComplete); + } + /** * Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated. *

@@ -720,6 +779,21 @@ public static Observable empty() { return toObservable(new ArrayList()); } + /** + * Returns an Observable that returns no data to the {@link Observer} and immediately invokes its onCompleted method on the given {@link Scheduler}. + *

+ * + * + * @param + * the type of item emitted by the Observable + * @param {@link Scheduler} The scheduler to send the termination ({@link Observer#onCompleted()} call. + * @return an Observable that returns no data to the {@link Observer} and immediately invokes the {@link Observer}'s onCompleted method + */ + @SuppressWarnings("unchecked") + public static Observable empty(Scheduler scheduler) { + return (Observable) empty().subscribeOn(scheduler); + } + /** * Returns an Observable that calls onError when an {@link Observer} subscribes to it. *

@@ -803,6 +877,22 @@ public static Observable from(Iterable iterable) { return toObservable(iterable); } + /** + * Converts an {@link Iterable} sequence to an Observable sequence that is subscribed to on the given {@link Scheduler}. + * + * @param iterable + * the source {@link Iterable} sequence + * @param scheduler + * The {@link Scheduler} that the sequence is subscribed to on. + * @param + * the type of items in the {@link Iterable} sequence and the type emitted by the resulting Observable + * @return an Observable that emits each item in the source {@link Iterable} sequence + * @see {@link #toObservable(Iterable)} + */ + public static Observable from(Iterable iterable, Scheduler scheduler) { + return toObservable(iterable, scheduler); + } + /** * Converts an Array to an Observable sequence. * @@ -817,6 +907,22 @@ public static Observable from(T... items) { return toObservable(items); } + /** + * Converts an Array to an Observable sequence that is subscribed to on the given {@link Scheduler}. + * + * @param scheduler + * The {@link Scheduler} that the sequence is subscribed to on. + * @param items + * the source Array + * @param + * the type of items in the Array, and the type of items emitted by the resulting Observable + * @return an Observable that emits each item in the source Array + * @see {@link #toObservable(Object...)} + */ + public static Observable from(Scheduler scheduler, T... items) { + return toObservable(scheduler, items); + } + /** * Generates an observable sequence of integral numbers within a specified range. * @@ -1196,6 +1302,25 @@ public static Observable merge(List> source) { return create(OperationMerge.merge(source)); } + /** + * Flattens the Observable sequences from a list of Observables into one Observable sequence + * without any transformation. You can combine the output of multiple Observables so that they + * act like a single Observable, by using the merge method. + *

+ * + * + * @param source + * a list of Observables that emit sequences of items + * @param scheduler + * The {@link Scheduler} that the sequence is subscribed to on. + * @return an Observable that emits a sequence of elements that are the result of flattening the + * output from the source list of Observables + * @see MSDN: Observable.Merge + */ + public static Observable merge(List> source, Scheduler scheduler) { + return merge(source).subscribeOn(scheduler); + } + /** * Flattens the Observable sequences emitted by a sequence of Observables that are emitted by a * Observable into one Observable sequence without any transformation. You can combine the output @@ -1213,6 +1338,25 @@ public static Observable merge(Observable> source) { return create(OperationMerge.merge(source)); } + /** + * Flattens the Observable sequences emitted by a sequence of Observables that are emitted by a + * Observable into one Observable sequence without any transformation. You can combine the output + * of multiple Observables so that they act like a single Observable, by using the merge method. + *

+ * + * + * @param source + * an Observable that emits Observables + * @param scheduler + * The {@link Scheduler} that the sequence is subscribed to on. + * @return an Observable that emits a sequence of elements that are the result of flattening the + * output from the Observables emitted by the source Observable + * @see MSDN: Observable.Merge Method + */ + public static Observable merge(Observable> source, Scheduler scheduler) { + return merge(source).subscribeOn(scheduler); + } + /** * Flattens the Observable sequences from a series of Observables into one Observable sequence * without any transformation. You can combine the output of multiple Observables so that they @@ -1230,6 +1374,25 @@ public static Observable merge(Observable... source) { return create(OperationMerge.merge(source)); } + /** + * Flattens the Observable sequences from a series of Observables into one Observable sequence + * without any transformation. You can combine the output of multiple Observables so that they + * act like a single Observable, by using the merge method. + *

+ * + * + * @param scheduler + * The {@link Scheduler} that the sequence is subscribed to on. + * @param source + * a series of Observables that emit sequences of items + * @return an Observable that emits a sequence of elements that are the result of flattening the + * output from the source Observables + * @see MSDN: Observable.Merge Method + */ + public static Observable merge(Scheduler scheduler, Observable... source) { + return merge(source).subscribeOn(scheduler); + } + /** * Returns the values from the source observable sequence until the other observable sequence produces a value. * @@ -2153,6 +2316,27 @@ public static Observable toObservable(Iterable iterable) { return create(OperationToObservableIterable.toObservableIterable(iterable)); } + /** + * Converts an Iterable sequence to an Observable sequence which is subscribed to on the given {@link Scheduler}. + * + * Any object that supports the Iterable interface can be converted into an Observable that emits + * each iterable item in the object, by passing the object into the toObservable method. + *

+ * + * + * @param iterable + * the source Iterable sequence + * @param scheduler + * The {@link Scheduler} that the sequence is subscribed to on. + * @param + * the type of items in the iterable sequence and the type emitted by the resulting + * Observable + * @return an Observable that emits each item in the source Iterable sequence + */ + public static Observable toObservable(Iterable iterable, Scheduler scheduler) { + return toObservable(iterable).subscribeOn(scheduler); + } + /** * Converts an Future to an Observable sequence. * @@ -2172,6 +2356,27 @@ public static Observable toObservable(Future future) { return create(OperationToObservableFuture.toObservableFuture(future)); } + /** + * Converts an Future to an Observable sequence that is subscribed to on the given {@link Scheduler}. + * + * Any object that supports the {@link Future} interface can be converted into an Observable that emits + * the return value of the get() method in the object, by passing the object into the toObservable method. + *

+ * This is blocking so the Subscription returned when calling {@link #subscribe(Observer)} does nothing. + * + * @param future + * the source {@link Future} + * @param scheduler + * The {@link Scheduler} to wait for the future on. + * @param + * the type of of object that the future's returns and the type emitted by the resulting + * Observable + * @return an Observable that emits the item from the source Future + */ + public static Observable toObservable(Future future, Scheduler scheduler) { + return toObservable(future).subscribeOn(scheduler); + } + /** * Converts an Future to an Observable sequence. * @@ -2183,7 +2388,7 @@ public static Observable toObservable(Future future) { * * @param future * the source {@link Future} - * @param time + * @param timeout * the maximum time to wait * @param unit * the time unit of the time argument @@ -2192,8 +2397,34 @@ public static Observable toObservable(Future future) { * Observable * @return an Observable that emits the item from the source Future */ - public static Observable toObservable(Future future, long time, TimeUnit unit) { - return create(OperationToObservableFuture.toObservableFuture(future, time, unit)); + public static Observable toObservable(Future future, long timeout, TimeUnit unit) { + return create(OperationToObservableFuture.toObservableFuture(future, timeout, unit)); + } + + /** + * Converts an Future to an Observable sequence that is subscribed to on the given {@link Scheduler}. + * + * Any object that supports the {@link Future} interface can be converted into an Observable that emits + * the return value of the get() method in the object, by passing the object into the toObservable method. + * The subscribe method on this synchronously so the Subscription returned doesn't nothing. + *

+ * This is blocking so the Subscription returned when calling {@link #subscribe(Observer)} does nothing. + * + * @param future + * the source {@link Future} + * @param timeout + * the maximum time to wait + * @param unit + * the time unit of the time argument + * @param scheduler + * The {@link Scheduler} to wait for the future on. + * @param + * the type of of object that the future's returns and the type emitted by the resulting + * Observable + * @return an Observable that emits the item from the source Future + */ + public static Observable toObservable(Future future, long timeout, TimeUnit unit, Scheduler scheduler) { + return toObservable(future, timeout, unit).subscribeOn(scheduler); } /** @@ -2215,6 +2446,27 @@ public static Observable toObservable(T... items) { return toObservable(Arrays.asList(items)); } + /** + * Converts an Array sequence to an Observable sequence which is subscribed to on the given {@link Scheduler}. + * + * An Array can be converted into an Observable that emits each item in the Array, by passing the + * Array into the toObservable method. + *

+ * + * + * @param scheduler + * The {@link Scheduler} that the sequence is subscribed to on. + * @param items + * the source Array + * @param + * the type of items in the Array, and the type of items emitted by the resulting + * Observable + * @return an Observable that emits each item in the source Array + */ + public static Observable toObservable(Scheduler scheduler, T... items) { + return toObservable(items).subscribeOn(scheduler); + } + /** * Sort T objects by their natural order (object must implement Comparable). *

diff --git a/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java b/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java index 12ff328e1e..a92033dcd1 100644 --- a/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java +++ b/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java @@ -17,6 +17,8 @@ import static org.junit.Assert.*; +import java.util.concurrent.atomic.AtomicInteger; + import org.junit.Test; import rx.Observable; @@ -47,7 +49,6 @@ public void call(String t) { System.out.println("t: " + t); } }); - } @Test @@ -72,6 +73,147 @@ public void call(String t) { System.out.println("t: " + t); } }); + } + + @Test + public void testMergeWithoutScheduler1() { + + final String currentThreadName = Thread.currentThread().getName(); + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + @SuppressWarnings("unchecked") + Observable o = Observable. merge(o1, o2).map(new Func1() { + + @Override + public String call(Integer t) { + assertTrue(Thread.currentThread().getName().equals(currentThreadName)); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + } + }); + + o.forEach(new Action1() { + + @Override + public void call(String t) { + System.out.println("t: " + t); + } + }); + } + + @Test + public void testMergeWithImmediateScheduler1() { + + final String currentThreadName = Thread.currentThread().getName(); + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + @SuppressWarnings("unchecked") + Observable o = Observable. merge(Schedulers.immediate(), o1, o2).map(new Func1() { + + @Override + public String call(Integer t) { + assertTrue(Thread.currentThread().getName().equals(currentThreadName)); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + } + }); + + o.forEach(new Action1() { + + @Override + public void call(String t) { + System.out.println("t: " + t); + } + }); + } + + @Test + public void testMergeWithCurrentThreadScheduler1() { + + final String currentThreadName = Thread.currentThread().getName(); + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + @SuppressWarnings("unchecked") + Observable o = Observable. merge(Schedulers.currentThread(), o1, o2).map(new Func1() { + + @Override + public String call(Integer t) { + assertTrue(Thread.currentThread().getName().equals(currentThreadName)); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + } + }); + + o.forEach(new Action1() { + + @Override + public void call(String t) { + System.out.println("t: " + t); + } + }); + } + + @Test + public void testMergeWithScheduler1() { + + final String currentThreadName = Thread.currentThread().getName(); + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + Observable o2 = Observable. from(6, 7, 8, 9, 10); + @SuppressWarnings("unchecked") + Observable o = Observable. merge(Schedulers.threadPoolForComputation(), o1, o2).map(new Func1() { + + @Override + public String call(Integer t) { + assertFalse(Thread.currentThread().getName().equals(currentThreadName)); + assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); + return "Value_" + t + "_Thread_" + Thread.currentThread().getName(); + } + }); + + o.forEach(new Action1() { + + @Override + public void call(String t) { + System.out.println("t: " + t); + } + }); + } + + @Test + public void testSubscribeWithScheduler1() { + + final AtomicInteger count = new AtomicInteger(); + + Observable o1 = Observable. from(1, 2, 3, 4, 5); + + o1.subscribe(new Action1() { + + @Override + public void call(Integer t) { + System.out.println("Thread: " + Thread.currentThread().getName()); + System.out.println("t: " + t); + count.incrementAndGet(); + } + }); + + // the above should be blocking so we should see a count of 5 + assertEquals(5, count.get()); + + count.set(0); + + // now we'll subscribe with a scheduler and it should be async + + o1.subscribe(new Action1() { + + @Override + public void call(Integer t) { + System.out.println("Thread: " + Thread.currentThread().getName()); + System.out.println("t: " + t); + count.incrementAndGet(); + } + }, Schedulers.threadPoolForComputation()); + assertEquals(0, count.get()); } } From 52bf7e145152f3b9c5d1ffe07247bab5f95bdde9 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Fri, 5 Apr 2013 09:28:01 -0700 Subject: [PATCH 21/25] Set threads to daemons so they don't prevent system from exiting - This applies to any pools RxJava itself creates. It will be up to users to do this for Executors they inject. --- .../src/main/java/rx/concurrency/ExecutorScheduler.java | 4 +++- rxjava-core/src/main/java/rx/concurrency/Schedulers.java | 8 ++++++-- 2 files changed, 9 insertions(+), 3 deletions(-) diff --git a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java index bed7e5f6c8..133f772889 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java @@ -53,7 +53,9 @@ public class ExecutorScheduler extends AbstractScheduler { @Override public Thread newThread(Runnable r) { - return new Thread(r, "RxScheduledExecutorPool-" + counter.incrementAndGet()); + Thread t = new Thread(r, "RxScheduledExecutorPool-" + counter.incrementAndGet()); + t.setDaemon(true); + return t; } }); diff --git a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java index bd35ab58ff..f805917b83 100644 --- a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java +++ b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java @@ -118,7 +118,9 @@ private static ScheduledExecutorService createComputationExecutor() { @Override public Thread newThread(Runnable r) { - return new Thread(r, "RxComputationThreadPool-" + counter.incrementAndGet()); + Thread t = new Thread(r, "RxComputationThreadPool-" + counter.incrementAndGet()); + t.setDaemon(true); + return t; } }); } @@ -129,7 +131,9 @@ private static Executor createIOExecutor() { @Override public Thread newThread(Runnable r) { - return new Thread(r, "RxIOThreadPool-" + counter.incrementAndGet()); + Thread t = new Thread(r, "RxIOThreadPool-" + counter.incrementAndGet()); + t.setDaemon(true); + return t; } }); From 56bd8db2f46237c648ea0f7e8004883847cfb763 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Fri, 5 Apr 2013 10:02:27 -0700 Subject: [PATCH 22/25] Name the NewThreadScheduler threads --- .../src/main/java/rx/concurrency/NewThreadScheduler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java index 3c1844885d..6dfedeb08e 100644 --- a/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/NewThreadScheduler.java @@ -39,7 +39,7 @@ public Subscription schedule(Func0 action) { public void run() { discardableAction.call(); } - }); + }, "RxNewThreadScheduler"); t.start(); From cfca6fded7a1cdf1eb5f72c59ca4b3e4d8a47c40 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Fri, 5 Apr 2013 10:03:03 -0700 Subject: [PATCH 23/25] Use long instead of int MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit … considering very long running app with lots of IO events. --- rxjava-core/src/main/java/rx/concurrency/Schedulers.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java index f805917b83..1b27b9bf0a 100644 --- a/rxjava-core/src/main/java/rx/concurrency/Schedulers.java +++ b/rxjava-core/src/main/java/rx/concurrency/Schedulers.java @@ -20,6 +20,7 @@ import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import rx.Scheduler; @@ -127,7 +128,7 @@ public Thread newThread(Runnable r) { private static Executor createIOExecutor() { Executor result = Executors.newCachedThreadPool(new ThreadFactory() { - final AtomicInteger counter = new AtomicInteger(); + final AtomicLong counter = new AtomicLong(); @Override public Thread newThread(Runnable r) { From d35b3e7d8b5fd7b509ed5cb78937b29dbe240210 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Fri, 5 Apr 2013 10:04:08 -0700 Subject: [PATCH 24/25] Fix non-deterministic unit test --- .../java/rx/concurrency/TestSchedulers.java | 31 ++++++++++++++++++- 1 file changed, 30 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java b/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java index a92033dcd1..bec93cfdcd 100644 --- a/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java +++ b/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java @@ -17,11 +17,17 @@ import static org.junit.Assert.*; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; import org.junit.Test; import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.subscriptions.Subscriptions; import rx.util.functions.Action1; import rx.util.functions.Func1; @@ -181,7 +187,7 @@ public void call(String t) { } @Test - public void testSubscribeWithScheduler1() { + public void testSubscribeWithScheduler1() throws InterruptedException { final AtomicInteger count = new AtomicInteger(); @@ -204,16 +210,39 @@ public void call(Integer t) { // now we'll subscribe with a scheduler and it should be async + final String currentThreadName = Thread.currentThread().getName(); + + // latches for deterministically controlling the test below across threads + final CountDownLatch latch = new CountDownLatch(5); + final CountDownLatch first = new CountDownLatch(1); + o1.subscribe(new Action1() { @Override public void call(Integer t) { + try { + // we block the first one so we can assert this executes asynchronously with a count + first.await(1000, TimeUnit.SECONDS); + } catch (InterruptedException e) { + throw new RuntimeException("The latch should have released if we are async.", e); + } + assertFalse(Thread.currentThread().getName().equals(currentThreadName)); + assertTrue(Thread.currentThread().getName().startsWith("RxComputationThreadPool")); System.out.println("Thread: " + Thread.currentThread().getName()); System.out.println("t: " + t); count.incrementAndGet(); + latch.countDown(); } }, Schedulers.threadPoolForComputation()); + // assert we are async assertEquals(0, count.get()); + // release the latch so it can go forward + first.countDown(); + + // wait for all 5 responses + latch.await(); + assertEquals(5, count.get()); } + } From 97fbcc7e31f74d5479acaad858684b68c09b38cd Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Fri, 5 Apr 2013 13:21:13 -0700 Subject: [PATCH 25/25] Removing Scheduler overloads on operators (for now) I have some outstanding questions on how these should be implemented (or even why we need them when the 'subscribeOn' operator is far cleaner) so want to remove them for now so they don't make it into the public incorrectly. --- rxjava-core/src/main/java/rx/Observable.java | 193 ------------------ .../java/rx/concurrency/TestSchedulers.java | 6 +- 2 files changed, 3 insertions(+), 196 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index db6a5cdf5b..6d7f503084 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -779,21 +779,6 @@ public static Observable empty() { return toObservable(new ArrayList()); } - /** - * Returns an Observable that returns no data to the {@link Observer} and immediately invokes its onCompleted method on the given {@link Scheduler}. - *

- * - * - * @param - * the type of item emitted by the Observable - * @param {@link Scheduler} The scheduler to send the termination ({@link Observer#onCompleted()} call. - * @return an Observable that returns no data to the {@link Observer} and immediately invokes the {@link Observer}'s onCompleted method - */ - @SuppressWarnings("unchecked") - public static Observable empty(Scheduler scheduler) { - return (Observable) empty().subscribeOn(scheduler); - } - /** * Returns an Observable that calls onError when an {@link Observer} subscribes to it. *

@@ -877,22 +862,6 @@ public static Observable from(Iterable iterable) { return toObservable(iterable); } - /** - * Converts an {@link Iterable} sequence to an Observable sequence that is subscribed to on the given {@link Scheduler}. - * - * @param iterable - * the source {@link Iterable} sequence - * @param scheduler - * The {@link Scheduler} that the sequence is subscribed to on. - * @param - * the type of items in the {@link Iterable} sequence and the type emitted by the resulting Observable - * @return an Observable that emits each item in the source {@link Iterable} sequence - * @see {@link #toObservable(Iterable)} - */ - public static Observable from(Iterable iterable, Scheduler scheduler) { - return toObservable(iterable, scheduler); - } - /** * Converts an Array to an Observable sequence. * @@ -907,22 +876,6 @@ public static Observable from(T... items) { return toObservable(items); } - /** - * Converts an Array to an Observable sequence that is subscribed to on the given {@link Scheduler}. - * - * @param scheduler - * The {@link Scheduler} that the sequence is subscribed to on. - * @param items - * the source Array - * @param - * the type of items in the Array, and the type of items emitted by the resulting Observable - * @return an Observable that emits each item in the source Array - * @see {@link #toObservable(Object...)} - */ - public static Observable from(Scheduler scheduler, T... items) { - return toObservable(scheduler, items); - } - /** * Generates an observable sequence of integral numbers within a specified range. * @@ -1302,25 +1255,6 @@ public static Observable merge(List> source) { return create(OperationMerge.merge(source)); } - /** - * Flattens the Observable sequences from a list of Observables into one Observable sequence - * without any transformation. You can combine the output of multiple Observables so that they - * act like a single Observable, by using the merge method. - *

- * - * - * @param source - * a list of Observables that emit sequences of items - * @param scheduler - * The {@link Scheduler} that the sequence is subscribed to on. - * @return an Observable that emits a sequence of elements that are the result of flattening the - * output from the source list of Observables - * @see MSDN: Observable.Merge - */ - public static Observable merge(List> source, Scheduler scheduler) { - return merge(source).subscribeOn(scheduler); - } - /** * Flattens the Observable sequences emitted by a sequence of Observables that are emitted by a * Observable into one Observable sequence without any transformation. You can combine the output @@ -1338,25 +1272,6 @@ public static Observable merge(Observable> source) { return create(OperationMerge.merge(source)); } - /** - * Flattens the Observable sequences emitted by a sequence of Observables that are emitted by a - * Observable into one Observable sequence without any transformation. You can combine the output - * of multiple Observables so that they act like a single Observable, by using the merge method. - *

- * - * - * @param source - * an Observable that emits Observables - * @param scheduler - * The {@link Scheduler} that the sequence is subscribed to on. - * @return an Observable that emits a sequence of elements that are the result of flattening the - * output from the Observables emitted by the source Observable - * @see MSDN: Observable.Merge Method - */ - public static Observable merge(Observable> source, Scheduler scheduler) { - return merge(source).subscribeOn(scheduler); - } - /** * Flattens the Observable sequences from a series of Observables into one Observable sequence * without any transformation. You can combine the output of multiple Observables so that they @@ -1374,25 +1289,6 @@ public static Observable merge(Observable... source) { return create(OperationMerge.merge(source)); } - /** - * Flattens the Observable sequences from a series of Observables into one Observable sequence - * without any transformation. You can combine the output of multiple Observables so that they - * act like a single Observable, by using the merge method. - *

- * - * - * @param scheduler - * The {@link Scheduler} that the sequence is subscribed to on. - * @param source - * a series of Observables that emit sequences of items - * @return an Observable that emits a sequence of elements that are the result of flattening the - * output from the source Observables - * @see MSDN: Observable.Merge Method - */ - public static Observable merge(Scheduler scheduler, Observable... source) { - return merge(source).subscribeOn(scheduler); - } - /** * Returns the values from the source observable sequence until the other observable sequence produces a value. * @@ -2316,27 +2212,6 @@ public static Observable toObservable(Iterable iterable) { return create(OperationToObservableIterable.toObservableIterable(iterable)); } - /** - * Converts an Iterable sequence to an Observable sequence which is subscribed to on the given {@link Scheduler}. - * - * Any object that supports the Iterable interface can be converted into an Observable that emits - * each iterable item in the object, by passing the object into the toObservable method. - *

- * - * - * @param iterable - * the source Iterable sequence - * @param scheduler - * The {@link Scheduler} that the sequence is subscribed to on. - * @param - * the type of items in the iterable sequence and the type emitted by the resulting - * Observable - * @return an Observable that emits each item in the source Iterable sequence - */ - public static Observable toObservable(Iterable iterable, Scheduler scheduler) { - return toObservable(iterable).subscribeOn(scheduler); - } - /** * Converts an Future to an Observable sequence. * @@ -2356,27 +2231,6 @@ public static Observable toObservable(Future future) { return create(OperationToObservableFuture.toObservableFuture(future)); } - /** - * Converts an Future to an Observable sequence that is subscribed to on the given {@link Scheduler}. - * - * Any object that supports the {@link Future} interface can be converted into an Observable that emits - * the return value of the get() method in the object, by passing the object into the toObservable method. - *

- * This is blocking so the Subscription returned when calling {@link #subscribe(Observer)} does nothing. - * - * @param future - * the source {@link Future} - * @param scheduler - * The {@link Scheduler} to wait for the future on. - * @param - * the type of of object that the future's returns and the type emitted by the resulting - * Observable - * @return an Observable that emits the item from the source Future - */ - public static Observable toObservable(Future future, Scheduler scheduler) { - return toObservable(future).subscribeOn(scheduler); - } - /** * Converts an Future to an Observable sequence. * @@ -2401,32 +2255,6 @@ public static Observable toObservable(Future future, long timeout, Tim return create(OperationToObservableFuture.toObservableFuture(future, timeout, unit)); } - /** - * Converts an Future to an Observable sequence that is subscribed to on the given {@link Scheduler}. - * - * Any object that supports the {@link Future} interface can be converted into an Observable that emits - * the return value of the get() method in the object, by passing the object into the toObservable method. - * The subscribe method on this synchronously so the Subscription returned doesn't nothing. - *

- * This is blocking so the Subscription returned when calling {@link #subscribe(Observer)} does nothing. - * - * @param future - * the source {@link Future} - * @param timeout - * the maximum time to wait - * @param unit - * the time unit of the time argument - * @param scheduler - * The {@link Scheduler} to wait for the future on. - * @param - * the type of of object that the future's returns and the type emitted by the resulting - * Observable - * @return an Observable that emits the item from the source Future - */ - public static Observable toObservable(Future future, long timeout, TimeUnit unit, Scheduler scheduler) { - return toObservable(future, timeout, unit).subscribeOn(scheduler); - } - /** * Converts an Array sequence to an Observable sequence. * @@ -2446,27 +2274,6 @@ public static Observable toObservable(T... items) { return toObservable(Arrays.asList(items)); } - /** - * Converts an Array sequence to an Observable sequence which is subscribed to on the given {@link Scheduler}. - * - * An Array can be converted into an Observable that emits each item in the Array, by passing the - * Array into the toObservable method. - *

- * - * - * @param scheduler - * The {@link Scheduler} that the sequence is subscribed to on. - * @param items - * the source Array - * @param - * the type of items in the Array, and the type of items emitted by the resulting - * Observable - * @return an Observable that emits each item in the source Array - */ - public static Observable toObservable(Scheduler scheduler, T... items) { - return toObservable(items).subscribeOn(scheduler); - } - /** * Sort T objects by their natural order (object must implement Comparable). *

diff --git a/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java b/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java index bec93cfdcd..ec247d0b95 100644 --- a/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java +++ b/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java @@ -115,7 +115,7 @@ public void testMergeWithImmediateScheduler1() { Observable o1 = Observable. from(1, 2, 3, 4, 5); Observable o2 = Observable. from(6, 7, 8, 9, 10); @SuppressWarnings("unchecked") - Observable o = Observable. merge(Schedulers.immediate(), o1, o2).map(new Func1() { + Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.immediate()).map(new Func1() { @Override public String call(Integer t) { @@ -141,7 +141,7 @@ public void testMergeWithCurrentThreadScheduler1() { Observable o1 = Observable. from(1, 2, 3, 4, 5); Observable o2 = Observable. from(6, 7, 8, 9, 10); @SuppressWarnings("unchecked") - Observable o = Observable. merge(Schedulers.currentThread(), o1, o2).map(new Func1() { + Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.currentThread()).map(new Func1() { @Override public String call(Integer t) { @@ -167,7 +167,7 @@ public void testMergeWithScheduler1() { Observable o1 = Observable. from(1, 2, 3, 4, 5); Observable o2 = Observable. from(6, 7, 8, 9, 10); @SuppressWarnings("unchecked") - Observable o = Observable. merge(Schedulers.threadPoolForComputation(), o1, o2).map(new Func1() { + Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.threadPoolForComputation()).map(new Func1() { @Override public String call(Integer t) {