diff --git a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy index 7190f97621..1683f12520 100644 --- a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy +++ b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy @@ -244,6 +244,16 @@ def class ObservableTests { def val = Observable.toObservable("one", "two").lastOrDefault("default", { x -> x.length() > 3}) assertEquals("default", val) } + + public void testSingle1() { + def s = Observable.toObservable("one").single({ x -> x.length() == 3}) + assertEquals("one", s) + } + + @Test(expected = IllegalStateException.class) + public void testSingle2() { + Observable.toObservable("one", "two").single({ x -> x.length() == 3}) + } def class AsyncObservable implements Func1, Subscription> { diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 3319f0b074..e8b4365d1a 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -408,6 +408,66 @@ public void call(Object args) { }); } + /** + * Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence. + * + * @return The single element in the observable sequence. + */ + public T single() { + return single(this); + } + + /** + * Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence. + * + * @param predicate A predicate function to evaluate for elements in the sequence. + * @return The single element in the observable sequence. + */ + public T single(Func1 predicate) { + return single(this, predicate); + } + + /** + * Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence. + * + * @param predicate A predicate function to evaluate for elements in the sequence. + * @return The single element in the observable sequence. + */ + public T single(Object predicate) { + return single(this, predicate); + } + + /** + * Returns the only element of an observable sequence, or a default value if the observable sequence is empty. + * + * @param defaultValue default value for a sequence. + * @return The single element in the observable sequence, or a default value if no value is found. + */ + public T singleOrDefault(T defaultValue) { + return singleOrDefault(this, defaultValue); + } + + /** + * Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found. + * @param defaultValue default value for a sequence. + * @param predicate A predicate function to evaluate for elements in the sequence. + * @return The single element in the observable sequence, or a default value if no value is found. + */ + public T singleOrDefault(T defaultValue, Func1 predicate) { + return singleOrDefault(this, defaultValue, predicate); + } + + /** + * Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found. + * + * @param defaultValue default value for a sequence. + * @param predicate A predicate function to evaluate for elements in the sequence. + * @return The single element in the observable sequence, or a default value if no value is found. + */ + public T singleOrDefault(T defaultValue, Object predicate) { + return singleOrDefault(this, defaultValue, predicate); + } + /** * Allow the {@link RxJavaErrorHandler} to receive the exception from onError. * @@ -1520,6 +1580,7 @@ public static Observable takeWhile(final Observable items, Func1 Observable takeWhile(final Observable items, Object predicate) { + @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(predicate); return takeWhile(items, new Func1() { @@ -1542,6 +1603,7 @@ public static Observable takeWhileWithIndex(final Observable items, Fu } public static Observable takeWhileWithIndex(final Observable items, Object predicate) { + @SuppressWarnings("rawtypes") final FuncN _f = Functions.from(predicate); return create(OperationTake.takeWhileWithIndex(items, new Func2() { @@ -1648,6 +1710,127 @@ public Iterator iterator() { }; } + /** + * Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence. + * + * @param that + * the source Observable + * @return The single element in the observable sequence. + * @throws IllegalStateException + * if there is not exactly one element in the observable sequence + */ + public static T single(Observable that) { + return singleOrDefault(that, false, null); + } + + /** + * Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence. + * + * @param that + * the source Observable + * @param predicate + * A predicate function to evaluate for elements in the sequence. + * @return The single element in the observable sequence. + * @throws IllegalStateException + * if there is not exactly one element in the observable sequence that matches the predicate + */ + public static T single(Observable that, Func1 predicate) { + return single(that.filter(predicate)); + } + + /** + * Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence. + * + * @param that + * the source Observable + * @param predicate + * A predicate function to evaluate for elements in the sequence. + * @return The single element in the observable sequence. + * @throws IllegalStateException + * if there is not exactly one element in the observable sequence that matches the predicate + */ + public static T single(Observable that, Object predicate) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(predicate); + + return single(that, new Func1() { + @Override + public Boolean call(T t) { + return (Boolean) _f.call(t); + } + }); + } + + /** + * Returns the only element of an observable sequence, or a default value if the observable sequence is empty. + * + * @param that + * the source Observable + * @param defaultValue + * default value for a sequence. + * @return The single element in the observable sequence, or a default value if no value is found. + */ + public static T singleOrDefault(Observable that, T defaultValue) { + return singleOrDefault(that, true, defaultValue); + } + + /** + * Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found. + * + * @param that + * the source Observable + * @param defaultValue + * default value for a sequence. + * @param predicate + * A predicate function to evaluate for elements in the sequence. + * @return The single element in the observable sequence, or a default value if no value is found. + */ + public static T singleOrDefault(Observable that, T defaultValue, Func1 predicate) { + return singleOrDefault(that.filter(predicate), defaultValue); + } + + /** + * Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found. + * + * @param that + * the source Observable + * @param defaultValue + * default value for a sequence. + * @param predicate + * A predicate function to evaluate for elements in the sequence. + * @return The single element in the observable sequence, or a default value if no value is found. + */ + public static T singleOrDefault(Observable that, T defaultValue, Object predicate) { + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(predicate); + + return singleOrDefault(that, defaultValue, new Func1() { + @Override + public Boolean call(T t) { + return (Boolean) _f.call(t); + } + }); + } + + private static T singleOrDefault(Observable that, boolean hasDefault, T defaultVal) { + Iterator it = that.toIterable().iterator(); + + if (!it.hasNext()) { + if (hasDefault) { + return defaultVal; + } + throw new IllegalStateException("Expected single entry. Actually empty stream."); + } + + T result = it.next(); + + if (it.hasNext()) { + throw new IllegalStateException("Expected single entry. Actually more than one entry."); + } + + return result; + } + /** * Converts an Iterable sequence to an Observable sequence. * @@ -2878,6 +3061,74 @@ public Boolean call(Integer args) { assertEquals(-1, last); } + + public void testSingle() { + Observable observable = toObservable("one"); + assertEquals("one", observable.single()); + } + + @Test + public void testSingleDefault() { + Observable observable = toObservable(); + assertEquals("default", observable.singleOrDefault("default")); + } + + @Test(expected = IllegalStateException.class) + public void testSingleDefaultWithMoreThanOne() { + Observable observable = toObservable("one", "two", "three"); + observable.singleOrDefault("default"); + } + + @Test + public void testSingleWithPredicateDefault() { + Observable observable = toObservable("one", "two", "four"); + assertEquals("four", observable.single(new Func1() { + @Override + public Boolean call(String s) { + return s.length() == 4; + } + })); + } + + @Test(expected = IllegalStateException.class) + public void testSingleWrong() { + Observable observable = toObservable(1, 2); + observable.single(); + } + + @Test(expected = IllegalStateException.class) + public void testSingleWrongPredicate() { + Observable observable = toObservable(-1); + observable.single(new Func1() { + @Override + public Boolean call(Integer args) { + return args > 0; + } + }); + } + + @Test + public void testSingleDefaultPredicateMatchesNothing() { + Observable observable = toObservable("one", "two"); + String result = observable.singleOrDefault("default", new Func1() { + @Override + public Boolean call(String args) { + return args.length() == 4; + } + }); + assertEquals("default", result); + } + + @Test(expected = IllegalStateException.class) + public void testSingleDefaultPredicateMatchesMoreThanOne() { + Observable observable = toObservable("one", "two"); + String result = observable.singleOrDefault("default", new Func1() { + @Override + public Boolean call(String args) { + return args.length() == 3; + } + }); + } private static class TestException extends RuntimeException { diff --git a/rxjava-core/src/main/java/rx/util/functions/Functions.java b/rxjava-core/src/main/java/rx/util/functions/Functions.java index 6f6d03bcd6..3267e0f540 100644 --- a/rxjava-core/src/main/java/rx/util/functions/Functions.java +++ b/rxjava-core/src/main/java/rx/util/functions/Functions.java @@ -550,4 +550,18 @@ public Void call(Object... args) { }; } + @SuppressWarnings("unchecked") + public static Func1 alwaysTrue() { + return (Func1) AlwaysTrue.INSTANCE; + } + + private enum AlwaysTrue implements Func1 { + INSTANCE; + + @Override + public Boolean call(Object o) { + return true; + } + } + }