Skip to content

Commit 54b19be

Browse files
Fix Unit Tests related to SubscribeOn
- timeout test could be interrupted when unsubscribed - groupBy.subscribeOn needs blocking buffer
1 parent 43437fe commit 54b19be

File tree

2 files changed

+20
-11
lines changed

2 files changed

+20
-11
lines changed

rxjava-core/src/test/java/rx/operators/OperatorSubscribeOnTest.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -300,14 +300,15 @@ public void call() {
300300

301301
});
302302
} else {
303-
return group.subscribeOn(Schedulers.newThread()).delay(400, TimeUnit.MILLISECONDS).map(new Func1<Integer, String>() {
303+
return group.nest().lift(new OperatorSubscribeOn<Integer>(Schedulers.newThread(), 0))
304+
.delay(400, TimeUnit.MILLISECONDS).map(new Func1<Integer, String>() {
304305

305-
@Override
306-
public String call(Integer t1) {
307-
return "last group: " + t1;
308-
}
306+
@Override
307+
public String call(Integer t1) {
308+
return "last group: " + t1;
309+
}
309310

310-
});
311+
});
311312
}
312313
}
313314

rxjava-core/src/test/java/rx/operators/OperatorTimeoutWithSelectorTest.java

Lines changed: 13 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import rx.Observable.OnSubscribe;
3232
import rx.Observer;
3333
import rx.Subscriber;
34+
import rx.observers.TestSubscriber;
3435
import rx.schedulers.Schedulers;
3536
import rx.subjects.PublishSubject;
3637
import rx.subscriptions.Subscriptions;
@@ -334,26 +335,31 @@ public void testTimeoutSelectorWithTimeoutAndOnNextRaceCondition() throws Interr
334335
public Observable<Integer> call(Integer t1) {
335336
if (t1 == 1) {
336337
// Force "unsubscribe" run on another thread
337-
return Observable.create(new OnSubscribe<Integer>(){
338+
return Observable.create(new OnSubscribe<Integer>() {
338339
@Override
339340
public void call(Subscriber<? super Integer> subscriber) {
340-
subscriber.add(Subscriptions.create(new Action0(){
341+
subscriber.add(Subscriptions.create(new Action0() {
341342
@Override
342343
public void call() {
343344
try {
344345
// emulate "unsubscribe" is busy and finishes after timeout.onNext(1)
345346
timeoutEmittedOne.await();
346347
} catch (InterruptedException e) {
348+
// if we are interrupted then we complete (as this can happen when unsubscribed)
349+
observerCompleted.countDown();
347350
e.printStackTrace();
348351
}
349-
}}));
352+
}
353+
}));
350354
// force the timeout message be sent after observer.onNext(2)
351355
try {
352356
observerReceivedTwo.await();
353357
} catch (InterruptedException e) {
358+
// if we are interrupted then we complete (as this can happen when unsubscribed)
359+
observerCompleted.countDown();
354360
e.printStackTrace();
355361
}
356-
if(!subscriber.isUnsubscribed()) {
362+
if (!subscriber.isUnsubscribed()) {
357363
subscriber.onNext(1);
358364
timeoutEmittedOne.countDown();
359365
}
@@ -386,12 +392,14 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
386392

387393
}).when(o).onCompleted();
388394

395+
final TestSubscriber<Integer> ts = new TestSubscriber<Integer>(o);
396+
389397
new Thread(new Runnable() {
390398

391399
@Override
392400
public void run() {
393401
PublishSubject<Integer> source = PublishSubject.create();
394-
source.timeout(timeoutFunc, Observable.from(3)).subscribe(o);
402+
source.timeout(timeoutFunc, Observable.from(3)).subscribe(ts);
395403
source.onNext(1); // start timeout
396404
source.onNext(2); // disable timeout
397405
try {

0 commit comments

Comments
 (0)