Skip to content

Commit bd0c47a

Browse files
Merge pull request #516 from abersnaze/string-observable
Adding utility functions for observables of strings useful for processing non blocking IO.
2 parents 13cd6a9 + a07fb6a commit bd0c47a

File tree

6 files changed

+569
-1
lines changed

6 files changed

+569
-1
lines changed
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
apply plugin: 'osgi'
2+
3+
sourceCompatibility = JavaVersion.VERSION_1_6
4+
targetCompatibility = JavaVersion.VERSION_1_6
5+
6+
dependencies {
7+
compile project(':rxjava-core')
8+
testCompile project(":rxjava-core").sourceSets.test.output
9+
provided 'junit:junit-dep:4.10'
10+
provided 'org.mockito:mockito-core:1.8.5'
11+
}
12+
13+
javadoc {
14+
options {
15+
doclet = "org.benjchristensen.doclet.DocletExclude"
16+
docletpath = [rootProject.file('./gradle/doclet-exclude.jar')]
17+
stylesheetFile = rootProject.file('./gradle/javadocStyleSheet.css')
18+
windowTitle = "RxJava Javadoc ${project.version}"
19+
}
20+
options.addStringOption('top').value = '<h2 class="title" style="padding-top:40px">RxJava</h2>'
21+
}
22+
23+
jar {
24+
manifest {
25+
name = 'rxjava-string'
26+
instruction 'Bundle-Vendor', 'Netflix'
27+
instruction 'Bundle-DocURL', 'https://github.com/Netflix/RxJava'
28+
instruction 'Import-Package', '!org.junit,!junit.framework,!org.mockito.*,*'
29+
}
30+
}
Lines changed: 255 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,255 @@
1+
package rx.observables;
2+
3+
import java.nio.ByteBuffer;
4+
import java.nio.CharBuffer;
5+
import java.nio.charset.CharacterCodingException;
6+
import java.nio.charset.Charset;
7+
import java.nio.charset.CharsetDecoder;
8+
import java.nio.charset.CharsetEncoder;
9+
import java.nio.charset.CoderResult;
10+
import java.nio.charset.CodingErrorAction;
11+
import java.util.Arrays;
12+
import java.util.regex.Pattern;
13+
14+
import rx.Observable;
15+
import rx.Observer;
16+
import rx.Subscription;
17+
import rx.Observable.OnSubscribeFunc;
18+
import rx.util.functions.Func1;
19+
import rx.util.functions.Func2;
20+
21+
public class StringObservable {
22+
/**
23+
* Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams and where handles when a multibyte character spans two chunks.
24+
*
25+
* @param src
26+
* @param charsetName
27+
* @return
28+
*/
29+
public static Observable<String> decode(Observable<byte[]> src, String charsetName) {
30+
return decode(src, Charset.forName(charsetName));
31+
}
32+
33+
/**
34+
* Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams and where handles when a multibyte character spans two chunks.
35+
*
36+
* @param src
37+
* @param charset
38+
* @return
39+
*/
40+
public static Observable<String> decode(Observable<byte[]> src, Charset charset) {
41+
return decode(src, charset.newDecoder().onMalformedInput(CodingErrorAction.REPLACE).onUnmappableCharacter(CodingErrorAction.REPLACE));
42+
}
43+
44+
/**
45+
* Decodes a stream the multibyte chunks into a stream of strings that works on infinite streams and where handles when a multibyte character spans two chunks.
46+
* This method allows for more control over how malformed and unmappable characters are handled.
47+
*
48+
* @param src
49+
* @param charsetDecoder
50+
* @return
51+
*/
52+
public static Observable<String> decode(final Observable<byte[]> src, final CharsetDecoder charsetDecoder) {
53+
return Observable.create(new OnSubscribeFunc<String>() {
54+
@Override
55+
public Subscription onSubscribe(final Observer<? super String> observer) {
56+
return src.subscribe(new Observer<byte[]>() {
57+
private ByteBuffer leftOver = null;
58+
59+
@Override
60+
public void onCompleted() {
61+
if (process(null, leftOver, true))
62+
observer.onCompleted();
63+
}
64+
65+
@Override
66+
public void onError(Throwable e) {
67+
if (process(null, leftOver, true))
68+
observer.onError(e);
69+
}
70+
71+
@Override
72+
public void onNext(byte[] bytes) {
73+
process(bytes, leftOver, false);
74+
}
75+
76+
public boolean process(byte[] next, ByteBuffer last, boolean endOfInput) {
77+
ByteBuffer bb;
78+
if (last != null) {
79+
if (next != null) {
80+
// merge leftover in front of the next bytes
81+
bb = ByteBuffer.allocate(last.remaining() + next.length);
82+
bb.put(last);
83+
bb.put(next);
84+
bb.flip();
85+
}
86+
else { // next == null
87+
bb = last;
88+
}
89+
}
90+
else { // last == null
91+
if (next != null) {
92+
bb = ByteBuffer.wrap(next);
93+
}
94+
else { // next == null
95+
return true;
96+
}
97+
}
98+
99+
CharBuffer cb = CharBuffer.allocate((int) (bb.limit() * charsetDecoder.averageCharsPerByte()));
100+
CoderResult cr = charsetDecoder.decode(bb, cb, endOfInput);
101+
cb.flip();
102+
103+
if (cr.isError()) {
104+
try {
105+
cr.throwException();
106+
}
107+
catch (CharacterCodingException e) {
108+
observer.onError(e);
109+
return false;
110+
}
111+
}
112+
113+
if (bb.remaining() > 0) {
114+
leftOver = bb;
115+
}
116+
else {
117+
leftOver = null;
118+
}
119+
120+
String string = cb.toString();
121+
if (!string.isEmpty())
122+
observer.onNext(string);
123+
124+
return true;
125+
}
126+
});
127+
}
128+
});
129+
}
130+
131+
/**
132+
* Encodes a possible infinite stream of strings into a Observable of byte arrays.
133+
*
134+
* @param src
135+
* @param charsetName
136+
* @return
137+
*/
138+
public static Observable<byte[]> encode(Observable<String> src, String charsetName) {
139+
return encode(src, Charset.forName(charsetName));
140+
}
141+
142+
/**
143+
* Encodes a possible infinite stream of strings into a Observable of byte arrays.
144+
*
145+
* @param src
146+
* @param charset
147+
* @return
148+
*/
149+
public static Observable<byte[]> encode(Observable<String> src, Charset charset) {
150+
return encode(src, charset.newEncoder().onMalformedInput(CodingErrorAction.REPLACE).onUnmappableCharacter(CodingErrorAction.REPLACE));
151+
}
152+
153+
/**
154+
* Encodes a possible infinite stream of strings into a Observable of byte arrays.
155+
* This method allows for more control over how malformed and unmappable characters are handled.
156+
*
157+
* @param src
158+
* @param charsetEncoder
159+
* @return
160+
*/
161+
public static Observable<byte[]> encode(Observable<String> src, final CharsetEncoder charsetEncoder) {
162+
return src.map(new Func1<String, byte[]>() {
163+
@Override
164+
public byte[] call(String str) {
165+
CharBuffer cb = CharBuffer.wrap(str);
166+
ByteBuffer bb;
167+
try {
168+
bb = charsetEncoder.encode(cb);
169+
} catch (CharacterCodingException e) {
170+
throw new RuntimeException(e);
171+
}
172+
return Arrays.copyOfRange(bb.array(), bb.position(), bb.limit());
173+
}
174+
});
175+
}
176+
177+
/**
178+
* Gather up all of the strings in to one string to be able to use it as one message. Don't use this on infinite streams.
179+
*
180+
* @param src
181+
* @return
182+
*/
183+
public static Observable<String> stringConcat(Observable<String> src) {
184+
return src.aggregate(new Func2<String, String, String>() {
185+
public String call(String a, String b) {
186+
return a + b;
187+
}
188+
});
189+
}
190+
191+
/**
192+
* Rechunks the strings based on a regex pattern and works on infinite stream.
193+
*
194+
* resplit(["boo:an", "d:foo"], ":") --> ["boo", "and", "foo"]
195+
* resplit(["boo:an", "d:foo"], "o") --> ["b", "", ":and:f", "", ""]
196+
*
197+
* See {@link Pattern}
198+
*
199+
* @param src
200+
* @param regex
201+
* @return
202+
*/
203+
public static Observable<String> split(final Observable<String> src, String regex) {
204+
final Pattern pattern = Pattern.compile(regex);
205+
return Observable.create(new OnSubscribeFunc<String>() {
206+
@Override
207+
public Subscription onSubscribe(final Observer<? super String> observer) {
208+
return src.subscribe(new Observer<String>() {
209+
private String leftOver = null;
210+
211+
@Override
212+
public void onCompleted() {
213+
output(leftOver);
214+
observer.onCompleted();
215+
}
216+
217+
@Override
218+
public void onError(Throwable e) {
219+
output(leftOver);
220+
observer.onError(e);
221+
}
222+
223+
@Override
224+
public void onNext(String segment) {
225+
String[] parts = pattern.split(segment, -1);
226+
227+
if (leftOver != null)
228+
parts[0] = leftOver + parts[0];
229+
for (int i = 0; i < parts.length - 1; i++) {
230+
String part = parts[i];
231+
output(part);
232+
}
233+
leftOver = parts[parts.length - 1];
234+
}
235+
236+
private int emptyPartCount = 0;
237+
/**
238+
* when limit == 0 trailing empty parts are not emitted.
239+
* @param part
240+
*/
241+
private void output(String part) {
242+
if (part.isEmpty()) {
243+
emptyPartCount++;
244+
}
245+
else {
246+
for(; emptyPartCount>0; emptyPartCount--)
247+
observer.onNext("");
248+
observer.onNext(part);
249+
}
250+
}
251+
});
252+
}
253+
});
254+
}
255+
}

0 commit comments

Comments
 (0)