Skip to content

Implement the scheduler overloads for Range, From, StartWith #492

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Nov 19, 2013
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
62 changes: 62 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,23 @@ public static <T> Observable<T> from(Iterable<? extends T> iterable) {
return create(OperationToObservableIterable.toObservableIterable(iterable));
}

/**
* Converts an {@link Iterable} sequence into an Observable with the specified scheduler.
*
* @param iterable
* the source {@link Iterable} sequence
* @param scheduler
* the scheduler to emit the items of the iterable
* @param <T>
* the type of items in the {@link Iterable} sequence and the type of items to be
* emitted by the resulting Observable
* @return an Observable that emits each item in the source {@link Iterable} sequence with the specified scheduler
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212140(v=vs.103).aspx">MSDN: Observable.ToObservable</a>
*/
public static <T> Observable<T> from(Iterable<? extends T> iterable, Scheduler scheduler) {
return from(iterable).observeOn(scheduler);
}

/**
* Converts an Array into an Observable.
* <p>
Expand Down Expand Up @@ -959,6 +976,23 @@ public static Observable<Integer> range(int start, int count) {
return from(Range.createWithCount(start, count));
}

/**
* Generates an Observable that emits a sequence of integers within a specified range with the specified scheduler.
*
* @param start
* the value of the first integer in the sequence
* @param count
* the number of sequential integers to generate
* @param scheduler
* the scheduler to run the generator loop on
* @return an Observable that emits a range of sequential integers
*
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211896(v=vs.103).aspx">Observable.Range Method (Int32, Int32, IScheduler)</a>
*/
public static Observable<Integer> range(int start, int count, Scheduler scheduler) {
return range(start, count).observeOn(scheduler);
}

/**
* Returns an Observable that calls an Observable factory to create its
* Observable for each new Observer that subscribes. That is, for each
Expand Down Expand Up @@ -4553,6 +4587,34 @@ public Observable<T> startWith(Iterable<T> values) {
return concat(Observable.<T> from(values), this);
}

/**
* Emit a specified set of items with the specified scheduler before beginning to emit items from the source Observable.
*
* @param values
* Iterable of the items you want the modified Observable to emit first
* @param scheduler
* The scheduler to emit the prepended values on.
* @return an Observable that exhibits the modified behavior
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229372(v=vs.103).aspx">MSDN: Observable.StartWith</a>
*/
public Observable<T> startWith(Iterable<T> values, Scheduler scheduler) {
return concat(from(values, scheduler), this);
}

/**
* Emit a specified array of items with the specified scheduler before beginning to emit items from the source Observable.
*
* @param values
* The items you want the modified Observable to emit first
* @param scheduler
* The scheduler to emit the prepended values on.
* @return an Observable that exhibits the modified behavior
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229372(v=vs.103).aspx">MSDN: Observable.StartWith</a>
*/
public Observable<T> startWith(T[] values, Scheduler scheduler) {
return startWith(Arrays.asList(values), scheduler);
}

/**
* Emit a specified item before beginning to emit items from the source
* Observable.
Expand Down
62 changes: 62 additions & 0 deletions rxjava-core/src/test/java/rx/ObservableTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import static org.mockito.Mockito.*;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
Expand All @@ -29,10 +30,12 @@

import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;

import rx.Observable.OnSubscribeFunc;
import rx.concurrency.TestScheduler;
import rx.observables.ConnectableObservable;
import rx.subscriptions.BooleanSubscription;
import rx.subscriptions.Subscriptions;
Expand Down Expand Up @@ -886,6 +889,7 @@ public void testContainsWithEmptyObservable() {
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testIgnoreElements() {
Observable<Integer> observable = Observable.from(1, 2, 3).ignoreElements();

Expand All @@ -896,4 +900,62 @@ public void testIgnoreElements() {
verify(aObserver, never()).onError(any(Throwable.class));
verify(aObserver, times(1)).onCompleted();
}

@Test
public void testFromWithScheduler() {
TestScheduler scheduler = new TestScheduler();
Observable<Integer> observable = Observable.from(Arrays.asList(1, 2), scheduler);

@SuppressWarnings("unchecked")
Observer<Integer> aObserver = mock(Observer.class);
observable.subscribe(aObserver);

scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);

InOrder inOrder = inOrder(aObserver);
inOrder.verify(aObserver, times(1)).onNext(1);
inOrder.verify(aObserver, times(1)).onNext(2);
inOrder.verify(aObserver, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testStartWithWithScheduler() {
TestScheduler scheduler = new TestScheduler();
Observable<Integer> observable = Observable.from(3, 4).startWith(Arrays.asList(1, 2), scheduler);

@SuppressWarnings("unchecked")
Observer<Integer> aObserver = mock(Observer.class);
observable.subscribe(aObserver);

scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);

InOrder inOrder = inOrder(aObserver);
inOrder.verify(aObserver, times(1)).onNext(1);
inOrder.verify(aObserver, times(1)).onNext(2);
inOrder.verify(aObserver, times(1)).onNext(3);
inOrder.verify(aObserver, times(1)).onNext(4);
inOrder.verify(aObserver, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}

@Test
public void testRangeWithScheduler() {
TestScheduler scheduler = new TestScheduler();
Observable<Integer> observable = Observable.range(3, 4, scheduler);

@SuppressWarnings("unchecked")
Observer<Integer> aObserver = mock(Observer.class);
observable.subscribe(aObserver);

scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);

InOrder inOrder = inOrder(aObserver);
inOrder.verify(aObserver, times(1)).onNext(3);
inOrder.verify(aObserver, times(1)).onNext(4);
inOrder.verify(aObserver, times(1)).onNext(5);
inOrder.verify(aObserver, times(1)).onNext(6);
inOrder.verify(aObserver, times(1)).onCompleted();
inOrder.verifyNoMoreInteractions();
}
}