Commit 629f6e8

Implement KillHandle::kill() and friends (unkillable, atomically). Close #6377.
1 parent 2a99320 commit 629f6e8

2 files changed (+227 lines, −37 lines)

src/libstd/rt/kill.rs

Lines changed: 150 additions & 2 deletions
@@ -10,15 +10,38 @@
 
 //! Task death: asynchronous killing, linked failure, exit code propagation.
 
+use cast;
 use cell::Cell;
 use option::{Option, Some, None};
 use prelude::*;
+use rt::task::Task;
+use unstable::atomics::{AtomicUint, SeqCst};
 use unstable::sync::{UnsafeAtomicRcBox, LittleLock};
 use util;
 
+static KILLED_MSG: &'static str = "killed by linked failure";
+
+// State values for the 'killed' and 'unkillable' atomic flags below.
+static KILL_RUNNING: uint = 0;
+static KILL_KILLED: uint = 1;
+static KILL_UNKILLABLE: uint = 2;
+
 // FIXME(#7544)(bblum): think about the cache efficiency of this
 struct KillHandleInner {
-    // ((more fields to be added in a future commit))
+    // Is the task running, blocked, or killed? Possible values:
+    // * KILL_RUNNING - Not unkillable, no kill pending.
+    // * KILL_KILLED - Kill pending.
+    // * <ptr> - A transmuted blocked ~Task pointer.
+    // This flag is refcounted because it may also be referenced by a blocking
+    // concurrency primitive, used to wake the task normally, whose reference
+    // may outlive the handle's if the task is killed.
+    killed: UnsafeAtomicRcBox<AtomicUint>,
+    // Has the task deferred kill signals? This flag guards the above one.
+    // Possible values:
+    // * KILL_RUNNING - Not unkillable, no kill pending.
+    // * KILL_KILLED - Kill pending.
+    // * KILL_UNKILLABLE - Kill signals deferred.
+    unkillable: AtomicUint,
 
     // Shared state between task and children for exit code propagation. These
     // are here so we can re-use the kill handle to implement watched children
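For context, the 'killed' word does double duty as a pointer slot: a task that blocks can stash its own ~Task pointer there, and a killer swaps in KILL_KILLED to claim it. Below is a minimal hedged sketch (not part of this commit) of what the blocking side of that handoff could look like; the reawaken path is left as a placeholder.

// Hedged sketch: how a blocking primitive might park a task in the
// refcounted 'killed' word. Values other than KILL_RUNNING and KILL_KILLED
// are assumed to be transmuted ~Task pointers, per the struct comment above.
unsafe fn sketch_block_in_flag(flag: &mut AtomicUint, this_task: ~Task) {
    let task_ptr: uint = cast::transmute(this_task);
    // Publish ourselves as blocked; if a kill raced in first, the slot
    // already holds KILL_KILLED and we must fail instead of sleeping.
    if flag.compare_and_swap(KILL_RUNNING, task_ptr, SeqCst) == KILL_KILLED {
        let us: ~Task = cast::transmute(task_ptr); // take back ownership
        // ... hypothetical: reschedule 'us' to unwind with KILLED_MSG ...
    }
}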
@@ -47,20 +70,73 @@ pub struct Death {
     // Action to be done with the exit code. If set, also makes the task wait
     // until all its watched children exit before collecting the status.
     on_exit: Option<~fn(bool)>,
+    // nesting level counter for task::unkillable calls (0 == killable).
+    unkillable: int,
+    // nesting level counter for task::atomically calls (0 == can yield).
+    wont_sleep: int,
 }
 
 impl KillHandle {
     pub fn new() -> KillHandle {
         KillHandle(UnsafeAtomicRcBox::new(KillHandleInner {
             // Linked failure fields
-            // ((none yet))
+            killed: UnsafeAtomicRcBox::new(AtomicUint::new(KILL_RUNNING)),
+            unkillable: AtomicUint::new(KILL_RUNNING),
             // Exit code propagation fields
             any_child_failed: false,
             child_tombstones: None,
             graveyard_lock: LittleLock(),
         }))
     }
 
+    // Will begin unwinding if a kill signal was received, unless already_failing.
+    // This can't be used recursively, because a task which sees a KILLED
+    // signal must fail immediately, which an already-unkillable task can't do.
+    #[inline]
+    pub fn inhibit_kill(&mut self, already_failing: bool) {
+        let inner = unsafe { &mut *self.get() };
+        // Expect flag to contain RUNNING. If KILLED, it should stay KILLED.
+        // FIXME(#7544)(bblum): is it really necessary to prohibit double kill?
+        match inner.unkillable.compare_and_swap(KILL_RUNNING, KILL_UNKILLABLE, SeqCst) {
+            KILL_RUNNING => { }, // normal case
+            KILL_KILLED => if !already_failing { fail!(KILLED_MSG) },
+            _ => rtabort!("inhibit_kill: task already unkillable"),
+        }
+    }
+
+    // Will begin unwinding if a kill signal was received, unless already_failing.
+    #[inline]
+    pub fn allow_kill(&mut self, already_failing: bool) {
+        let inner = unsafe { &mut *self.get() };
+        // Expect flag to contain UNKILLABLE. If KILLED, it should stay KILLED.
+        // FIXME(#7544)(bblum): is it really necessary to prohibit double kill?
+        match inner.unkillable.compare_and_swap(KILL_UNKILLABLE, KILL_RUNNING, SeqCst) {
+            KILL_UNKILLABLE => { }, // normal case
+            KILL_KILLED => if !already_failing { fail!(KILLED_MSG) },
+            _ => rtabort!("allow_kill: task already killable"),
+        }
+    }
+
+    // Send a kill signal to the handle's owning task. Returns the task itself
+    // if it was blocked and needs punted awake. To be called by other tasks.
+    pub fn kill(&mut self) -> Option<~Task> {
+        let inner = unsafe { &mut *self.get() };
+        if inner.unkillable.swap(KILL_KILLED, SeqCst) == KILL_RUNNING {
+            // Got in. Allowed to try to punt the task awake.
+            let flag = unsafe { &mut *inner.killed.get() };
+            match flag.swap(KILL_KILLED, SeqCst) {
+                // Task either not blocked or already taken care of.
+                KILL_RUNNING | KILL_KILLED => None,
+                // Got ownership of the blocked task.
+                task_ptr => Some(unsafe { cast::transmute(task_ptr) }),
+            }
+        } else {
+            // Otherwise it was either unkillable or already killed. Somebody
+            // else was here first who will deal with the kill signal.
+            None
+        }
+    }
+
     pub fn notify_immediate_failure(&mut self) {
         // A benign data race may happen here if there are failing sibling
         // tasks that were also spawned-watched. The refcount's write barriers
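A hedged usage sketch of the new kill() entry point: the caller owns whatever ~Task comes back and must punt it awake itself. The schedule_task method here is an assumption for illustration, not something this commit adds.

// Sketch only: deliver a kill signal from a sibling task.
unsafe fn sketch_deliver_kill(handle: &mut KillHandle, sched: &mut Scheduler) {
    match handle.kill() {
        // The victim was blocked; we now own it and must reschedule it
        // so it can wake up and unwind.
        Some(victim) => sched.schedule_task(victim), // hypothetical method
        // The victim was running, unkillable, or already killed; whoever
        // got there first (or the victim itself) handles the signal.
        None => (),
    }
}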
@@ -123,6 +199,7 @@ impl KillHandle {
     }
 
     // NB: Takes a pthread mutex -- 'blk' not allowed to reschedule.
+    #[inline]
     fn add_lazy_tombstone(parent: &mut KillHandle,
                           blk: &fn(Option<~fn() -> bool>) -> ~fn() -> bool) {
 
@@ -144,6 +221,8 @@ impl Death {
             kill_handle: Some(KillHandle::new()),
             watching_parent: None,
             on_exit: None,
+            unkillable: 0,
+            wont_sleep: 0,
         }
     }
 
@@ -153,11 +232,22 @@ impl Death {
             kill_handle: Some(KillHandle::new()),
             watching_parent: self.kill_handle.clone(),
             on_exit: None,
+            unkillable: 0,
+            wont_sleep: 0,
         }
     }
 
     /// Collect failure exit codes from children and propagate them to a parent.
     pub fn collect_failure(&mut self, mut success: bool) {
+        // This may run after the task has already failed, so even though the
+        // task appears to need to be killed, the scheduler should not fail us
+        // when we block to unwrap.
+        // (XXX: Another less-elegant reason for doing this is so that the use
+        // of the LittleLock in reparent_children_to doesn't need to access the
+        // unkillable flag in the kill_handle, since we'll have removed it.)
+        rtassert!(self.unkillable == 0);
+        self.unkillable = 1;
+
         // Step 1. Decide if we need to collect child failures synchronously.
         do self.on_exit.take_map |on_exit| {
             if success {
@@ -191,6 +281,64 @@ impl Death {
                 parent_handle.notify_immediate_failure();
             }
         };
+
+        // Can't use allow_kill directly; that would require the kill handle.
+        rtassert!(self.unkillable == 1);
+        self.unkillable = 0;
+    }
+
+    /// Enter a possibly-nested unkillable section of code.
+    /// All calls must be paired with a subsequent call to allow_kill.
+    #[inline]
+    pub fn inhibit_kill(&mut self, already_failing: bool) {
+        if self.unkillable == 0 {
+            rtassert!(self.kill_handle.is_some());
+            self.kill_handle.get_mut_ref().inhibit_kill(already_failing);
+        }
+        self.unkillable += 1;
+    }
+
+    /// Exit a possibly-nested unkillable section of code.
+    /// All calls must be paired with a preceding call to inhibit_kill.
+    #[inline]
+    pub fn allow_kill(&mut self, already_failing: bool) {
+        rtassert!(self.unkillable != 0);
+        self.unkillable -= 1;
+        if self.unkillable == 0 {
+            rtassert!(self.kill_handle.is_some());
+            self.kill_handle.get_mut_ref().allow_kill(already_failing);
+        }
+    }
+
+    /// Enter a possibly-nested "atomic" section of code. Just for assertions.
+    /// All calls must be paired with a subsequent call to allow_yield.
+    #[inline]
+    pub fn inhibit_yield(&mut self) {
+        self.wont_sleep += 1;
+    }
+
+    /// Exit a possibly-nested "atomic" section of code. Just for assertions.
+    /// All calls must be paired with a preceding call to inhibit_yield.
+    #[inline]
+    pub fn allow_yield(&mut self) {
+        rtassert!(self.wont_sleep != 0);
+        self.wont_sleep -= 1;
+    }
+
+    /// Ensure that the task is allowed to become descheduled.
+    #[inline]
+    pub fn assert_may_sleep(&self) {
+        if self.wont_sleep != 0 {
+            rtabort!("illegal atomic-sleep: can't deschedule inside atomically()");
+        }
+    }
+}
+
+impl Drop for Death {
+    fn drop(&self) {
+        // Mustn't be in an atomic or unkillable section at task death.
+        rtassert!(self.unkillable == 0);
+        rtassert!(self.wont_sleep == 0);
     }
 }
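Taken together, the counters make Death's sections cheaply re-entrant: only the outermost transitions (0 to 1 and 1 to 0) touch the shared atomic flag in the kill handle. A hedged trace of the nesting behavior, where 'death' stands for some task's Death structure:

// Sketch: nested unkillable sections; only the outermost pair hits the atomic.
death.inhibit_kill(false); // unkillable: 0 -> 1; CAS RUNNING -> UNKILLABLE
death.inhibit_kill(false); // unkillable: 1 -> 2; no atomic traffic
death.allow_kill(false);   // unkillable: 2 -> 1; no atomic traffic
death.allow_kill(false);   // unkillable: 1 -> 0; CAS UNKILLABLE -> RUNNING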

src/libstd/task/mod.rs

Lines changed: 77 additions & 35 deletions
@@ -42,7 +42,8 @@ use cmp::Eq;
 use comm::{stream, Chan, GenericChan, GenericPort, Port};
 use result::Result;
 use result;
-use rt::{context, OldTaskContext};
+use rt::{context, OldTaskContext, TaskContext};
+use rt::local::Local;
 use task::rt::{task_id, sched_id};
 use unstable::finally::Finally;
 use util::replace;
@@ -526,8 +527,6 @@ pub fn yield() {
 pub fn failing() -> bool {
     //! True if the running task has failed
 
-    use rt::{context, OldTaskContext};
-    use rt::local::Local;
     use rt::task::Task;
 
     match context() {
@@ -572,33 +571,59 @@ pub fn get_scheduler() -> Scheduler {
  * ~~~
  */
 pub unsafe fn unkillable<U>(f: &fn() -> U) -> U {
-    if context() == OldTaskContext {
-        let t = rt::rust_get_task();
-        do (|| {
-            rt::rust_task_inhibit_kill(t);
-            f()
-        }).finally {
-            rt::rust_task_allow_kill(t);
+    use rt::task::Task;
+
+    match context() {
+        OldTaskContext => {
+            let t = rt::rust_get_task();
+            do (|| {
+                rt::rust_task_inhibit_kill(t);
+                f()
+            }).finally {
+                rt::rust_task_allow_kill(t);
+            }
+        }
+        TaskContext => {
+            // The inhibits/allows might fail and need to borrow the task.
+            let t = Local::unsafe_borrow::<Task>();
+            do (|| {
+                (*t).death.inhibit_kill((*t).unwinder.unwinding);
+                f()
+            }).finally {
+                (*t).death.allow_kill((*t).unwinder.unwinding);
+            }
         }
-    } else {
-        // FIXME #6377
-        f()
+        // FIXME(#3095): This should be an rtabort as soon as the scheduler
+        // no longer uses a workqueue implemented with an Exclusive.
+        _ => f()
     }
 }
 
 /// The inverse of unkillable. Only ever to be used nested in unkillable().
 pub unsafe fn rekillable<U>(f: &fn() -> U) -> U {
-    if context() == OldTaskContext {
-        let t = rt::rust_get_task();
-        do (|| {
-            rt::rust_task_allow_kill(t);
-            f()
-        }).finally {
-            rt::rust_task_inhibit_kill(t);
+    use rt::task::Task;
+
+    match context() {
+        OldTaskContext => {
+            let t = rt::rust_get_task();
+            do (|| {
+                rt::rust_task_allow_kill(t);
+                f()
+            }).finally {
+                rt::rust_task_inhibit_kill(t);
+            }
         }
-    } else {
-        // FIXME #6377
-        f()
+        TaskContext => {
+            let t = Local::unsafe_borrow::<Task>();
+            do (|| {
+                (*t).death.allow_kill((*t).unwinder.unwinding);
+                f()
+            }).finally {
+                (*t).death.inhibit_kill((*t).unwinder.unwinding);
+            }
+        }
+        // FIXME(#3095): As in unkillable().
+        _ => f()
     }
 }
 
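A hedged usage sketch of the two wrappers on the new runtime path, in the same do-block style this module already uses; the closure bodies are placeholders.

// Sketch: defer kill signals around a critical section, re-enabling them
// for a nested region. Any pending kill is acted on at the boundaries.
unsafe {
    do task::unkillable {
        // kill signals are deferred here
        do task::rekillable {
            // killable again; only valid nested inside unkillable()
        }
    }
}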
@@ -607,19 +632,36 @@ pub unsafe fn rekillable<U>(f: &fn() -> U) -> U {
  * For use with exclusive ARCs, which use pthread mutexes directly.
  */
 pub unsafe fn atomically<U>(f: &fn() -> U) -> U {
-    if context() == OldTaskContext {
-        let t = rt::rust_get_task();
-        do (|| {
-            rt::rust_task_inhibit_kill(t);
-            rt::rust_task_inhibit_yield(t);
-            f()
-        }).finally {
-            rt::rust_task_allow_yield(t);
-            rt::rust_task_allow_kill(t);
+    use rt::task::Task;
+
+    match context() {
+        OldTaskContext => {
+            let t = rt::rust_get_task();
+            do (|| {
+                rt::rust_task_inhibit_kill(t);
+                rt::rust_task_inhibit_yield(t);
+                f()
+            }).finally {
+                rt::rust_task_allow_yield(t);
+                rt::rust_task_allow_kill(t);
+            }
+        }
+        TaskContext => {
+            let t = Local::unsafe_borrow::<Task>();
+            do (|| {
+                // It's important to inhibit kill after inhibiting yield, because
+                // inhibit-kill might fail if we were already killed, and the
+                // inhibit-yield must happen to match the finally's allow-yield.
+                (*t).death.inhibit_yield();
+                (*t).death.inhibit_kill((*t).unwinder.unwinding);
+                f()
+            }).finally {
+                (*t).death.allow_kill((*t).unwinder.unwinding);
+                (*t).death.allow_yield();
+            }
         }
-    } else {
-        // FIXME #6377
-        f()
+        // FIXME(#3095): As in unkillable().
+        _ => f()
     }
 }
 
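Finally, a hedged sketch of atomically, which layers the yield assertion on top of kill inhibition so that a pthread-mutex critical section can neither be killed nor deschedule; the closure body is a placeholder.

// Sketch: an exclusive-ARC-style critical section. Attempting to block or
// yield in here would trip assert_may_sleep's rtabort in the new runtime.
unsafe {
    do task::atomically {
        // touch data guarded by a raw pthread mutex; must not sleep
    }
}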