From 86d45eeee13974247962ff30068eb00751cddc72 Mon Sep 17 00:00:00 2001 From: arheard Date: Fri, 4 Oct 2019 15:02:08 -0400 Subject: [PATCH 1/6] Adds cmp --- src/stream/stream/cmp.rs | 91 ++++++++++++++++++++++++++++++++++++++++ src/stream/stream/mod.rs | 47 +++++++++++++++++++++ 2 files changed, 138 insertions(+) create mode 100644 src/stream/stream/cmp.rs diff --git a/src/stream/stream/cmp.rs b/src/stream/stream/cmp.rs new file mode 100644 index 000000000..d08aad012 --- /dev/null +++ b/src/stream/stream/cmp.rs @@ -0,0 +1,91 @@ +use std::cmp::Ordering; +use std::pin::Pin; + +use super::fuse::Fuse; +use crate::prelude::*; +use crate::future::Future; +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +// Lexicographically compares the elements of this `Stream` with those +// of another using `Ord`. +#[doc(hidden)] +#[allow(missing_debug_implementations)] +pub struct CmpFuture { + l: Fuse, + r: Fuse, + l_cache: Option, + r_cache: Option, +} + +impl CmpFuture { + pin_utils::unsafe_pinned!(l: Fuse); + pin_utils::unsafe_pinned!(r: Fuse); + pin_utils::unsafe_unpinned!(l_cache: Option); + pin_utils::unsafe_unpinned!(r_cache: Option); + + pub(super) fn new(l: L, r: R) -> Self { + CmpFuture { + l: l.fuse(), + r: r.fuse(), + l_cache: None, + r_cache: None, + } + } +} + +impl Future for CmpFuture +where + L: Stream + Sized, + R: Stream + Sized, + L::Item: Ord, +{ + type Output = Ordering; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + loop { + // Stream that completes earliest can be considered Less, etc + let l_complete = self.l.done && self.as_mut().l_cache.is_none(); + let r_complete = self.r.done && self.as_mut().r_cache.is_none(); + + if l_complete && r_complete { + return Poll::Ready(Ordering::Equal) + } else if l_complete { + return Poll::Ready(Ordering::Less) + } else if r_complete { + return Poll::Ready(Ordering::Greater) + } + + // Get next value if possible and necesary + if !self.l.done && self.as_mut().l_cache.is_none() { + let l_next = futures_core::ready!(self.as_mut().l().poll_next(cx)); + if let Some(item) = l_next { + *self.as_mut().l_cache() = Some(item); + } + } + + if !self.r.done && self.as_mut().r_cache.is_none() { + let r_next = futures_core::ready!(self.as_mut().r().poll_next(cx)); + if let Some(item) = r_next { + *self.as_mut().r_cache() = Some(item); + } + } + + // Compare if both values are available. + if self.as_mut().l_cache.is_some() && self.as_mut().r_cache.is_some() { + let l_value = self.as_mut().l_cache().take().unwrap(); + let r_value = self.as_mut().r_cache().take().unwrap(); + let result = l_value.cmp(&r_value); + + if let Ordering::Equal = result { + // Reset cache to prepare for next comparison + *self.as_mut().l_cache() = None; + *self.as_mut().r_cache() = None; + } else { + // Return non equal value + return Poll::Ready(result); + } + } + } + } +} \ No newline at end of file diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 0e563f6d3..9c5c6c571 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -24,6 +24,7 @@ mod all; mod any; mod chain; +mod cmp; mod enumerate; mod filter; mod filter_map; @@ -49,6 +50,7 @@ mod zip; use all::AllFuture; use any::AnyFuture; +use cmp::CmpFuture; use enumerate::Enumerate; use filter_map::FilterMap; use find::FindFuture; @@ -1186,6 +1188,7 @@ extension_trait! { } #[doc = r#" +<<<<<<< HEAD Combines multiple streams into a single stream of all their outputs. Items are yielded as soon as they're received, and the stream continues yield until both @@ -1231,6 +1234,7 @@ extension_trait! { # use async_std::prelude::*; use std::collections::VecDeque; + use std::cmp::Ordering; let s1 = VecDeque::from(vec![1]); let s2 = VecDeque::from(vec![1, 2]); @@ -1256,6 +1260,49 @@ extension_trait! { { PartialCmpFuture::new(self, other) } + + #[doc = r#" + Lexicographically compares the elements of this `Stream` with those + of another using 'Ord'. + + # Examples + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use std::collections::VecDeque; + + use std::cmp::Ordering; + let result_equal = vec![1].into_iter().collect::>() + .cmp(vec![1].into_iter().collect::>()).await; + let result_less_count = vec![1].into_iter().collect::>() + .cmp(vec![1, 2].into_iter().collect::>()).await; + let result_greater_count = vec![1, 2].into_iter().collect::>() + .cmp(vec![1].into_iter().collect::>()).await; + let result_less_vals = vec![1, 2, 3].into_iter().collect::>() + .cmp(vec![1, 2, 4].into_iter().collect::>()).await; + let result_greater_vals = vec![1, 2, 4].into_iter().collect::>() + .cmp(vec![1, 2, 3].into_iter().collect::>()).await; + assert_eq!(result_equal, Ordering::Equal); + assert_eq!(result_less_count, Ordering::Less); + assert_eq!(result_greater_count, Ordering::Greater); + assert_eq!(result_less_vals, Ordering::Less); + assert_eq!(result_greater_vals, Ordering::Greater); + # + # }) } + ``` + "#] + fn cmp( + self, + other: S + ) -> impl Future + '_ [CmpFuture] + where + Self: Sized + Stream, + S: Stream, + Self::Item: Ord, + { + CmpFuture::new(self, other) + } } impl Stream for Box { From 2f38dc236b8fb335919915bcf27d6b11bfd2669b Mon Sep 17 00:00:00 2001 From: arheard Date: Fri, 4 Oct 2019 16:05:32 -0400 Subject: [PATCH 2/6] Fixes formatting --- src/stream/stream/cmp.rs | 36 ++++++++++++++++++------------------ src/stream/stream/mod.rs | 5 ++--- 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/src/stream/stream/cmp.rs b/src/stream/stream/cmp.rs index d08aad012..fc7161ad8 100644 --- a/src/stream/stream/cmp.rs +++ b/src/stream/stream/cmp.rs @@ -2,8 +2,8 @@ use std::cmp::Ordering; use std::pin::Pin; use super::fuse::Fuse; -use crate::prelude::*; use crate::future::Future; +use crate::prelude::*; use crate::stream::Stream; use crate::task::{Context, Poll}; @@ -15,7 +15,7 @@ pub struct CmpFuture { l: Fuse, r: Fuse, l_cache: Option, - r_cache: Option, + r_cache: Option, } impl CmpFuture { @@ -28,35 +28,35 @@ impl CmpFuture { CmpFuture { l: l.fuse(), r: r.fuse(), - l_cache: None, - r_cache: None, + l_cache: None, + r_cache: None, } } } -impl Future for CmpFuture -where +impl Future for CmpFuture +where L: Stream + Sized, R: Stream + Sized, - L::Item: Ord, + L::Item: Ord, { type Output = Ordering; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { loop { // Stream that completes earliest can be considered Less, etc let l_complete = self.l.done && self.as_mut().l_cache.is_none(); let r_complete = self.r.done && self.as_mut().r_cache.is_none(); if l_complete && r_complete { - return Poll::Ready(Ordering::Equal) + return Poll::Ready(Ordering::Equal); } else if l_complete { - return Poll::Ready(Ordering::Less) + return Poll::Ready(Ordering::Less); } else if r_complete { - return Poll::Ready(Ordering::Greater) + return Poll::Ready(Ordering::Greater); } - // Get next value if possible and necesary + // Get next value if possible and necesary if !self.l.done && self.as_mut().l_cache.is_none() { let l_next = futures_core::ready!(self.as_mut().l().poll_next(cx)); if let Some(item) = l_next { @@ -74,18 +74,18 @@ where // Compare if both values are available. if self.as_mut().l_cache.is_some() && self.as_mut().r_cache.is_some() { let l_value = self.as_mut().l_cache().take().unwrap(); - let r_value = self.as_mut().r_cache().take().unwrap(); + let r_value = self.as_mut().r_cache().take().unwrap(); let result = l_value.cmp(&r_value); if let Ordering::Equal = result { - // Reset cache to prepare for next comparison - *self.as_mut().l_cache() = None; + // Reset cache to prepare for next comparison + *self.as_mut().l_cache() = None; *self.as_mut().r_cache() = None; } else { - // Return non equal value + // Return non equal value return Poll::Ready(result); - } + } } } } -} \ No newline at end of file +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 9c5c6c571..270edf831 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1188,7 +1188,6 @@ extension_trait! { } #[doc = r#" -<<<<<<< HEAD Combines multiple streams into a single stream of all their outputs. Items are yielded as soon as they're received, and the stream continues yield until both @@ -1295,10 +1294,10 @@ extension_trait! { fn cmp( self, other: S - ) -> impl Future + '_ [CmpFuture] + ) -> impl Future + '_ [CmpFuture] where Self: Sized + Stream, - S: Stream, + S: Stream, Self::Item: Ord, { CmpFuture::new(self, other) From ae5a3e028ca2dac2659f19779430ec21c50235de Mon Sep 17 00:00:00 2001 From: arheard Date: Thu, 10 Oct 2019 13:06:01 -0400 Subject: [PATCH 3/6] cleans up examples --- src/stream/stream/mod.rs | 26 +++++++++++--------------- 1 file changed, 11 insertions(+), 15 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 270edf831..cdd674656 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1272,21 +1272,17 @@ extension_trait! { use std::collections::VecDeque; use std::cmp::Ordering; - let result_equal = vec![1].into_iter().collect::>() - .cmp(vec![1].into_iter().collect::>()).await; - let result_less_count = vec![1].into_iter().collect::>() - .cmp(vec![1, 2].into_iter().collect::>()).await; - let result_greater_count = vec![1, 2].into_iter().collect::>() - .cmp(vec![1].into_iter().collect::>()).await; - let result_less_vals = vec![1, 2, 3].into_iter().collect::>() - .cmp(vec![1, 2, 4].into_iter().collect::>()).await; - let result_greater_vals = vec![1, 2, 4].into_iter().collect::>() - .cmp(vec![1, 2, 3].into_iter().collect::>()).await; - assert_eq!(result_equal, Ordering::Equal); - assert_eq!(result_less_count, Ordering::Less); - assert_eq!(result_greater_count, Ordering::Greater); - assert_eq!(result_less_vals, Ordering::Less); - assert_eq!(result_greater_vals, Ordering::Greater); + let s1 = VecDeque::from(vec![1]); + let s2 = VecDeque::from(vec![1, 2]); + let s3 = VecDeque::from(vec![1, 2, 3]); + let s4 = VecDeque::from(vec![1, 2, 4]); + + assert_eq!(s1.clone().cmp(s1.clone()).await, Ordering::Equal); + assert_eq!(s1.clone().cmp(s2.clone()).await, Ordering::Less); + assert_eq!(s2.clone().cmp(s1.clone()).await, Ordering::Greater); + assert_eq!(s3.clone().cmp(s4.clone()).await, Ordering::Less); + assert_eq!(s4.clone().cmp(s3.clone()).await, Ordering::Greater); + # # }) } ``` From 507307ff4a9ccee71f89a77fce4a2149b97b82a5 Mon Sep 17 00:00:00 2001 From: arheard Date: Thu, 10 Oct 2019 13:27:53 -0400 Subject: [PATCH 4/6] attempts to fix rustdoc issue --- src/stream/stream/mod.rs | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index cdd674656..1cd47cef3 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1290,11 +1290,10 @@ extension_trait! { fn cmp( self, other: S - ) -> impl Future + '_ [CmpFuture] + ) -> impl Future [CmpFuture] where Self: Sized + Stream, - S: Stream, - Self::Item: Ord, + S: Stream, { CmpFuture::new(self, other) } From 42a8a2e667e4c94a9b453d1afdd2c6b92bdaa561 Mon Sep 17 00:00:00 2001 From: arheard Date: Thu, 10 Oct 2019 13:42:42 -0400 Subject: [PATCH 5/6] formats with cargo fmt --- src/stream/stream/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 1cd47cef3..6f2192c6d 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1293,7 +1293,7 @@ extension_trait! { ) -> impl Future [CmpFuture] where Self: Sized + Stream, - S: Stream, + S: Stream, { CmpFuture::new(self, other) } From cc049a02553cd65128354ef984520a22b6452171 Mon Sep 17 00:00:00 2001 From: assemblaj Date: Mon, 14 Oct 2019 13:56:30 -0400 Subject: [PATCH 6/6] Adds proper trait bounds for cmp --- src/stream/stream/mod.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index 6f2192c6d..47900d6c2 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -1294,6 +1294,7 @@ extension_trait! { where Self: Sized + Stream, S: Stream, + ::Item: Ord { CmpFuture::new(self, other) }