
fix(shard-distributor): separate watch event processing from the cache refresh#7670

Merged

arzonus merged 2 commits into cadence-workflow:master from arzonus:split-watch-processing-cache-refresh on Feb 9, 2026

Conversation

@arzonus (Contributor) commented Feb 3, 2026

What changed?

  • Watch event processing in the watch function is separated from the call to the refreshCache function

Why?

  • We observed that intensive watch event updates may cause a growing backlog on the server side and in etcd, which may lead to OOMKills

How did you test it?

  • Unit tests
  • Run on dev cluster

Potential risks
N/A

Release notes
N/A

Documentation Changes
N/A


Reviewer Validation

PR Description Quality (check these before reviewing code):

  • "What changed" provides a clear 1-2 line summary
    • Project Issue is linked
  • "Why" explains the full motivation with sufficient context
  • Testing is documented:
    • Unit test commands are included (with exact go test invocation)
    • Integration test setup/commands included (if integration tests were run)
    • Canary testing details included (if canary was mentioned)
  • Potential risks section is thoughtfully filled out (or legitimately N/A)
  • Release notes included if this completes a user-facing feature
  • Documentation needs are addressed (or noted if uncertain)

@arzonus arzonus force-pushed the split-watch-processing-cache-refresh branch 2 times, most recently from 153ac81 to 1b9b18d Compare February 3, 2026 15:19
e, err := newNamespaceShardToExecutor(testPrefix, testNamespace, mockClient, stopCh, logger, clock.NewRealTimeSource())
require.NoError(t, err)

triggerChan := make(chan struct{})
Member commented:
shouldn't it also be buffered here? Otherwise you won't get anything written unless someone is blocked on reading from it.

@arzonus arzonus force-pushed the split-watch-processing-cache-refresh branch from d2c509c to 530b74a Compare February 5, 2026 09:52
Comment on lines +250 to +264
ctrl := gomock.NewController(t)
defer ctrl.Finish()

logger := testlogger.New(t)
mockClient := etcdclient.NewMockClient(ctrl)
timeSource := clock.NewMockedTimeSource()
stopCh := make(chan struct{})
testPrefix := "/test-prefix"
testNamespace := "test-namespace"
executorID := "executor-1"

watchChan := make(chan clientv3.WatchResponse)
mockClient.EXPECT().
Watch(gomock.Any(), gomock.Any(), gomock.Any()).
Return(watchChan)
Member commented:
nit: I wonder if big portion of this could be extracted

@arzonus arzonus force-pushed the split-watch-processing-cache-refresh branch from 530b74a to f750dd3 Compare February 5, 2026 09:54
select {
case triggerCh <- struct{}{}:
default:
n.logger.Info("Cache is being refreshed, skipping trigger")
Member commented:
I suggest we have a metric here instead of a log, so we can monitor how often this happens.

Contributor (author) replied:
I think it's a good idea to have this metric; perhaps it aligns with the etcd watch metrics we wanted to introduce. Right now, etcd/executorstore doesn't have a metrics client or metrics scope, so this would require additional unrelated changes. I think it can be extracted to another PR.

@gazi-yestemirova what do you think? Is it aligned with your task Add metrics for etcd watch events?

gitar-bot bot commented Feb 6, 2026

Code Review ✅ Approved 3 resolved / 3 findings

Clean, well-structured refactoring that correctly decouples watch event processing from cache refresh using a producer-consumer pattern with a buffered channel. The non-blocking send with event coalescing is the right approach for preventing etcd backpressure, and the new tests provide good coverage of both the blocking behavior and end-to-end refresh triggering.

✅ 3 resolved
Edge Case: namespaceRefreshLoop only listens on triggerCh, not stopCh

📄 service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache.go:128-132
The refactored namespaceRefreshLoop() only listens on triggerCh and does not directly listen on n.stopCh:

select {
case _, ok := <-triggerCh:
    if !ok {
        return
    }

The loop relies on watch() returning (which closes triggerCh) when stopCh is closed. This creates an indirect dependency for shutdown signaling.

Looking at the code, this appears intentional - watch() listens to n.stopCh and returns nil when it's closed, which causes runWatchLoop to exit and close triggerCh. This is acceptable, but adding n.stopCh to the select would make the shutdown path more explicit and robust:

select {
case <-n.stopCh:
    return
case _, ok := <-triggerCh:
    // ...
}

This is a minor suggestion for improved clarity and could handle edge cases where the watch hasn't started yet.

Bug: Test has unbounded busy-wait loop that could hang forever

📄 service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go:312-319
The test TestNamespaceShardToExecutor_namespaceRefreshLoop_triggersRefresh contains a busy-wait loop (lines 312-319) that polls e.shardOwners[executorID] indefinitely without any timeout:

for {
    e.RLock()
    _, ok := e.shardOwners[executorID]
    e.RUnlock()
    if ok {
        break
    }
}

If the refresh fails or never completes, this test will hang forever, potentially causing CI timeouts and making test failures difficult to diagnose.

Suggested fix: Add a timeout with time.After or use require.Eventually:

require.Eventually(t, func() bool {
    e.RLock()
    defer e.RUnlock()
    _, ok := e.shardOwners[executorID]
    return ok
}, 5*time.Second, 10*time.Millisecond, "expected executor to be added to shardOwners")
Bug: RLock not released when breaking out of loop

📄 service/sharddistributor/store/etcd/executorstore/shardcache/namespaceshardcache_test.go:254-260
In the test TestNamespaceShardToExecutor_namespaceRefreshLoop_triggersRefresh, the polling loop acquires e.RLock() at the start of each iteration but only calls e.RUnlock() when the executor is NOT found. When the condition ok is true and the loop breaks, the RLock is never released.

This causes:

  1. The lock remains held when the test continues to close(stopCh) and wg.Wait()
  2. Any goroutine trying to acquire a write lock will deadlock
  3. The test may hang or have undefined behavior

Suggested fix:

for {
    e.RLock()
    _, ok := e.shardOwners[executorID]
    e.RUnlock()  // Always release the lock
    if ok {
        break
    }
}

Or use defer pattern with an inner function:

for {
    found := func() bool {
        e.RLock()
        defer e.RUnlock()
        _, ok := e.shardOwners[executorID]
        return ok
    }()
    if found {
        break
    }
}
Rules ❌ No requirements met

Repository Rules

PR Description Quality Standards: Add exact test commands: Replace 'Unit tests' with copyable `go test` command (e.g., `go test -v ./service/sharddistributor/store/etcd/executorstore/shardcache -run TestNamespaceShardToExecutor`). Replace 'Run on dev cluster' with specific setup steps and verification commands. Expand 'Why' section to explain how watch processing worked previously and why the producer-consumer pattern was chosen as the solution.

@arzonus arzonus merged commit 3c8a7b8 into cadence-workflow:master Feb 9, 2026
42 checks passed