77
77
import rx .util .OnErrorNotImplementedException ;
78
78
import rx .util .Range ;
79
79
import rx .util .Timestamped ;
80
+ import rx .util .functions .Action ;
80
81
import rx .util .functions .Action0 ;
81
82
import rx .util .functions .Action1 ;
82
83
import rx .util .functions .Func0 ;
@@ -249,56 +250,6 @@ private Subscription protectivelyWrapAndSubscribe(Observer<T> o) {
249
250
return subscription .wrap (subscribe (new SafeObserver <T >(subscription , o )));
250
251
}
251
252
252
- @ SuppressWarnings ({ "rawtypes" , "unchecked" })
253
- public Subscription subscribe (final Map <String , Object > callbacks ) {
254
- if (callbacks == null ) {
255
- throw new RuntimeException ("callbacks map can not be null" );
256
- }
257
- Object _onNext = callbacks .get ("onNext" );
258
- if (_onNext == null ) {
259
- throw new RuntimeException ("'onNext' key must contain an implementation" );
260
- }
261
- // lookup and memoize onNext
262
- final FuncN onNext = Functions .from (_onNext );
263
-
264
- /**
265
- * Wrapping since raw functions provided by the user are being invoked.
266
- *
267
- * See https://github.com/Netflix/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls to user code from within an operator"
268
- */
269
- return protectivelyWrapAndSubscribe (new Observer () {
270
-
271
- @ Override
272
- public void onCompleted () {
273
- Object onComplete = callbacks .get ("onCompleted" );
274
- if (onComplete != null ) {
275
- Functions .from (onComplete ).call ();
276
- }
277
- }
278
-
279
- @ Override
280
- public void onError (Throwable e ) {
281
- handleError (e );
282
- Object onError = callbacks .get ("onError" );
283
- if (onError != null ) {
284
- Functions .from (onError ).call (e );
285
- } else {
286
- throw new OnErrorNotImplementedException (e );
287
- }
288
- }
289
-
290
- @ Override
291
- public void onNext (Object args ) {
292
- onNext .call (args );
293
- }
294
-
295
- });
296
- }
297
-
298
- public Subscription subscribe (final Map <String , Object > callbacks , Scheduler scheduler ) {
299
- return subscribeOn (scheduler ).subscribe (callbacks );
300
- }
301
-
302
253
public Subscription subscribe (final Action1 <T > onNext ) {
303
254
if (onNext == null ) {
304
255
throw new IllegalArgumentException ("onNext can not be null" );
@@ -1086,13 +1037,13 @@ public static <R, T0, T1, T2, T3> Observable<R> zip(Observable<T0> w0, Observabl
1086
1037
* each time an event is received from one of the source observables, where the aggregation is defined by the given function.
1087
1038
* <p>
1088
1039
* <img width="640" src="https://github.com/Netflix/RxJava/wiki/images/rx-operators/combineLatest.png">
1089
- *
1040
+ *
1090
1041
* @param w0
1091
- * The first source observable.
1042
+ * The first source observable.
1092
1043
* @param w1
1093
- * The second source observable.
1044
+ * The second source observable.
1094
1045
* @param combineFunction
1095
- * The aggregation function used to combine the source observable values.
1046
+ * The aggregation function used to combine the source observable values.
1096
1047
* @return An Observable that combines the source Observables with the given combine function
1097
1048
*/
1098
1049
public static <R , T0 , T1 > Observable <R > combineLatest (Observable <T0 > w0 , Observable <T1 > w1 , Func2 <T0 , T1 , R > combineFunction ) {
@@ -1112,7 +1063,7 @@ public static <R, T0, T1, T2> Observable<R> combineLatest(Observable<T0> w0, Obs
1112
1063
public static <R , T0 , T1 , T2 , T3 > Observable <R > combineLatest (Observable <T0 > w0 , Observable <T1 > w1 , Observable <T2 > w2 , Observable <T3 > w3 , Func4 <T0 , T1 , T2 , T3 , R > combineFunction ) {
1113
1064
return create (OperationCombineLatest .combineLatest (w0 , w1 , w2 , w3 , combineFunction ));
1114
1065
}
1115
-
1066
+
1116
1067
/**
1117
1068
* Creates an Observable which produces buffers of collected values.
1118
1069
*
0 commit comments