Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions service/matching/tasklist/matcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,17 @@ func (tm *taskMatcherImpl) Offer(ctx context.Context, task *InternalTask) (bool,

// OfferOrTimeout offers a task to a poller and blocks until a poller picks up the task or context timeouts
func (tm *taskMatcherImpl) OfferOrTimeout(ctx context.Context, startT time.Time, task *InternalTask) (bool, error) {
if !task.IsForwarded() {
err := tm.ratelimit(ctx)
if err != nil {
// If context was canceled/timed out, return without error (consistent with original behavior)
if err == ctx.Err() {
return false, nil
}
tm.scope.IncCounter(metrics.SyncThrottlePerTaskListCounter)
return false, err
}
}
select {
case tm.getTaskC(task) <- task: // poller picked up the task
if task.ResponseC != nil {
Expand Down
27 changes: 27 additions & 0 deletions service/matching/tasklist/matcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,33 @@ func (t *MatcherTestSuite) TestOffer_RateLimited() {
t.False(matched)
}

func (t *MatcherTestSuite) TestOfferOrTimeout_RateLimited() {
t.matcher.limiter = clock.NewRatelimiter(0, 0)
task := newInternalTask(t.newTaskInfo(), nil, types.TaskSourceHistory, "", false, &types.ActivityTaskDispatchInfo{}, "")

ctx := context.Background()

matched, err := t.matcher.OfferOrTimeout(ctx, time.Now(), task)

t.ErrorIs(err, ErrTasklistThrottled)
t.False(matched)
}

func (t *MatcherTestSuite) TestOfferOrTimeout_ForwardedNotRateLimited() {
// Forwarded tasks should not be rate limited
t.matcher.limiter = clock.NewRatelimiter(0, 0)
task := newInternalTask(t.newTaskInfo(), nil, types.TaskSourceHistory, "forwarded-from", false, &types.ActivityTaskDispatchInfo{}, "")

ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond)
defer cancel()

// Should timeout instead of being rate limited since forwarded tasks bypass rate limiting
matched, err := t.matcher.OfferOrTimeout(ctx, time.Now(), task)

t.NoError(err)
t.False(matched)
}

func (t *MatcherTestSuite) TestOffer_NoTimeoutSyncMatchedNoError() {
defer goleak.VerifyNone(t.T())

Expand Down
Loading