diff --git a/src/stream/stream/delay.rs b/src/stream/stream/delay.rs new file mode 100644 index 000000000..576879293 --- /dev/null +++ b/src/stream/stream/delay.rs @@ -0,0 +1,48 @@ +use std::future::Future; +use std::pin::Pin; +use std::time::Duration; + +use pin_project_lite::pin_project; + +use crate::stream::Stream; +use crate::task::{Context, Poll}; + +pin_project! { + #[doc(hidden)] + #[allow(missing_debug_implementations)] + pub struct Delay { + #[pin] + stream: S, + #[pin] + delay: futures_timer::Delay, + delay_done: bool, + } +} + +impl Delay { + pub(super) fn new(stream: S, dur: Duration) -> Self { + Delay { + stream, + delay: futures_timer::Delay::new(dur), + delay_done: false, + } + } +} + +impl Stream for Delay +where + S: Stream, +{ + type Item = S::Item; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + + if !*this.delay_done { + futures_core::ready!(this.delay.poll(cx)); + *this.delay_done = true; + } + + this.stream.poll_next(cx) + } +} diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index e68e6acda..4fefbad53 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -130,6 +130,7 @@ cfg_unstable! { pub use flat_map::FlatMap; pub use timeout::{TimeoutError, Timeout}; pub use throttle::Throttle; + pub use delay::Delay; mod count; mod merge; @@ -138,6 +139,7 @@ cfg_unstable! { mod partition; mod timeout; mod throttle; + mod delay; mod unzip; } @@ -573,6 +575,47 @@ extension_trait! { Enumerate::new(self) } + #[doc = r#" + Creates a stream that is delayed before it starts yielding items. + + # Examples + + ``` + # fn main() { async_std::task::block_on(async { + # + use async_std::prelude::*; + use async_std::stream; + use std::time::{Duration, Instant}; + + let start = Instant::now(); + let mut s = stream::from_iter(vec![0u8, 1, 2]).delay(Duration::from_millis(200)); + + assert_eq!(s.next().await, Some(0)); + // The first time will take more than 200ms due to delay. + assert!(start.elapsed().as_millis() >= 200); + + assert_eq!(s.next().await, Some(1)); + // There will be no delay after the first time. + assert!(start.elapsed().as_millis() <= 210); + + assert_eq!(s.next().await, Some(2)); + assert!(start.elapsed().as_millis() <= 210); + + assert_eq!(s.next().await, None); + assert!(start.elapsed().as_millis() <= 210); + # + # }) } + ``` + "#] + #[cfg(any(feature = "unstable", feature = "docs"))] + #[cfg_attr(feature = "docs", doc(cfg(unstable)))] + fn delay(self, dur: std::time::Duration) -> Delay + where + Self: Sized, + { + Delay::new(self, dur) + } + #[doc = r#" Takes a closure and creates a stream that calls that closure on every element of this stream.