Skip to content

Commit bbf24b2

Browse files
committed
Code clean-ups for custom socket support.
This fixes a number of issues discussed in the pull request.
1 parent e5bcfeb commit bbf24b2

File tree

5 files changed

+104
-111
lines changed

5 files changed

+104
-111
lines changed

src/libnative/io/mod.rs

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -211,14 +211,9 @@ impl rtio::IoFactory for IoFactory {
211211
{
212212
addrinfo::GetAddrInfoRequest::run(host, servname, hint)
213213
}
214-
fn socket_from_raw_fd(&mut self, fd: net::sock_t, close: rtio::CloseBehavior)
214+
fn socket_from_raw_fd(&mut self, fd: net::sock_t)
215215
-> IoResult<Box<rtio::RtioCustomSocket + Send>> {
216-
let close = match close {
217-
rtio::CloseSynchronously | rtio::CloseAsynchronously => true,
218-
rtio::DontClose => false
219-
};
220-
221-
net::Socket::new(fd, close).map(|s| box s as Box<rtio::RtioCustomSocket + Send>)
216+
net::Socket::new(fd).map(|s| box s as Box<rtio::RtioCustomSocket + Send>)
222217
}
223218

224219
// filesystem operations

src/libnative/io/net.rs

Lines changed: 19 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
use alloc::arc::Arc;
1212
use libc;
13-
use std::intrinsics;
1413
use std::mem;
1514
use std::ptr;
1615
use std::rt::mutex;
@@ -883,24 +882,18 @@ impl rtio::RtioUdpSocket for UdpSocket {
883882
////////////////////////////////////////////////////////////////////////////////
884883

885884
#[cfg(windows)]
886-
pub fn net_buflen(buf: &[u8]) -> i32 {
887-
buf.len() as i32
888-
}
889-
885+
type Buflen = i32;
890886
#[cfg(not(windows))]
891-
pub fn net_buflen(buf: &[u8]) -> u64 {
892-
buf.len() as u64
893-
}
887+
type Buflen = u64;
894888

895889
pub struct Socket {
896890
fd: sock_t,
897-
close_on_drop: bool
898891
}
899892

900893
impl Socket {
901-
pub fn new(sock: sock_t, close_on_drop: bool) -> IoResult<Socket>
894+
pub fn new(sock: sock_t) -> IoResult<Socket>
902895
{
903-
let socket = Socket { fd: sock, close_on_drop: close_on_drop };
896+
let socket = Socket { fd: sock };
904897
return Ok(socket);
905898
}
906899
}
@@ -909,16 +902,11 @@ impl rtio::RtioCustomSocket for Socket {
909902
fn recv_from(&mut self, buf: &mut [u8], addr: *mut libc::sockaddr_storage)
910903
-> IoResult<uint>
911904
{
912-
let mut caddrlen = unsafe {
913-
intrinsics::size_of::<libc::sockaddr_storage>()
914-
} as libc::socklen_t;
905+
let mut caddrlen = mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
915906
let len = unsafe {
916-
retry( || libc::recvfrom(self.fd,
917-
buf.as_ptr() as *mut libc::c_void,
918-
net_buflen(buf),
919-
0,
920-
addr as *mut libc::sockaddr,
921-
&mut caddrlen))
907+
retry( || libc::recvfrom(self.fd, buf.as_ptr() as *mut libc::c_void,
908+
buf.len() as Buflen, 0, addr as *mut libc::sockaddr,
909+
&mut caddrlen))
922910
};
923911
if len == -1 {
924912
return Err(last_error());
@@ -931,12 +919,8 @@ impl rtio::RtioCustomSocket for Socket {
931919
-> IoResult<uint>
932920
{
933921
let len = unsafe {
934-
retry( || libc::sendto(self.fd,
935-
buf.as_ptr() as *const libc::c_void,
936-
net_buflen(buf),
937-
0,
938-
addr,
939-
slen as libc::socklen_t))
922+
retry( || libc::sendto(self.fd, buf.as_ptr() as *const libc::c_void,
923+
buf.len() as Buflen, 0, addr, slen as libc::socklen_t))
940924
};
941925

942926
return if len < 0 {
@@ -945,12 +929,19 @@ impl rtio::RtioCustomSocket for Socket {
945929
Ok(len as uint)
946930
};
947931
}
932+
933+
fn clone(&self) -> Box<rtio::RtioCustomSocket + Send> {
934+
box Socket {
935+
fd: self.fd
936+
} as Box<rtio::RtioCustomSocket + Send>
937+
}
938+
948939
}
949940

950941
impl Drop for Socket {
951942
fn drop(&mut self) {
952-
if self.close_on_drop {
953-
unsafe { close(self.fd) }
943+
unsafe {
944+
close(self.fd)
954945
}
955946
}
956947
}

src/librustrt/rtio.rs

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -204,7 +204,7 @@ pub trait IoFactory {
204204
fn get_host_addresses(&mut self, host: Option<&str>, servname: Option<&str>,
205205
hint: Option<AddrinfoHint>)
206206
-> IoResult<Vec<AddrinfoInfo>>;
207-
fn socket_from_raw_fd(&mut self, fd: CSocketT, close: CloseBehavior)
207+
fn socket_from_raw_fd(&mut self, fd: CSocketT)
208208
-> IoResult<Box<RtioCustomSocket + Send>>;
209209

210210
// filesystem operations
@@ -300,6 +300,7 @@ pub trait RtioUdpSocket : RtioSocket {
300300
pub trait RtioCustomSocket {
301301
fn recv_from(&mut self, buf: &mut [u8], *mut libc::sockaddr_storage) -> IoResult<uint>;
302302
fn send_to(&mut self, buf: &[u8], dst: *const libc::sockaddr, len: uint) -> IoResult<uint>;
303+
fn clone(&self) -> Box<RtioCustomSocket + Send>;
303304
}
304305

305306
pub trait RtioTimer {

src/librustuv/net.rs

Lines changed: 79 additions & 68 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,6 @@
1010

1111
use libc::{size_t, ssize_t, c_int, c_void, c_uint};
1212
use libc;
13-
use std::intrinsics;
1413
use std::mem;
1514
use std::ptr;
1615
use std::rt::rtio;
@@ -849,18 +848,18 @@ pub struct SocketWatcher {
849848
handle: *mut uvll::uv_poll_t,
850849
socket: uvll::uv_os_socket_t,
851850
home: HomeHandle,
852-
close_on_drop: bool,
853-
}
854851

855-
#[cfg(windows)]
856-
pub fn net_buflen(buf: &[u8]) -> i32 {
857-
buf.len() as i32
852+
// See above for what these are
853+
refcount: Refcount,
854+
read_access: AccessTimeout,
855+
write_access: AccessTimeout,
856+
858857
}
859858

859+
#[cfg(windows)]
860+
type Buflen = i32;
860861
#[cfg(not(windows))]
861-
pub fn net_buflen(buf: &[u8]) -> u64 {
862-
buf.len() as u64
863-
}
862+
type Buflen = u64;
864863

865864
#[cfg(windows)]
866865
fn last_error() -> IoError {
@@ -909,7 +908,9 @@ fn make_nonblocking(socket: c_int) -> Option<IoError> {
909908
}
910909

911910
impl SocketWatcher {
912-
pub fn new(io: &mut UvIoFactory, socket: uvll::uv_os_socket_t, close_on_drop: bool)
911+
// NOTE It is an error to have multiple SocketWatchers for the same socket,
912+
// see documentation for uv_poll_s in uv.h.
913+
pub fn new(io: &mut UvIoFactory, socket: uvll::uv_os_socket_t)
913914
-> Result<SocketWatcher, IoError>
914915
{
915916
let handle = unsafe { uvll::malloc_handle(uvll::UV_POLL) };
@@ -918,7 +919,9 @@ impl SocketWatcher {
918919
handle: handle,
919920
home: io.make_handle(),
920921
socket: socket,
921-
close_on_drop: close_on_drop
922+
refcount: Refcount::new(),
923+
read_access: AccessTimeout::new(),
924+
write_access: AccessTimeout::new(),
922925
};
923926

924927
// Make socket non-blocking - required for libuv
@@ -941,7 +944,7 @@ impl UvHandle<uvll::uv_poll_t> for SocketWatcher {
941944
impl Drop for SocketWatcher {
942945
fn drop(&mut self) {
943946
let _m = self.fire_homing_missile();
944-
if self.close_on_drop {
947+
if self.refcount.decrement() {
945948
self.close();
946949
}
947950
}
@@ -959,10 +962,12 @@ impl rtio::RtioCustomSocket for SocketWatcher {
959962
task: Option<BlockedTask>,
960963
buf: &'b [u8],
961964
result: Option<Result<ssize_t, IoError>>,
962-
socket: Option<uvll::uv_os_socket_t>,
965+
socket: uvll::uv_os_socket_t,
963966
addr: *mut libc::sockaddr_storage
964967
}
965-
let _m = self.fire_homing_missile();
968+
let loop_ = self.uv_loop();
969+
let m = self.fire_homing_missile();
970+
let _guard = try!(self.read_access.grant(m));
966971
let a = match unsafe {
967972
uvll::uv_poll_start(self.handle, uvll::UV_READABLE as c_int, recv_cb)
968973
} {
@@ -971,11 +976,12 @@ impl rtio::RtioCustomSocket for SocketWatcher {
971976
task: None,
972977
buf: buf,
973978
result: None,
974-
socket: Some(self.socket),
979+
socket: self.socket,
975980
addr: addr,
976981
};
977-
wait_until_woken_after(&mut cx.task, &self.uv_loop(), || {
978-
unsafe { uvll::set_data_for_uv_handle(self.handle, &mut cx) }
982+
let handle = self.handle;
983+
wait_until_woken_after(&mut cx.task, &loop_, || {
984+
unsafe { uvll::set_data_for_uv_handle(handle, &mut cx) }
979985
});
980986
cx.result.unwrap().map(|n| n as uint)
981987
}
@@ -985,43 +991,38 @@ impl rtio::RtioCustomSocket for SocketWatcher {
985991

986992
extern fn recv_cb(handle: *mut uvll::uv_poll_t, status: c_int, events: c_int) {
987993
assert!((events & (uvll::UV_READABLE as c_int)) != 0);
988-
let cx: &mut Ctx = unsafe {
989-
intrinsics::transmute(uvll::get_data_for_uv_handle(handle))
994+
let cx = unsafe {
995+
uvll::get_data_for_uv_handle(handle) as *mut Ctx
990996
};
991997

992998
if status < 0 {
993-
cx.result = Some(Err(uv_error_to_io_error(UvError(status))));
994-
wakeup(&mut cx.task);
999+
unsafe {
1000+
(*cx).result = Some(Err(uv_error_to_io_error(UvError(status))));
1001+
wakeup(&mut (*cx).task);
1002+
}
9951003
return;
9961004
}
9971005

9981006
unsafe {
9991007
assert_eq!(uvll::uv_poll_stop(handle), 0)
10001008
}
10011009

1002-
let mut caddrlen = unsafe {
1003-
intrinsics::size_of::<libc::sockaddr_storage>()
1004-
} as libc::socklen_t;
1005-
let len = match cx.socket {
1006-
Some(sock) => unsafe {
1007-
libc::recvfrom(sock,
1008-
cx.buf.as_ptr() as *mut c_void,
1009-
net_buflen(cx.buf),
1010-
0,
1011-
cx.addr as *mut libc::sockaddr,
1012-
&mut caddrlen)
1013-
},
1014-
_ => -1
1010+
let mut caddrlen = mem::size_of::<libc::sockaddr_storage>() as libc::socklen_t;
1011+
let len = unsafe {
1012+
libc::recvfrom((*cx).socket, (*cx).buf.as_ptr() as *mut c_void,
1013+
(*cx).buf.len() as Buflen, 0,
1014+
(*cx).addr as *mut libc::sockaddr, &mut caddrlen)
10151015
};
1016-
if len == -1 {
1017-
cx.result = Some(Err(last_error()));
1018-
wakeup(&mut cx.task);
1019-
return;
1020-
}
10211016

1022-
cx.result = Some(Ok(len as ssize_t));
1017+
unsafe {
1018+
(*cx).result = if len == -1 {
1019+
Some(Err(last_error()))
1020+
} else {
1021+
Some(Ok(len as ssize_t))
1022+
};
10231023

1024-
wakeup(&mut cx.task);
1024+
wakeup(&mut (*cx).task);
1025+
}
10251026
}
10261027
}
10271028

@@ -1032,11 +1033,13 @@ impl rtio::RtioCustomSocket for SocketWatcher {
10321033
task: Option<BlockedTask>,
10331034
buf: &'b [u8],
10341035
result: Option<Result<uint, IoError>>,
1035-
socket: Option<uvll::uv_os_socket_t>,
1036+
socket: uvll::uv_os_socket_t,
10361037
addr: *const libc::sockaddr,
10371038
len: uint
10381039
}
1039-
let _m = self.fire_homing_missile();
1040+
let loop_ = self.uv_loop();
1041+
let m = self.fire_homing_missile();
1042+
let _guard = try!(self.write_access.grant(m));
10401043

10411044
let a = match unsafe {
10421045
uvll::uv_poll_start(self.handle, uvll::UV_WRITABLE as c_int, send_cb)
@@ -1046,12 +1049,13 @@ impl rtio::RtioCustomSocket for SocketWatcher {
10461049
task: None,
10471050
buf: buf,
10481051
result: None,
1049-
socket: Some(self.socket),
1052+
socket: self.socket,
10501053
addr: dst,
10511054
len: slen
10521055
};
1053-
wait_until_woken_after(&mut cx.task, &self.uv_loop(), || {
1054-
unsafe { uvll::set_data_for_uv_handle(self.handle, &mut cx) }
1056+
let handle = self.handle;
1057+
wait_until_woken_after(&mut cx.task, &loop_, || {
1058+
unsafe { uvll::set_data_for_uv_handle(handle, &mut cx) }
10551059
});
10561060
cx.result.unwrap()
10571061
}
@@ -1061,42 +1065,49 @@ impl rtio::RtioCustomSocket for SocketWatcher {
10611065

10621066
extern fn send_cb(handle: *mut uvll::uv_poll_t, status: c_int, events: c_int) {
10631067
assert!((events & (uvll::UV_WRITABLE as c_int)) != 0);
1064-
let cx: &mut Ctx = unsafe {
1065-
intrinsics::transmute(uvll::get_data_for_uv_handle(handle))
1068+
let cx = unsafe {
1069+
uvll::get_data_for_uv_handle(handle) as *mut Ctx
10661070
};
10671071
if status < 0 {
1068-
cx.result = Some(Err(uv_error_to_io_error(UvError(status))));
1069-
wakeup(&mut cx.task);
1072+
unsafe {
1073+
(*cx).result = Some(Err(uv_error_to_io_error(UvError(status))));
1074+
wakeup(&mut (*cx).task);
1075+
}
10701076
return;
10711077
}
10721078

10731079
unsafe {
10741080
assert_eq!(uvll::uv_poll_stop(handle), 0)
10751081
}
10761082

1077-
let len = match cx.socket {
1078-
Some(sock) => {
1079-
unsafe {
1080-
libc::sendto(sock,
1081-
cx.buf.as_ptr() as *const c_void,
1082-
net_buflen(cx.buf),
1083-
0,
1084-
cx.addr,
1085-
cx.len as libc::socklen_t)
1086-
}
1087-
},
1088-
_ => -1
1083+
let len = unsafe {
1084+
libc::sendto((*cx).socket, (*cx).buf.as_ptr() as *const c_void,
1085+
(*cx).buf.len() as Buflen, 0,
1086+
(*cx).addr, (*cx).len as libc::socklen_t)
10891087
};
10901088

1091-
cx.result = if len < 0 {
1092-
Some(Err(last_error()))
1093-
} else {
1094-
Some(Ok(len as uint))
1095-
};
1089+
unsafe {
1090+
(*cx).result = if len < 0 {
1091+
Some(Err(last_error()))
1092+
} else {
1093+
Some(Ok(len as uint))
1094+
};
10961095

1097-
wakeup(&mut cx.task);
1096+
wakeup(&mut (*cx).task);
1097+
}
10981098
}
10991099
}
1100+
1101+
fn clone(&self) -> Box<rtio::RtioCustomSocket + Send> {
1102+
box SocketWatcher {
1103+
handle: self.handle,
1104+
socket: self.socket,
1105+
home: self.home.clone(),
1106+
refcount: self.refcount.clone(),
1107+
write_access: self.write_access.clone(),
1108+
read_access: self.read_access.clone(),
1109+
} as Box<rtio::RtioCustomSocket + Send>
1110+
}
11001111
}
11011112

11021113

0 commit comments

Comments
 (0)