diff --git a/benches/baseline.rs b/benches/baseline.rs index 0f15d20e..b1b90714 100644 --- a/benches/baseline.rs +++ b/benches/baseline.rs @@ -4,6 +4,7 @@ extern crate test; mod baseline { use futures::executor; + use futures::future::RemoteHandle; use futures::prelude::*; use std::pin::Pin; use std::task::{Context, Poll}; @@ -42,7 +43,7 @@ mod baseline { let tasks = (0..300) .map(|_| { - spawn(async move { + spawn_remote(async move { Task { depth: 0 }.await; }) }) @@ -59,7 +60,7 @@ mod baseline { fn spawn_many(b: &mut test::Bencher) { b.iter(|| { executor::block_on(async { - let tasks = (0..25_000).map(|_| spawn(async {})).collect::>(); + let tasks = (0..25_000).map(|_| spawn_remote(async {})).collect::>(); for task in tasks { task.await; @@ -78,7 +79,7 @@ mod baseline { executor::block_on(async { let tasks = (0..300) .map(|_| { - spawn(async { + spawn_remote(async { let (r, s) = mio::Registration::new2(); let registration = Registration::new(); registration.register(&r).unwrap(); @@ -112,39 +113,24 @@ mod baseline { }); } + /// Spawn function for juliex + pub fn spawn(fut: F) + where + F: Future + Send + 'static, + { + juliex::spawn(fut); + } + /// Spawn function for juliex to get back a handle - pub fn spawn(fut: F) -> JoinHandle + pub fn spawn_remote(fut: F) -> RemoteHandle where F: Future + Send + 'static, T: Send + 'static, { - let (tx, rx) = futures::channel::oneshot::channel(); - - let fut = async move { - let t = fut.await; - let _ = tx.send(t); - }; + let (fut, handle) = fut.remote_handle(); juliex::spawn(fut); - JoinHandle { rx } - } - - /// Handle returned from Juliex. - // We should patch Juliex to support this natively, and be more efficient on channel use. - #[derive(Debug)] - pub struct JoinHandle { - pub(crate) rx: futures::channel::oneshot::Receiver, - } - - impl Future for JoinHandle { - type Output = T; - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.rx.poll_unpin(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Ok(t)) => Poll::Ready(t), - Poll::Ready(Err(_)) => panic!(), // TODO: Is this OK? Print a better error message? - } - } + handle } } diff --git a/benches/common/mod.rs b/benches/common/mod.rs index a8c136fd..368f1adf 100644 --- a/benches/common/mod.rs +++ b/benches/common/mod.rs @@ -30,7 +30,7 @@ macro_rules! benchmark_suite { let tasks = (0..300) .map(|_| { - runtime::spawn(async { + runtime::task::spawn_remote(async { Task { depth: 0 }.await; }) }) @@ -44,7 +44,7 @@ macro_rules! benchmark_suite { #[runtime::bench($rt)] async fn spawn_many() { let tasks = (0..25_000) - .map(|_| runtime::spawn(async {})) + .map(|_| runtime::task::spawn_remote(async {})) .collect::>(); for task in tasks { @@ -61,7 +61,7 @@ macro_rules! benchmark_suite { let tasks = (0..300) .map(|_| { - runtime::spawn(async { + runtime::task::spawn_remote(async { let (r, s) = mio::Registration::new2(); let registration = Registration::new(); registration.register(&r).unwrap(); @@ -69,7 +69,7 @@ macro_rules! benchmark_suite { let mut depth = 0; let mut capture = Some(r); - runtime::spawn( + runtime::task::spawn( Compat01As03::new(future::poll_fn(move || loop { if registration.poll_read_ready().unwrap().is_ready() { depth += 1; diff --git a/examples/guessing.rs b/examples/guessing.rs index 4471eb8b..eb0058ac 100644 --- a/examples/guessing.rs +++ b/examples/guessing.rs @@ -65,7 +65,7 @@ async fn main() -> Result<(), failure::Error> { incoming .try_for_each_concurrent(None, |stream| { async move { - runtime::spawn(play(stream)).await?; + runtime::task::spawn_remote(play(stream)).await?; Ok::<(), failure::Error>(()) } }) diff --git a/examples/tcp-echo.rs b/examples/tcp-echo.rs index ac15ee51..615c03a1 100644 --- a/examples/tcp-echo.rs +++ b/examples/tcp-echo.rs @@ -18,7 +18,7 @@ async fn main() -> std::io::Result<()> { .incoming() .try_for_each_concurrent(None, |stream| { async move { - runtime::spawn(async move { + runtime::task::spawn_remote(async move { println!("Accepting from: {}", stream.peer_addr()?); let (reader, writer) = &mut stream.split(); diff --git a/examples/tcp-proxy.rs b/examples/tcp-proxy.rs index c2f67306..178a5020 100644 --- a/examples/tcp-proxy.rs +++ b/examples/tcp-proxy.rs @@ -16,7 +16,7 @@ async fn main() -> std::io::Result<()> { .incoming() .try_for_each_concurrent(None, |client| { async move { - runtime::spawn(async move { + runtime::task::spawn_remote(async move { let server = TcpStream::connect("127.0.0.1:8080").await?; println!( "Proxying {} to {}", diff --git a/runtime-attributes/src/lib.rs b/runtime-attributes/src/lib.rs index a6cbb0a4..30c2d91b 100644 --- a/runtime-attributes/src/lib.rs +++ b/runtime-attributes/src/lib.rs @@ -140,7 +140,7 @@ pub fn test(attr: TokenStream, item: TokenStream) -> TokenStream { /// /// #[runtime::test] /// async fn spawn_and_await() { -/// runtime::spawn(async {}).await; +/// runtime::task::spawn_remote(async {}).await; /// } /// ``` #[proc_macro_attribute] diff --git a/src/lib.rs b/src/lib.rs index 02ea90db..9060700c 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -120,9 +120,6 @@ pub mod prelude { pub use super::time::StreamExt as _; } -#[doc(inline)] -pub use task::spawn; - #[doc(inline)] pub use runtime_attributes::{bench, test}; diff --git a/src/net/tcp.rs b/src/net/tcp.rs index fd7721e2..711712c3 100644 --- a/src/net/tcp.rs +++ b/src/net/tcp.rs @@ -304,14 +304,14 @@ impl fmt::Debug for ConnectFuture { /// // accept connections and process them in parallel /// let mut incoming = listener.incoming(); /// while let Some(stream) = incoming.next().await { -/// runtime::spawn(async move { +/// runtime::task::spawn_remote(async move { /// let stream = stream?; /// println!("Accepting from: {}", stream.peer_addr()?); /// /// let (reader, writer) = &mut stream.split(); /// reader.copy_into(writer).await?; /// Ok::<(), std::io::Error>(()) -/// }); +/// }).forget(); /// } /// Ok(()) /// } diff --git a/src/task.rs b/src/task.rs index c30ad55b..4084f7bf 100644 --- a/src/task.rs +++ b/src/task.rs @@ -1,10 +1,8 @@ //! Types and Functions for working with asynchronous tasks. -use std::pin::Pin; - -use futures::future::FutureObj; +use futures::future::{FutureObj, RemoteHandle}; use futures::prelude::*; -use futures::task::{Context, Poll, Spawn, SpawnError}; +use futures::task::{Spawn, SpawnError}; /// A [`Spawn`] handle to runtime's thread pool for spawning futures. /// @@ -56,49 +54,49 @@ impl<'a> Spawn for &'a Spawner { /// /// #[runtime::main] /// async fn main() { -/// let handle = runtime::spawn(async { +/// runtime::task::spawn(async { +/// // might not run at all as we're not waiting for it to be done +/// println!("running the future"); +/// }); +/// } +/// ``` +pub fn spawn(fut: F) +where + F: Future + Send + 'static, +{ + runtime_raw::current_runtime() + .spawn_boxed(fut.boxed()) + .expect("cannot spawn a future"); +} + +/// Spawns a future on the runtime's thread pool and makes the result available. +/// +/// This function can only be called after a runtime has been initialized. +/// +/// If the returned handle is dropped the future is aborted by default. +/// +/// ``` +/// #![feature(async_await)] +/// +/// #[runtime::main] +/// async fn main() { +/// let handle = runtime::task::spawn_remote(async { /// println!("running the future"); /// 42 /// }); /// assert_eq!(handle.await, 42); /// } /// ``` -pub fn spawn(fut: F) -> JoinHandle +pub fn spawn_remote(fut: F) -> RemoteHandle where F: Future + Send + 'static, T: Send + 'static, { - let (tx, rx) = futures::channel::oneshot::channel(); - - let fut = async move { - let t = fut.await; - let _ = tx.send(t); - }; + let (fut, handle) = fut.remote_handle(); runtime_raw::current_runtime() .spawn_boxed(fut.boxed()) .expect("cannot spawn a future"); - JoinHandle { rx } -} - -/// A handle that awaits the result of a [`spawn`]ed future. -/// -/// [`spawn`]: fn.spawn.html -#[must_use = "futures do nothing unless you `.await` or poll them"] -#[derive(Debug)] -pub struct JoinHandle { - pub(crate) rx: futures::channel::oneshot::Receiver, -} - -impl Future for JoinHandle { - type Output = T; - - fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { - match self.rx.poll_unpin(cx) { - Poll::Pending => Poll::Pending, - Poll::Ready(Ok(t)) => Poll::Ready(t), - Poll::Ready(Err(_)) => panic!(), // TODO: Is this OK? Print a better error message? - } - } + handle } diff --git a/tests/native.rs b/tests/native.rs index 7b5a0f20..171d46c9 100644 --- a/tests/native.rs +++ b/tests/native.rs @@ -4,7 +4,7 @@ use runtime_native::Native; #[runtime::test(Native)] async fn spawn() { - let handle = runtime::spawn(async { + let handle = runtime::task::spawn_remote(async { println!("hello planet from Native"); 42 }); diff --git a/tests/tokio-current-thread.rs b/tests/tokio-current-thread.rs index ab70c36f..49fffe67 100644 --- a/tests/tokio-current-thread.rs +++ b/tests/tokio-current-thread.rs @@ -2,7 +2,7 @@ #[runtime::test(runtime_tokio::TokioCurrentThread)] async fn spawn() { - let handle = runtime::spawn(async { + let handle = runtime::task::spawn_remote(async { println!("hello planet from Tokio current-thread"); 42 }); diff --git a/tests/tokio.rs b/tests/tokio.rs index 6fb605c2..ecb679a1 100644 --- a/tests/tokio.rs +++ b/tests/tokio.rs @@ -4,7 +4,7 @@ use runtime_tokio::Tokio; #[runtime::test(Tokio)] async fn spawn() { - let handle = runtime::spawn(async { + let handle = runtime::task::spawn_remote(async { println!("hello planet from Tokio"); 42 });