Skip to content

Commit 559f709

Browse files
committed
Removed all unsafe code. Fixing build failures.
1 parent 80aa2c9 commit 559f709

File tree

2 files changed

+55
-61
lines changed

2 files changed

+55
-61
lines changed

futures-util/src/future/poll_immediate.rs

Lines changed: 27 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,17 @@ use crate::FutureExt;
33
use core::pin::Pin;
44
use futures_core::task::{Context, Poll};
55
use futures_core::{FusedFuture, Future, Stream};
6+
use pin_project_lite::pin_project;
67

7-
/// Future for the [`poll_immediate`](poll_immediate()) function.
8-
#[derive(Debug, Clone)]
9-
#[must_use = "futures do nothing unless you `.await` or poll them"]
10-
pub struct PollImmediate<T>(Option<T>);
8+
pin_project! {
9+
/// Future for the [`poll_immediate`](poll_immediate()) function.
10+
#[derive(Debug, Clone)]
11+
#[must_use = "futures do nothing unless you `.await` or poll them"]
12+
pub struct PollImmediate<T> {
13+
#[pin]
14+
future: Option<T>
15+
}
16+
}
1117

1218
impl<T, F> Future for PollImmediate<F>
1319
where
@@ -17,21 +23,22 @@ where
1723

1824
#[inline]
1925
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<T>> {
20-
// # Safety
21-
// This is the only time that this future will ever be polled.
26+
let mut this = self.project();
2227
let inner =
23-
unsafe { self.get_unchecked_mut().0.take().expect("PollOnce polled after completion") };
24-
crate::pin_mut!(inner);
28+
this.future.as_mut().as_pin_mut().expect("PollImmediate polled after completion");
2529
match inner.poll(cx) {
26-
Poll::Ready(t) => Poll::Ready(Some(t)),
30+
Poll::Ready(t) => {
31+
this.future.set(None);
32+
Poll::Ready(Some(t))
33+
}
2734
Poll::Pending => Poll::Ready(None),
2835
}
2936
}
3037
}
3138

3239
impl<T: Future> FusedFuture for PollImmediate<T> {
3340
fn is_terminated(&self) -> bool {
34-
self.0.is_none()
41+
self.future.is_none()
3542
}
3643
}
3744

@@ -64,25 +71,14 @@ where
6471
type Item = Poll<T>;
6572

6673
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
67-
unsafe {
68-
// # Safety
69-
// We never move the inner value until it is done. We only get a reference to it.
70-
let inner = &mut self.get_unchecked_mut().0;
71-
let fut = match inner.as_mut() {
72-
// inner is gone, so we can signal that the stream is closed.
73-
None => return Poll::Ready(None),
74-
Some(inner) => inner,
75-
};
76-
let fut = Pin::new_unchecked(fut);
77-
Poll::Ready(Some(fut.poll(cx).map(|t| {
78-
// # Safety
79-
// The inner option value is done, so we need to drop it. We do it without moving it
80-
// by using drop in place. We then write over the value without trying to drop it first
81-
// This should uphold all the safety requirements of `Pin`
82-
std::ptr::drop_in_place(inner);
83-
std::ptr::write(inner, None);
74+
let mut this = self.project();
75+
match this.future.as_mut().as_pin_mut() {
76+
// inner is gone, so we can signal that the stream is closed.
77+
None => Poll::Ready(None),
78+
Some(fut) => Poll::Ready(Some(fut.poll(cx).map(|t| {
79+
this.future.set(None);
8480
t
85-
})))
81+
}))),
8682
}
8783
}
8884
}
@@ -103,7 +99,7 @@ where
10399
/// # });
104100
/// ```
105101
pub fn poll_immediate<F: Future>(f: F) -> PollImmediate<F> {
106-
assert_future::<Option<F::Output>, PollImmediate<F>>(PollImmediate(Some(f)))
102+
assert_future::<Option<F::Output>, PollImmediate<F>>(PollImmediate { future: Some(f) })
107103
}
108104

109105
/// Future for the [`poll_immediate_reuse`](poll_immediate_reuse()) function.
@@ -119,7 +115,8 @@ where
119115

120116
#[inline]
121117
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<T, F>> {
122-
let mut inner = self.get_mut().0.take().expect("PollOnceReuse polled after completion");
118+
let mut inner =
119+
self.get_mut().0.take().expect("PollImmediateReuse polled after completion");
123120
match inner.poll_unpin(cx) {
124121
Poll::Ready(t) => Poll::Ready(Ok(t)),
125122
Poll::Pending => Poll::Ready(Err(inner)),
Lines changed: 28 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -1,53 +1,50 @@
1+
use core::pin::Pin;
12
use futures_core::task::{Context, Poll};
23
use futures_core::Stream;
3-
use std::pin::Pin;
4+
use pin_project_lite::pin_project;
45

5-
/// Stream for the [`poll_immediate`](poll_immediate()) function.
6-
#[derive(Debug, Clone)]
7-
#[must_use = "futures do nothing unless you `.await` or poll them"]
8-
pub struct PollImmediate<S>(Option<S>);
6+
pin_project! {
7+
/// Stream for the [`poll_immediate`](poll_immediate()) function.
8+
#[derive(Debug, Clone)]
9+
#[must_use = "futures do nothing unless you `.await` or poll them"]
10+
pub struct PollImmediate<S> {
11+
#[pin]
12+
stream: Option<S>
13+
}
14+
}
915

1016
impl<T, S> Stream for PollImmediate<S>
1117
where
12-
S: Stream<Item = T>,
18+
S: Stream<Item = T>,
1319
{
1420
type Item = Poll<T>;
1521

1622
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
17-
unsafe {
18-
// # Safety
19-
// We never move the inner value until it is done. We only get a reference to it.
20-
let inner = &mut self.get_unchecked_mut().0;
21-
let fut = match inner.as_mut() {
22-
// inner is gone, so we can continue to signal that the stream is closed.
23-
None => return Poll::Ready(None),
24-
Some(inner) => inner,
25-
};
26-
let stream = Pin::new_unchecked(fut);
27-
match stream.poll_next(cx) {
28-
Poll::Ready(Some(t)) => Poll::Ready(Some(Poll::Ready(t))),
29-
Poll::Ready(None) => {
30-
// # Safety
31-
// The inner stream is done, so we need to drop it. We do it without moving it
32-
// by using drop in place. We then write over the value without trying to drop it first
33-
// This should uphold all the safety requirements of `Pin`
34-
std::ptr::drop_in_place(inner);
35-
std::ptr::write(inner, None);
36-
Poll::Ready(None)
37-
}
38-
Poll::Pending => Poll::Ready(Some(Poll::Pending)),
23+
let mut this = self.project();
24+
let stream = match this.stream.as_mut().as_pin_mut() {
25+
// inner is gone, so we can continue to signal that the stream is closed.
26+
None => return Poll::Ready(None),
27+
Some(inner) => inner,
28+
};
29+
30+
match stream.poll_next(cx) {
31+
Poll::Ready(Some(t)) => Poll::Ready(Some(Poll::Ready(t))),
32+
Poll::Ready(None) => {
33+
this.stream.set(None);
34+
Poll::Ready(None)
3935
}
36+
Poll::Pending => Poll::Ready(Some(Poll::Pending)),
4037
}
4138
}
4239

4340
fn size_hint(&self) -> (usize, Option<usize>) {
44-
self.0.as_ref().map_or((0, Some(0)), Stream::size_hint)
41+
self.stream.as_ref().map_or((0, Some(0)), Stream::size_hint)
4542
}
4643
}
4744

4845
impl<S: Stream> super::FusedStream for PollImmediate<S> {
4946
fn is_terminated(&self) -> bool {
50-
self.0.is_none()
47+
self.stream.is_none()
5148
}
5249
}
5350

@@ -76,5 +73,5 @@ impl<S: Stream> super::FusedStream for PollImmediate<S> {
7673
/// # });
7774
/// ```
7875
pub fn poll_immediate<S: Stream>(s: S) -> PollImmediate<S> {
79-
super::assert_stream::<Poll<S::Item>, PollImmediate<S>>(PollImmediate(Some(s)))
76+
super::assert_stream::<Poll<S::Item>, PollImmediate<S>>(PollImmediate { stream: Some(s) })
8077
}

0 commit comments

Comments
 (0)