Skip to content

Commit 5de471a

Browse files
Replay operator on Observable
ReactiveX#71
1 parent 16879eb commit 5de471a

File tree

1 file changed

+83
-2
lines changed

1 file changed

+83
-2
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import rx.plugins.RxJavaObservableExecutionHook;
7979
import rx.plugins.RxJavaPlugins;
8080
import rx.subjects.PublishSubject;
81+
import rx.subjects.ReplaySubject;
8182
import rx.subjects.Subject;
8283
import rx.subscriptions.BooleanSubscription;
8384
import rx.subscriptions.Subscriptions;
@@ -1666,6 +1667,17 @@ public static <T> Observable<T> onErrorReturn(final Observable<T> that, Func1<Ex
16661667
return create(OperationOnErrorReturn.onErrorReturn(that, resumeFunction));
16671668
}
16681669

1670+
/**
1671+
* Returns a connectable observable sequence that shares a single subscription to the underlying sequence replaying all notifications.
1672+
*
1673+
* @param that
1674+
* the source Observable
1675+
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
1676+
*/
1677+
public static <T> ConnectableObservable<T> replay(final Observable<T> that) {
1678+
return OperationMulticast.multicast(that, ReplaySubject.<T> create());
1679+
}
1680+
16691681
/**
16701682
* Returns a connectable observable sequence that shares a single subscription to the underlying sequence.
16711683
*
@@ -3199,13 +3211,22 @@ public Observable<T> reduce(Func2<T, T, T> accumulator) {
31993211
return reduce(this, accumulator);
32003212
}
32013213

3214+
/**
3215+
* Returns a connectable observable sequence that shares a single subscription to the underlying sequence replaying all notifications.
3216+
*
3217+
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
3218+
*/
3219+
public ConnectableObservable<T> replay() {
3220+
return replay(this);
3221+
}
3222+
32023223
/**
32033224
* Returns a connectable observable sequence that shares a single subscription to the underlying sequence.
32043225
*
32053226
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
32063227
*/
32073228
public ConnectableObservable<T> publish() {
3208-
return OperationMulticast.multicast(this, PublishSubject.<T> create());
3229+
return publish(this);
32093230
}
32103231

32113232
/**
@@ -4174,10 +4195,10 @@ public Subscription call(final Observer<String> observer) {
41744195

41754196
@Override
41764197
public void run() {
4198+
counter.incrementAndGet();
41774199
System.out.println("published observable being executed");
41784200
observer.onNext("one");
41794201
observer.onCompleted();
4180-
counter.incrementAndGet();
41814202
}
41824203
}).start();
41834204
return subscription;
@@ -4219,6 +4240,66 @@ public void call(String v) {
42194240
}
42204241
}
42214242

4243+
@Test
4244+
public void testReplay() throws InterruptedException {
4245+
final AtomicInteger counter = new AtomicInteger();
4246+
ConnectableObservable<String> o = Observable.create(new Func1<Observer<String>, Subscription>() {
4247+
4248+
@Override
4249+
public Subscription call(final Observer<String> observer) {
4250+
final BooleanSubscription subscription = new BooleanSubscription();
4251+
new Thread(new Runnable() {
4252+
4253+
@Override
4254+
public void run() {
4255+
System.out.println("published observable being executed");
4256+
observer.onNext("one");
4257+
observer.onCompleted();
4258+
counter.incrementAndGet();
4259+
}
4260+
}).start();
4261+
return subscription;
4262+
}
4263+
}).replay();
4264+
4265+
// we connect immediately and it will emit the value
4266+
Subscription s = o.connect();
4267+
try {
4268+
4269+
// we then expect the following 2 subscriptions to get that same value
4270+
final CountDownLatch latch = new CountDownLatch(2);
4271+
4272+
// subscribe once
4273+
o.subscribe(new Action1<String>() {
4274+
4275+
@Override
4276+
public void call(String v) {
4277+
assertEquals("one", v);
4278+
System.out.println("v: " + v);
4279+
latch.countDown();
4280+
}
4281+
});
4282+
4283+
// subscribe again
4284+
o.subscribe(new Action1<String>() {
4285+
4286+
@Override
4287+
public void call(String v) {
4288+
assertEquals("one", v);
4289+
System.out.println("v: " + v);
4290+
latch.countDown();
4291+
}
4292+
});
4293+
4294+
if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
4295+
fail("subscriptions did not receive values");
4296+
}
4297+
assertEquals(1, counter.get());
4298+
} finally {
4299+
s.unsubscribe();
4300+
}
4301+
}
4302+
42224303
private static class TestException extends RuntimeException {
42234304
private static final long serialVersionUID = 1L;
42244305
}

0 commit comments

Comments
 (0)