1
- // Copyright 2012 The Rust Project Developers. See the COPYRIGHT
1
+ // Copyright 2012-2013 The Rust Project Developers. See the COPYRIGHT
2
2
// file at the top-level directory of this distribution and at
3
3
// http://rust-lang.org/COPYRIGHT.
4
4
//
@@ -90,9 +90,55 @@ pub fn DuplexStream<T:Send,U:Send>()
90
90
} )
91
91
}
92
92
93
+ /// An extension of `pipes::stream` that provides synchronous message sending.
94
+ pub struct SyncChan < T > { priv duplex_stream : DuplexStream < T , ( ) > }
95
+ /// An extension of `pipes::stream` that acknowledges each message received.
96
+ pub struct SyncPort < T > { priv duplex_stream : DuplexStream < ( ) , T > }
97
+
98
+ impl < T : Send > GenericChan < T > for SyncChan < T > {
99
+ fn send ( & self , val : T ) {
100
+ assert ! ( self . try_send( val) , "SyncChan.send: receiving port closed" ) ;
101
+ }
102
+ }
103
+
104
+ impl < T : Send > GenericSmartChan < T > for SyncChan < T > {
105
+ /// Sends a message, or report if the receiver has closed the connection before receiving.
106
+ fn try_send ( & self , val : T ) -> bool {
107
+ self . duplex_stream . try_send ( val) && self . duplex_stream . try_recv ( ) . is_some ( )
108
+ }
109
+ }
110
+
111
+ impl < T : Send > GenericPort < T > for SyncPort < T > {
112
+ fn recv ( & self ) -> T {
113
+ self . try_recv ( ) . expect ( "SyncPort.recv: sending channel closed" )
114
+ }
115
+
116
+ fn try_recv ( & self ) -> Option < T > {
117
+ do self . duplex_stream . try_recv ( ) . map_move |val| {
118
+ self . duplex_stream . try_send ( ( ) ) ;
119
+ val
120
+ }
121
+ }
122
+ }
123
+
124
+ impl < T : Send > Peekable < T > for SyncPort < T > {
125
+ fn peek ( & self ) -> bool {
126
+ self . duplex_stream . peek ( )
127
+ }
128
+ }
129
+
130
+ /// Creates a stream whose channel, upon sending a message, blocks until the message is received.
131
+ pub fn rendezvous < T : Send > ( ) -> ( SyncPort < T > , SyncChan < T > ) {
132
+ let ( chan_stream, port_stream) = DuplexStream ( ) ;
133
+ ( SyncPort { duplex_stream : port_stream } , SyncChan { duplex_stream : chan_stream } )
134
+ }
135
+
93
136
#[ cfg( test) ]
94
137
mod test {
95
- use comm:: DuplexStream ;
138
+ use comm:: { DuplexStream , rendezvous} ;
139
+ use std:: rt:: test:: run_in_newsched_task;
140
+ use std:: task:: spawn_unlinked;
141
+
96
142
97
143
#[ test]
98
144
pub fn DuplexStream1 ( ) {
@@ -104,4 +150,58 @@ mod test {
104
150
assert ! ( left. recv( ) == 123 ) ;
105
151
assert ! ( right. recv( ) == ~"abc");
106
152
}
153
+
154
+ #[test]
155
+ pub fn basic_rendezvous_test() {
156
+ let (port, chan) = rendezvous();
157
+
158
+ do spawn {
159
+ chan.send(" abc");
160
+ }
161
+
162
+ assert!(port.recv() == " abc" ) ;
163
+ }
164
+
165
+ #[ test]
166
+ fn recv_a_lot ( ) {
167
+ // Rendezvous streams should be able to handle any number of messages being sent
168
+ do run_in_newsched_task {
169
+ let ( port, chan) = rendezvous ( ) ;
170
+ do spawn {
171
+ do 1000000 . times { chan. send ( ( ) ) }
172
+ }
173
+ do 1000000 . times { port. recv ( ) }
174
+ }
175
+ }
176
+
177
+ #[ test]
178
+ fn send_and_fail_and_try_recv ( ) {
179
+ let ( port, chan) = rendezvous ( ) ;
180
+ do spawn_unlinked {
181
+ chan. duplex_stream . send ( ( ) ) ; // Can't access this field outside this module
182
+ fail ! ( )
183
+ }
184
+ port. recv ( )
185
+ }
186
+
187
+ #[ test]
188
+ fn try_send_and_recv_then_fail_before_ack ( ) {
189
+ let ( port, chan) = rendezvous ( ) ;
190
+ do spawn_unlinked {
191
+ port. duplex_stream . recv ( ) ;
192
+ fail ! ( )
193
+ }
194
+ chan. try_send ( ( ) ) ;
195
+ }
196
+
197
+ #[ test]
198
+ #[ should_fail]
199
+ fn send_and_recv_then_fail_before_ack ( ) {
200
+ let ( port, chan) = rendezvous ( ) ;
201
+ do spawn_unlinked {
202
+ port. duplex_stream . recv ( ) ;
203
+ fail ! ( )
204
+ }
205
+ chan. send ( ( ) ) ;
206
+ }
107
207
}
0 commit comments