Skip to content

Schedulers (merge of pull #199) #225

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 26 commits into from
Apr 5, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
26 commits
Select commit Hold shift + click to select a range
dfc7841
Naive schedulers implementation
mairbek Mar 14, 2013
0aa6ca2
Added ScheduledExecutorServiceScheduler
mairbek Mar 18, 2013
1896da3
Added to Schedulers
mairbek Mar 18, 2013
9eb111e
More tests
mairbek Mar 18, 2013
86a750c
Headers
mairbek Mar 18, 2013
df09fcb
ObserveOn/SubscribeOn unit tests
mairbek Mar 18, 2013
2d1c45d
Some documentation
mairbek Mar 18, 2013
db9f9a6
Documenting code
mairbek Mar 19, 2013
eaa0316
renamed tests
mairbek Mar 19, 2013
81ee35d
Extracted ScheduledObserver as a separate class
mairbek Mar 19, 2013
bd32659
Simplified ImmediateScheduler
mairbek Apr 2, 2013
b24b42f
Removed SleepingAction from Abstract scheduler.
mairbek Apr 3, 2013
9cfb294
added threadPoolForComputation and threadPoolForIO schedulers
mairbek Apr 3, 2013
48ec950
Merge branch 'schedulers' of git://github.com/mairbek/RxJava into sch…
benjchristensen Apr 5, 2013
4510b6e
Keeping ScheduledObserver out of public API
benjchristensen Apr 5, 2013
a78d756
Formatting to match codebase
benjchristensen Apr 5, 2013
a6ccf5a
Consolidating ExecutorScheduler and ScheduledExecutorScheduler
benjchristensen Apr 5, 2013
a8292de
Make ForwardingScheduler Internal
benjchristensen Apr 5, 2013
7c6a14d
Tweaks to Executor/ExecutorScheduler/IOScheduler and Javadocs
benjchristensen Apr 5, 2013
57875b0
Basic unit tests
benjchristensen Apr 5, 2013
54c1dfd
Scheduler overloads for Subscribe, ToObservable, From, Merge, Empty
benjchristensen Apr 5, 2013
52bf7e1
Set threads to daemons so they don't prevent system from exiting
benjchristensen Apr 5, 2013
56bd8db
Name the NewThreadScheduler threads
benjchristensen Apr 5, 2013
cfca6fd
Use long instead of int
benjchristensen Apr 5, 2013
d35b3e7
Fix non-deterministic unit test
benjchristensen Apr 5, 2013
97fbcc7
Removing Scheduler overloads on operators (for now)
benjchristensen Apr 5, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
123 changes: 119 additions & 4 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,13 @@
import rx.operators.OperationMergeDelayError;
import rx.operators.OperationMostRecent;
import rx.operators.OperationNext;
import rx.operators.OperationObserveOn;
import rx.operators.OperationOnErrorResumeNextViaFunction;
import rx.operators.OperationOnErrorResumeNextViaObservable;
import rx.operators.OperationOnErrorReturn;
import rx.operators.OperationScan;
import rx.operators.OperationSkip;
import rx.operators.OperationSubscribeOn;
import rx.operators.OperationSynchronize;
import rx.operators.OperationTake;
import rx.operators.OperationTakeLast;
Expand Down Expand Up @@ -189,6 +191,37 @@ public Subscription subscribe(Observer<T> observer) {
}
}

/**
* an {@link Observer} must call an Observable's <code>subscribe</code> method in order to register itself
* to receive push-based notifications from the Observable. A typical implementation of the
* <code>subscribe</code> method does the following:
* <p>
* It stores a reference to the Observer in a collection object, such as a <code>List<T></code>
* object.
* <p>
* It returns a reference to the {@link Subscription} interface. This enables
* Observers to unsubscribe (that is, to stop receiving notifications) before the Observable has
* finished sending them and has called the Observer's {@link Observer#onCompleted()} method.
* <p>
* At any given time, a particular instance of an <code>Observable<T></code> implementation is
* responsible for accepting all subscriptions and notifying all subscribers. Unless the
* documentation for a particular <code>Observable<T></code> implementation indicates otherwise,
* Observers should make no assumptions about the <code>Observable<T></code> implementation, such
* as the order of notifications that multiple Observers will receive.
* <p>
* For more information see the <a href="https://github.com/Netflix/RxJava/wiki/Observable">RxJava Wiki</a>
*
*
* @param observer
* @param scheduler
* The {@link Scheduler} that the sequence is subscribed to on.
* @return a {@link Subscription} reference that allows observers
* to stop receiving notifications before the provider has finished sending them
*/
public Subscription subscribe(Observer<T> observer, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(observer);
}

/**
* Used for protecting against errors being thrown from Observer implementations and ensuring onNext/onError/onCompleted contract compliance.
* <p>
Expand Down Expand Up @@ -237,6 +270,10 @@ public void onNext(Object args) {
});
}

public Subscription subscribe(final Map<String, Object> callbacks, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(callbacks);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
public Subscription subscribe(final Object o) {
if (o instanceof Observer) {
Expand Down Expand Up @@ -273,6 +310,10 @@ public void onNext(Object args) {
});
}

public Subscription subscribe(final Object o, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(o);
}

public Subscription subscribe(final Action1<T> onNext) {

/**
Expand Down Expand Up @@ -301,6 +342,10 @@ public void onNext(T args) {
});
}

public Subscription subscribe(final Action1<T> onNext, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
public Subscription subscribe(final Object onNext, final Object onError) {
// lookup and memoize onNext
Expand Down Expand Up @@ -334,6 +379,10 @@ public void onNext(Object args) {
});
}

public Subscription subscribe(final Object onNext, final Object onError, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext, onError);
}

public Subscription subscribe(final Action1<T> onNext, final Action1<Exception> onError) {

/**
Expand Down Expand Up @@ -364,6 +413,10 @@ public void onNext(T args) {
});
}

public Subscription subscribe(final Action1<T> onNext, final Action1<Exception> onError, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext, onError);
}

@SuppressWarnings({ "rawtypes", "unchecked" })
public Subscription subscribe(final Object onNext, final Object onError, final Object onComplete) {
// lookup and memoize onNext
Expand Down Expand Up @@ -399,6 +452,10 @@ public void onNext(Object args) {
});
}

public Subscription subscribe(final Object onNext, final Object onError, final Object onComplete, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext, onError, onComplete);
}

public Subscription subscribe(final Action1<T> onNext, final Action1<Exception> onError, final Action0 onComplete) {

/**
Expand Down Expand Up @@ -429,6 +486,10 @@ public void onNext(T args) {
});
}

public Subscription subscribe(final Action1<T> onNext, final Action1<Exception> onError, final Action0 onComplete, Scheduler scheduler) {
return subscribeOn(scheduler).subscribe(onNext, onError, onComplete);
}

/**
* Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated.
* <p>
Expand Down Expand Up @@ -831,6 +892,36 @@ public static Observable<Integer> range(int start, int count) {
return from(Range.createWithCount(start, count));
}

/**
* Asynchronously subscribes and unsubscribes observers on the specified scheduler.
*
* @param source
* the source observable.
* @param scheduler
* the scheduler to perform subscription and unsubscription actions on.
* @param <T>
* the type of observable.
* @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
*/
public static <T> Observable<T> subscribeOn(Observable<T> source, Scheduler scheduler) {
return create(OperationSubscribeOn.subscribeOn(source, scheduler));
}

/**
* Asynchronously notify observers on the specified scheduler.
*
* @param source
* the source observable.
* @param scheduler
* the scheduler to notify observers on.
* @param <T>
* the type of observable.
* @return the source sequence whose observations happen on the specified scheduler.
*/
public static <T> Observable<T> observeOn(Observable<T> source, Scheduler scheduler) {
return create(OperationObserveOn.observeOn(source, scheduler));
}

/**
* Returns an observable sequence that invokes the observable factory whenever a new observer subscribes.
* The Defer operator allows you to defer or delay the creation of the sequence until the time when an observer
Expand Down Expand Up @@ -1242,7 +1333,7 @@ public static <T> Observable<T> concat(Observable<T>... source) {
* @return an Observable that emits the same objects, then calls the action.
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212133(v=vs.103).aspx">MSDN: Observable.Finally Method</a>
*/
public static <T> Observable<T> finallyDo(Observable source, Action0 action) {
public static <T> Observable<T> finallyDo(Observable<T> source, Action0 action) {
return create(OperationFinally.finallyDo(source, action));
}

Expand Down Expand Up @@ -1756,6 +1847,7 @@ public static <T> Observable<Boolean> all(final Observable<T> sequence, final Fu
* @return true if all elements of an observable sequence satisfies a condition; otherwise, false.
*/
public static <T> Observable<Boolean> all(final Observable<T> sequence, Object predicate) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(predicate);

return all(sequence, new Func1<T, Boolean>() {
Expand Down Expand Up @@ -2150,7 +2242,7 @@ public static <T> Observable<T> toObservable(Future<T> future) {
*
* @param future
* the source {@link Future}
* @param time
* @param timeout
* the maximum time to wait
* @param unit
* the time unit of the time argument
Expand All @@ -2159,8 +2251,8 @@ public static <T> Observable<T> toObservable(Future<T> future) {
* Observable
* @return an Observable that emits the item from the source Future
*/
public static <T> Observable<T> toObservable(Future<T> future, long time, TimeUnit unit) {
return create(OperationToObservableFuture.toObservableFuture(future, time, unit));
public static <T> Observable<T> toObservable(Future<T> future, long timeout, TimeUnit unit) {
return create(OperationToObservableFuture.toObservableFuture(future, timeout, unit));
}

/**
Expand Down Expand Up @@ -2736,6 +2828,28 @@ public Observable<Notification<T>> materialize() {
return materialize(this);
}

/**
* Asynchronously subscribes and unsubscribes observers on the specified scheduler.
*
* @param scheduler
* the scheduler to perform subscription and unsubscription actions on.
* @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
*/
public Observable<T> subscribeOn(Scheduler scheduler) {
return subscribeOn(this, scheduler);
}

/**
* Asynchronously notify observers on the specified scheduler.
*
* @param scheduler
* the scheduler to notify observers on.
* @return the source sequence whose observations happen on the specified scheduler.
*/
public Observable<T> observeOn(Scheduler scheduler) {
return observeOn(this, scheduler);
}

/**
* Dematerializes the explicit notification values of an observable sequence as implicit notifications.
*
Expand Down Expand Up @@ -3656,6 +3770,7 @@ public void testMaterializeDematerializeChaining() {
Observable<Integer> obs = Observable.just(1);
Observable<Integer> chained = obs.materialize().dematerialize();

@SuppressWarnings("unchecked")
Observer<Integer> observer = mock(Observer.class);
chained.subscribe(observer);

Expand Down
69 changes: 69 additions & 0 deletions rxjava-core/src/main/java/rx/Scheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx;

import java.util.concurrent.TimeUnit;

import rx.util.functions.Action0;
import rx.util.functions.Func0;

/**
* Represents an object that schedules units of work.
*/
public interface Scheduler {

/**
* Schedules a cancelable action to be executed.
*
* @param action
* action
* @return a subscription to be able to unsubscribe from action.
*/
Subscription schedule(Func0<Subscription> action);

/**
* Schedules an action to be executed.
*
* @param action
* action
* @return a subscription to be able to unsubscribe from action.
*/
Subscription schedule(Action0 action);

/**
* Schedules an action to be executed in dueTime.
*
* @param action
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add param for dueTime

* action
* @return a subscription to be able to unsubscribe from action.
*/
Subscription schedule(Action0 action, long dueTime, TimeUnit unit);

/**
* Schedules a cancelable action to be executed in dueTime.
*
* @param action
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Add param for dueTime

* action
* @return a subscription to be able to unsubscribe from action.
*/
Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit unit);

/**
* Returns the scheduler's notion of current time.
*/
long now();

}
53 changes: 53 additions & 0 deletions rxjava-core/src/main/java/rx/concurrency/AbstractScheduler.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
/**
* Copyright 2013 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package rx.concurrency;

import java.util.concurrent.TimeUnit;

import rx.Scheduler;
import rx.Subscription;
import rx.subscriptions.Subscriptions;
import rx.util.functions.Action0;
import rx.util.functions.Func0;

/* package */abstract class AbstractScheduler implements Scheduler {

@Override
public Subscription schedule(Action0 action) {
return schedule(asFunc0(action));
}

@Override
public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) {
return schedule(asFunc0(action), dueTime, unit);
}

@Override
public long now() {
return System.nanoTime();
}

private static Func0<Subscription> asFunc0(final Action0 action) {
return new Func0<Subscription>() {
@Override
public Subscription call() {
action.call();
return Subscriptions.empty();
}
};
}

}
Loading