Skip to content

Commit b7681aa

Browse files
Merge pull request ReactiveX#368 from benjchristensen/throttle-and-debounce
Operators: Throttle and Debounce
2 parents d07b663 + 6443786 commit b7681aa

8 files changed

+840
-7
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 179 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,11 @@
1515
*/
1616
package rx;
1717

18-
import static rx.util.functions.Functions.not;
18+
import static rx.util.functions.Functions.*;
1919

2020
import java.util.ArrayList;
2121
import java.util.Arrays;
2222
import java.util.Collection;
23-
import java.util.Collections;
2423
import java.util.List;
2524
import java.util.concurrent.Future;
2625
import java.util.concurrent.TimeUnit;
@@ -65,6 +64,8 @@
6564
import rx.operators.OperationTakeLast;
6665
import rx.operators.OperationTakeUntil;
6766
import rx.operators.OperationTakeWhile;
67+
import rx.operators.OperationThrottleFirst;
68+
import rx.operators.OperationDebounce;
6869
import rx.operators.OperationTimestamp;
6970
import rx.operators.OperationToObservableFuture;
7071
import rx.operators.OperationToObservableIterable;
@@ -1810,6 +1811,182 @@ public static Observable<Long> interval(long interval, TimeUnit unit, Scheduler
18101811
return create(OperationInterval.interval(interval, unit, scheduler));
18111812
}
18121813

1814+
/**
1815+
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
1816+
* <p>
1817+
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
1818+
* <p>
1819+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/debounce.png">
1820+
* <p>
1821+
* Information on debounce vs throttle:
1822+
* <p>
1823+
* <ul>
1824+
* <li>http://drupalmotion.com/article/debounce-and-throttle-visual-explanation</li>
1825+
* <li>http://unscriptable.com/2009/03/20/debouncing-javascript-methods/</li>
1826+
* <li>http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/</li>
1827+
* </ul>
1828+
*
1829+
* @param timeout
1830+
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
1831+
* @param unit
1832+
* The {@link TimeUnit} for the timeout.
1833+
*
1834+
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
1835+
* @see {@link #throttleWithTimeout};
1836+
*/
1837+
public Observable<T> debounce(long timeout, TimeUnit unit) {
1838+
return create(OperationDebounce.debounce(this, timeout, unit));
1839+
}
1840+
1841+
/**
1842+
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
1843+
* <p>
1844+
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
1845+
* <p>
1846+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/debounce.png">
1847+
* <p>
1848+
* Information on debounce vs throttle:
1849+
* <p>
1850+
* <ul>
1851+
* <li>http://drupalmotion.com/article/debounce-and-throttle-visual-explanation</li>
1852+
* <li>http://unscriptable.com/2009/03/20/debouncing-javascript-methods/</li>
1853+
* <li>http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/</li>
1854+
* </ul>
1855+
*
1856+
* @param timeout
1857+
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
1858+
* @param unit
1859+
* The unit of time for the specified timeout.
1860+
* @param scheduler
1861+
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
1862+
* @return Observable which performs the throttle operation.
1863+
* @see {@link #throttleWithTimeout};
1864+
*/
1865+
public Observable<T> debounce(long timeout, TimeUnit unit, Scheduler scheduler) {
1866+
return create(OperationDebounce.debounce(this, timeout, unit));
1867+
}
1868+
1869+
/**
1870+
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
1871+
* <p>
1872+
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
1873+
* <p>
1874+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleWithTimeout.png">
1875+
* <p>
1876+
* Information on debounce vs throttle:
1877+
* <p>
1878+
* <ul>
1879+
* <li>http://drupalmotion.com/article/debounce-and-throttle-visual-explanation</li>
1880+
* <li>http://unscriptable.com/2009/03/20/debouncing-javascript-methods/</li>
1881+
* <li>http://www.illyriad.co.uk/blog/index.php/2011/09/javascript-dont-spam-your-server-debounce-and-throttle/</li>
1882+
* </ul>
1883+
*
1884+
* @param timeout
1885+
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
1886+
* @param unit
1887+
* The {@link TimeUnit} for the timeout.
1888+
*
1889+
* @return An {@link Observable} which filters out values which are too quickly followed up with newer values.
1890+
* @see {@link #debounce}
1891+
*/
1892+
public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit) {
1893+
return create(OperationDebounce.debounce(this, timeout, unit));
1894+
}
1895+
1896+
/**
1897+
* Debounces by dropping all values that are followed by newer values before the timeout value expires. The timer resets on each `onNext` call.
1898+
* <p>
1899+
* NOTE: If events keep firing faster than the timeout then no data will be emitted.
1900+
* <p>
1901+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleWithTimeout.png">
1902+
*
1903+
* @param timeout
1904+
* The time each value has to be 'the most recent' of the {@link Observable} to ensure that it's not dropped.
1905+
* @param unit
1906+
* The unit of time for the specified timeout.
1907+
* @param scheduler
1908+
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
1909+
* @return Observable which performs the throttle operation.
1910+
* @see {@link #debounce}
1911+
*/
1912+
public Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler) {
1913+
return create(OperationDebounce.debounce(this, timeout, unit, scheduler));
1914+
}
1915+
1916+
/**
1917+
* Throttles by skipping value until `skipDuration` passes and then emits the next received value.
1918+
* <p>
1919+
* This differs from {@link #throttleLast} in that this only tracks passage of time whereas {@link #throttleLast} ticks at scheduled intervals.
1920+
* <p>
1921+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleFirst.png">
1922+
*
1923+
* @param skipDuration
1924+
* Time to wait before sending another value after emitting last value.
1925+
* @param unit
1926+
* The unit of time for the specified timeout.
1927+
* @param scheduler
1928+
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
1929+
* @return Observable which performs the throttle operation.
1930+
*/
1931+
public Observable<T> throttleFirst(long windowDuration, TimeUnit unit) {
1932+
return create(OperationThrottleFirst.throttleFirst(this, windowDuration, unit));
1933+
}
1934+
1935+
/**
1936+
* Throttles by skipping value until `skipDuration` passes and then emits the next received value.
1937+
* <p>
1938+
* This differs from {@link #throttleLast} in that this only tracks passage of time whereas {@link #throttleLast} ticks at scheduled intervals.
1939+
* <p>
1940+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleFirst.png">
1941+
*
1942+
* @param skipDuration
1943+
* Time to wait before sending another value after emitting last value.
1944+
* @param unit
1945+
* The unit of time for the specified timeout.
1946+
* @param scheduler
1947+
* The {@link Scheduler} to use internally to manage the timers which handle timeout for each event.
1948+
* @return Observable which performs the throttle operation.
1949+
*/
1950+
public Observable<T> throttleFirst(long skipDuration, TimeUnit unit, Scheduler scheduler) {
1951+
return create(OperationThrottleFirst.throttleFirst(this, skipDuration, unit, scheduler));
1952+
}
1953+
1954+
/**
1955+
* Throttles by returning the last value of each interval defined by 'intervalDuration'.
1956+
* <p>
1957+
* This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas {@link #throttleFirst} does not tick, it just tracks passage of time.
1958+
* <p>
1959+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleLast.png">
1960+
*
1961+
* @param intervalDuration
1962+
* Duration of windows within with the last value will be chosen.
1963+
* @param unit
1964+
* The unit of time for the specified interval.
1965+
* @return Observable which performs the throttle operation.
1966+
* @see {@link #sample(long, TimeUnit)}
1967+
*/
1968+
public Observable<T> throttleLast(long intervalDuration, TimeUnit unit) {
1969+
return sample(intervalDuration, unit);
1970+
}
1971+
1972+
/**
1973+
* Throttles by returning the last value of each interval defined by 'intervalDuration'.
1974+
* <p>
1975+
* This differs from {@link #throttleFirst} in that this ticks along at a scheduled interval whereas {@link #throttleFirst} does not tick, it just tracks passage of time.
1976+
* <p>
1977+
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/throttleLast.png">
1978+
*
1979+
* @param intervalDuration
1980+
* Duration of windows within with the last value will be chosen.
1981+
* @param unit
1982+
* The unit of time for the specified interval.
1983+
* @return Observable which performs the throttle operation.
1984+
* @see {@link #sample(long, TimeUnit, Scheduler)}
1985+
*/
1986+
public Observable<T> throttleLast(long intervalDuration, TimeUnit unit, Scheduler scheduler) {
1987+
return sample(intervalDuration, unit, scheduler);
1988+
}
1989+
18131990
/**
18141991
* Wraps each item emitted by a source Observable in a {@link Timestamped} object.
18151992
* <p>

rxjava-core/src/main/java/rx/concurrency/TestScheduler.java

Lines changed: 22 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,20 +19,22 @@
1919
import java.util.PriorityQueue;
2020
import java.util.Queue;
2121
import java.util.concurrent.TimeUnit;
22+
import java.util.concurrent.atomic.AtomicBoolean;
2223

2324
import rx.Scheduler;
2425
import rx.Subscription;
25-
import rx.subscriptions.Subscriptions;
2626
import rx.util.functions.Func2;
2727

2828
public class TestScheduler extends Scheduler {
2929
private final Queue<TimedAction<?>> queue = new PriorityQueue<TimedAction<?>>(11, new CompareActionsByTime());
3030

3131
private static class TimedAction<T> {
32+
3233
private final long time;
3334
private final Func2<? super Scheduler, ? super T, ? extends Subscription> action;
3435
private final T state;
3536
private final TestScheduler scheduler;
37+
private final AtomicBoolean isCancelled = new AtomicBoolean(false);
3638

3739
private TimedAction(TestScheduler scheduler, long time, Func2<? super Scheduler, ? super T, ? extends Subscription> action, T state) {
3840
this.time = time;
@@ -41,6 +43,10 @@ private TimedAction(TestScheduler scheduler, long time, Func2<? super Scheduler,
4143
this.scheduler = scheduler;
4244
}
4345

46+
public void cancel() {
47+
isCancelled.set(true);
48+
}
49+
4450
@Override
4551
public String toString() {
4652
return String.format("TimedAction(time = %d, action = %s)", time, action.toString());
@@ -84,8 +90,12 @@ private void triggerActions(long targetTimeInNanos) {
8490
}
8591
time = current.time;
8692
queue.remove();
87-
// because the queue can have wildcards we have to ignore the type T for the state
88-
((Func2<Scheduler, Object, Subscription>) current.action).call(current.scheduler, current.state);
93+
94+
// Only execute if the TimedAction has not yet been cancelled
95+
if (!current.isCancelled.get()) {
96+
// because the queue can have wildcards we have to ignore the type T for the state
97+
((Func2<Scheduler, Object, Subscription>) current.action).call(current.scheduler, current.state);
98+
}
8999
}
90100
time = targetTimeInNanos;
91101
}
@@ -97,7 +107,14 @@ public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ?
97107

98108
@Override
99109
public <T> Subscription schedule(T state, Func2<? super Scheduler, ? super T, ? extends Subscription> action, long delayTime, TimeUnit unit) {
100-
queue.add(new TimedAction<T>(this, time + unit.toNanos(delayTime), action, state));
101-
return Subscriptions.empty();
110+
final TimedAction<T> timedAction = new TimedAction<T>(this, time + unit.toNanos(delayTime), action, state);
111+
queue.add(timedAction);
112+
113+
return new Subscription() {
114+
@Override
115+
public void unsubscribe() {
116+
timedAction.cancel();
117+
}
118+
};
102119
}
103120
}

0 commit comments

Comments
 (0)