23
23
24
24
import rx .Notification ;
25
25
import rx .Observable ;
26
+ import rx .Observer ;
26
27
import rx .operators .SafeObservableSubscription ;
28
+ import rx .operators .SafeObserver ;
27
29
import rx .util .functions .Action1 ;
28
30
29
31
/**
30
32
* Default implementation of a join observer.
31
33
*/
32
- public final class JoinObserver1 <T > extends ObserverBase <Notification <T >> implements JoinObserver {
34
+ public final class JoinObserver1 <T > implements Observer <Notification <T >>, JoinObserver {
33
35
private Object gate ;
34
36
private final Observable <T > source ;
35
37
private final Action1 <Throwable > onError ;
36
38
private final List <ActivePlan0 > activePlans ;
37
39
private final Queue <Notification <T >> queue ;
38
- private final SafeObservableSubscription subscription ;
40
+ private final SafeObservableSubscription subscription = new SafeObservableSubscription () ;
39
41
private volatile boolean done ;
40
42
private final AtomicBoolean subscribed = new AtomicBoolean (false );
43
+ private final SafeObserver <Notification <T >> safeObserver ;
41
44
42
45
public JoinObserver1 (Observable <T > source , Action1 <Throwable > onError ) {
43
46
this .source = source ;
44
47
this .onError = onError ;
45
48
queue = new LinkedList <Notification <T >>();
46
- subscription = new SafeObservableSubscription ();
47
49
activePlans = new ArrayList <ActivePlan0 >();
50
+ safeObserver = new SafeObserver <Notification <T >>(subscription , new InnerObserver ());
48
51
}
49
52
public Queue <Notification <T >> queue () {
50
53
return queue ;
@@ -67,35 +70,52 @@ public void dequeue() {
67
70
queue .remove ();
68
71
}
69
72
70
- @ Override
71
- protected void onNextCore (Notification <T > args ) {
72
- synchronized (gate ) {
73
- if (!done ) {
74
- if (args .isOnError ()) {
75
- onError .call (args .getThrowable ());
76
- return ;
77
- }
78
- queue .add (args );
79
-
80
- // remark: activePlans might change while iterating
81
- for (ActivePlan0 a : new ArrayList <ActivePlan0 >(activePlans )) {
82
- a .match ();
73
+ private final class InnerObserver implements Observer <Notification <T >> {
74
+
75
+ @ Override
76
+ public void onNext (Notification <T > args ) {
77
+ synchronized (gate ) {
78
+ if (!done ) {
79
+ if (args .isOnError ()) {
80
+ onError .call (args .getThrowable ());
81
+ return ;
82
+ }
83
+ queue .add (args );
84
+
85
+ // remark: activePlans might change while iterating
86
+ for (ActivePlan0 a : new ArrayList <ActivePlan0 >(activePlans )) {
87
+ a .match ();
88
+ }
83
89
}
84
90
}
85
91
}
92
+
93
+ @ Override
94
+ public void onError (Throwable e ) {
95
+ // not expected
96
+ }
97
+
98
+ @ Override
99
+ public void onCompleted () {
100
+ // not expected or ignored
101
+ }
102
+ }
103
+
104
+ @ Override
105
+ public void onNext (Notification <T > args ) {
106
+ safeObserver .onNext (args );
86
107
}
87
108
88
109
@ Override
89
- protected void onErrorCore (Throwable e ) {
90
- // not expected
110
+ public void onError (Throwable e ) {
111
+ safeObserver . onError ( e );
91
112
}
92
113
93
114
@ Override
94
- protected void onCompletedCore () {
95
- // not expected or ignored
115
+ public void onCompleted () {
116
+ safeObserver . onCompleted ();
96
117
}
97
118
98
-
99
119
void removeActivePlan (ActivePlan0 activePlan ) {
100
120
activePlans .remove (activePlan );
101
121
if (activePlans .isEmpty ()) {
0 commit comments