Skip to content

Send a single stream to multiple sinks? #486

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
NickeZ opened this issue May 24, 2017 · 10 comments
Closed

Send a single stream to multiple sinks? #486

NickeZ opened this issue May 24, 2017 · 10 comments

Comments

@NickeZ
Copy link
Contributor

NickeZ commented May 24, 2017

Hey,

So I have an mpsc where I named the stream endpoint rx and then I have a stream of sinks. I want to stream from rx to every sink in the order they come. So I stream to the first sink until send_start returns an error and then I continue to stream to the second sink until send_start returns an error and so on. I'm obviously a noob at tokio and maybe what I'm trying to do is really weird or completely wrong..

So the code is something like this:

        let x = child_writers.fold(rx, move |rx, writer| {
            writer.send_all(rx).map(|(_, rx)| rx)
        }).map_err(|_| ());

But I get the following compiler error:

error: internal compiler error: /checkout/src/librustc_trans/type_of.rs:157: Unexpected tail in unsized_info_ty: futures::future::join::MaybeDone<futures::MapErr<futures::stream::Fold<child::ProcessWriters, [closure@src/telnet_server.rs:94:40: 96:10], futures::Map<futures::sink::SendAll<pty::PtySink, futures::stream::MapErr<futures::stream::FilterMap<futures::sync::mpsc::Receiver<(std::net::SocketAddr, rust_telnet::codec::TelnetIn)>, [closure@src/telnet_server.rs:82:32: 93:10 process:std::rc::Rc<std::cell::RefCell<child::Process>>]>, [closure@src/telnet_server.rs:93:20: 93:68]>>, [closure@src/telnet_server.rs:95:37: 95:49]>, futures::stream::MapErr<futures::stream::FilterMap<futures::sync::mpsc::Receiver<(std::net::SocketAddr, rust_telnet::codec::TelnetIn)>, [closure@src/telnet_server.rs:82:32: 93:10 process:std::rc::Rc<std::cell::RefCell<child::Process>>]>, [closure@src/telnet_server.rs:93:20: 93:68]>>, [closure@src/telnet_server.rs:96:20: 96:26]>> for ty=futures::future::join::MaybeDone<futures::MapErr<futures::stream::Fold<child::ProcessWriters, [closure@src/telnet_server.rs:94:40: 96:10], futures::Map<futures::sink::SendAll<pty::PtySink, futures::stream::MapErr<futures::stream::FilterMap<futures::sync::mpsc::Receiver<(std::net::SocketAddr, rust_telnet::codec::TelnetIn)>, [closure@src/telnet_server.rs:82:32: 93:10 process:std::rc::Rc<std::cell::RefCell<child::Process>>]>, [closure@src/telnet_server.rs:93:20: 93:68]>>, [closure@src/telnet_server.rs:95:37: 95:49]>, futures::stream::MapErr<futures::stream::FilterMap<futures::sync::mpsc::Receiver<(std::net::SocketAddr, rust_telnet::codec::TelnetIn)>, [closure@src/telnet_server.rs:82:32: 93:10 process:std::rc::Rc<std::cell::RefCell<child::Process>>]>, [closure@src/telnet_server.rs:93:20: 93:68]>>, [closure@src/telnet_server.rs:96:20: 96:26]>>
note: the compiler unexpectedly panicked. this is a bug.
note: we would appreciate a bug report: https://github.com/rust-lang/rust/blob/master/CONTRIBUTING.md#bug-reports
thread 'rustc' panicked at 'Box<Any>', /checkout/src/librustc_errors/lib.rs:418
@NickeZ
Copy link
Contributor Author

NickeZ commented May 24, 2017

@carllerche
Copy link
Member

That.... looks like a rustc bug :) I would page @alexcrichton for this!

@NickeZ
Copy link
Contributor Author

NickeZ commented May 24, 2017

btw, I also think the "summing" variable in the fold example should be named "summing" instead of "a". It is not possible to see if it is "a" or "b" that is the sum variable.

https://docs.rs/futures/0.1.13/futures/stream/trait.Stream.html#method.fold

@NickeZ
Copy link
Contributor Author

NickeZ commented May 24, 2017

Fails on stable, beta and nightly: https://travis-ci.org/NickeZ/rups

@leoyvens
Copy link
Contributor

I also think the "summing" variable in the fold example should be named "summing" instead of "a". It is not possible to see if it is "a" or "b" that is the sum variable.

The tipical name for that is accumulator, shortened as acc, as you can see in this std example. You can open a PR to change that example!

@alexcrichton
Copy link
Member

The rustc error here may be similar to rust-lang/rust#40274, although I don't recognize it off hand :(

@NickeZ
Copy link
Contributor Author

NickeZ commented May 29, 2017

I got around the compiler error by simply adding features to handle.spawn() instead.

I don't understand how I should gracefully indicate that my sink cannot receive more items. It seems like the only way is to return an error. But if I return an error send_all() will also return an error instead of (stream, sink) which is what I need to continue..

@alexcrichton
Copy link
Member

@NickeZ ah yeah right now most standard combinators throw away intermediate state on errors, so if you need to preserve state on an error you may need to copy the combinator locally to tweak its implementation.

@NickeZ
Copy link
Contributor Author

NickeZ commented May 30, 2017

@alexcrichton thanks for pointing me in the right direction. I now have a working fork of SendAll that returns whether the stream or the sink ended instead of erroring. I'm not sure if this is useful to you. Anyway here is the code:

https://github.com/NickeZ/toy-tokio/blob/master/src/main.rs

@alexcrichton
Copy link
Member

Ok sounds like ICEs and functionality have been resolved/worked around, so yay! In that case I'm going to close this, but please let me know if that's in error!

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

No branches or pull requests

4 participants