-
Notifications
You must be signed in to change notification settings - Fork 340
Modernize a-chat #419
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Modernize a-chat #419
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,45 +1,43 @@ | ||
use futures::select; | ||
use futures::FutureExt; | ||
use std::sync::Arc; | ||
|
||
use async_std::{ | ||
io::{stdin, BufReader}, | ||
net::{TcpStream, ToSocketAddrs}, | ||
prelude::*, | ||
task, | ||
future::select, | ||
}; | ||
|
||
|
||
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>; | ||
|
||
pub(crate) fn main() -> Result<()> { | ||
task::block_on(try_main("127.0.0.1:8080")) | ||
} | ||
|
||
async fn try_main(addr: impl ToSocketAddrs) -> Result<()> { | ||
let stream = TcpStream::connect(addr).await?; | ||
let (reader, mut writer) = (&stream, &stream); | ||
let reader = BufReader::new(reader); | ||
let mut lines_from_server = futures::StreamExt::fuse(reader.lines()); | ||
|
||
let stdin = BufReader::new(stdin()); | ||
let mut lines_from_stdin = futures::StreamExt::fuse(stdin.lines()); | ||
loop { | ||
select! { | ||
line = lines_from_server.next().fuse() => match line { | ||
Some(line) => { | ||
let line = line?; | ||
println!("{}", line); | ||
}, | ||
None => break, | ||
}, | ||
line = lines_from_stdin.next().fuse() => match line { | ||
Some(line) => { | ||
let line = line?; | ||
writer.write_all(line.as_bytes()).await?; | ||
writer.write_all(b"\n").await?; | ||
} | ||
None => break, | ||
} | ||
} | ||
} | ||
Ok(()) | ||
let stream = Arc::new(TcpStream::connect(addr).await?); | ||
let (reader, writer) = (stream.clone(), stream.clone()); | ||
|
||
let incoming = task::spawn(async move { | ||
let mut messages = BufReader::new(&*reader).lines(); | ||
while let Some(message) = messages.next().await { | ||
let message = message?; | ||
println!("{}", message); | ||
} | ||
Ok(()) | ||
}); | ||
|
||
let outgoing = task::spawn(async move { | ||
let mut stdin = BufReader::new(stdin()).lines(); | ||
|
||
while let Some(line) = stdin.next().await { | ||
let line = line?; | ||
let message = format!("{}\n", line); | ||
(&*writer).write_all(message.as_bytes()).await?; | ||
} | ||
Ok(()) | ||
}); | ||
|
||
select!(incoming, outgoing).await | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think ideally we should either join both tasks, or cancel the one that is still running. And it looks like hard-cancelling would be a wrong thing to do, as it could cancel a task mid-line. The solution with one There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. But, given that this is a quick&dirty CLI client, I don't think it's super important to really care here. |
||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -3,18 +3,16 @@ use std::{ | |
sync::Arc, | ||
}; | ||
|
||
use futures::{channel::mpsc, select, FutureExt, SinkExt}; | ||
|
||
use async_std::{ | ||
io::BufReader, | ||
net::{TcpListener, TcpStream, ToSocketAddrs}, | ||
prelude::*, | ||
task, | ||
sync::{channel, Sender, Receiver}, | ||
stream, | ||
}; | ||
|
||
type Result<T> = std::result::Result<T, Box<dyn std::error::Error + Send + Sync>>; | ||
type Sender<T> = mpsc::UnboundedSender<T>; | ||
type Receiver<T> = mpsc::UnboundedReceiver<T>; | ||
|
||
#[derive(Debug)] | ||
enum Void {} | ||
|
@@ -26,7 +24,7 @@ pub(crate) fn main() -> Result<()> { | |
async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { | ||
let listener = TcpListener::bind(addr).await?; | ||
|
||
let (broker_sender, broker_receiver) = mpsc::unbounded(); | ||
let (broker_sender, broker_receiver) = channel(10); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I have never understood how one is supposed to pick buffer size properly. Maybe the correct choice here is actually There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe @stjepang can answer that? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think you should not think about buffer capacities too much. Capacities of 1, 10, and 100 are all valid here, it doesn't really matter. Ideally, we'd use capacity of 0 as a safe default choice (like Go does, i.e. A channel can truly have capacity of 0 only in the context of preemptible threads. With Go channels and With futures-based channel, we can't do that. Imagine a receive operation is pending and registered in the channel. Then comes a send operation and sends a message. What happens if the receiving side then wakes up and cancels its receive operation? Did the message get through or not? Doesn't matter if your answer is "yes" or "no", we'll run into some inconsistencies either way. I guess the point is that we can only create an illusion of 0-capacity channels, but the channel will in some ways behave as if the capacity was 1. |
||
let broker = task::spawn(broker_loop(broker_receiver)); | ||
let mut incoming = listener.incoming(); | ||
while let Some(stream) = incoming.next().await { | ||
|
@@ -39,7 +37,7 @@ async fn accept_loop(addr: impl ToSocketAddrs) -> Result<()> { | |
Ok(()) | ||
} | ||
|
||
async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result<()> { | ||
async fn connection_loop(broker: Sender<Event>, stream: TcpStream) -> Result<()> { | ||
let stream = Arc::new(stream); | ||
let reader = BufReader::new(&*stream); | ||
let mut lines = reader.lines(); | ||
|
@@ -48,15 +46,14 @@ async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result | |
None => return Err("peer disconnected immediately".into()), | ||
Some(line) => line?, | ||
}; | ||
let (_shutdown_sender, shutdown_receiver) = mpsc::unbounded::<Void>(); | ||
let (_shutdown_sender, shutdown_receiver) = channel::<Void>(0); | ||
broker | ||
.send(Event::NewPeer { | ||
name: name.clone(), | ||
stream: Arc::clone(&stream), | ||
shutdown: shutdown_receiver, | ||
}) | ||
.await | ||
.unwrap(); | ||
.await; | ||
|
||
while let Some(line) = lines.next().await { | ||
let line = line?; | ||
|
@@ -76,28 +73,36 @@ async fn connection_loop(mut broker: Sender<Event>, stream: TcpStream) -> Result | |
to: dest, | ||
msg, | ||
}) | ||
.await | ||
.unwrap(); | ||
.await; | ||
} | ||
|
||
Ok(()) | ||
} | ||
|
||
#[derive(Debug)] | ||
enum ConnectionWriterEvent { | ||
Message(String), | ||
Shutdown | ||
} | ||
|
||
async fn connection_writer_loop( | ||
messages: &mut Receiver<String>, | ||
stream: Arc<TcpStream>, | ||
mut shutdown: Receiver<Void>, | ||
shutdown: Receiver<Void>, | ||
) -> Result<()> { | ||
let mut stream = &*stream; | ||
loop { | ||
select! { | ||
msg = messages.next().fuse() => match msg { | ||
Some(msg) => stream.write_all(msg.as_bytes()).await?, | ||
None => break, | ||
}, | ||
void = shutdown.next().fuse() => match void { | ||
Some(void) => match void {}, | ||
None => break, | ||
let messages = messages.map(ConnectionWriterEvent::Message); | ||
let shutdown = shutdown.map(|_| ConnectionWriterEvent::Shutdown).chain(stream::once(ConnectionWriterEvent::Shutdown)); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That's smart! Although I wonder if there's some simpler cooperative shutdown idiom... There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Ok, I think I've conjured something up: fn with_shutdown<S, T>(stream: S, shutdown: Receiver<Void>) -> impl Stream<Item=T>
where
S: Stream<Item=T>,
T: Unpin,
{
let items = stream.map(Some);
let shutdown = shutdown.map(|void| match void {}).chain(stream::once(None));
items.merge(shutdown).scan((), |&mut (), item| item)
} The loop can then be written as let mut messages = with_shutdown(messages, shutdown);
while let Some(msg) = messages.next().await {
} Note that this also allows pushing cancellation completely out of this function and onto the call site (at the cost of making this generic over There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @matklad I feel like the I feel like in the case of this example we should probably try to err on keeping code as simple as possible. Never types, and empty matches could be tricky for people to pick up on. I think keeping what is in the PR already would be easier to follow for most people. There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Same! I see how the "void" pattern is useful but is definitely something that is pretty unusual and gives me pause :) There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think I've managed to make cancellation in this case rather easy, at the cost of building a 75-lines long stop_source/stop_token library: matklad@ab02901. I feel this is roughly the place where we want to end-up eventually, but I am not sure we want to do this for tutorial. I am genuinely don't know what to do with tutorial, I have how there's no simple solution :( There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Met up with @skade in person today, and we had some discussions about this. I think we ended up with two conclusions:
Regarding this PR. I think focusing on making it as understandable as possible might be the right direction for now. And once we figure out a more formal strategy for cancellation we can apply that in a follow-up PR. How does that sound? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. SGTM, I’ve sort-of reached similar conclusions, specifically:
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
That's definitely an option I think. It's a very cool system to show off though, so I feel like it's a bit of a loss. But given the circumstance maybe it's for the best. Also yeah having it on crates.io would be neat; big fan of publishing things! There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. @yoshuawuyts published https://github.com/async-rs/stop-token. You are also an owner on crates-io (I was surprised that there's no async-rs-publishes group to which I can grant permissions). There's a high chance that I won't be able to properly maintain this crate, but docs are very upfront that it is an experiment, and it's also in a finished state already, so there's hopefully little to maintain (well, until folks ask for linked tokens and cancellation callbacks...) |
||
|
||
let mut events = shutdown.merge(messages); | ||
|
||
while let Some(event) = events.next().await { | ||
match event { | ||
ConnectionWriterEvent::Message(msg) => { | ||
stream.write_all(msg.as_bytes()).await?; | ||
} | ||
ConnectionWriterEvent::Shutdown => { | ||
break | ||
} | ||
} | ||
} | ||
|
@@ -118,58 +123,61 @@ enum Event { | |
}, | ||
} | ||
|
||
async fn broker_loop(mut events: Receiver<Event>) { | ||
let (disconnect_sender, mut disconnect_receiver) = | ||
mpsc::unbounded::<(String, Receiver<String>)>(); | ||
#[derive(Debug)] | ||
enum BrokerEvent { | ||
ClientEvent(Event), | ||
Disconnection((String, Receiver<String>)), | ||
Shutdown, | ||
} | ||
|
||
async fn broker_loop(events: Receiver<Event>) { | ||
let (disconnect_sender, disconnect_receiver) = channel(10); | ||
|
||
let mut peers: HashMap<String, Sender<String>> = HashMap::new(); | ||
let disconnect_receiver = disconnect_receiver.map(BrokerEvent::Disconnection); | ||
let events = events.map(BrokerEvent::ClientEvent).chain(stream::once(BrokerEvent::Shutdown)); | ||
|
||
loop { | ||
let event = select! { | ||
event = events.next().fuse() => match event { | ||
None => break, | ||
Some(event) => event, | ||
}, | ||
disconnect = disconnect_receiver.next().fuse() => { | ||
let (name, _pending_messages) = disconnect.unwrap(); | ||
assert!(peers.remove(&name).is_some()); | ||
continue; | ||
}, | ||
}; | ||
let mut stream = disconnect_receiver.merge(events); | ||
|
||
while let Some(event) = stream.next().await { | ||
match event { | ||
Event::Message { from, to, msg } => { | ||
BrokerEvent::ClientEvent(Event::Message { from, to, msg }) => { | ||
for addr in to { | ||
if let Some(peer) = peers.get_mut(&addr) { | ||
let msg = format!("from {}: {}\n", from, msg); | ||
peer.send(msg).await.unwrap(); | ||
peer.send(msg).await; | ||
} | ||
} | ||
} | ||
Event::NewPeer { | ||
BrokerEvent::ClientEvent(Event::NewPeer { | ||
name, | ||
stream, | ||
shutdown, | ||
} => match peers.entry(name.clone()) { | ||
}) => match peers.entry(name.clone()) { | ||
Entry::Occupied(..) => (), | ||
Entry::Vacant(entry) => { | ||
let (client_sender, mut client_receiver) = mpsc::unbounded(); | ||
let (client_sender, mut client_receiver) = channel(10); | ||
entry.insert(client_sender); | ||
let mut disconnect_sender = disconnect_sender.clone(); | ||
let disconnect_sender = disconnect_sender.clone(); | ||
spawn_and_log_error(async move { | ||
let res = | ||
connection_writer_loop(&mut client_receiver, stream, shutdown).await; | ||
disconnect_sender | ||
.send((name, client_receiver)) | ||
.await | ||
.unwrap(); | ||
.await; | ||
res | ||
}); | ||
} | ||
}, | ||
} | ||
BrokerEvent::Disconnection((name, _pending_messages)) => { | ||
assert!(peers.remove(&name).is_some()); | ||
} | ||
BrokerEvent::Shutdown => break, | ||
} | ||
} | ||
drop(peers); | ||
drop(disconnect_sender); | ||
while let Some((_name, _pending_messages)) = disconnect_receiver.next().await {} | ||
while let Some(BrokerEvent::Disconnection((_name, _pending_messages))) = stream.next().await {} | ||
} | ||
|
||
fn spawn_and_log_error<F>(fut: F) -> task::JoinHandle<()> | ||
|
Uh oh!
There was an error while loading. Please reload this page.