From 2ea065c0ef22ea7cf58e9fb6d6f24c69f365bed6 Mon Sep 17 00:00:00 2001 From: Michael de Jong Date: Sun, 5 May 2013 10:49:23 +0200 Subject: [PATCH 01/12] Created and wired an implementation for the throttle operation on Observables. --- rxjava-core/src/main/java/rx/Observable.java | 37 ++- .../java/rx/concurrency/TestScheduler.java | 27 +- .../java/rx/operators/OperationThrottle.java | 295 ++++++++++++++++++ 3 files changed, 351 insertions(+), 8 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationThrottle.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index a7d8ffe55f..d52dcb5710 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -15,9 +15,16 @@ */ package rx; -import static org.junit.Assert.*; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.fail; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.util.ArrayList; import java.util.Arrays; @@ -65,6 +72,7 @@ import rx.operators.OperationTakeLast; import rx.operators.OperationTakeUntil; import rx.operators.OperationTakeWhile; +import rx.operators.OperationThrottle; import rx.operators.OperationTimestamp; import rx.operators.OperationToFuture; import rx.operators.OperationToIterator; @@ -2095,6 +2103,29 @@ public Boolean call(T t, Integer integer) })); } + /** + * Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired. + * + * @param timeout The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped. + * @param unit The {@link TimeUnit} for the timeout. + * @return An {@link Observable} which filters out values which are too quickly followed up with never values. + */ + public Observable throttle(long timeout, TimeUnit unit) { + return create(OperationThrottle.throttle(this, timeout, unit)); + } + + /** + * Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired. + * + * @param timeout The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped. + * @param unit The {@link TimeUnit} for the timeout. + * @param scheduler The {@link Scheduler} to use when timing incoming values. + * @return An {@link Observable} which filters out values which are too quickly followed up with never values. + */ + public Observable throttle(long timeout, TimeUnit unit, Scheduler scheduler) { + return create(OperationThrottle.throttle(this, timeout, unit, scheduler)); + } + /** * Adds a timestamp to each item emitted by this observable. * @return An observable sequence of timestamped items. diff --git a/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java b/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java index a10ab90e21..1e56dbfcf6 100644 --- a/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java @@ -19,20 +19,22 @@ import java.util.PriorityQueue; import java.util.Queue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import rx.Scheduler; import rx.Subscription; -import rx.subscriptions.Subscriptions; import rx.util.functions.Func2; public class TestScheduler extends Scheduler { private final Queue> queue = new PriorityQueue>(11, new CompareActionsByTime()); private static class TimedAction { + private final long time; private final Func2 action; private final T state; private final TestScheduler scheduler; + private final AtomicBoolean isCancelled = new AtomicBoolean(false); private TimedAction(TestScheduler scheduler, long time, Func2 action, T state) { this.time = time; @@ -41,6 +43,10 @@ private TimedAction(TestScheduler scheduler, long time, Func2) current.action).call(current.scheduler, current.state); + + // Only execute if the TimedAction has not yet been cancelled + if (!current.isCancelled.get()) { + // because the queue can have wildcards we have to ignore the type T for the state + ((Func2) current.action).call(current.scheduler, current.state); + } } } @@ -97,7 +107,14 @@ public Subscription schedule(T state, Func2 acti @Override public Subscription schedule(T state, Func2 action, long delayTime, TimeUnit unit) { - queue.add(new TimedAction(this, time + unit.toNanos(delayTime), action, state)); - return Subscriptions.empty(); + final TimedAction timedAction = new TimedAction(this, time + unit.toNanos(delayTime), action, state); + queue.add(timedAction); + + return new Subscription() { + @Override + public void unsubscribe() { + timedAction.cancel(); + } + }; } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationThrottle.java b/rxjava-core/src/main/java/rx/operators/OperationThrottle.java new file mode 100644 index 0000000000..05286f0cb4 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationThrottle.java @@ -0,0 +1,295 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; + +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Notification; +import rx.Observable; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.concurrency.Schedulers; +import rx.concurrency.TestScheduler; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Func1; + + +/** + * This operation is used to filter out bursts of events. This is done by ignoring the events from an observable which are too + * quickly followed up with other values. Values which are not followed up by other values within the specified timeout are published + * as soon as the timeout expires. + */ +public final class OperationThrottle { + + /** + * This operation filters out events which are published too quickly in succession. This is done by dropping events which are + * followed up by other events before a specified timer has expired. If the timer expires and no follow up event was published (yet) + * the last received event is published. + * + * @param items The {@link Observable} which is publishing events. + * @param timeout How long each event has to be the 'last event' before it gets published. + * @param unit The unit of time for the specified timeout. + * @return A {@link Func1} which performs the throttle operation. + */ + public static Func1, Subscription> throttle(final Observable items, long timeout, TimeUnit unit) { + return throttle(items, timeout, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor())); + } + + /** + * This operation filters out events which are published too quickly in succession. This is done by dropping events which are + * followed up by other events before a specified timer has expired. If the timer expires and no follow up event was published (yet) + * the last received event is published. + * + * @param items The {@link Observable} which is publishing events. + * @param timeout How long each event has to be the 'last event' before it gets published. + * @param unit The unit of time for the specified timeout. + * @param scheduler The {@link Scheduler} to use internally to manage the timers which handle timeout for each event. + * @return A {@link Func1} which performs the throttle operation. + */ + public static Func1, Subscription> throttle(final Observable items, final long timeout, final TimeUnit unit, final Scheduler scheduler) { + return new Func1, Subscription>() { + @Override + public Subscription call(Observer observer) { + return new Throttle(items, timeout, unit, scheduler).call(observer); + } + }; + } + + private static class Throttle implements Func1, Subscription> { + + private final Observable items; + private final long timeout; + private final TimeUnit unit; + private final Scheduler scheduler; + + public Throttle(Observable items, long timeout, TimeUnit unit, Scheduler scheduler) { + this.items = items; + this.timeout = timeout; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public Subscription call(Observer observer) { + return items.subscribe(new ThrottledObserver(observer, timeout, unit, scheduler)); + } + } + + private static class ThrottledObserver implements Observer { + + private final Observer observer; + private final long timeout; + private final TimeUnit unit; + private final Scheduler scheduler; + + private final AtomicLong waitUntil = new AtomicLong(); + private final AtomicReference subscription = new AtomicReference(Subscriptions.empty()); + + public ThrottledObserver(Observer observer, long timeout, TimeUnit unit, Scheduler scheduler) { + this.observer = observer; + this.timeout = timeout; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public void onCompleted() { + throttle(new Notification()); + } + + @Override + public void onError(Exception e) { + throttle(new Notification(e)); + } + + @Override + public void onNext(final T args) { + throttle(new Notification(args)); + } + + private void throttle(final Notification args) { + synchronized (subscription) { + if (!timerHasExpired()) { + subscription.get().unsubscribe(); + } + subscription.set(scheduler.schedule(new ThrottleAction(observer, args), timeout, unit)); + } + } + + private boolean timerHasExpired() { + long now = scheduler.now(); + long nextTimeout = now + unit.toMillis(timeout); + long previousTimeout = waitUntil.getAndSet(nextTimeout); + return previousTimeout <= now; + } + } + + private static final class ThrottleAction implements Action0 { + + private final Observer observer; + private final Notification notification; + + public ThrottleAction(Observer observer, Notification notification) { + this.observer = observer; + this.notification = notification; + } + + @Override + public void call() { + if (notification.isOnNext()) { + observer.onNext(notification.getValue()); + } + else if (notification.isOnError()) { + observer.onError(notification.getException()); + } + else if (notification.isOnCompleted()) { + observer.onCompleted(); + } + } + } + + public static class UnitTest { + + private TestScheduler scheduler; + private Observer observer; + + @Before + @SuppressWarnings("unchecked") + public void before() { + scheduler = new TestScheduler(); + observer = mock(Observer.class); + } + + @Test + public void testThrottlingWithCompleted() { + Observable source = Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(final Observer observser) { + publishNext(observser, 100, "one"); // Should be skipped since "two" will arrive before the timeout expires. + publishNext(observser, 400, "two"); // Should be published since "three" will arrive after the timeout expires. + publishNext(observser, 900, "four"); // Should be skipped since onCompleted will arrive before the timeout expires. + publishCompleted(observser, 1000); // Should be published as soon as the timeout expires. + + return Subscriptions.empty(); + } + }); + + Observable sampled = Observable.create(OperationThrottle.throttle(source, 400, TimeUnit.MILLISECONDS, scheduler)); + sampled.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(700, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyString()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(900, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext("two"); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(1600, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyString()); + verify(observer, times(1)).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + } + + @Test + public void testThrottlingWithError() { + Observable source = Observable.create(new Func1, Subscription>() { + @Override + public Subscription call(final Observer observser) { + Exception error = new TestException(); + publishNext(observser, 100, "one"); // Should be published since "two" will arrive after the timeout expires. + publishNext(observser, 600, "two"); // Should be skipped since onError will arrive before the timeout expires. + publishError(observser, 700, error); // Should be published as soon as the timeout expires. + + return Subscriptions.empty(); + } + }); + + Observable sampled = Observable.create(OperationThrottle.throttle(source, 400, TimeUnit.MILLISECONDS, scheduler)); + sampled.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyString()); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(600, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext("one"); + verify(observer, never()).onCompleted(); + verify(observer, never()).onError(any(Exception.class)); + + scheduler.advanceTimeTo(1200, TimeUnit.MILLISECONDS); + inOrder.verify(observer, never()).onNext(anyString()); + verify(observer, never()).onCompleted(); + verify(observer, times(1)).onError(any(TestException.class)); + } + + private void publishCompleted(final Observer observer, long delay) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onCompleted(); + } + }, delay, TimeUnit.MILLISECONDS); + } + + private void publishError(final Observer observer, long delay, final Exception error) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onError(error); + } + }, delay, TimeUnit.MILLISECONDS); + } + + private void publishNext(final Observer observer, long delay, final String value) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onNext(value); + } + }, delay, TimeUnit.MILLISECONDS); + } + + @SuppressWarnings("serial") + private class TestException extends Exception { } + + } + +} From 622d861efea555e98600f165f6521afd7f4084c0 Mon Sep 17 00:00:00 2001 From: Michael de Jong Date: Sun, 5 May 2013 10:54:33 +0200 Subject: [PATCH 02/12] Ensure static star imports are used for test cases. --- rxjava-core/src/main/java/rx/Observable.java | 13 +++---------- .../main/java/rx/operators/OperationThrottle.java | 9 ++------- 2 files changed, 5 insertions(+), 17 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index d52dcb5710..35512dcf7a 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -15,16 +15,9 @@ */ package rx; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; -import static org.junit.Assert.fail; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyInt; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.junit.Assert.*; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; import java.util.ArrayList; import java.util.Arrays; diff --git a/rxjava-core/src/main/java/rx/operators/OperationThrottle.java b/rxjava-core/src/main/java/rx/operators/OperationThrottle.java index 05286f0cb4..c53fc14864 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationThrottle.java +++ b/rxjava-core/src/main/java/rx/operators/OperationThrottle.java @@ -15,13 +15,8 @@ */ package rx.operators; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; From 77ac15bb78fd9bc4f698edd44bbd88a55907a2b8 Mon Sep 17 00:00:00 2001 From: Michael de Jong Date: Sun, 5 May 2013 11:23:24 +0200 Subject: [PATCH 03/12] No longer using Notification for scheduling throttled events. --- .../java/rx/operators/OperationThrottle.java | 68 +++++++++++++------ 1 file changed, 47 insertions(+), 21 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationThrottle.java b/rxjava-core/src/main/java/rx/operators/OperationThrottle.java index c53fc14864..5badf0858c 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationThrottle.java +++ b/rxjava-core/src/main/java/rx/operators/OperationThrottle.java @@ -15,8 +15,13 @@ */ package rx.operators; -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -27,7 +32,6 @@ import org.junit.Test; import org.mockito.InOrder; -import rx.Notification; import rx.Observable; import rx.Observer; import rx.Scheduler; @@ -119,25 +123,25 @@ public ThrottledObserver(Observer observer, long timeout, TimeUnit unit, Sche @Override public void onCompleted() { - throttle(new Notification()); + throttle(new ThrottledOnComplete(observer)); } @Override public void onError(Exception e) { - throttle(new Notification(e)); + throttle(new ThrottledOnError(observer, e)); } @Override public void onNext(final T args) { - throttle(new Notification(args)); + throttle(new ThrottledOnNext(observer, args)); } - private void throttle(final Notification args) { + private void throttle(Action0 action) { synchronized (subscription) { if (!timerHasExpired()) { subscription.get().unsubscribe(); } - subscription.set(scheduler.schedule(new ThrottleAction(observer, args), timeout, unit)); + subscription.set(scheduler.schedule(action, timeout, unit)); } } @@ -149,27 +153,49 @@ private boolean timerHasExpired() { } } - private static final class ThrottleAction implements Action0 { + private static final class ThrottledOnNext implements Action0 { private final Observer observer; - private final Notification notification; + private final T value; - public ThrottleAction(Observer observer, Notification notification) { + public ThrottledOnNext(Observer observer, T value) { this.observer = observer; - this.notification = notification; + this.value = value; } @Override public void call() { - if (notification.isOnNext()) { - observer.onNext(notification.getValue()); - } - else if (notification.isOnError()) { - observer.onError(notification.getException()); - } - else if (notification.isOnCompleted()) { - observer.onCompleted(); - } + observer.onNext(value); + } + } + + private static final class ThrottledOnError implements Action0 { + + private final Observer observer; + private final Exception error; + + public ThrottledOnError(Observer observer, Exception error) { + this.observer = observer; + this.error = error; + } + + @Override + public void call() { + observer.onError(error); + } + } + + private static final class ThrottledOnComplete implements Action0 { + + private final Observer observer; + + public ThrottledOnComplete(Observer observer) { + this.observer = observer; + } + + @Override + public void call() { + observer.onCompleted(); } } From 02ee6fae45e5b6c2c90af88f6479fb130c87cbe7 Mon Sep 17 00:00:00 2001 From: Michael de Jong Date: Mon, 6 May 2013 00:48:53 +0200 Subject: [PATCH 04/12] Cleaned up imports and removed unnecessary final keywords in the OperationThrottle class. --- .../java/rx/operators/OperationThrottle.java | 23 ++++++++----------- 1 file changed, 9 insertions(+), 14 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationThrottle.java b/rxjava-core/src/main/java/rx/operators/OperationThrottle.java index 5badf0858c..282347a38e 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationThrottle.java +++ b/rxjava-core/src/main/java/rx/operators/OperationThrottle.java @@ -15,13 +15,8 @@ */ package rx.operators; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.anyString; -import static org.mockito.Mockito.inOrder; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.never; -import static org.mockito.Mockito.times; -import static org.mockito.Mockito.verify; +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; @@ -60,7 +55,7 @@ public final class OperationThrottle { * @param unit The unit of time for the specified timeout. * @return A {@link Func1} which performs the throttle operation. */ - public static Func1, Subscription> throttle(final Observable items, long timeout, TimeUnit unit) { + public static Func1, Subscription> throttle(Observable items, long timeout, TimeUnit unit) { return throttle(items, timeout, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor())); } @@ -132,7 +127,7 @@ public void onError(Exception e) { } @Override - public void onNext(final T args) { + public void onNext(T args) { throttle(new ThrottledOnNext(observer, args)); } @@ -215,7 +210,7 @@ public void before() { public void testThrottlingWithCompleted() { Observable source = Observable.create(new Func1, Subscription>() { @Override - public Subscription call(final Observer observser) { + public Subscription call(Observer observser) { publishNext(observser, 100, "one"); // Should be skipped since "two" will arrive before the timeout expires. publishNext(observser, 400, "two"); // Should be published since "three" will arrive after the timeout expires. publishNext(observser, 900, "four"); // Should be skipped since onCompleted will arrive before the timeout expires. @@ -250,7 +245,7 @@ public Subscription call(final Observer observser) { public void testThrottlingWithError() { Observable source = Observable.create(new Func1, Subscription>() { @Override - public Subscription call(final Observer observser) { + public Subscription call(Observer observser) { Exception error = new TestException(); publishNext(observser, 100, "one"); // Should be published since "two" will arrive after the timeout expires. publishNext(observser, 600, "two"); // Should be skipped since onError will arrive before the timeout expires. @@ -281,7 +276,7 @@ public Subscription call(final Observer observser) { verify(observer, times(1)).onError(any(TestException.class)); } - private void publishCompleted(final Observer observer, long delay) { + private void publishCompleted(final Observer observer, long delay) { scheduler.schedule(new Action0() { @Override public void call() { @@ -290,7 +285,7 @@ public void call() { }, delay, TimeUnit.MILLISECONDS); } - private void publishError(final Observer observer, long delay, final Exception error) { + private void publishError(final Observer observer, long delay, final Exception error) { scheduler.schedule(new Action0() { @Override public void call() { @@ -299,7 +294,7 @@ public void call() { }, delay, TimeUnit.MILLISECONDS); } - private void publishNext(final Observer observer, long delay, final String value) { + private void publishNext(final Observer observer, long delay, final T value) { scheduler.schedule(new Action0() { @Override public void call() { From 2519ef8164d6e3405c40c3f187b5477188d142aa Mon Sep 17 00:00:00 2001 From: Michael de Jong Date: Mon, 6 May 2013 01:29:33 +0200 Subject: [PATCH 05/12] Fixed a typo the UnitTest class of OperationThrottle. --- .../src/main/java/rx/operators/OperationThrottle.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/rxjava-core/src/main/java/rx/operators/OperationThrottle.java b/rxjava-core/src/main/java/rx/operators/OperationThrottle.java index 282347a38e..ebef4e7286 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationThrottle.java +++ b/rxjava-core/src/main/java/rx/operators/OperationThrottle.java @@ -210,11 +210,11 @@ public void before() { public void testThrottlingWithCompleted() { Observable source = Observable.create(new Func1, Subscription>() { @Override - public Subscription call(Observer observser) { - publishNext(observser, 100, "one"); // Should be skipped since "two" will arrive before the timeout expires. - publishNext(observser, 400, "two"); // Should be published since "three" will arrive after the timeout expires. - publishNext(observser, 900, "four"); // Should be skipped since onCompleted will arrive before the timeout expires. - publishCompleted(observser, 1000); // Should be published as soon as the timeout expires. + public Subscription call(Observer observer) { + publishNext(observer, 100, "one"); // Should be skipped since "two" will arrive before the timeout expires. + publishNext(observer, 400, "two"); // Should be published since "three" will arrive after the timeout expires. + publishNext(observer, 900, "four"); // Should be skipped since onCompleted will arrive before the timeout expires. + publishCompleted(observer, 1000); // Should be published as soon as the timeout expires. return Subscriptions.empty(); } From 4c0c4dbc54aa40619066a19c83cb21257fa3e07b Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Mon, 9 Sep 2013 23:51:43 -0700 Subject: [PATCH 06/12] Operator: throttleWithTimeout MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Another take on `throttle` … I believe this matches Rx.Net behavior. This will wait until timeout value has passed without any further values before emitting the received value. --- rxjava-core/src/main/java/rx/Observable.java | 31 +++ .../java/rx/concurrency/TestScheduler.java | 27 +- .../OperationThrottleWithTimeout.java | 250 ++++++++++++++++++ .../java/rx/ThrottleWithTimeoutTests.java | 46 ++++ 4 files changed, 349 insertions(+), 5 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationThrottleWithTimeout.java create mode 100644 rxjava-core/src/test/java/rx/ThrottleWithTimeoutTests.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 2ac0f42754..7ae3326bf8 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -64,6 +64,7 @@ import rx.operators.OperationTakeLast; import rx.operators.OperationTakeUntil; import rx.operators.OperationTakeWhile; +import rx.operators.OperationThrottleWithTimeout; import rx.operators.OperationTimestamp; import rx.operators.OperationToObservableFuture; import rx.operators.OperationToObservableIterable; @@ -1809,6 +1810,36 @@ public static Observable interval(long interval, TimeUnit unit, Scheduler return create(OperationInterval.interval(interval, unit, scheduler)); } + /** + * Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired. + * + * @param timeout + * The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped. + * + * @param unit + * The {@link TimeUnit} for the timeout. + * + * @return An {@link Observable} which filters out values which are too quickly followed up with newer values. + */ + public Observable throttleWithTimeout(long timeout, TimeUnit unit) { + return create(OperationThrottleWithTimeout.throttleWithTimeout(this, timeout, unit)); + } + + /** + * Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired. + * + * @param timeout + * The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped. + * @param unit + * The {@link TimeUnit} for the timeout. + * @param scheduler + * The {@link Scheduler} to use when timing incoming values. + * @return An {@link Observable} which filters out values which are too quickly followed up with newer values. + */ + public Observable throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) { + return create(OperationThrottleWithTimeout.throttleWithTimeout(this, timeout, unit, scheduler)); + } + /** * Wraps each item emitted by a source Observable in a {@link Timestamped} object. *

diff --git a/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java b/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java index 7afab7ec42..04b8c1a2c5 100644 --- a/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java @@ -19,20 +19,22 @@ import java.util.PriorityQueue; import java.util.Queue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import rx.Scheduler; import rx.Subscription; -import rx.subscriptions.Subscriptions; import rx.util.functions.Func2; public class TestScheduler extends Scheduler { private final Queue> queue = new PriorityQueue>(11, new CompareActionsByTime()); private static class TimedAction { + private final long time; private final Func2 action; private final T state; private final TestScheduler scheduler; + private final AtomicBoolean isCancelled = new AtomicBoolean(false); private TimedAction(TestScheduler scheduler, long time, Func2 action, T state) { this.time = time; @@ -41,6 +43,10 @@ private TimedAction(TestScheduler scheduler, long time, Func2) current.action).call(current.scheduler, current.state); + + // Only execute if the TimedAction has not yet been cancelled + if (!current.isCancelled.get()) { + // because the queue can have wildcards we have to ignore the type T for the state + ((Func2) current.action).call(current.scheduler, current.state); + } } time = targetTimeInNanos; } @@ -97,7 +107,14 @@ public Subscription schedule(T state, Func2 Subscription schedule(T state, Func2 action, long delayTime, TimeUnit unit) { - queue.add(new TimedAction(this, time + unit.toNanos(delayTime), action, state)); - return Subscriptions.empty(); + final TimedAction timedAction = new TimedAction(this, time + unit.toNanos(delayTime), action, state); + queue.add(timedAction); + + return new Subscription() { + @Override + public void unsubscribe() { + timedAction.cancel(); + } + }; } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationThrottleWithTimeout.java b/rxjava-core/src/main/java/rx/operators/OperationThrottleWithTimeout.java new file mode 100644 index 0000000000..a6c77c2084 --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationThrottleWithTimeout.java @@ -0,0 +1,250 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.concurrency.Schedulers; +import rx.concurrency.TestScheduler; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Func1; + +/** + * This operation is used to filter out bursts of events. This is done by ignoring the events from an observable which are too + * quickly followed up with other values. Values which are not followed up by other values within the specified timeout are published + * as soon as the timeout expires. + */ +public final class OperationThrottleWithTimeout { + + /** + * This operation filters out events which are published too quickly in succession. This is done by dropping events which are + * followed up by other events before a specified timer has expired. If the timer expires and no follow up event was published (yet) + * the last received event is published. + * + * @param items + * The {@link Observable} which is publishing events. + * @param timeout + * How long each event has to be the 'last event' before it gets published. + * @param unit + * The unit of time for the specified timeout. + * @return A {@link Func1} which performs the throttle operation. + */ + public static OnSubscribeFunc throttleWithTimeout(Observable items, long timeout, TimeUnit unit) { + return throttleWithTimeout(items, timeout, unit, Schedulers.threadPoolForComputation()); + } + + /** + * This operation filters out events which are published too quickly in succession. This is done by dropping events which are + * followed up by other events before a specified timer has expired. If the timer expires and no follow up event was published (yet) + * the last received event is published. + * + * @param items + * The {@link Observable} which is publishing events. + * @param timeout + * How long each event has to be the 'last event' before it gets published. + * @param unit + * The unit of time for the specified timeout. + * @param scheduler + * The {@link Scheduler} to use internally to manage the timers which handle timeout for each event. + * @return A {@link Func1} which performs the throttle operation. + */ + public static OnSubscribeFunc throttleWithTimeout(final Observable items, final long timeout, final TimeUnit unit, final Scheduler scheduler) { + return new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer observer) { + return new Throttle(items, timeout, unit, scheduler).onSubscribe(observer); + } + }; + } + + private static class Throttle implements OnSubscribeFunc { + + private final Observable items; + private final long timeout; + private final TimeUnit unit; + private final Scheduler scheduler; + + public Throttle(Observable items, long timeout, TimeUnit unit, Scheduler scheduler) { + this.items = items; + this.timeout = timeout; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public Subscription onSubscribe(Observer observer) { + return items.subscribe(new ThrottledObserver(observer, timeout, unit, scheduler)); + } + } + + private static class ThrottledObserver implements Observer { + + private final Observer observer; + private final long timeout; + private final TimeUnit unit; + private final Scheduler scheduler; + + private final AtomicReference lastScheduledNotification = new AtomicReference(); + + public ThrottledObserver(Observer observer, long timeout, TimeUnit unit, Scheduler scheduler) { + this.observer = observer; + this.timeout = timeout; + this.unit = unit; + this.scheduler = scheduler; + } + + @Override + public void onCompleted() { + observer.onCompleted(); + } + + @Override + public void onError(Throwable e) { + lastScheduledNotification.get().unsubscribe(); + observer.onError(e); + } + + @Override + public void onNext(final T v) { + Subscription previousSubscription = lastScheduledNotification.getAndSet(scheduler.schedule(new Action0() { + + @Override + public void call() { + observer.onNext(v); + } + + }, timeout, unit)); + // cancel previous if not already executed + if (previousSubscription != null) { + previousSubscription.unsubscribe(); + } + } + } + + public static class UnitTest { + + private TestScheduler scheduler; + private Observer observer; + + @Before + @SuppressWarnings("unchecked") + public void before() { + scheduler = new TestScheduler(); + observer = mock(Observer.class); + } + + @Test + public void testThrottlingWithCompleted() { + Observable source = Observable.create(new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer observer) { + publishNext(observer, 100, "one"); // Should be skipped since "two" will arrive before the timeout expires. + publishNext(observer, 400, "two"); // Should be published since "three" will arrive after the timeout expires. + publishNext(observer, 900, "three"); // Should be skipped since onCompleted will arrive before the timeout expires. + publishCompleted(observer, 1000); // Should be published as soon as the timeout expires. + + return Subscriptions.empty(); + } + }); + + Observable sampled = Observable.create(OperationThrottleWithTimeout.throttleWithTimeout(source, 400, TimeUnit.MILLISECONDS, scheduler)); + sampled.subscribe(observer); + + scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS); + InOrder inOrder = inOrder(observer); + // must go to 800 since it must be 400 after when two is sent, which is at 400 + scheduler.advanceTimeTo(800, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext("two"); + scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testThrottlingWithError() { + Observable source = Observable.create(new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer observer) { + Exception error = new TestException(); + publishNext(observer, 100, "one"); // Should be published since "two" will arrive after the timeout expires. + publishNext(observer, 600, "two"); // Should be skipped since onError will arrive before the timeout expires. + publishError(observer, 700, error); // Should be published as soon as the timeout expires. + + return Subscriptions.empty(); + } + }); + + Observable sampled = Observable.create(OperationThrottleWithTimeout.throttleWithTimeout(source, 400, TimeUnit.MILLISECONDS, scheduler)); + sampled.subscribe(observer); + + scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS); + InOrder inOrder = inOrder(observer); + // 100 + 400 means it triggers at 500 + scheduler.advanceTimeTo(500, TimeUnit.MILLISECONDS); + inOrder.verify(observer).onNext("one"); + scheduler.advanceTimeTo(701, TimeUnit.MILLISECONDS); + inOrder.verify(observer).onError(any(TestException.class)); + inOrder.verifyNoMoreInteractions(); + } + + private void publishCompleted(final Observer observer, long delay) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onCompleted(); + } + }, delay, TimeUnit.MILLISECONDS); + } + + private void publishError(final Observer observer, long delay, final Exception error) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onError(error); + } + }, delay, TimeUnit.MILLISECONDS); + } + + private void publishNext(final Observer observer, final long delay, final T value) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onNext(value); + } + }, delay, TimeUnit.MILLISECONDS); + } + + @SuppressWarnings("serial") + private class TestException extends Exception { + } + + } + +} diff --git a/rxjava-core/src/test/java/rx/ThrottleWithTimeoutTests.java b/rxjava-core/src/test/java/rx/ThrottleWithTimeoutTests.java new file mode 100644 index 0000000000..3503fe7adb --- /dev/null +++ b/rxjava-core/src/test/java/rx/ThrottleWithTimeoutTests.java @@ -0,0 +1,46 @@ +package rx; + +import static org.mockito.Mockito.*; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.mockito.InOrder; + +import rx.concurrency.TestScheduler; +import rx.subjects.PublishSubject; + +public class ThrottleWithTimeoutTests { + + @Test + public void testThrottle() { + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + TestScheduler s = new TestScheduler(); + PublishSubject o = PublishSubject.create(); + o.throttleWithTimeout(500, TimeUnit.MILLISECONDS, s).subscribe(observer); + + // send events with simulated time increments + s.advanceTimeTo(0, TimeUnit.MILLISECONDS); + o.onNext(1); // skip + o.onNext(2); // deliver + s.advanceTimeTo(501, TimeUnit.MILLISECONDS); + o.onNext(3); // skip + s.advanceTimeTo(600, TimeUnit.MILLISECONDS); + o.onNext(4); // skip + s.advanceTimeTo(700, TimeUnit.MILLISECONDS); + o.onNext(5); // skip + o.onNext(6); // deliver at 1300 after 500ms has passed since onNext(5) + s.advanceTimeTo(1300, TimeUnit.MILLISECONDS); + o.onNext(7); // deliver + s.advanceTimeTo(1800, TimeUnit.MILLISECONDS); + o.onCompleted(); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer).onNext(2); + inOrder.verify(observer).onNext(6); + inOrder.verify(observer).onNext(7); + inOrder.verify(observer).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } +} From 2e625a6e7644828bc8a0d8d097d5c5e4cb9c2e0d Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 10 Sep 2013 00:05:40 -0700 Subject: [PATCH 07/12] Operator: throttleFirst MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Another take on `throttle` … this delivers the first value in each window. --- rxjava-core/src/main/java/rx/Observable.java | 29 +++ .../java/rx/concurrency/TestScheduler.java | 27 ++- .../rx/operators/OperationThrottleFirst.java | 185 ++++++++++++++++++ .../src/test/java/rx/ThrottleFirstTests.java | 46 +++++ 4 files changed, 282 insertions(+), 5 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/operators/OperationThrottleFirst.java create mode 100644 rxjava-core/src/test/java/rx/ThrottleFirstTests.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 2ac0f42754..b1a1027ca7 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -64,6 +64,7 @@ import rx.operators.OperationTakeLast; import rx.operators.OperationTakeUntil; import rx.operators.OperationTakeWhile; +import rx.operators.OperationThrottleFirst; import rx.operators.OperationTimestamp; import rx.operators.OperationToObservableFuture; import rx.operators.OperationToObservableIterable; @@ -1809,6 +1810,34 @@ public static Observable interval(long interval, TimeUnit unit, Scheduler return create(OperationInterval.interval(interval, unit, scheduler)); } + /** + * Throttles to first value in each window. + * + * @param windowDuration + * Duration of windows within with the first value will be chosen. + * @param unit + * The unit of time for the specified timeout. + * @return Observable which performs the throttle operation. + */ + public Observable throttleFirst(long windowDuration, TimeUnit unit) { + return create(OperationThrottleFirst.throttleFirst(this, windowDuration, unit)); + } + + /** + * Throttles to first value in each window. + * + * @param windowDuration + * Duration of windows within with the first value will be chosen. + * @param unit + * The unit of time for the specified timeout. + * @param scheduler + * The {@link Scheduler} to use internally to manage the timers which handle timeout for each event. + * @return Observable which performs the throttle operation. + */ + public Observable throttleFirst(long windowDuration, TimeUnit unit, Scheduler scheduler) { + return create(OperationThrottleFirst.throttleFirst(this, windowDuration, unit, scheduler)); + } + /** * Wraps each item emitted by a source Observable in a {@link Timestamped} object. *

diff --git a/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java b/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java index 7afab7ec42..04b8c1a2c5 100644 --- a/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java +++ b/rxjava-core/src/main/java/rx/concurrency/TestScheduler.java @@ -19,20 +19,22 @@ import java.util.PriorityQueue; import java.util.Queue; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import rx.Scheduler; import rx.Subscription; -import rx.subscriptions.Subscriptions; import rx.util.functions.Func2; public class TestScheduler extends Scheduler { private final Queue> queue = new PriorityQueue>(11, new CompareActionsByTime()); private static class TimedAction { + private final long time; private final Func2 action; private final T state; private final TestScheduler scheduler; + private final AtomicBoolean isCancelled = new AtomicBoolean(false); private TimedAction(TestScheduler scheduler, long time, Func2 action, T state) { this.time = time; @@ -41,6 +43,10 @@ private TimedAction(TestScheduler scheduler, long time, Func2) current.action).call(current.scheduler, current.state); + + // Only execute if the TimedAction has not yet been cancelled + if (!current.isCancelled.get()) { + // because the queue can have wildcards we have to ignore the type T for the state + ((Func2) current.action).call(current.scheduler, current.state); + } } time = targetTimeInNanos; } @@ -97,7 +107,14 @@ public Subscription schedule(T state, Func2 Subscription schedule(T state, Func2 action, long delayTime, TimeUnit unit) { - queue.add(new TimedAction(this, time + unit.toNanos(delayTime), action, state)); - return Subscriptions.empty(); + final TimedAction timedAction = new TimedAction(this, time + unit.toNanos(delayTime), action, state); + queue.add(timedAction); + + return new Subscription() { + @Override + public void unsubscribe() { + timedAction.cancel(); + } + }; } } diff --git a/rxjava-core/src/main/java/rx/operators/OperationThrottleFirst.java b/rxjava-core/src/main/java/rx/operators/OperationThrottleFirst.java new file mode 100644 index 0000000000..c87339502e --- /dev/null +++ b/rxjava-core/src/main/java/rx/operators/OperationThrottleFirst.java @@ -0,0 +1,185 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.operators; + +import static org.mockito.Matchers.*; +import static org.mockito.Mockito.*; + +import java.util.concurrent.TimeUnit; + +import org.junit.Before; +import org.junit.Test; +import org.mockito.InOrder; + +import rx.Observable; +import rx.Observable.OnSubscribeFunc; +import rx.Observer; +import rx.Scheduler; +import rx.Subscription; +import rx.concurrency.Schedulers; +import rx.concurrency.TestScheduler; +import rx.subscriptions.Subscriptions; +import rx.util.functions.Action0; +import rx.util.functions.Func1; + +/** + * Throttle by windowing a stream and returning the first value in each window. + */ +public final class OperationThrottleFirst { + + /** + * Throttles to first value in each window. + * + * @param items + * The {@link Observable} which is publishing events. + * @param windowDuration + * Duration of windows within with the first value will be chosen. + * @param unit + * The unit of time for the specified timeout. + * @return A {@link Func1} which performs the throttle operation. + */ + public static OnSubscribeFunc throttleFirst(Observable items, long windowDuration, TimeUnit unit) { + return throttleFirst(items, windowDuration, unit, Schedulers.threadPoolForComputation()); + } + + /** + * Throttles to first value in each window. + * + * @param items + * The {@link Observable} which is publishing events. + * @param windowDuration + * Duration of windows within with the first value will be chosen. + * @param unit + * The unit of time for the specified timeout. + * @param scheduler + * The {@link Scheduler} to use internally to manage the timers which handle timeout for each event. + * @return A {@link Func1} which performs the throttle operation. + */ + public static OnSubscribeFunc throttleFirst(final Observable items, final long windowDuration, final TimeUnit unit, final Scheduler scheduler) { + return new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer observer) { + return items.window(windowDuration, unit, scheduler).flatMap(new Func1, Observable>() { + + @Override + public Observable call(Observable o) { + return o.takeFirst(); + } + }).subscribe(observer); + } + }; + } + + public static class UnitTest { + + private TestScheduler scheduler; + private Observer observer; + + @Before + @SuppressWarnings("unchecked") + public void before() { + scheduler = new TestScheduler(); + observer = mock(Observer.class); + } + + @Test + public void testThrottlingWithCompleted() { + Observable source = Observable.create(new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer observer) { + publishNext(observer, 100, "one"); // publish as it's first + publishNext(observer, 300, "two"); // skip as it's last within the first 400 + publishNext(observer, 900, "three"); // publish + publishNext(observer, 905, "four"); // skip + publishCompleted(observer, 1000); // Should be published as soon as the timeout expires. + + return Subscriptions.empty(); + } + }); + + Observable sampled = Observable.create(OperationThrottleFirst.throttleFirst(source, 400, TimeUnit.MILLISECONDS, scheduler)); + sampled.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onNext("one"); + inOrder.verify(observer, times(0)).onNext("two"); + inOrder.verify(observer, times(1)).onNext("three"); + inOrder.verify(observer, times(0)).onNext("four"); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testThrottlingWithError() { + Observable source = Observable.create(new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer observer) { + Exception error = new TestException(); + publishNext(observer, 100, "one"); // Should be published since it is first + publishNext(observer, 200, "two"); // Should be skipped since onError will arrive before the timeout expires + publishError(observer, 300, error); // Should be published as soon as the timeout expires. + + return Subscriptions.empty(); + } + }); + + Observable sampled = Observable.create(OperationThrottleFirst.throttleFirst(source, 400, TimeUnit.MILLISECONDS, scheduler)); + sampled.subscribe(observer); + + InOrder inOrder = inOrder(observer); + + scheduler.advanceTimeTo(400, TimeUnit.MILLISECONDS); + inOrder.verify(observer).onNext("one"); + inOrder.verify(observer).onError(any(TestException.class)); + inOrder.verifyNoMoreInteractions(); + } + + private void publishCompleted(final Observer observer, long delay) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onCompleted(); + } + }, delay, TimeUnit.MILLISECONDS); + } + + private void publishError(final Observer observer, long delay, final Exception error) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onError(error); + } + }, delay, TimeUnit.MILLISECONDS); + } + + private void publishNext(final Observer observer, long delay, final T value) { + scheduler.schedule(new Action0() { + @Override + public void call() { + observer.onNext(value); + } + }, delay, TimeUnit.MILLISECONDS); + } + + @SuppressWarnings("serial") + private class TestException extends Exception { + } + + } + +} diff --git a/rxjava-core/src/test/java/rx/ThrottleFirstTests.java b/rxjava-core/src/test/java/rx/ThrottleFirstTests.java new file mode 100644 index 0000000000..5e8f3ef1cb --- /dev/null +++ b/rxjava-core/src/test/java/rx/ThrottleFirstTests.java @@ -0,0 +1,46 @@ +package rx; + +import static org.mockito.Mockito.*; + +import java.util.concurrent.TimeUnit; + +import org.junit.Test; +import org.mockito.InOrder; + +import rx.concurrency.TestScheduler; +import rx.subjects.PublishSubject; + +public class ThrottleFirstTests { + + @Test + public void testThrottle() { + @SuppressWarnings("unchecked") + Observer observer = mock(Observer.class); + TestScheduler s = new TestScheduler(); + PublishSubject o = PublishSubject.create(); + o.throttleFirst(500, TimeUnit.MILLISECONDS, s).subscribe(observer); + + // send events with simulated time increments + s.advanceTimeTo(0, TimeUnit.MILLISECONDS); + o.onNext(1); // deliver + o.onNext(2); // skip + s.advanceTimeTo(501, TimeUnit.MILLISECONDS); + o.onNext(3); // deliver + s.advanceTimeTo(600, TimeUnit.MILLISECONDS); + o.onNext(4); // skip + s.advanceTimeTo(700, TimeUnit.MILLISECONDS); + o.onNext(5); // skip + o.onNext(6); // skip + s.advanceTimeTo(1001, TimeUnit.MILLISECONDS); + o.onNext(7); // deliver + s.advanceTimeTo(1501, TimeUnit.MILLISECONDS); + o.onCompleted(); + + InOrder inOrder = inOrder(observer); + inOrder.verify(observer).onNext(1); + inOrder.verify(observer).onNext(3); + inOrder.verify(observer).onNext(7); + inOrder.verify(observer).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } +} From b75b37c513b537025c939ced498d747c233bea38 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 10 Sep 2013 00:10:54 -0700 Subject: [PATCH 08/12] Update javadoc for throttleLast --- rxjava-core/src/main/java/rx/Observable.java | 24 +++++++++----------- 1 file changed, 11 insertions(+), 13 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index b10db8dfcf..f835a4da6f 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -1811,30 +1811,28 @@ public static Observable interval(long interval, TimeUnit unit, Scheduler } /** - * Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired. - * - * @param timeout - * The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped. + * Throttles to last value in each window. * + * @param windowDuration + * Duration of windows within with the first value will be chosen. * @param unit - * The {@link TimeUnit} for the timeout. - * - * @return An {@link Observable} which filters out values which are too quickly followed up with newer values. + * The unit of time for the specified timeout. + * @return Observable which performs the throttle operation. */ public Observable throttleLast(long timeout, TimeUnit unit) { return create(OperationThrottleLast.throttleLast(this, timeout, unit)); } /** - * Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired. + * Throttles to last value in each window. * - * @param timeout - * The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped. + * @param windowDuration + * Duration of windows within with the first value will be chosen. * @param unit - * The {@link TimeUnit} for the timeout. + * The unit of time for the specified timeout. * @param scheduler - * The {@link Scheduler} to use when timing incoming values. - * @return An {@link Observable} which filters out values which are too quickly followed up with newer values. + * The {@link Scheduler} to use internally to manage the timers which handle timeout for each event. + * @return Observable which performs the throttle operation. */ public Observable throttleLast(long timeout, TimeUnit unit, Scheduler scheduler) { return create(OperationThrottleLast.throttleLast(this, timeout, unit, scheduler)); From 2a3ade2d6162dc328a340ce247ccdf569284ffa8 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 10 Sep 2013 00:13:51 -0700 Subject: [PATCH 09/12] Update javadoc for throttleWithTimeout --- rxjava-core/src/main/java/rx/Observable.java | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 7ae3326bf8..93b1f3c5ac 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -1811,11 +1811,12 @@ public static Observable interval(long interval, TimeUnit unit, Scheduler } /** - * Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired. + * Throttles by dropping all values that are followed by newer values before the timeout value expires. The timer reset on each `onNext` call. + *

+ * NOTE: If the timeout is set higher than the rate of traffic then this will drop all data. * * @param timeout * The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped. - * * @param unit * The {@link TimeUnit} for the timeout. * @@ -1826,15 +1827,17 @@ public Observable throttleWithTimeout(long timeout, TimeUnit unit) { } /** - * Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired. + * Throttles by dropping all values that are followed by newer values before the timeout value expires. The timer reset on each `onNext` call. + *

+ * NOTE: If the timeout is set higher than the rate of traffic then this will drop all data. * * @param timeout * The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped. * @param unit - * The {@link TimeUnit} for the timeout. + * The unit of time for the specified timeout. * @param scheduler - * The {@link Scheduler} to use when timing incoming values. - * @return An {@link Observable} which filters out values which are too quickly followed up with newer values. + * The {@link Scheduler} to use internally to manage the timers which handle timeout for each event. + * @return Observable which performs the throttle operation. */ public Observable throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) { return create(OperationThrottleWithTimeout.throttleWithTimeout(this, timeout, unit, scheduler)); From 78cecddcb805584598e4156bab10ae282a4dea71 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 10 Sep 2013 12:39:01 -0700 Subject: [PATCH 10/12] Operators: throttleWithTimeout, throttleFirst, throttleLast - javadocs explaining differences - link between throttleLast and sample (aliase) - refactored throttleFirst to be a more efficient implementations - concurrency changes to throttleWithTimeout --- rxjava-core/src/main/java/rx/Observable.java | 77 ++--- .../java/rx/operators/OperationThrottle.java | 312 ------------------ .../rx/operators/OperationThrottleFirst.java | 18 +- .../rx/operators/OperationThrottleLast.java | 188 ----------- .../OperationThrottleWithTimeout.java | 15 +- .../rx/operators/SynchronizedObserver.java | 10 + 6 files changed, 79 insertions(+), 541 deletions(-) delete mode 100644 rxjava-core/src/main/java/rx/operators/OperationThrottle.java delete mode 100644 rxjava-core/src/main/java/rx/operators/OperationThrottleLast.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 5a28efcd47..c90deb021f 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -15,12 +15,11 @@ */ package rx; -import static rx.util.functions.Functions.not; +import static rx.util.functions.Functions.*; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.List; import java.util.concurrent.Future; import java.util.concurrent.TimeUnit; @@ -64,10 +63,8 @@ import rx.operators.OperationTakeLast; import rx.operators.OperationTakeUntil; import rx.operators.OperationTakeWhile; -import rx.operators.OperationThrottle; import rx.operators.OperationThrottleFirst; import rx.operators.OperationThrottleWithTimeout; -import rx.operators.OperationThrottleLast; import rx.operators.OperationTimestamp; import rx.operators.OperationToObservableFuture; import rx.operators.OperationToObservableIterable; @@ -1814,9 +1811,9 @@ public static Observable interval(long interval, TimeUnit unit, Scheduler } /** - * Throttles by dropping all values that are followed by newer values before the timeout value expires. The timer reset on each `onNext` call. + * Throttles by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call. *

- * NOTE: If the timeout is set higher than the rate of traffic then this will drop all data. + * NOTE: If events keep firing faster than the timeout then no data will be emitted. * * @param timeout * The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped. @@ -1830,9 +1827,9 @@ public Observable throttleWithTimeout(long timeout, TimeUnit unit) { } /** - * Throttles by dropping all values that are followed by newer values before the timeout value expires. The timer reset on each `onNext` call. + * Throttles by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call. *

- * NOTE: If the timeout is set higher than the rate of traffic then this will drop all data. + * NOTE: If events keep firing faster than the timeout then no data will be emitted. * * @param timeout * The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped. @@ -1847,12 +1844,16 @@ public Observable throttleWithTimeout(long timeout, TimeUnit unit, Scheduler } /** - * Throttles to first value in each window. + * Throttles by skipping value until `skipDuration` passes and then emits the next received value. + *

+ * This differs from {@link #throttleLast} in that this only tracks passage of time whereas {@link #throttleLast} ticks at scheduled intervals. * - * @param windowDuration - * Duration of windows within with the first value will be chosen. + * @param skipDuration + * Time to wait before sending another value after emitting last value. * @param unit * The unit of time for the specified timeout. + * @param scheduler + * The {@link Scheduler} to use internally to manage the timers which handle timeout for each event. * @return Observable which performs the throttle operation. */ public Observable throttleFirst(long windowDuration, TimeUnit unit) { @@ -1860,51 +1861,53 @@ public Observable throttleFirst(long windowDuration, TimeUnit unit) { } /** - * Throttles to first value in each window. + * Throttles by skipping value until `skipDuration` passes and then emits the next received value. + *

+ * This differs from {@link #throttleLast} in that this only tracks passage of time whereas {@link #throttleLast} ticks at scheduled intervals. * - * @param windowDuration - * Duration of windows within with the first value will be chosen. + * @param skipDuration + * Time to wait before sending another value after emitting last value. * @param unit * The unit of time for the specified timeout. * @param scheduler * The {@link Scheduler} to use internally to manage the timers which handle timeout for each event. * @return Observable which performs the throttle operation. */ - public Observable throttleFirst(long windowDuration, TimeUnit unit, Scheduler scheduler) { - return create(OperationThrottleFirst.throttleFirst(this, windowDuration, unit, scheduler)); + public Observable throttleFirst(long skipDuration, TimeUnit unit, Scheduler scheduler) { + return create(OperationThrottleFirst.throttleFirst(this, skipDuration, unit, scheduler)); } - - + /** - * Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired. - * - * @param timeout - * The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped. + * Throttles by returning the last value of each interval defined by 'intervalDuration'. + *

+ * This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas {@link #throttleFirst} does not tick, it just tracks passage of time. * + * @param intervalDuration + * Duration of windows within with the last value will be chosen. * @param unit - * The {@link TimeUnit} for the timeout. - * - * @return An {@link Observable} which filters out values which are too quickly followed up with newer values. + * The unit of time for the specified interval. + * @return Observable which performs the throttle operation. + * @see {@link #sample(long, TimeUnit)} */ - public Observable throttleLast(long timeout, TimeUnit unit) { - return create(OperationThrottleLast.throttleLast(this, timeout, unit)); + public Observable throttleLast(long intervalDuration, TimeUnit unit) { + return sample(intervalDuration, unit); } /** - * Throttles the {@link Observable} by dropping values which are followed by newer values before the timer has expired. + * Throttles by returning the last value of each interval defined by 'intervalDuration'. + *

+ * This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas {@link #throttleFirst} does not tick, it just tracks passage of time. * - * @param timeout - * The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped. + * @param intervalDuration + * Duration of windows within with the last value will be chosen. * @param unit - * The {@link TimeUnit} for the timeout. - * @param scheduler - * The {@link Scheduler} to use when timing incoming values. - * @return An {@link Observable} which filters out values which are too quickly followed up with newer values. + * The unit of time for the specified interval. + * @return Observable which performs the throttle operation. + * @see {@link #sample(long, TimeUnit, Scheduler)} */ - public Observable throttleLast(long timeout, TimeUnit unit, Scheduler scheduler) { - return create(OperationThrottleLast.throttleLast(this, timeout, unit, scheduler)); + public Observable throttleLast(long intervalDuration, TimeUnit unit, Scheduler scheduler) { + return sample(intervalDuration, unit, scheduler); } - /** * Wraps each item emitted by a source Observable in a {@link Timestamped} object. diff --git a/rxjava-core/src/main/java/rx/operators/OperationThrottle.java b/rxjava-core/src/main/java/rx/operators/OperationThrottle.java deleted file mode 100644 index 0bee16043d..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationThrottle.java +++ /dev/null @@ -1,312 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; - -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.atomic.AtomicReference; - -import org.junit.Before; -import org.junit.Test; -import org.mockito.InOrder; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Scheduler; -import rx.Subscription; -import rx.concurrency.Schedulers; -import rx.concurrency.TestScheduler; -import rx.subscriptions.Subscriptions; -import rx.util.functions.Action0; -import rx.util.functions.Func1; - - -/** - * This operation is used to filter out bursts of events. This is done by ignoring the events from an observable which are too - * quickly followed up with other values. Values which are not followed up by other values within the specified timeout are published - * as soon as the timeout expires. - */ -public final class OperationThrottle { - - /** - * This operation filters out events which are published too quickly in succession. This is done by dropping events which are - * followed up by other events before a specified timer has expired. If the timer expires and no follow up event was published (yet) - * the last received event is published. - * - * @param items The {@link Observable} which is publishing events. - * @param timeout How long each event has to be the 'last event' before it gets published. - * @param unit The unit of time for the specified timeout. - * @return A {@link Func1} which performs the throttle operation. - */ - public static OnSubscribeFunc throttle(Observable items, long timeout, TimeUnit unit) { - return throttle(items, timeout, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor())); - } - - /** - * This operation filters out events which are published too quickly in succession. This is done by dropping events which are - * followed up by other events before a specified timer has expired. If the timer expires and no follow up event was published (yet) - * the last received event is published. - * - * @param items The {@link Observable} which is publishing events. - * @param timeout How long each event has to be the 'last event' before it gets published. - * @param unit The unit of time for the specified timeout. - * @param scheduler The {@link Scheduler} to use internally to manage the timers which handle timeout for each event. - * @return A {@link Func1} which performs the throttle operation. - */ - public static OnSubscribeFunc throttle(final Observable items, final long timeout, final TimeUnit unit, final Scheduler scheduler) { - return new OnSubscribeFunc() { - @Override - public Subscription onSubscribe(Observer observer) { - return new Throttle(items, timeout, unit, scheduler).onSubscribe(observer); - } - }; - } - - private static class Throttle implements OnSubscribeFunc { - - private final Observable items; - private final long timeout; - private final TimeUnit unit; - private final Scheduler scheduler; - - public Throttle(Observable items, long timeout, TimeUnit unit, Scheduler scheduler) { - this.items = items; - this.timeout = timeout; - this.unit = unit; - this.scheduler = scheduler; - } - - @Override - public Subscription onSubscribe(Observer observer) { - return items.subscribe(new ThrottledObserver(observer, timeout, unit, scheduler)); - } - } - - private static class ThrottledObserver implements Observer { - - private final Observer observer; - private final long timeout; - private final TimeUnit unit; - private final Scheduler scheduler; - - private final AtomicLong waitUntil = new AtomicLong(); - private final AtomicReference subscription = new AtomicReference(Subscriptions.empty()); - - public ThrottledObserver(Observer observer, long timeout, TimeUnit unit, Scheduler scheduler) { - this.observer = observer; - this.timeout = timeout; - this.unit = unit; - this.scheduler = scheduler; - } - - @Override - public void onCompleted() { - throttle(new ThrottledOnComplete(observer)); - } - - @Override - public void onError(Throwable e) { - throttle(new ThrottledOnError(observer, e)); - } - - @Override - public void onNext(T args) { - throttle(new ThrottledOnNext(observer, args)); - } - - private void throttle(Action0 action) { - synchronized (subscription) { - if (!timerHasExpired()) { - subscription.get().unsubscribe(); - } - subscription.set(scheduler.schedule(action, timeout, unit)); - } - } - - private boolean timerHasExpired() { - long now = scheduler.now(); - long nextTimeout = now + unit.toMillis(timeout); - long previousTimeout = waitUntil.getAndSet(nextTimeout); - return previousTimeout <= now; - } - } - - private static final class ThrottledOnNext implements Action0 { - - private final Observer observer; - private final T value; - - public ThrottledOnNext(Observer observer, T value) { - this.observer = observer; - this.value = value; - } - - @Override - public void call() { - observer.onNext(value); - } - } - - private static final class ThrottledOnError implements Action0 { - - private final Observer observer; - private final Throwable error; - - public ThrottledOnError(Observer observer, Throwable error) { - this.observer = observer; - this.error = error; - } - - @Override - public void call() { - observer.onError(error); - } - } - - private static final class ThrottledOnComplete implements Action0 { - - private final Observer observer; - - public ThrottledOnComplete(Observer observer) { - this.observer = observer; - } - - @Override - public void call() { - observer.onCompleted(); - } - } - - public static class UnitTest { - - private TestScheduler scheduler; - private Observer observer; - - @Before - @SuppressWarnings("unchecked") - public void before() { - scheduler = new TestScheduler(); - observer = mock(Observer.class); - } - - @Test - public void testThrottlingWithCompleted() { - Observable source = Observable.create(new OnSubscribeFunc() { - @Override - public Subscription onSubscribe(Observer observer) { - publishNext(observer, 100, "one"); // Should be skipped since "two" will arrive before the timeout expires. - publishNext(observer, 400, "two"); // Should be published since "three" will arrive after the timeout expires. - publishNext(observer, 900, "four"); // Should be skipped since onCompleted will arrive before the timeout expires. - publishCompleted(observer, 1000); // Should be published as soon as the timeout expires. - - return Subscriptions.empty(); - } - }); - - Observable sampled = Observable.create(OperationThrottle.throttle(source, 400, TimeUnit.MILLISECONDS, scheduler)); - sampled.subscribe(observer); - - InOrder inOrder = inOrder(observer); - - scheduler.advanceTimeTo(700, TimeUnit.MILLISECONDS); - inOrder.verify(observer, never()).onNext(anyString()); - verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); - - scheduler.advanceTimeTo(900, TimeUnit.MILLISECONDS); - inOrder.verify(observer, times(1)).onNext("two"); - verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); - - scheduler.advanceTimeTo(1600, TimeUnit.MILLISECONDS); - inOrder.verify(observer, never()).onNext(anyString()); - verify(observer, times(1)).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); - } - - @Test - public void testThrottlingWithError() { - Observable source = Observable.create(new OnSubscribeFunc() { - @Override - public Subscription onSubscribe(Observer observer) { - Exception error = new TestException(); - publishNext(observer, 100, "one"); // Should be published since "two" will arrive after the timeout expires. - publishNext(observer, 600, "two"); // Should be skipped since onError will arrive before the timeout expires. - publishError(observer, 700, error); // Should be published as soon as the timeout expires. - - return Subscriptions.empty(); - } - }); - - Observable sampled = Observable.create(OperationThrottle.throttle(source, 400, TimeUnit.MILLISECONDS, scheduler)); - sampled.subscribe(observer); - - InOrder inOrder = inOrder(observer); - - scheduler.advanceTimeTo(300, TimeUnit.MILLISECONDS); - inOrder.verify(observer, never()).onNext(anyString()); - verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); - - scheduler.advanceTimeTo(600, TimeUnit.MILLISECONDS); - inOrder.verify(observer, times(1)).onNext("one"); - verify(observer, never()).onCompleted(); - verify(observer, never()).onError(any(Exception.class)); - - scheduler.advanceTimeTo(1200, TimeUnit.MILLISECONDS); - inOrder.verify(observer, never()).onNext(anyString()); - verify(observer, never()).onCompleted(); - verify(observer, times(1)).onError(any(TestException.class)); - } - - private void publishCompleted(final Observer observer, long delay) { - scheduler.schedule(new Action0() { - @Override - public void call() { - observer.onCompleted(); - } - }, delay, TimeUnit.MILLISECONDS); - } - - private void publishError(final Observer observer, long delay, final Exception error) { - scheduler.schedule(new Action0() { - @Override - public void call() { - observer.onError(error); - } - }, delay, TimeUnit.MILLISECONDS); - } - - private void publishNext(final Observer observer, long delay, final T value) { - scheduler.schedule(new Action0() { - @Override - public void call() { - observer.onNext(value); - } - }, delay, TimeUnit.MILLISECONDS); - } - - @SuppressWarnings("serial") - private class TestException extends Exception { } - - } - -} diff --git a/rxjava-core/src/main/java/rx/operators/OperationThrottleFirst.java b/rxjava-core/src/main/java/rx/operators/OperationThrottleFirst.java index c87339502e..4c3a1ea8d4 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationThrottleFirst.java +++ b/rxjava-core/src/main/java/rx/operators/OperationThrottleFirst.java @@ -19,6 +19,7 @@ import static org.mockito.Mockito.*; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import org.junit.Before; import org.junit.Test; @@ -72,12 +73,23 @@ public static OnSubscribeFunc throttleFirst(final Observable items, fi return new OnSubscribeFunc() { @Override public Subscription onSubscribe(Observer observer) { - return items.window(windowDuration, unit, scheduler).flatMap(new Func1, Observable>() { + + final AtomicLong lastOnNext = new AtomicLong(0); + final long timeInMilliseconds = unit.toMillis(windowDuration); + + return items.filter(new Func1() { @Override - public Observable call(Observable o) { - return o.takeFirst(); + public Boolean call(T value) { + long now = scheduler.now(); + if (lastOnNext.get() == 0 || now - lastOnNext.get() >= timeInMilliseconds) { + lastOnNext.set(now); + return Boolean.TRUE; + } else { + return Boolean.FALSE; + } } + }).subscribe(observer); } }; diff --git a/rxjava-core/src/main/java/rx/operators/OperationThrottleLast.java b/rxjava-core/src/main/java/rx/operators/OperationThrottleLast.java deleted file mode 100644 index db296d36ff..0000000000 --- a/rxjava-core/src/main/java/rx/operators/OperationThrottleLast.java +++ /dev/null @@ -1,188 +0,0 @@ -/** - * Copyright 2013 Netflix, Inc. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package rx.operators; - -import static org.mockito.Matchers.*; -import static org.mockito.Mockito.*; - -import java.util.concurrent.TimeUnit; - -import org.junit.Before; -import org.junit.Test; -import org.mockito.InOrder; - -import rx.Observable; -import rx.Observable.OnSubscribeFunc; -import rx.Observer; -import rx.Scheduler; -import rx.Subscription; -import rx.concurrency.Schedulers; -import rx.concurrency.TestScheduler; -import rx.subscriptions.Subscriptions; -import rx.util.functions.Action0; -import rx.util.functions.Func1; - -/** - * This operation is used to filter out bursts of events. This is done by ignoring the events from an observable which are too - * quickly followed up with other values. Values which are not followed up by other values within the specified timeout are published - * as soon as the timeout expires. - */ -public final class OperationThrottleLast { - - /** - * This operation filters out events which are published too quickly in succession. This is done by dropping events which are - * followed up by other events before a specified timer has expired. If the timer expires and no follow up event was published (yet) - * the last received event is published. - * - * @param items - * The {@link Observable} which is publishing events. - * @param timeout - * How long each event has to be the 'last event' before it gets published. - * @param unit - * The unit of time for the specified timeout. - * @return A {@link Func1} which performs the throttle operation. - */ - public static OnSubscribeFunc throttleLast(Observable items, long timeout, TimeUnit unit) { - return throttleLast(items, timeout, unit, Schedulers.threadPoolForComputation()); - } - - /** - * This operation filters out events which are published too quickly in succession. This is done by dropping events which are - * followed up by other events before a specified timer has expired. If the timer expires and no follow up event was published (yet) - * the last received event is published. - * - * @param items - * The {@link Observable} which is publishing events. - * @param timeout - * How long each event has to be the 'last event' before it gets published. - * @param unit - * The unit of time for the specified timeout. - * @param scheduler - * The {@link Scheduler} to use internally to manage the timers which handle timeout for each event. - * @return A {@link Func1} which performs the throttle operation. - */ - public static OnSubscribeFunc throttleLast(final Observable items, final long timeout, final TimeUnit unit, final Scheduler scheduler) { - return new OnSubscribeFunc() { - @Override - public Subscription onSubscribe(Observer observer) { - return items.window(timeout, unit, scheduler).flatMap(new Func1, Observable>() { - - @Override - public Observable call(Observable o) { - return o.takeLast(1); - } - }).subscribe(observer); - } - }; - } - - public static class UnitTest { - - private TestScheduler scheduler; - private Observer observer; - - @Before - @SuppressWarnings("unchecked") - public void before() { - scheduler = new TestScheduler(); - observer = mock(Observer.class); - } - - @Test - public void testThrottlingWithCompleted() { - Observable source = Observable.create(new OnSubscribeFunc() { - @Override - public Subscription onSubscribe(Observer observer) { - publishNext(observer, 100, "one"); // Should be skipped since "two" will arrive before the timeout expires. - publishNext(observer, 400, "two"); // Should be published since "three" will arrive after the timeout expires. - publishNext(observer, 900, "three"); // Should be skipped since onCompleted will arrive before the timeout expires. - publishCompleted(observer, 1000); // Should be published as soon as the timeout expires. - - return Subscriptions.empty(); - } - }); - - Observable sampled = Observable.create(OperationThrottleLast.throttleLast(source, 400, TimeUnit.MILLISECONDS, scheduler)); - sampled.subscribe(observer); - - InOrder inOrder = inOrder(observer); - - scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS); - inOrder.verify(observer, times(1)).onNext("two"); - inOrder.verify(observer, times(1)).onCompleted(); - inOrder.verifyNoMoreInteractions(); - } - - @Test - public void testThrottlingWithError() { - Observable source = Observable.create(new OnSubscribeFunc() { - @Override - public Subscription onSubscribe(Observer observer) { - Exception error = new TestException(); - publishNext(observer, 100, "one"); // Should be published since "two" will arrive after the timeout expires. - publishNext(observer, 600, "two"); // Should be skipped since onError will arrive before the timeout expires. - publishError(observer, 700, error); // Should be published as soon as the timeout expires. - - return Subscriptions.empty(); - } - }); - - Observable sampled = Observable.create(OperationThrottleLast.throttleLast(source, 400, TimeUnit.MILLISECONDS, scheduler)); - sampled.subscribe(observer); - - InOrder inOrder = inOrder(observer); - - scheduler.advanceTimeTo(400, TimeUnit.MILLISECONDS); - inOrder.verify(observer).onNext("one"); - scheduler.advanceTimeTo(701, TimeUnit.MILLISECONDS); - inOrder.verify(observer).onError(any(TestException.class)); - inOrder.verifyNoMoreInteractions(); - } - - private void publishCompleted(final Observer observer, long delay) { - scheduler.schedule(new Action0() { - @Override - public void call() { - observer.onCompleted(); - } - }, delay, TimeUnit.MILLISECONDS); - } - - private void publishError(final Observer observer, long delay, final Exception error) { - scheduler.schedule(new Action0() { - @Override - public void call() { - observer.onError(error); - } - }, delay, TimeUnit.MILLISECONDS); - } - - private void publishNext(final Observer observer, long delay, final T value) { - scheduler.schedule(new Action0() { - @Override - public void call() { - observer.onNext(value); - } - }, delay, TimeUnit.MILLISECONDS); - } - - @SuppressWarnings("serial") - private class TestException extends Exception { - } - - } - -} diff --git a/rxjava-core/src/main/java/rx/operators/OperationThrottleWithTimeout.java b/rxjava-core/src/main/java/rx/operators/OperationThrottleWithTimeout.java index a6c77c2084..394bc7499d 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationThrottleWithTimeout.java +++ b/rxjava-core/src/main/java/rx/operators/OperationThrottleWithTimeout.java @@ -114,7 +114,9 @@ private static class ThrottledObserver implements Observer { private final AtomicReference lastScheduledNotification = new AtomicReference(); public ThrottledObserver(Observer observer, long timeout, TimeUnit unit, Scheduler scheduler) { - this.observer = observer; + // we need to synchronize the observer since the on* events can be coming from different + // threads and are thus non-deterministic and could be interleaved + this.observer = new SynchronizedObserver(observer); this.timeout = timeout; this.unit = unit; this.scheduler = scheduler; @@ -122,11 +124,22 @@ public ThrottledObserver(Observer observer, long timeout, TimeUnit un @Override public void onCompleted() { + /* + * Cancel previous subscription if it has not already executed. + * Expected that some race-condition will occur as this is crossing over thread boundaries + * We are using SynchronizedObserver around 'observer' to handle interleaving and out-of-order calls. + */ + lastScheduledNotification.get().unsubscribe(); observer.onCompleted(); } @Override public void onError(Throwable e) { + /* + * Cancel previous subscription if it has not already executed. + * Expected that some race-condition will occur as this is crossing over thread boundaries + * We are using SynchronizedObserver around 'observer' to handle interleaving and out-of-order calls. + */ lastScheduledNotification.get().unsubscribe(); observer.onError(e); } diff --git a/rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java b/rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java index c795bb60cc..fff3fccbf8 100644 --- a/rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java +++ b/rxjava-core/src/main/java/rx/operators/SynchronizedObserver.java @@ -73,6 +73,16 @@ public SynchronizedObserver(Observer Observer, SafeObservableSubscrip this.observer = Observer; this.subscription = subscription; } + + /** + * Used when synchronizing an Observer without access to the subscription. + * + * @param Observer + */ + public SynchronizedObserver(Observer Observer) { + this.observer = Observer; + this.subscription = new SafeObservableSubscription(); + } public void onNext(T arg) { if (finished || finishRequested || subscription.isUnsubscribed()) { From 5fabd5883561ff18b18b0d1dfb7001e2959cb11d Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 10 Sep 2013 12:53:30 -0700 Subject: [PATCH 11/12] Use 'debounce' as proper name for ThrottleWithTimeout which unfortunately is the poorly named Rx Throttle operator. http://drupalmotion.com/article/debounce-and-throttle-visual-explanation Debounce: Think of it as "grouping multiple events in one". Imagine that you go home, enter in the elevator, doors are closing... and suddenly your neighbor appears in the hall and tries to jump on the elevator. Be polite! and open the doors for him: you are debouncing the elevator departure. Consider that the same situation can happen again with a third person, and so on... probably delaying the departure several minutes. Throttle: Think of it as a valve, it regulates the flow of the executions. We can determine the maximum number of times a function can be called in certain time. So in the elevator analogy.. you are polite enough to let people in for 10 secs, but once that delay passes, you must go! http://unscriptable.com/2009/03/20/debouncing-javascript-methods/ http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/ --- rxjava-core/src/main/java/rx/Observable.java | 73 +++++++++++++++++-- ...ithTimeout.java => OperationDebounce.java} | 59 +++++++++++---- 2 files changed, 112 insertions(+), 20 deletions(-) rename rxjava-core/src/main/java/rx/operators/{OperationThrottleWithTimeout.java => OperationDebounce.java} (78%) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index c90deb021f..267660cbe6 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -64,7 +64,7 @@ import rx.operators.OperationTakeUntil; import rx.operators.OperationTakeWhile; import rx.operators.OperationThrottleFirst; -import rx.operators.OperationThrottleWithTimeout; +import rx.operators.OperationDebounce; import rx.operators.OperationTimestamp; import rx.operators.OperationToObservableFuture; import rx.operators.OperationToObservableIterable; @@ -1811,9 +1811,17 @@ public static Observable interval(long interval, TimeUnit unit, Scheduler } /** - * Throttles by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call. + * Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call. *

* NOTE: If events keep firing faster than the timeout then no data will be emitted. + *

+ * Information on debounce vs throttle: + *

+ *

    + *
  • http://drupalmotion.com/article/debounce-and-throttle-visual-explanation
  • + *
  • http://unscriptable.com/2009/03/20/debouncing-javascript-methods/
  • + *
  • http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/
  • + *
* * @param timeout * The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped. @@ -1821,13 +1829,65 @@ public static Observable interval(long interval, TimeUnit unit, Scheduler * The {@link TimeUnit} for the timeout. * * @return An {@link Observable} which filters out values which are too quickly followed up with newer values. + * @see {@link #throttleWithTimeout}; + */ + public Observable debounce(long timeout, TimeUnit unit) { + return create(OperationDebounce.debounce(this, timeout, unit)); + } + + /** + * Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call. + *

+ * NOTE: If events keep firing faster than the timeout then no data will be emitted. + *

+ * Information on debounce vs throttle: + *

+ *

    + *
  • http://drupalmotion.com/article/debounce-and-throttle-visual-explanation
  • + *
  • http://unscriptable.com/2009/03/20/debouncing-javascript-methods/
  • + *
  • http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/
  • + *
+ * + * @param timeout + * The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped. + * @param unit + * The unit of time for the specified timeout. + * @param scheduler + * The {@link Scheduler} to use internally to manage the timers which handle timeout for each event. + * @return Observable which performs the throttle operation. + * @see {@link #throttleWithTimeout}; + */ + public Observable debounce(long timeout, TimeUnit unit, Scheduler scheduler) { + return create(OperationDebounce.debounce(this, timeout, unit)); + } + + /** + * Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call. + *

+ * NOTE: If events keep firing faster than the timeout then no data will be emitted. + *

+ * Information on debounce vs throttle: + *

+ *

    + *
  • http://drupalmotion.com/article/debounce-and-throttle-visual-explanation
  • + *
  • http://unscriptable.com/2009/03/20/debouncing-javascript-methods/
  • + *
  • http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/
  • + *
+ * + * @param timeout + * The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped. + * @param unit + * The {@link TimeUnit} for the timeout. + * + * @return An {@link Observable} which filters out values which are too quickly followed up with newer values. + * @see {@link #debounce} */ public Observable throttleWithTimeout(long timeout, TimeUnit unit) { - return create(OperationThrottleWithTimeout.throttleWithTimeout(this, timeout, unit)); + return create(OperationDebounce.debounce(this, timeout, unit)); } /** - * Throttles by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call. + * Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call. *

* NOTE: If events keep firing faster than the timeout then no data will be emitted. * @@ -1838,11 +1898,12 @@ public Observable throttleWithTimeout(long timeout, TimeUnit unit) { * @param scheduler * The {@link Scheduler} to use internally to manage the timers which handle timeout for each event. * @return Observable which performs the throttle operation. + * @see {@link #debounce} */ public Observable throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) { - return create(OperationThrottleWithTimeout.throttleWithTimeout(this, timeout, unit, scheduler)); + return create(OperationDebounce.debounce(this, timeout, unit, scheduler)); } - + /** * Throttles by skipping value until `skipDuration` passes and then emits the next received value. *

diff --git a/rxjava-core/src/main/java/rx/operators/OperationThrottleWithTimeout.java b/rxjava-core/src/main/java/rx/operators/OperationDebounce.java similarity index 78% rename from rxjava-core/src/main/java/rx/operators/OperationThrottleWithTimeout.java rename to rxjava-core/src/main/java/rx/operators/OperationDebounce.java index 394bc7499d..d225477069 100644 --- a/rxjava-core/src/main/java/rx/operators/OperationThrottleWithTimeout.java +++ b/rxjava-core/src/main/java/rx/operators/OperationDebounce.java @@ -41,7 +41,7 @@ * quickly followed up with other values. Values which are not followed up by other values within the specified timeout are published * as soon as the timeout expires. */ -public final class OperationThrottleWithTimeout { +public final class OperationDebounce { /** * This operation filters out events which are published too quickly in succession. This is done by dropping events which are @@ -56,8 +56,8 @@ public final class OperationThrottleWithTimeout { * The unit of time for the specified timeout. * @return A {@link Func1} which performs the throttle operation. */ - public static OnSubscribeFunc throttleWithTimeout(Observable items, long timeout, TimeUnit unit) { - return throttleWithTimeout(items, timeout, unit, Schedulers.threadPoolForComputation()); + public static OnSubscribeFunc debounce(Observable items, long timeout, TimeUnit unit) { + return debounce(items, timeout, unit, Schedulers.threadPoolForComputation()); } /** @@ -75,23 +75,23 @@ public static OnSubscribeFunc throttleWithTimeout(Observable items, lo * The {@link Scheduler} to use internally to manage the timers which handle timeout for each event. * @return A {@link Func1} which performs the throttle operation. */ - public static OnSubscribeFunc throttleWithTimeout(final Observable items, final long timeout, final TimeUnit unit, final Scheduler scheduler) { + public static OnSubscribeFunc debounce(final Observable items, final long timeout, final TimeUnit unit, final Scheduler scheduler) { return new OnSubscribeFunc() { @Override public Subscription onSubscribe(Observer observer) { - return new Throttle(items, timeout, unit, scheduler).onSubscribe(observer); + return new Debounce(items, timeout, unit, scheduler).onSubscribe(observer); } }; } - private static class Throttle implements OnSubscribeFunc { + private static class Debounce implements OnSubscribeFunc { private final Observable items; private final long timeout; private final TimeUnit unit; private final Scheduler scheduler; - public Throttle(Observable items, long timeout, TimeUnit unit, Scheduler scheduler) { + public Debounce(Observable items, long timeout, TimeUnit unit, Scheduler scheduler) { this.items = items; this.timeout = timeout; this.unit = unit; @@ -100,11 +100,11 @@ public Throttle(Observable items, long timeout, TimeUnit unit, Scheduler sche @Override public Subscription onSubscribe(Observer observer) { - return items.subscribe(new ThrottledObserver(observer, timeout, unit, scheduler)); + return items.subscribe(new DebounceObserver(observer, timeout, unit, scheduler)); } } - private static class ThrottledObserver implements Observer { + private static class DebounceObserver implements Observer { private final Observer observer; private final long timeout; @@ -113,7 +113,7 @@ private static class ThrottledObserver implements Observer { private final AtomicReference lastScheduledNotification = new AtomicReference(); - public ThrottledObserver(Observer observer, long timeout, TimeUnit unit, Scheduler scheduler) { + public DebounceObserver(Observer observer, long timeout, TimeUnit unit, Scheduler scheduler) { // we need to synchronize the observer since the on* events can be coming from different // threads and are thus non-deterministic and could be interleaved this.observer = new SynchronizedObserver(observer); @@ -174,7 +174,7 @@ public void before() { } @Test - public void testThrottlingWithCompleted() { + public void testDebounceWithCompleted() { Observable source = Observable.create(new OnSubscribeFunc() { @Override public Subscription onSubscribe(Observer observer) { @@ -187,7 +187,7 @@ public Subscription onSubscribe(Observer observer) { } }); - Observable sampled = Observable.create(OperationThrottleWithTimeout.throttleWithTimeout(source, 400, TimeUnit.MILLISECONDS, scheduler)); + Observable sampled = Observable.create(OperationDebounce.debounce(source, 400, TimeUnit.MILLISECONDS, scheduler)); sampled.subscribe(observer); scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS); @@ -201,7 +201,38 @@ public Subscription onSubscribe(Observer observer) { } @Test - public void testThrottlingWithError() { + public void testDebounceNeverEmits() { + Observable source = Observable.create(new OnSubscribeFunc() { + @Override + public Subscription onSubscribe(Observer observer) { + // all should be skipped since they are happening faster than the 200ms timeout + publishNext(observer, 100, "a"); // Should be skipped + publishNext(observer, 200, "b"); // Should be skipped + publishNext(observer, 300, "c"); // Should be skipped + publishNext(observer, 400, "d"); // Should be skipped + publishNext(observer, 500, "e"); // Should be skipped + publishNext(observer, 600, "f"); // Should be skipped + publishNext(observer, 700, "g"); // Should be skipped + publishNext(observer, 800, "h"); // Should be skipped + publishCompleted(observer, 900); // Should be published as soon as the timeout expires. + + return Subscriptions.empty(); + } + }); + + Observable sampled = Observable.create(OperationDebounce.debounce(source, 200, TimeUnit.MILLISECONDS, scheduler)); + sampled.subscribe(observer); + + scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS); + InOrder inOrder = inOrder(observer); + inOrder.verify(observer, times(0)).onNext(anyString()); + scheduler.advanceTimeTo(1000, TimeUnit.MILLISECONDS); + inOrder.verify(observer, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testDebounceWithError() { Observable source = Observable.create(new OnSubscribeFunc() { @Override public Subscription onSubscribe(Observer observer) { @@ -214,7 +245,7 @@ public Subscription onSubscribe(Observer observer) { } }); - Observable sampled = Observable.create(OperationThrottleWithTimeout.throttleWithTimeout(source, 400, TimeUnit.MILLISECONDS, scheduler)); + Observable sampled = Observable.create(OperationDebounce.debounce(source, 400, TimeUnit.MILLISECONDS, scheduler)); sampled.subscribe(observer); scheduler.advanceTimeTo(0, TimeUnit.MILLISECONDS); From 5e7edd2775edf274b2e7c098c6fb70bcdf7d9446 Mon Sep 17 00:00:00 2001 From: Ben Christensen Date: Tue, 10 Sep 2013 21:41:45 -0700 Subject: [PATCH 12/12] Javadoc with images --- rxjava-core/src/main/java/rx/Observable.java | 16 ++++++++++++++++ 1 file changed, 16 insertions(+) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 267660cbe6..ac9d9b58b4 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -1815,6 +1815,8 @@ public static Observable interval(long interval, TimeUnit unit, Scheduler *

* NOTE: If events keep firing faster than the timeout then no data will be emitted. *

+ * + *

* Information on debounce vs throttle: *

*

    @@ -1840,6 +1842,8 @@ public Observable debounce(long timeout, TimeUnit unit) { *

    * NOTE: If events keep firing faster than the timeout then no data will be emitted. *

    + * + *

    * Information on debounce vs throttle: *

    *

      @@ -1866,6 +1870,8 @@ public Observable debounce(long timeout, TimeUnit unit, Scheduler scheduler) *

      * NOTE: If events keep firing faster than the timeout then no data will be emitted. *

      + * + *

      * Information on debounce vs throttle: *

      *

        @@ -1890,6 +1896,8 @@ public Observable throttleWithTimeout(long timeout, TimeUnit unit) { * Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call. *

        * NOTE: If events keep firing faster than the timeout then no data will be emitted. + *

        + * * * @param timeout * The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped. @@ -1908,6 +1916,8 @@ public Observable throttleWithTimeout(long timeout, TimeUnit unit, Scheduler * Throttles by skipping value until `skipDuration` passes and then emits the next received value. *

        * This differs from {@link #throttleLast} in that this only tracks passage of time whereas {@link #throttleLast} ticks at scheduled intervals. + *

        + * * * @param skipDuration * Time to wait before sending another value after emitting last value. @@ -1925,6 +1935,8 @@ public Observable throttleFirst(long windowDuration, TimeUnit unit) { * Throttles by skipping value until `skipDuration` passes and then emits the next received value. *

        * This differs from {@link #throttleLast} in that this only tracks passage of time whereas {@link #throttleLast} ticks at scheduled intervals. + *

        + * * * @param skipDuration * Time to wait before sending another value after emitting last value. @@ -1942,6 +1954,8 @@ public Observable throttleFirst(long skipDuration, TimeUnit unit, Scheduler s * Throttles by returning the last value of each interval defined by 'intervalDuration'. *

        * This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas {@link #throttleFirst} does not tick, it just tracks passage of time. + *

        + * * * @param intervalDuration * Duration of windows within with the last value will be chosen. @@ -1958,6 +1972,8 @@ public Observable throttleLast(long intervalDuration, TimeUnit unit) { * Throttles by returning the last value of each interval defined by 'intervalDuration'. *

        * This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas {@link #throttleFirst} does not tick, it just tracks passage of time. + *

        + * * * @param intervalDuration * Duration of windows within with the last value will be chosen.