diff --git a/src/stream/stream/lt.rs b/src/stream/stream/lt.rs new file mode 100644 index 000000000..b774d7b43 --- /dev/null +++ b/src/stream/stream/lt.rs @@ -0,0 +1,47 @@ +use std::cmp::Ordering; +use std::pin::Pin; + +use super::partial_cmp::PartialCmpFuture; +use crate::future::Future; +use crate::prelude::*; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +// Determines if the elements of this `Stream` are lexicographically +// less than those of another. +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct LtFuture { + partial_cmp: PartialCmpFuture, +} + +impl LtFuture +where + L::Item: PartialOrd, +{ + pin_utils::unsafe_pinned!(partial_cmp: PartialCmpFuture); + + pub(super) fn new(l: L, r: R) -> Self { + LtFuture { + partial_cmp: l.partial_cmp(r), + } + } +} + +impl Future for LtFuture +where + L: Stream + Sized, + R: Stream + Sized, + L::Item: PartialOrd, +{ + type Output = bool; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let result = futures_core::ready!(self.as_mut().partial_cmp().poll(cx)); + + match result { + Some(Ordering::Less) => Poll::Ready(true), + _ => Poll::Ready(false), + } + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 8849605ca..7963a8f67 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -30,10 +30,9 @@ mod filter_map; mod find; mod find_map; mod fold; -mod for_each; mod fuse; mod inspect; -mod map; +mod lt; mod min_by; mod next; mod nth; @@ -42,7 +41,6 @@ mod skip; mod skip_while; mod step_by; mod take; -mod try_for_each; mod zip; use all::AllFuture; @@ -52,17 +50,15 @@ use filter_map::FilterMap; use find::FindFuture; use find_map::FindMapFuture; use fold::FoldFuture; -use for_each::ForEachFuture; +use lt::LtFuture; use min_by::MinByFuture; use next::NextFuture; use nth::NthFuture; -use try_for_each::TryForEeachFuture; pub use chain::Chain; pub use filter::Filter; pub use fuse::Fuse; pub use inspect::Inspect; -pub use map::Map; pub use scan::Scan; pub use skip::Skip; pub use skip_while::SkipWhile; @@ -340,37 +336,6 @@ extension_trait! { Enumerate::new(self) } - #[doc = r#" - Takes a closure and creates a stream that calls that closure on every element of this stream. - - # Examples - - ``` - # fn main() { async_std::task::block_on(async { - # - use async_std::prelude::*; - use std::collections::VecDeque; - - let s: VecDeque<_> = vec![1, 2, 3].into_iter().collect(); - let mut s = s.map(|x| 2 * x); - - assert_eq!(s.next().await, Some(2)); - assert_eq!(s.next().await, Some(4)); - assert_eq!(s.next().await, Some(6)); - assert_eq!(s.next().await, None); - - # - # }) } - ``` - "#] - fn map(self, f: F) -> Map - where - Self: Sized, - F: FnMut(Self::Item) -> B, - { - Map::new(self, f) - } - #[doc = r#" A combinator that does something with each element in the stream, passing the value on. @@ -787,41 +752,6 @@ extension_trait! { FoldFuture::new(self, init, f) } - #[doc = r#" - Call a closure on each element of the stream. - - # Examples - - ``` - # fn main() { async_std::task::block_on(async { - # - use async_std::prelude::*; - use std::collections::VecDeque; - use std::sync::mpsc::channel; - - let (tx, rx) = channel(); - - let s: VecDeque = vec![1, 2, 3].into_iter().collect(); - let sum = s.for_each(move |x| tx.clone().send(x).unwrap()).await; - - let v: Vec<_> = rx.iter().collect(); - - assert_eq!(v, vec![1, 2, 3]); - # - # }) } - ``` - "#] - fn for_each( - self, - f: F, - ) -> impl Future [ForEachFuture] - where - Self: Sized, - F: FnMut(Self::Item), - { - ForEachFuture::new(self, f) - } - #[doc = r#" Tests if any element of the stream matches a predicate. @@ -993,51 +923,6 @@ extension_trait! { Skip::new(self, n) } - #[doc = r#" - Applies a falliable function to each element in a stream, stopping at first error and returning it. - - # Examples - - ``` - # fn main() { async_std::task::block_on(async { - # - use std::collections::VecDeque; - use std::sync::mpsc::channel; - use async_std::prelude::*; - - let (tx, rx) = channel(); - - let s: VecDeque = vec![1, 2, 3].into_iter().collect(); - let s = s.try_for_each(|v| { - if v % 2 == 1 { - tx.clone().send(v).unwrap(); - Ok(()) - } else { - Err("even") - } - }); - - let res = s.await; - drop(tx); - let values: Vec<_> = rx.iter().collect(); - - assert_eq!(values, vec![1]); - assert_eq!(res, Err("even")); - # - # }) } - ``` - "#] - fn try_for_each( - self, - f: F, - ) -> impl Future [TryForEeachFuture] - where - Self: Sized, - F: FnMut(Self::Item) -> Result<(), E>, - { - TryForEeachFuture::new(self, f) - } - #[doc = r#" 'Zips up' two streams into a single stream of pairs. @@ -1147,6 +1032,44 @@ extension_trait! { { FromStream::from_stream(self) } + + #[doc = r#" + Determines if the elements of this `Stream` are lexicographically + less than those of another. + + # Examples + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use std::collections::VecDeque; + + let single = VecDeque::from(vec![1]); + let single_gt = VecDeque::from(vec![10]); + let multi = VecDeque::from(vec![1,2]); + let multi_gt = VecDeque::from(vec![1,5]); + + assert_eq!(single.clone().lt(single.clone()).await, false); + assert_eq!(single.clone().lt(single_gt.clone()).await, true); + assert_eq!(multi.clone().lt(single_gt.clone()).await, true); + assert_eq!(multi_gt.clone().lt(multi.clone()).await, false); + + # + # }) } + ``` + "#] + fn lt( + self, + other: S + ) -> impl Future + '_ [LtFuture] + where + Self: Sized + Stream, + S: Stream, + Self::Item: PartialOrd, + { + LtFuture::new(self, other) + } + } impl Stream for Box {