Skip to content
This repository was archived by the owner on Sep 5, 2019. It is now read-only.

Commit 6ec454f

Browse files
committed
handle disconnection
1 parent 7a12724 commit 6ec454f

File tree

3 files changed

+687
-13
lines changed

3 files changed

+687
-13
lines changed

src/main.rs

Lines changed: 57 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ use std::{
99
use futures::{
1010
channel::mpsc,
1111
SinkExt,
12+
select,
1213
};
1314

1415
use async_std::{
@@ -22,6 +23,8 @@ type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>
2223
type Sender<T> = mpsc::UnboundedSender<T>;
2324
type Receiver<T> = mpsc::UnboundedReceiver<T>;
2425

26+
#[derive(Debug)]
27+
enum Void {}
2528

2629
fn main() -> Result<()> {
2730
task::block_on(server("127.0.0.1:8080"))
@@ -38,7 +41,8 @@ async fn server(addr: impl ToSocketAddrs) -> Result<()> {
3841
println!("Accepting from: {}", stream.peer_addr()?);
3942
spawn_and_log_error(client(broker_sender.clone(), stream));
4043
}
41-
broker.await?;
44+
drop(broker_sender);
45+
broker.await;
4246
Ok(())
4347
}
4448

@@ -51,7 +55,12 @@ async fn client(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
5155
None => Err("peer disconnected immediately")?,
5256
Some(line) => line?,
5357
};
54-
broker.send(Event::NewPeer { name: name.clone(), stream: Arc::clone(&stream) }).await.unwrap();
58+
let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::<Void>();
59+
broker.send(Event::NewPeer {
60+
name: name.clone(),
61+
stream: Arc::clone(&stream),
62+
shutdown: shutdown_receiver,
63+
}).await.unwrap();
5564

5665
while let Some(line) = lines.next().await {
5766
let line = line?;
@@ -68,16 +77,27 @@ async fn client(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> {
6877
msg,
6978
}).await.unwrap();
7079
}
80+
7181
Ok(())
7282
}
7383

7484
async fn client_writer(
75-
mut messages: Receiver<String>,
85+
messages: &mut Receiver<String>,
7686
stream: Arc<TcpStream>,
87+
mut shutdown: Receiver<Void>,
7788
) -> Result<()> {
7889
let mut stream = &*stream;
79-
while let Some(msg) = messages.next().await {
80-
stream.write_all(msg.as_bytes()).await?;
90+
loop {
91+
select! {
92+
msg = messages.next() => match msg {
93+
Some(msg) => stream.write_all(msg.as_bytes()).await?,
94+
None => break,
95+
},
96+
void = shutdown.next() => match void {
97+
Some(void) => match void {},
98+
None => break,
99+
}
100+
}
81101
}
82102
Ok(())
83103
}
@@ -87,6 +107,7 @@ enum Event {
87107
NewPeer {
88108
name: String,
89109
stream: Arc<TcpStream>,
110+
shutdown: Receiver<Void>,
90111
},
91112
Message {
92113
from: String,
@@ -95,31 +116,54 @@ enum Event {
95116
},
96117
}
97118

98-
async fn broker(mut events: Receiver<Event>) -> Result<()> {
119+
async fn broker(mut events: Receiver<Event>) {
120+
let (disconnect_sender, mut disconnect_receiver) =
121+
mpsc::unbounded::<(String, Receiver<String>)>();
99122
let mut peers: HashMap<String, Sender<String>> = HashMap::new();
100123

101-
while let Some(event) = events.next().await {
124+
loop {
125+
let event = select! {
126+
event = events.next() => match event {
127+
None => break,
128+
Some(event) => event,
129+
},
130+
disconnect = disconnect_receiver.next() => {
131+
let (name, _pending_messages) = disconnect.unwrap();
132+
assert!(peers.remove(&name).is_some());
133+
continue;
134+
},
135+
};
102136
match event {
103137
Event::Message { from, to, msg } => {
104138
for addr in to {
105139
if let Some(peer) = peers.get_mut(&addr) {
106-
peer.send(format!("from {}: {}\n", from, msg)).await?
140+
peer.send(format!("from {}: {}\n", from, msg)).await
141+
.unwrap()
107142
}
108143
}
109144
}
110-
Event::NewPeer { name, stream} => {
111-
match peers.entry(name) {
145+
Event::NewPeer { name, stream, shutdown } => {
146+
match peers.entry(name.clone()) {
112147
Entry::Occupied(..) => (),
113148
Entry::Vacant(entry) => {
114-
let (client_sender, client_receiver) = mpsc::unbounded();
149+
let (client_sender, mut client_receiver) = mpsc::unbounded();
115150
entry.insert(client_sender);
116-
spawn_and_log_error(client_writer(client_receiver, stream));
151+
let mut disconnect_sender = disconnect_sender.clone();
152+
spawn_and_log_error(async move {
153+
let res = client_writer(&mut client_receiver, stream, shutdown).await;
154+
disconnect_sender.send((name, client_receiver)).await
155+
.unwrap();
156+
res
157+
});
117158
}
118159
}
119160
}
120161
}
121162
}
122-
Ok(())
163+
drop(peers);
164+
drop(disconnect_sender);
165+
while let Some((_name, _pending_messages)) = disconnect_receiver.next().await {
166+
}
123167
}
124168

125169
fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()>

0 commit comments

Comments
 (0)