Skip to content

Hopefully fixed missing notifications if part of the subscription is del... #660

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 26 additions & 5 deletions rxjava-core/src/main/java/rx/subjects/BehaviorSubject.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package rx.subjects;

import java.util.Collection;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import rx.Notification;
Expand Down Expand Up @@ -88,6 +89,7 @@ public static <T> BehaviorSubject<T> createWithDefaultValue(T defaultValue) {
public static <T> BehaviorSubject<T> create(T defaultValue) {
final SubjectSubscriptionManager<T> subscriptionManager = new SubjectSubscriptionManager<T>();
// set a default value so subscriptions will immediately receive this until a new notification is received
final AtomicBoolean mutating = new AtomicBoolean();
final AtomicReference<Notification<T>> lastNotification = new AtomicReference<Notification<T>>(new Notification<T>(defaultValue));

OnSubscribeFunc<T> onSubscribe = subscriptionManager.getOnSubscribeFunc(
Expand Down Expand Up @@ -124,18 +126,23 @@ public void call(SubjectObserver<? super T> o) {
*/
lastNotification.get().accept(o);
}
});
}, mutating);

return new BehaviorSubject<T>(onSubscribe, subscriptionManager, lastNotification);
return new BehaviorSubject<T>(onSubscribe, subscriptionManager, lastNotification, mutating);
}

private final SubjectSubscriptionManager<T> subscriptionManager;
final AtomicReference<Notification<T>> lastNotification;
final AtomicBoolean mutating;

protected BehaviorSubject(OnSubscribeFunc<T> onSubscribe, SubjectSubscriptionManager<T> subscriptionManager, AtomicReference<Notification<T>> lastNotification) {
protected BehaviorSubject(OnSubscribeFunc<T> onSubscribe,
SubjectSubscriptionManager<T> subscriptionManager,
AtomicReference<Notification<T>> lastNotification,
AtomicBoolean mutating) {
super(onSubscribe);
this.subscriptionManager = subscriptionManager;
this.lastNotification = lastNotification;
this.mutating = mutating;
}

@Override
Expand Down Expand Up @@ -169,11 +176,25 @@ public void call(Collection<SubjectObserver<? super T>> observers) {

@Override
public void onNext(T v) {
do {
// try to enter mutating state
} while (!mutating.compareAndSet(false, true));
/**
* Store the latest value but do not send it. It only gets sent when 'onCompleted' occurs.
*/
lastNotification.set(new Notification<T>(v));
for (Observer<? super T> o : subscriptionManager.rawSnapshot()) {
Observer<? super T>[] os;
try {
if (!lastNotification.get().isOnNext()) {
return;
}
lastNotification.set(new Notification<T>(v));

os = subscriptionManager.rawSnapshot();
} finally {
mutating.set(false);
}

for (Observer<? super T> o : os) {
o.onNext(v);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;

import rx.Observable.OnSubscribeFunc;
Expand Down Expand Up @@ -102,6 +103,88 @@ public void unsubscribe() {

};
}
/**
*
* @param onSubscribe
* Always runs at the beginning of 'subscribe' regardless of terminal state.
* @param onTerminated
* Only runs if Subject is in terminal state and the Observer ends up not being registered.
* @return
*/
public OnSubscribeFunc<T> getOnSubscribeFunc(
final Action1<SubjectObserver<? super T>> onSubscribe,
final Action1<SubjectObserver<? super T>> onTerminated,
final AtomicBoolean mutating) {
return new OnSubscribeFunc<T>() {
@Override
public Subscription onSubscribe(Observer<? super T> actualObserver) {
do {
// enter mutating state
} while (!mutating.compareAndSet(false, true));

try {
SubjectObserver<T> observer = new SubjectObserver<T>(actualObserver);
// invoke onSubscribe logic
if (onSubscribe != null) {
onSubscribe.call(observer);
}

State<T> current;
State<T> newState = null;
boolean addedObserver = false;
Subscription s;
do {
current = state.get();
if (current.terminated) {
// we are terminated so don't need to do anything
s = Subscriptions.empty();
addedObserver = false;
// break out and don't try to modify state
newState = current;
// wait for termination to complete if
try {
current.terminationLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException("Interrupted waiting for termination.", e);
}
break;
} else {
final SafeObservableSubscription subscription = new SafeObservableSubscription();
s = subscription;
addedObserver = true;
subscription.wrap(new Subscription() {
@Override
public void unsubscribe() {
State<T> current;
State<T> newState;
do {
current = state.get();
// on unsubscribe remove it from the map of outbound observers to notify
newState = current.removeObserver(subscription);
} while (!state.compareAndSet(current, newState));
}
});

// on subscribe add it to the map of outbound observers to notify
newState = current.addObserver(subscription, observer);
}
} while (!state.compareAndSet(current, newState));

/**
* Whatever happened above, if we are terminated we run `onTerminated`
*/
if (newState.terminated && !addedObserver) {
onTerminated.call(observer);
}

return s;
} finally {
mutating.set(false);
}
}

};
}

protected void terminate(Action1<Collection<SubjectObserver<? super T>>> onTerminate) {
State<T> current;
Expand Down
99 changes: 98 additions & 1 deletion rxjava-core/src/test/java/rx/subjects/BehaviorSubjectTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,13 @@
*/
package rx.subjects;

import static org.mockito.Matchers.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.*;

import org.junit.Test;
Expand Down Expand Up @@ -166,6 +172,97 @@ public void testCompletedStopsEmittingData() {
inOrderC.verifyNoMoreInteractions();
}

@Test
public void testValueSkipWhileSubscribing() throws InterruptedException {
final BehaviorSubject<Integer> bs = BehaviorSubject.create(1);

final CountDownLatch firstValuePause = new CountDownLatch(1);
final CountDownLatch done = new CountDownLatch(1);

final List<Integer> values = new ArrayList<Integer>();

final Observer<Integer> onNext = new Observer<Integer>() {

@Override
public void onNext(Integer t1) {
values.add(t1);
try {
firstValuePause.await();
} catch (InterruptedException ex) {
throw new RuntimeException(ex);
}
}

@Override
public void onError(Throwable e) {
}

@Override
public void onCompleted() {
done.countDown();
}

};

bs.onNext(2);
bs.onNext(3);

Thread t1 = new Thread(new Runnable() {
@Override
public void run() {
bs.subscribe(onNext);
}
});
t1.start();

final CountDownLatch t2ready = new CountDownLatch(1);

Thread t2 = new Thread(new Runnable() {

@Override
public void run() {
try {
t2ready.countDown();
Thread.sleep(1000);
bs.onNext(4);
Thread.sleep(1000);
bs.onNext(5);
Thread.sleep(1000);
bs.onNext(6);
Thread.sleep(1000);
bs.onNext(7);
Thread.sleep(1000);
bs.onNext(8);
Thread.sleep(1000);
bs.onNext(9);
Thread.sleep(1000);
bs.onCompleted();
} catch (InterruptedException ex) {

}
}
});

t2.start();

if (!t2ready.await(10000, TimeUnit.MILLISECONDS)) {
fail("Couldn't get t2 ready in time.");
}

Thread.sleep(3000);

firstValuePause.countDown();

t1.join();
t2.join();

if (!done.await(10000, TimeUnit.MILLISECONDS)) {
fail("Not done in time");
}

assertEquals(Arrays.asList(3, 4, 5, 6, 7, 8, 9), values);
}

@Test
public void testCompletedAfterErrorIsNotSent() {
BehaviorSubject<String> subject = BehaviorSubject.create("default");
Expand Down