2222import org .opensearch .cluster .metadata .IndexMetadata ;
2323import org .opensearch .common .cache .CacheType ;
2424import org .opensearch .common .cache .ICache ;
25- import org .opensearch .common .cache .settings .CacheSettings ;
26- import org .opensearch .common .cache .store .OpenSearchOnHeapCache ;
27- import org .opensearch .common .cache .store .settings .OpenSearchOnHeapCacheSettings ;
2825import org .opensearch .common .settings .Settings ;
2926import org .opensearch .common .unit .TimeValue ;
30- import org .opensearch .common .util .FeatureFlags ;
3127import org .opensearch .index .cache .request .RequestCacheStats ;
3228import org .opensearch .index .query .QueryBuilders ;
3329import org .opensearch .indices .IndicesRequestCache ;
4339import java .util .Arrays ;
4440import java .util .Collection ;
4541import java .util .List ;
42+ import java .util .Locale ;
4643import java .util .Map ;
44+ import java .util .UUID ;
4745import java .util .concurrent .TimeUnit ;
4846import java .util .function .Function ;
4947import java .util .stream .Collectors ;
5048import java .util .stream .Stream ;
5149
52- import static org .opensearch .common .cache .store . settings .OpenSearchOnHeapCacheSettings . MAXIMUM_SIZE_IN_BYTES_KEY ;
50+ import static org .opensearch .common .cache .settings .CacheSettings . INVALID_SEGMENT_COUNT_EXCEPTION_MESSAGE ;
5351import static org .opensearch .indices .IndicesService .INDICES_CACHE_CLEAN_INTERVAL_SETTING ;
5452import static org .opensearch .search .aggregations .AggregationBuilders .dateHistogram ;
5553import static org .opensearch .test .hamcrest .OpenSearchAssertions .assertAcked ;
5856import static org .hamcrest .Matchers .greaterThan ;
5957
6058@ OpenSearchIntegTestCase .ClusterScope (numDataNodes = 0 , scope = OpenSearchIntegTestCase .Scope .TEST )
61- public class TieredSpilloverCacheIT extends OpenSearchIntegTestCase {
59+ public class TieredSpilloverCacheIT extends TieredSpilloverCacheBaseIT {
6260
6361 @ Override
6462 protected Collection <Class <? extends Plugin >> nodePlugins () {
6563 return Arrays .asList (TieredSpilloverCachePlugin .class , MockDiskCachePlugin .class );
6664 }
6765
68- static Settings defaultSettings (String onHeapCacheSizeInBytesOrPercentage ) {
69- return Settings .builder ()
70- .put (FeatureFlags .PLUGGABLE_CACHE , "true" )
71- .put (
72- CacheSettings .getConcreteStoreNameSettingForCacheType (CacheType .INDICES_REQUEST_CACHE ).getKey (),
73- TieredSpilloverCache .TieredSpilloverCacheFactory .TIERED_SPILLOVER_CACHE_NAME
74- )
75- .put (
76- TieredSpilloverCacheSettings .TIERED_SPILLOVER_ONHEAP_STORE_NAME .getConcreteSettingForNamespace (
77- CacheType .INDICES_REQUEST_CACHE .getSettingPrefix ()
78- ).getKey (),
79- OpenSearchOnHeapCache .OpenSearchOnHeapCacheFactory .NAME
80- )
81- .put (
82- TieredSpilloverCacheSettings .TIERED_SPILLOVER_DISK_STORE_NAME .getConcreteSettingForNamespace (
83- CacheType .INDICES_REQUEST_CACHE .getSettingPrefix ()
84- ).getKey (),
85- MockDiskCache .MockDiskCacheFactory .NAME
86- )
87- .put (
88- OpenSearchOnHeapCacheSettings .getSettingListForCacheType (CacheType .INDICES_REQUEST_CACHE )
89- .get (MAXIMUM_SIZE_IN_BYTES_KEY )
90- .getKey (),
91- onHeapCacheSizeInBytesOrPercentage
92- )
93- .build ();
94- }
95-
9666 public void testPluginsAreInstalled () {
97- internalCluster ().startNode (Settings .builder ().put (defaultSettings ("1%" )).build ());
67+ internalCluster ().startNode (Settings .builder ().put (defaultSettings ("1%" , getNumberOfSegments () )).build ());
9868 NodesInfoRequest nodesInfoRequest = new NodesInfoRequest ();
9969 nodesInfoRequest .addMetric (NodesInfoRequest .Metric .PLUGINS .metricName ());
10070 NodesInfoResponse nodesInfoResponse = OpenSearchIntegTestCase .client ().admin ().cluster ().nodesInfo (nodesInfoRequest ).actionGet ();
@@ -111,7 +81,8 @@ public void testPluginsAreInstalled() {
11181 }
11282
11383 public void testSanityChecksWithIndicesRequestCache () throws InterruptedException {
114- internalCluster ().startNodes (3 , Settings .builder ().put (defaultSettings ("1%" )).build ());
84+ int numberOfSegments = getNumberOfSegments ();
85+ internalCluster ().startNodes (3 , Settings .builder ().put (defaultSettings ("1%" , numberOfSegments )).build ());
11586 Client client = client ();
11687 assertAcked (
11788 client .admin ()
@@ -147,9 +118,97 @@ public void testSanityChecksWithIndicesRequestCache() throws InterruptedExceptio
147118 );
148119 }
149120
121+ public void testWithDynamicTookTimePolicyWithMultiSegments () throws Exception {
122+ int numberOfSegments = getNumberOfSegments ();
123+ int onHeapCacheSizePerSegmentInBytes = 800 ; // Per cache entry below is around ~700 bytes, so keeping this
124+ // just a bit higher so that each segment can atleast hold 1 entry.
125+ int onHeapCacheSizeInBytes = onHeapCacheSizePerSegmentInBytes * numberOfSegments ;
126+ internalCluster ().startNode (Settings .builder ().put (defaultSettings (onHeapCacheSizeInBytes + "b" , numberOfSegments )).build ());
127+ Client client = client ();
128+ assertAcked (
129+ client .admin ()
130+ .indices ()
131+ .prepareCreate ("index" )
132+ .setMapping ("k" , "type=keyword" )
133+ .setSettings (
134+ Settings .builder ()
135+ .put (IndicesRequestCache .INDEX_CACHE_REQUEST_ENABLED_SETTING .getKey (), true )
136+ .put (IndexMetadata .SETTING_NUMBER_OF_SHARDS , 1 )
137+ .put (IndexMetadata .SETTING_NUMBER_OF_REPLICAS , 0 )
138+ .put ("index.refresh_interval" , -1 )
139+ )
140+ .get ()
141+ );
142+ // Set a very high value for took time policy so that no items evicted from onHeap cache are spilled
143+ // to disk. And then hit requests so that few items are cached into cache.
144+ ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest ().transientSettings (
145+ Settings .builder ()
146+ .put (
147+ TieredSpilloverCacheSettings .TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP .get (CacheType .INDICES_REQUEST_CACHE ).getKey (),
148+ new TimeValue (100 , TimeUnit .SECONDS )
149+ )
150+ .build ()
151+ );
152+ assertAcked (internalCluster ().client ().admin ().cluster ().updateSettings (updateSettingsRequest ).get ());
153+ int numberOfIndexedItems = numberOfSegments + 1 ; // Best case if all keys are distributed among different
154+ // segment, atleast one of the segment will have 2 entries and we will see evictions.
155+ for (int iterator = 0 ; iterator < numberOfIndexedItems ; iterator ++) {
156+ indexRandom (true , client .prepareIndex ("index" ).setSource ("k" + iterator , "hello" + iterator ));
157+ }
158+ ensureSearchable ("index" );
159+ refreshAndWaitForReplication ();
160+ // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache
161+ ForceMergeResponse forceMergeResponse = client .admin ().indices ().prepareForceMerge ("index" ).setFlush (true ).get ();
162+ OpenSearchAssertions .assertAllSuccessful (forceMergeResponse );
163+ long perQuerySizeInCacheInBytes = -1 ;
164+ for (int iterator = 0 ; iterator < numberOfIndexedItems ; iterator ++) {
165+ SearchResponse resp = client .prepareSearch ("index" )
166+ .setRequestCache (true )
167+ .setQuery (QueryBuilders .termQuery ("k" + iterator , "hello" + iterator ))
168+ .get ();
169+ if (perQuerySizeInCacheInBytes == -1 ) {
170+ RequestCacheStats requestCacheStats = getRequestCacheStats (client , "index" );
171+ perQuerySizeInCacheInBytes = requestCacheStats .getMemorySizeInBytes ();
172+ }
173+ assertSearchResponse (resp );
174+ }
175+ RequestCacheStats requestCacheStats = getRequestCacheStats (client , "index" );
176+ // Considering disk cache won't be used due to took time policy having a high value, we expect overall cache
177+ // size to be less than or equal to onHeapCache size.
178+ assertTrue (requestCacheStats .getMemorySizeInBytes () <= onHeapCacheSizeInBytes );
179+ assertEquals (numberOfIndexedItems , requestCacheStats .getMissCount ());
180+ // We should atleast one eviction considering disk cache isn't able to hold anything due to policy.
181+ assertTrue (requestCacheStats .getEvictions () > 0 );
182+ assertEquals (0 , requestCacheStats .getHitCount ());
183+ long lastEvictionSeen = requestCacheStats .getEvictions ();
184+
185+ // Decrease took time policy to zero so that disk cache also comes into play. Now we should be able
186+ // to cache all entries.
187+ updateSettingsRequest = new ClusterUpdateSettingsRequest ().transientSettings (
188+ Settings .builder ()
189+ .put (
190+ TieredSpilloverCacheSettings .TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP .get (CacheType .INDICES_REQUEST_CACHE ).getKey (),
191+ new TimeValue (0 , TimeUnit .MILLISECONDS )
192+ )
193+ .build ()
194+ );
195+ assertAcked (internalCluster ().client ().admin ().cluster ().updateSettings (updateSettingsRequest ).get ());
196+ for (int iterator = 0 ; iterator < numberOfIndexedItems * 2 ; iterator ++) {
197+ SearchResponse resp = client .prepareSearch ("index" )
198+ .setRequestCache (true )
199+ .setQuery (QueryBuilders .termQuery (UUID .randomUUID ().toString (), UUID .randomUUID ().toString ()))
200+ .get ();
201+ assertSearchResponse (resp );
202+ }
203+
204+ requestCacheStats = getRequestCacheStats (client , "index" );
205+ // We shouldn't see any new evictions now.
206+ assertEquals (lastEvictionSeen , requestCacheStats .getEvictions ());
207+ }
208+
150209 public void testWithDynamicTookTimePolicy () throws Exception {
151210 int onHeapCacheSizeInBytes = 2000 ;
152- internalCluster ().startNode (Settings .builder ().put (defaultSettings (onHeapCacheSizeInBytes + "b" )).build ());
211+ internalCluster ().startNode (Settings .builder ().put (defaultSettings (onHeapCacheSizeInBytes + "b" , 1 )).build ());
153212 Client client = client ();
154213 assertAcked (
155214 client .admin ()
@@ -271,9 +330,10 @@ public void testWithDynamicTookTimePolicy() throws Exception {
271330
272331 public void testInvalidationWithIndicesRequestCache () throws Exception {
273332 int onHeapCacheSizeInBytes = 2000 ;
333+ int numberOfSegments = getNumberOfSegments ();
274334 internalCluster ().startNode (
275335 Settings .builder ()
276- .put (defaultSettings (onHeapCacheSizeInBytes + "b" ))
336+ .put (defaultSettings (onHeapCacheSizeInBytes + "b" , numberOfSegments ))
277337 .put (INDICES_CACHE_CLEAN_INTERVAL_SETTING .getKey (), new TimeValue (1 ))
278338 .build ()
279339 );
@@ -354,10 +414,11 @@ public void testInvalidationWithIndicesRequestCache() throws Exception {
354414 }
355415
356416 public void testWithExplicitCacheClear () throws Exception {
417+ int numberOfSegments = getNumberOfSegments ();
357418 int onHeapCacheSizeInBytes = 2000 ;
358419 internalCluster ().startNode (
359420 Settings .builder ()
360- .put (defaultSettings (onHeapCacheSizeInBytes + "b" ))
421+ .put (defaultSettings (onHeapCacheSizeInBytes + "b" , numberOfSegments ))
361422 .put (INDICES_CACHE_CLEAN_INTERVAL_SETTING .getKey (), new TimeValue (1 ))
362423 .build ()
363424 );
@@ -426,10 +487,13 @@ public void testWithExplicitCacheClear() throws Exception {
426487 }
427488
428489 public void testWithDynamicDiskCacheSetting () throws Exception {
429- int onHeapCacheSizeInBytes = 10 ; // Keep it low so that all items are cached onto disk.
490+ int numberOfSegments = getNumberOfSegments ();
491+ int onHeapCacheSizeInBytes = randomIntBetween (numberOfSegments + 1 , numberOfSegments * 2 ); // Keep it low so
492+ // that all items are
493+ // cached onto disk.
430494 internalCluster ().startNode (
431495 Settings .builder ()
432- .put (defaultSettings (onHeapCacheSizeInBytes + "b" ))
496+ .put (defaultSettings (onHeapCacheSizeInBytes + "b" , numberOfSegments ))
433497 .put (INDICES_CACHE_CLEAN_INTERVAL_SETTING .getKey (), new TimeValue (1 ))
434498 .build ()
435499 );
@@ -540,6 +604,27 @@ public void testWithDynamicDiskCacheSetting() throws Exception {
540604 assertEquals (0 , lastKnownHitCount - requestCacheStats .getHitCount ());
541605 }
542606
607+ public void testWithInvalidSegmentNumberSetting () throws Exception {
608+ int numberOfSegments = getNumberOfSegments ();
609+ int onHeapCacheSizeInBytes = randomIntBetween (numberOfSegments + 1 , numberOfSegments * 2 ); // Keep it low so
610+ // that all items are
611+ // cached onto disk.
612+ assertThrows (
613+ String .format (
614+ Locale .ROOT ,
615+ INVALID_SEGMENT_COUNT_EXCEPTION_MESSAGE ,
616+ TieredSpilloverCache .TieredSpilloverCacheFactory .TIERED_SPILLOVER_CACHE_NAME
617+ ),
618+ IllegalArgumentException .class ,
619+ () -> internalCluster ().startNode (
620+ Settings .builder ()
621+ .put (defaultSettings (onHeapCacheSizeInBytes + "b" , 300 ))
622+ .put (INDICES_CACHE_CLEAN_INTERVAL_SETTING .getKey (), new TimeValue (1 ))
623+ .build ()
624+ )
625+ );
626+ }
627+
543628 private RequestCacheStats getRequestCacheStats (Client client , String indexName ) {
544629 return client .admin ().indices ().prepareStats (indexName ).setRequestCache (true ).get ().getTotal ().getRequestCache ();
545630 }
@@ -550,7 +635,7 @@ public MockDiskCachePlugin() {}
550635
551636 @ Override
552637 public Map <String , ICache .Factory > getCacheFactoryMap () {
553- return Map .of (MockDiskCache .MockDiskCacheFactory .NAME , new MockDiskCache .MockDiskCacheFactory (0 , 1000 , false ));
638+ return Map .of (MockDiskCache .MockDiskCacheFactory .NAME , new MockDiskCache .MockDiskCacheFactory (0 , 10000 , false , 1 ));
554639 }
555640
556641 @ Override
0 commit comments