diff --git a/src/libstd/comm/mod.rs b/src/libstd/comm/mod.rs index 267140a0089bd..b33e2ee7bd3e0 100644 --- a/src/libstd/comm/mod.rs +++ b/src/libstd/comm/mod.rs @@ -238,6 +238,7 @@ use cast; use cell::Cell; use clone::Clone; +use cmp; use iter::Iterator; use kinds::Send; use kinds::marker; @@ -248,6 +249,7 @@ use result::{Ok, Err, Result}; use rt::local::Local; use rt::task::{Task, BlockedTask}; use sync::arc::UnsafeArc; +use uint; pub use comm::select::{Select, Handle}; @@ -330,16 +332,55 @@ enum Flavor { Shared(UnsafeArc>), } -/// Creates a new channel, returning the sender/receiver halves. All data sent -/// on the sender will become available on the receiver. See the documentation -/// of `Receiver` and `Sender` to see what's possible with them. +/// Creates a new asynchronous, unbounded channel. +/// +/// The return value is the sender/receiver pair which can be deconstructed to +/// move ownership separately. All data sent on the sender will become +/// available on the receiver. See the documentation of `Receiver` and +/// `Sender` to see what's possible with them. +/// +/// The `Receiver` returned will always block in `recv` waiting for new values, +/// but the `Sender` will never block when sending values (this is an +/// asynchronous channel). Additionally, the `Sender` will never fail if the +/// receiver has not disconnected, because the channel is unbounded. +/// +/// # Example +/// +/// ``` +/// let (tx, rx) = channel(); +/// +/// spawn(proc() { +/// tx.send(100); +/// }); +/// +/// println!("received: {}", rx.recv()); +/// ``` pub fn channel() -> (Sender, Receiver) { let (a, b) = UnsafeArc::new2(oneshot::Packet::new()); - (Sender::my_new(Oneshot(b)), Receiver::my_new(Oneshot(a))) + (Sender::new(Oneshot(b)), Receiver::new(Oneshot(a))) +} + +/// Creates a new asynchronous, unbounded channel which disables asserts about +/// the size of the channel. +/// +/// The asynchronous channels in Rust will normally assert that the size of the +/// channel is under a certain "very high" threshold, but using this function +/// disables this assertion, enabling a truly unbounded number of sends. +/// +/// It is not recommended to use this method, the `channel()` constructor is +/// likely what you want. +#[experimental] +pub fn unchecked_channel() -> (Sender, Receiver) { + let mut packet = shared::Packet::new(); + packet.inherit_blocker(None); // finish upgrade protocol + packet.bound_checks = false; // disable assertions + + let (a, b) = UnsafeArc::new2(packet); + (Sender::new(Shared(a)), Receiver::new(Shared(b))) } impl Sender { - fn my_new(inner: Flavor) -> Sender { + fn new(inner: Flavor) -> Sender { Sender { inner: inner, sends: Cell::new(0), marker: marker::NoShare } } @@ -405,7 +446,7 @@ impl Sender { return (*p).send(t); } else { let (a, b) = UnsafeArc::new2(stream::Packet::new()); - match (*p).upgrade(Receiver::my_new(Stream(b))) { + match (*p).upgrade(Receiver::new(Stream(b))) { oneshot::UpSuccess => { (*a.get()).send(t); (a, true) @@ -425,7 +466,7 @@ impl Sender { }; unsafe { - let mut tmp = Sender::my_new(Stream(new_inner)); + let mut tmp = Sender::new(Stream(new_inner)); mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner); } return ret; @@ -437,31 +478,31 @@ impl Clone for Sender { let (packet, sleeper) = match self.inner { Oneshot(ref p) => { let (a, b) = UnsafeArc::new2(shared::Packet::new()); - match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } { + match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } { oneshot::UpSuccess | oneshot::UpDisconnected => (b, None), oneshot::UpWoke(task) => (b, Some(task)) } } Stream(ref p) => { let (a, b) = UnsafeArc::new2(shared::Packet::new()); - match unsafe { (*p.get()).upgrade(Receiver::my_new(Shared(a))) } { + match unsafe { (*p.get()).upgrade(Receiver::new(Shared(a))) } { stream::UpSuccess | stream::UpDisconnected => (b, None), stream::UpWoke(task) => (b, Some(task)), } } Shared(ref p) => { unsafe { (*p.get()).clone_chan(); } - return Sender::my_new(Shared(p.clone())); + return Sender::new(Shared(p.clone())); } }; unsafe { (*packet.get()).inherit_blocker(sleeper); - let mut tmp = Sender::my_new(Shared(packet.clone())); + let mut tmp = Sender::new(Shared(packet.clone())); mem::swap(&mut cast::transmute_mut(self).inner, &mut tmp.inner); } - Sender::my_new(Shared(packet)) + Sender::new(Shared(packet)) } } @@ -477,7 +518,7 @@ impl Drop for Sender { } impl Receiver { - fn my_new(inner: Flavor) -> Receiver { + fn new(inner: Flavor) -> Receiver { Receiver { inner: inner, receives: Cell::new(0), marker: marker::NoShare } } @@ -708,6 +749,17 @@ impl Drop for Receiver { } } +fn assert_sane_bound(amt: uint) { + // Assume that either taking up half the address space with messages or + // having some "very large number" of messages constitues overflowing. + // + // Currently, this "very large number" is around 500 million. This was + // arbitrarily chosen. On OSX, it took about a 80 seconds of sending 1s on a + // channel to reach this limit. + let limit = cmp::min(uint::MAX / 2 / mem::nonzero_size_of::(), 2 << 28); + assert!(amt < limit); +} + #[cfg(test)] mod test { use prelude::*; diff --git a/src/libstd/comm/shared.rs b/src/libstd/comm/shared.rs index 8c8ae85e4ea20..cc9d95c5c6097 100644 --- a/src/libstd/comm/shared.rs +++ b/src/libstd/comm/shared.rs @@ -58,6 +58,10 @@ pub struct Packet { // this lock protects various portions of this implementation during // select() select_lock: NativeMutex, + + // Flag to disable sanity bound checks. This value does not change after + // construction. + bound_checks: bool, } pub enum Failure { @@ -77,6 +81,7 @@ impl Packet { port_dropped: atomics::AtomicBool::new(false), sender_drain: atomics::AtomicInt::new(0), select_lock: unsafe { NativeMutex::new() }, + bound_checks: true, }; // see comments in inherit_blocker about why we grab this lock unsafe { p.select_lock.lock_noguard() } @@ -210,7 +215,13 @@ impl Packet { } // Can't make any assumptions about this case like in the SPSC case. - _ => {} + // Be sure, however, that we're not going towards exhausting the + // address space. + n => { + if n > 0 && self.bound_checks { + super::assert_sane_bound::(n as uint); + } + } } true diff --git a/src/libstd/comm/stream.rs b/src/libstd/comm/stream.rs index 5820b13a35f46..0fc6021f3583e 100644 --- a/src/libstd/comm/stream.rs +++ b/src/libstd/comm/stream.rs @@ -132,9 +132,14 @@ impl Packet { } } - // Otherwise we just sent some data on a non-waiting queue, so just - // make sure the world is sane and carry on! - n => { assert!(n >= 0); UpSuccess } + // Otherwise we just sent some data on a non-waiting queue. Be sure + // that we're not moving towards exhausting the address space, and + // then carry on. + n => { + assert!(n >= 0); + super::assert_sane_bound::(n as uint); + UpSuccess + } } }