Skip to content

Commit 21f3ba4

Browse files
Merge pull request ReactiveX#238 from benjchristensen/pull-234-merge-ObserveOn
ScheduledObserver/ObserveOn - Manual Merge of Pull 234
2 parents 5caf7be + e425d38 commit 21f3ba4

File tree

3 files changed

+223
-21
lines changed

3 files changed

+223
-21
lines changed

rxjava-core/src/main/java/rx/operators/OperationObserveOn.java

Lines changed: 41 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -15,17 +15,22 @@
1515
*/
1616
package rx.operators;
1717

18-
import static org.mockito.Matchers.*;
18+
import static org.junit.Assert.*;
1919
import static org.mockito.Mockito.*;
2020

21+
import java.util.concurrent.CountDownLatch;
22+
import java.util.concurrent.TimeUnit;
23+
2124
import org.junit.Test;
25+
import org.mockito.InOrder;
26+
import org.mockito.invocation.InvocationOnMock;
27+
import org.mockito.stubbing.Answer;
2228

2329
import rx.Observable;
2430
import rx.Observer;
2531
import rx.Scheduler;
2632
import rx.Subscription;
2733
import rx.concurrency.Schedulers;
28-
import rx.util.functions.Action0;
2934
import rx.util.functions.Func1;
3035

3136
public class OperationObserveOn {
@@ -60,15 +65,46 @@ public void testObserveOn() {
6065
Observer<Integer> observer = mock(Observer.class);
6166
Observable.create(observeOn(Observable.toObservable(1, 2, 3), scheduler)).subscribe(observer);
6267

63-
verify(scheduler, times(4)).schedule(any(Action0.class));
64-
verifyNoMoreInteractions(scheduler);
65-
6668
verify(observer, times(1)).onNext(1);
6769
verify(observer, times(1)).onNext(2);
6870
verify(observer, times(1)).onNext(3);
6971
verify(observer, times(1)).onCompleted();
7072
}
7173

74+
@Test
75+
@SuppressWarnings("unchecked")
76+
public void testOrdering() throws InterruptedException {
77+
Observable<String> obs = Observable.from("one", null, "two", "three", "four");
78+
79+
Observer<String> observer = mock(Observer.class);
80+
81+
InOrder inOrder = inOrder(observer);
82+
83+
final CountDownLatch completedLatch = new CountDownLatch(1);
84+
doAnswer(new Answer<Void>() {
85+
86+
@Override
87+
public Void answer(InvocationOnMock invocation) throws Throwable {
88+
completedLatch.countDown();
89+
return null;
90+
}
91+
}).when(observer).onCompleted();
92+
93+
obs.observeOn(Schedulers.threadPoolForComputation()).subscribe(observer);
94+
95+
if (!completedLatch.await(1000, TimeUnit.MILLISECONDS)) {
96+
fail("timed out waiting");
97+
}
98+
99+
inOrder.verify(observer, times(1)).onNext("one");
100+
inOrder.verify(observer, times(1)).onNext(null);
101+
inOrder.verify(observer, times(1)).onNext("two");
102+
inOrder.verify(observer, times(1)).onNext("three");
103+
inOrder.verify(observer, times(1)).onNext("four");
104+
inOrder.verify(observer, times(1)).onCompleted();
105+
inOrder.verifyNoMoreInteractions();
106+
}
107+
72108
}
73109

74110
}

rxjava-core/src/main/java/rx/operators/ScheduledObserver.java

Lines changed: 48 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
7+
*
88
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,45 +15,77 @@
1515
*/
1616
package rx.operators;
1717

18+
import rx.Notification;
1819
import rx.Observer;
1920
import rx.Scheduler;
2021
import rx.util.functions.Action0;
2122

23+
import java.util.concurrent.ConcurrentLinkedQueue;
24+
import java.util.concurrent.atomic.AtomicInteger;
25+
2226
/* package */class ScheduledObserver<T> implements Observer<T> {
2327
private final Observer<T> underlying;
2428
private final Scheduler scheduler;
2529

30+
private final ConcurrentLinkedQueue<Notification<T>> queue = new ConcurrentLinkedQueue<Notification<T>>();
31+
private final AtomicInteger counter = new AtomicInteger(0);
32+
2633
public ScheduledObserver(Observer<T> underlying, Scheduler scheduler) {
2734
this.underlying = underlying;
2835
this.scheduler = scheduler;
2936
}
3037

3138
@Override
3239
public void onCompleted() {
33-
scheduler.schedule(new Action0() {
34-
@Override
35-
public void call() {
36-
underlying.onCompleted();
37-
}
38-
});
40+
enqueue(new Notification<T>());
3941
}
4042

4143
@Override
4244
public void onError(final Exception e) {
43-
scheduler.schedule(new Action0() {
44-
@Override
45-
public void call() {
46-
underlying.onError(e);
47-
}
48-
});
45+
enqueue(new Notification<T>(e));
4946
}
5047

5148
@Override
5249
public void onNext(final T args) {
50+
enqueue(new Notification<T>(args));
51+
}
52+
53+
private void enqueue(Notification<T> notification) {
54+
int count = counter.getAndIncrement();
55+
56+
queue.offer(notification);
57+
58+
if (count == 0) {
59+
processQueue();
60+
}
61+
}
62+
63+
private void processQueue() {
5364
scheduler.schedule(new Action0() {
5465
@Override
5566
public void call() {
56-
underlying.onNext(args);
67+
Notification<T> not = queue.poll();
68+
69+
switch (not.getKind()) {
70+
case OnNext:
71+
underlying.onNext(not.getValue());
72+
break;
73+
case OnError:
74+
underlying.onError(not.getException());
75+
break;
76+
case OnCompleted:
77+
underlying.onCompleted();
78+
break;
79+
default:
80+
throw new IllegalStateException("Unknown kind of notification " + not);
81+
82+
}
83+
84+
int count = counter.decrementAndGet();
85+
if (count > 0) {
86+
scheduler.schedule(this);
87+
}
88+
5789
}
5890
});
5991
}

rxjava-core/src/test/java/rx/concurrency/TestSchedulers.java

Lines changed: 134 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import java.util.concurrent.TimeUnit;
2424
import java.util.concurrent.atomic.AtomicBoolean;
2525
import java.util.concurrent.atomic.AtomicInteger;
26+
import java.util.concurrent.atomic.AtomicReference;
2627

2728
import org.junit.Test;
2829

@@ -395,4 +396,137 @@ public Subscription call(Scheduler scheduler, String state) {
395396
}
396397
}
397398

399+
@Test
400+
public void testConcurrentOnNextFailsValidation() throws InterruptedException {
401+
402+
final int count = 10;
403+
final CountDownLatch latch = new CountDownLatch(count);
404+
Observable<String> o = Observable.create(new Func1<Observer<String>, Subscription>() {
405+
406+
@Override
407+
public Subscription call(final Observer<String> observer) {
408+
for (int i = 0; i < count; i++) {
409+
final int v = i;
410+
new Thread(new Runnable() {
411+
412+
@Override
413+
public void run() {
414+
observer.onNext("v: " + v);
415+
416+
latch.countDown();
417+
}
418+
}).start();
419+
}
420+
return Subscriptions.empty();
421+
}
422+
});
423+
424+
ConcurrentObserverValidator<String> observer = new ConcurrentObserverValidator<String>();
425+
// this should call onNext concurrently
426+
o.subscribe(observer);
427+
428+
if (!observer.completed.await(3000, TimeUnit.MILLISECONDS)) {
429+
fail("timed out");
430+
}
431+
432+
if (observer.error.get() == null) {
433+
fail("We expected error messages due to concurrency");
434+
}
435+
}
436+
437+
@Test
438+
public void testObserveOn() throws InterruptedException {
439+
440+
Observable<String> o = Observable.from("one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten");
441+
442+
ConcurrentObserverValidator<String> observer = new ConcurrentObserverValidator<String>();
443+
444+
o.observeOn(Schedulers.threadPoolForComputation()).subscribe(observer);
445+
446+
if (!observer.completed.await(3000, TimeUnit.MILLISECONDS)) {
447+
fail("timed out");
448+
}
449+
450+
if (observer.error.get() != null) {
451+
observer.error.get().printStackTrace();
452+
fail("Error: " + observer.error.get().getMessage());
453+
}
454+
}
455+
456+
@Test
457+
public void testSubscribeOnNestedConcurrency() throws InterruptedException {
458+
459+
Observable<String> o = Observable.from("one", "two", "three", "four", "five", "six", "seven", "eight", "nine", "ten")
460+
.mapMany(new Func1<String, Observable<String>>() {
461+
462+
@Override
463+
public Observable<String> call(final String v) {
464+
return Observable.create(new Func1<Observer<String>, Subscription>() {
465+
466+
@Override
467+
public Subscription call(final Observer<String> observer) {
468+
observer.onNext("value_after_map-" + v);
469+
observer.onCompleted();
470+
return Subscriptions.empty();
471+
}
472+
}).subscribeOn(Schedulers.newThread()); // subscribe on a new thread
473+
}
474+
});
475+
476+
ConcurrentObserverValidator<String> observer = new ConcurrentObserverValidator<String>();
477+
478+
o.subscribe(observer);
479+
480+
if (!observer.completed.await(3000, TimeUnit.MILLISECONDS)) {
481+
fail("timed out");
482+
}
483+
484+
if (observer.error.get() != null) {
485+
observer.error.get().printStackTrace();
486+
fail("Error: " + observer.error.get().getMessage());
487+
}
488+
}
489+
490+
/**
491+
* Used to determine if onNext is being invoked concurrently.
492+
*
493+
* @param <T>
494+
*/
495+
private static class ConcurrentObserverValidator<T> implements Observer<T> {
496+
497+
final AtomicInteger concurrentCounter = new AtomicInteger();
498+
final AtomicReference<Exception> error = new AtomicReference<Exception>();
499+
final CountDownLatch completed = new CountDownLatch(1);
500+
501+
@Override
502+
public void onCompleted() {
503+
completed.countDown();
504+
}
505+
506+
@Override
507+
public void onError(Exception e) {
508+
completed.countDown();
509+
error.set(e);
510+
}
511+
512+
@Override
513+
public void onNext(T args) {
514+
int count = concurrentCounter.incrementAndGet();
515+
System.out.println("ConcurrentObserverValidator.onNext: " + args);
516+
if (count > 1) {
517+
onError(new RuntimeException("we should not have concurrent execution of onNext"));
518+
}
519+
try {
520+
try {
521+
// take some time so other onNext calls could pile up (I haven't yet thought of a way to do this without sleeping)
522+
Thread.sleep(50);
523+
} catch (InterruptedException e) {
524+
// ignore
525+
}
526+
} finally {
527+
concurrentCounter.decrementAndGet();
528+
}
529+
}
530+
531+
}
398532
}

0 commit comments

Comments
 (0)