From e5f6c917b6f7e9d7896e0583235e3e6ae9c793c3 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Tue, 19 Feb 2013 16:43:33 +0200 Subject: [PATCH 1/9] Next implemented --- rxjava-core/src/main/java/rx/Observable.java | 134 +++++++++++++++++-- 1 file changed, 121 insertions(+), 13 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 5cf8d82ec2..e8bd626a85 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -15,19 +15,15 @@ */ package rx; +import static org.junit.Assert.assertEquals; import static org.mockito.Matchers.*; import static org.mockito.Mockito.*; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.Future; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; +import java.util.*; +import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicReference; +import org.junit.Assert; import org.junit.Before; import org.junit.Test; import org.mockito.Mock; @@ -334,6 +330,73 @@ public void onNext(T args) { }); } + public Iterable next() { + final BlockingQueue> notifications = new LinkedBlockingQueue>(); + + materialize().subscribe(new Observer>() { + @Override + public void onCompleted() { + // ignore + } + + @Override + public void onError(Exception e) { + // ignore + } + + @Override + public void onNext(Notification args) { + notifications.offer(args); + } + }); + + final Iterator it = new Iterator() { + private Notification buf; + + @Override + public boolean hasNext() { + if (buf == null) { + buf = take(); + } + return !buf.isOnCompleted(); + } + + @Override + public T next() { + if (buf == null) { + buf = take(); + } + if (buf.isOnError()) { + throw new RuntimeException(buf.getException()); + } + + T result = buf.getValue(); + buf = null; + return result; + } + + private Notification take() { + try { + return notifications.take(); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Read-only iterator"); + } + }; + + return new Iterable() { + @Override + public Iterator iterator() { + return it; + } + }; + } + /** * Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated. *

@@ -2560,11 +2623,11 @@ public void testCreate() { Observable observable = create(new Func1, Subscription>() { @Override - public Subscription call(Observer Observer) { - Observer.onNext("one"); - Observer.onNext("two"); - Observer.onNext("three"); - Observer.onCompleted(); + public Subscription call(Observer observer) { + observer.onNext("one"); + observer.onNext("two"); + observer.onNext("three"); + observer.onCompleted(); return Observable.noOpSubscription(); } @@ -2623,6 +2686,51 @@ public void testSequenceEqual() { verify(result, times(1)).onNext(false); } + @Test + public void testNext() { + Observable obs = toObservable("one", "two", "three"); + + Iterable next = obs.next(); + Iterator it = next.iterator(); + + assertEquals(true, it.hasNext()); + assertEquals("one", it.next()); + + assertEquals(true, it.hasNext()); + assertEquals("two", it.next()); + + assertEquals(true, it.hasNext()); + assertEquals("three", it.next()); + + assertEquals(false, it.hasNext()); + + } + + @Test(expected = RuntimeException.class) + public void testNextWithException() { + Observable obs = create(new Func1, Subscription>() { + + @Override + public Subscription call(Observer observer) { + observer.onNext("one"); + observer.onError(new IllegalStateException()); + return Observable.noOpSubscription(); + } + }); + + Iterable next = obs.next(); + Iterator it = next.iterator(); + + assertEquals(true, it.hasNext()); + assertEquals("one", it.next()); + + assertEquals(true, it.hasNext()); + it.next(); + + } + + + } } From 05ba6f020684e08220bb43c676691f91835196d0 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Tue, 19 Feb 2013 17:45:42 +0200 Subject: [PATCH 2/9] Proper exception handling --- rxjava-core/src/main/java/rx/Observable.java | 7 +++-- .../src/main/java/rx/util/Exceptions.java | 31 +++++++++++++++++++ 2 files changed, 35 insertions(+), 3 deletions(-) create mode 100644 rxjava-core/src/main/java/rx/util/Exceptions.java diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index e8bd626a85..3b325ba027 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -54,6 +54,7 @@ import rx.plugins.RxJavaPlugins; import rx.util.AtomicObservableSubscription; import rx.util.AtomicObserver; +import rx.util.Exceptions; import rx.util.Range; import rx.util.functions.Action0; import rx.util.functions.Action1; @@ -367,7 +368,7 @@ public T next() { buf = take(); } if (buf.isOnError()) { - throw new RuntimeException(buf.getException()); + throw Exceptions.propagate(buf.getException()); } T result = buf.getValue(); @@ -379,7 +380,7 @@ private Notification take() { try { return notifications.take(); } catch (InterruptedException e) { - throw new RuntimeException(e); + throw Exceptions.propagate(e); } } @@ -2706,7 +2707,7 @@ public void testNext() { } - @Test(expected = RuntimeException.class) + @Test(expected = IllegalStateException.class) public void testNextWithException() { Observable obs = create(new Func1, Subscription>() { diff --git a/rxjava-core/src/main/java/rx/util/Exceptions.java b/rxjava-core/src/main/java/rx/util/Exceptions.java new file mode 100644 index 0000000000..9d72b86f10 --- /dev/null +++ b/rxjava-core/src/main/java/rx/util/Exceptions.java @@ -0,0 +1,31 @@ +/** + * Copyright 2013 Netflix, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package rx.util; + +public class Exceptions { + private Exceptions() { + + } + + public static RuntimeException propagate(Throwable t) { + if (t instanceof RuntimeException) { + throw (RuntimeException) t; + } else { + throw new RuntimeException(t); + } + } + +} From 289810a3265d7bddeb5e80b0b79317440b8b9ffc Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Tue, 19 Feb 2013 23:02:40 +0200 Subject: [PATCH 3/9] Added Javadoc --- rxjava-core/src/main/java/rx/Observable.java | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 3b325ba027..d93fb54eb6 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -331,6 +331,12 @@ public void onNext(T args) { }); } + /** + * Samples the next value (blocking without buffering) from in an observable sequence. + * + * @return The enumerable sequence that blocks upon each iteration until the next element + * in the observable source sequence becomes available. + */ public Iterable next() { final BlockingQueue> notifications = new LinkedBlockingQueue>(); From b77b91f72afb343b3ca6ccb5a1ce7e118f0eab54 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Tue, 19 Feb 2013 23:18:36 +0200 Subject: [PATCH 4/9] Implemented single/singleDefault --- rxjava-core/src/main/java/rx/Observable.java | 95 +++++++++++++++++++ .../java/rx/util/functions/Functions.java | 17 ++++ 2 files changed, 112 insertions(+) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index d93fb54eb6..419425936d 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -404,6 +404,71 @@ public Iterator iterator() { }; } + /** + * Returns the only element of an observable sequence and throws an exception if there is not exactly one element in the observable sequence. + * + * @return The single element in the observable sequence. + */ + public T single() { + return single(Functions.alwaysTrue()); + } + + /** + * Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence. + * + * @param predicate A predicate function to evaluate for elements in the sequence. + * @return The single element in the observable sequence. + */ + public T single(Func1 predicate) { + return singleOrDefault(false, null, predicate); + } + + /** + * Returns the only element of an observable sequence, or a default value if the observable sequence is empty. + * + * @param defaultValue default value for a sequence. + * @return The single element in the observable sequence, or a default value if no value is found. + */ + public T singleOrDefault(T defaultValue) { + return singleOrDefault(defaultValue, Functions.alwaysTrue()); + } + + /** + * Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found. + * @param defaultValue default value for a sequence. + * @param predicate A predicate function to evaluate for elements in the sequence. + * @return The single element in the observable sequence, or a default value if no value is found. + */ + public T singleOrDefault(T defaultValue, Func1 predicate) { + return singleOrDefault(true, defaultValue, predicate); + } + + + private T singleOrDefault(boolean hasDefault, T defaultVal, Func1 predicate) { + Iterator it = next().iterator(); + + if (!it.hasNext()) { + if (hasDefault) { + return defaultVal; + } else { + throw new IllegalStateException("Expected single entry. Actually empty stream."); + } + } + + T result = it.next(); + + if (it.hasNext()) { + throw new IllegalStateException("Expected single entry. Actually more than one entry."); + } + + if (!predicate.call(result)) { + throw new IllegalStateException("Single value should match the predicate"); + } + + return result; + } + + /** * Invokes an action for each element in the observable sequence, and blocks until the sequence is terminated. *

@@ -2736,6 +2801,36 @@ public Subscription call(Observer observer) { } + @Test + public void testSingle() { + Observable observable = toObservable("one"); + assertEquals("one", observable.single()); + } + + @Test + public void testSingleDefault() { + Observable observable = toObservable(); + assertEquals("default", observable.singleOrDefault("default")); + } + + @Test(expected = IllegalStateException.class) + public void testSingleWrong() { + Observable observable = toObservable(1, 2); + observable.single(); + } + + @Test(expected = IllegalStateException.class) + public void testSingleWrongPredicate() { + Observable observable = toObservable(-1); + observable.single(new Func1() { + @Override + public Boolean call(Integer args) { + return args > 0; + } + }); + } + + } diff --git a/rxjava-core/src/main/java/rx/util/functions/Functions.java b/rxjava-core/src/main/java/rx/util/functions/Functions.java index 6f6d03bcd6..5e3f33bb0a 100644 --- a/rxjava-core/src/main/java/rx/util/functions/Functions.java +++ b/rxjava-core/src/main/java/rx/util/functions/Functions.java @@ -550,4 +550,21 @@ public Void call(Object... args) { }; } + + @SuppressWarnings("unchecked") + public static Func1 alwaysTrue() { + return (Func1) AlwaysTrue.INSTANCE; + } + + + private enum AlwaysTrue implements Func1 { + INSTANCE; + + @Override + public Boolean call(Object o) { + return true; + } + } + + } From ad9ae998d438cc623a09a193f540f9746d47290a Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Wed, 20 Feb 2013 18:43:31 +0200 Subject: [PATCH 5/9] Dynamic languages support for single/singleDefault --- .../rx/lang/groovy/ObservableTests.groovy | 12 +++++++ rxjava-core/src/main/java/rx/Observable.java | 35 +++++++++++++++++++ 2 files changed, 47 insertions(+) diff --git a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy index 8d4d4b656a..00b5884ed6 100644 --- a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy +++ b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy @@ -223,6 +223,18 @@ def class ObservableTests { verify(a, times(1)).received(3); } + @Test + public void testSingle1() { + def s = Observable.toObservable("one").single({ x -> x.length() == 3}) + assertEquals("one", s) + } + + @Test(expected = IllegalStateException.class) + public void testSingle2() { + Observable.toObservable("one", "two").single({ x -> x.length() == 3}) + } + + @Test public void testForEachWithError() { try { diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 419425936d..6dbe892267 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -423,6 +423,23 @@ public T single(Func1 predicate) { return singleOrDefault(false, null, predicate); } + /** + * Returns the only element of an observable sequence that matches the predicate and throws an exception if there is not exactly one element in the observable sequence. + * + * @param predicate A predicate function to evaluate for elements in the sequence. + * @return The single element in the observable sequence. + */ + public T single(Object predicate) { + final FuncN _f = Functions.from(predicate); + + return singleOrDefault(false, null, new Func1() { + @Override + public Boolean call(T t) { + return (Boolean) _f.call(t); + } + }); + } + /** * Returns the only element of an observable sequence, or a default value if the observable sequence is empty. * @@ -443,6 +460,24 @@ public T singleOrDefault(T defaultValue, Func1 predicate) { return singleOrDefault(true, defaultValue, predicate); } + /** + * Returns the only element of an observable sequence that matches the predicate, or a default value if no value is found. + * + * @param defaultValue default value for a sequence. + * @param predicate A predicate function to evaluate for elements in the sequence. + * @return The single element in the observable sequence, or a default value if no value is found. + */ + public T singleOrDefault(T defaultValue, Object predicate) { + final FuncN _f = Functions.from(predicate); + + return singleOrDefault(true, defaultValue, new Func1() { + @Override + public Boolean call(T t) { + return (Boolean) _f.call(t); + } + }); + } + private T singleOrDefault(boolean hasDefault, T defaultVal, Func1 predicate) { Iterator it = next().iterator(); From 6a9abe188c1cf0b42ad40084094b7ba56f2277e3 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Thu, 21 Feb 2013 00:14:46 +0200 Subject: [PATCH 6/9] Implemented forEach using next method --- rxjava-core/src/main/java/rx/Observable.java | 41 ++------------------ 1 file changed, 3 insertions(+), 38 deletions(-) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 6dbe892267..e02eca2af8 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -517,47 +517,12 @@ private T singleOrDefault(boolean hasDefault, T defaultVal, Func1 pr * if error occurs */ public void forEach(final Action1 onNext) { - final CountDownLatch latch = new CountDownLatch(1); - final AtomicReference exceptionFromOnError = new AtomicReference(); + Iterable next = next(); - subscribe(new Observer() { - public void onCompleted() { - latch.countDown(); - } - - public void onError(Exception e) { - /* - * If we receive an onError event we set the reference on the outer thread - * so we can git it and throw after the latch.await(). - * - * We do this instead of throwing directly since this may be on a different thread and the latch is still waiting. - */ - exceptionFromOnError.set(e); - latch.countDown(); - } - - public void onNext(T args) { - onNext.call(args); - } - }); - // block until the subscription completes and then return - try { - latch.await(); - } catch (InterruptedException e) { - // set the interrupted flag again so callers can still get it - // for more information see https://github.com/Netflix/RxJava/pull/147#issuecomment-13624780 - Thread.currentThread().interrupt(); - // using Runtime so it is not checked - throw new RuntimeException("Interrupted while waiting for subscription to complete.", e); + for (T val : next) { + onNext.call(val); } - if (exceptionFromOnError.get() != null) { - if (exceptionFromOnError.get() instanceof RuntimeException) { - throw (RuntimeException) exceptionFromOnError.get(); - } else { - throw new RuntimeException(exceptionFromOnError.get()); - } - } } @SuppressWarnings({ "rawtypes", "unchecked" }) From 700b03924cc34692e5301ebe8867b1550d2d1976 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Sat, 23 Feb 2013 12:12:59 +0200 Subject: [PATCH 7/9] Implemented lastOrDefault --- rxjava-core/src/main/java/rx/Observable.java | 60 ++++++++++++++++++++ 1 file changed, 60 insertions(+) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index e02eca2af8..40b4117bd0 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -824,6 +824,45 @@ public static Observable last(final Observable that) { return _create(OperationLast.last(that)); } + /** + * Returns the last element of an observable sequence, or a default value if no value is found. + * @param source the source observable. + * @param defaultValue a default value that would be returned if observable is empty. + * @param the type of source. + * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. + */ + public static T lastOrDefault(Observable source, T defaultValue) { + return lastOrDefault(source, defaultValue, Functions.alwaysTrue()); + } + + + /** + * Returns the last element of an observable sequence that matches the predicate, or a default value if no value is found. + * @param source the source observable. + * @param defaultValue a default value that would be returned if observable is empty. + * @param predicate a predicate function to evaluate for elements in the sequence. + * @param the type of source. + * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. + */ + public static T lastOrDefault(Observable source, T defaultValue, Func1 predicate) { + boolean found = false; + T result = null; + for (T value : source.next()) { + found = true; + result = value; + } + + if (!found) { + return defaultValue; + } + + if (!predicate.call(result)) { + throw new IllegalStateException("Last value should match the predicate"); + } + + return result; + } + /** * Applies a function of your choosing to every notification emitted by an Observable, and returns * this transformation as a new Observable sequence. @@ -2096,6 +2135,27 @@ public Observable last() { return last(this); } + /** + * Returns the last element, or a default value if no value is found. + * + * @param defaultValue a default value that would be returned if observable is empty. + * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. + */ + public T lastOrDefault(T defaultValue) { + return lastOrDefault(this, defaultValue); + } + + /** + * Returns the last element that matches the predicate, or a default value if no value is found. + * + * @param defaultValue a default value that would be returned if observable is empty. + * @param predicate a predicate function to evaluate for elements in the sequence. + * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. + */ + public T lastOrDefault(T defaultValue, Func1 predicate) { + return lastOrDefault(this, defaultValue, predicate); + } + /** * Applies a function of your choosing to every item emitted by an Observable, and returns this * transformation as a new Observable sequence. From 8a715c3cd8e00b7edce50cdfc603198a7d7d8af4 Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Sat, 23 Feb 2013 12:16:45 +0200 Subject: [PATCH 8/9] Unit tests added --- rxjava-core/src/main/java/rx/Observable.java | 36 ++++++++++++++++++++ 1 file changed, 36 insertions(+) diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index 40b4117bd0..dd99e1a19b 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -2891,6 +2891,42 @@ public Boolean call(Integer args) { } + @Test + public void testLastOrDefault1() { + Observable observable = toObservable("one", "two", "three"); + assertEquals("three", observable.lastOrDefault("default")); + } + + @Test + public void testLastOrDefault2() { + Observable observable = toObservable(); + assertEquals("default", observable.lastOrDefault("default")); + } + + @Test(expected = IllegalStateException.class) + public void testLastOrDefaultWrongPredicate() { + Observable observable = toObservable(1, 0, -1); + observable.lastOrDefault(0, new Func1() { + @Override + public Boolean call(Integer args) { + return args >= 0; + } + }); + } + + @Test + public void testLastOrDefaultWithPredicate() { + Observable observable = toObservable(1, 0, -1); + int last = observable.lastOrDefault(0, new Func1() { + @Override + public Boolean call(Integer args) { + return args < 0; + } + }); + + assertEquals(-1, last); + } + } From 6cd26f785ffdd4672ed40d722f5b81c559e93b5f Mon Sep 17 00:00:00 2001 From: Mairbek Khadikov Date: Sat, 23 Feb 2013 12:26:13 +0200 Subject: [PATCH 9/9] Dynamic languages support for lastDefault --- .../rx/lang/groovy/ObservableTests.groovy | 11 ++++++ rxjava-core/src/main/java/rx/Observable.java | 34 ++++++++++++++++++- 2 files changed, 44 insertions(+), 1 deletion(-) diff --git a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy index 00b5884ed6..5502e672d3 100644 --- a/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy +++ b/language-adaptors/rxjava-groovy/src/test/groovy/rx/lang/groovy/ObservableTests.groovy @@ -234,6 +234,17 @@ def class ObservableTests { Observable.toObservable("one", "two").single({ x -> x.length() == 3}) } + @Test + public void testLastOrDefault() { + def val = Observable.toObservable("one", "two").lastOrDefault("default", { x -> x.length() == 3}) + assertEquals("two", val) + } + + @Test(expected = IllegalStateException.class) + public void testLastOrDefault2() { + Observable.toObservable("one", "two").lastOrDefault("default", { x -> x.length() > 3}) + } + @Test public void testForEachWithError() { diff --git a/rxjava-core/src/main/java/rx/Observable.java b/rxjava-core/src/main/java/rx/Observable.java index dd99e1a19b..35b43eaad4 100644 --- a/rxjava-core/src/main/java/rx/Observable.java +++ b/rxjava-core/src/main/java/rx/Observable.java @@ -863,12 +863,33 @@ public static T lastOrDefault(Observable source, T defaultValue, Func1 the type of source. + * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. + */ + public static T lastOrDefault(Observable source, T defaultValue, Object predicate){ + @SuppressWarnings("rawtypes") + final FuncN _f = Functions.from(predicate); + + return lastOrDefault(source, defaultValue, new Func1() { + @Override + public Boolean call(T args) { + return (Boolean) _f.call(args); + } + }); + } + /** * Applies a function of your choosing to every notification emitted by an Observable, and returns * this transformation as a new Observable sequence. *

* - * + * * @param sequence * the source Observable * @param func @@ -2156,6 +2177,17 @@ public T lastOrDefault(T defaultValue, Func1 predicate) { return lastOrDefault(this, defaultValue, predicate); } + /** + * Returns the last element that matches the predicate, or a default value if no value is found. + * + * @param defaultValue a default value that would be returned if observable is empty. + * @param predicate a predicate function to evaluate for elements in the sequence. + * @return the last element of an observable sequence that matches the predicate, or a default value if no value is found. + */ + public T lastOrDefault(T defaultValue, Object predicate) { + return lastOrDefault(this, defaultValue, predicate); + } + /** * Applies a function of your choosing to every item emitted by an Observable, and returns this * transformation as a new Observable sequence.