From b89712cdcac26dc84adb43836adecdf0daa4fd85 Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Fri, 17 Feb 2012 16:33:56 -0800 Subject: [PATCH 01/12] everything is laid out and working through a basic hw the core impl is there, with a async handle in place to take incoming operations from user code. No actual uv handle/operations are implemented yet, though. --- src/libstd/uvtmp.rs | 198 ++++++++++++++++++++++++++++++++++++++++++ src/rt/rust_uvtmp.cpp | 52 +++++++++++ 2 files changed, 250 insertions(+) diff --git a/src/libstd/uvtmp.rs b/src/libstd/uvtmp.rs index d18a59031eaf6..18d4904c22440 100644 --- a/src/libstd/uvtmp.rs +++ b/src/libstd/uvtmp.rs @@ -1,5 +1,34 @@ // Some temporary libuv hacks for servo +// UV2 +enum uv_operation { + op_hw() +} + +enum uv_msg { + // requests from library users + msg_run(comm::chan), + msg_run_in_bg(), + msg_loop_delete(), + msg_async_init([u8], fn~()), + msg_async_send([u8]), + msg_hw(), + + // dispatches from libuv + uv_hw() +} + +type uv_loop_data = { + operation_port: comm::port, + rust_loop_chan: comm::chan +}; + +type uv_loop = comm::chan; + +enum uv_handle { + handle([u8], *ctypes::void) +} + #[nolink] native mod rustrt { fn rust_uvtmp_create_thread() -> thread; @@ -29,8 +58,177 @@ native mod rustrt { chan: comm::chan); fn rust_uvtmp_delete_buf(buf: *u8); fn rust_uvtmp_get_req_id(cd: connect_data) -> u32; + + fn rust_uvtmp_uv_loop_new() -> *ctypes::void; + fn rust_uvtmp_uv_loop_set_data( + loop: *ctypes::void, + data: *uv_loop_data); + fn rust_uvtmp_uv_bind_op_cb(loop: *ctypes::void, cb: *u8) -> *ctypes::void; + fn rust_uvtmp_uv_run(loop_handle: *ctypes::void); + fn rust_uvtmp_uv_async_send(handle: *ctypes::void); } +mod uv { + export loop_new, run, run_in_bg, hw; + + // public functions + fn loop_new() -> uv_loop unsafe { + let ret_recv_port: comm::port = + comm::port(); + let ret_recv_chan: comm::chan = + comm::chan(ret_recv_port); + + task::spawn_sched(3u) {|| + // our beloved uv_loop_t ptr + let loop_handle = rustrt:: + rust_uvtmp_uv_loop_new(); + + // this port/chan pair are used to send messages to + // libuv. libuv processes any pending messages on the + // port (via crust) after receiving an async "wakeup" + // on a special uv_async_t handle created below + let operation_port = comm::port::(); + let operation_chan = comm::chan::( + operation_port); + + // this port/chan pair as used in the while() loop + // below. It takes dispatches, originating from libuv + // callbacks, to invoke handles registered by the + // user + let rust_loop_port = comm::port::(); + let rust_loop_chan = + comm::chan::(rust_loop_port); + // let the task-spawner return + comm::send(ret_recv_chan, copy(rust_loop_chan)); + + // create our "special" async handle that will + // allow all operations against libuv to be + // "buffered" in the operation_port, for processing + // from the thread that libuv runs on + let loop_data: uv_loop_data = { + operation_port: operation_port, + rust_loop_chan: rust_loop_chan + }; + rustrt::rust_uvtmp_uv_loop_set_data( + loop_handle, + ptr::addr_of(loop_data)); // pass an opaque C-ptr + // to libuv, this will be + // in the process_operation + // crust fn + let async_handle = rustrt::rust_uvtmp_uv_bind_op_cb( + loop_handle, + process_operation); + + // all state goes here + let handles: map::map<[u8], uv_handle> = + map::new_bytes_hash(); + + // the main loop that this task blocks on. + // should have the same lifetime as the C libuv + // event loop. + let keep_going = true; + while (keep_going) { + alt comm::recv(rust_loop_port) { + msg_run(end_chan) { + // start the libuv event loop + // we'll also do a uv_async_send with + // the operation handle to have the + // loop process any pending operations + // once its up and running + task::spawn_sched(1u) {|| + // this call blocks + rustrt::rust_uvtmp_uv_run(loop_handle); + // when we're done, msg the + // end chan + comm::send(end_chan, true); + }; + } + msg_run_in_bg { + task::spawn_sched(1u) {|| + // this call blocks + rustrt::rust_uvtmp_uv_run(loop_handle); + }; + } + msg_hw() { + comm::send(operation_chan, op_hw); + io::println("CALLING ASYNC_SEND FOR HW"); + rustrt::rust_uvtmp_uv_async_send(async_handle); + } + uv_hw() { + io::println("HELLO WORLD!!!"); + } + + ////// STUBS /////// + msg_loop_delete { + // delete the event loop's c ptr + // this will of course stop any + // further processing + } + msg_async_init(id, callback) { + // create a new async handle + // with the id as the handle's + // data and save the callback for + // invocation on msg_async_send + } + msg_async_send(id) { + // get the callback matching the + // supplied id and invoke it + } + + _ { fail "unknown form of uv_msg received"; } + } + } + }; + ret comm::recv(ret_recv_port); + } + + fn run(loop: uv_loop) { + let end_port = comm::port::(); + let end_chan = comm::chan::(end_port); + comm::send(loop, msg_run(end_chan)); + comm::recv(end_port); + } + + fn run_in_bg(loop: uv_loop) { + comm::send(loop, msg_run_in_bg); + } + + fn hw(loop: uv_loop) { + comm::send(loop, msg_hw); + } + + // internal functions + + // crust + crust fn process_operation(data: *uv_loop_data) unsafe { + io::println("IN PROCESS_OPERATION"); + let op_port = (*data).operation_port; + let loop_chan = (*data).rust_loop_chan; + let op_pending = comm::peek(op_port); + while(op_pending) { + io::println("OPERATION PENDING!"); + alt comm::recv(op_port) { + op_hw() { + io::println("GOT OP_HW IN CRUST"); + comm::send(loop_chan, uv_hw); + } + _ { fail "unknown form of uv_operation received"; } + } + op_pending = comm::peek(op_port); + } + io::println("NO MORE OPERATIONS PENDING!"); + } +} + +#[test] +fn uvtmp_uv_test_hello_world() { + let test_loop = uv::loop_new(); + uv::hw(test_loop); + uv::run(test_loop); +} + +// END OF UV2 + type thread = *ctypes::void; type connect_data = *ctypes::void; diff --git a/src/rt/rust_uvtmp.cpp b/src/rt/rust_uvtmp.cpp index 7bd0b4fe55bd9..4a8df1fa8863f 100644 --- a/src/rt/rust_uvtmp.cpp +++ b/src/rt/rust_uvtmp.cpp @@ -55,6 +55,58 @@ struct timer_start_data { chan_handle chan; }; +// UVTMP REWORK + +static void* +current_kernel_malloc(size_t size, const char* tag) { + return rust_task_thread::get_task()->malloc(size, tag); +} + +/* +static void +current_kernel_free(void* ptr) { + rust_task_thread::get_task()->free(ptr); +} +*/ + +extern "C" void* +rust_uvtmp_uv_loop_new() { + return (void*)uv_loop_new(); +} + +extern "C" void +rust_uvtmp_uv_loop_set_data(uv_loop_t* loop, void* data) { + loop->data = data; +} + +typedef void (*async_op_cb)(void* data); +void native_async_op_cb(uv_async_t* handle, int status) { + async_op_cb cb = (async_op_cb)handle->data; + void* loop_data = handle->loop->data; + cb(loop_data); +} + +extern "C" void* +rust_uvtmp_uv_bind_op_cb(uv_loop_t* loop, async_op_cb cb) { + uv_async_t* async = (uv_async_t*)current_kernel_malloc( + sizeof(uv_async_t), + "uv_async_t"); + uv_async_init(loop, async, native_async_op_cb); + async->data = (void*)cb; + return async; +} + +extern "C" void rust_uvtmp_uv_run(uv_loop_t* loop) { + uv_run(loop); +} + +extern "C" void +rust_uvtmp_uv_async_send(uv_async_t* handle) { + uv_async_send(handle); +} + +// UVTMP REWORK + // FIXME: Copied from rust_builtins.cpp. Could bitrot easily static void send(rust_task *task, chan_handle chan, void *data) { From 0596ffc6c7e72ebaf44672f42061e09fd3d7b6c5 Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Fri, 17 Feb 2012 23:11:02 -0800 Subject: [PATCH 02/12] removed hello world and added uv_async_* --- src/libstd/uvtmp.rs | 156 +++++++++++++++++++++++++++++++----------- src/rt/rust_uvtmp.cpp | 40 ++++++++++- 2 files changed, 154 insertions(+), 42 deletions(-) diff --git a/src/libstd/uvtmp.rs b/src/libstd/uvtmp.rs index 18d4904c22440..8a61df37982ba 100644 --- a/src/libstd/uvtmp.rs +++ b/src/libstd/uvtmp.rs @@ -2,20 +2,25 @@ // UV2 enum uv_operation { - op_hw() + op_async_init([u8]) } +type uv_async = { + id: [u8], + loop: uv_loop +}; + enum uv_msg { // requests from library users msg_run(comm::chan), msg_run_in_bg(), msg_loop_delete(), - msg_async_init([u8], fn~()), + msg_async_init(fn~(uv_async), fn~(uv_async)), msg_async_send([u8]), - msg_hw(), // dispatches from libuv - uv_hw() + uv_async_init([u8], *ctypes::void), + uv_async_send([u8]) } type uv_loop_data = { @@ -25,10 +30,6 @@ type uv_loop_data = { type uv_loop = comm::chan; -enum uv_handle { - handle([u8], *ctypes::void) -} - #[nolink] native mod rustrt { fn rust_uvtmp_create_thread() -> thread; @@ -66,10 +67,14 @@ native mod rustrt { fn rust_uvtmp_uv_bind_op_cb(loop: *ctypes::void, cb: *u8) -> *ctypes::void; fn rust_uvtmp_uv_run(loop_handle: *ctypes::void); fn rust_uvtmp_uv_async_send(handle: *ctypes::void); + fn rust_uvtmp_uv_async_init( + loop_handle: *ctypes::void, + cb: *u8, + id: *u8) -> *ctypes::void; } mod uv { - export loop_new, run, run_in_bg, hw; + export loop_new, run, run_in_bg, async_init, async_send; // public functions fn loop_new() -> uv_loop unsafe { @@ -78,7 +83,9 @@ mod uv { let ret_recv_chan: comm::chan = comm::chan(ret_recv_port); - task::spawn_sched(3u) {|| + let num_threads = 4u; // would be cool to tie this to + // the number of logical procs + task::spawn_sched(num_threads) {|| // our beloved uv_loop_t ptr let loop_handle = rustrt:: rust_uvtmp_uv_loop_new(); @@ -115,12 +122,17 @@ mod uv { // to libuv, this will be // in the process_operation // crust fn - let async_handle = rustrt::rust_uvtmp_uv_bind_op_cb( + let op_handle = rustrt::rust_uvtmp_uv_bind_op_cb( loop_handle, process_operation); // all state goes here - let handles: map::map<[u8], uv_handle> = + let handles: map::map<[u8], *ctypes::void> = + map::new_bytes_hash(); + let async_cbs: map::map<[u8], fn~(uv_async)> = + map::new_bytes_hash(); + let async_init_after_cbs: map::map<[u8], + fn~(uv_async)> = map::new_bytes_hash(); // the main loop that this task blocks on. @@ -143,36 +155,51 @@ mod uv { comm::send(end_chan, true); }; } + msg_run_in_bg { task::spawn_sched(1u) {|| // this call blocks rustrt::rust_uvtmp_uv_run(loop_handle); }; } - msg_hw() { - comm::send(operation_chan, op_hw); - io::println("CALLING ASYNC_SEND FOR HW"); - rustrt::rust_uvtmp_uv_async_send(async_handle); - } - uv_hw() { - io::println("HELLO WORLD!!!"); - } - - ////// STUBS /////// - msg_loop_delete { - // delete the event loop's c ptr - // this will of course stop any - // further processing - } - msg_async_init(id, callback) { + + msg_async_init(callback, after_cb) { // create a new async handle // with the id as the handle's // data and save the callback for // invocation on msg_async_send + let id = gen_handle_id(); + async_cbs.insert(id, callback); + async_init_after_cbs.insert(id, after_cb); + let op = op_async_init(id); + comm::send(operation_chan, op); + rustrt::rust_uvtmp_uv_async_send(op_handle); + io::println("MSG_ASYNC_INIT"); } + uv_async_init(id, async_handle) { + // libuv created a handle, which is + // passed back to us. save it and + // then invoke the supplied callback + // for after completion + handles.insert(id, async_handle); + let after_cb = async_init_after_cbs.get(id); + async_init_after_cbs.remove(id); + task::spawn {|| + let async: uv_async = { + id: id, + loop: rust_loop_chan + }; + after_cb(async); + }; + } + msg_async_send(id) { - // get the callback matching the - // supplied id and invoke it + let async_handle = handles.get(id); + rustrt::rust_uvtmp_uv_async_send(async_handle); + } + uv_async_send(id) { + let async_cb = async_cbs.get(id); + async_cb({id: id, loop: rust_loop_chan}); } _ { fail "unknown form of uv_msg received"; } @@ -193,37 +220,88 @@ mod uv { comm::send(loop, msg_run_in_bg); } - fn hw(loop: uv_loop) { - comm::send(loop, msg_hw); + fn async_init ( + loop: uv_loop, + async_cb: fn~(uv_async), + after_cb: fn~(uv_async)) { + let msg = msg_async_init(async_cb, after_cb); + comm::send(loop, msg); + } + + fn async_send(async: uv_async) { + comm::send(async.loop, msg_async_send(async.id)); } // internal functions + fn gen_handle_id() -> [u8] { + ret rand::mk_rng().gen_bytes(16u); + } + fn get_handle_id_from(buf: *u8) -> [u8] unsafe { + ret vec::unsafe::from_buf(buf, 16u); + } + + fn get_loop_chan_from(data: *uv_loop_data) + -> comm::chan unsafe { + ret (*data).rust_loop_chan; + } // crust - crust fn process_operation(data: *uv_loop_data) unsafe { + crust fn process_operation( + loop: *ctypes::void, + data: *uv_loop_data) unsafe { io::println("IN PROCESS_OPERATION"); let op_port = (*data).operation_port; - let loop_chan = (*data).rust_loop_chan; + let loop_chan = get_loop_chan_from(data); let op_pending = comm::peek(op_port); while(op_pending) { io::println("OPERATION PENDING!"); alt comm::recv(op_port) { - op_hw() { - io::println("GOT OP_HW IN CRUST"); - comm::send(loop_chan, uv_hw); + op_async_init(id) { + io::println("OP_ASYNC_INIT"); + let id_ptr = vec::unsafe::to_ptr(id); + let async_handle = rustrt::rust_uvtmp_uv_async_init( + loop, + process_async_send, + id_ptr); + comm::send(loop_chan, uv_async_init( + id, + async_handle)); } + _ { fail "unknown form of uv_operation received"; } } op_pending = comm::peek(op_port); } io::println("NO MORE OPERATIONS PENDING!"); } + + crust fn process_async_send(id_buf: *u8, data: *uv_loop_data) + unsafe { + let handle_id = get_handle_id_from(id_buf); + let loop_chan = get_loop_chan_from(data); + comm::send(loop_chan, uv_async_send(handle_id)); + } + + +} + +#[test] +fn test_uvtmp_uv_new_loop_no_handles() { + let test_loop = uv::loop_new(); + uv::run(test_loop); // this should return immediately + // since there aren't any handles.. } #[test] -fn uvtmp_uv_test_hello_world() { +fn test_uvtmp_uv_simple_async() { let test_loop = uv::loop_new(); - uv::hw(test_loop); + let cb: fn~(uv_async) = fn~(h: uv_async) { + io::println("HELLO FROM ASYNC CALLBACK!"); + }; + uv::async_init(test_loop, cb) {|new_async| + io::println("NEW_ASYNC CREATED!"); + uv::async_send(new_async); + }; uv::run(test_loop); } diff --git a/src/rt/rust_uvtmp.cpp b/src/rt/rust_uvtmp.cpp index 4a8df1fa8863f..18a30fb64042c 100644 --- a/src/rt/rust_uvtmp.cpp +++ b/src/rt/rust_uvtmp.cpp @@ -57,6 +57,9 @@ struct timer_start_data { // UVTMP REWORK +typedef void (*async_op_cb)(uv_loop_t* loop, void* data); +typedef void (*rust_async_cb)(uint8_t* id_buf, void* loop_data); + static void* current_kernel_malloc(size_t size, const char* tag) { return rust_task_thread::get_task()->malloc(size, tag); @@ -68,6 +71,11 @@ current_kernel_free(void* ptr) { rust_task_thread::get_task()->free(ptr); } */ +#define RUST_UV_HANDLE_LEN 16 +struct async_data { + uint8_t id_buf[RUST_UV_HANDLE_LEN]; + rust_async_cb cb; +}; extern "C" void* rust_uvtmp_uv_loop_new() { @@ -79,11 +87,11 @@ rust_uvtmp_uv_loop_set_data(uv_loop_t* loop, void* data) { loop->data = data; } -typedef void (*async_op_cb)(void* data); -void native_async_op_cb(uv_async_t* handle, int status) { +static void +native_async_op_cb(uv_async_t* handle, int status) { async_op_cb cb = (async_op_cb)handle->data; void* loop_data = handle->loop->data; - cb(loop_data); + cb(handle->loop, loop_data); } extern "C" void* @@ -92,6 +100,8 @@ rust_uvtmp_uv_bind_op_cb(uv_loop_t* loop, async_op_cb cb) { sizeof(uv_async_t), "uv_async_t"); uv_async_init(loop, async, native_async_op_cb); + // decrement the ref count, so that our async bind + // does count towards keeping the loop alive async->data = (void*)cb; return async; } @@ -105,6 +115,30 @@ rust_uvtmp_uv_async_send(uv_async_t* handle) { uv_async_send(handle); } +static void +native_async_cb(uv_async_t* handle, int status) { + async_data* handle_data = (async_data*)handle->data; + void* loop_data = handle->loop->data; + handle_data->cb(handle_data->id_buf, loop_data); +} + +extern "C" void* +rust_uvtmp_uv_async_init(uv_loop_t* loop, rust_async_cb cb, + uint8_t* buf) { + uv_async_t* async = (uv_async_t*)current_kernel_malloc( + sizeof(uv_async_t), + "uv_async_t"); + uv_async_init(loop, async, native_async_cb); + async_data* data = (async_data*)current_kernel_malloc( + sizeof(async_data), + "async_data"); + memcpy(data->id_buf, buf, RUST_UV_HANDLE_LEN); + data->cb = cb; + async->data = data; + + return async; +} + // UVTMP REWORK // FIXME: Copied from rust_builtins.cpp. Could bitrot easily From 5b77ac3bb5437f595d43609026b294b014f29ab1 Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Tue, 21 Feb 2012 07:56:27 -0800 Subject: [PATCH 03/12] cleaning up uv_async stuff and stubbing uv_timer --- src/libstd/uvtmp.rs | 226 +++++++++++++++++++++++++++++++++++------- src/rt/rust_uvtmp.cpp | 127 +++++++++++++++++------- 2 files changed, 279 insertions(+), 74 deletions(-) diff --git a/src/libstd/uvtmp.rs b/src/libstd/uvtmp.rs index 8a61df37982ba..75e962bd331b6 100644 --- a/src/libstd/uvtmp.rs +++ b/src/libstd/uvtmp.rs @@ -1,26 +1,31 @@ // Some temporary libuv hacks for servo // UV2 + +// these are processed solely in the +// process_operation() crust fn below enum uv_operation { - op_async_init([u8]) + op_async_init([u8]), + op_close(uv_handle, *ctypes::void) } -type uv_async = { - id: [u8], - loop: uv_loop -}; +enum uv_handle { + uv_async([u8], uv_loop) +} enum uv_msg { // requests from library users msg_run(comm::chan), msg_run_in_bg(), - msg_loop_delete(), - msg_async_init(fn~(uv_async), fn~(uv_async)), + msg_async_init(fn~(uv_handle), fn~(uv_handle)), msg_async_send([u8]), + msg_close(uv_handle, fn~()), // dispatches from libuv uv_async_init([u8], *ctypes::void), - uv_async_send([u8]) + uv_async_send([u8]), + uv_close([u8]), + uv_end() } type uv_loop_data = { @@ -65,7 +70,10 @@ native mod rustrt { loop: *ctypes::void, data: *uv_loop_data); fn rust_uvtmp_uv_bind_op_cb(loop: *ctypes::void, cb: *u8) -> *ctypes::void; + fn rust_uvtmp_uv_stop_op_cb(handle: *ctypes::void); fn rust_uvtmp_uv_run(loop_handle: *ctypes::void); + fn rust_uvtmp_uv_close(handle: *ctypes::void, cb: *u8); + fn rust_uvtmp_uv_close_async(handle: *ctypes::void); fn rust_uvtmp_uv_async_send(handle: *ctypes::void); fn rust_uvtmp_uv_async_init( loop_handle: *ctypes::void, @@ -74,7 +82,8 @@ native mod rustrt { } mod uv { - export loop_new, run, run_in_bg, async_init, async_send; + export loop_new, run, close, run_in_bg, async_init, async_send, + timer_init; // public functions fn loop_new() -> uv_loop unsafe { @@ -129,10 +138,14 @@ mod uv { // all state goes here let handles: map::map<[u8], *ctypes::void> = map::new_bytes_hash(); - let async_cbs: map::map<[u8], fn~(uv_async)> = + let id_to_handle: map::map<[u8], uv_handle> = + map::new_bytes_hash(); + let async_cbs: map::map<[u8], fn~(uv_handle)> = map::new_bytes_hash(); let async_init_after_cbs: map::map<[u8], - fn~(uv_async)> = + fn~(uv_handle)> = + map::new_bytes_hash(); + let close_callbacks: map::map<[u8], fn~()> = map::new_bytes_hash(); // the main loop that this task blocks on. @@ -152,7 +165,9 @@ mod uv { rustrt::rust_uvtmp_uv_run(loop_handle); // when we're done, msg the // end chan + rustrt::rust_uvtmp_uv_stop_op_cb(op_handle); comm::send(end_chan, true); + comm::send(rust_loop_chan, uv_end); }; } @@ -163,6 +178,34 @@ mod uv { }; } + msg_close(handle, cb) { + let id = get_id_from_handle(handle); + close_callbacks.insert(id, cb); + let handle_ptr = handles.get(id); + let op = op_close(handle, handle_ptr); + + pass_to_libuv(op_handle, operation_chan, op); + } + uv_close(id) { + handles.remove(id); + let handle = id_to_handle.get(id); + id_to_handle.remove(id); + alt handle { + uv_async(id, _) { + async_cbs.remove(id); + } + _ { + fail "unknown form of uv_handle encountered " + + "in uv_close handler"; + } + } + let cb = close_callbacks.get(id); + close_callbacks.remove(id); + task::spawn {|| + cb(); + }; + } + msg_async_init(callback, after_cb) { // create a new async handle // with the id as the handle's @@ -172,9 +215,7 @@ mod uv { async_cbs.insert(id, callback); async_init_after_cbs.insert(id, after_cb); let op = op_async_init(id); - comm::send(operation_chan, op); - rustrt::rust_uvtmp_uv_async_send(op_handle); - io::println("MSG_ASYNC_INIT"); + pass_to_libuv(op_handle, operation_chan, op); } uv_async_init(id, async_handle) { // libuv created a handle, which is @@ -184,22 +225,25 @@ mod uv { handles.insert(id, async_handle); let after_cb = async_init_after_cbs.get(id); async_init_after_cbs.remove(id); + let async = uv_async(id, rust_loop_chan); + id_to_handle.insert(id, copy(async)); task::spawn {|| - let async: uv_async = { - id: id, - loop: rust_loop_chan - }; after_cb(async); }; } msg_async_send(id) { let async_handle = handles.get(id); - rustrt::rust_uvtmp_uv_async_send(async_handle); + do_send(async_handle); } uv_async_send(id) { let async_cb = async_cbs.get(id); - async_cb({id: id, loop: rust_loop_chan}); + task::spawn {|| + async_cb(uv_async(id, rust_loop_chan)); + }; + } + uv_end() { + keep_going = false; } _ { fail "unknown form of uv_msg received"; } @@ -222,17 +266,45 @@ mod uv { fn async_init ( loop: uv_loop, - async_cb: fn~(uv_async), - after_cb: fn~(uv_async)) { + async_cb: fn~(uv_handle), + after_cb: fn~(uv_handle)) { let msg = msg_async_init(async_cb, after_cb); comm::send(loop, msg); } - fn async_send(async: uv_async) { - comm::send(async.loop, msg_async_send(async.id)); + fn async_send(async: uv_handle) { + alt async { + uv_async(id, loop) { + comm::send(loop, msg_async_send(id)); + } + _ { + fail "attempting to call async_send() with a" + + " uv_async uv_handle"; + } + } + } + + fn close(h: uv_handle, cb: fn~()) { + let loop_chan = get_loop_chan_from_handle(h); + comm::send(loop_chan, msg_close(h, cb)); + } + + fn timer_init(loop: uv_loop, after_cb: fn~(uv_handle)) { + let msg = msg_timer_init(after_cb); + comm::send(loop, msg); } // internal functions + fn pass_to_libuv( + op_handle: *ctypes::void, + operation_chan: comm::chan, + op: uv_operation) unsafe { + comm::send(operation_chan, copy(op)); + do_send(op_handle); + } + fn do_send(h: *ctypes::void) { + rustrt::rust_uvtmp_uv_async_send(h); + } fn gen_handle_id() -> [u8] { ret rand::mk_rng().gen_bytes(16u); } @@ -240,24 +312,45 @@ mod uv { ret vec::unsafe::from_buf(buf, 16u); } - fn get_loop_chan_from(data: *uv_loop_data) - -> comm::chan unsafe { + fn get_loop_chan_from_data(data: *uv_loop_data) + -> uv_loop unsafe { ret (*data).rust_loop_chan; } + fn get_loop_chan_from_handle(handle: uv_handle) + -> uv_loop { + alt handle { + uv_async(id,loop) { + ret loop; + } + _ { + fail "unknown form of uv_handle for get_loop_chan_from " + + " handle"; + } + } + } + + fn get_id_from_handle(handle: uv_handle) -> [u8] { + alt handle { + uv_async(id,loop) { + ret id; + } + _ { + fail "unknown form of uv_handle for get_id_from handle"; + } + } + } + // crust crust fn process_operation( loop: *ctypes::void, data: *uv_loop_data) unsafe { - io::println("IN PROCESS_OPERATION"); let op_port = (*data).operation_port; - let loop_chan = get_loop_chan_from(data); + let loop_chan = get_loop_chan_from_data(data); let op_pending = comm::peek(op_port); while(op_pending) { - io::println("OPERATION PENDING!"); alt comm::recv(op_port) { op_async_init(id) { - io::println("OP_ASYNC_INIT"); let id_ptr = vec::unsafe::to_ptr(id); let async_handle = rustrt::rust_uvtmp_uv_async_init( loop, @@ -267,21 +360,61 @@ mod uv { id, async_handle)); } + op_close(handle, handle_ptr) { + handle_op_close(handle, handle_ptr); + } _ { fail "unknown form of uv_operation received"; } } op_pending = comm::peek(op_port); } - io::println("NO MORE OPERATIONS PENDING!"); + } + + fn handle_op_close(handle: uv_handle, handle_ptr: *ctypes::void) { + // it's just like im doing C + alt handle { + uv_async(id, loop) { + let cb = process_close_async; + rustrt::rust_uvtmp_uv_close( + handle_ptr, cb); + } + _ { + fail "unknown form of uv_handle encountered " + + "in process_operation/op_close"; + } + } } crust fn process_async_send(id_buf: *u8, data: *uv_loop_data) unsafe { let handle_id = get_handle_id_from(id_buf); - let loop_chan = get_loop_chan_from(data); + let loop_chan = get_loop_chan_from_data(data); comm::send(loop_chan, uv_async_send(handle_id)); } + fn process_close_common(id: [u8], data: *uv_loop_data) + unsafe { + // notify the rust loop that their handle is closed, then + // the caller will invoke a per-handle-type c++ func to + // free allocated memory + let loop_chan = get_loop_chan_from_data(data); + comm::send(loop_chan, uv_close(id)); + } + + crust fn process_close_async( + id_buf: *u8, + handle_ptr: *ctypes::void, + data: *uv_loop_data) + unsafe { + let id = get_handle_id_from(id_buf); + rustrt::rust_uvtmp_uv_close_async(handle_ptr); + // at this point, the handle and its data has been + // released. notify the rust loop to remove the + // handle and its data and call the user-supplied + // close cb + process_close_common(id, data); + } + } @@ -295,14 +428,31 @@ fn test_uvtmp_uv_new_loop_no_handles() { #[test] fn test_uvtmp_uv_simple_async() { let test_loop = uv::loop_new(); - let cb: fn~(uv_async) = fn~(h: uv_async) { - io::println("HELLO FROM ASYNC CALLBACK!"); - }; - uv::async_init(test_loop, cb) {|new_async| - io::println("NEW_ASYNC CREATED!"); + let exit_port = comm::port::(); + let exit_chan = comm::chan::(exit_port); + uv::async_init(test_loop, {|new_async| + uv::close(new_async) {|| + comm::send(exit_chan, true); + }; + }, {|new_async| uv::async_send(new_async); - }; + }); + uv::run(test_loop); + assert comm::recv(exit_port); +} + +#[test] +fn test_uvtmp_uv_timer() { + let test_loop = uv::loop_new(); + let exit_port = comm::port::(); + let exit_chan = comm::chan::(exit_port); + uv::timer(test_loop, {|new_timer| + uv::timer_start(new_async) {|| + comm::send(exit_chan, true); + }; + }); uv::run(test_loop); + assert comm::recv(exit_port); } // END OF UV2 diff --git a/src/rt/rust_uvtmp.cpp b/src/rt/rust_uvtmp.cpp index 18a30fb64042c..f97312eaf44aa 100644 --- a/src/rt/rust_uvtmp.cpp +++ b/src/rt/rust_uvtmp.cpp @@ -57,26 +57,63 @@ struct timer_start_data { // UVTMP REWORK -typedef void (*async_op_cb)(uv_loop_t* loop, void* data); -typedef void (*rust_async_cb)(uint8_t* id_buf, void* loop_data); +// crust fn pointers +typedef void (*crust_async_op_cb)(uv_loop_t* loop, void* data); +typedef void (*crust_async_cb)(uint8_t* id_buf, void* loop_data); +typedef void (*crust_close_cb)(uint8_t* id_buf, void* handle, + void* data); +// data types +#define RUST_UV_HANDLE_LEN 16 + +struct handle_data { + uint8_t id_buf[RUST_UV_HANDLE_LEN]; + crust_async_cb cb; + crust_close_cb close_cb; +}; + +// helpers static void* current_kernel_malloc(size_t size, const char* tag) { - return rust_task_thread::get_task()->malloc(size, tag); + void* ptr = rust_task_thread::get_task()->kernel->malloc(size, tag); + return ptr; } -/* static void current_kernel_free(void* ptr) { - rust_task_thread::get_task()->free(ptr); + rust_task_thread::get_task()->kernel->free(ptr); +} + +// libuv callback impls +static void +native_crust_async_op_cb(uv_async_t* handle, int status) { + crust_async_op_cb cb = (crust_async_op_cb)handle->data; + void* loop_data = handle->loop->data; + cb(handle->loop, loop_data); +} + +static void +native_async_cb(uv_async_t* handle, int status) { + handle_data* handle_d = (handle_data*)handle->data; + void* loop_data = handle->loop->data; + handle_d->cb(handle_d->id_buf, loop_data); +} + +static void +native_close_cb(uv_handle_t* handle) { + handle_data* data = (handle_data*)handle->data; + data->close_cb(data->id_buf, handle, handle->loop->data); +} + +static void +native_close_op_cb(uv_handle_t* op_handle) { + uv_loop_t* loop = op_handle->loop; + current_kernel_free(op_handle); + loop->data = 0; // a ptr to some stack-allocated rust mem + uv_loop_delete(loop); } -*/ -#define RUST_UV_HANDLE_LEN 16 -struct async_data { - uint8_t id_buf[RUST_UV_HANDLE_LEN]; - rust_async_cb cb; -}; +// native fns bound in rust extern "C" void* rust_uvtmp_uv_loop_new() { return (void*)uv_loop_new(); @@ -87,51 +124,69 @@ rust_uvtmp_uv_loop_set_data(uv_loop_t* loop, void* data) { loop->data = data; } -static void -native_async_op_cb(uv_async_t* handle, int status) { - async_op_cb cb = (async_op_cb)handle->data; - void* loop_data = handle->loop->data; - cb(handle->loop, loop_data); -} - extern "C" void* -rust_uvtmp_uv_bind_op_cb(uv_loop_t* loop, async_op_cb cb) { - uv_async_t* async = (uv_async_t*)current_kernel_malloc( +rust_uvtmp_uv_bind_op_cb(uv_loop_t* loop, crust_async_op_cb cb) { + uv_async_t* async = (uv_async_t*)current_kernel_malloc( sizeof(uv_async_t), "uv_async_t"); - uv_async_init(loop, async, native_async_op_cb); - // decrement the ref count, so that our async bind - // does count towards keeping the loop alive + uv_async_init(loop, async, native_crust_async_op_cb); async->data = (void*)cb; + // decrement the ref count, so that our async bind + // doesn't count towards keeping the loop alive + uv_unref(loop); return async; } -extern "C" void rust_uvtmp_uv_run(uv_loop_t* loop) { +extern "C" void +rust_uvtmp_uv_stop_op_cb(uv_handle_t* op_handle) { + /* // this is a hack to get libuv to cleanup a + // handle that was made to not prevent the loop + // from exiting via uv_unref(). + uv_ref(op_handle->loop); + uv_close(op_handle, native_close_op_cb); + uv_run(op_handle->loop); // this should process the handle's + // close event and then return + */ + // the above code is supposed to work to cleanly close + // a handler that was uv_unref()'d. but it causes much spew + // instead. this is the ugly/quick way to deal w/ it for now. + uv_close(op_handle, native_close_op_cb); + native_close_op_cb(op_handle); +} + +extern "C" void +rust_uvtmp_uv_run(uv_loop_t* loop) { uv_run(loop); } extern "C" void -rust_uvtmp_uv_async_send(uv_async_t* handle) { - uv_async_send(handle); +rust_uvtmp_uv_close(uv_handle_t* handle, crust_close_cb cb) { + handle_data* data = (handle_data*)handle->data; + data->close_cb = cb; + uv_close(handle, native_close_cb); } -static void -native_async_cb(uv_async_t* handle, int status) { - async_data* handle_data = (async_data*)handle->data; - void* loop_data = handle->loop->data; - handle_data->cb(handle_data->id_buf, loop_data); +extern "C" void +rust_uvtmp_uv_close_async(uv_async_t* handle) { + current_kernel_free(handle->data); + current_kernel_free(handle); +} + +extern "C" void +rust_uvtmp_uv_async_send(uv_async_t* handle) { + uv_async_send(handle); } extern "C" void* -rust_uvtmp_uv_async_init(uv_loop_t* loop, rust_async_cb cb, +rust_uvtmp_uv_async_init(uv_loop_t* loop, crust_async_cb cb, uint8_t* buf) { - uv_async_t* async = (uv_async_t*)current_kernel_malloc( + uv_async_t* async = (uv_async_t*)current_kernel_malloc( sizeof(uv_async_t), "uv_async_t"); uv_async_init(loop, async, native_async_cb); - async_data* data = (async_data*)current_kernel_malloc( - sizeof(async_data), - "async_data"); + handle_data* data = (handle_data*)current_kernel_malloc( + sizeof(handle_data), + "handle_data"); memcpy(data->id_buf, buf, RUST_UV_HANDLE_LEN); data->cb = cb; async->data = data; From 23c9c1edc4b8b6f7ff9c826eacd4e6605f38dfbe Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Tue, 21 Feb 2012 11:29:36 -0800 Subject: [PATCH 04/12] finishing up simple uv_timer impl as it stands, basic async nad timer support is added --- src/libstd/uvtmp.rs | 182 +++++++++++++++++++++++++++++++++++++----- src/rt/rust_uvtmp.cpp | 59 ++++++++++++-- 2 files changed, 211 insertions(+), 30 deletions(-) diff --git a/src/libstd/uvtmp.rs b/src/libstd/uvtmp.rs index 75e962bd331b6..123cd8ac2a81c 100644 --- a/src/libstd/uvtmp.rs +++ b/src/libstd/uvtmp.rs @@ -6,11 +6,15 @@ // process_operation() crust fn below enum uv_operation { op_async_init([u8]), - op_close(uv_handle, *ctypes::void) + op_close(uv_handle, *ctypes::void), + op_timer_init([u8]), + op_timer_start([u8], *ctypes::void, u32, u32), + op_timer_stop([u8], *ctypes::void, fn~(uv_handle)) } enum uv_handle { - uv_async([u8], uv_loop) + uv_async([u8], uv_loop), + uv_timer([u8], uv_loop) } enum uv_msg { @@ -20,11 +24,17 @@ enum uv_msg { msg_async_init(fn~(uv_handle), fn~(uv_handle)), msg_async_send([u8]), msg_close(uv_handle, fn~()), + msg_timer_init(fn~(uv_handle)), + msg_timer_start([u8], u32, u32, fn~(uv_handle)), + msg_timer_stop([u8], fn~(uv_handle)), // dispatches from libuv uv_async_init([u8], *ctypes::void), uv_async_send([u8]), uv_close([u8]), + uv_timer_init([u8], *ctypes::void), + uv_timer_call([u8]), + uv_timer_stop([u8], fn~(uv_handle)), uv_end() } @@ -74,16 +84,26 @@ native mod rustrt { fn rust_uvtmp_uv_run(loop_handle: *ctypes::void); fn rust_uvtmp_uv_close(handle: *ctypes::void, cb: *u8); fn rust_uvtmp_uv_close_async(handle: *ctypes::void); + fn rust_uvtmp_uv_close_timer(handle: *ctypes::void); fn rust_uvtmp_uv_async_send(handle: *ctypes::void); fn rust_uvtmp_uv_async_init( loop_handle: *ctypes::void, cb: *u8, id: *u8) -> *ctypes::void; + fn rust_uvtmp_uv_timer_init( + loop_handle: *ctypes::void, + cb: *u8, + id: *u8) -> *ctypes::void; + fn rust_uvtmp_uv_timer_start( + timer_handle: *ctypes::void, + timeout: ctypes::c_uint, + repeat: ctypes::c_uint); + fn rust_uvtmp_uv_timer_stop(handle: *ctypes::void); } mod uv { export loop_new, run, close, run_in_bg, async_init, async_send, - timer_init; + timer_init, timer_start, timer_stop; // public functions fn loop_new() -> uv_loop unsafe { @@ -92,9 +112,7 @@ mod uv { let ret_recv_chan: comm::chan = comm::chan(ret_recv_port); - let num_threads = 4u; // would be cool to tie this to - // the number of logical procs - task::spawn_sched(num_threads) {|| + task::spawn_sched(task::manual_threads(4u)) {|| // our beloved uv_loop_t ptr let loop_handle = rustrt:: rust_uvtmp_uv_loop_new(); @@ -140,13 +158,15 @@ mod uv { map::new_bytes_hash(); let id_to_handle: map::map<[u8], uv_handle> = map::new_bytes_hash(); - let async_cbs: map::map<[u8], fn~(uv_handle)> = - map::new_bytes_hash(); - let async_init_after_cbs: map::map<[u8], - fn~(uv_handle)> = + let after_cbs: map::map<[u8], fn~(uv_handle)> = map::new_bytes_hash(); let close_callbacks: map::map<[u8], fn~()> = map::new_bytes_hash(); + + let async_cbs: map::map<[u8], fn~(uv_handle)> = + map::new_bytes_hash(); + let timer_cbs: map::map<[u8], fn~(uv_handle)> = + map::new_bytes_hash(); // the main loop that this task blocks on. // should have the same lifetime as the C libuv @@ -160,7 +180,7 @@ mod uv { // the operation handle to have the // loop process any pending operations // once its up and running - task::spawn_sched(1u) {|| + task::spawn_sched(task::manual_threads(1u)) {|| // this call blocks rustrt::rust_uvtmp_uv_run(loop_handle); // when we're done, msg the @@ -172,7 +192,7 @@ mod uv { } msg_run_in_bg { - task::spawn_sched(1u) {|| + task::spawn_sched(task::manual_threads(1u)) {|| // this call blocks rustrt::rust_uvtmp_uv_run(loop_handle); }; @@ -194,6 +214,9 @@ mod uv { uv_async(id, _) { async_cbs.remove(id); } + uv_timer(id, _) { + timer_cbs.remove(id); + } _ { fail "unknown form of uv_handle encountered " + "in uv_close handler"; @@ -213,7 +236,7 @@ mod uv { // invocation on msg_async_send let id = gen_handle_id(); async_cbs.insert(id, callback); - async_init_after_cbs.insert(id, after_cb); + after_cbs.insert(id, after_cb); let op = op_async_init(id); pass_to_libuv(op_handle, operation_chan, op); } @@ -223,8 +246,8 @@ mod uv { // then invoke the supplied callback // for after completion handles.insert(id, async_handle); - let after_cb = async_init_after_cbs.get(id); - async_init_after_cbs.remove(id); + let after_cb = after_cbs.get(id); + after_cbs.remove(id); let async = uv_async(id, rust_loop_chan); id_to_handle.insert(id, copy(async)); task::spawn {|| @@ -242,6 +265,50 @@ mod uv { async_cb(uv_async(id, rust_loop_chan)); }; } + + msg_timer_init(after_cb) { + let id = gen_handle_id(); + after_cbs.insert(id, after_cb); + let op = op_timer_init(id); + pass_to_libuv(op_handle, operation_chan, op); + } + uv_timer_init(id, handle) { + handles.insert(id, handle); + let after_cb = after_cbs.get(id); + after_cbs.remove(id); + let new_timer = uv_timer(id, rust_loop_chan); + id_to_handle.insert(id, copy(new_timer)); + task::spawn {|| + after_cb(new_timer); + }; + } + + uv_timer_call(id) { + let cb = timer_cbs.get(id); + let the_timer = id_to_handle.get(id); + task::spawn {|| + cb(the_timer); + }; + } + + msg_timer_start(id, timeout, repeat, timer_call_cb) { + timer_cbs.insert(id, timer_call_cb); + let handle = handles.get(id); + let op = op_timer_start(id, handle, timeout, + repeat); + pass_to_libuv(op_handle, operation_chan, op); + } + + msg_timer_stop(id, after_cb) { + let handle = handles.get(id); + let op = op_timer_stop(id, handle, after_cb); + pass_to_libuv(op_handle, operation_chan, op); + } + uv_timer_stop(id, after_cb) { + let the_timer = id_to_handle.get(id); + after_cb(the_timer); + } + uv_end() { keep_going = false; } @@ -294,6 +361,33 @@ mod uv { comm::send(loop, msg); } + fn timer_start(the_timer: uv_handle, timeout: u32, repeat:u32, + timer_cb: fn~(uv_handle)) { + alt the_timer { + uv_timer(id, loop_chan) { + let msg = msg_timer_start(id, timeout, repeat, timer_cb); + comm::send(loop_chan, msg); + } + _ { + fail "can only pass a uv_timer form of uv_handle to "+ + " uv::timer_start()"; + } + } + } + + fn timer_stop(the_timer: uv_handle, after_cb: fn~(uv_handle)) { + alt the_timer { + uv_timer(id, loop_chan) { + let msg = msg_timer_stop(id, after_cb); + comm::send(loop_chan, msg); + } + _ { + fail "only uv_timer form is allowed in calls to "+ + " uv::timer_stop()"; + } + } + } + // internal functions fn pass_to_libuv( op_handle: *ctypes::void, @@ -320,7 +414,7 @@ mod uv { fn get_loop_chan_from_handle(handle: uv_handle) -> uv_loop { alt handle { - uv_async(id,loop) { + uv_async(id,loop) | uv_timer(id,loop) { ret loop; } _ { @@ -332,7 +426,7 @@ mod uv { fn get_id_from_handle(handle: uv_handle) -> [u8] { alt handle { - uv_async(id,loop) { + uv_async(id,loop) | uv_timer(id,loop) { ret id; } _ { @@ -363,6 +457,24 @@ mod uv { op_close(handle, handle_ptr) { handle_op_close(handle, handle_ptr); } + op_timer_init(id) { + let id_ptr = vec::unsafe::to_ptr(id); + let timer_handle = rustrt::rust_uvtmp_uv_timer_init( + loop, + process_timer_call, + id_ptr); + comm::send(loop_chan, uv_timer_init( + id, + timer_handle)); + } + op_timer_start(id, handle, timeout, repeat) { + rustrt::rust_uvtmp_uv_timer_start(handle, timeout, + repeat); + } + op_timer_stop(id, handle, after_cb) { + rustrt::rust_uvtmp_uv_timer_stop(handle); + comm::send(loop_chan, uv_timer_stop(id, after_cb)); + } _ { fail "unknown form of uv_operation received"; } } @@ -378,6 +490,11 @@ mod uv { rustrt::rust_uvtmp_uv_close( handle_ptr, cb); } + uv_timer(id, loop) { + let cb = process_close_timer; + rustrt::rust_uvtmp_uv_close( + handle_ptr, cb); + } _ { fail "unknown form of uv_handle encountered " + "in process_operation/op_close"; @@ -386,12 +503,19 @@ mod uv { } crust fn process_async_send(id_buf: *u8, data: *uv_loop_data) - unsafe { + unsafe { let handle_id = get_handle_id_from(id_buf); let loop_chan = get_loop_chan_from_data(data); comm::send(loop_chan, uv_async_send(handle_id)); } + crust fn process_timer_call(id_buf: *u8, data: *uv_loop_data) + unsafe { + let handle_id = get_handle_id_from(id_buf); + let loop_chan = get_loop_chan_from_data(data); + comm::send(loop_chan, uv_timer_call(handle_id)); + } + fn process_close_common(id: [u8], data: *uv_loop_data) unsafe { // notify the rust loop that their handle is closed, then @@ -414,6 +538,16 @@ mod uv { // close cb process_close_common(id, data); } + + crust fn process_close_timer( + id_buf: *u8, + handle_ptr: *ctypes::void, + data: *uv_loop_data) + unsafe { + let id = get_handle_id_from(id_buf); + rustrt::rust_uvtmp_uv_close_timer(handle_ptr); + process_close_common(id, data); + } } @@ -446,11 +580,15 @@ fn test_uvtmp_uv_timer() { let test_loop = uv::loop_new(); let exit_port = comm::port::(); let exit_chan = comm::chan::(exit_port); - uv::timer(test_loop, {|new_timer| - uv::timer_start(new_async) {|| - comm::send(exit_chan, true); + uv::timer_init(test_loop) {|new_timer| + uv::timer_start(new_timer, 1u32, 0u32) {|started_timer| + uv::timer_stop(started_timer) {|stopped_timer| + uv::close(stopped_timer) {|| + comm::send(exit_chan, true); + }; + }; }; - }); + }; uv::run(test_loop); assert comm::recv(exit_port); } diff --git a/src/rt/rust_uvtmp.cpp b/src/rt/rust_uvtmp.cpp index f97312eaf44aa..697231dff1b56 100644 --- a/src/rt/rust_uvtmp.cpp +++ b/src/rt/rust_uvtmp.cpp @@ -59,7 +59,7 @@ struct timer_start_data { // crust fn pointers typedef void (*crust_async_op_cb)(uv_loop_t* loop, void* data); -typedef void (*crust_async_cb)(uint8_t* id_buf, void* loop_data); +typedef void (*crust_simple_cb)(uint8_t* id_buf, void* loop_data); typedef void (*crust_close_cb)(uint8_t* id_buf, void* handle, void* data); @@ -68,7 +68,7 @@ typedef void (*crust_close_cb)(uint8_t* id_buf, void* handle, struct handle_data { uint8_t id_buf[RUST_UV_HANDLE_LEN]; - crust_async_cb cb; + crust_simple_cb cb; crust_close_cb close_cb; }; @@ -84,6 +84,16 @@ current_kernel_free(void* ptr) { rust_task_thread::get_task()->kernel->free(ptr); } +static handle_data* +new_handle_data_from(uint8_t* buf, crust_simple_cb cb) { + handle_data* data = (handle_data*)current_kernel_malloc( + sizeof(handle_data), + "handle_data"); + memcpy(data->id_buf, buf, RUST_UV_HANDLE_LEN); + data->cb = cb; + return data; +} + // libuv callback impls static void native_crust_async_op_cb(uv_async_t* handle, int status) { @@ -99,6 +109,13 @@ native_async_cb(uv_async_t* handle, int status) { handle_d->cb(handle_d->id_buf, loop_data); } +static void +native_timer_cb(uv_timer_t* handle, int status) { + handle_data* handle_d = (handle_data*)handle->data; + void* loop_data = handle->loop->data; + handle_d->cb(handle_d->id_buf, loop_data); +} + static void native_close_cb(uv_handle_t* handle) { handle_data* data = (handle_data*)handle->data; @@ -172,28 +189,54 @@ rust_uvtmp_uv_close_async(uv_async_t* handle) { current_kernel_free(handle); } +extern "C" void +rust_uvtmp_uv_close_timer(uv_async_t* handle) { + current_kernel_free(handle->data); + current_kernel_free(handle); +} + extern "C" void rust_uvtmp_uv_async_send(uv_async_t* handle) { uv_async_send(handle); } extern "C" void* -rust_uvtmp_uv_async_init(uv_loop_t* loop, crust_async_cb cb, +rust_uvtmp_uv_async_init(uv_loop_t* loop, crust_simple_cb cb, uint8_t* buf) { uv_async_t* async = (uv_async_t*)current_kernel_malloc( sizeof(uv_async_t), "uv_async_t"); uv_async_init(loop, async, native_async_cb); - handle_data* data = (handle_data*)current_kernel_malloc( - sizeof(handle_data), - "handle_data"); - memcpy(data->id_buf, buf, RUST_UV_HANDLE_LEN); - data->cb = cb; + handle_data* data = new_handle_data_from(buf, cb); async->data = data; return async; } +extern "C" void* +rust_uvtmp_uv_timer_init(uv_loop_t* loop, crust_simple_cb cb, + uint8_t* buf) { + uv_timer_t* new_timer = (uv_timer_t*)current_kernel_malloc( + sizeof(uv_timer_t), + "uv_timer_t"); + uv_timer_init(loop, new_timer); + handle_data* data = new_handle_data_from(buf, cb); + new_timer->data = data; + + return new_timer; +} + +extern "C" void +rust_uvtmp_uv_timer_start(uv_timer_t* the_timer, uint32_t timeout, + uint32_t repeat) { + uv_timer_start(the_timer, native_timer_cb, timeout, repeat); +} + +extern "C" void +rust_uvtmp_uv_timer_stop(uv_timer_t* the_timer) { + uv_timer_stop(the_timer); +} + // UVTMP REWORK // FIXME: Copied from rust_builtins.cpp. Could bitrot easily From f1b4ad18d58d008d56552c64acd9c1064ad8c93a Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Wed, 22 Feb 2012 11:33:01 -0800 Subject: [PATCH 05/12] trailing whitespace fixes --- src/libstd/uvtmp.rs | 22 ++++++++++------------ 1 file changed, 10 insertions(+), 12 deletions(-) diff --git a/src/libstd/uvtmp.rs b/src/libstd/uvtmp.rs index 123cd8ac2a81c..b64de837b095b 100644 --- a/src/libstd/uvtmp.rs +++ b/src/libstd/uvtmp.rs @@ -79,7 +79,8 @@ native mod rustrt { fn rust_uvtmp_uv_loop_set_data( loop: *ctypes::void, data: *uv_loop_data); - fn rust_uvtmp_uv_bind_op_cb(loop: *ctypes::void, cb: *u8) -> *ctypes::void; + fn rust_uvtmp_uv_bind_op_cb(loop: *ctypes::void, cb: *u8) + -> *ctypes::void; fn rust_uvtmp_uv_stop_op_cb(handle: *ctypes::void); fn rust_uvtmp_uv_run(loop_handle: *ctypes::void); fn rust_uvtmp_uv_close(handle: *ctypes::void, cb: *u8); @@ -162,7 +163,6 @@ mod uv { map::new_bytes_hash(); let close_callbacks: map::map<[u8], fn~()> = map::new_bytes_hash(); - let async_cbs: map::map<[u8], fn~(uv_handle)> = map::new_bytes_hash(); let timer_cbs: map::map<[u8], fn~(uv_handle)> = @@ -190,14 +190,14 @@ mod uv { comm::send(rust_loop_chan, uv_end); }; } - + msg_run_in_bg { task::spawn_sched(task::manual_threads(1u)) {|| // this call blocks rustrt::rust_uvtmp_uv_run(loop_handle); }; } - + msg_close(handle, cb) { let id = get_id_from_handle(handle); close_callbacks.insert(id, cb); @@ -228,7 +228,7 @@ mod uv { cb(); }; } - + msg_async_init(callback, after_cb) { // create a new async handle // with the id as the handle's @@ -308,7 +308,7 @@ mod uv { let the_timer = id_to_handle.get(id); after_cb(the_timer); } - + uv_end() { keep_going = false; } @@ -353,7 +353,7 @@ mod uv { fn close(h: uv_handle, cb: fn~()) { let loop_chan = get_loop_chan_from_handle(h); - comm::send(loop_chan, msg_close(h, cb)); + comm::send(loop_chan, msg_close(h, cb)); } fn timer_init(loop: uv_loop, after_cb: fn~(uv_handle)) { @@ -403,7 +403,7 @@ mod uv { ret rand::mk_rng().gen_bytes(16u); } fn get_handle_id_from(buf: *u8) -> [u8] unsafe { - ret vec::unsafe::from_buf(buf, 16u); + ret vec::unsafe::from_buf(buf, 16u); } fn get_loop_chan_from_data(data: *uv_loop_data) @@ -475,7 +475,6 @@ mod uv { rustrt::rust_uvtmp_uv_timer_stop(handle); comm::send(loop_chan, uv_timer_stop(id, after_cb)); } - _ { fail "unknown form of uv_operation received"; } } op_pending = comm::peek(op_port); @@ -538,7 +537,7 @@ mod uv { // close cb process_close_common(id, data); } - + crust fn process_close_timer( id_buf: *u8, handle_ptr: *ctypes::void, @@ -549,7 +548,6 @@ mod uv { process_close_common(id, data); } - } #[test] @@ -588,7 +586,7 @@ fn test_uvtmp_uv_timer() { }; }; }; - }; + }; uv::run(test_loop); assert comm::recv(exit_port); } From ce25209cf1059d92cc89ca9b17864f465427ce19 Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Wed, 22 Feb 2012 12:10:33 -0800 Subject: [PATCH 06/12] fzzzy's patch for rustrt.def.in adds new c/c++ methods bound in rust for uvtmp::uv --- src/rt/rustrt.def.in | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index 5b62d606917f5..2c7265ff58ae9 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -99,6 +99,19 @@ rust_uvtmp_read_start rust_uvtmp_timer rust_uvtmp_delete_buf rust_uvtmp_get_req_id +rust_uvtmp_uv_loop_new +rust_uvtmp_uv_loop_set_data +rust_uvtmp_uv_bind_op_cb +rust_uvtmp_uv_stop_op_cb +rust_uvtmp_uv_run +rust_uvtmp_uv_close +rust_uvtmp_uv_close_async +rust_uvtmp_uv_close_timer +rust_uvtmp_uv_async_send +rust_uvtmp_uv_async_init +rust_uvtmp_uv_timer_init +rust_uvtmp_uv_timer_start +rust_uvtmp_uv_timer_stop rust_dbg_lock_create rust_dbg_lock_destroy rust_dbg_lock_lock From c5344e15878d8b06afb0932cb90d9f102374bffe Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Wed, 22 Feb 2012 14:00:34 -0800 Subject: [PATCH 07/12] moving new uv stuff into uv.rs and rust_uv.cpp - removing the remains of uvtmp.rs and rust_uvtmp.rs - removing the displaced, low-level libuv bindings in uv.rs and rust_uv.cpp --- src/libstd/std.rc | 3 +- src/libstd/uv.rs | 670 +++++++++++++++++++++++++++++--------- src/libstd/uvtmp.rs | 730 ------------------------------------------ src/rt/rust_uv.cpp | 186 +++++++++-- src/rt/rust_uvtmp.cpp | 612 ----------------------------------- src/rt/rustrt.def.in | 41 +-- 6 files changed, 691 insertions(+), 1551 deletions(-) delete mode 100644 src/libstd/uvtmp.rs delete mode 100644 src/rt/rust_uvtmp.cpp diff --git a/src/libstd/std.rc b/src/libstd/std.rc index 478dbb695132c..83e4a04f0dfa0 100644 --- a/src/libstd/std.rc +++ b/src/libstd/std.rc @@ -7,7 +7,7 @@ #[license = "MIT"]; #[crate_type = "lib"]; -export fs, io, net, run, uv, uvtmp; +export fs, io, net, run, uv; export c_vec, four, tri, util; export bitv, deque, fun_treemap, list, map, smallintmap, sort, treemap, ufind; export rope; @@ -25,7 +25,6 @@ mod net; #[path = "run_program.rs"] mod run; mod uv; -mod uvtmp; // Utility modules diff --git a/src/libstd/uv.rs b/src/libstd/uv.rs index 27f2a3c54a85c..90ac5ce4b1cfb 100644 --- a/src/libstd/uv.rs +++ b/src/libstd/uv.rs @@ -1,188 +1,558 @@ -/* -This is intended to be a low-level binding to libuv that very closely mimics -the C libuv API. Does very little right now pending scheduler improvements. -*/ - -export sanity_check; -export loop_t, idle_t; -export loop_new, loop_delete, default_loop, run, unref; -export idle_init, idle_start; -export idle_new; - -import core::ctypes; - -#[link_name = "rustrt"] -native mod uv { - fn rust_uv_loop_new() -> *loop_t; - fn rust_uv_loop_delete(loop: *loop_t); - fn rust_uv_default_loop() -> *loop_t; - fn rust_uv_run(loop: *loop_t) -> ctypes::c_int; - fn rust_uv_unref(loop: *loop_t); - fn rust_uv_idle_init(loop: *loop_t, idle: *idle_t) -> ctypes::c_int; - fn rust_uv_idle_start(idle: *idle_t, cb: idle_cb) -> ctypes::c_int; -} - -#[link_name = "rustrt"] -native mod helpers { - fn rust_uv_size_of_idle_t() -> ctypes::size_t; -} - -type opaque_cb = *ctypes::void; - -type handle_type = ctypes::enum; - -type close_cb = opaque_cb; -type idle_cb = opaque_cb; - -type handle_private_fields = { - a00: ctypes::c_int, - a01: ctypes::c_int, - a02: ctypes::c_int, - a03: ctypes::c_int, - a04: ctypes::c_int, - a05: ctypes::c_int, - a06: int, - a07: int, - a08: int, - a09: int, - a10: int, - a11: int, - a12: int -}; - -type handle_fields = { - loop: *loop_t, - type_: handle_type, - close_cb: close_cb, - data: *ctypes::void, - private: handle_private_fields -}; - -type handle_t = { - fields: handle_fields -}; +export loop_new, run, close, run_in_bg, async_init, async_send, + timer_init, timer_start, timer_stop; + +// these are processed solely in the +// process_operation() crust fn below +enum uv_operation { + op_async_init([u8]), + op_close(uv_handle, *ctypes::void), + op_timer_init([u8]), + op_timer_start([u8], *ctypes::void, u32, u32), + op_timer_stop([u8], *ctypes::void, fn~(uv_handle)) +} -type loop_t = int; +enum uv_handle { + uv_async([u8], uv_loop), + uv_timer([u8], uv_loop) +} +enum uv_msg { + // requests from library users + msg_run(comm::chan), + msg_run_in_bg(), + msg_async_init(fn~(uv_handle), fn~(uv_handle)), + msg_async_send([u8]), + msg_close(uv_handle, fn~()), + msg_timer_init(fn~(uv_handle)), + msg_timer_start([u8], u32, u32, fn~(uv_handle)), + msg_timer_stop([u8], fn~(uv_handle)), + + // dispatches from libuv + uv_async_init([u8], *ctypes::void), + uv_async_send([u8]), + uv_close([u8]), + uv_timer_init([u8], *ctypes::void), + uv_timer_call([u8]), + uv_timer_stop([u8], fn~(uv_handle)), + uv_end() +} +type uv_loop_data = { + operation_port: comm::port, + rust_loop_chan: comm::chan +}; +type uv_loop = comm::chan; + +#[nolink] +native mod rustrt { + fn rust_uv_loop_new() -> *ctypes::void; + fn rust_uv_loop_set_data( + loop: *ctypes::void, + data: *uv_loop_data); + fn rust_uv_bind_op_cb(loop: *ctypes::void, cb: *u8) + -> *ctypes::void; + fn rust_uv_stop_op_cb(handle: *ctypes::void); + fn rust_uv_run(loop_handle: *ctypes::void); + fn rust_uv_close(handle: *ctypes::void, cb: *u8); + fn rust_uv_close_async(handle: *ctypes::void); + fn rust_uv_close_timer(handle: *ctypes::void); + fn rust_uv_async_send(handle: *ctypes::void); + fn rust_uv_async_init( + loop_handle: *ctypes::void, + cb: *u8, + id: *u8) -> *ctypes::void; + fn rust_uv_timer_init( + loop_handle: *ctypes::void, + cb: *u8, + id: *u8) -> *ctypes::void; + fn rust_uv_timer_start( + timer_handle: *ctypes::void, + timeout: ctypes::c_uint, + repeat: ctypes::c_uint); + fn rust_uv_timer_stop(handle: *ctypes::void); +} -type idle_t = { - fields: handle_fields - /* private: idle_private_fields */ -}; +// public functions +fn loop_new() -> uv_loop unsafe { + let ret_recv_port: comm::port = + comm::port(); + let ret_recv_chan: comm::chan = + comm::chan(ret_recv_port); + + task::spawn_sched(task::manual_threads(4u)) {|| + // our beloved uv_loop_t ptr + let loop_handle = rustrt:: + rust_uv_loop_new(); + + // this port/chan pair are used to send messages to + // libuv. libuv processes any pending messages on the + // port (via crust) after receiving an async "wakeup" + // on a special uv_async_t handle created below + let operation_port = comm::port::(); + let operation_chan = comm::chan::( + operation_port); + + // this port/chan pair as used in the while() loop + // below. It takes dispatches, originating from libuv + // callbacks, to invoke handles registered by the + // user + let rust_loop_port = comm::port::(); + let rust_loop_chan = + comm::chan::(rust_loop_port); + // let the task-spawner return + comm::send(ret_recv_chan, copy(rust_loop_chan)); + + // create our "special" async handle that will + // allow all operations against libuv to be + // "buffered" in the operation_port, for processing + // from the thread that libuv runs on + let loop_data: uv_loop_data = { + operation_port: operation_port, + rust_loop_chan: rust_loop_chan + }; + rustrt::rust_uv_loop_set_data( + loop_handle, + ptr::addr_of(loop_data)); // pass an opaque C-ptr + // to libuv, this will be + // in the process_operation + // crust fn + let op_handle = rustrt::rust_uv_bind_op_cb( + loop_handle, + process_operation); + + // all state goes here + let handles: map::map<[u8], *ctypes::void> = + map::new_bytes_hash(); + let id_to_handle: map::map<[u8], uv_handle> = + map::new_bytes_hash(); + let after_cbs: map::map<[u8], fn~(uv_handle)> = + map::new_bytes_hash(); + let close_callbacks: map::map<[u8], fn~()> = + map::new_bytes_hash(); + let async_cbs: map::map<[u8], fn~(uv_handle)> = + map::new_bytes_hash(); + let timer_cbs: map::map<[u8], fn~(uv_handle)> = + map::new_bytes_hash(); + + // the main loop that this task blocks on. + // should have the same lifetime as the C libuv + // event loop. + let keep_going = true; + while (keep_going) { + alt comm::recv(rust_loop_port) { + msg_run(end_chan) { + // start the libuv event loop + // we'll also do a uv_async_send with + // the operation handle to have the + // loop process any pending operations + // once its up and running + task::spawn_sched(task::manual_threads(1u)) {|| + // this call blocks + rustrt::rust_uv_run(loop_handle); + // when we're done, msg the + // end chan + rustrt::rust_uv_stop_op_cb(op_handle); + comm::send(end_chan, true); + comm::send(rust_loop_chan, uv_end); + }; + } + + msg_run_in_bg { + task::spawn_sched(task::manual_threads(1u)) {|| + // this call blocks + rustrt::rust_uv_run(loop_handle); + }; + } + + msg_close(handle, cb) { + let id = get_id_from_handle(handle); + close_callbacks.insert(id, cb); + let handle_ptr = handles.get(id); + let op = op_close(handle, handle_ptr); + + pass_to_libuv(op_handle, operation_chan, op); + } + uv_close(id) { + handles.remove(id); + let handle = id_to_handle.get(id); + id_to_handle.remove(id); + alt handle { + uv_async(id, _) { + async_cbs.remove(id); + } + uv_timer(id, _) { + timer_cbs.remove(id); + } + _ { + fail "unknown form of uv_handle encountered " + + "in uv_close handler"; + } + } + let cb = close_callbacks.get(id); + close_callbacks.remove(id); + task::spawn {|| + cb(); + }; + } + + msg_async_init(callback, after_cb) { + // create a new async handle + // with the id as the handle's + // data and save the callback for + // invocation on msg_async_send + let id = gen_handle_id(); + async_cbs.insert(id, callback); + after_cbs.insert(id, after_cb); + let op = op_async_init(id); + pass_to_libuv(op_handle, operation_chan, op); + } + uv_async_init(id, async_handle) { + // libuv created a handle, which is + // passed back to us. save it and + // then invoke the supplied callback + // for after completion + handles.insert(id, async_handle); + let after_cb = after_cbs.get(id); + after_cbs.remove(id); + let async = uv_async(id, rust_loop_chan); + id_to_handle.insert(id, copy(async)); + task::spawn {|| + after_cb(async); + }; + } + + msg_async_send(id) { + let async_handle = handles.get(id); + do_send(async_handle); + } + uv_async_send(id) { + let async_cb = async_cbs.get(id); + task::spawn {|| + async_cb(uv_async(id, rust_loop_chan)); + }; + } + + msg_timer_init(after_cb) { + let id = gen_handle_id(); + after_cbs.insert(id, after_cb); + let op = op_timer_init(id); + pass_to_libuv(op_handle, operation_chan, op); + } + uv_timer_init(id, handle) { + handles.insert(id, handle); + let after_cb = after_cbs.get(id); + after_cbs.remove(id); + let new_timer = uv_timer(id, rust_loop_chan); + id_to_handle.insert(id, copy(new_timer)); + task::spawn {|| + after_cb(new_timer); + }; + } + + uv_timer_call(id) { + let cb = timer_cbs.get(id); + let the_timer = id_to_handle.get(id); + task::spawn {|| + cb(the_timer); + }; + } + + msg_timer_start(id, timeout, repeat, timer_call_cb) { + timer_cbs.insert(id, timer_call_cb); + let handle = handles.get(id); + let op = op_timer_start(id, handle, timeout, + repeat); + pass_to_libuv(op_handle, operation_chan, op); + } + + msg_timer_stop(id, after_cb) { + let handle = handles.get(id); + let op = op_timer_stop(id, handle, after_cb); + pass_to_libuv(op_handle, operation_chan, op); + } + uv_timer_stop(id, after_cb) { + let the_timer = id_to_handle.get(id); + after_cb(the_timer); + } + + uv_end() { + keep_going = false; + } + + _ { fail "unknown form of uv_msg received"; } + } + } + }; + ret comm::recv(ret_recv_port); +} -fn idle_init(loop: *loop_t, idle: *idle_t) -> ctypes::c_int { - uv::rust_uv_idle_init(loop, idle) +fn run(loop: uv_loop) { + let end_port = comm::port::(); + let end_chan = comm::chan::(end_port); + comm::send(loop, msg_run(end_chan)); + comm::recv(end_port); } -fn idle_start(idle: *idle_t, cb: idle_cb) -> ctypes::c_int { - uv::rust_uv_idle_start(idle, cb) +fn run_in_bg(loop: uv_loop) { + comm::send(loop, msg_run_in_bg); } +fn async_init ( + loop: uv_loop, + async_cb: fn~(uv_handle), + after_cb: fn~(uv_handle)) { + let msg = msg_async_init(async_cb, after_cb); + comm::send(loop, msg); +} +fn async_send(async: uv_handle) { + alt async { + uv_async(id, loop) { + comm::send(loop, msg_async_send(id)); + } + _ { + fail "attempting to call async_send() with a" + + " uv_async uv_handle"; + } + } +} +fn close(h: uv_handle, cb: fn~()) { + let loop_chan = get_loop_chan_from_handle(h); + comm::send(loop_chan, msg_close(h, cb)); +} -fn default_loop() -> *loop_t { - uv::rust_uv_default_loop() +fn timer_init(loop: uv_loop, after_cb: fn~(uv_handle)) { + let msg = msg_timer_init(after_cb); + comm::send(loop, msg); } -fn loop_new() -> *loop_t { - uv::rust_uv_loop_new() +fn timer_start(the_timer: uv_handle, timeout: u32, repeat:u32, + timer_cb: fn~(uv_handle)) { + alt the_timer { + uv_timer(id, loop_chan) { + let msg = msg_timer_start(id, timeout, repeat, timer_cb); + comm::send(loop_chan, msg); + } + _ { + fail "can only pass a uv_timer form of uv_handle to "+ + " uv::timer_start()"; + } + } } -fn loop_delete(loop: *loop_t) { - uv::rust_uv_loop_delete(loop) +fn timer_stop(the_timer: uv_handle, after_cb: fn~(uv_handle)) { + alt the_timer { + uv_timer(id, loop_chan) { + let msg = msg_timer_stop(id, after_cb); + comm::send(loop_chan, msg); + } + _ { + fail "only uv_timer form is allowed in calls to "+ + " uv::timer_stop()"; + } + } } -fn run(loop: *loop_t) -> ctypes::c_int { - uv::rust_uv_run(loop) +// internal functions +fn pass_to_libuv( + op_handle: *ctypes::void, + operation_chan: comm::chan, + op: uv_operation) unsafe { + comm::send(operation_chan, copy(op)); + do_send(op_handle); +} +fn do_send(h: *ctypes::void) { + rustrt::rust_uv_async_send(h); +} +fn gen_handle_id() -> [u8] { + ret rand::mk_rng().gen_bytes(16u); +} +fn get_handle_id_from(buf: *u8) -> [u8] unsafe { + ret vec::unsafe::from_buf(buf, 16u); } -fn unref(loop: *loop_t) { - uv::rust_uv_unref(loop) +fn get_loop_chan_from_data(data: *uv_loop_data) + -> uv_loop unsafe { + ret (*data).rust_loop_chan; } +fn get_loop_chan_from_handle(handle: uv_handle) + -> uv_loop { + alt handle { + uv_async(id,loop) | uv_timer(id,loop) { + ret loop; + } + _ { + fail "unknown form of uv_handle for get_loop_chan_from " + + " handle"; + } + } +} -fn sanity_check() { - fn check_size(t: str, uv: ctypes::size_t, rust: ctypes::size_t) { - #debug("size of %s: uv: %u, rust: %u", t, uv, rust); - assert uv <= rust; +fn get_id_from_handle(handle: uv_handle) -> [u8] { + alt handle { + uv_async(id,loop) | uv_timer(id,loop) { + ret id; + } + _ { + fail "unknown form of uv_handle for get_id_from handle"; + } } - check_size("idle_t", - helpers::rust_uv_size_of_idle_t(), - sys::size_of::()); -} - -fn handle_fields_new() -> handle_fields { - { - loop: ptr::null(), - type_: 0u32, - close_cb: ptr::null(), - data: ptr::null(), - private: { - a00: 0i32, - a01: 0i32, - a02: 0i32, - a03: 0i32, - a04: 0i32, - a05: 0i32, - a06: 0, - a07: 0, - a08: 0, - a09: 0, - a10: 0, - a11: 0, - a12: 0 +} + +// crust +crust fn process_operation( + loop: *ctypes::void, + data: *uv_loop_data) unsafe { + let op_port = (*data).operation_port; + let loop_chan = get_loop_chan_from_data(data); + let op_pending = comm::peek(op_port); + while(op_pending) { + alt comm::recv(op_port) { + op_async_init(id) { + let id_ptr = vec::unsafe::to_ptr(id); + let async_handle = rustrt::rust_uv_async_init( + loop, + process_async_send, + id_ptr); + comm::send(loop_chan, uv_async_init( + id, + async_handle)); + } + op_close(handle, handle_ptr) { + handle_op_close(handle, handle_ptr); + } + op_timer_init(id) { + let id_ptr = vec::unsafe::to_ptr(id); + let timer_handle = rustrt::rust_uv_timer_init( + loop, + process_timer_call, + id_ptr); + comm::send(loop_chan, uv_timer_init( + id, + timer_handle)); + } + op_timer_start(id, handle, timeout, repeat) { + rustrt::rust_uv_timer_start(handle, timeout, + repeat); + } + op_timer_stop(id, handle, after_cb) { + rustrt::rust_uv_timer_stop(handle); + comm::send(loop_chan, uv_timer_stop(id, after_cb)); + } + _ { fail "unknown form of uv_operation received"; } } + op_pending = comm::peek(op_port); } } -fn idle_new() -> idle_t { - { - fields: handle_fields_new() +fn handle_op_close(handle: uv_handle, handle_ptr: *ctypes::void) { + // it's just like im doing C + alt handle { + uv_async(id, loop) { + let cb = process_close_async; + rustrt::rust_uv_close( + handle_ptr, cb); + } + uv_timer(id, loop) { + let cb = process_close_timer; + rustrt::rust_uv_close( + handle_ptr, cb); + } + _ { + fail "unknown form of uv_handle encountered " + + "in process_operation/op_close"; + } } } -#[cfg(test)] -mod tests { +crust fn process_async_send(id_buf: *u8, data: *uv_loop_data) + unsafe { + let handle_id = get_handle_id_from(id_buf); + let loop_chan = get_loop_chan_from_data(data); + comm::send(loop_chan, uv_async_send(handle_id)); +} - #[test] - fn test_sanity_check() { - sanity_check(); - } +crust fn process_timer_call(id_buf: *u8, data: *uv_loop_data) + unsafe { + let handle_id = get_handle_id_from(id_buf); + let loop_chan = get_loop_chan_from_data(data); + comm::send(loop_chan, uv_timer_call(handle_id)); +} + +fn process_close_common(id: [u8], data: *uv_loop_data) + unsafe { + // notify the rust loop that their handle is closed, then + // the caller will invoke a per-handle-type c++ func to + // free allocated memory + let loop_chan = get_loop_chan_from_data(data); + comm::send(loop_chan, uv_close(id)); +} - // From test-ref.c - mod test_ref { +crust fn process_close_async( + id_buf: *u8, + handle_ptr: *ctypes::void, + data: *uv_loop_data) + unsafe { + let id = get_handle_id_from(id_buf); + rustrt::rust_uv_close_async(handle_ptr); + // at this point, the handle and its data has been + // released. notify the rust loop to remove the + // handle and its data and call the user-supplied + // close cb + process_close_common(id, data); +} - #[test] - fn ref() { - let loop = loop_new(); - run(loop); - loop_delete(loop); - } +crust fn process_close_timer( + id_buf: *u8, + handle_ptr: *ctypes::void, + data: *uv_loop_data) + unsafe { + let id = get_handle_id_from(id_buf); + rustrt::rust_uv_close_timer(handle_ptr); + process_close_common(id, data); +} - #[test] - fn idle_ref() { - let loop = loop_new(); - let h = idle_new(); - idle_init(loop, ptr::addr_of(h)); - idle_start(ptr::addr_of(h), ptr::null()); - unref(loop); - run(loop); - loop_delete(loop); - } - #[test] - fn async_ref() { - /* - let loop = loop_new(); - let h = async_new(); - async_init(loop, ptr::addr_of(h), ptr::null()); - unref(loop); - run(loop); - loop_delete(loop); - */ - } - } +#[test] +fn test_uv_new_loop_no_handles() { + let test_loop = uv::loop_new(); + run(test_loop); // this should return immediately + // since there aren't any handles.. } + +#[test] +fn test_uv_simple_async() { + let test_loop = loop_new(); + let exit_port = comm::port::(); + let exit_chan = comm::chan::(exit_port); + async_init(test_loop, {|new_async| + close(new_async) {|| + comm::send(exit_chan, true); + }; + }, {|new_async| + async_send(new_async); + }); + run(test_loop); + assert comm::recv(exit_port); +} + +#[test] +fn test_uv_timer() { + let test_loop = loop_new(); + let exit_port = comm::port::(); + let exit_chan = comm::chan::(exit_port); + timer_init(test_loop) {|new_timer| + timer_start(new_timer, 1u32, 0u32) {|started_timer| + timer_stop(started_timer) {|stopped_timer| + close(stopped_timer) {|| + comm::send(exit_chan, true); + }; + }; + }; + }; + run(test_loop); + assert comm::recv(exit_port); +} \ No newline at end of file diff --git a/src/libstd/uvtmp.rs b/src/libstd/uvtmp.rs deleted file mode 100644 index b64de837b095b..0000000000000 --- a/src/libstd/uvtmp.rs +++ /dev/null @@ -1,730 +0,0 @@ -// Some temporary libuv hacks for servo - -// UV2 - -// these are processed solely in the -// process_operation() crust fn below -enum uv_operation { - op_async_init([u8]), - op_close(uv_handle, *ctypes::void), - op_timer_init([u8]), - op_timer_start([u8], *ctypes::void, u32, u32), - op_timer_stop([u8], *ctypes::void, fn~(uv_handle)) -} - -enum uv_handle { - uv_async([u8], uv_loop), - uv_timer([u8], uv_loop) -} - -enum uv_msg { - // requests from library users - msg_run(comm::chan), - msg_run_in_bg(), - msg_async_init(fn~(uv_handle), fn~(uv_handle)), - msg_async_send([u8]), - msg_close(uv_handle, fn~()), - msg_timer_init(fn~(uv_handle)), - msg_timer_start([u8], u32, u32, fn~(uv_handle)), - msg_timer_stop([u8], fn~(uv_handle)), - - // dispatches from libuv - uv_async_init([u8], *ctypes::void), - uv_async_send([u8]), - uv_close([u8]), - uv_timer_init([u8], *ctypes::void), - uv_timer_call([u8]), - uv_timer_stop([u8], fn~(uv_handle)), - uv_end() -} - -type uv_loop_data = { - operation_port: comm::port, - rust_loop_chan: comm::chan -}; - -type uv_loop = comm::chan; - -#[nolink] -native mod rustrt { - fn rust_uvtmp_create_thread() -> thread; - fn rust_uvtmp_start_thread(thread: thread); - fn rust_uvtmp_join_thread(thread: thread); - fn rust_uvtmp_delete_thread(thread: thread); - fn rust_uvtmp_connect( - thread: thread, - req_id: u32, - ip: str::sbuf, - chan: comm::chan) -> connect_data; - fn rust_uvtmp_close_connection(thread: thread, req_id: u32); - fn rust_uvtmp_write( - thread: thread, - req_id: u32, - buf: *u8, - len: ctypes::size_t, - chan: comm::chan); - fn rust_uvtmp_read_start( - thread: thread, - req_id: u32, - chan: comm::chan); - fn rust_uvtmp_timer( - thread: thread, - timeout: u32, - req_id: u32, - chan: comm::chan); - fn rust_uvtmp_delete_buf(buf: *u8); - fn rust_uvtmp_get_req_id(cd: connect_data) -> u32; - - fn rust_uvtmp_uv_loop_new() -> *ctypes::void; - fn rust_uvtmp_uv_loop_set_data( - loop: *ctypes::void, - data: *uv_loop_data); - fn rust_uvtmp_uv_bind_op_cb(loop: *ctypes::void, cb: *u8) - -> *ctypes::void; - fn rust_uvtmp_uv_stop_op_cb(handle: *ctypes::void); - fn rust_uvtmp_uv_run(loop_handle: *ctypes::void); - fn rust_uvtmp_uv_close(handle: *ctypes::void, cb: *u8); - fn rust_uvtmp_uv_close_async(handle: *ctypes::void); - fn rust_uvtmp_uv_close_timer(handle: *ctypes::void); - fn rust_uvtmp_uv_async_send(handle: *ctypes::void); - fn rust_uvtmp_uv_async_init( - loop_handle: *ctypes::void, - cb: *u8, - id: *u8) -> *ctypes::void; - fn rust_uvtmp_uv_timer_init( - loop_handle: *ctypes::void, - cb: *u8, - id: *u8) -> *ctypes::void; - fn rust_uvtmp_uv_timer_start( - timer_handle: *ctypes::void, - timeout: ctypes::c_uint, - repeat: ctypes::c_uint); - fn rust_uvtmp_uv_timer_stop(handle: *ctypes::void); -} - -mod uv { - export loop_new, run, close, run_in_bg, async_init, async_send, - timer_init, timer_start, timer_stop; - - // public functions - fn loop_new() -> uv_loop unsafe { - let ret_recv_port: comm::port = - comm::port(); - let ret_recv_chan: comm::chan = - comm::chan(ret_recv_port); - - task::spawn_sched(task::manual_threads(4u)) {|| - // our beloved uv_loop_t ptr - let loop_handle = rustrt:: - rust_uvtmp_uv_loop_new(); - - // this port/chan pair are used to send messages to - // libuv. libuv processes any pending messages on the - // port (via crust) after receiving an async "wakeup" - // on a special uv_async_t handle created below - let operation_port = comm::port::(); - let operation_chan = comm::chan::( - operation_port); - - // this port/chan pair as used in the while() loop - // below. It takes dispatches, originating from libuv - // callbacks, to invoke handles registered by the - // user - let rust_loop_port = comm::port::(); - let rust_loop_chan = - comm::chan::(rust_loop_port); - // let the task-spawner return - comm::send(ret_recv_chan, copy(rust_loop_chan)); - - // create our "special" async handle that will - // allow all operations against libuv to be - // "buffered" in the operation_port, for processing - // from the thread that libuv runs on - let loop_data: uv_loop_data = { - operation_port: operation_port, - rust_loop_chan: rust_loop_chan - }; - rustrt::rust_uvtmp_uv_loop_set_data( - loop_handle, - ptr::addr_of(loop_data)); // pass an opaque C-ptr - // to libuv, this will be - // in the process_operation - // crust fn - let op_handle = rustrt::rust_uvtmp_uv_bind_op_cb( - loop_handle, - process_operation); - - // all state goes here - let handles: map::map<[u8], *ctypes::void> = - map::new_bytes_hash(); - let id_to_handle: map::map<[u8], uv_handle> = - map::new_bytes_hash(); - let after_cbs: map::map<[u8], fn~(uv_handle)> = - map::new_bytes_hash(); - let close_callbacks: map::map<[u8], fn~()> = - map::new_bytes_hash(); - let async_cbs: map::map<[u8], fn~(uv_handle)> = - map::new_bytes_hash(); - let timer_cbs: map::map<[u8], fn~(uv_handle)> = - map::new_bytes_hash(); - - // the main loop that this task blocks on. - // should have the same lifetime as the C libuv - // event loop. - let keep_going = true; - while (keep_going) { - alt comm::recv(rust_loop_port) { - msg_run(end_chan) { - // start the libuv event loop - // we'll also do a uv_async_send with - // the operation handle to have the - // loop process any pending operations - // once its up and running - task::spawn_sched(task::manual_threads(1u)) {|| - // this call blocks - rustrt::rust_uvtmp_uv_run(loop_handle); - // when we're done, msg the - // end chan - rustrt::rust_uvtmp_uv_stop_op_cb(op_handle); - comm::send(end_chan, true); - comm::send(rust_loop_chan, uv_end); - }; - } - - msg_run_in_bg { - task::spawn_sched(task::manual_threads(1u)) {|| - // this call blocks - rustrt::rust_uvtmp_uv_run(loop_handle); - }; - } - - msg_close(handle, cb) { - let id = get_id_from_handle(handle); - close_callbacks.insert(id, cb); - let handle_ptr = handles.get(id); - let op = op_close(handle, handle_ptr); - - pass_to_libuv(op_handle, operation_chan, op); - } - uv_close(id) { - handles.remove(id); - let handle = id_to_handle.get(id); - id_to_handle.remove(id); - alt handle { - uv_async(id, _) { - async_cbs.remove(id); - } - uv_timer(id, _) { - timer_cbs.remove(id); - } - _ { - fail "unknown form of uv_handle encountered " - + "in uv_close handler"; - } - } - let cb = close_callbacks.get(id); - close_callbacks.remove(id); - task::spawn {|| - cb(); - }; - } - - msg_async_init(callback, after_cb) { - // create a new async handle - // with the id as the handle's - // data and save the callback for - // invocation on msg_async_send - let id = gen_handle_id(); - async_cbs.insert(id, callback); - after_cbs.insert(id, after_cb); - let op = op_async_init(id); - pass_to_libuv(op_handle, operation_chan, op); - } - uv_async_init(id, async_handle) { - // libuv created a handle, which is - // passed back to us. save it and - // then invoke the supplied callback - // for after completion - handles.insert(id, async_handle); - let after_cb = after_cbs.get(id); - after_cbs.remove(id); - let async = uv_async(id, rust_loop_chan); - id_to_handle.insert(id, copy(async)); - task::spawn {|| - after_cb(async); - }; - } - - msg_async_send(id) { - let async_handle = handles.get(id); - do_send(async_handle); - } - uv_async_send(id) { - let async_cb = async_cbs.get(id); - task::spawn {|| - async_cb(uv_async(id, rust_loop_chan)); - }; - } - - msg_timer_init(after_cb) { - let id = gen_handle_id(); - after_cbs.insert(id, after_cb); - let op = op_timer_init(id); - pass_to_libuv(op_handle, operation_chan, op); - } - uv_timer_init(id, handle) { - handles.insert(id, handle); - let after_cb = after_cbs.get(id); - after_cbs.remove(id); - let new_timer = uv_timer(id, rust_loop_chan); - id_to_handle.insert(id, copy(new_timer)); - task::spawn {|| - after_cb(new_timer); - }; - } - - uv_timer_call(id) { - let cb = timer_cbs.get(id); - let the_timer = id_to_handle.get(id); - task::spawn {|| - cb(the_timer); - }; - } - - msg_timer_start(id, timeout, repeat, timer_call_cb) { - timer_cbs.insert(id, timer_call_cb); - let handle = handles.get(id); - let op = op_timer_start(id, handle, timeout, - repeat); - pass_to_libuv(op_handle, operation_chan, op); - } - - msg_timer_stop(id, after_cb) { - let handle = handles.get(id); - let op = op_timer_stop(id, handle, after_cb); - pass_to_libuv(op_handle, operation_chan, op); - } - uv_timer_stop(id, after_cb) { - let the_timer = id_to_handle.get(id); - after_cb(the_timer); - } - - uv_end() { - keep_going = false; - } - - _ { fail "unknown form of uv_msg received"; } - } - } - }; - ret comm::recv(ret_recv_port); - } - - fn run(loop: uv_loop) { - let end_port = comm::port::(); - let end_chan = comm::chan::(end_port); - comm::send(loop, msg_run(end_chan)); - comm::recv(end_port); - } - - fn run_in_bg(loop: uv_loop) { - comm::send(loop, msg_run_in_bg); - } - - fn async_init ( - loop: uv_loop, - async_cb: fn~(uv_handle), - after_cb: fn~(uv_handle)) { - let msg = msg_async_init(async_cb, after_cb); - comm::send(loop, msg); - } - - fn async_send(async: uv_handle) { - alt async { - uv_async(id, loop) { - comm::send(loop, msg_async_send(id)); - } - _ { - fail "attempting to call async_send() with a" + - " uv_async uv_handle"; - } - } - } - - fn close(h: uv_handle, cb: fn~()) { - let loop_chan = get_loop_chan_from_handle(h); - comm::send(loop_chan, msg_close(h, cb)); - } - - fn timer_init(loop: uv_loop, after_cb: fn~(uv_handle)) { - let msg = msg_timer_init(after_cb); - comm::send(loop, msg); - } - - fn timer_start(the_timer: uv_handle, timeout: u32, repeat:u32, - timer_cb: fn~(uv_handle)) { - alt the_timer { - uv_timer(id, loop_chan) { - let msg = msg_timer_start(id, timeout, repeat, timer_cb); - comm::send(loop_chan, msg); - } - _ { - fail "can only pass a uv_timer form of uv_handle to "+ - " uv::timer_start()"; - } - } - } - - fn timer_stop(the_timer: uv_handle, after_cb: fn~(uv_handle)) { - alt the_timer { - uv_timer(id, loop_chan) { - let msg = msg_timer_stop(id, after_cb); - comm::send(loop_chan, msg); - } - _ { - fail "only uv_timer form is allowed in calls to "+ - " uv::timer_stop()"; - } - } - } - - // internal functions - fn pass_to_libuv( - op_handle: *ctypes::void, - operation_chan: comm::chan, - op: uv_operation) unsafe { - comm::send(operation_chan, copy(op)); - do_send(op_handle); - } - fn do_send(h: *ctypes::void) { - rustrt::rust_uvtmp_uv_async_send(h); - } - fn gen_handle_id() -> [u8] { - ret rand::mk_rng().gen_bytes(16u); - } - fn get_handle_id_from(buf: *u8) -> [u8] unsafe { - ret vec::unsafe::from_buf(buf, 16u); - } - - fn get_loop_chan_from_data(data: *uv_loop_data) - -> uv_loop unsafe { - ret (*data).rust_loop_chan; - } - - fn get_loop_chan_from_handle(handle: uv_handle) - -> uv_loop { - alt handle { - uv_async(id,loop) | uv_timer(id,loop) { - ret loop; - } - _ { - fail "unknown form of uv_handle for get_loop_chan_from " - + " handle"; - } - } - } - - fn get_id_from_handle(handle: uv_handle) -> [u8] { - alt handle { - uv_async(id,loop) | uv_timer(id,loop) { - ret id; - } - _ { - fail "unknown form of uv_handle for get_id_from handle"; - } - } - } - - // crust - crust fn process_operation( - loop: *ctypes::void, - data: *uv_loop_data) unsafe { - let op_port = (*data).operation_port; - let loop_chan = get_loop_chan_from_data(data); - let op_pending = comm::peek(op_port); - while(op_pending) { - alt comm::recv(op_port) { - op_async_init(id) { - let id_ptr = vec::unsafe::to_ptr(id); - let async_handle = rustrt::rust_uvtmp_uv_async_init( - loop, - process_async_send, - id_ptr); - comm::send(loop_chan, uv_async_init( - id, - async_handle)); - } - op_close(handle, handle_ptr) { - handle_op_close(handle, handle_ptr); - } - op_timer_init(id) { - let id_ptr = vec::unsafe::to_ptr(id); - let timer_handle = rustrt::rust_uvtmp_uv_timer_init( - loop, - process_timer_call, - id_ptr); - comm::send(loop_chan, uv_timer_init( - id, - timer_handle)); - } - op_timer_start(id, handle, timeout, repeat) { - rustrt::rust_uvtmp_uv_timer_start(handle, timeout, - repeat); - } - op_timer_stop(id, handle, after_cb) { - rustrt::rust_uvtmp_uv_timer_stop(handle); - comm::send(loop_chan, uv_timer_stop(id, after_cb)); - } - _ { fail "unknown form of uv_operation received"; } - } - op_pending = comm::peek(op_port); - } - } - - fn handle_op_close(handle: uv_handle, handle_ptr: *ctypes::void) { - // it's just like im doing C - alt handle { - uv_async(id, loop) { - let cb = process_close_async; - rustrt::rust_uvtmp_uv_close( - handle_ptr, cb); - } - uv_timer(id, loop) { - let cb = process_close_timer; - rustrt::rust_uvtmp_uv_close( - handle_ptr, cb); - } - _ { - fail "unknown form of uv_handle encountered " + - "in process_operation/op_close"; - } - } - } - - crust fn process_async_send(id_buf: *u8, data: *uv_loop_data) - unsafe { - let handle_id = get_handle_id_from(id_buf); - let loop_chan = get_loop_chan_from_data(data); - comm::send(loop_chan, uv_async_send(handle_id)); - } - - crust fn process_timer_call(id_buf: *u8, data: *uv_loop_data) - unsafe { - let handle_id = get_handle_id_from(id_buf); - let loop_chan = get_loop_chan_from_data(data); - comm::send(loop_chan, uv_timer_call(handle_id)); - } - - fn process_close_common(id: [u8], data: *uv_loop_data) - unsafe { - // notify the rust loop that their handle is closed, then - // the caller will invoke a per-handle-type c++ func to - // free allocated memory - let loop_chan = get_loop_chan_from_data(data); - comm::send(loop_chan, uv_close(id)); - } - - crust fn process_close_async( - id_buf: *u8, - handle_ptr: *ctypes::void, - data: *uv_loop_data) - unsafe { - let id = get_handle_id_from(id_buf); - rustrt::rust_uvtmp_uv_close_async(handle_ptr); - // at this point, the handle and its data has been - // released. notify the rust loop to remove the - // handle and its data and call the user-supplied - // close cb - process_close_common(id, data); - } - - crust fn process_close_timer( - id_buf: *u8, - handle_ptr: *ctypes::void, - data: *uv_loop_data) - unsafe { - let id = get_handle_id_from(id_buf); - rustrt::rust_uvtmp_uv_close_timer(handle_ptr); - process_close_common(id, data); - } - -} - -#[test] -fn test_uvtmp_uv_new_loop_no_handles() { - let test_loop = uv::loop_new(); - uv::run(test_loop); // this should return immediately - // since there aren't any handles.. -} - -#[test] -fn test_uvtmp_uv_simple_async() { - let test_loop = uv::loop_new(); - let exit_port = comm::port::(); - let exit_chan = comm::chan::(exit_port); - uv::async_init(test_loop, {|new_async| - uv::close(new_async) {|| - comm::send(exit_chan, true); - }; - }, {|new_async| - uv::async_send(new_async); - }); - uv::run(test_loop); - assert comm::recv(exit_port); -} - -#[test] -fn test_uvtmp_uv_timer() { - let test_loop = uv::loop_new(); - let exit_port = comm::port::(); - let exit_chan = comm::chan::(exit_port); - uv::timer_init(test_loop) {|new_timer| - uv::timer_start(new_timer, 1u32, 0u32) {|started_timer| - uv::timer_stop(started_timer) {|stopped_timer| - uv::close(stopped_timer) {|| - comm::send(exit_chan, true); - }; - }; - }; - }; - uv::run(test_loop); - assert comm::recv(exit_port); -} - -// END OF UV2 - -type thread = *ctypes::void; - -type connect_data = *ctypes::void; - -enum iomsg { - whatever, - connected(connect_data), - wrote(connect_data), - read(connect_data, *u8, ctypes::ssize_t), - timer(u32), - exit -} - -fn create_thread() -> thread { - rustrt::rust_uvtmp_create_thread() -} - -fn start_thread(thread: thread) { - rustrt::rust_uvtmp_start_thread(thread) -} - -fn join_thread(thread: thread) { - rustrt::rust_uvtmp_join_thread(thread) -} - -fn delete_thread(thread: thread) { - rustrt::rust_uvtmp_delete_thread(thread) -} - -fn connect(thread: thread, req_id: u32, - ip: str, ch: comm::chan) -> connect_data { - str::as_buf(ip) {|ipbuf| - rustrt::rust_uvtmp_connect(thread, req_id, ipbuf, ch) - } -} - -fn close_connection(thread: thread, req_id: u32) { - rustrt::rust_uvtmp_close_connection(thread, req_id); -} - -fn write(thread: thread, req_id: u32, bytes: [u8], - chan: comm::chan) unsafe { - rustrt::rust_uvtmp_write( - thread, req_id, vec::to_ptr(bytes), vec::len(bytes), chan); -} - -fn read_start(thread: thread, req_id: u32, - chan: comm::chan) { - rustrt::rust_uvtmp_read_start(thread, req_id, chan); -} - -fn timer_start(thread: thread, timeout: u32, req_id: u32, - chan: comm::chan) { - rustrt::rust_uvtmp_timer(thread, timeout, req_id, chan); -} - -fn delete_buf(buf: *u8) { - rustrt::rust_uvtmp_delete_buf(buf); -} - -fn get_req_id(cd: connect_data) -> u32 { - ret rustrt::rust_uvtmp_get_req_id(cd); -} - -#[test] -fn test_start_stop() { - let thread = create_thread(); - start_thread(thread); - join_thread(thread); - delete_thread(thread); -} - -#[test] -#[ignore] -fn test_connect() { - let thread = create_thread(); - start_thread(thread); - let port = comm::port(); - let chan = comm::chan(port); - connect(thread, 0u32, "74.125.224.146", chan); - alt comm::recv(port) { - connected(cd) { - close_connection(thread, 0u32); - } - _ { fail "test_connect: port isn't connected"; } - } - join_thread(thread); - delete_thread(thread); -} - -#[test] -#[ignore] -fn test_http() { - let thread = create_thread(); - start_thread(thread); - let port = comm::port(); - let chan = comm::chan(port); - connect(thread, 0u32, "74.125.224.146", chan); - alt comm::recv(port) { - connected(cd) { - write(thread, 0u32, str::bytes("GET / HTTP/1.0\n\n"), chan); - alt comm::recv(port) { - wrote(cd) { - read_start(thread, 0u32, chan); - let keep_going = true; - while keep_going { - alt comm::recv(port) { - read(_, buf, -1) { - keep_going = false; - delete_buf(buf); - } - read(_, buf, len) { - unsafe { - log(error, len); - let buf = vec::unsafe::from_buf(buf, - len as uint); - let str = str::from_bytes(buf); - #error("read something"); - io::println(str); - } - delete_buf(buf); - } - _ { fail "test_http: protocol error"; } - } - } - close_connection(thread, 0u32); - } - _ { fail "test_http: expected `wrote`"; } - } - } - _ { fail "test_http: port not connected"; } - } - join_thread(thread); - delete_thread(thread); -} diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp index 9ee1b844add2c..03cd75d4805c2 100644 --- a/src/rt/rust_uv.cpp +++ b/src/rt/rust_uv.cpp @@ -1,51 +1,183 @@ #include "rust_internal.h" #include "uv.h" -/* - Wrappers of uv_* functions. These can be eliminated by figuring - out how to build static uv with externs, or by just using dynamic libuv - */ +// crust fn pointers +typedef void (*crust_async_op_cb)(uv_loop_t* loop, void* data); +typedef void (*crust_simple_cb)(uint8_t* id_buf, void* loop_data); +typedef void (*crust_close_cb)(uint8_t* id_buf, void* handle, + void* data); -extern "C" CDECL uv_loop_t* -rust_uv_default_loop() { - return uv_default_loop(); +// data types +#define RUST_UV_HANDLE_LEN 16 + +struct handle_data { + uint8_t id_buf[RUST_UV_HANDLE_LEN]; + crust_simple_cb cb; + crust_close_cb close_cb; +}; + +// helpers +static void* +current_kernel_malloc(size_t size, const char* tag) { + void* ptr = rust_task_thread::get_task()->kernel->malloc(size, tag); + return ptr; +} + +static void +current_kernel_free(void* ptr) { + rust_task_thread::get_task()->kernel->free(ptr); +} + +static handle_data* +new_handle_data_from(uint8_t* buf, crust_simple_cb cb) { + handle_data* data = (handle_data*)current_kernel_malloc( + sizeof(handle_data), + "handle_data"); + memcpy(data->id_buf, buf, RUST_UV_HANDLE_LEN); + data->cb = cb; + return data; +} + +// libuv callback impls +static void +native_crust_async_op_cb(uv_async_t* handle, int status) { + crust_async_op_cb cb = (crust_async_op_cb)handle->data; + void* loop_data = handle->loop->data; + cb(handle->loop, loop_data); +} + +static void +native_async_cb(uv_async_t* handle, int status) { + handle_data* handle_d = (handle_data*)handle->data; + void* loop_data = handle->loop->data; + handle_d->cb(handle_d->id_buf, loop_data); +} + +static void +native_timer_cb(uv_timer_t* handle, int status) { + handle_data* handle_d = (handle_data*)handle->data; + void* loop_data = handle->loop->data; + handle_d->cb(handle_d->id_buf, loop_data); +} + +static void +native_close_cb(uv_handle_t* handle) { + handle_data* data = (handle_data*)handle->data; + data->close_cb(data->id_buf, handle, handle->loop->data); } -extern "C" CDECL uv_loop_t* +static void +native_close_op_cb(uv_handle_t* op_handle) { + uv_loop_t* loop = op_handle->loop; + current_kernel_free(op_handle); + loop->data = 0; // a ptr to some stack-allocated rust mem + uv_loop_delete(loop); +} + +// native fns bound in rust +extern "C" void* rust_uv_loop_new() { - return uv_loop_new(); + return (void*)uv_loop_new(); +} + +extern "C" void +rust_uv_loop_set_data(uv_loop_t* loop, void* data) { + loop->data = data; } -extern "C" CDECL void -rust_uv_loop_delete(uv_loop_t *loop) { - return uv_loop_delete(loop); +extern "C" void* +rust_uv_bind_op_cb(uv_loop_t* loop, crust_async_op_cb cb) { + uv_async_t* async = (uv_async_t*)current_kernel_malloc( + sizeof(uv_async_t), + "uv_async_t"); + uv_async_init(loop, async, native_crust_async_op_cb); + async->data = (void*)cb; + // decrement the ref count, so that our async bind + // doesn't count towards keeping the loop alive + uv_unref(loop); + return async; } -extern "C" CDECL int -rust_uv_run(uv_loop_t *loop) { - return uv_run(loop); +extern "C" void +rust_uv_stop_op_cb(uv_handle_t* op_handle) { + /* // this is a hack to get libuv to cleanup a + // handle that was made to not prevent the loop + // from exiting via uv_unref(). + uv_ref(op_handle->loop); + uv_close(op_handle, native_close_op_cb); + uv_run(op_handle->loop); // this should process the handle's + // close event and then return + */ + // the above code is supposed to work to cleanly close + // a handler that was uv_unref()'d. but it causes much spew + // instead. this is the ugly/quick way to deal w/ it for now. + uv_close(op_handle, native_close_op_cb); + native_close_op_cb(op_handle); } -extern "C" CDECL void -rust_uv_unref(uv_loop_t *loop) { - return uv_unref(loop); +extern "C" void +rust_uv_run(uv_loop_t* loop) { + uv_run(loop); } -extern "C" CDECL int -rust_uv_idle_init(uv_loop_t* loop, uv_idle_t* idle) { - return uv_idle_init(loop, idle); +extern "C" void +rust_uv_close(uv_handle_t* handle, crust_close_cb cb) { + handle_data* data = (handle_data*)handle->data; + data->close_cb = cb; + uv_close(handle, native_close_cb); } -extern "C" CDECL int -rust_uv_idle_start(uv_idle_t* idle, uv_idle_cb cb) { - return uv_idle_start(idle, cb); +extern "C" void +rust_uv_close_async(uv_async_t* handle) { + current_kernel_free(handle->data); + current_kernel_free(handle); } +extern "C" void +rust_uv_close_timer(uv_async_t* handle) { + current_kernel_free(handle->data); + current_kernel_free(handle); +} + +extern "C" void +rust_uv_async_send(uv_async_t* handle) { + uv_async_send(handle); +} +extern "C" void* +rust_uv_async_init(uv_loop_t* loop, crust_simple_cb cb, + uint8_t* buf) { + uv_async_t* async = (uv_async_t*)current_kernel_malloc( + sizeof(uv_async_t), + "uv_async_t"); + uv_async_init(loop, async, native_async_cb); + handle_data* data = new_handle_data_from(buf, cb); + async->data = data; + return async; +} + +extern "C" void* +rust_uv_timer_init(uv_loop_t* loop, crust_simple_cb cb, + uint8_t* buf) { + uv_timer_t* new_timer = (uv_timer_t*)current_kernel_malloc( + sizeof(uv_timer_t), + "uv_timer_t"); + uv_timer_init(loop, new_timer); + handle_data* data = new_handle_data_from(buf, cb); + new_timer->data = data; + + return new_timer; +} + +extern "C" void +rust_uv_timer_start(uv_timer_t* the_timer, uint32_t timeout, + uint32_t repeat) { + uv_timer_start(the_timer, native_timer_cb, timeout, repeat); +} -extern "C" CDECL size_t -rust_uv_size_of_idle_t() { - return sizeof(uv_idle_t); +extern "C" void +rust_uv_timer_stop(uv_timer_t* the_timer) { + uv_timer_stop(the_timer); } diff --git a/src/rt/rust_uvtmp.cpp b/src/rt/rust_uvtmp.cpp deleted file mode 100644 index 697231dff1b56..0000000000000 --- a/src/rt/rust_uvtmp.cpp +++ /dev/null @@ -1,612 +0,0 @@ -#include -#include -#include -#include "rust_internal.h" -#include "uv.h" - -class rust_uvtmp_thread; - -struct connect_data { - uint32_t req_id; - rust_uvtmp_thread *thread; - char * ip_addr; - uv_connect_t connect; - uv_tcp_t tcp; - chan_handle chan; -}; - -const intptr_t whatever_tag = 0; -const intptr_t connected_tag = 1; -const intptr_t wrote_tag = 2; -const intptr_t read_tag = 3; -const intptr_t timer_tag = 4; -const intptr_t exit_tag = 5; - -struct iomsg { - intptr_t tag; - union { - connect_data *connected_val; - connect_data *wrote_val; - struct { - connect_data *cd; - uint8_t *buf; - ssize_t nread; - } read_val; - uint32_t timer_req_id; - } val; -}; - -struct write_data { - connect_data *cd; - uint8_t *buf; - size_t len; - chan_handle chan; -}; - -struct read_start_data { - connect_data *cd; - chan_handle chan; -}; - -struct timer_start_data { - rust_uvtmp_thread *thread; - uint32_t timeout; - uint32_t req_id; - chan_handle chan; -}; - -// UVTMP REWORK - -// crust fn pointers -typedef void (*crust_async_op_cb)(uv_loop_t* loop, void* data); -typedef void (*crust_simple_cb)(uint8_t* id_buf, void* loop_data); -typedef void (*crust_close_cb)(uint8_t* id_buf, void* handle, - void* data); - -// data types -#define RUST_UV_HANDLE_LEN 16 - -struct handle_data { - uint8_t id_buf[RUST_UV_HANDLE_LEN]; - crust_simple_cb cb; - crust_close_cb close_cb; -}; - -// helpers -static void* -current_kernel_malloc(size_t size, const char* tag) { - void* ptr = rust_task_thread::get_task()->kernel->malloc(size, tag); - return ptr; -} - -static void -current_kernel_free(void* ptr) { - rust_task_thread::get_task()->kernel->free(ptr); -} - -static handle_data* -new_handle_data_from(uint8_t* buf, crust_simple_cb cb) { - handle_data* data = (handle_data*)current_kernel_malloc( - sizeof(handle_data), - "handle_data"); - memcpy(data->id_buf, buf, RUST_UV_HANDLE_LEN); - data->cb = cb; - return data; -} - -// libuv callback impls -static void -native_crust_async_op_cb(uv_async_t* handle, int status) { - crust_async_op_cb cb = (crust_async_op_cb)handle->data; - void* loop_data = handle->loop->data; - cb(handle->loop, loop_data); -} - -static void -native_async_cb(uv_async_t* handle, int status) { - handle_data* handle_d = (handle_data*)handle->data; - void* loop_data = handle->loop->data; - handle_d->cb(handle_d->id_buf, loop_data); -} - -static void -native_timer_cb(uv_timer_t* handle, int status) { - handle_data* handle_d = (handle_data*)handle->data; - void* loop_data = handle->loop->data; - handle_d->cb(handle_d->id_buf, loop_data); -} - -static void -native_close_cb(uv_handle_t* handle) { - handle_data* data = (handle_data*)handle->data; - data->close_cb(data->id_buf, handle, handle->loop->data); -} - -static void -native_close_op_cb(uv_handle_t* op_handle) { - uv_loop_t* loop = op_handle->loop; - current_kernel_free(op_handle); - loop->data = 0; // a ptr to some stack-allocated rust mem - uv_loop_delete(loop); -} - -// native fns bound in rust -extern "C" void* -rust_uvtmp_uv_loop_new() { - return (void*)uv_loop_new(); -} - -extern "C" void -rust_uvtmp_uv_loop_set_data(uv_loop_t* loop, void* data) { - loop->data = data; -} - -extern "C" void* -rust_uvtmp_uv_bind_op_cb(uv_loop_t* loop, crust_async_op_cb cb) { - uv_async_t* async = (uv_async_t*)current_kernel_malloc( - sizeof(uv_async_t), - "uv_async_t"); - uv_async_init(loop, async, native_crust_async_op_cb); - async->data = (void*)cb; - // decrement the ref count, so that our async bind - // doesn't count towards keeping the loop alive - uv_unref(loop); - return async; -} - -extern "C" void -rust_uvtmp_uv_stop_op_cb(uv_handle_t* op_handle) { - /* // this is a hack to get libuv to cleanup a - // handle that was made to not prevent the loop - // from exiting via uv_unref(). - uv_ref(op_handle->loop); - uv_close(op_handle, native_close_op_cb); - uv_run(op_handle->loop); // this should process the handle's - // close event and then return - */ - // the above code is supposed to work to cleanly close - // a handler that was uv_unref()'d. but it causes much spew - // instead. this is the ugly/quick way to deal w/ it for now. - uv_close(op_handle, native_close_op_cb); - native_close_op_cb(op_handle); -} - -extern "C" void -rust_uvtmp_uv_run(uv_loop_t* loop) { - uv_run(loop); -} - -extern "C" void -rust_uvtmp_uv_close(uv_handle_t* handle, crust_close_cb cb) { - handle_data* data = (handle_data*)handle->data; - data->close_cb = cb; - uv_close(handle, native_close_cb); -} - -extern "C" void -rust_uvtmp_uv_close_async(uv_async_t* handle) { - current_kernel_free(handle->data); - current_kernel_free(handle); -} - -extern "C" void -rust_uvtmp_uv_close_timer(uv_async_t* handle) { - current_kernel_free(handle->data); - current_kernel_free(handle); -} - -extern "C" void -rust_uvtmp_uv_async_send(uv_async_t* handle) { - uv_async_send(handle); -} - -extern "C" void* -rust_uvtmp_uv_async_init(uv_loop_t* loop, crust_simple_cb cb, - uint8_t* buf) { - uv_async_t* async = (uv_async_t*)current_kernel_malloc( - sizeof(uv_async_t), - "uv_async_t"); - uv_async_init(loop, async, native_async_cb); - handle_data* data = new_handle_data_from(buf, cb); - async->data = data; - - return async; -} - -extern "C" void* -rust_uvtmp_uv_timer_init(uv_loop_t* loop, crust_simple_cb cb, - uint8_t* buf) { - uv_timer_t* new_timer = (uv_timer_t*)current_kernel_malloc( - sizeof(uv_timer_t), - "uv_timer_t"); - uv_timer_init(loop, new_timer); - handle_data* data = new_handle_data_from(buf, cb); - new_timer->data = data; - - return new_timer; -} - -extern "C" void -rust_uvtmp_uv_timer_start(uv_timer_t* the_timer, uint32_t timeout, - uint32_t repeat) { - uv_timer_start(the_timer, native_timer_cb, timeout, repeat); -} - -extern "C" void -rust_uvtmp_uv_timer_stop(uv_timer_t* the_timer) { - uv_timer_stop(the_timer); -} - -// UVTMP REWORK - -// FIXME: Copied from rust_builtins.cpp. Could bitrot easily -static void -send(rust_task *task, chan_handle chan, void *data) { - rust_task *target_task = task->kernel->get_task_by_id(chan.task); - if(target_task) { - rust_port *port = target_task->get_port_by_id(chan.port); - if(port) { - port->send(data); - scoped_lock with(target_task->lock); - port->deref(); - } - target_task->deref(); - } -} - -class rust_uvtmp_thread : public rust_thread { - -private: - std::map req_map; - rust_task *task; - uv_loop_t *loop; - uv_idle_t idle; - lock_and_signal lock; - bool stop_flag; - std::queue > connect_queue; - std::queue close_connection_queue; - std::queue write_queue; - std::queue read_start_queue; - std::queue timer_start_queue; -public: - - rust_uvtmp_thread() { - task = rust_task_thread::get_task(); - stop_flag = false; - loop = uv_loop_new(); - uv_idle_init(loop, &idle); - idle.data = this; - uv_idle_start(&idle, idle_cb); - } - - ~rust_uvtmp_thread() { - uv_loop_delete(loop); - } - - void stop() { - scoped_lock with(lock); - stop_flag = true; - } - - connect_data *connect(uint32_t req_id, char *ip, chan_handle chan) { - scoped_lock with(lock); - if (req_map.count(req_id)) return NULL; - connect_data *cd = new connect_data(); - req_map[req_id] = cd; - cd->req_id = req_id; - cd->ip_addr = ip; - connect_queue.push( - std::pair(cd, chan)); - return cd; - } - - void - close_connection(uint32_t req_id) { - scoped_lock with(lock); - connect_data *cd = req_map[req_id]; - close_connection_queue.push(cd); - req_map.erase(req_id); - } - - void - write(uint32_t req_id, uint8_t *buf, size_t len, chan_handle chan) { - scoped_lock with(lock); - connect_data *cd = req_map[req_id]; - write_data *wd = new write_data(); - wd->cd = cd; - wd->buf = new uint8_t[len]; - wd->len = len; - wd->chan = chan; - - memcpy(wd->buf, buf, len); - - write_queue.push(wd); - } - - void - read_start(uint32_t req_id, chan_handle chan) { - scoped_lock with(lock); - connect_data *cd = req_map[req_id]; - read_start_data *rd = new read_start_data(); - rd->cd = cd; - rd->chan = chan; - - read_start_queue.push(rd); - } - - void - timer(uint32_t timeout, uint32_t req_id, chan_handle chan) { - scoped_lock with(lock); - - timer_start_data *td = new timer_start_data(); - td->timeout = timeout; - td->req_id = req_id; - td->chan = chan; - timer_start_queue.push(td); - } - -private: - - virtual void - run() { - uv_run(loop); - } - - static void - idle_cb(uv_idle_t* handle, int status) { - rust_uvtmp_thread *self = (rust_uvtmp_thread*) handle->data; - self->on_idle(); - } - - void - on_idle() { - scoped_lock with(lock); - make_new_connections(); - close_connections(); - write_buffers(); - start_reads(); - start_timers(); - close_idle_if_stop(); - } - - void - make_new_connections() { - assert(lock.lock_held_by_current_thread()); - while (!connect_queue.empty()) { - std::pair pair = connect_queue.front(); - connect_queue.pop(); - connect_data *cd = pair.first; - struct sockaddr_in client_addr = uv_ip4_addr("0.0.0.0", 0); - struct sockaddr_in server_addr = uv_ip4_addr(cd->ip_addr, 80); - - cd->thread = this; - cd->chan = pair.second; - cd->connect.data = cd; - - uv_tcp_init(loop, &cd->tcp); - uv_tcp_bind(&cd->tcp, client_addr); - - uv_tcp_connect(&cd->connect, &cd->tcp, server_addr, connect_cb); - } - } - - static void - connect_cb(uv_connect_t *handle, int status) { - connect_data *cd = (connect_data*)handle->data; - cd->thread->on_connect(cd); - } - - void - on_connect(connect_data *cd) { - iomsg msg; - msg.tag = connected_tag; - msg.val.connected_val = cd; - - send(task, cd->chan, &msg); - } - - void - close_connections() { - assert(lock.lock_held_by_current_thread()); - while (!close_connection_queue.empty()) { - connect_data *cd = close_connection_queue.front(); - close_connection_queue.pop(); - - cd->tcp.data = cd; - - uv_close((uv_handle_t*)&cd->tcp, tcp_close_cb); - } - } - - static void - tcp_close_cb(uv_handle_t *handle) { - connect_data *cd = (connect_data*)handle->data; - delete cd; - } - - void - write_buffers() { - assert(lock.lock_held_by_current_thread()); - while (!write_queue.empty()) { - write_data *wd = write_queue.front(); - write_queue.pop(); - - uv_write_t *write = new uv_write_t(); - - write->data = wd; - - uv_buf_t buf; - buf.base = (char*)wd->buf; - buf.len = wd->len; - - uv_write(write, (uv_stream_t*)&wd->cd->tcp, &buf, 1, write_cb); - } - } - - static void - write_cb(uv_write_t *handle, int status) { - write_data *wd = (write_data*)handle->data; - rust_uvtmp_thread *self = wd->cd->thread; - self->on_write(handle, wd); - } - - void - on_write(uv_write_t *handle, write_data *wd) { - iomsg msg; - msg.tag = timer_tag; - msg.val.wrote_val = wd->cd; - - send(task, wd->chan, &msg); - - delete [] wd->buf; - delete wd; - delete handle; - } - - void - start_reads() { - assert (lock.lock_held_by_current_thread()); - while (!read_start_queue.empty()) { - read_start_data *rd = read_start_queue.front(); - read_start_queue.pop(); - - connect_data *cd = rd->cd; - cd->tcp.data = rd; - - uv_read_start((uv_stream_t*)&cd->tcp, alloc_cb, read_cb); - } - } - - static uv_buf_t - alloc_cb(uv_handle_t* handle, size_t size) { - uv_buf_t buf; - buf.base = new char[size]; - buf.len = size; - return buf; - } - - static void - read_cb(uv_stream_t *handle, ssize_t nread, uv_buf_t buf) { - read_start_data *rd = (read_start_data*)handle->data; - rust_uvtmp_thread *self = rd->cd->thread; - self->on_read(rd, nread, buf); - } - - void - on_read(read_start_data *rd, ssize_t nread, uv_buf_t buf) { - iomsg msg; - msg.tag = read_tag; - msg.val.read_val.cd = rd->cd; - msg.val.read_val.buf = (uint8_t*)buf.base; - msg.val.read_val.nread = nread; - - send(task, rd->chan, &msg); - if (nread == -1) { - delete rd; - } - } - - void - start_timers() { - assert (lock.lock_held_by_current_thread()); - while (!timer_start_queue.empty()) { - timer_start_data *td = timer_start_queue.front(); - timer_start_queue.pop(); - - td->thread = this; - - uv_timer_t *timer = (uv_timer_t *)malloc(sizeof(uv_timer_t)); - timer->data = td; - uv_timer_init(loop, timer); - uv_timer_start(timer, timer_cb, td->timeout, 0); - } - } - - static void - timer_cb(uv_timer_t *handle, int what) { - timer_start_data *td = (timer_start_data*)handle->data; - rust_uvtmp_thread *self = td->thread; - self->on_timer(td); - free(handle); - } - - void - on_timer(timer_start_data *rd) { - iomsg msg; - msg.tag = timer_tag; - msg.val.timer_req_id = rd->req_id; - - send(task, rd->chan, &msg); - delete rd; - } - - void - close_idle_if_stop() { - assert(lock.lock_held_by_current_thread()); - if (stop_flag) { - uv_close((uv_handle_t*)&idle, NULL); - } - } - -}; - -extern "C" rust_uvtmp_thread * -rust_uvtmp_create_thread() { - rust_uvtmp_thread *thread = new rust_uvtmp_thread(); - return thread; -} - -extern "C" void -rust_uvtmp_start_thread(rust_uvtmp_thread *thread) { - thread->start(); -} - -extern "C" void -rust_uvtmp_join_thread(rust_uvtmp_thread *thread) { - thread->stop(); - thread->join(); -} - -extern "C" void -rust_uvtmp_delete_thread(rust_uvtmp_thread *thread) { - delete thread; -} - -extern "C" connect_data * -rust_uvtmp_connect(rust_uvtmp_thread *thread, uint32_t req_id, char *ip, chan_handle *chan) { - return thread->connect(req_id, ip, *chan); -} - -extern "C" void -rust_uvtmp_close_connection(rust_uvtmp_thread *thread, uint32_t req_id) { - thread->close_connection(req_id); -} - -extern "C" void -rust_uvtmp_write(rust_uvtmp_thread *thread, uint32_t req_id, - uint8_t *buf, size_t len, chan_handle *chan) { - thread->write(req_id, buf, len, *chan); -} - -extern "C" void -rust_uvtmp_read_start(rust_uvtmp_thread *thread, uint32_t req_id, - chan_handle *chan) { - thread->read_start(req_id, *chan); -} - -extern "C" void -rust_uvtmp_timer(rust_uvtmp_thread *thread, uint32_t timeout, uint32_t req_id, chan_handle *chan) { - thread->timer(timeout, req_id, *chan); -} - -extern "C" void -rust_uvtmp_delete_buf(uint8_t *buf) { - delete [] buf; -} - -extern "C" uint32_t -rust_uvtmp_get_req_id(connect_data *cd) { - return cd->req_id; -} - - diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index 2c7265ff58ae9..12bb693774828 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -80,38 +80,19 @@ upcall_call_shim_on_rust_stack upcall_new_stack upcall_del_stack upcall_reset_stack_limit -rust_uv_default_loop rust_uv_loop_new -rust_uv_loop_delete +rust_uv_loop_set_data +rust_uv_bind_op_cb +rust_uv_stop_op_cb rust_uv_run -rust_uv_unref -rust_uv_idle_init -rust_uv_idle_start -rust_uv_size_of_idle_t -rust_uvtmp_create_thread -rust_uvtmp_start_thread -rust_uvtmp_join_thread -rust_uvtmp_delete_thread -rust_uvtmp_connect -rust_uvtmp_close_connection -rust_uvtmp_write -rust_uvtmp_read_start -rust_uvtmp_timer -rust_uvtmp_delete_buf -rust_uvtmp_get_req_id -rust_uvtmp_uv_loop_new -rust_uvtmp_uv_loop_set_data -rust_uvtmp_uv_bind_op_cb -rust_uvtmp_uv_stop_op_cb -rust_uvtmp_uv_run -rust_uvtmp_uv_close -rust_uvtmp_uv_close_async -rust_uvtmp_uv_close_timer -rust_uvtmp_uv_async_send -rust_uvtmp_uv_async_init -rust_uvtmp_uv_timer_init -rust_uvtmp_uv_timer_start -rust_uvtmp_uv_timer_stop +rust_uv_close +rust_uv_close_async +rust_uv_close_timer +rust_uv_async_send +rust_uv_async_init +rust_uv_timer_init +rust_uv_timer_start +rust_uv_timer_stop rust_dbg_lock_create rust_dbg_lock_destroy rust_dbg_lock_lock From da2c8ec2c3e8eb95d4646cdc03d8a022eeff0246 Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Wed, 22 Feb 2012 14:15:39 -0800 Subject: [PATCH 08/12] missing build file update for parent commit --- mk/rt.mk | 1 - 1 file changed, 1 deletion(-) diff --git a/mk/rt.mk b/mk/rt.mk index 26f35ea90dbe4..840442a2d6569 100644 --- a/mk/rt.mk +++ b/mk/rt.mk @@ -50,7 +50,6 @@ RUNTIME_CS_$(1) := \ rt/rust_port.cpp \ rt/rust_upcall.cpp \ rt/rust_uv.cpp \ - rt/rust_uvtmp.cpp \ rt/rust_log.cpp \ rt/rust_port_selector.cpp \ rt/circular_buffer.cpp \ From bfd9d502a8fb4a18c28726674839a1a6fc551be8 Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Fri, 24 Feb 2012 17:43:31 -0800 Subject: [PATCH 09/12] correcting for libuv behavior that differs between linux & windows net complexity increase :/ --- src/libstd/uv.rs | 38 ++++++++++++++++++++++++++++++++++---- src/rt/rust_uv.cpp | 18 ++---------------- 2 files changed, 36 insertions(+), 20 deletions(-) diff --git a/src/libstd/uv.rs b/src/libstd/uv.rs index 90ac5ce4b1cfb..0cc89a441ef70 100644 --- a/src/libstd/uv.rs +++ b/src/libstd/uv.rs @@ -8,7 +8,8 @@ enum uv_operation { op_close(uv_handle, *ctypes::void), op_timer_init([u8]), op_timer_start([u8], *ctypes::void, u32, u32), - op_timer_stop([u8], *ctypes::void, fn~(uv_handle)) + op_timer_stop([u8], *ctypes::void, fn~(uv_handle)), + op_teardown(*ctypes::void) } enum uv_handle { @@ -34,7 +35,8 @@ enum uv_msg { uv_timer_init([u8], *ctypes::void), uv_timer_call([u8]), uv_timer_stop([u8], fn~(uv_handle)), - uv_end() + uv_end(), + uv_teardown_check() } type uv_loop_data = { @@ -148,11 +150,13 @@ fn loop_new() -> uv_loop unsafe { // loop process any pending operations // once its up and running task::spawn_sched(task::manual_threads(1u)) {|| + // make sure we didn't start the loop + // without the user registering handles + comm::send(rust_loop_chan, uv_teardown_check); // this call blocks rustrt::rust_uv_run(loop_handle); // when we're done, msg the // end chan - rustrt::rust_uv_stop_op_cb(op_handle); comm::send(end_chan, true); comm::send(rust_loop_chan, uv_end); }; @@ -160,6 +164,8 @@ fn loop_new() -> uv_loop unsafe { msg_run_in_bg { task::spawn_sched(task::manual_threads(1u)) {|| + // see note above + comm::send(rust_loop_chan, uv_teardown_check); // this call blocks rustrt::rust_uv_run(loop_handle); }; @@ -194,6 +200,9 @@ fn loop_new() -> uv_loop unsafe { task::spawn {|| cb(); }; + // ask the rust loop to check and see if there + // are no more user-registered handles + comm::send(rust_loop_chan, uv_teardown_check); } msg_async_init(callback, after_cb) { @@ -202,6 +211,7 @@ fn loop_new() -> uv_loop unsafe { // data and save the callback for // invocation on msg_async_send let id = gen_handle_id(); + handles.insert(id, ptr::null()); async_cbs.insert(id, callback); after_cbs.insert(id, after_cb); let op = op_async_init(id); @@ -235,6 +245,7 @@ fn loop_new() -> uv_loop unsafe { msg_timer_init(after_cb) { let id = gen_handle_id(); + handles.insert(id, ptr::null()); after_cbs.insert(id, after_cb); let op = op_timer_init(id); pass_to_libuv(op_handle, operation_chan, op); @@ -276,6 +287,18 @@ fn loop_new() -> uv_loop unsafe { after_cb(the_timer); } + uv_teardown_check() { + // here we're checking if there are no user-registered + // handles (and the loop is running), if this is the + // case, then we need to unregister the op_handle via + // a uv_close() call, thus allowing libuv to return + // on its own. + if (handles.size() == 0u) { + let op = op_teardown(op_handle); + pass_to_libuv(op_handle, operation_chan, op); + } + } + uv_end() { keep_going = false; } @@ -442,6 +465,12 @@ crust fn process_operation( rustrt::rust_uv_timer_stop(handle); comm::send(loop_chan, uv_timer_stop(id, after_cb)); } + op_teardown(op_handle) { + // this is the last msg that'll be processed by + // this fn, in the current lifetime of the handle's + // uv_loop_t + rustrt::rust_uv_stop_op_cb(op_handle); + } _ { fail "unknown form of uv_operation received"; } } op_pending = comm::peek(op_port); @@ -536,7 +565,8 @@ fn test_uv_simple_async() { async_send(new_async); }); run(test_loop); - assert comm::recv(exit_port); + let result = comm::recv(exit_port); + assert result; } #[test] diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp index 03cd75d4805c2..4126896895e0c 100644 --- a/src/rt/rust_uv.cpp +++ b/src/rt/rust_uv.cpp @@ -68,10 +68,8 @@ native_close_cb(uv_handle_t* handle) { static void native_close_op_cb(uv_handle_t* op_handle) { - uv_loop_t* loop = op_handle->loop; current_kernel_free(op_handle); - loop->data = 0; // a ptr to some stack-allocated rust mem - uv_loop_delete(loop); + // uv_run() should return after this.. } // native fns bound in rust @@ -94,25 +92,13 @@ rust_uv_bind_op_cb(uv_loop_t* loop, crust_async_op_cb cb) { async->data = (void*)cb; // decrement the ref count, so that our async bind // doesn't count towards keeping the loop alive - uv_unref(loop); + //uv_unref(loop); return async; } extern "C" void rust_uv_stop_op_cb(uv_handle_t* op_handle) { - /* // this is a hack to get libuv to cleanup a - // handle that was made to not prevent the loop - // from exiting via uv_unref(). - uv_ref(op_handle->loop); uv_close(op_handle, native_close_op_cb); - uv_run(op_handle->loop); // this should process the handle's - // close event and then return - */ - // the above code is supposed to work to cleanly close - // a handler that was uv_unref()'d. but it causes much spew - // instead. this is the ugly/quick way to deal w/ it for now. - uv_close(op_handle, native_close_op_cb); - native_close_op_cb(op_handle); } extern "C" void From a5c2869a34defb6511b6c4029cbb53242f9c96a6 Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Sat, 25 Feb 2012 22:08:52 -0800 Subject: [PATCH 10/12] add uv::loop_delete() because of the last change, the loop ptr is no longer cleaned up when the loop exits. This api call addresses that. Sadly, the loop ptr is not "reusable" across multiple calls to uv::run(). --- src/libstd/uv.rs | 97 ++++++++++++++++++++++++++++++++-------------- src/rt/rust_uv.cpp | 10 ++++- 2 files changed, 75 insertions(+), 32 deletions(-) diff --git a/src/libstd/uv.rs b/src/libstd/uv.rs index 0cc89a441ef70..3988bd378d2cf 100644 --- a/src/libstd/uv.rs +++ b/src/libstd/uv.rs @@ -1,5 +1,6 @@ -export loop_new, run, close, run_in_bg, async_init, async_send, - timer_init, timer_start, timer_stop; +export loop_new, loop_delete, run, close, run_in_bg; +export async_init, async_send; +export timer_init, timer_start, timer_stop; // these are processed solely in the // process_operation() crust fn below @@ -44,11 +45,14 @@ type uv_loop_data = { rust_loop_chan: comm::chan }; -type uv_loop = comm::chan; +enum uv_loop { + uv_loop_new(comm::chan, *ctypes::void) +} #[nolink] native mod rustrt { fn rust_uv_loop_new() -> *ctypes::void; + fn rust_uv_loop_delete(loop: *ctypes::void); fn rust_uv_loop_set_data( loop: *ctypes::void, data: *uv_loop_data); @@ -103,7 +107,8 @@ fn loop_new() -> uv_loop unsafe { let rust_loop_chan = comm::chan::(rust_loop_port); // let the task-spawner return - comm::send(ret_recv_chan, copy(rust_loop_chan)); + let user_uv_loop = uv_loop_new(rust_loop_chan, loop_handle); + comm::send(ret_recv_chan, copy(user_uv_loop)); // create our "special" async handle that will // allow all operations against libuv to be @@ -225,7 +230,7 @@ fn loop_new() -> uv_loop unsafe { handles.insert(id, async_handle); let after_cb = after_cbs.get(id); after_cbs.remove(id); - let async = uv_async(id, rust_loop_chan); + let async = uv_async(id, user_uv_loop); id_to_handle.insert(id, copy(async)); task::spawn {|| after_cb(async); @@ -239,7 +244,8 @@ fn loop_new() -> uv_loop unsafe { uv_async_send(id) { let async_cb = async_cbs.get(id); task::spawn {|| - async_cb(uv_async(id, rust_loop_chan)); + let the_loop = user_uv_loop; + async_cb(uv_async(id, the_loop)); }; } @@ -254,7 +260,7 @@ fn loop_new() -> uv_loop unsafe { handles.insert(id, handle); let after_cb = after_cbs.get(id); after_cbs.remove(id); - let new_timer = uv_timer(id, rust_loop_chan); + let new_timer = uv_timer(id, user_uv_loop); id_to_handle.insert(id, copy(new_timer)); task::spawn {|| after_cb(new_timer); @@ -310,15 +316,22 @@ fn loop_new() -> uv_loop unsafe { ret comm::recv(ret_recv_port); } +fn loop_delete(loop: uv_loop) { + let loop_ptr = get_loop_ptr_from_uv_loop(loop); + rustrt::rust_uv_loop_delete(loop_ptr); +} + fn run(loop: uv_loop) { let end_port = comm::port::(); let end_chan = comm::chan::(end_port); - comm::send(loop, msg_run(end_chan)); + let loop_chan = get_loop_chan_from_uv_loop(loop); + comm::send(loop_chan, msg_run(end_chan)); comm::recv(end_port); } fn run_in_bg(loop: uv_loop) { - comm::send(loop, msg_run_in_bg); + let loop_chan = get_loop_chan_from_uv_loop(loop); + comm::send(loop_chan, msg_run_in_bg); } fn async_init ( @@ -326,13 +339,15 @@ fn async_init ( async_cb: fn~(uv_handle), after_cb: fn~(uv_handle)) { let msg = msg_async_init(async_cb, after_cb); - comm::send(loop, msg); + let loop_chan = get_loop_chan_from_uv_loop(loop); + comm::send(loop_chan, msg); } fn async_send(async: uv_handle) { alt async { uv_async(id, loop) { - comm::send(loop, msg_async_send(id)); + let loop_chan = get_loop_chan_from_uv_loop(loop); + comm::send(loop_chan, msg_async_send(id)); } _ { fail "attempting to call async_send() with a" + @@ -348,14 +363,16 @@ fn close(h: uv_handle, cb: fn~()) { fn timer_init(loop: uv_loop, after_cb: fn~(uv_handle)) { let msg = msg_timer_init(after_cb); - comm::send(loop, msg); + let loop_chan = get_loop_chan_from_uv_loop(loop); + comm::send(loop_chan, msg); } fn timer_start(the_timer: uv_handle, timeout: u32, repeat:u32, timer_cb: fn~(uv_handle)) { alt the_timer { - uv_timer(id, loop_chan) { + uv_timer(id, loop) { let msg = msg_timer_start(id, timeout, repeat, timer_cb); + let loop_chan = get_loop_chan_from_uv_loop(loop); comm::send(loop_chan, msg); } _ { @@ -367,7 +384,8 @@ fn timer_start(the_timer: uv_handle, timeout: u32, repeat:u32, fn timer_stop(the_timer: uv_handle, after_cb: fn~(uv_handle)) { alt the_timer { - uv_timer(id, loop_chan) { + uv_timer(id, loop) { + let loop_chan = get_loop_chan_from_uv_loop(loop); let msg = msg_timer_stop(id, after_cb); comm::send(loop_chan, msg); } @@ -397,15 +415,16 @@ fn get_handle_id_from(buf: *u8) -> [u8] unsafe { } fn get_loop_chan_from_data(data: *uv_loop_data) - -> uv_loop unsafe { + -> comm::chan unsafe { ret (*data).rust_loop_chan; } fn get_loop_chan_from_handle(handle: uv_handle) - -> uv_loop { + -> comm::chan { alt handle { uv_async(id,loop) | uv_timer(id,loop) { - ret loop; + let loop_chan = get_loop_chan_from_uv_loop(loop); + ret loop_chan; } _ { fail "unknown form of uv_handle for get_loop_chan_from " @@ -414,6 +433,21 @@ fn get_loop_chan_from_handle(handle: uv_handle) } } +fn get_loop_ptr_from_uv_loop(loop: uv_loop) -> *ctypes::void { + alt loop { + uv_loop_new(loop_chan, loop_ptr) { + ret loop_ptr; + } + } +} +fn get_loop_chan_from_uv_loop(loop: uv_loop) -> comm::chan { + alt loop { + uv_loop_new(loop_chan, loop_ptr) { + ret loop_chan; + } + } +} + fn get_id_from_handle(handle: uv_handle) -> [u8] { alt handle { uv_async(id,loop) | uv_timer(id,loop) { @@ -548,41 +582,44 @@ crust fn process_close_timer( #[test] fn test_uv_new_loop_no_handles() { let test_loop = uv::loop_new(); - run(test_loop); // this should return immediately + uv::run(test_loop); // this should return immediately // since there aren't any handles.. + uv::loop_delete(test_loop); } #[test] fn test_uv_simple_async() { - let test_loop = loop_new(); + let test_loop = uv::loop_new(); let exit_port = comm::port::(); let exit_chan = comm::chan::(exit_port); - async_init(test_loop, {|new_async| - close(new_async) {|| + uv::async_init(test_loop, {|new_async| + uv::close(new_async) {|| comm::send(exit_chan, true); }; }, {|new_async| - async_send(new_async); + uv::async_send(new_async); }); - run(test_loop); + uv::run(test_loop); let result = comm::recv(exit_port); assert result; + uv::loop_delete(test_loop); } #[test] fn test_uv_timer() { - let test_loop = loop_new(); + let test_loop = uv::loop_new(); let exit_port = comm::port::(); let exit_chan = comm::chan::(exit_port); - timer_init(test_loop) {|new_timer| - timer_start(new_timer, 1u32, 0u32) {|started_timer| - timer_stop(started_timer) {|stopped_timer| - close(stopped_timer) {|| + uv::timer_init(test_loop) {|new_timer| + uv::timer_start(new_timer, 1u32, 0u32) {|started_timer| + uv::timer_stop(started_timer) {|stopped_timer| + uv::close(stopped_timer) {|| comm::send(exit_chan, true); }; }; }; }; - run(test_loop); + uv::run(test_loop); assert comm::recv(exit_port); -} \ No newline at end of file + uv::loop_delete(test_loop); +} diff --git a/src/rt/rust_uv.cpp b/src/rt/rust_uv.cpp index 4126896895e0c..096d40d223560 100644 --- a/src/rt/rust_uv.cpp +++ b/src/rt/rust_uv.cpp @@ -2,7 +2,8 @@ #include "uv.h" // crust fn pointers -typedef void (*crust_async_op_cb)(uv_loop_t* loop, void* data); +typedef void (*crust_async_op_cb)(uv_loop_t* loop, void* data, + uv_async_t* op_handle); typedef void (*crust_simple_cb)(uint8_t* id_buf, void* loop_data); typedef void (*crust_close_cb)(uint8_t* id_buf, void* handle, void* data); @@ -43,7 +44,7 @@ static void native_crust_async_op_cb(uv_async_t* handle, int status) { crust_async_op_cb cb = (crust_async_op_cb)handle->data; void* loop_data = handle->loop->data; - cb(handle->loop, loop_data); + cb(handle->loop, loop_data, handle); } static void @@ -78,6 +79,11 @@ rust_uv_loop_new() { return (void*)uv_loop_new(); } +extern "C" void +rust_uv_loop_delete(uv_loop_t* loop) { + uv_loop_delete(loop); +} + extern "C" void rust_uv_loop_set_data(uv_loop_t* loop, void* data) { loop->data = data; From 333d056fa202cb902799f77ffc2ac4a734b38bdc Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Sat, 25 Feb 2012 22:29:18 -0800 Subject: [PATCH 11/12] add rust_uv_loop_delete to rustrt.def.in --- src/rt/rustrt.def.in | 1 + 1 file changed, 1 insertion(+) diff --git a/src/rt/rustrt.def.in b/src/rt/rustrt.def.in index 12bb693774828..2b5929600b03b 100644 --- a/src/rt/rustrt.def.in +++ b/src/rt/rustrt.def.in @@ -81,6 +81,7 @@ upcall_new_stack upcall_del_stack upcall_reset_stack_limit rust_uv_loop_new +rust_uv_loop_delete rust_uv_loop_set_data rust_uv_bind_op_cb rust_uv_stop_op_cb From d304767c045d345629c43e3b1e392a0b288dadfd Mon Sep 17 00:00:00 2001 From: Jeff Olson Date: Sun, 26 Feb 2012 12:11:30 -0800 Subject: [PATCH 12/12] changing rust loop to 1 thread --- src/libstd/uv.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/libstd/uv.rs b/src/libstd/uv.rs index 3988bd378d2cf..8022a7a38b793 100644 --- a/src/libstd/uv.rs +++ b/src/libstd/uv.rs @@ -86,7 +86,7 @@ fn loop_new() -> uv_loop unsafe { let ret_recv_chan: comm::chan = comm::chan(ret_recv_port); - task::spawn_sched(task::manual_threads(4u)) {|| + task::spawn_sched(task::manual_threads(1u)) {|| // our beloved uv_loop_t ptr let loop_handle = rustrt:: rust_uv_loop_new();