@@ -106,7 +106,7 @@ object RxImplicits {
106
106
* type never escapes the for-comprehension
107
107
*/
108
108
implicit class ScalaObservable [A ](wrapped : Observable [A ]) {
109
- def map [B ](f : A => B ): Observable [B ] = wrapped.map(f)
109
+ def map [B ](f : A => B ): Observable [B ] = wrapped.map[ B ] (f)
110
110
def flatMap [B ](f : A => Observable [B ]): Observable [B ] = wrapped.mapMany(f)
111
111
def foreach (f : A => Unit ): Unit = wrapped.toBlockingObservable.forEach(f)
112
112
def withFilter (p : A => Boolean ): WithFilter = new WithFilter (p)
@@ -147,7 +147,7 @@ class UnitTestSuite extends JUnitSuite {
147
147
class ObservableWithException (s : Subscription , values : String * ) extends Observable [String ] {
148
148
var t : Thread = null
149
149
150
- override def subscribe (observer : Observer [String ]): Subscription = {
150
+ override def subscribe (observer : Observer [_ >: String ]): Subscription = {
151
151
println(" ObservableWithException subscribed to ..." )
152
152
t = new Thread (new Runnable () {
153
153
override def run () {
@@ -272,6 +272,18 @@ class UnitTestSuite extends JUnitSuite {
272
272
assertSubscribeReceives(synchronized )(1 , 2 , 3 )
273
273
}
274
274
275
+ @ Test def testZip2 () {
276
+ val colors : Observable [String ] = Observable .from(" red" , " green" , " blue" )
277
+ val names : Observable [String ] = Observable .from(" lion-o" , " cheetara" , " panthro" )
278
+
279
+ case class Character (color : String , name : String )
280
+
281
+ val cheetara = Character (" green" , " cheetara" )
282
+ val panthro = Character (" blue" , " panthro" )
283
+ val characters = Observable .zip[Character , String , String ](colors, names, Character .apply _)
284
+ assertSubscribeReceives(characters)(cheetara, panthro)
285
+ }
286
+
275
287
@ Test def testZip3 () {
276
288
val numbers = Observable .from(1 , 2 , 3 )
277
289
val colors = Observable .from(" red" , " green" , " blue" )
@@ -283,7 +295,7 @@ class UnitTestSuite extends JUnitSuite {
283
295
val cheetara = Character (2 , " green" , " cheetara" )
284
296
val panthro = Character (3 , " blue" , " panthro" )
285
297
286
- val characters = Observable .zip(numbers, colors, names, Character .apply _)
298
+ val characters = Observable .zip[ Character , Int , String , String ] (numbers, colors, names, Character .apply _)
287
299
assertSubscribeReceives(characters)(liono, cheetara, panthro)
288
300
}
289
301
@@ -299,7 +311,7 @@ class UnitTestSuite extends JUnitSuite {
299
311
val cheetara = Character (2 , " green" , " cheetara" , false )
300
312
val panthro = Character (3 , " blue" , " panthro" , false )
301
313
302
- val characters = Observable .zip(numbers, colors, names, isLeader, Character .apply _)
314
+ val characters = Observable .zip[ Character , Int , String , String , Boolean ] (numbers, colors, names, isLeader, Character .apply _)
303
315
assertSubscribeReceives(characters)(liono, cheetara, panthro)
304
316
}
305
317
@@ -459,18 +471,9 @@ class UnitTestSuite extends JUnitSuite {
459
471
assertSubscribeReceives(skipped)(3 , 4 )
460
472
}
461
473
462
- /**
463
- * Both testTake and testTakeWhileWithIndex exposed a bug with unsubscribes not properly propagating.
464
- * observable.take(2) produces onNext(first), onNext(second), and 4 onCompleteds
465
- * it should produce onNext(first), onNext(second), and 1 onCompleted
466
- *
467
- * Switching to Observable.create(OperationTake.take(observable, 2)) works as expected
468
- */
469
474
@ Test def testTake {
470
- import rx .operators ._
471
-
472
475
val observable = Observable .from(1 , 2 , 3 , 4 , 5 )
473
- val took = Observable .create( OperationTake . take(observable, 2 ) )
476
+ val took = observable. take(2 )
474
477
assertSubscribeReceives(took)(1 , 2 )
475
478
}
476
479
@@ -480,11 +483,11 @@ class UnitTestSuite extends JUnitSuite {
480
483
assertSubscribeReceives(took)(1 , 3 , 5 )
481
484
}
482
485
483
- /* @Test def testTakeWhileWithIndex {
484
- val observable = Observable.from(1, 3, 5, 6 , 7, 9, 11, 12, 13, 15, 17)
485
- val took = observable.takeWhileWithIndex((i: Int, idx: Int) => isOdd(i) && idx > 4 )
486
- assertSubscribeReceives(took)(9, 11)
487
- } */
486
+ @ Test def testTakeWhileWithIndex {
487
+ val observable = Observable .from(1 , 3 , 5 , 7 , 9 , 11 , 12 , 13 , 15 , 17 )
488
+ val took = observable.takeWhileWithIndex((i : Int , idx : Int ) => isOdd(i) && idx < 8 )
489
+ assertSubscribeReceives(took)(1 , 3 , 5 , 7 , 9 , 11 )
490
+ }
488
491
489
492
@ Test def testTakeLast {
490
493
val observable = Observable .from(1 , 2 , 3 , 4 , 5 , 6 , 7 , 8 , 9 )
0 commit comments