Skip to content

Commit 564fba0

Browse files
BugFix: Handling of Terminal State for Behavior/Publish Subjects
- They were not correctly emitting onCompleted when new Observers subscribed after the Subject was terminated. - Added same logic that already existed on AsyncSubject
1 parent d09964c commit 564fba0

File tree

6 files changed

+295
-345
lines changed

6 files changed

+295
-345
lines changed
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
package rx.subjects;
2+
3+
import java.util.ArrayList;
4+
import java.util.Collection;
5+
import java.util.concurrent.ConcurrentHashMap;
6+
import java.util.concurrent.atomic.AtomicBoolean;
7+
import java.util.concurrent.atomic.AtomicReference;
8+
import java.util.concurrent.locks.ReentrantLock;
9+
10+
import rx.Notification;
11+
import rx.Observer;
12+
import rx.Subscription;
13+
import rx.operators.SafeObservableSubscription;
14+
import rx.subscriptions.Subscriptions;
15+
import rx.util.functions.Action2;
16+
17+
public abstract class AbstractSubject<T> extends Subject<T, T> {
18+
19+
protected AbstractSubject(rx.Observable.OnSubscribeFunc<T> onSubscribe) {
20+
super(onSubscribe);
21+
}
22+
23+
protected static class SubjectState<T> {
24+
protected final ConcurrentHashMap<Subscription, Observer<? super T>> observers = new ConcurrentHashMap<Subscription, Observer<? super T>>();
25+
protected final AtomicReference<Notification<T>> currentValue = new AtomicReference<Notification<T>>();
26+
protected final AtomicBoolean completed = new AtomicBoolean();
27+
protected final ReentrantLock SUBSCRIPTION_LOCK = new ReentrantLock();
28+
}
29+
30+
protected static <T> OnSubscribeFunc<T> getOnSubscribeFunc(final SubjectState<T> state, final Action2<SubjectState<T>, Observer<? super T>> onEach) {
31+
return new OnSubscribeFunc<T>() {
32+
@Override
33+
public Subscription onSubscribe(Observer<? super T> observer) {
34+
/*
35+
* Subscription needs to be synchronized with terminal states to ensure
36+
* race conditions are handled. When subscribing we must make sure
37+
* onComplete/onError is correctly emitted to all observers, even if it
38+
* comes in while the onComplete/onError is being propagated.
39+
*/
40+
state.SUBSCRIPTION_LOCK.lock();
41+
try {
42+
if (state.completed.get()) {
43+
emitNotification(state.currentValue.get(), observer);
44+
if (onEach != null) {
45+
onEach.call(state, observer);
46+
}
47+
return Subscriptions.empty();
48+
} else {
49+
// the subject is not completed so we subscribe
50+
final SafeObservableSubscription subscription = new SafeObservableSubscription();
51+
52+
subscription.wrap(new Subscription() {
53+
@Override
54+
public void unsubscribe() {
55+
// on unsubscribe remove it from the map of outbound observers to notify
56+
state.observers.remove(subscription);
57+
}
58+
});
59+
60+
// on subscribe add it to the map of outbound observers to notify
61+
state.observers.put(subscription, observer);
62+
63+
// invoke onSubscribe logic
64+
if (onEach != null) {
65+
onEach.call(state, observer);
66+
}
67+
68+
return subscription;
69+
}
70+
} finally {
71+
state.SUBSCRIPTION_LOCK.unlock();
72+
}
73+
74+
}
75+
76+
};
77+
}
78+
79+
protected static <T> void emitNotification(Notification<T> value, Observer<? super T> observer) {
80+
// if null that means onNext was never invoked (no Notification set)
81+
if (value != null) {
82+
if (value.isOnNext()) {
83+
observer.onNext(value.getValue());
84+
} else if (value.isOnError()) {
85+
observer.onError(value.getThrowable());
86+
} else if (value.isOnCompleted()) {
87+
observer.onCompleted();
88+
}
89+
}
90+
}
91+
92+
/**
93+
* Emit the current value.
94+
*
95+
* @param state
96+
*/
97+
protected static <T> void emitNotification(final SubjectState<T> state, final Action2<SubjectState<T>, Observer<? super T>> onEach) {
98+
for (Subscription s : snapshotOfObservers(state)) {
99+
Observer<? super T> o = state.observers.get(s);
100+
// emit notifications to this observer
101+
emitNotification(state.currentValue.get(), o);
102+
// onEach action if applicable
103+
if (onEach != null) {
104+
onEach.call(state, o);
105+
}
106+
}
107+
}
108+
109+
/**
110+
* Emit the current value to all observers and remove their subscription.
111+
*
112+
* @param state
113+
*/
114+
protected void emitNotificationAndTerminate(final SubjectState<T> state, final Action2<SubjectState<T>, Observer<? super T>> onEach) {
115+
/*
116+
* We can not allow new subscribers to be added while we execute the terminal state.
117+
*/
118+
state.SUBSCRIPTION_LOCK.lock();
119+
try {
120+
if (state.completed.compareAndSet(false, true)) {
121+
for (Subscription s : snapshotOfObservers(state)) {
122+
Observer<? super T> o = state.observers.get(s);
123+
// emit notifications to this observer
124+
emitNotification(state.currentValue.get(), o);
125+
// onEach action if applicable
126+
if (onEach != null) {
127+
onEach.call(state, o);
128+
}
129+
130+
// remove the subscription as it is completed
131+
state.observers.remove(s);
132+
}
133+
}
134+
} finally {
135+
state.SUBSCRIPTION_LOCK.unlock();
136+
}
137+
}
138+
139+
/**
140+
* Current snapshot of 'state.observers.keySet()' so that concurrent modifications aren't included.
141+
*
142+
* This makes it behave deterministically in a single-threaded execution when nesting subscribes.
143+
*
144+
* In multi-threaded execution it will cause new subscriptions to wait until the following onNext instead
145+
* of possibly being included in the current onNext iteration.
146+
*
147+
* @return List<Observer<T>>
148+
*/
149+
private static <T> Collection<Subscription> snapshotOfObservers(final SubjectState<T> state) {
150+
return new ArrayList<Subscription>(state.observers.keySet());
151+
}
152+
}

rxjava-core/src/main/java/rx/subjects/AsyncSubject.java

Lines changed: 31 additions & 93 deletions
Original file line numberDiff line numberDiff line change
@@ -15,16 +15,9 @@
1515
*/
1616
package rx.subjects;
1717

18-
import java.util.concurrent.ConcurrentHashMap;
19-
import java.util.concurrent.atomic.AtomicBoolean;
20-
import java.util.concurrent.atomic.AtomicReference;
21-
import java.util.concurrent.locks.ReentrantLock;
22-
2318
import rx.Notification;
2419
import rx.Observer;
25-
import rx.Subscription;
26-
import rx.operators.SafeObservableSubscription;
27-
import rx.subscriptions.Subscriptions;
20+
import rx.util.functions.Action2;
2821

2922
/**
3023
* Subject that publishes only the last event to each {@link Observer} that has subscribed when the
@@ -55,123 +48,68 @@
5548
*
5649
* @param <T>
5750
*/
58-
public class AsyncSubject<T> extends Subject<T, T> {
51+
public class AsyncSubject<T> extends AbstractSubject<T> {
5952

6053
/**
6154
* Create a new AsyncSubject
6255
*
6356
* @return a new AsyncSubject
6457
*/
6558
public static <T> AsyncSubject<T> create() {
66-
final AsyncSubjectState<T> state = new AsyncSubjectState<T>();
59+
final SubjectState<T> state = new SubjectState<T>();
60+
OnSubscribeFunc<T> onSubscribe = getOnSubscribeFunc(state, new Action2<SubjectState<T>, Observer<? super T>>() {
6761

68-
OnSubscribeFunc<T> onSubscribe = new OnSubscribeFunc<T>() {
6962
@Override
70-
public Subscription onSubscribe(Observer<? super T> observer) {
71-
/*
72-
* Subscription needs to be synchronized with terminal states to ensure
73-
* race conditions are handled. When subscribing we must make sure
74-
* onComplete/onError is correctly emitted to all observers, even if it
75-
* comes in while the onComplete/onError is being propagated.
76-
*/
77-
state.SUBSCRIPTION_LOCK.lock();
78-
try {
79-
if (state.completed.get()) {
80-
emitNotificationToObserver(state, observer);
81-
return Subscriptions.empty();
82-
} else {
83-
// the subject is not completed so we subscribe
84-
final SafeObservableSubscription subscription = new SafeObservableSubscription();
85-
86-
subscription.wrap(new Subscription() {
87-
@Override
88-
public void unsubscribe() {
89-
// on unsubscribe remove it from the map of outbound observers to notify
90-
state.observers.remove(subscription);
91-
}
92-
});
93-
94-
// on subscribe add it to the map of outbound observers to notify
95-
state.observers.put(subscription, observer);
96-
97-
return subscription;
63+
public void call(SubjectState<T> state, Observer<? super T> o) {
64+
// we want the last value + completed so add this extra logic
65+
// to send onCompleted if the last value is an onNext
66+
if (state.completed.get()) {
67+
Notification<T> value = state.currentValue.get();
68+
if (value != null && value.isOnNext()) {
69+
o.onCompleted();
9870
}
99-
} finally {
100-
state.SUBSCRIPTION_LOCK.unlock();
10171
}
102-
10372
}
104-
105-
};
106-
73+
});
10774
return new AsyncSubject<T>(onSubscribe, state);
10875
}
10976

110-
private static <T> void emitNotificationToObserver(final AsyncSubjectState<T> state, Observer<? super T> observer) {
111-
Notification<T> finalValue = state.currentValue.get();
112-
113-
// if null that means onNext was never invoked (no Notification set)
114-
if (finalValue != null) {
115-
if (finalValue.isOnNext()) {
116-
observer.onNext(finalValue.getValue());
117-
} else if (finalValue.isOnError()) {
118-
observer.onError(finalValue.getThrowable());
119-
}
120-
}
121-
observer.onCompleted();
122-
}
123-
124-
/**
125-
* State externally constructed and passed in so the onSubscribe function has access to it.
126-
*
127-
* @param <T>
128-
*/
129-
private static class AsyncSubjectState<T> {
130-
private final ConcurrentHashMap<Subscription, Observer<? super T>> observers = new ConcurrentHashMap<Subscription, Observer<? super T>>();
131-
private final AtomicReference<Notification<T>> currentValue = new AtomicReference<Notification<T>>();
132-
private final AtomicBoolean completed = new AtomicBoolean();
133-
private final ReentrantLock SUBSCRIPTION_LOCK = new ReentrantLock();
134-
}
135-
136-
private final AsyncSubjectState<T> state;
77+
private final SubjectState<T> state;
13778

138-
protected AsyncSubject(OnSubscribeFunc<T> onSubscribe, AsyncSubjectState<T> state) {
79+
protected AsyncSubject(OnSubscribeFunc<T> onSubscribe, SubjectState<T> state) {
13980
super(onSubscribe);
14081
this.state = state;
14182
}
14283

14384
@Override
14485
public void onCompleted() {
145-
terminalState();
86+
/**
87+
* Mark this subject as completed and emit latest value + 'onCompleted' to all Observers
88+
*/
89+
emitNotificationAndTerminate(state, new Action2<SubjectState<T>, Observer<? super T>>() {
90+
91+
@Override
92+
public void call(SubjectState<T> state, Observer<? super T> o) {
93+
o.onCompleted();
94+
}
95+
});
14696
}
14797

14898
@Override
14999
public void onError(Throwable e) {
100+
/**
101+
* Mark this subject as completed with an error as the last value and emit 'onError' to all Observers
102+
*/
150103
state.currentValue.set(new Notification<T>(e));
151-
terminalState();
104+
emitNotificationAndTerminate(state, null);
152105
}
153106

154107
@Override
155108
public void onNext(T v) {
109+
/**
110+
* Store the latest value but do not send it. It only gets sent when 'onCompleted' occurs.
111+
*/
156112
state.currentValue.set(new Notification<T>(v));
157113
}
158114

159-
private void terminalState() {
160-
/*
161-
* We can not allow new subscribers to be added while we execute the terminal state.
162-
*/
163-
state.SUBSCRIPTION_LOCK.lock();
164-
try {
165-
if (state.completed.compareAndSet(false, true)) {
166-
for (Subscription s : state.observers.keySet()) {
167-
// emit notifications to this observer
168-
emitNotificationToObserver(state, state.observers.get(s));
169-
// remove the subscription as it is completed
170-
state.observers.remove(s);
171-
}
172-
}
173-
} finally {
174-
state.SUBSCRIPTION_LOCK.unlock();
175-
}
176-
}
177115
}

0 commit comments

Comments
 (0)