Skip to content

Bring stream combinators over to 0.3 #1022

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
May 25, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions futures-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,28 @@ macro_rules! pinned_field {
)
}

#[macro_export]
macro_rules! unsafe_pinned {
($f:tt -> $t:ty) => (
fn $f<'a>(self: &'a mut PinMut<Self>) -> PinMut<'a, $t> {
unsafe {
pinned_field!(self, $f)
}
}
)
}

#[macro_export]
macro_rules! unsafe_unpinned {
($f:tt -> $t:ty) => (
fn $f<'a>(self: &'a mut PinMut<Self>) -> &'a mut $t {
unsafe {
&mut ::core::mem::PinMut::get_mut(self.reborrow()).$f
}
}
)
}

mod poll;
pub use poll::Poll;

Expand Down
11 changes: 11 additions & 0 deletions futures-core/src/poll.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,14 @@
/// A macro for extracting the successful type of a `Poll<T>`.
///
/// This macro bakes in propagation of `Pending` signals by returning early.
#[macro_export]
macro_rules! ready {
($e:expr) => (match $e {
$crate::Poll::Ready(t) => t,
$crate::Poll::Pending => return $crate::Poll::Pending,
})
}

/// Indicates whether a value is available, or if the current task has been
/// scheduled for later wake-up instead.
#[derive(Copy, Clone, Debug, PartialEq)]
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/future/flatten_sink.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ impl<F> Sink for FlattenSink<F> where F: Future, <F as Future>::Item: Sink<SinkE

fn poll_close(&mut self, cx: &mut task::Context) -> Result<Async<()>, Self::SinkError> {
if let State::Ready(ref mut s) = self.st {
try_ready!(s.poll_close(cx));
ready!(s.poll_close(cx));
}
self.st = State::Closed;
return Ok(Async::Ready(()));
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/io/lines.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ impl<A> Stream for Lines<A>
type Error = io::Error;

fn poll(&mut self, cx: &mut task::Context) -> Poll<Option<String>, io::Error> {
let n = try_ready!(self.io.read_line(&mut self.line));
let n = ready!(self.io.read_line(&mut self.line));
if n == 0 && self.line.len() == 0 {
return Ok(None.into())
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/io/read_until.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl<A> Future for ReadUntil<A>
// and just return it, as we are finished.
// If we hit "would block" then all the read data so far
// is in our buffer, and otherwise we propagate errors.
try_ready!(a.read_until(byte, buf));
ready!(a.read_until(byte, buf));
},
State::Empty => panic!("poll ReadUntil after it's done"),
}
Expand Down
7 changes: 5 additions & 2 deletions futures-util/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,12 +72,15 @@ pub mod io;
#[cfg(feature = "std")]
pub use io::{AsyncReadExt, AsyncWriteExt};

// pub mod stream;
// pub use stream::StreamExt;
pub mod stream;
pub use stream::StreamExt;

// pub mod sink;
// pub use sink::SinkExt;

mod pin_mut;
pub use pin_mut::{PinMutExt, OptionExt};

pub mod prelude {
//! Prelude containing the extension traits, which add functionality to
//! existing asynchronous types.
Expand Down
29 changes: 29 additions & 0 deletions futures-util/src/pin_mut.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use core::mem::PinMut;

/// Useful additions to the `PinMut` type
pub trait PinMutExt<T> {
/// Overwrite the referenced data, dropping the existing content
fn assign(&mut self, data: T);
}

impl<'a, T> PinMutExt<T> for PinMut<'a, T> {
fn assign(&mut self, data: T) {
unsafe { *PinMut::get_mut(self.reborrow()) = data }
}
}

/// Useful additions to the `Option` type, for working with `PinMut`
pub trait OptionExt<'a, T> {
/// Push `PinMut` through an `Option`
fn as_pin_mut(self) -> Option<PinMut<'a, T>>;
}

impl<'a, T> OptionExt<'a, T> for PinMut<'a, Option<T>> {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TODO: add to core (both this and assign)

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i'll rename to set to match your PR

fn as_pin_mut(self) -> Option<PinMut<'a, T>> {
unsafe {
PinMut::get_mut(self).as_mut().map(|x| {
PinMut::new_unchecked(x)
})
}
}
}
8 changes: 4 additions & 4 deletions futures-util/src/sink/buffer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,11 +44,11 @@ impl<S: Sink> Buffer<S> {
}

fn try_empty_buffer(&mut self, cx: &mut task::Context) -> Poll<(), S::SinkError> {
try_ready!(self.sink.poll_ready(cx));
ready!(self.sink.poll_ready(cx));
while let Some(item) = self.buf.pop_front() {
self.sink.start_send(item)?;
if self.buf.len() != 0 {
try_ready!(self.sink.poll_ready(cx));
ready!(self.sink.poll_ready(cx));
}
}
Ok(Async::Ready(()))
Expand Down Expand Up @@ -93,13 +93,13 @@ impl<S: Sink> Sink for Buffer<S> {
}

fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
try_ready!(self.try_empty_buffer(cx));
ready!(self.try_empty_buffer(cx));
debug_assert!(self.buf.is_empty());
self.sink.poll_flush(cx)
}

fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
try_ready!(self.try_empty_buffer(cx));
ready!(self.try_empty_buffer(cx));
debug_assert!(self.buf.is_empty());
self.sink.poll_close(cx)
}
Expand Down
2 changes: 1 addition & 1 deletion futures-util/src/sink/send.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl<S: Sink> Future for Send<S> {

// we're done sending the item, but want to block on flushing the
// sink
try_ready!(self.sink_mut().poll_flush(cx));
ready!(self.sink_mut().poll_flush(cx));

// now everything's emptied, so return the sink for further use
Ok(Async::Ready(self.take_sink()))
Expand Down
8 changes: 4 additions & 4 deletions futures-util/src/sink/send_all.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,18 +75,18 @@ impl<T, U> Future for SendAll<T, U>
// If we've got an item buffered already, we need to write it to the
// sink before we can do anything else
if let Some(item) = self.buffered.take() {
try_ready!(self.try_start_send(cx, item))
ready!(self.try_start_send(cx, item))
}

loop {
match self.stream_mut().poll_next(cx)? {
Async::Ready(Some(item)) => try_ready!(self.try_start_send(cx, item)),
Async::Ready(Some(item)) => ready!(self.try_start_send(cx, item)),
Async::Ready(None) => {
try_ready!(self.sink_mut().poll_flush(cx));
ready!(self.sink_mut().poll_flush(cx));
return Ok(Async::Ready(self.take_result()))
}
Async::Pending => {
try_ready!(self.sink_mut().poll_flush(cx));
ready!(self.sink_mut().poll_flush(cx));
return Ok(Async::Pending)
}
}
Expand Down
4 changes: 2 additions & 2 deletions futures-util/src/sink/with.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,12 +143,12 @@ impl<S, U, Fut, F> Sink for With<S, U, Fut, F>
}

fn poll_flush(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
try_ready!(self.poll(cx));
ready!(self.poll(cx));
self.sink.poll_flush(cx).map_err(Into::into)
}

fn poll_close(&mut self, cx: &mut task::Context) -> Poll<(), Self::SinkError> {
try_ready!(self.poll(cx));
ready!(self.poll(cx));
self.sink.poll_close(cx).map_err(Into::into)
}
}
2 changes: 1 addition & 1 deletion futures-util/src/sink/with_flat_map.rs
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ where
}
}
if let Some(mut stream) = self.stream.take() {
while let Some(x) = try_ready!(stream.poll_next(cx)) {
while let Some(x) = ready!(stream.poll_next(cx)) {
match self.sink.poll_ready(cx)? {
Async::Ready(()) => self.sink.start_send(x)?,
Async::Pending => {
Expand Down
97 changes: 0 additions & 97 deletions futures-util/src/stream/and_then.rs

This file was deleted.

2 changes: 1 addition & 1 deletion futures-util/src/stream/buffer_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ impl<S> Stream for BufferUnordered<S>
}

// Try polling a new future
if let Some(val) = try_ready!(self.queue.poll_next(cx)) {
if let Some(val) = ready!(self.queue.poll_next(cx)) {
return Ok(Async::Ready(Some(val)));
}

Expand Down
15 changes: 6 additions & 9 deletions futures-util/src/stream/buffered.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
use core::mem::PinMut;
use std::fmt;

use futures_core::{Async, IntoFuture, Poll, Stream};
use futures_core::{Future, Poll, Stream};
use futures_core::task;
use futures_sink::{Sink};

use stream::{Fuse, FuturesOrdered};

Expand All @@ -13,12 +13,9 @@ use stream::{Fuse, FuturesOrdered};
/// results in the order that they were pulled out of the original stream. This
/// is created by the `Stream::buffered` method.
#[must_use = "streams do nothing unless polled"]
pub struct Buffered<S>
where S: Stream,
S::Item: IntoFuture,
{
pub struct Buffered<S: Stream> {
stream: Fuse<S>,
queue: FuturesOrdered<<S::Item as IntoFuture>::Future>,
queue: FuturesOrdered<S::Item>,
max: usize,
}

Expand Down Expand Up @@ -84,7 +81,7 @@ impl<S> Sink for Buffered<S>
{
type SinkItem = S::SinkItem;
type SinkError = S::SinkError;

delegate_sink!(stream);
}

Expand All @@ -109,7 +106,7 @@ impl<S> Stream for Buffered<S>
}

// Try polling a new future
if let Some(val) = try_ready!(self.queue.poll_next(cx)) {
if let Some(val) = ready!(self.queue.poll_next(cx)) {
return Ok(Async::Ready(Some(val)));
}

Expand Down
Loading