diff --git a/service/matching/tasklist/matcher.go b/service/matching/tasklist/matcher.go index 82b4012697c..2e4542400c0 100644 --- a/service/matching/tasklist/matcher.go +++ b/service/matching/tasklist/matcher.go @@ -229,6 +229,17 @@ func (tm *taskMatcherImpl) Offer(ctx context.Context, task *InternalTask) (bool, // OfferOrTimeout offers a task to a poller and blocks until a poller picks up the task or context timeouts func (tm *taskMatcherImpl) OfferOrTimeout(ctx context.Context, startT time.Time, task *InternalTask) (bool, error) { + if !task.IsForwarded() { + err := tm.ratelimit(ctx) + if err != nil { + // If context was canceled/timed out, return without error (consistent with original behavior) + if err == ctx.Err() { + return false, nil + } + tm.scope.IncCounter(metrics.SyncThrottlePerTaskListCounter) + return false, err + } + } select { case tm.getTaskC(task) <- task: // poller picked up the task if task.ResponseC != nil { diff --git a/service/matching/tasklist/matcher_test.go b/service/matching/tasklist/matcher_test.go index cd689a34fa0..bd7323f0580 100644 --- a/service/matching/tasklist/matcher_test.go +++ b/service/matching/tasklist/matcher_test.go @@ -665,6 +665,33 @@ func (t *MatcherTestSuite) TestOffer_RateLimited() { t.False(matched) } +func (t *MatcherTestSuite) TestOfferOrTimeout_RateLimited() { + t.matcher.limiter = clock.NewRatelimiter(0, 0) + task := newInternalTask(t.newTaskInfo(), nil, types.TaskSourceHistory, "", false, &types.ActivityTaskDispatchInfo{}, "") + + ctx := context.Background() + + matched, err := t.matcher.OfferOrTimeout(ctx, time.Now(), task) + + t.ErrorIs(err, ErrTasklistThrottled) + t.False(matched) +} + +func (t *MatcherTestSuite) TestOfferOrTimeout_ForwardedNotRateLimited() { + // Forwarded tasks should not be rate limited + t.matcher.limiter = clock.NewRatelimiter(0, 0) + task := newInternalTask(t.newTaskInfo(), nil, types.TaskSourceHistory, "forwarded-from", false, &types.ActivityTaskDispatchInfo{}, "") + + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Millisecond) + defer cancel() + + // Should timeout instead of being rate limited since forwarded tasks bypass rate limiting + matched, err := t.matcher.OfferOrTimeout(ctx, time.Now(), task) + + t.NoError(err) + t.False(matched) +} + func (t *MatcherTestSuite) TestOffer_NoTimeoutSyncMatchedNoError() { defer goleak.VerifyNone(t.T())