Skip to content

Commit 108fc90

Browse files
aturoncramertj
authored andcommitted
Bring stream combinators over to 0.3
1 parent 99ec3c2 commit 108fc90

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

44 files changed

+765
-1433
lines changed

futures-core/src/lib.rs

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,28 @@ macro_rules! pinned_field {
4040
)
4141
}
4242

43+
#[macro_export]
44+
macro_rules! unsafe_pinned {
45+
($f:tt -> $t:ty) => (
46+
fn $f<'a>(self: &'a mut PinMut<Self>) -> PinMut<'a, $t> {
47+
unsafe {
48+
pinned_field!(self, $f)
49+
}
50+
}
51+
)
52+
}
53+
54+
#[macro_export]
55+
macro_rules! unsafe_unpinned {
56+
($f:tt -> $t:ty) => (
57+
fn $f<'a>(self: &'a mut PinMut<Self>) -> &'a mut $t {
58+
unsafe {
59+
&mut ::core::mem::PinMut::get_mut(self.reborrow()).$f
60+
}
61+
}
62+
)
63+
}
64+
4365
mod poll;
4466
pub use poll::Poll;
4567

futures-core/src/poll.rs

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,14 @@
1+
/// A macro for extracting the successful type of a `Poll<T>`.
2+
///
3+
/// This macro bakes in propagation of `Pending` signals by returning early.
4+
#[macro_export]
5+
macro_rules! try_ready {
6+
($e:expr) => (match $e {
7+
$crate::Poll::Ready(t) => t,
8+
$crate::Poll::Pending => return $crate::Poll::Pending,
9+
})
10+
}
11+
112
/// Indicates whether a value is available, or if the current task has been
213
/// scheduled for later wake-up instead.
314
#[derive(Copy, Clone, Debug, PartialEq)]

futures-util/src/lib.rs

Lines changed: 5 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,12 +72,15 @@ pub mod io;
7272
#[cfg(feature = "std")]
7373
pub use io::{AsyncReadExt, AsyncWriteExt};
7474

75-
// pub mod stream;
76-
// pub use stream::StreamExt;
75+
pub mod stream;
76+
pub use stream::StreamExt;
7777

7878
// pub mod sink;
7979
// pub use sink::SinkExt;
8080

81+
mod pin_mut;
82+
pub use pin_mut::{PinMutExt, OptionExt};
83+
8184
pub mod prelude {
8285
//! Prelude containing the extension traits, which add functionality to
8386
//! existing asynchronous types.

futures-util/src/pin_mut.rs

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
use core::mem::PinMut;
2+
3+
/// Useful additions to the `PinMut` type
4+
pub trait PinMutExt<T> {
5+
/// Overwrite the referenced data, dropping the existing content
6+
fn assign(&mut self, data: T);
7+
}
8+
9+
impl<'a, T> PinMutExt<T> for PinMut<'a, T> {
10+
fn assign(&mut self, data: T) {
11+
unsafe { *PinMut::get_mut(self.reborrow()) = data }
12+
}
13+
}
14+
15+
/// Useful additions to the `Option` type, for working with `PinMut`
16+
pub trait OptionExt<'a, T> {
17+
/// Push `PinMut` through an `Option`
18+
fn as_pin_mut(self) -> Option<PinMut<'a, T>>;
19+
}
20+
21+
impl<'a, T> OptionExt<'a, T> for PinMut<'a, Option<T>> {
22+
fn as_pin_mut(self) -> Option<PinMut<'a, T>> {
23+
unsafe {
24+
PinMut::get_mut(self).as_mut().map(|x| {
25+
PinMut::new_unchecked(x)
26+
})
27+
}
28+
}
29+
}

futures-util/src/stream/and_then.rs

Lines changed: 0 additions & 97 deletions
This file was deleted.

futures-util/src/stream/buffered.rs

Lines changed: 5 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
1+
use core::mem::PinMut;
12
use std::fmt;
23

3-
use futures_core::{Async, IntoFuture, Poll, Stream};
4+
use futures_core::{Future, Poll, Stream};
45
use futures_core::task;
5-
use futures_sink::{Sink};
66

77
use stream::{Fuse, FuturesOrdered};
88

@@ -13,12 +13,9 @@ use stream::{Fuse, FuturesOrdered};
1313
/// results in the order that they were pulled out of the original stream. This
1414
/// is created by the `Stream::buffered` method.
1515
#[must_use = "streams do nothing unless polled"]
16-
pub struct Buffered<S>
17-
where S: Stream,
18-
S::Item: IntoFuture,
19-
{
16+
pub struct Buffered<S: Stream> {
2017
stream: Fuse<S>,
21-
queue: FuturesOrdered<<S::Item as IntoFuture>::Future>,
18+
queue: FuturesOrdered<S::Item>,
2219
max: usize,
2320
}
2421

@@ -84,7 +81,7 @@ impl<S> Sink for Buffered<S>
8481
{
8582
type SinkItem = S::SinkItem;
8683
type SinkError = S::SinkError;
87-
84+
8885
delegate_sink!(stream);
8986
}
9087

Lines changed: 11 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,9 @@
11
use std::prelude::v1::*;
22
use std::any::Any;
33
use std::panic::{catch_unwind, UnwindSafe, AssertUnwindSafe};
4-
use std::mem;
4+
use std::mem::PinMut;
55

6-
use futures_core::{Poll, Async, Stream};
6+
use futures_core::{Poll, Stream};
77
use futures_core::task;
88

99
/// Stream for the `catch_unwind` combinator.
@@ -12,51 +12,29 @@ use futures_core::task;
1212
#[derive(Debug)]
1313
#[must_use = "streams do nothing unless polled"]
1414
pub struct CatchUnwind<S> where S: Stream {
15-
state: CatchUnwindState<S>,
15+
stream: S,
1616
}
1717

1818
pub fn new<S>(stream: S) -> CatchUnwind<S>
1919
where S: Stream + UnwindSafe,
2020
{
21-
CatchUnwind {
22-
state: CatchUnwindState::Stream(stream),
23-
}
21+
CatchUnwind { stream }
2422
}
2523

26-
#[derive(Debug)]
27-
enum CatchUnwindState<S> {
28-
Stream(S),
29-
Eof,
30-
Done,
24+
impl<S: Stream> CatchUnwind<S> {
25+
unsafe_pinned!(stream -> S);
3126
}
3227

3328
impl<S> Stream for CatchUnwind<S>
3429
where S: Stream + UnwindSafe,
3530
{
36-
type Item = Result<S::Item, S::Error>;
37-
type Error = Box<Any + Send>;
31+
type Item = Result<S::Item, Box<Any + Send>>;
3832

39-
fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
40-
let mut stream = match mem::replace(&mut self.state, CatchUnwindState::Eof) {
41-
CatchUnwindState::Done => panic!("cannot poll after eof"),
42-
CatchUnwindState::Eof => {
43-
self.state = CatchUnwindState::Done;
44-
return Ok(Async::Ready(None));
45-
}
46-
CatchUnwindState::Stream(stream) => stream,
47-
};
48-
let res = catch_unwind(AssertUnwindSafe(|| (stream.poll_next(cx), stream)));
33+
fn poll_next(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
34+
let res = catch_unwind(AssertUnwindSafe(|| self.stream().poll_next(cx)));
4935
match res {
50-
Err(e) => Err(e), // and state is already Eof
51-
Ok((poll, stream)) => {
52-
self.state = CatchUnwindState::Stream(stream);
53-
match poll {
54-
Err(e) => Ok(Async::Ready(Some(Err(e)))),
55-
Ok(Async::Pending) => Ok(Async::Pending),
56-
Ok(Async::Ready(Some(r))) => Ok(Async::Ready(Some(Ok(r)))),
57-
Ok(Async::Ready(None)) => Ok(Async::Ready(None)),
58-
}
59-
}
36+
Ok(poll) => poll.map(|opt| opt.map(Ok)),
37+
Err(e) => Poll::Ready(Some(Err(e))),
6038
}
6139
}
6240
}

futures-util/src/stream/chain.rs

Lines changed: 24 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,9 @@
1-
use core::mem;
1+
use core::mem::PinMut;
22

3-
use futures_core::{Stream, Async, Poll};
4-
use futures_core::task;
3+
use {PinMutExt, OptionExt};
54

6-
/// State of chain stream.
7-
#[derive(Debug)]
8-
enum State<S1, S2> {
9-
/// Emitting elements of first stream
10-
First(S1, S2),
11-
/// Emitting elements of second stream
12-
Second(S2),
13-
/// Temporary value to replace first with second
14-
Temp,
15-
}
5+
use futures_core::{Stream, Poll};
6+
use futures_core::task;
167

178
/// An adapter for chaining the output of two streams.
189
///
@@ -21,36 +12,37 @@ enum State<S1, S2> {
2112
#[derive(Debug)]
2213
#[must_use = "streams do nothing unless polled"]
2314
pub struct Chain<S1, S2> {
24-
state: State<S1, S2>
15+
first: Option<S1>,
16+
second: S2,
2517
}
2618

2719
pub fn new<S1, S2>(s1: S1, s2: S2) -> Chain<S1, S2>
28-
where S1: Stream, S2: Stream<Item=S1::Item, Error=S1::Error>,
20+
where S1: Stream, S2: Stream<Item=S1::Item>,
2921
{
30-
Chain { state: State::First(s1, s2) }
22+
Chain {
23+
first: Some(s1),
24+
second: s2,
25+
}
26+
}
27+
28+
// All interactions with `PinMut<Chain<..>>` happen through these methods
29+
impl<S1, S2> Chain<S1, S2> {
30+
unsafe_pinned!(first -> Option<S1>);
31+
unsafe_pinned!(second -> S2);
3132
}
3233

3334
impl<S1, S2> Stream for Chain<S1, S2>
34-
where S1: Stream, S2: Stream<Item=S1::Item, Error=S1::Error>,
35+
where S1: Stream, S2: Stream<Item=S1::Item>,
3536
{
3637
type Item = S1::Item;
37-
type Error = S1::Error;
3838

39-
fn poll_next(&mut self, cx: &mut task::Context) -> Poll<Option<Self::Item>, Self::Error> {
40-
loop {
41-
match self.state {
42-
State::First(ref mut s1, ref _s2) => match s1.poll_next(cx) {
43-
Ok(Async::Ready(None)) => (), // roll
44-
x => return x,
45-
},
46-
State::Second(ref mut s2) => return s2.poll_next(cx),
47-
State::Temp => unreachable!(),
39+
fn poll_next(mut self: PinMut<Self>, cx: &mut task::Context) -> Poll<Option<Self::Item>> {
40+
if let Some(first) = self.first().as_pin_mut() {
41+
if let Some(item) = try_ready!(first.poll_next(cx)) {
42+
return Poll::Ready(Some(item))
4843
}
49-
50-
self.state = match mem::replace(&mut self.state, State::Temp) {
51-
State::First(_s1, s2) => State::Second(s2),
52-
_ => unreachable!(),
53-
};
5444
}
45+
self.first().assign(None);
46+
self.second().poll_next(cx)
5547
}
5648
}

0 commit comments

Comments
 (0)