
Alternative lockless Mutex implementation (2nd try) #11520


Closed
wants to merge 9 commits
3 changes: 2 additions & 1 deletion src/etc/licenseck.py
@@ -41,6 +41,7 @@
"libstd/sync/mpsc_queue.rs", # BSD
"libstd/sync/spsc_queue.rs", # BSD
"libstd/sync/mpmc_bounded_queue.rs", # BSD
"libstd/sync/mpsc_intrusive.rs", # BSD
]

def check_license(name, contents):
@@ -59,4 +60,4 @@ def check_license(name, contents):
if (boilerplate.find(license1) == -1 or boilerplate.find(license2) == -1) and \
(boilerplate.find(license3) == -1 or boilerplate.find(license4) == -1):
return False
return True
return True
51 changes: 28 additions & 23 deletions src/libextra/sync.rs
@@ -19,12 +19,13 @@


use std::borrow;
use std::unstable::sync::Exclusive;
use std::cast;
use std::sync::arc::UnsafeArc;
use std::sync::atomics;
use std::sync;
use std::unstable::finally::Finally;
use std::util;
use std::util::NonCopyable;
use std::util;

/****************************************************************************
* Internals
@@ -52,7 +53,7 @@ impl WaitQueue {
Some(ch) => {
// Send a wakeup signal. If the waiter was killed, its port will
// have closed. Keep trying until we get a live task.
if ch.try_send_deferred(()) {
if ch.try_send(()) {
true
} else {
self.signal()
@@ -68,7 +69,7 @@ impl WaitQueue {
match self.head.try_recv() {
None => break,
Some(ch) => {
if ch.try_send_deferred(()) {
if ch.try_send(()) {
count += 1;
}
}
@@ -79,36 +80,44 @@

fn wait_end(&self) -> WaitEnd {
let (wait_end, signal_end) = Chan::new();
assert!(self.tail.try_send_deferred(signal_end));
assert!(self.tail.try_send(signal_end));
wait_end
}
}

// The building-block used to make semaphores, mutexes, and rwlocks.
#[doc(hidden)]
struct SemInner<Q> {
lock: sync::Mutex,
count: int,
waiters: WaitQueue,
waiters: WaitQueue,
// Can be either unit or another waitqueue. Some sems shouldn't come with
// a condition variable attached, others should.
blocked: Q
blocked: Q
}

#[doc(hidden)]
struct Sem<Q>(Exclusive<SemInner<Q>>);
struct Sem<Q>(UnsafeArc<SemInner<Q>>);

#[doc(hidden)]
impl<Q:Send> Sem<Q> {
fn new(count: int, q: Q) -> Sem<Q> {
Sem(Exclusive::new(SemInner {
count: count, waiters: WaitQueue::new(), blocked: q }))
Sem(UnsafeArc::new(SemInner {
count: count,
waiters: WaitQueue::new(),
blocked: q,
lock: sync::Mutex::new(),
}))
}

unsafe fn with(&self, f: |&mut SemInner<Q>|) {
let Sem(ref arc) = *self;
let state = arc.get();
let _g = (*state).lock.lock();
f(cast::transmute(state));
}

pub fn acquire(&self) {
unsafe {
let mut waiter_nobe = None;
let Sem(ref lock) = *self;
lock.with(|state| {
self.with(|state| {
state.count -= 1;
if state.count < 0 {
// Create waiter nobe, enqueue ourself, and tell
@@ -127,8 +136,7 @@ impl<Q:Send> Sem<Q> {

pub fn release(&self) {
unsafe {
let Sem(ref lock) = *self;
lock.with(|state| {
self.with(|state| {
state.count += 1;
if state.count <= 0 {
state.waiters.signal();
@@ -208,8 +216,7 @@ impl<'a> Condvar<'a> {
let mut out_of_bounds = None;
// Release lock, 'atomically' enqueuing ourselves in so doing.
unsafe {
let Sem(ref queue) = *self.sem;
queue.with(|state| {
self.sem.with(|state| {
if condvar_id < state.blocked.len() {
// Drop the lock.
state.count += 1;
@@ -251,8 +258,7 @@ impl<'a> Condvar<'a> {
unsafe {
let mut out_of_bounds = None;
let mut result = false;
let Sem(ref lock) = *self.sem;
lock.with(|state| {
self.sem.with(|state| {
if condvar_id < state.blocked.len() {
result = state.blocked[condvar_id].signal();
} else {
@@ -274,8 +280,7 @@ impl<'a> Condvar<'a> {
let mut out_of_bounds = None;
let mut queue = None;
unsafe {
let Sem(ref lock) = *self.sem;
lock.with(|state| {
self.sem.with(|state| {
if condvar_id < state.blocked.len() {
// To avoid :broadcast_heavy, we make a new waitqueue,
// swap it out with the old one, and broadcast on the
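The new Sem::with helper above is the heart of this file's change: the semaphore state moves from an Exclusive into an UnsafeArc<SemInner<Q>> whose embedded sync::Mutex is held for the duration of the closure. A minimal annotated sketch of that helper, restating only code that appears in the diff (the cast::transmute merely reborrows the raw pointer as &mut while the guard is held):

    unsafe fn with(&self, f: |&mut SemInner<Q>|) {
        let Sem(ref arc) = *self;
        // Raw pointer to the state shared through the UnsafeArc.
        let state = arc.get();
        // Take the embedded mutex; the guard releases it when `_g` drops.
        let _g = (*state).lock.lock();
        // With the lock held, hand the closure a `&mut SemInner<Q>` view of
        // the same allocation.
        f(cast::transmute(state));
    }

acquire, release, and the Condvar methods then call self.with(...) instead of destructuring the old Exclusive.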
5 changes: 3 additions & 2 deletions src/libgreen/simple.rs
@@ -39,7 +39,7 @@ impl Runtime for SimpleTask {
// See libnative/task.rs for what's going on here with the `awoken`
// field and the while loop around wait()
unsafe {
let mut guard = (*me).lock.lock();
let guard = (*me).lock.lock();
(*me).awoken = false;
match f(task) {
Ok(()) => {
@@ -54,7 +54,7 @@ impl Runtime for SimpleTask {
}
Local::put(cur_task);
}
fn reawaken(mut ~self, mut to_wake: ~Task, _can_resched: bool) {
fn reawaken(mut ~self, mut to_wake: ~Task) {
let me = &mut *self as *mut SimpleTask;
to_wake.put_runtime(self as ~Runtime);
unsafe {
@@ -76,6 +76,7 @@ impl Runtime for SimpleTask {
}
fn local_io<'a>(&'a mut self) -> Option<rtio::LocalIo<'a>> { None }
fn stack_bounds(&self) -> (uint, uint) { fail!() }
fn can_block(&self) -> bool { true }
fn wrap(~self) -> ~Any { fail!() }
}

13 changes: 5 additions & 8 deletions src/libgreen/task.rs
@@ -376,7 +376,7 @@ impl Runtime for GreenTask {
}
}

fn reawaken(mut ~self, to_wake: ~Task, can_resched: bool) {
fn reawaken(mut ~self, to_wake: ~Task) {
self.put_task(to_wake);
assert!(self.sched.is_none());

@@ -409,15 +409,10 @@ impl Runtime for GreenTask {
match running_task.maybe_take_runtime::<GreenTask>() {
Some(mut running_green_task) => {
running_green_task.put_task(running_task);
let mut sched = running_green_task.sched.take_unwrap();
let sched = running_green_task.sched.take_unwrap();

if sched.pool_id == self.pool_id {
if can_resched {
sched.run_task(running_green_task, self);
} else {
sched.enqueue_task(self);
running_green_task.put_with_sched(sched);
}
sched.run_task(running_green_task, self);
} else {
self.reawaken_remotely();

@@ -462,6 +457,8 @@ impl Runtime for GreenTask {
c.current_stack_segment.end() as uint)
}

fn can_block(&self) -> bool { false }

fn wrap(~self) -> ~Any { self as ~Any }
}

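Together with the libgreen/simple.rs hunk above, this suggests the shape of the Runtime trait after the patch: reawaken loses its can_resched flag, and a can_block query is added. A rough sketch of just those two methods, assuming the rest of the trait is unchanged (the trait itself lives in libstd and is not part of this diff):

    trait Runtime {
        // The old `can_resched: bool` parameter is gone; callers in librustuv
        // now invoke `t.reawaken()` with no argument.
        fn reawaken(~self, to_wake: ~Task);

        // New query: may this task block its underlying OS thread?
        // Ops (libnative) and SimpleTask return true; GreenTask returns false.
        fn can_block(&self) -> bool;

        // ...other methods as before...
    }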
7 changes: 4 additions & 3 deletions src/libnative/bookeeping.rs
@@ -17,10 +17,11 @@
//! The green counterpart for this is bookkeeping on sched pools.

use std::sync::atomics;
use std::unstable::mutex::{Mutex, MUTEX_INIT};
use std::unstable::mutex::{Cond, COND_INIT, Mutex, MUTEX_INIT};

static mut TASK_COUNT: atomics::AtomicUint = atomics::INIT_ATOMIC_UINT;
static mut TASK_LOCK: Mutex = MUTEX_INIT;
static mut TASK_COND: Cond = COND_INIT;

pub fn increment() {
unsafe { TASK_COUNT.fetch_add(1, atomics::SeqCst); }
@@ -30,7 +31,7 @@ pub fn decrement() {
unsafe {
if TASK_COUNT.fetch_sub(1, atomics::SeqCst) == 1 {
TASK_LOCK.lock();
TASK_LOCK.signal();
TASK_COND.signal();
TASK_LOCK.unlock();
}
}
@@ -42,7 +43,7 @@ pub fn wait_for_other_tasks() {
unsafe {
TASK_LOCK.lock();
while TASK_COUNT.load(atomics::SeqCst) > 0 {
TASK_LOCK.wait();
TASK_COND.wait(&TASK_LOCK);
}
TASK_LOCK.unlock();
}
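This hunk shows the new division of labour in std::unstable::mutex: Mutex no longer carries wait/signal itself, and a separate, statically-initializable Cond is waited on with an explicit reference to the mutex it releases. A small sketch of the resulting pattern, using the MUTEX_INIT/COND_INIT initializers from this diff (done/set_done are placeholder predicates, not real APIs):

    static mut LOCK: Mutex = MUTEX_INIT;
    static mut COND: Cond = COND_INIT;

    unsafe fn wait_until_done() {
        LOCK.lock();
        while !done() {
            // Releases LOCK while blocked and reacquires it before returning,
            // so the predicate is always rechecked under the lock.
            COND.wait(&LOCK);
        }
        LOCK.unlock();
    }

    unsafe fn mark_done() {
        LOCK.lock();
        set_done();
        COND.signal();   // wake a waiter; it re-tests its predicate
        LOCK.unlock();
    }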
2 changes: 1 addition & 1 deletion src/libnative/io/net.rs
@@ -201,7 +201,7 @@ pub fn init() {
}

unsafe {
use std::unstable::mutex::{Once, ONCE_INIT};
use std::sync::{Once, ONCE_INIT};
static mut INIT: Once = ONCE_INIT;
INIT.doit(|| {
let mut data: WSADATA = intrinsics::init();
14 changes: 9 additions & 5 deletions src/libnative/task.rs
@@ -22,7 +22,7 @@ use std::rt::task::{Task, BlockedTask};
use std::rt::thread::Thread;
use std::rt;
use std::task::TaskOpts;
use std::unstable::mutex::Mutex;
use std::unstable::mutex::{Mutex, Cond};
use std::unstable::stack;

use io;
@@ -41,6 +41,7 @@ pub fn new(stack_bounds: (uint, uint)) -> ~Task {
fn ops() -> ~Ops {
~Ops {
lock: unsafe { Mutex::new() },
cond: unsafe { Cond::new() },
awoken: false,
io: io::IoFactory::new(),
// these *should* get overwritten
@@ -112,6 +113,7 @@ pub fn spawn_opts(opts: TaskOpts, f: proc()) {
// structure is allocated once per task.
struct Ops {
lock: Mutex, // native synchronization
cond: Cond,
awoken: bool, // used to prevent spurious wakeups
io: io::IoFactory, // local I/O factory

@@ -142,6 +144,8 @@ impl rt::Runtime for Ops {

fn stack_bounds(&self) -> (uint, uint) { self.stack_bounds }

fn can_block(&self) -> bool { true }

// This function gets a little interesting. There are a few safety and
// ownership violations going on here, but this is all done in the name of
// shared state. Additionally, all of the violations are protected with a
@@ -196,7 +200,7 @@ impl rt::Runtime for Ops {
match f(task) {
Ok(()) => {
while !(*me).awoken {
(*me).lock.wait();
(*me).cond.wait(&(*me).lock);
}
}
Err(task) => { cast::forget(task.wake()); }
@@ -216,7 +220,7 @@
}
});
while success && !(*me).awoken {
(*me).lock.wait();
(*me).cond.wait(&(*me).lock);
}
(*me).lock.unlock();
}
@@ -230,14 +234,14 @@

// See the comments on `deschedule` for why the task is forgotten here, and
// why it's valid to do so.
fn reawaken(mut ~self, mut to_wake: ~Task, _can_resched: bool) {
fn reawaken(mut ~self, mut to_wake: ~Task) {
unsafe {
let me = &mut *self as *mut Ops;
to_wake.put_runtime(self as ~rt::Runtime);
cast::forget(to_wake);
(*me).lock.lock();
(*me).awoken = true;
(*me).lock.signal();
(*me).cond.signal();
(*me).lock.unlock();
}
}
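The libnative blocking protocol is unchanged in spirit but now spells out the mutex/condvar pair explicitly: deschedule waits on cond in a loop guarded by the awoken flag (to absorb spurious wakeups), and reawaken sets the flag and signals while holding the same lock. A condensed sketch of the two sides as they appear in this diff, with error and unwinding paths omitted:

    // Blocking side (inside deschedule); `me` is a raw pointer to this task's Ops.
    unsafe fn block(me: *mut Ops) {
        (*me).lock.lock();
        (*me).awoken = false;
        // ...hand a BlockedTask to whoever will wake us up...
        while !(*me).awoken {
            // Atomically drops the lock while asleep, reacquires it on wakeup.
            (*me).cond.wait(&(*me).lock);
        }
        (*me).lock.unlock();
    }

    // Waking side (inside reawaken); may run on any thread.
    unsafe fn wake(me: *mut Ops) {
        (*me).lock.lock();
        (*me).awoken = true;
        (*me).cond.signal();
        (*me).lock.unlock();
    }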
2 changes: 1 addition & 1 deletion src/librustc/back/link.rs
@@ -311,7 +311,7 @@ pub mod write {
}

unsafe fn configure_llvm(sess: Session) {
use std::unstable::mutex::{Once, ONCE_INIT};
use std::sync::{Once, ONCE_INIT};
static mut INIT: Once = ONCE_INIT;

// Copy what clang does by turning on loop vectorization at O2 and
2 changes: 1 addition & 1 deletion src/librustc/middle/trans/base.rs
@@ -3295,7 +3295,7 @@ pub fn trans_crate(sess: session::Session,
output: &Path) -> CrateTranslation {
// Before we touch LLVM, make sure that multithreading is enabled.
unsafe {
use std::unstable::mutex::{Once, ONCE_INIT};
use std::sync::{Once, ONCE_INIT};
static mut INIT: Once = ONCE_INIT;
static mut POISONED: bool = false;
INIT.doit(|| {
2 changes: 1 addition & 1 deletion src/librustuv/lib.rs
@@ -207,7 +207,7 @@ fn wait_until_woken_after(slot: *mut Option<BlockedTask>, f: ||) {

fn wakeup(slot: &mut Option<BlockedTask>) {
assert!(slot.is_some());
slot.take_unwrap().wake().map(|t| t.reawaken(true));
slot.take_unwrap().wake().map(|t| t.reawaken());
}

pub struct Request {
2 changes: 1 addition & 1 deletion src/librustuv/queue.rs
@@ -67,7 +67,7 @@ extern fn async_cb(handle: *uvll::uv_async_t, status: c_int) {
loop {
match state.consumer.pop() {
mpsc::Data(Task(task)) => {
task.wake().map(|t| t.reawaken(true));
task.wake().map(|t| t.reawaken());
}
mpsc::Data(Increment) => unsafe {
if state.refcnt == 0 {
2 changes: 1 addition & 1 deletion src/librustuv/timer.rs
@@ -138,7 +138,7 @@ extern fn timer_cb(handle: *uvll::uv_timer_t, status: c_int) {

match timer.action.take_unwrap() {
WakeTask(task) => {
task.wake().map(|t| t.reawaken(true));
task.wake().map(|t| t.reawaken());
}
SendOnce(chan) => { chan.try_send(()); }
SendMany(chan, id) => {