Skip to content

Commit 5977202

Browse files
authored
Merge pull request #10 from async-rs/spawn-local
Add spawn_local and clarify what the schedule function can do
2 parents 2b0427a + 5d80be6 commit 5977202

10 files changed

+214
-28
lines changed

examples/panic-propagation.rs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,8 @@ use futures::executor;
1111
use futures::future::FutureExt;
1212
use lazy_static::lazy_static;
1313

14+
type Task = async_task::Task<()>;
15+
1416
/// Spawns a future on the executor.
1517
fn spawn<F, R>(future: F) -> JoinHandle<R>
1618
where
@@ -19,8 +21,8 @@ where
1921
{
2022
lazy_static! {
2123
// A channel that holds scheduled tasks.
22-
static ref QUEUE: Sender<async_task::Task<()>> = {
23-
let (sender, receiver) = unbounded::<async_task::Task<()>>();
24+
static ref QUEUE: Sender<Task> = {
25+
let (sender, receiver) = unbounded::<Task>();
2426

2527
// Start the executor thread.
2628
thread::spawn(|| {

examples/panic-result.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,16 +9,19 @@ use futures::executor;
99
use futures::future::FutureExt;
1010
use lazy_static::lazy_static;
1111

12+
type Task = async_task::Task<()>;
13+
type JoinHandle<T> = async_task::JoinHandle<T, ()>;
14+
1215
/// Spawns a future on the executor.
13-
fn spawn<F, R>(future: F) -> async_task::JoinHandle<thread::Result<R>, ()>
16+
fn spawn<F, R>(future: F) -> JoinHandle<thread::Result<R>>
1417
where
1518
F: Future<Output = R> + Send + 'static,
1619
R: Send + 'static,
1720
{
1821
lazy_static! {
1922
// A channel that holds scheduled tasks.
20-
static ref QUEUE: Sender<async_task::Task<()>> = {
21-
let (sender, receiver) = unbounded::<async_task::Task<()>>();
23+
static ref QUEUE: Sender<Task> = {
24+
let (sender, receiver) = unbounded::<Task>();
2225

2326
// Start the executor thread.
2427
thread::spawn(|| {

examples/spawn-local.rs

Lines changed: 76 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,76 @@
1+
//! A simple single-threaded executor that can spawn non-`Send` futures.
2+
3+
use std::cell::Cell;
4+
use std::future::Future;
5+
use std::rc::Rc;
6+
7+
use crossbeam::channel::{unbounded, Receiver, Sender};
8+
9+
type Task = async_task::Task<()>;
10+
type JoinHandle<T> = async_task::JoinHandle<T, ()>;
11+
12+
thread_local! {
13+
// A channel that holds scheduled tasks.
14+
static QUEUE: (Sender<Task>, Receiver<Task>) = unbounded();
15+
}
16+
17+
/// Spawns a future on the executor.
18+
fn spawn<F, R>(future: F) -> JoinHandle<R>
19+
where
20+
F: Future<Output = R> + 'static,
21+
R: 'static,
22+
{
23+
// Create a task that is scheduled by sending itself into the channel.
24+
let schedule = |t| QUEUE.with(|(s, _)| s.send(t).unwrap());
25+
let (task, handle) = async_task::spawn_local(future, schedule, ());
26+
27+
// Schedule the task by sending it into the queue.
28+
task.schedule();
29+
30+
handle
31+
}
32+
33+
/// Runs a future to completion.
34+
fn run<F, R>(future: F) -> R
35+
where
36+
F: Future<Output = R> + 'static,
37+
R: 'static,
38+
{
39+
// Spawn a task that sends its result through a channel.
40+
let (s, r) = unbounded();
41+
spawn(async move { s.send(future.await).unwrap() });
42+
43+
loop {
44+
// If the original task has completed, return its result.
45+
if let Ok(val) = r.try_recv() {
46+
return val;
47+
}
48+
49+
// Otherwise, take a task from the queue and run it.
50+
QUEUE.with(|(_, r)| r.recv().unwrap().run());
51+
}
52+
}
53+
54+
fn main() {
55+
let val = Rc::new(Cell::new(0));
56+
57+
// Run a future that increments a non-`Send` value.
58+
run({
59+
let val = val.clone();
60+
async move {
61+
// Spawn a future that increments the value.
62+
let handle = spawn({
63+
let val = val.clone();
64+
async move {
65+
val.set(dbg!(val.get()) + 1);
66+
}
67+
});
68+
69+
val.set(dbg!(val.get()) + 1);
70+
handle.await;
71+
}
72+
});
73+
74+
// The value should be 2 at the end of the program.
75+
dbg!(val.get());
76+
}

examples/spawn-on-thread.rs

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,12 @@ use std::thread;
77
use crossbeam::channel;
88
use futures::executor;
99

10+
type JoinHandle<T> = async_task::JoinHandle<T, ()>;
11+
1012
/// Spawns a future on a new dedicated thread.
1113
///
1214
/// The returned handle can be used to await the output of the future.
13-
fn spawn_on_thread<F, R>(future: F) -> async_task::JoinHandle<R, ()>
15+
fn spawn_on_thread<F, R>(future: F) -> JoinHandle<R>
1416
where
1517
F: Future<Output = R> + Send + 'static,
1618
R: Send + 'static,

examples/spawn.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -8,16 +8,19 @@ use crossbeam::channel::{unbounded, Sender};
88
use futures::executor;
99
use lazy_static::lazy_static;
1010

11+
type Task = async_task::Task<()>;
12+
type JoinHandle<T> = async_task::JoinHandle<T, ()>;
13+
1114
/// Spawns a future on the executor.
12-
fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, ()>
15+
fn spawn<F, R>(future: F) -> JoinHandle<R>
1316
where
1417
F: Future<Output = R> + Send + 'static,
1518
R: Send + 'static,
1619
{
1720
lazy_static! {
1821
// A channel that holds scheduled tasks.
19-
static ref QUEUE: Sender<async_task::Task<()>> = {
20-
let (sender, receiver) = unbounded::<async_task::Task<()>>();
22+
static ref QUEUE: Sender<Task> = {
23+
let (sender, receiver) = unbounded::<Task>();
2124

2225
// Start the executor thread.
2326
thread::spawn(|| {

examples/task-id.rs

Lines changed: 6 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,9 @@ use lazy_static::lazy_static;
1313
#[derive(Clone, Copy, Debug)]
1414
struct TaskId(usize);
1515

16+
type Task = async_task::Task<TaskId>;
17+
type JoinHandle<T> = async_task::JoinHandle<T, TaskId>;
18+
1619
thread_local! {
1720
/// The ID of the current task.
1821
static TASK_ID: Cell<Option<TaskId>> = Cell::new(None);
@@ -26,15 +29,15 @@ fn task_id() -> Option<TaskId> {
2629
}
2730

2831
/// Spawns a future on the executor.
29-
fn spawn<F, R>(future: F) -> async_task::JoinHandle<R, TaskId>
32+
fn spawn<F, R>(future: F) -> JoinHandle<R>
3033
where
3134
F: Future<Output = R> + Send + 'static,
3235
R: Send + 'static,
3336
{
3437
lazy_static! {
3538
// A channel that holds scheduled tasks.
36-
static ref QUEUE: Sender<async_task::Task<TaskId>> = {
37-
let (sender, receiver) = unbounded::<async_task::Task<TaskId>>();
39+
static ref QUEUE: Sender<Task> = {
40+
let (sender, receiver) = unbounded::<Task>();
3841

3942
// Start the executor thread.
4043
thread::spawn(|| {

src/join_handle.rs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,7 @@ pub struct JoinHandle<R, T> {
2424
pub(crate) _marker: PhantomData<(R, T)>,
2525
}
2626

27-
unsafe impl<R, T> Send for JoinHandle<R, T> {}
27+
unsafe impl<R: Send, T> Send for JoinHandle<R, T> {}
2828
unsafe impl<R, T> Sync for JoinHandle<R, T> {}
2929

3030
impl<R, T> Unpin for JoinHandle<R, T> {}

src/lib.rs

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,7 +23,7 @@
2323
//! # let (task, handle) = async_task::spawn(future, schedule, ());
2424
//! ```
2525
//!
26-
//! A task is constructed using the [`spawn`] function:
26+
//! A task is constructed using either [`spawn`] or [`spawn_local`]:
2727
//!
2828
//! ```
2929
//! # let (sender, receiver) = crossbeam::channel::unbounded();
@@ -93,6 +93,7 @@
9393
//! union of the future and its output.
9494
//!
9595
//! [`spawn`]: fn.spawn.html
96+
//! [`spawn_local`]: fn.spawn_local.html
9697
//! [`Task`]: struct.Task.html
9798
//! [`JoinHandle`]: struct.JoinHandle.html
9899
@@ -108,4 +109,4 @@ mod task;
108109
mod utils;
109110

110111
pub use crate::join_handle::JoinHandle;
111-
pub use crate::task::{spawn, Task};
112+
pub use crate::task::{spawn, spawn_local, Task};

src/raw.rs

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -95,15 +95,13 @@ impl<F, R, S, T> Clone for RawTask<F, R, S, T> {
9595

9696
impl<F, R, S, T> RawTask<F, R, S, T>
9797
where
98-
F: Future<Output = R> + Send + 'static,
99-
R: Send + 'static,
98+
F: Future<Output = R> + 'static,
10099
S: Fn(Task<T>) + Send + Sync + 'static,
101-
T: Send + 'static,
102100
{
103101
/// Allocates a task with the given `future` and `schedule` function.
104102
///
105103
/// It is assumed that initially only the `Task` reference and the `JoinHandle` exist.
106-
pub(crate) fn allocate(tag: T, future: F, schedule: S) -> NonNull<()> {
104+
pub(crate) fn allocate(future: F, schedule: S, tag: T) -> NonNull<()> {
107105
// Compute the layout of the task for allocation. Abort if the computation fails.
108106
let task_layout = abort_on_panic(|| Self::task_layout());
109107

@@ -592,17 +590,13 @@ where
592590
/// A guard that closes the task if polling its future panics.
593591
struct Guard<F, R, S, T>(RawTask<F, R, S, T>)
594592
where
595-
F: Future<Output = R> + Send + 'static,
596-
R: Send + 'static,
597-
S: Fn(Task<T>) + Send + Sync + 'static,
598-
T: Send + 'static;
593+
F: Future<Output = R> + 'static,
594+
S: Fn(Task<T>) + Send + Sync + 'static;
599595

600596
impl<F, R, S, T> Drop for Guard<F, R, S, T>
601597
where
602-
F: Future<Output = R> + Send + 'static,
603-
R: Send + 'static,
598+
F: Future<Output = R> + 'static,
604599
S: Fn(Task<T>) + Send + Sync + 'static,
605-
T: Send + 'static,
606600
{
607601
fn drop(&mut self) {
608602
let raw = self.0;

src/task.rs

Lines changed: 104 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,11 @@
11
use std::fmt;
22
use std::future::Future;
33
use std::marker::PhantomData;
4-
use std::mem;
4+
use std::mem::{self, ManuallyDrop};
5+
use std::pin::Pin;
56
use std::ptr::NonNull;
7+
use std::task::{Context, Poll};
8+
use std::thread::{self, ThreadId};
69

710
use crate::header::Header;
811
use crate::raw::RawTask;
@@ -16,8 +19,16 @@ use crate::JoinHandle;
1619
/// When run, the task polls `future`. When woken up, it gets scheduled for running by the
1720
/// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task.
1821
///
22+
/// The schedule function should not attempt to run the task nor to drop it. Instead, it should
23+
/// push the task into some kind of queue so that it can be processed later.
24+
///
25+
/// If you need to spawn a future that does not implement [`Send`], consider using the
26+
/// [`spawn_local`] function instead.
27+
///
1928
/// [`Task`]: struct.Task.html
2029
/// [`JoinHandle`]: struct.JoinHandle.html
30+
/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
31+
/// [`spawn_local`]: fn.spawn_local.html
2132
///
2233
/// # Examples
2334
///
@@ -43,7 +54,98 @@ where
4354
S: Fn(Task<T>) + Send + Sync + 'static,
4455
T: Send + Sync + 'static,
4556
{
46-
let raw_task = RawTask::<F, R, S, T>::allocate(tag, future, schedule);
57+
let raw_task = RawTask::<F, R, S, T>::allocate(future, schedule, tag);
58+
let task = Task {
59+
raw_task,
60+
_marker: PhantomData,
61+
};
62+
let handle = JoinHandle {
63+
raw_task,
64+
_marker: PhantomData,
65+
};
66+
(task, handle)
67+
}
68+
69+
/// Creates a new local task.
70+
///
71+
/// This constructor returns a [`Task`] reference that runs the future and a [`JoinHandle`] that
72+
/// awaits its result.
73+
///
74+
/// When run, the task polls `future`. When woken up, it gets scheduled for running by the
75+
/// `schedule` function. Argument `tag` is an arbitrary piece of data stored inside the task.
76+
///
77+
/// The schedule function should not attempt to run the task nor to drop it. Instead, it should
78+
/// push the task into some kind of queue so that it can be processed later.
79+
///
80+
/// Unlike [`spawn`], this function does not require the future to implement [`Send`]. If the
81+
/// [`Task`] reference is run or dropped on a thread it was not created on, a panic will occur.
82+
///
83+
/// [`Task`]: struct.Task.html
84+
/// [`JoinHandle`]: struct.JoinHandle.html
85+
/// [`spawn`]: fn.spawn.html
86+
/// [`Send`]: https://doc.rust-lang.org/std/marker/trait.Send.html
87+
///
88+
/// # Examples
89+
///
90+
/// ```
91+
/// use crossbeam::channel;
92+
///
93+
/// // The future inside the task.
94+
/// let future = async {
95+
/// println!("Hello, world!");
96+
/// };
97+
///
98+
/// // If the task gets woken up, it will be sent into this channel.
99+
/// let (s, r) = channel::unbounded();
100+
/// let schedule = move |task| s.send(task).unwrap();
101+
///
102+
/// // Create a task with the future and the schedule function.
103+
/// let (task, handle) = async_task::spawn_local(future, schedule, ());
104+
/// ```
105+
pub fn spawn_local<F, R, S, T>(future: F, schedule: S, tag: T) -> (Task<T>, JoinHandle<R, T>)
106+
where
107+
F: Future<Output = R> + 'static,
108+
R: 'static,
109+
S: Fn(Task<T>) + Send + Sync + 'static,
110+
T: Send + Sync + 'static,
111+
{
112+
thread_local! {
113+
static ID: ThreadId = thread::current().id();
114+
}
115+
116+
struct Checked<F> {
117+
id: ThreadId,
118+
inner: ManuallyDrop<F>,
119+
}
120+
121+
impl<F> Drop for Checked<F> {
122+
fn drop(&mut self) {
123+
if ID.with(|id| *id) != self.id {
124+
panic!("local task dropped by a thread that didn't spawn it");
125+
}
126+
unsafe {
127+
ManuallyDrop::drop(&mut self.inner);
128+
}
129+
}
130+
}
131+
132+
impl<F: Future> Future for Checked<F> {
133+
type Output = F::Output;
134+
135+
fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
136+
if ID.with(|id| *id) != self.id {
137+
panic!("local task polled by a thread that didn't spawn it");
138+
}
139+
unsafe { self.map_unchecked_mut(|c| &mut *c.inner).poll(cx) }
140+
}
141+
}
142+
143+
let future = Checked {
144+
id: ID.with(|id| *id),
145+
inner: ManuallyDrop::new(future),
146+
};
147+
148+
let raw_task = RawTask::<_, R, S, T>::allocate(future, schedule, tag);
47149
let task = Task {
48150
raw_task,
49151
_marker: PhantomData,

0 commit comments

Comments
 (0)