Skip to content

Commit 7834e8a

Browse files
author
jmhofer
committed
Trying to extend the Scheduler interface according to the comments at
ReactiveX#19.
1 parent e119a88 commit 7834e8a

File tree

3 files changed

+103
-0
lines changed

3 files changed

+103
-0
lines changed

rxjava-core/src/main/java/rx/Scheduler.java

+40
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,31 @@
1919

2020
import rx.util.functions.Action0;
2121
import rx.util.functions.Func0;
22+
import rx.util.functions.Func1;
23+
import rx.util.functions.Func2;
2224

2325
/**
2426
* Represents an object that schedules units of work.
2527
*/
2628
public interface Scheduler {
2729

30+
/**
31+
* Schedules a cancelable action to be executed.
32+
*
33+
* @param state State to pass into the action.
34+
* @param action Action to schedule.
35+
* @return a subscription to be able to unsubscribe from action.
36+
*/
37+
<T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action);
38+
39+
/**
40+
* Schedules a cancelable action to be executed.
41+
*
42+
* @param action Action to schedule.
43+
* @return a subscription to be able to unsubscribe from action.
44+
*/
45+
Subscription schedule(Func1<Scheduler, Subscription> action);
46+
2847
/**
2948
* Schedules a cancelable action to be executed.
3049
*
@@ -43,6 +62,27 @@ public interface Scheduler {
4362
*/
4463
Subscription schedule(Action0 action);
4564

65+
/**
66+
* Schedules a cancelable action to be executed in dueTime.
67+
*
68+
* @param state State to pass into the action.
69+
* @param action Action to schedule.
70+
* @param dueTime Time the action is due for executing.
71+
* @param unit Time unit of the due time.
72+
* @return a subscription to be able to unsubscribe from action.
73+
*/
74+
<T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit);
75+
76+
/**
77+
* Schedules a cancelable action to be executed in dueTime.
78+
*
79+
* @param action Action to schedule.
80+
* @param dueTime Time the action is due for executing.
81+
* @param unit Time unit of the due time.
82+
* @return a subscription to be able to unsubscribe from action.
83+
*/
84+
Subscription schedule(Func1<Scheduler, Subscription> action, long dueTime, TimeUnit unit);
85+
4686
/**
4787
* Schedules an action to be executed in dueTime.
4888
*

rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java

+42
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
import rx.subscriptions.Subscriptions;
2323
import rx.util.functions.Action0;
2424
import rx.util.functions.Func0;
25+
import rx.util.functions.Func1;
26+
import rx.util.functions.Func2;
2527

2628
/* package */abstract class AbstractScheduler implements Scheduler {
2729

@@ -30,11 +32,51 @@ public Subscription schedule(Action0 action) {
3032
return schedule(asFunc0(action));
3133
}
3234

35+
@Override
36+
public Subscription schedule(final Func1<Scheduler, Subscription> action) {
37+
return schedule(new Func0<Subscription>() {
38+
@Override
39+
public Subscription call() {
40+
return action.call(AbstractScheduler.this);
41+
}
42+
});
43+
}
44+
45+
@Override
46+
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action) {
47+
return schedule(new Func0<Subscription>() {
48+
@Override
49+
public Subscription call() {
50+
return action.call(AbstractScheduler.this, state);
51+
}
52+
});
53+
}
54+
3355
@Override
3456
public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) {
3557
return schedule(asFunc0(action), dueTime, unit);
3658
}
3759

60+
@Override
61+
public Subscription schedule(final Func1<Scheduler, Subscription> action, long dueTime, TimeUnit unit) {
62+
return schedule(new Func0<Subscription>() {
63+
@Override
64+
public Subscription call() {
65+
return action.call(AbstractScheduler.this);
66+
}
67+
}, dueTime, unit);
68+
}
69+
70+
@Override
71+
public <T> Subscription schedule(final T state, final Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
72+
return schedule(new Func0<Subscription>() {
73+
@Override
74+
public Subscription call() {
75+
return action.call(AbstractScheduler.this, state);
76+
}
77+
}, dueTime, unit);
78+
}
79+
3880
@Override
3981
public long now() {
4082
return System.nanoTime();

rxjava-core/src/main/java/rx/operators/Tester.java

+21
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import rx.util.functions.Action0;
2020
import rx.util.functions.Func0;
2121
import rx.util.functions.Func1;
22+
import rx.util.functions.Func2;
2223

2324
/**
2425
* Common utility functions for testing operator implementations.
@@ -289,6 +290,16 @@ public Subscription schedule(Func0<Subscription> action) {
289290
return underlying.schedule(action);
290291
}
291292

293+
@Override
294+
public Subscription schedule(Func1<Scheduler, Subscription> action) {
295+
return underlying.schedule(action);
296+
}
297+
298+
@Override
299+
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action) {
300+
return underlying.schedule(state, action);
301+
}
302+
292303
@Override
293304
public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) {
294305
return underlying.schedule(action, dueTime, unit);
@@ -299,6 +310,16 @@ public Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit
299310
return underlying.schedule(action, dueTime, unit);
300311
}
301312

313+
@Override
314+
public Subscription schedule(Func1<Scheduler, Subscription> action, long dueTime, TimeUnit unit) {
315+
return underlying.schedule(action, dueTime, unit);
316+
}
317+
318+
@Override
319+
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
320+
return underlying.schedule(state, action, dueTime, unit);
321+
}
322+
302323
@Override
303324
public long now() {
304325
return underlying.now();

0 commit comments

Comments
 (0)