Skip to content

Commit 78f7dd8

Browse files
author
jmhofer
committed
initial attempt at implementing periodic schedulers as discussed in ReactiveX#228, needs some testing next...
1 parent 7834e8a commit 78f7dd8

File tree

4 files changed

+172
-22
lines changed

4 files changed

+172
-22
lines changed

rxjava-core/src/main/java/rx/Scheduler.java

Lines changed: 45 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,51 @@ public interface Scheduler {
101101
*/
102102
Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit unit);
103103

104+
/**
105+
* Schedules an action to be executed periodically.
106+
*
107+
* @param action The action to execute periodically.
108+
* @param initialDelay Time to wait before executing the action for the first time.
109+
* @param period The time interval to wait each time in between executing the action.
110+
* @param unit The time unit the interval above is given in.
111+
* @return A subscription to be able to unsubscribe from action.
112+
*/
113+
Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit);
114+
115+
/**
116+
* Schedules a cancelable action to be executed periodically.
117+
*
118+
* @param action The action to execute periodically.
119+
* @param initialDelay Time to wait before executing the action for the first time.
120+
* @param period The time interval to wait each time in between executing the action.
121+
* @param unit The time unit the interval above is given in.
122+
* @return A subscription to be able to unsubscribe from action.
123+
*/
124+
Subscription schedulePeriodically(Func0<Subscription> action, long initialDelay, long period, TimeUnit unit);
125+
126+
/**
127+
* Schedules a cancelable action to be executed periodically.
128+
*
129+
* @param action The action to execute periodically.
130+
* @param initialDelay Time to wait before executing the action for the first time.
131+
* @param period The time interval to wait each time in between executing the action.
132+
* @param unit The time unit the interval above is given in.
133+
* @return A subscription to be able to unsubscribe from action.
134+
*/
135+
Subscription schedulePeriodically(Func1<Scheduler, Subscription> action, long initialDelay, long period, TimeUnit unit);
136+
137+
/**
138+
* Schedules a cancelable action to be executed periodically.
139+
*
140+
* @param state State to pass into the action.
141+
* @param action The action to execute periodically.
142+
* @param initialDelay Time to wait before executing the action for the first time.
143+
* @param period The time interval to wait each time in between executing the action.
144+
* @param unit The time unit the interval above is given in.
145+
* @return A subscription to be able to unsubscribe from action.
146+
*/
147+
<T> Subscription schedulePeriodically(T state, Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit);
148+
104149
/**
105150
* Returns the scheduler's notion of current time.
106151
*/

rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java

Lines changed: 76 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package rx.concurrency;
1717

1818
import java.util.concurrent.TimeUnit;
19+
import java.util.concurrent.atomic.AtomicBoolean;
1920

2021
import rx.Scheduler;
2122
import rx.Subscription;
@@ -34,22 +35,12 @@ public Subscription schedule(Action0 action) {
3435

3536
@Override
3637
public Subscription schedule(final Func1<Scheduler, Subscription> action) {
37-
return schedule(new Func0<Subscription>() {
38-
@Override
39-
public Subscription call() {
40-
return action.call(AbstractScheduler.this);
41-
}
42-
});
38+
return schedule(func0ForwardingToFunc1(action));
4339
}
4440

4541
@Override
4642
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action) {
47-
return schedule(new Func0<Subscription>() {
48-
@Override
49-
public Subscription call() {
50-
return action.call(AbstractScheduler.this, state);
51-
}
52-
});
43+
return schedule(func0ForwardingToFunc2(action, state));
5344
}
5445

5546
@Override
@@ -59,29 +50,92 @@ public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) {
5950

6051
@Override
6152
public Subscription schedule(final Func1<Scheduler, Subscription> action, long dueTime, TimeUnit unit) {
62-
return schedule(new Func0<Subscription>() {
63-
@Override
64-
public Subscription call() {
65-
return action.call(AbstractScheduler.this);
66-
}
67-
}, dueTime, unit);
53+
return schedule(func0ForwardingToFunc1(action), dueTime, unit);
6854
}
6955

7056
@Override
7157
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
72-
return schedule(new Func0<Subscription>() {
58+
return schedule(func0ForwardingToFunc2(action, state), dueTime, unit);
59+
}
60+
61+
@Override
62+
public Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit) {
63+
return schedulePeriodically(asFunc0(action), initialDelay, period, unit);
64+
}
65+
66+
/**
67+
* This default implementation schedules recursively and waits for actions to complete (instead of potentially executing
68+
* long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
69+
*/
70+
@Override
71+
public Subscription schedulePeriodically(final Func0<Subscription> action, long initialDelay, final long period, final TimeUnit unit) {
72+
final long periodInNanos = unit.toNanos(period);
73+
final AtomicBoolean complete = new AtomicBoolean();
74+
75+
final Func0<Subscription> recursiveAction = new Func0<Subscription>() {
7376
@Override
7477
public Subscription call() {
75-
return action.call(AbstractScheduler.this, state);
78+
if (! complete.get()) {
79+
long startedAt = System.nanoTime();
80+
final Subscription sub1 = action.call();
81+
long timeTakenByActionInNanos = System.nanoTime() - startedAt;
82+
final Subscription sub2 = schedule(this, periodInNanos - timeTakenByActionInNanos, TimeUnit.NANOSECONDS);
83+
return Subscriptions.create(new Action0() {
84+
@Override
85+
public void call() {
86+
sub1.unsubscribe();
87+
sub2.unsubscribe();
88+
}
89+
});
90+
}
91+
return Subscriptions.empty();
92+
}
93+
};
94+
final Subscription sub = schedule(recursiveAction, initialDelay, unit);
95+
return Subscriptions.create(new Action0() {
96+
@Override
97+
public void call() {
98+
complete.set(true);
99+
sub.unsubscribe();
76100
}
77-
}, dueTime, unit);
101+
});
78102
}
79-
103+
104+
@Override
105+
public Subscription schedulePeriodically(Func1<Scheduler, Subscription> action, long initialDelay, long period, TimeUnit unit) {
106+
return schedulePeriodically(func0ForwardingToFunc1(action), initialDelay, period, unit);
107+
}
108+
109+
@Override
110+
public <T> Subscription schedulePeriodically(T state, Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit) {
111+
return schedulePeriodically(func0ForwardingToFunc2(action, state), initialDelay, period, unit);
112+
}
113+
80114
@Override
81115
public long now() {
82116
return System.nanoTime();
83117
}
84118

119+
@SuppressWarnings("static-method") // can't be done, of course, but Eclipse fails at detecting AbstractScheduler.this
120+
private Func0<Subscription> func0ForwardingToFunc1(final Func1<Scheduler, Subscription> func1) {
121+
return new Func0<Subscription>() {
122+
@Override
123+
public Subscription call() {
124+
return func1.call(AbstractScheduler.this);
125+
}
126+
};
127+
}
128+
129+
@SuppressWarnings("static-method") // can't be done, of course, but Eclipse fails at detecting AbstractScheduler.this
130+
private <T> Func0<Subscription> func0ForwardingToFunc2(final Func2<Scheduler, T, Subscription> func2, final T state) {
131+
return new Func0<Subscription>() {
132+
@Override
133+
public Subscription call() {
134+
return func2.call(AbstractScheduler.this, state);
135+
}
136+
};
137+
}
138+
85139
private static Func0<Subscription> asFunc0(final Action0 action) {
86140
return new Func0<Subscription>() {
87141
@Override

rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,20 @@
1515
*/
1616
package rx.concurrency;
1717

18+
import java.util.Queue;
19+
import java.util.concurrent.ConcurrentLinkedQueue;
1820
import java.util.concurrent.Executor;
1921
import java.util.concurrent.Executors;
2022
import java.util.concurrent.ScheduledExecutorService;
23+
import java.util.concurrent.ScheduledFuture;
2124
import java.util.concurrent.ThreadFactory;
2225
import java.util.concurrent.TimeUnit;
2326
import java.util.concurrent.atomic.AtomicInteger;
2427

2528
import rx.Scheduler;
2629
import rx.Subscription;
30+
import rx.subscriptions.Subscriptions;
31+
import rx.util.functions.Action0;
2732
import rx.util.functions.Func0;
2833

2934
/**
@@ -123,4 +128,30 @@ public void run() {
123128

124129
}
125130

131+
@Override
132+
public Subscription schedulePeriodically(final Func0<Subscription> action, long initialDelay, long period, TimeUnit unit) {
133+
final Queue<Subscription> subscriptions = new ConcurrentLinkedQueue<Subscription>();
134+
if (executor instanceof ScheduledExecutorService) {
135+
final ScheduledFuture<?> future = ((ScheduledExecutorService) executor).scheduleAtFixedRate(new Runnable() {
136+
@Override
137+
public void run() {
138+
subscriptions.add(action.call());
139+
}
140+
}, initialDelay, period, unit);
141+
142+
return Subscriptions.create(new Action0() {
143+
@Override
144+
public void call() {
145+
future.cancel(false);
146+
Subscription next = subscriptions.poll();
147+
while (next != null) {
148+
next.unsubscribe();
149+
next = subscriptions.poll();
150+
}
151+
}
152+
});
153+
}
154+
// not a scheduled executor service, so we fall back to the recursive implementation
155+
return super.schedulePeriodically(action, initialDelay, period, unit);
156+
}
126157
}

rxjava-core/src/main/java/rx/operators/Tester.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,26 @@ public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> acti
320320
return underlying.schedule(state, action, dueTime, unit);
321321
}
322322

323+
@Override
324+
public Subscription schedulePeriodically(Action0 action, long initialDelay, long period, TimeUnit unit) {
325+
return underlying.schedulePeriodically(action, initialDelay, period, unit);
326+
}
327+
328+
@Override
329+
public Subscription schedulePeriodically(Func0<Subscription> action, long initialDelay, long period, TimeUnit unit) {
330+
return underlying.schedulePeriodically(action, initialDelay, period, unit);
331+
}
332+
333+
@Override
334+
public Subscription schedulePeriodically(Func1<Scheduler, Subscription> action, long initialDelay, long period, TimeUnit unit) {
335+
return underlying.schedulePeriodically(action, initialDelay, period, unit);
336+
}
337+
338+
@Override
339+
public <T> Subscription schedulePeriodically(T state, Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit) {
340+
return underlying.schedulePeriodically(state, action, initialDelay, period, unit);
341+
}
342+
323343
@Override
324344
public long now() {
325345
return underlying.now();

0 commit comments

Comments
 (0)