diff --git a/src/collections/binary_heap/extend.rs b/src/collections/binary_heap/extend.rs index 439bf139e..0353c56bf 100644 --- a/src/collections/binary_heap/extend.rs +++ b/src/collections/binary_heap/extend.rs @@ -13,6 +13,9 @@ impl stream::Extend for BinaryHeap { self.reserve(stream.size_hint().0); - Box::pin(stream.for_each(move |item| self.push(item))) + Box::pin(stream.for_each(move |item| { + self.push(item); + async {} + })) } } diff --git a/src/collections/btree_map/extend.rs b/src/collections/btree_map/extend.rs index 19d306ffe..3080e1d72 100644 --- a/src/collections/btree_map/extend.rs +++ b/src/collections/btree_map/extend.rs @@ -11,6 +11,7 @@ impl stream::Extend<(K, V)> for BTreeMap { ) -> Pin + 'a>> { Box::pin(stream.into_stream().for_each(move |(k, v)| { self.insert(k, v); + async {} })) } } diff --git a/src/collections/btree_set/extend.rs b/src/collections/btree_set/extend.rs index 422640b15..29219b562 100644 --- a/src/collections/btree_set/extend.rs +++ b/src/collections/btree_set/extend.rs @@ -11,6 +11,7 @@ impl stream::Extend for BTreeSet { ) -> Pin + 'a>> { Box::pin(stream.into_stream().for_each(move |item| { self.insert(item); + async {} })) } } diff --git a/src/collections/hash_map/extend.rs b/src/collections/hash_map/extend.rs index 0f4ce0c6e..8c0b29721 100644 --- a/src/collections/hash_map/extend.rs +++ b/src/collections/hash_map/extend.rs @@ -32,6 +32,7 @@ where Box::pin(stream.for_each(move |(k, v)| { self.insert(k, v); + async {} })) } } diff --git a/src/collections/hash_set/extend.rs b/src/collections/hash_set/extend.rs index ba872b438..d239a0f26 100644 --- a/src/collections/hash_set/extend.rs +++ b/src/collections/hash_set/extend.rs @@ -35,6 +35,7 @@ where Box::pin(stream.for_each(move |item| { self.insert(item); + async {} })) } } diff --git a/src/collections/linked_list/extend.rs b/src/collections/linked_list/extend.rs index b0dff009d..ca509f49f 100644 --- a/src/collections/linked_list/extend.rs +++ b/src/collections/linked_list/extend.rs @@ -10,6 +10,9 @@ impl stream::Extend for LinkedList { stream: S, ) -> Pin + 'a>> { let stream = stream.into_stream(); - Box::pin(stream.for_each(move |item| self.push_back(item))) + Box::pin(stream.for_each(move |item| { + self.push_back(item); + async {} + })) } } diff --git a/src/collections/vec_deque/extend.rs b/src/collections/vec_deque/extend.rs index dd2ddce3c..be5e9b2f6 100644 --- a/src/collections/vec_deque/extend.rs +++ b/src/collections/vec_deque/extend.rs @@ -13,6 +13,9 @@ impl stream::Extend for VecDeque { self.reserve(stream.size_hint().0); - Box::pin(stream.for_each(move |item| self.push_back(item))) + Box::pin(stream.for_each(move |item| { + self.push_back(item); + async {} + })) } } diff --git a/src/io/buf_read/mod.rs b/src/io/buf_read/mod.rs index d919a782c..9d20e0a6b 100644 --- a/src/io/buf_read/mod.rs +++ b/src/io/buf_read/mod.rs @@ -262,7 +262,8 @@ extension_trait! { let cursor = io::Cursor::new(b"lorem-ipsum-dolor"); - let mut split_iter = cursor.split(b'-').map(|l| l.unwrap()); + let split_iter = cursor.split(b'-').map(|l| async move { l.unwrap() }); + pin_utils::pin_mut!(split_iter); assert_eq!(split_iter.next().await, Some(b"lorem".to_vec())); assert_eq!(split_iter.next().await, Some(b"ipsum".to_vec())); assert_eq!(split_iter.next().await, Some(b"dolor".to_vec())); diff --git a/src/option/from_stream.rs b/src/option/from_stream.rs index de929ca94..6c03f4fa3 100644 --- a/src/option/from_stream.rs +++ b/src/option/from_stream.rs @@ -29,7 +29,7 @@ where false } }) - .filter_map(identity) + .filter_map(|a| async move { identity(a) }) .collect() .await; diff --git a/src/option/product.rs b/src/option/product.rs index b446c1ffe..6ae5a5a14 100644 --- a/src/option/product.rs +++ b/src/option/product.rs @@ -25,12 +25,13 @@ where use async_std::stream; let v = stream::from_iter(vec![1, 2, 4]); - let prod: Option = v.map(|x| + let prod: Option = v.map(|x| async move { if x < 0 { None } else { Some(x) - }).product().await; + } + }).product().await; assert_eq!(prod, Some(8)); # # }) } @@ -53,7 +54,7 @@ where false } }) - .filter_map(identity), + .filter_map(|a| async move { identity(a) }), ) .await; diff --git a/src/option/sum.rs b/src/option/sum.rs index de404f42d..422ce62c0 100644 --- a/src/option/sum.rs +++ b/src/option/sum.rs @@ -25,7 +25,7 @@ where use async_std::stream; let words = stream::from_iter(vec!["have", "a", "great", "day"]); - let total: Option = words.map(|w| w.find('a')).sum().await; + let total: Option = words.map(|w| async move { w.find('a') }).sum().await; assert_eq!(total, Some(5)); # # }) } @@ -48,7 +48,7 @@ where false } }) - .filter_map(identity), + .filter_map(|a| async move { identity(a) }), ) .await; diff --git a/src/result/from_stream.rs b/src/result/from_stream.rs index 8a8e0eaf3..7cfd02b77 100644 --- a/src/result/from_stream.rs +++ b/src/result/from_stream.rs @@ -20,9 +20,9 @@ where /// use async_std::stream; /// /// let v = stream::from_iter(vec![1, 2]); - /// let res: Result, &'static str> = v.map(|x: u32| + /// let res: Result, &'static str> = v.map(|x: u32| async move { /// x.checked_add(1).ok_or("Overflow!") - /// ).collect().await; + /// }).collect().await; /// assert_eq!(res, Ok(vec![2, 3])); /// # /// # }) } @@ -48,12 +48,15 @@ where true }) }) - .filter_map(|elem| match elem { + .filter_map(|elem| { + let res = match elem { Ok(value) => Some(value), Err(err) => { found_error = Some(err); None - } + }, + }; + async { res } }) .collect() .await; diff --git a/src/result/product.rs b/src/result/product.rs index 45782ff70..decebd368 100644 --- a/src/result/product.rs +++ b/src/result/product.rs @@ -24,11 +24,12 @@ where use async_std::stream; let v = stream::from_iter(vec![1, 2, 4]); - let res: Result = v.map(|x| - if x < 0 { - Err("Negative element found") - } else { - Ok(x) + let res: Result = v.map(|x| async move { + if x < 0 { + Err("Negative element found") + } else { + Ok(x) + } }).product().await; assert_eq!(res, Ok(8)); # @@ -55,12 +56,15 @@ where true }) }) - .filter_map(|elem| match elem { - Ok(value) => Some(value), - Err(err) => { - found_error = Some(err); - None - } + .filter_map(|elem| { + let res = match elem { + Ok(value) => Some(value), + Err(err) => { + found_error = Some(err); + None + } + }; + async { res } }), ) .await; diff --git a/src/result/sum.rs b/src/result/sum.rs index b6d84a0c4..4665b57b0 100644 --- a/src/result/sum.rs +++ b/src/result/sum.rs @@ -24,11 +24,12 @@ where use async_std::stream; let v = stream::from_iter(vec![1, 2]); - let res: Result = v.map(|x| - if x < 0 { - Err("Negative element found") - } else { - Ok(x) + let res: Result = v.map(|x| async move { + if x < 0 { + Err("Negative element found") + } else { + Ok(x) + } }).sum().await; assert_eq!(res, Ok(3)); # @@ -55,12 +56,15 @@ where true }) }) - .filter_map(|elem| match elem { - Ok(value) => Some(value), - Err(err) => { - found_error = Some(err); - None - } + .filter_map(|elem| { + let res = match elem { + Ok(value) => Some(value), + Err(err) => { + found_error = Some(err); + None + } + }; + async { res } }), ) .await; diff --git a/src/stream/mod.rs b/src/stream/mod.rs index 0bfd4e865..18eab42b7 100644 --- a/src/stream/mod.rs +++ b/src/stream/mod.rs @@ -216,7 +216,7 @@ //! # use async_std::prelude::*; //! # use async_std::stream; //! let v = stream::repeat(1u8).take(5); -//! v.map(|x| println!("{}", x)); +//! v.map(|x| async move { println!("{}", x)}); //! # //! # Ok(()) }) } //! ``` diff --git a/src/stream/product.rs b/src/stream/product.rs index 15497e87c..a32a9a5c2 100644 --- a/src/stream/product.rs +++ b/src/stream/product.rs @@ -23,7 +23,6 @@ pub trait Product: Sized { S: Stream + 'a; } -use core::ops::Mul; use core::num::Wrapping; use crate::stream::stream::StreamExt; @@ -34,7 +33,7 @@ macro_rules! integer_product { where S: Stream + 'a, { - Box::pin(async move { stream.fold($one, Mul::mul).await } ) + Box::pin(async move { stream.fold($one, |a, b| async move { a * b }).await } ) } } impl<'a> Product<&'a $a> for $a { @@ -42,7 +41,7 @@ macro_rules! integer_product { where S: Stream + 'b, { - Box::pin(async move { stream.fold($one, Mul::mul).await } ) + Box::pin(async move { stream.fold($one, |a, b| async move { a * b }).await } ) } } )*); @@ -58,14 +57,14 @@ macro_rules! float_product { fn product<'a, S>(stream: S) -> Pin+ 'a>> where S: Stream + 'a, { - Box::pin(async move { stream.fold(1.0, |a, b| a * b).await } ) + Box::pin(async move { stream.fold(1.0, |a, b| async move { a * b }).await } ) } } impl<'a> Product<&'a $a> for $a { fn product<'b, S>(stream: S) -> Pin+ 'b>> where S: Stream + 'b, { - Box::pin(async move { stream.fold(1.0, |a, b| a * b).await } ) + Box::pin(async move { stream.fold(1.0, |a, b| async move { a * b }).await } ) } } )*); diff --git a/src/stream/stream/all.rs b/src/stream/stream/all.rs index 06f4d7f80..daeb3039d 100644 --- a/src/stream/stream/all.rs +++ b/src/stream/stream/all.rs @@ -7,47 +7,63 @@ use crate::task::{Context, Poll}; #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct AllFuture<'a, S, F, T> { +pub struct AllFuture<'a, S, F, Fut, T> { pub(crate) stream: &'a mut S, pub(crate) f: F, + pub(crate) result: bool, + pub(crate) future: Option, pub(crate) _marker: PhantomData, } -impl<'a, S, F, T> AllFuture<'a, S, F, T> { +impl<'a, S, F, Fut, T> AllFuture<'a, S, F, Fut, T> { + pin_utils::unsafe_pinned!(stream: &'a mut S); + pin_utils::unsafe_unpinned!(f: F); + pin_utils::unsafe_pinned!(future: Option); +} + +impl<'a, S, Fut, F, T> AllFuture<'a, S, F, Fut, T> { pub(crate) fn new(stream: &'a mut S, f: F) -> Self { Self { stream, f, + result: true, + future: None, _marker: PhantomData, } } } -impl Unpin for AllFuture<'_, S, F, T> {} - -impl Future for AllFuture<'_, S, F, S::Item> +impl Future for AllFuture<'_, S, F, Fut, S::Item> where S: Stream + Unpin + Sized, - F: FnMut(S::Item) -> bool, + F: FnMut(S::Item) -> Fut, + Fut: Future, { type Output = bool; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx)); - - match next { - Some(v) => { - let result = (&mut self.f)(v); + loop { + match self.future.is_some() { + false => { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + match next { + Some(v) => { + let fut = (self.as_mut().f())(v); + self.as_mut().future().set(Some(fut)); + } + None => return Poll::Ready(self.result), + } + } + true => { + let res = + futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); - if result { - // don't forget to wake this task again to pull the next item from stream - cx.waker().wake_by_ref(); - Poll::Pending - } else { - Poll::Ready(false) + self.as_mut().future().set(None); + if !res { + return Poll::Ready(false); + } } } - None => Poll::Ready(true), } } } diff --git a/src/stream/stream/any.rs b/src/stream/stream/any.rs index 15154c506..4779eadc0 100644 --- a/src/stream/stream/any.rs +++ b/src/stream/stream/any.rs @@ -7,47 +7,61 @@ use crate::task::{Context, Poll}; #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct AnyFuture<'a, S, F, T> { +pub struct AnyFuture<'a, S, F, Fut, T> { pub(crate) stream: &'a mut S, pub(crate) f: F, + pub(crate) result: bool, + pub(crate) future: Option, pub(crate) _marker: PhantomData, } -impl<'a, S, F, T> AnyFuture<'a, S, F, T> { +impl<'a, S, F, Fut, T> AnyFuture<'a, S, F, Fut, T> { + pin_utils::unsafe_unpinned!(f: F); + pin_utils::unsafe_pinned!(future: Option); + pin_utils::unsafe_pinned!(stream: &'a mut S); + pub(crate) fn new(stream: &'a mut S, f: F) -> Self { Self { stream, f, + result: false, + future: None, _marker: PhantomData, } } } -impl Unpin for AnyFuture<'_, S, F, T> {} - -impl Future for AnyFuture<'_, S, F, S::Item> +impl Future for AnyFuture<'_, S, F, Fut, S::Item> where S: Stream + Unpin + Sized, - F: FnMut(S::Item) -> bool, + F: FnMut(S::Item) -> Fut, + Fut: Future, { type Output = bool; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let next = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx)); - - match next { - Some(v) => { - let result = (&mut self.f)(v); + loop { + match self.future.is_some() { + false => { + let next = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + match next { + Some(v) => { + let fut = (self.as_mut().f())(v); + self.as_mut().future().set(Some(fut)); + } + None => return Poll::Ready(self.result), + } + } + true => { + let res = + futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); - if result { - Poll::Ready(true) - } else { - // don't forget to wake this task again to pull the next item from stream - cx.waker().wake_by_ref(); - Poll::Pending + self.as_mut().future().set(None); + if res { + return Poll::Ready(true); + } } } - None => Poll::Ready(false), } } } diff --git a/src/stream/stream/filter_map.rs b/src/stream/stream/filter_map.rs index e43e8f09f..895d62023 100644 --- a/src/stream/stream/filter_map.rs +++ b/src/stream/stream/filter_map.rs @@ -1,44 +1,70 @@ +use core::marker::PhantomData; use core::pin::Pin; use core::task::{Context, Poll}; use pin_project_lite::pin_project; +use crate::future::Future; use crate::stream::Stream; pin_project! { #[derive(Debug)] - pub struct FilterMap { + pub struct FilterMap { #[pin] stream: S, f: F, - } + #[pin] + future: Option, + __from: PhantomData, + __to: PhantomData, + } } -impl FilterMap { +impl FilterMap { pub(crate) fn new(stream: S, f: F) -> Self { - Self { stream, f } + Self { + stream, + f, + future: None, + __from: PhantomData, + __to: PhantomData, + } } } -impl Stream for FilterMap +impl Stream for FilterMap where S: Stream, - F: FnMut(S::Item) -> Option, + F: FnMut(S::Item) -> Fut, + Fut: Future>, { type Item = B; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - let next = futures_core::ready!(this.stream.poll_next(cx)); - match next { - Some(v) => match (this.f)(v) { - Some(b) => Poll::Ready(Some(b)), - None => { - cx.waker().wake_by_ref(); - Poll::Pending + let mut this = self.project(); + loop { + match this.future.is_some() { + false => { + let next = futures_core::ready!(this.stream.as_mut().poll_next(cx)); + match next { + Some(v) => { + let fut = (this.f)(v); + this.future.as_mut().set(Some(fut)); + } + None => return Poll::Ready(None), + } + } + true => { + let res = + futures_core::ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)); + + this.future.as_mut().set(None); + + if let Some(b) = res { + return Poll::Ready(Some(b)); + } } - }, - None => Poll::Ready(None), + } } } } diff --git a/src/stream/stream/find_map.rs b/src/stream/stream/find_map.rs index f7e3c1e04..be74f54a2 100644 --- a/src/stream/stream/find_map.rs +++ b/src/stream/stream/find_map.rs @@ -1,4 +1,5 @@ use core::future::Future; +use core::marker::PhantomData; use core::pin::Pin; use core::task::{Context, Poll}; @@ -6,38 +7,65 @@ use crate::stream::Stream; #[doc(hidden)] #[allow(missing_debug_implementations)] -pub struct FindMapFuture<'a, S, F> { +pub struct FindMapFuture<'a, S, F, Fut, T, B> { stream: &'a mut S, f: F, + future: Option, + __b: PhantomData, + __t: PhantomData, } -impl<'a, S, F> FindMapFuture<'a, S, F> { +impl<'a, S, F, Fut, T, B> FindMapFuture<'a, S, F, Fut, T, B> { + pin_utils::unsafe_pinned!(stream: &'a mut S); + pin_utils::unsafe_unpinned!(f: F); + pin_utils::unsafe_pinned!(future: Option); + pub(super) fn new(stream: &'a mut S, f: F) -> Self { - Self { stream, f } + FindMapFuture { + stream, + f, + future: None, + __b: PhantomData, + __t: PhantomData, + } } } -impl Unpin for FindMapFuture<'_, S, F> {} +impl Unpin for FindMapFuture<'_, S, F, Fut, T, B> {} -impl<'a, S, B, F> Future for FindMapFuture<'a, S, F> +impl<'a, S, F, Fut, B> Future for FindMapFuture<'a, S, F, Fut, S::Item, B> where S: Stream + Unpin + Sized, - F: FnMut(S::Item) -> Option, + F: FnMut(S::Item) -> Fut, + Fut: Future>, { type Output = Option; fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - let item = futures_core::ready!(Pin::new(&mut *self.stream).poll_next(cx)); - - match item { - Some(v) => match (&mut self.f)(v) { - Some(v) => Poll::Ready(Some(v)), - None => { - cx.waker().wake_by_ref(); - Poll::Pending + loop { + match self.future.is_some() { + false => { + let item = futures_core::ready!(self.as_mut().stream().poll_next(cx)); + + match item { + Some(v) => { + let fut = (self.as_mut().f())(v); + self.as_mut().future().set(Some(fut)); + } + None => return Poll::Ready(None), + } + } + true => { + let res = + futures_core::ready!(self.as_mut().future().as_pin_mut().unwrap().poll(cx)); + self.as_mut().future().set(None); + + match res { + Some(v) => return Poll::Ready(Some(v)), + None => (), + } } - }, - None => Poll::Ready(None), + } } } } diff --git a/src/stream/stream/flat_map.rs b/src/stream/stream/flat_map.rs index e07893a94..aea636e3d 100644 --- a/src/stream/stream/flat_map.rs +++ b/src/stream/stream/flat_map.rs @@ -1,7 +1,9 @@ use core::pin::Pin; +use core::marker::PhantomData; use pin_project_lite::pin_project; +use crate::future::Future; use crate::stream::stream::map::Map; use crate::stream::stream::StreamExt; use crate::stream::{IntoStream, Stream}; @@ -16,33 +18,39 @@ pin_project! { /// /// [`flat_map`]: trait.Stream.html#method.flat_map /// [`Stream`]: trait.Stream.html - pub struct FlatMap { + pub struct FlatMap { #[pin] - stream: Map, + stream: Map, #[pin] inner_stream: Option, + __fut: PhantomData, + __from: PhantomData, } } -impl FlatMap +impl FlatMap where S: Stream, U: IntoStream, - F: FnMut(S::Item) -> U, + F: FnMut(S::Item) -> Fut, + Fut: Future, { pub(super) fn new(stream: S, f: F) -> Self { Self { stream: stream.map(f), inner_stream: None, + __fut: PhantomData, + __from: PhantomData, } } } -impl Stream for FlatMap +impl Stream for FlatMap where S: Stream, U: Stream, - F: FnMut(S::Item) -> U, + F: FnMut(S::Item) -> Fut, + Fut: Future, { type Item = U::Item; diff --git a/src/stream/stream/fold.rs b/src/stream/stream/fold.rs index 3938a3739..46a48f1a5 100644 --- a/src/stream/stream/fold.rs +++ b/src/stream/stream/fold.rs @@ -1,3 +1,4 @@ +use core::marker::PhantomData; use core::future::Future; use core::pin::Pin; @@ -8,43 +9,59 @@ use crate::task::{Context, Poll}; pin_project! { #[derive(Debug)] - pub struct FoldFuture { + pub struct FoldFuture { #[pin] stream: S, f: F, + #[pin] + future: Option, acc: Option, + __t: PhantomData, } } -impl FoldFuture { +impl FoldFuture { pub(super) fn new(stream: S, init: B, f: F) -> Self { Self { stream, f, + future: None, acc: Some(init), + __t: PhantomData, } } } -impl Future for FoldFuture +impl Future for FoldFuture where S: Stream + Sized, - F: FnMut(B, S::Item) -> B, + F: FnMut(B, S::Item) -> Fut, + Fut: Future, { type Output = B; fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); loop { - let next = futures_core::ready!(this.stream.as_mut().poll_next(cx)); + match this.future.is_some() { + false => { + let next = futures_core::ready!(this.stream.as_mut().poll_next(cx)); - match next { - Some(v) => { - let old = this.acc.take().unwrap(); - let new = (this.f)(old, v); - *this.acc = Some(new); + match next { + Some(v) => { + let old = this.acc.take().unwrap(); + let fut = (this.f)(old, v); + this.future.as_mut().set(Some(fut)); + } + None => return Poll::Ready(this.acc.take().unwrap()), + } + } + true => { + let res = + futures_core::ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)); + this.future.as_mut().set(None); + *this.acc = Some(res); } - None => return Poll::Ready(this.acc.take().unwrap()), } } } diff --git a/src/stream/stream/for_each.rs b/src/stream/stream/for_each.rs index dbada101c..2733f80ba 100644 --- a/src/stream/stream/for_each.rs +++ b/src/stream/stream/for_each.rs @@ -1,4 +1,5 @@ use core::pin::Pin; +use core::marker::PhantomData; use core::future::Future; use pin_project_lite::pin_project; @@ -9,37 +10,56 @@ use crate::task::{Context, Poll}; pin_project! { #[doc(hidden)] #[allow(missing_debug_implementations)] - pub struct ForEachFuture { + pub struct ForEachFuture { #[pin] stream: S, f: F, + #[pin] + future: Option, + __t: PhantomData, } } -impl ForEachFuture { +impl ForEachFuture { pub(super) fn new(stream: S, f: F) -> Self { Self { stream, f, + future: None, + __t: PhantomData, } } } -impl Future for ForEachFuture +impl Future for ForEachFuture where S: Stream + Sized, - F: FnMut(S::Item), + F: FnMut(S::Item) -> Fut, + Fut: Future, { type Output = (); fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { let mut this = self.project(); loop { - let next = futures_core::ready!(this.stream.as_mut().poll_next(cx)); + match this.future.is_some() { + false => { + let next = futures_core::ready!(this.stream.as_mut().poll_next(cx)); - match next { - Some(v) => (this.f)(v), - None => return Poll::Ready(()), + match next { + Some(v) => { + let fut = (this.f)(v); + this.future.as_mut().set(Some(fut)); + } + None => return Poll::Ready(()), + } + } + true => { + let _ = + futures_core::ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)); + + this.future.as_mut().set(None); + } } } } diff --git a/src/stream/stream/map.rs b/src/stream/stream/map.rs index 0eab3ce2b..562301dd2 100644 --- a/src/stream/stream/map.rs +++ b/src/stream/stream/map.rs @@ -1,39 +1,69 @@ +use core::marker::PhantomData; use core::pin::Pin; use pin_project_lite::pin_project; +use crate::future::Future; use crate::stream::Stream; use crate::task::{Context, Poll}; pin_project! { /// A stream that maps value of another stream with a function. #[derive(Debug)] - pub struct Map { + pub struct Map { #[pin] stream: S, f: F, - } + #[pin] + future: Option, + __from: PhantomData, + __to: PhantomData, + } } -impl Map { +impl Map { pub(crate) fn new(stream: S, f: F) -> Self { Self { stream, f, + future: None, + __from: PhantomData, + __to: PhantomData, } } } -impl Stream for Map +impl Stream for Map where S: Stream, - F: FnMut(S::Item) -> B, + F: FnMut(S::Item) -> Fut, + Fut: Future, { type Item = B; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let this = self.project(); - let next = futures_core::ready!(this.stream.poll_next(cx)); - Poll::Ready(next.map(this.f)) + let mut this = self.project(); + loop { + match this.future.is_some() { + false => { + let next = futures_core::ready!(this.stream.as_mut().poll_next(cx)); + match next { + Some(v) => { + let fut = (this.f)(v); + this.future.as_mut().set(Some(fut)); + } + None => { + return Poll::Ready(None); + } + } + } + true => { + let res = + futures_core::ready!(this.future.as_mut().as_pin_mut().unwrap().poll(cx)); + this.future.as_mut().set(None); + return Poll::Ready(Some(res)); + } + } + } } } diff --git a/src/stream/stream/mod.rs b/src/stream/stream/mod.rs index d0cc718e4..a633af74d 100644 --- a/src/stream/stream/mod.rs +++ b/src/stream/stream/mod.rs @@ -110,10 +110,10 @@ pub use take::Take; pub use take_while::TakeWhile; pub use zip::Zip; -use core::cmp::Ordering; +use std::cmp::Ordering; +use crate::future::Future; cfg_unstable! { - use core::future::Future; use core::pin::Pin; use core::time::Duration; @@ -618,8 +618,9 @@ extension_trait! { use async_std::prelude::*; use async_std::stream; - let s = stream::from_iter(vec![1, 2, 3]); - let mut s = s.map(|x| 2 * x); + let s = stream::from_iter(vec![1u8, 2, 3]); + let s = s.map(|x| async move { 2 * x }); + pin_utils::pin_mut!(s); assert_eq!(s.next().await, Some(2)); assert_eq!(s.next().await, Some(4)); @@ -630,10 +631,11 @@ extension_trait! { # }) } ``` "#] - fn map(self, f: F) -> Map + fn map(self, f: F) -> Map where Self: Sized, - F: FnMut(Self::Item) -> B, + F: FnMut(Self::Item) -> Fut, + Fut: Future, { Map::new(self, f) } @@ -658,7 +660,7 @@ extension_trait! { .inspect(|x| println!("about to filter {}", x)) .filter(|x| x % 2 == 0) .inspect(|x| println!("made it through filter: {}", x)) - .fold(0, |sum, i| sum + i) + .fold(0, |sum, i| async move { sum + i }) .await; assert_eq!(sum, 6); @@ -796,27 +798,28 @@ extension_trait! { let words = stream::from_iter(&["alpha", "beta", "gamma"]); let merged: String = words - .flat_map(|s| stream::from_iter(s.chars())) + .flat_map(|s| async move { stream::from_iter(s.chars()) }) .collect().await; assert_eq!(merged, "alphabetagamma"); - let d3 = stream::from_iter(&[[[1, 2], [3, 4]], [[5, 6], [7, 8]]]); + let d3 = stream::from_iter(&[[[1u8, 2], [3, 4]], [[5, 6], [7, 8]]]); let d1: Vec<_> = d3 - .flat_map(|item| stream::from_iter(item)) - .flat_map(|item| stream::from_iter(item)) + .flat_map(|item| async move { stream::from_iter(item) }) + .flat_map(|item| async move { stream::from_iter(item) }) .collect().await; - assert_eq!(d1, [&1, &2, &3, &4, &5, &6, &7, &8]); + assert_eq!(d1, [&1u8, &2, &3, &4, &5, &6, &7, &8]); # }); ``` "#] #[cfg(feature = "unstable")] #[cfg_attr(feature = "docs", doc(cfg(unstable)))] - fn flat_map(self, f: F) -> FlatMap + fn flat_map(self, f: F) -> FlatMap where Self: Sized, U: IntoStream, - F: FnMut(Self::Item) -> U, + F: FnMut(Self::Item) -> Fut, + Fut: Future, { FlatMap::new(self, f) } @@ -870,7 +873,12 @@ extension_trait! { let s = stream::from_iter(vec!["1", "lol", "3", "NaN", "5"]); - let mut parsed = s.filter_map(|a| a.parse::().ok()); + let parsed = s.filter_map(|a| + async move { + a.parse::().ok() + }); + + pin_utils::pin_mut!(parsed); let one = parsed.next().await; assert_eq!(one, Some(1)); @@ -887,10 +895,11 @@ extension_trait! { # }) } ``` "#] - fn filter_map(self, f: F) -> FilterMap + fn filter_map(self, f: F) -> FilterMap where Self: Sized, - F: FnMut(Self::Item) -> Option, + F: FnMut(Self::Item) -> Fut, + Fut: Future>, { FilterMap::new(self, f) } @@ -1197,7 +1206,7 @@ extension_trait! { use async_std::stream; let mut s = stream::repeat::(42).take(3); - assert!(s.all(|x| x == 42).await); + assert!(s.all(|x| async move { x == 42 }).await); # # }) } @@ -1212,19 +1221,20 @@ extension_trait! { use async_std::stream; let mut s = stream::empty::(); - assert!(s.all(|_| false).await); + assert!(s.all(|_| async { false }).await); # # }) } ``` "#] #[inline] - fn all( + fn all( &mut self, f: F, - ) -> impl Future + '_ [AllFuture<'_, Self, F, Self::Item>] + ) -> impl Future + '_ [AllFuture<'_, Self, F, Fut, Self::Item>] where Self: Unpin + Sized, - F: FnMut(Self::Item) -> bool, + F: FnMut(Self::Item) -> Fut, + Fut: Future, { AllFuture::new(self, f) } @@ -1288,20 +1298,21 @@ extension_trait! { use async_std::stream; let mut s = stream::from_iter(vec!["lol", "NaN", "2", "5"]); - let first_number = s.find_map(|s| s.parse().ok()).await; + let first_number = s.find_map(|s| async move { s.parse().ok()}).await; - assert_eq!(first_number, Some(2)); + assert_eq!(first_number, Some(2u8)); # # }) } ``` "#] - fn find_map( + fn find_map( &mut self, f: F, - ) -> impl Future> + '_ [FindMapFuture<'_, Self, F>] + ) -> impl Future> + '_ [FindMapFuture<'_, Self, F, Fut, Self::Item, B>] where Self: Unpin + Sized, - F: FnMut(Self::Item) -> Option, + F: FnMut(Self::Item) -> Fut, + Fut: Future>, { FindMapFuture::new(self, f) } @@ -1321,21 +1332,22 @@ extension_trait! { use async_std::stream; let s = stream::from_iter(vec![1u8, 2, 3]); - let sum = s.fold(0, |acc, x| acc + x).await; + let sum = s.fold(0, |acc, x| async move { acc + x }).await; assert_eq!(sum, 6); # # }) } ``` "#] - fn fold( + fn fold( self, init: B, f: F, - ) -> impl Future [FoldFuture] + ) -> impl Future [FoldFuture] where Self: Sized, - F: FnMut(B, Self::Item) -> B, + F: FnMut(B, Self::Item) -> Fut, + Fut: Future, { FoldFuture::new(self, init, f) } @@ -1393,7 +1405,7 @@ extension_trait! { let (tx, rx) = channel(); let s = stream::from_iter(vec![1usize, 2, 3]); - let sum = s.for_each(move |x| tx.clone().send(x).unwrap()).await; + let sum = s.for_each(move |x| { tx.clone().send(x).unwrap(); async {} }).await; let v: Vec<_> = rx.iter().collect(); @@ -1402,13 +1414,14 @@ extension_trait! { # }) } ``` "#] - fn for_each( + fn for_each( self, f: F, - ) -> impl Future [ForEachFuture] + ) -> impl Future [ForEachFuture] where Self: Sized, - F: FnMut(Self::Item), + F: FnMut(Self::Item) -> Fut, + Fut: Future, { ForEachFuture::new(self, f) } @@ -1438,7 +1451,7 @@ extension_trait! { use async_std::stream; let mut s = stream::repeat::(42).take(3); - assert!(s.any(|x| x == 42).await); + assert!(s.any(|x| async move { x == 42 }).await); # # }) } ``` @@ -1452,19 +1465,20 @@ extension_trait! { use async_std::stream; let mut s = stream::empty::(); - assert!(!s.any(|_| false).await); + assert!(!s.any(|_| async { false }).await); # # }) } ``` "#] #[inline] - fn any( + fn any( &mut self, f: F, - ) -> impl Future + '_ [AnyFuture<'_, Self, F, Self::Item>] + ) -> impl Future + '_ [AnyFuture<'_, Self, F, Fut, Self::Item>] where Self: Unpin + Sized, - F: FnMut(Self::Item) -> bool, + F: FnMut(Self::Item) -> Fut, + Fut: Future, { AnyFuture::new(self, f) } diff --git a/src/stream/sum.rs b/src/stream/sum.rs index 3b3144e5e..d40c698b6 100644 --- a/src/stream/sum.rs +++ b/src/stream/sum.rs @@ -25,7 +25,6 @@ pub trait Sum: Sized { use crate::stream::stream::StreamExt; use core::num::Wrapping; -use core::ops::Add; macro_rules! integer_sum { (@impls $zero: expr, $($a:ty)*) => ($( @@ -34,7 +33,7 @@ macro_rules! integer_sum { where S: Stream + 'a, { - Box::pin(async move { stream.fold($zero, Add::add).await } ) + Box::pin(async move { stream.fold($zero, |a, b| async move { a + b }).await } ) } } impl<'a> Sum<&'a $a> for $a { @@ -42,7 +41,7 @@ macro_rules! integer_sum { where S: Stream + 'b, { - Box::pin(async move { stream.fold($zero, Add::add).await } ) + Box::pin(async move { stream.fold($zero, |a, b| async move { a + b }).await } ) } } )*); @@ -58,14 +57,14 @@ macro_rules! float_sum { fn sum<'a, S>(stream: S) -> Pin + 'a>> where S: Stream + 'a, { - Box::pin(async move { stream.fold(0.0, |a, b| a + b).await } ) + Box::pin(async move { stream.fold(0.0, |a, b| async move { a + b }).await } ) } } impl<'a> Sum<&'a $a> for $a { fn sum<'b, S>(stream: S) -> Pin + 'b>> where S: Stream + 'b, { - Box::pin(async move { stream.fold(0.0, |a, b| a + b).await } ) + Box::pin(async move { stream.fold(0.0, |a, b| async move { a + b }).await } ) } } )*); diff --git a/src/unit/from_stream.rs b/src/unit/from_stream.rs index 7b784383d..230af17d9 100644 --- a/src/unit/from_stream.rs +++ b/src/unit/from_stream.rs @@ -8,6 +8,6 @@ impl FromStream<()> for () { fn from_stream<'a, S: IntoStream + 'a>( stream: S, ) -> Pin + 'a>> { - Box::pin(stream.into_stream().for_each(drop)) + Box::pin(stream.into_stream().for_each(|s| { drop(s); async {} })) } } diff --git a/src/vec/extend.rs b/src/vec/extend.rs index 302fc7a87..6e9398701 100644 --- a/src/vec/extend.rs +++ b/src/vec/extend.rs @@ -12,6 +12,9 @@ impl stream::Extend for Vec { self.reserve(stream.size_hint().0); - Box::pin(stream.for_each(move |item| self.push(item))) + Box::pin(stream.for_each(move |item| { + self.push(item); + async {} + })) } }