Skip to content
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 22 additions & 12 deletions internal/xds/clients/xdsclient/ads_stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,7 +322,7 @@ func (s *adsStreamImpl) sendNew(stream clients.Stream, typ ResourceType) error {
default:
}
}
if s.fc.runIfPending(func() { bufferRequest() }) {
if s.fc.runIfPending(bufferRequest) {
return nil
}

Expand Down Expand Up @@ -483,7 +483,12 @@ func (s *adsStreamImpl) recv(stream clients.Stream) bool {
// Wait for ADS stream level flow control to be available, and send out
// a request if anything was buffered while we were waiting for local
// processing of the previous response to complete.
s.fc.wait()
if !s.fc.wait() {
if s.logger.V(2) {
s.logger.Infof("ADS stream stopped while waiting for flow control")
}
return msgReceived
}
s.sendBuffered(stream)

resources, url, version, nonce, err := s.recvMessage(stream)
Expand Down Expand Up @@ -707,14 +712,17 @@ func resourceNames(m map[string]*xdsresource.ResourceWatchState) []string {
// consumed by all watchers.
//
// The lifetime of the flow control is tied to the lifetime of the stream. When
// the stream is closed, it is the responsibility of the caller to set the
// pending state to false. This ensures that any goroutine blocked on the flow
// control's wait method is unblocked.
// the stream is closed, it is the responsibility of the caller to stop the flow
// control. This ensures that any goroutine blocked on the flow control's wait
// method is unblocked.
type adsFlowControl struct {
mu sync.Mutex
cond *sync.Cond // signals when the most recent update has been consumed
pending bool // indicates if the most recent update is pending consumption
stopped bool // indicates if the ADS stream has been stopped
mu sync.Mutex
// cond is used to signal when the most recent update has been consumed, or
// the flow control has been stopped (in which case, waiters should be
// unblocked as well).
cond *sync.Cond
pending bool // indicates if the most recent update is pending consumption
stopped bool // indicates if the ADS stream has been stopped
}

// newADSFlowControl returns a new adsFlowControl.
Expand Down Expand Up @@ -762,19 +770,21 @@ func (fc *adsFlowControl) runIfPending(f func()) bool {
// If there's a pending update, run the function while still holding the
// lock. This ensures that the pending state does not change between the
// check and the function call.
pending := fc.pending
if fc.pending {
f()
}
return pending
return fc.pending
}

// wait blocks until all the watchers have consumed the most recent update.
func (fc *adsFlowControl) wait() {
// Returns true if the flow control was stopped while waiting, false otherwise.
func (fc *adsFlowControl) wait() bool {
fc.mu.Lock()
defer fc.mu.Unlock()

for fc.pending && !fc.stopped {
fc.cond.Wait()
}

return fc.stopped
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this should return !fc.stopped to keep the previous behaviour, or the check above, if !s.fc.wait(), needs to be inverted. Previously, fc.wait() returned false when the context expired. Now, fc.wait() returns true if the flow control object was stopped.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds right to me; good catch. Let's try to make sure we have a test for this scenario if we can.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done. Thanks for catching this.

All tests in https://github.com/grpc/grpc-go/blob/master/internal/xds/clients/xdsclient/test/ads_stream_flow_control_test.go fail without fixing this. Also, TestADSFlowControl_ResourceUpdates_SingleResource specifically checks for this case in

// At this point, the xDS client is shut down (and the associated transport

I think I just forgot to run the tests after making the previous set of changes and forgot to check the CI on the PR.

}
Loading