-
Notifications
You must be signed in to change notification settings - Fork 4.6k
xdsclient: fix race in ADS stream flow control causing indefinite blocking #8605
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from 1 commit
6c6c3aa
a48b52c
0f6b1d4
320dd81
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||
|---|---|---|---|---|
|
|
@@ -322,7 +322,7 @@ func (s *adsStreamImpl) sendNew(stream clients.Stream, typ ResourceType) error { | |||
| default: | ||||
| } | ||||
| } | ||||
| if s.fc.runIfPending(func() { bufferRequest() }) { | ||||
| if s.fc.runIfPending(bufferRequest) { | ||||
| return nil | ||||
| } | ||||
|
|
||||
|
|
@@ -483,7 +483,12 @@ func (s *adsStreamImpl) recv(stream clients.Stream) bool { | |||
| // Wait for ADS stream level flow control to be available, and send out | ||||
| // a request if anything was buffered while we were waiting for local | ||||
| // processing of the previous response to complete. | ||||
| s.fc.wait() | ||||
| if !s.fc.wait() { | ||||
| if s.logger.V(2) { | ||||
| s.logger.Infof("ADS stream stopped while waiting for flow control") | ||||
| } | ||||
| return msgReceived | ||||
| } | ||||
| s.sendBuffered(stream) | ||||
|
|
||||
| resources, url, version, nonce, err := s.recvMessage(stream) | ||||
|
|
@@ -707,14 +712,17 @@ func resourceNames(m map[string]*xdsresource.ResourceWatchState) []string { | |||
| // consumed by all watchers. | ||||
| // | ||||
| // The lifetime of the flow control is tied to the lifetime of the stream. When | ||||
| // the stream is closed, it is the responsibility of the caller to set the | ||||
| // pending state to false. This ensures that any goroutine blocked on the flow | ||||
| // control's wait method is unblocked. | ||||
| // the stream is closed, it is the responsibility of the caller to stop the flow | ||||
| // control. This ensures that any goroutine blocked on the flow control's wait | ||||
| // method is unblocked. | ||||
| type adsFlowControl struct { | ||||
| mu sync.Mutex | ||||
| cond *sync.Cond // signals when the most recent update has been consumed | ||||
| pending bool // indicates if the most recent update is pending consumption | ||||
| stopped bool // indicates if the ADS stream has been stopped | ||||
| mu sync.Mutex | ||||
| // cond is used to signal when the most recent update has been consumed, or | ||||
| // the flow control has been stopped (in which case, waiters should be | ||||
| // unblocked as well). | ||||
| cond *sync.Cond | ||||
| pending bool // indicates if the most recent update is pending consumption | ||||
| stopped bool // indicates if the ADS stream has been stopped | ||||
| } | ||||
|
|
||||
| // newADSFlowControl returns a new adsFlowControl. | ||||
|
|
@@ -762,19 +770,21 @@ func (fc *adsFlowControl) runIfPending(f func()) bool { | |||
| // If there's a pending update, run the function while still holding the | ||||
| // lock. This ensures that the pending state does not change between the | ||||
| // check and the function call. | ||||
| pending := fc.pending | ||||
| if fc.pending { | ||||
| f() | ||||
| } | ||||
| return pending | ||||
| return fc.pending | ||||
| } | ||||
|
|
||||
| // wait blocks until all the watchers have consumed the most recent update. | ||||
| func (fc *adsFlowControl) wait() { | ||||
| // Returns true if the flow control was stopped while waiting, false otherwise. | ||||
| func (fc *adsFlowControl) wait() bool { | ||||
| fc.mu.Lock() | ||||
| defer fc.mu.Unlock() | ||||
|
|
||||
| for fc.pending && !fc.stopped { | ||||
| fc.cond.Wait() | ||||
| } | ||||
|
|
||||
| return fc.stopped | ||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think this should this should return
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That sounds right to me; good catch. Let's try to make sure we have a test for this scenario if we can.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Done. Thanks for catching this. All tests in https://github.com/grpc/grpc-go/blob/master/internal/xds/clients/xdsclient/test/ads_stream_flow_control_test.go fail without fixing this. Also,
I think I just forgot to run the tests after making the previous set of changes and forgot to check the CI on the PR. |
||||
| } | ||||
Uh oh!
There was an error while loading. Please reload this page.