transports/webrtc: Implement stream message framing#8
transports/webrtc: Implement stream message framing#8mxinden wants to merge 9 commits intomelekes:anton/webrtc-transportfrom
Conversation
…brtc-message-framing
With mxinden/specs@865f4f2 the RESET no longer resets both write and read part of a stream, but only the former.
thomaseizinger
left a comment
There was a problem hiding this comment.
Thanks for implementing this.
I've left some comments!
| } | ||
|
|
||
| impl State { | ||
| fn handle_flag(&mut self, flag: crate::message_proto::message::Flag) { |
There was a problem hiding this comment.
Could we import Flag directly? The full-qualified path adds a lot of noise IMO.
| PollDataChannel { | ||
| state: State::ReadClosed { .. }, | ||
| .. | ||
| } |
There was a problem hiding this comment.
Is this consistent with the spec? I think this will just fill-up some internal buffer somewhere. Shouldn't we read into a local variable instead and simply drop that immediately?
There was a problem hiding this comment.
Good catch. Though I think we need to solve this differently.
Shouldn't we read into a local variable instead and simply drop that immediately?
For how long would one do that? We have to return Poll::Ready(Ok(0)) at some point to inform the upper layer that the read side closed. After that the upper layer will no longer call AsyncRead::poll_read, thus no longer enabling this implementation to discard incoming data.
I think this will just fill-up some internal buffer somewhere.
I don't think that in itself is a bad thing. We should have back-pressure throughout the entire system and buffer sizes should be reasonable. Thus a full buffer in the levels below should not cause any harm.
That said, say that the read side closed. The remote may still send a STOP_SENDING. We would never receive that message. I think what we should do, iff the read side closed, read on the underlying I/O in the AsyncWrite::poll_write implementation, discarding any messages, though still reacting to any flags from the remote.
Does that reasoning make sense @thomaseizinger? Do you agree? I will prepare a patch, which might make the above reasoning easier to understand.
There was a problem hiding this comment.
I think this also highlights that this PR needs more tests :)
| }, | ||
| io, | ||
| } => { | ||
| match ready!(io.poll_next_unpin(cx)) |
There was a problem hiding this comment.
Instead of nesting this functionality into here, we could have the match block evaluate to read_buffer and move all this logic below the match. All other match arms have an early return already so this would work.
|
|
||
| Pin::new(&mut self.io).start_send(crate::message_proto::Message { | ||
| flag: None, | ||
| message: Some(buf.into()), |
There was a problem hiding this comment.
Shouldn't we enforce the maximum message size here?
| const PROTO_OVERHEAD: usize = 5; | ||
|
|
||
| #[test] | ||
| fn proto_size() { |
There was a problem hiding this comment.
I am not sure I understand the value of this test?
Do we expect protobuf to change how much overhead it is producing?
Additionally, this test diverges from what we do in PollDataChannel. There we use prost_codec::Codec.
Would it make sense to:
- bin this test
- Enforce the max message size limit in
PollDataChannel - Write some tests against
PollDataChannelthat ensure we actually check the limit
To decouple ourselves from RTCPollDataChannel, we can introduce a generic parameter on our PollDataChannel that defaults to RTCPollDataChannel but is replaced with some dummy buffer in the tests.
There was a problem hiding this comment.
Still relevant even though now "Outdated". Tracked in top level pull request description.
thomaseizinger
left a comment
There was a problem hiding this comment.
Thanks! I've left a few more - mostly nitpicking - comments! :)
| self.io.get_mut().set_read_buf_capacity(capacity) | ||
| } | ||
|
|
||
| fn io_poll_next( |
There was a problem hiding this comment.
I'd suggest we move this to a free-function. Associated functions are typically only used for constructors which this isn't which is confusing IMO.
| Some(Message { flag, message }) => { | ||
| let flag = flag | ||
| .map(|f| { | ||
| Flag::from_i32(f).ok_or(io::Error::new(io::ErrorKind::InvalidData, "")) | ||
| }) | ||
| .transpose()?; | ||
|
|
||
| Poll::Ready(Ok(Some((flag, message)))) | ||
| } | ||
| None => Poll::Ready(Ok(None)), |
There was a problem hiding this comment.
Same idea here as mentioned earlier, we could early return from the None branch to reduce some nesting.
| if !read_buffer.is_empty() { | ||
| let n = std::cmp::min(read_buffer.len(), buf.len()); |
There was a problem hiding this comment.
Technically, the if is useless because with a length of 0, all the remaining code is a no-op. Do you think we lose anything in clarify if we make it unconditional?
There was a problem hiding this comment.
Actually, that is not true because we would always return Poll::Ready then!
| } | ||
| } | ||
|
|
||
| let PollDataChannel { state, io } = &mut *self; |
There was a problem hiding this comment.
Nit: A question of taste but I'd mildly prefer this.
| let PollDataChannel { state, io } = &mut *self; | |
| let Self { state, io } = &mut *self; |
|
|
||
| let read_buffer = match state { | ||
| State::Open { | ||
| ref mut read_buffer, |
There was a problem hiding this comment.
I think borrowing state with &mut state would do the same thing as these keywords.
| } | ||
|
|
||
| impl State { | ||
| fn handle_flag(&mut self, flag: Flag) { |
There was a problem hiding this comment.
I think this function could benefit from some logging!
There was a problem hiding this comment.
Something like "got flag X, moving from state A to B".
There was a problem hiding this comment.
Without any kind of connection identifier, this is pretty useless though. The underlying data channel as a "stream identifier'. If we move this function to PollDataChannel, we could access that and include it in the log message.
| buf: &[u8], | ||
| ) -> Poll<io::Result<usize>> { | ||
| tokio_crate::io::AsyncWrite::poll_write(Pin::new(&mut self.0), cx, buf) | ||
| // Handle flags iff read side closed. |
There was a problem hiding this comment.
This definitely needs tests! :)
For example, a really subtle implementation detail is that the check for which state we are in needs to be inside the loop because handling a flag might change to state to one that we no longer want to handle in here.
| enum State { | ||
| Open { read_buffer: Bytes }, | ||
| WriteClosed { read_buffer: Bytes }, | ||
| ReadClosed { read_buffer: Bytes }, | ||
| ReadWriteClosed { read_buffer: Bytes }, | ||
| ReadReset, | ||
| ReadResetWriteClosed, | ||
| Poisoned, | ||
| } | ||
|
|
||
| impl State { |
There was a problem hiding this comment.
Nit: I'd prefer these to be further down in the file because they are implementation details.
| pub struct PollDataChannel(RTCPollDataChannel); | ||
| // TODO | ||
| // #[derive(Debug)] | ||
| pub struct PollDataChannel { |
There was a problem hiding this comment.
Nit: Can we move away from the PollDataChannel name? IMO it is a weird name from the webrtc library that we don't need to carry over here, plus with this PR, this is doing a lot more than just being a poll-based version of DataChannel / our wrapper around it.
Perhaps this can just be Substream?
| // TODO: Is flush the correct thing here? We don't want the underlying layer to close both write and read. | ||
| self.io.poll_flush_unpin(cx).map_err(Into::into) |
There was a problem hiding this comment.
I think we should have a WriteClosing state that we remain in until we get a Poll::Ready from Sink::poll_flush. That I believe is the contract of Sink we need to uphold.
|
pushed a few commits here: #9 |
Thank you! Given that this is a PR against your repo, I think you should have push rights to it. Try to use the GitHub CLI to checkout the PR ( |
looks like I don't have the necessary permissions. |
@mxinden You should have a little checkbox on the right next to the "Subscribe" button of the PR that delegates permissions. Can you tick that one please? :) |
why? it would simplify all implementations. the overhead is negligible, I think 🤔 |
I think that may be an outdated TODO, it is from 9 days ago? I would expect the framing to always happen to simplify the implementation. |
again, not sure why it's needed. The remote has already indicated it won't accept any more data, so why send |
Correct. My comment in the pull request description is outdated. Updated now. Sorry for the trouble. |
Moved to mxinden/specs#1. Hope you don't mind @melekes. |
@mxinden, not sure if you saw this! :) |
|
Thanks for the ping @thomaseizinger. The tick is already set. I am surprised that @melekes can not push here. |
For the record, this is no longer required. See mxinden/specs#1 (comment). Updated this pull request description. |
|
I am sorry for not giving this pull request the attention it deserves. I don't want this pull request to block libp2p#2622. In case either of you @thomaseizinger or @melekes has capacity, I would appreciate you taking this over. If not, I will do my best spending more time on it. |
Okay, strange. Well lets continue in #9 then. |
|
Closing here in favor of #10. |

Description
Implements mxinden/specs#1 for libp2p#2622.
TODOs:
Don't use message framing during noise handshakeUse message framing during noise handshake (see webrtc/: Add message framing to support half-close and reset of stream mxinden/specs#1)SendNo longer required. See webrtc/: Add message framing to support half-close and reset of stream mxinden/specs#1 (comment).RESET_STREAMwhen receivingSTOP_SENDINGLinks to any relevant issues
Open Questions
Change checklist