Skip to content

Manual merge of mairbek/single Pull #157 #160

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -244,6 +244,16 @@ def class ObservableTests {
def val = Observable.toObservable("one", "two").lastOrDefault("default", { x -> x.length() > 3})
assertEquals("default", val)
}

public void testSingle1() {
def s = Observable.toObservable("one").single({ x -> x.length() == 3})
assertEquals("one", s)
}

@Test(expected = IllegalStateException.class)
public void testSingle2() {
Observable.toObservable("one", "two").single({ x -> x.length() == 3})
}

def class AsyncObservable implements Func1<Observer<Integer>, Subscription> {

Expand Down
251 changes: 251 additions & 0 deletions rxjava-core/src/main/java/rx/Observable.java
Original file line number Diff line number Diff line change
Expand Up @@ -408,6 +408,66 @@ public void call(Object args) {
});
}

/**
* Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence.
*
* @return The single element in the observable sequence.
*/
public T single() {
return single(this);
}

/**
* Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence.
*
* @param predicate A predicate function to evaluate for elements in the sequence.
* @return The single element in the observable sequence.
*/
public T single(Func1<T, Boolean> predicate) {
return single(this, predicate);
}

/**
* Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence.
*
* @param predicate A predicate function to evaluate for elements in the sequence.
* @return The single element in the observable sequence.
*/
public T single(Object predicate) {
return single(this, predicate);
}

/**
* Returns the only element of an observable sequence, or a default value if the observable sequence is empty.
*
* @param defaultValue default value for a sequence.
* @return The single element in the observable sequence, or a default value if no value is found.
*/
public T singleOrDefault(T defaultValue) {
return singleOrDefault(this, defaultValue);
}

/**
* Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found.
* @param defaultValue default value for a sequence.
* @param predicate A predicate function to evaluate for elements in the sequence.
* @return The single element in the observable sequence, or a default value if no value is found.
*/
public T singleOrDefault(T defaultValue, Func1<T, Boolean> predicate) {
return singleOrDefault(this, defaultValue, predicate);
}

/**
* Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found.
*
* @param defaultValue default value for a sequence.
* @param predicate A predicate function to evaluate for elements in the sequence.
* @return The single element in the observable sequence, or a default value if no value is found.
*/
public T singleOrDefault(T defaultValue, Object predicate) {
return singleOrDefault(this, defaultValue, predicate);
}

/**
* Allow the {@link RxJavaErrorHandler} to receive the exception from onError.
*
Expand Down Expand Up @@ -1520,6 +1580,7 @@ public static <T> Observable<T> takeWhile(final Observable<T> items, Func1<T, Bo
* @return
*/
public static <T> Observable<T> takeWhile(final Observable<T> items, Object predicate) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(predicate);

return takeWhile(items, new Func1<T, Boolean>() {
Expand All @@ -1542,6 +1603,7 @@ public static <T> Observable<T> takeWhileWithIndex(final Observable<T> items, Fu
}

public static <T> Observable<T> takeWhileWithIndex(final Observable<T> items, Object predicate) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(predicate);

return create(OperationTake.takeWhileWithIndex(items, new Func2<T, Integer, Boolean>() {
Expand Down Expand Up @@ -1648,6 +1710,127 @@ public Iterator<T> iterator() {
};
}

/**
* Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence.
*
* @param that
* the source Observable
* @return The single element in the observable sequence.
* @throws IllegalStateException
* if there is not exactly one element in the observable sequence
*/
public static <T> T single(Observable<T> that) {
return singleOrDefault(that, false, null);
}

/**
* Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence.
*
* @param that
* the source Observable
* @param predicate
* A predicate function to evaluate for elements in the sequence.
* @return The single element in the observable sequence.
* @throws IllegalStateException
* if there is not exactly one element in the observable sequence that matches the predicate
*/
public static <T> T single(Observable<T> that, Func1<T, Boolean> predicate) {
return single(that.filter(predicate));
}

/**
* Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence.
*
* @param that
* the source Observable
* @param predicate
* A predicate function to evaluate for elements in the sequence.
* @return The single element in the observable sequence.
* @throws IllegalStateException
* if there is not exactly one element in the observable sequence that matches the predicate
*/
public static <T> T single(Observable<T> that, Object predicate) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(predicate);

return single(that, new Func1<T, Boolean>() {
@Override
public Boolean call(T t) {
return (Boolean) _f.call(t);
}
});
}

/**
* Returns the only element of an observable sequence, or a default value if the observable sequence is empty.
*
* @param that
* the source Observable
* @param defaultValue
* default value for a sequence.
* @return The single element in the observable sequence, or a default value if no value is found.
*/
public static <T> T singleOrDefault(Observable<T> that, T defaultValue) {
return singleOrDefault(that, true, defaultValue);
}

/**
* Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found.
*
* @param that
* the source Observable
* @param defaultValue
* default value for a sequence.
* @param predicate
* A predicate function to evaluate for elements in the sequence.
* @return The single element in the observable sequence, or a default value if no value is found.
*/
public static <T> T singleOrDefault(Observable<T> that, T defaultValue, Func1<T, Boolean> predicate) {
return singleOrDefault(that.filter(predicate), defaultValue);
}

/**
* Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found.
*
* @param that
* the source Observable
* @param defaultValue
* default value for a sequence.
* @param predicate
* A predicate function to evaluate for elements in the sequence.
* @return The single element in the observable sequence, or a default value if no value is found.
*/
public static <T> T singleOrDefault(Observable<T> that, T defaultValue, Object predicate) {
@SuppressWarnings("rawtypes")
final FuncN _f = Functions.from(predicate);

return singleOrDefault(that, defaultValue, new Func1<T, Boolean>() {
@Override
public Boolean call(T t) {
return (Boolean) _f.call(t);
}
});
}

private static <T> T singleOrDefault(Observable<T> that, boolean hasDefault, T defaultVal) {
Iterator<T> it = that.toIterable().iterator();

if (!it.hasNext()) {
if (hasDefault) {
return defaultVal;
}
throw new IllegalStateException("Expected single entry. Actually empty stream.");
}

T result = it.next();

if (it.hasNext()) {
throw new IllegalStateException("Expected single entry. Actually more than one entry.");
}

return result;
}

/**
* Converts an Iterable sequence to an Observable sequence.
*
Expand Down Expand Up @@ -2878,6 +3061,74 @@ public Boolean call(Integer args) {

assertEquals(-1, last);
}

public void testSingle() {
Observable<String> observable = toObservable("one");
assertEquals("one", observable.single());
}

@Test
public void testSingleDefault() {
Observable<String> observable = toObservable();
assertEquals("default", observable.singleOrDefault("default"));
}

@Test(expected = IllegalStateException.class)
public void testSingleDefaultWithMoreThanOne() {
Observable<String> observable = toObservable("one", "two", "three");
observable.singleOrDefault("default");
}

@Test
public void testSingleWithPredicateDefault() {
Observable<String> observable = toObservable("one", "two", "four");
assertEquals("four", observable.single(new Func1<String, Boolean>() {
@Override
public Boolean call(String s) {
return s.length() == 4;
}
}));
}

@Test(expected = IllegalStateException.class)
public void testSingleWrong() {
Observable<Integer> observable = toObservable(1, 2);
observable.single();
}

@Test(expected = IllegalStateException.class)
public void testSingleWrongPredicate() {
Observable<Integer> observable = toObservable(-1);
observable.single(new Func1<Integer, Boolean>() {
@Override
public Boolean call(Integer args) {
return args > 0;
}
});
}

@Test
public void testSingleDefaultPredicateMatchesNothing() {
Observable<String> observable = toObservable("one", "two");
String result = observable.singleOrDefault("default", new Func1<String, Boolean>() {
@Override
public Boolean call(String args) {
return args.length() == 4;
}
});
assertEquals("default", result);
}

@Test(expected = IllegalStateException.class)
public void testSingleDefaultPredicateMatchesMoreThanOne() {
Observable<String> observable = toObservable("one", "two");
String result = observable.singleOrDefault("default", new Func1<String, Boolean>() {
@Override
public Boolean call(String args) {
return args.length() == 3;
}
});
}

private static class TestException extends RuntimeException {

Expand Down
14 changes: 14 additions & 0 deletions rxjava-core/src/main/java/rx/util/functions/Functions.java
Original file line number Diff line number Diff line change
Expand Up @@ -550,4 +550,18 @@ public Void call(Object... args) {
};
}

@SuppressWarnings("unchecked")
public static <T> Func1<T, Boolean> alwaysTrue() {
return (Func1<T, Boolean>) AlwaysTrue.INSTANCE;
}

private enum AlwaysTrue implements Func1<Object, Boolean> {
INSTANCE;

@Override
public Boolean call(Object o) {
return true;
}
}

}