From f49c462e71090bf97a294d7074d6d0234a5abd60 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Fri, 12 Apr 2013 13:48:05 +0200 Subject: [PATCH 01/10] initial attempt at implementing periodic schedulers as discussed in #228, needs some testing next... --- rxjava-core/src/main/java/rx/Scheduler.java | 45 +++++++++ .../rx/concurrency/AbstractScheduler.java | 98 ++++++++++++++----- .../rx/concurrency/ExecutorScheduler.java | 31 ++++++ .../src/main/java/rx/operators/Tester.java | 20 ++++ 4 files changed, 172 insertions(+), 22 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index e1e7c1e806..707efd09df 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -101,6 +101,51 @@ public interface Scheduler { */ Subscription schedule(Func0 action, long dueTime, TimeUnit unit); + /** + * Schedules an action to be executed periodically. + * + * @param action The action to execute periodically. + * @param initialDelay Time to wait before executing the action for the first time. + * @param period The time interval to wait each time in between executing the action. + * @param unit The time unit the interval above is given in. + * @return A subscription to be able to unsubscribe from action. + */ + Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit); + + /** + * Schedules a cancelable action to be executed periodically. + * + * @param action The action to execute periodically. + * @param initialDelay Time to wait before executing the action for the first time. + * @param period The time interval to wait each time in between executing the action. + * @param unit The time unit the interval above is given in. + * @return A subscription to be able to unsubscribe from action. + */ + Subscription schedulePeriodically(Func0 action, long initialDelay, long period, TimeUnit unit); + + /** + * Schedules a cancelable action to be executed periodically. + * + * @param action The action to execute periodically. + * @param initialDelay Time to wait before executing the action for the first time. + * @param period The time interval to wait each time in between executing the action. + * @param unit The time unit the interval above is given in. + * @return A subscription to be able to unsubscribe from action. + */ + Subscription schedulePeriodically(Func1 action, long initialDelay, long period, TimeUnit unit); + + /** + * Schedules a cancelable action to be executed periodically. + * + * @param state State to pass into the action. + * @param action The action to execute periodically. + * @param initialDelay Time to wait before executing the action for the first time. + * @param period The time interval to wait each time in between executing the action. + * @param unit The time unit the interval above is given in. + * @return A subscription to be able to unsubscribe from action. + */ + Subscription schedulePeriodically(T state, Func2 action, long initialDelay, long period, TimeUnit unit); + /** * Returns the scheduler's notion of current time. */ diff --git a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java index d15e8e184a..fc46e41132 100644 --- a/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java @@ -16,6 +16,7 @@ package rx.concurrency; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import rx.Scheduler; import rx.Subscription; @@ -34,22 +35,12 @@ public Subscription schedule(Action0 action) { @Override public Subscription schedule(final Func1 action) { - return schedule(new Func0() { - @Override - public Subscription call() { - return action.call(AbstractScheduler.this); - } - }); + return schedule(func0ForwardingToFunc1(action)); } @Override public Subscription schedule(final T state, final Func2 action) { - return schedule(new Func0() { - @Override - public Subscription call() { - return action.call(AbstractScheduler.this, state); - } - }); + return schedule(func0ForwardingToFunc2(action, state)); } @Override @@ -59,29 +50,92 @@ public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) { @Override public Subscription schedule(final Func1 action, long dueTime, TimeUnit unit) { - return schedule(new Func0() { - @Override - public Subscription call() { - return action.call(AbstractScheduler.this); - } - }, dueTime, unit); + return schedule(func0ForwardingToFunc1(action), dueTime, unit); } @Override public Subscription schedule(final T state, final Func2 action, long dueTime, TimeUnit unit) { - return schedule(new Func0() { + return schedule(func0ForwardingToFunc2(action, state), dueTime, unit); + } + + @Override + public Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit) { + return schedulePeriodically(asFunc0(action), initialDelay, period, unit); + } + + /** + * This default implementation schedules recursively and waits for actions to complete (instead of potentially executing + * long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this. + */ + @Override + public Subscription schedulePeriodically(final Func0 action, long initialDelay, final long period, final TimeUnit unit) { + final long periodInNanos = unit.toNanos(period); + final AtomicBoolean complete = new AtomicBoolean(); + + final Func0 recursiveAction = new Func0() { @Override public Subscription call() { - return action.call(AbstractScheduler.this, state); + if (! complete.get()) { + long startedAt = System.nanoTime(); + final Subscription sub1 = action.call(); + long timeTakenByActionInNanos = System.nanoTime() - startedAt; + final Subscription sub2 = schedule(this, periodInNanos - timeTakenByActionInNanos, TimeUnit.NANOSECONDS); + return Subscriptions.create(new Action0() { + @Override + public void call() { + sub1.unsubscribe(); + sub2.unsubscribe(); + } + }); + } + return Subscriptions.empty(); + } + }; + final Subscription sub = schedule(recursiveAction, initialDelay, unit); + return Subscriptions.create(new Action0() { + @Override + public void call() { + complete.set(true); + sub.unsubscribe(); } - }, dueTime, unit); + }); } - + + @Override + public Subscription schedulePeriodically(Func1 action, long initialDelay, long period, TimeUnit unit) { + return schedulePeriodically(func0ForwardingToFunc1(action), initialDelay, period, unit); + } + + @Override + public Subscription schedulePeriodically(T state, Func2 action, long initialDelay, long period, TimeUnit unit) { + return schedulePeriodically(func0ForwardingToFunc2(action, state), initialDelay, period, unit); + } + @Override public long now() { return System.nanoTime(); } + @SuppressWarnings("static-method") // can't be done, of course, but Eclipse fails at detecting AbstractScheduler.this + private Func0 func0ForwardingToFunc1(final Func1 func1) { + return new Func0() { + @Override + public Subscription call() { + return func1.call(AbstractScheduler.this); + } + }; + } + + @SuppressWarnings("static-method") // can't be done, of course, but Eclipse fails at detecting AbstractScheduler.this + private Func0 func0ForwardingToFunc2(final Func2 func2, final T state) { + return new Func0() { + @Override + public Subscription call() { + return func2.call(AbstractScheduler.this, state); + } + }; + } + private static Func0 asFunc0(final Action0 action) { return new Func0() { @Override diff --git a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java index 133f772889..2e0f3ae78a 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java @@ -15,15 +15,20 @@ */ package rx.concurrency; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.Executor; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import rx.Scheduler; import rx.Subscription; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; import rx.util.functions.Func0; /** @@ -123,4 +128,30 @@ public void run() { } + @Override + public Subscription schedulePeriodically(final Func0 action, long initialDelay, long period, TimeUnit unit) { + final Queue subscriptions = new ConcurrentLinkedQueue(); + if (executor instanceof ScheduledExecutorService) { + final ScheduledFuture future = ((ScheduledExecutorService) executor).scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + subscriptions.add(action.call()); + } + }, initialDelay, period, unit); + + return Subscriptions.create(new Action0() { + @Override + public void call() { + future.cancel(false); + Subscription next = subscriptions.poll(); + while (next != null) { + next.unsubscribe(); + next = subscriptions.poll(); + } + } + }); + } + // not a scheduled executor service, so we fall back to the recursive implementation + return super.schedulePeriodically(action, initialDelay, period, unit); + } } diff --git a/rxjava-core/src/main/java/rx/operators/Tester.java b/rxjava-core/src/main/java/rx/operators/Tester.java index 11b2c8a798..e209dfc43b 100644 --- a/rxjava-core/src/main/java/rx/operators/Tester.java +++ b/rxjava-core/src/main/java/rx/operators/Tester.java @@ -320,6 +320,26 @@ public Subscription schedule(T state, Func2 acti return underlying.schedule(state, action, dueTime, unit); } + @Override + public Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit) { + return underlying.schedulePeriodically(action, initialDelay, period, unit); + } + + @Override + public Subscription schedulePeriodically(Func0 action, long initialDelay, long period, TimeUnit unit) { + return underlying.schedulePeriodically(action, initialDelay, period, unit); + } + + @Override + public Subscription schedulePeriodically(Func1 action, long initialDelay, long period, TimeUnit unit) { + return underlying.schedulePeriodically(action, initialDelay, period, unit); + } + + @Override + public Subscription schedulePeriodically(T state, Func2 action, long initialDelay, long period, TimeUnit unit) { + return underlying.schedulePeriodically(state, action, initialDelay, period, unit); + } + @Override public long now() { return underlying.now(); From 504bd056350db7c523954cca2d6f5e57069124ed Mon Sep 17 00:00:00 2001 From: jmhofer Date: Mon, 22 Apr 2013 10:21:50 +0200 Subject: [PATCH 02/10] fixing the scheduler methods to fit the new pattern --- rxjava-core/src/main/java/rx/Scheduler.java | 68 ++++++++++++++------- 1 file changed, 45 insertions(+), 23 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index e606d18860..f61ae73a7b 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -71,6 +71,18 @@ public abstract class Scheduler { */ public abstract Subscription schedule(T state, Func2 action, long delayTime, TimeUnit unit); + /** + * Schedules a cancelable action to be executed periodically. + * + * @param state State to pass into the action. + * @param action The action to execute periodically. + * @param initialDelay Time to wait before executing the action for the first time. + * @param period The time interval to wait each time in between executing the action. + * @param unit The time unit the interval above is given in. + * @return A subscription to be able to unsubscribe from action. + */ + public abstract Subscription schedulePeriodically(T state, Func2 action, long initialDelay, long period, TimeUnit unit); + /** * Schedules a cancelable action to be executed at dueTime. * @@ -103,7 +115,7 @@ public Subscription schedule(final Func1 action) { return schedule(null, new Func2() { @Override - public Subscription call(Scheduler scheduler, Void t2) { + public Subscription call(Scheduler scheduler, @SuppressWarnings("unused") Void state) { return action.call(scheduler); } }); @@ -120,7 +132,7 @@ public Subscription schedule(final Func0 action) { return schedule(null, new Func2() { @Override - public Subscription call(Scheduler scheduler, Void t2) { + public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) { return action.call(); } }); @@ -137,7 +149,7 @@ public Subscription schedule(final Action0 action) { return schedule(null, new Func2() { @Override - public Subscription call(Scheduler scheduler, Void t2) { + public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) { action.call(); return Subscriptions.empty(); } @@ -159,7 +171,7 @@ public Subscription schedule(final Func1 action, long d return schedule(null, new Func2() { @Override - public Subscription call(Scheduler scheduler, Void t2) { + public Subscription call(Scheduler scheduler, @SuppressWarnings("unused") Void state) { return action.call(scheduler); } }, delayTime, unit); @@ -176,7 +188,7 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit return schedule(null, new Func2() { @Override - public Subscription call(Scheduler scheduler, Void t2) { + public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) { action.call(); return Subscriptions.empty(); } @@ -194,23 +206,12 @@ public Subscription schedule(final Func0 action, long delayTime, T return schedule(null, new Func2() { @Override - public Subscription call(Scheduler scheduler, Void t2) { + public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) { return action.call(); } }, delayTime, unit); } - /** - * Schedules an action to be executed periodically. - * - * @param action The action to execute periodically. - * @param initialDelay Time to wait before executing the action for the first time. - * @param period The time interval to wait each time in between executing the action. - * @param unit The time unit the interval above is given in. - * @return A subscription to be able to unsubscribe from action. - */ - Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit); - /** * Schedules a cancelable action to be executed periodically. * @@ -220,7 +221,14 @@ public Subscription call(Scheduler scheduler, Void t2) { * @param unit The time unit the interval above is given in. * @return A subscription to be able to unsubscribe from action. */ - Subscription schedulePeriodically(Func0 action, long initialDelay, long period, TimeUnit unit); + public Subscription schedulePeriodically(final Func1 action, long initialDelay, long period, TimeUnit unit) { + return schedulePeriodically(null, new Func2() { + @Override + public Subscription call(Scheduler scheduler, @SuppressWarnings("unused") Void state) { + return action.call(scheduler); + } + }, initialDelay, period, unit); + } /** * Schedules a cancelable action to be executed periodically. @@ -231,19 +239,33 @@ public Subscription call(Scheduler scheduler, Void t2) { * @param unit The time unit the interval above is given in. * @return A subscription to be able to unsubscribe from action. */ - Subscription schedulePeriodically(Func1 action, long initialDelay, long period, TimeUnit unit); + public Subscription schedulePeriodically(final Func0 action, long initialDelay, long period, TimeUnit unit) { + return schedulePeriodically(null, new Func2() { + @Override + public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) { + return action.call(); + } + }, initialDelay, period, unit); + } /** - * Schedules a cancelable action to be executed periodically. - * - * @param state State to pass into the action. + * Schedules an action to be executed periodically. + * * @param action The action to execute periodically. * @param initialDelay Time to wait before executing the action for the first time. * @param period The time interval to wait each time in between executing the action. * @param unit The time unit the interval above is given in. * @return A subscription to be able to unsubscribe from action. */ - Subscription schedulePeriodically(T state, Func2 action, long initialDelay, long period, TimeUnit unit); + public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) { + return schedulePeriodically(null, new Func2() { + @Override + public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) { + action.call(); + return Subscriptions.empty(); + } + }, initialDelay, period, unit); + } /** * Returns the scheduler's notion of current absolute time in milliseconds. From b7cb10b8006d28063f268574cb19b44a1a50331a Mon Sep 17 00:00:00 2001 From: jmhofer Date: Mon, 22 Apr 2013 10:39:35 +0200 Subject: [PATCH 03/10] taken over default recursive implementation --- rxjava-core/src/main/java/rx/Scheduler.java | 36 ++++++++++++++++++++- 1 file changed, 35 insertions(+), 1 deletion(-) diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index f61ae73a7b..e061d8206a 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -17,6 +17,7 @@ import java.util.Date; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; @@ -73,6 +74,8 @@ public abstract class Scheduler { /** * Schedules a cancelable action to be executed periodically. + * This default implementation schedules recursively and waits for actions to complete (instead of potentially executing + * long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this. * * @param state State to pass into the action. * @param action The action to execute periodically. @@ -81,7 +84,38 @@ public abstract class Scheduler { * @param unit The time unit the interval above is given in. * @return A subscription to be able to unsubscribe from action. */ - public abstract Subscription schedulePeriodically(T state, Func2 action, long initialDelay, long period, TimeUnit unit); + public Subscription schedulePeriodically(T state, final Func2 action, long initialDelay, long period, TimeUnit unit) { + final long periodInNanos = unit.toNanos(period); + final AtomicBoolean complete = new AtomicBoolean(); + + final Func2 recursiveAction = new Func2() { + @Override + public Subscription call(Scheduler scheduler, T state0) { + if (! complete.get()) { + long startedAt = System.nanoTime(); + final Subscription sub1 = action.call(scheduler, state0); + long timeTakenByActionInNanos = System.nanoTime() - startedAt; + final Subscription sub2 = schedule(state0, this, periodInNanos - timeTakenByActionInNanos, TimeUnit.NANOSECONDS); + return Subscriptions.create(new Action0() { + @Override + public void call() { + sub1.unsubscribe(); + sub2.unsubscribe(); + } + }); + } + return Subscriptions.empty(); + } + }; + final Subscription sub = schedule(state, recursiveAction, initialDelay, unit); + return Subscriptions.create(new Action0() { + @Override + public void call() { + complete.set(true); + sub.unsubscribe(); + } + }); + } /** * Schedules a cancelable action to be executed at dueTime. From 9563a1b6025a81d27b30c18d529d6309a46f9a77 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Mon, 22 Apr 2013 15:54:55 +0200 Subject: [PATCH 04/10] Fixed scheduler javadoc, and one single unnecessary cast in takeWhile --- rxjava-core/src/main/java/rx/Scheduler.java | 51 ++++++++++++------- .../java/rx/operators/OperationTakeWhile.java | 6 +-- 2 files changed, 37 insertions(+), 20 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index e061d8206a..e3ab4419f2 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -77,11 +77,16 @@ public abstract class Scheduler { * This default implementation schedules recursively and waits for actions to complete (instead of potentially executing * long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this. * - * @param state State to pass into the action. - * @param action The action to execute periodically. - * @param initialDelay Time to wait before executing the action for the first time. - * @param period The time interval to wait each time in between executing the action. - * @param unit The time unit the interval above is given in. + * @param state + * State to pass into the action. + * @param action + * The action to execute periodically. + * @param initialDelay + * Time to wait before executing the action for the first time. + * @param period + * The time interval to wait each time in between executing the action. + * @param unit + * The time unit the interval above is given in. * @return A subscription to be able to unsubscribe from action. */ public Subscription schedulePeriodically(T state, final Func2 action, long initialDelay, long period, TimeUnit unit) { @@ -249,10 +254,14 @@ public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @Suppr /** * Schedules a cancelable action to be executed periodically. * - * @param action The action to execute periodically. - * @param initialDelay Time to wait before executing the action for the first time. - * @param period The time interval to wait each time in between executing the action. - * @param unit The time unit the interval above is given in. + * @param action + * The action to execute periodically. + * @param initialDelay + * Time to wait before executing the action for the first time. + * @param period + * The time interval to wait each time in between executing the action. + * @param unit + * The time unit the interval above is given in. * @return A subscription to be able to unsubscribe from action. */ public Subscription schedulePeriodically(final Func1 action, long initialDelay, long period, TimeUnit unit) { @@ -267,10 +276,14 @@ public Subscription call(Scheduler scheduler, @SuppressWarnings("unused") Void s /** * Schedules a cancelable action to be executed periodically. * - * @param action The action to execute periodically. - * @param initialDelay Time to wait before executing the action for the first time. - * @param period The time interval to wait each time in between executing the action. - * @param unit The time unit the interval above is given in. + * @param action + * The action to execute periodically. + * @param initialDelay + * Time to wait before executing the action for the first time. + * @param period + * The time interval to wait each time in between executing the action. + * @param unit + * The time unit the interval above is given in. * @return A subscription to be able to unsubscribe from action. */ public Subscription schedulePeriodically(final Func0 action, long initialDelay, long period, TimeUnit unit) { @@ -285,10 +298,14 @@ public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @Suppr /** * Schedules an action to be executed periodically. * - * @param action The action to execute periodically. - * @param initialDelay Time to wait before executing the action for the first time. - * @param period The time interval to wait each time in between executing the action. - * @param unit The time unit the interval above is given in. + * @param action + * The action to execute periodically. + * @param initialDelay + * Time to wait before executing the action for the first time. + * @param period + * The time interval to wait each time in between executing the action. + * @param unit + * The time unit the interval above is given in. * @return A subscription to be able to unsubscribe from action. */ public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) { diff --git a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java index a6b1cdede2..5445082809 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java +++ b/rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java @@ -27,6 +27,7 @@ import rx.Observer; import rx.Subscription; import rx.subjects.PublishSubject; +import rx.subjects.Subject; import rx.subscriptions.Subscriptions; import rx.util.AtomicObservableSubscription; import rx.util.AtomicObserver; @@ -174,9 +175,8 @@ public Boolean call(Integer input) @Test public void testTakeWhileOnSubject1() { - PublishSubject s = PublishSubject.create(); - Observable w = (Observable) s; - Observable take = Observable.create(takeWhile(w, new Func1() + Subject s = PublishSubject.create(); + Observable take = Observable.create(takeWhile(s, new Func1() { @Override public Boolean call(Integer input) From eb9af4d3708a91c6442966dc38ee1d6139d361c7 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Fri, 26 Apr 2013 14:17:33 +0200 Subject: [PATCH 05/10] Fixed a bug in the test scheduler that happened when advancing time by a too little amount --- rxjava-core/src/main/java/rx/concurrency/TestScheduler.java | 1 + 1 file changed, 1 insertion(+) diff --git a/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java b/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java index c785031df2..75ba1e665a 100644 --- a/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java @@ -79,6 +79,7 @@ private void triggerActions(long targetTimeInNanos) { while (!queue.isEmpty()) { TimedAction current = queue.peek(); if (current.time > targetTimeInNanos) { + time = targetTimeInNanos; break; } time = current.time; From a372ee899aa342ddfd7f50d82d8d56fcd8ce6e57 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Fri, 26 Apr 2013 14:50:04 +0200 Subject: [PATCH 06/10] Fixed millisecond/nanosecond confusion in test scheduler, fixed schedulePeriodically to use now() instead of System.nanoTime() and added a test against scheduling periodically --- rxjava-core/src/main/java/rx/Scheduler.java | 50 ++++++++++++++++++- .../java/rx/concurrency/TestScheduler.java | 5 +- 2 files changed, 51 insertions(+), 4 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index e3ab4419f2..9bd3cd3033 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -15,10 +15,21 @@ */ package rx; +import static org.mockito.Matchers.anyLong; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + import java.util.Date; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.junit.Test; +import org.mockito.InOrder; +import org.mockito.Mockito; + +import rx.concurrency.TestScheduler; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; import rx.util.functions.Func0; @@ -97,9 +108,9 @@ public Subscription schedulePeriodically(T state, final Func2 calledOp = mock(Func1.class); + + final TestScheduler scheduler = new TestScheduler(); + scheduler.schedulePeriodically(new Action0() { + @Override public void call() { + System.out.println(scheduler.now()); + calledOp.call(scheduler.now()); + } + }, 1, 2, TimeUnit.SECONDS); + + verify(calledOp, never()).call(anyLong()); + + InOrder inOrder = Mockito.inOrder(calledOp); + + scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS); + inOrder.verify(calledOp, never()).call(anyLong()); + + scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS); + inOrder.verify(calledOp, times(1)).call(1000L); + + scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS); + inOrder.verify(calledOp, never()).call(3000L); + + scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS); + inOrder.verify(calledOp, times(1)).call(3000L); + + scheduler.advanceTimeBy(5L, TimeUnit.SECONDS); + inOrder.verify(calledOp, times(1)).call(5000L); + inOrder.verify(calledOp, times(1)).call(7000L); + } + } } diff --git a/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java b/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java index 75ba1e665a..a10ab90e21 100644 --- a/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java @@ -54,11 +54,12 @@ public int compare(TimedAction action1, TimedAction action2) { } } + // Storing time in nanoseconds internally. private long time; @Override public long now() { - return time; + return TimeUnit.NANOSECONDS.toMillis(time); } public void advanceTimeBy(long delayTime, TimeUnit unit) { @@ -96,7 +97,7 @@ public Subscription schedule(T state, Func2 acti @Override public Subscription schedule(T state, Func2 action, long delayTime, TimeUnit unit) { - queue.add(new TimedAction(this, now() + unit.toNanos(delayTime), action, state)); + queue.add(new TimedAction(this, time + unit.toNanos(delayTime), action, state)); return Subscriptions.empty(); } } From 5f34896edacff5834f34c06cd6c81e35ddc96264 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Fri, 26 Apr 2013 15:03:44 +0200 Subject: [PATCH 07/10] Tested unsubscription, too --- rxjava-core/src/main/java/rx/Scheduler.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index 9bd3cd3033..3898dd1c3f 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -107,7 +107,7 @@ public Subscription schedulePeriodically(T state, final Func2 recursiveAction = new Func2() { @Override public Subscription call(Scheduler scheduler, T state0) { - if (! complete.get()) { + if (!complete.get()) { long startedAt = now(); final Subscription sub1 = action.call(scheduler, state0); long timeTakenByActionInNanos = TimeUnit.MILLISECONDS.toNanos(now() - startedAt); @@ -343,7 +343,7 @@ public void testPeriodicScheduling() { final Func1 calledOp = mock(Func1.class); final TestScheduler scheduler = new TestScheduler(); - scheduler.schedulePeriodically(new Action0() { + Subscription subscription = scheduler.schedulePeriodically(new Action0() { @Override public void call() { System.out.println(scheduler.now()); calledOp.call(scheduler.now()); @@ -369,6 +369,10 @@ public void testPeriodicScheduling() { scheduler.advanceTimeBy(5L, TimeUnit.SECONDS); inOrder.verify(calledOp, times(1)).call(5000L); inOrder.verify(calledOp, times(1)).call(7000L); + + subscription.unsubscribe(); + scheduler.advanceTimeBy(11L, TimeUnit.SECONDS); + inOrder.verify(calledOp, never()).call(anyLong()); } } } From f2ef8f041b44c6982f2856c683921a25ba8166b0 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Fri, 26 Apr 2013 15:13:48 +0200 Subject: [PATCH 08/10] switched OperationInterval implementation over to use periodic scheduling --- rxjava-core/src/main/java/rx/Scheduler.java | 7 +--- .../java/rx/operators/OperationInterval.java | 37 ++++++------------- 2 files changed, 14 insertions(+), 30 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index 3898dd1c3f..22735209cd 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -15,11 +15,8 @@ */ package rx; -import static org.mockito.Matchers.anyLong; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; import java.util.Date; import java.util.concurrent.TimeUnit; diff --git a/rxjava-core/src/main/java/rx/operators/OperationInterval.java b/rxjava-core/src/main/java/rx/operators/OperationInterval.java index 19a3736826..a9a46d2bbe 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationInterval.java +++ b/rxjava-core/src/main/java/rx/operators/OperationInterval.java @@ -20,7 +20,6 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; import org.junit.Before; import org.junit.Test; @@ -57,47 +56,35 @@ public static Func1, Subscription> interval(long interval, TimeUn } private static class Interval implements Func1, Subscription> { - private final long interval; + private final long period; private final TimeUnit unit; private final Scheduler scheduler; private long currentValue; - private final AtomicBoolean complete = new AtomicBoolean(); - private Interval(long interval, TimeUnit unit, Scheduler scheduler) { - this.interval = interval; + private Interval(long period, TimeUnit unit, Scheduler scheduler) { + this.period = period; this.unit = unit; this.scheduler = scheduler; } @Override public Subscription call(final Observer observer) { - scheduler.schedule(new IntervalAction(observer), interval, unit); - return Subscriptions.create(new Action0() { + final Subscription wrapped = scheduler.schedulePeriodically(new Action0() { @Override public void call() { - complete.set(true); + observer.onNext(currentValue); + currentValue++; } - }); - } - - private class IntervalAction implements Action0 { - private final Observer observer; - - private IntervalAction(Observer observer) { - this.observer = observer; - } + }, period, period, unit); - @Override - public void call() { - if (complete.get()) { + return Subscriptions.create(new Action0() { + @Override + public void call() { + wrapped.unsubscribe(); observer.onCompleted(); - } else { - observer.onNext(currentValue); - currentValue++; - scheduler.schedule(this, interval, unit); } - } + }); } } From 3fb72d6ab6ea5d03f68bc8b2ed51d835f1a4aaf1 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Fri, 26 Apr 2013 15:43:38 +0200 Subject: [PATCH 09/10] ExecutorScheduler now uses scheduleAtFixedRate instead of recursion when possible. --- rxjava-core/src/main/java/rx/Scheduler.java | 2 ++ .../rx/concurrency/ExecutorScheduler.java | 21 +++++++++++++++++++ 2 files changed, 23 insertions(+) diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index 22735209cd..dffe97fcda 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -19,6 +19,7 @@ import static org.mockito.Mockito.*; import java.util.Date; +import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -26,6 +27,7 @@ import org.mockito.InOrder; import org.mockito.Mockito; +import rx.concurrency.Schedulers; import rx.concurrency.TestScheduler; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0; diff --git a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java index ed40bec350..a2522a2900 100644 --- a/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java @@ -44,6 +44,27 @@ public ExecutorScheduler(ScheduledExecutorService executor) { this.executor = executor; } + @Override + public Subscription schedulePeriodically(final T state, final Func2 action, long initialDelay, long period, TimeUnit unit) { + if (executor instanceof ScheduledExecutorService) { + final CompositeSubscription subscriptions = new CompositeSubscription(); + + ScheduledFuture f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + Subscription s = action.call(ExecutorScheduler.this, state); + subscriptions.add(s); + } + }, initialDelay, period, unit); + + subscriptions.add(Subscriptions.create(f)); + return subscriptions; + + } else { + return super.schedulePeriodically(state, action, initialDelay, period, unit); + } + } + @Override public Subscription schedule(final T state, final Func2 action, long delayTime, TimeUnit unit) { final DiscardableAction discardableAction = new DiscardableAction(state, action); From 58456dabeabcb40c579b6481438499d53fc1a7a9 Mon Sep 17 00:00:00 2001 From: jmhofer Date: Fri, 26 Apr 2013 15:45:03 +0200 Subject: [PATCH 10/10] Removed unused imports. --- rxjava-core/src/main/java/rx/Scheduler.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Scheduler.java b/rxjava-core/src/main/java/rx/Scheduler.java index dffe97fcda..22735209cd 100644 --- a/rxjava-core/src/main/java/rx/Scheduler.java +++ b/rxjava-core/src/main/java/rx/Scheduler.java @@ -19,7 +19,6 @@ import static org.mockito.Mockito.*; import java.util.Date; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -27,7 +26,6 @@ import org.mockito.InOrder; import org.mockito.Mockito; -import rx.concurrency.Schedulers; import rx.concurrency.TestScheduler; import rx.subscriptions.Subscriptions; import rx.util.functions.Action0;