Skip to content

Commit 2a08d80

Browse files
akarnokdakarnokd
akarnokd
authored and
akarnokd
committed
Make the chunk_test.clj work again for now
1 parent ccbd483 commit 2a08d80

File tree

1 file changed

+13
-8
lines changed

1 file changed

+13
-8
lines changed

rxjava-core/src/main/java/rx/operators/OperatorMerge.java

Lines changed: 13 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -49,6 +49,7 @@ static final class MergeSubscriber<T> extends Subscriber<Observable<? extends T>
4949
final Subscriber<T> actual;
5050
final CompositeSubscription childrenSubscriptions;
5151
volatile int wip;
52+
volatile boolean completed;
5253
@SuppressWarnings("rawtypes")
5354
static final AtomicIntegerFieldUpdater<MergeSubscriber> WIP_UPDATER
5455
= AtomicIntegerFieldUpdater.newUpdater(MergeSubscriber.class, "wip");
@@ -57,7 +58,6 @@ public MergeSubscriber(Subscriber<T> actual, CompositeSubscription childrenSubsc
5758
super(actual);
5859
this.actual = actual;
5960
this.childrenSubscriptions = childrenSubscriptions;
60-
this.wip = 1;
6161
}
6262

6363
@Override
@@ -76,11 +76,20 @@ public void onError(Throwable e) {
7676

7777
@Override
7878
public void onCompleted() {
79-
if (WIP_UPDATER.decrementAndGet(this) == 0) {
79+
completed = true;
80+
if (wip == 0) {
8081
actual.onCompleted();
8182
}
8283
}
83-
84+
void completeInner(InnerSubscriber<T> s) {
85+
try {
86+
if (WIP_UPDATER.decrementAndGet(this) == 0 && completed) {
87+
actual.onCompleted();
88+
}
89+
} finally {
90+
childrenSubscriptions.remove(s);
91+
}
92+
}
8493
}
8594
static final class InnerSubscriber<T> extends Subscriber<T> {
8695
final Subscriber<? super T> actual;
@@ -110,11 +119,7 @@ public void onError(Throwable e) {
110119
@Override
111120
public void onCompleted() {
112121
if (ONCE_UPDATER.compareAndSet(this, 0, 1)) {
113-
try {
114-
parent.onCompleted();
115-
} finally {
116-
parent.childrenSubscriptions.remove(this);
117-
}
122+
parent.completeInner(this);
118123
}
119124
}
120125

0 commit comments

Comments
 (0)