
Commit 58012f8

Shutdown gracefully on panic (#2596)
## Proposed Changes

* Modify the `TaskExecutor` so that it spawns a "monitor" future for each future spawned by `spawn` or `spawn_blocking`. This monitor future joins the handle of the child future and shuts down the executor if it detects a panic.
* Enable backtraces by default by setting the environment variable `RUST_BACKTRACE`.
* Spawn the `ProductionBeaconNode` on the `TaskExecutor` so that if a panic occurs during start-up it will take down the whole process. Previously we were using a raw Tokio `spawn`, but I can't see any reason not to use the executor (perhaps someone else can).

## Additional Info

I considered using [`std::panic::set_hook`](https://doc.rust-lang.org/std/panic/fn.set_hook.html) to instantiate a custom panic handler, however this doesn't allow us to send a shutdown signal because `Fn` functions can't move variables (i.e. the shutdown sender) out of their environment. This also prevents the hook from receiving a `Logger`. Hence I decided to leave the panic handler untouched, but with backtraces turned on by default.

I did a run through the code base with all the raw Tokio spawn functions disallowed by Clippy, and found only two instances where we bypass the `TaskExecutor`: the HTTP API and `InitializedValidators` in the VC. In both places we use `spawn_blocking` and handle the return value, so I figured that was OK for now.

In terms of performance I think the overhead should be minimal: the monitor tasks will just get parked by the executor until their child resolves.

I've checked that this covers Discv5, as the `TaskExecutor` gets injected into Discv5 here: https://github.com/sigp/lighthouse/blob/f9bba92db3468321b28ddd9010e26b359f88bafe/beacon_node/src/lib.rs#L125-L126
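As a rough, self-contained sketch of the monitoring pattern described above (illustrative only: `shutdown_tx`/`shutdown_rx` are a plain Tokio channel standing in for the executor's `ShutdownReason` sender; the real implementation is `TaskExecutor::spawn_monitor` in the diff below):

```rust
// Minimal sketch of the monitor pattern. A "monitor" future awaits the child's
// JoinHandle and requests a graceful shutdown if the child panicked.
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // Hypothetical shutdown channel; Lighthouse uses its own `ShutdownReason` sender.
    let (shutdown_tx, mut shutdown_rx) = mpsc::channel::<&'static str>(1);

    // The child task. It panics here purely to exercise the monitor.
    let child = tokio::spawn(async {
        panic!("boom");
    });

    // The monitor task: parked until the child resolves, then it inspects the result.
    tokio::spawn(async move {
        if let Err(join_error) = child.await {
            if let Ok(panic) = join_error.try_into_panic() {
                let message = panic.downcast_ref::<&str>().unwrap_or(&"<none>");
                eprintln!("task panicked: {}", message);
                // Ignore the error if the receiver has already gone away.
                let _ = shutdown_tx.send("Panic (fatal error)").await;
            }
        }
    });

    // The top level waits for a shutdown request and exits gracefully.
    if let Some(reason) = shutdown_rx.recv().await {
        eprintln!("shutting down: {}", reason);
    }
}
```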
1 parent 95b1713 commit 58012f8

2 files changed, +87 −81

common/task_executor/src/lib.rs

Lines changed: 49 additions & 54 deletions
@@ -2,7 +2,7 @@ mod metrics;
 
 use futures::channel::mpsc::Sender;
 use futures::prelude::*;
-use slog::{debug, o, trace};
+use slog::{crit, debug, o, trace};
 use std::sync::Weak;
 use tokio::runtime::Runtime;
 
@@ -83,34 +83,56 @@ impl TaskExecutor
         self.spawn(task.map(|_| ()), name)
     }
 
-    /// Spawn a future on the tokio runtime wrapped in an `exit_future::Exit`. The task is canceled
-    /// when the corresponding exit_future `Signal` is fired/dropped.
+    /// Spawn a task to monitor the completion of another task.
     ///
-    /// This function generates prometheus metrics on number of tasks and task duration.
-    pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static, name: &'static str) {
-        let exit = self.exit.clone();
+    /// If the other task exits by panicking, then the monitor task will shut down the executor.
+    fn spawn_monitor<R: Send>(
+        &self,
+        task_handle: impl Future<Output = Result<R, tokio::task::JoinError>> + Send + 'static,
+        name: &'static str,
+    ) {
+        let mut shutdown_sender = self.shutdown_sender();
         let log = self.log.clone();
 
-        if let Some(int_gauge) = metrics::get_int_gauge(&metrics::ASYNC_TASKS_COUNT, &[name]) {
-            // Task is shutdown before it completes if `exit` receives
-            let int_gauge_1 = int_gauge.clone();
-            let future = future::select(Box::pin(task), exit).then(move |either| {
-                match either {
-                    future::Either::Left(_) => trace!(log, "Async task completed"; "task" => name),
-                    future::Either::Right(_) => {
-                        debug!(log, "Async task shutdown, exit received"; "task" => name)
+        if let Some(runtime) = self.runtime.upgrade() {
+            runtime.spawn(async move {
+                if let Err(join_error) = task_handle.await {
+                    if let Ok(panic) = join_error.try_into_panic() {
+                        let message = panic.downcast_ref::<&str>().unwrap_or(&"<none>");
+
+                        crit!(
+                            log,
+                            "Task panic. This is a bug!";
+                            "task_name" => name,
+                            "message" => message,
+                            "advice" => "Please check above for a backtrace and notify \
+                                         the developers"
+                        );
+                        let _ = shutdown_sender
+                            .try_send(ShutdownReason::Failure("Panic (fatal error)"));
                     }
                 }
-                int_gauge_1.dec();
-                futures::future::ready(())
             });
+        } else {
+            debug!(
+                self.log,
+                "Couldn't spawn monitor task. Runtime shutting down"
+            )
+        }
+    }
 
-            int_gauge.inc();
-            if let Some(runtime) = self.runtime.upgrade() {
-                runtime.spawn(future);
-            } else {
-                debug!(self.log, "Couldn't spawn task. Runtime shutting down");
-            }
+    /// Spawn a future on the tokio runtime.
+    ///
+    /// The future is wrapped in an `exit_future::Exit`. The task is canceled when the corresponding
+    /// exit_future `Signal` is fired/dropped.
+    ///
+    /// The future is monitored via another spawned future to ensure that it doesn't panic. In case
+    /// of a panic, the executor will be shut down via `self.signal_tx`.
+    ///
+    /// This function generates prometheus metrics on number of tasks and task duration.
+    pub fn spawn(&self, task: impl Future<Output = ()> + Send + 'static, name: &'static str) {
+        if let Some(task_handle) = self.spawn_handle(task, name) {
+            self.spawn_monitor(task_handle, name)
         }
     }
 
@@ -150,38 +172,11 @@ impl TaskExecutor
     where
         F: FnOnce() + Send + 'static,
     {
-        let log = self.log.clone();
-
-        if let Some(metric) = metrics::get_histogram(&metrics::BLOCKING_TASKS_HISTOGRAM, &[name]) {
-            if let Some(int_gauge) = metrics::get_int_gauge(&metrics::BLOCKING_TASKS_COUNT, &[name])
-            {
-                let int_gauge_1 = int_gauge.clone();
-                let timer = metric.start_timer();
-                let join_handle = if let Some(runtime) = self.runtime.upgrade() {
-                    runtime.spawn_blocking(task)
-                } else {
-                    debug!(self.log, "Couldn't spawn task. Runtime shutting down");
-                    return;
-                };
-
-                let future = async move {
-                    match join_handle.await {
-                        Ok(_) => trace!(log, "Blocking task completed"; "task" => name),
-                        Err(e) => debug!(log, "Blocking task failed"; "error" => %e),
-                    };
-                    timer.observe_duration();
-                    int_gauge_1.dec();
-                };
-
-                int_gauge.inc();
-                if let Some(runtime) = self.runtime.upgrade() {
-                    runtime.spawn(future);
-                } else {
-                    debug!(self.log, "Couldn't spawn task. Runtime shutting down");
-                }
-            }
+        if let Some(task_handle) = self.spawn_blocking_handle(task, name) {
+            self.spawn_monitor(task_handle, name)
         }
     }
+
     /// Spawn a future on the tokio runtime wrapped in an `exit_future::Exit` returning an optional
     /// join handle to the future.
     /// The task is canceled when the corresponding exit_future `Signal` is fired/dropped.
@@ -200,9 +195,9 @@ impl TaskExecutor
             let int_gauge_1 = int_gauge.clone();
             let future = future::select(Box::pin(task), exit).then(move |either| {
                 let result = match either {
-                    future::Either::Left((task, _)) => {
+                    future::Either::Left((value, _)) => {
                         trace!(log, "Async task completed"; "task" => name);
-                        Some(task)
+                        Some(value)
                     }
                     future::Either::Right(_) => {
                         debug!(log, "Async task shutdown, exit received"; "task" => name);
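One detail of the `spawn_monitor` hunk above worth noting: the panic payload is a `Box<dyn Any + Send>`, and `downcast_ref::<&str>()` only recovers messages from panics raised with a plain string literal; panics raised with format arguments carry a `String` payload and will be logged as `<none>`. A small hedged sketch (not part of this diff) illustrating the difference:

```rust
use std::any::Any;

// Illustrative only: shows which panic payloads the `&str` downcast in the diff
// above will catch. `panic!("literal")` carries a `&'static str`, while
// `panic!("{}", value)` carries a `String`, which the `&str` downcast misses.
fn payload_message(payload: &(dyn Any + Send)) -> &str {
    if let Some(s) = payload.downcast_ref::<&str>() {
        // Payload from `panic!("some literal")`.
        *s
    } else if let Some(s) = payload.downcast_ref::<String>() {
        // Payload from `panic!("{}", value)` and friends.
        s.as_str()
    } else {
        "<none>"
    }
}

fn main() {
    let literal: Box<dyn Any + Send> = Box::new("boom"); // &'static str payload
    let formatted: Box<dyn Any + Send> = Box::new(format!("{}", 42)); // String payload

    assert_eq!(payload_message(literal.as_ref()), "boom");
    assert_eq!(payload_message(formatted.as_ref()), "42");
}
```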

lighthouse/src/main.rs

Lines changed: 38 additions & 27 deletions
@@ -32,6 +32,11 @@ fn bls_library_name() -> &'static str {
 }
 
 fn main() {
+    // Enable backtraces unless a RUST_BACKTRACE value has already been explicitly provided.
+    if std::env::var("RUST_BACKTRACE").is_err() {
+        std::env::set_var("RUST_BACKTRACE", "1");
+    }
+
     // Parse the CLI parameters.
     let matches = App::new("Lighthouse")
         .version(VERSION.replace("Lighthouse/", "").as_str())
@@ -344,20 +349,23 @@ fn run<E: EthSpec>(
                     .map_err(|e| format!("Error serializing config: {:?}", e))?;
             };
 
-            environment.runtime().spawn(async move {
-                if let Err(e) = ProductionBeaconNode::new(context.clone(), config).await {
-                    crit!(log, "Failed to start beacon node"; "reason" => e);
-                    // Ignore the error since it always occurs during normal operation when
-                    // shutting down.
-                    let _ = executor
-                        .shutdown_sender()
-                        .try_send(ShutdownReason::Failure("Failed to start beacon node"));
-                } else if shutdown_flag {
-                    let _ = executor.shutdown_sender().try_send(ShutdownReason::Success(
-                        "Beacon node immediate shutdown triggered.",
-                    ));
-                }
-            });
+            executor.clone().spawn(
+                async move {
+                    if let Err(e) = ProductionBeaconNode::new(context.clone(), config).await {
+                        crit!(log, "Failed to start beacon node"; "reason" => e);
+                        // Ignore the error since it always occurs during normal operation when
+                        // shutting down.
+                        let _ = executor
+                            .shutdown_sender()
+                            .try_send(ShutdownReason::Failure("Failed to start beacon node"));
+                    } else if shutdown_flag {
+                        let _ = executor.shutdown_sender().try_send(ShutdownReason::Success(
+                            "Beacon node immediate shutdown triggered.",
+                        ));
+                    }
+                },
+                "beacon_node",
+            );
         }
         ("validator_client", Some(matches)) => {
             let context = environment.core_context();
@@ -374,19 +382,22 @@ fn run<E: EthSpec>(
                     .map_err(|e| format!("Error serializing config: {:?}", e))?;
             };
             if !shutdown_flag {
-                environment.runtime().spawn(async move {
-                    if let Err(e) = ProductionValidatorClient::new(context, config)
-                        .await
-                        .and_then(|mut vc| vc.start_service())
-                    {
-                        crit!(log, "Failed to start validator client"; "reason" => e);
-                        // Ignore the error since it always occurs during normal operation when
-                        // shutting down.
-                        let _ = executor
-                            .shutdown_sender()
-                            .try_send(ShutdownReason::Failure("Failed to start validator client"));
-                    }
-                });
+                executor.clone().spawn(
+                    async move {
+                        if let Err(e) = ProductionValidatorClient::new(context, config)
+                            .await
+                            .and_then(|mut vc| vc.start_service())
+                        {
+                            crit!(log, "Failed to start validator client"; "reason" => e);
+                            // Ignore the error since it always occurs during normal operation when
+                            // shutting down.
+                            let _ = executor.shutdown_sender().try_send(ShutdownReason::Failure(
+                                "Failed to start validator client",
+                            ));
+                        }
+                    },
+                    "validator_client",
+                );
             } else {
                 let _ = executor.shutdown_sender().try_send(ShutdownReason::Success(
                     "Validator client immediate shutdown triggered.",
