Skip to content

Commit 69e8e8c

Browse files
Merge pull request ReactiveX#568 from jloisel/master
Use lock free strategy for several Subscription implementations
2 parents dce9d47 + cd15601 commit 69e8e8c

File tree

4 files changed

+455
-218
lines changed

4 files changed

+455
-218
lines changed

rxjava-core/src/main/java/rx/subscriptions/CompositeSubscription.java

+90-70
Original file line numberDiff line numberDiff line change
@@ -15,103 +15,123 @@
1515
*/
1616
package rx.subscriptions;
1717

18+
import static java.util.Arrays.asList;
19+
import static java.util.Collections.unmodifiableSet;
20+
1821
import java.util.ArrayList;
1922
import java.util.Collection;
20-
import java.util.List;
21-
import java.util.concurrent.ConcurrentHashMap;
22-
import java.util.concurrent.atomic.AtomicBoolean;
23+
import java.util.HashSet;
24+
import java.util.Set;
25+
import java.util.concurrent.atomic.AtomicReference;
2326

2427
import rx.Subscription;
2528
import rx.util.CompositeException;
2629

2730
/**
28-
* Subscription that represents a group of Subscriptions that are unsubscribed together.
31+
* Subscription that represents a group of Subscriptions that are unsubscribed
32+
* together.
2933
*
30-
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net equivalent CompositeDisposable</a>
34+
* @see <a
35+
* href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable(v=vs.103).aspx">Rx.Net
36+
* equivalent CompositeDisposable</a>
3137
*/
3238
public class CompositeSubscription implements Subscription {
39+
private static final Set<Subscription> MUTATE_STATE = unmodifiableSet(new HashSet<Subscription>());
40+
private static final Set<Subscription> UNSUBSCRIBED_STATE = unmodifiableSet(new HashSet<Subscription>());
41+
42+
private final AtomicReference<Set<Subscription>> reference = new AtomicReference<Set<Subscription>>();
3343

34-
/*
35-
* The reason 'synchronized' is used on 'add' and 'unsubscribe' is because AtomicBoolean/ConcurrentLinkedQueue are both being modified so it needs to be done atomically.
36-
*
37-
* TODO evaluate whether use of synchronized is a performance issue here and if it's worth using an atomic state machine or other non-locking approach
38-
*/
39-
private AtomicBoolean unsubscribed = new AtomicBoolean(false);
40-
private final ConcurrentHashMap<Subscription, Boolean> subscriptions = new ConcurrentHashMap<Subscription, Boolean>();
41-
42-
public CompositeSubscription(List<Subscription> subscriptions) {
43-
for (Subscription s : subscriptions) {
44-
this.subscriptions.put(s, Boolean.TRUE);
45-
}
44+
public CompositeSubscription(final Subscription... subscriptions) {
45+
reference.set(new HashSet<Subscription>(asList(subscriptions)));
4646
}
4747

48-
public CompositeSubscription(Subscription... subscriptions) {
49-
for (Subscription s : subscriptions) {
50-
this.subscriptions.put(s, Boolean.TRUE);
51-
}
48+
public boolean isUnsubscribed() {
49+
return reference.get() == UNSUBSCRIBED_STATE;
5250
}
5351

54-
/**
55-
* Remove and unsubscribe all subscriptions but do not unsubscribe the outer CompositeSubscription.
56-
*/
57-
public void clear() {
58-
Collection<Throwable> es = null;
59-
for (Subscription s : subscriptions.keySet()) {
60-
try {
52+
public void add(final Subscription s) {
53+
do {
54+
final Set<Subscription> existing = reference.get();
55+
if (existing == UNSUBSCRIBED_STATE) {
6156
s.unsubscribe();
62-
this.subscriptions.remove(s);
63-
} catch (Throwable e) {
64-
if (es == null) {
65-
es = new ArrayList<Throwable>();
66-
}
67-
es.add(e);
57+
break;
6858
}
69-
}
70-
if (es != null) {
71-
throw new CompositeException("Failed to unsubscribe to 1 or more subscriptions.", es);
72-
}
73-
}
7459

75-
/**
76-
* Remove the {@link Subscription} and unsubscribe it.
77-
*
78-
* @param s
79-
*/
80-
public void remove(Subscription s) {
81-
this.subscriptions.remove(s);
82-
// also unsubscribe from it: http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx
83-
s.unsubscribe();
60+
if (reference.compareAndSet(existing, MUTATE_STATE)) {
61+
existing.add(s);
62+
reference.set(existing);
63+
break;
64+
}
65+
} while (true);
8466
}
8567

86-
public boolean isUnsubscribed() {
87-
return unsubscribed.get();
68+
public void remove(final Subscription s) {
69+
do {
70+
final Set<Subscription> subscriptions = reference.get();
71+
if (subscriptions == UNSUBSCRIBED_STATE) {
72+
s.unsubscribe();
73+
break;
74+
}
75+
76+
if (reference.compareAndSet(subscriptions, MUTATE_STATE)) {
77+
// also unsubscribe from it:
78+
// http://msdn.microsoft.com/en-us/library/system.reactive.disposables.compositedisposable.remove(v=vs.103).aspx
79+
subscriptions.remove(s);
80+
reference.set(subscriptions);
81+
s.unsubscribe();
82+
break;
83+
}
84+
} while (true);
8885
}
8986

90-
public synchronized void add(Subscription s) {
91-
if (unsubscribed.get()) {
92-
s.unsubscribe();
93-
} else {
94-
subscriptions.put(s, Boolean.TRUE);
95-
}
87+
public void clear() {
88+
do {
89+
final Set<Subscription> subscriptions = reference.get();
90+
if (subscriptions == UNSUBSCRIBED_STATE) {
91+
break;
92+
}
93+
94+
if (reference.compareAndSet(subscriptions, MUTATE_STATE)) {
95+
final Set<Subscription> copy = new HashSet<Subscription>(
96+
subscriptions);
97+
subscriptions.clear();
98+
reference.set(subscriptions);
99+
100+
for (final Subscription subscription : copy) {
101+
subscription.unsubscribe();
102+
}
103+
break;
104+
}
105+
} while (true);
96106
}
97107

98108
@Override
99-
public synchronized void unsubscribe() {
100-
if (unsubscribed.compareAndSet(false, true)) {
101-
Collection<Throwable> es = null;
102-
for (Subscription s : subscriptions.keySet()) {
103-
try {
104-
s.unsubscribe();
105-
} catch (Throwable e) {
106-
if (es == null) {
107-
es = new ArrayList<Throwable>();
109+
public void unsubscribe() {
110+
do {
111+
final Set<Subscription> subscriptions = reference.get();
112+
if (subscriptions == UNSUBSCRIBED_STATE) {
113+
break;
114+
}
115+
116+
if (subscriptions == MUTATE_STATE) {
117+
continue;
118+
}
119+
120+
if (reference.compareAndSet(subscriptions, UNSUBSCRIBED_STATE)) {
121+
final Collection<Throwable> es = new ArrayList<Throwable>();
122+
for (final Subscription s : subscriptions) {
123+
try {
124+
s.unsubscribe();
125+
} catch (final Throwable e) {
126+
es.add(e);
108127
}
109-
es.add(e);
110128
}
129+
if (es.isEmpty()) {
130+
break;
131+
}
132+
throw new CompositeException(
133+
"Failed to unsubscribe to 1 or more subscriptions.", es);
111134
}
112-
if (es != null) {
113-
throw new CompositeException("Failed to unsubscribe to 1 or more subscriptions.", es);
114-
}
115-
}
135+
} while (true);
116136
}
117137
}
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,62 @@
1-
/**
2-
* Copyright 2013 Netflix, Inc.
3-
*
4-
* Licensed under the Apache License, Version 2.0 (the "License");
5-
* you may not use this file except in compliance with the License.
6-
* You may obtain a copy of the License at
7-
*
8-
* http://www.apache.org/licenses/LICENSE-2.0
9-
*
10-
* Unless required by applicable law or agreed to in writing, software
11-
* distributed under the License is distributed on an "AS IS" BASIS,
12-
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13-
* See the License for the specific language governing permissions and
14-
* limitations under the License.
15-
*/
16-
package rx.subscriptions;
17-
18-
import rx.Subscription;
19-
20-
/**
21-
* Represents a subscription whose underlying subscription can be swapped for another subscription
22-
* which causes the previous underlying subscription to be unsubscribed.
23-
*
24-
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.serialdisposable(v=vs.103).aspx">Rx.Net equivalent SerialDisposable</a>
25-
*/
26-
public class SerialSubscription implements Subscription {
27-
private boolean unsubscribed;
28-
private Subscription subscription;
29-
private final Object gate = new Object();
30-
31-
@Override
32-
public void unsubscribe() {
33-
Subscription toUnsubscribe = null;
34-
synchronized (gate) {
35-
if (!unsubscribed) {
36-
if (subscription != null) {
37-
toUnsubscribe = subscription;
38-
subscription = null;
39-
}
40-
unsubscribed = true;
41-
}
42-
}
43-
if (toUnsubscribe != null) {
44-
toUnsubscribe.unsubscribe();
45-
}
46-
}
47-
48-
public Subscription getSubscription() {
49-
synchronized (gate) {
50-
return subscription;
51-
}
52-
}
53-
54-
public void setSubscription(Subscription subscription) {
55-
Subscription toUnsubscribe = null;
56-
synchronized (gate) {
57-
if (!unsubscribed) {
58-
if (this.subscription != null) {
59-
toUnsubscribe = this.subscription;
60-
}
61-
this.subscription = subscription;
62-
} else {
63-
toUnsubscribe = subscription;
64-
}
65-
}
66-
if (toUnsubscribe != null) {
67-
toUnsubscribe.unsubscribe();
68-
}
69-
}
70-
}
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.subscriptions;
17+
18+
import static rx.subscriptions.Subscriptions.empty;
19+
20+
import java.util.concurrent.atomic.AtomicReference;
21+
22+
import rx.Subscription;
23+
24+
/**
25+
* Represents a subscription whose underlying subscription can be swapped for another subscription
26+
* which causes the previous underlying subscription to be unsubscribed.
27+
*
28+
* @see <a href="http://msdn.microsoft.com/en-us/library/system.reactive.disposables.serialdisposable(v=vs.103).aspx">Rx.Net equivalent SerialDisposable</a>
29+
*/
30+
public class SerialSubscription implements Subscription {
31+
private final AtomicReference<Subscription> reference = new AtomicReference<Subscription>(empty());
32+
33+
private static final Subscription UNSUBSCRIBED = new Subscription() {
34+
@Override
35+
public void unsubscribe() {
36+
}
37+
};
38+
39+
@Override
40+
public void unsubscribe() {
41+
setSubscription(UNSUBSCRIBED);
42+
}
43+
44+
public void setSubscription(final Subscription subscription) {
45+
do {
46+
final Subscription current = reference.get();
47+
if (current == UNSUBSCRIBED) {
48+
subscription.unsubscribe();
49+
break;
50+
}
51+
if (reference.compareAndSet(current, subscription)) {
52+
current.unsubscribe();
53+
break;
54+
}
55+
} while (true);
56+
}
57+
58+
public Subscription getSubscription() {
59+
final Subscription subscription = reference.get();
60+
return subscription == UNSUBSCRIBED ? null : subscription;
61+
}
62+
}

0 commit comments

Comments
 (0)