Skip to content

Commit 9d4f9c3

Browse files
author
Aaron Tull
committed
Merge pull request #3556 from akarnokd/ToMapErrorPropagation1x
1.x: fix toMap and toMultimap not handling exceptions of the callbacks
2 parents 8316519 + 4949ee3 commit 9d4f9c3

File tree

4 files changed

+233
-19
lines changed

4 files changed

+233
-19
lines changed

src/main/java/rx/internal/operators/OperatorToMap.java

Lines changed: 30 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -21,8 +21,10 @@
2121

2222
import rx.Observable.Operator;
2323
import rx.Subscriber;
24+
import rx.exceptions.Exceptions;
2425
import rx.functions.Func0;
2526
import rx.functions.Func1;
27+
import rx.observers.Subscribers;
2628

2729
/**
2830
* Maps the elements of the source observable into a java.util.Map instance and
@@ -75,9 +77,24 @@ public OperatorToMap(
7577

7678
@Override
7779
public Subscriber<? super T> call(final Subscriber<? super Map<K, V>> subscriber) {
80+
81+
Map<K, V> localMap;
82+
83+
try {
84+
localMap = mapFactory.call();
85+
} catch (Throwable ex) {
86+
Exceptions.throwIfFatal(ex);
87+
subscriber.onError(ex);
88+
Subscriber<? super T> parent = Subscribers.empty();
89+
parent.unsubscribe();
90+
return parent;
91+
}
92+
93+
final Map<K, V> fLocalMap = localMap;
94+
7895
return new Subscriber<T>(subscriber) {
7996

80-
private Map<K, V> map = mapFactory.call();
97+
private Map<K, V> map = fLocalMap;
8198

8299
@Override
83100
public void onStart() {
@@ -86,8 +103,18 @@ public void onStart() {
86103

87104
@Override
88105
public void onNext(T v) {
89-
K key = keySelector.call(v);
90-
V value = valueSelector.call(v);
106+
K key;
107+
V value;
108+
109+
try {
110+
key = keySelector.call(v);
111+
value = valueSelector.call(v);
112+
} catch (Throwable ex) {
113+
Exceptions.throwIfFatal(ex);
114+
subscriber.onError(ex);
115+
return;
116+
}
117+
91118
map.put(key, value);
92119
}
93120

src/main/java/rx/internal/operators/OperatorToMultimap.java

Lines changed: 38 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,11 @@
2222
import java.util.Map;
2323

2424
import rx.Observable.Operator;
25+
import rx.exceptions.Exceptions;
2526
import rx.Subscriber;
2627
import rx.functions.Func0;
2728
import rx.functions.Func1;
29+
import rx.observers.Subscribers;
2830

2931
/**
3032
* Maps the elements of the source observable into a multimap
@@ -103,8 +105,24 @@ public OperatorToMultimap(
103105

104106
@Override
105107
public Subscriber<? super T> call(final Subscriber<? super Map<K, Collection<V>>> subscriber) {
108+
109+
Map<K, Collection<V>> localMap;
110+
111+
try {
112+
localMap = mapFactory.call();
113+
} catch (Throwable ex) {
114+
Exceptions.throwIfFatal(ex);
115+
subscriber.onError(ex);
116+
117+
Subscriber<? super T> parent = Subscribers.empty();
118+
parent.unsubscribe();
119+
return parent;
120+
}
121+
122+
final Map<K, Collection<V>> fLocalMap = localMap;
123+
106124
return new Subscriber<T>(subscriber) {
107-
private Map<K, Collection<V>> map = mapFactory.call();
125+
private Map<K, Collection<V>> map = fLocalMap;
108126

109127
@Override
110128
public void onStart() {
@@ -113,11 +131,27 @@ public void onStart() {
113131

114132
@Override
115133
public void onNext(T v) {
116-
K key = keySelector.call(v);
117-
V value = valueSelector.call(v);
134+
K key;
135+
V value;
136+
137+
try {
138+
key = keySelector.call(v);
139+
value = valueSelector.call(v);
140+
} catch (Throwable ex) {
141+
Exceptions.throwIfFatal(ex);
142+
subscriber.onError(ex);
143+
return;
144+
}
145+
118146
Collection<V> collection = map.get(key);
119147
if (collection == null) {
120-
collection = collectionFactory.call(key);
148+
try {
149+
collection = collectionFactory.call(key);
150+
} catch (Throwable ex) {
151+
Exceptions.throwIfFatal(ex);
152+
subscriber.onError(ex);
153+
return;
154+
}
121155
map.put(key, collection);
122156
}
123157
collection.add(value);

src/test/java/rx/internal/operators/OperatorToMapTest.java

Lines changed: 69 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -16,24 +16,19 @@
1616
package rx.internal.operators;
1717

1818
import static org.mockito.Matchers.any;
19-
import static org.mockito.Mockito.never;
20-
import static org.mockito.Mockito.times;
21-
import static org.mockito.Mockito.verify;
19+
import static org.mockito.Mockito.*;
2220

23-
import java.util.HashMap;
24-
import java.util.LinkedHashMap;
25-
import java.util.Map;
21+
import java.util.*;
2622

27-
import org.junit.Before;
28-
import org.junit.Test;
29-
import org.mockito.Mock;
30-
import org.mockito.MockitoAnnotations;
23+
import org.junit.*;
24+
import org.mockito.*;
3125

3226
import rx.Observable;
3327
import rx.Observer;
34-
import rx.functions.Func0;
35-
import rx.functions.Func1;
28+
import rx.exceptions.TestException;
29+
import rx.functions.*;
3630
import rx.internal.util.UtilityFunctions;
31+
import rx.observers.TestSubscriber;
3732

3833
public class OperatorToMapTest {
3934
@Mock
@@ -224,4 +219,66 @@ public Integer call(String t1) {
224219
verify(objectObserver, times(1)).onError(any(Throwable.class));
225220
}
226221

222+
@Test
223+
public void testKeySelectorThrows() {
224+
TestSubscriber<Object> ts = TestSubscriber.create();
225+
226+
Observable.just(1, 2).toMap(new Func1<Integer, Integer>() {
227+
@Override
228+
public Integer call(Integer v) {
229+
throw new TestException();
230+
}
231+
}).subscribe(ts);
232+
233+
ts.assertError(TestException.class);
234+
ts.assertNoValues();
235+
ts.assertNotCompleted();
236+
}
237+
238+
@Test
239+
public void testValueSelectorThrows() {
240+
TestSubscriber<Object> ts = TestSubscriber.create();
241+
242+
Observable.just(1, 2).toMap(new Func1<Integer, Integer>() {
243+
@Override
244+
public Integer call(Integer v) {
245+
return v;
246+
}
247+
}, new Func1<Integer, Integer>() {
248+
@Override
249+
public Integer call(Integer v) {
250+
throw new TestException();
251+
}
252+
}).subscribe(ts);
253+
254+
ts.assertError(TestException.class);
255+
ts.assertNoValues();
256+
ts.assertNotCompleted();
257+
}
258+
259+
@Test
260+
public void testMapFactoryThrows() {
261+
TestSubscriber<Object> ts = TestSubscriber.create();
262+
263+
Observable.just(1, 2).toMap(new Func1<Integer, Integer>() {
264+
@Override
265+
public Integer call(Integer v) {
266+
return v;
267+
}
268+
}, new Func1<Integer, Integer>() {
269+
@Override
270+
public Integer call(Integer v) {
271+
return v;
272+
}
273+
}, new Func0<Map<Integer, Integer>>() {
274+
@Override
275+
public Map<Integer, Integer> call() {
276+
throw new TestException();
277+
}
278+
}).subscribe(ts);
279+
280+
ts.assertError(TestException.class);
281+
ts.assertNoValues();
282+
ts.assertNotCompleted();
283+
}
227284
}

src/test/java/rx/internal/operators/OperatorToMultimapTest.java

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,11 +36,13 @@
3636

3737
import rx.Observable;
3838
import rx.Observer;
39+
import rx.exceptions.TestException;
3940
import rx.functions.Func0;
4041
import rx.functions.Func1;
4142
import rx.internal.operators.OperatorToMultimap.DefaultMultimapCollectionFactory;
4243
import rx.internal.operators.OperatorToMultimap.DefaultToMultimapFactory;
4344
import rx.internal.util.UtilityFunctions;
45+
import rx.observers.TestSubscriber;
4446

4547
public class OperatorToMultimapTest {
4648
@Mock
@@ -269,4 +271,98 @@ public Collection<String> call(Integer t1) {
269271
verify(objectObserver, never()).onNext(expected);
270272
verify(objectObserver, never()).onCompleted();
271273
}
274+
275+
@Test
276+
public void testKeySelectorThrows() {
277+
TestSubscriber<Object> ts = TestSubscriber.create();
278+
279+
Observable.just(1, 2).toMultimap(new Func1<Integer, Integer>() {
280+
@Override
281+
public Integer call(Integer v) {
282+
throw new TestException();
283+
}
284+
}).subscribe(ts);
285+
286+
ts.assertError(TestException.class);
287+
ts.assertNoValues();
288+
ts.assertNotCompleted();
289+
}
290+
291+
@Test
292+
public void testValueSelectorThrows() {
293+
TestSubscriber<Object> ts = TestSubscriber.create();
294+
295+
Observable.just(1, 2).toMultimap(new Func1<Integer, Integer>() {
296+
@Override
297+
public Integer call(Integer v) {
298+
return v;
299+
}
300+
}, new Func1<Integer, Integer>() {
301+
@Override
302+
public Integer call(Integer v) {
303+
throw new TestException();
304+
}
305+
}).subscribe(ts);
306+
307+
ts.assertError(TestException.class);
308+
ts.assertNoValues();
309+
ts.assertNotCompleted();
310+
}
311+
312+
@Test
313+
public void testMapFactoryThrows() {
314+
TestSubscriber<Object> ts = TestSubscriber.create();
315+
316+
Observable.just(1, 2).toMultimap(new Func1<Integer, Integer>() {
317+
@Override
318+
public Integer call(Integer v) {
319+
return v;
320+
}
321+
}, new Func1<Integer, Integer>() {
322+
@Override
323+
public Integer call(Integer v) {
324+
return v;
325+
}
326+
}, new Func0<Map<Integer, Collection<Integer>>>() {
327+
@Override
328+
public Map<Integer, Collection<Integer>> call() {
329+
throw new TestException();
330+
}
331+
}).subscribe(ts);
332+
333+
ts.assertError(TestException.class);
334+
ts.assertNoValues();
335+
ts.assertNotCompleted();
336+
}
337+
338+
@Test
339+
public void testCollectionFactoryThrows() {
340+
TestSubscriber<Object> ts = TestSubscriber.create();
341+
342+
Observable.just(1, 2).toMultimap(new Func1<Integer, Integer>() {
343+
@Override
344+
public Integer call(Integer v) {
345+
return v;
346+
}
347+
}, new Func1<Integer, Integer>() {
348+
@Override
349+
public Integer call(Integer v) {
350+
return v;
351+
}
352+
}, new Func0<Map<Integer, Collection<Integer>>>() {
353+
@Override
354+
public Map<Integer, Collection<Integer>> call() {
355+
return new HashMap<Integer, Collection<Integer>>();
356+
}
357+
}, new Func1<Integer, Collection<Integer>>() {
358+
@Override
359+
public Collection<Integer> call(Integer k) {
360+
throw new TestException();
361+
}
362+
}).subscribe(ts);
363+
364+
ts.assertError(TestException.class);
365+
ts.assertNoValues();
366+
ts.assertNotCompleted();
367+
}
272368
}

0 commit comments

Comments
 (0)