@@ -15,6 +15,7 @@ Message passing
15
15
#[ allow( missing_doc) ] ;
16
16
17
17
use clone:: Clone ;
18
+ use iter:: Iterator ;
18
19
use kinds:: Send ;
19
20
use option:: Option ;
20
21
use rtcomm = rt:: comm;
@@ -43,10 +44,35 @@ pub trait GenericPort<T> {
43
44
/// Receives a message, or fails if the connection closes.
44
45
fn recv ( & self ) -> T ;
45
46
46
- /** Receives a message, or returns `none` if
47
- the connection is closed or closes.
48
- */
47
+ /// Receives a message, or returns `none` if
48
+ /// the connection is closed or closes.
49
49
fn try_recv ( & self ) -> Option < T > ;
50
+
51
+ /// Returns an iterator that breaks once the connection closes.
52
+ ///
53
+ /// # Example
54
+ ///
55
+ /// ~~~rust
56
+ /// do spawn {
57
+ /// for x in port.recv_iter() {
58
+ /// if pred(x) { break; }
59
+ /// println!("{}", x);
60
+ /// }
61
+ /// }
62
+ /// ~~~
63
+ fn recv_iter < ' a > ( & ' a self ) -> RecvIterator < ' a , Self > {
64
+ RecvIterator { port : self }
65
+ }
66
+ }
67
+
68
+ pub struct RecvIterator < ' a , P > {
69
+ priv port : & ' a P ,
70
+ }
71
+
72
+ impl < ' a , T , P : GenericPort < T > > Iterator < T > for RecvIterator < ' a , P > {
73
+ fn next ( & mut self ) -> Option < T > {
74
+ self . port . try_recv ( )
75
+ }
50
76
}
51
77
52
78
/// Ports that can `peek`
@@ -227,3 +253,58 @@ impl<T: Send> Clone for SharedPort<T> {
227
253
SharedPort { x : p. clone ( ) }
228
254
}
229
255
}
256
+
257
+ #[ cfg( test) ]
258
+ mod tests {
259
+ use comm:: * ;
260
+ use prelude:: * ;
261
+
262
+ #[ test]
263
+ fn test_nested_recv_iter ( ) {
264
+ let ( port, chan) = stream :: < int > ( ) ;
265
+ let ( total_port, total_chan) = oneshot :: < int > ( ) ;
266
+
267
+ do spawn {
268
+ let mut acc = 0 ;
269
+ for x in port. recv_iter ( ) {
270
+ acc += x;
271
+ for x in port. recv_iter ( ) {
272
+ acc += x;
273
+ for x in port. try_recv ( ) . move_iter ( ) {
274
+ acc += x;
275
+ total_chan. send ( acc) ;
276
+ }
277
+ }
278
+ }
279
+ }
280
+
281
+ chan. send( 3 ) ;
282
+ chan. send( 1 ) ;
283
+ chan. send( 2 ) ;
284
+ assert_eq ! ( total_port. recv( ) , 6 ) ;
285
+ }
286
+
287
+ #[ test]
288
+ fn test_recv_iter_break ( ) {
289
+ let ( port, chan) = stream :: < int > ( ) ;
290
+ let ( count_port, count_chan) = oneshot :: < int > ( ) ;
291
+
292
+ do spawn {
293
+ let mut count = 0 ;
294
+ for x in port. recv_iter ( ) {
295
+ if count >= 3 {
296
+ count_chan. send ( count) ;
297
+ break ;
298
+ } else {
299
+ count += x;
300
+ }
301
+ }
302
+ }
303
+
304
+ chan. send ( 2 ) ;
305
+ chan. send ( 2 ) ;
306
+ chan. send ( 2 ) ;
307
+ chan. send ( 2 ) ;
308
+ assert_eq ! ( count_port. recv( ) , 4 ) ;
309
+ }
310
+ }
0 commit comments