From 2e0d61d338020e1ad27298736528aced62265750 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 6 May 2013 23:43:00 -0700 Subject: [PATCH 1/5] Publish operator on Observable https://github.com/Netflix/RxJava/issues/15 --- rxjava-core/src/main/java/rx/Observable.java | 129 +++++++++++++++---- 1 file changed, 104 insertions(+), 25 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 9336237c7e..240018a224 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -77,6 +77,7 @@ import rx.plugins.RxJavaErrorHandler; import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; +import rx.subjects.PublishSubject; import rx.subjects.Subject; import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; @@ -1665,6 +1666,17 @@ public static Observable onErrorReturn(final Observable that, Func1 ConnectableObservable publish(final Observable that) { + return OperationMulticast.multicast(that, PublishSubject. create()); + } + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -1720,7 +1732,7 @@ public T call(T t1, T t2) { public static Observable aggregate(Observable sequence, Func2 accumulator) { return reduce(sequence, accumulator); } - + /** * Used by dynamic languages. * @@ -1729,7 +1741,7 @@ public static Observable aggregate(Observable sequence, Func2 public static Observable aggregate(Observable sequence, Object accumulator) { return reduce(sequence, accumulator); } - + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -1787,7 +1799,7 @@ public R call(R r, T t) { public static Observable aggregate(Observable sequence, R initialValue, Func2 accumulator) { return reduce(sequence, initialValue, accumulator); } - + /** * Used by dynamic languages. * @@ -1796,7 +1808,7 @@ public static Observable aggregate(Observable sequence, R initialVa public static Observable aggregate(Observable sequence, R initialValue, Object accumulator) { return reduce(sequence, initialValue, accumulator); } - + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -2003,7 +2015,7 @@ public static Observable takeLast(final Observable items, final int co * @param items * @param predicate * a function to test each source element for a condition - * @return the values from the start of the given sequence + * @return the values from the start of the given sequence */ public static Observable takeWhile(final Observable items, Func1 predicate) { return create(OperationTakeWhile.takeWhile(items, predicate)); @@ -2015,7 +2027,7 @@ public static Observable takeWhile(final Observable items, Func1 Observable takeWhile(final Observable items, Object predicate) { @SuppressWarnings("rawtypes") @@ -2035,7 +2047,7 @@ public Boolean call(T t) { * @param items * @param predicate * a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. - * @return the values from the start of the given sequence + * @return the values from the start of the given sequence */ public static Observable takeWhileWithIndex(final Observable items, Func2 predicate) { return create(OperationTakeWhile.takeWhileWithIndex(items, predicate)); @@ -2057,12 +2069,13 @@ public Boolean call(T t, Integer integer) /** * Adds a timestamp to each item emitted by this observable. + * * @return An observable sequence of timestamped items. */ public Observable> timestamp() { return create(OperationTimestamp.timestamp(this)); } - + /** * Return a Future representing a single value of the Observable. *

@@ -2384,7 +2397,7 @@ public static Observable toObservable(T... items) { * @param sequence * @throws ClassCastException * if T objects do not implement Comparable - * @return an observable containing the sorted list + * @return an observable containing the sorted list */ public static Observable> toSortedList(Observable sequence) { return create(OperationToObservableSortedList.toSortedList(sequence)); @@ -2397,7 +2410,7 @@ public static Observable> toSortedList(Observable sequence) { * * @param sequence * @param sortFunction - * @return an observable containing the sorted list + * @return an observable containing the sorted list */ public static Observable> toSortedList(Observable sequence, Func2 sortFunction) { return create(OperationToObservableSortedList.toSortedList(sequence, sortFunction)); @@ -2410,7 +2423,7 @@ public static Observable> toSortedList(Observable sequence, Func2 * * @param sequence * @param sortFunction - * @return an observable containing the sorted list + * @return an observable containing the sorted list */ public static Observable> toSortedList(Observable sequence, final Object sortFunction) { @SuppressWarnings("rawtypes") @@ -3186,6 +3199,15 @@ public Observable reduce(Func2 accumulator) { return reduce(this, accumulator); } + /** + * Returns a connectable observable sequence that shares a single subscription to the underlying sequence. + * + * @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject. + */ + public ConnectableObservable publish() { + return OperationMulticast.multicast(this, PublishSubject. create()); + } + /** * Used by dynamic languages. * @@ -3201,7 +3223,7 @@ public Observable reduce(Object accumulator) { public Observable aggregate(Func2 accumulator) { return aggregate(this, accumulator); } - + /** * Used by dynamic languages. * @@ -3210,7 +3232,7 @@ public Observable aggregate(Func2 accumulator) { public Observable aggregate(Object accumulator) { return aggregate(this, accumulator); } - + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -3263,7 +3285,7 @@ public Observable aggregate(R initialValue, Func2 accumulator) { public Observable aggregate(R initialValue, Object accumulator) { return aggregate(this, initialValue, accumulator); } - + /** * Returns an Observable that applies a function of your choosing to the first item emitted by a * source Observable, then feeds the result of that function along with the second item emitted @@ -3298,7 +3320,7 @@ public Observable scan(Func2 accumulator) { public Observable sample(long period, TimeUnit unit) { return create(OperationSample.sample(this, period, unit)); } - + /** * Samples the observable sequence at each interval. * @@ -3313,10 +3335,10 @@ public Observable sample(long period, TimeUnit unit) { public Observable sample(long period, TimeUnit unit, Scheduler scheduler) { return create(OperationSample.sample(this, period, unit, scheduler)); } - + /** * Used by dynamic languages. - * + * * @see #scan(Func2) */ public Observable scan(final Object accumulator) { @@ -3348,7 +3370,7 @@ public Observable scan(R initialValue, Func2 accumulator) { /** * Used by dynamic languages. - * + * * @see #scan(Object, Func2) */ public Observable scan(final R initialValue, final Object accumulator) { @@ -3419,7 +3441,7 @@ public Observable take(final int num) { * * @param predicate * a function to test each source element for a condition - * @return the values from the start of the given sequence + * @return the values from the start of the given sequence */ public Observable takeWhile(final Func1 predicate) { return takeWhile(this, predicate); @@ -3430,7 +3452,7 @@ public Observable takeWhile(final Func1 predicate) { * * @param predicate * a function to test each source element for a condition - * @return the values from the start of the given sequence + * @return the values from the start of the given sequence */ public Observable takeWhile(final Object predicate) { return takeWhile(this, predicate); @@ -3441,7 +3463,7 @@ public Observable takeWhile(final Object predicate) { * * @param predicate * a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. - * @return the values from the start of the given sequence + * @return the values from the start of the given sequence */ public Observable takeWhileWithIndex(final Func2 predicate) { return takeWhileWithIndex(this, predicate); @@ -3452,7 +3474,7 @@ public Observable takeWhileWithIndex(final Func2 predica * * @param predicate * a function to test each element for a condition; the second parameter of the function represents the index of the source element; otherwise, false. - * @return the values from the start of the given sequence + * @return the values from the start of the given sequence */ public Observable takeWhileWithIndex(final Object predicate) { return takeWhileWithIndex(this, predicate); @@ -3523,7 +3545,7 @@ public Observable> toList() { * * @throws ClassCastException * if T objects do not implement Comparable - * @return an observable containing the sorted list + * @return an observable containing the sorted list */ public Observable> toSortedList() { return toSortedList(this); @@ -3535,7 +3557,7 @@ public Observable> toSortedList() { * * * @param sortFunction - * @return an observable containing the sorted list + * @return an observable containing the sorted list */ public Observable> toSortedList(Func2 sortFunction) { return toSortedList(this, sortFunction); @@ -3547,7 +3569,7 @@ public Observable> toSortedList(Func2 sortFunction) { * * * @param sortFunction - * @return an observable containing the sorted list + * @return an observable containing the sorted list */ public Observable> toSortedList(final Object sortFunction) { return toSortedList(this, sortFunction); @@ -4140,6 +4162,63 @@ public void call(String t1) { } } + @Test + public void testPublish() throws InterruptedException { + final AtomicInteger counter = new AtomicInteger(); + ConnectableObservable o = Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(final Observer observer) { + final BooleanSubscription subscription = new BooleanSubscription(); + new Thread(new Runnable() { + + @Override + public void run() { + System.out.println("published observable being executed"); + observer.onNext("one"); + observer.onCompleted(); + counter.incrementAndGet(); + } + }).start(); + return subscription; + } + }).publish(); + + final CountDownLatch latch = new CountDownLatch(2); + + // subscribe once + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + System.out.println("v: " + v); + latch.countDown(); + } + }); + + // subscribe again + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + System.out.println("v: " + v); + latch.countDown(); + } + }); + + Subscription s = o.connect(); + try { + if (!latch.await(1000, TimeUnit.MILLISECONDS)) { + fail("subscriptions did not receive values"); + } + assertEquals(1, counter.get()); + } finally { + s.unsubscribe(); + } + } + private static class TestException extends RuntimeException { private static final long serialVersionUID = 1L; } From f4669d183824b1af401111fd079cb9f71577a55a Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 6 May 2013 23:57:16 -0700 Subject: [PATCH 2/5] Replay operator on Observable https://github.com/Netflix/RxJava/issues/71 --- rxjava-core/src/main/java/rx/Observable.java | 85 +++++++++++++++++++- 1 file changed, 83 insertions(+), 2 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 240018a224..51fd53bd2a 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -78,6 +78,7 @@ import rx.plugins.RxJavaObservableExecutionHook; import rx.plugins.RxJavaPlugins; import rx.subjects.PublishSubject; +import rx.subjects.ReplaySubject; import rx.subjects.Subject; import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; @@ -1666,6 +1667,17 @@ public static Observable onErrorReturn(final Observable that, Func1 ConnectableObservable replay(final Observable that) { + return OperationMulticast.multicast(that, ReplaySubject. create()); + } + /** * Returns a connectable observable sequence that shares a single subscription to the underlying sequence. * @@ -3199,13 +3211,22 @@ public Observable reduce(Func2 accumulator) { return reduce(this, accumulator); } + /** + * Returns a connectable observable sequence that shares a single subscription to the underlying sequence replaying all notifications. + * + * @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject. + */ + public ConnectableObservable replay() { + return replay(this); + } + /** * Returns a connectable observable sequence that shares a single subscription to the underlying sequence. * * @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject. */ public ConnectableObservable publish() { - return OperationMulticast.multicast(this, PublishSubject. create()); + return publish(this); } /** @@ -4174,10 +4195,10 @@ public Subscription call(final Observer observer) { @Override public void run() { + counter.incrementAndGet(); System.out.println("published observable being executed"); observer.onNext("one"); observer.onCompleted(); - counter.incrementAndGet(); } }).start(); return subscription; @@ -4219,6 +4240,66 @@ public void call(String v) { } } + @Test + public void testReplay() throws InterruptedException { + final AtomicInteger counter = new AtomicInteger(); + ConnectableObservable o = Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(final Observer observer) { + final BooleanSubscription subscription = new BooleanSubscription(); + new Thread(new Runnable() { + + @Override + public void run() { + System.out.println("published observable being executed"); + observer.onNext("one"); + observer.onCompleted(); + counter.incrementAndGet(); + } + }).start(); + return subscription; + } + }).replay(); + + // we connect immediately and it will emit the value + Subscription s = o.connect(); + try { + + // we then expect the following 2 subscriptions to get that same value + final CountDownLatch latch = new CountDownLatch(2); + + // subscribe once + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + System.out.println("v: " + v); + latch.countDown(); + } + }); + + // subscribe again + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + System.out.println("v: " + v); + latch.countDown(); + } + }); + + if (!latch.await(1000, TimeUnit.MILLISECONDS)) { + fail("subscriptions did not receive values"); + } + assertEquals(1, counter.get()); + } finally { + s.unsubscribe(); + } + } + private static class TestException extends RuntimeException { private static final long serialVersionUID = 1L; } From ada6a68bd5d80032ffbb385178ba41bb8e883edd Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 7 May 2013 00:30:38 -0700 Subject: [PATCH 3/5] Cache operator Cache operator as discussed in https://github.com/Netflix/RxJava/pull/209 Similar to `replay()` except that this auto-subscribes to the source sequence. This comes with the same cautions as `toList` when dealing with infinite or very large sequences. --- rxjava-core/src/main/java/rx/Observable.java | 86 ++++++++++++ .../java/rx/operators/OperationCache.java | 128 ++++++++++++++++++ 2 files changed, 214 insertions(+) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationCache.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 51fd53bd2a..560b79e02f 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -39,6 +39,7 @@ import rx.observables.ConnectableObservable; import rx.observables.GroupedObservable; import rx.operators.OperationAll; +import rx.operators.OperationCache; import rx.operators.OperationConcat; import rx.operators.OperationDefer; import rx.operators.OperationDematerialize; @@ -1678,6 +1679,22 @@ public static ConnectableObservable replay(final Observable that) { return OperationMulticast.multicast(that, ReplaySubject. create()); } + /** + * Similar to {@link #replay()} except that this auto-subscribes to the source sequence. + *

+ * This is useful when returning an Observable that you wish to cache responses but can't control the + * subscribe/unsubscribe behavior of all the Observers. + *

+ * NOTE: You sacrifice the ability to unsubscribe from the origin with this operator so be careful to not + * use this on infinite or very large sequences that will use up memory. This is similar to + * the {@link Observable#toList()} operator in this caution. + * + * @return an observable sequence that upon first subscription caches all events for subsequent subscriptions. + */ + public static Observable cache(final Observable that) { + return create(OperationCache.cache(that)); + } + /** * Returns a connectable observable sequence that shares a single subscription to the underlying sequence. * @@ -3220,6 +3237,22 @@ public ConnectableObservable replay() { return replay(this); } + /** + * Similar to {@link #replay()} except that this auto-subscribes to the source sequence. + *

+ * This is useful when returning an Observable that you wish to cache responses but can't control the + * subscribe/unsubscribe behavior of all the Observers. + *

+ * NOTE: You sacrifice the ability to unsubscribe from the origin with this operator so be careful to not + * use this on infinite or very large sequences that will use up memory. This is similar to + * the {@link Observable#toList()} operator in this caution. + * + * @return an observable sequence that upon first subscription caches all events for subsequent subscriptions. + */ + public Observable cache() { + return cache(this); + } + /** * Returns a connectable observable sequence that shares a single subscription to the underlying sequence. * @@ -4300,6 +4333,59 @@ public void call(String v) { } } + @Test + public void testCache() throws InterruptedException { + final AtomicInteger counter = new AtomicInteger(); + Observable o = Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(final Observer observer) { + final BooleanSubscription subscription = new BooleanSubscription(); + new Thread(new Runnable() { + + @Override + public void run() { + System.out.println("published observable being executed"); + observer.onNext("one"); + observer.onCompleted(); + counter.incrementAndGet(); + } + }).start(); + return subscription; + } + }).cache(); + + // we then expect the following 2 subscriptions to get that same value + final CountDownLatch latch = new CountDownLatch(2); + + // subscribe once + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + System.out.println("v: " + v); + latch.countDown(); + } + }); + + // subscribe again + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + System.out.println("v: " + v); + latch.countDown(); + } + }); + + if (!latch.await(1000, TimeUnit.MILLISECONDS)) { + fail("subscriptions did not receive values"); + } + assertEquals(1, counter.get()); + } + private static class TestException extends RuntimeException { private static final long serialVersionUID = 1L; } diff --git a/rxjava-core/src/main/java/rx/operators/OperationCache.java b/rxjava-core/src/main/java/rx/operators/OperationCache.java new file mode 100644 index 0000000000..169d332626 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationCache.java @@ -0,0 +1,128 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.junit.Assert.*; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; + +import org.junit.Test; + +import rx.Observable; +import rx.Observer; +import rx.Subscription; +import rx.subjects.ReplaySubject; +import rx.subscriptions.BooleanSubscription; +import rx.util.functions.Action1; +import rx.util.functions.Func1; + +/** + * Similar to {@link Observable#replay()} except that this auto-subscribes to the source sequence. + *

+ * This is useful when returning an Observable that you wish to cache responses but can't control the + * subscribe/unsubscribe behavior of all the Observers. + *

+ * NOTE: You sacrifice the ability to unsubscribe from the origin with this operator so be careful to not + * use this on infinite or very large sequences that will use up memory. This is similar to + * the {@link Observable#toList()} operator in this caution. + * + */ +public class OperationCache { + + public static Func1, Subscription> cache(final Observable source) { + return new Func1, Subscription>() { + + final AtomicBoolean subscribed = new AtomicBoolean(false); + private final ReplaySubject cache = ReplaySubject.create(); + + @Override + public Subscription call(Observer observer) { + if (subscribed.compareAndSet(false, true)) { + // subscribe to the source once + source.subscribe(cache); + /* + * Note that we will never unsubscribe from 'source' as we want to receive and cache all of its values. + * + * This means this should never be used on an infinite or very large sequence, similar to toList(). + */ + } + + return cache.subscribe(observer); + } + + }; + } + + public static class UnitTest { + + @Test + public void testCache() throws InterruptedException { + final AtomicInteger counter = new AtomicInteger(); + Observable o = Observable.create(cache(Observable.create(new Func1, Subscription>() { + + @Override + public Subscription call(final Observer observer) { + final BooleanSubscription subscription = new BooleanSubscription(); + new Thread(new Runnable() { + + @Override + public void run() { + System.out.println("published observable being executed"); + observer.onNext("one"); + observer.onCompleted(); + counter.incrementAndGet(); + } + }).start(); + return subscription; + } + }))); + + // we then expect the following 2 subscriptions to get that same value + final CountDownLatch latch = new CountDownLatch(2); + + // subscribe once + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + System.out.println("v: " + v); + latch.countDown(); + } + }); + + // subscribe again + o.subscribe(new Action1() { + + @Override + public void call(String v) { + assertEquals("one", v); + System.out.println("v: " + v); + latch.countDown(); + } + }); + + if (!latch.await(1000, TimeUnit.MILLISECONDS)) { + fail("subscriptions did not receive values"); + } + assertEquals(1, counter.get()); + } + } + +} From cf8a50201cf252a407b3007cc0efb81e6a34752e Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 7 May 2013 00:42:17 -0700 Subject: [PATCH 4/5] Re-order count increment to make unit tests deterministic --- rxjava-core/src/main/java/rx/Observable.java | 4 ++-- rxjava-core/src/main/java/rx/operators/OperationCache.java | 2 +- 2 files changed, 3 insertions(+), 3 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 560b79e02f..da6f58525a 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -4285,10 +4285,10 @@ public Subscription call(final Observer observer) { @Override public void run() { + counter.incrementAndGet(); System.out.println("published observable being executed"); observer.onNext("one"); observer.onCompleted(); - counter.incrementAndGet(); } }).start(); return subscription; @@ -4345,10 +4345,10 @@ public Subscription call(final Observer observer) { @Override public void run() { + counter.incrementAndGet(); System.out.println("published observable being executed"); observer.onNext("one"); observer.onCompleted(); - counter.incrementAndGet(); } }).start(); return subscription; diff --git a/rxjava-core/src/main/java/rx/operators/OperationCache.java b/rxjava-core/src/main/java/rx/operators/OperationCache.java index 169d332626..5a5d932bad 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationCache.java +++ b/rxjava-core/src/main/java/rx/operators/OperationCache.java @@ -83,10 +83,10 @@ public Subscription call(final Observer observer) { @Override public void run() { + counter.incrementAndGet(); System.out.println("published observable being executed"); observer.onNext("one"); observer.onCompleted(); - counter.incrementAndGet(); } }).start(); return subscription; From 031f1ea5a08320ece917e1db3145e8af551b2879 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 7 May 2013 07:47:10 -0700 Subject: [PATCH 5/5] remove debug statements from unit tests --- rxjava-core/src/main/java/rx/Observable.java | 9 --------- 1 file changed, 9 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index da6f58525a..208e4c54d7 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -4229,7 +4229,6 @@ public Subscription call(final Observer observer) { @Override public void run() { counter.incrementAndGet(); - System.out.println("published observable being executed"); observer.onNext("one"); observer.onCompleted(); } @@ -4246,7 +4245,6 @@ public void run() { @Override public void call(String v) { assertEquals("one", v); - System.out.println("v: " + v); latch.countDown(); } }); @@ -4257,7 +4255,6 @@ public void call(String v) { @Override public void call(String v) { assertEquals("one", v); - System.out.println("v: " + v); latch.countDown(); } }); @@ -4286,7 +4283,6 @@ public Subscription call(final Observer observer) { @Override public void run() { counter.incrementAndGet(); - System.out.println("published observable being executed"); observer.onNext("one"); observer.onCompleted(); } @@ -4308,7 +4304,6 @@ public void run() { @Override public void call(String v) { assertEquals("one", v); - System.out.println("v: " + v); latch.countDown(); } }); @@ -4319,7 +4314,6 @@ public void call(String v) { @Override public void call(String v) { assertEquals("one", v); - System.out.println("v: " + v); latch.countDown(); } }); @@ -4346,7 +4340,6 @@ public Subscription call(final Observer observer) { @Override public void run() { counter.incrementAndGet(); - System.out.println("published observable being executed"); observer.onNext("one"); observer.onCompleted(); } @@ -4364,7 +4357,6 @@ public void run() { @Override public void call(String v) { assertEquals("one", v); - System.out.println("v: " + v); latch.countDown(); } }); @@ -4375,7 +4367,6 @@ public void call(String v) { @Override public void call(String v) { assertEquals("one", v); - System.out.println("v: " + v); latch.countDown(); } });