Skip to content

Commit e94d213

Browse files
committed
Blocking implementation of ScheduledObserver
1 parent 04068c1 commit e94d213

File tree

2 files changed

+79
-25
lines changed

2 files changed

+79
-25
lines changed

rxjava-core/src/main/java/rx/operators/OperationObserveOn.java

Lines changed: 21 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -20,12 +20,12 @@
2020

2121
import org.junit.Test;
2222

23+
import org.mockito.InOrder;
2324
import rx.Observable;
2425
import rx.Observer;
2526
import rx.Scheduler;
2627
import rx.Subscription;
2728
import rx.concurrency.Schedulers;
28-
import rx.util.functions.Action0;
2929
import rx.util.functions.Func1;
3030

3131
public class OperationObserveOn {
@@ -60,15 +60,32 @@ public void testObserveOn() {
6060
Observer<Integer> observer = mock(Observer.class);
6161
Observable.create(observeOn(Observable.toObservable(1, 2, 3), scheduler)).subscribe(observer);
6262

63-
verify(scheduler, times(4)).schedule(any(Action0.class));
64-
verifyNoMoreInteractions(scheduler);
65-
6663
verify(observer, times(1)).onNext(1);
6764
verify(observer, times(1)).onNext(2);
6865
verify(observer, times(1)).onNext(3);
6966
verify(observer, times(1)).onCompleted();
7067
}
7168

69+
70+
@Test
71+
@SuppressWarnings("unchecked")
72+
public void testOrdering() {
73+
Observable<String> obs = Observable.from("one", null, "two", "three", "four");
74+
75+
Observer<String> observer = mock(Observer.class);
76+
77+
InOrder inOrder = inOrder(observer);
78+
79+
obs.observeOn(Schedulers.threadPoolForComputation()).subscribe(observer);
80+
inOrder.verify(observer, times(1)).onNext("one");
81+
inOrder.verify(observer, times(1)).onNext(null);
82+
inOrder.verify(observer, times(1)).onNext("two");
83+
inOrder.verify(observer, times(1)).onNext("three");
84+
inOrder.verify(observer, times(1)).onNext("four");
85+
inOrder.verify(observer, times(1)).onCompleted();
86+
inOrder.verifyNoMoreInteractions();
87+
}
88+
7289
}
7390

7491
}
Lines changed: 58 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,12 @@
11
/**
22
* Copyright 2013 Netflix, Inc.
3-
*
3+
*
44
* Licensed under the Apache License, Version 2.0 (the "License");
55
* you may not use this file except in compliance with the License.
66
* You may obtain a copy of the License at
7-
*
7+
*
88
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
9+
*
1010
* Unless required by applicable law or agreed to in writing, software
1111
* distributed under the License is distributed on an "AS IS" BASIS,
1212
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
@@ -15,46 +15,83 @@
1515
*/
1616
package rx.operators;
1717

18+
import rx.Notification;
1819
import rx.Observer;
1920
import rx.Scheduler;
2021
import rx.util.functions.Action0;
2122

23+
import java.util.concurrent.ConcurrentLinkedQueue;
24+
2225
/* package */class ScheduledObserver<T> implements Observer<T> {
2326
private final Observer<T> underlying;
2427
private final Scheduler scheduler;
2528

29+
private final ConcurrentLinkedQueue<Notification<T>> queue = new ConcurrentLinkedQueue<Notification<T>>();
30+
private final Object lock = new Object();
31+
2632
public ScheduledObserver(Observer<T> underlying, Scheduler scheduler) {
2733
this.underlying = underlying;
2834
this.scheduler = scheduler;
2935
}
3036

3137
@Override
3238
public void onCompleted() {
33-
scheduler.schedule(new Action0() {
34-
@Override
35-
public void call() {
36-
underlying.onCompleted();
37-
}
38-
});
39+
enqueue(new Notification<T>());
3940
}
4041

4142
@Override
4243
public void onError(final Exception e) {
43-
scheduler.schedule(new Action0() {
44-
@Override
45-
public void call() {
46-
underlying.onError(e);
47-
}
48-
});
44+
enqueue(new Notification<T>(e));
4945
}
5046

5147
@Override
5248
public void onNext(final T args) {
53-
scheduler.schedule(new Action0() {
54-
@Override
55-
public void call() {
56-
underlying.onNext(args);
57-
}
58-
});
49+
enqueue(new Notification<T>(args));
50+
}
51+
52+
private void enqueue(Notification<T> notification) {
53+
boolean schedule;
54+
synchronized (lock) {
55+
schedule = queue.isEmpty();
56+
57+
queue.offer(notification);
58+
}
59+
60+
if (schedule) {
61+
scheduler.schedule(new Action0() {
62+
@Override
63+
public void call() {
64+
Notification<T> not = dequeue();
65+
66+
if (not == null) {
67+
return;
68+
}
69+
70+
switch (not.getKind()) {
71+
case OnNext:
72+
underlying.onNext(not.getValue());
73+
break;
74+
case OnError:
75+
underlying.onError(not.getException());
76+
break;
77+
case OnCompleted:
78+
underlying.onCompleted();
79+
break;
80+
default:
81+
throw new IllegalStateException("Unknown kind of notification " + not);
82+
83+
}
84+
85+
scheduler.schedule(this);
86+
87+
}
88+
});
89+
}
90+
}
91+
92+
private Notification<T> dequeue() {
93+
synchronized (lock) {
94+
return queue.poll();
95+
}
5996
}
6097
}

0 commit comments

Comments
 (0)