Skip to content

Task killing, linked failure, and exit code propagation in the new runtime. #7858

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 29 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
96c1082
Add Either::expect_{left,right}
bblum Jul 2, 2013
5a9b33a
Add Option::take_map{,_default}()
bblum Jul 2, 2013
28c9ba9
Remove redundant Atomic{Ui,I}nt types from unstable::sync
bblum Jul 2, 2013
55adc44
Add AtomicOption::fill() and AtomicOption::is_empty()
bblum Jul 2, 2013
10a400f
Reimplement ARC::unwrap() and friends.
bblum Jul 2, 2013
2a99163
Add UnsafeAtomicRcBox::try_unwrap()
bblum Jul 2, 2013
52ca256
Add KillHandle and implement exit code propagation to replace join_latch
bblum Jul 3, 2013
6882508
Add kill::Death for task death services and use it in Task.
bblum Jul 2, 2013
afc199b
Remove join_latch
bblum Jul 2, 2013
2a99320
Add tests for KillHandle
bblum Jul 3, 2013
629f6e8
Implement KillHandle::kill() and friends (unkillable, atomically). Cl…
bblum Jul 8, 2013
e80efe3
Do a task-killed check at the start of task 'timeslices'.
bblum Jul 8, 2013
0101f35
Add BlockedTask (wake, try_block, etc) in kill.rs.
bblum Jul 11, 2013
9ad1997
Change the HOF context switchers to pass a BlockedTask instead of a ~…
bblum Jul 11, 2013
a093b54
Add test::with_test_task() convenience function.
bblum Jul 11, 2013
e283c4d
Add tests for task killing and blocking.
bblum Jul 11, 2013
2a7273c
Stash a spare kill flag inside tasks, to save two atomic xadds in the…
bblum Jul 12, 2013
e2a4241
Add option::take(), the building block of the option::take_* family.
bblum Jul 13, 2013
9bbec65
Replace *rust_task ptrs in taskgroup code with TaskHandle, for transi…
bblum Jul 13, 2013
87bbcb5
(cleanup) Modernize taskgroup code for the new borrow-checker.
bblum Jul 15, 2013
6d91846
(cleanup) Don't check taskgroup generation monotonicity unless cfg(te…
bblum Jul 15, 2013
728edb5
(cleanup) impl TaskSet
bblum Jul 15, 2013
0e1be5f
Fix linked failure tests to block forever instead of looping around y…
bblum Jul 16, 2013
f3c79c4
Enable taskgroup code for newsched spawns.
bblum Jul 15, 2013
2183145
Rename TCB to Taskgroup
bblum Jul 16, 2013
7ad7911
Add watched and indestructible spawn modes.
bblum Jul 16, 2013
4bcda71
Fix warnings in src/test/bench tests. Nobody will ever care.
bblum Jul 17, 2013
621bc79
Fix warnings in stdtest and extratest. Maybe somebody will care.
bblum Jul 17, 2013
980646a
Use Option .take() or .take_unwrap() instead of util::replace where p…
Jul 17, 2013
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 72 additions & 8 deletions src/libextra/arc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,9 +50,9 @@ use std::borrow;

/// As sync::condvar, a mechanism for unlock-and-descheduling and signaling.
pub struct Condvar<'self> {
is_mutex: bool,
failed: &'self mut bool,
cond: &'self sync::Condvar<'self>
priv is_mutex: bool,
priv failed: &'self mut bool,
priv cond: &'self sync::Condvar<'self>
}

impl<'self> Condvar<'self> {
Expand Down Expand Up @@ -108,7 +108,7 @@ impl<'self> Condvar<'self> {
****************************************************************************/

/// An atomically reference counted wrapper for shared immutable state.
pub struct ARC<T> { x: UnsafeAtomicRcBox<T> }
pub struct ARC<T> { priv x: UnsafeAtomicRcBox<T> }

/// Create an atomically reference counted wrapper.
pub fn ARC<T:Freeze + Send>(data: T) -> ARC<T> {
Expand All @@ -123,6 +123,20 @@ impl<T:Freeze+Send> ARC<T> {
pub fn get<'a>(&'a self) -> &'a T {
unsafe { &*self.x.get_immut() }
}

/**
* Retrieve the data back out of the ARC. This function blocks until the
* reference given to it is the last existing one, and then unwrap the data
* instead of destroying it.
*
* If multiple tasks call unwrap, all but the first will fail. Do not call
* unwrap from a task that holds another reference to the same ARC; it is
* guaranteed to deadlock.
*/
pub fn unwrap(self) -> T {
let ARC { x: x } = self;
unsafe { x.unwrap() }
}
}

/**
Expand All @@ -143,9 +157,9 @@ impl<T:Freeze + Send> Clone for ARC<T> {
****************************************************************************/

#[doc(hidden)]
struct MutexARCInner<T> { lock: Mutex, failed: bool, data: T }
struct MutexARCInner<T> { priv lock: Mutex, priv failed: bool, priv data: T }
/// An ARC with mutable data protected by a blocking mutex.
struct MutexARC<T> { x: UnsafeAtomicRcBox<MutexARCInner<T>> }
struct MutexARC<T> { priv x: UnsafeAtomicRcBox<MutexARCInner<T>> }

/// Create a mutex-protected ARC with the supplied data.
pub fn MutexARC<T:Send>(user_data: T) -> MutexARC<T> {
Expand Down Expand Up @@ -225,6 +239,22 @@ impl<T:Send> MutexARC<T> {
cond: cond })
}
}

/**
* Retrieves the data, blocking until all other references are dropped,
* exactly as arc::unwrap.
*
* Will additionally fail if another task has failed while accessing the arc.
*/
pub fn unwrap(self) -> T {
let MutexARC { x: x } = self;
let inner = unsafe { x.unwrap() };
let MutexARCInner { failed: failed, data: data, _ } = inner;
if failed {
fail!(~"Can't unwrap poisoned MutexARC - another task failed inside!");
}
data
}
}

// Common code for {mutex.access,rwlock.write}{,_cond}.
Expand Down Expand Up @@ -268,7 +298,7 @@ fn PoisonOnFail<'r>(failed: &'r mut bool) -> PoisonOnFail {
****************************************************************************/

#[doc(hidden)]
struct RWARCInner<T> { lock: RWlock, failed: bool, data: T }
struct RWARCInner<T> { priv lock: RWlock, priv failed: bool, priv data: T }
/**
* A dual-mode ARC protected by a reader-writer lock. The data can be accessed
* mutably or immutably, and immutably-accessing tasks may run concurrently.
Expand All @@ -278,7 +308,7 @@ struct RWARCInner<T> { lock: RWlock, failed: bool, data: T }
#[mutable] // XXX remove after snap
#[no_freeze]
struct RWARC<T> {
x: UnsafeAtomicRcBox<RWARCInner<T>>,
priv x: UnsafeAtomicRcBox<RWARCInner<T>>,
}

/// Create a reader/writer ARC with the supplied data.
Expand Down Expand Up @@ -429,6 +459,23 @@ impl<T:Freeze + Send> RWARC<T> {
}
}
}

/**
* Retrieves the data, blocking until all other references are dropped,
* exactly as arc::unwrap.
*
* Will additionally fail if another task has failed while accessing the arc
* in write mode.
*/
pub fn unwrap(self) -> T {
let RWARC { x: x, _ } = self;
let inner = unsafe { x.unwrap() };
let RWARCInner { failed: failed, data: data, _ } = inner;
if failed {
fail!(~"Can't unwrap poisoned RWARC - another task failed inside!")
}
data
}
}

// Borrowck rightly complains about immutably aliasing the rwlock in order to
Expand Down Expand Up @@ -611,6 +658,23 @@ mod tests {
}
}
#[test] #[should_fail] #[ignore(cfg(windows))]
pub fn test_mutex_arc_unwrap_poison() {
let arc = MutexARC(1);
let arc2 = ~(&arc).clone();
let (p, c) = comm::stream();
do task::spawn {
unsafe {
do arc2.access |one| {
c.send(());
assert!(*one == 2);
}
}
}
let _ = p.recv();
let one = arc.unwrap();
assert!(one == 1);
}
#[test] #[should_fail] #[ignore(cfg(windows))]
fn test_rw_arc_poison_wr() {
let arc = ~RWARC(1);
let arc2 = (*arc).clone();
Expand Down
2 changes: 1 addition & 1 deletion src/libextra/dlist.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,7 @@ impl<T> Deque<T> for DList<T> {
///
/// O(1)
fn pop_front(&mut self) -> Option<T> {
match util::replace(&mut self.list_head, None) {
match self.list_head.take() {
None => None,
Some(old_head) => {
self.length -= 1;
Expand Down
5 changes: 2 additions & 3 deletions src/libextra/ringbuf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@
//! extra::container::Deque`.

use std::num;
use std::util;
use std::uint;
use std::vec;
use std::iterator::{FromIterator, InvertIterator};
Expand Down Expand Up @@ -72,7 +71,7 @@ impl<T> Deque<T> for RingBuf<T> {

/// Remove and return the first element in the RingBuf, or None if it is empty
fn pop_front(&mut self) -> Option<T> {
let result = util::replace(&mut self.elts[self.lo], None);
let result = self.elts[self.lo].take();
if result.is_some() {
self.lo = (self.lo + 1u) % self.elts.len();
self.nelts -= 1u;
Expand All @@ -85,7 +84,7 @@ impl<T> Deque<T> for RingBuf<T> {
if self.nelts > 0 {
self.nelts -= 1;
let hi = self.raw_index(self.nelts);
util::replace(&mut self.elts[hi], None)
self.elts[hi].take()
} else {
None
}
Expand Down
2 changes: 1 addition & 1 deletion src/libextra/smallintmap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,7 @@ impl<V> MutableMap<uint, V> for SmallIntMap<V> {
if *key >= self.v.len() {
return None;
}
replace(&mut self.v[*key], None)
self.v[*key].take()
}
}

Expand Down
2 changes: 0 additions & 2 deletions src/libextra/sort.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1020,8 +1020,6 @@ mod big_tests {

use sort::*;

use std::cast::unsafe_copy;
use std::local_data;
use std::rand::RngUtil;
use std::rand;
use std::uint;
Expand Down
2 changes: 1 addition & 1 deletion src/libextra/treemap.rs
Original file line number Diff line number Diff line change
Expand Up @@ -697,7 +697,7 @@ fn remove<K: TotalOrd, V>(node: &mut Option<~TreeNode<K, V>>,
}
}
}
return match replace(node, None) {
return match node.take() {
Some(~TreeNode{value, _}) => Some(value), None => fail!()
};
}
Expand Down
5 changes: 2 additions & 3 deletions src/libextra/workcache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ use std::result;
use std::run;
use std::task;
use std::to_bytes;
use std::util::replace;

/**
*
Expand Down Expand Up @@ -353,7 +352,7 @@ impl TPrep for Prep {

_ => {
let (port, chan) = oneshot();
let blk = replace(&mut bo, None).unwrap();
let blk = bo.take_unwrap();
let chan = Cell::new(chan);

do task::spawn {
Expand Down Expand Up @@ -385,7 +384,7 @@ fn unwrap<T:Send +
Decodable<json::Decoder>>( // FIXME(#5121)
w: Work<T>) -> T {
let mut ww = w;
let s = replace(&mut ww.res, None);
let s = ww.res.take();

match s {
None => fail!(),
Expand Down
3 changes: 1 addition & 2 deletions src/libstd/cell.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use cast::transmute_mut;
use prelude::*;
use util::replace;

/*
A dynamic, mutable location.
Expand Down Expand Up @@ -48,7 +47,7 @@ impl<T> Cell<T> {
fail!("attempt to take an empty cell");
}

replace(&mut this.value, None).unwrap()
this.value.take_unwrap()
}

/// Returns the value, failing if the cell is full.
Expand Down
19 changes: 7 additions & 12 deletions src/libstd/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -242,8 +242,7 @@ impl<T: Send> GenericChan<T> for SharedChan<T> {
unsafe {
let mut xx = Some(x);
do chan.with_imm |chan| {
let x = replace(&mut xx, None);
chan.send(x.unwrap())
chan.send(xx.take_unwrap())
}
}
}
Expand All @@ -259,8 +258,7 @@ impl<T: Send> GenericSmartChan<T> for SharedChan<T> {
unsafe {
let mut xx = Some(x);
do chan.with_imm |chan| {
let x = replace(&mut xx, None);
chan.try_send(x.unwrap())
chan.try_send(xx.take_unwrap())
}
}
}
Expand Down Expand Up @@ -372,7 +370,6 @@ mod pipesy {
use pipes::{recv, try_recv, peek, PacketHeader};
use super::{GenericChan, GenericSmartChan, GenericPort, Peekable, Selectable};
use cast::transmute_mut;
use util::replace;

/*proto! oneshot (
Oneshot:send<T:Send> {
Expand Down Expand Up @@ -638,8 +635,7 @@ mod pipesy {
fn send(&self, x: T) {
unsafe {
let self_endp = transmute_mut(&self.endp);
let endp = replace(self_endp, None);
*self_endp = Some(streamp::client::data(endp.unwrap(), x))
*self_endp = Some(streamp::client::data(self_endp.take_unwrap(), x))
}
}
}
Expand All @@ -649,8 +645,7 @@ mod pipesy {
fn try_send(&self, x: T) -> bool {
unsafe {
let self_endp = transmute_mut(&self.endp);
let endp = replace(self_endp, None);
match streamp::client::try_data(endp.unwrap(), x) {
match streamp::client::try_data(self_endp.take_unwrap(), x) {
Some(next) => {
*self_endp = Some(next);
true
Expand All @@ -666,7 +661,7 @@ mod pipesy {
fn recv(&self) -> T {
unsafe {
let self_endp = transmute_mut(&self.endp);
let endp = replace(self_endp, None);
let endp = self_endp.take();
let streamp::data(x, endp) = recv(endp.unwrap());
*self_endp = Some(endp);
x
Expand All @@ -677,7 +672,7 @@ mod pipesy {
fn try_recv(&self) -> Option<T> {
unsafe {
let self_endp = transmute_mut(&self.endp);
let endp = replace(self_endp, None);
let endp = self_endp.take();
match try_recv(endp.unwrap()) {
Some(streamp::data(x, endp)) => {
*self_endp = Some(endp);
Expand All @@ -694,7 +689,7 @@ mod pipesy {
fn peek(&self) -> bool {
unsafe {
let self_endp = transmute_mut(&self.endp);
let mut endp = replace(self_endp, None);
let mut endp = self_endp.take();
let peek = match endp {
Some(ref mut endp) => peek(endp),
None => fail!("peeking empty stream")
Expand Down
32 changes: 26 additions & 6 deletions src/libstd/either.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ use cmp::Eq;
use iterator::IteratorUtil;
use result::Result;
use result;
use str::StrSlice;
use vec;
use vec::{OwnedVector, ImmutableVector};

Expand Down Expand Up @@ -121,24 +122,37 @@ pub fn is_right<T, U>(eith: &Either<T, U>) -> bool {
}
}

/// Retrieves the value in the left branch. Fails if the either is Right.
/// Retrieves the value in the left branch.
/// Fails with a specified reason if the either is Right.
#[inline]
pub fn unwrap_left<T,U>(eith: Either<T,U>) -> T {
pub fn expect_left<T,U>(eith: Either<T,U>, reason: &str) -> T {
match eith {
Left(x) => x,
Right(_) => fail!("either::unwrap_left Right")
Right(_) => fail!(reason.to_owned())
}
}

/// Retrieves the value in the right branch. Fails if the either is Left.
/// Retrieves the value in the left branch. Fails if the either is Right.
#[inline]
pub fn unwrap_right<T,U>(eith: Either<T,U>) -> U {
pub fn unwrap_left<T,U>(eith: Either<T,U>) -> T {
expect_left(eith, "either::unwrap_left Right")
}

/// Retrieves the value in the right branch.
/// Fails with a specified reason if the either is Left.
#[inline]
pub fn expect_right<T,U>(eith: Either<T,U>, reason: &str) -> U {
match eith {
Right(x) => x,
Left(_) => fail!("either::unwrap_right Left")
Left(_) => fail!(reason.to_owned())
}
}

/// Retrieves the value in the right branch. Fails if the either is Left.
pub fn unwrap_right<T,U>(eith: Either<T,U>) -> U {
expect_right(eith, "either::unwrap_right Left")
}

impl<T, U> Either<T, U> {
#[inline]
pub fn either<V>(&self, f_left: &fn(&T) -> V, f_right: &fn(&U) -> V) -> V {
Expand All @@ -157,9 +171,15 @@ impl<T, U> Either<T, U> {
#[inline]
pub fn is_right(&self) -> bool { is_right(self) }

#[inline]
pub fn expect_left(self, reason: &str) -> T { expect_left(self, reason) }

#[inline]
pub fn unwrap_left(self) -> T { unwrap_left(self) }

#[inline]
pub fn expect_right(self, reason: &str) -> U { expect_right(self, reason) }

#[inline]
pub fn unwrap_right(self) -> U { unwrap_right(self) }
}
Expand Down
Loading