Skip to content

Commit fa66beb

Browse files
Use of LogByteSizeMergePolicy for data stream use cases (opensearch-project#9992)
* Configurable merge policy for index * additional setting to configure merge policy for timestamp based index * introduction of logbytesize merge policy as an option Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com> * remove the trace log not required anymore Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com> * Refactor the merge policy extraction logic Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com> * Rename constant DEFAULT to DEFAULT_POLICY Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com> * Simplify merge policy extraction and selection logic Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com> * missing javadoc error Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com> * Renaming log byte size policy setting with mb Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com> * Move validation exception to enum from setting defn Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com> * rename time_index to time_series_index Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com> --------- Signed-off-by: Rishabh Maurya <rishabhmaurya05@gmail.com>
1 parent 6003560 commit fa66beb

19 files changed

Lines changed: 776 additions & 171 deletions

File tree

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1313
- Implement on behalf of token passing for extensions ([#8679](https://github.com/opensearch-project/OpenSearch/pull/8679))
1414
- Implement Visitor Design pattern in QueryBuilder to enable the capability to traverse through the complex QueryBuilder tree. ([#10110](https://github.com/opensearch-project/OpenSearch/pull/10110))
1515
- Provide service accounts tokens to extensions ([#9618](https://github.com/opensearch-project/OpenSearch/pull/9618))
16+
- Configurable merge policy for index with an option to choose from LogByteSize and Tiered merge policy ([#9992](https://github.com/opensearch-project/OpenSearch/pull/9992))
1617

1718
### Dependencies
1819
- Bump `log4j-core` from 2.18.0 to 2.19.0

server/src/internalClusterTest/java/org/opensearch/gateway/RecoveryFromGatewayIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,7 +53,7 @@
5353
import org.opensearch.env.NodeEnvironment;
5454
import org.opensearch.index.IndexService;
5555
import org.opensearch.index.IndexSettings;
56-
import org.opensearch.index.MergePolicyConfig;
56+
import org.opensearch.index.MergePolicyProvider;
5757
import org.opensearch.index.engine.Engine;
5858
import org.opensearch.index.query.QueryBuilders;
5959
import org.opensearch.index.shard.ShardPath;
@@ -519,7 +519,7 @@ public void testReuseInFileBasedPeerRecovery() throws Exception {
519519
.put("number_of_replicas", 1)
520520

521521
// disable merges to keep segments the same
522-
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
522+
.put(MergePolicyProvider.INDEX_MERGE_ENABLED, false)
523523

524524
// expire retention leases quickly
525525
.put(IndexService.RETENTION_LEASE_SYNC_INTERVAL_SETTING.getKey(), "100ms")

server/src/internalClusterTest/java/org/opensearch/index/shard/RemoveCorruptedShardDataCommandIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,7 @@
7373
import org.opensearch.env.TestEnvironment;
7474
import org.opensearch.gateway.GatewayMetaState;
7575
import org.opensearch.index.IndexSettings;
76-
import org.opensearch.index.MergePolicyConfig;
76+
import org.opensearch.index.MergePolicyProvider;
7777
import org.opensearch.index.MockEngineFactoryPlugin;
7878
import org.opensearch.index.seqno.SeqNoStats;
7979
import org.opensearch.index.translog.TestTranslog;
@@ -135,7 +135,7 @@ public void testCorruptIndex() throws Exception {
135135
Settings.builder()
136136
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
137137
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
138-
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
138+
.put(MergePolicyProvider.INDEX_MERGE_ENABLED, false)
139139
.put(IndexSettings.INDEX_REFRESH_INTERVAL_SETTING.getKey(), "-1")
140140
.put(MockEngineSupport.DISABLE_FLUSH_ON_CLOSE.getKey(), true)
141141
.put(IndexSettings.INDEX_CHECK_ON_STARTUP.getKey(), "checksum")

server/src/internalClusterTest/java/org/opensearch/index/store/CorruptedFileIT.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@
7272
import org.opensearch.core.index.shard.ShardId;
7373
import org.opensearch.env.NodeEnvironment;
7474
import org.opensearch.index.IndexSettings;
75-
import org.opensearch.index.MergePolicyConfig;
75+
import org.opensearch.index.MergePolicyProvider;
7676
import org.opensearch.index.shard.IndexEventListener;
7777
import org.opensearch.index.shard.IndexShard;
7878
import org.opensearch.index.shard.IndexShardState;
@@ -167,7 +167,7 @@ public void testCorruptFileAndRecover() throws ExecutionException, InterruptedEx
167167
Settings.builder()
168168
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
169169
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "1")
170-
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
170+
.put(MergePolicyProvider.INDEX_MERGE_ENABLED, false)
171171
// no checkindex - we corrupt shards on purpose
172172
.put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false)
173173
// no translog based flush - it might change the .liv / segments.N files
@@ -286,7 +286,7 @@ public void testCorruptPrimaryNoReplica() throws ExecutionException, Interrupted
286286
prepareCreate("test").setSettings(
287287
Settings.builder()
288288
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0")
289-
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
289+
.put(MergePolicyProvider.INDEX_MERGE_ENABLED, false)
290290
.put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false) // no checkindex - we corrupt shards on
291291
// purpose
292292
// no translog based flush - it might change the .liv / segments.N files
@@ -552,7 +552,7 @@ public void testCorruptFileThenSnapshotAndRestore() throws ExecutionException, I
552552
prepareCreate("test").setSettings(
553553
Settings.builder()
554554
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0") // no replicas for this test
555-
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
555+
.put(MergePolicyProvider.INDEX_MERGE_ENABLED, false)
556556
// no checkindex - we corrupt shards on purpose
557557
.put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false)
558558
// no translog based flush - it might change the .liv / segments.N files
@@ -624,7 +624,7 @@ public void testReplicaCorruption() throws Exception {
624624
prepareCreate("test").setSettings(
625625
Settings.builder()
626626
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, cluster().numDataNodes() - 1)
627-
.put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)
627+
.put(MergePolicyProvider.INDEX_MERGE_ENABLED, false)
628628
.put(MockFSIndexStore.INDEX_CHECK_INDEX_ON_CLOSE_SETTING.getKey(), false) // no checkindex - we corrupt shards on
629629
// purpose
630630
.put(IndexSettings.INDEX_TRANSLOG_FLUSH_THRESHOLD_SIZE_SETTING.getKey(), new ByteSizeValue(1, ByteSizeUnit.PB)) // no

server/src/internalClusterTest/java/org/opensearch/indices/stats/IndexStatsIT.java

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -66,8 +66,8 @@
6666
import org.opensearch.index.IndexModule;
6767
import org.opensearch.index.IndexService;
6868
import org.opensearch.index.IndexSettings;
69-
import org.opensearch.index.MergePolicyConfig;
7069
import org.opensearch.index.MergeSchedulerConfig;
70+
import org.opensearch.index.TieredMergePolicyProvider;
7171
import org.opensearch.index.VersionType;
7272
import org.opensearch.index.cache.query.QueryCacheStats;
7373
import org.opensearch.index.engine.VersionConflictEngineException;
@@ -589,8 +589,8 @@ public void testNonThrottleStats() throws Exception {
589589
prepareCreate("test").setSettings(
590590
settingsBuilder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
591591
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0")
592-
.put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2")
593-
.put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
592+
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2")
593+
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
594594
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1")
595595
.put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "10000")
596596
)
@@ -621,8 +621,8 @@ public void testThrottleStats() throws Exception {
621621
prepareCreate("test").setSettings(
622622
settingsBuilder().put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, "1")
623623
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, "0")
624-
.put(MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2")
625-
.put(MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
624+
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING.getKey(), "2")
625+
.put(TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING.getKey(), "2")
626626
.put(MergeSchedulerConfig.MAX_THREAD_COUNT_SETTING.getKey(), "1")
627627
.put(MergeSchedulerConfig.MAX_MERGE_COUNT_SETTING.getKey(), "1")
628628
.put(IndexSettings.INDEX_TRANSLOG_DURABILITY_SETTING.getKey(), Translog.Durability.ASYNC.name())

server/src/internalClusterTest/java/org/opensearch/update/UpdateIT.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@
5050
import org.opensearch.common.xcontent.XContentFactory;
5151
import org.opensearch.core.action.ActionListener;
5252
import org.opensearch.core.rest.RestStatus;
53-
import org.opensearch.index.MergePolicyConfig;
53+
import org.opensearch.index.MergePolicyProvider;
5454
import org.opensearch.index.engine.DocumentMissingException;
5555
import org.opensearch.index.engine.VersionConflictEngineException;
5656
import org.opensearch.plugins.Plugin;
@@ -669,7 +669,7 @@ public void run() {
669669

670670
public void testStressUpdateDeleteConcurrency() throws Exception {
671671
// We create an index with merging disabled so that deletes don't get merged away
672-
assertAcked(prepareCreate("test").setSettings(Settings.builder().put(MergePolicyConfig.INDEX_MERGE_ENABLED, false)));
672+
assertAcked(prepareCreate("test").setSettings(Settings.builder().put(MergePolicyProvider.INDEX_MERGE_ENABLED, false)));
673673
ensureGreen();
674674

675675
Script fieldIncScript = new Script(ScriptType.INLINE, UPDATE_SCRIPTS, FIELD_INC_SCRIPT, Collections.singletonMap("field", "field"));

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -457,6 +457,7 @@ public void apply(Settings value, Settings current, Settings previous) {
457457
NetworkService.TCP_CONNECT_TIMEOUT,
458458
IndexSettings.QUERY_STRING_ANALYZE_WILDCARD,
459459
IndexSettings.QUERY_STRING_ALLOW_LEADING_WILDCARD,
460+
IndexSettings.TIME_SERIES_INDEX_MERGE_POLICY,
460461
ScriptService.SCRIPT_GENERAL_CACHE_SIZE_SETTING,
461462
ScriptService.SCRIPT_GENERAL_CACHE_EXPIRE_SETTING,
462463
ScriptService.SCRIPT_GENERAL_MAX_COMPILATIONS_RATE_SETTING,

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 19 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -45,9 +45,11 @@
4545
import org.opensearch.index.IndexSettings;
4646
import org.opensearch.index.IndexSortConfig;
4747
import org.opensearch.index.IndexingSlowLog;
48-
import org.opensearch.index.MergePolicyConfig;
48+
import org.opensearch.index.LogByteSizeMergePolicyProvider;
49+
import org.opensearch.index.MergePolicyProvider;
4950
import org.opensearch.index.MergeSchedulerConfig;
5051
import org.opensearch.index.SearchSlowLog;
52+
import org.opensearch.index.TieredMergePolicyProvider;
5153
import org.opensearch.index.cache.bitset.BitsetFilterCache;
5254
import org.opensearch.index.engine.EngineConfig;
5355
import org.opensearch.index.fielddata.IndexFieldDataService;
@@ -120,14 +122,14 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
120122
IndexingSlowLog.INDEX_INDEXING_SLOWLOG_LEVEL_SETTING,
121123
IndexingSlowLog.INDEX_INDEXING_SLOWLOG_REFORMAT_SETTING,
122124
IndexingSlowLog.INDEX_INDEXING_SLOWLOG_MAX_SOURCE_CHARS_TO_LOG_SETTING,
123-
MergePolicyConfig.INDEX_COMPOUND_FORMAT_SETTING,
124-
MergePolicyConfig.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING,
125-
MergePolicyConfig.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING,
126-
MergePolicyConfig.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING,
127-
MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING,
128-
MergePolicyConfig.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING,
129-
MergePolicyConfig.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING,
130-
MergePolicyConfig.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT_SETTING,
125+
TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT_SETTING,
126+
TieredMergePolicyProvider.INDEX_MERGE_POLICY_DELETES_PCT_ALLOWED_SETTING,
127+
TieredMergePolicyProvider.INDEX_MERGE_POLICY_EXPUNGE_DELETES_ALLOWED_SETTING,
128+
TieredMergePolicyProvider.INDEX_MERGE_POLICY_FLOOR_SEGMENT_SETTING,
129+
TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGE_AT_ONCE_SETTING,
130+
TieredMergePolicyProvider.INDEX_MERGE_POLICY_MAX_MERGED_SEGMENT_SETTING,
131+
TieredMergePolicyProvider.INDEX_MERGE_POLICY_SEGMENTS_PER_TIER_SETTING,
132+
TieredMergePolicyProvider.INDEX_MERGE_POLICY_RECLAIM_DELETES_WEIGHT_SETTING,
131133
IndexSortConfig.INDEX_SORT_FIELD_SETTING,
132134
IndexSortConfig.INDEX_SORT_ORDER_SETTING,
133135
IndexSortConfig.INDEX_SORT_MISSING_SETTING,
@@ -202,6 +204,13 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
202204
IndexSettings.INDEX_MERGE_ON_FLUSH_ENABLED,
203205
IndexSettings.INDEX_MERGE_ON_FLUSH_MAX_FULL_FLUSH_MERGE_WAIT_TIME,
204206
IndexSettings.INDEX_MERGE_ON_FLUSH_POLICY,
207+
IndexSettings.INDEX_MERGE_POLICY,
208+
LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MERGE_FACTOR_SETTING,
209+
LogByteSizeMergePolicyProvider.INDEX_LBS_MERGE_POLICY_MIN_MERGE_SETTING,
210+
LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_SETTING,
211+
LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGE_SEGMENT_FOR_FORCED_MERGE_SETTING,
212+
LogByteSizeMergePolicyProvider.INDEX_LBS_MAX_MERGED_DOCS_SETTING,
213+
LogByteSizeMergePolicyProvider.INDEX_LBS_NO_CFS_RATIO_SETTING,
205214
IndexSettings.DEFAULT_SEARCH_PIPELINE,
206215

207216
// Settings for Searchable Snapshots
@@ -275,7 +284,7 @@ public boolean isPrivateSetting(String key) {
275284
case IndexMetadata.SETTING_HISTORY_UUID:
276285
case IndexMetadata.SETTING_VERSION_UPGRADED:
277286
case IndexMetadata.SETTING_INDEX_PROVIDED_NAME:
278-
case MergePolicyConfig.INDEX_MERGE_ENABLED:
287+
case MergePolicyProvider.INDEX_MERGE_ENABLED:
279288
// we keep the shrink settings for BWC - this can be removed in 8.0
280289
// we can't remove in 7 since this setting might be baked into an index coming in via a full cluster restart from 6.0
281290
case "index.shrink.source.uuid":

0 commit comments

Comments
 (0)