Skip to content

Commit 3ba1d35

Browse files
authored
2.x: Cleanup, coverage and related component fixes (#5889)
1 parent 9f2c435 commit 3ba1d35

File tree

10 files changed

+490
-139
lines changed

10 files changed

+490
-139
lines changed

src/main/java/io/reactivex/internal/operators/flowable/FlowableConcatMapEager.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -132,10 +132,6 @@ public void onNext(T t) {
132132

133133
subscribers.offer(inner);
134134

135-
if (cancelled) {
136-
return;
137-
}
138-
139135
p.subscribe(inner);
140136

141137
if (cancelled) {

src/main/java/io/reactivex/internal/operators/flowable/FlowablePublishMulticast.java

Lines changed: 12 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,8 @@ static final class MulticastProcessor<T> extends Flowable<T> implements Flowable
153153

154154
int consumed;
155155

156+
long emitted;
157+
156158
@SuppressWarnings("unchecked")
157159
MulticastProcessor(int prefetch, boolean delayError) {
158160
this.prefetch = prefetch;
@@ -261,10 +263,10 @@ boolean add(MulticastSubscription<T> s) {
261263
void remove(MulticastSubscription<T> s) {
262264
for (;;) {
263265
MulticastSubscription<T>[] current = subscribers.get();
264-
if (current == TERMINATED || current == EMPTY) {
266+
int n = current.length;
267+
if (n == 0) {
265268
return;
266269
}
267-
int n = current.length;
268270
int j = -1;
269271

270272
for (int i = 0; i < n; i++) {
@@ -323,6 +325,7 @@ void drain() {
323325
int upstreamConsumed = consumed;
324326
int localLimit = limit;
325327
boolean canRequest = sourceMode != QueueSubscription.SYNC;
328+
long e = emitted;
326329

327330
for (;;) {
328331
MulticastSubscription<T>[] array = subscribers.get();
@@ -338,10 +341,15 @@ void drain() {
338341
if (r > u) {
339342
r = u;
340343
}
344+
} else {
345+
n--;
341346
}
342347
}
343348

344-
long e = 0L;
349+
if (n == 0) {
350+
r = e;
351+
}
352+
345353
while (e != r) {
346354
if (isDisposed()) {
347355
q.clear();
@@ -425,12 +433,9 @@ void drain() {
425433
return;
426434
}
427435
}
428-
429-
for (MulticastSubscription<T> ms : array) {
430-
BackpressureHelper.produced(ms, e);
431-
}
432436
}
433437

438+
emitted = e;
434439
consumed = upstreamConsumed;
435440
missed = wip.addAndGet(-missed);
436441
if (missed == 0) {
@@ -465,7 +470,6 @@ static final class MulticastSubscription<T>
465470
extends AtomicLong
466471
implements Subscription {
467472

468-
469473
private static final long serialVersionUID = 8664815189257569791L;
470474

471475
final Subscriber<? super T> actual;

src/main/java/io/reactivex/processors/UnicastProcessor.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -180,8 +180,8 @@ public static <T> UnicastProcessor<T> create(int capacityHint, Runnable onCancel
180180
}
181181

182182
void doTerminate() {
183-
Runnable r = onTerminate.get();
184-
if (r != null && onTerminate.compareAndSet(r, null)) {
183+
Runnable r = onTerminate.getAndSet(null);
184+
if (r != null) {
185185
r.run();
186186
}
187187
}

src/main/java/io/reactivex/subjects/BehaviorSubject.java

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -442,26 +442,20 @@ void remove(BehaviorDisposable<T> rs) {
442442
@SuppressWarnings("unchecked")
443443
BehaviorDisposable<T>[] terminate(Object terminalValue) {
444444

445-
BehaviorDisposable<T>[] a = subscribers.get();
445+
BehaviorDisposable<T>[] a = subscribers.getAndSet(TERMINATED);
446446
if (a != TERMINATED) {
447-
a = subscribers.getAndSet(TERMINATED);
448-
if (a != TERMINATED) {
449-
// either this or atomics with lots of allocation
450-
setCurrent(terminalValue);
451-
}
447+
// either this or atomics with lots of allocation
448+
setCurrent(terminalValue);
452449
}
453450

454451
return a;
455452
}
456453

457454
void setCurrent(Object o) {
458455
writeLock.lock();
459-
try {
460-
index++;
461-
value.lazySet(o);
462-
} finally {
463-
writeLock.unlock();
464-
}
456+
index++;
457+
value.lazySet(o);
458+
writeLock.unlock();
465459
}
466460

467461
static final class BehaviorDisposable<T> implements Disposable, NonThrowingPredicate<Object> {

src/test/java/io/reactivex/internal/operators/flowable/FlowableMulticastTest.java

Lines changed: 0 additions & 111 deletions
This file was deleted.

0 commit comments

Comments
 (0)