diff --git a/mk/rt.mk b/mk/rt.mk index 26f35ea90dbe4..840442a2d6569 100644 --- a/mk/rt.mk +++ b/mk/rt.mk @@ -50,7 +50,6 @@ RUNTIME_CS_$(1) := \ rt/rust_port.cpp \ rt/rust_upcall.cpp \ rt/rust_uv.cpp \ - rt/rust_uvtmp.cpp \ rt/rust_log.cpp \ rt/rust_port_selector.cpp \ rt/circular_buffer.cpp \ diff --git a/src/libstd/std.rc b/src/libstd/std.rc index 478dbb695132c..83e4a04f0dfa0 100644 --- a/src/libstd/std.rc +++ b/src/libstd/std.rc @@ -7,7 +7,7 @@ #[license = "MIT"]; #[crate_type = "lib"]; -export fs, io, net, run, uv, uvtmp; +export fs, io, net, run, uv; export c_vec, four, tri, util; export bitv, deque, fun_treemap, list, map, smallintmap, sort, treemap, ufind; export rope; @@ -25,7 +25,6 @@ mod net; #[path = "run_program.rs"] mod run; mod uv; -mod uvtmp; // Utility modules diff --git a/src/libstd/uv.rs b/src/libstd/uv.rs index 27f2a3c54a85c..8022a7a38b793 100644 --- a/src/libstd/uv.rs +++ b/src/libstd/uv.rs @@ -1,188 +1,625 @@ -/* -This is intended to be a low-level binding to libuv that very closely mimics -the C libuv API. Does very little right now pending scheduler improvements. -*/ - -export sanity_check; -export loop_t, idle_t; -export loop_new, loop_delete, default_loop, run, unref; -export idle_init, idle_start; -export idle_new; - -import core::ctypes; - -#[link_name = "rustrt"] -native mod uv { - fn rust_uv_loop_new() -> *loop_t; - fn rust_uv_loop_delete(loop: *loop_t); - fn rust_uv_default_loop() -> *loop_t; - fn rust_uv_run(loop: *loop_t) -> ctypes::c_int; - fn rust_uv_unref(loop: *loop_t); - fn rust_uv_idle_init(loop: *loop_t, idle: *idle_t) -> ctypes::c_int; - fn rust_uv_idle_start(idle: *idle_t, cb: idle_cb) -> ctypes::c_int; -} - -#[link_name = "rustrt"] -native mod helpers { - fn rust_uv_size_of_idle_t() -> ctypes::size_t; -} - -type opaque_cb = *ctypes::void; - -type handle_type = ctypes::enum; - -type close_cb = opaque_cb; -type idle_cb = opaque_cb; - -type handle_private_fields = { - a00: ctypes::c_int, - a01: ctypes::c_int, - a02: ctypes::c_int, - a03: ctypes::c_int, - a04: ctypes::c_int, - a05: ctypes::c_int, - a06: int, - a07: int, - a08: int, - a09: int, - a10: int, - a11: int, - a12: int -}; +export loop_new, loop_delete, run, close, run_in_bg; +export async_init, async_send; +export timer_init, timer_start, timer_stop; + +// these are processed solely in the +// process_operation() crust fn below +enum uv_operation { + op_async_init([u8]), + op_close(uv_handle, *ctypes::void), + op_timer_init([u8]), + op_timer_start([u8], *ctypes::void, u32, u32), + op_timer_stop([u8], *ctypes::void, fn~(uv_handle)), + op_teardown(*ctypes::void) +} -type handle_fields = { - loop: *loop_t, - type_: handle_type, - close_cb: close_cb, - data: *ctypes::void, - private: handle_private_fields -}; +enum uv_handle { + uv_async([u8], uv_loop), + uv_timer([u8], uv_loop) +} + +enum uv_msg { + // requests from library users + msg_run(comm::chan), + msg_run_in_bg(), + msg_async_init(fn~(uv_handle), fn~(uv_handle)), + msg_async_send([u8]), + msg_close(uv_handle, fn~()), + msg_timer_init(fn~(uv_handle)), + msg_timer_start([u8], u32, u32, fn~(uv_handle)), + msg_timer_stop([u8], fn~(uv_handle)), + + // dispatches from libuv + uv_async_init([u8], *ctypes::void), + uv_async_send([u8]), + uv_close([u8]), + uv_timer_init([u8], *ctypes::void), + uv_timer_call([u8]), + uv_timer_stop([u8], fn~(uv_handle)), + uv_end(), + uv_teardown_check() +} -type handle_t = { - fields: handle_fields +type uv_loop_data = { + operation_port: comm::port, + rust_loop_chan: comm::chan }; -type loop_t = int; +enum uv_loop { + uv_loop_new(comm::chan, *ctypes::void) +} +#[nolink] +native mod rustrt { + fn rust_uv_loop_new() -> *ctypes::void; + fn rust_uv_loop_delete(loop: *ctypes::void); + fn rust_uv_loop_set_data( + loop: *ctypes::void, + data: *uv_loop_data); + fn rust_uv_bind_op_cb(loop: *ctypes::void, cb: *u8) + -> *ctypes::void; + fn rust_uv_stop_op_cb(handle: *ctypes::void); + fn rust_uv_run(loop_handle: *ctypes::void); + fn rust_uv_close(handle: *ctypes::void, cb: *u8); + fn rust_uv_close_async(handle: *ctypes::void); + fn rust_uv_close_timer(handle: *ctypes::void); + fn rust_uv_async_send(handle: *ctypes::void); + fn rust_uv_async_init( + loop_handle: *ctypes::void, + cb: *u8, + id: *u8) -> *ctypes::void; + fn rust_uv_timer_init( + loop_handle: *ctypes::void, + cb: *u8, + id: *u8) -> *ctypes::void; + fn rust_uv_timer_start( + timer_handle: *ctypes::void, + timeout: ctypes::c_uint, + repeat: ctypes::c_uint); + fn rust_uv_timer_stop(handle: *ctypes::void); +} +// public functions +fn loop_new() -> uv_loop unsafe { + let ret_recv_port: comm::port = + comm::port(); + let ret_recv_chan: comm::chan = + comm::chan(ret_recv_port); + + task::spawn_sched(task::manual_threads(1u)) {|| + // our beloved uv_loop_t ptr + let loop_handle = rustrt:: + rust_uv_loop_new(); + + // this port/chan pair are used to send messages to + // libuv. libuv processes any pending messages on the + // port (via crust) after receiving an async "wakeup" + // on a special uv_async_t handle created below + let operation_port = comm::port::(); + let operation_chan = comm::chan::( + operation_port); + + // this port/chan pair as used in the while() loop + // below. It takes dispatches, originating from libuv + // callbacks, to invoke handles registered by the + // user + let rust_loop_port = comm::port::(); + let rust_loop_chan = + comm::chan::(rust_loop_port); + // let the task-spawner return + let user_uv_loop = uv_loop_new(rust_loop_chan, loop_handle); + comm::send(ret_recv_chan, copy(user_uv_loop)); + + // create our "special" async handle that will + // allow all operations against libuv to be + // "buffered" in the operation_port, for processing + // from the thread that libuv runs on + let loop_data: uv_loop_data = { + operation_port: operation_port, + rust_loop_chan: rust_loop_chan + }; + rustrt::rust_uv_loop_set_data( + loop_handle, + ptr::addr_of(loop_data)); // pass an opaque C-ptr + // to libuv, this will be + // in the process_operation + // crust fn + let op_handle = rustrt::rust_uv_bind_op_cb( + loop_handle, + process_operation); + + // all state goes here + let handles: map::map<[u8], *ctypes::void> = + map::new_bytes_hash(); + let id_to_handle: map::map<[u8], uv_handle> = + map::new_bytes_hash(); + let after_cbs: map::map<[u8], fn~(uv_handle)> = + map::new_bytes_hash(); + let close_callbacks: map::map<[u8], fn~()> = + map::new_bytes_hash(); + let async_cbs: map::map<[u8], fn~(uv_handle)> = + map::new_bytes_hash(); + let timer_cbs: map::map<[u8], fn~(uv_handle)> = + map::new_bytes_hash(); + + // the main loop that this task blocks on. + // should have the same lifetime as the C libuv + // event loop. + let keep_going = true; + while (keep_going) { + alt comm::recv(rust_loop_port) { + msg_run(end_chan) { + // start the libuv event loop + // we'll also do a uv_async_send with + // the operation handle to have the + // loop process any pending operations + // once its up and running + task::spawn_sched(task::manual_threads(1u)) {|| + // make sure we didn't start the loop + // without the user registering handles + comm::send(rust_loop_chan, uv_teardown_check); + // this call blocks + rustrt::rust_uv_run(loop_handle); + // when we're done, msg the + // end chan + comm::send(end_chan, true); + comm::send(rust_loop_chan, uv_end); + }; + } + + msg_run_in_bg { + task::spawn_sched(task::manual_threads(1u)) {|| + // see note above + comm::send(rust_loop_chan, uv_teardown_check); + // this call blocks + rustrt::rust_uv_run(loop_handle); + }; + } + + msg_close(handle, cb) { + let id = get_id_from_handle(handle); + close_callbacks.insert(id, cb); + let handle_ptr = handles.get(id); + let op = op_close(handle, handle_ptr); + + pass_to_libuv(op_handle, operation_chan, op); + } + uv_close(id) { + handles.remove(id); + let handle = id_to_handle.get(id); + id_to_handle.remove(id); + alt handle { + uv_async(id, _) { + async_cbs.remove(id); + } + uv_timer(id, _) { + timer_cbs.remove(id); + } + _ { + fail "unknown form of uv_handle encountered " + + "in uv_close handler"; + } + } + let cb = close_callbacks.get(id); + close_callbacks.remove(id); + task::spawn {|| + cb(); + }; + // ask the rust loop to check and see if there + // are no more user-registered handles + comm::send(rust_loop_chan, uv_teardown_check); + } + + msg_async_init(callback, after_cb) { + // create a new async handle + // with the id as the handle's + // data and save the callback for + // invocation on msg_async_send + let id = gen_handle_id(); + handles.insert(id, ptr::null()); + async_cbs.insert(id, callback); + after_cbs.insert(id, after_cb); + let op = op_async_init(id); + pass_to_libuv(op_handle, operation_chan, op); + } + uv_async_init(id, async_handle) { + // libuv created a handle, which is + // passed back to us. save it and + // then invoke the supplied callback + // for after completion + handles.insert(id, async_handle); + let after_cb = after_cbs.get(id); + after_cbs.remove(id); + let async = uv_async(id, user_uv_loop); + id_to_handle.insert(id, copy(async)); + task::spawn {|| + after_cb(async); + }; + } + + msg_async_send(id) { + let async_handle = handles.get(id); + do_send(async_handle); + } + uv_async_send(id) { + let async_cb = async_cbs.get(id); + task::spawn {|| + let the_loop = user_uv_loop; + async_cb(uv_async(id, the_loop)); + }; + } + + msg_timer_init(after_cb) { + let id = gen_handle_id(); + handles.insert(id, ptr::null()); + after_cbs.insert(id, after_cb); + let op = op_timer_init(id); + pass_to_libuv(op_handle, operation_chan, op); + } + uv_timer_init(id, handle) { + handles.insert(id, handle); + let after_cb = after_cbs.get(id); + after_cbs.remove(id); + let new_timer = uv_timer(id, user_uv_loop); + id_to_handle.insert(id, copy(new_timer)); + task::spawn {|| + after_cb(new_timer); + }; + } + + uv_timer_call(id) { + let cb = timer_cbs.get(id); + let the_timer = id_to_handle.get(id); + task::spawn {|| + cb(the_timer); + }; + } + + msg_timer_start(id, timeout, repeat, timer_call_cb) { + timer_cbs.insert(id, timer_call_cb); + let handle = handles.get(id); + let op = op_timer_start(id, handle, timeout, + repeat); + pass_to_libuv(op_handle, operation_chan, op); + } + + msg_timer_stop(id, after_cb) { + let handle = handles.get(id); + let op = op_timer_stop(id, handle, after_cb); + pass_to_libuv(op_handle, operation_chan, op); + } + uv_timer_stop(id, after_cb) { + let the_timer = id_to_handle.get(id); + after_cb(the_timer); + } + + uv_teardown_check() { + // here we're checking if there are no user-registered + // handles (and the loop is running), if this is the + // case, then we need to unregister the op_handle via + // a uv_close() call, thus allowing libuv to return + // on its own. + if (handles.size() == 0u) { + let op = op_teardown(op_handle); + pass_to_libuv(op_handle, operation_chan, op); + } + } + + uv_end() { + keep_going = false; + } + + _ { fail "unknown form of uv_msg received"; } + } + } + }; + ret comm::recv(ret_recv_port); +} +fn loop_delete(loop: uv_loop) { + let loop_ptr = get_loop_ptr_from_uv_loop(loop); + rustrt::rust_uv_loop_delete(loop_ptr); +} -type idle_t = { - fields: handle_fields - /* private: idle_private_fields */ -}; +fn run(loop: uv_loop) { + let end_port = comm::port::(); + let end_chan = comm::chan::(end_port); + let loop_chan = get_loop_chan_from_uv_loop(loop); + comm::send(loop_chan, msg_run(end_chan)); + comm::recv(end_port); +} -fn idle_init(loop: *loop_t, idle: *idle_t) -> ctypes::c_int { - uv::rust_uv_idle_init(loop, idle) +fn run_in_bg(loop: uv_loop) { + let loop_chan = get_loop_chan_from_uv_loop(loop); + comm::send(loop_chan, msg_run_in_bg); } -fn idle_start(idle: *idle_t, cb: idle_cb) -> ctypes::c_int { - uv::rust_uv_idle_start(idle, cb) +fn async_init ( + loop: uv_loop, + async_cb: fn~(uv_handle), + after_cb: fn~(uv_handle)) { + let msg = msg_async_init(async_cb, after_cb); + let loop_chan = get_loop_chan_from_uv_loop(loop); + comm::send(loop_chan, msg); } +fn async_send(async: uv_handle) { + alt async { + uv_async(id, loop) { + let loop_chan = get_loop_chan_from_uv_loop(loop); + comm::send(loop_chan, msg_async_send(id)); + } + _ { + fail "attempting to call async_send() with a" + + " uv_async uv_handle"; + } + } +} +fn close(h: uv_handle, cb: fn~()) { + let loop_chan = get_loop_chan_from_handle(h); + comm::send(loop_chan, msg_close(h, cb)); +} +fn timer_init(loop: uv_loop, after_cb: fn~(uv_handle)) { + let msg = msg_timer_init(after_cb); + let loop_chan = get_loop_chan_from_uv_loop(loop); + comm::send(loop_chan, msg); +} -fn default_loop() -> *loop_t { - uv::rust_uv_default_loop() +fn timer_start(the_timer: uv_handle, timeout: u32, repeat:u32, + timer_cb: fn~(uv_handle)) { + alt the_timer { + uv_timer(id, loop) { + let msg = msg_timer_start(id, timeout, repeat, timer_cb); + let loop_chan = get_loop_chan_from_uv_loop(loop); + comm::send(loop_chan, msg); + } + _ { + fail "can only pass a uv_timer form of uv_handle to "+ + " uv::timer_start()"; + } + } } -fn loop_new() -> *loop_t { - uv::rust_uv_loop_new() +fn timer_stop(the_timer: uv_handle, after_cb: fn~(uv_handle)) { + alt the_timer { + uv_timer(id, loop) { + let loop_chan = get_loop_chan_from_uv_loop(loop); + let msg = msg_timer_stop(id, after_cb); + comm::send(loop_chan, msg); + } + _ { + fail "only uv_timer form is allowed in calls to "+ + " uv::timer_stop()"; + } + } } -fn loop_delete(loop: *loop_t) { - uv::rust_uv_loop_delete(loop) +// internal functions +fn pass_to_libuv( + op_handle: *ctypes::void, + operation_chan: comm::chan, + op: uv_operation) unsafe { + comm::send(operation_chan, copy(op)); + do_send(op_handle); +} +fn do_send(h: *ctypes::void) { + rustrt::rust_uv_async_send(h); +} +fn gen_handle_id() -> [u8] { + ret rand::mk_rng().gen_bytes(16u); +} +fn get_handle_id_from(buf: *u8) -> [u8] unsafe { + ret vec::unsafe::from_buf(buf, 16u); } -fn run(loop: *loop_t) -> ctypes::c_int { - uv::rust_uv_run(loop) +fn get_loop_chan_from_data(data: *uv_loop_data) + -> comm::chan unsafe { + ret (*data).rust_loop_chan; } -fn unref(loop: *loop_t) { - uv::rust_uv_unref(loop) +fn get_loop_chan_from_handle(handle: uv_handle) + -> comm::chan { + alt handle { + uv_async(id,loop) | uv_timer(id,loop) { + let loop_chan = get_loop_chan_from_uv_loop(loop); + ret loop_chan; + } + _ { + fail "unknown form of uv_handle for get_loop_chan_from " + + " handle"; + } + } } +fn get_loop_ptr_from_uv_loop(loop: uv_loop) -> *ctypes::void { + alt loop { + uv_loop_new(loop_chan, loop_ptr) { + ret loop_ptr; + } + } +} +fn get_loop_chan_from_uv_loop(loop: uv_loop) -> comm::chan { + alt loop { + uv_loop_new(loop_chan, loop_ptr) { + ret loop_chan; + } + } +} -fn sanity_check() { - fn check_size(t: str, uv: ctypes::size_t, rust: ctypes::size_t) { - #debug("size of %s: uv: %u, rust: %u", t, uv, rust); - assert uv <= rust; +fn get_id_from_handle(handle: uv_handle) -> [u8] { + alt handle { + uv_async(id,loop) | uv_timer(id,loop) { + ret id; + } + _ { + fail "unknown form of uv_handle for get_id_from handle"; + } } - check_size("idle_t", - helpers::rust_uv_size_of_idle_t(), - sys::size_of::()); -} - -fn handle_fields_new() -> handle_fields { - { - loop: ptr::null(), - type_: 0u32, - close_cb: ptr::null(), - data: ptr::null(), - private: { - a00: 0i32, - a01: 0i32, - a02: 0i32, - a03: 0i32, - a04: 0i32, - a05: 0i32, - a06: 0, - a07: 0, - a08: 0, - a09: 0, - a10: 0, - a11: 0, - a12: 0 +} + +// crust +crust fn process_operation( + loop: *ctypes::void, + data: *uv_loop_data) unsafe { + let op_port = (*data).operation_port; + let loop_chan = get_loop_chan_from_data(data); + let op_pending = comm::peek(op_port); + while(op_pending) { + alt comm::recv(op_port) { + op_async_init(id) { + let id_ptr = vec::unsafe::to_ptr(id); + let async_handle = rustrt::rust_uv_async_init( + loop, + process_async_send, + id_ptr); + comm::send(loop_chan, uv_async_init( + id, + async_handle)); + } + op_close(handle, handle_ptr) { + handle_op_close(handle, handle_ptr); + } + op_timer_init(id) { + let id_ptr = vec::unsafe::to_ptr(id); + let timer_handle = rustrt::rust_uv_timer_init( + loop, + process_timer_call, + id_ptr); + comm::send(loop_chan, uv_timer_init( + id, + timer_handle)); + } + op_timer_start(id, handle, timeout, repeat) { + rustrt::rust_uv_timer_start(handle, timeout, + repeat); + } + op_timer_stop(id, handle, after_cb) { + rustrt::rust_uv_timer_stop(handle); + comm::send(loop_chan, uv_timer_stop(id, after_cb)); + } + op_teardown(op_handle) { + // this is the last msg that'll be processed by + // this fn, in the current lifetime of the handle's + // uv_loop_t + rustrt::rust_uv_stop_op_cb(op_handle); + } + _ { fail "unknown form of uv_operation received"; } } + op_pending = comm::peek(op_port); } } -fn idle_new() -> idle_t { - { - fields: handle_fields_new() +fn handle_op_close(handle: uv_handle, handle_ptr: *ctypes::void) { + // it's just like im doing C + alt handle { + uv_async(id, loop) { + let cb = process_close_async; + rustrt::rust_uv_close( + handle_ptr, cb); + } + uv_timer(id, loop) { + let cb = process_close_timer; + rustrt::rust_uv_close( + handle_ptr, cb); + } + _ { + fail "unknown form of uv_handle encountered " + + "in process_operation/op_close"; + } } } -#[cfg(test)] -mod tests { +crust fn process_async_send(id_buf: *u8, data: *uv_loop_data) + unsafe { + let handle_id = get_handle_id_from(id_buf); + let loop_chan = get_loop_chan_from_data(data); + comm::send(loop_chan, uv_async_send(handle_id)); +} - #[test] - fn test_sanity_check() { - sanity_check(); - } +crust fn process_timer_call(id_buf: *u8, data: *uv_loop_data) + unsafe { + let handle_id = get_handle_id_from(id_buf); + let loop_chan = get_loop_chan_from_data(data); + comm::send(loop_chan, uv_timer_call(handle_id)); +} + +fn process_close_common(id: [u8], data: *uv_loop_data) + unsafe { + // notify the rust loop that their handle is closed, then + // the caller will invoke a per-handle-type c++ func to + // free allocated memory + let loop_chan = get_loop_chan_from_data(data); + comm::send(loop_chan, uv_close(id)); +} - // From test-ref.c - mod test_ref { +crust fn process_close_async( + id_buf: *u8, + handle_ptr: *ctypes::void, + data: *uv_loop_data) + unsafe { + let id = get_handle_id_from(id_buf); + rustrt::rust_uv_close_async(handle_ptr); + // at this point, the handle and its data has been + // released. notify the rust loop to remove the + // handle and its data and call the user-supplied + // close cb + process_close_common(id, data); +} - #[test] - fn ref() { - let loop = loop_new(); - run(loop); - loop_delete(loop); - } +crust fn process_close_timer( + id_buf: *u8, + handle_ptr: *ctypes::void, + data: *uv_loop_data) + unsafe { + let id = get_handle_id_from(id_buf); + rustrt::rust_uv_close_timer(handle_ptr); + process_close_common(id, data); +} - #[test] - fn idle_ref() { - let loop = loop_new(); - let h = idle_new(); - idle_init(loop, ptr::addr_of(h)); - idle_start(ptr::addr_of(h), ptr::null()); - unref(loop); - run(loop); - loop_delete(loop); - } - #[test] - fn async_ref() { - /* - let loop = loop_new(); - let h = async_new(); - async_init(loop, ptr::addr_of(h), ptr::null()); - unref(loop); - run(loop); - loop_delete(loop); - */ - } - } +#[test] +fn test_uv_new_loop_no_handles() { + let test_loop = uv::loop_new(); + uv::run(test_loop); // this should return immediately + // since there aren't any handles.. + uv::loop_delete(test_loop); +} + +#[test] +fn test_uv_simple_async() { + let test_loop = uv::loop_new(); + let exit_port = comm::port::(); + let exit_chan = comm::chan::(exit_port); + uv::async_init(test_loop, {|new_async| + uv::close(new_async) {|| + comm::send(exit_chan, true); + }; + }, {|new_async| + uv::async_send(new_async); + }); + uv::run(test_loop); + let result = comm::recv(exit_port); + assert result; + uv::loop_delete(test_loop); +} + +#[test] +fn test_uv_timer() { + let test_loop = uv::loop_new(); + let exit_port = comm::port::(); + let exit_chan = comm::chan::(exit_port); + uv::timer_init(test_loop) {|new_timer| + uv::timer_start(new_timer, 1u32, 0u32) {|started_timer| + uv::timer_stop(started_timer) {|stopped_timer| + uv::close(stopped_timer) {|| + comm::send(exit_chan, true); + }; + }; + }; + }; + uv::run(test_loop); + assert comm::recv(exit_port); + uv::loop_delete(test_loop); } diff --git a/src/libstd/uvtmp.rs b/src/libstd/uvtmp.rs deleted file mode 100644 index d18a59031eaf6..0000000000000 --- a/src/libstd/uvtmp.rs +++ /dev/null @@ -1,168 +0,0 @@ -// Some temporary libuv hacks for servo - -#[nolink] -native mod rustrt { - fn rust_uvtmp_create_thread() -> thread; - fn rust_uvtmp_start_thread(thread: thread); - fn rust_uvtmp_join_thread(thread: thread); - fn rust_uvtmp_delete_thread(thread: thread); - fn rust_uvtmp_connect( - thread: thread, - req_id: u32, - ip: str::sbuf, - chan: comm::chan) -> connect_data; - fn rust_uvtmp_close_connection(thread: thread, req_id: u32); - fn rust_uvtmp_write( - thread: thread, - req_id: u32, - buf: *u8, - len: ctypes::size_t, - chan: comm::chan); - fn rust_uvtmp_read_start( - thread: thread, - req_id: u32, - chan: comm::chan); - fn rust_uvtmp_timer( - thread: thread, - timeout: u32, - req_id: u32, - chan: comm::chan); - fn rust_uvtmp_delete_buf(buf: *u8); - fn rust_uvtmp_get_req_id(cd: connect_data) -> u32; -} - -type thread = *ctypes::void; - -type connect_data = *ctypes::void; - -enum iomsg { - whatever, - connected(connect_data), - wrote(connect_data), - read(connect_data, *u8, ctypes::ssize_t), - timer(u32), - exit -} - -fn create_thread() -> thread { - rustrt::rust_uvtmp_create_thread() -} - -fn start_thread(thread: thread) { - rustrt::rust_uvtmp_start_thread(thread) -} - -fn join_thread(thread: thread) { - rustrt::rust_uvtmp_join_thread(thread) -} - -fn delete_thread(thread: thread) { - rustrt::rust_uvtmp_delete_thread(thread) -} - -fn connect(thread: thread, req_id: u32, - ip: str, ch: comm::chan) -> connect_data { - str::as_buf(ip) {|ipbuf| - rustrt::rust_uvtmp_connect(thread, req_id, ipbuf, ch) - } -} - -fn close_connection(thread: thread, req_id: u32) { - rustrt::rust_uvtmp_close_connection(thread, req_id); -} - -fn write(thread: thread, req_id: u32, bytes: [u8], - chan: comm::chan) unsafe { - rustrt::rust_uvtmp_write( - thread, req_id, vec::to_ptr(bytes), vec::len(bytes), chan); -} - -fn read_start(thread: thread, req_id: u32, - chan: comm::chan) { - rustrt::rust_uvtmp_read_start(thread, req_id, chan); -} - -fn timer_start(thread: thread, timeout: u32, req_id: u32, - chan: comm::chan) { - rustrt::rust_uvtmp_timer(thread, timeout, req_id, chan); -} - -fn delete_buf(buf: *u8) { - rustrt::rust_uvtmp_delete_buf(buf); -} - -fn get_req_id(cd: connect_data) -> u32 { - ret rustrt::rust_uvtmp_get_req_id(cd); -} - -#[test] -fn test_start_stop() { - let thread = create_thread(); - start_thread(thread); - join_thread(thread); - delete_thread(thread); -} - -#[test] -#[ignore] -fn test_connect() { - let thread = create_thread(); - start_thread(thread); - let port = comm::port(); - let chan = comm::chan(port); - connect(thread, 0u32, "74.125.224.146", chan); - alt comm::recv(port) { - connected(cd) { - close_connection(thread, 0u32); - } - _ { fail "test_connect: port isn't connected"; } - } - join_thread(thread); - delete_thread(thread); -} - -#[test] -#[ignore] -fn test_http() { - let thread = create_thread(); - start_thread(thread); - let port = comm::port(); - let chan = comm::chan(port); - connect(thread, 0u32, "74.125.224.146", chan); - alt comm::recv(port) { - connected(cd) { - write(thread, 0u32, str::bytes("GET / HTTP/1.0\n\n"), chan); - alt comm::recv(port) { - wrote(cd) { - read_start(thread, 0u32, chan); - let keep_going = true; - while keep_going { - alt comm::recv(port) { - read(_, buf, -1) { - keep_going = false; - delete_buf(buf); - } - read(_, buf, len) { - unsafe { - log(error, len); - let buf = vec::unsafe::from_buf(buf, - len as uint); - let str = str::from_bytes(buf); - #error("read something"); - io::println(str); - } - delete_buf(buf); - } - _ { fail "test_http: protocol error"; } - } - } - close_connection(thread, 0u32); - } - _ { fail "test_http: expected `wrote`"; } - } - } - _ { fail "test_http: port not connected"; } - } - join_thread(thread); - delete_thread(thread); -} diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp index 9ee1b844add2c..096d40d223560 100644 --- a/src/rt/rust_uv.cpp +++ b/src/rt/rust_uv.cpp @@ -1,51 +1,175 @@ #include "rust_internal.h" #include "uv.h" -/* - Wrappers of uv_* functions. These can be eliminated by figuring - out how to build static uv with externs, or by just using dynamic libuv - */ +// crust fn pointers +typedef void (*crust_async_op_cb)(uv_loop_t* loop, void* data, + uv_async_t* op_handle); +typedef void (*crust_simple_cb)(uint8_t* id_buf, void* loop_data); +typedef void (*crust_close_cb)(uint8_t* id_buf, void* handle, + void* data); -extern "C" CDECL uv_loop_t* -rust_uv_default_loop() { - return uv_default_loop(); +// data types +#define RUST_UV_HANDLE_LEN 16 + +struct handle_data { + uint8_t id_buf[RUST_UV_HANDLE_LEN]; + crust_simple_cb cb; + crust_close_cb close_cb; +}; + +// helpers +static void* +current_kernel_malloc(size_t size, const char* tag) { + void* ptr = rust_task_thread::get_task()->kernel->malloc(size, tag); + return ptr; +} + +static void +current_kernel_free(void* ptr) { + rust_task_thread::get_task()->kernel->free(ptr); +} + +static handle_data* +new_handle_data_from(uint8_t* buf, crust_simple_cb cb) { + handle_data* data = (handle_data*)current_kernel_malloc( + sizeof(handle_data), + "handle_data"); + memcpy(data->id_buf, buf, RUST_UV_HANDLE_LEN); + data->cb = cb; + return data; +} + +// libuv callback impls +static void +native_crust_async_op_cb(uv_async_t* handle, int status) { + crust_async_op_cb cb = (crust_async_op_cb)handle->data; + void* loop_data = handle->loop->data; + cb(handle->loop, loop_data, handle); } -extern "C" CDECL uv_loop_t* +static void +native_async_cb(uv_async_t* handle, int status) { + handle_data* handle_d = (handle_data*)handle->data; + void* loop_data = handle->loop->data; + handle_d->cb(handle_d->id_buf, loop_data); +} + +static void +native_timer_cb(uv_timer_t* handle, int status) { + handle_data* handle_d = (handle_data*)handle->data; + void* loop_data = handle->loop->data; + handle_d->cb(handle_d->id_buf, loop_data); +} + +static void +native_close_cb(uv_handle_t* handle) { + handle_data* data = (handle_data*)handle->data; + data->close_cb(data->id_buf, handle, handle->loop->data); +} + +static void +native_close_op_cb(uv_handle_t* op_handle) { + current_kernel_free(op_handle); + // uv_run() should return after this.. +} + +// native fns bound in rust +extern "C" void* rust_uv_loop_new() { - return uv_loop_new(); + return (void*)uv_loop_new(); +} + +extern "C" void +rust_uv_loop_delete(uv_loop_t* loop) { + uv_loop_delete(loop); +} + +extern "C" void +rust_uv_loop_set_data(uv_loop_t* loop, void* data) { + loop->data = data; +} + +extern "C" void* +rust_uv_bind_op_cb(uv_loop_t* loop, crust_async_op_cb cb) { + uv_async_t* async = (uv_async_t*)current_kernel_malloc( + sizeof(uv_async_t), + "uv_async_t"); + uv_async_init(loop, async, native_crust_async_op_cb); + async->data = (void*)cb; + // decrement the ref count, so that our async bind + // doesn't count towards keeping the loop alive + //uv_unref(loop); + return async; } -extern "C" CDECL void -rust_uv_loop_delete(uv_loop_t *loop) { - return uv_loop_delete(loop); +extern "C" void +rust_uv_stop_op_cb(uv_handle_t* op_handle) { + uv_close(op_handle, native_close_op_cb); } -extern "C" CDECL int -rust_uv_run(uv_loop_t *loop) { - return uv_run(loop); +extern "C" void +rust_uv_run(uv_loop_t* loop) { + uv_run(loop); } -extern "C" CDECL void -rust_uv_unref(uv_loop_t *loop) { - return uv_unref(loop); +extern "C" void +rust_uv_close(uv_handle_t* handle, crust_close_cb cb) { + handle_data* data = (handle_data*)handle->data; + data->close_cb = cb; + uv_close(handle, native_close_cb); } -extern "C" CDECL int -rust_uv_idle_init(uv_loop_t* loop, uv_idle_t* idle) { - return uv_idle_init(loop, idle); +extern "C" void +rust_uv_close_async(uv_async_t* handle) { + current_kernel_free(handle->data); + current_kernel_free(handle); } -extern "C" CDECL int -rust_uv_idle_start(uv_idle_t* idle, uv_idle_cb cb) { - return uv_idle_start(idle, cb); +extern "C" void +rust_uv_close_timer(uv_async_t* handle) { + current_kernel_free(handle->data); + current_kernel_free(handle); } +extern "C" void +rust_uv_async_send(uv_async_t* handle) { + uv_async_send(handle); +} + +extern "C" void* +rust_uv_async_init(uv_loop_t* loop, crust_simple_cb cb, + uint8_t* buf) { + uv_async_t* async = (uv_async_t*)current_kernel_malloc( + sizeof(uv_async_t), + "uv_async_t"); + uv_async_init(loop, async, native_async_cb); + handle_data* data = new_handle_data_from(buf, cb); + async->data = data; + + return async; +} +extern "C" void* +rust_uv_timer_init(uv_loop_t* loop, crust_simple_cb cb, + uint8_t* buf) { + uv_timer_t* new_timer = (uv_timer_t*)current_kernel_malloc( + sizeof(uv_timer_t), + "uv_timer_t"); + uv_timer_init(loop, new_timer); + handle_data* data = new_handle_data_from(buf, cb); + new_timer->data = data; + return new_timer; +} + +extern "C" void +rust_uv_timer_start(uv_timer_t* the_timer, uint32_t timeout, + uint32_t repeat) { + uv_timer_start(the_timer, native_timer_cb, timeout, repeat); +} -extern "C" CDECL size_t -rust_uv_size_of_idle_t() { - return sizeof(uv_idle_t); +extern "C" void +rust_uv_timer_stop(uv_timer_t* the_timer) { + uv_timer_stop(the_timer); } diff --git a/src/rt/rust_uvtmp.cpp b/src/rt/rust_uvtmp.cpp deleted file mode 100644 index 7bd0b4fe55bd9..0000000000000 --- a/src/rt/rust_uvtmp.cpp +++ /dev/null @@ -1,428 +0,0 @@ -#include -#include -#include -#include "rust_internal.h" -#include "uv.h" - -class rust_uvtmp_thread; - -struct connect_data { - uint32_t req_id; - rust_uvtmp_thread *thread; - char * ip_addr; - uv_connect_t connect; - uv_tcp_t tcp; - chan_handle chan; -}; - -const intptr_t whatever_tag = 0; -const intptr_t connected_tag = 1; -const intptr_t wrote_tag = 2; -const intptr_t read_tag = 3; -const intptr_t timer_tag = 4; -const intptr_t exit_tag = 5; - -struct iomsg { - intptr_t tag; - union { - connect_data *connected_val; - connect_data *wrote_val; - struct { - connect_data *cd; - uint8_t *buf; - ssize_t nread; - } read_val; - uint32_t timer_req_id; - } val; -}; - -struct write_data { - connect_data *cd; - uint8_t *buf; - size_t len; - chan_handle chan; -}; - -struct read_start_data { - connect_data *cd; - chan_handle chan; -}; - -struct timer_start_data { - rust_uvtmp_thread *thread; - uint32_t timeout; - uint32_t req_id; - chan_handle chan; -}; - -// FIXME: Copied from rust_builtins.cpp. Could bitrot easily -static void -send(rust_task *task, chan_handle chan, void *data) { - rust_task *target_task = task->kernel->get_task_by_id(chan.task); - if(target_task) { - rust_port *port = target_task->get_port_by_id(chan.port); - if(port) { - port->send(data); - scoped_lock with(target_task->lock); - port->deref(); - } - target_task->deref(); - } -} - -class rust_uvtmp_thread : public rust_thread { - -private: - std::map req_map; - rust_task *task; - uv_loop_t *loop; - uv_idle_t idle; - lock_and_signal lock; - bool stop_flag; - std::queue > connect_queue; - std::queue close_connection_queue; - std::queue write_queue; - std::queue read_start_queue; - std::queue timer_start_queue; -public: - - rust_uvtmp_thread() { - task = rust_task_thread::get_task(); - stop_flag = false; - loop = uv_loop_new(); - uv_idle_init(loop, &idle); - idle.data = this; - uv_idle_start(&idle, idle_cb); - } - - ~rust_uvtmp_thread() { - uv_loop_delete(loop); - } - - void stop() { - scoped_lock with(lock); - stop_flag = true; - } - - connect_data *connect(uint32_t req_id, char *ip, chan_handle chan) { - scoped_lock with(lock); - if (req_map.count(req_id)) return NULL; - connect_data *cd = new connect_data(); - req_map[req_id] = cd; - cd->req_id = req_id; - cd->ip_addr = ip; - connect_queue.push( - std::pair(cd, chan)); - return cd; - } - - void - close_connection(uint32_t req_id) { - scoped_lock with(lock); - connect_data *cd = req_map[req_id]; - close_connection_queue.push(cd); - req_map.erase(req_id); - } - - void - write(uint32_t req_id, uint8_t *buf, size_t len, chan_handle chan) { - scoped_lock with(lock); - connect_data *cd = req_map[req_id]; - write_data *wd = new write_data(); - wd->cd = cd; - wd->buf = new uint8_t[len]; - wd->len = len; - wd->chan = chan; - - memcpy(wd->buf, buf, len); - - write_queue.push(wd); - } - - void - read_start(uint32_t req_id, chan_handle chan) { - scoped_lock with(lock); - connect_data *cd = req_map[req_id]; - read_start_data *rd = new read_start_data(); - rd->cd = cd; - rd->chan = chan; - - read_start_queue.push(rd); - } - - void - timer(uint32_t timeout, uint32_t req_id, chan_handle chan) { - scoped_lock with(lock); - - timer_start_data *td = new timer_start_data(); - td->timeout = timeout; - td->req_id = req_id; - td->chan = chan; - timer_start_queue.push(td); - } - -private: - - virtual void - run() { - uv_run(loop); - } - - static void - idle_cb(uv_idle_t* handle, int status) { - rust_uvtmp_thread *self = (rust_uvtmp_thread*) handle->data; - self->on_idle(); - } - - void - on_idle() { - scoped_lock with(lock); - make_new_connections(); - close_connections(); - write_buffers(); - start_reads(); - start_timers(); - close_idle_if_stop(); - } - - void - make_new_connections() { - assert(lock.lock_held_by_current_thread()); - while (!connect_queue.empty()) { - std::pair pair = connect_queue.front(); - connect_queue.pop(); - connect_data *cd = pair.first; - struct sockaddr_in client_addr = uv_ip4_addr("0.0.0.0", 0); - struct sockaddr_in server_addr = uv_ip4_addr(cd->ip_addr, 80); - - cd->thread = this; - cd->chan = pair.second; - cd->connect.data = cd; - - uv_tcp_init(loop, &cd->tcp); - uv_tcp_bind(&cd->tcp, client_addr); - - uv_tcp_connect(&cd->connect, &cd->tcp, server_addr, connect_cb); - } - } - - static void - connect_cb(uv_connect_t *handle, int status) { - connect_data *cd = (connect_data*)handle->data; - cd->thread->on_connect(cd); - } - - void - on_connect(connect_data *cd) { - iomsg msg; - msg.tag = connected_tag; - msg.val.connected_val = cd; - - send(task, cd->chan, &msg); - } - - void - close_connections() { - assert(lock.lock_held_by_current_thread()); - while (!close_connection_queue.empty()) { - connect_data *cd = close_connection_queue.front(); - close_connection_queue.pop(); - - cd->tcp.data = cd; - - uv_close((uv_handle_t*)&cd->tcp, tcp_close_cb); - } - } - - static void - tcp_close_cb(uv_handle_t *handle) { - connect_data *cd = (connect_data*)handle->data; - delete cd; - } - - void - write_buffers() { - assert(lock.lock_held_by_current_thread()); - while (!write_queue.empty()) { - write_data *wd = write_queue.front(); - write_queue.pop(); - - uv_write_t *write = new uv_write_t(); - - write->data = wd; - - uv_buf_t buf; - buf.base = (char*)wd->buf; - buf.len = wd->len; - - uv_write(write, (uv_stream_t*)&wd->cd->tcp, &buf, 1, write_cb); - } - } - - static void - write_cb(uv_write_t *handle, int status) { - write_data *wd = (write_data*)handle->data; - rust_uvtmp_thread *self = wd->cd->thread; - self->on_write(handle, wd); - } - - void - on_write(uv_write_t *handle, write_data *wd) { - iomsg msg; - msg.tag = timer_tag; - msg.val.wrote_val = wd->cd; - - send(task, wd->chan, &msg); - - delete [] wd->buf; - delete wd; - delete handle; - } - - void - start_reads() { - assert (lock.lock_held_by_current_thread()); - while (!read_start_queue.empty()) { - read_start_data *rd = read_start_queue.front(); - read_start_queue.pop(); - - connect_data *cd = rd->cd; - cd->tcp.data = rd; - - uv_read_start((uv_stream_t*)&cd->tcp, alloc_cb, read_cb); - } - } - - static uv_buf_t - alloc_cb(uv_handle_t* handle, size_t size) { - uv_buf_t buf; - buf.base = new char[size]; - buf.len = size; - return buf; - } - - static void - read_cb(uv_stream_t *handle, ssize_t nread, uv_buf_t buf) { - read_start_data *rd = (read_start_data*)handle->data; - rust_uvtmp_thread *self = rd->cd->thread; - self->on_read(rd, nread, buf); - } - - void - on_read(read_start_data *rd, ssize_t nread, uv_buf_t buf) { - iomsg msg; - msg.tag = read_tag; - msg.val.read_val.cd = rd->cd; - msg.val.read_val.buf = (uint8_t*)buf.base; - msg.val.read_val.nread = nread; - - send(task, rd->chan, &msg); - if (nread == -1) { - delete rd; - } - } - - void - start_timers() { - assert (lock.lock_held_by_current_thread()); - while (!timer_start_queue.empty()) { - timer_start_data *td = timer_start_queue.front(); - timer_start_queue.pop(); - - td->thread = this; - - uv_timer_t *timer = (uv_timer_t *)malloc(sizeof(uv_timer_t)); - timer->data = td; - uv_timer_init(loop, timer); - uv_timer_start(timer, timer_cb, td->timeout, 0); - } - } - - static void - timer_cb(uv_timer_t *handle, int what) { - timer_start_data *td = (timer_start_data*)handle->data; - rust_uvtmp_thread *self = td->thread; - self->on_timer(td); - free(handle); - } - - void - on_timer(timer_start_data *rd) { - iomsg msg; - msg.tag = timer_tag; - msg.val.timer_req_id = rd->req_id; - - send(task, rd->chan, &msg); - delete rd; - } - - void - close_idle_if_stop() { - assert(lock.lock_held_by_current_thread()); - if (stop_flag) { - uv_close((uv_handle_t*)&idle, NULL); - } - } - -}; - -extern "C" rust_uvtmp_thread * -rust_uvtmp_create_thread() { - rust_uvtmp_thread *thread = new rust_uvtmp_thread(); - return thread; -} - -extern "C" void -rust_uvtmp_start_thread(rust_uvtmp_thread *thread) { - thread->start(); -} - -extern "C" void -rust_uvtmp_join_thread(rust_uvtmp_thread *thread) { - thread->stop(); - thread->join(); -} - -extern "C" void -rust_uvtmp_delete_thread(rust_uvtmp_thread *thread) { - delete thread; -} - -extern "C" connect_data * -rust_uvtmp_connect(rust_uvtmp_thread *thread, uint32_t req_id, char *ip, chan_handle *chan) { - return thread->connect(req_id, ip, *chan); -} - -extern "C" void -rust_uvtmp_close_connection(rust_uvtmp_thread *thread, uint32_t req_id) { - thread->close_connection(req_id); -} - -extern "C" void -rust_uvtmp_write(rust_uvtmp_thread *thread, uint32_t req_id, - uint8_t *buf, size_t len, chan_handle *chan) { - thread->write(req_id, buf, len, *chan); -} - -extern "C" void -rust_uvtmp_read_start(rust_uvtmp_thread *thread, uint32_t req_id, - chan_handle *chan) { - thread->read_start(req_id, *chan); -} - -extern "C" void -rust_uvtmp_timer(rust_uvtmp_thread *thread, uint32_t timeout, uint32_t req_id, chan_handle *chan) { - thread->timer(timeout, req_id, *chan); -} - -extern "C" void -rust_uvtmp_delete_buf(uint8_t *buf) { - delete [] buf; -} - -extern "C" uint32_t -rust_uvtmp_get_req_id(connect_data *cd) { - return cd->req_id; -} - - diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index 5b62d606917f5..2b5929600b03b 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -80,25 +80,20 @@ upcall_call_shim_on_rust_stack upcall_new_stack upcall_del_stack upcall_reset_stack_limit -rust_uv_default_loop rust_uv_loop_new rust_uv_loop_delete +rust_uv_loop_set_data +rust_uv_bind_op_cb +rust_uv_stop_op_cb rust_uv_run -rust_uv_unref -rust_uv_idle_init -rust_uv_idle_start -rust_uv_size_of_idle_t -rust_uvtmp_create_thread -rust_uvtmp_start_thread -rust_uvtmp_join_thread -rust_uvtmp_delete_thread -rust_uvtmp_connect -rust_uvtmp_close_connection -rust_uvtmp_write -rust_uvtmp_read_start -rust_uvtmp_timer -rust_uvtmp_delete_buf -rust_uvtmp_get_req_id +rust_uv_close +rust_uv_close_async +rust_uv_close_timer +rust_uv_async_send +rust_uv_async_init +rust_uv_timer_init +rust_uv_timer_start +rust_uv_timer_stop rust_dbg_lock_create rust_dbg_lock_destroy rust_dbg_lock_lock