Skip to content

transports/webrtc: Implement stream message framing#8

Closed
mxinden wants to merge 9 commits intomelekes:anton/webrtc-transportfrom
mxinden:webrtc-message-framing
Closed

transports/webrtc: Implement stream message framing#8
mxinden wants to merge 9 commits intomelekes:anton/webrtc-transportfrom
mxinden:webrtc-message-framing

Conversation

@mxinden
Copy link
Copy Markdown

@mxinden mxinden commented Sep 5, 2022

Description

Implements mxinden/specs#1 for libp2p#2622.

TODOs:

Links to any relevant issues

Open Questions

Change checklist

  • I have performed a self-review of my own code
  • I have made corresponding changes to the documentation
  • I have added tests that prove my fix is effective or that my feature works
  • A changelog entry has been made in the appropriate crates

Copy link
Copy Markdown

@thomaseizinger thomaseizinger left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for implementing this.

I've left some comments!

}

impl State {
fn handle_flag(&mut self, flag: crate::message_proto::message::Flag) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we import Flag directly? The full-qualified path adds a lot of noise IMO.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 done in 11c016f.

Comment on lines +260 to +263
PollDataChannel {
state: State::ReadClosed { .. },
..
}
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this consistent with the spec? I think this will just fill-up some internal buffer somewhere. Shouldn't we read into a local variable instead and simply drop that immediately?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch. Though I think we need to solve this differently.

Shouldn't we read into a local variable instead and simply drop that immediately?

For how long would one do that? We have to return Poll::Ready(Ok(0)) at some point to inform the upper layer that the read side closed. After that the upper layer will no longer call AsyncRead::poll_read, thus no longer enabling this implementation to discard incoming data.

I think this will just fill-up some internal buffer somewhere.

I don't think that in itself is a bad thing. We should have back-pressure throughout the entire system and buffer sizes should be reasonable. Thus a full buffer in the levels below should not cause any harm.

That said, say that the read side closed. The remote may still send a STOP_SENDING. We would never receive that message. I think what we should do, iff the read side closed, read on the underlying I/O in the AsyncWrite::poll_write implementation, discarding any messages, though still reacting to any flags from the remote.

Does that reasoning make sense @thomaseizinger? Do you agree? I will prepare a patch, which might make the above reasoning easier to understand.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Promised patch: d46a171

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this also highlights that this PR needs more tests :)

},
io,
} => {
match ready!(io.poll_next_unpin(cx))
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of nesting this functionality into here, we could have the match block evaluate to read_buffer and move all this logic below the match. All other match arms have an early return already so this would work.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you. Good idea. Done in 1a6e4bd.


Pin::new(&mut self.io).start_send(crate::message_proto::Message {
flag: None,
message: Some(buf.into()),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Shouldn't we enforce the maximum message size here?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

👍 Thanks. Done via 9cd4ef7.

Comment thread transports/webrtc/src/lib.rs Outdated
const PROTO_OVERHEAD: usize = 5;

#[test]
fn proto_size() {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am not sure I understand the value of this test?

Do we expect protobuf to change how much overhead it is producing?
Additionally, this test diverges from what we do in PollDataChannel. There we use prost_codec::Codec.

Would it make sense to:

  • bin this test
  • Enforce the max message size limit in PollDataChannel
  • Write some tests against PollDataChannel that ensure we actually check the limit

To decouple ourselves from RTCPollDataChannel, we can introduce a generic parameter on our PollDataChannel that defaults to RTCPollDataChannel but is replaced with some dummy buffer in the tests.

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Still relevant even though now "Outdated". Tracked in top level pull request description.

Copy link
Copy Markdown

@thomaseizinger thomaseizinger left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks! I've left a few more - mostly nitpicking - comments! :)

self.io.get_mut().set_read_buf_capacity(capacity)
}

fn io_poll_next(
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'd suggest we move this to a free-function. Associated functions are typically only used for constructors which this isn't which is confusing IMO.

Comment on lines +204 to +213
Some(Message { flag, message }) => {
let flag = flag
.map(|f| {
Flag::from_i32(f).ok_or(io::Error::new(io::ErrorKind::InvalidData, ""))
})
.transpose()?;

Poll::Ready(Ok(Some((flag, message))))
}
None => Poll::Ready(Ok(None)),
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same idea here as mentioned earlier, we could early return from the None branch to reduce some nesting.

Comment on lines +226 to +227
if !read_buffer.is_empty() {
let n = std::cmp::min(read_buffer.len(), buf.len());
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Technically, the if is useless because with a length of 0, all the remaining code is a no-op. Do you think we lose anything in clarify if we make it unconditional?

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Actually, that is not true because we would always return Poll::Ready then!

}
}

let PollDataChannel { state, io } = &mut *self;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: A question of taste but I'd mildly prefer this.

Suggested change
let PollDataChannel { state, io } = &mut *self;
let Self { state, io } = &mut *self;


let read_buffer = match state {
State::Open {
ref mut read_buffer,
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think borrowing state with &mut state would do the same thing as these keywords.

}

impl State {
fn handle_flag(&mut self, flag: Flag) {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this function could benefit from some logging!

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Something like "got flag X, moving from state A to B".

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Without any kind of connection identifier, this is pretty useless though. The underlying data channel as a "stream identifier'. If we move this function to PollDataChannel, we could access that and include it in the log message.

buf: &[u8],
) -> Poll<io::Result<usize>> {
tokio_crate::io::AsyncWrite::poll_write(Pin::new(&mut self.0), cx, buf)
// Handle flags iff read side closed.
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This definitely needs tests! :)
For example, a really subtle implementation detail is that the check for which state we are in needs to be inside the loop because handling a flag might change to state to one that we no longer want to handle in here.

Comment on lines +52 to +62
enum State {
Open { read_buffer: Bytes },
WriteClosed { read_buffer: Bytes },
ReadClosed { read_buffer: Bytes },
ReadWriteClosed { read_buffer: Bytes },
ReadReset,
ReadResetWriteClosed,
Poisoned,
}

impl State {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: I'd prefer these to be further down in the file because they are implementation details.

pub struct PollDataChannel(RTCPollDataChannel);
// TODO
// #[derive(Debug)]
pub struct PollDataChannel {
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Nit: Can we move away from the PollDataChannel name? IMO it is a weird name from the webrtc library that we don't need to carry over here, plus with this PR, this is doing a lot more than just being a poll-based version of DataChannel / our wrapper around it.

Perhaps this can just be Substream?

Comment on lines +360 to +361
// TODO: Is flush the correct thing here? We don't want the underlying layer to close both write and read.
self.io.poll_flush_unpin(cx).map_err(Into::into)
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think we should have a WriteClosing state that we remain in until we get a Poll::Ready from Sink::poll_flush. That I believe is the contract of Sink we need to uphold.

@melekes
Copy link
Copy Markdown
Owner

melekes commented Sep 19, 2022

pushed a few commits here: #9

@thomaseizinger
Copy link
Copy Markdown

pushed a few commits here: #9

Thank you!

Given that this is a PR against your repo, I think you should have push rights to it. Try to use the GitHub CLI to checkout the PR (gh pr checkout 8) and then put the commits on top of it. This way, we can have the commits in here together with the open comments :)

@melekes
Copy link
Copy Markdown
Owner

melekes commented Sep 20, 2022

Given that this is a PR against your repo, I think you should have push rights to it. Try to use the GitHub CLI to checkout the PR (gh pr checkout 8) and then put the commits on top of it. This way, we can have the commits in here together with the open comments :)

git push mxinden webrtc-message-framing:webrtc-message-framing
ERROR: Permission to mxinden/rust-libp2p.git denied to melekes.
fatal: Could not read from remote repository.

Please make sure you have the correct access rights
and the repository exists.

looks like I don't have the necessary permissions.

@thomaseizinger
Copy link
Copy Markdown

Given that this is a PR against your repo, I think you should have push rights to it. Try to use the GitHub CLI to checkout the PR (gh pr checkout 8) and then put the commits on top of it. This way, we can have the commits in here together with the open comments :)

git push mxinden webrtc-message-framing:webrtc-message-framing
ERROR: Permission to mxinden/rust-libp2p.git denied to melekes.
fatal: Could not read from remote repository.

Please make sure you have the correct access rights
and the repository exists.

looks like I don't have the necessary permissions.

@mxinden You should have a little checkbox on the right next to the "Subscribe" button of the PR that delegates permissions. Can you tick that one please? :)

@melekes
Copy link
Copy Markdown
Owner

melekes commented Sep 20, 2022

Don't use message framing during noise handshake

why? it would simplify all implementations. the overhead is negligible, I think 🤔

@thomaseizinger
Copy link
Copy Markdown

Don't use message framing during noise handshake

why? it would simplify all implementations. the overhead is negligible, I think 🤔

I think that may be an outdated TODO, it is from 9 days ago? I would expect the framing to always happen to simplify the implementation.

@melekes
Copy link
Copy Markdown
Owner

melekes commented Sep 20, 2022

Send RESET_STREAM when receiving STOP_SENDING

again, not sure why it's needed. The remote has already indicated it won't accept any more data, so why send RESET_STREAM?

@mxinden
Copy link
Copy Markdown
Author

mxinden commented Sep 21, 2022

Don't use message framing during noise handshake

why? it would simplify all implementations. the overhead is negligible, I think thinking

I think that may be an outdated TODO, it is from 9 days ago? I would expect the framing to always happen to simplify the implementation.

Correct. My comment in the pull request description is outdated. Updated now. Sorry for the trouble.

@mxinden
Copy link
Copy Markdown
Author

mxinden commented Sep 22, 2022

Send RESET_STREAM when receiving STOP_SENDING

again, not sure why it's needed. The remote has already indicated it won't accept any more data, so why send RESET_STREAM?

Moved to mxinden/specs#1. Hope you don't mind @melekes.

@thomaseizinger
Copy link
Copy Markdown

Given that this is a PR against your repo, I think you should have push rights to it. Try to use the GitHub CLI to checkout the PR (gh pr checkout 8) and then put the commits on top of it. This way, we can have the commits in here together with the open comments :)

git push mxinden webrtc-message-framing:webrtc-message-framing
ERROR: Permission to mxinden/rust-libp2p.git denied to melekes.
fatal: Could not read from remote repository.

Please make sure you have the correct access rights
and the repository exists.

looks like I don't have the necessary permissions.

@mxinden You should have a little checkbox on the right next to the "Subscribe" button of the PR that delegates permissions. Can you tick that one please? :)

@mxinden, not sure if you saw this! :)

@mxinden
Copy link
Copy Markdown
Author

mxinden commented Sep 22, 2022

Thanks for the ping @thomaseizinger. The tick is already set. I am surprised that @melekes can not push here.

image

@mxinden
Copy link
Copy Markdown
Author

mxinden commented Sep 22, 2022

Send RESET_STREAM when receiving STOP_SENDING

again, not sure why it's needed. The remote has already indicated it won't accept any more data, so why send RESET_STREAM?

Moved to mxinden/specs#1. Hope you don't mind @melekes.

For the record, this is no longer required. See mxinden/specs#1 (comment). Updated this pull request description.

@mxinden
Copy link
Copy Markdown
Author

mxinden commented Sep 22, 2022

I am sorry for not giving this pull request the attention it deserves. I don't want this pull request to block libp2p#2622. In case either of you @thomaseizinger or @melekes has capacity, I would appreciate you taking this over. If not, I will do my best spending more time on it.

@thomaseizinger
Copy link
Copy Markdown

Thanks for the ping @thomaseizinger. The tick is already set. I am surprised that @melekes can not push here.

image

Okay, strange. Well lets continue in #9 then.

@mxinden
Copy link
Copy Markdown
Author

mxinden commented Oct 3, 2022

Closing here in favor of #10.

@mxinden mxinden closed this Oct 3, 2022
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants