21
21
import rx .util .functions .Action0 ;
22
22
23
23
import java .util .concurrent .ConcurrentLinkedQueue ;
24
+ import java .util .concurrent .atomic .AtomicInteger ;
24
25
25
26
/* package */ class ScheduledObserver <T > implements Observer <T > {
26
27
private final Observer <T > underlying ;
27
28
private final Scheduler scheduler ;
28
29
29
30
private final ConcurrentLinkedQueue <Notification <T >> queue = new ConcurrentLinkedQueue <Notification <T >>();
30
- private final Object lock = new Object ( );
31
+ private final AtomicInteger counter = new AtomicInteger ( 0 );
31
32
32
33
public ScheduledObserver (Observer <T > underlying , Scheduler scheduler ) {
33
34
this .underlying = underlying ;
@@ -50,48 +51,43 @@ public void onNext(final T args) {
50
51
}
51
52
52
53
private void enqueue (Notification <T > notification ) {
53
- boolean schedule ;
54
- synchronized (lock ) {
55
- schedule = queue .isEmpty ();
54
+ int count = counter .getAndIncrement ();
56
55
57
- queue .offer (notification );
56
+ queue .offer (notification );
57
+
58
+ if (count == 0 ) {
59
+ processQueue ();
58
60
}
61
+ }
59
62
60
- if (schedule ) {
61
- scheduler .schedule (new Action0 () {
62
- @ Override
63
- public void call () {
64
- Notification <T > not = dequeue ();
65
-
66
- if (not == null ) {
67
- return ;
68
- }
69
-
70
- switch (not .getKind ()) {
71
- case OnNext :
72
- underlying .onNext (not .getValue ());
73
- break ;
74
- case OnError :
75
- underlying .onError (not .getException ());
76
- break ;
77
- case OnCompleted :
78
- underlying .onCompleted ();
79
- break ;
80
- default :
81
- throw new IllegalStateException ("Unknown kind of notification " + not );
82
-
83
- }
63
+ private void processQueue () {
64
+ scheduler .schedule (new Action0 () {
65
+ @ Override
66
+ public void call () {
67
+ int count = counter .decrementAndGet ();
68
+
69
+ Notification <T > not = queue .poll ();
70
+
71
+ switch (not .getKind ()) {
72
+ case OnNext :
73
+ underlying .onNext (not .getValue ());
74
+ break ;
75
+ case OnError :
76
+ underlying .onError (not .getException ());
77
+ break ;
78
+ case OnCompleted :
79
+ underlying .onCompleted ();
80
+ break ;
81
+ default :
82
+ throw new IllegalStateException ("Unknown kind of notification " + not );
84
83
85
- scheduler . schedule ( this );
84
+ }
86
85
86
+ if (count > 0 ) {
87
+ scheduler .schedule (this );
87
88
}
88
- });
89
- }
90
- }
91
89
92
- private Notification <T > dequeue () {
93
- synchronized (lock ) {
94
- return queue .poll ();
95
- }
90
+ }
91
+ });
96
92
}
97
93
}
0 commit comments