Skip to content

Commit 58b2ff9

Browse files
committed
Split out starting a listener from accepting incoming connections.
The Listener trait takes two type parameters, the type of connection and the type of Acceptor, and specifies only one method, listen, which consumes the listener and produces an Acceptor. The Acceptor trait takes one type parameter, the type of connection, and defines two methods. The accept() method waits for an incoming connection attempt and returns the result. The incoming() method creates an iterator over incoming connections and is a default method. Example: let listener = TcpListener.bind(addr); // Bind to a socket let acceptor = listener.listen(); // Start the listener for stream in acceptor.incoming() { // Process incoming connections forever (or until you break out of the loop) }
1 parent efb8924 commit 58b2ff9

File tree

6 files changed

+181
-112
lines changed

6 files changed

+181
-112
lines changed

src/libstd/rt/io/mod.rs

Lines changed: 32 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -474,17 +474,43 @@ pub trait Seek {
474474
fn seek(&mut self, pos: i64, style: SeekStyle);
475475
}
476476

477-
/// A listener is a value that listens for connections
478-
pub trait Listener<S> {
479-
/// Wait for and accept an incoming connection
480-
///
481-
/// Returns `None` on timeout.
477+
/// A listener is a value that can consume itself to start listening for connections.
478+
/// Doing so produces some sort of Acceptor.
479+
pub trait Listener<T, A: Acceptor<T>> {
480+
/// Spin up the listener and start queueing incoming connections
482481
///
483482
/// # Failure
484483
///
485484
/// Raises `io_error` condition. If the condition is handled,
485+
/// then `listen` returns `None`.
486+
fn listen(self) -> Option<A>;
487+
}
488+
489+
/// An acceptor is a value that presents incoming connections
490+
pub trait Acceptor<T> {
491+
/// Wait for and accept an incoming connection
492+
///
493+
/// # Failure
494+
/// Raise `io_error` condition. If the condition is handled,
486495
/// then `accept` returns `None`.
487-
fn accept(&mut self) -> Option<S>;
496+
fn accept(&mut self) -> Option<T>;
497+
498+
/// Create an iterator over incoming connections
499+
fn incoming<'r>(&'r mut self) -> IncomingIterator<'r, Self> {
500+
IncomingIterator { inc: self }
501+
}
502+
}
503+
504+
/// An infinite iterator over incoming connection attempts.
505+
/// Calling `next` will block the task until a connection is attempted.
506+
struct IncomingIterator<'self, A> {
507+
priv inc: &'self mut A,
508+
}
509+
510+
impl<'self, T, A: Acceptor<T>> Iterator<T> for IncomingIterator<'self, A> {
511+
fn next(&mut self) -> Option<T> {
512+
self.inc.accept()
513+
}
488514
}
489515

490516
/// Common trait for decorator types.

src/libstd/rt/io/net/tcp.rs

Lines changed: 56 additions & 50 deletions
Original file line numberDiff line numberDiff line change
@@ -11,12 +11,13 @@
1111
use option::{Option, Some, None};
1212
use result::{Ok, Err};
1313
use rt::io::net::ip::SocketAddr;
14-
use rt::io::{Reader, Writer, Listener};
14+
use rt::io::{Reader, Writer, Listener, Acceptor};
1515
use rt::io::{io_error, read_error, EndOfFile};
1616
use rt::rtio::{IoFactory, IoFactoryObject,
17-
RtioSocket, RtioTcpListener,
18-
RtioTcpListenerObject, RtioTcpStream,
19-
RtioTcpStreamObject};
17+
RtioSocket,
18+
RtioTcpListener, RtioTcpListenerObject,
19+
RtioTcpAcceptor, RtioTcpAcceptorObject,
20+
RtioTcpStream, RtioTcpStreamObject};
2021
use rt::local::Local;
2122

2223
pub struct TcpStream(~RtioTcpStreamObject);
@@ -124,13 +125,27 @@ impl TcpListener {
124125
}
125126
}
126127

127-
impl Listener<TcpStream> for TcpListener {
128+
impl Listener<TcpStream, TcpAcceptor> for TcpListener {
129+
fn listen(self) -> Option<TcpAcceptor> {
130+
match (**self).listen() {
131+
Ok(acceptor) => Some(TcpAcceptor(acceptor)),
132+
Err(ioerr) => {
133+
io_error::cond.raise(ioerr);
134+
None
135+
}
136+
}
137+
}
138+
}
139+
140+
pub struct TcpAcceptor(~RtioTcpAcceptorObject);
141+
142+
impl Acceptor<TcpStream> for TcpAcceptor {
128143
fn accept(&mut self) -> Option<TcpStream> {
129144
match (**self).accept() {
130145
Ok(s) => Some(TcpStream::new(s)),
131146
Err(ioerr) => {
132147
io_error::cond.raise(ioerr);
133-
return None;
148+
None
134149
}
135150
}
136151
}
@@ -184,8 +199,8 @@ mod test {
184199
let addr = next_test_ip4();
185200

186201
do spawntask {
187-
let mut listener = TcpListener::bind(addr);
188-
let mut stream = listener.accept();
202+
let mut acceptor = TcpListener::bind(addr).listen();
203+
let mut stream = acceptor.accept();
189204
let mut buf = [0];
190205
stream.read(buf);
191206
assert!(buf[0] == 99);
@@ -204,8 +219,8 @@ mod test {
204219
let addr = next_test_ip6();
205220

206221
do spawntask {
207-
let mut listener = TcpListener::bind(addr);
208-
let mut stream = listener.accept();
222+
let mut acceptor = TcpListener::bind(addr).listen();
223+
let mut stream = acceptor.accept();
209224
let mut buf = [0];
210225
stream.read(buf);
211226
assert!(buf[0] == 99);
@@ -224,8 +239,8 @@ mod test {
224239
let addr = next_test_ip4();
225240

226241
do spawntask {
227-
let mut listener = TcpListener::bind(addr);
228-
let mut stream = listener.accept();
242+
let mut acceptor = TcpListener::bind(addr).listen();
243+
let mut stream = acceptor.accept();
229244
let mut buf = [0];
230245
let nread = stream.read(buf);
231246
assert!(nread.is_none());
@@ -244,8 +259,8 @@ mod test {
244259
let addr = next_test_ip6();
245260

246261
do spawntask {
247-
let mut listener = TcpListener::bind(addr);
248-
let mut stream = listener.accept();
262+
let mut acceptor = TcpListener::bind(addr).listen();
263+
let mut stream = acceptor.accept();
249264
let mut buf = [0];
250265
let nread = stream.read(buf);
251266
assert!(nread.is_none());
@@ -265,8 +280,8 @@ mod test {
265280
let addr = next_test_ip4();
266281

267282
do spawntask {
268-
let mut listener = TcpListener::bind(addr);
269-
let mut stream = listener.accept();
283+
let mut acceptor = TcpListener::bind(addr).listen();
284+
let mut stream = acceptor.accept();
270285
let mut buf = [0];
271286
let nread = stream.read(buf);
272287
assert!(nread.is_none());
@@ -288,8 +303,8 @@ mod test {
288303
let addr = next_test_ip6();
289304

290305
do spawntask {
291-
let mut listener = TcpListener::bind(addr);
292-
let mut stream = listener.accept();
306+
let mut acceptor = TcpListener::bind(addr).listen();
307+
let mut stream = acceptor.accept();
293308
let mut buf = [0];
294309
let nread = stream.read(buf);
295310
assert!(nread.is_none());
@@ -311,8 +326,8 @@ mod test {
311326
let addr = next_test_ip4();
312327

313328
do spawntask {
314-
let mut listener = TcpListener::bind(addr);
315-
let mut stream = listener.accept();
329+
let mut acceptor = TcpListener::bind(addr).listen();
330+
let mut stream = acceptor.accept();
316331
let buf = [0];
317332
loop {
318333
let mut stop = false;
@@ -341,8 +356,8 @@ mod test {
341356
let addr = next_test_ip6();
342357

343358
do spawntask {
344-
let mut listener = TcpListener::bind(addr);
345-
let mut stream = listener.accept();
359+
let mut acceptor = TcpListener::bind(addr).listen();
360+
let mut stream = acceptor.accept();
346361
let buf = [0];
347362
loop {
348363
let mut stop = false;
@@ -371,9 +386,8 @@ mod test {
371386
let max = 10;
372387

373388
do spawntask {
374-
let mut listener = TcpListener::bind(addr);
375-
do max.times {
376-
let mut stream = listener.accept();
389+
let mut acceptor = TcpListener::bind(addr).listen();
390+
for ref mut stream in acceptor.incoming().take(max) {
377391
let mut buf = [0];
378392
stream.read(buf);
379393
assert_eq!(buf[0], 99);
@@ -396,9 +410,8 @@ mod test {
396410
let max = 10;
397411

398412
do spawntask {
399-
let mut listener = TcpListener::bind(addr);
400-
do max.times {
401-
let mut stream = listener.accept();
413+
let mut acceptor = TcpListener::bind(addr).listen();
414+
for ref mut stream in acceptor.incoming().take(max) {
402415
let mut buf = [0];
403416
stream.read(buf);
404417
assert_eq!(buf[0], 99);
@@ -421,10 +434,9 @@ mod test {
421434
static MAX: int = 10;
422435

423436
do spawntask {
424-
let mut listener = TcpListener::bind(addr);
425-
for i in range(0, MAX) {
426-
let stream = Cell::new(listener.accept());
427-
rtdebug!("accepted");
437+
let mut acceptor = TcpListener::bind(addr).listen();
438+
for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
439+
let stream = Cell::new(stream);
428440
// Start another task to handle the connection
429441
do spawntask {
430442
let mut stream = stream.take();
@@ -460,10 +472,9 @@ mod test {
460472
static MAX: int = 10;
461473

462474
do spawntask {
463-
let mut listener = TcpListener::bind(addr);
464-
for i in range(0, MAX) {
465-
let stream = Cell::new(listener.accept());
466-
rtdebug!("accepted");
475+
let mut acceptor = TcpListener::bind(addr).listen();
476+
for (i, stream) in acceptor.incoming().enumerate().take(MAX as uint) {
477+
let stream = Cell::new(stream);
467478
// Start another task to handle the connection
468479
do spawntask {
469480
let mut stream = stream.take();
@@ -499,10 +510,9 @@ mod test {
499510
static MAX: int = 10;
500511

501512
do spawntask {
502-
let mut listener = TcpListener::bind(addr);
503-
for _ in range(0, MAX) {
504-
let stream = Cell::new(listener.accept());
505-
rtdebug!("accepted");
513+
let mut acceptor = TcpListener::bind(addr).listen();
514+
for stream in acceptor.incoming().take(MAX as uint) {
515+
let stream = Cell::new(stream);
506516
// Start another task to handle the connection
507517
do spawntask_later {
508518
let mut stream = stream.take();
@@ -537,10 +547,9 @@ mod test {
537547
static MAX: int = 10;
538548

539549
do spawntask {
540-
let mut listener = TcpListener::bind(addr);
541-
for _ in range(0, MAX) {
542-
let stream = Cell::new(listener.accept());
543-
rtdebug!("accepted");
550+
let mut acceptor = TcpListener::bind(addr).listen();
551+
for stream in acceptor.incoming().take(MAX as uint) {
552+
let stream = Cell::new(stream);
544553
// Start another task to handle the connection
545554
do spawntask_later {
546555
let mut stream = stream.take();
@@ -573,10 +582,7 @@ mod test {
573582
fn socket_name(addr: SocketAddr) {
574583
do run_in_newsched_task {
575584
do spawntask {
576-
let listener = TcpListener::bind(addr);
577-
578-
assert!(listener.is_some());
579-
let mut listener = listener.unwrap();
585+
let mut listener = TcpListener::bind(addr).unwrap();
580586

581587
// Make sure socket_name gives
582588
// us the socket we binded to.
@@ -592,9 +598,9 @@ mod test {
592598
fn peer_name(addr: SocketAddr) {
593599
do run_in_newsched_task {
594600
do spawntask {
595-
let mut listener = TcpListener::bind(addr);
601+
let mut acceptor = TcpListener::bind(addr).listen();
596602

597-
listener.accept();
603+
acceptor.accept();
598604
}
599605

600606
do spawntask {

src/libstd/rt/io/net/unix.rs

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,6 +40,12 @@ impl UnixListener {
4040
}
4141
}
4242

43-
impl Listener<UnixStream> for UnixListener {
43+
impl Listener<UnixStream, UnixAcceptor> for UnixListener {
44+
fn listen(self) -> Option<UnixAcceptor> { fail!() }
45+
}
46+
47+
pub struct UnixAcceptor;
48+
49+
impl Acceptor<UnixStream> for UnixAcceptor {
4450
fn accept(&mut self) -> Option<UnixStream> { fail!() }
4551
}

src/libstd/rt/io/option.rs

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
//! # XXX Seek and Close
1818
1919
use option::*;
20-
use super::{Reader, Writer, Listener};
20+
use super::{Reader, Writer, Listener, Acceptor};
2121
use super::{standard_error, PreviousIoError, io_error, read_error, IoError};
2222

2323
fn prev_io_error() -> IoError {
@@ -62,10 +62,22 @@ impl<R: Reader> Reader for Option<R> {
6262
}
6363
}
6464

65-
impl<L: Listener<S>, S> Listener<S> for Option<L> {
66-
fn accept(&mut self) -> Option<S> {
65+
impl<T, A: Acceptor<T>, L: Listener<T, A>> Listener<T, A> for Option<L> {
66+
fn listen(self) -> Option<A> {
67+
match self {
68+
Some(listener) => listener.listen(),
69+
None => {
70+
io_error::cond.raise(prev_io_error());
71+
None
72+
}
73+
}
74+
}
75+
}
76+
77+
impl<T, A: Acceptor<T>> Acceptor<T> for Option<A> {
78+
fn accept(&mut self) -> Option<T> {
6779
match *self {
68-
Some(ref mut listener) => listener.accept(),
80+
Some(ref mut acceptor) => acceptor.accept(),
6981
None => {
7082
io_error::cond.raise(prev_io_error());
7183
None

src/libstd/rt/rtio.rs

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@ pub type EventLoopObject = uvio::UvEventLoop;
2626
pub type RemoteCallbackObject = uvio::UvRemoteCallback;
2727
pub type IoFactoryObject = uvio::UvIoFactory;
2828
pub type RtioTcpStreamObject = uvio::UvTcpStream;
29+
pub type RtioTcpAcceptorObject = uvio::UvTcpAcceptor;
2930
pub type RtioTcpListenerObject = uvio::UvTcpListener;
3031
pub type RtioUdpSocketObject = uvio::UvUdpSocket;
3132
pub type RtioTimerObject = uvio::UvTimer;
@@ -75,6 +76,10 @@ pub trait IoFactory {
7576
}
7677

7778
pub trait RtioTcpListener : RtioSocket {
79+
fn listen(self) -> Result<~RtioTcpAcceptorObject, IoError>;
80+
}
81+
82+
pub trait RtioTcpAcceptor : RtioSocket {
7883
fn accept(&mut self) -> Result<~RtioTcpStreamObject, IoError>;
7984
fn accept_simultaneously(&mut self) -> Result<(), IoError>;
8085
fn dont_accept_simultaneously(&mut self) -> Result<(), IoError>;

0 commit comments

Comments
 (0)