Skip to content

Commit b66557c

Browse files
Merge pull request #235 from benjchristensen/schedulers-pull-229-merge
Schedulers Interface (Merging and Adding to Pull Request 229)
2 parents d0118b2 + d2a3f29 commit b66557c

14 files changed

+672
-162
lines changed

rxjava-core/src/main/java/rx/Scheduler.java

Lines changed: 150 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,15 +15,99 @@
1515
*/
1616
package rx;
1717

18+
import java.util.Date;
1819
import java.util.concurrent.TimeUnit;
1920

21+
import rx.subscriptions.Subscriptions;
2022
import rx.util.functions.Action0;
2123
import rx.util.functions.Func0;
24+
import rx.util.functions.Func1;
25+
import rx.util.functions.Func2;
2226

2327
/**
2428
* Represents an object that schedules units of work.
29+
* <p>
30+
* The methods left to implement are:
31+
* <ul>
32+
* <li>{@code <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit)}</li>
33+
* <li>{@code <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action)}</li>
34+
* </ul>
35+
* <p>
36+
* Why is this an abstract class instead of an interface?
37+
* <p>
38+
* <ol>
39+
* <li>Java doesn't support extension methods and there are many overload methods needing default implementations.</li>
40+
* <li>Virtual extension methods aren't available until Java8 which RxJava will not set as a minimum target for a long time.</li>
41+
* <li>If only an interface were used Scheduler implementations would then need to extend from an AbstractScheduler pair that gives all of the functionality unless they intend on copy/pasting the
42+
* functionality.</li>
43+
* <li>Without virtual extension methods even additive changes are breaking and thus severely impede library maintenance.</li>
44+
* </ol>
2545
*/
26-
public interface Scheduler {
46+
public abstract class Scheduler {
47+
48+
/**
49+
* Schedules a cancelable action to be executed.
50+
*
51+
* @param state
52+
* State to pass into the action.
53+
* @param action
54+
* Action to schedule.
55+
* @return a subscription to be able to unsubscribe from action.
56+
*/
57+
public abstract <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action);
58+
59+
/**
60+
* Schedules a cancelable action to be executed in delayTime.
61+
*
62+
* @param state
63+
* State to pass into the action.
64+
* @param action
65+
* Action to schedule.
66+
* @param delayTime
67+
* Time the action is to be delayed before executing.
68+
* @param unit
69+
* Time unit of the delay time.
70+
* @return a subscription to be able to unsubscribe from action.
71+
*/
72+
public abstract <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit);
73+
74+
/**
75+
* Schedules a cancelable action to be executed at dueTime.
76+
*
77+
* @param state
78+
* State to pass into the action.
79+
* @param action
80+
* Action to schedule.
81+
* @param dueTime
82+
* Time the action is to be executed. If in the past it will be executed immediately.
83+
* @return a subscription to be able to unsubscribe from action.
84+
*/
85+
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, Date dueTime) {
86+
long scheduledTime = dueTime.getTime();
87+
long timeInFuture = scheduledTime - now();
88+
if (timeInFuture <= 0) {
89+
return schedule(state, action);
90+
} else {
91+
return schedule(state, action, timeInFuture, TimeUnit.MILLISECONDS);
92+
}
93+
}
94+
95+
/**
96+
* Schedules a cancelable action to be executed.
97+
*
98+
* @param action
99+
* Action to schedule.
100+
* @return a subscription to be able to unsubscribe from action.
101+
*/
102+
public Subscription schedule(final Func1<Scheduler, Subscription> action) {
103+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
104+
105+
@Override
106+
public Subscription call(Scheduler scheduler, Void t2) {
107+
return action.call(scheduler);
108+
}
109+
});
110+
}
27111

28112
/**
29113
* Schedules a cancelable action to be executed.
@@ -32,7 +116,15 @@ public interface Scheduler {
32116
* action
33117
* @return a subscription to be able to unsubscribe from action.
34118
*/
35-
Subscription schedule(Func0<Subscription> action);
119+
public Subscription schedule(final Func0<Subscription> action) {
120+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
121+
122+
@Override
123+
public Subscription call(Scheduler scheduler, Void t2) {
124+
return action.call();
125+
}
126+
});
127+
}
36128

37129
/**
38130
* Schedules an action to be executed.
@@ -41,29 +133,78 @@ public interface Scheduler {
41133
* action
42134
* @return a subscription to be able to unsubscribe from action.
43135
*/
44-
Subscription schedule(Action0 action);
136+
public Subscription schedule(final Action0 action) {
137+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
138+
139+
@Override
140+
public Subscription call(Scheduler scheduler, Void t2) {
141+
action.call();
142+
return Subscriptions.empty();
143+
}
144+
});
145+
}
45146

46147
/**
47-
* Schedules an action to be executed in dueTime.
148+
* Schedules a cancelable action to be executed in delayTime.
149+
*
150+
* @param action
151+
* Action to schedule.
152+
* @param delayTime
153+
* Time the action is to be delayed before executing.
154+
* @param unit
155+
* Time unit of the delay time.
156+
* @return a subscription to be able to unsubscribe from action.
157+
*/
158+
public Subscription schedule(final Func1<Scheduler, Subscription> action, long delayTime, TimeUnit unit) {
159+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
160+
161+
@Override
162+
public Subscription call(Scheduler scheduler, Void t2) {
163+
return action.call(scheduler);
164+
}
165+
}, delayTime, unit);
166+
}
167+
168+
/**
169+
* Schedules an action to be executed in delayTime.
48170
*
49171
* @param action
50172
* action
51173
* @return a subscription to be able to unsubscribe from action.
52174
*/
53-
Subscription schedule(Action0 action, long dueTime, TimeUnit unit);
175+
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
176+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
177+
178+
@Override
179+
public Subscription call(Scheduler scheduler, Void t2) {
180+
action.call();
181+
return Subscriptions.empty();
182+
}
183+
}, delayTime, unit);
184+
}
54185

55186
/**
56-
* Schedules a cancelable action to be executed in dueTime.
187+
* Schedules a cancelable action to be executed in delayTime.
57188
*
58189
* @param action
59190
* action
60191
* @return a subscription to be able to unsubscribe from action.
61192
*/
62-
Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit unit);
193+
public Subscription schedule(final Func0<Subscription> action, long delayTime, TimeUnit unit) {
194+
return schedule(null, new Func2<Scheduler, Void, Subscription>() {
195+
196+
@Override
197+
public Subscription call(Scheduler scheduler, Void t2) {
198+
return action.call();
199+
}
200+
}, delayTime, unit);
201+
}
63202

64203
/**
65-
* Returns the scheduler's notion of current time.
204+
* Returns the scheduler's notion of current absolute time in milliseconds.
66205
*/
67-
long now();
206+
public long now() {
207+
return System.currentTimeMillis();
208+
}
68209

69210
}

rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java

Lines changed: 0 additions & 53 deletions
This file was deleted.

rxjava-core/src/main/java/rx/concurrency/CurrentThreadScheduler.java

Lines changed: 16 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -24,51 +24,55 @@
2424
import org.junit.Test;
2525
import org.mockito.InOrder;
2626

27+
import rx.Scheduler;
2728
import rx.Subscription;
2829
import rx.util.functions.Action0;
29-
import rx.util.functions.Func0;
30+
import rx.util.functions.Func2;
3031

3132
/**
3233
* Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed after the current unit of work is completed.
3334
*/
34-
public class CurrentThreadScheduler extends AbstractScheduler {
35+
public class CurrentThreadScheduler extends Scheduler {
3536
private static final CurrentThreadScheduler INSTANCE = new CurrentThreadScheduler();
3637

3738
public static CurrentThreadScheduler getInstance() {
3839
return INSTANCE;
3940
}
4041

41-
private static final ThreadLocal<Queue<DiscardableAction>> QUEUE = new ThreadLocal<Queue<DiscardableAction>>();
42+
private static final ThreadLocal<Queue<DiscardableAction<?>>> QUEUE = new ThreadLocal<Queue<DiscardableAction<?>>>();
4243

4344
private CurrentThreadScheduler() {
4445
}
4546

4647
@Override
47-
public Subscription schedule(Func0<Subscription> action) {
48-
DiscardableAction discardableAction = new DiscardableAction(action);
48+
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action) {
49+
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
4950
enqueue(discardableAction);
5051
return discardableAction;
5152
}
5253

5354
@Override
54-
public Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit unit) {
55-
return schedule(new SleepingAction(action, this, dueTime, unit));
55+
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
56+
// since we are executing immediately on this thread we must cause this thread to sleep
57+
// TODO right now the 'enqueue' does not take delay into account so if another task is enqueued after this it will
58+
// wait behind the sleeping action ... should that be the case or should it be allowed to proceed ahead of the delayed action?
59+
return schedule(state, new SleepingAction<T>(action, this, dueTime, unit));
5660
}
5761

58-
private void enqueue(DiscardableAction action) {
59-
Queue<DiscardableAction> queue = QUEUE.get();
62+
private void enqueue(DiscardableAction<?> action) {
63+
Queue<DiscardableAction<?>> queue = QUEUE.get();
6064
boolean exec = queue == null;
6165

6266
if (exec) {
63-
queue = new LinkedList<DiscardableAction>();
67+
queue = new LinkedList<DiscardableAction<?>>();
6468
QUEUE.set(queue);
6569
}
6670

6771
queue.add(action);
6872

6973
if (exec) {
7074
while (!queue.isEmpty()) {
71-
queue.poll().call();
75+
queue.poll().call(this);
7276
}
7377

7478
QUEUE.set(null);
@@ -143,4 +147,5 @@ public void testSequenceOfActions() {
143147
}
144148

145149
}
150+
146151
}

rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -17,27 +17,31 @@
1717

1818
import java.util.concurrent.atomic.AtomicBoolean;
1919

20+
import rx.Scheduler;
2021
import rx.Subscription;
2122
import rx.util.AtomicObservableSubscription;
22-
import rx.util.functions.Func0;
23+
import rx.util.functions.Func1;
24+
import rx.util.functions.Func2;
2325

2426
/**
2527
* Combines standard {@link Subscription#unsubscribe()} functionality with ability to skip execution if an unsubscribe occurs before the {@link #call()} method is invoked.
2628
*/
27-
/* package */class DiscardableAction implements Func0<Subscription>, Subscription {
28-
private final Func0<Subscription> underlying;
29+
/* package */class DiscardableAction<T> implements Func1<Scheduler, Subscription>, Subscription {
30+
private final Func2<Scheduler, T, Subscription> underlying;
31+
private final T state;
2932

3033
private final AtomicObservableSubscription wrapper = new AtomicObservableSubscription();
3134
private final AtomicBoolean ready = new AtomicBoolean(true);
3235

33-
public DiscardableAction(Func0<Subscription> underlying) {
36+
public DiscardableAction(T state, Func2<Scheduler, T, Subscription> underlying) {
37+
this.state = state;
3438
this.underlying = underlying;
3539
}
3640

3741
@Override
38-
public Subscription call() {
42+
public Subscription call(Scheduler scheduler) {
3943
if (ready.compareAndSet(true, false)) {
40-
Subscription subscription = underlying.call();
44+
Subscription subscription = underlying.call(scheduler, state);
4145
wrapper.wrap(subscription);
4246
return subscription;
4347
}
@@ -49,4 +53,5 @@ public void unsubscribe() {
4953
ready.set(false);
5054
wrapper.unsubscribe();
5155
}
56+
5257
}

0 commit comments

Comments
 (0)