Skip to content

Commit 9f0f753

Browse files
Merge pull request ReactiveX#252 from benjchristensen/toFuture
Observable.toFuture
2 parents 317c4b9 + ee00657 commit 9f0f753

File tree

2 files changed

+192
-0
lines changed

2 files changed

+192
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -64,6 +64,7 @@
6464
import rx.operators.OperationTakeLast;
6565
import rx.operators.OperationTakeUntil;
6666
import rx.operators.OperationTakeWhile;
67+
import rx.operators.OperationToFuture;
6768
import rx.operators.OperationToIterator;
6869
import rx.operators.OperationToObservableFuture;
6970
import rx.operators.OperationToObservableIterable;
@@ -2066,6 +2067,19 @@ public Boolean call(T t, Integer integer)
20662067
}));
20672068
}
20682069

2070+
/**
2071+
* Return a Future representing a single value of the Observable.
2072+
* <p>
2073+
* This will throw an exception if the Observable emits more than 1 value. If more than 1 are expected then use <code>toList().toFuture()</code>.
2074+
*
2075+
* @param that
2076+
* the source Observable
2077+
* @returna Future that expects a single item emitted by the source Observable
2078+
*/
2079+
public static <T> Future<T> toFuture(final Observable<T> that) {
2080+
return OperationToFuture.toFuture(that);
2081+
}
2082+
20692083
/**
20702084
* Returns an Observable that emits a single item, a list composed of all the items emitted by
20712085
* the source Observable.
@@ -3473,6 +3487,17 @@ public <E> Observable<T> takeUntil(Observable<E> other) {
34733487
return takeUntil(this, other);
34743488
}
34753489

3490+
/**
3491+
* Return a Future representing a single value of the Observable.
3492+
* <p>
3493+
* This will throw an exception if the Observable emits more than 1 value. If more than 1 are expected then use <code>toList().toFuture()</code>.
3494+
*
3495+
* @returna Future that expects a single item emitted by the source Observable
3496+
*/
3497+
public Future<T> toFuture() {
3498+
return toFuture(this);
3499+
}
3500+
34763501
/**
34773502
* Returns an Observable that emits a single item, a list composed of all the items emitted by
34783503
* the source Observable.
Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
package rx.operators;
2+
3+
import static org.junit.Assert.*;
4+
5+
import java.util.List;
6+
import java.util.concurrent.CountDownLatch;
7+
import java.util.concurrent.ExecutionException;
8+
import java.util.concurrent.Future;
9+
import java.util.concurrent.TimeUnit;
10+
import java.util.concurrent.TimeoutException;
11+
import java.util.concurrent.atomic.AtomicReference;
12+
13+
import org.junit.Test;
14+
15+
import rx.Observable;
16+
import rx.Observer;
17+
import rx.Subscription;
18+
import rx.subscriptions.Subscriptions;
19+
import rx.util.functions.Func1;
20+
21+
/**
22+
* Convert an Observable into a Future.
23+
*/
24+
public class OperationToFuture {
25+
26+
/**
27+
* Returns a Future that expects a single item from the observable.
28+
*
29+
* @param that
30+
* an observable sequence to get a Future for.
31+
* @param <T>
32+
* the type of source.
33+
* @return the Future to retrieve a single elements from an Observable
34+
*/
35+
public static <T> Future<T> toFuture(Observable<T> that) {
36+
37+
final CountDownLatch finished = new CountDownLatch(1);
38+
final AtomicReference<T> value = new AtomicReference<T>();
39+
final AtomicReference<Exception> error = new AtomicReference<Exception>();
40+
41+
final Subscription s = that.subscribe(new Observer<T>() {
42+
43+
@Override
44+
public void onCompleted() {
45+
finished.countDown();
46+
}
47+
48+
@Override
49+
public void onError(Exception e) {
50+
error.compareAndSet(null, e);
51+
finished.countDown();
52+
}
53+
54+
@Override
55+
public void onNext(T v) {
56+
if (!value.compareAndSet(null, v)) {
57+
// this means we received more than one value and must fail as a Future can handle only a single value
58+
error.compareAndSet(null, new IllegalStateException("Observable.toFuture() only supports sequences with a single value. Use .toList().toFuture() if multiple values are expected."));
59+
finished.countDown();
60+
}
61+
}
62+
});
63+
64+
return new Future<T>() {
65+
66+
private volatile boolean cancelled = false;
67+
68+
@Override
69+
public boolean cancel(boolean mayInterruptIfRunning) {
70+
if (finished.getCount() > 0) {
71+
cancelled = true;
72+
s.unsubscribe();
73+
// release the latch (a race condition may have already released it by now)
74+
finished.countDown();
75+
return true;
76+
} else {
77+
// can't cancel
78+
return false;
79+
}
80+
}
81+
82+
@Override
83+
public boolean isCancelled() {
84+
return cancelled;
85+
}
86+
87+
@Override
88+
public boolean isDone() {
89+
return finished.getCount() == 0;
90+
}
91+
92+
@Override
93+
public T get() throws InterruptedException, ExecutionException {
94+
finished.await();
95+
return getValue();
96+
}
97+
98+
@Override
99+
public T get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, TimeoutException {
100+
if (finished.await(timeout, unit)) {
101+
return getValue();
102+
} else {
103+
throw new TimeoutException("Timed out after " + unit.toMillis(timeout) + "ms waiting for underlying Observable.");
104+
}
105+
}
106+
107+
private T getValue() throws ExecutionException {
108+
if (error.get() != null) {
109+
throw new ExecutionException("Observable onError", error.get());
110+
} else {
111+
return value.get();
112+
}
113+
}
114+
115+
};
116+
117+
}
118+
119+
@Test
120+
public void testToFuture() throws InterruptedException, ExecutionException {
121+
Observable<String> obs = Observable.toObservable("one");
122+
Future<String> f = toFuture(obs);
123+
assertEquals("one", f.get());
124+
}
125+
126+
@Test
127+
public void testToFutureList() throws InterruptedException, ExecutionException {
128+
Observable<String> obs = Observable.toObservable("one", "two", "three");
129+
Future<List<String>> f = toFuture(obs.toList());
130+
assertEquals("one", f.get().get(0));
131+
assertEquals("two", f.get().get(1));
132+
assertEquals("three", f.get().get(2));
133+
}
134+
135+
@Test(expected = ExecutionException.class)
136+
public void testExceptionWithMoreThanOneElement() throws InterruptedException, ExecutionException {
137+
Observable<String> obs = Observable.toObservable("one", "two");
138+
Future<String> f = toFuture(obs);
139+
assertEquals("one", f.get());
140+
// we expect an exception since there are more than 1 element
141+
}
142+
143+
@Test
144+
public void testToFutureWithException() {
145+
Observable<String> obs = Observable.create(new Func1<Observer<String>, Subscription>() {
146+
147+
@Override
148+
public Subscription call(Observer<String> observer) {
149+
observer.onNext("one");
150+
observer.onError(new TestException());
151+
return Subscriptions.empty();
152+
}
153+
});
154+
155+
Future<String> f = toFuture(obs);
156+
try {
157+
f.get();
158+
fail("expected exception");
159+
} catch (Exception e) {
160+
assertEquals(TestException.class, e.getCause().getClass());
161+
}
162+
}
163+
164+
private static class TestException extends RuntimeException {
165+
private static final long serialVersionUID = 1L;
166+
}
167+
}

0 commit comments

Comments
 (0)