Skip to content

remove 'wrap' functionality #4

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 1 commit into from
Jan 12, 2013
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
88 changes: 31 additions & 57 deletions rxjava-core/src/main/java/org/rx/reactive/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -304,7 +304,7 @@ public Subscription subscribe(Observer<T> observer) {
* @return a Observable that, when a Observer subscribes to it, will execute the given function
*/
public static <T> Observable<T> create(Func1<Subscription, Observer<T>> func) {
return wrap(OperationToObservableFunction.toObservableFunction(func));
return OperationToObservableFunction.toObservableFunction(func);
}

/**
Expand Down Expand Up @@ -367,7 +367,7 @@ public static <T> Observable<T> empty() {
* @return a Observable object that calls <code>onError</code> when a Observer subscribes
*/
public static <T> Observable<T> error(Exception exception) {
return wrap(new ThrowObservable<T>(exception));
return new ThrowObservable<T>(exception);
}

/**
Expand Down Expand Up @@ -455,7 +455,7 @@ public static <T> Observable<T> just(T value) {
* by the source Observable
*/
public static <T> Observable<T> last(final Observable<T> that) {
return wrap(OperationLast.last(that));
return OperationLast.last(that);
}

/**
Expand All @@ -477,7 +477,7 @@ public static <T> Observable<T> last(final Observable<T> that) {
* in the sequence emitted by the source Observable
*/
public static <T, R> Observable<R> map(Observable<T> sequence, Func1<R, T> func) {
return wrap(OperationMap.map(sequence, func));
return OperationMap.map(sequence, func);
}

/**
Expand Down Expand Up @@ -532,7 +532,7 @@ public R call(T t1) {
* the Observables obtained from this transformation
*/
public static <T, R> Observable<R> mapMany(Observable<T> sequence, Func1<Observable<R>, T> func) {
return wrap(OperationMap.mapMany(sequence, func));
return OperationMap.mapMany(sequence, func);
}

/**
Expand Down Expand Up @@ -598,7 +598,7 @@ public static <T> Observable<Notification<T>> materialize(final Observable<T> se
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
*/
public static <T> Observable<T> merge(List<Observable<T>> source) {
return wrap(OperationMerge.merge(source));
return OperationMerge.merge(source);
}

/**
Expand All @@ -616,7 +616,7 @@ public static <T> Observable<T> merge(List<Observable<T>> source) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
*/
public static <T> Observable<T> merge(Observable<Observable<T>> source) {
return wrap(OperationMerge.merge(source));
return OperationMerge.merge(source);
}

/**
Expand All @@ -634,7 +634,7 @@ public static <T> Observable<T> merge(Observable<Observable<T>> source) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
*/
public static <T> Observable<T> merge(Observable<T>... source) {
return wrap(OperationMerge.merge(source));
return OperationMerge.merge(source);
}

/**
Expand All @@ -654,7 +654,7 @@ public static <T> Observable<T> merge(Observable<T>... source) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
*/
public static <T> Observable<T> mergeDelayError(List<Observable<T>> source) {
return wrap(OperationMergeDelayError.mergeDelayError(source));
return OperationMergeDelayError.mergeDelayError(source);
}

/**
Expand All @@ -674,7 +674,7 @@ public static <T> Observable<T> mergeDelayError(List<Observable<T>> source) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
*/
public static <T> Observable<T> mergeDelayError(Observable<Observable<T>> source) {
return wrap(OperationMergeDelayError.mergeDelayError(source));
return OperationMergeDelayError.mergeDelayError(source);
}

/**
Expand All @@ -694,7 +694,7 @@ public static <T> Observable<T> mergeDelayError(Observable<Observable<T>> source
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
*/
public static <T> Observable<T> mergeDelayError(Observable<T>... source) {
return wrap(OperationMergeDelayError.mergeDelayError(source));
return OperationMergeDelayError.mergeDelayError(source);
}

/**
Expand All @@ -710,7 +710,7 @@ public static <T> Observable<T> mergeDelayError(Observable<T>... source) {
* @return a Observable that never sends any information to a Observer
*/
public static <T> Observable<T> never() {
return wrap(new NeverObservable<T>());
return new NeverObservable<T>();
}

/**
Expand Down Expand Up @@ -749,7 +749,7 @@ public static Subscription noOpSubscription() {
* @return the source Observable, with its behavior modified as described
*/
public static <T> Observable<T> onErrorResumeNext(final Observable<T> that, final Func1<Observable<T>, Exception> resumeFunction) {
return wrap(OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(that, resumeFunction));
return OperationOnErrorResumeNextViaFunction.onErrorResumeNextViaFunction(that, resumeFunction);
}

/**
Expand Down Expand Up @@ -812,7 +812,7 @@ public Observable<T> call(Exception e) {
* @return the source Observable, with its behavior modified as described
*/
public static <T> Observable<T> onErrorResumeNext(final Observable<T> that, final Observable<T> resumeSequence) {
return wrap(OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable(that, resumeSequence));
return OperationOnErrorResumeNextViaObservable.onErrorResumeNextViaObservable(that, resumeSequence);
}

/**
Expand All @@ -839,7 +839,7 @@ public static <T> Observable<T> onErrorResumeNext(final Observable<T> that, fina
* @return the source Observable, with its behavior modified as described
*/
public static <T> Observable<T> onErrorReturn(final Observable<T> that, Func1<T, Exception> resumeFunction) {
return wrap(OperationOnErrorReturn.onErrorReturn(that, resumeFunction));
return OperationOnErrorReturn.onErrorReturn(that, resumeFunction);
}

/**
Expand Down Expand Up @@ -870,7 +870,7 @@ public static <T> Observable<T> onErrorReturn(final Observable<T> that, Func1<T,
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public static <T> Observable<T> reduce(Observable<T> sequence, Func2<T, T, T> accumulator) {
return wrap(OperationScan.scan(sequence, accumulator).last());
return OperationScan.scan(sequence, accumulator).last();
}

/**
Expand Down Expand Up @@ -941,7 +941,7 @@ public T call(T t1, T t2) {
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
*/
public static <T> Observable<T> reduce(Observable<T> sequence, T initialValue, Func2<T, T, T> accumulator) {
return wrap(OperationScan.scan(sequence, initialValue, accumulator).last());
return OperationScan.scan(sequence, initialValue, accumulator).last();
}

/**
Expand Down Expand Up @@ -1004,7 +1004,7 @@ public T call(T t1, T t2) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
*/
public static <T> Observable<T> scan(Observable<T> sequence, Func2<T, T, T> accumulator) {
return wrap(OperationScan.scan(sequence, accumulator));
return OperationScan.scan(sequence, accumulator);
}

/**
Expand Down Expand Up @@ -1061,7 +1061,7 @@ public T call(T t1, T t2) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
*/
public static <T> Observable<T> scan(Observable<T> sequence, T initialValue, Func2<T, T, T> accumulator) {
return wrap(OperationScan.scan(sequence, initialValue, accumulator));
return OperationScan.scan(sequence, initialValue, accumulator);
}

/**
Expand Down Expand Up @@ -1114,7 +1114,7 @@ public T call(T t1, T t2) {
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229847(v=vs.103).aspx">MSDN: Observable.Skip Method</a>
*/
public static <T> Observable<T> skip(final Observable<T> items, int num) {
return wrap(OperationSkip.skip(items, num));
return OperationSkip.skip(items, num);
}

/**
Expand All @@ -1132,7 +1132,7 @@ public static <T> Observable<T> skip(final Observable<T> items, int num) {
* @return a Observable that is a chronologically well-behaved version of the source Observable
*/
public static <T> Observable<T> synchronize(Observable<T> observable) {
return wrap(OperationSynchronize.synchronize(observable));
return OperationSynchronize.synchronize(observable);
}

/**
Expand All @@ -1155,7 +1155,7 @@ public static <T> Observable<T> synchronize(Observable<T> observable) {
* Observable
*/
public static <T> Observable<T> take(final Observable<T> items, final int num) {
return wrap(OperationTake.take(items, num));
return OperationTake.take(items, num);
}

/**
Expand All @@ -1178,7 +1178,7 @@ public static <T> Observable<T> take(final Observable<T> items, final int num) {
* items emitted by the source Observable
*/
public static <T> Observable<List<T>> toList(final Observable<T> that) {
return wrap(OperationToObservableList.toObservableList(that));
return OperationToObservableList.toObservableList(that);
}

/**
Expand All @@ -1198,7 +1198,7 @@ public static <T> Observable<List<T>> toList(final Observable<T> that) {
* @return a Observable that emits each item in the source Iterable sequence
*/
public static <T> Observable<T> toObservable(Iterable<T> iterable) {
return wrap(OperationToObservableIterable.toObservableIterable(iterable));
return OperationToObservableIterable.toObservableIterable(iterable);
}

/**
Expand Down Expand Up @@ -1233,7 +1233,7 @@ public static <T> Observable<T> toObservable(T... items) {
* @return
*/
public static <T> Observable<List<T>> toSortedList(Observable<T> sequence) {
return wrap(OperationToObservableSortedList.toSortedList(sequence));
return OperationToObservableSortedList.toSortedList(sequence);
}

/**
Expand All @@ -1247,7 +1247,7 @@ public static <T> Observable<List<T>> toSortedList(Observable<T> sequence) {
* @return
*/
public static <T> Observable<List<T>> toSortedList(Observable<T> sequence, Func2<Integer, T, T> sortFunction) {
return wrap(OperationToObservableSortedList.toSortedList(sequence, sortFunction));
return OperationToObservableSortedList.toSortedList(sequence, sortFunction);
}

/**
Expand All @@ -1261,40 +1261,14 @@ public static <T> Observable<List<T>> toSortedList(Observable<T> sequence, Func2
* @return
*/
public static <T> Observable<List<T>> toSortedList(Observable<T> sequence, final Object sortFunction) {
return wrap(OperationToObservableSortedList.toSortedList(sequence, new Func2<Integer, T, T>() {
return OperationToObservableSortedList.toSortedList(sequence, new Func2<Integer, T, T>() {

@Override
public Integer call(T t1, T t2) {
return Functions.execute(sortFunction, t1, t2);
}

}));
}

/**
* Allow wrapping responses with the <code>AbstractObservable</code> so that we have all of
* the utility methods available for subscribing.
* <p>
* This is not expected to benefit Java usage, but is intended for dynamic script which are a primary target of the Observable operations.
* <p>
* Since they are dynamic they can execute the "hidden" methods on <code>AbstractObservable</code> while appearing to only receive an <code>Observable</code> without first casting.
*
* @param o
* @return
*/
private static <T> Observable<T> wrap(final Observable<T> o) {
if (o instanceof Observable) {
// if the Observable is already an AbstractObservable, don't wrap it again.
return (Observable<T>) o;
}
return new Observable<T>() {

@Override
public Subscription subscribe(Observer<T> observer) {
return o.subscribe(observer);
}

};
});
}

/**
Expand Down Expand Up @@ -1322,7 +1296,7 @@ public Subscription subscribe(Observer<T> observer) {
* @return a Observable that emits the zipped results
*/
public static <R, T0, T1> Observable<R> zip(Observable<T0> w0, Observable<T1> w1, Func2<R, T0, T1> reduceFunction) {
return wrap(OperationZip.zip(w0, w1, reduceFunction));
return OperationZip.zip(w0, w1, reduceFunction);
}

/**
Expand Down Expand Up @@ -1389,7 +1363,7 @@ public R call(T0 t0, T1 t1) {
* @return a Observable that emits the zipped results
*/
public static <R, T0, T1, T2> Observable<R> zip(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Func3<R, T0, T1, T2> function) {
return wrap(OperationZip.zip(w0, w1, w2, function));
return OperationZip.zip(w0, w1, w2, function);
}

/**
Expand Down Expand Up @@ -1461,7 +1435,7 @@ public R call(T0 t0, T1 t1, T2 t2) {
* @return a Observable that emits the zipped results
*/
public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<T0> w0, Observable<T1> w1, Observable<T2> w2, Observable<T3> w3, Func4<R, T0, T1, T2, T3> reduceFunction) {
return wrap(OperationZip.zip(w0, w1, w2, w3, reduceFunction));
return OperationZip.zip(w0, w1, w2, w3, reduceFunction);
}

/**
Expand Down