Skip to content

Commit eb29595

Browse files
committed
Operators Skip, SkipLast, Take with time
1 parent c9be48c commit eb29595

File tree

8 files changed

+686
-6
lines changed

8 files changed

+686
-6
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -5246,6 +5246,29 @@ public Observable<T> skip(int num) {
52465246
return create(OperationSkip.skip(this, num));
52475247
}
52485248

5249+
/**
5250+
* Create an Observable that skips values before the given time ellapses.
5251+
* @param time the length of the time window
5252+
* @param unit the time unit
5253+
* @return an Observable that skips values before the given time ellapses
5254+
*/
5255+
public Observable<T> skip(long time, TimeUnit unit) {
5256+
return skip(time, unit, Schedulers.threadPoolForComputation());
5257+
}
5258+
5259+
/**
5260+
* Create an Observable that skips values before the given time
5261+
* elapses while waiting on the given scheduler.
5262+
* @param time the length of the time window
5263+
* @param unit the time unit
5264+
* @param scheduler the scheduler where the timed wait happens
5265+
* @return an Observable that skips values before the given time
5266+
* elapses while waiting on the given scheduler
5267+
*/
5268+
public Observable<T> skip(long time, TimeUnit unit, Scheduler scheduler) {
5269+
return create(new OperationSkip.SkipTimed<T>(this, time, unit, scheduler));
5270+
}
5271+
52495272
/**
52505273
* If the Observable completes after emitting a single item, return an
52515274
* Observable containing that item. If it emits more than one item or no
@@ -5426,6 +5449,31 @@ public Observable<T> take(final int num) {
54265449
return create(OperationTake.take(this, num));
54275450
}
54285451

5452+
/**
5453+
* Create an Observable that takes the emitted values of the source
5454+
* Observable before the time runs out.
5455+
* @param time the length of the time window
5456+
* @param unit the time unit
5457+
* @return an Observable that takes the emitted values of the source
5458+
* Observable before the time runs out.
5459+
*/
5460+
public Observable<T> take(long time, TimeUnit unit) {
5461+
return take(time, unit, Schedulers.threadPoolForComputation());
5462+
}
5463+
5464+
/**
5465+
* Create an Observable that takes the emitted values of the source
5466+
* Observable before the time runs out, waiting on the given scheduler.
5467+
* @param time the length of the time window
5468+
* @param unit the time unit
5469+
* @param scheduler the scheduler used for time source
5470+
* @return an Observable that takes the emitted values of the source
5471+
* Observable before the time runs out, waiting on the given scheduler.
5472+
*/
5473+
public Observable<T> take(long time, TimeUnit unit, Scheduler scheduler) {
5474+
return create(new OperationTake.TakeTimed<T>(this, time, unit, scheduler));
5475+
}
5476+
54295477
/**
54305478
* Returns an Observable that emits items emitted by the source Observable
54315479
* so long as a specified condition is true.
@@ -5734,6 +5782,31 @@ public Observable<T> skipLast(int count) {
57345782
return create(OperationSkipLast.skipLast(this, count));
57355783
}
57365784

5785+
/**
5786+
* Create an observable which skips values emitted in a time window
5787+
* before the source completes.
5788+
* @param time the length of the time window
5789+
* @param unit the time unit
5790+
* @return an observable which skips values emitted in a time window
5791+
* before the source completes
5792+
*/
5793+
public Observable<T> skipLast(long time, TimeUnit unit) {
5794+
return skipLast(time, unit, Schedulers.threadPoolForComputation());
5795+
}
5796+
5797+
/**
5798+
* Create an observable which skips values emitted in a time window
5799+
* before the source completes by using the given scheduler as time source.
5800+
* @param time the length of the time window
5801+
* @param unit the time unit
5802+
* @param scheduler the scheduler used for time source
5803+
* @return an observable which skips values emitted in a time window
5804+
* before the source completes by using the given scheduler as time source
5805+
*/
5806+
public Observable<T> skipLast(long time, TimeUnit unit, Scheduler scheduler) {
5807+
return create(new OperationSkipLast.SkipLastTimed<T>(this, time, unit, scheduler));
5808+
}
5809+
57375810
/**
57385811
* Returns an Observable that emits a single item, a list composed of all
57395812
* the items emitted by the source Observable.

rxjava-core/src/main/java/rx/operators/OperationSkip.java

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,17 @@
1515
*/
1616
package rx.operators;
1717

18+
import java.util.concurrent.TimeUnit;
19+
import java.util.concurrent.atomic.AtomicBoolean;
1820
import java.util.concurrent.atomic.AtomicInteger;
1921

2022
import rx.Observable;
2123
import rx.Observable.OnSubscribeFunc;
2224
import rx.Observer;
25+
import rx.Scheduler;
2326
import rx.Subscription;
27+
import rx.subscriptions.CompositeSubscription;
28+
import rx.util.functions.Action0;
2429

2530
/**
2631
* Returns an Observable that skips the first <code>num</code> items emitted by the source
@@ -107,4 +112,86 @@ public void onNext(T args) {
107112
}
108113

109114
}
115+
116+
/**
117+
* Skip the items after subscription for the given duration.
118+
* @param <T> the value type
119+
*/
120+
public static final class SkipTimed<T> implements OnSubscribeFunc<T> {
121+
final Observable<? extends T> source;
122+
final long time;
123+
final TimeUnit unit;
124+
final Scheduler scheduler;
125+
126+
public SkipTimed(Observable<? extends T> source, long time, TimeUnit unit, Scheduler scheduler) {
127+
this.source = source;
128+
this.time = time;
129+
this.unit = unit;
130+
this.scheduler = scheduler;
131+
}
132+
133+
@Override
134+
public Subscription onSubscribe(Observer<? super T> t1) {
135+
136+
SafeObservableSubscription timer = new SafeObservableSubscription();
137+
SafeObservableSubscription data = new SafeObservableSubscription();
138+
139+
CompositeSubscription csub = new CompositeSubscription(timer, data);
140+
141+
SourceObserver<T> so = new SourceObserver<T>(t1, csub);
142+
data.wrap(source.subscribe(so));
143+
if (!data.isUnsubscribed()) {
144+
timer.wrap(scheduler.schedule(so, time, unit));
145+
}
146+
147+
return csub;
148+
}
149+
/**
150+
* Observes the source and relays its values once gate turns into true.
151+
* @param <T> the observed value type
152+
*/
153+
private static final class SourceObserver<T> implements Observer<T>, Action0 {
154+
final AtomicBoolean gate;
155+
final Observer<? super T> observer;
156+
final Subscription cancel;
157+
158+
public SourceObserver(Observer<? super T> observer,
159+
Subscription cancel) {
160+
this.gate = new AtomicBoolean();
161+
this.observer = observer;
162+
this.cancel = cancel;
163+
}
164+
165+
@Override
166+
public void onNext(T args) {
167+
if (gate.get()) {
168+
observer.onNext(args);
169+
}
170+
}
171+
172+
@Override
173+
public void onError(Throwable e) {
174+
try {
175+
observer.onError(e);
176+
} finally {
177+
cancel.unsubscribe();
178+
}
179+
}
180+
181+
@Override
182+
public void onCompleted() {
183+
try {
184+
observer.onCompleted();
185+
} finally {
186+
cancel.unsubscribe();
187+
}
188+
}
189+
190+
@Override
191+
public void call() {
192+
gate.set(true);
193+
}
194+
195+
}
196+
}
110197
}

rxjava-core/src/main/java/rx/operators/OperationSkipLast.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,20 @@
1515
*/
1616
package rx.operators;
1717

18+
import java.util.ArrayList;
19+
import java.util.Collections;
1820
import java.util.Deque;
1921
import java.util.LinkedList;
22+
import java.util.List;
23+
import java.util.concurrent.TimeUnit;
2024
import java.util.concurrent.locks.ReentrantLock;
2125

2226
import rx.Observable;
2327
import rx.Observable.OnSubscribeFunc;
2428
import rx.Observer;
29+
import rx.Scheduler;
2530
import rx.Subscription;
31+
import rx.util.Timestamped;
2632

2733
/**
2834
* Bypasses a specified number of elements at the end of an observable sequence.
@@ -123,4 +129,73 @@ public void onNext(T value) {
123129
}));
124130
}
125131
}
132+
133+
/**
134+
* Skip delivering values in the time window before the values.
135+
* @param <T> the result value type
136+
*/
137+
public static final class SkipLastTimed<T> implements OnSubscribeFunc<T> {
138+
final Observable<? extends T> source;
139+
final long timeInMillis;
140+
final Scheduler scheduler;
141+
142+
public SkipLastTimed(Observable<? extends T> source, long time, TimeUnit unit, Scheduler scheduler) {
143+
this.source = source;
144+
this.timeInMillis = unit.toMillis(time);
145+
this.scheduler = scheduler;
146+
}
147+
148+
@Override
149+
public Subscription onSubscribe(Observer<? super T> t1) {
150+
return source.subscribe(new SourceObserver<T>(t1, timeInMillis, scheduler));
151+
}
152+
/** Observes the source. */
153+
private static final class SourceObserver<T> implements Observer<T> {
154+
final Observer<? super T> observer;
155+
final long timeInMillis;
156+
final Scheduler scheduler;
157+
List<Timestamped<T>> buffer = new ArrayList<Timestamped<T>>();
158+
159+
public SourceObserver(Observer<? super T> observer,
160+
long timeInMillis, Scheduler scheduler) {
161+
this.observer = observer;
162+
this.timeInMillis = timeInMillis;
163+
this.scheduler = scheduler;
164+
}
165+
166+
@Override
167+
public void onNext(T args) {
168+
buffer.add(new Timestamped<T>(scheduler.now(), args));
169+
}
170+
171+
@Override
172+
public void onError(Throwable e) {
173+
buffer = Collections.emptyList();
174+
observer.onError(e);
175+
}
176+
177+
@Override
178+
public void onCompleted() {
179+
long limit = scheduler.now() - timeInMillis;
180+
try {
181+
for (Timestamped<T> v : buffer) {
182+
if (v.getTimestampMillis() < limit) {
183+
try {
184+
observer.onNext(v.getValue());
185+
} catch (Throwable t) {
186+
observer.onError(t);
187+
return;
188+
}
189+
} else {
190+
observer.onCompleted();
191+
break;
192+
}
193+
}
194+
} finally {
195+
buffer = Collections.emptyList();
196+
}
197+
}
198+
199+
}
200+
}
126201
}

0 commit comments

Comments
 (0)