-
Notifications
You must be signed in to change notification settings - Fork 7
Wrong rtcp-reader flow when have more than a stream #2
Comments
Thanks for reporting this issue. PRs are welcome to fix this issue. |
@giangndm, I just checked the code, such bind_rtcp_reader overwrite in NACK responder or Report receiver is by design, since Could you provide an example with two streams that cause NACK or RR wrong behavior so that we can investigate more? Thanks. |
@rainliu I using simulcast example with MacOS network-link-conditioner for simulate 1% packet lose or can simulate packet lose in UDP socket; this scenario cause video frame cannot decode at receiver because of the missing packet, have NACK but don't resend. I check the interceptor code that below logic when has more than a stream.
That cause Stream1 and Stream2 rtcp_reader will be called with Stream3 srtp_rtcp_reader, therefore NACK for stream1 and stream2 never read |
Now I see this issue, after spending a few days trying to work out why I wasn't able to get RTCP sender reports out of WebRTC.RS I won't open another issue for that because it boils down to the same thing (And I've written the code to fix it, I'll send a pull request tomorrow with the 'correct' code for all affected modules, pending your feedback on this comment) Essentially, you can end up with multiple video streams even if you have a single video stream if your web browser sends the relevant SDP to trigger the RTX_SSRC code path. Consider that in *webrtc/src/rtp_transceiver/rtp_receiver/mod.rs * We have the following code let (rtp_read_stream, rtp_interceptor, rtcp_read_stream, rtcp_interceptor) =
self.transport
.streams_for_ssrc(encoding.ssrc, &stream_info, &interceptor)
.await?; and let (rtp_read_stream, rtp_interceptor, rtcp_read_stream, rtcp_interceptor) = self
.transport
.streams_for_ssrc(rtx_ssrc, &stream_info, &interceptor)
.await?;
Both of these, in the case of DTLS transport trigger a call to bind_rtcp_reader on the same interceptor. Consider the simplest sample I can find in the default interceptors code, the NACK reader above. We have an Internal state object pub struct ResponderInternal {
log2_size: u8,
streams: Arc<Mutex<HashMap<u32, Arc<ResponderStream>>>>,
parent_rtcp_reader: Mutex<Option<Arc<dyn RTCPReader + Send + Sync>>>,
} and the following code during a bind async fn bind_rtcp_reader(
&self,
reader: Arc<dyn RTCPReader + Send + Sync>,
) -> Arc<dyn RTCPReader + Send + Sync> {
{
let mut parent_rtcp_reader = self.internal.parent_rtcp_reader.lock().await;
*parent_rtcp_reader = Some(reader);
}
Arc::clone(&self.internal) as Arc<dyn RTCPReader + Send + Sync>
} Glancing at this code, we might think that all is fine - we're doing a clone! What can go wrong? The answer is actually the second call to bind_rtcp_reader still trashes the underlying state which is shared across both calls. In my case, when I'm trying to read RTCP, this means that we end up calling read_rtcp on the wrong stream and never get any data. Looking at the Go, the function that gets returned from a call to BindRtcpReader is entirely stateless and this should really be the case here too, a quick experiment done in this single file results in the following code, where we keep the shared state for keeping track of streams/config, and a new struct for wrapping a reader every time the function is called. pub struct ResponderInternal {
log2_size: u8,
streams: Arc<Mutex<HashMap<u32, Arc<ResponderStream>>>>,
}
pub struct ResponderRtcpReader {
parent_rtcp_reader: Arc<dyn RTCPReader + Send + Sync>,
internal: Arc<ResponderInternal>
} The code in both the bind and implementation gets a bit simpler then, we no longer need the mutexes (I'm not sure we need the mutexes anyway if we're using Arc, but I digress). #[async_trait]
impl RTCPReader for ResponderRtcpReader {
async fn read(&self, buf: &mut [u8], a: &Attributes) -> Result<(usize, Attributes)> {
let (n, attr) = {
self.parent_rtcp_reader.read(buf, a).await?
};
let mut b = &buf[..n];
let pkts = rtcp::packet::unmarshal(&mut b)?;
for p in &pkts {
if let Some(nack) = p.as_any().downcast_ref::<TransportLayerNack>() {
let nack = nack.clone();
let streams = Arc::clone(&self.internal.streams);
tokio::spawn(async move {
ResponderInternal::resend_packets(streams, nack).await;
});
}
}
Ok((n, attr))
}
} and async fn bind_rtcp_reader(
&self,
reader: Arc<dyn RTCPReader + Send + Sync>,
) -> Arc<dyn RTCPReader + Send + Sync> {
Arc::new(ResponderRtcpReader {
internal: Arc::clone(&self.internal),
parent_rtcp_reader: reader
}) as Arc<dyn RTCPReader + Send + Sync>
} I've made a similar change across all cases of bind_rtcp_reader in the current tree (and my own interceptor) and now I see a flow of traffic, I can call read_rtcp without the my code hanging and all my trace tells me that the right streams are always being read from. If this looks 'right' to you, then I'll make the changes properly and send a PR. |
@robashton, Thank you for debugging this issue. Yes, please send a PR to fix this. |
When we have more than a stream, then the below logic cause overwrite the previous bonded reader. This issue related to webrtc-rs/webrtc#138
We can resolve this by wrapper a new RTCPReader for each bonded reader.
The text was updated successfully, but these errors were encountered: