|
@marten-seemann @mxinden I've implemented the multibase multihash noise handshake here. For verification, we use the remote certificate of the underlying DTLS transport. |
|
@marten-seemann @mxinden this is ready for review |
marten-seemann left a comment:
Lots of good stuff here. Thank you!
I added a large number of comments, please don't be shocked by that ;)
One thing I don't really understand yet is how the opening / accepting of data channels works, and what it means for them to be detached. Maybe you could explain that to me here, that would make the next round of reviews easier.
|
2022-08-05 triage conversation: with the latest spec changes, #1663 is a prereq for this. |
|
@BigLep That PR is approved and awaiting merging. |
Force-pushed from 2cf950f to 71f607f.
|
@marten-seemann or @MarcoPolo do you have capacity to give this another review? If not, I will take a deeper look myself, though I think your review is way more valuable than mine. |
marten-seemann left a comment:
Partial review, but there’s already a lot to do here.
The biggest problem is that the datachannel doesn’t implement any backpressure: You’re reading every message that’s sent, and appending it to a buffer. If the application is reading less quickly than the sender is sending us messages, all those messages will accumulate in the buffer, eventually leading to an OOM panic.
Instead, you need to keep the messages at the WebRTC / SCTP layer, until we’re ready to process them (i.e. until Read is called). Only then can we dequeue the message, so that backpressure can build up.
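A minimal sketch of the pull-based approach described above, assuming a hypothetical `messageSource` interface that stands in for the WebRTC / SCTP layer. The names `messageSource`, `pullReader`, and `fakeSource` are illustrative and not taken from the PR. Nothing is dequeued from the source until `Read` needs data, so unread messages stay in the lower layer where flow control can apply.

```go
package main

import (
	"fmt"
	"io"
)

// messageSource is a hypothetical stand-in for the WebRTC / SCTP layer.
// next returns the next message, or io.EOF when there are none left.
type messageSource interface {
	next() ([]byte, error)
}

// pullReader dequeues a message only when Read needs more data.
type pullReader struct {
	src     messageSource
	pending []byte // unread remainder of the last dequeued message
}

func (r *pullReader) Read(p []byte) (int, error) {
	if len(p) == 0 {
		return 0, nil
	}
	// Only touch the source once the previous message is fully consumed.
	for len(r.pending) == 0 {
		msg, err := r.src.next()
		if err != nil {
			return 0, err
		}
		r.pending = msg
	}
	n := copy(p, r.pending)
	r.pending = r.pending[n:]
	return n, nil
}

// fakeSource counts how many messages have been dequeued so far.
type fakeSource struct {
	msgs     [][]byte
	dequeued int
}

func (s *fakeSource) next() ([]byte, error) {
	if s.dequeued == len(s.msgs) {
		return nil, io.EOF
	}
	m := s.msgs[s.dequeued]
	s.dequeued++
	return m, nil
}

func main() {
	src := &fakeSource{msgs: [][]byte{[]byte("hello"), []byte("world")}}
	r := &pullReader{src: src}
	buf := make([]byte, 3)
	n, _ := r.Read(buf)
	fmt.Printf("read %q, dequeued %d message(s)\n", buf[:n], src.dequeued)
}
```

The only buffering on our side is the remainder of a single message; everything else remains queued below us.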
| if err != nil { | ||
| return "", err | ||
| } | ||
| return multibase.Encode(multibase.Base58BTC, encoded) |
Any reason for using Base 58 here?
I think the spec defines an encoding that we should use, doesn’t it?
@ckousik what is to be done about this?
Resolved. We now use return multibase.Encode(multibase.Base64url, encoded)
| atomic.StoreUint32(&d.remoteReadClosed, 1) | ||
| case pb.Message_RESET: | ||
| log.Errorf("remote reset") | ||
| d.Close() |
I don’t think that’s correct. Even if the remote reset the stream, we can still write on it.
@ckousik Why did you resolve this comment?? You haven’t addressed it at all!
Either my point is invalid, then please comment here. Or my point is valid, then it needs to be fixed.
@ckousik what is to be done about this?
The code has changed completely since then, so a future review will need to determine whether this is fixed. For now, I'm afraid this can be ignored.
| "time" | ||
| ) | ||
|
|
||
| type deadline struct { |
This seems overly complicated, and wasteful. We should only start a timer when there’s actually a Read / Write call running (and stop the timer as soon as that call returns), not just because the deadline was set.
It also seems like we’re not using the deadline correctly (see comments in datachannel.go).
This allows updating the deadline while the Read/Write call is running.
… as does the other solution, which probably won’t add that many LOC and avoids long-running timers.
@ckousik what is to be done about this?
The code has changed completely since then, so if something similar is still an issue, I'm afraid it will have to be rechecked in a new review.
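For reference, the alternative discussed in this thread (arm a timer only while a Read / Write call is actually waiting, yet still allow the deadline to be updated mid-call) could look roughly like the sketch below. The `deadline` type here is a hypothetical rewrite, not the code under review, and `timedOut` is just a demo helper.

```go
package main

import (
	"fmt"
	"os"
	"sync"
	"time"
)

// deadline stores a point in time; no timer runs unless a call is waiting.
type deadline struct {
	mu      sync.Mutex
	t       time.Time     // zero value means "no deadline"
	changed chan struct{} // closed and replaced on every update
}

func newDeadline() *deadline {
	return &deadline{changed: make(chan struct{})}
}

// set updates the deadline and wakes up any call currently waiting on it.
func (d *deadline) set(t time.Time) {
	d.mu.Lock()
	defer d.mu.Unlock()
	d.t = t
	close(d.changed)
	d.changed = make(chan struct{})
}

// wait blocks until done is closed (returns nil) or the deadline expires.
// The timer exists only for the duration of this call.
func (d *deadline) wait(done <-chan struct{}) error {
	for {
		d.mu.Lock()
		t, changed := d.t, d.changed
		d.mu.Unlock()

		var timer *time.Timer
		var expired <-chan time.Time // stays nil (never ready) without a deadline
		if !t.IsZero() {
			timer = time.NewTimer(time.Until(t))
			expired = timer.C
		}
		var err error
		finished := false
		select {
		case <-done:
			finished = true
		case <-expired:
			finished, err = true, os.ErrDeadlineExceeded
		case <-changed:
			// Deadline was updated mid-call: loop and re-arm the timer.
		}
		if timer != nil {
			timer.Stop()
		}
		if finished {
			return err
		}
	}
}

// timedOut reports whether an operation taking `work` exceeds `timeout`.
func timedOut(timeout, work time.Duration) bool {
	d := newDeadline()
	d.set(time.Now().Add(timeout))
	done := make(chan struct{})
	time.AfterFunc(work, func() { close(done) })
	return d.wait(done) != nil
}

func main() {
	fmt.Println(timedOut(10*time.Millisecond, time.Second))
	fmt.Println(timedOut(time.Second, 10*time.Millisecond))
}
```

Setting a deadline is then just a field update; the cost of a timer is paid only by calls that actually block.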
Force-pushed from bedf270 to 1e0375f.
Force-pushed from 36f6ae4 to 83b6e3d.
| if addr.IP.To4() == nil { | ||
| ipVersion = "IP6" | ||
| } | ||
| fp := fingerprintToSDP(fingerprint) |
fp is an empty string if fingerprint is nil. Is that ever a valid state?
If not, we probably want to return an error here.
Otherwise you can get subtle, hard-to-debug issues later.
| case multihash.SHA2_512: | ||
| return crypto.SHA512, true | ||
| default: | ||
| return crypto.Hash(0), false |
| return crypto.Hash(0), false | |
| return 0, false |
The type conversion is implicit here, so it can be elided.
| remaining := len(d.readBuf) | ||
| d.m.Unlock() | ||
|
|
||
| if state := d.getState(); remaining == 0 && (state == stateReadClosed || state == stateClosed) { |
| if state := d.getState(); remaining == 0 && (state == stateReadClosed || state == stateClosed) { | |
| if remaining == 0 && !d.getState().allowRead() { |
Thanks, I seem to have missed this.
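For context, a helper like the `allowRead` used in the suggestion could be sketched as a method on the state type. The `channelState` name and the `stateOpen` / `stateWriteClosed` values are assumptions here; only `stateReadClosed` and `stateClosed` appear in the diff.

```go
package main

import "fmt"

// channelState is a hypothetical state type; only stateReadClosed and
// stateClosed appear in the diff, the other two values are assumed.
type channelState uint8

const (
	stateOpen channelState = iota
	stateReadClosed
	stateWriteClosed
	stateClosed
)

// allowRead reports whether the read side is still usable in this state.
func (s channelState) allowRead() bool {
	return s == stateOpen || s == stateWriteClosed
}

func main() {
	states := []channelState{stateOpen, stateReadClosed, stateWriteClosed, stateClosed}
	for _, s := range states {
		fmt.Printf("state=%d allowRead=%v\n", s, s.allowRead())
	}
}
```

Keeping the predicate on the type means call sites do not need to enumerate states themselves.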
| return string(buf[:n]) | ||
| } | ||
|
|
||
| func replaceAll(s string, b byte) string { |
Better to name this removeAll or similar,
since it doesn't replace the byte but removes every occurrence of it from the string.
| return nil, fmt.Errorf("could not get local peer ID: %w", err) | ||
| } | ||
| // We use elliptic P-256 since it is widely supported by browsers. | ||
| // See: https://github.com/libp2p/specs/pull/412#discussion_r968294244 |
The link above does not really tell an outsider anything. Perhaps link to an actual spec paragraph instead?
|
|
||
| listenerMultiaddr = listenerMultiaddr.Encapsulate(certMultiaddress) | ||
|
|
||
| return newListener( |
TODO: the socket is not closed if newListener returns an error.
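One way to address this TODO, sketched with stand-in types so it runs on its own: close the socket in a deferred function whenever the constructor fails. `fakeSocket`, `listen`, `failingListener`, and `workingListener` are hypothetical names; in the PR the socket would be the `net.PacketConn` and the constructor would be `newListener`.

```go
package main

import (
	"errors"
	"fmt"
	"io"
)

// fakeSocket stands in for the net.PacketConn opened before newListener.
type fakeSocket struct{ closed bool }

func (s *fakeSocket) Close() error {
	s.closed = true
	return nil
}

// listen hands the socket to newListener and closes it if construction
// fails. On success the returned listener owns the socket.
func listen(socket io.Closer, newListener func(io.Closer) (string, error)) (l string, err error) {
	defer func() {
		if err != nil {
			_ = socket.Close()
		}
	}()
	l, err = newListener(socket)
	if err != nil {
		return "", fmt.Errorf("create listener: %w", err)
	}
	return l, nil
}

// Two hypothetical constructors used to exercise both paths.
func failingListener(io.Closer) (string, error) { return "", errors.New("bad config") }
func workingListener(io.Closer) (string, error) { return "listener", nil }

func main() {
	s := &fakeSocket{}
	_, err := listen(s, failingListener)
	fmt.Println(err, s.closed)
}
```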
| // The only requirement here is that the ufrag and password | ||
| // must be equal, which will allow the server to determine | ||
| // the password using the STUN message. | ||
| ufrag := "libp2p+webrtc+v1/" + genUfrag(32) |
TODO: given that we always use this prefix, we might as well pre-allocate it as part of the 32-byte string
and write it at the front.
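A rough sketch of that idea, under the assumption that the random part keeps its current length and the prefix is written into the same buffer. Using `math/rand` and the `ufragPrefix` constant are assumptions for illustration; the PR may use a different random source.

```go
package main

import (
	"fmt"
	"math/rand"
)

const (
	ufragPrefix = "libp2p+webrtc+v1/"
	letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890"
)

// genUfrag returns the fixed prefix followed by n random characters,
// assembled in a single buffer of the final size.
func genUfrag(n int) string {
	b := make([]byte, len(ufragPrefix)+n)
	copy(b, ufragPrefix)
	for i := len(ufragPrefix); i < len(b); i++ {
		b[i] = letterBytes[rand.Intn(len(letterBytes))]
	}
	return string(b)
}

func main() {
	fmt.Println(genUfrag(32))
}
```

This avoids the extra string concatenation at the call site; whether that matters in practice would need a benchmark.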
| settingEngine := webrtc.SettingEngine{} | ||
| // suppress pion logs | ||
| loggerFactory := pionlogger.NewDefaultLoggerFactory() | ||
| loggerFactory.DefaultLogLevel = pionlogger.LogLevelDisabled |
TODO: could it be useful to make these logs available in a verbose mode (i.e. opt-in)?
|
|
||
| remoteMultihash, err := decodeRemoteFingerprint(remoteMultiaddr) | ||
| if err != nil { | ||
| return pc, nil, fmt.Errorf("could not decode fingerprint: %w", err) |
NOTE: more of a remark, but it's not very useful to put phrases like "could not" in an error message,
since an error always describes some kind of failure. You might as well shorten it to: "instantiate peerconnection: %w"
| // set the local address from the candidate pair | ||
| cp, err := rawHandshakeChannel.Transport().Transport().ICETransport().GetSelectedCandidatePair() | ||
| if cp == nil || err != nil { | ||
| return pc, nil, fmt.Errorf("ice connection did not have selected candidate pair") |
You might want to split up these cases: as written, the underlying error is suppressed, which means it will never be logged either.
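Splitting the two cases could look like the sketch below. `candidatePair`, `selectedPair`, and the two getter functions are hypothetical stand-ins so the example runs without the WebRTC library; the point is only that a non-nil error is wrapped and returned rather than replaced.

```go
package main

import (
	"errors"
	"fmt"
)

// candidatePair is a hypothetical stand-in for the ICE candidate pair type.
type candidatePair struct{ local string }

var errNoCandidatePair = errors.New("ice connection did not have selected candidate pair")

// selectedPair keeps the underlying error when there is one, and reports
// a distinct error when the call succeeded but no pair was selected.
func selectedPair(get func() (*candidatePair, error)) (*candidatePair, error) {
	cp, err := get()
	if err != nil {
		return nil, fmt.Errorf("get selected candidate pair: %w", err)
	}
	if cp == nil {
		return nil, errNoCandidatePair
	}
	return cp, nil
}

// Two hypothetical getters that exercise the two failure cases.
func failingGetter() (*candidatePair, error) { return nil, errors.New("ice transport closed") }
func emptyGetter() (*candidatePair, error)   { return nil, nil }

func main() {
	_, err := selectedPair(failingGetter)
	fmt.Println(err)
	_, err = selectedPair(emptyGetter)
	fmt.Println(err)
}
```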
| if err != nil { | ||
| return nil, err | ||
| } | ||
| remoteFp = replaceAll(strings.ToLower(remoteFp), byte(':')) |
TODO[Question]: is the : in the string really needed in the first place? It seems like a human-readable thing
that just makes machine processing less efficient and less straightforward.
I get that this is perhaps a spec thing, but I still find it odd...
It is required by the fingerprint spec. From RFC 4572, section 5 (https://www.rfc-editor.org/rfc/rfc4572#section-5):
A fingerprint is represented in SDP as an attribute (an 'a' line).
It consists of the name of the hash function used, followed by the
hash value itself. The hash value is represented as a sequence of
uppercase hexadecimal bytes, separated by colons. The number of
bytes is defined by the hash function. (This is the syntax used by
openssl and by the browsers' certificate managers. It is different
from the syntax used to represent hash values in, e.g., HTTP digest
authentication [18], which uses unseparated lowercase hexadecimal
bytes. It was felt that consistency with other applications of
fingerprints was more important.)
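Given that syntax, one way to compare an SDP fingerprint value against a raw digest is to strip the colons and hex-decode, which also makes the comparison case-insensitive. This is only a sketch; `fingerprintMatches` is a hypothetical helper, not a function from the PR.

```go
package main

import (
	"bytes"
	"encoding/hex"
	"fmt"
	"strings"
)

// fingerprintMatches reports whether an SDP fingerprint value such as
// "AB:CD:01" encodes exactly the given digest bytes.
func fingerprintMatches(sdpValue string, digest []byte) (bool, error) {
	// hex.DecodeString accepts both uppercase and lowercase digits.
	raw, err := hex.DecodeString(strings.ReplaceAll(sdpValue, ":", ""))
	if err != nil {
		return false, fmt.Errorf("decode fingerprint: %w", err)
	}
	return bytes.Equal(raw, digest), nil
}

func main() {
	ok, err := fingerprintMatches("AB:CD:01", []byte{0xab, 0xcd, 0x01})
	fmt.Println(ok, err)
}
```

Comparing decoded bytes sidesteps the separate lowercase and colon-removal steps on the string form.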
| close(start) | ||
| wg.Wait() | ||
| require.Equal(t, count, atomic.LoadUint32(&success)) | ||
| } |
+func TestWebsocketTransport(t *testing.T) {
+ ta, _ := getTransport(t)
+ tb, _ := getTransport(t)
+ ttransport.SubtestTransport(t, ta, tb, fmt.Sprintf("/ip4/%s/udp/0/webrtc", listenerIp), "peerA")
+}
The test above, with import ttransport "github.com/libp2p/go-libp2p/p2p/transport/testsuite", is missing.
It seems like a standard transport test. Granted, not all transports seem to implement it, but it might be good to support it so that this transport adheres to the generic transport logic. Or what is the intent here?
| socket net.PacketConn | ||
| unknownUfragCallback func(string, net.Addr) | ||
|
|
||
| m sync.Mutex |
TODO: the UDP mux is complex enough that you might want to isolate this thread-unsafe code in a separate data structure
that you can then use as a black-box, thread-safe type. This ensures the mutexes are used safely.
| // UDP addresses of the same IP address family (eg. Server-reflexive addresses | ||
| // and peer-reflexive addresses). | ||
| func (mux *udpMux) GetConn(ufrag string, addr net.Addr) (net.PacketConn, error) { | ||
| a, ok := addr.(*net.UDPAddr) |
Is it ever valid for this addr not to be a UDP address? For now, that error is silently ignored.
| _ = conn.closeConnection() | ||
| delete(mux.ufragMap, key) | ||
| for _, addr := range conn.addresses { | ||
| // log.Errorf("deleting address : %v %v", ufrag, addr) |
| } | ||
|
|
||
| func newMuxedConnection(mux *udpMux, ufrag string) *muxedConnection { | ||
| ctx, cancel := context.WithCancel(context.Background()) |
No, we just have a reference to the parent mux because we write to the socket. That can be removed. Also, not sure which t you are referring to.
|
|
||
| // SetDeadline implements net.PacketConn | ||
| func (*muxedConnection) SetDeadline(t time.Time) error { | ||
| return nil |
| } | ||
|
|
||
| // SetReadDeadline implements net.PacketConn | ||
| func (*muxedConnection) SetReadDeadline(t time.Time) error { |
|
|
||
| // SetWriteDeadline implements net.PacketConn | ||
| func (*muxedConnection) SetWriteDeadline(t time.Time) error { | ||
| return nil |
| func (conn *muxedConnection) closeConnection() error { | ||
| select { | ||
| case <-conn.ctx.Done(): | ||
| return fmt.Errorf("already closed") |
TODO: turn this into a private package-level error (var alreadyClosedErr = errors.New("already closed"))
so that you can replace this line with "return alreadyClosedErr"
| } | ||
|
|
||
| var ( | ||
| errTooManyPackets = fmt.Errorf("too many packets in queue; dropping") |
TODO: use errors.New instead of fmt.Errorf
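Both error-related TODOs (a reusable "already closed" error and `errors.New` for static messages) amount to package-level sentinel errors. A small sketch, with a hypothetical `conn` type standing in for the muxed connection and `errAlreadyClosed` named per the usual Go `errX` convention:

```go
package main

import (
	"errors"
	"fmt"
)

// Static messages need no formatting, so errors.New is sufficient.
var (
	errAlreadyClosed  = errors.New("already closed")
	errTooManyPackets = errors.New("too many packets in queue; dropping")
)

// conn is a hypothetical stand-in for the muxed connection.
type conn struct{ closed bool }

func (c *conn) close() error {
	if c.closed {
		return errAlreadyClosed
	}
	c.closed = true
	return nil
}

func main() {
	c := &conn{}
	_ = c.close()
	// Callers can still identify the sentinel after it has been wrapped.
	err := fmt.Errorf("shutting down mux: %w", c.close())
	fmt.Println(errors.Is(err, errAlreadyClosed))
	fmt.Println(errTooManyPackets)
}
```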
| // pop reads a packet from the packetQueue or blocks until | ||
| // either a packet becomes available or the buffer is closed. | ||
| func (pq *packetQueue) pop(ctx context.Context, buf []byte) (int, net.Addr, error) { | ||
| select { |
TODO: I cannot put my finger on it yet, but this code smells.
It seems like you are mixing logic boundaries here,
which makes the code harder to reason about.
The reason this does not return as soon as the internal context's Done() channel is closed is that there could still be packets in the channel that can be read, even after the queue has been closed. Ideally it would be a priority select: first try to read from the channel, and only then check whether the context is done.
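Go has no built-in priority select, but the behaviour described here can be approximated with a non-blocking receive before and after the blocking select. The sketch below uses a simplified `pop` signature (no buffer or address), and `popAfterCancel` is a hypothetical demo helper.

```go
package main

import (
	"context"
	"fmt"
)

type packetQueue struct {
	packets chan []byte
}

// pop prefers queued packets over cancellation.
func (pq *packetQueue) pop(ctx context.Context) ([]byte, error) {
	// First, a non-blocking attempt to take an already queued packet.
	select {
	case p := <-pq.packets:
		return p, nil
	default:
	}
	// Nothing queued: block until a packet arrives or the context ends.
	select {
	case p := <-pq.packets:
		return p, nil
	case <-ctx.Done():
		// A packet may have arrived in the meantime; check once more.
		select {
		case p := <-pq.packets:
			return p, nil
		default:
			return nil, ctx.Err()
		}
	}
}

// popAfterCancel queues n packets, cancels the context, and counts how
// many packets pop still returns before reporting the cancellation.
func popAfterCancel(n int) int {
	pq := &packetQueue{packets: make(chan []byte, n)}
	for i := 0; i < n; i++ {
		pq.packets <- []byte{byte(i)}
	}
	ctx, cancel := context.WithCancel(context.Background())
	cancel()
	got := 0
	for {
		if _, err := pq.pop(ctx); err != nil {
			return got
		}
		got++
	}
}

func main() {
	fmt.Println(popAfterCancel(3))
}
```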
|
|
||
| // push adds a packet to the packetQueue | ||
| func (pq *packetQueue) push(buf []byte, addr net.Addr) error { | ||
| // we acquire a lock when sending on the channel to prevent |
Yes, I've seen it panic without this. We cannot guarantee that sending on the channel and closing it occur in the same goroutine.
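The panic in question is Go's "send on closed channel". A minimal sketch of guarding both the send and the close with one mutex follows; the `closed` flag and `errQueueClosed` are assumptions, and the real queue in the PR may track closure differently.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

var (
	errQueueClosed    = errors.New("queue closed")
	errTooManyPackets = errors.New("too many packets in queue; dropping")
)

type packetQueue struct {
	mu      sync.Mutex
	closed  bool
	packets chan []byte
}

// push never sends on a closed channel because close takes the same lock.
func (pq *packetQueue) push(p []byte) error {
	pq.mu.Lock()
	defer pq.mu.Unlock()
	if pq.closed {
		return errQueueClosed
	}
	select {
	case pq.packets <- p:
		return nil
	default:
		// Never block while holding the lock; drop instead.
		return errTooManyPackets
	}
}

// close is idempotent and may be called from any goroutine.
func (pq *packetQueue) close() {
	pq.mu.Lock()
	defer pq.mu.Unlock()
	if !pq.closed {
		pq.closed = true
		close(pq.packets)
	}
}

func main() {
	pq := &packetQueue{packets: make(chan []byte, 1)}
	fmt.Println(pq.push([]byte("a")))
	fmt.Println(pq.push([]byte("b")))
	pq.close()
	fmt.Println(pq.push([]byte("c")))
}
```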
| return "" | ||
| } | ||
| fpDigest := intersperse2(hex.EncodeToString(fp.Digest), ':', 2) | ||
| return getSupportedSDPString(fp.Code) + " " + fpDigest |
TODO: the code below can hide a subtle bug: getSupportedSDPString can return
an empty string on error, but because we append to it, the result is no longer an empty string
and will probably lead to an obscure error much later.
Wouldn't it be better to just return an error to begin with?
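A version that returns errors instead of empty strings might look like this sketch. The `decodedMultihash` struct mirrors only the `Code` and `Digest` fields used in the snippet, `sdpHashName` is a hypothetical replacement for `getSupportedSDPString`, and the uppercase output is an assumption based on the RFC 4572 syntax.

```go
package main

import (
	"encoding/hex"
	"errors"
	"fmt"
	"strings"
)

// decodedMultihash mirrors the two fields used in the snippet (assumed shape).
type decodedMultihash struct {
	Code   uint64
	Digest []byte
}

// Multihash codes for SHA2-256 and SHA2-512.
const (
	codeSHA256 uint64 = 0x12
	codeSHA512 uint64 = 0x13
)

var errNilFingerprint = errors.New("fingerprint is nil")

// sdpHashName maps a multihash code to its SDP hash function name.
func sdpHashName(code uint64) (string, error) {
	switch code {
	case codeSHA256:
		return "sha-256", nil
	case codeSHA512:
		return "sha-512", nil
	default:
		return "", fmt.Errorf("unsupported hash code: 0x%x", code)
	}
}

// fingerprintToSDP fails early instead of producing a malformed attribute.
func fingerprintToSDP(fp *decodedMultihash) (string, error) {
	if fp == nil {
		return "", errNilFingerprint
	}
	name, err := sdpHashName(fp.Code)
	if err != nil {
		return "", err
	}
	hexDigest := strings.ToUpper(hex.EncodeToString(fp.Digest))
	pairs := make([]string, 0, len(fp.Digest))
	for i := 0; i < len(hexDigest); i += 2 {
		pairs = append(pairs, hexDigest[i:i+2])
	}
	return name + " " + strings.Join(pairs, ":"), nil
}

func main() {
	s, err := fingerprintToSDP(&decodedMultihash{Code: codeSHA256, Digest: []byte{0xab, 0x01}})
	fmt.Println(s, err)
}
```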
|
|
||
| const letterBytes = "abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ1234567890" | ||
|
|
||
| func genUfrag(n int) string { |
TODO: I'm not sure you need to make this as generic as you do here.
You might as well hardcode 32 here and reuse the objects in a pool.
You would need to benchmark to be sure, though.
Sure, I can hardcode it to 32, but given that I return a string to make this usage convenient, I would somehow have to track the lifetime of the string to ensure it is returned to the pool.
|
This PR can be closed in favour of #1999. |
|
Closing in favor of #1999 |
This PR implements the WebRTC transport spec according to libp2p/specs#412.
The webrtc multiaddr protocol is implemented in this PR and needs to be implemented in
go-multiaddr prior to merging this PR. This PR also uses a multibase-encoded multihash for DTLS fingerprint verification after the Noise handshake.