Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Implement WithFieldName interface in ValuesSourceAggregationBuilder & FieldSortBuilder ([#15916](https://github.com/opensearch-project/OpenSearch/pull/15916))
- Add successfulSearchShardIndices in searchRequestContext ([#15967](https://github.com/opensearch-project/OpenSearch/pull/15967))
- Fallback to Remote cluster-state on Term-Version check mismatch - ([#15424](https://github.com/opensearch-project/OpenSearch/pull/15424))
- [Tiered Caching] Segmented cache changes ([#16047](https://github.com/opensearch-project/OpenSearch/pull/16047))
- Add support for msearch API to pass search pipeline name - ([#15923](https://github.com/opensearch-project/OpenSearch/pull/15923))
- Add success and failure metrics for async shard fetch ([#15976](https://github.com/opensearch-project/OpenSearch/pull/15976))
- [S3 Repository] Change default retry mechanism of s3 clients to Standard Mode ([#15978](https://github.com/opensearch-project/OpenSearch/pull/15978))
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* SPDX-License-Identifier: Apache-2.0
*
* The OpenSearch Contributors require contributions made to
* this file be licensed under the Apache-2.0 license or a
* compatible open source license.
*/

package org.opensearch.cache.common.tier;

import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.store.OpenSearchOnHeapCache;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.test.OpenSearchIntegTestCase;

public class TieredSpilloverCacheBaseIT extends OpenSearchIntegTestCase {

public Settings defaultSettings(String onHeapCacheSizeInBytesOrPercentage, int numberOfSegments) {
return Settings.builder()
.put(FeatureFlags.PLUGGABLE_CACHE, "true")
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
MockDiskCache.MockDiskCacheFactory.NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_SEGMENTS.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
numberOfSegments
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
onHeapCacheSizeInBytesOrPercentage
)
.build();
}

public int getNumberOfSegments() {
return randomFrom(1, 2, 4, 8, 16, 64, 128, 256);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,12 +22,8 @@
import org.opensearch.cluster.metadata.IndexMetadata;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.settings.CacheSettings;
import org.opensearch.common.cache.store.OpenSearchOnHeapCache;
import org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.common.util.FeatureFlags;
import org.opensearch.index.cache.request.RequestCacheStats;
import org.opensearch.index.query.QueryBuilders;
import org.opensearch.indices.IndicesRequestCache;
Expand All @@ -43,13 +39,15 @@
import java.util.Arrays;
import java.util.Collection;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import static org.opensearch.common.cache.store.settings.OpenSearchOnHeapCacheSettings.MAXIMUM_SIZE_IN_BYTES_KEY;
import static org.opensearch.common.cache.settings.CacheSettings.INVALID_SEGMENT_COUNT_EXCEPTION_MESSAGE;
import static org.opensearch.indices.IndicesService.INDICES_CACHE_CLEAN_INTERVAL_SETTING;
import static org.opensearch.search.aggregations.AggregationBuilders.dateHistogram;
import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked;
Expand All @@ -58,43 +56,15 @@
import static org.hamcrest.Matchers.greaterThan;

@OpenSearchIntegTestCase.ClusterScope(numDataNodes = 0, scope = OpenSearchIntegTestCase.Scope.TEST)
public class TieredSpilloverCacheIT extends OpenSearchIntegTestCase {
public class TieredSpilloverCacheIT extends TieredSpilloverCacheBaseIT {

@Override
protected Collection<Class<? extends Plugin>> nodePlugins() {
return Arrays.asList(TieredSpilloverCachePlugin.class, MockDiskCachePlugin.class);
}

static Settings defaultSettings(String onHeapCacheSizeInBytesOrPercentage) {
return Settings.builder()
.put(FeatureFlags.PLUGGABLE_CACHE, "true")
.put(
CacheSettings.getConcreteStoreNameSettingForCacheType(CacheType.INDICES_REQUEST_CACHE).getKey(),
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
OpenSearchOnHeapCache.OpenSearchOnHeapCacheFactory.NAME
)
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
MockDiskCache.MockDiskCacheFactory.NAME
)
.put(
OpenSearchOnHeapCacheSettings.getSettingListForCacheType(CacheType.INDICES_REQUEST_CACHE)
.get(MAXIMUM_SIZE_IN_BYTES_KEY)
.getKey(),
onHeapCacheSizeInBytesOrPercentage
)
.build();
}

public void testPluginsAreInstalled() {
internalCluster().startNode(Settings.builder().put(defaultSettings("1%")).build());
internalCluster().startNode(Settings.builder().put(defaultSettings("1%", getNumberOfSegments())).build());
NodesInfoRequest nodesInfoRequest = new NodesInfoRequest();
nodesInfoRequest.addMetric(NodesInfoRequest.Metric.PLUGINS.metricName());
NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase.client().admin().cluster().nodesInfo(nodesInfoRequest).actionGet();
Expand All @@ -111,7 +81,8 @@ public void testPluginsAreInstalled() {
}

public void testSanityChecksWithIndicesRequestCache() throws InterruptedException {
internalCluster().startNodes(3, Settings.builder().put(defaultSettings("1%")).build());
int numberOfSegments = getNumberOfSegments();
internalCluster().startNodes(3, Settings.builder().put(defaultSettings("1%", numberOfSegments)).build());
Client client = client();
assertAcked(
client.admin()
Expand Down Expand Up @@ -147,9 +118,97 @@ public void testSanityChecksWithIndicesRequestCache() throws InterruptedExceptio
);
}

public void testWithDynamicTookTimePolicyWithMultiSegments() throws Exception {
int numberOfSegments = getNumberOfSegments();
int onHeapCacheSizePerSegmentInBytes = 800; // Per cache entry below is around ~700 bytes, so keeping this
// just a bit higher so that each segment can atleast hold 1 entry.
int onHeapCacheSizeInBytes = onHeapCacheSizePerSegmentInBytes * numberOfSegments;
internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b", numberOfSegments)).build());
Client client = client();
assertAcked(
client.admin()
.indices()
.prepareCreate("index")
.setMapping("k", "type=keyword")
.setSettings(
Settings.builder()
.put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
.put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
.put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
.put("index.refresh_interval", -1)
)
.get()
);
// Set a very high value for took time policy so that no items evicted from onHeap cache are spilled
// to disk. And then hit requests so that few items are cached into cache.
ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
Settings.builder()
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(100, TimeUnit.SECONDS)
)
.build()
);
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get());
int numberOfIndexedItems = numberOfSegments + 1; // Best case if all keys are distributed among different
// segment, atleast one of the segment will have 2 entries and we will see evictions.
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator));
}
ensureSearchable("index");
refreshAndWaitForReplication();
// Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
long perQuerySizeInCacheInBytes = -1;
for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
SearchResponse resp = client.prepareSearch("index")
.setRequestCache(true)
.setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
.get();
if (perQuerySizeInCacheInBytes == -1) {
RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
perQuerySizeInCacheInBytes = requestCacheStats.getMemorySizeInBytes();
}
assertSearchResponse(resp);
}
RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
// Considering disk cache won't be used due to took time policy having a high value, we expect overall cache
// size to be less than or equal to onHeapCache size.
assertTrue(requestCacheStats.getMemorySizeInBytes() <= onHeapCacheSizeInBytes);
assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
// We should atleast one eviction considering disk cache isn't able to hold anything due to policy.
assertTrue(requestCacheStats.getEvictions() > 0);
assertEquals(0, requestCacheStats.getHitCount());
long lastEvictionSeen = requestCacheStats.getEvictions();

// Decrease took time policy to zero so that disk cache also comes into play. Now we should be able
// to cache all entries.
updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
Settings.builder()
.put(
TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
new TimeValue(0, TimeUnit.MILLISECONDS)
)
.build()
);
assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get());
for (int iterator = 0; iterator < numberOfIndexedItems * 2; iterator++) {
SearchResponse resp = client.prepareSearch("index")
.setRequestCache(true)
.setQuery(QueryBuilders.termQuery(UUID.randomUUID().toString(), UUID.randomUUID().toString()))
.get();
assertSearchResponse(resp);
}

requestCacheStats = getRequestCacheStats(client, "index");
// We shouldn't see any new evictions now.
assertEquals(lastEvictionSeen, requestCacheStats.getEvictions());
}

public void testWithDynamicTookTimePolicy() throws Exception {
int onHeapCacheSizeInBytes = 2000;
internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b")).build());
internalCluster().startNode(Settings.builder().put(defaultSettings(onHeapCacheSizeInBytes + "b", 1)).build());
Client client = client();
assertAcked(
client.admin()
Expand Down Expand Up @@ -271,9 +330,10 @@ public void testWithDynamicTookTimePolicy() throws Exception {

public void testInvalidationWithIndicesRequestCache() throws Exception {
int onHeapCacheSizeInBytes = 2000;
int numberOfSegments = getNumberOfSegments();
internalCluster().startNode(
Settings.builder()
.put(defaultSettings(onHeapCacheSizeInBytes + "b"))
.put(defaultSettings(onHeapCacheSizeInBytes + "b", numberOfSegments))
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
.build()
);
Expand Down Expand Up @@ -354,10 +414,11 @@ public void testInvalidationWithIndicesRequestCache() throws Exception {
}

public void testWithExplicitCacheClear() throws Exception {
int numberOfSegments = getNumberOfSegments();
int onHeapCacheSizeInBytes = 2000;
internalCluster().startNode(
Settings.builder()
.put(defaultSettings(onHeapCacheSizeInBytes + "b"))
.put(defaultSettings(onHeapCacheSizeInBytes + "b", numberOfSegments))
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
.build()
);
Expand Down Expand Up @@ -426,10 +487,13 @@ public void testWithExplicitCacheClear() throws Exception {
}

public void testWithDynamicDiskCacheSetting() throws Exception {
int onHeapCacheSizeInBytes = 10; // Keep it low so that all items are cached onto disk.
int numberOfSegments = getNumberOfSegments();
int onHeapCacheSizeInBytes = randomIntBetween(numberOfSegments + 1, numberOfSegments * 2); // Keep it low so
// that all items are
// cached onto disk.
internalCluster().startNode(
Settings.builder()
.put(defaultSettings(onHeapCacheSizeInBytes + "b"))
.put(defaultSettings(onHeapCacheSizeInBytes + "b", numberOfSegments))
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
.build()
);
Expand Down Expand Up @@ -540,6 +604,27 @@ public void testWithDynamicDiskCacheSetting() throws Exception {
assertEquals(0, lastKnownHitCount - requestCacheStats.getHitCount());
}

public void testWithInvalidSegmentNumberSetting() throws Exception {
int numberOfSegments = getNumberOfSegments();
int onHeapCacheSizeInBytes = randomIntBetween(numberOfSegments + 1, numberOfSegments * 2); // Keep it low so
// that all items are
// cached onto disk.
assertThrows(
String.format(
Locale.ROOT,
INVALID_SEGMENT_COUNT_EXCEPTION_MESSAGE,
TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME
),
IllegalArgumentException.class,
() -> internalCluster().startNode(
Settings.builder()
.put(defaultSettings(onHeapCacheSizeInBytes + "b", 300))
.put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
.build()
)
);
}

private RequestCacheStats getRequestCacheStats(Client client, String indexName) {
return client.admin().indices().prepareStats(indexName).setRequestCache(true).get().getTotal().getRequestCache();
}
Expand All @@ -550,7 +635,7 @@ public MockDiskCachePlugin() {}

@Override
public Map<String, ICache.Factory> getCacheFactoryMap() {
return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 1000, false));
return Map.of(MockDiskCache.MockDiskCacheFactory.NAME, new MockDiskCache.MockDiskCacheFactory(0, 10000, false, 1));
}

@Override
Expand Down
Loading