Skip to content

Commit af40052

Browse files
Merge pull request #1185 from akarnokd/BehaviorSubjectTimeGapFix2
Behavior subject time gap fix 2
2 parents 3f03935 + 1b88781 commit af40052

File tree

7 files changed

+546
-558
lines changed

7 files changed

+546
-558
lines changed

rxjava-core/src/main/java/rx/subjects/AsyncSubject.java

Lines changed: 32 additions & 65 deletions
Original file line numberDiff line numberDiff line change
@@ -15,13 +15,9 @@
1515
*/
1616
package rx.subjects;
1717

18-
import java.util.Collection;
19-
import java.util.concurrent.atomic.AtomicReference;
20-
21-
import rx.Notification;
2218
import rx.Observer;
23-
import rx.functions.Action0;
2419
import rx.functions.Action1;
20+
import rx.operators.NotificationLite;
2521
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
2622

2723
/**
@@ -56,88 +52,59 @@
5652
public final class AsyncSubject<T> extends Subject<T, T> {
5753

5854
public static <T> AsyncSubject<T> create() {
59-
final SubjectSubscriptionManager<T> subscriptionManager = new SubjectSubscriptionManager<T>();
60-
final AtomicReference<Notification<T>> lastNotification = new AtomicReference<Notification<T>>(Notification.<T>createOnCompleted());
61-
62-
OnSubscribe<T> onSubscribe = subscriptionManager.getOnSubscribeFunc(
63-
/**
64-
* This function executes at beginning of subscription.
65-
*
66-
* This will always run, even if Subject is in terminal state.
67-
*/
68-
new Action1<SubjectObserver<? super T>>() {
69-
70-
@Override
71-
public void call(SubjectObserver<? super T> o) {
72-
// nothing to do if not terminated
73-
}
74-
},
75-
/**
76-
* This function executes if the Subject is terminated.
77-
*/
78-
new Action1<SubjectObserver<? super T>>() {
79-
80-
@Override
81-
public void call(SubjectObserver<? super T> o) {
82-
// we want the last value + completed so add this extra logic
83-
// to send onCompleted if the last value is an onNext
84-
emitValueToObserver(lastNotification.get(), o);
85-
}
86-
}, null);
87-
88-
return new AsyncSubject<T>(onSubscribe, subscriptionManager, lastNotification);
55+
final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>();
56+
state.onTerminated = new Action1<SubjectObserver<T>>() {
57+
@Override
58+
public void call(SubjectObserver<T> o) {
59+
Object v = state.get();
60+
o.accept(v);
61+
o.completeSingle(v);
62+
}
63+
};
64+
return new AsyncSubject<T>(state, state);
8965
}
9066

91-
protected static <T> void emitValueToObserver(Notification<T> n, Observer<? super T> o) {
92-
n.accept(o);
93-
if (n.isOnNext()) {
94-
o.onCompleted();
95-
}
96-
}
67+
final SubjectSubscriptionManager<T> state;
68+
volatile Object lastValue;
69+
private final NotificationLite<T> nl = NotificationLite.instance();
9770

98-
private final SubjectSubscriptionManager<T> subscriptionManager;
99-
final AtomicReference<Notification<T>> lastNotification;
10071

101-
protected AsyncSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
72+
protected AsyncSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> state) {
10273
super(onSubscribe);
103-
this.subscriptionManager = subscriptionManager;
104-
this.lastNotification = lastNotification;
74+
this.state = state;
10575
}
10676

10777
@Override
10878
public void onCompleted() {
109-
Collection<SubjectObserver<? super T>> observers = subscriptionManager.terminate(new Action0() {
110-
111-
@Override
112-
public void call() {
79+
if (state.active) {
80+
Object last = lastValue;
81+
if (last == null) {
82+
last = nl.completed();
11383
}
114-
});
115-
if (observers != null) {
116-
for (Observer<? super T> o : observers) {
117-
emitValueToObserver(lastNotification.get(), o);
84+
for (SubjectObserver<T> bo : state.terminate(last)) {
85+
if (last == nl.completed()) {
86+
bo.onCompleted();
87+
} else {
88+
bo.onNext(nl.getValue(last));
89+
bo.onCompleted();
90+
}
11891
}
11992
}
12093
}
12194

12295
@Override
12396
public void onError(final Throwable e) {
124-
Collection<SubjectObserver<? super T>> observers = subscriptionManager.terminate(new Action0() {
125-
@Override
126-
public void call() {
127-
lastNotification.set(Notification.<T> createOnError(e));
128-
}
129-
});
130-
if (observers != null) {
131-
for (Observer<? super T> o : observers) {
132-
emitValueToObserver(lastNotification.get(), o);
97+
if (state.active) {
98+
Object n = nl.error(e);
99+
for (SubjectObserver<T> bo : state.terminate(n)) {
100+
bo.onError(e);
133101
}
134102
}
135-
136103
}
137104

138105
@Override
139106
public void onNext(T v) {
140-
lastNotification.set(Notification.createOnNext(v));
107+
lastValue = nl.next(v);
141108
}
142109

143110
}

rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java

Lines changed: 53 additions & 77 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,14 @@
1515
*/
1616
package rx.subjects;
1717

18-
import java.util.Collection;
19-
import java.util.concurrent.atomic.AtomicReference;
2018

21-
import rx.Notification;
2219
import rx.Observer;
20+
import rx.Subscriber;
2321
import rx.functions.Action0;
2422
import rx.functions.Action1;
23+
import rx.operators.NotificationLite;
2524
import rx.subjects.SubjectSubscriptionManager.SubjectObserver;
25+
import rx.subscriptions.Subscriptions;
2626

2727
/**
2828
* Subject that publishes the most recent and all subsequent events to each subscribed {@link Observer}.
@@ -65,110 +65,86 @@
6565
*
6666
* @param <T>
6767
*/
68+
@SuppressWarnings({ "unchecked", "rawtypes" })
6869
public final class BehaviorSubject<T> extends Subject<T, T> {
69-
70+
/**
71+
* Create a {@link BehaviorSubject} without a default value.
72+
* @param <T> the value type
73+
* @return the constructed {@link BehaviorSubject}
74+
*/
75+
public static <T> BehaviorSubject<T> create() {
76+
return create(null, false);
77+
}
7078
/**
7179
* Creates a {@link BehaviorSubject} which publishes the last and all subsequent events to each {@link Observer} that subscribes to it.
7280
*
81+
* @param <T> the value type
7382
* @param defaultValue
7483
* the value which will be published to any {@link Observer} as long as the {@link BehaviorSubject} has not yet received any events
7584
* @return the constructed {@link BehaviorSubject}
7685
*/
7786
public static <T> BehaviorSubject<T> create(T defaultValue) {
78-
final SubjectSubscriptionManager<T> subscriptionManager = new SubjectSubscriptionManager<T>();
79-
// set a default value so subscriptions will immediately receive this until a new notification is received
80-
final AtomicReference<Notification<T>> lastNotification = new AtomicReference<Notification<T>>(Notification.createOnNext(defaultValue));
81-
82-
OnSubscribe<T> onSubscribe = subscriptionManager.getOnSubscribeFunc(
83-
/**
84-
* This function executes at beginning of subscription.
85-
*
86-
* This will always run, even if Subject is in terminal state.
87-
*/
88-
new Action1<SubjectObserver<? super T>>() {
89-
90-
@Override
91-
public void call(SubjectObserver<? super T> o) {
92-
/*
93-
* When we subscribe we always emit the latest value to the observer.
94-
*
95-
* Here we only emit if it's an onNext as terminal states are handled in the next function.
96-
*/
97-
Notification<T> n = lastNotification.get();
98-
if (n.isOnNext()) {
99-
n.accept(o);
100-
}
101-
}
102-
},
103-
/**
104-
* This function executes if the Subject is terminated before subscription occurs.
105-
*/
106-
new Action1<SubjectObserver<? super T>>() {
107-
108-
@Override
109-
public void call(SubjectObserver<? super T> o) {
110-
/*
111-
* If we are already terminated, or termination happens while trying to subscribe
112-
* this will be invoked and we emit whatever the last terminal value was.
113-
*/
114-
lastNotification.get().accept(o);
115-
}
116-
}, null);
87+
return create(defaultValue, true);
88+
}
89+
private static <T> BehaviorSubject<T> create(T defaultValue, boolean hasDefault) {
90+
final SubjectSubscriptionManager<T> state = new SubjectSubscriptionManager<T>();
91+
if (hasDefault) {
92+
state.set(NotificationLite.instance().next(defaultValue));
93+
}
94+
state.onAdded = new Action1<SubjectObserver<T>>() {
11795

118-
return new BehaviorSubject<T>(onSubscribe, subscriptionManager, lastNotification);
96+
@Override
97+
public void call(SubjectObserver<T> o) {
98+
o.emitFirst(state.get());
99+
}
100+
101+
};
102+
state.onTerminated = state.onAdded;
103+
return new BehaviorSubject<T>(state, state);
119104
}
120105

121-
private final SubjectSubscriptionManager<T> subscriptionManager;
122-
final AtomicReference<Notification<T>> lastNotification;
106+
private final SubjectSubscriptionManager<T> state;
107+
private final NotificationLite<T> nl = NotificationLite.instance();
123108

124-
protected BehaviorSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
109+
protected BehaviorSubject(OnSubscribe<T> onSubscribe, SubjectSubscriptionManager<T> state) {
125110
super(onSubscribe);
126-
this.subscriptionManager = subscriptionManager;
127-
this.lastNotification = lastNotification;
111+
this.state = state;
128112
}
129113

130114
@Override
131115
public void onCompleted() {
132-
Collection<SubjectObserver<? super T>> observers = subscriptionManager.terminate(new Action0() {
133-
134-
@Override
135-
public void call() {
136-
lastNotification.set(Notification.<T> createOnCompleted());
137-
}
138-
});
139-
if (observers != null) {
140-
for (Observer<? super T> o : observers) {
141-
o.onCompleted();
116+
Object last = state.get();
117+
if (last == null || state.active) {
118+
Object n = nl.completed();
119+
for (SubjectObserver<T> bo : state.terminate(n)) {
120+
bo.emitNext(n);
142121
}
143122
}
144123
}
145124

146125
@Override
147-
public void onError(final Throwable e) {
148-
Collection<SubjectObserver<? super T>> observers = subscriptionManager.terminate(new Action0() {
149-
150-
@Override
151-
public void call() {
152-
lastNotification.set(Notification.<T> createOnError(e));
153-
}
154-
});
155-
if (observers != null) {
156-
for (Observer<? super T> o : observers) {
157-
o.onError(e);
126+
public void onError(Throwable e) {
127+
Object last = state.get();
128+
if (last == null || state.active) {
129+
Object n = nl.error(e);
130+
for (SubjectObserver<T> bo : state.terminate(n)) {
131+
bo.emitNext(n);
158132
}
159133
}
160134
}
161135

162136
@Override
163137
public void onNext(T v) {
164-
// do not overwrite a terminal notification
165-
// so new subscribers can get them
166-
if (lastNotification.get().isOnNext()) {
167-
lastNotification.set(Notification.createOnNext(v));
168-
for (Observer<? super T> o : subscriptionManager.rawSnapshot()) {
169-
o.onNext(v);
138+
Object last = state.get();
139+
if (last == null || state.active) {
140+
Object n = nl.next(v);
141+
for (SubjectObserver<T> bo : state.next(n)) {
142+
bo.emitNext(n);
170143
}
171144
}
172145
}
173-
146+
147+
/* test support */ int subscriberCount() {
148+
return state.observers().length;
149+
}
174150
}

0 commit comments

Comments
 (0)