Skip to content

Commit 9281281

Browse files
authored
2.x: Fix publish(-|Function) subscriber swap possible data loss (#5893)
1 parent 3aae12e commit 9281281

File tree

3 files changed

+439
-40
lines changed

3 files changed

+439
-40
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowablePublish.java

Lines changed: 50 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -148,7 +148,7 @@ static final class PublishSubscriber<T>
148148
final int bufferSize;
149149

150150
/** Tracks the subscribed InnerSubscribers. */
151-
final AtomicReference<InnerSubscriber[]> subscribers;
151+
final AtomicReference<InnerSubscriber<T>[]> subscribers;
152152
/**
153153
* Atomically changed from false to true by connect to make sure the
154154
* connection is only performed by one thread.
@@ -165,8 +165,9 @@ static final class PublishSubscriber<T>
165165
/** Holds notifications from upstream. */
166166
volatile SimpleQueue<T> queue;
167167

168+
@SuppressWarnings("unchecked")
168169
PublishSubscriber(AtomicReference<PublishSubscriber<T>> current, int bufferSize) {
169-
this.subscribers = new AtomicReference<InnerSubscriber[]>(EMPTY);
170+
this.subscribers = new AtomicReference<InnerSubscriber<T>[]>(EMPTY);
170171
this.current = current;
171172
this.shouldConnect = new AtomicBoolean();
172173
this.bufferSize = bufferSize;
@@ -175,6 +176,7 @@ static final class PublishSubscriber<T>
175176
@Override
176177
public void dispose() {
177178
if (subscribers.get() != TERMINATED) {
179+
@SuppressWarnings("unchecked")
178180
InnerSubscriber[] ps = subscribers.getAndSet(TERMINATED);
179181
if (ps != TERMINATED) {
180182
current.compareAndSet(PublishSubscriber.this, null);
@@ -263,15 +265,16 @@ boolean add(InnerSubscriber<T> producer) {
263265
// the state can change so we do a CAS loop to achieve atomicity
264266
for (;;) {
265267
// get the current producer array
266-
InnerSubscriber[] c = subscribers.get();
268+
InnerSubscriber<T>[] c = subscribers.get();
267269
// if this subscriber-to-source reached a terminal state by receiving
268270
// an onError or onComplete, just refuse to add the new producer
269271
if (c == TERMINATED) {
270272
return false;
271273
}
272274
// we perform a copy-on-write logic
273275
int len = c.length;
274-
InnerSubscriber[] u = new InnerSubscriber[len + 1];
276+
@SuppressWarnings("unchecked")
277+
InnerSubscriber<T>[] u = new InnerSubscriber[len + 1];
275278
System.arraycopy(c, 0, u, 0, len);
276279
u[len] = producer;
277280
// try setting the subscribers array
@@ -287,11 +290,12 @@ boolean add(InnerSubscriber<T> producer) {
287290
* Atomically removes the given InnerSubscriber from the subscribers array.
288291
* @param producer the producer to remove
289292
*/
293+
@SuppressWarnings("unchecked")
290294
void remove(InnerSubscriber<T> producer) {
291295
// the state can change so we do a CAS loop to achieve atomicity
292296
for (;;) {
293297
// let's read the current subscribers array
294-
InnerSubscriber[] c = subscribers.get();
298+
InnerSubscriber<T>[] c = subscribers.get();
295299
int len = c.length;
296300
// if it is either empty or terminated, there is nothing to remove so we quit
297301
if (len == 0) {
@@ -311,7 +315,7 @@ void remove(InnerSubscriber<T> producer) {
311315
return;
312316
}
313317
// we do copy-on-write logic here
314-
InnerSubscriber[] u;
318+
InnerSubscriber<T>[] u;
315319
// we don't create a new empty array if producer was the single inhabitant
316320
// but rather reuse an empty array
317321
if (len == 1) {
@@ -340,6 +344,7 @@ void remove(InnerSubscriber<T> producer) {
340344
* @param empty set to true if the queue is empty
341345
* @return true if there is indeed a terminal condition
342346
*/
347+
@SuppressWarnings("unchecked")
343348
boolean checkTerminated(Object term, boolean empty) {
344349
// first of all, check if there is actually a terminal event
345350
if (term != null) {
@@ -404,6 +409,17 @@ void dispatch() {
404409
return;
405410
}
406411
int missed = 1;
412+
413+
// saving a local copy because this will be accessed after every item
414+
// delivered to detect changes in the subscribers due to an onNext
415+
// and thus not dropping items
416+
AtomicReference<InnerSubscriber<T>[]> subscribers = this.subscribers;
417+
418+
// We take a snapshot of the current child subscribers.
419+
// Concurrent subscribers may miss this iteration, but it is to be expected
420+
InnerSubscriber<T>[] ps = subscribers.get();
421+
422+
outer:
407423
for (;;) {
408424
/*
409425
* We need to read terminalEvent before checking the queue for emptiness because
@@ -434,10 +450,6 @@ void dispatch() {
434450
// this loop is the only one which can turn a non-empty queue into an empty one
435451
// and as such, no need to ask the queue itself again for that.
436452
if (!empty) {
437-
// We take a snapshot of the current child subscribers.
438-
// Concurrent subscribers may miss this iteration, but it is to be expected
439-
@SuppressWarnings("unchecked")
440-
InnerSubscriber<T>[] ps = subscribers.get();
441453

442454
int len = ps.length;
443455
// Let's assume everyone requested the maximum value.
@@ -452,14 +464,11 @@ void dispatch() {
452464
long r = ip.get();
453465
// if there is one child subscriber that hasn't requested yet
454466
// we can't emit anything to anyone
455-
if (r >= 0L) {
456-
maxRequested = Math.min(maxRequested, r);
457-
} else
458-
// cancellation is indicated by a special value
459-
if (r == CANCELLED) {
467+
if (r != CANCELLED) {
468+
maxRequested = Math.min(maxRequested, r - ip.emitted);
469+
} else {
460470
cancelled++;
461471
}
462-
// we ignore those with NOT_REQUESTED as if they aren't even there
463472
}
464473

465474
// it may happen everyone has cancelled between here and subscribers.get()
@@ -518,20 +527,36 @@ void dispatch() {
518527
}
519528
// we need to unwrap potential nulls
520529
T value = NotificationLite.getValue(v);
530+
531+
boolean subscribersChanged = false;
532+
521533
// let's emit this value to all child subscribers
522534
for (InnerSubscriber<T> ip : ps) {
523535
// if ip.get() is negative, the child has either cancelled in the
524536
// meantime or hasn't requested anything yet
525537
// this eager behavior will skip cancelled children in case
526538
// multiple values are available in the queue
527-
if (ip.get() > 0L) {
539+
long ipr = ip.get();
540+
if (ipr != CANCELLED) {
541+
if (ipr != Long.MAX_VALUE) {
542+
// indicate this child has received 1 element
543+
ip.emitted++;
544+
}
528545
ip.child.onNext(value);
529-
// indicate this child has received 1 element
530-
ip.produced(1);
546+
} else {
547+
subscribersChanged = true;
531548
}
532549
}
533550
// indicate we emitted one element
534551
d++;
552+
553+
// see if the array of subscribers changed as a consequence
554+
// of emission or concurrent activity
555+
InnerSubscriber<T>[] freshArray = subscribers.get();
556+
if (subscribersChanged || freshArray != ps) {
557+
ps = freshArray;
558+
continue outer;
559+
}
535560
}
536561

537562
// if we did emit at least one element, request more to replenish the queue
@@ -552,6 +577,9 @@ void dispatch() {
552577
if (missed == 0) {
553578
break;
554579
}
580+
581+
// get a fresh copy of the current subscribers
582+
ps = subscribers.get();
555583
}
556584
}
557585
}
@@ -571,6 +599,9 @@ static final class InnerSubscriber<T> extends AtomicLong implements Subscription
571599
*/
572600
volatile PublishSubscriber<T> parent;
573601

602+
/** Track the number of emitted items (avoids decrementing the request counter). */
603+
long emitted;
604+
574605
InnerSubscriber(Subscriber<? super T> child) {
575606
this.child = child;
576607
}
@@ -586,15 +617,6 @@ public void request(long n) {
586617
}
587618
}
588619

589-
/**
590-
* Indicate that values have been emitted to this child subscriber by the dispatch() method.
591-
* @param n the number of items emitted
592-
* @return the updated request value (may indicate how much can be produced or a terminal state)
593-
*/
594-
public long produced(long n) {
595-
return BackpressureHelper.producedCancel(this, n);
596-
}
597-
598620
@Override
599621
public void cancel() {
600622
long r = get();

src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishMulticast.java

Lines changed: 27 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -153,8 +153,6 @@ static final class MulticastProcessor<T> extends Flowable<T> implements Flowable
153153

154154
int consumed;
155155

156-
long emitted;
157-
158156
@SuppressWarnings("unchecked")
159157
MulticastProcessor(int prefetch, boolean delayError) {
160158
this.prefetch = prefetch;
@@ -325,18 +323,20 @@ void drain() {
325323
int upstreamConsumed = consumed;
326324
int localLimit = limit;
327325
boolean canRequest = sourceMode != QueueSubscription.SYNC;
328-
long e = emitted;
326+
AtomicReference<MulticastSubscription<T>[]> subs = subscribers;
327+
328+
MulticastSubscription<T>[] array = subs.get();
329329

330+
outer:
330331
for (;;) {
331-
MulticastSubscription<T>[] array = subscribers.get();
332332

333333
int n = array.length;
334334

335335
if (q != null && n != 0) {
336336
long r = Long.MAX_VALUE;
337337

338338
for (MulticastSubscription<T> ms : array) {
339-
long u = ms.get();
339+
long u = ms.get() - ms.emitted;
340340
if (u != Long.MIN_VALUE) {
341341
if (r > u) {
342342
r = u;
@@ -347,10 +347,10 @@ void drain() {
347347
}
348348

349349
if (n == 0) {
350-
r = e;
350+
r = 0;
351351
}
352352

353-
while (e != r) {
353+
while (r != 0) {
354354
if (isDisposed()) {
355355
q.clear();
356356
return;
@@ -393,21 +393,35 @@ void drain() {
393393
break;
394394
}
395395

396+
boolean subscribersChange = false;
397+
396398
for (MulticastSubscription<T> ms : array) {
397-
if (ms.get() != Long.MIN_VALUE) {
399+
long msr = ms.get();
400+
if (msr != Long.MIN_VALUE) {
401+
if (msr != Long.MAX_VALUE) {
402+
ms.emitted++;
403+
}
398404
ms.actual.onNext(v);
405+
} else {
406+
subscribersChange = true;
399407
}
400408
}
401409

402-
e++;
410+
r--;
403411

404412
if (canRequest && ++upstreamConsumed == localLimit) {
405413
upstreamConsumed = 0;
406414
s.get().request(localLimit);
407415
}
416+
417+
MulticastSubscription<T>[] freshArray = subs.get();
418+
if (subscribersChange || freshArray != array) {
419+
array = freshArray;
420+
continue outer;
421+
}
408422
}
409423

410-
if (e == r) {
424+
if (r == 0) {
411425
if (isDisposed()) {
412426
q.clear();
413427
return;
@@ -435,7 +449,6 @@ void drain() {
435449
}
436450
}
437451

438-
emitted = e;
439452
consumed = upstreamConsumed;
440453
missed = wip.addAndGet(-missed);
441454
if (missed == 0) {
@@ -444,6 +457,7 @@ void drain() {
444457
if (q == null) {
445458
q = queue;
446459
}
460+
array = subs.get();
447461
}
448462
}
449463

@@ -476,6 +490,8 @@ static final class MulticastSubscription<T>
476490

477491
final MulticastProcessor<T> parent;
478492

493+
long emitted;
494+
479495
MulticastSubscription(Subscriber<? super T> actual, MulticastProcessor<T> parent) {
480496
this.actual = actual;
481497
this.parent = parent;

0 commit comments

Comments
 (0)