diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 5cf8d82ec2..066134598c 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -15,17 +15,12 @@ */ package rx; +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; import org.junit.Before; @@ -58,6 +53,7 @@ import rx.plugins.RxJavaPlugins; import rx.util.AtomicObservableSubscription; import rx.util.AtomicObserver; +import rx.util.Exceptions; import rx.util.Range; import rx.util.functions.Action0; import rx.util.functions.Action1; @@ -1524,6 +1520,79 @@ public static Observable> toList(final Observable that) { return _create(OperationToObservableList.toObservableList(that)); } + /** + * Converts an observable sequence to an Iterable. + * + * @param that the source Observable + * @return Observable converted to Iterable. + */ + public static Iterable toIterable(final Observable that) { + final BlockingQueue> notifications = new LinkedBlockingQueue>(); + + materialize(that).subscribe(new Observer>() { + @Override + public void onCompleted() { + // ignore + } + + @Override + public void onError(Exception e) { + // ignore + } + + @Override + public void onNext(Notification args) { + notifications.offer(args); + } + }); + + final Iterator it = new Iterator() { + private Notification buf; + + @Override + public boolean hasNext() { + if (buf == null) { + buf = take(); + } + return !buf.isOnCompleted(); + } + + @Override + public T next() { + if (buf == null) { + buf = take(); + } + if (buf.isOnError()) { + throw Exceptions.propagate(buf.getException()); + } + + T result = buf.getValue(); + buf = null; + return result; + } + + private Notification take() { + try { + return notifications.take(); + } catch (InterruptedException e) { + throw Exceptions.propagate(e); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Read-only iterator"); + } + }; + + return new Iterable() { + @Override + public Iterator iterator() { + return it; + } + }; + } + /** * Converts an Iterable sequence to an Observable sequence. * @@ -2544,6 +2613,15 @@ public Observable> toSortedList(final Object sortFunction) { return toSortedList(this, sortFunction); } + /** + * Converts an observable sequence to an Iterable. + * + * @return Observable converted to Iterable. + */ + public Iterable toIterable() { + return toIterable(this); + } + public static class UnitTest { @Mock @@ -2623,6 +2701,52 @@ public void testSequenceEqual() { verify(result, times(1)).onNext(false); } + + @Test + public void testToIterable() { + Observable obs = toObservable("one", "two", "three"); + + Iterator it = obs.toIterable().iterator(); + + assertEquals(true, it.hasNext()); + assertEquals("one", it.next()); + + assertEquals(true, it.hasNext()); + assertEquals("two", it.next()); + + assertEquals(true, it.hasNext()); + assertEquals("three", it.next()); + + assertEquals(false, it.hasNext()); + + } + + @Test(expected = TestException.class) + public void testToIterableWithException() { + Observable obs = create(new Func1, Subscription>() { + + @Override + public Subscription call(Observer observer) { + observer.onNext("one"); + observer.onError(new TestException()); + return Observable.noOpSubscription(); + } + }); + + Iterator it = obs.toIterable().iterator(); + + assertEquals(true, it.hasNext()); + assertEquals("one", it.next()); + + assertEquals(true, it.hasNext()); + it.next(); + + } + + private static class TestException extends RuntimeException { + + } + } } diff --git a/rxjava-core/src/main/java/rx/util/Exceptions.java b/rxjava-core/src/main/java/rx/util/Exceptions.java new file mode 100644 index 0000000000..f2a31ed491 --- /dev/null +++ b/rxjava-core/src/main/java/rx/util/Exceptions.java @@ -0,0 +1,31 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.util; + +public class Exceptions { + private Exceptions() { + + } + + public static RuntimeException propagate(Throwable t) { + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else { + throw new RuntimeException(t); + } + } + +} \ No newline at end of file