Skip to content

Commit 2e372c2

Browse files
author
Joachim Hofer
committed
wrapped subscription so that interval works for multiple subscribers and added a test for staggered subscriptions with publish/connect, too
1 parent 2d810be commit 2e372c2

File tree

1 file changed

+52
-5
lines changed

1 file changed

+52
-5
lines changed

rxjava-core/src/main/java/rx/operators/OperationInterval.java

Lines changed: 52 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import rx.Subscription;
3333
import rx.concurrency.Schedulers;
3434
import rx.concurrency.TestScheduler;
35+
import rx.observables.ConnectableObservable;
3536
import rx.subscriptions.Subscriptions;
3637
import rx.util.functions.Action0;
3738

@@ -45,14 +46,20 @@ public final class OperationInterval {
4546
* Creates an event each time interval.
4647
*/
4748
public static OnSubscribeFunc<Long> interval(long interval, TimeUnit unit) {
48-
return new Interval(interval, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor()));
49+
return interval(interval, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor()));
4950
}
5051

5152
/**
5253
* Creates an event each time interval.
5354
*/
54-
public static OnSubscribeFunc<Long> interval(long interval, TimeUnit unit, Scheduler scheduler) {
55-
return new Interval(interval, unit, scheduler);
55+
public static OnSubscribeFunc<Long> interval(final long interval, final TimeUnit unit, final Scheduler scheduler) {
56+
// wrapped in order to work with multiple subscribers
57+
return new OnSubscribeFunc<Long>() {
58+
@Override
59+
public Subscription onSubscribe(Observer<? super Long> observer) {
60+
return new Interval(interval, unit, scheduler).onSubscribe(observer);
61+
}
62+
};
5663
}
5764

5865
private static class Interval implements OnSubscribeFunc<Long> {
@@ -188,7 +195,6 @@ public void testWithMultipleStaggeredSubscribers() {
188195

189196
inOrder1.verify(observer, times(1)).onNext(2L);
190197
inOrder1.verify(observer, times(1)).onNext(3L);
191-
inOrder1.verify(observer, never()).onNext(4L);
192198

193199
InOrder inOrder2 = inOrder(observer2);
194200
inOrder2.verify(observer2, times(1)).onNext(0L);
@@ -201,9 +207,50 @@ public void testWithMultipleStaggeredSubscribers() {
201207
inOrder1.verify(observer, times(1)).onCompleted();
202208
verify(observer, never()).onError(any(Throwable.class));
203209

204-
inOrder2.verify(observer2, never()).onNext(2L);
210+
inOrder2.verify(observer2, never()).onNext(anyLong());
205211
inOrder2.verify(observer2, times(1)).onCompleted();
206212
verify(observer2, never()).onError(any(Throwable.class));
207213
}
214+
215+
@Test
216+
public void testWithMultipleStaggeredSubscribersAndPublish() {
217+
ConnectableObservable<Long> w = Observable.create(OperationInterval.interval(1, TimeUnit.SECONDS, scheduler)).publish();
218+
Subscription sub1 = w.subscribe(observer);
219+
w.connect();
220+
221+
verify(observer, never()).onNext(anyLong());
222+
223+
scheduler.advanceTimeTo(2, TimeUnit.SECONDS);
224+
Subscription sub2 = w.subscribe(observer2);
225+
226+
InOrder inOrder1 = inOrder(observer);
227+
inOrder1.verify(observer, times(1)).onNext(0L);
228+
inOrder1.verify(observer, times(1)).onNext(1L);
229+
inOrder1.verify(observer, never()).onNext(2L);
230+
231+
verify(observer, never()).onCompleted();
232+
verify(observer, never()).onError(any(Throwable.class));
233+
verify(observer2, never()).onNext(anyLong());
234+
235+
scheduler.advanceTimeTo(4, TimeUnit.SECONDS);
236+
237+
inOrder1.verify(observer, times(1)).onNext(2L);
238+
inOrder1.verify(observer, times(1)).onNext(3L);
239+
240+
InOrder inOrder2 = inOrder(observer2);
241+
inOrder2.verify(observer2, times(1)).onNext(2L);
242+
inOrder2.verify(observer2, times(1)).onNext(3L);
243+
244+
sub1.unsubscribe();
245+
sub2.unsubscribe();
246+
247+
inOrder1.verify(observer, never()).onNext(anyLong());
248+
inOrder1.verify(observer, never()).onCompleted();
249+
verify(observer, never()).onError(any(Throwable.class));
250+
251+
inOrder2.verify(observer2, never()).onNext(anyLong());
252+
inOrder2.verify(observer2, never()).onCompleted();
253+
verify(observer2, never()).onError(any(Throwable.class));
254+
}
208255
}
209256
}

0 commit comments

Comments
 (0)