Skip to content

Commit d205f99

Browse files
Manual Merge github.com:akarnokd/RxJava SkipTakeTimed2
Conflicts: rxjava-core/src/main/java/rx/Observable.java rxjava-core/src/main/java/rx/operators/OperationTakeLast.java
2 parents c497423 + eb29595 commit d205f99

File tree

8 files changed

+700
-13
lines changed

8 files changed

+700
-13
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 86 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -5381,16 +5381,45 @@ public Observable<T> skip(int num) {
53815381
}
53825382

53835383
/**
5384-
* If the source Observable completes after emitting a single item, return
5385-
* an Observable that emits that item. If the source Observable emits more
5386-
* than one item or no items, throw an IllegalArgumentException.
5387-
* <p>
5388-
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/single.png">
5384+
* Create an Observable that skips values before the given time ellapses.
5385+
*
5386+
* @param time
5387+
* the length of the time window
5388+
* @param unit
5389+
* the time unit
5390+
* @return an Observable that skips values before the given time ellapses
5391+
*/
5392+
public Observable<T> skip(long time, TimeUnit unit) {
5393+
return skip(time, unit, Schedulers.threadPoolForComputation());
5394+
}
5395+
5396+
/**
5397+
* Create an Observable that skips values before the given time
5398+
* elapses while waiting on the given scheduler.
5399+
*
5400+
* @param time
5401+
* the length of the time window
5402+
* @param unit
5403+
* the time unit
5404+
* @param scheduler
5405+
* the scheduler where the timed wait happens
5406+
* @return an Observable that skips values before the given time
5407+
* elapses while waiting on the given scheduler
5408+
*/
5409+
public Observable<T> skip(long time, TimeUnit unit, Scheduler scheduler) {
5410+
return create(new OperationSkip.SkipTimed<T>(this, time, unit, scheduler));
5411+
}
5412+
5413+
/**
5414+
* If the Observable completes after emitting a single item, return an
5415+
* Observable containing that item. If it emits more than one item or no
5416+
* item, throw an IllegalArgumentException.
53895417
*
53905418
* @return an Observable that emits the single item emitted by the source
53915419
* Observable that matches the predicate
5392-
* @throws IllegalArgumentException if the source emits more than one item
5393-
* or no items
5420+
* @throws IllegalArgumentException
5421+
* if the source emits more than one item
5422+
* or no items
53945423
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#single-and-singleordefault">RxJava Wiki: single()</a>
53955424
* @see MSDN: <code>Observable.singleAsync()</code>
53965425
*/
@@ -5575,6 +5604,31 @@ public Observable<T> take(final int num) {
55755604
return create(OperationTake.take(this, num));
55765605
}
55775606

5607+
/**
5608+
* Create an Observable that takes the emitted values of the source
5609+
* Observable before the time runs out.
5610+
* @param time the length of the time window
5611+
* @param unit the time unit
5612+
* @return an Observable that takes the emitted values of the source
5613+
* Observable before the time runs out.
5614+
*/
5615+
public Observable<T> take(long time, TimeUnit unit) {
5616+
return take(time, unit, Schedulers.threadPoolForComputation());
5617+
}
5618+
5619+
/**
5620+
* Create an Observable that takes the emitted values of the source
5621+
* Observable before the time runs out, waiting on the given scheduler.
5622+
* @param time the length of the time window
5623+
* @param unit the time unit
5624+
* @param scheduler the scheduler used for time source
5625+
* @return an Observable that takes the emitted values of the source
5626+
* Observable before the time runs out, waiting on the given scheduler.
5627+
*/
5628+
public Observable<T> take(long time, TimeUnit unit, Scheduler scheduler) {
5629+
return create(new OperationTake.TakeTimed<T>(this, time, unit, scheduler));
5630+
}
5631+
55785632
/**
55795633
* Returns an Observable that emits items emitted by the source Observable
55805634
* so long as a specified condition is true.
@@ -5917,6 +5971,31 @@ public Observable<T> skipLast(int count) {
59175971
return create(OperationSkipLast.skipLast(this, count));
59185972
}
59195973

5974+
/**
5975+
* Create an observable which skips values emitted in a time window
5976+
* before the source completes.
5977+
* @param time the length of the time window
5978+
* @param unit the time unit
5979+
* @return an observable which skips values emitted in a time window
5980+
* before the source completes
5981+
*/
5982+
public Observable<T> skipLast(long time, TimeUnit unit) {
5983+
return skipLast(time, unit, Schedulers.threadPoolForComputation());
5984+
}
5985+
5986+
/**
5987+
* Create an observable which skips values emitted in a time window
5988+
* before the source completes by using the given scheduler as time source.
5989+
* @param time the length of the time window
5990+
* @param unit the time unit
5991+
* @param scheduler the scheduler used for time source
5992+
* @return an observable which skips values emitted in a time window
5993+
* before the source completes by using the given scheduler as time source
5994+
*/
5995+
public Observable<T> skipLast(long time, TimeUnit unit, Scheduler scheduler) {
5996+
return create(new OperationSkipLast.SkipLastTimed<T>(this, time, unit, scheduler));
5997+
}
5998+
59205999
/**
59216000
* Returns an Observable that emits a single item, a list composed of all
59226001
* the items emitted by the source Observable.

rxjava-core/src/main/java/rx/operators/OperationSkip.java

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,12 +15,17 @@
1515
*/
1616
package rx.operators;
1717

18+
import java.util.concurrent.TimeUnit;
19+
import java.util.concurrent.atomic.AtomicBoolean;
1820
import java.util.concurrent.atomic.AtomicInteger;
1921

2022
import rx.Observable;
2123
import rx.Observable.OnSubscribeFunc;
2224
import rx.Observer;
25+
import rx.Scheduler;
2326
import rx.Subscription;
27+
import rx.subscriptions.CompositeSubscription;
28+
import rx.util.functions.Action0;
2429

2530
/**
2631
* Returns an Observable that skips the first <code>num</code> items emitted by the source
@@ -107,4 +112,86 @@ public void onNext(T args) {
107112
}
108113

109114
}
115+
116+
/**
117+
* Skip the items after subscription for the given duration.
118+
* @param <T> the value type
119+
*/
120+
public static final class SkipTimed<T> implements OnSubscribeFunc<T> {
121+
final Observable<? extends T> source;
122+
final long time;
123+
final TimeUnit unit;
124+
final Scheduler scheduler;
125+
126+
public SkipTimed(Observable<? extends T> source, long time, TimeUnit unit, Scheduler scheduler) {
127+
this.source = source;
128+
this.time = time;
129+
this.unit = unit;
130+
this.scheduler = scheduler;
131+
}
132+
133+
@Override
134+
public Subscription onSubscribe(Observer<? super T> t1) {
135+
136+
SafeObservableSubscription timer = new SafeObservableSubscription();
137+
SafeObservableSubscription data = new SafeObservableSubscription();
138+
139+
CompositeSubscription csub = new CompositeSubscription(timer, data);
140+
141+
SourceObserver<T> so = new SourceObserver<T>(t1, csub);
142+
data.wrap(source.subscribe(so));
143+
if (!data.isUnsubscribed()) {
144+
timer.wrap(scheduler.schedule(so, time, unit));
145+
}
146+
147+
return csub;
148+
}
149+
/**
150+
* Observes the source and relays its values once gate turns into true.
151+
* @param <T> the observed value type
152+
*/
153+
private static final class SourceObserver<T> implements Observer<T>, Action0 {
154+
final AtomicBoolean gate;
155+
final Observer<? super T> observer;
156+
final Subscription cancel;
157+
158+
public SourceObserver(Observer<? super T> observer,
159+
Subscription cancel) {
160+
this.gate = new AtomicBoolean();
161+
this.observer = observer;
162+
this.cancel = cancel;
163+
}
164+
165+
@Override
166+
public void onNext(T args) {
167+
if (gate.get()) {
168+
observer.onNext(args);
169+
}
170+
}
171+
172+
@Override
173+
public void onError(Throwable e) {
174+
try {
175+
observer.onError(e);
176+
} finally {
177+
cancel.unsubscribe();
178+
}
179+
}
180+
181+
@Override
182+
public void onCompleted() {
183+
try {
184+
observer.onCompleted();
185+
} finally {
186+
cancel.unsubscribe();
187+
}
188+
}
189+
190+
@Override
191+
public void call() {
192+
gate.set(true);
193+
}
194+
195+
}
196+
}
110197
}

rxjava-core/src/main/java/rx/operators/OperationSkipLast.java

Lines changed: 75 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,20 @@
1515
*/
1616
package rx.operators;
1717

18+
import java.util.ArrayList;
19+
import java.util.Collections;
1820
import java.util.Deque;
1921
import java.util.LinkedList;
22+
import java.util.List;
23+
import java.util.concurrent.TimeUnit;
2024
import java.util.concurrent.locks.ReentrantLock;
2125

2226
import rx.Observable;
2327
import rx.Observable.OnSubscribeFunc;
2428
import rx.Observer;
29+
import rx.Scheduler;
2530
import rx.Subscription;
31+
import rx.util.Timestamped;
2632

2733
/**
2834
* Bypasses a specified number of elements at the end of an observable sequence.
@@ -123,4 +129,73 @@ public void onNext(T value) {
123129
}));
124130
}
125131
}
132+
133+
/**
134+
* Skip delivering values in the time window before the values.
135+
* @param <T> the result value type
136+
*/
137+
public static final class SkipLastTimed<T> implements OnSubscribeFunc<T> {
138+
final Observable<? extends T> source;
139+
final long timeInMillis;
140+
final Scheduler scheduler;
141+
142+
public SkipLastTimed(Observable<? extends T> source, long time, TimeUnit unit, Scheduler scheduler) {
143+
this.source = source;
144+
this.timeInMillis = unit.toMillis(time);
145+
this.scheduler = scheduler;
146+
}
147+
148+
@Override
149+
public Subscription onSubscribe(Observer<? super T> t1) {
150+
return source.subscribe(new SourceObserver<T>(t1, timeInMillis, scheduler));
151+
}
152+
/** Observes the source. */
153+
private static final class SourceObserver<T> implements Observer<T> {
154+
final Observer<? super T> observer;
155+
final long timeInMillis;
156+
final Scheduler scheduler;
157+
List<Timestamped<T>> buffer = new ArrayList<Timestamped<T>>();
158+
159+
public SourceObserver(Observer<? super T> observer,
160+
long timeInMillis, Scheduler scheduler) {
161+
this.observer = observer;
162+
this.timeInMillis = timeInMillis;
163+
this.scheduler = scheduler;
164+
}
165+
166+
@Override
167+
public void onNext(T args) {
168+
buffer.add(new Timestamped<T>(scheduler.now(), args));
169+
}
170+
171+
@Override
172+
public void onError(Throwable e) {
173+
buffer = Collections.emptyList();
174+
observer.onError(e);
175+
}
176+
177+
@Override
178+
public void onCompleted() {
179+
long limit = scheduler.now() - timeInMillis;
180+
try {
181+
for (Timestamped<T> v : buffer) {
182+
if (v.getTimestampMillis() < limit) {
183+
try {
184+
observer.onNext(v.getValue());
185+
} catch (Throwable t) {
186+
observer.onError(t);
187+
return;
188+
}
189+
} else {
190+
observer.onCompleted();
191+
break;
192+
}
193+
}
194+
} finally {
195+
buffer = Collections.emptyList();
196+
}
197+
}
198+
199+
}
200+
}
126201
}

0 commit comments

Comments
 (0)