Skip to content

Commit 7217232

Browse files
Merge pull request #374 from jmhofer/distinctUntilChanged
Implemented distinctUntilChanged operation
2 parents f84565f + faf37ab commit 7217232

File tree

2 files changed

+250
-0
lines changed

2 files changed

+250
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@
3535
import rx.operators.OperationConcat;
3636
import rx.operators.OperationDefer;
3737
import rx.operators.OperationDematerialize;
38+
import rx.operators.OperationDistinctUntilChanged;
3839
import rx.operators.OperationFilter;
3940
import rx.operators.OperationFinally;
4041
import rx.operators.OperationFirstOrDefault;
@@ -2922,6 +2923,30 @@ public Observable<T> filter(Func1<? super T, Boolean> predicate) {
29222923
return create(OperationFilter.filter(this, predicate));
29232924
}
29242925

2926+
/**
2927+
* Returns an Observable that forwards all sequentially distinct items emitted from the source Observable.
2928+
*
2929+
* @return an Observable of sequentially distinct items
2930+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229494%28v=vs.103%29.aspx">MSDN: Observable.distinctUntilChanged</a>
2931+
*/
2932+
public Observable<T> distinctUntilChanged() {
2933+
return create(OperationDistinctUntilChanged.distinctUntilChanged(this));
2934+
}
2935+
2936+
/**
2937+
* Returns an Observable that forwards all items emitted from the source Observable that are sequentially distinct according to
2938+
* a key selector function.
2939+
*
2940+
* @param keySelector
2941+
* a function that projects an emitted item to a key value which is used for deciding whether an item is sequentially
2942+
* distinct from another one or not
2943+
* @return an Observable of sequentially distinct items
2944+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229508%28v=vs.103%29.aspx">MSDN: Observable.distinctUntilChanged</a>
2945+
*/
2946+
public <U> Observable<T> distinctUntilChanged(Func1<? super T, ? extends U> keySelector) {
2947+
return create(OperationDistinctUntilChanged.distinctUntilChanged(this, keySelector));
2948+
}
2949+
29252950
/**
29262951
* Registers an {@link Action0} to be called when this Observable invokes {@link Observer#onCompleted onCompleted} or {@link Observer#onError onError}.
29272952
* <p>
Lines changed: 225 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,225 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.mockito.Matchers.*;
19+
import static org.mockito.Mockito.*;
20+
import static org.mockito.MockitoAnnotations.initMocks;
21+
import static rx.Observable.create;
22+
import static rx.Observable.empty;
23+
import static rx.Observable.from;
24+
25+
import org.junit.Before;
26+
import org.junit.Test;
27+
import org.mockito.InOrder;
28+
import org.mockito.Mock;
29+
30+
import rx.Observable;
31+
import rx.Observable.OnSubscribeFunc;
32+
import rx.Observer;
33+
import rx.Subscription;
34+
import rx.subscriptions.Subscriptions;
35+
import rx.util.functions.Action0;
36+
import rx.util.functions.Func1;
37+
import rx.util.functions.Functions;
38+
39+
/**
40+
* Returns an Observable that emits all sequentially distinct items emitted by the source.
41+
*/
42+
public final class OperationDistinctUntilChanged {
43+
44+
/**
45+
* Returns an Observable that emits all sequentially distinct items emitted by the source.
46+
* @param source
47+
* The source Observable to emit the sequentially distinct items for.
48+
* @return A subscription function for creating the target Observable.
49+
*/
50+
public static <T, U> OnSubscribeFunc<T> distinctUntilChanged(Observable<? extends T> source, Func1<? super T, ? extends U> keySelector) {
51+
return new DistinctUntilChanged<T, U>(source, keySelector);
52+
}
53+
54+
/**
55+
* Returns an Observable that emits all sequentially distinct items emitted by the source.
56+
* @param source
57+
* The source Observable to emit the sequentially distinct items for.
58+
* @return A subscription function for creating the target Observable.
59+
*/
60+
public static <T> OnSubscribeFunc<T> distinctUntilChanged(Observable<? extends T> source) {
61+
return new DistinctUntilChanged<T, T>(source, Functions.<T>identity());
62+
}
63+
64+
private static class DistinctUntilChanged<T, U> implements OnSubscribeFunc<T> {
65+
private final Observable<? extends T> source;
66+
private final Func1<? super T, ? extends U> keySelector;
67+
68+
private DistinctUntilChanged(Observable<? extends T> source, Func1<? super T, ? extends U> keySelector) {
69+
this.source = source;
70+
this.keySelector = keySelector;
71+
}
72+
73+
@Override
74+
public Subscription onSubscribe(final Observer<? super T> observer) {
75+
final Subscription sourceSub = source.subscribe(new Observer<T>() {
76+
private U lastEmittedKey;
77+
private boolean hasEmitted;
78+
79+
@Override
80+
public void onCompleted() {
81+
observer.onCompleted();
82+
}
83+
84+
@Override
85+
public void onError(Throwable e) {
86+
observer.onError(e);
87+
}
88+
89+
@Override
90+
public void onNext(T next) {
91+
U lastKey = lastEmittedKey;
92+
try {
93+
U nextKey = keySelector.call(next);
94+
lastEmittedKey = nextKey;
95+
if (!hasEmitted) {
96+
hasEmitted = true;
97+
observer.onNext(next);
98+
} else {
99+
if (lastKey == null) {
100+
if (nextKey != null) {
101+
observer.onNext(next);
102+
}
103+
} else {
104+
if (!lastKey.equals(nextKey)) {
105+
observer.onNext(next);
106+
}
107+
}
108+
}
109+
} catch (Throwable t) {
110+
// keySelector is a user function, may throw something
111+
observer.onError(t);
112+
}
113+
}
114+
});
115+
116+
return Subscriptions.create(new Action0() {
117+
@Override
118+
public void call() {
119+
sourceSub.unsubscribe();
120+
}
121+
});
122+
}
123+
}
124+
125+
public static class UnitTest {
126+
@Mock
127+
Observer<? super String> w;
128+
129+
// nulls lead to exceptions
130+
final Func1<String, String> TO_UPPER_WITH_EXCEPTION = new Func1<String, String>() {
131+
@Override
132+
public String call(String s) {
133+
return s.toUpperCase();
134+
}
135+
};
136+
137+
@Before
138+
public void before() {
139+
initMocks(this);
140+
}
141+
142+
@Test
143+
public void testDistinctUntilChangedOfNone() {
144+
Observable<String> src = empty();
145+
create(distinctUntilChanged(src)).subscribe(w);
146+
147+
verify(w, never()).onNext(anyString());
148+
verify(w, never()).onError(any(Throwable.class));
149+
verify(w, times(1)).onCompleted();
150+
}
151+
152+
@Test
153+
public void testDistinctUntilChangedOfNoneWithKeySelector() {
154+
Observable<String> src = empty();
155+
create(distinctUntilChanged(src, TO_UPPER_WITH_EXCEPTION)).subscribe(w);
156+
157+
verify(w, never()).onNext(anyString());
158+
verify(w, never()).onError(any(Throwable.class));
159+
verify(w, times(1)).onCompleted();
160+
}
161+
162+
@Test
163+
public void testDistinctUntilChangedOfNormalSource() {
164+
Observable<String> src = from("a", "b", "c", "c", "c", "b", "b", "a", "e");
165+
create(distinctUntilChanged(src)).subscribe(w);
166+
167+
InOrder inOrder = inOrder(w);
168+
inOrder.verify(w, times(1)).onNext("a");
169+
inOrder.verify(w, times(1)).onNext("b");
170+
inOrder.verify(w, times(1)).onNext("c");
171+
inOrder.verify(w, times(1)).onNext("b");
172+
inOrder.verify(w, times(1)).onNext("a");
173+
inOrder.verify(w, times(1)).onNext("e");
174+
inOrder.verify(w, times(1)).onCompleted();
175+
inOrder.verify(w, never()).onNext(anyString());
176+
verify(w, never()).onError(any(Throwable.class));
177+
}
178+
179+
@Test
180+
public void testDistinctUntilChangedOfNormalSourceWithKeySelector() {
181+
Observable<String> src = from("a", "b", "c", "C", "c", "B", "b", "a", "e");
182+
create(distinctUntilChanged(src, TO_UPPER_WITH_EXCEPTION)).subscribe(w);
183+
184+
InOrder inOrder = inOrder(w);
185+
inOrder.verify(w, times(1)).onNext("a");
186+
inOrder.verify(w, times(1)).onNext("b");
187+
inOrder.verify(w, times(1)).onNext("c");
188+
inOrder.verify(w, times(1)).onNext("B");
189+
inOrder.verify(w, times(1)).onNext("a");
190+
inOrder.verify(w, times(1)).onNext("e");
191+
inOrder.verify(w, times(1)).onCompleted();
192+
inOrder.verify(w, never()).onNext(anyString());
193+
verify(w, never()).onError(any(Throwable.class));
194+
}
195+
196+
@Test
197+
public void testDistinctUntilChangedOfSourceWithNulls() {
198+
Observable<String> src = from(null, "a", "a", null, null, "b", null, null);
199+
create(distinctUntilChanged(src)).subscribe(w);
200+
201+
InOrder inOrder = inOrder(w);
202+
inOrder.verify(w, times(1)).onNext(null);
203+
inOrder.verify(w, times(1)).onNext("a");
204+
inOrder.verify(w, times(1)).onNext(null);
205+
inOrder.verify(w, times(1)).onNext("b");
206+
inOrder.verify(w, times(1)).onNext(null);
207+
inOrder.verify(w, times(1)).onCompleted();
208+
inOrder.verify(w, never()).onNext(anyString());
209+
verify(w, never()).onError(any(Throwable.class));
210+
}
211+
212+
@Test
213+
public void testDistinctUntilChangedOfSourceWithExceptionsFromKeySelector() {
214+
Observable<String> src = from("a", "b", null, "c");
215+
create(distinctUntilChanged(src, TO_UPPER_WITH_EXCEPTION)).subscribe(w);
216+
217+
InOrder inOrder = inOrder(w);
218+
inOrder.verify(w, times(1)).onNext("a");
219+
inOrder.verify(w, times(1)).onNext("b");
220+
verify(w, times(1)).onError(any(NullPointerException.class));
221+
inOrder.verify(w, never()).onNext(anyString());
222+
inOrder.verify(w, never()).onCompleted();
223+
}
224+
}
225+
}

0 commit comments

Comments
 (0)