Skip to content

Commit ccf921b

Browse files
Merge pull request #604 from akarnokd/StringObservableOps1
Added op:join to concat objects with separator between elements.
2 parents 97c8146 + 5f0dfc5 commit ccf921b

File tree

2 files changed

+142
-2
lines changed

2 files changed

+142
-2
lines changed

rxjava-contrib/rxjava-string/src/main/java/rx/observables/StringObservable.java

Lines changed: 55 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -197,6 +197,7 @@ public byte[] call(String str) {
197197
*/
198198
public static Observable<String> stringConcat(Observable<String> src) {
199199
return src.aggregate(new Func2<String, String, String>() {
200+
@Override
200201
public String call(String a, String b) {
201202
return a + b;
202203
}
@@ -267,4 +268,58 @@ private void output(String part) {
267268
}
268269
});
269270
}
271+
/**
272+
* Concatenates the sequence of values by adding a separator
273+
* between them and emitting the result once the source completes.
274+
* <p>
275+
* The conversion from the value type to String is performed via
276+
* {@link java.lang.String#valueOf(java.lang.Object)} calls.
277+
* <p>
278+
* For example:
279+
* <pre>
280+
* Observable&lt;Object> source = Observable.from("a", 1, "c");
281+
* Observable&lt;String> result = join(source, ", ");
282+
* </pre>
283+
*
284+
* will yield a single element equal to "a, 1, c".
285+
*
286+
* @param source the source sequence of CharSequence values
287+
* @param separator the separator to a
288+
* @return an Observable which emits a single String value having the concatenated
289+
* values of the source observable with the separator between elements
290+
*/
291+
public static <T> Observable<String> join(final Observable<T> source, final CharSequence separator) {
292+
return Observable.create(new OnSubscribeFunc<String>() {
293+
294+
@Override
295+
public Subscription onSubscribe(final Observer<? super String> t1) {
296+
return source.subscribe(new Observer<T>() {
297+
boolean mayAddSeparator;
298+
StringBuilder b = new StringBuilder();
299+
@Override
300+
public void onNext(T args) {
301+
if (mayAddSeparator) {
302+
b.append(separator);
303+
}
304+
mayAddSeparator = true;
305+
b.append(String.valueOf(args));
306+
}
307+
308+
@Override
309+
public void onError(Throwable e) {
310+
b = null;
311+
t1.onError(e);
312+
}
313+
314+
@Override
315+
public void onCompleted() {
316+
String str = b.toString();
317+
b = null;
318+
t1.onNext(str);
319+
t1.onCompleted();
320+
}
321+
});
322+
}
323+
});
324+
}
270325
}

rxjava-contrib/rxjava-string/src/test/java/rx/observables/StringObservableTest.java

Lines changed: 87 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,10 +23,10 @@
2323
import java.nio.charset.MalformedInputException;
2424

2525
import org.junit.Test;
26+
import static org.mockito.Mockito.*;
2627

2728
import rx.Observable;
28-
import rx.observables.BlockingObservable;
29-
import rx.observables.StringObservable;
29+
import rx.Observer;
3030
import rx.util.AssertObservable;
3131

3232
public class StringObservableTest {
@@ -127,4 +127,89 @@ public void testSplit(String message, String regex, int limit, Observable<String
127127
Observable<String> exp = Observable.from(parts);
128128
AssertObservable.assertObservableEqualsBlocking("when input is "+message+" and limit = "+ limit, exp, act);
129129
}
130+
131+
@Test
132+
public void testJoinMixed() {
133+
Observable<Object> source = Observable.<Object>from("a", 1, "c");
134+
135+
Observable<String> result = StringObservable.join(source, ", ");
136+
137+
Observer<Object> observer = mock(Observer.class);
138+
139+
result.subscribe(observer);
140+
141+
verify(observer, times(1)).onNext("a, 1, c");
142+
verify(observer, times(1)).onCompleted();
143+
verify(observer, never()).onError(any(Throwable.class));
144+
}
145+
@Test
146+
public void testJoinWithEmptyString() {
147+
Observable<String> source = Observable.from("", "b", "c");
148+
149+
Observable<String> result = StringObservable.join(source, ", ");
150+
151+
Observer<Object> observer = mock(Observer.class);
152+
153+
result.subscribe(observer);
154+
155+
verify(observer, times(1)).onNext(", b, c");
156+
verify(observer, times(1)).onCompleted();
157+
verify(observer, never()).onError(any(Throwable.class));
158+
}
159+
@Test
160+
public void testJoinWithNull() {
161+
Observable<String> source = Observable.from("a", null, "c");
162+
163+
Observable<String> result = StringObservable.join(source, ", ");
164+
165+
Observer<Object> observer = mock(Observer.class);
166+
167+
result.subscribe(observer);
168+
169+
verify(observer, times(1)).onNext("a, null, c");
170+
verify(observer, times(1)).onCompleted();
171+
verify(observer, never()).onError(any(Throwable.class));
172+
}
173+
@Test
174+
public void testJoinSingle() {
175+
Observable<String> source = Observable.from("a");
176+
177+
Observable<String> result = StringObservable.join(source, ", ");
178+
179+
Observer<Object> observer = mock(Observer.class);
180+
181+
result.subscribe(observer);
182+
183+
verify(observer, times(1)).onNext("a");
184+
verify(observer, times(1)).onCompleted();
185+
verify(observer, never()).onError(any(Throwable.class));
186+
}
187+
@Test
188+
public void testJoinEmpty() {
189+
Observable<String> source = Observable.empty();
190+
191+
Observable<String> result = StringObservable.join(source, ", ");
192+
193+
Observer<Object> observer = mock(Observer.class);
194+
195+
result.subscribe(observer);
196+
197+
verify(observer, times(1)).onNext("");
198+
verify(observer, times(1)).onCompleted();
199+
verify(observer, never()).onError(any(Throwable.class));
200+
}
201+
@Test
202+
public void testJoinThrows() {
203+
Observable<String> source = Observable.concat(Observable.just("a"), Observable.<String>error(new RuntimeException("Forced failure")));
204+
205+
Observable<String> result = StringObservable.join(source, ", ");
206+
207+
Observer<Object> observer = mock(Observer.class);
208+
209+
result.subscribe(observer);
210+
211+
verify(observer, never()).onNext("a");
212+
verify(observer, never()).onCompleted();
213+
verify(observer, times(1)).onError(any(Throwable.class));
214+
}
130215
}

0 commit comments

Comments
 (0)