Skip to content

More newsched fixes #8282

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 1 addition & 8 deletions src/libextra/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@ Higher level communication abstractions.


use std::comm::{GenericChan, GenericSmartChan, GenericPort};
use std::comm::{Chan, Port, Selectable, Peekable};
use std::comm::{Chan, Port, Peekable};
use std::comm;
use std::pipes;

/// An extension of `pipes::stream` that allows both sending and receiving.
pub struct DuplexStream<T, U> {
Expand Down Expand Up @@ -75,12 +74,6 @@ impl<T:Send,U:Send> Peekable<U> for DuplexStream<T, U> {
}
}

impl<T:Send,U:Send> Selectable for DuplexStream<T, U> {
fn header(&mut self) -> *mut pipes::PacketHeader {
self.port.header()
}
}

/// Creates a bidirectional stream.
pub fn DuplexStream<T:Send,U:Send>()
-> (DuplexStream<T, U>, DuplexStream<U, T>)
Expand Down
110 changes: 2 additions & 108 deletions src/libstd/comm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ Message passing

#[allow(missing_doc)];

use cast::transmute;
use either::{Either, Left, Right};
use kinds::Send;
use option::{Option, Some};
Expand All @@ -23,12 +22,6 @@ pub use rt::comm::SendDeferred;
use rtcomm = rt::comm;
use rt;

use pipes::{wait_many, PacketHeader};

// FIXME #5160: Making this public exposes some plumbing from
// pipes. Needs some refactoring
pub use pipes::Selectable;

/// A trait for things that can send multiple messages.
pub trait GenericChan<T> {
/// Sends a message.
Expand Down Expand Up @@ -146,15 +139,6 @@ impl<T: Send> Peekable<T> for Port<T> {
}
}

impl<T: Send> Selectable for Port<T> {
fn header(&mut self) -> *mut PacketHeader {
match self.inner {
Left(ref mut port) => port.header(),
Right(_) => fail!("can't select on newsched ports")
}
}
}

/// A channel that can be shared between many senders.
pub struct SharedChan<T> {
inner: Either<Exclusive<pipesy::Chan<T>>, rtcomm::SharedChan<T>>
Expand Down Expand Up @@ -318,8 +302,8 @@ mod pipesy {

use kinds::Send;
use option::{Option, Some, None};
use pipes::{recv, try_recv, peek, PacketHeader};
use super::{GenericChan, GenericSmartChan, GenericPort, Peekable, Selectable};
use pipes::{recv, try_recv, peek};
use super::{GenericChan, GenericSmartChan, GenericPort, Peekable};
use cast::transmute_mut;

/*proto! oneshot (
Expand Down Expand Up @@ -651,103 +635,13 @@ mod pipesy {
}
}

impl<T: Send> Selectable for Port<T> {
fn header(&mut self) -> *mut PacketHeader {
match self.endp {
Some(ref mut endp) => endp.header(),
None => fail!("peeking empty stream")
}
}
}

}

/// Returns the index of an endpoint that is ready to receive.
pub fn selecti<T: Selectable>(endpoints: &mut [T]) -> uint {
wait_many(endpoints)
}

/// Returns 0 or 1 depending on which endpoint is ready to receive
pub fn select2i<A:Selectable, B:Selectable>(a: &mut A, b: &mut B)
-> Either<(), ()> {
let mut endpoints = [ a.header(), b.header() ];
match wait_many(endpoints) {
0 => Left(()),
1 => Right(()),
_ => fail!("wait returned unexpected index"),
}
}

/// Receive a message from one of two endpoints.
pub trait Select2<T: Send, U: Send> {
/// Receive a message or return `None` if a connection closes.
fn try_select(&mut self) -> Either<Option<T>, Option<U>>;
/// Receive a message or fail if a connection closes.
fn select(&mut self) -> Either<T, U>;
}

impl<T:Send,
U:Send,
Left:Selectable + GenericPort<T>,
Right:Selectable + GenericPort<U>>
Select2<T, U>
for (Left, Right) {
fn select(&mut self) -> Either<T, U> {
// XXX: Bad borrow check workaround.
unsafe {
let this: &(Left, Right) = transmute(self);
match *this {
(ref lp, ref rp) => {
let lp: &mut Left = transmute(lp);
let rp: &mut Right = transmute(rp);
match select2i(lp, rp) {
Left(()) => Left(lp.recv()),
Right(()) => Right(rp.recv()),
}
}
}
}
}

fn try_select(&mut self) -> Either<Option<T>, Option<U>> {
// XXX: Bad borrow check workaround.
unsafe {
let this: &(Left, Right) = transmute(self);
match *this {
(ref lp, ref rp) => {
let lp: &mut Left = transmute(lp);
let rp: &mut Right = transmute(rp);
match select2i(lp, rp) {
Left(()) => Left (lp.try_recv()),
Right(()) => Right(rp.try_recv()),
}
}
}
}
}
}

#[cfg(test)]
mod test {
use either::Right;
use super::{Chan, Port, oneshot, stream};

#[test]
fn test_select2() {
let (p1, c1) = stream();
let (p2, c2) = stream();

c1.send(~"abc");

let mut tuple = (p1, p2);
match tuple.select() {
Right(_) => fail!(),
_ => (),
}

c2.send(123);
}

#[test]
fn test_oneshot() {
let (p, c) = oneshot();
Expand Down
44 changes: 0 additions & 44 deletions src/libstd/pipes.rs
Original file line number Diff line number Diff line change
Expand Up @@ -868,47 +868,3 @@ pub mod rt {
pub fn make_some<T>(val: T) -> Option<T> { Some(val) }
pub fn make_none<T>() -> Option<T> { None }
}

#[cfg(test)]
mod test {
use either::Right;
use comm::{Chan, Port, oneshot, recv_one, stream, Select2,
GenericChan, Peekable};

#[test]
fn test_select2() {
let (p1, c1) = stream();
let (p2, c2) = stream();

c1.send(~"abc");

let mut tuple = (p1, p2);
match tuple.select() {
Right(_) => fail!(),
_ => (),
}

c2.send(123);
}

#[test]
fn test_oneshot() {
let (p, c) = oneshot();

c.send(());

recv_one(p)
}

#[test]
fn test_peek_terminated() {
let (port, chan): (Port<int>, Chan<int>) = stream();

{
// Destroy the channel
let _chan = chan;
}

assert!(!port.peek());
}
}
81 changes: 45 additions & 36 deletions src/libstd/rt/local.rs
Original file line number Diff line number Diff line change
Expand Up @@ -126,63 +126,72 @@ impl Local for IoFactoryObject {

#[cfg(test)]
mod test {
use unstable::run_in_bare_thread;
use rt::test::*;
use super::*;
use rt::task::Task;
use rt::local_ptr;

#[test]
fn thread_local_task_smoke_test() {
local_ptr::init_tls_key();
let mut sched = ~new_test_uv_sched();
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);
let task: ~Task = Local::take();
cleanup_task(task);
do run_in_bare_thread {
local_ptr::init_tls_key();
let mut sched = ~new_test_uv_sched();
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);
let task: ~Task = Local::take();
cleanup_task(task);
}
}

#[test]
fn thread_local_task_two_instances() {
local_ptr::init_tls_key();
let mut sched = ~new_test_uv_sched();
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);
let task: ~Task = Local::take();
cleanup_task(task);
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);
let task: ~Task = Local::take();
cleanup_task(task);
do run_in_bare_thread {
local_ptr::init_tls_key();
let mut sched = ~new_test_uv_sched();
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);
let task: ~Task = Local::take();
cleanup_task(task);
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);
let task: ~Task = Local::take();
cleanup_task(task);
}

}

#[test]
fn borrow_smoke_test() {
local_ptr::init_tls_key();
let mut sched = ~new_test_uv_sched();
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);

unsafe {
let _task: *mut Task = Local::unsafe_borrow();
do run_in_bare_thread {
local_ptr::init_tls_key();
let mut sched = ~new_test_uv_sched();
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);

unsafe {
let _task: *mut Task = Local::unsafe_borrow();
}
let task: ~Task = Local::take();
cleanup_task(task);
}
let task: ~Task = Local::take();
cleanup_task(task);
}

#[test]
fn borrow_with_return() {
local_ptr::init_tls_key();
let mut sched = ~new_test_uv_sched();
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);

let res = do Local::borrow::<Task,bool> |_task| {
true
};
assert!(res)
let task: ~Task = Local::take();
cleanup_task(task);
do run_in_bare_thread {
local_ptr::init_tls_key();
let mut sched = ~new_test_uv_sched();
let task = ~Task::new_root(&mut sched.stack_pool, || {});
Local::put(task);

let res = do Local::borrow::<Task,bool> |_task| {
true
};
assert!(res)
let task: ~Task = Local::take();
cleanup_task(task);
}
}

}
Expand Down
26 changes: 15 additions & 11 deletions src/libstd/rt/local_ptr.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,9 @@ pub unsafe fn put<T>(sched: ~T) {
pub unsafe fn take<T>() -> ~T {
let key = tls_key();
let void_ptr: *mut c_void = tls::get(key);
rtassert!(void_ptr.is_not_null());
if void_ptr.is_null() {
rtabort!("thread-local pointer is null. bogus!");
}
let ptr: ~T = cast::transmute(void_ptr);
tls::set(key, ptr::mut_null());
return ptr;
Expand All @@ -68,8 +70,8 @@ pub fn exists() -> bool {
}
}

/// Borrow the thread-local scheduler from thread-local storage.
/// While the scheduler is borrowed it is not available in TLS.
/// Borrow the thread-local value from thread-local storage.
/// While the value is borrowed it is not available in TLS.
///
/// # Safety note
///
Expand All @@ -88,21 +90,23 @@ pub unsafe fn borrow<T>(f: &fn(&mut T)) {
}
}

/// Borrow a mutable reference to the thread-local Scheduler
/// Borrow a mutable reference to the thread-local value
///
/// # Safety Note
///
/// Because this leaves the Scheduler in thread-local storage it is possible
/// Because this leaves the value in thread-local storage it is possible
/// For the Scheduler pointer to be aliased
pub unsafe fn unsafe_borrow<T>() -> *mut T {
let key = tls_key();
let mut void_sched: *mut c_void = tls::get(key);
rtassert!(void_sched.is_not_null());
let mut void_ptr: *mut c_void = tls::get(key);
if void_ptr.is_null() {
rtabort!("thread-local pointer is null. bogus!");
}
{
let sched: *mut *mut c_void = &mut void_sched;
let sched: *mut ~T = sched as *mut ~T;
let sched: *mut T = &mut **sched;
return sched;
let ptr: *mut *mut c_void = &mut void_ptr;
let ptr: *mut ~T = ptr as *mut ~T;
let ptr: *mut T = &mut **ptr;
return ptr;
}
}

Expand Down
Loading