@@ -304,7 +304,7 @@ public Subscription subscribe(Observer<T> observer) {
304
304
* @return a Observable that, when a Observer subscribes to it, will execute the given function
305
305
*/
306
306
public static <T > Observable <T > create (Func1 <Subscription , Observer <T >> func ) {
307
- return wrap ( OperationToObservableFunction .toObservableFunction (func ) );
307
+ return OperationToObservableFunction .toObservableFunction (func );
308
308
}
309
309
310
310
/**
@@ -367,7 +367,7 @@ public static <T> Observable<T> empty() {
367
367
* @return a Observable object that calls <code>onError</code> when a Observer subscribes
368
368
*/
369
369
public static <T > Observable <T > error (Exception exception ) {
370
- return wrap ( new ThrowObservable <T >(exception ) );
370
+ return new ThrowObservable <T >(exception );
371
371
}
372
372
373
373
/**
@@ -455,7 +455,7 @@ public static <T> Observable<T> just(T value) {
455
455
* by the source Observable
456
456
*/
457
457
public static <T > Observable <T > last (final Observable <T > that ) {
458
- return wrap ( OperationLast .last (that ) );
458
+ return OperationLast .last (that );
459
459
}
460
460
461
461
/**
@@ -477,7 +477,7 @@ public static <T> Observable<T> last(final Observable<T> that) {
477
477
* in the sequence emitted by the source Observable
478
478
*/
479
479
public static <T , R > Observable <R > map (Observable <T > sequence , Func1 <R , T > func ) {
480
- return wrap ( OperationMap .map (sequence , func ) );
480
+ return OperationMap .map (sequence , func );
481
481
}
482
482
483
483
/**
@@ -532,7 +532,7 @@ public R call(T t1) {
532
532
* the Observables obtained from this transformation
533
533
*/
534
534
public static <T , R > Observable <R > mapMany (Observable <T > sequence , Func1 <Observable <R >, T > func ) {
535
- return wrap ( OperationMap .mapMany (sequence , func ) );
535
+ return OperationMap .mapMany (sequence , func );
536
536
}
537
537
538
538
/**
@@ -598,7 +598,7 @@ public static <T> Observable<Notification<T>> materialize(final Observable<T> se
598
598
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
599
599
*/
600
600
public static <T > Observable <T > merge (List <Observable <T >> source ) {
601
- return wrap ( OperationMerge .merge (source ) );
601
+ return OperationMerge .merge (source );
602
602
}
603
603
604
604
/**
@@ -616,7 +616,7 @@ public static <T> Observable<T> merge(List<Observable<T>> source) {
616
616
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
617
617
*/
618
618
public static <T > Observable <T > merge (Observable <Observable <T >> source ) {
619
- return wrap ( OperationMerge .merge (source ) );
619
+ return OperationMerge .merge (source );
620
620
}
621
621
622
622
/**
@@ -634,7 +634,7 @@ public static <T> Observable<T> merge(Observable<Observable<T>> source) {
634
634
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
635
635
*/
636
636
public static <T > Observable <T > merge (Observable <T >... source ) {
637
- return wrap ( OperationMerge .merge (source ) );
637
+ return OperationMerge .merge (source );
638
638
}
639
639
640
640
/**
@@ -654,7 +654,7 @@ public static <T> Observable<T> merge(Observable<T>... source) {
654
654
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
655
655
*/
656
656
public static <T > Observable <T > mergeDelayError (List <Observable <T >> source ) {
657
- return wrap ( OperationMergeDelayError .mergeDelayError (source ) );
657
+ return OperationMergeDelayError .mergeDelayError (source );
658
658
}
659
659
660
660
/**
@@ -674,7 +674,7 @@ public static <T> Observable<T> mergeDelayError(List<Observable<T>> source) {
674
674
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
675
675
*/
676
676
public static <T > Observable <T > mergeDelayError (Observable <Observable <T >> source ) {
677
- return wrap ( OperationMergeDelayError .mergeDelayError (source ) );
677
+ return OperationMergeDelayError .mergeDelayError (source );
678
678
}
679
679
680
680
/**
@@ -694,7 +694,7 @@ public static <T> Observable<T> mergeDelayError(Observable<Observable<T>> source
694
694
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229099(v=vs.103).aspx">MSDN: Observable.Merge Method</a>
695
695
*/
696
696
public static <T > Observable <T > mergeDelayError (Observable <T >... source ) {
697
- return wrap ( OperationMergeDelayError .mergeDelayError (source ) );
697
+ return OperationMergeDelayError .mergeDelayError (source );
698
698
}
699
699
700
700
/**
@@ -710,7 +710,7 @@ public static <T> Observable<T> mergeDelayError(Observable<T>... source) {
710
710
* @return a Observable that never sends any information to a Observer
711
711
*/
712
712
public static <T > Observable <T > never () {
713
- return wrap ( new NeverObservable <T >() );
713
+ return new NeverObservable <T >();
714
714
}
715
715
716
716
/**
@@ -749,7 +749,7 @@ public static Subscription noOpSubscription() {
749
749
* @return the source Observable, with its behavior modified as described
750
750
*/
751
751
public static <T > Observable <T > onErrorResumeNext (final Observable <T > that , final Func1 <Observable <T >, Exception > resumeFunction ) {
752
- return wrap ( OperationOnErrorResumeNextViaFunction .onErrorResumeNextViaFunction (that , resumeFunction ) );
752
+ return OperationOnErrorResumeNextViaFunction .onErrorResumeNextViaFunction (that , resumeFunction );
753
753
}
754
754
755
755
/**
@@ -812,7 +812,7 @@ public Observable<T> call(Exception e) {
812
812
* @return the source Observable, with its behavior modified as described
813
813
*/
814
814
public static <T > Observable <T > onErrorResumeNext (final Observable <T > that , final Observable <T > resumeSequence ) {
815
- return wrap ( OperationOnErrorResumeNextViaObservable .onErrorResumeNextViaObservable (that , resumeSequence ) );
815
+ return OperationOnErrorResumeNextViaObservable .onErrorResumeNextViaObservable (that , resumeSequence );
816
816
}
817
817
818
818
/**
@@ -839,7 +839,7 @@ public static <T> Observable<T> onErrorResumeNext(final Observable<T> that, fina
839
839
* @return the source Observable, with its behavior modified as described
840
840
*/
841
841
public static <T > Observable <T > onErrorReturn (final Observable <T > that , Func1 <T , Exception > resumeFunction ) {
842
- return wrap ( OperationOnErrorReturn .onErrorReturn (that , resumeFunction ) );
842
+ return OperationOnErrorReturn .onErrorReturn (that , resumeFunction );
843
843
}
844
844
845
845
/**
@@ -870,7 +870,7 @@ public static <T> Observable<T> onErrorReturn(final Observable<T> that, Func1<T,
870
870
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
871
871
*/
872
872
public static <T > Observable <T > reduce (Observable <T > sequence , Func2 <T , T , T > accumulator ) {
873
- return wrap ( OperationScan .scan (sequence , accumulator ).last () );
873
+ return OperationScan .scan (sequence , accumulator ).last ();
874
874
}
875
875
876
876
/**
@@ -941,7 +941,7 @@ public T call(T t1, T t2) {
941
941
* @see <a href="http://en.wikipedia.org/wiki/Fold_(higher-order_function)">Wikipedia: Fold (higher-order function)</a>
942
942
*/
943
943
public static <T > Observable <T > reduce (Observable <T > sequence , T initialValue , Func2 <T , T , T > accumulator ) {
944
- return wrap ( OperationScan .scan (sequence , initialValue , accumulator ).last () );
944
+ return OperationScan .scan (sequence , initialValue , accumulator ).last ();
945
945
}
946
946
947
947
/**
@@ -1004,7 +1004,7 @@ public T call(T t1, T t2) {
1004
1004
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
1005
1005
*/
1006
1006
public static <T > Observable <T > scan (Observable <T > sequence , Func2 <T , T , T > accumulator ) {
1007
- return wrap ( OperationScan .scan (sequence , accumulator ) );
1007
+ return OperationScan .scan (sequence , accumulator );
1008
1008
}
1009
1009
1010
1010
/**
@@ -1061,7 +1061,7 @@ public T call(T t1, T t2) {
1061
1061
* @see <a href="http://msdn.microsoft.com/en-us/library/hh211665(v%3Dvs.103).aspx">MSDN: Observable.Scan</a>
1062
1062
*/
1063
1063
public static <T > Observable <T > scan (Observable <T > sequence , T initialValue , Func2 <T , T , T > accumulator ) {
1064
- return wrap ( OperationScan .scan (sequence , initialValue , accumulator ) );
1064
+ return OperationScan .scan (sequence , initialValue , accumulator );
1065
1065
}
1066
1066
1067
1067
/**
@@ -1114,7 +1114,7 @@ public T call(T t1, T t2) {
1114
1114
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229847(v=vs.103).aspx">MSDN: Observable.Skip Method</a>
1115
1115
*/
1116
1116
public static <T > Observable <T > skip (final Observable <T > items , int num ) {
1117
- return wrap ( OperationSkip .skip (items , num ) );
1117
+ return OperationSkip .skip (items , num );
1118
1118
}
1119
1119
1120
1120
/**
@@ -1132,7 +1132,7 @@ public static <T> Observable<T> skip(final Observable<T> items, int num) {
1132
1132
* @return a Observable that is a chronologically well-behaved version of the source Observable
1133
1133
*/
1134
1134
public static <T > Observable <T > synchronize (Observable <T > observable ) {
1135
- return wrap ( OperationSynchronize .synchronize (observable ) );
1135
+ return OperationSynchronize .synchronize (observable );
1136
1136
}
1137
1137
1138
1138
/**
@@ -1155,7 +1155,7 @@ public static <T> Observable<T> synchronize(Observable<T> observable) {
1155
1155
* Observable
1156
1156
*/
1157
1157
public static <T > Observable <T > take (final Observable <T > items , final int num ) {
1158
- return wrap ( OperationTake .take (items , num ) );
1158
+ return OperationTake .take (items , num );
1159
1159
}
1160
1160
1161
1161
/**
@@ -1178,7 +1178,7 @@ public static <T> Observable<T> take(final Observable<T> items, final int num) {
1178
1178
* items emitted by the source Observable
1179
1179
*/
1180
1180
public static <T > Observable <List <T >> toList (final Observable <T > that ) {
1181
- return wrap ( OperationToObservableList .toObservableList (that ) );
1181
+ return OperationToObservableList .toObservableList (that );
1182
1182
}
1183
1183
1184
1184
/**
@@ -1198,7 +1198,7 @@ public static <T> Observable<List<T>> toList(final Observable<T> that) {
1198
1198
* @return a Observable that emits each item in the source Iterable sequence
1199
1199
*/
1200
1200
public static <T > Observable <T > toObservable (Iterable <T > iterable ) {
1201
- return wrap ( OperationToObservableIterable .toObservableIterable (iterable ) );
1201
+ return OperationToObservableIterable .toObservableIterable (iterable );
1202
1202
}
1203
1203
1204
1204
/**
@@ -1233,7 +1233,7 @@ public static <T> Observable<T> toObservable(T... items) {
1233
1233
* @return
1234
1234
*/
1235
1235
public static <T > Observable <List <T >> toSortedList (Observable <T > sequence ) {
1236
- return wrap ( OperationToObservableSortedList .toSortedList (sequence ) );
1236
+ return OperationToObservableSortedList .toSortedList (sequence );
1237
1237
}
1238
1238
1239
1239
/**
@@ -1247,7 +1247,7 @@ public static <T> Observable<List<T>> toSortedList(Observable<T> sequence) {
1247
1247
* @return
1248
1248
*/
1249
1249
public static <T > Observable <List <T >> toSortedList (Observable <T > sequence , Func2 <Integer , T , T > sortFunction ) {
1250
- return wrap ( OperationToObservableSortedList .toSortedList (sequence , sortFunction ) );
1250
+ return OperationToObservableSortedList .toSortedList (sequence , sortFunction );
1251
1251
}
1252
1252
1253
1253
/**
@@ -1261,40 +1261,14 @@ public static <T> Observable<List<T>> toSortedList(Observable<T> sequence, Func2
1261
1261
* @return
1262
1262
*/
1263
1263
public static <T > Observable <List <T >> toSortedList (Observable <T > sequence , final Object sortFunction ) {
1264
- return wrap ( OperationToObservableSortedList .toSortedList (sequence , new Func2 <Integer , T , T >() {
1264
+ return OperationToObservableSortedList .toSortedList (sequence , new Func2 <Integer , T , T >() {
1265
1265
1266
1266
@ Override
1267
1267
public Integer call (T t1 , T t2 ) {
1268
1268
return Functions .execute (sortFunction , t1 , t2 );
1269
1269
}
1270
1270
1271
- }));
1272
- }
1273
-
1274
- /**
1275
- * Allow wrapping responses with the <code>AbstractObservable</code> so that we have all of
1276
- * the utility methods available for subscribing.
1277
- * <p>
1278
- * This is not expected to benefit Java usage, but is intended for dynamic script which are a primary target of the Observable operations.
1279
- * <p>
1280
- * Since they are dynamic they can execute the "hidden" methods on <code>AbstractObservable</code> while appearing to only receive an <code>Observable</code> without first casting.
1281
- *
1282
- * @param o
1283
- * @return
1284
- */
1285
- private static <T > Observable <T > wrap (final Observable <T > o ) {
1286
- if (o instanceof Observable ) {
1287
- // if the Observable is already an AbstractObservable, don't wrap it again.
1288
- return (Observable <T >) o ;
1289
- }
1290
- return new Observable <T >() {
1291
-
1292
- @ Override
1293
- public Subscription subscribe (Observer <T > observer ) {
1294
- return o .subscribe (observer );
1295
- }
1296
-
1297
- };
1271
+ });
1298
1272
}
1299
1273
1300
1274
/**
@@ -1322,7 +1296,7 @@ public Subscription subscribe(Observer<T> observer) {
1322
1296
* @return a Observable that emits the zipped results
1323
1297
*/
1324
1298
public static <R , T0 , T1 > Observable <R > zip (Observable <T0 > w0 , Observable <T1 > w1 , Func2 <R , T0 , T1 > reduceFunction ) {
1325
- return wrap ( OperationZip .zip (w0 , w1 , reduceFunction ) );
1299
+ return OperationZip .zip (w0 , w1 , reduceFunction );
1326
1300
}
1327
1301
1328
1302
/**
@@ -1389,7 +1363,7 @@ public R call(T0 t0, T1 t1) {
1389
1363
* @return a Observable that emits the zipped results
1390
1364
*/
1391
1365
public static <R , T0 , T1 , T2 > Observable <R > zip (Observable <T0 > w0 , Observable <T1 > w1 , Observable <T2 > w2 , Func3 <R , T0 , T1 , T2 > function ) {
1392
- return wrap ( OperationZip .zip (w0 , w1 , w2 , function ) );
1366
+ return OperationZip .zip (w0 , w1 , w2 , function );
1393
1367
}
1394
1368
1395
1369
/**
@@ -1461,7 +1435,7 @@ public R call(T0 t0, T1 t1, T2 t2) {
1461
1435
* @return a Observable that emits the zipped results
1462
1436
*/
1463
1437
public static <R , T0 , T1 , T2 , T3 > Observable <R > zip (Observable <T0 > w0 , Observable <T1 > w1 , Observable <T2 > w2 , Observable <T3 > w3 , Func4 <R , T0 , T1 , T2 , T3 > reduceFunction ) {
1464
- return wrap ( OperationZip .zip (w0 , w1 , w2 , w3 , reduceFunction ) );
1438
+ return OperationZip .zip (w0 , w1 , w2 , w3 , reduceFunction );
1465
1439
}
1466
1440
1467
1441
/**
0 commit comments