Skip to content

Commit 9c9ca59

Browse files
author
Aaron Tull
committed
Implemented Observable#toCompletable
1 parent 7ba9067 commit 9c9ca59

File tree

2 files changed

+123
-0
lines changed

2 files changed

+123
-0
lines changed

src/main/java/rx/Observable.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -232,6 +232,31 @@ public Single<T> toSingle() {
232232
return new Single<T>(OnSubscribeSingle.create(this));
233233
}
234234

235+
/**
236+
* Returns a Completable that discards all onNext emissions (similar to
237+
* {@code ignoreAllElements()}) and calls onCompleted when this source observable calls
238+
* onCompleted. Error terminal events are propagated.
239+
* <p>
240+
* <img width="640" height="295" src=
241+
* "https://raw.github.com/wiki/ReactiveX/RxJava/images/rx-operators/Completable.toCompletable.png"
242+
* alt="">
243+
* <dl>
244+
* <dt><b>Scheduler:</b></dt>
245+
* <dd>{@code toCompletable} does not operate by default on a particular {@link Scheduler}.</dd>
246+
* </dl>
247+
*
248+
* @return a Completable that calls onCompleted on it's subscriber when the source Observable
249+
* calls onCompleted
250+
* @see <a href="http://reactivex.io/documentation/completable.html">ReactiveX documentation:
251+
* Completable</a>
252+
* @since (if this graduates from Experimental/Beta to supported, replace this parenthetical
253+
* with the release number)
254+
*/
255+
@Experimental
256+
public Completable toCompletable() {
257+
return Completable.fromObservable(this);
258+
}
259+
235260

236261
/* *********************************************************************************************************
237262
* Operators Below Here
Lines changed: 98 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,98 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.internal.operators;
17+
18+
import static org.junit.Assert.assertFalse;
19+
20+
import java.util.concurrent.atomic.AtomicBoolean;
21+
22+
import org.junit.Test;
23+
24+
import rx.Completable;
25+
import rx.Observable;
26+
import rx.functions.Action0;
27+
import rx.observers.TestSubscriber;
28+
29+
public class OnSubscribeCompletableTest {
30+
31+
@Test
32+
public void testJustSingleItemObservable() {
33+
TestSubscriber<String> subscriber = TestSubscriber.create();
34+
Completable cmp = Observable.just("Hello World!").toCompletable();
35+
cmp.subscribe(subscriber);
36+
37+
subscriber.assertNoValues();
38+
subscriber.assertCompleted();
39+
subscriber.assertNoErrors();
40+
}
41+
42+
@Test
43+
public void testErrorObservable() {
44+
TestSubscriber<String> subscriber = TestSubscriber.create();
45+
IllegalArgumentException error = new IllegalArgumentException("Error");
46+
Completable cmp = Observable.<String>error(error).toCompletable();
47+
cmp.subscribe(subscriber);
48+
49+
subscriber.assertError(error);
50+
subscriber.assertNoValues();
51+
}
52+
53+
@Test
54+
public void testJustTwoEmissionsObservableThrowsError() {
55+
TestSubscriber<String> subscriber = TestSubscriber.create();
56+
Completable cmp = Observable.just("First", "Second").toCompletable();
57+
cmp.subscribe(subscriber);
58+
59+
subscriber.assertNoErrors();
60+
subscriber.assertNoValues();
61+
}
62+
63+
@Test
64+
public void testEmptyObservable() {
65+
TestSubscriber<String> subscriber = TestSubscriber.create();
66+
Completable cmp = Observable.<String>empty().toCompletable();
67+
cmp.subscribe(subscriber);
68+
69+
subscriber.assertNoErrors();
70+
subscriber.assertNoValues();
71+
subscriber.assertCompleted();
72+
}
73+
74+
@Test
75+
public void testNeverObservable() {
76+
TestSubscriber<String> subscriber = TestSubscriber.create();
77+
Completable cmp = Observable.<String>never().toCompletable();
78+
cmp.subscribe(subscriber);
79+
80+
subscriber.assertNoTerminalEvent();
81+
subscriber.assertNoValues();
82+
}
83+
84+
@Test
85+
public void testShouldUseUnsafeSubscribeInternallyNotSubscribe() {
86+
TestSubscriber<String> subscriber = TestSubscriber.create();
87+
final AtomicBoolean unsubscribed = new AtomicBoolean(false);
88+
Completable cmp = Observable.just("Hello World!").doOnUnsubscribe(new Action0() {
89+
90+
@Override
91+
public void call() {
92+
unsubscribed.set(true);
93+
}}).toCompletable();
94+
cmp.subscribe(subscriber);
95+
subscriber.assertCompleted();
96+
assertFalse(unsubscribed.get());
97+
}
98+
}

0 commit comments

Comments
 (0)