Skip to content

Commit b92c664

Browse files
author
Aaron Tull
committed
Implemented Completable#andThen(Observable)
1 parent 0c039ee commit b92c664

File tree

2 files changed

+65
-0
lines changed

2 files changed

+65
-0
lines changed

src/main/java/rx/Completable.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1080,6 +1080,21 @@ public final Completable compose(CompletableTransformer transformer) {
10801080
return to(transformer);
10811081
}
10821082

1083+
/**
1084+
* Returns an Observable which will subscribe to this Completable and once that is completed then
1085+
* will subscribe to the {@code next} Observable. An error event from this Completable will be
1086+
* propagated to the downstream subscriber and will result in skipping the subscription of the
1087+
* Observable.
1088+
*
1089+
* @param next the Observable to subscribe after this Completable is completed, not null
1090+
* @return Observable that composes this Completable and next
1091+
* @throws NullPointerException if next is null
1092+
*/
1093+
public final <T> Observable<T> andThen(Observable<T> next) {
1094+
requireNonNull(next);
1095+
return next.delaySubscription(toObservable());
1096+
}
1097+
10831098
/**
10841099
* Concatenates this Completable with another Completable.
10851100
* @param other the other Completable, not null

src/test/java/rx/CompletableTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -357,6 +357,56 @@ public void call(Long v) {
357357
Assert.assertEquals(Arrays.asList(5L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L, 1L), requested);
358358
}
359359

360+
@Test
361+
public void andThen() {
362+
TestSubscriber<String> ts = new TestSubscriber<>(0);
363+
Completable.complete().andThen(Observable.just("foo")).subscribe(ts);
364+
ts.requestMore(1);
365+
ts.assertValue("foo");
366+
ts.assertCompleted();
367+
ts.assertNoErrors();
368+
}
369+
370+
@Test
371+
public void andThenNever() {
372+
TestSubscriber<String> ts = new TestSubscriber<>(0);
373+
Completable.never().andThen(Observable.just("foo")).subscribe(ts);
374+
ts.requestMore(1);
375+
ts.assertNoValues();
376+
ts.assertNoTerminalEvent();
377+
}
378+
379+
@Test
380+
public void andThenError() {
381+
TestSubscriber<String> ts = new TestSubscriber<>(0);
382+
AtomicBoolean hasRun = new AtomicBoolean(false);
383+
Exception e = new Exception();
384+
Completable.create(cs -> cs.onError(e))
385+
.andThen(Observable.<String>create(s-> {
386+
hasRun.set(true);
387+
s.onNext("foo");
388+
s.onCompleted();
389+
}))
390+
.subscribe(ts);
391+
ts.assertNoValues();
392+
ts.assertError(e);
393+
Assert.assertFalse("Should not have subscribed to observable when completable errors", hasRun.get());
394+
}
395+
396+
@Test
397+
public void andThenSubscribeOn() {
398+
TestSubscriber<String> ts = new TestSubscriber<>(0);
399+
TestScheduler scheduler = new TestScheduler();
400+
Completable.complete().andThen(Observable.just("foo").delay(1, TimeUnit.SECONDS, scheduler)).subscribe(ts);
401+
ts.requestMore(1);
402+
ts.assertNoValues();
403+
ts.assertNoTerminalEvent();
404+
scheduler.advanceTimeBy(1, TimeUnit.SECONDS);
405+
ts.assertValue("foo");
406+
ts.assertCompleted();
407+
ts.assertNoErrors();
408+
}
409+
360410
@Test(expected = NullPointerException.class)
361411
public void createNull() {
362412
Completable.create(null);

0 commit comments

Comments
 (0)