Skip to content

Commit 8eee5ae

Browse files
Merge pull request ReactiveX#463 from zsxwing/timeout-overload
Added the rest overloads of Timeout operator
2 parents a10e0a2 + 367d592 commit 8eee5ae

File tree

3 files changed

+212
-18
lines changed

3 files changed

+212
-18
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 67 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -4543,31 +4543,86 @@ public Observable<T> ignoreElements() {
45434543
}
45444544

45454545
/**
4546-
* Returns either the observable sequence or an TimeoutException if timeout elapses.
4547-
*
4546+
* Applies a timeout policy for each element in the observable sequence,
4547+
* using the specified scheduler to run timeout timers. If the next element
4548+
* isn't received within the specified timeout duration starting from its
4549+
* predecessor, a TimeoutException is propagated to the observer.
4550+
*
45484551
* @param timeout
4549-
* The timeout duration
4552+
* Maximum duration between values before a timeout occurs.
45504553
* @param timeUnit
4551-
* The time unit of the timeout
4554+
* The unit of time which applies to the "timeout" argument.
4555+
*
4556+
* @return The source sequence with a TimeoutException in case of a timeout.
4557+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh244283(v=vs.103).aspx">MSDN: Observable.Timeout</a>
4558+
*/
4559+
public Observable<T> timeout(long timeout, TimeUnit timeUnit) {
4560+
return create(OperationTimeout.timeout(this, timeout, timeUnit));
4561+
}
4562+
4563+
/**
4564+
* Applies a timeout policy for each element in the observable sequence,
4565+
* using the specified scheduler to run timeout timers. If the next element
4566+
* isn't received within the specified timeout duration starting from its
4567+
* predecessor, the other observable sequence is used to produce future
4568+
* messages from that point on.
4569+
*
4570+
* @param timeout
4571+
* Maximum duration between values before a timeout occurs.
4572+
* @param timeUnit
4573+
* The unit of time which applies to the "timeout" argument.
4574+
* @param other
4575+
* Sequence to return in case of a timeout.
4576+
*
4577+
* @return The source sequence switching to the other sequence in case of a timeout.
4578+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229512(v=vs.103).aspx">MSDN: Observable.Timeout</a>
4579+
*/
4580+
public Observable<T> timeout(long timeout, TimeUnit timeUnit, Observable<? extends T> other) {
4581+
return create(OperationTimeout.timeout(this, timeout, timeUnit, other));
4582+
}
4583+
4584+
/**
4585+
* Applies a timeout policy for each element in the observable sequence,
4586+
* using the specified scheduler to run timeout timers. If the next element
4587+
* isn't received within the specified timeout duration starting from its
4588+
* predecessor, a TimeoutException is propagated to the observer.
4589+
*
4590+
* @param timeout
4591+
* Maximum duration between values before a timeout occurs.
4592+
* @param timeUnit
4593+
* The unit of time which applies to the "timeout" argument.
45524594
* @param scheduler
4553-
* The scheduler to run the timeout timers on.
4595+
* Scheduler to run the timeout timers on.
4596+
*
45544597
* @return The source sequence with a TimeoutException in case of a timeout.
4598+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh228946(v=vs.103).aspx">MSDN: Observable.Timeout</a>
45554599
*/
45564600
public Observable<T> timeout(long timeout, TimeUnit timeUnit, Scheduler scheduler) {
45574601
return create(OperationTimeout.timeout(this, timeout, timeUnit, scheduler));
45584602
}
45594603

45604604
/**
4561-
* Returns either the observable sequence or an TimeoutException if timeout elapses.
4562-
*
4605+
* Applies a timeout policy for each element in the observable sequence,
4606+
* using the specified scheduler to run timeout timers. If the next element
4607+
* isn't received within the specified timeout duration starting from its
4608+
* predecessor, the other observable sequence is used to produce future
4609+
* messages from that point on.
4610+
*
45634611
* @param timeout
4564-
* The timeout duration
4612+
* Maximum duration between values before a timeout occurs.
45654613
* @param timeUnit
4566-
* The time unit of the timeout
4567-
* @return The source sequence with a TimeoutException in case of a timeout.
4614+
* The unit of time which applies to the "timeout" argument.
4615+
* @param other
4616+
* Sequence to return in case of a timeout.
4617+
* @param scheduler
4618+
* Scheduler to run the timeout timers on.
4619+
*
4620+
* @return The source sequence switching to the other sequence in case of a
4621+
* timeout.
4622+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211676(v=vs.103).aspx">MSDN: Observable.Timeout</a>
45684623
*/
4569-
public Observable<T> timeout(long timeout, TimeUnit timeUnit) {
4570-
return create(OperationTimeout.timeout(this, timeout, timeUnit, Schedulers.threadPoolForComputation()));
4624+
public Observable<T> timeout(long timeout, TimeUnit timeUnit, Observable<? extends T> other, Scheduler scheduler) {
4625+
return create(OperationTimeout.timeout(this, timeout, timeUnit, other, scheduler));
45714626
}
45724627

45734628
/**

rxjava-core/src/main/java/rx/operators/OperationTimeout.java

Lines changed: 33 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -21,29 +21,53 @@
2121
import java.util.concurrent.atomic.AtomicLong;
2222

2323
import rx.Observable;
24+
import rx.Observable.OnSubscribeFunc;
2425
import rx.Observer;
2526
import rx.Scheduler;
2627
import rx.Subscription;
28+
import rx.concurrency.Schedulers;
2729
import rx.subscriptions.CompositeSubscription;
2830
import rx.subscriptions.SerialSubscription;
2931
import rx.util.functions.Action0;
3032
import rx.util.functions.Func0;
3133

34+
/**
35+
* Applies a timeout policy for each element in the observable sequence, using
36+
* the specified scheduler to run timeout timers. If the next element isn't
37+
* received within the specified timeout duration starting from its predecessor,
38+
* the other observable sequence is used to produce future messages from that
39+
* point on.
40+
*/
3241
public final class OperationTimeout {
33-
public static <T> Observable.OnSubscribeFunc<T> timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit, Scheduler scheduler) {
34-
return new Timeout<T>(source, timeout, timeUnit, scheduler);
42+
43+
public static <T> OnSubscribeFunc<T> timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit) {
44+
return new Timeout<T>(source, timeout, timeUnit, null, Schedulers.threadPoolForComputation());
45+
}
46+
47+
public static <T> OnSubscribeFunc<T> timeout(Observable<? extends T> sequence, long timeout, TimeUnit timeUnit, Observable<? extends T> other) {
48+
return new Timeout<T>(sequence, timeout, timeUnit, other, Schedulers.threadPoolForComputation());
49+
}
50+
51+
public static <T> OnSubscribeFunc<T> timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit, Scheduler scheduler) {
52+
return new Timeout<T>(source, timeout, timeUnit, null, scheduler);
53+
}
54+
55+
public static <T> OnSubscribeFunc<T> timeout(Observable<? extends T> sequence, long timeout, TimeUnit timeUnit, Observable<? extends T> other, Scheduler scheduler) {
56+
return new Timeout<T>(sequence, timeout, timeUnit, other, scheduler);
3557
}
3658

3759
private static class Timeout<T> implements Observable.OnSubscribeFunc<T> {
3860
private final Observable<? extends T> source;
3961
private final long timeout;
4062
private final TimeUnit timeUnit;
4163
private final Scheduler scheduler;
64+
private final Observable<? extends T> other;
4265

43-
private Timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit, Scheduler scheduler) {
66+
private Timeout(Observable<? extends T> source, long timeout, TimeUnit timeUnit, Observable<? extends T> other, Scheduler scheduler) {
4467
this.source = source;
4568
this.timeout = timeout;
4669
this.timeUnit = timeUnit;
70+
this.other = other;
4771
this.scheduler = scheduler;
4872
}
4973

@@ -68,7 +92,12 @@ public void call() {
6892
}
6993
}
7094
if (timeoutWins) {
71-
observer.onError(new TimeoutException());
95+
if (other == null) {
96+
observer.onError(new TimeoutException());
97+
}
98+
else {
99+
serial.setSubscription(other.subscribe(observer));
100+
}
72101
}
73102

74103
}

rxjava-core/src/test/java/rx/TimeoutTests.java

Lines changed: 112 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,19 @@
1515
*/
1616
package rx;
1717

18-
import static org.mockito.Matchers.*;
19-
import static org.mockito.Mockito.*;
18+
import static org.mockito.Matchers.any;
19+
import static org.mockito.Mockito.inOrder;
20+
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.never;
22+
import static org.mockito.Mockito.times;
23+
import static org.mockito.Mockito.verify;
2024

2125
import java.util.concurrent.TimeUnit;
2226
import java.util.concurrent.TimeoutException;
2327

2428
import org.junit.Before;
2529
import org.junit.Test;
30+
import org.mockito.InOrder;
2631
import org.mockito.MockitoAnnotations;
2732

2833
import rx.concurrency.TestScheduler;
@@ -46,6 +51,7 @@ public void setUp() {
4651

4752
@Test
4853
public void shouldNotTimeoutIfOnNextWithinTimeout() {
54+
@SuppressWarnings("unchecked")
4955
Observer<String> observer = mock(Observer.class);
5056
Subscription subscription = withTimeout.subscribe(observer);
5157
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
@@ -58,6 +64,7 @@ public void shouldNotTimeoutIfOnNextWithinTimeout() {
5864

5965
@Test
6066
public void shouldNotTimeoutIfSecondOnNextWithinTimeout() {
67+
@SuppressWarnings("unchecked")
6168
Observer<String> observer = mock(Observer.class);
6269
Subscription subscription = withTimeout.subscribe(observer);
6370
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
@@ -72,6 +79,7 @@ public void shouldNotTimeoutIfSecondOnNextWithinTimeout() {
7279

7380
@Test
7481
public void shouldTimeoutIfOnNextNotWithinTimeout() {
82+
@SuppressWarnings("unchecked")
7583
Observer<String> observer = mock(Observer.class);
7684
Subscription subscription = withTimeout.subscribe(observer);
7785
testScheduler.advanceTimeBy(TIMEOUT + 1, TimeUnit.SECONDS);
@@ -81,6 +89,7 @@ public void shouldTimeoutIfOnNextNotWithinTimeout() {
8189

8290
@Test
8391
public void shouldTimeoutIfSecondOnNextNotWithinTimeout() {
92+
@SuppressWarnings("unchecked")
8493
Observer<String> observer = mock(Observer.class);
8594
Subscription subscription = withTimeout.subscribe(observer);
8695
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
@@ -93,6 +102,7 @@ public void shouldTimeoutIfSecondOnNextNotWithinTimeout() {
93102

94103
@Test
95104
public void shouldCompleteIfUnderlyingComletes() {
105+
@SuppressWarnings("unchecked")
96106
Observer<String> observer = mock(Observer.class);
97107
Subscription subscription = withTimeout.subscribe(observer);
98108
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
@@ -105,6 +115,7 @@ public void shouldCompleteIfUnderlyingComletes() {
105115

106116
@Test
107117
public void shouldErrorIfUnderlyingErrors() {
118+
@SuppressWarnings("unchecked")
108119
Observer<String> observer = mock(Observer.class);
109120
Subscription subscription = withTimeout.subscribe(observer);
110121
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
@@ -113,4 +124,103 @@ public void shouldErrorIfUnderlyingErrors() {
113124
verify(observer).onError(any(UnsupportedOperationException.class));
114125
subscription.unsubscribe();
115126
}
127+
128+
@Test
129+
public void shouldSwitchToOtherIfOnNextNotWithinTimeout() {
130+
Observable<String> other = Observable.from("a", "b", "c");
131+
Observable<String> source = underlyingSubject.timeout(TIMEOUT, TIME_UNIT, other, testScheduler);
132+
133+
@SuppressWarnings("unchecked")
134+
Observer<String> observer = mock(Observer.class);
135+
Subscription subscription = source.subscribe(observer);
136+
137+
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
138+
underlyingSubject.onNext("One");
139+
testScheduler.advanceTimeBy(4, TimeUnit.SECONDS);
140+
underlyingSubject.onNext("Two");
141+
InOrder inOrder = inOrder(observer);
142+
inOrder.verify(observer, times(1)).onNext("One");
143+
inOrder.verify(observer, times(1)).onNext("a");
144+
inOrder.verify(observer, times(1)).onNext("b");
145+
inOrder.verify(observer, times(1)).onNext("c");
146+
inOrder.verify(observer, times(1)).onCompleted();
147+
inOrder.verifyNoMoreInteractions();
148+
subscription.unsubscribe();
149+
}
150+
151+
@Test
152+
public void shouldSwitchToOtherIfOnErrorNotWithinTimeout() {
153+
Observable<String> other = Observable.from("a", "b", "c");
154+
Observable<String> source = underlyingSubject.timeout(TIMEOUT, TIME_UNIT, other, testScheduler);
155+
156+
@SuppressWarnings("unchecked")
157+
Observer<String> observer = mock(Observer.class);
158+
Subscription subscription = source.subscribe(observer);
159+
160+
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
161+
underlyingSubject.onNext("One");
162+
testScheduler.advanceTimeBy(4, TimeUnit.SECONDS);
163+
underlyingSubject.onError(new UnsupportedOperationException());
164+
InOrder inOrder = inOrder(observer);
165+
inOrder.verify(observer, times(1)).onNext("One");
166+
inOrder.verify(observer, times(1)).onNext("a");
167+
inOrder.verify(observer, times(1)).onNext("b");
168+
inOrder.verify(observer, times(1)).onNext("c");
169+
inOrder.verify(observer, times(1)).onCompleted();
170+
inOrder.verifyNoMoreInteractions();
171+
subscription.unsubscribe();
172+
}
173+
174+
@Test
175+
public void shouldSwitchToOtherIfOnCompletedNotWithinTimeout() {
176+
Observable<String> other = Observable.from("a", "b", "c");
177+
Observable<String> source = underlyingSubject.timeout(TIMEOUT, TIME_UNIT, other, testScheduler);
178+
179+
@SuppressWarnings("unchecked")
180+
Observer<String> observer = mock(Observer.class);
181+
Subscription subscription = source.subscribe(observer);
182+
183+
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
184+
underlyingSubject.onNext("One");
185+
testScheduler.advanceTimeBy(4, TimeUnit.SECONDS);
186+
underlyingSubject.onCompleted();
187+
InOrder inOrder = inOrder(observer);
188+
inOrder.verify(observer, times(1)).onNext("One");
189+
inOrder.verify(observer, times(1)).onNext("a");
190+
inOrder.verify(observer, times(1)).onNext("b");
191+
inOrder.verify(observer, times(1)).onNext("c");
192+
inOrder.verify(observer, times(1)).onCompleted();
193+
inOrder.verifyNoMoreInteractions();
194+
subscription.unsubscribe();
195+
}
196+
197+
@Test
198+
public void shouldSwitchToOtherAndCanBeUnsubscribedIfOnNextNotWithinTimeout() {
199+
PublishSubject<String> other = PublishSubject.create();
200+
Observable<String> source = underlyingSubject.timeout(TIMEOUT, TIME_UNIT, other, testScheduler);
201+
202+
@SuppressWarnings("unchecked")
203+
Observer<String> observer = mock(Observer.class);
204+
Subscription subscription = source.subscribe(observer);
205+
206+
testScheduler.advanceTimeBy(2, TimeUnit.SECONDS);
207+
underlyingSubject.onNext("One");
208+
testScheduler.advanceTimeBy(4, TimeUnit.SECONDS);
209+
underlyingSubject.onNext("Two");
210+
211+
other.onNext("a");
212+
other.onNext("b");
213+
subscription.unsubscribe();
214+
215+
// The following messages should not be delivered.
216+
other.onNext("c");
217+
other.onNext("d");
218+
other.onCompleted();
219+
220+
InOrder inOrder = inOrder(observer);
221+
inOrder.verify(observer, times(1)).onNext("One");
222+
inOrder.verify(observer, times(1)).onNext("a");
223+
inOrder.verify(observer, times(1)).onNext("b");
224+
inOrder.verifyNoMoreInteractions();
225+
}
116226
}

0 commit comments

Comments
 (0)