From 812b668c7386819efd592a125c290dca0a00bc93 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Fri, 5 Apr 2013 13:30:28 -0700 Subject: [PATCH] Merge scheduler overloads using subscribeOn --- rxjava-core/src/main/java/rx/Observable.java | 193 ++++++++++++++++++ .../java/rx/concurrency/TestSchedulers.java | 6 +- 2 files changed, 196 insertions(+), 3 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 6d7f503084..db6a5cdf5b 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -779,6 +779,21 @@ public static Observable empty() { return toObservable(new ArrayList()); } + /** + * Returns an Observable that returns no data to the {@link Observer} and immediately invokes its onCompleted method on the given {@link Scheduler}. + *

+ * + * + * @param + * the type of item emitted by the Observable + * @param {@link Scheduler} The scheduler to send the termination ({@link Observer#onCompleted()} call. + * @return an Observable that returns no data to the {@link Observer} and immediately invokes the {@link Observer}'s onCompleted method + */ + @SuppressWarnings("unchecked") + public static Observable empty(Scheduler scheduler) { + return (Observable) empty().subscribeOn(scheduler); + } + /** * Returns an Observable that calls onError when an {@link Observer} subscribes to it. *

@@ -862,6 +877,22 @@ public static Observable from(Iterable iterable) { return toObservable(iterable); } + /** + * Converts an {@link Iterable} sequence to an Observable sequence that is subscribed to on the given {@link Scheduler}. + * + * @param iterable + * the source {@link Iterable} sequence + * @param scheduler + * The {@link Scheduler} that the sequence is subscribed to on. + * @param + * the type of items in the {@link Iterable} sequence and the type emitted by the resulting Observable + * @return an Observable that emits each item in the source {@link Iterable} sequence + * @see {@link #toObservable(Iterable)} + */ + public static Observable from(Iterable iterable, Scheduler scheduler) { + return toObservable(iterable, scheduler); + } + /** * Converts an Array to an Observable sequence. * @@ -876,6 +907,22 @@ public static Observable from(T... items) { return toObservable(items); } + /** + * Converts an Array to an Observable sequence that is subscribed to on the given {@link Scheduler}. + * + * @param scheduler + * The {@link Scheduler} that the sequence is subscribed to on. + * @param items + * the source Array + * @param + * the type of items in the Array, and the type of items emitted by the resulting Observable + * @return an Observable that emits each item in the source Array + * @see {@link #toObservable(Object...)} + */ + public static Observable from(Scheduler scheduler, T... items) { + return toObservable(scheduler, items); + } + /** * Generates an observable sequence of integral numbers within a specified range. * @@ -1255,6 +1302,25 @@ public static Observable merge(List> source) { return create(OperationMerge.merge(source)); } + /** + * Flattens the Observable sequences from a list of Observables into one Observable sequence + * without any transformation. You can combine the output of multiple Observables so that they + * act like a single Observable, by using the merge method. + *

+ * + * + * @param source + * a list of Observables that emit sequences of items + * @param scheduler + * The {@link Scheduler} that the sequence is subscribed to on. + * @return an Observable that emits a sequence of elements that are the result of flattening the + * output from the source list of Observables + * @see MSDN: Observable.Merge + */ + public static Observable merge(List> source, Scheduler scheduler) { + return merge(source).subscribeOn(scheduler); + } + /** * Flattens the Observable sequences emitted by a sequence of Observables that are emitted by a * Observable into one Observable sequence without any transformation. You can combine the output @@ -1272,6 +1338,25 @@ public static Observable merge(Observable> source) { return create(OperationMerge.merge(source)); } + /** + * Flattens the Observable sequences emitted by a sequence of Observables that are emitted by a + * Observable into one Observable sequence without any transformation. You can combine the output + * of multiple Observables so that they act like a single Observable, by using the merge method. + *

+ * + * + * @param source + * an Observable that emits Observables + * @param scheduler + * The {@link Scheduler} that the sequence is subscribed to on. + * @return an Observable that emits a sequence of elements that are the result of flattening the + * output from the Observables emitted by the source Observable + * @see MSDN: Observable.Merge Method + */ + public static Observable merge(Observable> source, Scheduler scheduler) { + return merge(source).subscribeOn(scheduler); + } + /** * Flattens the Observable sequences from a series of Observables into one Observable sequence * without any transformation. You can combine the output of multiple Observables so that they @@ -1289,6 +1374,25 @@ public static Observable merge(Observable... source) { return create(OperationMerge.merge(source)); } + /** + * Flattens the Observable sequences from a series of Observables into one Observable sequence + * without any transformation. You can combine the output of multiple Observables so that they + * act like a single Observable, by using the merge method. + *

+ * + * + * @param scheduler + * The {@link Scheduler} that the sequence is subscribed to on. + * @param source + * a series of Observables that emit sequences of items + * @return an Observable that emits a sequence of elements that are the result of flattening the + * output from the source Observables + * @see MSDN: Observable.Merge Method + */ + public static Observable merge(Scheduler scheduler, Observable... source) { + return merge(source).subscribeOn(scheduler); + } + /** * Returns the values from the source observable sequence until the other observable sequence produces a value. * @@ -2212,6 +2316,27 @@ public static Observable toObservable(Iterable iterable) { return create(OperationToObservableIterable.toObservableIterable(iterable)); } + /** + * Converts an Iterable sequence to an Observable sequence which is subscribed to on the given {@link Scheduler}. + * + * Any object that supports the Iterable interface can be converted into an Observable that emits + * each iterable item in the object, by passing the object into the toObservable method. + *

+ * + * + * @param iterable + * the source Iterable sequence + * @param scheduler + * The {@link Scheduler} that the sequence is subscribed to on. + * @param + * the type of items in the iterable sequence and the type emitted by the resulting + * Observable + * @return an Observable that emits each item in the source Iterable sequence + */ + public static Observable toObservable(Iterable iterable, Scheduler scheduler) { + return toObservable(iterable).subscribeOn(scheduler); + } + /** * Converts an Future to an Observable sequence. * @@ -2231,6 +2356,27 @@ public static Observable toObservable(Future future) { return create(OperationToObservableFuture.toObservableFuture(future)); } + /** + * Converts an Future to an Observable sequence that is subscribed to on the given {@link Scheduler}. + * + * Any object that supports the {@link Future} interface can be converted into an Observable that emits + * the return value of the get() method in the object, by passing the object into the toObservable method. + *

+ * This is blocking so the Subscription returned when calling {@link #subscribe(Observer)} does nothing. + * + * @param future + * the source {@link Future} + * @param scheduler + * The {@link Scheduler} to wait for the future on. + * @param + * the type of of object that the future's returns and the type emitted by the resulting + * Observable + * @return an Observable that emits the item from the source Future + */ + public static Observable toObservable(Future future, Scheduler scheduler) { + return toObservable(future).subscribeOn(scheduler); + } + /** * Converts an Future to an Observable sequence. * @@ -2255,6 +2401,32 @@ public static Observable toObservable(Future future, long timeout, Tim return create(OperationToObservableFuture.toObservableFuture(future, timeout, unit)); } + /** + * Converts an Future to an Observable sequence that is subscribed to on the given {@link Scheduler}. + * + * Any object that supports the {@link Future} interface can be converted into an Observable that emits + * the return value of the get() method in the object, by passing the object into the toObservable method. + * The subscribe method on this synchronously so the Subscription returned doesn't nothing. + *

+ * This is blocking so the Subscription returned when calling {@link #subscribe(Observer)} does nothing. + * + * @param future + * the source {@link Future} + * @param timeout + * the maximum time to wait + * @param unit + * the time unit of the time argument + * @param scheduler + * The {@link Scheduler} to wait for the future on. + * @param + * the type of of object that the future's returns and the type emitted by the resulting + * Observable + * @return an Observable that emits the item from the source Future + */ + public static Observable toObservable(Future future, long timeout, TimeUnit unit, Scheduler scheduler) { + return toObservable(future, timeout, unit).subscribeOn(scheduler); + } + /** * Converts an Array sequence to an Observable sequence. * @@ -2274,6 +2446,27 @@ public static Observable toObservable(T... items) { return toObservable(Arrays.asList(items)); } + /** + * Converts an Array sequence to an Observable sequence which is subscribed to on the given {@link Scheduler}. + * + * An Array can be converted into an Observable that emits each item in the Array, by passing the + * Array into the toObservable method. + *

+ * + * + * @param scheduler + * The {@link Scheduler} that the sequence is subscribed to on. + * @param items + * the source Array + * @param + * the type of items in the Array, and the type of items emitted by the resulting + * Observable + * @return an Observable that emits each item in the source Array + */ + public static Observable toObservable(Scheduler scheduler, T... items) { + return toObservable(items).subscribeOn(scheduler); + } + /** * Sort T objects by their natural order (object must implement Comparable). *

diff --git a/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java b/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java index ec247d0b95..bec93cfdcd 100644 --- a/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java +++ b/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java @@ -115,7 +115,7 @@ public void testMergeWithImmediateScheduler1() { Observable o1 = Observable. from(1, 2, 3, 4, 5); Observable o2 = Observable. from(6, 7, 8, 9, 10); @SuppressWarnings("unchecked") - Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.immediate()).map(new Func1() { + Observable o = Observable. merge(Schedulers.immediate(), o1, o2).map(new Func1() { @Override public String call(Integer t) { @@ -141,7 +141,7 @@ public void testMergeWithCurrentThreadScheduler1() { Observable o1 = Observable. from(1, 2, 3, 4, 5); Observable o2 = Observable. from(6, 7, 8, 9, 10); @SuppressWarnings("unchecked") - Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.currentThread()).map(new Func1() { + Observable o = Observable. merge(Schedulers.currentThread(), o1, o2).map(new Func1() { @Override public String call(Integer t) { @@ -167,7 +167,7 @@ public void testMergeWithScheduler1() { Observable o1 = Observable. from(1, 2, 3, 4, 5); Observable o2 = Observable. from(6, 7, 8, 9, 10); @SuppressWarnings("unchecked") - Observable o = Observable. merge(o1, o2).subscribeOn(Schedulers.threadPoolForComputation()).map(new Func1() { + Observable o = Observable. merge(Schedulers.threadPoolForComputation(), o1, o2).map(new Func1() { @Override public String call(Integer t) {