Skip to content
24 changes: 12 additions & 12 deletions common/dynamicconfig/dynamicproperties/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -3248,6 +3248,13 @@ const (
// Allowed filters: N/A
AllIsolationGroups

// RateLimiterBypassCallerTypes defines which caller types bypass rate limiters (both frontend and persistence)
// KeyName: system.rateLimiterBypassCallerTypes
// Value type: []string
// Default value: empty list
// Allowed filters: N/A
RateLimiterBypassCallerTypes

// HeaderForwardingRules defines which headers are forwarded from inbound calls to outbound.
// This value is only loaded at startup.
//
Expand All @@ -3258,13 +3265,6 @@ const (
// Default value: forward all headers. (This default is problematic and will change as we narrow it down to a list of known values.)
HeaderForwardingRules

// PersistenceRateLimiterBypassCallerTypes is the list of caller types that should bypass persistence rate limiting
// KeyName: persistence.rateLimiterBypassCallerTypes
// Value type: []string (list of caller type strings like "cli", "ui", "internal", etc.)
// Default value: empty list (no bypass)
// Allowed filters: N/A
PersistenceRateLimiterBypassCallerTypes

LastListKey
)

Expand Down Expand Up @@ -5760,6 +5760,11 @@ var ListKeys = map[ListKey]DynamicList{
KeyName: "system.allIsolationGroups",
Description: "A list of all the isolation groups in a system",
},
RateLimiterBypassCallerTypes: {
KeyName: "system.rateLimiterBypassCallerTypes",
Description: "List of caller types that bypass rate limiters (both frontend and persistence)",
DefaultValue: []interface{}{},
},
DefaultIsolationGroupConfigStoreManagerGlobalMapping: {
KeyName: "system.defaultIsolationGroupConfigStoreManagerGlobalMapping",
Description: "A configuration store for global isolation groups - used in isolation-group config only, not normal dynamic config." +
Expand All @@ -5779,11 +5784,6 @@ var ListKeys = map[ListKey]DynamicList{
},
},
},
PersistenceRateLimiterBypassCallerTypes: {
KeyName: "persistence.rateLimiterBypassCallerTypes",
Description: "List of caller types that should bypass persistence rate limiting (e.g., ['cli', 'internal'])",
DefaultValue: []interface{}{},
},
}

var _keyNames map[string]Key
Expand Down
74 changes: 36 additions & 38 deletions common/persistence/client/factory.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@ import (
"github.com/uber/cadence/common/codec"
"github.com/uber/cadence/common/config"
"github.com/uber/cadence/common/constants"
"github.com/uber/cadence/common/dynamicconfig"
es "github.com/uber/cadence/common/elasticsearch"
"github.com/uber/cadence/common/log"
"github.com/uber/cadence/common/log/tag"
Expand Down Expand Up @@ -111,14 +110,13 @@ type (
}
factoryImpl struct {
sync.RWMutex
config *config.Persistence
metricsClient metrics.Client
logger log.Logger
datastores map[storeType]Datastore
clusterName string
dc *p.DynamicConfiguration
hostname string
dynamicCollection *dynamicconfig.Collection
config *config.Persistence
metricsClient metrics.Client
logger log.Logger
datastores map[storeType]Datastore
clusterName string
dc *p.DynamicConfiguration
hostname string
}

storeType int
Expand Down Expand Up @@ -161,16 +159,14 @@ func NewFactory(
logger log.Logger,
dc *p.DynamicConfiguration,
hostname string,
dynamicCollection *dynamicconfig.Collection,
) Factory {
factory := &factoryImpl{
config: cfg,
metricsClient: metricsClient,
logger: logger,
clusterName: clusterName,
dc: dc,
hostname: hostname,
dynamicCollection: dynamicCollection,
config: cfg,
metricsClient: metricsClient,
logger: logger,
clusterName: clusterName,
dc: dc,
hostname: hostname,
}
limiters := buildRatelimiters(cfg, persistenceMaxQPS)
factory.init(clusterName, limiters)
Expand All @@ -189,7 +185,7 @@ func (f *factoryImpl) NewTaskManager() (p.TaskManager, error) {
result = errorinjectors.NewTaskManager(result, errorRate, f.logger, time.Now())
}
if ds.ratelimit != nil {
result = ratelimited.NewTaskManager(result, ds.ratelimit, f.metricsClient, ds.name, f.dynamicCollection)
result = ratelimited.NewTaskManager(result, ds.ratelimit, f.metricsClient, ds.name, quotas.NewCallerBypass(f.dc.RateLimiterBypassCallerTypes))
}
if f.metricsClient != nil {
result = metered.NewTaskManager(result, f.metricsClient, f.logger, f.config, f.hostname, ds.name)
Expand All @@ -209,7 +205,7 @@ func (f *factoryImpl) NewShardManager() (p.ShardManager, error) {
result = errorinjectors.NewShardManager(result, errorRate, f.logger, time.Now())
}
if ds.ratelimit != nil {
result = ratelimited.NewShardManager(result, ds.ratelimit, f.metricsClient, ds.name, f.dynamicCollection)
result = ratelimited.NewShardManager(result, ds.ratelimit, f.metricsClient, ds.name, quotas.NewCallerBypass(f.dc.RateLimiterBypassCallerTypes))
}
if f.metricsClient != nil {
result = metered.NewShardManager(result, f.metricsClient, f.logger, f.config, f.hostname, ds.name)
Expand All @@ -229,7 +225,7 @@ func (f *factoryImpl) NewHistoryManager() (p.HistoryManager, error) {
result = errorinjectors.NewHistoryManager(result, errorRate, f.logger, time.Now())
}
if ds.ratelimit != nil {
result = ratelimited.NewHistoryManager(result, ds.ratelimit, f.metricsClient, ds.name, f.dynamicCollection)
result = ratelimited.NewHistoryManager(result, ds.ratelimit, f.metricsClient, ds.name, quotas.NewCallerBypass(f.dc.RateLimiterBypassCallerTypes))
}
if f.metricsClient != nil {
result = metered.NewHistoryManager(result, f.metricsClient, f.logger, f.config, f.hostname, ds.name)
Expand All @@ -251,7 +247,7 @@ func (f *factoryImpl) NewDomainManager() (p.DomainManager, error) {
result = errorinjectors.NewDomainManager(result, errorRate, f.logger, time.Now())
}
if ds.ratelimit != nil {
result = ratelimited.NewDomainManager(result, ds.ratelimit, f.metricsClient, ds.name, f.dynamicCollection)
result = ratelimited.NewDomainManager(result, ds.ratelimit, f.metricsClient, ds.name, quotas.NewCallerBypass(f.dc.RateLimiterBypassCallerTypes))
}
if f.metricsClient != nil {
result = metered.NewDomainManager(result, f.metricsClient, f.logger, f.config, f.hostname, ds.name)
Expand Down Expand Up @@ -288,7 +284,7 @@ func (f *factoryImpl) NewExecutionManager(shardID int) (p.ExecutionManager, erro
result = errorinjectors.NewExecutionManager(result, errorRate, f.logger, time.Now())
}
if ds.ratelimit != nil {
result = ratelimited.NewExecutionManager(result, ds.ratelimit, f.metricsClient, ds.name, f.dynamicCollection)
result = ratelimited.NewExecutionManager(result, ds.ratelimit, f.metricsClient, ds.name, quotas.NewCallerBypass(f.dc.RateLimiterBypassCallerTypes))
}
if f.metricsClient != nil {
result = metered.NewExecutionManager(result, f.metricsClient, f.logger, f.config, f.dc.PersistenceSampleLoggingRate, f.dc.EnableShardIDMetrics, f.hostname, ds.name)
Expand Down Expand Up @@ -316,7 +312,7 @@ func (f *factoryImpl) NewVisibilityManager(

switch params.PersistenceConfig.AdvancedVisibilityStore {
case constants.PinotVisibilityStoreName:
visibilityFromPinot, err = setupPinotVisibilityManager(params, resourceConfig, f.logger, f.dc)
visibilityFromPinot, err = setupPinotVisibilityManager(params, resourceConfig, f.logger, f.dc, quotas.NewCallerBypass(f.dc.RateLimiterBypassCallerTypes))
if err != nil {
f.logger.Fatal("Creating Pinot advanced visibility manager failed", tag.Error(err))
}
Expand All @@ -327,7 +323,7 @@ func (f *factoryImpl) NewVisibilityManager(
}

if params.PinotConfig.Migration.Enabled {
visibilityFromES, err = setupESVisibilityManager(params, resourceConfig, f.logger, f.dc)
visibilityFromES, err = setupESVisibilityManager(params, resourceConfig, f.logger, f.dc, quotas.NewCallerBypass(f.dc.RateLimiterBypassCallerTypes))
if err != nil {
f.logger.Fatal("Creating ES advanced visibility manager failed", tag.Error(err))
}
Expand All @@ -344,7 +340,7 @@ func (f *factoryImpl) NewVisibilityManager(
f.logger,
), nil
case constants.OSVisibilityStoreName:
visibilityFromOS, err = setupOSVisibilityManager(params, resourceConfig, f.logger, f.dc)
visibilityFromOS, err = setupOSVisibilityManager(params, resourceConfig, f.logger, f.dc, quotas.NewCallerBypass(f.dc.RateLimiterBypassCallerTypes))
if err != nil {
f.logger.Fatal("Creating OS advanced visibility manager failed", tag.Error(err))
}
Expand All @@ -354,7 +350,7 @@ func (f *factoryImpl) NewVisibilityManager(
constants.VisibilityModeOS: visibilityFromOS,
}
if params.OSConfig.Migration.Enabled {
visibilityFromES, err = setupESVisibilityManager(params, resourceConfig, f.logger, f.dc)
visibilityFromES, err = setupESVisibilityManager(params, resourceConfig, f.logger, f.dc, quotas.NewCallerBypass(f.dc.RateLimiterBypassCallerTypes))
if err != nil {
f.logger.Fatal("Creating ES advanced visibility manager failed", tag.Error(err))
}
Expand All @@ -370,7 +366,7 @@ func (f *factoryImpl) NewVisibilityManager(
f.logger,
), nil
case constants.ESVisibilityStoreName:
visibilityFromES, err = setupESVisibilityManager(params, resourceConfig, f.logger, f.dc)
visibilityFromES, err = setupESVisibilityManager(params, resourceConfig, f.logger, f.dc, quotas.NewCallerBypass(f.dc.RateLimiterBypassCallerTypes))
if err != nil {
f.logger.Fatal("Creating advanced visibility manager failed", tag.Error(err))
}
Expand Down Expand Up @@ -414,14 +410,15 @@ func newPinotVisibilityManager(
metricsClient metrics.Client,
log log.Logger,
dc *p.DynamicConfiguration,
callerBypass quotas.CallerBypass,
) p.VisibilityManager {
visibilityFromPinotStore := pinotVisibility.NewPinotVisibilityStore(pinotClient, visibilityConfig, producer, log)
visibilityFromPinot := p.NewVisibilityManagerImpl(visibilityFromPinotStore, log, dc)

// wrap with rate limiter
if visibilityConfig.PersistenceMaxQPS != nil && visibilityConfig.PersistenceMaxQPS() != 0 {
pinotRateLimiter := quotas.NewDynamicRateLimiter(visibilityConfig.PersistenceMaxQPS.AsFloat64())
visibilityFromPinot = ratelimited.NewVisibilityManager(visibilityFromPinot, pinotRateLimiter, metricsClient, "pinot", nil)
visibilityFromPinot = ratelimited.NewVisibilityManager(visibilityFromPinot, pinotRateLimiter, metricsClient, "pinot", callerBypass)
}

if metricsClient != nil {
Expand All @@ -443,6 +440,7 @@ func newESVisibilityManager(
metricsClient metrics.Client,
log log.Logger,
dc *p.DynamicConfiguration,
callerBypass quotas.CallerBypass,
) p.VisibilityManager {

visibilityFromESStore := elasticsearch.NewElasticSearchVisibilityStore(esClient, indexName, producer, visibilityConfig, log)
Expand All @@ -451,7 +449,7 @@ func newESVisibilityManager(
// wrap with rate limiter
if visibilityConfig.PersistenceMaxQPS != nil && visibilityConfig.PersistenceMaxQPS() != 0 {
esRateLimiter := quotas.NewDynamicRateLimiter(visibilityConfig.PersistenceMaxQPS.AsFloat64())
visibilityFromES = ratelimited.NewVisibilityManager(visibilityFromES, esRateLimiter, metricsClient, "elasticsearch", nil)
visibilityFromES = ratelimited.NewVisibilityManager(visibilityFromES, esRateLimiter, metricsClient, "elasticsearch", callerBypass)
}
if metricsClient != nil {
// wrap with metrics
Expand Down Expand Up @@ -481,7 +479,7 @@ func (f *factoryImpl) newDBVisibilityManager(
result = errorinjectors.NewVisibilityManager(result, errorRate, f.logger, time.Now())
}
if ds.ratelimit != nil {
result = ratelimited.NewVisibilityManager(result, ds.ratelimit, f.metricsClient, ds.name, f.dynamicCollection)
result = ratelimited.NewVisibilityManager(result, ds.ratelimit, f.metricsClient, ds.name, quotas.NewCallerBypass(f.dc.RateLimiterBypassCallerTypes))
}
if visibilityConfig.EnableDBVisibilitySampling != nil && visibilityConfig.EnableDBVisibilitySampling() {
result = sampled.NewVisibilityManager(result, sampled.Params{
Expand Down Expand Up @@ -514,7 +512,7 @@ func (f *factoryImpl) NewDomainReplicationQueueManager() (p.QueueManager, error)
result = errorinjectors.NewQueueManager(result, errorRate, f.logger, time.Now())
}
if ds.ratelimit != nil {
result = ratelimited.NewQueueManager(result, ds.ratelimit, f.metricsClient, ds.name, f.dynamicCollection)
result = ratelimited.NewQueueManager(result, ds.ratelimit, f.metricsClient, ds.name, quotas.NewCallerBypass(f.dc.RateLimiterBypassCallerTypes))
}
if f.metricsClient != nil {
result = metered.NewQueueManager(result, f.metricsClient, f.logger, f.config, f.hostname, ds.name)
Expand All @@ -534,7 +532,7 @@ func (f *factoryImpl) NewConfigStoreManager() (p.ConfigStoreManager, error) {
result = errorinjectors.NewConfigStoreManager(result, errorRate, f.logger, time.Now())
}
if ds.ratelimit != nil {
result = ratelimited.NewConfigStoreManager(result, ds.ratelimit, f.metricsClient, ds.name, f.dynamicCollection)
result = ratelimited.NewConfigStoreManager(result, ds.ratelimit, f.metricsClient, ds.name, quotas.NewCallerBypass(f.dc.RateLimiterBypassCallerTypes))
}
if f.metricsClient != nil {
result = metered.NewConfigStoreManager(result, f.metricsClient, f.logger, f.config, f.hostname, ds.name)
Expand Down Expand Up @@ -630,28 +628,28 @@ func buildRatelimiters(cfg *config.Persistence, maxQPS quotas.RPSFunc) map[strin
return result
}

// setupPinotVisibilityManager builds the Pinot-backed visibility manager.
//
// It asks params.MessagingClient for a producer registered under
// constants.PinotVisibilityAppName and passes it, together with
// params.PinotClient, resourceConfig, params.MetricsClient, logger and dc,
// to newPinotVisibilityManager. callerBypass is forwarded unchanged.
//
// The only error returned is the one from producer creation; in that case
// the returned manager is nil.
//
// NOTE(review): this span carries two signature lines and two return lines,
// which look like the before/after sides of a diff hunk; the variant that
// accepts callerBypass appears to be the current one — confirm.
func setupPinotVisibilityManager(params *Params, resourceConfig *service.Config, logger log.Logger, dc *p.DynamicConfiguration) (p.VisibilityManager, error) {
func setupPinotVisibilityManager(params *Params, resourceConfig *service.Config, logger log.Logger, dc *p.DynamicConfiguration, callerBypass quotas.CallerBypass) (p.VisibilityManager, error) {
visibilityProducer, err := params.MessagingClient.NewProducer(constants.PinotVisibilityAppName)
if err != nil {
return nil, err
}
return newPinotVisibilityManager(params.PinotClient, resourceConfig, visibilityProducer, params.MetricsClient, logger, dc), nil
return newPinotVisibilityManager(params.PinotClient, resourceConfig, visibilityProducer, params.MetricsClient, logger, dc, callerBypass), nil
}

// setupESVisibilityManager builds the Elasticsearch-backed visibility manager.
//
// The index name is looked up in params.ESConfig.Indices under
// constants.VisibilityAppName, and a producer for the same app name is
// requested from params.MessagingClient. Both are passed, along with
// params.ESClient, resourceConfig, params.MetricsClient, logger and dc, to
// newESVisibilityManager. callerBypass is forwarded unchanged.
//
// The only error returned is the one from producer creation; in that case
// the returned manager is nil.
//
// NOTE(review): this span carries two signature lines and two return lines,
// which look like the before/after sides of a diff hunk; the variant that
// accepts callerBypass appears to be the current one — confirm.
func setupESVisibilityManager(params *Params, resourceConfig *service.Config, logger log.Logger, dc *p.DynamicConfiguration) (p.VisibilityManager, error) {
func setupESVisibilityManager(params *Params, resourceConfig *service.Config, logger log.Logger, dc *p.DynamicConfiguration, callerBypass quotas.CallerBypass) (p.VisibilityManager, error) {
visibilityIndexName := params.ESConfig.Indices[constants.VisibilityAppName]
visibilityProducer, err := params.MessagingClient.NewProducer(constants.VisibilityAppName)
if err != nil {
return nil, err
}
return newESVisibilityManager(visibilityIndexName, params.ESClient, resourceConfig, visibilityProducer, params.MetricsClient, logger, dc), nil
return newESVisibilityManager(visibilityIndexName, params.ESClient, resourceConfig, visibilityProducer, params.MetricsClient, logger, dc, callerBypass), nil
}

// setupOSVisibilityManager builds the OpenSearch-backed visibility manager.
//
// It mirrors setupESVisibilityManager but reads the index name from
// params.OSConfig.Indices and uses params.OSClient. There is no
// OpenSearch-specific constructor here: the result comes from
// newESVisibilityManager, called with the OpenSearch client and index name.
// The producer is requested under constants.VisibilityAppName, and
// callerBypass is forwarded unchanged.
//
// The only error returned is the one from producer creation; in that case
// the returned manager is nil.
//
// NOTE(review): this span carries two signature lines and two return lines,
// which look like the before/after sides of a diff hunk; the variant that
// accepts callerBypass appears to be the current one — confirm.
func setupOSVisibilityManager(params *Params, resourceConfig *service.Config, logger log.Logger, dc *p.DynamicConfiguration) (p.VisibilityManager, error) {
func setupOSVisibilityManager(params *Params, resourceConfig *service.Config, logger log.Logger, dc *p.DynamicConfiguration, callerBypass quotas.CallerBypass) (p.VisibilityManager, error) {
visibilityIndexName := params.OSConfig.Indices[constants.VisibilityAppName]
visibilityProducer, err := params.MessagingClient.NewProducer(constants.VisibilityAppName)
if err != nil {
return nil, err
}
return newESVisibilityManager(visibilityIndexName, params.OSClient, resourceConfig, visibilityProducer, params.MetricsClient, logger, dc), nil
return newESVisibilityManager(visibilityIndexName, params.OSClient, resourceConfig, visibilityProducer, params.MetricsClient, logger, dc, callerBypass), nil
}
2 changes: 1 addition & 1 deletion common/persistence/client/factory_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -279,7 +279,7 @@ func makeFactoryWithMetrics(t *testing.T, withMetrics bool) Factory {
},
}

return NewFactory(cfg, qpsFn, "test cluster", met, logger, pdc, "", nil)
return NewFactory(cfg, qpsFn, "test cluster", met, logger, pdc, "")
}

func mockDatastore(t *testing.T, fact Factory, store storeType) *MockDataStoreFactory {
Expand Down
2 changes: 2 additions & 0 deletions common/persistence/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type (
SerializationEncoding dynamicproperties.StringPropertyFn
DomainAuditLogTTL dynamicproperties.DurationPropertyFnWithDomainIDFilter
HistoryNodeDeleteBatchSize dynamicproperties.IntPropertyFn
RateLimiterBypassCallerTypes dynamicproperties.ListPropertyFn
}
)

Expand All @@ -54,5 +55,6 @@ func NewDynamicConfiguration(dc *dynamicconfig.Collection) *DynamicConfiguration
SerializationEncoding: dc.GetStringProperty(dynamicproperties.SerializationEncoding),
DomainAuditLogTTL: dc.GetDurationPropertyFilteredByDomainID(dynamicproperties.DomainAuditLogTTL),
HistoryNodeDeleteBatchSize: dc.GetIntProperty(dynamicproperties.HistoryNodeDeleteBatchSize),
RateLimiterBypassCallerTypes: dc.GetListProperty(dynamicproperties.RateLimiterBypassCallerTypes),
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ func (s *DBVisibilityPersistenceSuite) SetupSuite() {
}
clusterName := s.ClusterMetadata.GetCurrentClusterName()
vCfg := s.VisibilityTestCluster.Config()
visibilityFactory := client.NewFactory(&vCfg, nil, clusterName, nil, s.Logger, &s.DynamicConfiguration, "test-host", nil)
visibilityFactory := client.NewFactory(&vCfg, nil, clusterName, nil, s.Logger, &s.DynamicConfiguration, "test-host")
// SQL currently doesn't have support for visibility manager
var err error
s.VisibilityMgr, err = visibilityFactory.NewVisibilityManager(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -232,7 +232,7 @@ func (s *TestBase) Setup() {
cfg := s.DefaultTestCluster.Config()
scope := tally.NewTestScope(service.History, make(map[string]string))
metricsClient := metrics.NewClient(scope, service.GetMetricsServiceIdx(service.History, s.Logger), metrics.HistogramMigration{})
factory := client.NewFactory(&cfg, nil, clusterName, metricsClient, s.Logger, &s.DynamicConfiguration, "test-host", nil)
factory := client.NewFactory(&cfg, nil, clusterName, metricsClient, s.Logger, &s.DynamicConfiguration, "test-host")

s.TaskMgr, err = factory.NewTaskManager()
s.fatalOnError("NewTaskManager", err)
Expand Down
Loading
Loading