Skip to content

Commit fd6dd16

Browse files
Collect Operator
1 parent 3674c2c commit fd6dd16

File tree

2 files changed

+58
-0
lines changed

2 files changed

+58
-0
lines changed

rxjava-core/src/main/java/rx/Observable.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
*/
1616
package rx;
1717

18+
import static org.junit.Assert.*;
1819
import static rx.util.functions.Functions.*;
1920

2021
import java.util.ArrayList;
@@ -123,6 +124,7 @@
123124
import rx.util.Timestamped;
124125
import rx.util.functions.Action0;
125126
import rx.util.functions.Action1;
127+
import rx.util.functions.Action2;
126128
import rx.util.functions.Async;
127129
import rx.util.functions.Func0;
128130
import rx.util.functions.Func1;
@@ -5350,6 +5352,29 @@ public <R> Observable<R> reduce(R initialValue, Func2<R, ? super T, R> accumulat
53505352
return create(OperationScan.scan(this, initialValue, accumulator)).takeLast(1);
53515353
}
53525354

5355+
/**
5356+
* Collect values into a single mutable data structure.
5357+
* <p>
5358+
* A simplified version of `reduce` that does not need to return the state on each pass.
5359+
* <p>
5360+
*
5361+
* @param state
5362+
* @param collector
5363+
* @return
5364+
*/
5365+
public <R> Observable<R> collect(R state, final Action2<R, ? super T> collector) {
5366+
Func2<R, T, R> accumulator = new Func2<R, T, R>() {
5367+
5368+
@Override
5369+
public R call(R state, T value) {
5370+
collector.call(state, value);
5371+
return state;
5372+
}
5373+
5374+
};
5375+
return reduce(state, accumulator);
5376+
}
5377+
53535378
/**
53545379
* Synonymous with <code>reduce()</code>.
53555380
* <p>

rxjava-core/src/test/java/rx/ObservableTests.java

Lines changed: 33 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,7 @@
4343
import rx.subscriptions.BooleanSubscription;
4444
import rx.subscriptions.Subscriptions;
4545
import rx.util.functions.Action1;
46+
import rx.util.functions.Action2;
4647
import rx.util.functions.Func0;
4748
import rx.util.functions.Func1;
4849
import rx.util.functions.Func2;
@@ -1091,5 +1092,37 @@ public String answer(InvocationOnMock invocation) throws Throwable {
10911092

10921093
verify(func, times(1)).call();
10931094
}
1095+
1096+
@Test
1097+
public void testCollectToList() {
1098+
List<Integer> list = Observable.from(1, 2, 3).collect(new ArrayList<Integer>(), new Action2<List<Integer>, Integer>() {
1099+
1100+
@Override
1101+
public void call(List<Integer> list, Integer v) {
1102+
list.add(v);
1103+
}
1104+
}).toBlockingObservable().last();
1105+
1106+
assertEquals(3, list.size());
1107+
assertEquals(1, list.get(0).intValue());
1108+
assertEquals(2, list.get(1).intValue());
1109+
assertEquals(3, list.get(2).intValue());
1110+
}
1111+
1112+
@Test
1113+
public void testCollectToString() {
1114+
String value = Observable.from(1, 2, 3).collect(new StringBuilder(), new Action2<StringBuilder, Integer>() {
1115+
1116+
@Override
1117+
public void call(StringBuilder sb, Integer v) {
1118+
if (sb.length() > 0) {
1119+
sb.append("-");
1120+
}
1121+
sb.append(v);
1122+
}
1123+
}).toBlockingObservable().last().toString();
1124+
1125+
assertEquals("1-2-3", value);
1126+
}
10941127

10951128
}

0 commit comments

Comments
 (0)