Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 15 additions & 4 deletions client/matching/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,11 @@ package matching

import (
"context"
"errors"

"go.uber.org/yarpc"

"github.com/uber/cadence/common/errors"
cadence_errors "github.com/uber/cadence/common/errors"
"github.com/uber/cadence/common/future"
"github.com/uber/cadence/common/persistence"
"github.com/uber/cadence/common/types"
Expand Down Expand Up @@ -75,6 +76,11 @@ func (c *clientImpl) AddActivityTask(
}
resp, err := c.client.AddActivityTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
if err != nil {
// ReadOnlyPartitionError indicates the partition is being drained - invalidate cache to force next request to route to root partition
var readOnlyErr *types.ReadOnlyPartitionError
if errors.As(err, &readOnlyErr) {
c.provider.InvalidatePartitionCache(request.GetDomainUUID(), *originalTaskList, persistence.TaskListTypeActivity)
}
return nil, err
}
request.TaskList = originalTaskList
Expand Down Expand Up @@ -107,6 +113,11 @@ func (c *clientImpl) AddDecisionTask(
}
resp, err := c.client.AddDecisionTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
if err != nil {
// ReadOnlyPartitionError indicates the partition is being drained - invalidate cache to force next request to route to root partition
var readOnlyErr *types.ReadOnlyPartitionError
if errors.As(err, &readOnlyErr) {
c.provider.InvalidatePartitionCache(request.GetDomainUUID(), *originalTaskList, persistence.TaskListTypeDecision)
}
return nil, err
}
request.TaskList = originalTaskList
Expand Down Expand Up @@ -140,7 +151,7 @@ func (c *clientImpl) PollForActivityTask(
}
resp, err := c.client.PollForActivityTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
if err != nil {
return nil, errors.NewPeerHostnameError(err, peer)
return nil, cadence_errors.NewPeerHostnameError(err, peer)
}

request.PollRequest.TaskList = originalTaskList
Expand Down Expand Up @@ -182,7 +193,7 @@ func (c *clientImpl) PollForDecisionTask(
}
resp, err := c.client.PollForDecisionTask(ctx, request, append(opts, yarpc.WithShardKey(peer))...)
if err != nil {
return nil, errors.NewPeerHostnameError(err, peer)
return nil, cadence_errors.NewPeerHostnameError(err, peer)
}
request.PollRequest.TaskList = originalTaskList
c.provider.UpdatePartitionConfig(
Expand Down Expand Up @@ -321,7 +332,7 @@ func (c *clientImpl) GetTaskListsByDomain(
for i, future := range futures {
var resp *types.GetTaskListsByDomainResponse
if err = future.Get(ctx, &resp); err != nil {
return nil, errors.NewPeerHostnameError(err, peers[i])
return nil, cadence_errors.NewPeerHostnameError(err, peers[i])
}
for name, tl := range resp.GetDecisionTaskListMap() {
if _, ok := decisionTaskListMap[name]; !ok {
Expand Down
36 changes: 36 additions & 0 deletions client/matching/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -202,6 +202,24 @@ func TestClient_withResponse(t *testing.T) {
},
wantError: true,
},
{
name: "AddActivityTask - ReadOnlyPartitionError triggers cache invalidation",
op: func(c Client) (any, error) {
return c.AddActivityTask(context.Background(), testAddActivityTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
balancer.EXPECT().PickWritePartition(persistence.TaskListTypeActivity, testAddActivityTaskRequest()).Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().AddActivityTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(nil, &types.ReadOnlyPartitionError{Message: "partition drained"})
mp.EXPECT().InvalidatePartitionCache(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeActivity)
},
wantError: true,
validateError: func(t *testing.T, err error) {
var readOnlyErr *types.ReadOnlyPartitionError
assert.True(t, stdErrors.As(err, &readOnlyErr))
assert.Equal(t, "partition drained", readOnlyErr.Message)
},
},
{
name: "AddDecisionTask",
op: func(c Client) (any, error) {
Expand Down Expand Up @@ -238,6 +256,24 @@ func TestClient_withResponse(t *testing.T) {
},
wantError: true,
},
{
name: "AddDecisionTask - ReadOnlyPartitionError triggers cache invalidation",
op: func(c Client) (any, error) {
return c.AddDecisionTask(context.Background(), testAddDecisionTaskRequest())
},
mock: func(p *MockPeerResolver, balancer *MockLoadBalancer, c *MockClient, mp *MockPartitionConfigProvider) {
balancer.EXPECT().PickWritePartition(persistence.TaskListTypeDecision, testAddDecisionTaskRequest()).Return(_testPartition)
p.EXPECT().FromTaskList(_testPartition).Return("peer0", nil)
c.EXPECT().AddDecisionTask(gomock.Any(), gomock.Any(), []yarpc.CallOption{yarpc.WithShardKey("peer0")}).Return(nil, &types.ReadOnlyPartitionError{Message: "partition drained"})
mp.EXPECT().InvalidatePartitionCache(_testDomainUUID, types.TaskList{Name: _testTaskList}, persistence.TaskListTypeDecision)
},
wantError: true,
validateError: func(t *testing.T, err error) {
var readOnlyErr *types.ReadOnlyPartitionError
assert.True(t, stdErrors.As(err, &readOnlyErr))
assert.Equal(t, "partition drained", readOnlyErr.Message)
},
},
{
name: "PollForActivityTask",
op: func(c Client) (any, error) {
Expand Down
15 changes: 15 additions & 0 deletions client/matching/partition_config_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type (
GetPartitionConfig(domainID string, taskList types.TaskList, taskListType int) *types.TaskListPartitionConfig
// UpdatePartitionConfig updates the partition configuration for a task list
UpdatePartitionConfig(domainID string, taskList types.TaskList, taskListType int, config *types.TaskListPartitionConfig)
// InvalidatePartitionCache invalidates the cached partition configuration for a task list
InvalidatePartitionCache(domainID string, taskList types.TaskList, taskListType int)
// GetMetricsClient returns the metrics client
GetMetricsClient() metrics.Client
// GetLogger returns the logger
Expand Down Expand Up @@ -197,6 +199,19 @@ func (p *partitionConfigProviderImpl) GetLogger() log.Logger {
return p.logger
}

// InvalidatePartitionCache removes the cached partition configuration entry
// identified by domainID, the task list name and taskListType, then logs the
// invalidation. Task lists whose kind is not TaskListKindNormal are ignored:
// the method returns without deleting anything or logging.
func (p *partitionConfigProviderImpl) InvalidatePartitionCache(domainID string, taskList types.TaskList, taskListType int) {
	isNormal := taskList.GetKind() == types.TaskListKindNormal
	if !isNormal {
		return
	}

	name := taskList.Name
	p.configCache.Delete(key{
		domainID:     domainID,
		taskListName: name,
		taskListType: taskListType,
	})

	p.logger.Info(
		"tasklist partition config cache invalidated",
		tag.WorkflowDomainID(domainID),
		tag.WorkflowTaskListName(name),
		tag.WorkflowTaskListType(taskListType),
	)
}

func (p *partitionConfigProviderImpl) getCachedPartitionConfig(domainID string, taskList types.TaskList, taskListType int) *syncedTaskListPartitionConfig {
if taskList.GetKind() != types.TaskListKindNormal {
return nil
Expand Down
12 changes: 12 additions & 0 deletions client/matching/partition_config_provider_mock.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

39 changes: 39 additions & 0 deletions client/matching/partition_config_provider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -315,6 +315,45 @@ func TestUpdatePartitionConfig(t *testing.T) {
}
}

// TestInvalidatePartitionCache checks that InvalidatePartitionCache deletes the
// cache entry for a normal task list and leaves the cache untouched for a
// sticky one (no Delete expectation is registered in that case).
func TestInvalidatePartitionCache(t *testing.T) {
	const (
		domainID     = "test-domain-id"
		taskListName = "test-task-list"
		taskListType = 0
	)

	tests := []struct {
		name       string
		kind       types.TaskListKind
		wantDelete bool
	}{
		{name: "invalidate cache for normal task list", kind: types.TaskListKindNormal, wantDelete: true},
		{name: "skip invalidation for sticky task list", kind: types.TaskListKindSticky, wantDelete: false},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			provider, cache := setUpMocksForPartitionConfigProvider(t, true)

			if tt.wantDelete {
				// The key must be built from exactly the arguments passed below.
				cache.EXPECT().
					Delete(key{domainID: domainID, taskListName: taskListName, taskListType: taskListType}).
					Times(1)
			}

			// Kind is a pointer field, so take the address of a per-subtest copy.
			tlKind := tt.kind
			provider.InvalidatePartitionCache(
				domainID,
				types.TaskList{Name: taskListName, Kind: &tlKind},
				taskListType,
			)
		})
	}
}

func partitions(num int) map[int]*types.TaskListPartition {
result := make(map[int]*types.TaskListPartition, num)
for i := 0; i < num; i++ {
Expand Down
2 changes: 1 addition & 1 deletion cmd/server/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ require (
github.com/startreedata/pinot-client-go v0.2.0 // latest release supports pinot v0.12.0 which is also internal version
github.com/stretchr/testify v1.10.0
github.com/uber-go/tally v3.3.15+incompatible
github.com/uber/cadence-idl v0.0.0-20251211211800-a5ebf5fd3e0c
github.com/uber/cadence-idl v0.0.0-20260115163036-f68403083e26
github.com/uber/ringpop-go v0.8.5 // indirect
github.com/uber/tchannel-go v1.22.2 // indirect
github.com/valyala/fastjson v1.4.1 // indirect
Expand Down
4 changes: 2 additions & 2 deletions cmd/server/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -414,8 +414,8 @@ github.com/uber-go/tally v3.3.12+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyu
github.com/uber-go/tally v3.3.15+incompatible h1:9hLSgNBP28CjIaDmAuRTq9qV+UZY+9PcvAkXO4nNMwg=
github.com/uber-go/tally v3.3.15+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
github.com/uber/cadence-idl v0.0.0-20211111101836-d6b70b60eb8c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20251211211800-a5ebf5fd3e0c h1:bPYzyyko9wSLoNB+C5FAC8doOxNuozQo3L3Gx5+25Hg=
github.com/uber/cadence-idl v0.0.0-20251211211800-a5ebf5fd3e0c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20260115163036-f68403083e26 h1:ayljsfgiQNLzoA1Bn29LyRPhFdU9gPKq/1zCUjC8cHE=
github.com/uber/cadence-idl v0.0.0-20260115163036-f68403083e26/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM=
github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
Expand Down
4 changes: 4 additions & 0 deletions common/metrics/defs.go
Original file line number Diff line number Diff line change
Expand Up @@ -2391,6 +2391,7 @@ const (
CadenceErrAuthorizeFailedPerTaskListCounter
CadenceErrRemoteSyncMatchFailedPerTaskListCounter
CadenceErrStickyWorkerUnavailablePerTaskListCounter
CadenceErrReadOnlyPartitionPerTaskListCounter
CadenceErrTaskListNotOwnedByHostPerTaskListCounter

CadenceShardSuccessGauge
Expand Down Expand Up @@ -3227,6 +3228,9 @@ var MetricDefs = map[ServiceIdx]map[MetricIdx]metricDefinition{
CadenceErrStickyWorkerUnavailablePerTaskListCounter: {
metricName: "cadence_errors_sticky_worker_unavailable_per_tl", metricRollupName: "cadence_errors_sticky_worker_unavailable_per_tl", metricType: Counter,
},
CadenceErrReadOnlyPartitionPerTaskListCounter: {
metricName: "cadence_errors_read_only_partition_per_tl", metricRollupName: "cadence_errors_read_only_partition", metricType: Counter,
},
CadenceErrTaskListNotOwnedByHostPerTaskListCounter: {
metricName: "cadence_errors_task_list_not_owned_by_host_per_tl", metricRollupName: "cadence_errors_task_list_not_owned_by_host", metricType: Counter,
},
Expand Down
4 changes: 4 additions & 0 deletions common/types/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,3 +166,7 @@ func (err EventAlreadyStartedError) Error() string {
// Error returns the Message field as the error text, which lets
// StickyWorkerUnavailableError be used as an error value.
func (err StickyWorkerUnavailableError) Error() string {
return err.Message
}

// Error returns the Message field as the error text, which lets
// ReadOnlyPartitionError be used as an error value.
func (err ReadOnlyPartitionError) Error() string {
return err.Message
}
5 changes: 5 additions & 0 deletions common/types/errors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,11 @@ func Test_Error(t *testing.T) {
Message: errMessage,
},
},
{
err: ReadOnlyPartitionError{
Message: errMessage,
},
},
{
err: InternalServiceError{
Message: errMessage,
Expand Down
10 changes: 10 additions & 0 deletions common/types/mapper/proto/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ func FromError(err error) error {
return typedErr
} else if ok, typedErr = errorutils.ConvertError(err, fromStickyWorkerUnavailableErr); ok {
return typedErr
} else if ok, typedErr = errorutils.ConvertError(err, fromReadOnlyPartitionErr); ok {
return typedErr
} else if ok, typedErr = errorutils.ConvertError(err, fromNamespaceNotFoundErr); ok {
return typedErr
} else if ok, typedErr = errorutils.ConvertError(err, fromShardNotFoundErr); ok {
Expand Down Expand Up @@ -189,6 +191,10 @@ func ToError(err error) error {
EndEventVersion: ToEventVersion(details.EndEvent),
}
}
case *apiv1.ReadOnlyPartitionError:
return &types.ReadOnlyPartitionError{
Message: status.Message(),
}
}
case yarpcerrors.CodeAlreadyExists:
switch details := getErrorDetails(err).(type) {
Expand Down Expand Up @@ -405,6 +411,10 @@ func fromStickyWorkerUnavailableErr(e *types.StickyWorkerUnavailableError) error
return protobuf.NewError(yarpcerrors.CodeUnavailable, e.Message, protobuf.WithErrorDetails(&apiv1.StickyWorkerUnavailableError{}))
}

// fromReadOnlyPartitionErr maps an internal ReadOnlyPartitionError onto a
// protobuf error with code Aborted. The original message is preserved and an
// empty apiv1.ReadOnlyPartitionError is attached as the error details so the
// receiving side can recognise the error type.
func fromReadOnlyPartitionErr(e *types.ReadOnlyPartitionError) error {
	details := protobuf.WithErrorDetails(&apiv1.ReadOnlyPartitionError{})
	return protobuf.NewError(
		yarpcerrors.CodeAborted,
		e.Message,
		details,
	)
}

func fromNamespaceNotFoundErr(e *types.NamespaceNotFoundError) error {
return protobuf.NewError(yarpcerrors.CodeNotFound, e.Error(), protobuf.WithErrorDetails(&sharddistributorv1.NamespaceNotFoundError{
Namespace: e.Namespace,
Expand Down
5 changes: 5 additions & 0 deletions common/types/shared.go
Original file line number Diff line number Diff line change
Expand Up @@ -9518,6 +9518,11 @@ type StickyWorkerUnavailableError struct {
Message string `json:"message,required"`
}

// ReadOnlyPartitionError is an internal type reporting that a task list
// partition has been drained and rejects the task being added. The matching
// task list manager returns it from AddTask when the partition has no write
// config, and the matching client reacts to it by invalidating its cached
// partition configuration for that task list.
type ReadOnlyPartitionError struct {
// Message is the human-readable description; it is also the text returned by Error().
Message string `json:"message,required"`
}

// Any is an internal mirror of google.protobuf.Any, serving the same purposes, but
// intentionally breaking direct compatibility because it may hold data that is not
// actually protobuf encoded.
Expand Down
4 changes: 4 additions & 0 deletions common/types/testdata/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,9 @@ var (
StickyWorkerUnavailableError = types.StickyWorkerUnavailableError{
Message: ErrorMessage,
}
ReadOnlyPartitionError = types.ReadOnlyPartitionError{
Message: ErrorMessage,
}
TaskListNotOwnedByHostError = cadence_errors.TaskListNotOwnedByHostError{
OwnedByIdentity: HostName,
MyIdentity: HostName2,
Expand Down Expand Up @@ -151,6 +154,7 @@ var Errors = []error{
&ShardOwnershipLostError,
&WorkflowExecutionAlreadyStartedError,
&StickyWorkerUnavailableError,
&ReadOnlyPartitionError,
&TaskListNotOwnedByHostError,
&NamespaceNotFoundError,
&ShardNotFoundError,
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ require (
github.com/startreedata/pinot-client-go v0.2.0 // latest release supports pinot v0.12.0 which is also internal version
github.com/stretchr/testify v1.10.0
github.com/uber-go/tally v3.3.15+incompatible
github.com/uber/cadence-idl v0.0.0-20251211211800-a5ebf5fd3e0c
github.com/uber/cadence-idl v0.0.0-20260115163036-f68403083e26
github.com/uber/ringpop-go v0.8.5
github.com/uber/tchannel-go v1.22.2
github.com/urfave/cli/v2 v2.27.4
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -453,8 +453,8 @@ github.com/uber-go/tally v3.3.12+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyu
github.com/uber-go/tally v3.3.15+incompatible h1:9hLSgNBP28CjIaDmAuRTq9qV+UZY+9PcvAkXO4nNMwg=
github.com/uber-go/tally v3.3.15+incompatible/go.mod h1:YDTIBxdXyOU/sCWilKB4bgyufu1cEi0jdVnRdxvjnmU=
github.com/uber/cadence-idl v0.0.0-20211111101836-d6b70b60eb8c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20251211211800-a5ebf5fd3e0c h1:bPYzyyko9wSLoNB+C5FAC8doOxNuozQo3L3Gx5+25Hg=
github.com/uber/cadence-idl v0.0.0-20251211211800-a5ebf5fd3e0c/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/cadence-idl v0.0.0-20260115163036-f68403083e26 h1:ayljsfgiQNLzoA1Bn29LyRPhFdU9gPKq/1zCUjC8cHE=
github.com/uber/cadence-idl v0.0.0-20260115163036-f68403083e26/go.mod h1:oyUK7GCNCRHCCyWyzifSzXpVrRYVBbAMHAzF5dXiKws=
github.com/uber/jaeger-client-go v2.22.1+incompatible h1:NHcubEkVbahf9t3p75TOCR83gdUHXjRJvjoBh1yACsM=
github.com/uber/jaeger-client-go v2.22.1+incompatible/go.mod h1:WVhlPFC8FDjOFMMWRy2pZqQJSXxYSwNYOkTr/Z6d3Kk=
github.com/uber/jaeger-lib v2.2.0+incompatible h1:MxZXOiR2JuoANZ3J6DE/U0kSFv/eJ/GfSYVCjK7dyaw=
Expand Down
2 changes: 1 addition & 1 deletion idls
3 changes: 3 additions & 0 deletions service/matching/handler/context.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,9 @@ func (reqCtx *handlerContext) handleErr(err error) error {
case errors.As(err, new(*types.StickyWorkerUnavailableError)):
reqCtx.scope.IncCounter(metrics.CadenceErrStickyWorkerUnavailablePerTaskListCounter)
return err
case errors.As(err, new(*types.ReadOnlyPartitionError)):
reqCtx.scope.IncCounter(metrics.CadenceErrReadOnlyPartitionPerTaskListCounter)
return err
case errors.As(err, new(*cadence_errors.TaskListNotOwnedByHostError)):
reqCtx.scope.IncCounter(metrics.CadenceErrTaskListNotOwnedByHostPerTaskListCounter)
return err
Expand Down
4 changes: 4 additions & 0 deletions service/matching/handler/context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ func TestHandleErrKnowErrors(t *testing.T) {
name: "StickyWorkerUnavailableError",
err: &types.StickyWorkerUnavailableError{},
},
{
name: "ReadOnlyPartitionError",
err: &types.ReadOnlyPartitionError{},
},
{
name: "TaskListNotOwnedByHostError",
err: &cadence_errors.TaskListNotOwnedByHostError{},
Expand Down
2 changes: 1 addition & 1 deletion service/matching/tasklist/task_list_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -533,7 +533,7 @@ func (c *taskListManagerImpl) AddTask(ctx context.Context, params AddTaskParams)
if c.config.EnableGetNumberOfPartitionsFromCache() {
_, ok := c.PartitionWriteConfig()
if !ok {
return false, &types.InternalServiceError{Message: "Current partition is drained."}
return false, &types.ReadOnlyPartitionError{Message: "Current partition is drained."}
}
}
if params.ForwardedFrom == "" {
Expand Down
Loading