Skip to content

Commit 80177f0

Browse files
Merge pull request #593 from akarnokd/LockFreeSubscriptions
Lock-free subscriptions
2 parents d5c9fa7 + ba1d448 commit 80177f0

8 files changed

+369
-120
lines changed

rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java

Lines changed: 80 additions & 57 deletions
Original file line numberDiff line numberDiff line change
@@ -1,25 +1,26 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
15-
*/
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
1616
package rx.subscriptions;
1717

1818
import static java.util.Arrays.asList;
1919
import static java.util.Collections.unmodifiableSet;
2020

2121
import java.util.ArrayList;
2222
import java.util.Collection;
23+
import java.util.Collections;
2324
import java.util.HashSet;
2425
import java.util.Set;
2526
import java.util.concurrent.atomic.AtomicReference;
@@ -30,50 +31,60 @@
3031
/**
3132
* Subscription that represents a group of Subscriptions that are unsubscribed
3233
* together.
33-
*
34+
*
3435
* @see <a
3536
* href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net
3637
* equivalent CompositeDisposable</a>
3738
*/
3839
public class CompositeSubscription implements Subscription {
39-
private static final Set<Subscription> MUTATE_STATE = unmodifiableSet(new HashSet<Subscription>());
40-
private static final Set<Subscription> UNSUBSCRIBED_STATE = unmodifiableSet(new HashSet<Subscription>());
41-
40+
/** Sentinel to indicate a thread is modifying the subscription set. */
41+
private static final Set<Subscription> MUTATE_SENTINEL = unmodifiableSet(Collections.<Subscription>emptySet());
42+
/** Sentinel to indicate the entire CompositeSubscription has been unsubscribed.*/
43+
private static final Set<Subscription> UNSUBSCRIBED_SENTINEL = unmodifiableSet(Collections.<Subscription>emptySet());
44+
/** The reference to the set of subscriptions. */
4245
private final AtomicReference<Set<Subscription>> reference = new AtomicReference<Set<Subscription>>();
43-
46+
4447
public CompositeSubscription(final Subscription... subscriptions) {
4548
reference.set(new HashSet<Subscription>(asList(subscriptions)));
4649
}
47-
50+
4851
public boolean isUnsubscribed() {
49-
return reference.get() == UNSUBSCRIBED_STATE;
52+
return reference.get() == UNSUBSCRIBED_SENTINEL;
5053
}
51-
54+
5255
public void add(final Subscription s) {
5356
do {
5457
final Set<Subscription> existing = reference.get();
55-
if (existing == UNSUBSCRIBED_STATE) {
58+
if (existing == UNSUBSCRIBED_SENTINEL) {
5659
s.unsubscribe();
5760
break;
5861
}
59-
60-
if (reference.compareAndSet(existing, MUTATE_STATE)) {
62+
63+
if (existing == MUTATE_SENTINEL) {
64+
continue;
65+
}
66+
67+
if (reference.compareAndSet(existing, MUTATE_SENTINEL)) {
6168
existing.add(s);
6269
reference.set(existing);
6370
break;
6471
}
6572
} while (true);
6673
}
67-
74+
6875
public void remove(final Subscription s) {
6976
do {
7077
final Set<Subscription> subscriptions = reference.get();
71-
if (subscriptions == UNSUBSCRIBED_STATE) {
78+
if (subscriptions == UNSUBSCRIBED_SENTINEL) {
7279
s.unsubscribe();
7380
break;
7481
}
75-
76-
if (reference.compareAndSet(subscriptions, MUTATE_STATE)) {
82+
83+
if (subscriptions == MUTATE_SENTINEL) {
84+
continue;
85+
}
86+
87+
if (reference.compareAndSet(subscriptions, MUTATE_SENTINEL)) {
7788
// also unsubscribe from it:
7889
// http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx
7990
subscriptions.remove(s);
@@ -83,54 +94,66 @@ public void remove(final Subscription s) {
8394
}
8495
} while (true);
8596
}
86-
97+
8798
public void clear() {
8899
do {
89100
final Set<Subscription> subscriptions = reference.get();
90-
if (subscriptions == UNSUBSCRIBED_STATE) {
101+
if (subscriptions == UNSUBSCRIBED_SENTINEL) {
91102
break;
92103
}
93-
94-
if (reference.compareAndSet(subscriptions, MUTATE_STATE)) {
104+
105+
if (subscriptions == MUTATE_SENTINEL) {
106+
continue;
107+
}
108+
109+
if (reference.compareAndSet(subscriptions, MUTATE_SENTINEL)) {
95110
final Set<Subscription> copy = new HashSet<Subscription>(
96111
subscriptions);
97112
subscriptions.clear();
98113
reference.set(subscriptions);
99-
100-
for (final Subscription subscription : copy) {
101-
subscription.unsubscribe();
102-
}
114+
115+
unsubscribeAll(copy);
103116
break;
104117
}
105118
} while (true);
106119
}
107-
120+
/**
121+
* Unsubscribe from the collection of subscriptions.
122+
* <p>
123+
* Exceptions thrown by any of the {@code unsubscribe()} methods are
124+
* collected into a {@link CompositeException} and thrown once
125+
* all unsubscriptions have been attempted.
126+
* @param subs the collection of subscriptions
127+
*/
128+
private void unsubscribeAll(Collection<Subscription> subs) {
129+
final Collection<Throwable> es = new ArrayList<Throwable>();
130+
for (final Subscription s : subs) {
131+
try {
132+
s.unsubscribe();
133+
} catch (final Throwable e) {
134+
es.add(e);
135+
}
136+
}
137+
if (!es.isEmpty()) {
138+
throw new CompositeException(
139+
"Failed to unsubscribe to 1 or more subscriptions.", es);
140+
}
141+
}
108142
@Override
109143
public void unsubscribe() {
110144
do {
111145
final Set<Subscription> subscriptions = reference.get();
112-
if (subscriptions == UNSUBSCRIBED_STATE) {
146+
if (subscriptions == UNSUBSCRIBED_SENTINEL) {
113147
break;
114148
}
115-
116-
if (subscriptions == MUTATE_STATE) {
149+
150+
if (subscriptions == MUTATE_SENTINEL) {
117151
continue;
118152
}
119-
120-
if (reference.compareAndSet(subscriptions, UNSUBSCRIBED_STATE)) {
121-
final Collection<Throwable> es = new ArrayList<Throwable>();
122-
for (final Subscription s : subscriptions) {
123-
try {
124-
s.unsubscribe();
125-
} catch (final Throwable e) {
126-
es.add(e);
127-
}
128-
}
129-
if (es.isEmpty()) {
130-
break;
131-
}
132-
throw new CompositeException(
133-
"Failed to unsubscribe to 1 or more subscriptions.", es);
153+
154+
if (reference.compareAndSet(subscriptions, UNSUBSCRIBED_SENTINEL)) {
155+
unsubscribeAll(subscriptions);
156+
break;
134157
}
135158
} while (true);
136159
}

rxjava-core/src/main/java/rx/subscriptions/MultipleAssignmentSubscription.java

Lines changed: 23 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -27,34 +27,41 @@
2727
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.multipleassignmentdisposable">Rx.Net equivalent MultipleAssignmentDisposable</a>
2828
*/
2929
public class MultipleAssignmentSubscription implements Subscription {
30-
31-
private final AtomicBoolean unsubscribed = new AtomicBoolean(false);
32-
private AtomicReference<Subscription> subscription = new AtomicReference<Subscription>();
33-
30+
private AtomicReference<Subscription> reference = new AtomicReference<Subscription>();
31+
/** Sentinel for the unsubscribed state. */
32+
private static final Subscription UNSUBSCRIBED_SENTINEL = new Subscription() {
33+
@Override
34+
public void unsubscribe() {
35+
}
36+
};
3437
public boolean isUnsubscribed() {
35-
return unsubscribed.get();
38+
return reference.get() == UNSUBSCRIBED_SENTINEL;
3639
}
3740

3841
@Override
39-
public synchronized void unsubscribe() {
40-
unsubscribed.set(true);
41-
Subscription s = getSubscription();
42+
public void unsubscribe() {
43+
Subscription s = reference.getAndSet(UNSUBSCRIBED_SENTINEL);
4244
if (s != null) {
4345
s.unsubscribe();
4446
}
45-
4647
}
4748

48-
public synchronized void setSubscription(Subscription s) {
49-
if (unsubscribed.get()) {
50-
s.unsubscribe();
51-
} else {
52-
subscription.set(s);
53-
}
49+
public void setSubscription(Subscription s) {
50+
do {
51+
Subscription r = reference.get();
52+
if (r == UNSUBSCRIBED_SENTINEL) {
53+
s.unsubscribe();
54+
return;
55+
}
56+
if (reference.compareAndSet(r, s)) {
57+
break;
58+
}
59+
} while (true);
5460
}
5561

5662
public Subscription getSubscription() {
57-
return subscription.get();
63+
Subscription s = reference.get();
64+
return s != UNSUBSCRIBED_SENTINEL ? s : Subscriptions.empty();
5865
}
5966

6067
}

0 commit comments

Comments
 (0)