Skip to content

Commit d1f0258

Browse files
Merge pull request #492 from zsxwing/scheduler-overloads
Implement the scheduler overloads for Range, From, StartWith
2 parents 5e80980 + 1669bde commit d1f0258

File tree

2 files changed

+124
-0
lines changed

2 files changed

+124
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -665,6 +665,23 @@ public static <T> Observable<T> from(Iterable<? extends T> iterable) {
665665
return create(OperationToObservableIterable.toObservableIterable(iterable));
666666
}
667667

668+
/**
669+
* Converts an {@link Iterable} sequence into an Observable with the specified scheduler.
670+
*
671+
* @param iterable
672+
* the source {@link Iterable} sequence
673+
* @param scheduler
674+
* the scheduler to emit the items of the iterable
675+
* @param <T>
676+
* the type of items in the {@link Iterable} sequence and the type of items to be
677+
* emitted by the resulting Observable
678+
* @return an Observable that emits each item in the source {@link Iterable} sequence with the specified scheduler
679+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh212140(v=vs.103).aspx">MSDN: Observable.ToObservable</a>
680+
*/
681+
public static <T> Observable<T> from(Iterable<? extends T> iterable, Scheduler scheduler) {
682+
return from(iterable).observeOn(scheduler);
683+
}
684+
668685
/**
669686
* Converts an Array into an Observable.
670687
* <p>
@@ -959,6 +976,23 @@ public static Observable<Integer> range(int start, int count) {
959976
return from(Range.createWithCount(start, count));
960977
}
961978

979+
/**
980+
* Generates an Observable that emits a sequence of integers within a specified range with the specified scheduler.
981+
*
982+
* @param start
983+
* the value of the first integer in the sequence
984+
* @param count
985+
* the number of sequential integers to generate
986+
* @param scheduler
987+
* the scheduler to run the generator loop on
988+
* @return an Observable that emits a range of sequential integers
989+
*
990+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211896(v=vs.103).aspx">Observable.Range Method (Int32, Int32, IScheduler)</a>
991+
*/
992+
public static Observable<Integer> range(int start, int count, Scheduler scheduler) {
993+
return range(start, count).observeOn(scheduler);
994+
}
995+
962996
/**
963997
* Returns an Observable that calls an Observable factory to create its
964998
* Observable for each new Observer that subscribes. That is, for each
@@ -4553,6 +4587,34 @@ public Observable<T> startWith(Iterable<T> values) {
45534587
return concat(Observable.<T> from(values), this);
45544588
}
45554589

4590+
/**
4591+
* Emit a specified set of items with the specified scheduler before beginning to emit items from the source Observable.
4592+
*
4593+
* @param values
4594+
* Iterable of the items you want the modified Observable to emit first
4595+
* @param scheduler
4596+
* The scheduler to emit the prepended values on.
4597+
* @return an Observable that exhibits the modified behavior
4598+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229372(v=vs.103).aspx">MSDN: Observable.StartWith</a>
4599+
*/
4600+
public Observable<T> startWith(Iterable<T> values, Scheduler scheduler) {
4601+
return concat(from(values, scheduler), this);
4602+
}
4603+
4604+
/**
4605+
* Emit a specified array of items with the specified scheduler before beginning to emit items from the source Observable.
4606+
*
4607+
* @param values
4608+
* The items you want the modified Observable to emit first
4609+
* @param scheduler
4610+
* The scheduler to emit the prepended values on.
4611+
* @return an Observable that exhibits the modified behavior
4612+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229372(v=vs.103).aspx">MSDN: Observable.StartWith</a>
4613+
*/
4614+
public Observable<T> startWith(T[] values, Scheduler scheduler) {
4615+
return startWith(Arrays.asList(values), scheduler);
4616+
}
4617+
45564618
/**
45574619
* Emit a specified item before beginning to emit items from the source
45584620
* Observable.

rxjava-core/src/test/java/rx/ObservableTests.java

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import static org.mockito.Mockito.*;
2121

2222
import java.util.ArrayList;
23+
import java.util.Arrays;
2324
import java.util.LinkedList;
2425
import java.util.List;
2526
import java.util.concurrent.CountDownLatch;
@@ -29,10 +30,12 @@
2930

3031
import org.junit.Before;
3132
import org.junit.Test;
33+
import org.mockito.InOrder;
3234
import org.mockito.Mock;
3335
import org.mockito.MockitoAnnotations;
3436

3537
import rx.Observable.OnSubscribeFunc;
38+
import rx.concurrency.TestScheduler;
3639
import rx.observables.ConnectableObservable;
3740
import rx.subscriptions.BooleanSubscription;
3841
import rx.subscriptions.Subscriptions;
@@ -886,6 +889,7 @@ public void testContainsWithEmptyObservable() {
886889
verify(aObserver, times(1)).onCompleted();
887890
}
888891

892+
@Test
889893
public void testIgnoreElements() {
890894
Observable<Integer> observable = Observable.from(1, 2, 3).ignoreElements();
891895

@@ -896,4 +900,62 @@ public void testIgnoreElements() {
896900
verify(aObserver, never()).onError(any(Throwable.class));
897901
verify(aObserver, times(1)).onCompleted();
898902
}
903+
904+
@Test
905+
public void testFromWithScheduler() {
906+
TestScheduler scheduler = new TestScheduler();
907+
Observable<Integer> observable = Observable.from(Arrays.asList(1, 2), scheduler);
908+
909+
@SuppressWarnings("unchecked")
910+
Observer<Integer> aObserver = mock(Observer.class);
911+
observable.subscribe(aObserver);
912+
913+
scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
914+
915+
InOrder inOrder = inOrder(aObserver);
916+
inOrder.verify(aObserver, times(1)).onNext(1);
917+
inOrder.verify(aObserver, times(1)).onNext(2);
918+
inOrder.verify(aObserver, times(1)).onCompleted();
919+
inOrder.verifyNoMoreInteractions();
920+
}
921+
922+
@Test
923+
public void testStartWithWithScheduler() {
924+
TestScheduler scheduler = new TestScheduler();
925+
Observable<Integer> observable = Observable.from(3, 4).startWith(Arrays.asList(1, 2), scheduler);
926+
927+
@SuppressWarnings("unchecked")
928+
Observer<Integer> aObserver = mock(Observer.class);
929+
observable.subscribe(aObserver);
930+
931+
scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
932+
933+
InOrder inOrder = inOrder(aObserver);
934+
inOrder.verify(aObserver, times(1)).onNext(1);
935+
inOrder.verify(aObserver, times(1)).onNext(2);
936+
inOrder.verify(aObserver, times(1)).onNext(3);
937+
inOrder.verify(aObserver, times(1)).onNext(4);
938+
inOrder.verify(aObserver, times(1)).onCompleted();
939+
inOrder.verifyNoMoreInteractions();
940+
}
941+
942+
@Test
943+
public void testRangeWithScheduler() {
944+
TestScheduler scheduler = new TestScheduler();
945+
Observable<Integer> observable = Observable.range(3, 4, scheduler);
946+
947+
@SuppressWarnings("unchecked")
948+
Observer<Integer> aObserver = mock(Observer.class);
949+
observable.subscribe(aObserver);
950+
951+
scheduler.advanceTimeBy(1, TimeUnit.MILLISECONDS);
952+
953+
InOrder inOrder = inOrder(aObserver);
954+
inOrder.verify(aObserver, times(1)).onNext(3);
955+
inOrder.verify(aObserver, times(1)).onNext(4);
956+
inOrder.verify(aObserver, times(1)).onNext(5);
957+
inOrder.verify(aObserver, times(1)).onNext(6);
958+
inOrder.verify(aObserver, times(1)).onCompleted();
959+
inOrder.verifyNoMoreInteractions();
960+
}
899961
}

0 commit comments

Comments
 (0)