Skip to content

Commit 88c4955

Browse files
Merge pull request #659 from akarnokd/SubjectsFixAdditional
Missing fixes from the subject rewrite
2 parents c9be48c + c45fce2 commit 88c4955

File tree

3 files changed

+58
-6
lines changed

3 files changed

+58
-6
lines changed

rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -169,12 +169,13 @@ public void call(Collection<SubjectObserver<? super T>> observers) {
169169

170170
@Override
171171
public void onNext(T v) {
172-
/**
173-
* Store the latest value but do not send it. It only gets sent when 'onCompleted' occurs.
174-
*/
175-
lastNotification.set(new Notification<T>(v));
176-
for (Observer<? super T> o : subscriptionManager.rawSnapshot()) {
177-
o.onNext(v);
172+
// do not overwrite a terminal notification
173+
// so new subscribers can get them
174+
if (lastNotification.get().isOnNext()) {
175+
lastNotification.set(new Notification<T>(v));
176+
for (Observer<? super T> o : subscriptionManager.rawSnapshot()) {
177+
o.onNext(v);
178+
}
178179
}
179180
}
180181

rxjava-core/src/main/java/rx/subjects/SubjectSubscriptionManager.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,6 +65,7 @@ public Subscription onSubscribe(Observer<? super T> actualObserver) {
6565
try {
6666
current.terminationLatch.await();
6767
} catch (InterruptedException e) {
68+
Thread.currentThread().interrupt();
6869
throw new RuntimeException("Interrupted waiting for termination.", e);
6970
}
7071
break;

rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java

Lines changed: 50 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,5 +185,55 @@ public void testCompletedAfterErrorIsNotSent() {
185185
verify(aObserver, never()).onNext("two");
186186
verify(aObserver, never()).onCompleted();
187187
}
188+
@Test
189+
public void testCompletedAfterErrorIsNotSent2() {
190+
BehaviorSubject<String> subject = BehaviorSubject.create("default");
191+
192+
@SuppressWarnings("unchecked")
193+
Observer<String> aObserver = mock(Observer.class);
194+
subject.subscribe(aObserver);
195+
196+
subject.onNext("one");
197+
subject.onError(testException);
198+
subject.onNext("two");
199+
subject.onCompleted();
200+
201+
verify(aObserver, times(1)).onNext("default");
202+
verify(aObserver, times(1)).onNext("one");
203+
verify(aObserver, times(1)).onError(testException);
204+
verify(aObserver, never()).onNext("two");
205+
verify(aObserver, never()).onCompleted();
206+
207+
Observer<Object> o2 = mock(Observer.class);
208+
subject.subscribe(o2);
209+
verify(o2, times(1)).onError(testException);
210+
verify(o2, never()).onNext(any());
211+
verify(o2, never()).onCompleted();
212+
}
213+
214+
@Test
215+
public void testCompletedAfterErrorIsNotSent3() {
216+
BehaviorSubject<String> subject = BehaviorSubject.create("default");
217+
218+
@SuppressWarnings("unchecked")
219+
Observer<String> aObserver = mock(Observer.class);
220+
subject.subscribe(aObserver);
221+
222+
subject.onNext("one");
223+
subject.onCompleted();
224+
subject.onNext("two");
225+
subject.onCompleted();
188226

227+
verify(aObserver, times(1)).onNext("default");
228+
verify(aObserver, times(1)).onNext("one");
229+
verify(aObserver, times(1)).onCompleted();
230+
verify(aObserver, never()).onError(any(Throwable.class));
231+
verify(aObserver, never()).onNext("two");
232+
233+
Observer<Object> o2 = mock(Observer.class);
234+
subject.subscribe(o2);
235+
verify(o2, times(1)).onCompleted();
236+
verify(o2, never()).onNext(any());
237+
verify(aObserver, never()).onError(any(Throwable.class));
238+
}
189239
}

0 commit comments

Comments
 (0)