From e5b8b2d02fafb9d001712214106b4480600938bc Mon Sep 17 00:00:00 2001 From: Rishabh Maurya Date: Wed, 20 Sep 2023 19:24:47 -0700 Subject: [PATCH 1/9] Configurable merge policy for index * additional setting to configure merge policy for timestamp based index * introduction of logbytesize merge policy as an option Signed-off-by: Rishabh Maurya --- CHANGELOG.md | 1 + .../gateway/RecoveryFromGatewayIT.java | 4 +- .../RemoveCorruptedShardDataCommandIT.java | 4 +- .../index/store/CorruptedFileIT.java | 10 +- .../indices/stats/IndexStatsIT.java | 10 +- .../java/org/opensearch/update/UpdateIT.java | 4 +- .../common/settings/ClusterSettings.java | 1 + .../common/settings/IndexScopedSettings.java | 29 +- .../org/opensearch/index/IndexSettings.java | 160 ++++++- .../index/LogByteSizeMergePolicyProvider.java | 171 ++++++++ .../opensearch/index/MergePolicyProvider.java | 26 ++ ...ig.java => TieredMergePolicyProvider.java} | 72 ++-- .../opensearch/index/shard/IndexShard.java | 2 +- .../segments/IndicesSegmentsRequestTests.java | 4 +- .../index/MergePolicySettingsTests.java | 397 ++++++++++++++---- .../index/MergeSchedulerSettingsTests.java | 8 +- .../RemoveCorruptedShardDataCommandTests.java | 4 +- .../indices/recovery/RecoveryTests.java | 4 +- .../test/OpenSearchIntegTestCase.java | 4 +- 19 files changed, 761 insertions(+), 154 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/LogByteSizeMergePolicyProvider.java create mode 100644 server/src/main/java/org/opensearch/index/MergePolicyProvider.java rename server/src/main/java/org/opensearch/index/{MergePolicyConfig.java => TieredMergePolicyProvider.java} (83%) diff --git a/CHANGELOG.md b/CHANGELOG.md index 772198d5d0544..2340cd7d8927f 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), - Implement on behalf of token passing for extensions ([#8679](https://github.com/opensearch-project/OpenSearch/pull/8679)) - Implement Visitor Design pattern in QueryBuilder to enable the capability to traverse through the complex QueryBuilder tree. ([#10110](https://github.com/opensearch-project/OpenSearch/pull/10110)) - Provide service accounts tokens to extensions ([#9618](https://github.com/opensearch-project/OpenSearch/pull/9618)) +- Configurable merge policy for index with an option to choose from LogByteSize and Tiered merge policy ([#9992](https://github.com/opensearch-project/OpenSearch/pull/9992)) ### Dependencies - Bump `log4j-core` from 2.18.0 to 2.19.0 diff --git a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java index 2bab61f3e1c4c..229cd7bffad2f 100644 --- a/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java @@ -53,7 +53,7 @@ import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; -import org.opensearch.index.MergePolicyConfig; +import org.opensearch.index.MergePolicyProvider; import org.opensearch.index.engine.Engine; import org.opensearch.index.query.QueryBuilders; import org.opensearch.index.shard.ShardPath; @@ -519,7 +519,7 @@ public void testReuseInFileBasedPeerRecovery() throws Exception { .put("number_of_replicas", 1) // disable merges to keep segments the same - .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + .put(MergePolicyProvider.INDEX_MERGE_ENABLED, false) // expire retention leases quickly .put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms") diff --git a/server/src/internalClusterTest/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandIT.java b/server/src/internalClusterTest/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandIT.java index f8c2acbf99f70..b431079476624 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandIT.java @@ -73,7 +73,7 @@ import org.opensearch.env.TestEnvironment; import org.opensearch.gateway.GatewayMetaState; import org.opensearch.index.IndexSettings; -import org.opensearch.index.MergePolicyConfig; +import org.opensearch.index.MergePolicyProvider; import org.opensearch.index.MockEngineFactoryPlugin; import org.opensearch.index.seqno.SeqNoStats; import org.opensearch.index.translog.TestTranslog; @@ -135,7 +135,7 @@ public void testCorruptIndex() throws Exception { Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + .put(MergePolicyProvider.INDEX_MERGE_ENABLED, false) .put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "-1") .put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true) .put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "checksum") diff --git a/server/src/internalClusterTest/java/org/opensearch/index/store/CorruptedFileIT.java b/server/src/internalClusterTest/java/org/opensearch/index/store/CorruptedFileIT.java index 7e1d0792e3ddb..8291fef5d177b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/index/store/CorruptedFileIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/index/store/CorruptedFileIT.java @@ -72,7 +72,7 @@ import org.opensearch.core.index.shard.ShardId; import org.opensearch.env.NodeEnvironment; import org.opensearch.index.IndexSettings; -import org.opensearch.index.MergePolicyConfig; +import org.opensearch.index.MergePolicyProvider; import org.opensearch.index.shard.IndexEventListener; import org.opensearch.index.shard.IndexShard; import org.opensearch.index.shard.IndexShardState; @@ -167,7 +167,7 @@ public void testCorruptFileAndRecover() throws ExecutionException, InterruptedEx Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1") .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1") - .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + .put(MergePolicyProvider.INDEX_MERGE_ENABLED, false) // no checkindex - we corrupt shards on purpose .put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false) // no translog based flush - it might change the .liv / segments.N files @@ -286,7 +286,7 @@ public void testCorruptPrimaryNoReplica() throws ExecutionException, Interrupted prepareCreate("test").setSettings( Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0") - .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + .put(MergePolicyProvider.INDEX_MERGE_ENABLED, false) .put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false) // no checkindex - we corrupt shards on // purpose // no translog based flush - it might change the .liv / segments.N files @@ -552,7 +552,7 @@ public void testCorruptFileThenSnapshotAndRestore() throws ExecutionException, I prepareCreate("test").setSettings( Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0") // no replicas for this test - .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + .put(MergePolicyProvider.INDEX_MERGE_ENABLED, false) // no checkindex - we corrupt shards on purpose .put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false) // no translog based flush - it might change the .liv / segments.N files @@ -624,7 +624,7 @@ public void testReplicaCorruption() throws Exception { prepareCreate("test").setSettings( Settings.builder() .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1) - .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + .put(MergePolicyProvider.INDEX_MERGE_ENABLED, false) .put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false) // no checkindex - we corrupt shards on // purpose .put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.PB)) // no diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java index a0f01acd1f8e9..0967acb37d3e8 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java @@ -66,8 +66,8 @@ import org.opensearch.index.IndexModule; import org.opensearch.index.IndexService; import org.opensearch.index.IndexSettings; -import org.opensearch.index.MergePolicyConfig; import org.opensearch.index.MergeSchedulerConfig; +import org.opensearch.index.TieredMergePolicyProvider; import org.opensearch.index.VersionType; import org.opensearch.index.cache.query.QueryCacheStats; import org.opensearch.index.engine.VersionConflictEngineException; @@ -589,8 +589,8 @@ public void testNonThrottleStats() throws Exception { prepareCreate("test").setSettings( settingsBuilder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1") .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0") - .put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2") - .put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2") + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2") + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2") .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1") .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "10000") ) @@ -621,8 +621,8 @@ public void testThrottleStats() throws Exception { prepareCreate("test").setSettings( settingsBuilder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1") .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0") - .put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2") - .put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2") + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2") + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2") .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1") .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "1") .put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC.name()) diff --git a/server/src/internalClusterTest/java/org/opensearch/update/UpdateIT.java b/server/src/internalClusterTest/java/org/opensearch/update/UpdateIT.java index 442268d513fc3..b46d27bafb2a5 100644 --- a/server/src/internalClusterTest/java/org/opensearch/update/UpdateIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/update/UpdateIT.java @@ -50,7 +50,7 @@ import org.opensearch.common.xcontent.XContentFactory; import org.opensearch.core.action.ActionListener; import org.opensearch.core.rest.RestStatus; -import org.opensearch.index.MergePolicyConfig; +import org.opensearch.index.MergePolicyProvider; import org.opensearch.index.engine.DocumentMissingException; import org.opensearch.index.engine.VersionConflictEngineException; import org.opensearch.plugins.Plugin; @@ -669,7 +669,7 @@ public void run() { public void testStressUpdateDeleteConcurrency() throws Exception { // We create an index with merging disabled so that deletes don't get merged away - assertAcked(prepareCreate("test").setSettings(Settings.builder().put(MergePolicyConfig.INDEX_MERGE_ENABLED, false))); + assertAcked(prepareCreate("test").setSettings(Settings.builder().put(MergePolicyProvider.INDEX_MERGE_ENABLED, false))); ensureGreen(); Script fieldIncScript = new Script(ScriptType.INLINE, UPDATE_SCRIPTS, FIELD_INC_SCRIPT, Collections.singletonMap("field", "field")); diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 032027384f106..0861657005acc 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -457,6 +457,7 @@ public void apply(Settings value, Settings current, Settings previous) { NetworkService.TCP_CONNECT_TIMEOUT, IndexSettings.QUERY_STRING_ANALYZE_WILDCARD, IndexSettings.QUERY_STRING_ALLOW_LEADING_WILDCARD, + IndexSettings.TIME_INDEX_MERGE_POLICY, ScriptService.SCRIPT_GENERAL_CACHE_SIZE_SETTING, ScriptService.SCRIPT_GENERAL_CACHE_EXPIRE_SETTING, ScriptService.SCRIPT_GENERAL_MAX_COMPILATIONS_RATE_SETTING, diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 5b2afc44600bd..2c2eb1d71628f 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -45,9 +45,11 @@ import org.opensearch.index.IndexSettings; import org.opensearch.index.IndexSortConfig; import org.opensearch.index.IndexingSlowLog; -import org.opensearch.index.MergePolicyConfig; +import org.opensearch.index.LogByteSizeMergePolicyProvider; +import org.opensearch.index.MergePolicyProvider; import org.opensearch.index.MergeSchedulerConfig; import org.opensearch.index.SearchSlowLog; +import org.opensearch.index.TieredMergePolicyProvider; import org.opensearch.index.cache.bitset.BitsetFilterCache; import org.opensearch.index.engine.EngineConfig; import org.opensearch.index.fielddata.IndexFieldDataService; @@ -120,14 +122,14 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexingSlowLog.INDEX_INDEXING_SLOWLOG_LEVEL_SETTING, IndexingSlowLog.INDEX_INDEXING_SLOWLOG_REFORMAT_SETTING, IndexingSlowLog.INDEX_INDEXING_SLOWLOG_MAX_SOURCE_CHARS_TO_LOG_SETTING, - MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, - MergePolicyConfig.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING, - MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, - MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING, - MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING, - MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING, - MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING, - MergePolicyConfig.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT_SETTING, + TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT_SETTING, + TieredMergePolicyProvider.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING, + TieredMergePolicyProvider.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, + TieredMergePolicyProvider.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING, + TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING, + TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING, + TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING, + TieredMergePolicyProvider.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT_SETTING, IndexSortConfig.INDEX_SORT_FIELD_SETTING, IndexSortConfig.INDEX_SORT_ORDER_SETTING, IndexSortConfig.INDEX_SORT_MISSING_SETTING, @@ -202,6 +204,13 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED, IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME, IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY, + IndexSettings.INDEX_MERGE_POLICY, + LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING, + LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MIN_MERGE_MB_SETTING, + LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_MB_SETTING, + LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_MB_FOR_FORCED_MERGE_SETTING, + LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGED_DOCS_SETTING, + LogByteSizeMergePolicyProvider.INDEX_LBS_NO_CFS_RATIO_SETTING, IndexSettings.DEFAULT_SEARCH_PIPELINE, // Settings for Searchable Snapshots @@ -275,7 +284,7 @@ public boolean isPrivateSetting(String key) { case IndexMetadata.SETTING_HISTORY_UUID: case IndexMetadata.SETTING_VERSION_UPGRADED: case IndexMetadata.SETTING_INDEX_PROVIDED_NAME: - case MergePolicyConfig.INDEX_MERGE_ENABLED: + case MergePolicyProvider.INDEX_MERGE_ENABLED: // we keep the shrink settings for BWC - this can be removed in 8.0 // we can't remove in 7 since this setting might be baked into an index coming in via a full cluster restart from 6.0 case "index.shrink.source.uuid": diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 1e4224c314f05..4938958154c68 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -85,7 +85,10 @@ public final class IndexSettings { private static final String MERGE_ON_FLUSH_DEFAULT_POLICY = "default"; private static final String MERGE_ON_FLUSH_MERGE_POLICY = "merge-on-flush"; - + public static final String TIERED_MERGE_POLICY = "tiered"; + public static final String LOG_BYTE_SIZE_MERGE_POLICY = "log_byte_size"; + private static final String DEFAULT_MERGE_POLICY = "default_merge_policy"; + private static final String DEFAULT_TIME_INDEX_MERGE_POLICY = "default_time_index_merge_policy"; public static final Setting> DEFAULT_FIELD_SETTING = Setting.listSetting( "index.query.default_field", Collections.singletonList("*"), @@ -571,6 +574,18 @@ public final class IndexSettings { Property.Dynamic ); + public static final Setting INDEX_MERGE_POLICY = Setting.simpleString( + "index.merge.policy", + DEFAULT_MERGE_POLICY, + Property.IndexScope + ); + + public static final Setting TIME_INDEX_MERGE_POLICY = Setting.simpleString( + "indices.time_index.merge.policy", + DEFAULT_TIME_INDEX_MERGE_POLICY, + Property.NodeScope + ); + public static final Setting SEARCHABLE_SNAPSHOT_REPOSITORY = Setting.simpleString( "index.searchable_snapshot.repository", Property.IndexScope, @@ -651,7 +666,10 @@ public final class IndexSettings { private volatile ByteSizeValue generationThresholdSize; private volatile ByteSizeValue flushAfterMergeThresholdSize; private final MergeSchedulerConfig mergeSchedulerConfig; - private final MergePolicyConfig mergePolicyConfig; + private final TieredMergePolicyProvider tieredMergePolicyProvider; + private final LogByteSizeMergePolicyProvider logByteSizeMergePolicyProvider; + private final MergePolicyProvider defaultMergePolicyProvider; + private final MergePolicyProvider defaultTimeIndexMergePolicyProvider; private final IndexSortConfig indexSortConfig; private final IndexScopedSettings scopedSettings; private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis(); @@ -729,6 +747,9 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) { */ private volatile UnaryOperator mergeOnFlushPolicy; + private volatile MergePolicyProvider mergePolicyProvider; + private volatile MergePolicyProvider timeIndexMergePolicyProvider; + /** * Returns the default search fields for this index. */ @@ -844,7 +865,12 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti maxAnalyzedOffset = scopedSettings.get(MAX_ANALYZED_OFFSET_SETTING); maxTermsCount = scopedSettings.get(MAX_TERMS_COUNT_SETTING); maxRegexLength = scopedSettings.get(MAX_REGEX_LENGTH_SETTING); - this.mergePolicyConfig = new MergePolicyConfig(logger, this); + this.tieredMergePolicyProvider = new TieredMergePolicyProvider(logger, this); + this.logByteSizeMergePolicyProvider = new LogByteSizeMergePolicyProvider(logger, this); + this.defaultMergePolicyProvider = tieredMergePolicyProvider; + this.defaultTimeIndexMergePolicyProvider = tieredMergePolicyProvider; + setMergePolicyProvider(scopedSettings.get(INDEX_MERGE_POLICY)); + setTimeIndexMergePolicy(scopedSettings.get(INDEX_MERGE_POLICY), TIME_INDEX_MERGE_POLICY.get(nodeSettings)); this.indexSortConfig = new IndexSortConfig(this); searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER); defaultPipeline = scopedSettings.get(DEFAULT_PIPELINE); @@ -866,33 +892,59 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti * Now this sortField (IndexSort) is stored in SegmentInfo and we need to maintain backward compatibility for them. */ widenIndexSortType = IndexMetadata.SETTING_INDEX_VERSION_CREATED.get(settings).before(V_2_7_0); - - scopedSettings.addSettingsUpdateConsumer(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING, mergePolicyConfig::setNoCFSRatio); scopedSettings.addSettingsUpdateConsumer( - MergePolicyConfig.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING, - mergePolicyConfig::setDeletesPctAllowed + TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT_SETTING, + tieredMergePolicyProvider::setNoCFSRatio + ); + scopedSettings.addSettingsUpdateConsumer( + TieredMergePolicyProvider.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING, + tieredMergePolicyProvider::setDeletesPctAllowed ); scopedSettings.addSettingsUpdateConsumer( - MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, - mergePolicyConfig::setExpungeDeletesAllowed + TieredMergePolicyProvider.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING, + tieredMergePolicyProvider::setExpungeDeletesAllowed ); scopedSettings.addSettingsUpdateConsumer( - MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING, - mergePolicyConfig::setFloorSegmentSetting + TieredMergePolicyProvider.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING, + tieredMergePolicyProvider::setFloorSegmentSetting ); scopedSettings.addSettingsUpdateConsumer( - MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING, - mergePolicyConfig::setMaxMergesAtOnce + TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING, + tieredMergePolicyProvider::setMaxMergesAtOnce ); scopedSettings.addSettingsUpdateConsumer( - MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING, - mergePolicyConfig::setMaxMergedSegment + TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING, + tieredMergePolicyProvider::setMaxMergedSegment ); scopedSettings.addSettingsUpdateConsumer( - MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING, - mergePolicyConfig::setSegmentsPerTier + TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING, + tieredMergePolicyProvider::setSegmentsPerTier ); + scopedSettings.addSettingsUpdateConsumer( + LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING, + logByteSizeMergePolicyProvider::setLBSMergeFactor + ); + scopedSettings.addSettingsUpdateConsumer( + LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MIN_MERGE_MB_SETTING, + logByteSizeMergePolicyProvider::setLBSMinMergedMB + ); + scopedSettings.addSettingsUpdateConsumer( + LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_MB_SETTING, + logByteSizeMergePolicyProvider::setLBSMaxMergeSegment + ); + scopedSettings.addSettingsUpdateConsumer( + LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_MB_FOR_FORCED_MERGE_SETTING, + logByteSizeMergePolicyProvider::setLBSMaxMergeMBForForcedMerge + ); + scopedSettings.addSettingsUpdateConsumer( + LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGED_DOCS_SETTING, + logByteSizeMergePolicyProvider::setLBSMaxMergeDocs + ); + scopedSettings.addSettingsUpdateConsumer( + LogByteSizeMergePolicyProvider.INDEX_LBS_NO_CFS_RATIO_SETTING, + logByteSizeMergePolicyProvider::setLBSNoCFSRatio + ); scopedSettings.addSettingsUpdateConsumer( MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING, MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING, @@ -1439,9 +1491,14 @@ public long getGcDeletesInMillis() { /** * Returns the merge policy that should be used for this index. + * @param isTimeIndex true if index contains @timestamp field */ - public MergePolicy getMergePolicy() { - return mergePolicyConfig.getMergePolicy(); + public MergePolicy getMergePolicy(boolean isTimeIndex) { + if (isTimeIndex) { + return timeIndexMergePolicyProvider.getMergePolicy(); + } else { + return mergePolicyProvider.getMergePolicy(); + } } public T getValue(Setting setting) { @@ -1650,6 +1707,71 @@ private void setMergeOnFlushPolicy(String policy) { } } + private void setMergePolicyProvider(String indexScopedPolicy) { + if (indexScopedPolicy.equals(TIERED_MERGE_POLICY)) { + this.mergePolicyProvider = tieredMergePolicyProvider; + } else if (indexScopedPolicy.equals(LOG_BYTE_SIZE_MERGE_POLICY)) { + this.mergePolicyProvider = logByteSizeMergePolicyProvider; + } else if (indexScopedPolicy.equals(DEFAULT_MERGE_POLICY) || Strings.isEmpty(indexScopedPolicy)) { + this.mergePolicyProvider = defaultMergePolicyProvider; + } else { + throw new IllegalArgumentException( + "The " + + IndexSettings.INDEX_MERGE_POLICY.getKey() + + " has unsupported policy specified: " + + indexScopedPolicy + + ". Please use one of: " + + TIERED_MERGE_POLICY + + ", " + + LOG_BYTE_SIZE_MERGE_POLICY + ); + } + if (logger.isTraceEnabled()) { + logger.trace("Merge policy used: " + mergePolicyProvider.toString()); + } + } + + private void setTimeIndexMergePolicy(String indexScopedPolicy, String nodeScopedTimeIndexPolicy) { + if (indexScopedPolicy.equals(TIERED_MERGE_POLICY)) { + this.timeIndexMergePolicyProvider = tieredMergePolicyProvider; + } else if (indexScopedPolicy.equals(LOG_BYTE_SIZE_MERGE_POLICY)) { + this.timeIndexMergePolicyProvider = logByteSizeMergePolicyProvider; + } else if (indexScopedPolicy.equals(DEFAULT_MERGE_POLICY) || Strings.isEmpty(indexScopedPolicy)) { + if (nodeScopedTimeIndexPolicy.equals(TIERED_MERGE_POLICY)) { + this.timeIndexMergePolicyProvider = tieredMergePolicyProvider; + } else if (nodeScopedTimeIndexPolicy.equals(LOG_BYTE_SIZE_MERGE_POLICY)) { + this.timeIndexMergePolicyProvider = logByteSizeMergePolicyProvider; + } else if (nodeScopedTimeIndexPolicy.equals(DEFAULT_TIME_INDEX_MERGE_POLICY) || Strings.isEmpty(nodeScopedTimeIndexPolicy)) { + this.timeIndexMergePolicyProvider = defaultTimeIndexMergePolicyProvider; + } else { + throw new IllegalArgumentException( + "The " + + IndexSettings.TIME_INDEX_MERGE_POLICY.getKey() + + " has unsupported policy specified: " + + nodeScopedTimeIndexPolicy + + ". Please use one of: " + + TIERED_MERGE_POLICY + + ", " + + LOG_BYTE_SIZE_MERGE_POLICY + ); + } + } else { + throw new IllegalArgumentException( + "The " + + IndexSettings.INDEX_MERGE_POLICY.getKey() + + " has unsupported policy specified: " + + indexScopedPolicy + + ". Please use one of: " + + TIERED_MERGE_POLICY + + ", " + + LOG_BYTE_SIZE_MERGE_POLICY + ); + } + if (logger.isTraceEnabled()) { + logger.trace("Time index merge policy used: " + timeIndexMergePolicyProvider.toString()); + } + } + public Optional> getMergeOnFlushPolicy() { return Optional.ofNullable(mergeOnFlushPolicy); } diff --git a/server/src/main/java/org/opensearch/index/LogByteSizeMergePolicyProvider.java b/server/src/main/java/org/opensearch/index/LogByteSizeMergePolicyProvider.java new file mode 100644 index 0000000000000..cf0ee2fff08e0 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/LogByteSizeMergePolicyProvider.java @@ -0,0 +1,171 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index; + +import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.LogByteSizeMergePolicy; +import org.apache.lucene.index.MergePolicy; +import org.apache.lucene.index.NoMergePolicy; +import org.opensearch.common.settings.Setting; +import org.opensearch.core.common.unit.ByteSizeUnit; +import org.opensearch.core.common.unit.ByteSizeValue; + +import static org.apache.lucene.index.LogMergePolicy.DEFAULT_MAX_MERGE_DOCS; +import static org.apache.lucene.index.LogMergePolicy.DEFAULT_NO_CFS_RATIO; + +/** + *

+ * The LogByteSizeMergePolicy is an alternative merge policy primarily used here to optimize the merging of segments in scenarios + * with index with timestamps. + * While the TieredMergePolicy is the default choice, the LogByteSizeMergePolicy can be configured + * as the default merge policy for time-index data using the index.datastream_merge.policy setting. + * + *

+ * Unlike the TieredMergePolicy, which prioritizes merging segments of equal sizes, the LogByteSizeMergePolicy + * specializes in merging adjacent segments efficiently. + * This characteristic makes it particularly well-suited for range queries on time-index data. + * Typically, adjacent segments in time-index data often contain documents with similar timestamps. + * When these segments are merged, the resulting segment covers a range of timestamps with reduced overlap compared + * to the adjacent segments. This reduced overlap remains even as segments grow older and larger, + * which can significantly benefit range queries on timestamps. + * + *

+ * In contrast, the TieredMergePolicy does not honor this timestamp range optimization. It focuses on merging segments + * of equal sizes and does not consider adjacency. Consequently, as segments grow older and larger, + * the overlap of timestamp ranges among adjacent segments managed by TieredMergePolicy can increase. + * This can lead to inefficiencies in range queries on timestamps, as the number of segments to be scanned + * within a given timestamp range could become high. + * + * @opensearch.internal + */ +public class LogByteSizeMergePolicyProvider implements MergePolicyProvider { + private final LogByteSizeMergePolicy logByteSizeMergePolicy = new LogByteSizeMergePolicy(); + + private final Logger logger; + private final boolean mergesEnabled; + + public static final ByteSizeValue DEFAULT_MIN_MERGE_MB = new ByteSizeValue(2, ByteSizeUnit.MB); + public static final int DEFAULT_MERGE_FACTOR = 10; + + public static final ByteSizeValue DEFAULT_MAX_MERGED_SEGMENT = new ByteSizeValue(5, ByteSizeUnit.GB); + + public static final ByteSizeValue DEFAULT_MAX_MERGE_SEGMENT_MB_FORCE_MERGE = new ByteSizeValue( + Long.MAX_VALUE / ByteSizeUnit.GB.toBytes(1), + ByteSizeUnit.GB + ); + + // settings for LogByteSizeMergePolicy + + public static final Setting INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING = Setting.intSetting( + "index.merge.policy.log_byte_size.merge_factor", + DEFAULT_MERGE_FACTOR, // keeping it same as default max merge at once for tiered merge policy + 2, + Setting.Property.Dynamic, + Setting.Property.IndexScope + ); + + public static final Setting INDEX_LBS_MERGE_POLICY_MIN_MERGE_MB_SETTING = Setting.byteSizeSetting( + "index.merge.policy.log_byte_size.min_merge_mb", + DEFAULT_MIN_MERGE_MB, // keeping it same as default floor segment for tiered merge policy + Setting.Property.Dynamic, + Setting.Property.IndexScope + ); + + public static final Setting INDEX_LBS_MAX_MERGE_SEGMENT_MB_SETTING = Setting.byteSizeSetting( + "index.merge.policy.log_byte_size.max_merge_segment_mb", + DEFAULT_MAX_MERGED_SEGMENT, // keeping default same as tiered merge policy + Setting.Property.Dynamic, + Setting.Property.IndexScope + ); + + public static final Setting INDEX_LBS_MAX_MERGE_SEGMENT_MB_FOR_FORCED_MERGE_SETTING = Setting.byteSizeSetting( + "index.merge.policy.log_byte_size.max_merge_segment_mb_forced_merge", + DEFAULT_MAX_MERGE_SEGMENT_MB_FORCE_MERGE, + Setting.Property.Dynamic, + Setting.Property.IndexScope + ); + + public static final Setting INDEX_LBS_MAX_MERGED_DOCS_SETTING = Setting.intSetting( + "index.merge.policy.log_byte_size.max_merged_docs", + DEFAULT_MAX_MERGE_DOCS, + Setting.Property.Dynamic, + Setting.Property.IndexScope + ); + + public static final Setting INDEX_LBS_NO_CFS_RATIO_SETTING = new Setting<>( + "index.merge.policy.log_byte_size.no_cfs_ratio", + Double.toString(DEFAULT_NO_CFS_RATIO), + TieredMergePolicyProvider::parseNoCFSRatio, + Setting.Property.Dynamic, + Setting.Property.IndexScope + ); + + LogByteSizeMergePolicyProvider(Logger logger, IndexSettings indexSettings) { + this.logger = logger; + this.mergesEnabled = indexSettings.getSettings().getAsBoolean(INDEX_MERGE_ENABLED, true); + + // Undocumented settings, works great with defaults + logByteSizeMergePolicy.setMergeFactor(indexSettings.getValue(INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING)); + logByteSizeMergePolicy.setMinMergeMB(indexSettings.getValue(INDEX_LBS_MERGE_POLICY_MIN_MERGE_MB_SETTING).getMbFrac()); + logByteSizeMergePolicy.setMaxMergeMB(indexSettings.getValue(INDEX_LBS_MAX_MERGE_SEGMENT_MB_SETTING).getMbFrac()); + logByteSizeMergePolicy.setMaxMergeMBForForcedMerge( + indexSettings.getValue(INDEX_LBS_MAX_MERGE_SEGMENT_MB_FOR_FORCED_MERGE_SETTING).getMbFrac() + ); + logByteSizeMergePolicy.setMaxMergeDocs(indexSettings.getValue(INDEX_LBS_MAX_MERGED_DOCS_SETTING)); + logByteSizeMergePolicy.setNoCFSRatio(indexSettings.getValue(INDEX_LBS_NO_CFS_RATIO_SETTING)); + } + + @Override + public MergePolicy getMergePolicy() { + return mergesEnabled ? logByteSizeMergePolicy : NoMergePolicy.INSTANCE; + } + + void setLBSMergeFactor(int mergeFactor) { + logByteSizeMergePolicy.setMergeFactor(mergeFactor); + } + + void setLBSMaxMergeSegment(ByteSizeValue maxMergeSegment) { + logByteSizeMergePolicy.setMaxMergeMB(maxMergeSegment.getMbFrac()); + } + + void setLBSMinMergedMB(ByteSizeValue minMergedMB) { + logByteSizeMergePolicy.setMinMergeMB(minMergedMB.getMbFrac()); + } + + void setLBSMaxMergeMBForForcedMerge(ByteSizeValue maxMergeMBForcedMerge) { + logByteSizeMergePolicy.setMaxMergeMBForForcedMerge(maxMergeMBForcedMerge.getMbFrac()); + } + + void setLBSMaxMergeDocs(int maxMergeDocs) { + logByteSizeMergePolicy.setMaxMergeDocs(maxMergeDocs); + } + + void setLBSNoCFSRatio(Double noCFSRatio) { + logByteSizeMergePolicy.setNoCFSRatio(noCFSRatio); + } + + @Override + public String toString() { + return "LogByteSizeMergePolicyProvider{" + + "mergeFactor=" + + logByteSizeMergePolicy.getMergeFactor() + + ", minMergeMB=" + + logByteSizeMergePolicy.getMinMergeMB() + + ", maxMergeMB=" + + logByteSizeMergePolicy.getMaxMergeMB() + + ", maxMergeMBForForcedMerge=" + + logByteSizeMergePolicy.getMaxMergeMBForForcedMerge() + + ", maxMergedDocs=" + + logByteSizeMergePolicy.getMaxMergeDocs() + + ", noCFSRatio=" + + logByteSizeMergePolicy.getNoCFSRatio() + + '}'; + } + +} diff --git a/server/src/main/java/org/opensearch/index/MergePolicyProvider.java b/server/src/main/java/org/opensearch/index/MergePolicyProvider.java new file mode 100644 index 0000000000000..1c2388fedd633 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/MergePolicyProvider.java @@ -0,0 +1,26 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index; + +import org.apache.lucene.index.MergePolicy; + +/** + * A provider for obtaining merge policies used by OpenSearch indexes. + */ +public interface MergePolicyProvider { + // don't convert to Setting<> and register... we only set this in tests and register via a plugin + String INDEX_MERGE_ENABLED = "index.merge.enabled"; + + /** + * Gets the merge policy to be used for index. + * + * @return The merge policy instance. + */ + MergePolicy getMergePolicy(); +} diff --git a/server/src/main/java/org/opensearch/index/MergePolicyConfig.java b/server/src/main/java/org/opensearch/index/TieredMergePolicyProvider.java similarity index 83% rename from server/src/main/java/org/opensearch/index/MergePolicyConfig.java rename to server/src/main/java/org/opensearch/index/TieredMergePolicyProvider.java index fe2af21dfe039..2cfae46055206 100644 --- a/server/src/main/java/org/opensearch/index/MergePolicyConfig.java +++ b/server/src/main/java/org/opensearch/index/TieredMergePolicyProvider.java @@ -33,6 +33,7 @@ package org.opensearch.index; import org.apache.logging.log4j.Logger; +import org.apache.lucene.index.LogByteSizeMergePolicy; import org.apache.lucene.index.MergePolicy; import org.apache.lucene.index.NoMergePolicy; import org.apache.lucene.index.TieredMergePolicy; @@ -47,9 +48,12 @@ * where the index data is stored, and are immutable up to delete markers. * Segments are, periodically, merged into larger segments to keep the * index size at bay and expunge deletes. + * This class customizes and exposes 2 merge policies from lucene - + * {@link LogByteSizeMergePolicy} and {@link TieredMergePolicy}. + * * *

- * Merges select segments of approximately equal size, subject to an allowed + * Tiered merge policy select segments of approximately equal size, subject to an allowed * number of segments per tier. The merge policy is able to merge * non-adjacent segments, and separates how many segments are merged at once from how many * segments are allowed per tier. It also does not over-merge (i.e., cascade merges). @@ -125,8 +129,9 @@ * @opensearch.internal */ -public final class MergePolicyConfig { - private final OpenSearchTieredMergePolicy mergePolicy = new OpenSearchTieredMergePolicy(); +public final class TieredMergePolicyProvider implements MergePolicyProvider { + private final OpenSearchTieredMergePolicy tieredMergePolicy = new OpenSearchTieredMergePolicy(); + private final Logger logger; private final boolean mergesEnabled; @@ -137,10 +142,11 @@ public final class MergePolicyConfig { public static final double DEFAULT_SEGMENTS_PER_TIER = 10.0d; public static final double DEFAULT_RECLAIM_DELETES_WEIGHT = 2.0d; public static final double DEFAULT_DELETES_PCT_ALLOWED = 20.0d; + public static final Setting INDEX_COMPOUND_FORMAT_SETTING = new Setting<>( "index.compound_format", Double.toString(TieredMergePolicy.DEFAULT_NO_CFS_RATIO), - MergePolicyConfig::parseNoCFSRatio, + TieredMergePolicyProvider::parseNoCFSRatio, Property.Dynamic, Property.IndexScope ); @@ -194,10 +200,8 @@ public final class MergePolicyConfig { Property.Dynamic, Property.IndexScope ); - // don't convert to Setting<> and register... we only set this in tests and register via a plugin - public static final String INDEX_MERGE_ENABLED = "index.merge.enabled"; - MergePolicyConfig(Logger logger, IndexSettings indexSettings) { + TieredMergePolicyProvider(Logger logger, IndexSettings indexSettings) { this.logger = logger; double forceMergeDeletesPctAllowed = indexSettings.getValue(INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING); // percentage ByteSizeValue floorSegment = indexSettings.getValue(INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING); @@ -216,13 +220,14 @@ public final class MergePolicyConfig { ); } maxMergeAtOnce = adjustMaxMergeAtOnceIfNeeded(maxMergeAtOnce, segmentsPerTier); - mergePolicy.setNoCFSRatio(indexSettings.getValue(INDEX_COMPOUND_FORMAT_SETTING)); - mergePolicy.setForceMergeDeletesPctAllowed(forceMergeDeletesPctAllowed); - mergePolicy.setFloorSegmentMB(floorSegment.getMbFrac()); - mergePolicy.setMaxMergeAtOnce(maxMergeAtOnce); - mergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac()); - mergePolicy.setSegmentsPerTier(segmentsPerTier); - mergePolicy.setDeletesPctAllowed(deletesPctAllowed); + tieredMergePolicy.setNoCFSRatio(indexSettings.getValue(INDEX_COMPOUND_FORMAT_SETTING)); + tieredMergePolicy.setForceMergeDeletesPctAllowed(forceMergeDeletesPctAllowed); + tieredMergePolicy.setFloorSegmentMB(floorSegment.getMbFrac()); + tieredMergePolicy.setMaxMergeAtOnce(maxMergeAtOnce); + tieredMergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac()); + tieredMergePolicy.setSegmentsPerTier(segmentsPerTier); + tieredMergePolicy.setDeletesPctAllowed(deletesPctAllowed); + if (logger.isTraceEnabled()) { logger.trace( "using [tiered] merge mergePolicy with expunge_deletes_allowed[{}], floor_segment[{}]," @@ -239,31 +244,31 @@ public final class MergePolicyConfig { } void setSegmentsPerTier(Double segmentsPerTier) { - mergePolicy.setSegmentsPerTier(segmentsPerTier); + tieredMergePolicy.setSegmentsPerTier(segmentsPerTier); } void setMaxMergedSegment(ByteSizeValue maxMergedSegment) { - mergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac()); + tieredMergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac()); } void setMaxMergesAtOnce(Integer maxMergeAtOnce) { - mergePolicy.setMaxMergeAtOnce(maxMergeAtOnce); + tieredMergePolicy.setMaxMergeAtOnce(maxMergeAtOnce); } void setFloorSegmentSetting(ByteSizeValue floorSegementSetting) { - mergePolicy.setFloorSegmentMB(floorSegementSetting.getMbFrac()); + tieredMergePolicy.setFloorSegmentMB(floorSegementSetting.getMbFrac()); } void setExpungeDeletesAllowed(Double value) { - mergePolicy.setForceMergeDeletesPctAllowed(value); + tieredMergePolicy.setForceMergeDeletesPctAllowed(value); } void setNoCFSRatio(Double noCFSRatio) { - mergePolicy.setNoCFSRatio(noCFSRatio); + tieredMergePolicy.setNoCFSRatio(noCFSRatio); } void setDeletesPctAllowed(Double deletesPctAllowed) { - mergePolicy.setDeletesPctAllowed(deletesPctAllowed); + tieredMergePolicy.setDeletesPctAllowed(deletesPctAllowed); } private int adjustMaxMergeAtOnceIfNeeded(int maxMergeAtOnce, double segmentsPerTier) { @@ -285,11 +290,11 @@ private int adjustMaxMergeAtOnceIfNeeded(int maxMergeAtOnce, double segmentsPerT return maxMergeAtOnce; } - MergePolicy getMergePolicy() { - return mergesEnabled ? mergePolicy : NoMergePolicy.INSTANCE; + public MergePolicy getMergePolicy() { + return mergesEnabled ? tieredMergePolicy : NoMergePolicy.INSTANCE; } - private static double parseNoCFSRatio(String noCFSRatio) { + public static double parseNoCFSRatio(String noCFSRatio) { noCFSRatio = noCFSRatio.trim(); if (noCFSRatio.equalsIgnoreCase("true")) { return 1.0d; @@ -310,4 +315,23 @@ private static double parseNoCFSRatio(String noCFSRatio) { } } } + + @Override + public String toString() { + return "TieredMergePolicyProvider{" + + "expungeDeletesAllowed=" + + tieredMergePolicy.getForceMergeDeletesPctAllowed() + + ", floorSegment=" + + tieredMergePolicy.getFloorSegmentMB() + + ", maxMergeAtOnce=" + + tieredMergePolicy.getMaxMergeAtOnce() + + ", maxMergedSegment=" + + tieredMergePolicy.getMaxMergedSegmentMB() + + ", segmentsPerTier=" + + tieredMergePolicy.getSegmentsPerTier() + + ", deletesPctAllowed=" + + tieredMergePolicy.getDeletesPctAllowed() + + '}'; + } + } diff --git a/server/src/main/java/org/opensearch/index/shard/IndexShard.java b/server/src/main/java/org/opensearch/index/shard/IndexShard.java index d476e8b7c9288..5ce066b156775 100644 --- a/server/src/main/java/org/opensearch/index/shard/IndexShard.java +++ b/server/src/main/java/org/opensearch/index/shard/IndexShard.java @@ -3772,7 +3772,7 @@ private EngineConfig newEngineConfig(LongSupplier globalCheckpointSupplier) thro indexSettings, warmer, store, - indexSettings.getMergePolicy(), + indexSettings.getMergePolicy(isTimeSeriesIndex), mapperService != null ? mapperService.indexAnalyzer() : null, similarityService.similarity(mapperService), engineConfigFactory.newCodecServiceOrDefault(indexSettings, mapperService, logger, codecService), diff --git a/server/src/test/java/org/opensearch/action/admin/indices/segments/IndicesSegmentsRequestTests.java b/server/src/test/java/org/opensearch/action/admin/indices/segments/IndicesSegmentsRequestTests.java index 67846efab2af8..d35c821b41aa0 100644 --- a/server/src/test/java/org/opensearch/action/admin/indices/segments/IndicesSegmentsRequestTests.java +++ b/server/src/test/java/org/opensearch/action/admin/indices/segments/IndicesSegmentsRequestTests.java @@ -34,7 +34,7 @@ import org.opensearch.action.support.IndicesOptions; import org.opensearch.common.settings.Settings; -import org.opensearch.index.MergePolicyConfig; +import org.opensearch.index.MergePolicyProvider; import org.opensearch.indices.IndexClosedException; import org.opensearch.plugins.Plugin; import org.opensearch.test.InternalSettingsPlugin; @@ -56,7 +56,7 @@ protected Collection> getPlugins() { public void setupIndex() { Settings settings = Settings.builder() // don't allow any merges so that the num docs is the expected segments - .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + .put(MergePolicyProvider.INDEX_MERGE_ENABLED, false) .build(); createIndex("test", settings); diff --git a/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java b/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java index 387997892ee30..7b39191482813 100644 --- a/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java +++ b/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java @@ -31,6 +31,7 @@ package org.opensearch.index; +import org.apache.lucene.index.LogByteSizeMergePolicy; import org.apache.lucene.index.NoMergePolicy; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.unit.ByteSizeUnit; @@ -49,17 +50,17 @@ public class MergePolicySettingsTests extends OpenSearchTestCase { protected final ShardId shardId = new ShardId("index", "_na_", 1); public void testCompoundFileSettings() throws IOException { - assertThat(new MergePolicyConfig(logger, indexSettings(Settings.EMPTY)).getMergePolicy().getNoCFSRatio(), equalTo(0.1)); - assertThat(new MergePolicyConfig(logger, indexSettings(build(true))).getMergePolicy().getNoCFSRatio(), equalTo(1.0)); - assertThat(new MergePolicyConfig(logger, indexSettings(build(0.5))).getMergePolicy().getNoCFSRatio(), equalTo(0.5)); - assertThat(new MergePolicyConfig(logger, indexSettings(build(1.0))).getMergePolicy().getNoCFSRatio(), equalTo(1.0)); - assertThat(new MergePolicyConfig(logger, indexSettings(build("true"))).getMergePolicy().getNoCFSRatio(), equalTo(1.0)); - assertThat(new MergePolicyConfig(logger, indexSettings(build("True"))).getMergePolicy().getNoCFSRatio(), equalTo(1.0)); - assertThat(new MergePolicyConfig(logger, indexSettings(build("False"))).getMergePolicy().getNoCFSRatio(), equalTo(0.0)); - assertThat(new MergePolicyConfig(logger, indexSettings(build("false"))).getMergePolicy().getNoCFSRatio(), equalTo(0.0)); - assertThat(new MergePolicyConfig(logger, indexSettings(build(false))).getMergePolicy().getNoCFSRatio(), equalTo(0.0)); - assertThat(new MergePolicyConfig(logger, indexSettings(build(0))).getMergePolicy().getNoCFSRatio(), equalTo(0.0)); - assertThat(new MergePolicyConfig(logger, indexSettings(build(0.0))).getMergePolicy().getNoCFSRatio(), equalTo(0.0)); + assertThat(new TieredMergePolicyProvider(logger, indexSettings(Settings.EMPTY)).getMergePolicy().getNoCFSRatio(), equalTo(0.1)); + assertThat(new TieredMergePolicyProvider(logger, indexSettings(build(true))).getMergePolicy().getNoCFSRatio(), equalTo(1.0)); + assertThat(new TieredMergePolicyProvider(logger, indexSettings(build(0.5))).getMergePolicy().getNoCFSRatio(), equalTo(0.5)); + assertThat(new TieredMergePolicyProvider(logger, indexSettings(build(1.0))).getMergePolicy().getNoCFSRatio(), equalTo(1.0)); + assertThat(new TieredMergePolicyProvider(logger, indexSettings(build("true"))).getMergePolicy().getNoCFSRatio(), equalTo(1.0)); + assertThat(new TieredMergePolicyProvider(logger, indexSettings(build("True"))).getMergePolicy().getNoCFSRatio(), equalTo(1.0)); + assertThat(new TieredMergePolicyProvider(logger, indexSettings(build("False"))).getMergePolicy().getNoCFSRatio(), equalTo(0.0)); + assertThat(new TieredMergePolicyProvider(logger, indexSettings(build("false"))).getMergePolicy().getNoCFSRatio(), equalTo(0.0)); + assertThat(new TieredMergePolicyProvider(logger, indexSettings(build(false))).getMergePolicy().getNoCFSRatio(), equalTo(0.0)); + assertThat(new TieredMergePolicyProvider(logger, indexSettings(build(0))).getMergePolicy().getNoCFSRatio(), equalTo(0.0)); + assertThat(new TieredMergePolicyProvider(logger, indexSettings(build(0.0))).getMergePolicy().getNoCFSRatio(), equalTo(0.0)); } private static IndexSettings indexSettings(Settings settings) { @@ -67,33 +68,176 @@ private static IndexSettings indexSettings(Settings settings) { } public void testNoMerges() { - MergePolicyConfig mp = new MergePolicyConfig( + TieredMergePolicyProvider tmp = new TieredMergePolicyProvider( logger, - indexSettings(Settings.builder().put(MergePolicyConfig.INDEX_MERGE_ENABLED, false).build()) + indexSettings(Settings.builder().put(MergePolicyProvider.INDEX_MERGE_ENABLED, false).build()) ); - assertTrue(mp.getMergePolicy() instanceof NoMergePolicy); + LogByteSizeMergePolicyProvider lbsmp = new LogByteSizeMergePolicyProvider( + logger, + indexSettings(Settings.builder().put(MergePolicyProvider.INDEX_MERGE_ENABLED, false).build()) + ); + assertTrue(tmp.getMergePolicy() instanceof NoMergePolicy); + assertTrue(lbsmp.getMergePolicy() instanceof NoMergePolicy); } public void testUpdateSettings() throws IOException { - IndexSettings indexSettings = indexSettings(EMPTY_SETTINGS); - assertThat(indexSettings.getMergePolicy().getNoCFSRatio(), equalTo(0.1)); + Settings settings = Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.TIERED_MERGE_POLICY).build(); + IndexSettings indexSettings = indexSettings(settings); + assertThat(indexSettings.getMergePolicy(false).getNoCFSRatio(), equalTo(0.1)); indexSettings = indexSettings(build(0.9)); - assertThat((indexSettings.getMergePolicy()).getNoCFSRatio(), equalTo(0.9)); + assertThat((indexSettings.getMergePolicy(false)).getNoCFSRatio(), equalTo(0.9)); indexSettings.updateIndexMetadata(newIndexMeta("index", build(0.1))); - assertThat((indexSettings.getMergePolicy()).getNoCFSRatio(), equalTo(0.1)); + assertThat((indexSettings.getMergePolicy(false)).getNoCFSRatio(), equalTo(0.1)); indexSettings.updateIndexMetadata(newIndexMeta("index", build(0.0))); - assertThat((indexSettings.getMergePolicy()).getNoCFSRatio(), equalTo(0.0)); + assertThat((indexSettings.getMergePolicy(false)).getNoCFSRatio(), equalTo(0.0)); indexSettings.updateIndexMetadata(newIndexMeta("index", build("true"))); - assertThat((indexSettings.getMergePolicy()).getNoCFSRatio(), equalTo(1.0)); + assertThat((indexSettings.getMergePolicy(false)).getNoCFSRatio(), equalTo(1.0)); indexSettings.updateIndexMetadata(newIndexMeta("index", build("false"))); - assertThat((indexSettings.getMergePolicy()).getNoCFSRatio(), equalTo(0.0)); + assertThat((indexSettings.getMergePolicy(false)).getNoCFSRatio(), equalTo(0.0)); + } + + public void testDefaultMergePolicy() throws IOException { + IndexSettings indexSettings = indexSettings(EMPTY_SETTINGS); + assertTrue(indexSettings.getMergePolicy(false) instanceof OpenSearchTieredMergePolicy); + assertTrue(indexSettings.getMergePolicy(true) instanceof OpenSearchTieredMergePolicy); + } + + public void testMergePolicyPrecedence() throws IOException { + // 1. INDEX_MERGE_POLICY is not set + // assert defaults + IndexSettings indexSettings = indexSettings(EMPTY_SETTINGS); + assertTrue(indexSettings.getMergePolicy(false) instanceof OpenSearchTieredMergePolicy); + assertTrue(indexSettings.getMergePolicy(true) instanceof OpenSearchTieredMergePolicy); + + // 1.1 node setting TIME_INDEX_MERGE_POLICY is set as log_byte_size + // assert index policy is tiered whereas time index policy is log_byte_size + Settings nodeSettings = Settings.builder() + .put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), IndexSettings.LOG_BYTE_SIZE_MERGE_POLICY) + .build(); + indexSettings = new IndexSettings(newIndexMeta("test", Settings.EMPTY), nodeSettings); + assertTrue(indexSettings.getMergePolicy(false) instanceof OpenSearchTieredMergePolicy); + assertTrue(indexSettings.getMergePolicy(true) instanceof LogByteSizeMergePolicy); + + // 1.2 node setting TIME_INDEX_MERGE_POLICY is set as tiered + // assert both index and time index policy is tiered + nodeSettings = Settings.builder().put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), IndexSettings.TIERED_MERGE_POLICY).build(); + indexSettings = new IndexSettings(newIndexMeta("test", Settings.EMPTY), nodeSettings); + assertTrue(indexSettings.getMergePolicy(false) instanceof OpenSearchTieredMergePolicy); + assertTrue(indexSettings.getMergePolicy(true) instanceof OpenSearchTieredMergePolicy); + + // 2. INDEX_MERGE_POLICY set as tiered + // assert both index and time-index merge policy is set as tiered + indexSettings = indexSettings( + Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.TIERED_MERGE_POLICY).build() + ); + assertTrue(indexSettings.getMergePolicy(false) instanceof OpenSearchTieredMergePolicy); + assertTrue(indexSettings.getMergePolicy(true) instanceof OpenSearchTieredMergePolicy); + + // 2.1 node setting TIME_INDEX_MERGE_POLICY is set as log_byte_size + // assert both index and time-index merge policy is set as tiered + nodeSettings = Settings.builder() + .put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), IndexSettings.LOG_BYTE_SIZE_MERGE_POLICY) + .build(); + indexSettings = new IndexSettings( + newIndexMeta( + "test", + Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.TIERED_MERGE_POLICY).build() + ), + nodeSettings + ); + assertTrue(indexSettings.getMergePolicy(false) instanceof OpenSearchTieredMergePolicy); + assertTrue(indexSettings.getMergePolicy(true) instanceof OpenSearchTieredMergePolicy); + + // 3. INDEX_MERGE_POLICY set as log_byte_size + // assert both index and time-index merge policy is set as log_byte_size + indexSettings = indexSettings( + Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.LOG_BYTE_SIZE_MERGE_POLICY).build() + ); + assertTrue(indexSettings.getMergePolicy(false) instanceof LogByteSizeMergePolicy); + assertTrue(indexSettings.getMergePolicy(true) instanceof LogByteSizeMergePolicy); + + // 3.1 node setting TIME_INDEX_MERGE_POLICY is set as tiered + // assert both index and time-index merge policy is set as log_byte_size + nodeSettings = Settings.builder().put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), IndexSettings.TIERED_MERGE_POLICY).build(); + indexSettings = new IndexSettings( + newIndexMeta( + "test", + Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.LOG_BYTE_SIZE_MERGE_POLICY).build() + ), + nodeSettings + ); + assertTrue(indexSettings.getMergePolicy(false) instanceof LogByteSizeMergePolicy); + assertTrue(indexSettings.getMergePolicy(true) instanceof LogByteSizeMergePolicy); + + } + + public void testInvalidMergePolicy() throws IOException { + IllegalArgumentException exc1 = expectThrows( + IllegalArgumentException.class, + () -> indexSettings(Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "invalid").build()) + ); + + assertThat(exc1.getMessage(), containsString(IndexSettings.INDEX_MERGE_POLICY.getKey() + " has unsupported policy specified: ")); + + Settings nodeSettings = Settings.builder().put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), "invalid").build(); + IllegalArgumentException exc2 = expectThrows( + IllegalArgumentException.class, + () -> new IndexSettings(newIndexMeta("test", Settings.EMPTY), nodeSettings) + ); + assertThat( + exc2.getMessage(), + containsString(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey() + " has unsupported policy specified: ") + ); + } + + public void testUpdateSettingsForLogByteSizeMergePolicy() throws IOException { + IndexSettings indexSettings = indexSettings( + Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.LOG_BYTE_SIZE_MERGE_POLICY).build() + ); + assertTrue(indexSettings.getMergePolicy(true) instanceof LogByteSizeMergePolicy); + assertThat(indexSettings.getMergePolicy(true).getNoCFSRatio(), equalTo(0.1)); + indexSettings = indexSettings( + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") + .put(LogByteSizeMergePolicyProvider.INDEX_LBS_NO_CFS_RATIO_SETTING.getKey(), 0.9) + .build() + ); + assertThat((indexSettings.getMergePolicy(true)).getNoCFSRatio(), equalTo(0.9)); + indexSettings = indexSettings( + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") + .put(LogByteSizeMergePolicyProvider.INDEX_LBS_NO_CFS_RATIO_SETTING.getKey(), 0.1) + .build() + ); + assertThat((indexSettings.getMergePolicy(true)).getNoCFSRatio(), equalTo(0.1)); + indexSettings = indexSettings( + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") + .put(LogByteSizeMergePolicyProvider.INDEX_LBS_NO_CFS_RATIO_SETTING.getKey(), 0.0) + .build() + ); + assertThat((indexSettings.getMergePolicy(true)).getNoCFSRatio(), equalTo(0.0)); + indexSettings = indexSettings( + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") + .put(LogByteSizeMergePolicyProvider.INDEX_LBS_NO_CFS_RATIO_SETTING.getKey(), "true") + .build() + ); + assertThat((indexSettings.getMergePolicy(true)).getNoCFSRatio(), equalTo(1.0)); + indexSettings = indexSettings( + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") + .put(LogByteSizeMergePolicyProvider.INDEX_LBS_NO_CFS_RATIO_SETTING.getKey(), "false") + .build() + ); + assertThat((indexSettings.getMergePolicy(true)).getNoCFSRatio(), equalTo(0.0)); } public void testTieredMergePolicySettingsUpdate() throws IOException { IndexSettings indexSettings = indexSettings(Settings.EMPTY); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getForceMergeDeletesPctAllowed(), - MergePolicyConfig.DEFAULT_EXPUNGE_DELETES_ALLOWED, + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getForceMergeDeletesPctAllowed(), + TieredMergePolicyProvider.DEFAULT_EXPUNGE_DELETES_ALLOWED, 0.0d ); @@ -102,21 +246,21 @@ public void testTieredMergePolicySettingsUpdate() throws IOException { "index", Settings.builder() .put( - MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING.getKey(), - MergePolicyConfig.DEFAULT_EXPUNGE_DELETES_ALLOWED + 1.0d + TieredMergePolicyProvider.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING.getKey(), + TieredMergePolicyProvider.DEFAULT_EXPUNGE_DELETES_ALLOWED + 1.0d ) .build() ) ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getForceMergeDeletesPctAllowed(), - MergePolicyConfig.DEFAULT_EXPUNGE_DELETES_ALLOWED + 1.0d, + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getForceMergeDeletesPctAllowed(), + TieredMergePolicyProvider.DEFAULT_EXPUNGE_DELETES_ALLOWED + 1.0d, 0.0d ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getFloorSegmentMB(), - MergePolicyConfig.DEFAULT_FLOOR_SEGMENT.getMbFrac(), + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getFloorSegmentMB(), + TieredMergePolicyProvider.DEFAULT_FLOOR_SEGMENT.getMbFrac(), 0 ); indexSettings.updateIndexMetadata( @@ -124,41 +268,41 @@ public void testTieredMergePolicySettingsUpdate() throws IOException { "index", Settings.builder() .put( - MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey(), - new ByteSizeValue(MergePolicyConfig.DEFAULT_FLOOR_SEGMENT.getMb() + 1, ByteSizeUnit.MB) + TieredMergePolicyProvider.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING.getKey(), + new ByteSizeValue(TieredMergePolicyProvider.DEFAULT_FLOOR_SEGMENT.getMb() + 1, ByteSizeUnit.MB) ) .build() ) ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getFloorSegmentMB(), - new ByteSizeValue(MergePolicyConfig.DEFAULT_FLOOR_SEGMENT.getMb() + 1, ByteSizeUnit.MB).getMbFrac(), + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getFloorSegmentMB(), + new ByteSizeValue(TieredMergePolicyProvider.DEFAULT_FLOOR_SEGMENT.getMb() + 1, ByteSizeUnit.MB).getMbFrac(), 0.001 ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getMaxMergeAtOnce(), - MergePolicyConfig.DEFAULT_MAX_MERGE_AT_ONCE + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getMaxMergeAtOnce(), + TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE ); indexSettings.updateIndexMetadata( newIndexMeta( "index", Settings.builder() .put( - MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), - MergePolicyConfig.DEFAULT_MAX_MERGE_AT_ONCE - 1 + TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), + TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE - 1 ) .build() ) ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getMaxMergeAtOnce(), - MergePolicyConfig.DEFAULT_MAX_MERGE_AT_ONCE - 1 + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getMaxMergeAtOnce(), + TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE - 1 ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getMaxMergedSegmentMB(), - MergePolicyConfig.DEFAULT_MAX_MERGED_SEGMENT.getMbFrac(), + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getMaxMergedSegmentMB(), + TieredMergePolicyProvider.DEFAULT_MAX_MERGED_SEGMENT.getMbFrac(), 0.0001 ); indexSettings.updateIndexMetadata( @@ -166,21 +310,21 @@ public void testTieredMergePolicySettingsUpdate() throws IOException { "index", Settings.builder() .put( - MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING.getKey(), - new ByteSizeValue(MergePolicyConfig.DEFAULT_MAX_MERGED_SEGMENT.getBytes() + 1) + TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING.getKey(), + new ByteSizeValue(TieredMergePolicyProvider.DEFAULT_MAX_MERGED_SEGMENT.getBytes() + 1) ) .build() ) ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getMaxMergedSegmentMB(), - new ByteSizeValue(MergePolicyConfig.DEFAULT_MAX_MERGED_SEGMENT.getBytes() + 1).getMbFrac(), + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getMaxMergedSegmentMB(), + new ByteSizeValue(TieredMergePolicyProvider.DEFAULT_MAX_MERGED_SEGMENT.getBytes() + 1).getMbFrac(), 0.0001 ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getSegmentsPerTier(), - MergePolicyConfig.DEFAULT_SEGMENTS_PER_TIER, + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getSegmentsPerTier(), + TieredMergePolicyProvider.DEFAULT_SEGMENTS_PER_TIER, 0 ); indexSettings.updateIndexMetadata( @@ -188,37 +332,37 @@ public void testTieredMergePolicySettingsUpdate() throws IOException { "index", Settings.builder() .put( - MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), - MergePolicyConfig.DEFAULT_SEGMENTS_PER_TIER + 1 + TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), + TieredMergePolicyProvider.DEFAULT_SEGMENTS_PER_TIER + 1 ) .build() ) ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getSegmentsPerTier(), - MergePolicyConfig.DEFAULT_SEGMENTS_PER_TIER + 1, + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getSegmentsPerTier(), + TieredMergePolicyProvider.DEFAULT_SEGMENTS_PER_TIER + 1, 0 ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getDeletesPctAllowed(), - MergePolicyConfig.DEFAULT_DELETES_PCT_ALLOWED, + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getDeletesPctAllowed(), + TieredMergePolicyProvider.DEFAULT_DELETES_PCT_ALLOWED, 0 ); indexSettings.updateIndexMetadata( newIndexMeta( "index", - Settings.builder().put(MergePolicyConfig.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING.getKey(), 22).build() + Settings.builder().put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING.getKey(), 22).build() ) ); - assertEquals(((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getDeletesPctAllowed(), 22, 0); + assertEquals(((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getDeletesPctAllowed(), 22, 0); IllegalArgumentException exc = expectThrows( IllegalArgumentException.class, () -> indexSettings.updateIndexMetadata( newIndexMeta( "index", - Settings.builder().put(MergePolicyConfig.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING.getKey(), 53).build() + Settings.builder().put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING.getKey(), 53).build() ) ) ); @@ -226,50 +370,159 @@ public void testTieredMergePolicySettingsUpdate() throws IOException { assertThat(cause.getMessage(), containsString("must be <= 50.0")); indexSettings.updateIndexMetadata(newIndexMeta("index", EMPTY_SETTINGS)); // see if defaults are restored assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getForceMergeDeletesPctAllowed(), - MergePolicyConfig.DEFAULT_EXPUNGE_DELETES_ALLOWED, + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getForceMergeDeletesPctAllowed(), + TieredMergePolicyProvider.DEFAULT_EXPUNGE_DELETES_ALLOWED, 0.0d ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getFloorSegmentMB(), - new ByteSizeValue(MergePolicyConfig.DEFAULT_FLOOR_SEGMENT.getMb(), ByteSizeUnit.MB).getMbFrac(), + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getFloorSegmentMB(), + new ByteSizeValue(TieredMergePolicyProvider.DEFAULT_FLOOR_SEGMENT.getMb(), ByteSizeUnit.MB).getMbFrac(), 0.00 ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getMaxMergeAtOnce(), - MergePolicyConfig.DEFAULT_MAX_MERGE_AT_ONCE + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getMaxMergeAtOnce(), + TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getMaxMergedSegmentMB(), - new ByteSizeValue(MergePolicyConfig.DEFAULT_MAX_MERGED_SEGMENT.getBytes() + 1).getMbFrac(), + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getMaxMergedSegmentMB(), + new ByteSizeValue(TieredMergePolicyProvider.DEFAULT_MAX_MERGED_SEGMENT.getBytes() + 1).getMbFrac(), 0.0001 ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getSegmentsPerTier(), - MergePolicyConfig.DEFAULT_SEGMENTS_PER_TIER, + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getSegmentsPerTier(), + TieredMergePolicyProvider.DEFAULT_SEGMENTS_PER_TIER, 0 ); assertEquals( - ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy()).getDeletesPctAllowed(), - MergePolicyConfig.DEFAULT_DELETES_PCT_ALLOWED, + ((OpenSearchTieredMergePolicy) indexSettings.getMergePolicy(false)).getDeletesPctAllowed(), + TieredMergePolicyProvider.DEFAULT_DELETES_PCT_ALLOWED, 0 ); } + public void testLogByteSizeMergePolicySettingsUpdate() throws IOException { + + IndexSettings indexSettings = indexSettings( + Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.LOG_BYTE_SIZE_MERGE_POLICY).build() + ); + assertEquals( + ((LogByteSizeMergePolicy) indexSettings.getMergePolicy(true)).getMergeFactor(), + TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE + ); + + indexSettings.updateIndexMetadata( + newIndexMeta( + "index", + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") + .put( + LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING.getKey(), + LogByteSizeMergePolicyProvider.DEFAULT_MERGE_FACTOR + 1 + ) + .build() + ) + ); + assertEquals( + ((LogByteSizeMergePolicy) indexSettings.getMergePolicy(true)).getMergeFactor(), + TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE + 1 + ); + + indexSettings.updateIndexMetadata( + newIndexMeta( + "index", + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") + .put( + LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MIN_MERGE_MB_SETTING.getKey(), + new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MIN_MERGE_MB.getMb() + 1, ByteSizeUnit.MB) + ) + .build() + ) + ); + + assertEquals( + ((LogByteSizeMergePolicy) indexSettings.getMergePolicy(true)).getMinMergeMB(), + new ByteSizeValue(TieredMergePolicyProvider.DEFAULT_FLOOR_SEGMENT.getMb() + 1, ByteSizeUnit.MB).getMbFrac(), + 0.001 + ); + + indexSettings.updateIndexMetadata( + newIndexMeta( + "index", + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") + .put( + LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_MB_SETTING.getKey(), + new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MAX_MERGED_SEGMENT.getMb() + 100, ByteSizeUnit.MB) + ) + .build() + ) + ); + + assertEquals( + ((LogByteSizeMergePolicy) indexSettings.getMergePolicy(true)).getMaxMergeMB(), + new ByteSizeValue(TieredMergePolicyProvider.DEFAULT_MAX_MERGED_SEGMENT.getMb() + 100, ByteSizeUnit.MB).getMbFrac(), + 0.001 + ); + + indexSettings.updateIndexMetadata( + newIndexMeta( + "index", + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") + .put( + LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_MB_FOR_FORCED_MERGE_SETTING.getKey(), + new ByteSizeValue( + LogByteSizeMergePolicyProvider.DEFAULT_MAX_MERGE_SEGMENT_MB_FORCE_MERGE.getMb() + 1, + ByteSizeUnit.MB + ) + ) + .build() + ) + ); + assertEquals( + ((LogByteSizeMergePolicy) indexSettings.getMergePolicy(true)).getMaxMergeMBForForcedMerge(), + new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MAX_MERGE_SEGMENT_MB_FORCE_MERGE.getMb() + 1, ByteSizeUnit.MB).getMbFrac(), + 0.001 + ); + + indexSettings.updateIndexMetadata( + newIndexMeta( + "index", + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") + .put(LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGED_DOCS_SETTING.getKey(), 10000000) + .build() + ) + ); + assertEquals(((LogByteSizeMergePolicy) indexSettings.getMergePolicy(true)).getMaxMergeDocs(), 10000000); + + indexSettings.updateIndexMetadata( + newIndexMeta( + "index", + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") + .put(LogByteSizeMergePolicyProvider.INDEX_LBS_NO_CFS_RATIO_SETTING.getKey(), 0.1) + .build() + ) + ); + assertEquals(indexSettings.getMergePolicy(true).getNoCFSRatio(), 0.1, 0.0); + } + public Settings build(String value) { - return Settings.builder().put(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING.getKey(), value).build(); + return Settings.builder().put(TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT_SETTING.getKey(), value).build(); } public Settings build(double value) { - return Settings.builder().put(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING.getKey(), value).build(); + return Settings.builder().put(TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT_SETTING.getKey(), value).build(); } public Settings build(int value) { - return Settings.builder().put(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING.getKey(), value).build(); + return Settings.builder().put(TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT_SETTING.getKey(), value).build(); } public Settings build(boolean value) { - return Settings.builder().put(MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING.getKey(), value).build(); + return Settings.builder().put(TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT_SETTING.getKey(), value).build(); } } diff --git a/server/src/test/java/org/opensearch/index/MergeSchedulerSettingsTests.java b/server/src/test/java/org/opensearch/index/MergeSchedulerSettingsTests.java index 2443ee1ab40be..baaf584702f78 100644 --- a/server/src/test/java/org/opensearch/index/MergeSchedulerSettingsTests.java +++ b/server/src/test/java/org/opensearch/index/MergeSchedulerSettingsTests.java @@ -92,8 +92,8 @@ public void testUpdateAutoThrottleSettings() throws Exception { .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1") .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0") - .put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2") - .put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2") + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2") + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2") .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1") .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "2") .put(MergeSchedulerConfig.AUTO_THROTTLE_SETTING.getKey(), "true"); @@ -123,8 +123,8 @@ public void testUpdateMergeMaxThreadCount() throws Exception { .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1") .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0") - .put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2") - .put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2") + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2") + .put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2") .put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "10000") .put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "10000"); IndexSettings settings = new IndexSettings(newIndexMeta("index", builder.build()), Settings.EMPTY); diff --git a/server/src/test/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandTests.java b/server/src/test/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandTests.java index 9c8f9896850c6..c88c86d51be08 100644 --- a/server/src/test/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandTests.java +++ b/server/src/test/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandTests.java @@ -58,7 +58,7 @@ import org.opensearch.env.TestEnvironment; import org.opensearch.gateway.PersistedClusterStateService; import org.opensearch.index.IndexSettings; -import org.opensearch.index.MergePolicyConfig; +import org.opensearch.index.MergePolicyProvider; import org.opensearch.index.engine.EngineConfigFactory; import org.opensearch.index.engine.EngineCreationFailureException; import org.opensearch.index.engine.InternalEngineFactory; @@ -134,7 +134,7 @@ public void setup() throws IOException { final Settings settings = Settings.builder() .put(IndexMetadata.SETTING_VERSION_CREATED, Version.CURRENT) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + .put(MergePolicyProvider.INDEX_MERGE_ENABLED, false) .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) .put(IndexMetadata.SETTING_INDEX_UUID, shardId.getIndex().getUUID()) .build(); diff --git a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java index 11d916616578d..ad90255a3cc3f 100644 --- a/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java +++ b/server/src/test/java/org/opensearch/indices/recovery/RecoveryTests.java @@ -54,7 +54,7 @@ import org.opensearch.core.common.bytes.BytesArray; import org.opensearch.core.xcontent.MediaTypeRegistry; import org.opensearch.index.IndexSettings; -import org.opensearch.index.MergePolicyConfig; +import org.opensearch.index.MergePolicyProvider; import org.opensearch.index.VersionType; import org.opensearch.index.engine.DocIdSeqNoAndSource; import org.opensearch.index.engine.Engine; @@ -168,7 +168,7 @@ public void testRecoveryWithOutOfOrderDeleteWithSoftDeletes() throws Exception { .put(IndexSettings.INDEX_SOFT_DELETES_RETENTION_OPERATIONS_SETTING.getKey(), 10) // If soft-deletes is enabled, delete#1 will be reclaimed because its segment (segment_1) is fully deleted // index#0 will be retained if merge is disabled; otherwise it will be reclaimed because gcp=3 and retained_ops=0 - .put(MergePolicyConfig.INDEX_MERGE_ENABLED, false) + .put(MergePolicyProvider.INDEX_MERGE_ENABLED, false) .build(); try (ReplicationGroup shards = createGroup(1, settings)) { shards.startAll(); diff --git a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java index 6e064f943ca07..0b80c6e577f95 100644 --- a/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java +++ b/test/framework/src/main/java/org/opensearch/test/OpenSearchIntegTestCase.java @@ -130,9 +130,9 @@ import org.opensearch.http.HttpInfo; import org.opensearch.index.IndexModule; import org.opensearch.index.IndexSettings; -import org.opensearch.index.MergePolicyConfig; import org.opensearch.index.MergeSchedulerConfig; import org.opensearch.index.MockEngineFactoryPlugin; +import org.opensearch.index.TieredMergePolicyProvider; import org.opensearch.index.codec.CodecService; import org.opensearch.index.engine.Segment; import org.opensearch.index.mapper.CompletionFieldMapper; @@ -500,7 +500,7 @@ protected Settings.Builder setRandomIndexSettings(Random random, Settings.Builde private static Settings.Builder setRandomIndexMergeSettings(Random random, Settings.Builder builder) { if (random.nextBoolean()) { builder.put( - MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING.getKey(), + TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT_SETTING.getKey(), (random.nextBoolean() ? random.nextDouble() : random.nextBoolean()).toString() ); } From 54b00aa30b61e0612e0f6dca5ba0b522c34c2739 Mon Sep 17 00:00:00 2001 From: Rishabh Maurya Date: Wed, 20 Sep 2023 19:36:49 -0700 Subject: [PATCH 2/9] remove the trace log not required anymore Signed-off-by: Rishabh Maurya --- .../index/TieredMergePolicyProvider.java | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/TieredMergePolicyProvider.java b/server/src/main/java/org/opensearch/index/TieredMergePolicyProvider.java index 2cfae46055206..d5d354c6c960a 100644 --- a/server/src/main/java/org/opensearch/index/TieredMergePolicyProvider.java +++ b/server/src/main/java/org/opensearch/index/TieredMergePolicyProvider.java @@ -227,20 +227,6 @@ public final class TieredMergePolicyProvider implements MergePolicyProvider { tieredMergePolicy.setMaxMergedSegmentMB(maxMergedSegment.getMbFrac()); tieredMergePolicy.setSegmentsPerTier(segmentsPerTier); tieredMergePolicy.setDeletesPctAllowed(deletesPctAllowed); - - if (logger.isTraceEnabled()) { - logger.trace( - "using [tiered] merge mergePolicy with expunge_deletes_allowed[{}], floor_segment[{}]," - + " max_merge_at_once[{}], max_merged_segment[{}], segments_per_tier[{}]," - + " deletes_pct_allowed[{}]", - forceMergeDeletesPctAllowed, - floorSegment, - maxMergeAtOnce, - maxMergedSegment, - segmentsPerTier, - deletesPctAllowed - ); - } } void setSegmentsPerTier(Double segmentsPerTier) { From cce2b2d33d6bbc22e690c404ea4007508c7a66f0 Mon Sep 17 00:00:00 2001 From: Rishabh Maurya Date: Thu, 21 Sep 2023 14:54:09 -0700 Subject: [PATCH 3/9] Refactor the merge policy extraction logic Signed-off-by: Rishabh Maurya --- .../org/opensearch/index/IndexSettings.java | 182 +++++++++--------- .../index/LogByteSizeMergePolicyProvider.java | 14 +- .../index/MergePolicySettingsTests.java | 30 ++- 3 files changed, 121 insertions(+), 105 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 4938958154c68..b83673508e68b 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -83,12 +83,10 @@ */ @PublicApi(since = "1.0.0") public final class IndexSettings { - private static final String MERGE_ON_FLUSH_DEFAULT_POLICY = "default"; + private static final String DEFAULT = "default"; private static final String MERGE_ON_FLUSH_MERGE_POLICY = "merge-on-flush"; public static final String TIERED_MERGE_POLICY = "tiered"; public static final String LOG_BYTE_SIZE_MERGE_POLICY = "log_byte_size"; - private static final String DEFAULT_MERGE_POLICY = "default_merge_policy"; - private static final String DEFAULT_TIME_INDEX_MERGE_POLICY = "default_time_index_merge_policy"; public static final Setting> DEFAULT_FIELD_SETTING = Setting.listSetting( "index.query.default_field", Collections.singletonList("*"), @@ -569,20 +567,53 @@ public final class IndexSettings { public static final Setting INDEX_MERGE_ON_FLUSH_POLICY = Setting.simpleString( "index.merge_on_flush.policy", - MERGE_ON_FLUSH_DEFAULT_POLICY, + DEFAULT, Property.IndexScope, Property.Dynamic ); - public static final Setting INDEX_MERGE_POLICY = Setting.simpleString( - "index.merge.policy", - DEFAULT_MERGE_POLICY, - Property.IndexScope - ); + public static final Setting INDEX_MERGE_POLICY = Setting.simpleString("index.merge.policy", DEFAULT, policy -> { + if (!(policy.isEmpty() + || policy.equals(DEFAULT) + || policy.equals(TIERED_MERGE_POLICY) + || policy.equals(LOG_BYTE_SIZE_MERGE_POLICY))) { + throw new IllegalArgumentException( + "The " + + IndexSettings.INDEX_MERGE_POLICY.getKey() + + " has unsupported policy specified: " + + policy + + ". Please use one of: " + + DEFAULT + + ", " + + TIERED_MERGE_POLICY + + ", " + + LOG_BYTE_SIZE_MERGE_POLICY + ); + } + }, Property.IndexScope); public static final Setting TIME_INDEX_MERGE_POLICY = Setting.simpleString( - "indices.time_index.merge.policy", - DEFAULT_TIME_INDEX_MERGE_POLICY, + "indices.time_index.default_index_merge_policy", + DEFAULT, + policy -> { + if (!(policy.isEmpty() + || policy.equals(DEFAULT) + || policy.equals(TIERED_MERGE_POLICY) + || policy.equals(LOG_BYTE_SIZE_MERGE_POLICY))) { + throw new IllegalArgumentException( + "The " + + IndexSettings.TIME_INDEX_MERGE_POLICY.getKey() + + " has unsupported policy specified: " + + policy + + ". Please use one of: " + + DEFAULT + + ", " + + TIERED_MERGE_POLICY + + ", " + + LOG_BYTE_SIZE_MERGE_POLICY + ); + } + }, Property.NodeScope ); @@ -668,8 +699,6 @@ public final class IndexSettings { private final MergeSchedulerConfig mergeSchedulerConfig; private final TieredMergePolicyProvider tieredMergePolicyProvider; private final LogByteSizeMergePolicyProvider logByteSizeMergePolicyProvider; - private final MergePolicyProvider defaultMergePolicyProvider; - private final MergePolicyProvider defaultTimeIndexMergePolicyProvider; private final IndexSortConfig indexSortConfig; private final IndexScopedSettings scopedSettings; private long gcDeletesInMillis = DEFAULT_GC_DELETES.millis(); @@ -747,9 +776,6 @@ private void setRetentionLeaseMillis(final TimeValue retentionLease) { */ private volatile UnaryOperator mergeOnFlushPolicy; - private volatile MergePolicyProvider mergePolicyProvider; - private volatile MergePolicyProvider timeIndexMergePolicyProvider; - /** * Returns the default search fields for this index. */ @@ -867,10 +893,6 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti maxRegexLength = scopedSettings.get(MAX_REGEX_LENGTH_SETTING); this.tieredMergePolicyProvider = new TieredMergePolicyProvider(logger, this); this.logByteSizeMergePolicyProvider = new LogByteSizeMergePolicyProvider(logger, this); - this.defaultMergePolicyProvider = tieredMergePolicyProvider; - this.defaultTimeIndexMergePolicyProvider = tieredMergePolicyProvider; - setMergePolicyProvider(scopedSettings.get(INDEX_MERGE_POLICY)); - setTimeIndexMergePolicy(scopedSettings.get(INDEX_MERGE_POLICY), TIME_INDEX_MERGE_POLICY.get(nodeSettings)); this.indexSortConfig = new IndexSortConfig(this); searchIdleAfter = scopedSettings.get(INDEX_SEARCH_IDLE_AFTER); defaultPipeline = scopedSettings.get(DEFAULT_PIPELINE); @@ -1494,11 +1516,56 @@ public long getGcDeletesInMillis() { * @param isTimeIndex true if index contains @timestamp field */ public MergePolicy getMergePolicy(boolean isTimeIndex) { - if (isTimeIndex) { - return timeIndexMergePolicyProvider.getMergePolicy(); + String indexScopedPolicy = scopedSettings.get(INDEX_MERGE_POLICY); + MergePolicyProvider mergePolicyProvider; + if (indexScopedPolicy.equals(TIERED_MERGE_POLICY)) { + mergePolicyProvider = tieredMergePolicyProvider; + } else if (indexScopedPolicy.equals(LOG_BYTE_SIZE_MERGE_POLICY)) { + mergePolicyProvider = logByteSizeMergePolicyProvider; + } else if (indexScopedPolicy.equals(DEFAULT) || Strings.isEmpty(indexScopedPolicy)) { + if (!isTimeIndex) { + mergePolicyProvider = tieredMergePolicyProvider; + } else { + String nodeScopedTimeIndexPolicy = TIME_INDEX_MERGE_POLICY.get(nodeSettings); + if (nodeScopedTimeIndexPolicy.equals(TIERED_MERGE_POLICY)) { + mergePolicyProvider = tieredMergePolicyProvider; + } else if (nodeScopedTimeIndexPolicy.equals(LOG_BYTE_SIZE_MERGE_POLICY)) { + mergePolicyProvider = logByteSizeMergePolicyProvider; + } else if (nodeScopedTimeIndexPolicy.equals(DEFAULT) || Strings.isEmpty(nodeScopedTimeIndexPolicy)) { + mergePolicyProvider = tieredMergePolicyProvider; + } else { + throw new IllegalArgumentException( + "The " + + IndexSettings.TIME_INDEX_MERGE_POLICY.getKey() + + " has unsupported policy specified: " + + nodeScopedTimeIndexPolicy + + ". Please use one of: " + + DEFAULT + + ", " + + TIERED_MERGE_POLICY + + ", " + + LOG_BYTE_SIZE_MERGE_POLICY + ); + } + } } else { - return mergePolicyProvider.getMergePolicy(); + throw new IllegalArgumentException( + "The " + + IndexSettings.INDEX_MERGE_POLICY.getKey() + + " has unsupported policy specified: " + + indexScopedPolicy + + ". Please use one of: " + + DEFAULT + + ", " + + TIERED_MERGE_POLICY + + ", " + + LOG_BYTE_SIZE_MERGE_POLICY + ); + } + if (logger.isTraceEnabled()) { + logger.trace("Index: " + this.index.getName() + ", Merge policy used: " + mergePolicyProvider.toString()); } + return mergePolicyProvider.getMergePolicy(); } public T getValue(Setting setting) { @@ -1689,7 +1756,7 @@ public boolean isMergeOnFlushEnabled() { } private void setMergeOnFlushPolicy(String policy) { - if (Strings.isEmpty(policy) || MERGE_ON_FLUSH_DEFAULT_POLICY.equalsIgnoreCase(policy)) { + if (Strings.isEmpty(policy) || DEFAULT.equalsIgnoreCase(policy)) { mergeOnFlushPolicy = null; } else if (MERGE_ON_FLUSH_MERGE_POLICY.equalsIgnoreCase(policy)) { this.mergeOnFlushPolicy = MergeOnFlushMergePolicy::new; @@ -1700,78 +1767,13 @@ private void setMergeOnFlushPolicy(String policy) { + " has unsupported policy specified: " + policy + ". Please use one of: " - + MERGE_ON_FLUSH_DEFAULT_POLICY + + DEFAULT + ", " + MERGE_ON_FLUSH_MERGE_POLICY ); } } - private void setMergePolicyProvider(String indexScopedPolicy) { - if (indexScopedPolicy.equals(TIERED_MERGE_POLICY)) { - this.mergePolicyProvider = tieredMergePolicyProvider; - } else if (indexScopedPolicy.equals(LOG_BYTE_SIZE_MERGE_POLICY)) { - this.mergePolicyProvider = logByteSizeMergePolicyProvider; - } else if (indexScopedPolicy.equals(DEFAULT_MERGE_POLICY) || Strings.isEmpty(indexScopedPolicy)) { - this.mergePolicyProvider = defaultMergePolicyProvider; - } else { - throw new IllegalArgumentException( - "The " - + IndexSettings.INDEX_MERGE_POLICY.getKey() - + " has unsupported policy specified: " - + indexScopedPolicy - + ". Please use one of: " - + TIERED_MERGE_POLICY - + ", " - + LOG_BYTE_SIZE_MERGE_POLICY - ); - } - if (logger.isTraceEnabled()) { - logger.trace("Merge policy used: " + mergePolicyProvider.toString()); - } - } - - private void setTimeIndexMergePolicy(String indexScopedPolicy, String nodeScopedTimeIndexPolicy) { - if (indexScopedPolicy.equals(TIERED_MERGE_POLICY)) { - this.timeIndexMergePolicyProvider = tieredMergePolicyProvider; - } else if (indexScopedPolicy.equals(LOG_BYTE_SIZE_MERGE_POLICY)) { - this.timeIndexMergePolicyProvider = logByteSizeMergePolicyProvider; - } else if (indexScopedPolicy.equals(DEFAULT_MERGE_POLICY) || Strings.isEmpty(indexScopedPolicy)) { - if (nodeScopedTimeIndexPolicy.equals(TIERED_MERGE_POLICY)) { - this.timeIndexMergePolicyProvider = tieredMergePolicyProvider; - } else if (nodeScopedTimeIndexPolicy.equals(LOG_BYTE_SIZE_MERGE_POLICY)) { - this.timeIndexMergePolicyProvider = logByteSizeMergePolicyProvider; - } else if (nodeScopedTimeIndexPolicy.equals(DEFAULT_TIME_INDEX_MERGE_POLICY) || Strings.isEmpty(nodeScopedTimeIndexPolicy)) { - this.timeIndexMergePolicyProvider = defaultTimeIndexMergePolicyProvider; - } else { - throw new IllegalArgumentException( - "The " - + IndexSettings.TIME_INDEX_MERGE_POLICY.getKey() - + " has unsupported policy specified: " - + nodeScopedTimeIndexPolicy - + ". Please use one of: " - + TIERED_MERGE_POLICY - + ", " - + LOG_BYTE_SIZE_MERGE_POLICY - ); - } - } else { - throw new IllegalArgumentException( - "The " - + IndexSettings.INDEX_MERGE_POLICY.getKey() - + " has unsupported policy specified: " - + indexScopedPolicy - + ". Please use one of: " - + TIERED_MERGE_POLICY - + ", " - + LOG_BYTE_SIZE_MERGE_POLICY - ); - } - if (logger.isTraceEnabled()) { - logger.trace("Time index merge policy used: " + timeIndexMergePolicyProvider.toString()); - } - } - public Optional> getMergeOnFlushPolicy() { return Optional.ofNullable(mergeOnFlushPolicy); } diff --git a/server/src/main/java/org/opensearch/index/LogByteSizeMergePolicyProvider.java b/server/src/main/java/org/opensearch/index/LogByteSizeMergePolicyProvider.java index cf0ee2fff08e0..0e66338148099 100644 --- a/server/src/main/java/org/opensearch/index/LogByteSizeMergePolicyProvider.java +++ b/server/src/main/java/org/opensearch/index/LogByteSizeMergePolicyProvider.java @@ -60,10 +60,8 @@ public class LogByteSizeMergePolicyProvider implements MergePolicyProvider { ByteSizeUnit.GB ); - // settings for LogByteSizeMergePolicy - public static final Setting INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING = Setting.intSetting( - "index.merge.policy.log_byte_size.merge_factor", + "index.merge.log_byte_size_policy.merge_factor", DEFAULT_MERGE_FACTOR, // keeping it same as default max merge at once for tiered merge policy 2, Setting.Property.Dynamic, @@ -71,35 +69,35 @@ public class LogByteSizeMergePolicyProvider implements MergePolicyProvider { ); public static final Setting INDEX_LBS_MERGE_POLICY_MIN_MERGE_MB_SETTING = Setting.byteSizeSetting( - "index.merge.policy.log_byte_size.min_merge_mb", + "index.merge.log_byte_size_policy.min_merge_mb", DEFAULT_MIN_MERGE_MB, // keeping it same as default floor segment for tiered merge policy Setting.Property.Dynamic, Setting.Property.IndexScope ); public static final Setting INDEX_LBS_MAX_MERGE_SEGMENT_MB_SETTING = Setting.byteSizeSetting( - "index.merge.policy.log_byte_size.max_merge_segment_mb", + "index.merge.log_byte_size_policy.max_merge_segment_mb", DEFAULT_MAX_MERGED_SEGMENT, // keeping default same as tiered merge policy Setting.Property.Dynamic, Setting.Property.IndexScope ); public static final Setting INDEX_LBS_MAX_MERGE_SEGMENT_MB_FOR_FORCED_MERGE_SETTING = Setting.byteSizeSetting( - "index.merge.policy.log_byte_size.max_merge_segment_mb_forced_merge", + "index.merge.log_byte_size_policy.max_merge_segment_mb_forced_merge", DEFAULT_MAX_MERGE_SEGMENT_MB_FORCE_MERGE, Setting.Property.Dynamic, Setting.Property.IndexScope ); public static final Setting INDEX_LBS_MAX_MERGED_DOCS_SETTING = Setting.intSetting( - "index.merge.policy.log_byte_size.max_merged_docs", + "index.merge.log_byte_size_policy.max_merged_docs", DEFAULT_MAX_MERGE_DOCS, Setting.Property.Dynamic, Setting.Property.IndexScope ); public static final Setting INDEX_LBS_NO_CFS_RATIO_SETTING = new Setting<>( - "index.merge.policy.log_byte_size.no_cfs_ratio", + "index.merge.log_byte_size_policy.no_cfs_ratio", Double.toString(DEFAULT_NO_CFS_RATIO), TieredMergePolicyProvider::parseNoCFSRatio, Setting.Property.Dynamic, diff --git a/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java b/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java index 7b39191482813..0ceaef60b7863 100644 --- a/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java +++ b/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java @@ -172,20 +172,35 @@ public void testMergePolicyPrecedence() throws IOException { } public void testInvalidMergePolicy() throws IOException { + + final Settings invalidSettings = Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "invalid").build(); IllegalArgumentException exc1 = expectThrows( IllegalArgumentException.class, - () -> indexSettings(Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "invalid").build()) + () -> IndexSettings.INDEX_MERGE_POLICY.get(invalidSettings) ); - assertThat(exc1.getMessage(), containsString(IndexSettings.INDEX_MERGE_POLICY.getKey() + " has unsupported policy specified: ")); - - Settings nodeSettings = Settings.builder().put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), "invalid").build(); IllegalArgumentException exc2 = expectThrows( IllegalArgumentException.class, - () -> new IndexSettings(newIndexMeta("test", Settings.EMPTY), nodeSettings) + () -> indexSettings(invalidSettings).getMergePolicy(false) + ); + assertThat(exc2.getMessage(), containsString(IndexSettings.INDEX_MERGE_POLICY.getKey() + " has unsupported policy specified: ")); + + final Settings invalidSettings2 = Settings.builder().put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), "invalid").build(); + IllegalArgumentException exc3 = expectThrows( + IllegalArgumentException.class, + () -> IndexSettings.TIME_INDEX_MERGE_POLICY.get(invalidSettings2) + ); + assertThat( + exc3.getMessage(), + containsString(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey() + " has unsupported policy specified: ") + ); + + IllegalArgumentException exc4 = expectThrows( + IllegalArgumentException.class, + () -> new IndexSettings(newIndexMeta("test", Settings.EMPTY), invalidSettings2).getMergePolicy(true) ); assertThat( - exc2.getMessage(), + exc4.getMessage(), containsString(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey() + " has unsupported policy specified: ") ); } @@ -482,7 +497,8 @@ public void testLogByteSizeMergePolicySettingsUpdate() throws IOException { ); assertEquals( ((LogByteSizeMergePolicy) indexSettings.getMergePolicy(true)).getMaxMergeMBForForcedMerge(), - new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MAX_MERGE_SEGMENT_MB_FORCE_MERGE.getMb() + 1, ByteSizeUnit.MB).getMbFrac(), + new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MAX_MERGE_SEGMENT_MB_FORCE_MERGE.getMb() + 1, ByteSizeUnit.MB) + .getMbFrac(), 0.001 ); From 65d88f6d9c4673357c15efb7dee2034409da9e96 Mon Sep 17 00:00:00 2001 From: Rishabh Maurya Date: Thu, 21 Sep 2023 15:19:50 -0700 Subject: [PATCH 4/9] Rename constant DEFAULT to DEFAULT_POLICY Signed-off-by: Rishabh Maurya --- .../org/opensearch/index/IndexSettings.java | 28 +++++++++---------- 1 file changed, 14 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index b83673508e68b..015edd2f3304d 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -83,7 +83,7 @@ */ @PublicApi(since = "1.0.0") public final class IndexSettings { - private static final String DEFAULT = "default"; + private static final String DEFAULT_POLICY = "default"; private static final String MERGE_ON_FLUSH_MERGE_POLICY = "merge-on-flush"; public static final String TIERED_MERGE_POLICY = "tiered"; public static final String LOG_BYTE_SIZE_MERGE_POLICY = "log_byte_size"; @@ -567,14 +567,14 @@ public final class IndexSettings { public static final Setting INDEX_MERGE_ON_FLUSH_POLICY = Setting.simpleString( "index.merge_on_flush.policy", - DEFAULT, + DEFAULT_POLICY, Property.IndexScope, Property.Dynamic ); - public static final Setting INDEX_MERGE_POLICY = Setting.simpleString("index.merge.policy", DEFAULT, policy -> { + public static final Setting INDEX_MERGE_POLICY = Setting.simpleString("index.merge.policy", DEFAULT_POLICY, policy -> { if (!(policy.isEmpty() - || policy.equals(DEFAULT) + || policy.equals(DEFAULT_POLICY) || policy.equals(TIERED_MERGE_POLICY) || policy.equals(LOG_BYTE_SIZE_MERGE_POLICY))) { throw new IllegalArgumentException( @@ -583,7 +583,7 @@ public final class IndexSettings { + " has unsupported policy specified: " + policy + ". Please use one of: " - + DEFAULT + + DEFAULT_POLICY + ", " + TIERED_MERGE_POLICY + ", " @@ -594,10 +594,10 @@ public final class IndexSettings { public static final Setting TIME_INDEX_MERGE_POLICY = Setting.simpleString( "indices.time_index.default_index_merge_policy", - DEFAULT, + DEFAULT_POLICY, policy -> { if (!(policy.isEmpty() - || policy.equals(DEFAULT) + || policy.equals(DEFAULT_POLICY) || policy.equals(TIERED_MERGE_POLICY) || policy.equals(LOG_BYTE_SIZE_MERGE_POLICY))) { throw new IllegalArgumentException( @@ -606,7 +606,7 @@ public final class IndexSettings { + " has unsupported policy specified: " + policy + ". Please use one of: " - + DEFAULT + + DEFAULT_POLICY + ", " + TIERED_MERGE_POLICY + ", " @@ -1522,7 +1522,7 @@ public MergePolicy getMergePolicy(boolean isTimeIndex) { mergePolicyProvider = tieredMergePolicyProvider; } else if (indexScopedPolicy.equals(LOG_BYTE_SIZE_MERGE_POLICY)) { mergePolicyProvider = logByteSizeMergePolicyProvider; - } else if (indexScopedPolicy.equals(DEFAULT) || Strings.isEmpty(indexScopedPolicy)) { + } else if (indexScopedPolicy.equals(DEFAULT_POLICY) || Strings.isEmpty(indexScopedPolicy)) { if (!isTimeIndex) { mergePolicyProvider = tieredMergePolicyProvider; } else { @@ -1531,7 +1531,7 @@ public MergePolicy getMergePolicy(boolean isTimeIndex) { mergePolicyProvider = tieredMergePolicyProvider; } else if (nodeScopedTimeIndexPolicy.equals(LOG_BYTE_SIZE_MERGE_POLICY)) { mergePolicyProvider = logByteSizeMergePolicyProvider; - } else if (nodeScopedTimeIndexPolicy.equals(DEFAULT) || Strings.isEmpty(nodeScopedTimeIndexPolicy)) { + } else if (nodeScopedTimeIndexPolicy.equals(DEFAULT_POLICY) || Strings.isEmpty(nodeScopedTimeIndexPolicy)) { mergePolicyProvider = tieredMergePolicyProvider; } else { throw new IllegalArgumentException( @@ -1540,7 +1540,7 @@ public MergePolicy getMergePolicy(boolean isTimeIndex) { + " has unsupported policy specified: " + nodeScopedTimeIndexPolicy + ". Please use one of: " - + DEFAULT + + DEFAULT_POLICY + ", " + TIERED_MERGE_POLICY + ", " @@ -1555,7 +1555,7 @@ public MergePolicy getMergePolicy(boolean isTimeIndex) { + " has unsupported policy specified: " + indexScopedPolicy + ". Please use one of: " - + DEFAULT + + DEFAULT_POLICY + ", " + TIERED_MERGE_POLICY + ", " @@ -1756,7 +1756,7 @@ public boolean isMergeOnFlushEnabled() { } private void setMergeOnFlushPolicy(String policy) { - if (Strings.isEmpty(policy) || DEFAULT.equalsIgnoreCase(policy)) { + if (Strings.isEmpty(policy) || DEFAULT_POLICY.equalsIgnoreCase(policy)) { mergeOnFlushPolicy = null; } else if (MERGE_ON_FLUSH_MERGE_POLICY.equalsIgnoreCase(policy)) { this.mergeOnFlushPolicy = MergeOnFlushMergePolicy::new; @@ -1767,7 +1767,7 @@ private void setMergeOnFlushPolicy(String policy) { + " has unsupported policy specified: " + policy + ". Please use one of: " - + DEFAULT + + DEFAULT_POLICY + ", " + MERGE_ON_FLUSH_MERGE_POLICY ); From 50985923eb7e43aa44140b4d3b215030024028c3 Mon Sep 17 00:00:00 2001 From: Rishabh Maurya Date: Fri, 22 Sep 2023 13:45:58 -0700 Subject: [PATCH 5/9] Simplify merge policy extraction and selection logic Signed-off-by: Rishabh Maurya --- .../org/opensearch/index/IndexSettings.java | 120 +++++++++--------- .../index/MergePolicySettingsTests.java | 42 ++++-- 2 files changed, 86 insertions(+), 76 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 015edd2f3304d..4ed39f7c1de87 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -54,9 +54,11 @@ import org.opensearch.node.Node; import org.opensearch.search.pipeline.SearchPipelineService; +import java.util.Arrays; import java.util.Collections; import java.util.List; import java.util.Locale; +import java.util.Objects; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -85,8 +87,32 @@ public final class IndexSettings { private static final String DEFAULT_POLICY = "default"; private static final String MERGE_ON_FLUSH_MERGE_POLICY = "merge-on-flush"; - public static final String TIERED_MERGE_POLICY = "tiered"; - public static final String LOG_BYTE_SIZE_MERGE_POLICY = "log_byte_size"; + + public enum IndexMergePolicy { + TIERED("tiered"), + LOG_BYTE_SIZE("log_byte_size"), + DEFAULT_POLICY(IndexSettings.DEFAULT_POLICY); + + private final String value; + + IndexMergePolicy(String value) { + this.value = value; + } + + public String getValue() { + return value; + } + + public static IndexMergePolicy fromString(String text) { + for (IndexMergePolicy policy : IndexMergePolicy.values()) { + if (policy.value.equals(text)) { + return policy; + } + } + return null; + } + } + public static final Setting> DEFAULT_FIELD_SETTING = Setting.listSetting( "index.query.default_field", Collections.singletonList("*"), @@ -573,21 +599,14 @@ public final class IndexSettings { ); public static final Setting INDEX_MERGE_POLICY = Setting.simpleString("index.merge.policy", DEFAULT_POLICY, policy -> { - if (!(policy.isEmpty() - || policy.equals(DEFAULT_POLICY) - || policy.equals(TIERED_MERGE_POLICY) - || policy.equals(LOG_BYTE_SIZE_MERGE_POLICY))) { + if (IndexMergePolicy.fromString(policy) == null) { throw new IllegalArgumentException( "The " + IndexSettings.INDEX_MERGE_POLICY.getKey() + " has unsupported policy specified: " + policy + ". Please use one of: " - + DEFAULT_POLICY - + ", " - + TIERED_MERGE_POLICY - + ", " - + LOG_BYTE_SIZE_MERGE_POLICY + + String.join(", ", Arrays.stream(IndexMergePolicy.values()).map(IndexMergePolicy::getValue).toArray(String[]::new)) ); } }, Property.IndexScope); @@ -596,21 +615,14 @@ public final class IndexSettings { "indices.time_index.default_index_merge_policy", DEFAULT_POLICY, policy -> { - if (!(policy.isEmpty() - || policy.equals(DEFAULT_POLICY) - || policy.equals(TIERED_MERGE_POLICY) - || policy.equals(LOG_BYTE_SIZE_MERGE_POLICY))) { + if (IndexMergePolicy.fromString(policy) == null) { throw new IllegalArgumentException( "The " + IndexSettings.TIME_INDEX_MERGE_POLICY.getKey() + " has unsupported policy specified: " + policy + ". Please use one of: " - + DEFAULT_POLICY - + ", " - + TIERED_MERGE_POLICY - + ", " - + LOG_BYTE_SIZE_MERGE_POLICY + + String.join(", ", Arrays.stream(IndexMergePolicy.values()).map(IndexMergePolicy::getValue).toArray(String[]::new)) ); } }, @@ -1517,53 +1529,37 @@ public long getGcDeletesInMillis() { */ public MergePolicy getMergePolicy(boolean isTimeIndex) { String indexScopedPolicy = scopedSettings.get(INDEX_MERGE_POLICY); - MergePolicyProvider mergePolicyProvider; - if (indexScopedPolicy.equals(TIERED_MERGE_POLICY)) { - mergePolicyProvider = tieredMergePolicyProvider; - } else if (indexScopedPolicy.equals(LOG_BYTE_SIZE_MERGE_POLICY)) { - mergePolicyProvider = logByteSizeMergePolicyProvider; - } else if (indexScopedPolicy.equals(DEFAULT_POLICY) || Strings.isEmpty(indexScopedPolicy)) { - if (!isTimeIndex) { + MergePolicyProvider mergePolicyProvider = null; + IndexMergePolicy indexMergePolicy = IndexMergePolicy.fromString(indexScopedPolicy); + switch (Objects.requireNonNull(indexMergePolicy)) { + case TIERED: mergePolicyProvider = tieredMergePolicyProvider; - } else { - String nodeScopedTimeIndexPolicy = TIME_INDEX_MERGE_POLICY.get(nodeSettings); - if (nodeScopedTimeIndexPolicy.equals(TIERED_MERGE_POLICY)) { - mergePolicyProvider = tieredMergePolicyProvider; - } else if (nodeScopedTimeIndexPolicy.equals(LOG_BYTE_SIZE_MERGE_POLICY)) { - mergePolicyProvider = logByteSizeMergePolicyProvider; - } else if (nodeScopedTimeIndexPolicy.equals(DEFAULT_POLICY) || Strings.isEmpty(nodeScopedTimeIndexPolicy)) { - mergePolicyProvider = tieredMergePolicyProvider; + break; + case LOG_BYTE_SIZE: + mergePolicyProvider = logByteSizeMergePolicyProvider; + break; + case DEFAULT_POLICY: + if (isTimeIndex) { + String nodeScopedTimeIndexPolicy = TIME_INDEX_MERGE_POLICY.get(nodeSettings); + IndexMergePolicy nodeMergePolicy = IndexMergePolicy.fromString(nodeScopedTimeIndexPolicy); + switch (Objects.requireNonNull(nodeMergePolicy)) { + case TIERED: + case DEFAULT_POLICY: + mergePolicyProvider = tieredMergePolicyProvider; + break; + case LOG_BYTE_SIZE: + mergePolicyProvider = logByteSizeMergePolicyProvider; + break; + } } else { - throw new IllegalArgumentException( - "The " - + IndexSettings.TIME_INDEX_MERGE_POLICY.getKey() - + " has unsupported policy specified: " - + nodeScopedTimeIndexPolicy - + ". Please use one of: " - + DEFAULT_POLICY - + ", " - + TIERED_MERGE_POLICY - + ", " - + LOG_BYTE_SIZE_MERGE_POLICY - ); + mergePolicyProvider = tieredMergePolicyProvider; } - } - } else { - throw new IllegalArgumentException( - "The " - + IndexSettings.INDEX_MERGE_POLICY.getKey() - + " has unsupported policy specified: " - + indexScopedPolicy - + ". Please use one of: " - + DEFAULT_POLICY - + ", " - + TIERED_MERGE_POLICY - + ", " - + LOG_BYTE_SIZE_MERGE_POLICY - ); + break; } + assert mergePolicyProvider != null : "should not happen as validation for invalid merge policy values " + + "are part of setting definition"; if (logger.isTraceEnabled()) { - logger.trace("Index: " + this.index.getName() + ", Merge policy used: " + mergePolicyProvider.toString()); + logger.trace("Index: " + this.index.getName() + ", Merge policy used: " + mergePolicyProvider); } return mergePolicyProvider.getMergePolicy(); } diff --git a/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java b/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java index 0ceaef60b7863..5f9a0a50cf389 100644 --- a/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java +++ b/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java @@ -81,7 +81,9 @@ public void testNoMerges() { } public void testUpdateSettings() throws IOException { - Settings settings = Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.TIERED_MERGE_POLICY).build(); + Settings settings = Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.TIERED.getValue()) + .build(); IndexSettings indexSettings = indexSettings(settings); assertThat(indexSettings.getMergePolicy(false).getNoCFSRatio(), equalTo(0.1)); indexSettings = indexSettings(build(0.9)); @@ -112,7 +114,7 @@ public void testMergePolicyPrecedence() throws IOException { // 1.1 node setting TIME_INDEX_MERGE_POLICY is set as log_byte_size // assert index policy is tiered whereas time index policy is log_byte_size Settings nodeSettings = Settings.builder() - .put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), IndexSettings.LOG_BYTE_SIZE_MERGE_POLICY) + .put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.LOG_BYTE_SIZE.getValue()) .build(); indexSettings = new IndexSettings(newIndexMeta("test", Settings.EMPTY), nodeSettings); assertTrue(indexSettings.getMergePolicy(false) instanceof OpenSearchTieredMergePolicy); @@ -120,7 +122,9 @@ public void testMergePolicyPrecedence() throws IOException { // 1.2 node setting TIME_INDEX_MERGE_POLICY is set as tiered // assert both index and time index policy is tiered - nodeSettings = Settings.builder().put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), IndexSettings.TIERED_MERGE_POLICY).build(); + nodeSettings = Settings.builder() + .put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.TIERED.getValue()) + .build(); indexSettings = new IndexSettings(newIndexMeta("test", Settings.EMPTY), nodeSettings); assertTrue(indexSettings.getMergePolicy(false) instanceof OpenSearchTieredMergePolicy); assertTrue(indexSettings.getMergePolicy(true) instanceof OpenSearchTieredMergePolicy); @@ -128,7 +132,7 @@ public void testMergePolicyPrecedence() throws IOException { // 2. INDEX_MERGE_POLICY set as tiered // assert both index and time-index merge policy is set as tiered indexSettings = indexSettings( - Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.TIERED_MERGE_POLICY).build() + Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.TIERED.getValue()).build() ); assertTrue(indexSettings.getMergePolicy(false) instanceof OpenSearchTieredMergePolicy); assertTrue(indexSettings.getMergePolicy(true) instanceof OpenSearchTieredMergePolicy); @@ -136,12 +140,12 @@ public void testMergePolicyPrecedence() throws IOException { // 2.1 node setting TIME_INDEX_MERGE_POLICY is set as log_byte_size // assert both index and time-index merge policy is set as tiered nodeSettings = Settings.builder() - .put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), IndexSettings.LOG_BYTE_SIZE_MERGE_POLICY) + .put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.LOG_BYTE_SIZE.getValue()) .build(); indexSettings = new IndexSettings( newIndexMeta( "test", - Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.TIERED_MERGE_POLICY).build() + Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.TIERED.getValue()).build() ), nodeSettings ); @@ -151,18 +155,24 @@ public void testMergePolicyPrecedence() throws IOException { // 3. INDEX_MERGE_POLICY set as log_byte_size // assert both index and time-index merge policy is set as log_byte_size indexSettings = indexSettings( - Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.LOG_BYTE_SIZE_MERGE_POLICY).build() + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.LOG_BYTE_SIZE.getValue()) + .build() ); assertTrue(indexSettings.getMergePolicy(false) instanceof LogByteSizeMergePolicy); assertTrue(indexSettings.getMergePolicy(true) instanceof LogByteSizeMergePolicy); // 3.1 node setting TIME_INDEX_MERGE_POLICY is set as tiered // assert both index and time-index merge policy is set as log_byte_size - nodeSettings = Settings.builder().put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), IndexSettings.TIERED_MERGE_POLICY).build(); + nodeSettings = Settings.builder() + .put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.TIERED.getValue()) + .build(); indexSettings = new IndexSettings( newIndexMeta( "test", - Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.LOG_BYTE_SIZE_MERGE_POLICY).build() + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.LOG_BYTE_SIZE.getValue()) + .build() ), nodeSettings ); @@ -207,7 +217,9 @@ public void testInvalidMergePolicy() throws IOException { public void testUpdateSettingsForLogByteSizeMergePolicy() throws IOException { IndexSettings indexSettings = indexSettings( - Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.LOG_BYTE_SIZE_MERGE_POLICY).build() + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.LOG_BYTE_SIZE.getValue()) + .build() ); assertTrue(indexSettings.getMergePolicy(true) instanceof LogByteSizeMergePolicy); assertThat(indexSettings.getMergePolicy(true).getNoCFSRatio(), equalTo(0.1)); @@ -418,11 +430,13 @@ public void testTieredMergePolicySettingsUpdate() throws IOException { public void testLogByteSizeMergePolicySettingsUpdate() throws IOException { IndexSettings indexSettings = indexSettings( - Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.LOG_BYTE_SIZE_MERGE_POLICY).build() + Settings.builder() + .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.LOG_BYTE_SIZE.getValue()) + .build() ); assertEquals( ((LogByteSizeMergePolicy) indexSettings.getMergePolicy(true)).getMergeFactor(), - TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE + LogByteSizeMergePolicyProvider.DEFAULT_MERGE_FACTOR ); indexSettings.updateIndexMetadata( @@ -439,7 +453,7 @@ public void testLogByteSizeMergePolicySettingsUpdate() throws IOException { ); assertEquals( ((LogByteSizeMergePolicy) indexSettings.getMergePolicy(true)).getMergeFactor(), - TieredMergePolicyProvider.DEFAULT_MAX_MERGE_AT_ONCE + 1 + LogByteSizeMergePolicyProvider.DEFAULT_MERGE_FACTOR + 1 ); indexSettings.updateIndexMetadata( @@ -457,7 +471,7 @@ public void testLogByteSizeMergePolicySettingsUpdate() throws IOException { assertEquals( ((LogByteSizeMergePolicy) indexSettings.getMergePolicy(true)).getMinMergeMB(), - new ByteSizeValue(TieredMergePolicyProvider.DEFAULT_FLOOR_SEGMENT.getMb() + 1, ByteSizeUnit.MB).getMbFrac(), + new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MIN_MERGE_MB.getMb() + 1, ByteSizeUnit.MB).getMbFrac(), 0.001 ); From 32ab21ad14eccc3afaf54ec102de6d9ef0b3b2b2 Mon Sep 17 00:00:00 2001 From: Rishabh Maurya Date: Fri, 22 Sep 2023 13:56:19 -0700 Subject: [PATCH 6/9] missing javadoc error Signed-off-by: Rishabh Maurya --- server/src/main/java/org/opensearch/index/IndexSettings.java | 3 +++ 1 file changed, 3 insertions(+) diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 4ed39f7c1de87..586041417bc2c 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -88,6 +88,9 @@ public final class IndexSettings { private static final String DEFAULT_POLICY = "default"; private static final String MERGE_ON_FLUSH_MERGE_POLICY = "merge-on-flush"; + /** + * Enum representing supported merge policies + */ public enum IndexMergePolicy { TIERED("tiered"), LOG_BYTE_SIZE("log_byte_size"), From aa7e5fb03978e8cc028317158312dd9266e2a587 Mon Sep 17 00:00:00 2001 From: Rishabh Maurya Date: Fri, 22 Sep 2023 17:44:58 -0700 Subject: [PATCH 7/9] Renaming log byte size policy setting with mb Signed-off-by: Rishabh Maurya --- .../common/settings/IndexScopedSettings.java | 6 +-- .../org/opensearch/index/IndexSettings.java | 6 +-- .../index/LogByteSizeMergePolicyProvider.java | 37 +++++++++---------- .../index/MergePolicySettingsTests.java | 14 +++---- 4 files changed, 30 insertions(+), 33 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java index 2c2eb1d71628f..83bf8c82ee3dd 100644 --- a/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java @@ -206,9 +206,9 @@ public final class IndexScopedSettings extends AbstractScopedSettings { IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY, IndexSettings.INDEX_MERGE_POLICY, LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING, - LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MIN_MERGE_MB_SETTING, - LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_MB_SETTING, - LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_MB_FOR_FORCED_MERGE_SETTING, + LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MIN_MERGE_SETTING, + LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_SETTING, + LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_FOR_FORCED_MERGE_SETTING, LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGED_DOCS_SETTING, LogByteSizeMergePolicyProvider.INDEX_LBS_NO_CFS_RATIO_SETTING, IndexSettings.DEFAULT_SEARCH_PIPELINE, diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 586041417bc2c..8ac024e003f74 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -963,15 +963,15 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti logByteSizeMergePolicyProvider::setLBSMergeFactor ); scopedSettings.addSettingsUpdateConsumer( - LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MIN_MERGE_MB_SETTING, + LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MIN_MERGE_SETTING, logByteSizeMergePolicyProvider::setLBSMinMergedMB ); scopedSettings.addSettingsUpdateConsumer( - LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_MB_SETTING, + LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_SETTING, logByteSizeMergePolicyProvider::setLBSMaxMergeSegment ); scopedSettings.addSettingsUpdateConsumer( - LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_MB_FOR_FORCED_MERGE_SETTING, + LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_FOR_FORCED_MERGE_SETTING, logByteSizeMergePolicyProvider::setLBSMaxMergeMBForForcedMerge ); scopedSettings.addSettingsUpdateConsumer( diff --git a/server/src/main/java/org/opensearch/index/LogByteSizeMergePolicyProvider.java b/server/src/main/java/org/opensearch/index/LogByteSizeMergePolicyProvider.java index 0e66338148099..0b762d781957c 100644 --- a/server/src/main/java/org/opensearch/index/LogByteSizeMergePolicyProvider.java +++ b/server/src/main/java/org/opensearch/index/LogByteSizeMergePolicyProvider.java @@ -50,15 +50,12 @@ public class LogByteSizeMergePolicyProvider implements MergePolicyProvider { private final Logger logger; private final boolean mergesEnabled; - public static final ByteSizeValue DEFAULT_MIN_MERGE_MB = new ByteSizeValue(2, ByteSizeUnit.MB); + public static final ByteSizeValue DEFAULT_MIN_MERGE = new ByteSizeValue(2, ByteSizeUnit.MB); public static final int DEFAULT_MERGE_FACTOR = 10; public static final ByteSizeValue DEFAULT_MAX_MERGED_SEGMENT = new ByteSizeValue(5, ByteSizeUnit.GB); - public static final ByteSizeValue DEFAULT_MAX_MERGE_SEGMENT_MB_FORCE_MERGE = new ByteSizeValue( - Long.MAX_VALUE / ByteSizeUnit.GB.toBytes(1), - ByteSizeUnit.GB - ); + public static final ByteSizeValue DEFAULT_MAX_MERGE_SEGMENT_FORCE_MERGE = new ByteSizeValue(Long.MAX_VALUE); public static final Setting INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING = Setting.intSetting( "index.merge.log_byte_size_policy.merge_factor", @@ -68,23 +65,23 @@ public class LogByteSizeMergePolicyProvider implements MergePolicyProvider { Setting.Property.IndexScope ); - public static final Setting INDEX_LBS_MERGE_POLICY_MIN_MERGE_MB_SETTING = Setting.byteSizeSetting( - "index.merge.log_byte_size_policy.min_merge_mb", - DEFAULT_MIN_MERGE_MB, // keeping it same as default floor segment for tiered merge policy + public static final Setting INDEX_LBS_MERGE_POLICY_MIN_MERGE_SETTING = Setting.byteSizeSetting( + "index.merge.log_byte_size_policy.min_merge", + DEFAULT_MIN_MERGE, // keeping it same as default floor segment for tiered merge policy Setting.Property.Dynamic, Setting.Property.IndexScope ); - public static final Setting INDEX_LBS_MAX_MERGE_SEGMENT_MB_SETTING = Setting.byteSizeSetting( - "index.merge.log_byte_size_policy.max_merge_segment_mb", + public static final Setting INDEX_LBS_MAX_MERGE_SEGMENT_SETTING = Setting.byteSizeSetting( + "index.merge.log_byte_size_policy.max_merge_segment", DEFAULT_MAX_MERGED_SEGMENT, // keeping default same as tiered merge policy Setting.Property.Dynamic, Setting.Property.IndexScope ); - public static final Setting INDEX_LBS_MAX_MERGE_SEGMENT_MB_FOR_FORCED_MERGE_SETTING = Setting.byteSizeSetting( - "index.merge.log_byte_size_policy.max_merge_segment_mb_forced_merge", - DEFAULT_MAX_MERGE_SEGMENT_MB_FORCE_MERGE, + public static final Setting INDEX_LBS_MAX_MERGE_SEGMENT_FOR_FORCED_MERGE_SETTING = Setting.byteSizeSetting( + "index.merge.log_byte_size_policy.max_merge_segment_forced_merge", + DEFAULT_MAX_MERGE_SEGMENT_FORCE_MERGE, Setting.Property.Dynamic, Setting.Property.IndexScope ); @@ -110,10 +107,10 @@ public class LogByteSizeMergePolicyProvider implements MergePolicyProvider { // Undocumented settings, works great with defaults logByteSizeMergePolicy.setMergeFactor(indexSettings.getValue(INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING)); - logByteSizeMergePolicy.setMinMergeMB(indexSettings.getValue(INDEX_LBS_MERGE_POLICY_MIN_MERGE_MB_SETTING).getMbFrac()); - logByteSizeMergePolicy.setMaxMergeMB(indexSettings.getValue(INDEX_LBS_MAX_MERGE_SEGMENT_MB_SETTING).getMbFrac()); + logByteSizeMergePolicy.setMinMergeMB(indexSettings.getValue(INDEX_LBS_MERGE_POLICY_MIN_MERGE_SETTING).getMbFrac()); + logByteSizeMergePolicy.setMaxMergeMB(indexSettings.getValue(INDEX_LBS_MAX_MERGE_SEGMENT_SETTING).getMbFrac()); logByteSizeMergePolicy.setMaxMergeMBForForcedMerge( - indexSettings.getValue(INDEX_LBS_MAX_MERGE_SEGMENT_MB_FOR_FORCED_MERGE_SETTING).getMbFrac() + indexSettings.getValue(INDEX_LBS_MAX_MERGE_SEGMENT_FOR_FORCED_MERGE_SETTING).getMbFrac() ); logByteSizeMergePolicy.setMaxMergeDocs(indexSettings.getValue(INDEX_LBS_MAX_MERGED_DOCS_SETTING)); logByteSizeMergePolicy.setNoCFSRatio(indexSettings.getValue(INDEX_LBS_NO_CFS_RATIO_SETTING)); @@ -132,12 +129,12 @@ void setLBSMaxMergeSegment(ByteSizeValue maxMergeSegment) { logByteSizeMergePolicy.setMaxMergeMB(maxMergeSegment.getMbFrac()); } - void setLBSMinMergedMB(ByteSizeValue minMergedMB) { - logByteSizeMergePolicy.setMinMergeMB(minMergedMB.getMbFrac()); + void setLBSMinMergedMB(ByteSizeValue minMergedSize) { + logByteSizeMergePolicy.setMinMergeMB(minMergedSize.getMbFrac()); } - void setLBSMaxMergeMBForForcedMerge(ByteSizeValue maxMergeMBForcedMerge) { - logByteSizeMergePolicy.setMaxMergeMBForForcedMerge(maxMergeMBForcedMerge.getMbFrac()); + void setLBSMaxMergeMBForForcedMerge(ByteSizeValue maxMergeForcedMerge) { + logByteSizeMergePolicy.setMaxMergeMBForForcedMerge(maxMergeForcedMerge.getMbFrac()); } void setLBSMaxMergeDocs(int maxMergeDocs) { diff --git a/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java b/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java index 5f9a0a50cf389..2b6bff33a18ca 100644 --- a/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java +++ b/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java @@ -462,8 +462,8 @@ public void testLogByteSizeMergePolicySettingsUpdate() throws IOException { Settings.builder() .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") .put( - LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MIN_MERGE_MB_SETTING.getKey(), - new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MIN_MERGE_MB.getMb() + 1, ByteSizeUnit.MB) + LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MIN_MERGE_SETTING.getKey(), + new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MIN_MERGE.getMb() + 1, ByteSizeUnit.MB) ) .build() ) @@ -471,7 +471,7 @@ public void testLogByteSizeMergePolicySettingsUpdate() throws IOException { assertEquals( ((LogByteSizeMergePolicy) indexSettings.getMergePolicy(true)).getMinMergeMB(), - new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MIN_MERGE_MB.getMb() + 1, ByteSizeUnit.MB).getMbFrac(), + new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MIN_MERGE.getMb() + 1, ByteSizeUnit.MB).getMbFrac(), 0.001 ); @@ -481,7 +481,7 @@ public void testLogByteSizeMergePolicySettingsUpdate() throws IOException { Settings.builder() .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") .put( - LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_MB_SETTING.getKey(), + LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_SETTING.getKey(), new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MAX_MERGED_SEGMENT.getMb() + 100, ByteSizeUnit.MB) ) .build() @@ -500,9 +500,9 @@ public void testLogByteSizeMergePolicySettingsUpdate() throws IOException { Settings.builder() .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), "log_byte_size") .put( - LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_MB_FOR_FORCED_MERGE_SETTING.getKey(), + LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_FOR_FORCED_MERGE_SETTING.getKey(), new ByteSizeValue( - LogByteSizeMergePolicyProvider.DEFAULT_MAX_MERGE_SEGMENT_MB_FORCE_MERGE.getMb() + 1, + LogByteSizeMergePolicyProvider.DEFAULT_MAX_MERGE_SEGMENT_FORCE_MERGE.getMb() - 100, ByteSizeUnit.MB ) ) @@ -511,7 +511,7 @@ public void testLogByteSizeMergePolicySettingsUpdate() throws IOException { ); assertEquals( ((LogByteSizeMergePolicy) indexSettings.getMergePolicy(true)).getMaxMergeMBForForcedMerge(), - new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MAX_MERGE_SEGMENT_MB_FORCE_MERGE.getMb() + 1, ByteSizeUnit.MB) + new ByteSizeValue(LogByteSizeMergePolicyProvider.DEFAULT_MAX_MERGE_SEGMENT_FORCE_MERGE.getMb() - 100, ByteSizeUnit.MB) .getMbFrac(), 0.001 ); From 8040c7650893ab809d61c89e3c0af19df027d964 Mon Sep 17 00:00:00 2001 From: Rishabh Maurya Date: Mon, 25 Sep 2023 11:50:24 -0700 Subject: [PATCH 8/9] Move validation exception to enum from setting defn Signed-off-by: Rishabh Maurya --- .../org/opensearch/index/IndexSettings.java | 38 +++++++------------ .../index/MergePolicySettingsTests.java | 14 ++----- 2 files changed, 17 insertions(+), 35 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index 8ac024e003f74..ac6ba8d688c45 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -112,7 +112,12 @@ public static IndexMergePolicy fromString(String text) { return policy; } } - return null; + throw new IllegalArgumentException( + "The setting has unsupported policy specified: " + + text + + ". Please use one of: " + + String.join(", ", Arrays.stream(IndexMergePolicy.values()).map(IndexMergePolicy::getValue).toArray(String[]::new)) + ); } } @@ -601,34 +606,17 @@ public static IndexMergePolicy fromString(String text) { Property.Dynamic ); - public static final Setting INDEX_MERGE_POLICY = Setting.simpleString("index.merge.policy", DEFAULT_POLICY, policy -> { - if (IndexMergePolicy.fromString(policy) == null) { - throw new IllegalArgumentException( - "The " - + IndexSettings.INDEX_MERGE_POLICY.getKey() - + " has unsupported policy specified: " - + policy - + ". Please use one of: " - + String.join(", ", Arrays.stream(IndexMergePolicy.values()).map(IndexMergePolicy::getValue).toArray(String[]::new)) - ); - } - }, Property.IndexScope); + public static final Setting INDEX_MERGE_POLICY = Setting.simpleString( + "index.merge.policy", + DEFAULT_POLICY, + IndexMergePolicy::fromString, + Property.IndexScope + ); public static final Setting TIME_INDEX_MERGE_POLICY = Setting.simpleString( "indices.time_index.default_index_merge_policy", DEFAULT_POLICY, - policy -> { - if (IndexMergePolicy.fromString(policy) == null) { - throw new IllegalArgumentException( - "The " - + IndexSettings.TIME_INDEX_MERGE_POLICY.getKey() - + " has unsupported policy specified: " - + policy - + ". Please use one of: " - + String.join(", ", Arrays.stream(IndexMergePolicy.values()).map(IndexMergePolicy::getValue).toArray(String[]::new)) - ); - } - }, + IndexMergePolicy::fromString, Property.NodeScope ); diff --git a/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java b/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java index 2b6bff33a18ca..a98c00879fbc1 100644 --- a/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java +++ b/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java @@ -188,31 +188,25 @@ public void testInvalidMergePolicy() throws IOException { IllegalArgumentException.class, () -> IndexSettings.INDEX_MERGE_POLICY.get(invalidSettings) ); - assertThat(exc1.getMessage(), containsString(IndexSettings.INDEX_MERGE_POLICY.getKey() + " has unsupported policy specified: ")); + assertThat(exc1.getMessage(), containsString(" has unsupported policy specified: ")); IllegalArgumentException exc2 = expectThrows( IllegalArgumentException.class, () -> indexSettings(invalidSettings).getMergePolicy(false) ); - assertThat(exc2.getMessage(), containsString(IndexSettings.INDEX_MERGE_POLICY.getKey() + " has unsupported policy specified: ")); + assertThat(exc2.getMessage(), containsString(" has unsupported policy specified: ")); final Settings invalidSettings2 = Settings.builder().put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), "invalid").build(); IllegalArgumentException exc3 = expectThrows( IllegalArgumentException.class, () -> IndexSettings.TIME_INDEX_MERGE_POLICY.get(invalidSettings2) ); - assertThat( - exc3.getMessage(), - containsString(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey() + " has unsupported policy specified: ") - ); + assertThat(exc3.getMessage(), containsString(" has unsupported policy specified: ")); IllegalArgumentException exc4 = expectThrows( IllegalArgumentException.class, () -> new IndexSettings(newIndexMeta("test", Settings.EMPTY), invalidSettings2).getMergePolicy(true) ); - assertThat( - exc4.getMessage(), - containsString(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey() + " has unsupported policy specified: ") - ); + assertThat(exc4.getMessage(), containsString(" has unsupported policy specified: ")); } public void testUpdateSettingsForLogByteSizeMergePolicy() throws IOException { From 7caa6210a574ba83e18f960f3e314c1f73d70eb6 Mon Sep 17 00:00:00 2001 From: Rishabh Maurya Date: Thu, 28 Sep 2023 11:01:21 -0700 Subject: [PATCH 9/9] rename time_index to time_series_index Signed-off-by: Rishabh Maurya --- .../common/settings/ClusterSettings.java | 2 +- .../org/opensearch/index/IndexSettings.java | 19 ++++++----- .../opensearch/index/MergePolicyProvider.java | 5 +++ .../index/MergePolicySettingsTests.java | 32 +++++++++---------- 4 files changed, 31 insertions(+), 27 deletions(-) diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 0861657005acc..5261d40387dc6 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -457,7 +457,7 @@ public void apply(Settings value, Settings current, Settings previous) { NetworkService.TCP_CONNECT_TIMEOUT, IndexSettings.QUERY_STRING_ANALYZE_WILDCARD, IndexSettings.QUERY_STRING_ALLOW_LEADING_WILDCARD, - IndexSettings.TIME_INDEX_MERGE_POLICY, + IndexSettings.TIME_SERIES_INDEX_MERGE_POLICY, ScriptService.SCRIPT_GENERAL_CACHE_SIZE_SETTING, ScriptService.SCRIPT_GENERAL_CACHE_EXPIRE_SETTING, ScriptService.SCRIPT_GENERAL_MAX_COMPILATIONS_RATE_SETTING, diff --git a/server/src/main/java/org/opensearch/index/IndexSettings.java b/server/src/main/java/org/opensearch/index/IndexSettings.java index ac6ba8d688c45..ce6c1a5ad6284 100644 --- a/server/src/main/java/org/opensearch/index/IndexSettings.java +++ b/server/src/main/java/org/opensearch/index/IndexSettings.java @@ -58,7 +58,6 @@ import java.util.Collections; import java.util.List; import java.util.Locale; -import java.util.Objects; import java.util.Optional; import java.util.concurrent.TimeUnit; import java.util.function.Consumer; @@ -613,8 +612,8 @@ public static IndexMergePolicy fromString(String text) { Property.IndexScope ); - public static final Setting TIME_INDEX_MERGE_POLICY = Setting.simpleString( - "indices.time_index.default_index_merge_policy", + public static final Setting TIME_SERIES_INDEX_MERGE_POLICY = Setting.simpleString( + "indices.time_series_index.default_index_merge_policy", DEFAULT_POLICY, IndexMergePolicy::fromString, Property.NodeScope @@ -1516,13 +1515,13 @@ public long getGcDeletesInMillis() { /** * Returns the merge policy that should be used for this index. - * @param isTimeIndex true if index contains @timestamp field + * @param isTimeSeriesIndex true if index contains @timestamp field */ - public MergePolicy getMergePolicy(boolean isTimeIndex) { + public MergePolicy getMergePolicy(boolean isTimeSeriesIndex) { String indexScopedPolicy = scopedSettings.get(INDEX_MERGE_POLICY); MergePolicyProvider mergePolicyProvider = null; IndexMergePolicy indexMergePolicy = IndexMergePolicy.fromString(indexScopedPolicy); - switch (Objects.requireNonNull(indexMergePolicy)) { + switch (indexMergePolicy) { case TIERED: mergePolicyProvider = tieredMergePolicyProvider; break; @@ -1530,10 +1529,10 @@ public MergePolicy getMergePolicy(boolean isTimeIndex) { mergePolicyProvider = logByteSizeMergePolicyProvider; break; case DEFAULT_POLICY: - if (isTimeIndex) { - String nodeScopedTimeIndexPolicy = TIME_INDEX_MERGE_POLICY.get(nodeSettings); - IndexMergePolicy nodeMergePolicy = IndexMergePolicy.fromString(nodeScopedTimeIndexPolicy); - switch (Objects.requireNonNull(nodeMergePolicy)) { + if (isTimeSeriesIndex) { + String nodeScopedTimeSeriesIndexPolicy = TIME_SERIES_INDEX_MERGE_POLICY.get(nodeSettings); + IndexMergePolicy nodeMergePolicy = IndexMergePolicy.fromString(nodeScopedTimeSeriesIndexPolicy); + switch (nodeMergePolicy) { case TIERED: case DEFAULT_POLICY: mergePolicyProvider = tieredMergePolicyProvider; diff --git a/server/src/main/java/org/opensearch/index/MergePolicyProvider.java b/server/src/main/java/org/opensearch/index/MergePolicyProvider.java index 1c2388fedd633..6f734314f758f 100644 --- a/server/src/main/java/org/opensearch/index/MergePolicyProvider.java +++ b/server/src/main/java/org/opensearch/index/MergePolicyProvider.java @@ -9,10 +9,15 @@ package org.opensearch.index; import org.apache.lucene.index.MergePolicy; +import org.opensearch.common.annotation.InternalApi; /** * A provider for obtaining merge policies used by OpenSearch indexes. + * + * @opensearch.internal */ + +@InternalApi public interface MergePolicyProvider { // don't convert to Setting<> and register... we only set this in tests and register via a plugin String INDEX_MERGE_ENABLED = "index.merge.enabled"; diff --git a/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java b/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java index a98c00879fbc1..32c4c048d77ba 100644 --- a/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java +++ b/server/src/test/java/org/opensearch/index/MergePolicySettingsTests.java @@ -111,36 +111,36 @@ public void testMergePolicyPrecedence() throws IOException { assertTrue(indexSettings.getMergePolicy(false) instanceof OpenSearchTieredMergePolicy); assertTrue(indexSettings.getMergePolicy(true) instanceof OpenSearchTieredMergePolicy); - // 1.1 node setting TIME_INDEX_MERGE_POLICY is set as log_byte_size - // assert index policy is tiered whereas time index policy is log_byte_size + // 1.1 node setting TIME_SERIES_INDEX_MERGE_POLICY is set as log_byte_size + // assert index policy is tiered whereas time series index policy is log_byte_size Settings nodeSettings = Settings.builder() - .put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.LOG_BYTE_SIZE.getValue()) + .put(IndexSettings.TIME_SERIES_INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.LOG_BYTE_SIZE.getValue()) .build(); indexSettings = new IndexSettings(newIndexMeta("test", Settings.EMPTY), nodeSettings); assertTrue(indexSettings.getMergePolicy(false) instanceof OpenSearchTieredMergePolicy); assertTrue(indexSettings.getMergePolicy(true) instanceof LogByteSizeMergePolicy); - // 1.2 node setting TIME_INDEX_MERGE_POLICY is set as tiered - // assert both index and time index policy is tiered + // 1.2 node setting TIME_SERIES_INDEX_MERGE_POLICY is set as tiered + // assert both index and time series index policy is tiered nodeSettings = Settings.builder() - .put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.TIERED.getValue()) + .put(IndexSettings.TIME_SERIES_INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.TIERED.getValue()) .build(); indexSettings = new IndexSettings(newIndexMeta("test", Settings.EMPTY), nodeSettings); assertTrue(indexSettings.getMergePolicy(false) instanceof OpenSearchTieredMergePolicy); assertTrue(indexSettings.getMergePolicy(true) instanceof OpenSearchTieredMergePolicy); // 2. INDEX_MERGE_POLICY set as tiered - // assert both index and time-index merge policy is set as tiered + // assert both index and time-series-index merge policy is set as tiered indexSettings = indexSettings( Settings.builder().put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.TIERED.getValue()).build() ); assertTrue(indexSettings.getMergePolicy(false) instanceof OpenSearchTieredMergePolicy); assertTrue(indexSettings.getMergePolicy(true) instanceof OpenSearchTieredMergePolicy); - // 2.1 node setting TIME_INDEX_MERGE_POLICY is set as log_byte_size - // assert both index and time-index merge policy is set as tiered + // 2.1 node setting TIME_SERIES_INDEX_MERGE_POLICY is set as log_byte_size + // assert both index and time-series-index merge policy is set as tiered nodeSettings = Settings.builder() - .put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.LOG_BYTE_SIZE.getValue()) + .put(IndexSettings.TIME_SERIES_INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.LOG_BYTE_SIZE.getValue()) .build(); indexSettings = new IndexSettings( newIndexMeta( @@ -153,7 +153,7 @@ public void testMergePolicyPrecedence() throws IOException { assertTrue(indexSettings.getMergePolicy(true) instanceof OpenSearchTieredMergePolicy); // 3. INDEX_MERGE_POLICY set as log_byte_size - // assert both index and time-index merge policy is set as log_byte_size + // assert both index and time-series-index merge policy is set as log_byte_size indexSettings = indexSettings( Settings.builder() .put(IndexSettings.INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.LOG_BYTE_SIZE.getValue()) @@ -162,10 +162,10 @@ public void testMergePolicyPrecedence() throws IOException { assertTrue(indexSettings.getMergePolicy(false) instanceof LogByteSizeMergePolicy); assertTrue(indexSettings.getMergePolicy(true) instanceof LogByteSizeMergePolicy); - // 3.1 node setting TIME_INDEX_MERGE_POLICY is set as tiered - // assert both index and time-index merge policy is set as log_byte_size + // 3.1 node setting TIME_SERIES_INDEX_MERGE_POLICY is set as tiered + // assert both index and time-series-index merge policy is set as log_byte_size nodeSettings = Settings.builder() - .put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.TIERED.getValue()) + .put(IndexSettings.TIME_SERIES_INDEX_MERGE_POLICY.getKey(), IndexSettings.IndexMergePolicy.TIERED.getValue()) .build(); indexSettings = new IndexSettings( newIndexMeta( @@ -195,10 +195,10 @@ public void testInvalidMergePolicy() throws IOException { ); assertThat(exc2.getMessage(), containsString(" has unsupported policy specified: ")); - final Settings invalidSettings2 = Settings.builder().put(IndexSettings.TIME_INDEX_MERGE_POLICY.getKey(), "invalid").build(); + final Settings invalidSettings2 = Settings.builder().put(IndexSettings.TIME_SERIES_INDEX_MERGE_POLICY.getKey(), "invalid").build(); IllegalArgumentException exc3 = expectThrows( IllegalArgumentException.class, - () -> IndexSettings.TIME_INDEX_MERGE_POLICY.get(invalidSettings2) + () -> IndexSettings.TIME_SERIES_INDEX_MERGE_POLICY.get(invalidSettings2) ); assertThat(exc3.getMessage(), containsString(" has unsupported policy specified: "));