Skip to content

Spawn more than one blocking thread #475

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
3 commits merged into from Nov 7, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
110 changes: 42 additions & 68 deletions src/task/spawn_blocking.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use std::sync::atomic::{AtomicU64, Ordering};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::thread;
use std::time::Duration;

Expand All @@ -8,8 +8,6 @@ use once_cell::sync::Lazy;
use crate::task::{JoinHandle, Task};
use crate::utils::{abort_on_panic, random};

type Runnable = async_task::Task<Task>;

/// Spawns a blocking task.
///
/// The task will be spawned onto a thread pool specifically dedicated to blocking tasks. This
Expand Down Expand Up @@ -44,93 +42,69 @@ where
F: FnOnce() -> T + Send + 'static,
T: Send + 'static,
{
let schedule = |task| POOL.sender.send(task).unwrap();
let (task, handle) = async_task::spawn(async { f() }, schedule, Task::new(None));
task.schedule();
JoinHandle::new(handle)
}

const MAX_THREADS: u64 = 10_000;
type Runnable = async_task::Task<Task>;

static DYNAMIC_THREAD_COUNT: AtomicU64 = AtomicU64::new(0);
/// The number of sleeping worker threads.
static SLEEPING: AtomicUsize = AtomicUsize::new(0);

struct Pool {
sender: Sender<Runnable>,
receiver: Receiver<Runnable>,
}

static POOL: Lazy<Pool> = Lazy::new(|| {
for _ in 0..2 {
thread::Builder::new()
.name("async-std/blocking".to_string())
.spawn(|| {
abort_on_panic(|| {
for task in &POOL.receiver {
task.run();
}
})
})
.expect("cannot start a thread driving blocking tasks");
}
// Start a single worker thread waiting for the first task.
start_thread();

// We want to use an unbuffered channel here to help
// us drive our dynamic control. In effect, the
// kernel's scheduler becomes the queue, reducing
// the number of buffers that work must flow through
// before being acted on by a core. This helps keep
// latency snappy in the overall async system by
// reducing bufferbloat.
let (sender, receiver) = unbounded();
Pool { sender, receiver }
});

// Create up to MAX_THREADS dynamic blocking task worker threads.
// Dynamic threads will terminate themselves if they don't
// receive any work after between one and ten seconds.
fn maybe_create_another_blocking_thread() {
// We use a `Relaxed` atomic operation because
// it's just a heuristic, and would not lose correctness
// even if it's random.
let workers = DYNAMIC_THREAD_COUNT.load(Ordering::Relaxed);
if workers >= MAX_THREADS {
return;
}
fn start_thread() {
SLEEPING.fetch_add(1, Ordering::SeqCst);

let n_to_spawn = std::cmp::min(2 + (workers / 10), 10);
// Generate a random duration of time between 1 second and 10 seconds. If the thread doesn't
// receive the next task in this duration of time, it will stop running.
let timeout = Duration::from_millis(1000 + u64::from(random(9_000)));

for _ in 0..n_to_spawn {
// We want to avoid having all threads terminate at
// exactly the same time, causing thundering herd
// effects. We want to stagger their destruction over
// 10 seconds or so to make the costs fade into
// background noise.
//
// Generate a simple random number of milliseconds
let rand_sleep_ms = u64::from(random(10_000));
thread::Builder::new()
.name("async-std/blocking".to_string())
.spawn(move || {
loop {
let task = match POOL.receiver.recv_timeout(timeout) {
Ok(task) => task,
Err(_) => {
// Check whether this is the last sleeping thread.
if SLEEPING.fetch_sub(1, Ordering::SeqCst) == 1 {
// If so, then restart the thread to make sure there is always at least
// one sleeping thread.
if SLEEPING.compare_and_swap(0, 1, Ordering::SeqCst) == 0 {
continue;
}
}

thread::Builder::new()
.name("async-std/blocking".to_string())
.spawn(move || {
let wait_limit = Duration::from_millis(1000 + rand_sleep_ms);
// Stop the thread.
return;
}
};

DYNAMIC_THREAD_COUNT.fetch_add(1, Ordering::Relaxed);
while let Ok(task) = POOL.receiver.recv_timeout(wait_limit) {
abort_on_panic(|| task.run());
// If there are no sleeping threads, then start one to make sure there is always at
// least one sleeping thread.
if SLEEPING.fetch_sub(1, Ordering::SeqCst) == 1 {
start_thread();
}
DYNAMIC_THREAD_COUNT.fetch_sub(1, Ordering::Relaxed);
})
.expect("cannot start a dynamic thread driving blocking tasks");
}
}

// Enqueues work, attempting to send to the threadpool in a
// nonblocking way and spinning up another worker thread if
// there is not a thread ready to accept the work.
pub(crate) fn schedule(task: Runnable) {
if let Err(err) = POOL.sender.try_send(task) {
// We were not able to send to the channel without
// blocking. Try to spin up another thread and then
// retry sending while blocking.
maybe_create_another_blocking_thread();
POOL.sender.send(err.into_inner()).unwrap();
}
// Run the task.
abort_on_panic(|| task.run());

SLEEPING.fetch_add(1, Ordering::SeqCst);
}
})
.expect("cannot start a blocking thread");
}
8 changes: 7 additions & 1 deletion src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,13 @@ pub fn random(n: u32) -> u32 {
use std::num::Wrapping;

thread_local! {
static RNG: Cell<Wrapping<u32>> = Cell::new(Wrapping(1_406_868_647));
static RNG: Cell<Wrapping<u32>> = {
// Take the address of a local value as seed.
let mut x = 0i32;
let r = &mut x;
let addr = r as *mut i32 as usize;
Cell::new(Wrapping(addr as u32))
}
}

RNG.with(|rng| {
Expand Down