Skip to content

Schedulers Interface (Merging and Adding to Pull Request 229) #235

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
159 changes: 150 additions & 9 deletions rxjava-core/src/main/java/rx/Scheduler.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,15 +15,99 @@
*/
package rx;

import java.util.Date;
import java.util.concurrent.TimeUnit;

import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func0;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

/**
* Represents an object that schedules units of work.
* <p>
* The methods left to implement are:
* <ul>
* <li>{@code <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit)}</li>
* <li>{@code <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action)}</li>
* </ul>
* <p>
* Why is this an abstract class instead of an interface?
* <p>
* <ol>
* <li>Java doesn't support extension methods and there are many overload methods needing default implementations.</li>
* <li>Virtual extension methods aren't available until Java8 which RxJava will not set as a minimum target for a long time.</li>
* <li>If only an interface were used Scheduler implementations would then need to extend from an AbstractScheduler pair that gives all of the functionality unless they intend on copy/pasting the
* functionality.</li>
* <li>Without virtual extension methods even additive changes are breaking and thus severely impede library maintenance.</li>
* </ol>
*/
public interface Scheduler {
public abstract class Scheduler {

/**
* Schedules a cancelable action to be executed.
*
* @param state
* State to pass into the action.
* @param action
* Action to schedule.
* @return a subscription to be able to unsubscribe from action.
*/
public abstract <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action);

/**
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are still missing the .Net method:

Scheduler.Schedule Method (IScheduler, DateTimeOffset, Action)

The DateTimeOffset object is described as representing "The absolute time at which to execute the action."

This would be pretty straight-forward to implement as an overload in AbstractScheduler that takes desired time minus current time and gets the delay to pass to the other methods delayTime argument.

We should probably add this before committing so we fully match the .Net interface.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree. Easy to do, and doesn't hurt to have it.

* Schedules a cancelable action to be executed in delayTime.
*
* @param state
* State to pass into the action.
* @param action
* Action to schedule.
* @param delayTime
* Time the action is to be delayed before executing.
* @param unit
* Time unit of the delay time.
* @return a subscription to be able to unsubscribe from action.
*/
public abstract <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long delayTime, TimeUnit unit);

/**
* Schedules a cancelable action to be executed at dueTime.
*
* @param state
* State to pass into the action.
* @param action
* Action to schedule.
* @param dueTime
* Time the action is to be executed. If in the past it will be executed immediately.
* @return a subscription to be able to unsubscribe from action.
*/
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, Date dueTime) {
long scheduledTime = dueTime.getTime();
long timeInFuture = scheduledTime - now();
if (timeInFuture <= 0) {
return schedule(state, action);
} else {
return schedule(state, action, timeInFuture, TimeUnit.MILLISECONDS);
}
}

/**
* Schedules a cancelable action to be executed.
*
* @param action
* Action to schedule.
* @return a subscription to be able to unsubscribe from action.
*/
public Subscription schedule(final Func1<Scheduler, Subscription> action) {
return schedule(null, new Func2<Scheduler, Void, Subscription>() {

@Override
public Subscription call(Scheduler scheduler, Void t2) {
return action.call(scheduler);
}
});
}

/**
* Schedules a cancelable action to be executed.
Expand All @@ -32,7 +116,15 @@ public interface Scheduler {
* action
* @return a subscription to be able to unsubscribe from action.
*/
Subscription schedule(Func0<Subscription> action);
public Subscription schedule(final Func0<Subscription> action) {
return schedule(null, new Func2<Scheduler, Void, Subscription>() {

@Override
public Subscription call(Scheduler scheduler, Void t2) {
return action.call();
}
});
}

/**
* Schedules an action to be executed.
Expand All @@ -41,29 +133,78 @@ public interface Scheduler {
* action
* @return a subscription to be able to unsubscribe from action.
*/
Subscription schedule(Action0 action);
public Subscription schedule(final Action0 action) {
return schedule(null, new Func2<Scheduler, Void, Subscription>() {

@Override
public Subscription call(Scheduler scheduler, Void t2) {
action.call();
return Subscriptions.empty();
}
});
}

/**
* Schedules an action to be executed in dueTime.
* Schedules a cancelable action to be executed in delayTime.
*
* @param action
* Action to schedule.
* @param delayTime
* Time the action is to be delayed before executing.
* @param unit
* Time unit of the delay time.
* @return a subscription to be able to unsubscribe from action.
*/
public Subscription schedule(final Func1<Scheduler, Subscription> action, long delayTime, TimeUnit unit) {
return schedule(null, new Func2<Scheduler, Void, Subscription>() {

@Override
public Subscription call(Scheduler scheduler, Void t2) {
return action.call(scheduler);
}
}, delayTime, unit);
}

/**
* Schedules an action to be executed in delayTime.
*
* @param action
* action
* @return a subscription to be able to unsubscribe from action.
*/
Subscription schedule(Action0 action, long dueTime, TimeUnit unit);
public Subscription schedule(final Action0 action, long delayTime, TimeUnit unit) {
return schedule(null, new Func2<Scheduler, Void, Subscription>() {

@Override
public Subscription call(Scheduler scheduler, Void t2) {
action.call();
return Subscriptions.empty();
}
}, delayTime, unit);
}

/**
* Schedules a cancelable action to be executed in dueTime.
* Schedules a cancelable action to be executed in delayTime.
*
* @param action
* action
* @return a subscription to be able to unsubscribe from action.
*/
Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit unit);
public Subscription schedule(final Func0<Subscription> action, long delayTime, TimeUnit unit) {
return schedule(null, new Func2<Scheduler, Void, Subscription>() {

@Override
public Subscription call(Scheduler scheduler, Void t2) {
return action.call();
}
}, delayTime, unit);
}

/**
* Returns the scheduler's notion of current time.
* Returns the scheduler's notion of current absolute time in milliseconds.
*/
long now();
public long now() {
return System.currentTimeMillis();
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that this is in milliseconds since it must be an absolute time that can be used to create a Date and getNanos() does not provide that.

This has implications for the SleepingAction which will need to change: #236

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See how line 87 above is using now() to get milliseconds and interact with Date.

}

}
53 changes: 0 additions & 53 deletions rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -24,51 +24,55 @@
import org.junit.Test;
import org.mockito.InOrder;

import rx.Scheduler;
import rx.Subscription;
import rx.util.functions.Action0;
import rx.util.functions.Func0;
import rx.util.functions.Func2;

/**
* Schedules work on the current thread but does not execute immediately. Work is put in a queue and executed after the current unit of work is completed.
*/
public class CurrentThreadScheduler extends AbstractScheduler {
public class CurrentThreadScheduler extends Scheduler {
private static final CurrentThreadScheduler INSTANCE = new CurrentThreadScheduler();

public static CurrentThreadScheduler getInstance() {
return INSTANCE;
}

private static final ThreadLocal<Queue<DiscardableAction>> QUEUE = new ThreadLocal<Queue<DiscardableAction>>();
private static final ThreadLocal<Queue<DiscardableAction<?>>> QUEUE = new ThreadLocal<Queue<DiscardableAction<?>>>();

private CurrentThreadScheduler() {
}

@Override
public Subscription schedule(Func0<Subscription> action) {
DiscardableAction discardableAction = new DiscardableAction(action);
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action) {
DiscardableAction<T> discardableAction = new DiscardableAction<T>(state, action);
enqueue(discardableAction);
return discardableAction;
}

@Override
public Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit unit) {
return schedule(new SleepingAction(action, this, dueTime, unit));
public <T> Subscription schedule(T state, Func2<Scheduler, T, Subscription> action, long dueTime, TimeUnit unit) {
// since we are executing immediately on this thread we must cause this thread to sleep
// TODO right now the 'enqueue' does not take delay into account so if another task is enqueued after this it will
// wait behind the sleeping action ... should that be the case or should it be allowed to proceed ahead of the delayed action?
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this doing the right thing?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, I've been wondering about this myself. Not sure, but I think that it does the right thing, although together with sleeps, it's not particularly useful (either way).

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When implementing this I thought about having a priority queue as an alternative, but this implementation was straightforward to implement and I wasn't sure whether it makes sense or not so I let it go.

After second thought it is clear that this is a bug:

Here is a unit test to reproduce it

    @Test
    public void testSched() {

        final Scheduler scheduler = Schedulers.currentThread();

        scheduler.schedule(new Action0() {
            @Override
            public void call() {
                scheduler.schedule(new Action0() {
                    @Override
                    public void call() {
                        System.out.println("one");
                    }
                }, 3, TimeUnit.SECONDS);

                scheduler.schedule(new Action0() {
                    @Override
                    public void call() {
                        System.out.println("two");
                    }
                }, 1, TimeUnit.SECONDS);
                System.out.println("scheduled");
            }
        });


    }

This prints out one and then two which is wrong.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of me trying to fix that in this pull request I think we should do another fix after I merge these changes then.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, I'll work on it, and update the test coverage.

return schedule(state, new SleepingAction<T>(action, this, dueTime, unit));
}

private void enqueue(DiscardableAction action) {
Queue<DiscardableAction> queue = QUEUE.get();
private void enqueue(DiscardableAction<?> action) {
Queue<DiscardableAction<?>> queue = QUEUE.get();
boolean exec = queue == null;

if (exec) {
queue = new LinkedList<DiscardableAction>();
queue = new LinkedList<DiscardableAction<?>>();
QUEUE.set(queue);
}

queue.add(action);

if (exec) {
while (!queue.isEmpty()) {
queue.poll().call();
queue.poll().call(this);
}

QUEUE.set(null);
Expand Down Expand Up @@ -143,4 +147,5 @@ public void testSequenceOfActions() {
}

}

}
17 changes: 11 additions & 6 deletions rxjava-core/src/main/java/rx/concurrency/DiscardableAction.java
Original file line number Diff line number Diff line change
Expand Up @@ -17,27 +17,31 @@

import java.util.concurrent.atomic.AtomicBoolean;

import rx.Scheduler;
import rx.Subscription;
import rx.util.AtomicObservableSubscription;
import rx.util.functions.Func0;
import rx.util.functions.Func1;
import rx.util.functions.Func2;

/**
* Combines standard {@link Subscription#unsubscribe()} functionality with ability to skip execution if an unsubscribe occurs before the {@link #call()} method is invoked.
*/
/* package */class DiscardableAction implements Func0<Subscription>, Subscription {
private final Func0<Subscription> underlying;
/* package */class DiscardableAction<T> implements Func1<Scheduler, Subscription>, Subscription {
private final Func2<Scheduler, T, Subscription> underlying;
private final T state;

private final AtomicObservableSubscription wrapper = new AtomicObservableSubscription();
private final AtomicBoolean ready = new AtomicBoolean(true);

public DiscardableAction(Func0<Subscription> underlying) {
public DiscardableAction(T state, Func2<Scheduler, T, Subscription> underlying) {
this.state = state;
this.underlying = underlying;
}

@Override
public Subscription call() {
public Subscription call(Scheduler scheduler) {
if (ready.compareAndSet(true, false)) {
Subscription subscription = underlying.call();
Subscription subscription = underlying.call(scheduler, state);
Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think I'm passing the state along correctly here by having it stored in the DiscardableAction ... though I have not spent enough time to unit test all of the scenarios.

wrapper.wrap(subscription);
return subscription;
}
Expand All @@ -49,4 +53,5 @@ public void unsubscribe() {
ready.set(false);
wrapper.unsubscribe();
}

}
Loading