diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 054d81b63..7c5678133 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -30,19 +30,17 @@ mod filter_map; mod find; mod find_map; mod fold; -mod for_each; mod fuse; mod inspect; -mod map; mod min_by; mod next; mod nth; +mod partial_cmp; mod scan; mod skip; mod skip_while; mod step_by; mod take; -mod try_for_each; mod zip; use all::AllFuture; @@ -52,17 +50,15 @@ use filter_map::FilterMap; use find::FindFuture; use find_map::FindMapFuture; use fold::FoldFuture; -use for_each::ForEachFuture; use min_by::MinByFuture; use next::NextFuture; use nth::NthFuture; -use try_for_each::TryForEeachFuture; +use partial_cmp::PartialCmpFuture; pub use chain::Chain; pub use filter::Filter; pub use fuse::Fuse; pub use inspect::Inspect; -pub use map::Map; pub use scan::Scan; pub use skip::Skip; pub use skip_while::SkipWhile; @@ -344,37 +340,6 @@ extension_trait! { Enumerate::new(self) } - #[doc = r#" - Takes a closure and creates a stream that calls that closure on every element of this stream. - - # Examples - - ``` - # fn main() { async_std::task::block_on(async { - # - use async_std::prelude::*; - use std::collections::VecDeque; - - let s: VecDeque<_> = vec![1, 2, 3].into_iter().collect(); - let mut s = s.map(|x| 2 * x); - - assert_eq!(s.next().await, Some(2)); - assert_eq!(s.next().await, Some(4)); - assert_eq!(s.next().await, Some(6)); - assert_eq!(s.next().await, None); - - # - # }) } - ``` - "#] - fn map(self, f: F) -> Map - where - Self: Sized, - F: FnMut(Self::Item) -> B, - { - Map::new(self, f) - } - #[doc = r#" A combinator that does something with each element in the stream, passing the value on. @@ -791,41 +756,6 @@ extension_trait! { FoldFuture::new(self, init, f) } - #[doc = r#" - Call a closure on each element of the stream. - - # Examples - - ``` - # fn main() { async_std::task::block_on(async { - # - use async_std::prelude::*; - use std::collections::VecDeque; - use std::sync::mpsc::channel; - - let (tx, rx) = channel(); - - let s: VecDeque = vec![1, 2, 3].into_iter().collect(); - let sum = s.for_each(move |x| tx.clone().send(x).unwrap()).await; - - let v: Vec<_> = rx.iter().collect(); - - assert_eq!(v, vec![1, 2, 3]); - # - # }) } - ``` - "#] - fn for_each( - self, - f: F, - ) -> impl Future [ForEachFuture] - where - Self: Sized, - F: FnMut(Self::Item), - { - ForEachFuture::new(self, f) - } - #[doc = r#" Tests if any element of the stream matches a predicate. @@ -997,51 +927,6 @@ extension_trait! { Skip::new(self, n) } - #[doc = r#" - Applies a falliable function to each element in a stream, stopping at first error and returning it. - - # Examples - - ``` - # fn main() { async_std::task::block_on(async { - # - use std::collections::VecDeque; - use std::sync::mpsc::channel; - use async_std::prelude::*; - - let (tx, rx) = channel(); - - let s: VecDeque = vec![1, 2, 3].into_iter().collect(); - let s = s.try_for_each(|v| { - if v % 2 == 1 { - tx.clone().send(v).unwrap(); - Ok(()) - } else { - Err("even") - } - }); - - let res = s.await; - drop(tx); - let values: Vec<_> = rx.iter().collect(); - - assert_eq!(values, vec![1]); - assert_eq!(res, Err("even")); - # - # }) } - ``` - "#] - fn try_for_each( - self, - f: F, - ) -> impl Future [TryForEeachFuture] - where - Self: Sized, - F: FnMut(Self::Item) -> Result<(), E>, - { - TryForEeachFuture::new(self, f) - } - #[doc = r#" 'Zips up' two streams into a single stream of pairs. @@ -1187,6 +1072,46 @@ extension_trait! { { Merge::new(self, other) } + + #[doc = r#" + Lexicographically compares the elements of this `Stream` with those + of another. + + # Examples + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use std::collections::VecDeque; + + use std::cmp::Ordering; + + let s1 = VecDeque::from(vec![1]); + let s2 = VecDeque::from(vec![1, 2]); + let s3 = VecDeque::from(vec![1, 2, 3]); + let s4 = VecDeque::from(vec![1, 2, 4]); + + assert_eq!(s1.clone().partial_cmp(s1.clone()).await, Some(Ordering::Equal)); + assert_eq!(s1.clone().partial_cmp(s2.clone()).await, Some(Ordering::Less)); + assert_eq!(s2.clone().partial_cmp(s1.clone()).await, Some(Ordering::Greater)); + assert_eq!(s3.clone().partial_cmp(s4.clone()).await, Some(Ordering::Less)); + assert_eq!(s4.clone().partial_cmp(s3.clone()).await, Some(Ordering::Greater)); + + # + # }) } + ``` + "#] + fn partial_cmp( + self, + other: S + ) -> impl Future> [PartialCmpFuture] + where + Self: Sized + Stream, + S: Stream, + ::Item: PartialOrd, + { + PartialCmpFuture::new(self, other) + } } impl Stream for Box { diff --git a/src/stream/stream/partial_cmp.rs b/src/stream/stream/partial_cmp.rs new file mode 100644 index 000000000..f7de5cd12 --- /dev/null +++ b/src/stream/stream/partial_cmp.rs @@ -0,0 +1,92 @@ +use std::cmp::Ordering; +use std::pin::Pin; + +use super::fuse::Fuse; +use crate::future::Future; +use crate::prelude::*; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +// Lexicographically compares the elements of this `Stream` with those +// of another. +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct PartialCmpFuture { + l: Fuse, + r: Fuse, + l_cache: Option, + r_cache: Option, +} + +impl PartialCmpFuture { + pin_utils::unsafe_pinned!(l: Fuse); + pin_utils::unsafe_pinned!(r: Fuse); + pin_utils::unsafe_unpinned!(l_cache: Option); + pin_utils::unsafe_unpinned!(r_cache: Option); + + pub(super) fn new(l: L, r: R) -> Self { + PartialCmpFuture { + l: l.fuse(), + r: r.fuse(), + l_cache: None, + r_cache: None, + } + } +} + +impl Future for PartialCmpFuture +where + L: Stream + Sized, + R: Stream + Sized, + L::Item: PartialOrd, +{ + type Output = Option; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + // Short circuit logic + // Stream that completes earliest can be considered Less, etc + let l_complete = self.l.done && self.as_mut().l_cache.is_none(); + let r_complete = self.r.done && self.as_mut().r_cache.is_none(); + + if l_complete && r_complete { + return Poll::Ready(Some(Ordering::Equal)); + } else if l_complete { + return Poll::Ready(Some(Ordering::Less)); + } else if r_complete { + return Poll::Ready(Some(Ordering::Greater)); + } + + // Get next value if possible and necesary + if !self.l.done && self.as_mut().l_cache.is_none() { + let l_next = futures_core::ready!(self.as_mut().l().poll_next(cx)); + if let Some(item) = l_next { + *self.as_mut().l_cache() = Some(item); + } + } + + if !self.r.done && self.as_mut().r_cache.is_none() { + let r_next = futures_core::ready!(self.as_mut().r().poll_next(cx)); + if let Some(item) = r_next { + *self.as_mut().r_cache() = Some(item); + } + } + + // Compare if both values are available. + if self.as_mut().l_cache.is_some() && self.as_mut().r_cache.is_some() { + let l_value = self.as_mut().l_cache().take().unwrap(); + let r_value = self.as_mut().r_cache().take().unwrap(); + let result = l_value.partial_cmp(&r_value); + + if let Some(Ordering::Equal) = result { + // Reset cache to prepare for next comparison + *self.as_mut().l_cache() = None; + *self.as_mut().r_cache() = None; + } else { + // Return non equal value + return Poll::Ready(result); + } + } + } + } +} \ No newline at end of file