Skip to content

Commit 449fee9

Browse files
Merge pull request ReactiveX#575 from zsxwing/sequence-equal
Reimplement the 'SequenceEqual' operator
2 parents c236eda + 30f83b4 commit 449fee9

File tree

4 files changed

+245
-21
lines changed

4 files changed

+245
-21
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 14 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@
7676
import rx.operators.OperationRetry;
7777
import rx.operators.OperationSample;
7878
import rx.operators.OperationScan;
79+
import rx.operators.OperationSequenceEqual;
7980
import rx.operators.OperationSkip;
8081
import rx.operators.OperationSkipLast;
8182
import rx.operators.OperationSkipUntil;
@@ -2296,31 +2297,34 @@ public static <T> Observable<T> from(Future<? extends T> future, long timeout, T
22962297
}
22972298

22982299
/**
2299-
* Returns an Observable that emits Boolean values that indicate whether the
2300-
* pairs of items emitted by two source Observables are equal.
2300+
* Returns an Observable that emits a Boolean value that indicate
2301+
* whether two sequences are equal by comparing the elements pairwise.
23012302
* <p>
23022303
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/sequenceEqual.png">
23032304
*
23042305
* @param first the first Observable to compare
23052306
* @param second the second Observable to compare
23062307
* @param <T> the type of items emitted by each Observable
2307-
* @return an Observable that emits Booleans that indicate whether the
2308-
* corresponding items emitted by the source Observables are equal
2308+
* @return an Observable that emits a Boolean value that indicate
2309+
* whether two sequences are equal by comparing the elements pairwise.
23092310
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#sequenceequal">RxJava Wiki: sequenceEqual()</a>
23102311
*/
23112312
public static <T> Observable<Boolean> sequenceEqual(Observable<? extends T> first, Observable<? extends T> second) {
23122313
return sequenceEqual(first, second, new Func2<T, T, Boolean>() {
23132314
@Override
23142315
public Boolean call(T first, T second) {
2316+
if(first == null) {
2317+
return second == null;
2318+
}
23152319
return first.equals(second);
23162320
}
23172321
});
23182322
}
23192323

23202324
/**
2321-
* Returns an Observable that emits Boolean values that indicate whether the
2322-
* pairs of items emitted by two source Observables are equal based on the
2323-
* results of a specified equality function.
2325+
* Returns an Observable that emits a Boolean value that indicate
2326+
* whether two sequences are equal by comparing the elements pairwise
2327+
* based on the results of a specified equality function.
23242328
* <p>
23252329
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/sequenceEqual.png">
23262330
*
@@ -2329,12 +2333,12 @@ public Boolean call(T first, T second) {
23292333
* @param equality a function used to compare items emitted by both
23302334
* Observables
23312335
* @param <T> the type of items emitted by each Observable
2332-
* @return an Observable that emits Booleans that indicate whether the
2333-
* corresponding items emitted by the source Observables are equal
2336+
* @return an Observable that emits a Boolean value that indicate
2337+
* whether two sequences are equal by comparing the elements pairwise.
23342338
* @see <a href="https://github.com/Netflix/RxJava/wiki/Observable-Utility-Operators#sequenceequal">RxJava Wiki: sequenceEqual()</a>
23352339
*/
23362340
public static <T> Observable<Boolean> sequenceEqual(Observable<? extends T> first, Observable<? extends T> second, Func2<? super T, ? super T, Boolean> equality) {
2337-
return zip(first, second, equality);
2341+
return OperationSequenceEqual.sequenceEqual(first, second, equality);
23382342
}
23392343

23402344
/**
Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,73 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static rx.Observable.concat;
19+
import static rx.Observable.from;
20+
import static rx.Observable.zip;
21+
import rx.Notification;
22+
import rx.Observable;
23+
import rx.util.functions.Func1;
24+
import rx.util.functions.Func2;
25+
import rx.util.functions.Functions;
26+
27+
/**
28+
* Returns an Observable that emits a Boolean value that indicate whether two
29+
* sequences are equal by comparing the elements pairwise.
30+
*/
31+
public class OperationSequenceEqual {
32+
33+
public static <T> Observable<Boolean> sequenceEqual(
34+
Observable<? extends T> first, Observable<? extends T> second,
35+
final Func2<? super T, ? super T, Boolean> equality) {
36+
Observable<Notification<T>> firstObservable = concat(
37+
first.map(new Func1<T, Notification<T>>() {
38+
39+
@Override
40+
public Notification<T> call(T t1) {
41+
return new Notification<T>(t1);
42+
}
43+
44+
}), from(new Notification<T>()));
45+
46+
Observable<Notification<T>> secondObservable = concat(
47+
second.map(new Func1<T, Notification<T>>() {
48+
49+
@Override
50+
public Notification<T> call(T t1) {
51+
return new Notification<T>(t1);
52+
}
53+
54+
}), from(new Notification<T>()));
55+
56+
return zip(firstObservable, secondObservable,
57+
new Func2<Notification<T>, Notification<T>, Boolean>() {
58+
59+
@Override
60+
public Boolean call(Notification<T> t1, Notification<T> t2) {
61+
if (t1.isOnCompleted() && t2.isOnCompleted()) {
62+
return true;
63+
}
64+
if (t1.isOnCompleted() || t2.isOnCompleted()) {
65+
return false;
66+
}
67+
// Now t1 and t2 must be 'onNext'.
68+
return equality.call(t1.getValue(), t2.getValue());
69+
}
70+
71+
}).all(Functions.<Boolean> identity());
72+
}
73+
}

rxjava-core/src/test/java/rx/ObservableTests.java

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -275,17 +275,6 @@ public Integer call(Integer t1, Integer t2) {
275275
verify(w).onNext(60);
276276
}
277277

278-
@Test
279-
public void testSequenceEqual() {
280-
Observable<Integer> first = Observable.from(1, 2, 3);
281-
Observable<Integer> second = Observable.from(1, 2, 4);
282-
@SuppressWarnings("unchecked")
283-
Observer<Boolean> result = mock(Observer.class);
284-
Observable.sequenceEqual(first, second).subscribe(result);
285-
verify(result, times(2)).onNext(true);
286-
verify(result, times(1)).onNext(false);
287-
}
288-
289278
@Test
290279
public void testOnSubscribeFails() {
291280
@SuppressWarnings("unchecked")
Lines changed: 158 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,158 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.mockito.Matchers.isA;
19+
import static org.mockito.Mockito.inOrder;
20+
import static org.mockito.Mockito.mock;
21+
import static org.mockito.Mockito.times;
22+
23+
import org.junit.Test;
24+
import org.mockito.InOrder;
25+
26+
import rx.Observable;
27+
import rx.Observer;
28+
import rx.util.functions.Func2;
29+
30+
public class OperationSequenceEqualTests {
31+
32+
@Test
33+
public void test1() {
34+
Observable<Boolean> observable = Observable.sequenceEqual(
35+
Observable.from("one", "two", "three"),
36+
Observable.from("one", "two", "three"));
37+
verifyResult(observable, true);
38+
}
39+
40+
@Test
41+
public void test2() {
42+
Observable<Boolean> observable = Observable.sequenceEqual(
43+
Observable.from("one", "two", "three"),
44+
Observable.from("one", "two", "three", "four"));
45+
verifyResult(observable, false);
46+
}
47+
48+
@Test
49+
public void test3() {
50+
Observable<Boolean> observable = Observable.sequenceEqual(
51+
Observable.from("one", "two", "three", "four"),
52+
Observable.from("one", "two", "three"));
53+
verifyResult(observable, false);
54+
}
55+
56+
@Test
57+
public void testWithError1() {
58+
Observable<Boolean> observable = Observable.sequenceEqual(
59+
Observable.concat(Observable.from("one"),
60+
Observable.<String> error(new TestException())),
61+
Observable.from("one", "two", "three"));
62+
verifyError(observable);
63+
}
64+
65+
@Test
66+
public void testWithError2() {
67+
Observable<Boolean> observable = Observable.sequenceEqual(
68+
Observable.from("one", "two", "three"),
69+
Observable.concat(Observable.from("one"),
70+
Observable.<String> error(new TestException())));
71+
verifyError(observable);
72+
}
73+
74+
@Test
75+
public void testWithError3() {
76+
Observable<Boolean> observable = Observable.sequenceEqual(
77+
Observable.concat(Observable.from("one"),
78+
Observable.<String> error(new TestException())),
79+
Observable.concat(Observable.from("one"),
80+
Observable.<String> error(new TestException())));
81+
verifyError(observable);
82+
}
83+
84+
@Test
85+
public void testWithEmpty1() {
86+
Observable<Boolean> observable = Observable.sequenceEqual(
87+
Observable.<String> empty(),
88+
Observable.from("one", "two", "three"));
89+
verifyResult(observable, false);
90+
}
91+
92+
@Test
93+
public void testWithEmpty2() {
94+
Observable<Boolean> observable = Observable.sequenceEqual(
95+
Observable.from("one", "two", "three"),
96+
Observable.<String> empty());
97+
verifyResult(observable, false);
98+
}
99+
100+
@Test
101+
public void testWithEmpty3() {
102+
Observable<Boolean> observable = Observable.sequenceEqual(
103+
Observable.<String> empty(), Observable.<String> empty());
104+
verifyResult(observable, true);
105+
}
106+
107+
@Test
108+
public void testWithNull1() {
109+
Observable<Boolean> observable = Observable.sequenceEqual(
110+
Observable.from((String) null), Observable.from("one"));
111+
verifyResult(observable, false);
112+
}
113+
114+
@Test
115+
public void testWithNull2() {
116+
Observable<Boolean> observable = Observable.sequenceEqual(
117+
Observable.from((String) null), Observable.from((String) null));
118+
verifyResult(observable, true);
119+
}
120+
121+
@Test
122+
public void testWithEqualityError() {
123+
Observable<Boolean> observable = Observable.sequenceEqual(
124+
Observable.from("one"), Observable.from("one"),
125+
new Func2<String, String, Boolean>() {
126+
@Override
127+
public Boolean call(String t1, String t2) {
128+
throw new TestException();
129+
}
130+
});
131+
verifyError(observable);
132+
}
133+
134+
private void verifyResult(Observable<Boolean> observable, boolean result) {
135+
@SuppressWarnings("unchecked")
136+
Observer<Boolean> observer = mock(Observer.class);
137+
observable.subscribe(observer);
138+
139+
InOrder inOrder = inOrder(observer);
140+
inOrder.verify(observer, times(1)).onNext(result);
141+
inOrder.verify(observer).onCompleted();
142+
inOrder.verifyNoMoreInteractions();
143+
}
144+
145+
private void verifyError(Observable<Boolean> observable) {
146+
@SuppressWarnings("unchecked")
147+
Observer<Boolean> observer = mock(Observer.class);
148+
observable.subscribe(observer);
149+
150+
InOrder inOrder = inOrder(observer);
151+
inOrder.verify(observer, times(1)).onError(isA(TestException.class));
152+
inOrder.verifyNoMoreInteractions();
153+
}
154+
155+
private class TestException extends RuntimeException {
156+
private static final long serialVersionUID = 1L;
157+
}
158+
}

0 commit comments

Comments
 (0)