Skip to content

Commit 2bf03cf

Browse files
Merge pull request #225 from benjchristensen/schedulers-merge
Schedulers (merge of pull #199)
2 parents fccef29 + 97fbcc7 commit 2bf03cf

15 files changed

+1448
-4
lines changed

rxjava-core/src/main/java/rx/Observable.java

+119-4
Original file line numberDiff line numberDiff line change
@@ -49,11 +49,13 @@
4949
import rx.operators.OperationMergeDelayError;
5050
import rx.operators.OperationMostRecent;
5151
import rx.operators.OperationNext;
52+
import rx.operators.OperationObserveOn;
5253
import rx.operators.OperationOnErrorResumeNextViaFunction;
5354
import rx.operators.OperationOnErrorResumeNextViaObservable;
5455
import rx.operators.OperationOnErrorReturn;
5556
import rx.operators.OperationScan;
5657
import rx.operators.OperationSkip;
58+
import rx.operators.OperationSubscribeOn;
5759
import rx.operators.OperationSynchronize;
5860
import rx.operators.OperationTake;
5961
import rx.operators.OperationTakeLast;
@@ -189,6 +191,37 @@ public Subscription subscribe(Observer<T> observer) {
189191
}
190192
}
191193

194+
/**
195+
* an {@link Observer} must call an Observable's <code>subscribe</code> method in order to register itself
196+
* to receive push-based notifications from the Observable. A typical implementation of the
197+
* <code>subscribe</code> method does the following:
198+
* <p>
199+
* It stores a reference to the Observer in a collection object, such as a <code>List<T></code>
200+
* object.
201+
* <p>
202+
* It returns a reference to the {@link Subscription} interface. This enables
203+
* Observers to unsubscribe (that is, to stop receiving notifications) before the Observable has
204+
* finished sending them and has called the Observer's {@link Observer#onCompleted()} method.
205+
* <p>
206+
* At any given time, a particular instance of an <code>Observable<T></code> implementation is
207+
* responsible for accepting all subscriptions and notifying all subscribers. Unless the
208+
* documentation for a particular <code>Observable<T></code> implementation indicates otherwise,
209+
* Observers should make no assumptions about the <code>Observable<T></code> implementation, such
210+
* as the order of notifications that multiple Observers will receive.
211+
* <p>
212+
* For more information see the <a href="https://github.com/Netflix/RxJava/wiki/Observable">RxJava Wiki</a>
213+
*
214+
*
215+
* @param observer
216+
* @param scheduler
217+
* The {@link Scheduler} that the sequence is subscribed to on.
218+
* @return a {@link Subscription} reference that allows observers
219+
* to stop receiving notifications before the provider has finished sending them
220+
*/
221+
public Subscription subscribe(Observer<T> observer, Scheduler scheduler) {
222+
return subscribeOn(scheduler).subscribe(observer);
223+
}
224+
192225
/**
193226
* Used for protecting against errors being thrown from Observer implementations and ensuring onNext/onError/onCompleted contract compliance.
194227
* <p>
@@ -237,6 +270,10 @@ public void onNext(Object args) {
237270
});
238271
}
239272

273+
public Subscription subscribe(final Map<String, Object> callbacks, Scheduler scheduler) {
274+
return subscribeOn(scheduler).subscribe(callbacks);
275+
}
276+
240277
@SuppressWarnings({ "rawtypes", "unchecked" })
241278
public Subscription subscribe(final Object o) {
242279
if (o instanceof Observer) {
@@ -273,6 +310,10 @@ public void onNext(Object args) {
273310
});
274311
}
275312

313+
public Subscription subscribe(final Object o, Scheduler scheduler) {
314+
return subscribeOn(scheduler).subscribe(o);
315+
}
316+
276317
public Subscription subscribe(final Action1<T> onNext) {
277318

278319
/**
@@ -301,6 +342,10 @@ public void onNext(T args) {
301342
});
302343
}
303344

345+
public Subscription subscribe(final Action1<T> onNext, Scheduler scheduler) {
346+
return subscribeOn(scheduler).subscribe(onNext);
347+
}
348+
304349
@SuppressWarnings({ "rawtypes", "unchecked" })
305350
public Subscription subscribe(final Object onNext, final Object onError) {
306351
// lookup and memoize onNext
@@ -334,6 +379,10 @@ public void onNext(Object args) {
334379
});
335380
}
336381

382+
public Subscription subscribe(final Object onNext, final Object onError, Scheduler scheduler) {
383+
return subscribeOn(scheduler).subscribe(onNext, onError);
384+
}
385+
337386
public Subscription subscribe(final Action1<T> onNext, final Action1<Exception> onError) {
338387

339388
/**
@@ -364,6 +413,10 @@ public void onNext(T args) {
364413
});
365414
}
366415

416+
public Subscription subscribe(final Action1<T> onNext, final Action1<Exception> onError, Scheduler scheduler) {
417+
return subscribeOn(scheduler).subscribe(onNext, onError);
418+
}
419+
367420
@SuppressWarnings({ "rawtypes", "unchecked" })
368421
public Subscription subscribe(final Object onNext, final Object onError, final Object onComplete) {
369422
// lookup and memoize onNext
@@ -399,6 +452,10 @@ public void onNext(Object args) {
399452
});
400453
}
401454

455+
public Subscription subscribe(final Object onNext, final Object onError, final Object onComplete, Scheduler scheduler) {
456+
return subscribeOn(scheduler).subscribe(onNext, onError, onComplete);
457+
}
458+
402459
public Subscription subscribe(final Action1<T> onNext, final Action1<Exception> onError, final Action0 onComplete) {
403460

404461
/**
@@ -429,6 +486,10 @@ public void onNext(T args) {
429486
});
430487
}
431488

489+
public Subscription subscribe(final Action1<T> onNext, final Action1<Exception> onError, final Action0 onComplete, Scheduler scheduler) {
490+
return subscribeOn(scheduler).subscribe(onNext, onError, onComplete);
491+
}
492+
432493
/**
433494
* Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated.
434495
* <p>
@@ -831,6 +892,36 @@ public static Observable<Integer> range(int start, int count) {
831892
return from(Range.createWithCount(start, count));
832893
}
833894

895+
/**
896+
* Asynchronously subscribes and unsubscribes observers on the specified scheduler.
897+
*
898+
* @param source
899+
* the source observable.
900+
* @param scheduler
901+
* the scheduler to perform subscription and unsubscription actions on.
902+
* @param <T>
903+
* the type of observable.
904+
* @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
905+
*/
906+
public static <T> Observable<T> subscribeOn(Observable<T> source, Scheduler scheduler) {
907+
return create(OperationSubscribeOn.subscribeOn(source, scheduler));
908+
}
909+
910+
/**
911+
* Asynchronously notify observers on the specified scheduler.
912+
*
913+
* @param source
914+
* the source observable.
915+
* @param scheduler
916+
* the scheduler to notify observers on.
917+
* @param <T>
918+
* the type of observable.
919+
* @return the source sequence whose observations happen on the specified scheduler.
920+
*/
921+
public static <T> Observable<T> observeOn(Observable<T> source, Scheduler scheduler) {
922+
return create(OperationObserveOn.observeOn(source, scheduler));
923+
}
924+
834925
/**
835926
* Returns an observable sequence that invokes the observable factory whenever a new observer subscribes.
836927
* The Defer operator allows you to defer or delay the creation of the sequence until the time when an observer
@@ -1242,7 +1333,7 @@ public static <T> Observable<T> concat(Observable<T>... source) {
12421333
* @return an Observable that emits the same objects, then calls the action.
12431334
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212133(v=vs.103).aspx">MSDN: Observable.Finally Method</a>
12441335
*/
1245-
public static <T> Observable<T> finallyDo(Observable source, Action0 action) {
1336+
public static <T> Observable<T> finallyDo(Observable<T> source, Action0 action) {
12461337
return create(OperationFinally.finallyDo(source, action));
12471338
}
12481339

@@ -1756,6 +1847,7 @@ public static <T> Observable<Boolean> all(final Observable<T> sequence, final Fu
17561847
* @return true if all elements of an observable sequence satisfies a condition; otherwise, false.
17571848
*/
17581849
public static <T> Observable<Boolean> all(final Observable<T> sequence, Object predicate) {
1850+
@SuppressWarnings("rawtypes")
17591851
final FuncN _f = Functions.from(predicate);
17601852

17611853
return all(sequence, new Func1<T, Boolean>() {
@@ -2150,7 +2242,7 @@ public static <T> Observable<T> toObservable(Future<T> future) {
21502242
*
21512243
* @param future
21522244
* the source {@link Future}
2153-
* @param time
2245+
* @param timeout
21542246
* the maximum time to wait
21552247
* @param unit
21562248
* the time unit of the time argument
@@ -2159,8 +2251,8 @@ public static <T> Observable<T> toObservable(Future<T> future) {
21592251
* Observable
21602252
* @return an Observable that emits the item from the source Future
21612253
*/
2162-
public static <T> Observable<T> toObservable(Future<T> future, long time, TimeUnit unit) {
2163-
return create(OperationToObservableFuture.toObservableFuture(future, time, unit));
2254+
public static <T> Observable<T> toObservable(Future<T> future, long timeout, TimeUnit unit) {
2255+
return create(OperationToObservableFuture.toObservableFuture(future, timeout, unit));
21642256
}
21652257

21662258
/**
@@ -2736,6 +2828,28 @@ public Observable<Notification<T>> materialize() {
27362828
return materialize(this);
27372829
}
27382830

2831+
/**
2832+
* Asynchronously subscribes and unsubscribes observers on the specified scheduler.
2833+
*
2834+
* @param scheduler
2835+
* the scheduler to perform subscription and unsubscription actions on.
2836+
* @return the source sequence whose subscriptions and unsubscriptions happen on the specified scheduler.
2837+
*/
2838+
public Observable<T> subscribeOn(Scheduler scheduler) {
2839+
return subscribeOn(this, scheduler);
2840+
}
2841+
2842+
/**
2843+
* Asynchronously notify observers on the specified scheduler.
2844+
*
2845+
* @param scheduler
2846+
* the scheduler to notify observers on.
2847+
* @return the source sequence whose observations happen on the specified scheduler.
2848+
*/
2849+
public Observable<T> observeOn(Scheduler scheduler) {
2850+
return observeOn(this, scheduler);
2851+
}
2852+
27392853
/**
27402854
* Dematerializes the explicit notification values of an observable sequence as implicit notifications.
27412855
*
@@ -3656,6 +3770,7 @@ public void testMaterializeDematerializeChaining() {
36563770
Observable<Integer> obs = Observable.just(1);
36573771
Observable<Integer> chained = obs.materialize().dematerialize();
36583772

3773+
@SuppressWarnings("unchecked")
36593774
Observer<Integer> observer = mock(Observer.class);
36603775
chained.subscribe(observer);
36613776

Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx;
17+
18+
import java.util.concurrent.TimeUnit;
19+
20+
import rx.util.functions.Action0;
21+
import rx.util.functions.Func0;
22+
23+
/**
24+
* Represents an object that schedules units of work.
25+
*/
26+
public interface Scheduler {
27+
28+
/**
29+
* Schedules a cancelable action to be executed.
30+
*
31+
* @param action
32+
* action
33+
* @return a subscription to be able to unsubscribe from action.
34+
*/
35+
Subscription schedule(Func0<Subscription> action);
36+
37+
/**
38+
* Schedules an action to be executed.
39+
*
40+
* @param action
41+
* action
42+
* @return a subscription to be able to unsubscribe from action.
43+
*/
44+
Subscription schedule(Action0 action);
45+
46+
/**
47+
* Schedules an action to be executed in dueTime.
48+
*
49+
* @param action
50+
* action
51+
* @return a subscription to be able to unsubscribe from action.
52+
*/
53+
Subscription schedule(Action0 action, long dueTime, TimeUnit unit);
54+
55+
/**
56+
* Schedules a cancelable action to be executed in dueTime.
57+
*
58+
* @param action
59+
* action
60+
* @return a subscription to be able to unsubscribe from action.
61+
*/
62+
Subscription schedule(Func0<Subscription> action, long dueTime, TimeUnit unit);
63+
64+
/**
65+
* Returns the scheduler's notion of current time.
66+
*/
67+
long now();
68+
69+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.concurrency;
17+
18+
import java.util.concurrent.TimeUnit;
19+
20+
import rx.Scheduler;
21+
import rx.Subscription;
22+
import rx.subscriptions.Subscriptions;
23+
import rx.util.functions.Action0;
24+
import rx.util.functions.Func0;
25+
26+
/* package */abstract class AbstractScheduler implements Scheduler {
27+
28+
@Override
29+
public Subscription schedule(Action0 action) {
30+
return schedule(asFunc0(action));
31+
}
32+
33+
@Override
34+
public Subscription schedule(Action0 action, long dueTime, TimeUnit unit) {
35+
return schedule(asFunc0(action), dueTime, unit);
36+
}
37+
38+
@Override
39+
public long now() {
40+
return System.nanoTime();
41+
}
42+
43+
private static Func0<Subscription> asFunc0(final Action0 action) {
44+
return new Func0<Subscription>() {
45+
@Override
46+
public Subscription call() {
47+
action.call();
48+
return Subscriptions.empty();
49+
}
50+
};
51+
}
52+
53+
}

0 commit comments

Comments
 (0)