Skip to content

Commit 9047a3c

Browse files
VictorAlbertosakarnokd
authored andcommitted
2.x: Merge AmbArray and AmbIterable into Amb for Single, Maybe and Completable types (#4647)
1 parent ce21ecf commit 9047a3c

File tree

13 files changed

+242
-403
lines changed

13 files changed

+242
-403
lines changed

src/main/java/io/reactivex/Completable.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -61,7 +61,7 @@ public static Completable ambArray(final CompletableSource... sources) {
6161
return wrap(sources[0]);
6262
}
6363

64-
return RxJavaPlugins.onAssembly(new CompletableAmbArray(sources));
64+
return RxJavaPlugins.onAssembly(new CompletableAmb(sources, null));
6565
}
6666

6767
/**
@@ -79,7 +79,7 @@ public static Completable ambArray(final CompletableSource... sources) {
7979
public static Completable amb(final Iterable<? extends CompletableSource> sources) {
8080
ObjectHelper.requireNonNull(sources, "sources is null");
8181

82-
return RxJavaPlugins.onAssembly(new CompletableAmbIterable(sources));
82+
return RxJavaPlugins.onAssembly(new CompletableAmb(null, sources));
8383
}
8484

8585
/**

src/main/java/io/reactivex/Maybe.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ public abstract class Maybe<T> implements MaybeSource<T> {
5757
@SchedulerSupport(SchedulerSupport.NONE)
5858
public static <T> Maybe<T> amb(final Iterable<? extends MaybeSource<? extends T>> sources) {
5959
ObjectHelper.requireNonNull(sources, "sources is null");
60-
return RxJavaPlugins.onAssembly(new MaybeAmbIterable<T>(sources));
60+
return RxJavaPlugins.onAssembly(new MaybeAmb<T>(null, sources));
6161
}
6262

6363
/**
@@ -80,7 +80,7 @@ public static <T> Maybe<T> ambArray(final MaybeSource<? extends T>... sources) {
8080
if (sources.length == 1) {
8181
return wrap((MaybeSource<T>)sources[0]);
8282
}
83-
return RxJavaPlugins.onAssembly(new MaybeAmbArray<T>(sources));
83+
return RxJavaPlugins.onAssembly(new MaybeAmb<T>(sources, null));
8484
}
8585

8686
/**

src/main/java/io/reactivex/Single.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@ public abstract class Single<T> implements SingleSource<T> {
7373
@SchedulerSupport(SchedulerSupport.NONE)
7474
public static <T> Single<T> amb(final Iterable<? extends SingleSource<? extends T>> sources) {
7575
ObjectHelper.requireNonNull(sources, "sources is null");
76-
return RxJavaPlugins.onAssembly(new SingleAmbIterable<T>(sources));
76+
return RxJavaPlugins.onAssembly(new SingleAmb<T>(null, sources));
7777
}
7878

7979
/**
@@ -97,7 +97,7 @@ public static <T> Single<T> ambArray(final SingleSource<? extends T>... sources)
9797
if (sources.length == 1) {
9898
return wrap((SingleSource<T>)sources[0]);
9999
}
100-
return RxJavaPlugins.onAssembly(new SingleAmbArray<T>(sources));
100+
return RxJavaPlugins.onAssembly(new SingleAmb<T>(sources, null));
101101
}
102102

103103
/**

src/main/java/io/reactivex/internal/operators/completable/CompletableAmbArray.java renamed to src/main/java/io/reactivex/internal/operators/completable/CompletableAmb.java

Lines changed: 39 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,18 +17,47 @@
1717

1818
import io.reactivex.*;
1919
import io.reactivex.disposables.*;
20+
import io.reactivex.exceptions.Exceptions;
21+
import io.reactivex.internal.disposables.EmptyDisposable;
2022
import io.reactivex.plugins.RxJavaPlugins;
2123

22-
public final class CompletableAmbArray extends Completable {
24+
public final class CompletableAmb extends Completable {
25+
private final CompletableSource[] sources;
26+
private final Iterable<? extends CompletableSource> sourcesIterable;
2327

24-
final CompletableSource[] sources;
25-
26-
public CompletableAmbArray(CompletableSource[] sources) {
28+
public CompletableAmb(CompletableSource[] sources, Iterable<? extends CompletableSource> sourcesIterable) {
2729
this.sources = sources;
30+
this.sourcesIterable = sourcesIterable;
2831
}
2932

3033
@Override
3134
public void subscribeActual(final CompletableObserver s) {
35+
CompletableSource[] sources = this.sources;
36+
int count = 0;
37+
if (sources == null) {
38+
sources = new CompletableSource[8];
39+
try {
40+
for (CompletableSource element : sourcesIterable) {
41+
if (element == null) {
42+
EmptyDisposable.error(new NullPointerException("One of the sources is null"), s);
43+
return;
44+
}
45+
if (count == sources.length) {
46+
CompletableSource[] b = new CompletableSource[count + (count >> 2)];
47+
System.arraycopy(sources, 0, b, 0, count);
48+
sources = b;
49+
}
50+
sources[count++] = element;
51+
};
52+
} catch (Throwable e) {
53+
Exceptions.throwIfFatal(e);
54+
EmptyDisposable.error(e, s);
55+
return;
56+
}
57+
} else {
58+
count = sources.length;
59+
}
60+
3261
final CompositeDisposable set = new CompositeDisposable();
3362
s.onSubscribe(set);
3463

@@ -60,7 +89,8 @@ public void onSubscribe(Disposable d) {
6089

6190
};
6291

63-
for (CompletableSource c : sources) {
92+
for (int i = 0; i < count; i++) {
93+
CompletableSource c = sources[i];
6494
if (set.isDisposed()) {
6595
return;
6696
}
@@ -81,5 +111,9 @@ public void onSubscribe(Disposable d) {
81111
// no need to have separate subscribers because inner is stateless
82112
c.subscribe(inner);
83113
}
114+
115+
if (count == 0) {
116+
s.onComplete();
117+
}
84118
}
85119
}

src/main/java/io/reactivex/internal/operators/completable/CompletableAmbIterable.java

Lines changed: 0 additions & 150 deletions
This file was deleted.

src/main/java/io/reactivex/internal/operators/maybe/MaybeAmbArray.java renamed to src/main/java/io/reactivex/internal/operators/maybe/MaybeAmb.java

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,28 +17,58 @@
1717

1818
import io.reactivex.*;
1919
import io.reactivex.disposables.*;
20+
import io.reactivex.exceptions.Exceptions;
21+
import io.reactivex.internal.disposables.EmptyDisposable;
2022
import io.reactivex.plugins.RxJavaPlugins;
2123

2224
/**
2325
* Signals the event of the first MaybeSource that signals.
2426
*
2527
* @param <T> the value type emitted
2628
*/
27-
public final class MaybeAmbArray<T> extends Maybe<T> {
29+
public final class MaybeAmb<T> extends Maybe<T> {
30+
private final MaybeSource<? extends T>[] sources;
31+
private final Iterable<? extends MaybeSource<? extends T>> sourcesIterable;
2832

29-
final MaybeSource<? extends T>[] sources;
30-
31-
public MaybeAmbArray(MaybeSource<? extends T>[] sources) {
33+
public MaybeAmb(MaybeSource<? extends T>[] sources, Iterable<? extends MaybeSource<? extends T>> sourcesIterable) {
3234
this.sources = sources;
35+
this.sourcesIterable = sourcesIterable;
3336
}
3437

3538
@Override
39+
@SuppressWarnings("unchecked")
3640
protected void subscribeActual(MaybeObserver<? super T> observer) {
41+
MaybeSource<? extends T>[] sources = this.sources;
42+
int count = 0;
43+
if (sources == null) {
44+
sources = new MaybeSource[8];
45+
try {
46+
for (MaybeSource<? extends T> element : sourcesIterable) {
47+
if (element == null) {
48+
EmptyDisposable.error(new NullPointerException("One of the sources is null"), observer);
49+
return;
50+
}
51+
if (count == sources.length) {
52+
MaybeSource<? extends T>[] b = new MaybeSource[count + (count >> 2)];
53+
System.arraycopy(sources, 0, b, 0, count);
54+
sources = b;
55+
}
56+
sources[count++] = element;
57+
}
58+
} catch (Throwable e) {
59+
Exceptions.throwIfFatal(e);
60+
EmptyDisposable.error(e, observer);
61+
return;
62+
}
63+
} else {
64+
count = sources.length;
65+
}
3766

3867
AmbMaybeObserver<T> parent = new AmbMaybeObserver<T>(observer);
3968
observer.onSubscribe(parent);
4069

41-
for (MaybeSource<? extends T> s : sources) {
70+
for (int i = 0; i < count; i++) {
71+
MaybeSource<? extends T> s = sources[i];
4272
if (parent.isDisposed()) {
4373
return;
4474
}
@@ -50,6 +80,11 @@ protected void subscribeActual(MaybeObserver<? super T> observer) {
5080

5181
s.subscribe(parent);
5282
}
83+
84+
if (count == 0) {
85+
observer.onComplete();
86+
}
87+
5388
}
5489

5590
static final class AmbMaybeObserver<T>

0 commit comments

Comments
 (0)