Skip to content

Commit ea3cbbf

Browse files
ObserveOn and SubscribeOn concurrency unit tests
- these are very rudimentary and may have a determinism problem due to the Thread.sleep
1 parent c9c816e commit ea3cbbf

File tree

1 file changed

+134
-0
lines changed

1 file changed

+134
-0
lines changed

rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.TimeUnit;
2424
import java.util.concurrent.atomic.AtomicBoolean;
2525
import java.util.concurrent.atomic.AtomicInteger;
26+
import java.util.concurrent.atomic.AtomicReference;
2627

2728
import org.junit.Test;
2829

@@ -395,4 +396,137 @@ public Subscription call(Scheduler scheduler, String state) {
395396
}
396397
}
397398

399+
@Test
400+
public void testConcurrentOnNextFailsValidation() throws InterruptedException {
401+
402+
final int count = 10;
403+
final CountDownLatch latch = new CountDownLatch(count);
404+
Observable<String> o = Observable.create(new Func1<Observer<String>, Subscription>() {
405+
406+
@Override
407+
public Subscription call(final Observer<String> observer) {
408+
for (int i = 0; i < count; i++) {
409+
final int v = i;
410+
new Thread(new Runnable() {
411+
412+
@Override
413+
public void run() {
414+
observer.onNext("v: " + v);
415+
416+
latch.countDown();
417+
}
418+
}).start();
419+
}
420+
return Subscriptions.empty();
421+
}
422+
});
423+
424+
ConcurrentObserverValidator<String> observer = new ConcurrentObserverValidator<String>();
425+
// this should call onNext concurrently
426+
o.subscribe(observer);
427+
428+
if (!observer.completed.await(3000, TimeUnit.MILLISECONDS)) {
429+
fail("timed out");
430+
}
431+
432+
if (observer.error.get() == null) {
433+
fail("We expected error messages due to concurrency");
434+
}
435+
}
436+
437+
@Test
438+
public void testObserveOn() throws InterruptedException {
439+
440+
Observable<String> o = Observable.from("one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten");
441+
442+
ConcurrentObserverValidator<String> observer = new ConcurrentObserverValidator<String>();
443+
444+
o.observeOn(Schedulers.threadPoolForComputation()).subscribe(observer);
445+
446+
if (!observer.completed.await(3000, TimeUnit.MILLISECONDS)) {
447+
fail("timed out");
448+
}
449+
450+
if (observer.error.get() != null) {
451+
observer.error.get().printStackTrace();
452+
fail("Error: " + observer.error.get().getMessage());
453+
}
454+
}
455+
456+
@Test
457+
public void testSubscribeOnNestedConcurrency() throws InterruptedException {
458+
459+
Observable<String> o = Observable.from("one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten")
460+
.mapMany(new Func1<String, Observable<String>>() {
461+
462+
@Override
463+
public Observable<String> call(final String v) {
464+
return Observable.create(new Func1<Observer<String>, Subscription>() {
465+
466+
@Override
467+
public Subscription call(final Observer<String> observer) {
468+
observer.onNext("value_after_map-" + v);
469+
observer.onCompleted();
470+
return Subscriptions.empty();
471+
}
472+
}).subscribeOn(Schedulers.newThread()); // subscribe on a new thread
473+
}
474+
});
475+
476+
ConcurrentObserverValidator<String> observer = new ConcurrentObserverValidator<String>();
477+
478+
o.subscribe(observer);
479+
480+
if (!observer.completed.await(3000, TimeUnit.MILLISECONDS)) {
481+
fail("timed out");
482+
}
483+
484+
if (observer.error.get() != null) {
485+
observer.error.get().printStackTrace();
486+
fail("Error: " + observer.error.get().getMessage());
487+
}
488+
}
489+
490+
/**
491+
* Used to determine if onNext is being invoked concurrently.
492+
*
493+
* @param <T>
494+
*/
495+
private static class ConcurrentObserverValidator<T> implements Observer<T> {
496+
497+
final AtomicInteger concurrentCounter = new AtomicInteger();
498+
final AtomicReference<Exception> error = new AtomicReference<Exception>();
499+
final CountDownLatch completed = new CountDownLatch(1);
500+
501+
@Override
502+
public void onCompleted() {
503+
completed.countDown();
504+
}
505+
506+
@Override
507+
public void onError(Exception e) {
508+
completed.countDown();
509+
error.set(e);
510+
}
511+
512+
@Override
513+
public void onNext(T args) {
514+
int count = concurrentCounter.incrementAndGet();
515+
System.out.println("ConcurrentObserverValidator.onNext: " + args);
516+
if (count > 1) {
517+
onError(new RuntimeException("we should not have concurrent execution of onNext"));
518+
}
519+
try {
520+
try {
521+
// take some time so other onNext calls could pile up (I haven't yet thought of a way to do this without sleeping)
522+
Thread.sleep(50);
523+
} catch (InterruptedException e) {
524+
// ignore
525+
}
526+
} finally {
527+
concurrentCounter.decrementAndGet();
528+
}
529+
}
530+
531+
}
398532
}

0 commit comments

Comments
 (0)