From 017a0222d8e48254eb50e0334ae6c7056edd5c6d Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Fri, 5 Apr 2013 13:32:54 -0700 Subject: [PATCH 1/2] Merge scheduler overloads using Scheduler.schedule --- rxjava-core/src/main/java/rx/Observable.java | 57 +++++++++++++++++ .../java/rx/operators/OperationMerge.java | 62 +++++++++++++++++-- 2 files changed, 115 insertions(+), 4 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 6d7f503084..47eca7ceb0 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -1255,6 +1255,25 @@ public static Observable merge(List> source) { return create(OperationMerge.merge(source)); } + /** + * Flattens the Observable sequences from a list of Observables into one Observable sequence + * without any transformation. You can combine the output of multiple Observables so that they + * act like a single Observable, by using the merge method. + *

+ * + * + * @param source + * a list of Observables that emit sequences of items + * @param scheduler + * The {@link Scheduler} that the sequence is subscribed to on. + * @return an Observable that emits a sequence of elements that are the result of flattening the + * output from the source list of Observables + * @see MSDN: Observable.Merge + */ + public static Observable merge(List> source, Scheduler scheduler) { + return create(OperationMerge.merge(source, scheduler)); + } + /** * Flattens the Observable sequences emitted by a sequence of Observables that are emitted by a * Observable into one Observable sequence without any transformation. You can combine the output @@ -1272,6 +1291,25 @@ public static Observable merge(Observable> source) { return create(OperationMerge.merge(source)); } + /** + * Flattens the Observable sequences emitted by a sequence of Observables that are emitted by a + * Observable into one Observable sequence without any transformation. You can combine the output + * of multiple Observables so that they act like a single Observable, by using the merge method. + *

+ * + * + * @param source + * an Observable that emits Observables + * @param scheduler + * The {@link Scheduler} that the sequence is subscribed to on. + * @return an Observable that emits a sequence of elements that are the result of flattening the + * output from the Observables emitted by the source Observable + * @see MSDN: Observable.Merge Method + */ + public static Observable merge(Observable> source, Scheduler scheduler) { + return create(OperationMerge.merge(source, scheduler)); + } + /** * Flattens the Observable sequences from a series of Observables into one Observable sequence * without any transformation. You can combine the output of multiple Observables so that they @@ -1289,6 +1327,25 @@ public static Observable merge(Observable... source) { return create(OperationMerge.merge(source)); } + /** + * Flattens the Observable sequences from a series of Observables into one Observable sequence + * without any transformation. You can combine the output of multiple Observables so that they + * act like a single Observable, by using the merge method. + *

+ * + * + * @param scheduler + * The {@link Scheduler} that the sequence is subscribed to on. + * @param source + * a series of Observables that emit sequences of items + * @return an Observable that emits a sequence of elements that are the result of flattening the + * output from the source Observables + * @see MSDN: Observable.Merge Method + */ + public static Observable merge(Scheduler scheduler, Observable... source) { + return create(OperationMerge.merge(scheduler, source)); + } + /** * Returns the values from the source observable sequence until the other observable sequence produces a value. * diff --git a/rxjava-core/src/main/java/rx/operators/OperationMerge.java b/rxjava-core/src/main/java/rx/operators/OperationMerge.java index d5aebe0ad5..aaf51dec48 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationMerge.java +++ b/rxjava-core/src/main/java/rx/operators/OperationMerge.java @@ -34,9 +34,11 @@ import rx.Observable; import rx.Observer; +import rx.Scheduler; import rx.Subscription; import rx.util.AtomicObservableSubscription; import rx.util.SynchronizedObserver; +import rx.util.functions.Func0; import rx.util.functions.Func1; public final class OperationMerge { @@ -60,12 +62,59 @@ public Subscription call(Observer observer) { }; } + /** + * Flattens the observable sequences from the list of Observables into one observable sequence without any transformation. + * + * @param source + * An observable sequence of elements to project. + * @param scheduler + * The {@link Scheduler} that the sequence is subscribed to on. + * @return An observable sequence whose elements are the result of flattening the output from the list of Observables. + * @see http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx + */ + public static Func1, Subscription> merge(final Observable> o, final Scheduler scheduler) { + return new Func1, Subscription>() { + + @Override + public Subscription call(final Observer observer) { + return scheduler.schedule(new Func0() { + + @Override + public Subscription call() { + return new MergeObservable(o).call(observer); + } + }); + } + }; + + } + public static Func1, Subscription> merge(final Observable... sequences) { return merge(Arrays.asList(sequences)); } + public static Func1, Subscription> merge(Scheduler scheduler, final Observable... sequences) { + return merge(Arrays.asList(sequences), scheduler); + } + public static Func1, Subscription> merge(final List> sequences) { - return merge(Observable.create(new Func1>, Subscription>() { + /* + * https://github.com/Netflix/RxJava/issues/19 + * + * Schedulers.immedate() or NULL? + * + * Right now I see no reason to add yet more layers of wrapping on the stack to pass in 'immediate' when that + * is what happens by default without schedulers. + * + * In testing I see the exact same behavior with or without ... just more on the stack. + * + * If someone has a unit test that demonstrates why we should always go via a scheduler I'll gladly change this. + */ + return merge(sequences, null); + } + + public static Func1, Subscription> merge(final List> sequences, Scheduler scheduler) { + Observable> o = Observable.create(new Func1>, Subscription>() { private volatile boolean unsubscribed = false; @@ -92,7 +141,12 @@ public void unsubscribe() { }; } - })); + }); + if (scheduler == null) { + return merge(o); + } else { + return merge(o, scheduler); + } } /** @@ -447,9 +501,9 @@ public void onNext(String v) { // so I'm unfortunately reverting to using a Thread.sleep to allow the process scheduler time // to make sure after o1.onNextBeingSent and o2.onNextBeingSent are hit that the following // onNext is invoked. - + Thread.sleep(300); - + try { // in try/finally so threads are released via latch countDown even if assertion fails assertEquals(1, concurrentCounter.get()); } finally { From 179ef098a11a2a505de91655eec90d5b37848fae Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Fri, 5 Apr 2013 13:33:15 -0700 Subject: [PATCH 2/2] Unit test mixing several schedulers - how should this behave? --- .../java/rx/concurrency/TestSchedulers.java | 68 +++++++++++++++++++ 1 file changed, 68 insertions(+) diff --git a/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java b/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java index ec247d0b95..1444bb802c 100644 --- a/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java +++ b/rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java @@ -245,4 +245,72 @@ public void call(Integer t) { assertEquals(5, count.get()); } + @Test + public void testMixedSchedulers() throws InterruptedException { + final String mainThreadName = Thread.currentThread().getName(); + + Observable o = Observable. create(new Func1, Subscription>() { + + @Override + public Subscription call(Observer observer) { + + System.out.println("Origin observable is running on: " + Thread.currentThread().getName()); + + assertFalse(Thread.currentThread().getName().equals(mainThreadName)); + assertTrue("Actually: " + Thread.currentThread().getName(), Thread.currentThread().getName().startsWith("RxIOThreadPool")); + + observer.onNext("one"); + observer.onNext("two"); + observer.onNext("three"); + observer.onCompleted(); + + return Subscriptions.empty(); + } + }).subscribeOn(Schedulers.threadPoolForIO()); // subscribe to the source on the IO thread pool + + // now merge on the CPU threadpool + o = Observable. merge(o, Observable. from("four", "five")) + .subscribeOn(Schedulers.threadPoolForComputation()) + .map(new Func1() { + + @Override + public String call(String v) { + // opportunity to see what thread the merge is running on + System.out.println("Merge is running on: " + Thread.currentThread().getName()); + return v; + } + + }); + + final CountDownLatch latch = new CountDownLatch(1); + + final AtomicReference onError = new AtomicReference(); + + // subscribe on a new thread + o.subscribe(new Observer() { + + @Override + public void onCompleted() { + System.out.println("==> received onCompleted"); + latch.countDown(); + } + + @Override + public void onError(Exception e) { + System.out.println("==> received onError: " + e.getMessage()); + onError.set((RuntimeException) e); + latch.countDown(); + } + + @Override + public void onNext(String v) { + System.out.println("==> Final subscribe is running on: " + Thread.currentThread().getName()); + System.out.println("==> onNext: " + v); + + } + }, Schedulers.newThread()); + + // wait for the above to finish or blow up if it's blocked + latch.await(5, TimeUnit.SECONDS); + } }