Skip to content

Commit 8316519

Browse files
committed
Merge pull request #3444 from akarnokd/Completable1x
1.x: Completable class to support valueless event composition + tests
2 parents 91a1d04 + 01f34a7 commit 8316519

11 files changed

+6795
-0
lines changed

src/main/java/rx/Completable.java

Lines changed: 2181 additions & 0 deletions
Large diffs are not rendered by default.
Lines changed: 152 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,152 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import java.util.concurrent.atomic.*;
20+
21+
import rx.*;
22+
import rx.Completable.*;
23+
import rx.exceptions.MissingBackpressureException;
24+
import rx.internal.util.unsafe.SpscArrayQueue;
25+
import rx.plugins.RxJavaPlugins;
26+
import rx.subscriptions.SerialSubscription;
27+
28+
public final class CompletableOnSubscribeConcat implements CompletableOnSubscribe {
29+
final Observable<? extends Completable> sources;
30+
final int prefetch;
31+
32+
public CompletableOnSubscribeConcat(Observable<? extends Completable> sources, int prefetch) {
33+
this.sources = sources;
34+
this.prefetch = prefetch;
35+
}
36+
37+
@Override
38+
public void call(CompletableSubscriber s) {
39+
CompletableConcatSubscriber parent = new CompletableConcatSubscriber(s, prefetch);
40+
s.onSubscribe(parent);
41+
sources.subscribe(parent);
42+
}
43+
44+
static final class CompletableConcatSubscriber
45+
extends Subscriber<Completable> {
46+
final CompletableSubscriber actual;
47+
final int prefetch;
48+
final SerialSubscription sr;
49+
50+
final SpscArrayQueue<Completable> queue;
51+
52+
volatile boolean done;
53+
54+
volatile int once;
55+
static final AtomicIntegerFieldUpdater<CompletableConcatSubscriber> ONCE =
56+
AtomicIntegerFieldUpdater.newUpdater(CompletableConcatSubscriber.class, "once");
57+
58+
final ConcatInnerSubscriber inner;
59+
60+
final AtomicInteger wip;
61+
62+
public CompletableConcatSubscriber(CompletableSubscriber actual, int prefetch) {
63+
this.actual = actual;
64+
this.prefetch = prefetch;
65+
this.queue = new SpscArrayQueue<Completable>(prefetch);
66+
this.sr = new SerialSubscription();
67+
this.inner = new ConcatInnerSubscriber();
68+
this.wip = new AtomicInteger();
69+
add(sr);
70+
request(prefetch);
71+
}
72+
73+
@Override
74+
public void onNext(Completable t) {
75+
if (!queue.offer(t)) {
76+
onError(new MissingBackpressureException());
77+
return;
78+
}
79+
if (wip.getAndIncrement() == 0) {
80+
next();
81+
}
82+
}
83+
84+
@Override
85+
public void onError(Throwable t) {
86+
if (ONCE.compareAndSet(this, 0, 1)) {
87+
actual.onError(t);
88+
return;
89+
}
90+
RxJavaPlugins.getInstance().getErrorHandler().handleError(t);
91+
}
92+
93+
@Override
94+
public void onCompleted() {
95+
if (done) {
96+
return;
97+
}
98+
done = true;
99+
if (wip.getAndIncrement() == 0) {
100+
next();
101+
}
102+
}
103+
104+
void innerError(Throwable e) {
105+
unsubscribe();
106+
onError(e);
107+
}
108+
109+
void innerComplete() {
110+
if (wip.decrementAndGet() != 0) {
111+
next();
112+
}
113+
if (!done) {
114+
request(1);
115+
}
116+
}
117+
118+
void next() {
119+
boolean d = done;
120+
Completable c = queue.poll();
121+
if (c == null) {
122+
if (d) {
123+
if (ONCE.compareAndSet(this, 0, 1)) {
124+
actual.onCompleted();
125+
}
126+
return;
127+
}
128+
RxJavaPlugins.getInstance().getErrorHandler().handleError(new IllegalStateException("Queue is empty?!"));
129+
return;
130+
}
131+
132+
c.subscribe(inner);
133+
}
134+
135+
final class ConcatInnerSubscriber implements CompletableSubscriber {
136+
@Override
137+
public void onSubscribe(Subscription d) {
138+
sr.set(d);
139+
}
140+
141+
@Override
142+
public void onError(Throwable e) {
143+
innerError(e);
144+
}
145+
146+
@Override
147+
public void onCompleted() {
148+
innerComplete();
149+
}
150+
}
151+
}
152+
}
Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,96 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import java.util.concurrent.atomic.AtomicInteger;
20+
21+
import rx.*;
22+
import rx.Completable.*;
23+
import rx.subscriptions.SerialSubscription;
24+
25+
public final class CompletableOnSubscribeConcatArray implements CompletableOnSubscribe {
26+
final Completable[] sources;
27+
28+
public CompletableOnSubscribeConcatArray(Completable[] sources) {
29+
this.sources = sources;
30+
}
31+
32+
@Override
33+
public void call(CompletableSubscriber s) {
34+
ConcatInnerSubscriber inner = new ConcatInnerSubscriber(s, sources);
35+
s.onSubscribe(inner.sd);
36+
inner.next();
37+
}
38+
39+
static final class ConcatInnerSubscriber extends AtomicInteger implements CompletableSubscriber {
40+
/** */
41+
private static final long serialVersionUID = -7965400327305809232L;
42+
43+
final CompletableSubscriber actual;
44+
final Completable[] sources;
45+
46+
int index;
47+
48+
final SerialSubscription sd;
49+
50+
public ConcatInnerSubscriber(CompletableSubscriber actual, Completable[] sources) {
51+
this.actual = actual;
52+
this.sources = sources;
53+
this.sd = new SerialSubscription();
54+
}
55+
56+
@Override
57+
public void onSubscribe(Subscription d) {
58+
sd.set(d);
59+
}
60+
61+
@Override
62+
public void onError(Throwable e) {
63+
actual.onError(e);
64+
}
65+
66+
@Override
67+
public void onCompleted() {
68+
next();
69+
}
70+
71+
void next() {
72+
if (sd.isUnsubscribed()) {
73+
return;
74+
}
75+
76+
if (getAndIncrement() != 0) {
77+
return;
78+
}
79+
80+
Completable[] a = sources;
81+
do {
82+
if (sd.isUnsubscribed()) {
83+
return;
84+
}
85+
86+
int idx = index++;
87+
if (idx == a.length) {
88+
actual.onCompleted();
89+
return;
90+
}
91+
92+
a[idx].subscribe(this);
93+
} while (decrementAndGet() != 0);
94+
}
95+
}
96+
}
Lines changed: 135 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,135 @@
1+
/**
2+
* Copyright 2014 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package rx.internal.operators;
18+
19+
import java.util.Iterator;
20+
import java.util.concurrent.atomic.AtomicInteger;
21+
22+
import rx.*;
23+
import rx.Completable.*;
24+
import rx.subscriptions.*;
25+
26+
public final class CompletableOnSubscribeConcatIterable implements CompletableOnSubscribe {
27+
final Iterable<? extends Completable> sources;
28+
29+
public CompletableOnSubscribeConcatIterable(Iterable<? extends Completable> sources) {
30+
this.sources = sources;
31+
}
32+
33+
@Override
34+
public void call(CompletableSubscriber s) {
35+
36+
Iterator<? extends Completable> it;
37+
38+
try {
39+
it = sources.iterator();
40+
} catch (Throwable e) {
41+
s.onSubscribe(Subscriptions.unsubscribed());
42+
s.onError(e);
43+
return;
44+
}
45+
46+
if (it == null) {
47+
s.onSubscribe(Subscriptions.unsubscribed());
48+
s.onError(new NullPointerException("The iterator returned is null"));
49+
return;
50+
}
51+
52+
ConcatInnerSubscriber inner = new ConcatInnerSubscriber(s, it);
53+
s.onSubscribe(inner.sd);
54+
inner.next();
55+
}
56+
57+
static final class ConcatInnerSubscriber extends AtomicInteger implements CompletableSubscriber {
58+
/** */
59+
private static final long serialVersionUID = -7965400327305809232L;
60+
61+
final CompletableSubscriber actual;
62+
final Iterator<? extends Completable> sources;
63+
64+
int index;
65+
66+
final SerialSubscription sd;
67+
68+
public ConcatInnerSubscriber(CompletableSubscriber actual, Iterator<? extends Completable> sources) {
69+
this.actual = actual;
70+
this.sources = sources;
71+
this.sd = new SerialSubscription();
72+
}
73+
74+
@Override
75+
public void onSubscribe(Subscription d) {
76+
sd.set(d);
77+
}
78+
79+
@Override
80+
public void onError(Throwable e) {
81+
actual.onError(e);
82+
}
83+
84+
@Override
85+
public void onCompleted() {
86+
next();
87+
}
88+
89+
void next() {
90+
if (sd.isUnsubscribed()) {
91+
return;
92+
}
93+
94+
if (getAndIncrement() != 0) {
95+
return;
96+
}
97+
98+
Iterator<? extends Completable> a = sources;
99+
do {
100+
if (sd.isUnsubscribed()) {
101+
return;
102+
}
103+
104+
boolean b;
105+
try {
106+
b = a.hasNext();
107+
} catch (Throwable ex) {
108+
actual.onError(ex);
109+
return;
110+
}
111+
112+
if (!b) {
113+
actual.onCompleted();
114+
return;
115+
}
116+
117+
Completable c;
118+
119+
try {
120+
c = a.next();
121+
} catch (Throwable ex) {
122+
actual.onError(ex);
123+
return;
124+
}
125+
126+
if (c == null) {
127+
actual.onError(new NullPointerException("The completable returned is null"));
128+
return;
129+
}
130+
131+
c.subscribe(this);
132+
} while (decrementAndGet() != 0);
133+
}
134+
}
135+
}

0 commit comments

Comments
 (0)