Skip to content

Commit 00d7c3b

Browse files
Merge pull request #381 from jmhofer/mapWithIndex
Implemented `mapWithIndex`
2 parents 141bb6b + 8f10729 commit 00d7c3b

File tree

2 files changed

+144
-40
lines changed

2 files changed

+144
-40
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 18 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3090,11 +3090,29 @@ public Observable<T> where(Func1<? super T, Boolean> predicate) {
30903090
* a function to apply to each item emitted by the Observable
30913091
* @return an Observable that emits the items from the source Observable, transformed by the
30923092
* given function
3093+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh244306%28v=vs.103%29.aspx">MSDN: Observable.Select</a>
30933094
*/
30943095
public <R> Observable<R> map(Func1<? super T, ? extends R> func) {
30953096
return create(OperationMap.map(this, func));
30963097
}
30973098

3099+
/**
3100+
* Returns an Observable that applies the given function to each item emitted by an
3101+
* Observable and emits the result.
3102+
* <p>
3103+
* <img width="640" src="https://raw.github.com/wiki/Netflix/RxJava/images/rx-operators/map.png">
3104+
*
3105+
* @param func
3106+
* a function to apply to each item emitted by the Observable. The function takes the
3107+
* index of the emitted item as additional parameter.
3108+
* @return an Observable that emits the items from the source Observable, transformed by the
3109+
* given function
3110+
* @see <a href="http://msdn.microsoft.com/en-us/library/hh244311%28v=vs.103%29.aspx">MSDN: Observable.Select</a>
3111+
*/
3112+
public <R> Observable<R> mapWithIndex(Func2<? super T, Integer, ? extends R> func) {
3113+
return create(OperationMap.mapWithIndex(this, func));
3114+
}
3115+
30983116
/**
30993117
* Creates a new Observable by applying a function that you supply to each item emitted by
31003118
* the source Observable, where that function returns an Observable, and then merging those

rxjava-core/src/main/java/rx/operators/OperationMap.java

Lines changed: 126 additions & 40 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import org.junit.Before;
2727
import org.junit.Test;
28+
import org.mockito.InOrder;
2829
import org.mockito.Mock;
2930
import org.mockito.MockitoAnnotations;
3031

@@ -33,6 +34,7 @@
3334
import rx.Observer;
3435
import rx.Subscription;
3536
import rx.util.functions.Func1;
37+
import rx.util.functions.Func2;
3638

3739
/**
3840
* Applies a function of your choosing to every item emitted by an Observable, and returns this
@@ -56,8 +58,42 @@ public final class OperationMap {
5658
* the type of the output sequence.
5759
* @return a sequence that is the result of applying the transformation function to each item in the input sequence.
5860
*/
59-
public static <T, R> OnSubscribeFunc<R> map(Observable<? extends T> sequence, Func1<? super T, ? extends R> func) {
60-
return new MapObservable<T, R>(sequence, func);
61+
public static <T, R> OnSubscribeFunc<R> map(final Observable<? extends T> sequence, final Func1<? super T, ? extends R> func) {
62+
return new OnSubscribeFunc<R>() {
63+
@Override
64+
public Subscription onSubscribe(Observer<? super R> observer) {
65+
return new MapObservable<T, R>(sequence, new Func2<T, Integer, R>() {
66+
@Override
67+
public R call(T value, @SuppressWarnings("unused") Integer unused) {
68+
return func.call(value);
69+
}
70+
}).onSubscribe(observer);
71+
}
72+
};
73+
}
74+
75+
/**
76+
* Accepts a sequence and a transformation function. Returns a sequence that is the result of
77+
* applying the transformation function to each item in the sequence.
78+
*
79+
* @param sequence
80+
* the input sequence.
81+
* @param func
82+
* a function to apply to each item in the sequence. The function gets the index of the emitted item
83+
* as additional parameter.
84+
* @param <T>
85+
* the type of the input sequence.
86+
* @param <R>
87+
* the type of the output sequence.
88+
* @return a sequence that is the result of applying the transformation function to each item in the input sequence.
89+
*/
90+
public static <T, R> OnSubscribeFunc<R> mapWithIndex(final Observable<? extends T> sequence, final Func2<? super T, Integer, ? extends R> func) {
91+
return new OnSubscribeFunc<R>() {
92+
@Override
93+
public Subscription onSubscribe(Observer<? super R> observer) {
94+
return new MapObservable<T, R>(sequence, func).onSubscribe(observer);
95+
}
96+
};
6197
}
6298

6399
/**
@@ -89,56 +125,50 @@ public static <T, R> OnSubscribeFunc<R> mapMany(Observable<? extends T> sequence
89125
* the type of the output sequence.
90126
*/
91127
private static class MapObservable<T, R> implements OnSubscribeFunc<R> {
92-
public MapObservable(Observable<? extends T> sequence, Func1<? super T, ? extends R> func) {
128+
public MapObservable(Observable<? extends T> sequence, Func2<? super T, Integer, ? extends R> func) {
93129
this.sequence = sequence;
94130
this.func = func;
95131
}
96132

97-
private Observable<? extends T> sequence;
98-
99-
private Func1<? super T, ? extends R> func;
100-
101-
public Subscription onSubscribe(Observer<? super R> observer) {
102-
return sequence.subscribe(new MapObserver<T, R>(observer, func));
103-
}
104-
}
105-
106-
/**
107-
* An observer that applies a transformation function to each item and forwards the result to an inner observer.
108-
*
109-
* @param <T>
110-
* the type of the observer items.
111-
* @param <R>
112-
* the type of the inner observer items.
113-
*/
114-
private static class MapObserver<T, R> implements Observer<T> {
115-
public MapObserver(Observer<? super R> observer, Func1<? super T, ? extends R> func) {
116-
this.observer = observer;
117-
this.func = func;
118-
}
119-
120-
Observer<? super R> observer;
121-
122-
Func1<? super T, ? extends R> func;
133+
private final Observable<? extends T> sequence;
134+
private final Func2<? super T, Integer, ? extends R> func;
135+
private int index;
123136

124-
public void onNext(T value) {
125-
// let the exception be thrown if func fails as a SafeObserver wrapping this will handle it
126-
observer.onNext(func.call(value));
127-
}
137+
@Override
138+
public Subscription onSubscribe(final Observer<? super R> observer) {
139+
return sequence.subscribe(new Observer<T>() {
140+
@Override
141+
public void onNext(T value) {
142+
observer.onNext(func.call(value, index));
143+
index++;
144+
}
128145

129-
public void onError(Throwable ex) {
130-
observer.onError(ex);
131-
}
146+
@Override
147+
public void onError(Throwable ex) {
148+
observer.onError(ex);
149+
}
132150

133-
public void onCompleted() {
134-
observer.onCompleted();
151+
@Override
152+
public void onCompleted() {
153+
observer.onCompleted();
154+
}
155+
});
135156
}
136157
}
137158

138159
public static class UnitTest {
139160
@Mock
140161
Observer<String> stringObserver;
141-
162+
@Mock
163+
Observer<String> stringObserver2;
164+
165+
final static Func2<String, Integer, String> APPEND_INDEX = new Func2<String, Integer, String>() {
166+
@Override
167+
public String call(String value, Integer index) {
168+
return value + index;
169+
}
170+
};
171+
142172
@Before
143173
public void before() {
144174
MockitoAnnotations.initMocks(this);
@@ -164,9 +194,42 @@ public String call(Map<String, String> map) {
164194
verify(stringObserver, times(1)).onNext("OneFirst");
165195
verify(stringObserver, times(1)).onNext("TwoFirst");
166196
verify(stringObserver, times(1)).onCompleted();
197+
}
167198

199+
@Test
200+
public void testMapWithIndex() {
201+
Observable<String> w = Observable.from("a", "b", "c");
202+
Observable<String> m = Observable.create(mapWithIndex(w, APPEND_INDEX));
203+
m.subscribe(stringObserver);
204+
InOrder inOrder = inOrder(stringObserver);
205+
inOrder.verify(stringObserver, times(1)).onNext("a0");
206+
inOrder.verify(stringObserver, times(1)).onNext("b1");
207+
inOrder.verify(stringObserver, times(1)).onNext("c2");
208+
inOrder.verify(stringObserver, times(1)).onCompleted();
209+
verify(stringObserver, never()).onError(any(Throwable.class));
168210
}
211+
212+
@Test
213+
public void testMapWithIndexAndMultipleSubscribers() {
214+
Observable<String> w = Observable.from("a", "b", "c");
215+
Observable<String> m = Observable.create(mapWithIndex(w, APPEND_INDEX));
216+
m.subscribe(stringObserver);
217+
m.subscribe(stringObserver2);
218+
InOrder inOrder = inOrder(stringObserver);
219+
inOrder.verify(stringObserver, times(1)).onNext("a0");
220+
inOrder.verify(stringObserver, times(1)).onNext("b1");
221+
inOrder.verify(stringObserver, times(1)).onNext("c2");
222+
inOrder.verify(stringObserver, times(1)).onCompleted();
223+
verify(stringObserver, never()).onError(any(Throwable.class));
169224

225+
InOrder inOrder2 = inOrder(stringObserver2);
226+
inOrder2.verify(stringObserver2, times(1)).onNext("a0");
227+
inOrder2.verify(stringObserver2, times(1)).onNext("b1");
228+
inOrder2.verify(stringObserver2, times(1)).onNext("c2");
229+
inOrder2.verify(stringObserver2, times(1)).onCompleted();
230+
verify(stringObserver2, never()).onError(any(Throwable.class));
231+
}
232+
170233
@Test
171234
public void testMapMany() {
172235
/* simulate a top-level async call which returns IDs */
@@ -246,12 +309,34 @@ public String call(Map<String, String> map) {
246309

247310
}
248311

312+
@Test
313+
public void testMapWithError() {
314+
Observable<String> w = Observable.from("one", "fail", "two", "three", "fail");
315+
Observable<String> m = Observable.create(map(w, new Func1<String, String>() {
316+
@Override
317+
public String call(String s) {
318+
if ("fail".equals(s)) {
319+
throw new RuntimeException("Forced Failure");
320+
}
321+
return s;
322+
}
323+
}));
324+
325+
m.subscribe(stringObserver);
326+
verify(stringObserver, times(1)).onNext("one");
327+
verify(stringObserver, never()).onNext("two");
328+
verify(stringObserver, never()).onNext("three");
329+
verify(stringObserver, never()).onCompleted();
330+
verify(stringObserver, times(1)).onError(any(Throwable.class));
331+
}
332+
249333
@Test
250334
public void testMapWithSynchronousObservableContainingError() {
251335
Observable<String> w = Observable.from("one", "fail", "two", "three", "fail");
252336
final AtomicInteger c1 = new AtomicInteger();
253337
final AtomicInteger c2 = new AtomicInteger();
254338
Observable<String> m = Observable.create(map(w, new Func1<String, String>() {
339+
@Override
255340
public String call(String s) {
256341
if ("fail".equals(s))
257342
throw new RuntimeException("Forced Failure");
@@ -260,6 +345,7 @@ public String call(String s) {
260345
return s;
261346
}
262347
})).map(new Func1<String, String>() {
348+
@Override
263349
public String call(String s) {
264350
System.out.println("SecondMapper:" + s);
265351
c2.incrementAndGet();
@@ -280,7 +366,7 @@ public String call(String s) {
280366
assertEquals(1, c2.get());
281367
}
282368

283-
private Map<String, String> getMap(String prefix) {
369+
private static Map<String, String> getMap(String prefix) {
284370
Map<String, String> m = new HashMap<String, String>();
285371
m.put("firstName", prefix + "First");
286372
m.put("lastName", prefix + "Last");

0 commit comments

Comments
 (0)