Skip to content

Merge overload - possibility B #226

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
57 changes: 57 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -1255,6 +1255,25 @@ public static <T> Observable<T> merge(List<Observable<T>> source) {
return create(OperationMerge.merge(source));
}

/**
* Flattens the Observable sequences from a list of Observables into one Observable sequence
* without any transformation. You can combine the output of multiple Observables so that they
* act like a single Observable, by using the <code>merge</code> method.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
*
* @param source
* a list of Observables that emit sequences of items
* @param scheduler
* The {@link Scheduler} that the sequence is subscribed to on.
* @return an Observable that emits a sequence of elements that are the result of flattening the
* output from the <code>source</code> list of Observables
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge</a>
*/
public static <T> Observable<T> merge(List<Observable<T>> source, Scheduler scheduler) {
return create(OperationMerge.merge(source, scheduler));
}

/**
* Flattens the Observable sequences emitted by a sequence of Observables that are emitted by a
* Observable into one Observable sequence without any transformation. You can combine the output
Expand All @@ -1272,6 +1291,25 @@ public static <T> Observable<T> merge(Observable<Observable<T>> source) {
return create(OperationMerge.merge(source));
}

/**
* Flattens the Observable sequences emitted by a sequence of Observables that are emitted by a
* Observable into one Observable sequence without any transformation. You can combine the output
* of multiple Observables so that they act like a single Observable, by using the <code>merge</code> method.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
*
* @param source
* an Observable that emits Observables
* @param scheduler
* The {@link Scheduler} that the sequence is subscribed to on.
* @return an Observable that emits a sequence of elements that are the result of flattening the
* output from the Observables emitted by the <code>source</code> Observable
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
*/
public static <T> Observable<T> merge(Observable<Observable<T>> source, Scheduler scheduler) {
return create(OperationMerge.merge(source, scheduler));
}

/**
* Flattens the Observable sequences from a series of Observables into one Observable sequence
* without any transformation. You can combine the output of multiple Observables so that they
Expand All @@ -1289,6 +1327,25 @@ public static <T> Observable<T> merge(Observable<T>... source) {
return create(OperationMerge.merge(source));
}

/**
* Flattens the Observable sequences from a series of Observables into one Observable sequence
* without any transformation. You can combine the output of multiple Observables so that they
* act like a single Observable, by using the <code>merge</code> method.
* <p>
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/merge.png">
*
* @param scheduler
* The {@link Scheduler} that the sequence is subscribed to on.
* @param source
* a series of Observables that emit sequences of items
* @return an Observable that emits a sequence of elements that are the result of flattening the
* output from the <code>source</code> Observables
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
*/
public static <T> Observable<T> merge(Scheduler scheduler, Observable<T>... source) {
return create(OperationMerge.merge(scheduler, source));
}

/**
* Returns the values from the source observable sequence until the other observable sequence produces a value.
*
Expand Down
62 changes: 58 additions & 4 deletions rxjava-core/src/main/java/rx/operators/OperationMerge.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,11 @@

import rx.Observable;
import rx.Observer;
import rx.Scheduler;
import rx.Subscription;
import rx.util.AtomicObservableSubscription;
import rx.util.SynchronizedObserver;
import rx.util.functions.Func0;
import rx.util.functions.Func1;

public final class OperationMerge {
Expand All @@ -60,12 +62,59 @@ public Subscription call(Observer<T> observer) {
};
}

/**
* Flattens the observable sequences from the list of Observables into one observable sequence without any transformation.
*
* @param source
* An observable sequence of elements to project.
* @param scheduler
* The {@link Scheduler} that the sequence is subscribed to on.
* @return An observable sequence whose elements are the result of flattening the output from the list of Observables.
* @see http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx
*/
public static <T> Func1<Observer<T>, Subscription> merge(final Observable<Observable<T>> o, final Scheduler scheduler) {
return new Func1<Observer<T>, Subscription>() {

@Override
public Subscription call(final Observer<T> observer) {
return scheduler.schedule(new Func0<Subscription>() {

@Override
public Subscription call() {
return new MergeObservable<T>(o).call(observer);
}
});
}
};

}

public static <T> Func1<Observer<T>, Subscription> merge(final Observable<T>... sequences) {
return merge(Arrays.asList(sequences));
}

public static <T> Func1<Observer<T>, Subscription> merge(Scheduler scheduler, final Observable<T>... sequences) {
return merge(Arrays.asList(sequences), scheduler);
}

public static <T> Func1<Observer<T>, Subscription> merge(final List<Observable<T>> sequences) {
return merge(Observable.create(new Func1<Observer<Observable<T>>, Subscription>() {
/*
* https://github.com/Netflix/RxJava/issues/19
*
* Schedulers.immedate() or NULL?
*
* Right now I see no reason to add yet more layers of wrapping on the stack to pass in 'immediate' when that
* is what happens by default without schedulers.
*
* In testing I see the exact same behavior with or without ... just more on the stack.
*
* If someone has a unit test that demonstrates why we should always go via a scheduler I'll gladly change this.
*/
return merge(sequences, null);
}

public static <T> Func1<Observer<T>, Subscription> merge(final List<Observable<T>> sequences, Scheduler scheduler) {
Observable<Observable<T>> o = Observable.create(new Func1<Observer<Observable<T>>, Subscription>() {

private volatile boolean unsubscribed = false;

Expand All @@ -92,7 +141,12 @@ public void unsubscribe() {

};
}
}));
});
if (scheduler == null) {
return merge(o);
} else {
return merge(o, scheduler);
}
}

/**
Expand Down Expand Up @@ -447,9 +501,9 @@ public void onNext(String v) {
// so I'm unfortunately reverting to using a Thread.sleep to allow the process scheduler time
// to make sure after o1.onNextBeingSent and o2.onNextBeingSent are hit that the following
// onNext is invoked.

Thread.sleep(300);

try { // in try/finally so threads are released via latch countDown even if assertion fails
assertEquals(1, concurrentCounter.get());
} finally {
Expand Down
68 changes: 68 additions & 0 deletions rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java
Original file line number Diff line number Diff line change
Expand Up @@ -245,4 +245,72 @@ public void call(Integer t) {
assertEquals(5, count.get());
}

@Test
public void testMixedSchedulers() throws InterruptedException {
final String mainThreadName = Thread.currentThread().getName();

Observable<String> o = Observable.<String> create(new Func1<Observer<String>, Subscription>() {

@Override
public Subscription call(Observer<String> observer) {

System.out.println("Origin observable is running on: " + Thread.currentThread().getName());

assertFalse(Thread.currentThread().getName().equals(mainThreadName));
assertTrue("Actually: " + Thread.currentThread().getName(), Thread.currentThread().getName().startsWith("RxIOThreadPool"));

observer.onNext("one");
observer.onNext("two");
observer.onNext("three");
observer.onCompleted();

return Subscriptions.empty();
}
}).subscribeOn(Schedulers.threadPoolForIO()); // subscribe to the source on the IO thread pool

// now merge on the CPU threadpool
o = Observable.<String> merge(o, Observable.<String> from("four", "five"))
.subscribeOn(Schedulers.threadPoolForComputation())
.map(new Func1<String, String>() {

@Override
public String call(String v) {
// opportunity to see what thread the merge is running on
System.out.println("Merge is running on: " + Thread.currentThread().getName());
return v;
}

});

final CountDownLatch latch = new CountDownLatch(1);

final AtomicReference<RuntimeException> onError = new AtomicReference<RuntimeException>();

// subscribe on a new thread
o.subscribe(new Observer<String>() {

@Override
public void onCompleted() {
System.out.println("==> received onCompleted");
latch.countDown();
}

@Override
public void onError(Exception e) {
System.out.println("==> received onError: " + e.getMessage());
onError.set((RuntimeException) e);
latch.countDown();
}

@Override
public void onNext(String v) {
System.out.println("==> Final subscribe is running on: " + Thread.currentThread().getName());
System.out.println("==> onNext: " + v);

}
}, Schedulers.newThread());

// wait for the above to finish or blow up if it's blocked
latch.await(5, TimeUnit.SECONDS);
}
}