Skip to content

Commit 7dce953

Browse files
Merge pull request ReactiveX#230 from johngmyers/wrap-unsubscribed
An unsubscribed AtomicObservableSubscription.wrap() unsubscribes its argument
2 parents 300a1df + e0e965d commit 7dce953

File tree

1 file changed

+32
-9
lines changed

1 file changed

+32
-9
lines changed

rxjava-core/src/main/java/rx/util/AtomicObservableSubscription.java

Lines changed: 32 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -15,10 +15,14 @@
1515
*/
1616
package rx.util;
1717

18-
import java.util.concurrent.atomic.AtomicBoolean;
18+
import org.junit.Test;
19+
import rx.Subscription;
20+
1921
import java.util.concurrent.atomic.AtomicReference;
2022

21-
import rx.Subscription;
23+
import static org.mockito.Mockito.mock;
24+
import static org.mockito.Mockito.times;
25+
import static org.mockito.Mockito.verify;
2226

2327
/**
2428
* Thread-safe wrapper around Observable Subscription that ensures unsubscribe can be called only once.
@@ -32,11 +36,16 @@
3236
*/
3337
public final class AtomicObservableSubscription implements Subscription {
3438

35-
private AtomicReference<Subscription> actualSubscription = new AtomicReference<Subscription>();
36-
private AtomicBoolean unsubscribed = new AtomicBoolean(false);
39+
private static final Subscription UNSUBSCRIBED = new Subscription()
40+
{
41+
@Override
42+
public void unsubscribe()
43+
{
44+
}
45+
};
46+
private final AtomicReference<Subscription> actualSubscription = new AtomicReference<Subscription>();
3747

3848
public AtomicObservableSubscription() {
39-
4049
}
4150

4251
public AtomicObservableSubscription(Subscription actualSubscription) {
@@ -46,12 +55,16 @@ public AtomicObservableSubscription(Subscription actualSubscription) {
4655
/**
4756
* Wraps the actual subscription once it exists (if it wasn't available when constructed)
4857
*
49-
* @param actualSubscription
58+
* @param actualSubscription the wrapped subscription
5059
* @throws IllegalStateException
5160
* if trying to set more than once (or use this method after setting via constructor)
5261
*/
5362
public AtomicObservableSubscription wrap(Subscription actualSubscription) {
5463
if (!this.actualSubscription.compareAndSet(null, actualSubscription)) {
64+
if (this.actualSubscription.get() == UNSUBSCRIBED) {
65+
actualSubscription.unsubscribe();
66+
return this;
67+
}
5568
throw new IllegalStateException("Can not set subscription more than once.");
5669
}
5770
return this;
@@ -60,15 +73,25 @@ public AtomicObservableSubscription wrap(Subscription actualSubscription) {
6073
@Override
6174
public void unsubscribe() {
6275
// get the real thing and set to null in an atomic operation so we will only ever call unsubscribe once
63-
Subscription actual = actualSubscription.getAndSet(null);
76+
Subscription actual = actualSubscription.getAndSet(UNSUBSCRIBED);
6477
// if it's not null we will unsubscribe
6578
if (actual != null) {
6679
actual.unsubscribe();
67-
unsubscribed.set(true);
6880
}
6981
}
7082

7183
public boolean isUnsubscribed() {
72-
return unsubscribed.get();
84+
return actualSubscription.get() == UNSUBSCRIBED;
85+
}
86+
87+
public static class UnitTest {
88+
@Test
89+
public void testWrapAfterUnsubscribe() {
90+
AtomicObservableSubscription atomicObservableSubscription = new AtomicObservableSubscription();
91+
atomicObservableSubscription.unsubscribe();
92+
Subscription innerSubscription = mock(Subscription.class);
93+
atomicObservableSubscription.wrap(innerSubscription);
94+
verify(innerSubscription, times(1)).unsubscribe();
95+
}
7396
}
7497
}

0 commit comments

Comments
 (0)