Skip to content

Commit f189a98

Browse files
Repeat Operator
- merge and slight modification of #518
2 parents 92ba6e7 + 1a7e51f commit f189a98

File tree

4 files changed

+158
-2
lines changed

4 files changed

+158
-2
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -73,6 +73,7 @@
7373
import rx.operators.OperationOnExceptionResumeNextViaObservable;
7474
import rx.operators.OperationParallel;
7575
import rx.operators.OperationParallelMerge;
76+
import rx.operators.OperationRepeat;
7677
import rx.operators.OperationReplay;
7778
import rx.operators.OperationRetry;
7879
import rx.operators.OperationSample;
@@ -1097,6 +1098,28 @@ public static Observable<Integer> range(int start, int count, Scheduler schedule
10971098
return from(Range.createWithCount(start, count), scheduler);
10981099
}
10991100

1101+
/**
1102+
* Repeats the observable sequence indefinitely.
1103+
* <p>
1104+
*
1105+
* @return The observable sequence producing the elements of the given sequence repeatedly and sequentially.
1106+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428(v=vs.103).aspx">MSDN: Observable.Repeat</a>
1107+
*/
1108+
public Observable<T> repeat() {
1109+
return this.repeat(Schedulers.currentThread());
1110+
}
1111+
1112+
/**
1113+
* Repeats the observable sequence indefinitely.
1114+
* <p>
1115+
* @param scheduler the scheduler to send the values on.
1116+
* @return The observable sequence producing the elements of the given sequence repeatedly and sequentially.
1117+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229428(v=vs.103).aspx">MSDN: Observable.Repeat</a>
1118+
*/
1119+
public Observable<T> repeat(Scheduler scheduler) {
1120+
return create(OperationRepeat.repeat(this, scheduler));
1121+
}
1122+
11001123
/**
11011124
* Returns an Observable that calls an Observable factory to create its
11021125
* Observable for each new Observer that subscribes. That is, for each
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.operators;
18+
19+
import rx.Observable;
20+
import rx.Observer;
21+
import rx.Scheduler;
22+
import rx.Subscription;
23+
import rx.subscriptions.CompositeSubscription;
24+
import rx.subscriptions.MultipleAssignmentSubscription;
25+
import rx.util.functions.Action0;
26+
import rx.util.functions.Action1;
27+
28+
public class OperationRepeat<T> implements Observable.OnSubscribeFunc<T> {
29+
30+
private final Observable<T> source;
31+
private final Scheduler scheduler;
32+
33+
public static <T> Observable.OnSubscribeFunc<T> repeat(Observable<T> source, Scheduler scheduler) {
34+
return new OperationRepeat<T>(source, scheduler);
35+
}
36+
37+
private OperationRepeat(Observable<T> source, Scheduler scheduler) {
38+
this.source = source;
39+
this.scheduler = scheduler;
40+
}
41+
42+
@Override
43+
public Subscription onSubscribe(final Observer<? super T> observer) {
44+
final CompositeSubscription compositeSubscription = new CompositeSubscription();
45+
final MultipleAssignmentSubscription innerSubscription = new MultipleAssignmentSubscription();
46+
compositeSubscription.add(innerSubscription);
47+
compositeSubscription.add(scheduler.schedule(new Action1<Action0>() {
48+
@Override
49+
public void call(final Action0 self) {
50+
innerSubscription.set(source.subscribe(new Observer<T>() {
51+
52+
@Override
53+
public void onCompleted() {
54+
self.call();
55+
}
56+
57+
@Override
58+
public void onError(Throwable error) {
59+
observer.onError(error);
60+
}
61+
62+
@Override
63+
public void onNext(T value) {
64+
observer.onNext(value);
65+
}
66+
}));
67+
}
68+
}));
69+
return compositeSubscription;
70+
}
71+
}

rxjava-core/src/test/java/rx/ObservableTests.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,11 +37,11 @@
3737
import org.mockito.stubbing.Answer;
3838

3939
import rx.Observable.OnSubscribeFunc;
40-
import rx.schedulers.TestScheduler;
4140
import rx.observables.ConnectableObservable;
41+
import rx.schedulers.Schedulers;
42+
import rx.schedulers.TestScheduler;
4243
import rx.subscriptions.BooleanSubscription;
4344
import rx.subscriptions.Subscriptions;
44-
import rx.util.functions.Action0;
4545
import rx.util.functions.Action1;
4646
import rx.util.functions.Func0;
4747
import rx.util.functions.Func1;
Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.junit.Assert.*;
19+
20+
import java.util.concurrent.atomic.AtomicInteger;
21+
22+
import org.junit.Test;
23+
24+
import rx.Observable;
25+
import rx.Observable.OnSubscribeFunc;
26+
import rx.Observer;
27+
import rx.Subscription;
28+
import rx.schedulers.Schedulers;
29+
import rx.subscriptions.Subscriptions;
30+
31+
public class OperationRepeatTest {
32+
33+
@Test
34+
public void testRepetition() {
35+
int NUM = 10;
36+
final AtomicInteger count = new AtomicInteger();
37+
int value = Observable.create(new OnSubscribeFunc<Integer>() {
38+
39+
@Override
40+
public Subscription onSubscribe(Observer<? super Integer> o) {
41+
o.onNext(count.incrementAndGet());
42+
o.onCompleted();
43+
return Subscriptions.empty();
44+
}
45+
}).repeat(Schedulers.threadPoolForComputation()).take(NUM).toBlockingObservable().last();
46+
47+
assertEquals(NUM, value);
48+
}
49+
50+
@Test
51+
public void testRepeatTake() {
52+
Observable<Integer> xs = Observable.from(1, 2);
53+
Object[] ys = xs.repeat(Schedulers.newThread()).take(4).toList().toBlockingObservable().last().toArray();
54+
assertArrayEquals(new Object[] { 1, 2, 1, 2 }, ys);
55+
}
56+
57+
@Test
58+
public void testNoStackOverFlow() {
59+
Observable.from(1).repeat(Schedulers.newThread()).take(100000).toBlockingObservable().last();
60+
}
61+
62+
}

0 commit comments

Comments
 (0)