Skip to content

Commit 634cea3

Browse files
Replay operator on Observable
ReactiveX/RxJava#71
1 parent 6651539 commit 634cea3

File tree

1 file changed

+83
-2
lines changed

1 file changed

+83
-2
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 83 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,6 +78,7 @@
7878
import rx.plugins.RxJavaObservableExecutionHook;
7979
import rx.plugins.RxJavaPlugins;
8080
import rx.subjects.PublishSubject;
81+
import rx.subjects.ReplaySubject;
8182
import rx.subjects.Subject;
8283
import rx.subscriptions.BooleanSubscription;
8384
import rx.subscriptions.Subscriptions;
@@ -1666,6 +1667,17 @@ public static <T> Observable<T> onErrorReturn(final Observable<T> that, Func1<Ex
16661667
return create(OperationOnErrorReturn.onErrorReturn(that, resumeFunction));
16671668
}
16681669

1670+
/**
1671+
* Returns a connectable observable sequence that shares a single subscription to the underlying sequence replaying all notifications.
1672+
*
1673+
* @param that
1674+
* the source Observable
1675+
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
1676+
*/
1677+
public static <T> ConnectableObservable<T> replay(final Observable<T> that) {
1678+
return OperationMulticast.multicast(that, ReplaySubject.<T> create());
1679+
}
1680+
16691681
/**
16701682
* Returns a connectable observable sequence that shares a single subscription to the underlying sequence.
16711683
*
@@ -3239,13 +3251,22 @@ public Observable<T> reduce(Func2<T, T, T> accumulator) {
32393251
return reduce(this, accumulator);
32403252
}
32413253

3254+
/**
3255+
* Returns a connectable observable sequence that shares a single subscription to the underlying sequence replaying all notifications.
3256+
*
3257+
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
3258+
*/
3259+
public ConnectableObservable<T> replay() {
3260+
return replay(this);
3261+
}
3262+
32423263
/**
32433264
* Returns a connectable observable sequence that shares a single subscription to the underlying sequence.
32443265
*
32453266
* @return a connectable observable sequence that upon connection causes the source sequence to push results into the specified subject.
32463267
*/
32473268
public ConnectableObservable<T> publish() {
3248-
return OperationMulticast.multicast(this, PublishSubject.<T> create());
3269+
return publish(this);
32493270
}
32503271

32513272
/**
@@ -4245,10 +4266,10 @@ public Subscription call(final Observer<String> observer) {
42454266

42464267
@Override
42474268
public void run() {
4269+
counter.incrementAndGet();
42484270
System.out.println("published observable being executed");
42494271
observer.onNext("one");
42504272
observer.onCompleted();
4251-
counter.incrementAndGet();
42524273
}
42534274
}).start();
42544275
return subscription;
@@ -4290,6 +4311,66 @@ public void call(String v) {
42904311
}
42914312
}
42924313

4314+
@Test
4315+
public void testReplay() throws InterruptedException {
4316+
final AtomicInteger counter = new AtomicInteger();
4317+
ConnectableObservable<String> o = Observable.create(new Func1<Observer<String>, Subscription>() {
4318+
4319+
@Override
4320+
public Subscription call(final Observer<String> observer) {
4321+
final BooleanSubscription subscription = new BooleanSubscription();
4322+
new Thread(new Runnable() {
4323+
4324+
@Override
4325+
public void run() {
4326+
System.out.println("published observable being executed");
4327+
observer.onNext("one");
4328+
observer.onCompleted();
4329+
counter.incrementAndGet();
4330+
}
4331+
}).start();
4332+
return subscription;
4333+
}
4334+
}).replay();
4335+
4336+
// we connect immediately and it will emit the value
4337+
Subscription s = o.connect();
4338+
try {
4339+
4340+
// we then expect the following 2 subscriptions to get that same value
4341+
final CountDownLatch latch = new CountDownLatch(2);
4342+
4343+
// subscribe once
4344+
o.subscribe(new Action1<String>() {
4345+
4346+
@Override
4347+
public void call(String v) {
4348+
assertEquals("one", v);
4349+
System.out.println("v: " + v);
4350+
latch.countDown();
4351+
}
4352+
});
4353+
4354+
// subscribe again
4355+
o.subscribe(new Action1<String>() {
4356+
4357+
@Override
4358+
public void call(String v) {
4359+
assertEquals("one", v);
4360+
System.out.println("v: " + v);
4361+
latch.countDown();
4362+
}
4363+
});
4364+
4365+
if (!latch.await(1000, TimeUnit.MILLISECONDS)) {
4366+
fail("subscriptions did not receive values");
4367+
}
4368+
assertEquals(1, counter.get());
4369+
} finally {
4370+
s.unsubscribe();
4371+
}
4372+
}
4373+
42934374
private static class TestException extends RuntimeException {
42944375
private static final long serialVersionUID = 1L;
42954376
}

0 commit comments

Comments
 (0)