Skip to content

Commit 3c34884

Browse files
Merge branch 'distinct' of git://github.com/jmhofer/RxJava into distinct-merge
Conflicts: rxjava-core/src/main/java/rx/Observable.java
2 parents 7217232 + 243e624 commit 3c34884

File tree

2 files changed

+237
-0
lines changed

2 files changed

+237
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import rx.operators.OperationDefer;
3737
import rx.operators.OperationDematerialize;
3838
import rx.operators.OperationDistinctUntilChanged;
39+
import rx.operators.OperationDistinct;
3940
import rx.operators.OperationFilter;
4041
import rx.operators.OperationFinally;
4142
import rx.operators.OperationFirstOrDefault;
@@ -2947,6 +2948,30 @@ public <U> Observable<T> distinctUntilChanged(Func1<? super T, ? extends U> keyS
29472948
return create(OperationDistinctUntilChanged.distinctUntilChanged(this, keySelector));
29482949
}
29492950

2951+
/**
2952+
* Returns an Observable that forwards all distinct items emitted from the source Observable.
2953+
*
2954+
* @return an Observable of distinct items
2955+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh229764%28v=vs.103%29.aspx">MSDN: Observable.distinct</a>
2956+
*/
2957+
public Observable<T> distinct() {
2958+
return create(OperationDistinct.distinct(this));
2959+
}
2960+
2961+
/**
2962+
* Returns an Observable that forwards all items emitted from the source Observable that are distinct according to
2963+
* a key selector function.
2964+
*
2965+
* @param keySelector
2966+
* a function that projects an emitted item to a key value which is used for deciding whether an item is
2967+
* distinct from another one or not
2968+
* @return an Observable of distinct items
2969+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh244310%28v=vs.103%29.aspx">MSDN: Observable.distinct</a>
2970+
*/
2971+
public <U> Observable<T> distinct(Func1<? super T, ? extends U> keySelector) {
2972+
return create(OperationDistinct.distinct(this, keySelector));
2973+
}
2974+
29502975
/**
29512976
* Registers an {@link Action0} to be called when this Observable invokes {@link Observer#onCompleted onCompleted} or {@link Observer#onError onError}.
29522977
* <p>
Lines changed: 212 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,212 @@
1+
/**
2+
* Copyright 2013 Netflix, Inc.
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package rx.operators;
17+
18+
import static org.mockito.Matchers.*;
19+
import static org.mockito.Mockito.*;
20+
import static org.mockito.MockitoAnnotations.initMocks;
21+
import static rx.Observable.create;
22+
import static rx.Observable.empty;
23+
import static rx.Observable.from;
24+
25+
import java.util.HashSet;
26+
import java.util.Set;
27+
28+
import org.junit.Before;
29+
import org.junit.Test;
30+
import org.mockito.InOrder;
31+
import org.mockito.Mock;
32+
33+
import rx.Observable;
34+
import rx.Observable.OnSubscribeFunc;
35+
import rx.Observer;
36+
import rx.Subscription;
37+
import rx.subscriptions.Subscriptions;
38+
import rx.util.functions.Action0;
39+
import rx.util.functions.Func1;
40+
import rx.util.functions.Functions;
41+
42+
/**
43+
* Returns an Observable that emits all distinct items emitted by the source.
44+
*
45+
* Be careful with this operation when using infinite or very large observables
46+
* as it has to store all distinct values it has received.
47+
*/
48+
public final class OperationDistinct {
49+
50+
/**
51+
* Returns an Observable that emits all distinct items emitted by the source
52+
* @param source
53+
* The source Observable to emit the distinct items for.
54+
* @return A subscription function for creating the target Observable.
55+
*/
56+
public static <T, U> OnSubscribeFunc<T> distinct(Observable<? extends T> source, Func1<? super T, ? extends U> keySelector) {
57+
return new Distinct<T, U>(source, keySelector);
58+
}
59+
60+
/**
61+
* Returns an Observable that emits all distinct items emitted by the source
62+
* @param source
63+
* The source Observable to emit the distinct items for.
64+
* @return A subscription function for creating the target Observable.
65+
*/
66+
public static <T> OnSubscribeFunc<T> distinct(Observable<? extends T> source) {
67+
return new Distinct<T, T>(source, Functions.<T>identity());
68+
}
69+
70+
private static class Distinct<T, U> implements OnSubscribeFunc<T> {
71+
private final Observable<? extends T> source;
72+
private final Func1<? super T, ? extends U> keySelector;
73+
74+
private Distinct(Observable<? extends T> source, Func1<? super T, ? extends U> keySelector) {
75+
this.source = source;
76+
this.keySelector = keySelector;
77+
}
78+
79+
@Override
80+
public Subscription onSubscribe(final Observer<? super T> observer) {
81+
final Subscription sourceSub = source.subscribe(new Observer<T>() {
82+
private final Set<U> emittedKeys = new HashSet<U>();
83+
84+
@Override
85+
public void onCompleted() {
86+
observer.onCompleted();
87+
}
88+
89+
@Override
90+
public void onError(Throwable e) {
91+
observer.onError(e);
92+
}
93+
94+
@Override
95+
public void onNext(T next) {
96+
try {
97+
U nextKey = keySelector.call(next);
98+
if (!emittedKeys.contains(nextKey)) {
99+
emittedKeys.add(nextKey);
100+
observer.onNext(next);
101+
}
102+
} catch (Throwable t) {
103+
// keySelector is a user function, may throw something
104+
observer.onError(t);
105+
}
106+
}
107+
});
108+
109+
return Subscriptions.create(new Action0() {
110+
@Override
111+
public void call() {
112+
sourceSub.unsubscribe();
113+
}
114+
});
115+
}
116+
}
117+
118+
public static class UnitTest {
119+
@Mock
120+
Observer<? super String> w;
121+
122+
// nulls lead to exceptions
123+
final Func1<String, String> TO_UPPER_WITH_EXCEPTION = new Func1<String, String>() {
124+
@Override
125+
public String call(String s) {
126+
return s.toUpperCase();
127+
}
128+
};
129+
130+
@Before
131+
public void before() {
132+
initMocks(this);
133+
}
134+
135+
@Test
136+
public void testDistinctOfNone() {
137+
Observable<String> src = empty();
138+
create(distinct(src)).subscribe(w);
139+
140+
verify(w, never()).onNext(anyString());
141+
verify(w, never()).onError(any(Throwable.class));
142+
verify(w, times(1)).onCompleted();
143+
}
144+
145+
@Test
146+
public void testDistinctOfNoneWithKeySelector() {
147+
Observable<String> src = empty();
148+
create(distinct(src, TO_UPPER_WITH_EXCEPTION)).subscribe(w);
149+
150+
verify(w, never()).onNext(anyString());
151+
verify(w, never()).onError(any(Throwable.class));
152+
verify(w, times(1)).onCompleted();
153+
}
154+
155+
@Test
156+
public void testDistinctOfNormalSource() {
157+
Observable<String> src = from("a", "b", "c", "c", "c", "b", "b", "a", "e");
158+
create(distinct(src)).subscribe(w);
159+
160+
InOrder inOrder = inOrder(w);
161+
inOrder.verify(w, times(1)).onNext("a");
162+
inOrder.verify(w, times(1)).onNext("b");
163+
inOrder.verify(w, times(1)).onNext("c");
164+
inOrder.verify(w, times(1)).onNext("e");
165+
inOrder.verify(w, times(1)).onCompleted();
166+
inOrder.verify(w, never()).onNext(anyString());
167+
verify(w, never()).onError(any(Throwable.class));
168+
}
169+
170+
@Test
171+
public void testDistinctOfNormalSourceWithKeySelector() {
172+
Observable<String> src = from("a", "B", "c", "C", "c", "B", "b", "a", "E");
173+
create(distinct(src, TO_UPPER_WITH_EXCEPTION)).subscribe(w);
174+
175+
InOrder inOrder = inOrder(w);
176+
inOrder.verify(w, times(1)).onNext("a");
177+
inOrder.verify(w, times(1)).onNext("B");
178+
inOrder.verify(w, times(1)).onNext("c");
179+
inOrder.verify(w, times(1)).onNext("E");
180+
inOrder.verify(w, times(1)).onCompleted();
181+
inOrder.verify(w, never()).onNext(anyString());
182+
verify(w, never()).onError(any(Throwable.class));
183+
}
184+
185+
@Test
186+
public void testDistinctOfSourceWithNulls() {
187+
Observable<String> src = from(null, "a", "a", null, null, "b", null);
188+
create(distinct(src)).subscribe(w);
189+
190+
InOrder inOrder = inOrder(w);
191+
inOrder.verify(w, times(1)).onNext(null);
192+
inOrder.verify(w, times(1)).onNext("a");
193+
inOrder.verify(w, times(1)).onNext("b");
194+
inOrder.verify(w, times(1)).onCompleted();
195+
inOrder.verify(w, never()).onNext(anyString());
196+
verify(w, never()).onError(any(Throwable.class));
197+
}
198+
199+
@Test
200+
public void testDistinctOfSourceWithExceptionsFromKeySelector() {
201+
Observable<String> src = from("a", "b", null, "c");
202+
create(distinct(src, TO_UPPER_WITH_EXCEPTION)).subscribe(w);
203+
204+
InOrder inOrder = inOrder(w);
205+
inOrder.verify(w, times(1)).onNext("a");
206+
inOrder.verify(w, times(1)).onNext("b");
207+
inOrder.verify(w, times(1)).onError(any(NullPointerException.class));
208+
inOrder.verify(w, never()).onNext(anyString());
209+
inOrder.verify(w, never()).onCompleted();
210+
}
211+
}
212+
}

0 commit comments

Comments
 (0)