Skip to content

Commit 84b7030

Browse files
Merge pull request #1246 from akarnokd/AtomicReferencesToFieldUpdaters
Moved to atomic field updaters.
2 parents 4f6705b + 2a08d80 commit 84b7030

34 files changed

+1048
-820
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5425,7 +5425,7 @@ public final Observable<T> serialize() {
54255425
* there is more than 1 {@link Subscriber} this {@link Observable} will be subscribed and emitting data.
54265426
* When all subscribers have unsubscribed it will unsubscribe from the source {@link Observable}.
54275427
* <p>
5428-
* This is an alias for {@link #publish().refCount()}.
5428+
* This is an alias for {@link #publish()}.{@link ConnectableObservable#refCount()}.
54295429
* <p>
54305430
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/publishRefCount.png">
54315431
*

rxjava-core/src/main/java/rx/operators/BlockingOperatorLatest.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,7 @@
1818
import java.util.Iterator;
1919
import java.util.NoSuchElementException;
2020
import java.util.concurrent.Semaphore;
21-
import java.util.concurrent.atomic.AtomicReference;
21+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
2222

2323
import rx.Notification;
2424
import rx.Observable;
@@ -51,11 +51,15 @@ public Iterator<T> iterator() {
5151
static final class LatestObserverIterator<T> extends Subscriber<Notification<? extends T>> implements Iterator<T> {
5252
final Semaphore notify = new Semaphore(0);
5353
// observer's notification
54-
final AtomicReference<Notification<? extends T>> reference = new AtomicReference<Notification<? extends T>>();
54+
volatile Notification<? extends T> value;
55+
/** Updater for the value field. */
56+
@SuppressWarnings("rawtypes")
57+
static final AtomicReferenceFieldUpdater<LatestObserverIterator, Notification> REFERENCE_UPDATER
58+
= AtomicReferenceFieldUpdater.newUpdater(LatestObserverIterator.class, Notification.class, "value");
5559

5660
@Override
5761
public void onNext(Notification<? extends T> args) {
58-
boolean wasntAvailable = reference.getAndSet(args) == null;
62+
boolean wasntAvailable = REFERENCE_UPDATER.getAndSet(this, args) == null;
5963
if (wasntAvailable) {
6064
notify.release();
6165
}
@@ -89,7 +93,9 @@ public boolean hasNext() {
8993
throw Exceptions.propagate(ex);
9094
}
9195

92-
iNotif = reference.getAndSet(null);
96+
@SuppressWarnings("unchecked")
97+
Notification<? extends T> n = (Notification<? extends T>)REFERENCE_UPDATER.getAndSet(this, null);
98+
iNotif = n;
9399
if (iNotif.isOnError()) {
94100
throw Exceptions.propagate(iNotif.getThrowable());
95101
}

rxjava-core/src/main/java/rx/operators/BlockingOperatorMostRecent.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,6 @@
1616
package rx.operators;
1717

1818
import java.util.Iterator;
19-
import java.util.concurrent.atomic.AtomicBoolean;
20-
import java.util.concurrent.atomic.AtomicReference;
2119

2220
import rx.Observable;
2321
import rx.Subscriber;
@@ -79,39 +77,40 @@ public void remove() {
7977
}
8078

8179
private static class MostRecentObserver<T> extends Subscriber<T> {
82-
private final AtomicBoolean completed = new AtomicBoolean(false);
83-
private final AtomicReference<T> value;
84-
private final AtomicReference<Throwable> exception = new AtomicReference<Throwable>();
85-
80+
static final NotificationLite<Object> nl = NotificationLite.instance();
81+
volatile Object value;
82+
8683
private MostRecentObserver(T value) {
87-
this.value = new AtomicReference<T>(value);
84+
this.value = nl.next(value);
8885
}
8986

9087
@Override
9188
public void onCompleted() {
92-
completed.set(true);
89+
value = nl.completed();
9390
}
9491

9592
@Override
9693
public void onError(Throwable e) {
97-
exception.set(e);
94+
value = nl.error(e);
9895
}
9996

10097
@Override
10198
public void onNext(T args) {
102-
value.set(args);
99+
value = nl.next(args);
103100
}
104101

105102
private boolean isCompleted() {
106-
return completed.get();
103+
return nl.isCompleted(value);
107104
}
108105

109106
private Throwable getThrowable() {
110-
return exception.get();
107+
Object v = value;
108+
return nl.isError(v) ? nl.getError(v) : null;
111109
}
112110

111+
@SuppressWarnings("unchecked")
113112
private T getRecentValue() {
114-
return value.get();
113+
return (T)value;
115114
}
116115

117116
}

rxjava-core/src/main/java/rx/operators/BlockingOperatorNext.java

Lines changed: 18 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@
1919
import java.util.NoSuchElementException;
2020
import java.util.concurrent.ArrayBlockingQueue;
2121
import java.util.concurrent.BlockingQueue;
22-
import java.util.concurrent.atomic.AtomicBoolean;
22+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
2323

2424
import rx.Notification;
2525
import rx.Observable;
@@ -61,10 +61,16 @@ private NextIterator(NextObserver<? extends T> observer) {
6161
this.observer = observer;
6262
}
6363

64+
6465
// in tests, set the waiting flag without blocking for the next value to
6566
// allow lockstepping instead of multi-threading
66-
void setWaiting(boolean value) {
67-
observer.waiting.set(value);
67+
/**
68+
* In tests, set the waiting flag without blocking for the next value to
69+
* allow lockstepping instead of multi-threading
70+
* @param value use 1 to enter into the waiting state
71+
*/
72+
void setWaiting(int value) {
73+
observer.setWaiting(value);
6874
}
6975

7076
@Override
@@ -135,7 +141,10 @@ public void remove() {
135141

136142
private static class NextObserver<T> extends Subscriber<Notification<? extends T>> {
137143
private final BlockingQueue<Notification<? extends T>> buf = new ArrayBlockingQueue<Notification<? extends T>>(1);
138-
private final AtomicBoolean waiting = new AtomicBoolean(false);
144+
volatile int waiting;
145+
@SuppressWarnings("rawtypes")
146+
static final AtomicIntegerFieldUpdater<NextObserver> WAITING_UPDATER
147+
= AtomicIntegerFieldUpdater.newUpdater(NextObserver.class, "waiting");
139148

140149
@Override
141150
public void onCompleted() {
@@ -150,7 +159,7 @@ public void onError(Throwable e) {
150159
@Override
151160
public void onNext(Notification<? extends T> args) {
152161

153-
if (waiting.getAndSet(false) || !args.isOnNext()) {
162+
if (WAITING_UPDATER.getAndSet(this, 0) == 1 || !args.isOnNext()) {
154163
Notification<? extends T> toOffer = args;
155164
while (!buf.offer(toOffer)) {
156165
Notification<? extends T> concurrentItem = buf.poll();
@@ -165,9 +174,11 @@ public void onNext(Notification<? extends T> args) {
165174
}
166175

167176
public Notification<? extends T> takeNext() throws InterruptedException {
168-
waiting.set(true);
177+
setWaiting(1);
169178
return buf.take();
170179
}
171-
180+
void setWaiting(int value) {
181+
waiting = value;
182+
}
172183
}
173184
}

rxjava-core/src/main/java/rx/operators/BufferUntilSubscriber.java

Lines changed: 38 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -16,8 +16,8 @@
1616
package rx.operators;
1717

1818
import java.util.concurrent.ConcurrentLinkedQueue;
19-
import java.util.concurrent.atomic.AtomicBoolean;
20-
import java.util.concurrent.atomic.AtomicReference;
19+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
20+
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
2121

2222
import rx.Observer;
2323
import rx.Subscriber;
@@ -53,12 +53,28 @@ public static <T> BufferUntilSubscriber<T> create() {
5353

5454
/** The common state. */
5555
static final class State<T> {
56-
/** Lite notifications of type T. */
57-
final NotificationLite<T> nl = NotificationLite.instance();
5856
/** The first observer or the one which buffers until the first arrives. */
59-
final AtomicReference<Observer<? super T>> observerRef = new AtomicReference<Observer<? super T>>(new BufferedObserver<T>());
57+
volatile Observer<? super T> observerRef = new BufferedObserver<T>();
6058
/** Allow a single subscriber only. */
61-
final AtomicBoolean first = new AtomicBoolean();
59+
volatile int first;
60+
/** Field updater for observerRef. */
61+
@SuppressWarnings("rawtypes")
62+
static final AtomicReferenceFieldUpdater<State, Observer> OBSERVER_UPDATER
63+
= AtomicReferenceFieldUpdater.newUpdater(State.class, Observer.class, "observerRef");
64+
/** Field updater for first. */
65+
@SuppressWarnings("rawtypes")
66+
static final AtomicIntegerFieldUpdater<State> FIRST_UPDATER
67+
= AtomicIntegerFieldUpdater.newUpdater(State.class, "first");
68+
69+
boolean casFirst(int expected, int next) {
70+
return FIRST_UPDATER.compareAndSet(this, expected, next);
71+
}
72+
void setObserverRef(Observer<? super T> o) {
73+
observerRef = o;
74+
}
75+
boolean casObserverRef(Observer<? super T> expected, Observer<? super T> next) {
76+
return OBSERVER_UPDATER.compareAndSet(this, expected, next);
77+
}
6278
}
6379

6480
static final class OnSubscribeAction<T> implements OnSubscribe<T> {
@@ -70,20 +86,21 @@ public OnSubscribeAction(State<T> state) {
7086

7187
@Override
7288
public void call(final Subscriber<? super T> s) {
73-
if (state.first.compareAndSet(false, true)) {
89+
if (state.casFirst(0, 1)) {
90+
final NotificationLite<T> nl = NotificationLite.instance();
7491
// drain queued notifications before subscription
7592
// we do this here before PassThruObserver so the consuming thread can do this before putting itself in the line of the producer
76-
BufferedObserver<? super T> buffered = (BufferedObserver<? super T>)state.observerRef.get();
93+
BufferedObserver<? super T> buffered = (BufferedObserver<? super T>)state.observerRef;
7794
Object o;
7895
while ((o = buffered.buffer.poll()) != null) {
79-
state.nl.accept(s, o);
96+
nl.accept(s, o);
8097
}
8198
// register real observer for pass-thru ... and drain any further events received on first notification
82-
state.observerRef.set(new PassThruObserver<T>(s, buffered.buffer, state.observerRef));
99+
state.setObserverRef(new PassThruObserver<T>(s, buffered.buffer, state));
83100
s.add(Subscriptions.create(new Action0() {
84101
@Override
85102
public void call() {
86-
state.observerRef.set(Subscribers.empty());
103+
state.setObserverRef(Subscribers.empty());
87104
}
88105
}));
89106
} else {
@@ -101,17 +118,17 @@ private BufferUntilSubscriber(State<T> state) {
101118

102119
@Override
103120
public void onCompleted() {
104-
state.observerRef.get().onCompleted();
121+
state.observerRef.onCompleted();
105122
}
106123

107124
@Override
108125
public void onError(Throwable e) {
109-
state.observerRef.get().onError(e);
126+
state.observerRef.onError(e);
110127
}
111128

112129
@Override
113130
public void onNext(T t) {
114-
state.observerRef.get().onNext(t);
131+
state.observerRef.onNext(t);
115132
}
116133

117134
/**
@@ -127,13 +144,13 @@ private static final class PassThruObserver<T> extends Subscriber<T> {
127144
private final Observer<? super T> actual;
128145
// this assumes single threaded synchronous notifications (the Rx contract for a single Observer)
129146
private final ConcurrentLinkedQueue<Object> buffer;
130-
private final AtomicReference<Observer<? super T>> observerRef;
131-
private final NotificationLite<T> nl = NotificationLite.instance();
147+
private final State<T> state;
132148

133-
PassThruObserver(Observer<? super T> actual, ConcurrentLinkedQueue<Object> buffer, AtomicReference<Observer<? super T>> observerRef) {
149+
PassThruObserver(Observer<? super T> actual, ConcurrentLinkedQueue<Object> buffer,
150+
State<T> state) {
134151
this.actual = actual;
135152
this.buffer = buffer;
136-
this.observerRef = observerRef;
153+
this.state = state;
137154
}
138155

139156
@Override
@@ -155,20 +172,21 @@ public void onNext(T t) {
155172
}
156173

157174
private void drainIfNeededAndSwitchToActual() {
175+
final NotificationLite<T> nl = NotificationLite.instance();
158176
Object o;
159177
while ((o = buffer.poll()) != null) {
160178
nl.accept(this, o);
161179
}
162180
// now we can safely change over to the actual and get rid of the pass-thru
163181
// but only if not unsubscribed
164-
observerRef.compareAndSet(this, actual);
182+
state.casObserverRef(this, actual);
165183
}
166184

167185
}
168186

169187
private static final class BufferedObserver<T> extends Subscriber<T> {
170188
private final ConcurrentLinkedQueue<Object> buffer = new ConcurrentLinkedQueue<Object>();
171-
private final NotificationLite<T> nl = NotificationLite.instance();
189+
private static final NotificationLite<Object> nl = NotificationLite.instance();
172190

173191
@Override
174192
public void onCompleted() {

rxjava-core/src/main/java/rx/operators/OperatorCache.java

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@
1515
*/
1616
package rx.operators;
1717

18-
import java.util.concurrent.atomic.AtomicBoolean;
18+
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
1919

2020
import rx.Observable;
2121
import rx.Observable.OnSubscribe;
@@ -43,7 +43,10 @@
4343
public final class OperatorCache<T> implements OnSubscribe<T> {
4444
protected final Observable<? extends T> source;
4545
protected final Subject<? super T, ? extends T> cache;
46-
protected final AtomicBoolean sourceSubscribed;
46+
volatile int sourceSubscribed;
47+
@SuppressWarnings("rawtypes")
48+
static final AtomicIntegerFieldUpdater<OperatorCache> SRC_SUBSCRIBED_UPDATER
49+
= AtomicIntegerFieldUpdater.newUpdater(OperatorCache.class, "sourceSubscribed");
4750

4851
public OperatorCache(Observable<? extends T> source) {
4952
this(source, ReplaySubject.<T> create());
@@ -52,12 +55,11 @@ public OperatorCache(Observable<? extends T> source) {
5255
/* accessible to tests */OperatorCache(Observable<? extends T> source, Subject<? super T, ? extends T> cache) {
5356
this.source = source;
5457
this.cache = cache;
55-
this.sourceSubscribed = new AtomicBoolean();
5658
}
5759

5860
@Override
5961
public void call(Subscriber<? super T> s) {
60-
if (sourceSubscribed.compareAndSet(false, true)) {
62+
if (SRC_SUBSCRIBED_UPDATER.compareAndSet(this, 0, 1)) {
6163
source.unsafeSubscribe(Subscribers.from(cache));
6264
/*
6365
* Note that we will never unsubscribe from 'source' as we want to receive and cache all of its values.

0 commit comments

Comments
 (0)