diff --git a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java index 8a581667ae..272ac6af7f 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java +++ b/rxjava-core/src/main/java/rx/operators/OperationCombineLatest.java @@ -15,17 +15,18 @@ */ package rx.operators; -import java.util.LinkedList; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.BitSet; import java.util.List; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; import rx.Observable; import rx.Observable.OnSubscribeFunc; import rx.Observer; import rx.Subscription; +import rx.subscriptions.CompositeSubscription; import rx.util.functions.Func2; import rx.util.functions.Func3; import rx.util.functions.Func4; @@ -59,290 +60,256 @@ public class OperationCombineLatest { * The aggregation function used to combine the source observable values. * @return A function from an observer to a subscription. This can be used to create an observable from. */ + @SuppressWarnings("unchecked") public static OnSubscribeFunc combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction) { - Aggregator a = new Aggregator(Functions.fromFunc(combineLatestFunction)); - a.addObserver(new CombineObserver(a, w0)); - a.addObserver(new CombineObserver(a, w1)); - return a; + return new CombineLatest(Arrays.asList(w0, w1), Functions.fromFunc(combineLatestFunction)); } /** * @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction) */ + @SuppressWarnings("unchecked") public static OnSubscribeFunc combineLatest(Observable w0, Observable w1, Observable w2, Func3 combineLatestFunction) { - Aggregator a = new Aggregator(Functions.fromFunc(combineLatestFunction)); - a.addObserver(new CombineObserver(a, w0)); - a.addObserver(new CombineObserver(a, w1)); - a.addObserver(new CombineObserver(a, w2)); - return a; + return new CombineLatest(Arrays.asList(w0, w1, w2), Functions.fromFunc(combineLatestFunction)); } /** * @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction) */ + @SuppressWarnings("unchecked") public static OnSubscribeFunc combineLatest(Observable w0, Observable w1, Observable w2, Observable w3, Func4 combineLatestFunction) { - Aggregator a = new Aggregator(Functions.fromFunc(combineLatestFunction)); - a.addObserver(new CombineObserver(a, w0)); - a.addObserver(new CombineObserver(a, w1)); - a.addObserver(new CombineObserver(a, w2)); - a.addObserver(new CombineObserver(a, w3)); - return a; + return new CombineLatest(Arrays.asList(w0, w1, w2, w3), Functions.fromFunc(combineLatestFunction)); } /** * @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction) */ + @SuppressWarnings("unchecked") public static OnSubscribeFunc combineLatest(Observable w0, Observable w1, Observable w2, Observable w3, Observable w4, Func5 combineLatestFunction) { - Aggregator a = new Aggregator(Functions.fromFunc(combineLatestFunction)); - a.addObserver(new CombineObserver(a, w0)); - a.addObserver(new CombineObserver(a, w1)); - a.addObserver(new CombineObserver(a, w2)); - a.addObserver(new CombineObserver(a, w3)); - a.addObserver(new CombineObserver(a, w4)); - return a; + return new CombineLatest(Arrays.asList(w0, w1, w2, w3, w4), Functions.fromFunc(combineLatestFunction)); } /** * @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction) */ + @SuppressWarnings("unchecked") public static OnSubscribeFunc combineLatest(Observable w0, Observable w1, Observable w2, Observable w3, Observable w4, Observable w5, Func6 combineLatestFunction) { - Aggregator a = new Aggregator(Functions.fromFunc(combineLatestFunction)); - a.addObserver(new CombineObserver(a, w0)); - a.addObserver(new CombineObserver(a, w1)); - a.addObserver(new CombineObserver(a, w2)); - a.addObserver(new CombineObserver(a, w3)); - a.addObserver(new CombineObserver(a, w4)); - a.addObserver(new CombineObserver(a, w5)); - return a; + return new CombineLatest(Arrays.asList(w0, w1, w2, w3, w4, w5), Functions.fromFunc(combineLatestFunction)); } /** * @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction) */ + @SuppressWarnings("unchecked") public static OnSubscribeFunc combineLatest(Observable w0, Observable w1, Observable w2, Observable w3, Observable w4, Observable w5, Observable w6, Func7 combineLatestFunction) { - Aggregator a = new Aggregator(Functions.fromFunc(combineLatestFunction)); - a.addObserver(new CombineObserver(a, w0)); - a.addObserver(new CombineObserver(a, w1)); - a.addObserver(new CombineObserver(a, w2)); - a.addObserver(new CombineObserver(a, w3)); - a.addObserver(new CombineObserver(a, w4)); - a.addObserver(new CombineObserver(a, w5)); - a.addObserver(new CombineObserver(a, w6)); - return a; + return new CombineLatest(Arrays.asList(w0, w1, w2, w3, w4, w5, w6), Functions.fromFunc(combineLatestFunction)); } /** * @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction) */ + @SuppressWarnings("unchecked") public static OnSubscribeFunc combineLatest(Observable w0, Observable w1, Observable w2, Observable w3, Observable w4, Observable w5, Observable w6, Observable w7, Func8 combineLatestFunction) { - Aggregator a = new Aggregator(Functions.fromFunc(combineLatestFunction)); - a.addObserver(new CombineObserver(a, w0)); - a.addObserver(new CombineObserver(a, w1)); - a.addObserver(new CombineObserver(a, w2)); - a.addObserver(new CombineObserver(a, w3)); - a.addObserver(new CombineObserver(a, w4)); - a.addObserver(new CombineObserver(a, w5)); - a.addObserver(new CombineObserver(a, w6)); - a.addObserver(new CombineObserver(a, w7)); - return a; + return new CombineLatest(Arrays.asList(w0, w1, w2, w3, w4, w5, w6, w7), Functions.fromFunc(combineLatestFunction)); } /** * @see #combineLatest(Observable w0, Observable w1, Func2 combineLatestFunction) */ + @SuppressWarnings("unchecked") public static OnSubscribeFunc combineLatest(Observable w0, Observable w1, Observable w2, Observable w3, Observable w4, Observable w5, Observable w6, Observable w7, Observable w8, Func9 combineLatestFunction) { - Aggregator a = new Aggregator(Functions.fromFunc(combineLatestFunction)); - a.addObserver(new CombineObserver(a, w0)); - a.addObserver(new CombineObserver(a, w1)); - a.addObserver(new CombineObserver(a, w2)); - a.addObserver(new CombineObserver(a, w3)); - a.addObserver(new CombineObserver(a, w4)); - a.addObserver(new CombineObserver(a, w5)); - a.addObserver(new CombineObserver(a, w6)); - a.addObserver(new CombineObserver(a, w7)); - a.addObserver(new CombineObserver(a, w8)); - return a; + return new CombineLatest(Arrays.asList(w0, w1, w2, w3, w4, w5, w6, w7, w8), Functions.fromFunc(combineLatestFunction)); } - /* package accessible for unit tests */static class CombineObserver implements Observer { - final Observable w; - final Aggregator a; - private Subscription subscription; + static final class CombineLatest implements OnSubscribeFunc { + final List> sources; + final FuncN combiner; - public CombineObserver(Aggregator a, Observable w) { - this.a = a; - this.w = w; - } - - private void startWatching() { - if (subscription != null) { - throw new RuntimeException("This should only be called once."); + public CombineLatest(Iterable> sources, FuncN combiner) { + this.sources = new ArrayList>(); + this.combiner = combiner; + for (Observable source : sources) { + this.sources.add(source); } - subscription = w.subscribe(this); } @Override - public void onCompleted() { - a.complete(this); - } - - @Override - public void onError(Throwable e) { - a.error(e); - } - - @Override - public void onNext(T args) { - a.next(this, args); - } - } - - /** - * Receive notifications from each of the observables we are reducing and execute the combineLatestFunction - * whenever we have received an event from one of the observables, as soon as each Observable has received - * at least one event. - */ - /* package accessible for unit tests */static class Aggregator implements OnSubscribeFunc { - - private volatile Observer observer; - - private final FuncN combineLatestFunction; - private final AtomicBoolean running = new AtomicBoolean(true); - - // Stores how many observers have already completed - private final AtomicInteger numCompleted = new AtomicInteger(0); - - /** - * The latest value from each observer. - */ - private final Map, Object> latestValue = new ConcurrentHashMap, Object>(); - - /** - * Ordered list of observers to combine. - * No synchronization is necessary as these can not be added or changed asynchronously. - */ - private final List> observers = new LinkedList>(); - - public Aggregator(FuncN combineLatestFunction) { - this.combineLatestFunction = combineLatestFunction; - } - - /** - * Receive notification of a Observer starting (meaning we should require it for aggregation) - * - * @param w - * The observer to add. - */ - void addObserver(CombineObserver w) { - observers.add(w); - } + public Subscription onSubscribe(Observer t1) { + CompositeSubscription csub = new CompositeSubscription(); + + Collector collector = new Collector(t1, csub, sources.size()); + + int index = 0; + List observers = new ArrayList(sources.size() + 1); + for (Observable source : sources) { + SafeObservableSubscription sas = new SafeObservableSubscription(); + csub.add(sas); + observers.add(new SourceObserver(collector, sas, index, source)); + index++; + } - /** - * Receive notification of a Observer completing its iterations. - * - * @param w - * The observer that has completed. - */ - void complete(CombineObserver w) { - int completed = numCompleted.incrementAndGet(); - // if all CombineObservers are completed, we mark the whole thing as completed - if (completed == observers.size()) { - if (running.get()) { - // mark ourselves as done - observer.onCompleted(); - // just to ensure we stop processing in case we receive more onNext/complete/error calls after this - running.set(false); + for (SourceObserver so : observers) { + // if we run to completion, don't bother any further + if (!csub.isUnsubscribed()) { + so.connect(); } } - } - /** - * Receive error for a Observer. Throw the error up the chain and stop processing. - */ - void error(Throwable e) { - observer.onError(e); - /* tell all observers to unsubscribe since we had an error */ - stop(); + return csub; } /** - * Receive the next value from an observer. - *

- * If we have received values from all observers, trigger the combineLatest function, otherwise store the value and keep waiting. - * - * @param w - * @param arg + * The collector that combines the latest values from many sources. */ - void next(CombineObserver w, T arg) { - if (observer == null) { - throw new RuntimeException("This shouldn't be running if an Observer isn't registered"); + final class Collector { + final Observer observer; + final Subscription cancel; + final Lock lock; + final Object[] values; + /** Bitmap to keep track who produced a value already. */ + final BitSet hasValue; + /** Bitmap to keep track who has completed. */ + final BitSet completed; + /** Number of source observers who have produced a value. */ + int hasCount; + /** Number of completed source observers. */ + int completedCount; + + public Collector(Observer observer, Subscription cancel, int count) { + this.observer = observer; + this.cancel = cancel; + this.values = new Object[count]; + this.hasValue = new BitSet(count); + this.completed = new BitSet(count); + this.lock = new ReentrantLock(); } - /* if we've been 'unsubscribed' don't process anything further even if the things we're watching keep sending (likely because they are not responding to the unsubscribe call) */ - if (!running.get()) { - return; + public void next(int index, T value) { + Throwable err = null; + lock.lock(); + try { + if (!isTerminated()) { + values[index] = value; + if (!hasValue.get(index)) { + hasValue.set(index); + hasCount++; + } + if (hasCount == values.length) { + // clone: defensive copy due to varargs + try { + observer.onNext(combiner.call(values.clone())); + } catch (Throwable t) { + terminate(); + err = t; + } + } + } + } finally { + lock.unlock(); + } + if (err != null) { + // no need to lock here + observer.onError(err); + cancel.unsubscribe(); + } } - // remember this as the latest value for this observer - latestValue.put(w, arg); + public void error(int index, Throwable e) { + boolean unsub = false; + lock.lock(); + try { + if (!isTerminated()) { + terminate(); + unsub = true; + } + } finally { + lock.unlock(); + } + if (unsub) { + observer.onError(e); + cancel.unsubscribe(); + } + } - if (latestValue.size() < observers.size()) { - // we don't have a value yet for each observer to combine, so we don't have a combined value yet either - return; + boolean isTerminated() { + return completedCount == values.length + 1; } - Object[] argsToCombineLatest = new Object[observers.size()]; - int i = 0; - for (CombineObserver _w : observers) { - argsToCombineLatest[i++] = latestValue.get(_w); + void terminate() { + completedCount = values.length + 1; + Arrays.fill(values, null); } - try { - R combinedValue = combineLatestFunction.call(argsToCombineLatest); - observer.onNext(combinedValue); - } catch (Throwable ex) { - observer.onError(ex); + public void completed(int index) { + boolean unsub = false; + lock.lock(); + try { + if (!completed.get(index)) { + completed.set(index); + completedCount++; + } + if ((!hasValue.get(index) || completedCount == values.length) + && !isTerminated()) { + terminate(); + unsub = true; + } + } finally { + lock.unlock(); + } + if (unsub) { + // no need to hold a lock at this point + observer.onCompleted(); + cancel.unsubscribe(); + } } } - @Override - public Subscription onSubscribe(Observer observer) { - if (this.observer != null) { - throw new IllegalStateException("Only one Observer can subscribe to this Observable."); + /** + * Observes a specific source and communicates with the collector. + */ + final class SourceObserver implements Observer { + final SafeObservableSubscription self; + final Collector collector; + final int index; + Observable source; + + public SourceObserver(Collector collector, + SafeObservableSubscription self, int index, + Observable source) { + this.self = self; + this.collector = collector; + this.index = index; + this.source = source; } - SafeObservableSubscription subscription = new SafeObservableSubscription(new Subscription() { - @Override - public void unsubscribe() { - stop(); - } - }); - this.observer = new SynchronizedObserver(observer, subscription); + @Override + public void onNext(T args) { + collector.next(index, args); + } - /* start the observers */ - for (CombineObserver rw : observers) { - rw.startWatching(); + @Override + public void onError(Throwable e) { + collector.error(index, e); } - return subscription; - } + @Override + public void onCompleted() { + collector.completed(index); + self.unsubscribe(); + } - private void stop() { - /* tell ourselves to stop processing onNext events */ - running.set(false); - /* propogate to all observers to unsubscribe */ - for (CombineObserver rw : observers) { - if (rw.subscription != null) { - rw.subscription.unsubscribe(); - } + /** Connect to the source. */ + void connect() { + self.wrap(source.subscribe(this)); + source = null; } } } diff --git a/rxjava-core/src/test/java/rx/operators/OperationCombineLatestTest.java b/rxjava-core/src/test/java/rx/operators/OperationCombineLatestTest.java index c576337e1e..70d34bd887 100644 --- a/rxjava-core/src/test/java/rx/operators/OperationCombineLatestTest.java +++ b/rxjava-core/src/test/java/rx/operators/OperationCombineLatestTest.java @@ -28,8 +28,7 @@ import rx.Observable; import rx.Observer; import rx.Subscription; -import rx.operators.OperationCombineLatest.Aggregator; -import rx.operators.OperationCombineLatest.CombineObserver; +import rx.subjects.PublishSubject; import rx.subscriptions.Subscriptions; import rx.util.functions.Func2; import rx.util.functions.Func3; @@ -176,321 +175,7 @@ public void testCombineLatestWithInterleavingSequences() { inOrder.verify(w, times(1)).onCompleted(); } - /** - * Testing internal private logic due to the complexity so I want to use TDD to test as a I build it rather than relying purely on the overall functionality expected by the public methods. - */ - @SuppressWarnings("unchecked") - /* mock calls don't do generics */ - @Test - public void testAggregatorSimple() { - FuncN combineLatestFunction = getConcatCombineLatestFunction(); - /* create the aggregator which will execute the combineLatest function when all Observables provide values */ - Aggregator a = new Aggregator(combineLatestFunction); - - /* define a Observer to receive aggregated events */ - Observer aObserver = mock(Observer.class); - Observable.create(a).subscribe(aObserver); - - /* mock the Observable Observers that are 'pushing' data for us */ - CombineObserver r1 = mock(CombineObserver.class); - CombineObserver r2 = mock(CombineObserver.class); - - /* pretend we're starting up */ - a.addObserver(r1); - a.addObserver(r2); - - /* simulate the Observables pushing data into the aggregator */ - a.next(r1, "hello"); - a.next(r2, "world"); - - InOrder inOrder = inOrder(aObserver); - - verify(aObserver, never()).onError(any(Throwable.class)); - verify(aObserver, never()).onCompleted(); - inOrder.verify(aObserver, times(1)).onNext("helloworld"); - - a.next(r1, "hello "); - a.next(r2, "again"); - - verify(aObserver, never()).onError(any(Throwable.class)); - verify(aObserver, never()).onCompleted(); - inOrder.verify(aObserver, times(1)).onNext("hello again"); - - a.complete(r1); - a.complete(r2); - - inOrder.verify(aObserver, never()).onNext(anyString()); - verify(aObserver, times(1)).onCompleted(); - } - - @SuppressWarnings("unchecked") - /* mock calls don't do generics */ - @Test - public void testAggregatorDifferentSizedResultsWithOnComplete() { - FuncN combineLatestFunction = getConcatCombineLatestFunction(); - /* create the aggregator which will execute the combineLatest function when all Observables provide values */ - Aggregator a = new Aggregator(combineLatestFunction); - - /* define a Observer to receive aggregated events */ - Observer aObserver = mock(Observer.class); - Observable.create(a).subscribe(aObserver); - - /* mock the Observable Observers that are 'pushing' data for us */ - CombineObserver r1 = mock(CombineObserver.class); - CombineObserver r2 = mock(CombineObserver.class); - - /* pretend we're starting up */ - a.addObserver(r1); - a.addObserver(r2); - - /* simulate the Observables pushing data into the aggregator */ - a.next(r1, "hello"); - a.next(r2, "world"); - a.complete(r2); - - verify(aObserver, never()).onError(any(Throwable.class)); - verify(aObserver, never()).onCompleted(); - verify(aObserver, times(1)).onNext("helloworld"); - - a.next(r1, "hi"); - a.complete(r1); - - verify(aObserver, never()).onError(any(Throwable.class)); - verify(aObserver, times(1)).onCompleted(); - verify(aObserver, times(1)).onNext("hiworld"); - } - - @SuppressWarnings("unchecked") - /* mock calls don't do generics */ - @Test - public void testAggregateMultipleTypes() { - FuncN combineLatestFunction = getConcatCombineLatestFunction(); - /* create the aggregator which will execute the combineLatest function when all Observables provide values */ - Aggregator a = new Aggregator(combineLatestFunction); - - /* define a Observer to receive aggregated events */ - Observer aObserver = mock(Observer.class); - Observable.create(a).subscribe(aObserver); - - /* mock the Observable Observers that are 'pushing' data for us */ - CombineObserver r1 = mock(CombineObserver.class); - CombineObserver r2 = mock(CombineObserver.class); - - /* pretend we're starting up */ - a.addObserver(r1); - a.addObserver(r2); - - /* simulate the Observables pushing data into the aggregator */ - a.next(r1, "hello"); - a.next(r2, "world"); - a.complete(r2); - - verify(aObserver, never()).onError(any(Throwable.class)); - verify(aObserver, never()).onCompleted(); - verify(aObserver, times(1)).onNext("helloworld"); - - a.next(r1, "hi"); - a.complete(r1); - - verify(aObserver, never()).onError(any(Throwable.class)); - verify(aObserver, times(1)).onCompleted(); - verify(aObserver, times(1)).onNext("hiworld"); - } - - @SuppressWarnings("unchecked") - /* mock calls don't do generics */ - @Test - public void testAggregate3Types() { - FuncN combineLatestFunction = getConcatCombineLatestFunction(); - /* create the aggregator which will execute the combineLatest function when all Observables provide values */ - Aggregator a = new Aggregator(combineLatestFunction); - - /* define a Observer to receive aggregated events */ - Observer aObserver = mock(Observer.class); - Observable.create(a).subscribe(aObserver); - - /* mock the Observable Observers that are 'pushing' data for us */ - CombineObserver r1 = mock(CombineObserver.class); - CombineObserver r2 = mock(CombineObserver.class); - CombineObserver r3 = mock(CombineObserver.class); - - /* pretend we're starting up */ - a.addObserver(r1); - a.addObserver(r2); - a.addObserver(r3); - - /* simulate the Observables pushing data into the aggregator */ - a.next(r1, "hello"); - a.next(r2, 2); - a.next(r3, new int[] { 5, 6, 7 }); - - verify(aObserver, never()).onError(any(Throwable.class)); - verify(aObserver, never()).onCompleted(); - verify(aObserver, times(1)).onNext("hello2[5, 6, 7]"); - } - - @SuppressWarnings("unchecked") - /* mock calls don't do generics */ - @Test - public void testAggregatorsWithDifferentSizesAndTiming() { - FuncN combineLatestFunction = getConcatCombineLatestFunction(); - /* create the aggregator which will execute the combineLatest function when all Observables provide values */ - Aggregator a = new Aggregator(combineLatestFunction); - - /* define a Observer to receive aggregated events */ - Observer aObserver = mock(Observer.class); - Observable.create(a).subscribe(aObserver); - - /* mock the Observable Observers that are 'pushing' data for us */ - CombineObserver r1 = mock(CombineObserver.class); - CombineObserver r2 = mock(CombineObserver.class); - - /* pretend we're starting up */ - a.addObserver(r1); - a.addObserver(r2); - - /* simulate the Observables pushing data into the aggregator */ - a.next(r1, "one"); - a.next(r1, "two"); - a.next(r1, "three"); - a.next(r2, "A"); - - verify(aObserver, never()).onError(any(Throwable.class)); - verify(aObserver, never()).onCompleted(); - verify(aObserver, times(1)).onNext("threeA"); - - a.next(r1, "four"); - a.complete(r1); - a.next(r2, "B"); - verify(aObserver, times(1)).onNext("fourB"); - a.next(r2, "C"); - verify(aObserver, times(1)).onNext("fourC"); - a.next(r2, "D"); - verify(aObserver, times(1)).onNext("fourD"); - a.next(r2, "E"); - verify(aObserver, times(1)).onNext("fourE"); - a.complete(r2); - - verify(aObserver, never()).onError(any(Throwable.class)); - verify(aObserver, times(1)).onCompleted(); - } - - @SuppressWarnings("unchecked") - /* mock calls don't do generics */ - @Test - public void testAggregatorError() { - FuncN combineLatestFunction = getConcatCombineLatestFunction(); - /* create the aggregator which will execute the combineLatest function when all Observables provide values */ - Aggregator a = new Aggregator(combineLatestFunction); - - /* define a Observer to receive aggregated events */ - Observer aObserver = mock(Observer.class); - Observable.create(a).subscribe(aObserver); - - /* mock the Observable Observers that are 'pushing' data for us */ - CombineObserver r1 = mock(CombineObserver.class); - CombineObserver r2 = mock(CombineObserver.class); - - /* pretend we're starting up */ - a.addObserver(r1); - a.addObserver(r2); - - /* simulate the Observables pushing data into the aggregator */ - a.next(r1, "hello"); - a.next(r2, "world"); - - verify(aObserver, never()).onError(any(Throwable.class)); - verify(aObserver, never()).onCompleted(); - verify(aObserver, times(1)).onNext("helloworld"); - - a.error(new RuntimeException("")); - a.next(r1, "hello"); - a.next(r2, "again"); - - verify(aObserver, times(1)).onError(any(Throwable.class)); - verify(aObserver, never()).onCompleted(); - // we don't want to be called again after an error - verify(aObserver, times(0)).onNext("helloagain"); - } - - @SuppressWarnings("unchecked") - /* mock calls don't do generics */ - @Test - public void testAggregatorUnsubscribe() { - FuncN combineLatestFunction = getConcatCombineLatestFunction(); - /* create the aggregator which will execute the combineLatest function when all Observables provide values */ - Aggregator a = new Aggregator(combineLatestFunction); - - /* define a Observer to receive aggregated events */ - Observer aObserver = mock(Observer.class); - Subscription subscription = Observable.create(a).subscribe(aObserver); - - /* mock the Observable Observers that are 'pushing' data for us */ - CombineObserver r1 = mock(CombineObserver.class); - CombineObserver r2 = mock(CombineObserver.class); - - /* pretend we're starting up */ - a.addObserver(r1); - a.addObserver(r2); - - /* simulate the Observables pushing data into the aggregator */ - a.next(r1, "hello"); - a.next(r2, "world"); - - verify(aObserver, never()).onError(any(Throwable.class)); - verify(aObserver, never()).onCompleted(); - verify(aObserver, times(1)).onNext("helloworld"); - - subscription.unsubscribe(); - a.next(r1, "hello"); - a.next(r2, "again"); - - verify(aObserver, times(0)).onError(any(Throwable.class)); - verify(aObserver, never()).onCompleted(); - // we don't want to be called again after an error - verify(aObserver, times(0)).onNext("helloagain"); - } - - @SuppressWarnings("unchecked") - /* mock calls don't do generics */ - @Test - public void testAggregatorEarlyCompletion() { - FuncN combineLatestFunction = getConcatCombineLatestFunction(); - /* create the aggregator which will execute the combineLatest function when all Observables provide values */ - Aggregator a = new Aggregator(combineLatestFunction); - - /* define a Observer to receive aggregated events */ - Observer aObserver = mock(Observer.class); - Observable.create(a).subscribe(aObserver); - - /* mock the Observable Observers that are 'pushing' data for us */ - CombineObserver r1 = mock(CombineObserver.class); - CombineObserver r2 = mock(CombineObserver.class); - - /* pretend we're starting up */ - a.addObserver(r1); - a.addObserver(r2); - - /* simulate the Observables pushing data into the aggregator */ - a.next(r1, "one"); - a.next(r1, "two"); - a.complete(r1); - a.next(r2, "A"); - - InOrder inOrder = inOrder(aObserver); - - inOrder.verify(aObserver, never()).onError(any(Throwable.class)); - inOrder.verify(aObserver, never()).onCompleted(); - inOrder.verify(aObserver, times(1)).onNext("twoA"); - - a.complete(r2); - - inOrder.verify(aObserver, never()).onError(any(Throwable.class)); - inOrder.verify(aObserver, times(1)).onCompleted(); - // we shouldn't get this since completed is called before any other onNext calls could trigger this - inOrder.verify(aObserver, never()).onNext(anyString()); - } - + @SuppressWarnings("unchecked") /* mock calls don't do generics */ @Test @@ -633,4 +318,158 @@ public Subscription onSubscribe(Observer observer) { } } + + Func2 or = new Func2() { + @Override + public Integer call(Integer t1, Integer t2) { + return t1 | t2; + } + }; + + @Test + public void combineSimple() { + PublishSubject a = PublishSubject.create(); + PublishSubject b = PublishSubject.create(); + + Observable source = Observable.combineLatest(a, b, or); + + Observer observer = mock(Observer.class); + + source.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + a.onNext(1); + + inOrder.verify(observer, never()).onNext(any()); + + a.onNext(2); + + inOrder.verify(observer, never()).onNext(any()); + + b.onNext(0x10); + + inOrder.verify(observer, times(1)).onNext(0x12); + + b.onNext(0x20); + inOrder.verify(observer, times(1)).onNext(0x22); + + b.onCompleted(); + + inOrder.verify(observer, never()).onCompleted(); + + a.onCompleted(); + + inOrder.verify(observer, times(1)).onCompleted(); + + a.onNext(3); + b.onNext(0x30); + a.onCompleted(); + b.onCompleted(); + + inOrder.verifyNoMoreInteractions(); + verify(observer, never()).onError(any(Throwable.class)); + } + + @Test + public void combineMultipleObservers() { + PublishSubject a = PublishSubject.create(); + PublishSubject b = PublishSubject.create(); + + Observable source = Observable.combineLatest(a, b, or); + + Observer observer1 = mock(Observer.class); + Observer observer2 = mock(Observer.class); + + source.subscribe(observer1); + source.subscribe(observer2); + + InOrder inOrder1 = inOrder(observer1); + InOrder inOrder2 = inOrder(observer2); + + a.onNext(1); + + inOrder1.verify(observer1, never()).onNext(any()); + inOrder2.verify(observer2, never()).onNext(any()); + + a.onNext(2); + + inOrder1.verify(observer1, never()).onNext(any()); + inOrder2.verify(observer2, never()).onNext(any()); + + b.onNext(0x10); + + inOrder1.verify(observer1, times(1)).onNext(0x12); + inOrder2.verify(observer2, times(1)).onNext(0x12); + + b.onNext(0x20); + inOrder1.verify(observer1, times(1)).onNext(0x22); + inOrder2.verify(observer2, times(1)).onNext(0x22); + + b.onCompleted(); + + inOrder1.verify(observer1, never()).onCompleted(); + inOrder2.verify(observer2, never()).onCompleted(); + + a.onCompleted(); + + inOrder1.verify(observer1, times(1)).onCompleted(); + inOrder2.verify(observer2, times(1)).onCompleted(); + + a.onNext(3); + b.onNext(0x30); + a.onCompleted(); + b.onCompleted(); + + inOrder1.verifyNoMoreInteractions(); + inOrder2.verifyNoMoreInteractions(); + verify(observer1, never()).onError(any(Throwable.class)); + verify(observer2, never()).onError(any(Throwable.class)); + } + @Test + public void testFirstNeverProduces() { + PublishSubject a = PublishSubject.create(); + PublishSubject b = PublishSubject.create(); + + Observable source = Observable.combineLatest(a, b, or); + + Observer observer = mock(Observer.class); + + source.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + b.onNext(0x10); + b.onNext(0x20); + + a.onCompleted(); + + inOrder.verify(observer, times(1)).onCompleted(); + verify(observer, never()).onNext(any()); + verify(observer, never()).onError(any(Throwable.class)); + } + + @Test + public void testSecondNeverProduces() { + PublishSubject a = PublishSubject.create(); + PublishSubject b = PublishSubject.create(); + + Observable source = Observable.combineLatest(a, b, or); + + Observer observer = mock(Observer.class); + + source.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + a.onNext(0x1); + a.onNext(0x2); + + b.onCompleted(); + a.onCompleted(); + + inOrder.verify(observer, times(1)).onCompleted(); + verify(observer, never()).onNext(any()); + verify(observer, never()).onError(any(Throwable.class)); + } }