diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 2ac0f42754..ac9d9b58b4 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -15,12 +15,11 @@ */ package rx; -import static rx.util.functions.Functions.not; +import static rx.util.functions.Functions.*; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -64,6 +63,8 @@ import rx.operators.OperationTakeLast; import rx.operators.OperationTakeUntil; import rx.operators.OperationTakeWhile; +import rx.operators.OperationThrottleFirst; +import rx.operators.OperationDebounce; import rx.operators.OperationTimestamp; import rx.operators.OperationToObservableFuture; import rx.operators.OperationToObservableIterable; @@ -1809,6 +1810,182 @@ public static Observable interval(long interval, TimeUnit unit, Scheduler return create(OperationInterval.interval(interval, unit, scheduler)); } + /** + * Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call. + *

+ * NOTE: If events keep firing faster than the timeout then no data will be emitted. + *

+ * + *

+ * Information on debounce vs throttle: + *

+ *

+ * + * @param timeout + * The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped. + * @param unit + * The {@link TimeUnit} for the timeout. + * + * @return An {@link Observable} which filters out values which are too quickly followed up with newer values. + * @see {@link #throttleWithTimeout}; + */ + public Observable debounce(long timeout, TimeUnit unit) { + return create(OperationDebounce.debounce(this, timeout, unit)); + } + + /** + * Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call. + *

+ * NOTE: If events keep firing faster than the timeout then no data will be emitted. + *

+ * + *

+ * Information on debounce vs throttle: + *

+ *

+ * + * @param timeout + * The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped. + * @param unit + * The unit of time for the specified timeout. + * @param scheduler + * The {@link Scheduler} to use internally to manage the timers which handle timeout for each event. + * @return Observable which performs the throttle operation. + * @see {@link #throttleWithTimeout}; + */ + public Observable debounce(long timeout, TimeUnit unit, Scheduler scheduler) { + return create(OperationDebounce.debounce(this, timeout, unit)); + } + + /** + * Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call. + *

+ * NOTE: If events keep firing faster than the timeout then no data will be emitted. + *

+ * + *

+ * Information on debounce vs throttle: + *

+ *

    + *
  • http://drupalmotion.com/article/debounce-and-throttle-visual-explanation
  • + *
  • http://unscriptable.com/2009/03/20/debouncing-javascript-methods/
  • + *
  • http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/
  • + *
+ * + * @param timeout + * The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped. + * @param unit + * The {@link TimeUnit} for the timeout. + * + * @return An {@link Observable} which filters out values which are too quickly followed up with newer values. + * @see {@link #debounce} + */ + public Observable throttleWithTimeout(long timeout, TimeUnit unit) { + return create(OperationDebounce.debounce(this, timeout, unit)); + } + + /** + * Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call. + *

+ * NOTE: If events keep firing faster than the timeout then no data will be emitted. + *

+ * + * + * @param timeout + * The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped. + * @param unit + * The unit of time for the specified timeout. + * @param scheduler + * The {@link Scheduler} to use internally to manage the timers which handle timeout for each event. + * @return Observable which performs the throttle operation. + * @see {@link #debounce} + */ + public Observable throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) { + return create(OperationDebounce.debounce(this, timeout, unit, scheduler)); + } + + /** + * Throttles by skipping value until `skipDuration` passes and then emits the next received value. + *

+ * This differs from {@link #throttleLast} in that this only tracks passage of time whereas {@link #throttleLast} ticks at scheduled intervals. + *

+ * + * + * @param skipDuration + * Time to wait before sending another value after emitting last value. + * @param unit + * The unit of time for the specified timeout. + * @param scheduler + * The {@link Scheduler} to use internally to manage the timers which handle timeout for each event. + * @return Observable which performs the throttle operation. + */ + public Observable throttleFirst(long windowDuration, TimeUnit unit) { + return create(OperationThrottleFirst.throttleFirst(this, windowDuration, unit)); + } + + /** + * Throttles by skipping value until `skipDuration` passes and then emits the next received value. + *

+ * This differs from {@link #throttleLast} in that this only tracks passage of time whereas {@link #throttleLast} ticks at scheduled intervals. + *

+ * + * + * @param skipDuration + * Time to wait before sending another value after emitting last value. + * @param unit + * The unit of time for the specified timeout. + * @param scheduler + * The {@link Scheduler} to use internally to manage the timers which handle timeout for each event. + * @return Observable which performs the throttle operation. + */ + public Observable throttleFirst(long skipDuration, TimeUnit unit, Scheduler scheduler) { + return create(OperationThrottleFirst.throttleFirst(this, skipDuration, unit, scheduler)); + } + + /** + * Throttles by returning the last value of each interval defined by 'intervalDuration'. + *

+ * This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas {@link #throttleFirst} does not tick, it just tracks passage of time. + *

+ * + * + * @param intervalDuration + * Duration of windows within with the last value will be chosen. + * @param unit + * The unit of time for the specified interval. + * @return Observable which performs the throttle operation. + * @see {@link #sample(long, TimeUnit)} + */ + public Observable throttleLast(long intervalDuration, TimeUnit unit) { + return sample(intervalDuration, unit); + } + + /** + * Throttles by returning the last value of each interval defined by 'intervalDuration'. + *

+ * This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas {@link #throttleFirst} does not tick, it just tracks passage of time. + *

+ * + * + * @param intervalDuration + * Duration of windows within with the last value will be chosen. + * @param unit + * The unit of time for the specified interval. + * @return Observable which performs the throttle operation. + * @see {@link #sample(long, TimeUnit, Scheduler)} + */ + public Observable throttleLast(long intervalDuration, TimeUnit unit, Scheduler scheduler) { + return sample(intervalDuration, unit, scheduler); + } + /** * Wraps each item emitted by a source Observable in a {@link Timestamped} object. *

diff --git a/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java b/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java index 7afab7ec42..04b8c1a2c5 100644 --- a/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java @@ -19,20 +19,22 @@ import java.util.PriorityQueue; import java.util.Queue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import rx.Scheduler; import rx.Subscription; -import rx.subscriptions.Subscriptions; import rx.util.functions.Func2; public class TestScheduler extends Scheduler { private final Queue> queue = new PriorityQueue>(11, new CompareActionsByTime()); private static class TimedAction { + private final long time; private final Func2 action; private final T state; private final TestScheduler scheduler; + private final AtomicBoolean isCancelled = new AtomicBoolean(false); private TimedAction(TestScheduler scheduler, long time, Func2 action, T state) { this.time = time; @@ -41,6 +43,10 @@ private TimedAction(TestScheduler scheduler, long time, Func2) current.action).call(current.scheduler, current.state); + + // Only execute if the TimedAction has not yet been cancelled + if (!current.isCancelled.get()) { + // because the queue can have wildcards we have to ignore the type T for the state + ((Func2) current.action).call(current.scheduler, current.state); + } } time = targetTimeInNanos; } @@ -97,7 +107,14 @@ public Subscription schedule(T state, Func2 Subscription schedule(T state, Func2 action, long delayTime, TimeUnit unit) { - queue.add(new TimedAction(this, time + unit.toNanos(delayTime), action, state)); - return Subscriptions.empty(); + final TimedAction timedAction = new TimedAction(this, time + unit.toNanos(delayTime), action, state); + queue.add(timedAction); + + return new Subscription() { + @Override + public void unsubscribe() { + timedAction.cancel(); + } + }; } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationDebounce.java b/rxjava-core/src/main/java/rx/operators/OperationDebounce.java new file mode 100644 index 0000000000..d225477069 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationDebounce.java @@ -0,0 +1,294 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.concurrency.Schedulers; +import rx.concurrency.TestScheduler; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Func1; + +/** + * This operation is used to filter out bursts of events. This is done by ignoring the events from an observable which are too + * quickly followed up with other values. Values which are not followed up by other values within the specified timeout are published + * as soon as the timeout expires. + */ +public final class OperationDebounce { + + /** + * This operation filters out events which are published too quickly in succession. This is done by dropping events which are + * followed up by other events before a specified timer has expired. If the timer expires and no follow up event was published (yet) + * the last received event is published. + * + * @param items + * The {@link Observable} which is publishing events. + * @param timeout + * How long each event has to be the 'last event' before it gets published. + * @param unit + * The unit of time for the specified timeout. + * @return A {@link Func1} which performs the throttle operation. + */ + public static OnSubscribeFunc debounce(Observable items, long timeout, TimeUnit unit) { + return debounce(items, timeout, unit, Schedulers.threadPoolForComputation()); + } + + /** + * This operation filters out events which are published too quickly in succession. This is done by dropping events which are + * followed up by other events before a specified timer has expired. If the timer expires and no follow up event was published (yet) + * the last received event is published. + * + * @param items + * The {@link Observable} which is publishing events. + * @param timeout + * How long each event has to be the 'last event' before it gets published. + * @param unit + * The unit of time for the specified timeout. + * @param scheduler + * The {@link Scheduler} to use internally to manage the timers which handle timeout for each event. + * @return A {@link Func1} which performs the throttle operation. + */ + public static OnSubscribeFunc debounce(final Observable items, final long timeout, final TimeUnit unit, final Scheduler scheduler) { + return new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer observer) { + return new Debounce(items, timeout, unit, scheduler).onSubscribe(observer); + } + }; + } + + private static class Debounce implements OnSubscribeFunc { + + private final Observable items; + private final long timeout; + private final TimeUnit unit; + private final Scheduler scheduler; + + public Debounce(Observable items, long timeout, TimeUnit unit, Scheduler scheduler) { + this.items = items; + this.timeout = timeout; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public Subscription onSubscribe(Observer observer) { + return items.subscribe(new DebounceObserver(observer, timeout, unit, scheduler)); + } + } + + private static class DebounceObserver implements Observer { + + private final Observer observer; + private final long timeout; + private final TimeUnit unit; + private final Scheduler scheduler; + + private final AtomicReference lastScheduledNotification = new AtomicReference(); + + public DebounceObserver(Observer observer, long timeout, TimeUnit unit, Scheduler scheduler) { + // we need to synchronize the observer since the on* events can be coming from different + // threads and are thus non-deterministic and could be interleaved + this.observer = new SynchronizedObserver(observer); + this.timeout = timeout; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public void onCompleted() { + /* + * Cancel previous subscription if it has not already executed. + * Expected that some race-condition will occur as this is crossing over thread boundaries + * We are using SynchronizedObserver around 'observer' to handle interleaving and out-of-order calls. + */ + lastScheduledNotification.get().unsubscribe(); + observer.onCompleted(); + } + + @Override + public void onError(Throwable e) { + /* + * Cancel previous subscription if it has not already executed. + * Expected that some race-condition will occur as this is crossing over thread boundaries + * We are using SynchronizedObserver around 'observer' to handle interleaving and out-of-order calls. + */ + lastScheduledNotification.get().unsubscribe(); + observer.onError(e); + } + + @Override + public void onNext(final T v) { + Subscription previousSubscription = lastScheduledNotification.getAndSet(scheduler.schedule(new Action0() { + + @Override + public void call() { + observer.onNext(v); + } + + }, timeout, unit)); + // cancel previous if not already executed + if (previousSubscription != null) { + previousSubscription.unsubscribe(); + } + } + } + + public static class UnitTest { + + private TestScheduler scheduler; + private Observer observer; + + @Before + @SuppressWarnings("unchecked") + public void before() { + scheduler = new TestScheduler(); + observer = mock(Observer.class); + } + + @Test + public void testDebounceWithCompleted() { + Observable source = Observable.create(new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer observer) { + publishNext(observer, 100, "one"); // Should be skipped since "two" will arrive before the timeout expires. + publishNext(observer, 400, "two"); // Should be published since "three" will arrive after the timeout expires. + publishNext(observer, 900, "three"); // Should be skipped since onCompleted will arrive before the timeout expires. + publishCompleted(observer, 1000); // Should be published as soon as the timeout expires. + + return Subscriptions.empty(); + } + }); + + Observable sampled = Observable.create(OperationDebounce.debounce(source, 400, TimeUnit.MILLISECONDS, scheduler)); + sampled.subscribe(observer); + + scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS); + InOrder inOrder = inOrder(observer); + // must go to 800 since it must be 400 after when two is sent, which is at 400 + scheduler.advanceTimeTo(800, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext("two"); + scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testDebounceNeverEmits() { + Observable source = Observable.create(new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer observer) { + // all should be skipped since they are happening faster than the 200ms timeout + publishNext(observer, 100, "a"); // Should be skipped + publishNext(observer, 200, "b"); // Should be skipped + publishNext(observer, 300, "c"); // Should be skipped + publishNext(observer, 400, "d"); // Should be skipped + publishNext(observer, 500, "e"); // Should be skipped + publishNext(observer, 600, "f"); // Should be skipped + publishNext(observer, 700, "g"); // Should be skipped + publishNext(observer, 800, "h"); // Should be skipped + publishCompleted(observer, 900); // Should be published as soon as the timeout expires. + + return Subscriptions.empty(); + } + }); + + Observable sampled = Observable.create(OperationDebounce.debounce(source, 200, TimeUnit.MILLISECONDS, scheduler)); + sampled.subscribe(observer); + + scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(0)).onNext(anyString()); + scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testDebounceWithError() { + Observable source = Observable.create(new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer observer) { + Exception error = new TestException(); + publishNext(observer, 100, "one"); // Should be published since "two" will arrive after the timeout expires. + publishNext(observer, 600, "two"); // Should be skipped since onError will arrive before the timeout expires. + publishError(observer, 700, error); // Should be published as soon as the timeout expires. + + return Subscriptions.empty(); + } + }); + + Observable sampled = Observable.create(OperationDebounce.debounce(source, 400, TimeUnit.MILLISECONDS, scheduler)); + sampled.subscribe(observer); + + scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS); + InOrder inOrder = inOrder(observer); + // 100 + 400 means it triggers at 500 + scheduler.advanceTimeTo(500, TimeUnit.MILLISECONDS); + inOrder.verify(observer).onNext("one"); + scheduler.advanceTimeTo(701, TimeUnit.MILLISECONDS); + inOrder.verify(observer).onError(any(TestException.class)); + inOrder.verifyNoMoreInteractions(); + } + + private void publishCompleted(final Observer observer, long delay) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onCompleted(); + } + }, delay, TimeUnit.MILLISECONDS); + } + + private void publishError(final Observer observer, long delay, final Exception error) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onError(error); + } + }, delay, TimeUnit.MILLISECONDS); + } + + private void publishNext(final Observer observer, final long delay, final T value) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onNext(value); + } + }, delay, TimeUnit.MILLISECONDS); + } + + @SuppressWarnings("serial") + private class TestException extends Exception { + } + + } + +} diff --git a/rxjava-core/src/main/java/rx/operators/OperationThrottleFirst.java b/rxjava-core/src/main/java/rx/operators/OperationThrottleFirst.java new file mode 100644 index 0000000000..4c3a1ea8d4 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationThrottleFirst.java @@ -0,0 +1,197 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.concurrency.Schedulers; +import rx.concurrency.TestScheduler; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Func1; + +/** + * Throttle by windowing a stream and returning the first value in each window. + */ +public final class OperationThrottleFirst { + + /** + * Throttles to first value in each window. + * + * @param items + * The {@link Observable} which is publishing events. + * @param windowDuration + * Duration of windows within with the first value will be chosen. + * @param unit + * The unit of time for the specified timeout. + * @return A {@link Func1} which performs the throttle operation. + */ + public static OnSubscribeFunc throttleFirst(Observable items, long windowDuration, TimeUnit unit) { + return throttleFirst(items, windowDuration, unit, Schedulers.threadPoolForComputation()); + } + + /** + * Throttles to first value in each window. + * + * @param items + * The {@link Observable} which is publishing events. + * @param windowDuration + * Duration of windows within with the first value will be chosen. + * @param unit + * The unit of time for the specified timeout. + * @param scheduler + * The {@link Scheduler} to use internally to manage the timers which handle timeout for each event. + * @return A {@link Func1} which performs the throttle operation. + */ + public static OnSubscribeFunc throttleFirst(final Observable items, final long windowDuration, final TimeUnit unit, final Scheduler scheduler) { + return new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer observer) { + + final AtomicLong lastOnNext = new AtomicLong(0); + final long timeInMilliseconds = unit.toMillis(windowDuration); + + return items.filter(new Func1() { + + @Override + public Boolean call(T value) { + long now = scheduler.now(); + if (lastOnNext.get() == 0 || now - lastOnNext.get() >= timeInMilliseconds) { + lastOnNext.set(now); + return Boolean.TRUE; + } else { + return Boolean.FALSE; + } + } + + }).subscribe(observer); + } + }; + } + + public static class UnitTest { + + private TestScheduler scheduler; + private Observer observer; + + @Before + @SuppressWarnings("unchecked") + public void before() { + scheduler = new TestScheduler(); + observer = mock(Observer.class); + } + + @Test + public void testThrottlingWithCompleted() { + Observable source = Observable.create(new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer observer) { + publishNext(observer, 100, "one"); // publish as it's first + publishNext(observer, 300, "two"); // skip as it's last within the first 400 + publishNext(observer, 900, "three"); // publish + publishNext(observer, 905, "four"); // skip + publishCompleted(observer, 1000); // Should be published as soon as the timeout expires. + + return Subscriptions.empty(); + } + }); + + Observable sampled = Observable.create(OperationThrottleFirst.throttleFirst(source, 400, TimeUnit.MILLISECONDS, scheduler)); + sampled.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext("one"); + inOrder.verify(observer, times(0)).onNext("two"); + inOrder.verify(observer, times(1)).onNext("three"); + inOrder.verify(observer, times(0)).onNext("four"); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testThrottlingWithError() { + Observable source = Observable.create(new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer observer) { + Exception error = new TestException(); + publishNext(observer, 100, "one"); // Should be published since it is first + publishNext(observer, 200, "two"); // Should be skipped since onError will arrive before the timeout expires + publishError(observer, 300, error); // Should be published as soon as the timeout expires. + + return Subscriptions.empty(); + } + }); + + Observable sampled = Observable.create(OperationThrottleFirst.throttleFirst(source, 400, TimeUnit.MILLISECONDS, scheduler)); + sampled.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(400, TimeUnit.MILLISECONDS); + inOrder.verify(observer).onNext("one"); + inOrder.verify(observer).onError(any(TestException.class)); + inOrder.verifyNoMoreInteractions(); + } + + private void publishCompleted(final Observer observer, long delay) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onCompleted(); + } + }, delay, TimeUnit.MILLISECONDS); + } + + private void publishError(final Observer observer, long delay, final Exception error) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onError(error); + } + }, delay, TimeUnit.MILLISECONDS); + } + + private void publishNext(final Observer observer, long delay, final T value) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onNext(value); + } + }, delay, TimeUnit.MILLISECONDS); + } + + @SuppressWarnings("serial") + private class TestException extends Exception { + } + + } + +} diff --git a/rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java b/rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java index c795bb60cc..fff3fccbf8 100644 --- a/rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java +++ b/rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java @@ -73,6 +73,16 @@ public SynchronizedObserver(Observer Observer, SafeObservableSubscrip this.observer = Observer; this.subscription = subscription; } + + /** + * Used when synchronizing an Observer without access to the subscription. + * + * @param Observer + */ + public SynchronizedObserver(Observer Observer) { + this.observer = Observer; + this.subscription = new SafeObservableSubscription(); + } public void onNext(T arg) { if (finished || finishRequested || subscription.isUnsubscribed()) { diff --git a/rxjava-core/src/test/java/rx/ThrottleFirstTests.java b/rxjava-core/src/test/java/rx/ThrottleFirstTests.java new file mode 100644 index 0000000000..5e8f3ef1cb --- /dev/null +++ b/rxjava-core/src/test/java/rx/ThrottleFirstTests.java @@ -0,0 +1,46 @@ +package rx; + +import static org.mockito.Mockito.*; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.mockito.InOrder; + +import rx.concurrency.TestScheduler; +import rx.subjects.PublishSubject; + +public class ThrottleFirstTests { + + @Test + public void testThrottle() { + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + TestScheduler s = new TestScheduler(); + PublishSubject o = PublishSubject.create(); + o.throttleFirst(500, TimeUnit.MILLISECONDS, s).subscribe(observer); + + // send events with simulated time increments + s.advanceTimeTo(0, TimeUnit.MILLISECONDS); + o.onNext(1); // deliver + o.onNext(2); // skip + s.advanceTimeTo(501, TimeUnit.MILLISECONDS); + o.onNext(3); // deliver + s.advanceTimeTo(600, TimeUnit.MILLISECONDS); + o.onNext(4); // skip + s.advanceTimeTo(700, TimeUnit.MILLISECONDS); + o.onNext(5); // skip + o.onNext(6); // skip + s.advanceTimeTo(1001, TimeUnit.MILLISECONDS); + o.onNext(7); // deliver + s.advanceTimeTo(1501, TimeUnit.MILLISECONDS); + o.onCompleted(); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer).onNext(1); + inOrder.verify(observer).onNext(3); + inOrder.verify(observer).onNext(7); + inOrder.verify(observer).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } +} diff --git a/rxjava-core/src/test/java/rx/ThrottleLastTests.java b/rxjava-core/src/test/java/rx/ThrottleLastTests.java new file mode 100644 index 0000000000..742bbc09ba --- /dev/null +++ b/rxjava-core/src/test/java/rx/ThrottleLastTests.java @@ -0,0 +1,46 @@ +package rx; + +import static org.mockito.Mockito.*; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.mockito.InOrder; + +import rx.concurrency.TestScheduler; +import rx.subjects.PublishSubject; + +public class ThrottleLastTests { + + @Test + public void testThrottle() { + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + TestScheduler s = new TestScheduler(); + PublishSubject o = PublishSubject.create(); + o.throttleLast(500, TimeUnit.MILLISECONDS, s).subscribe(observer); + + // send events with simulated time increments + s.advanceTimeTo(0, TimeUnit.MILLISECONDS); + o.onNext(1); // skip + o.onNext(2); // deliver + s.advanceTimeTo(501, TimeUnit.MILLISECONDS); + o.onNext(3); // skip + s.advanceTimeTo(600, TimeUnit.MILLISECONDS); + o.onNext(4); // skip + s.advanceTimeTo(700, TimeUnit.MILLISECONDS); + o.onNext(5); // skip + o.onNext(6); // deliver + s.advanceTimeTo(1001, TimeUnit.MILLISECONDS); + o.onNext(7); // deliver + s.advanceTimeTo(1501, TimeUnit.MILLISECONDS); + o.onCompleted(); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer).onNext(2); + inOrder.verify(observer).onNext(6); + inOrder.verify(observer).onNext(7); + inOrder.verify(observer).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } +} diff --git a/rxjava-core/src/test/java/rx/ThrottleWithTimeoutTests.java b/rxjava-core/src/test/java/rx/ThrottleWithTimeoutTests.java new file mode 100644 index 0000000000..3503fe7adb --- /dev/null +++ b/rxjava-core/src/test/java/rx/ThrottleWithTimeoutTests.java @@ -0,0 +1,46 @@ +package rx; + +import static org.mockito.Mockito.*; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.mockito.InOrder; + +import rx.concurrency.TestScheduler; +import rx.subjects.PublishSubject; + +public class ThrottleWithTimeoutTests { + + @Test + public void testThrottle() { + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + TestScheduler s = new TestScheduler(); + PublishSubject o = PublishSubject.create(); + o.throttleWithTimeout(500, TimeUnit.MILLISECONDS, s).subscribe(observer); + + // send events with simulated time increments + s.advanceTimeTo(0, TimeUnit.MILLISECONDS); + o.onNext(1); // skip + o.onNext(2); // deliver + s.advanceTimeTo(501, TimeUnit.MILLISECONDS); + o.onNext(3); // skip + s.advanceTimeTo(600, TimeUnit.MILLISECONDS); + o.onNext(4); // skip + s.advanceTimeTo(700, TimeUnit.MILLISECONDS); + o.onNext(5); // skip + o.onNext(6); // deliver at 1300 after 500ms has passed since onNext(5) + s.advanceTimeTo(1300, TimeUnit.MILLISECONDS); + o.onNext(7); // deliver + s.advanceTimeTo(1800, TimeUnit.MILLISECONDS); + o.onCompleted(); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer).onNext(2); + inOrder.verify(observer).onNext(6); + inOrder.verify(observer).onNext(7); + inOrder.verify(observer).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } +}