diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index f9253b2b9d..b349ffd94b 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -665,6 +665,23 @@ public static Observable from(Iterable iterable) { return create(OperationToObservableIterable.toObservableIterable(iterable)); } + /** + * Converts an {@link Iterable} sequence into an Observable with the specified scheduler. + * + * @param iterable + * the source {@link Iterable} sequence + * @param scheduler + * the scheduler to emit the items of the iterable + * @param + * the type of items in the {@link Iterable} sequence and the type of items to be + * emitted by the resulting Observable + * @return an Observable that emits each item in the source {@link Iterable} sequence with the specified scheduler + * @see MSDN: Observable.ToObservable + */ + public static Observable from(Iterable iterable, Scheduler scheduler) { + return from(iterable).observeOn(scheduler); + } + /** * Converts an Array into an Observable. *

@@ -959,6 +976,23 @@ public static Observable range(int start, int count) { return from(Range.createWithCount(start, count)); } + /** + * Generates an Observable that emits a sequence of integers within a specified range with the specified scheduler. + * + * @param start + * the value of the first integer in the sequence + * @param count + * the number of sequential integers to generate + * @param scheduler + * the scheduler to run the generator loop on + * @return an Observable that emits a range of sequential integers + * + * @see Observable.Range Method (Int32, Int32, IScheduler) + */ + public static Observable range(int start, int count, Scheduler scheduler) { + return range(start, count).observeOn(scheduler); + } + /** * Returns an Observable that calls an Observable factory to create its * Observable for each new Observer that subscribes. That is, for each @@ -4553,6 +4587,34 @@ public Observable startWith(Iterable values) { return concat(Observable. from(values), this); } + /** + * Emit a specified set of items with the specified scheduler before beginning to emit items from the source Observable. + * + * @param values + * Iterable of the items you want the modified Observable to emit first + * @param scheduler + * The scheduler to emit the prepended values on. + * @return an Observable that exhibits the modified behavior + * @see MSDN: Observable.StartWith + */ + public Observable startWith(Iterable values, Scheduler scheduler) { + return concat(from(values, scheduler), this); + } + + /** + * Emit a specified array of items with the specified scheduler before beginning to emit items from the source Observable. + * + * @param values + * The items you want the modified Observable to emit first + * @param scheduler + * The scheduler to emit the prepended values on. + * @return an Observable that exhibits the modified behavior + * @see MSDN: Observable.StartWith + */ + public Observable startWith(T[] values, Scheduler scheduler) { + return startWith(Arrays.asList(values), scheduler); + } + /** * Emit a specified item before beginning to emit items from the source * Observable. diff --git a/rxjava-core/src/test/java/rx/ObservableTests.java b/rxjava-core/src/test/java/rx/ObservableTests.java index ee817e08b5..3d718210a1 100644 --- a/rxjava-core/src/test/java/rx/ObservableTests.java +++ b/rxjava-core/src/test/java/rx/ObservableTests.java @@ -20,6 +20,7 @@ import static org.mockito.Mockito.*; import java.util.ArrayList; +import java.util.Arrays; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CountDownLatch; @@ -29,10 +30,12 @@ import org.junit.Before; import org.junit.Test; +import org.mockito.InOrder; import org.mockito.Mock; import org.mockito.MockitoAnnotations; import rx.Observable.OnSubscribeFunc; +import rx.concurrency.TestScheduler; import rx.observables.ConnectableObservable; import rx.subscriptions.BooleanSubscription; import rx.subscriptions.Subscriptions; @@ -886,6 +889,7 @@ public void testContainsWithEmptyObservable() { verify(aObserver, times(1)).onCompleted(); } + @Test public void testIgnoreElements() { Observable observable = Observable.from(1, 2, 3).ignoreElements(); @@ -896,4 +900,62 @@ public void testIgnoreElements() { verify(aObserver, never()).onError(any(Throwable.class)); verify(aObserver, times(1)).onCompleted(); } + + @Test + public void testFromWithScheduler() { + TestScheduler scheduler = new TestScheduler(); + Observable observable = Observable.from(Arrays.asList(1, 2), scheduler); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + + scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); + + InOrder inOrder = inOrder(aObserver); + inOrder.verify(aObserver, times(1)).onNext(1); + inOrder.verify(aObserver, times(1)).onNext(2); + inOrder.verify(aObserver, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testStartWithWithScheduler() { + TestScheduler scheduler = new TestScheduler(); + Observable observable = Observable.from(3, 4).startWith(Arrays.asList(1, 2), scheduler); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + + scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); + + InOrder inOrder = inOrder(aObserver); + inOrder.verify(aObserver, times(1)).onNext(1); + inOrder.verify(aObserver, times(1)).onNext(2); + inOrder.verify(aObserver, times(1)).onNext(3); + inOrder.verify(aObserver, times(1)).onNext(4); + inOrder.verify(aObserver, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } + + @Test + public void testRangeWithScheduler() { + TestScheduler scheduler = new TestScheduler(); + Observable observable = Observable.range(3, 4, scheduler); + + @SuppressWarnings("unchecked") + Observer aObserver = mock(Observer.class); + observable.subscribe(aObserver); + + scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS); + + InOrder inOrder = inOrder(aObserver); + inOrder.verify(aObserver, times(1)).onNext(3); + inOrder.verify(aObserver, times(1)).onNext(4); + inOrder.verify(aObserver, times(1)).onNext(5); + inOrder.verify(aObserver, times(1)).onNext(6); + inOrder.verify(aObserver, times(1)).onCompleted(); + inOrder.verifyNoMoreInteractions(); + } } \ No newline at end of file