Skip to content

Commit f2d62f4

Browse files
Merge pull request #379 from jmhofer/interval-multiple-subscribers
Make `interval` work with multiple subscribers
2 parents 19541d0 + 2e372c2 commit f2d62f4

File tree

1 file changed

+132
-3
lines changed

1 file changed

+132
-3
lines changed

rxjava-core/src/main/java/rx/operators/OperationInterval.java

+132-3
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,7 @@
3232
import rx.Subscription;
3333
import rx.concurrency.Schedulers;
3434
import rx.concurrency.TestScheduler;
35+
import rx.observables.ConnectableObservable;
3536
import rx.subscriptions.Subscriptions;
3637
import rx.util.functions.Action0;
3738

@@ -45,14 +46,20 @@ public final class OperationInterval {
4546
* Creates an event each time interval.
4647
*/
4748
public static OnSubscribeFunc<Long> interval(long interval, TimeUnit unit) {
48-
return new Interval(interval, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor()));
49+
return interval(interval, unit, Schedulers.executor(Executors.newSingleThreadScheduledExecutor()));
4950
}
5051

5152
/**
5253
* Creates an event each time interval.
5354
*/
54-
public static OnSubscribeFunc<Long> interval(long interval, TimeUnit unit, Scheduler scheduler) {
55-
return new Interval(interval, unit, scheduler);
55+
public static OnSubscribeFunc<Long> interval(final long interval, final TimeUnit unit, final Scheduler scheduler) {
56+
// wrapped in order to work with multiple subscribers
57+
return new OnSubscribeFunc<Long>() {
58+
@Override
59+
public Subscription onSubscribe(Observer<? super Long> observer) {
60+
return new Interval(interval, unit, scheduler).onSubscribe(observer);
61+
}
62+
};
5663
}
5764

5865
private static class Interval implements OnSubscribeFunc<Long> {
@@ -91,12 +98,14 @@ public void call() {
9198
public static class UnitTest {
9299
private TestScheduler scheduler;
93100
private Observer<Long> observer;
101+
private Observer<Long> observer2;
94102

95103
@Before
96104
@SuppressWarnings("unchecked") // due to mocking
97105
public void before() {
98106
scheduler = new TestScheduler();
99107
observer = mock(Observer.class);
108+
observer2 = mock(Observer.class);
100109
}
101110

102111
@Test
@@ -123,5 +132,125 @@ public void testInterval() {
123132
verify(observer, times(1)).onCompleted();
124133
verify(observer, never()).onError(any(Throwable.class));
125134
}
135+
136+
@Test
137+
public void testWithMultipleSubscribersStartingAtSameTime() {
138+
Observable<Long> w = Observable.create(OperationInterval.interval(1, TimeUnit.SECONDS, scheduler));
139+
Subscription sub1 = w.subscribe(observer);
140+
Subscription sub2 = w.subscribe(observer2);
141+
142+
verify(observer, never()).onNext(anyLong());
143+
verify(observer2, never()).onNext(anyLong());
144+
145+
scheduler.advanceTimeTo(2, TimeUnit.SECONDS);
146+
147+
InOrder inOrder1 = inOrder(observer);
148+
InOrder inOrder2 = inOrder(observer2);
149+
150+
inOrder1.verify(observer, times(1)).onNext(0L);
151+
inOrder1.verify(observer, times(1)).onNext(1L);
152+
inOrder1.verify(observer, never()).onNext(2L);
153+
verify(observer, never()).onCompleted();
154+
verify(observer, never()).onError(any(Throwable.class));
155+
156+
inOrder2.verify(observer2, times(1)).onNext(0L);
157+
inOrder2.verify(observer2, times(1)).onNext(1L);
158+
inOrder2.verify(observer2, never()).onNext(2L);
159+
verify(observer2, never()).onCompleted();
160+
verify(observer2, never()).onError(any(Throwable.class));
161+
162+
sub1.unsubscribe();
163+
sub2.unsubscribe();
164+
scheduler.advanceTimeTo(4, TimeUnit.SECONDS);
165+
166+
verify(observer, never()).onNext(2L);
167+
verify(observer, times(1)).onCompleted();
168+
verify(observer, never()).onError(any(Throwable.class));
169+
170+
verify(observer2, never()).onNext(2L);
171+
verify(observer2, times(1)).onCompleted();
172+
verify(observer2, never()).onError(any(Throwable.class));
173+
}
174+
175+
@Test
176+
public void testWithMultipleStaggeredSubscribers() {
177+
Observable<Long> w = Observable.create(OperationInterval.interval(1, TimeUnit.SECONDS, scheduler));
178+
Subscription sub1 = w.subscribe(observer);
179+
180+
verify(observer, never()).onNext(anyLong());
181+
182+
scheduler.advanceTimeTo(2, TimeUnit.SECONDS);
183+
Subscription sub2 = w.subscribe(observer2);
184+
185+
InOrder inOrder1 = inOrder(observer);
186+
inOrder1.verify(observer, times(1)).onNext(0L);
187+
inOrder1.verify(observer, times(1)).onNext(1L);
188+
inOrder1.verify(observer, never()).onNext(2L);
189+
190+
verify(observer, never()).onCompleted();
191+
verify(observer, never()).onError(any(Throwable.class));
192+
verify(observer2, never()).onNext(anyLong());
193+
194+
scheduler.advanceTimeTo(4, TimeUnit.SECONDS);
195+
196+
inOrder1.verify(observer, times(1)).onNext(2L);
197+
inOrder1.verify(observer, times(1)).onNext(3L);
198+
199+
InOrder inOrder2 = inOrder(observer2);
200+
inOrder2.verify(observer2, times(1)).onNext(0L);
201+
inOrder2.verify(observer2, times(1)).onNext(1L);
202+
203+
sub1.unsubscribe();
204+
sub2.unsubscribe();
205+
206+
inOrder1.verify(observer, never()).onNext(anyLong());
207+
inOrder1.verify(observer, times(1)).onCompleted();
208+
verify(observer, never()).onError(any(Throwable.class));
209+
210+
inOrder2.verify(observer2, never()).onNext(anyLong());
211+
inOrder2.verify(observer2, times(1)).onCompleted();
212+
verify(observer2, never()).onError(any(Throwable.class));
213+
}
214+
215+
@Test
216+
public void testWithMultipleStaggeredSubscribersAndPublish() {
217+
ConnectableObservable<Long> w = Observable.create(OperationInterval.interval(1, TimeUnit.SECONDS, scheduler)).publish();
218+
Subscription sub1 = w.subscribe(observer);
219+
w.connect();
220+
221+
verify(observer, never()).onNext(anyLong());
222+
223+
scheduler.advanceTimeTo(2, TimeUnit.SECONDS);
224+
Subscription sub2 = w.subscribe(observer2);
225+
226+
InOrder inOrder1 = inOrder(observer);
227+
inOrder1.verify(observer, times(1)).onNext(0L);
228+
inOrder1.verify(observer, times(1)).onNext(1L);
229+
inOrder1.verify(observer, never()).onNext(2L);
230+
231+
verify(observer, never()).onCompleted();
232+
verify(observer, never()).onError(any(Throwable.class));
233+
verify(observer2, never()).onNext(anyLong());
234+
235+
scheduler.advanceTimeTo(4, TimeUnit.SECONDS);
236+
237+
inOrder1.verify(observer, times(1)).onNext(2L);
238+
inOrder1.verify(observer, times(1)).onNext(3L);
239+
240+
InOrder inOrder2 = inOrder(observer2);
241+
inOrder2.verify(observer2, times(1)).onNext(2L);
242+
inOrder2.verify(observer2, times(1)).onNext(3L);
243+
244+
sub1.unsubscribe();
245+
sub2.unsubscribe();
246+
247+
inOrder1.verify(observer, never()).onNext(anyLong());
248+
inOrder1.verify(observer, never()).onCompleted();
249+
verify(observer, never()).onError(any(Throwable.class));
250+
251+
inOrder2.verify(observer2, never()).onNext(anyLong());
252+
inOrder2.verify(observer2, never()).onCompleted();
253+
verify(observer2, never()).onError(any(Throwable.class));
254+
}
126255
}
127256
}

0 commit comments

Comments
 (0)