Skip to content

Commit 317c4b9

Browse files
Merge pull request ReactiveX#246 from jmhofer/schedulePeriodically
Scheduling actions periodically
2 parents 1b8ea5c + 355e04a commit 317c4b9

File tree

6 files changed

+231
-36
lines changed

6 files changed

+231
-36
lines changed

rxjava-core/src/main/java/rx/Scheduler.java

Lines changed: 171 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,9 +15,18 @@
1515
*/
1616
package rx;
1717

18+
import static org.mockito.Matchers.*;
19+
import static org.mockito.Mockito.*;
20+
1821
import java.util.Date;
1922
import java.util.concurrent.TimeUnit;
23+
import java.util.concurrent.atomic.AtomicBoolean;
24+
25+
import org.junit.Test;
26+
import org.mockito.InOrder;
27+
import org.mockito.Mockito;
2028

29+
import rx.concurrency.TestScheduler;
2130
import rx.subscriptions.Subscriptions;
2231
import rx.util.functions.Action0;
2332
import rx.util.functions.Func0;
@@ -71,6 +80,56 @@ public abstract class Scheduler {
7180
*/
7281
public abstract <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit);
7382

83+
/**
84+
* Schedules a cancelable action to be executed periodically.
85+
* This default implementation schedules recursively and waits for actions to complete (instead of potentially executing
86+
* long-running actions concurrently). Each scheduler that can do periodic scheduling in a better way should override this.
87+
*
88+
* @param state
89+
* State to pass into the action.
90+
* @param action
91+
* The action to execute periodically.
92+
* @param initialDelay
93+
* Time to wait before executing the action for the first time.
94+
* @param period
95+
* The time interval to wait each time in between executing the action.
96+
* @param unit
97+
* The time unit the interval above is given in.
98+
* @return A subscription to be able to unsubscribe from action.
99+
*/
100+
public <T> Subscription schedulePeriodically(T state, final Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit) {
101+
final long periodInNanos = unit.toNanos(period);
102+
final AtomicBoolean complete = new AtomicBoolean();
103+
104+
final Func2<Scheduler, T, Subscription> recursiveAction = new Func2<Scheduler, T, Subscription>() {
105+
@Override
106+
public Subscription call(Scheduler scheduler, T state0) {
107+
if (!complete.get()) {
108+
long startedAt = now();
109+
final Subscription sub1 = action.call(scheduler, state0);
110+
long timeTakenByActionInNanos = TimeUnit.MILLISECONDS.toNanos(now() - startedAt);
111+
final Subscription sub2 = schedule(state0, this, periodInNanos - timeTakenByActionInNanos, TimeUnit.NANOSECONDS);
112+
return Subscriptions.create(new Action0() {
113+
@Override
114+
public void call() {
115+
sub1.unsubscribe();
116+
sub2.unsubscribe();
117+
}
118+
});
119+
}
120+
return Subscriptions.empty();
121+
}
122+
};
123+
final Subscription sub = schedule(state, recursiveAction, initialDelay, unit);
124+
return Subscriptions.create(new Action0() {
125+
@Override
126+
public void call() {
127+
complete.set(true);
128+
sub.unsubscribe();
129+
}
130+
});
131+
}
132+
74133
/**
75134
* Schedules a cancelable action to be executed at dueTime.
76135
*
@@ -103,7 +162,7 @@ public Subscription schedule(final Func1<Scheduler, Subscription> action) {
103162
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
104163

105164
@Override
106-
public Subscription call(Scheduler scheduler, Void t2) {
165+
public Subscription call(Scheduler scheduler, @SuppressWarnings("unused") Void state) {
107166
return action.call(scheduler);
108167
}
109168
});
@@ -120,7 +179,7 @@ public Subscription schedule(final Func0<Subscription> action) {
120179
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
121180

122181
@Override
123-
public Subscription call(Scheduler scheduler, Void t2) {
182+
public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) {
124183
return action.call();
125184
}
126185
});
@@ -137,7 +196,7 @@ public Subscription schedule(final Action0 action) {
137196
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
138197

139198
@Override
140-
public Subscription call(Scheduler scheduler, Void t2) {
199+
public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) {
141200
action.call();
142201
return Subscriptions.empty();
143202
}
@@ -159,7 +218,7 @@ public Subscription schedule(final Func1<Scheduler, Subscription> action, long d
159218
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
160219

161220
@Override
162-
public Subscription call(Scheduler scheduler, Void t2) {
221+
public Subscription call(Scheduler scheduler, @SuppressWarnings("unused") Void state) {
163222
return action.call(scheduler);
164223
}
165224
}, delayTime, unit);
@@ -176,7 +235,7 @@ public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit
176235
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
177236

178237
@Override
179-
public Subscription call(Scheduler scheduler, Void t2) {
238+
public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) {
180239
action.call();
181240
return Subscriptions.empty();
182241
}
@@ -194,17 +253,123 @@ public Subscription schedule(final Func0<Subscription> action, long delayTime, T
194253
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
195254

196255
@Override
197-
public Subscription call(Scheduler scheduler, Void t2) {
256+
public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) {
198257
return action.call();
199258
}
200259
}, delayTime, unit);
201260
}
202261

262+
/**
263+
* Schedules a cancelable action to be executed periodically.
264+
*
265+
* @param action
266+
* The action to execute periodically.
267+
* @param initialDelay
268+
* Time to wait before executing the action for the first time.
269+
* @param period
270+
* The time interval to wait each time in between executing the action.
271+
* @param unit
272+
* The time unit the interval above is given in.
273+
* @return A subscription to be able to unsubscribe from action.
274+
*/
275+
public Subscription schedulePeriodically(final Func1<Scheduler, Subscription> action, long initialDelay, long period, TimeUnit unit) {
276+
return schedulePeriodically(null, new Func2<Scheduler, Void, Subscription>() {
277+
@Override
278+
public Subscription call(Scheduler scheduler, @SuppressWarnings("unused") Void state) {
279+
return action.call(scheduler);
280+
}
281+
}, initialDelay, period, unit);
282+
}
283+
284+
/**
285+
* Schedules a cancelable action to be executed periodically.
286+
*
287+
* @param action
288+
* The action to execute periodically.
289+
* @param initialDelay
290+
* Time to wait before executing the action for the first time.
291+
* @param period
292+
* The time interval to wait each time in between executing the action.
293+
* @param unit
294+
* The time unit the interval above is given in.
295+
* @return A subscription to be able to unsubscribe from action.
296+
*/
297+
public Subscription schedulePeriodically(final Func0<Subscription> action, long initialDelay, long period, TimeUnit unit) {
298+
return schedulePeriodically(null, new Func2<Scheduler, Void, Subscription>() {
299+
@Override
300+
public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) {
301+
return action.call();
302+
}
303+
}, initialDelay, period, unit);
304+
}
305+
306+
/**
307+
* Schedules an action to be executed periodically.
308+
*
309+
* @param action
310+
* The action to execute periodically.
311+
* @param initialDelay
312+
* Time to wait before executing the action for the first time.
313+
* @param period
314+
* The time interval to wait each time in between executing the action.
315+
* @param unit
316+
* The time unit the interval above is given in.
317+
* @return A subscription to be able to unsubscribe from action.
318+
*/
319+
public Subscription schedulePeriodically(final Action0 action, long initialDelay, long period, TimeUnit unit) {
320+
return schedulePeriodically(null, new Func2<Scheduler, Void, Subscription>() {
321+
@Override
322+
public Subscription call(@SuppressWarnings("unused") Scheduler scheduler, @SuppressWarnings("unused") Void state) {
323+
action.call();
324+
return Subscriptions.empty();
325+
}
326+
}, initialDelay, period, unit);
327+
}
328+
203329
/**
204330
* Returns the scheduler's notion of current absolute time in milliseconds.
205331
*/
206332
public long now() {
207333
return System.currentTimeMillis();
208334
}
209335

336+
public static class UnitTest {
337+
@SuppressWarnings("unchecked") // mocking is unchecked, unfortunately
338+
@Test
339+
public void testPeriodicScheduling() {
340+
final Func1<Long, Void> calledOp = mock(Func1.class);
341+
342+
final TestScheduler scheduler = new TestScheduler();
343+
Subscription subscription = scheduler.schedulePeriodically(new Action0() {
344+
@Override public void call() {
345+
System.out.println(scheduler.now());
346+
calledOp.call(scheduler.now());
347+
}
348+
}, 1, 2, TimeUnit.SECONDS);
349+
350+
verify(calledOp, never()).call(anyLong());
351+
352+
InOrder inOrder = Mockito.inOrder(calledOp);
353+
354+
scheduler.advanceTimeBy(999L, TimeUnit.MILLISECONDS);
355+
inOrder.verify(calledOp, never()).call(anyLong());
356+
357+
scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS);
358+
inOrder.verify(calledOp, times(1)).call(1000L);
359+
360+
scheduler.advanceTimeBy(1999L, TimeUnit.MILLISECONDS);
361+
inOrder.verify(calledOp, never()).call(3000L);
362+
363+
scheduler.advanceTimeBy(1L, TimeUnit.MILLISECONDS);
364+
inOrder.verify(calledOp, times(1)).call(3000L);
365+
366+
scheduler.advanceTimeBy(5L, TimeUnit.SECONDS);
367+
inOrder.verify(calledOp, times(1)).call(5000L);
368+
inOrder.verify(calledOp, times(1)).call(7000L);
369+
370+
subscription.unsubscribe();
371+
scheduler.advanceTimeBy(11L, TimeUnit.SECONDS);
372+
inOrder.verify(calledOp, never()).call(anyLong());
373+
}
374+
}
210375
}

rxjava-core/src/main/java/rx/concurrency/ExecutorScheduler.java

Lines changed: 21 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,27 @@ public ExecutorScheduler(ScheduledExecutorService executor) {
4444
this.executor = executor;
4545
}
4646

47+
@Override
48+
public <T> Subscription schedulePeriodically(final T state, final Func2<Scheduler, T, Subscription> action, long initialDelay, long period, TimeUnit unit) {
49+
if (executor instanceof ScheduledExecutorService) {
50+
final CompositeSubscription subscriptions = new CompositeSubscription();
51+
52+
ScheduledFuture<?> f = ((ScheduledExecutorService) executor).scheduleAtFixedRate(new Runnable() {
53+
@Override
54+
public void run() {
55+
Subscription s = action.call(ExecutorScheduler.this, state);
56+
subscriptions.add(s);
57+
}
58+
}, initialDelay, period, unit);
59+
60+
subscriptions.add(Subscriptions.create(f));
61+
return subscriptions;
62+
63+
} else {
64+
return super.schedulePeriodically(state, action, initialDelay, period, unit);
65+
}
66+
}
67+
4768
@Override
4869
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit) {
4970
final DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);

rxjava-core/src/main/java/rx/concurrency/TestScheduler.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -54,11 +54,12 @@ public int compare(TimedAction<?> action1, TimedAction<?> action2) {
5454
}
5555
}
5656

57+
// Storing time in nanoseconds internally.
5758
private long time;
5859

5960
@Override
6061
public long now() {
61-
return time;
62+
return TimeUnit.NANOSECONDS.toMillis(time);
6263
}
6364

6465
public void advanceTimeBy(long delayTime, TimeUnit unit) {
@@ -79,6 +80,7 @@ private void triggerActions(long targetTimeInNanos) {
7980
while (!queue.isEmpty()) {
8081
TimedAction<?> current = queue.peek();
8182
if (current.time > targetTimeInNanos) {
83+
time = targetTimeInNanos;
8284
break;
8385
}
8486
time = current.time;
@@ -95,7 +97,7 @@ public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> acti
9597

9698
@Override
9799
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit) {
98-
queue.add(new TimedAction<T>(this, now() + unit.toNanos(delayTime), action, state));
100+
queue.add(new TimedAction<T>(this, time + unit.toNanos(delayTime), action, state));
99101
return Subscriptions.empty();
100102
}
101103
}

rxjava-core/src/main/java/rx/operators/OperationInterval.java

Lines changed: 12 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import java.util.concurrent.Executors;
2222
import java.util.concurrent.TimeUnit;
23-
import java.util.concurrent.atomic.AtomicBoolean;
2423

2524
import org.junit.Before;
2625
import org.junit.Test;
@@ -57,47 +56,35 @@ public static Func1<Observer<Long>, Subscription> interval(long interval, TimeUn
5756
}
5857

5958
private static class Interval implements Func1<Observer<Long>, Subscription> {
60-
private final long interval;
59+
private final long period;
6160
private final TimeUnit unit;
6261
private final Scheduler scheduler;
6362

6463
private long currentValue;
65-
private final AtomicBoolean complete = new AtomicBoolean();
6664

67-
private Interval(long interval, TimeUnit unit, Scheduler scheduler) {
68-
this.interval = interval;
65+
private Interval(long period, TimeUnit unit, Scheduler scheduler) {
66+
this.period = period;
6967
this.unit = unit;
7068
this.scheduler = scheduler;
7169
}
7270

7371
@Override
7472
public Subscription call(final Observer<Long> observer) {
75-
scheduler.schedule(new IntervalAction(observer), interval, unit);
76-
return Subscriptions.create(new Action0() {
73+
final Subscription wrapped = scheduler.schedulePeriodically(new Action0() {
7774
@Override
7875
public void call() {
79-
complete.set(true);
76+
observer.onNext(currentValue);
77+
currentValue++;
8078
}
81-
});
82-
}
83-
84-
private class IntervalAction implements Action0 {
85-
private final Observer<Long> observer;
86-
87-
private IntervalAction(Observer<Long> observer) {
88-
this.observer = observer;
89-
}
79+
}, period, period, unit);
9080

91-
@Override
92-
public void call() {
93-
if (complete.get()) {
81+
return Subscriptions.create(new Action0() {
82+
@Override
83+
public void call() {
84+
wrapped.unsubscribe();
9485
observer.onCompleted();
95-
} else {
96-
observer.onNext(currentValue);
97-
currentValue++;
98-
scheduler.schedule(this, interval, unit);
9986
}
100-
}
87+
});
10188
}
10289
}
10390

rxjava-core/src/main/java/rx/operators/OperationTakeWhile.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
import rx.Observer;
2828
import rx.Subscription;
2929
import rx.subjects.PublishSubject;
30+
import rx.subjects.Subject;
3031
import rx.subscriptions.Subscriptions;
3132
import rx.util.AtomicObservableSubscription;
3233
import rx.util.AtomicObserver;
@@ -175,9 +176,8 @@ public Boolean call(Integer input)
175176

176177
@Test
177178
public void testTakeWhileOnSubject1() {
178-
PublishSubject<Integer> s = PublishSubject.create();
179-
Observable<Integer> w = (Observable<Integer>) s;
180-
Observable<Integer> take = Observable.create(takeWhile(w, new Func1<Integer, Boolean>()
179+
Subject<Integer, Integer> s = PublishSubject.create();
180+
Observable<Integer> take = Observable.create(takeWhile(s, new Func1<Integer, Boolean>()
181181
{
182182
@Override
183183
public Boolean call(Integer input)

0 commit comments

Comments
 (0)