Skip to content

Use a trait-based design for stop-token. #10

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 6 commits into from
Oct 2, 2021
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 9 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,15 @@ jobs:
toolchain: nightly
override: true

- name: tests
- name: tests async-io
uses: actions-rs/cargo@v1
with:
command: test
args: --features async-io

- name: tests tokio
uses: actions-rs/cargo@v1
with:
command: test
args: --features tokio

16 changes: 15 additions & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,20 @@ repository = "https://github.com/async-rs/stop-token"

description = "Experimental cooperative cancellation for async-std"

[package.metadata.docs.rs]
features = ["docs"]
rustdoc-args = ["--cfg", "feature=\"docs\""]

[features]
docs = ["async-io"]

[dependencies]
pin-project-lite = "0.2.0"
async-std = "1.8"
async-channel = "1.6.1"
futures-core = "0.3.17"
tokio = { version = "1.12.0", features = ["time"], optional = true }
async-io = { version = "1.6.0", optional = true }

[dev-dependencies]
async-std = "1.10.0"
tokio = { version = "1.12.0", features = ["rt", "macros"] }
45 changes: 45 additions & 0 deletions src/deadline.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use core::fmt;
use std::{error::Error, future::Future, io};

/// An error returned when a future times out.
#[derive(Clone, Copy, Eq, PartialEq, PartialOrd, Ord)]
pub struct TimedOutError {
_private: (),
}

impl fmt::Debug for TimedOutError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
f.debug_struct("TimeoutError").finish()
}
}

impl TimedOutError {
pub(crate) fn new() -> Self {
Self { _private: () }
}
}

impl Error for TimedOutError {}

impl Into<io::Error> for TimedOutError {
fn into(self) -> io::Error {
io::Error::new(io::ErrorKind::TimedOut, "Future has timed out")
}
}

impl fmt::Display for TimedOutError {
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
"Future has timed out".fmt(f)
}
}

/// Conversion into a deadline.
///
/// A deadline is a future which resolves after a certain period or event.
pub trait IntoDeadline {
/// Which kind of future are we turning this into?
type Deadline: Future<Output = ()>;

/// Creates a deadline from a value.
fn into_deadline(self) -> Self::Deadline;
}
56 changes: 56 additions & 0 deletions src/future.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
//! Extension methods and types for the `Future` trait.

use crate::{deadline::TimedOutError, IntoDeadline};
use core::future::Future;
use core::pin::Pin;

use pin_project_lite::pin_project;
use std::task::{Context, Poll};

/// Extend the `Future` trait with the `until` method.
pub trait FutureExt: Future {
/// Run a future until it resolves, or until a deadline is hit.
fn until<T, D>(self, target: T) -> Stop<Self, D>
where
Self: Sized,
T: IntoDeadline<Deadline = D>,
{
Stop {
deadline: target.into_deadline(),
future: self,
}
}
}

pin_project! {
/// Run a future until it resolves, or until a deadline is hit.
///
/// This method is returned by [`FutureExt::deadline`].
#[must_use = "Futures do nothing unless polled or .awaited"]
#[derive(Debug)]
pub struct Stop<F, D> {
#[pin]
future: F,
#[pin]
deadline: D,
}
}

impl<F, D> Future for Stop<F, D>
where
F: Future,
D: Future<Output = ()>,
{
type Output = Result<F::Output, TimedOutError>;

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let this = self.project();
if let Poll::Ready(()) = this.deadline.poll(cx) {
return Poll::Ready(Err(TimedOutError::new()));
}
match this.future.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(it) => Poll::Ready(Ok(it)),
}
}
}
170 changes: 24 additions & 146 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,15 +5,6 @@
//! Experimental. The library works as is, breaking changes will bump major
//! version, but there are no guarantees of long-term support.
//!
//! Additionally, this library uses unstable cargo feature feature of `async-std` and, for
//! this reason, should be used like this:
//!
//! ```toml
//! [dependencies.stop-token]
//! version = "0.1.0"
//! features = [ "unstable" ]
//! ```
//!
//! # Motivation
//!
//! Rust futures come with a build-in cancellation mechanism: dropping a future
Expand Down Expand Up @@ -47,13 +38,14 @@
//!
//! ```
//! use async_std::prelude::*;
//! use stop_token::prelude::*;
//! use stop_token::StopToken;
//!
//! struct Event;
//!
//! async fn do_work(work: impl Stream<Item = Event> + Unpin, stop_token: StopToken) {
//! let mut work = stop_token.stop_stream(work);
//! while let Some(event) = work.next().await {
//! async fn do_work(work: impl Stream<Item = Event> + Unpin, stop: StopToken) {
//! let mut work = work.until(stop);
//! while let Some(Ok(event)) = work.next().await {
//! process_event(event).await
//! }
//! }
Expand All @@ -62,145 +54,31 @@
//! }
//! ```
//!
//! # Features
//!
//! The `time` submodule is empty when no features are enabled. To implement [`Deadline`]
//! for `Instant` and `Duration` you can enable one of the following features:
//!
//! - `async-io`: for use with the `async-std` or `smol` runtimes.
//! - `tokio`: for use with the `tokio` runtime.
//!
//! # Lineage
//!
//! The cancellation system is a subset of `C#` [`CancellationToken / CancellationTokenSource`](https://docs.microsoft.com/en-us/dotnet/standard/threading/cancellation-in-managed-threads).
//! The `StopToken / StopTokenSource` terminology is borrowed from C++ paper P0660: https://wg21.link/p0660.

use std::pin::Pin;
use std::task::{Context, Poll};

use async_std::prelude::*;
//! The `StopToken / StopTokenSource` terminology is borrowed from [C++ paper P0660](https://wg21.link/p0660).

use async_std::channel::{self, Receiver, Sender};
use pin_project_lite::pin_project;

enum Never {}

/// `StopSource` produces `StopToken` and cancels all of its tokens on drop.
///
/// # Example:
///
/// ```ignore
/// let stop_source = StopSource::new();
/// let stop_token = stop_source.stop_token();
/// schedule_some_work(stop_token);
/// drop(stop_source); // At this point, scheduled work notices that it is canceled.
/// ```
#[derive(Debug)]
pub struct StopSource {
/// Solely for `Drop`.
_chan: Sender<Never>,
stop_token: StopToken,
}

/// `StopToken` is a future which completes when the associated `StopSource` is dropped.
#[derive(Debug, Clone)]
pub struct StopToken {
chan: Receiver<Never>,
}

impl Default for StopSource {
fn default() -> StopSource {
let (sender, receiver) = channel::bounded::<Never>(1);

StopSource {
_chan: sender,
stop_token: StopToken { chan: receiver },
}
}
}
pub mod future;
pub mod stream;
pub mod time;

impl StopSource {
/// Creates a new `StopSource`.
pub fn new() -> StopSource {
StopSource::default()
}

/// Produces a new `StopToken`, associated with this source.
///
/// Once the source is destroyed, `StopToken` future completes.
pub fn stop_token(&self) -> StopToken {
self.stop_token.clone()
}
}

impl Future for StopToken {
type Output = ();

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<()> {
let chan = Pin::new(&mut self.chan);
match Stream::poll_next(chan, cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(Some(never)) => match never {},
Poll::Ready(None) => Poll::Ready(()),
}
}
}

impl StopToken {
/// Applies the token to the `stream`, such that the resulting stream
/// produces no more items once the token becomes cancelled.
pub fn stop_stream<S: Stream>(&self, stream: S) -> StopStream<S> {
StopStream {
stop_token: self.clone(),
stream,
}
}

/// Applies the token to the `future`, such that the resulting future
/// completes with `None` if the token is cancelled.
pub fn stop_future<F: Future>(&self, future: F) -> StopFuture<F> {
StopFuture {
stop_token: self.clone(),
future,
}
}
}

pin_project! {
#[derive(Debug)]
pub struct StopStream<S> {
#[pin]
stop_token: StopToken,
#[pin]
stream: S,
}
}

impl<S: Stream> Stream for StopStream<S> {
type Item = S::Item;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.project();
if let Poll::Ready(()) = this.stop_token.poll(cx) {
return Poll::Ready(None);
}
this.stream.poll_next(cx)
}
}

pin_project! {
#[derive(Debug)]
pub struct StopFuture<F> {
#[pin]
stop_token: StopToken,
#[pin]
future: F,
}
}
mod deadline;
mod stop_source;

impl<F: Future> Future for StopFuture<F> {
type Output = Option<F::Output>;
pub use deadline::{IntoDeadline, TimedOutError};
pub use stop_source::{StopSource, StopToken};

fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<F::Output>> {
let this = self.project();
if let Poll::Ready(()) = this.stop_token.poll(cx) {
return Poll::Ready(None);
}
match this.future.poll(cx) {
Poll::Pending => Poll::Pending,
Poll::Ready(it) => Poll::Ready(Some(it)),
}
}
/// A prelude for `stop-token`.
pub mod prelude {
pub use crate::future::FutureExt as _;
pub use crate::stream::StreamExt as _;
}
Loading