Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

### Fixed
- Fix flaky tests in CloseIndexIT by addressing cluster state synchronization issues ([#18878](https://github.com/opensearch-project/OpenSearch/issues/18878))
- [Tiered Caching] Handle query execution exception ([#19000](https://github.com/opensearch-project/OpenSearch/issues/19000))

### Security

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -348,28 +348,33 @@ private Tuple<V, Tuple<Boolean, Boolean>> compute(
boolean wasCacheMiss = false;
boolean wasRejectedByPolicy = false;
BiFunction<Tuple<Tuple<ICacheKey<K>, V>, Boolean>, Throwable, Void> handler = (pairInfo, ex) -> {
Tuple<ICacheKey<K>, V> pair = pairInfo.v1();
boolean rejectedByPolicy = pairInfo.v2();
if (pair != null && !rejectedByPolicy) {
boolean didAddToCache = false;
try (ReleasableLock ignore = writeLock.acquire()) {
onHeapCache.put(pair.v1(), pair.v2());
didAddToCache = true;
} catch (Exception e) {
// TODO: Catch specific exceptions to know whether this resulted from cache or underlying removal
// listeners/stats. Needs better exception handling at underlying layers.For now swallowing
// exception.
logger.warn("Exception occurred while putting item onto heap cache", e);
}
if (didAddToCache) {
updateStatsOnPut(TIER_DIMENSION_VALUE_ON_HEAP, key, pair.v2());
}
} else {
if (ex != null) {
logger.warn("Exception occurred while trying to compute the value", ex);
try {
if (pairInfo != null) {
Tuple<ICacheKey<K>, V> pair = pairInfo.v1();
boolean rejectedByPolicy = pairInfo.v2();
if (pair != null && !rejectedByPolicy) {
boolean didAddToCache = false;
try (ReleasableLock ignore = writeLock.acquire()) {
onHeapCache.put(pair.v1(), pair.v2());
didAddToCache = true;
} catch (Exception e) {
// TODO: Catch specific exceptions to know whether this resulted from cache or underlying removal
// listeners/stats. Needs better exception handling at underlying layers.For now swallowing
// exception.
logger.warn("Exception occurred while putting item onto heap cache", e);
}
if (didAddToCache) {
updateStatsOnPut(TIER_DIMENSION_VALUE_ON_HEAP, key, pair.v2());
}
}
} else {
if (ex != null) {
logger.warn("Exception occurred while trying to compute the value", ex);
}
}
} finally {
completableFutureMap.remove(key);// Remove key from map as not needed anymore.
}
completableFutureMap.remove(key);// Remove key from map as not needed anymore.
return null;
};
V value = null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.core.tasks.TaskCancelledException;
import org.opensearch.env.NodeEnvironment;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;
Expand Down Expand Up @@ -89,6 +90,70 @@ public void setup() {
clusterSettings.registerSetting(DISK_CACHE_ENABLED_SETTING_MAP.get(CacheType.INDICES_REQUEST_CACHE));
}

public void testComputeIfAbsentWhenTheQueryThrowsAnException() throws Exception {
int onHeapCacheSize = randomIntBetween(10, 30);
int keyValueSize = 50;

MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
TieredSpilloverCache<String, String> tieredSpilloverCache = initializeTieredSpilloverCache(
keyValueSize,
randomIntBetween(1, 4),
removalListener,
Settings.builder()
.put(
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE.getConcreteSettingForNamespace(
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
).getKey(),
onHeapCacheSize * keyValueSize + "b"
)
.build(),
0,
1
);
ICacheKey<String> key = getICacheKey(UUID.randomUUID().toString());
LoadAwareCacheLoader<ICacheKey<String>, String> tieredCacheLoader = new LoadAwareCacheLoader<>() {
boolean isLoaded = false;

@Override
public String load(ICacheKey<String> key) {
isLoaded = true;
throw new TaskCancelledException("Query cancelled!");
}

@Override
public boolean isLoaded() {
return isLoaded;
}
};
// With this call, we expect an exception from the underlying loader which eventually causes the below call to result into
// exception.
try {
tieredSpilloverCache.computeIfAbsent(key, tieredCacheLoader);
} catch (Exception ex) {
assertEquals(TaskCancelledException.class, ex.getCause().getClass());
assertEquals("Query cancelled!", ex.getCause().getMessage());
}
// We will call computeIfAbsent again with the same key, but this time the underlying loader should run fine and we should get back
// the response.
String expectedRespone = "Cool response!";
LoadAwareCacheLoader<ICacheKey<String>, String> tieredCacheLoaderWithNoException = new LoadAwareCacheLoader<>() {
boolean isLoaded = false;

@Override
public String load(ICacheKey<String> key) {
isLoaded = true;
return expectedRespone;
}

@Override
public boolean isLoaded() {
return isLoaded;
}
};
String value = tieredSpilloverCache.computeIfAbsent(key, tieredCacheLoaderWithNoException);
assertEquals(expectedRespone, value);
}

public void testComputeIfAbsentWithoutAnyOnHeapCacheEviction() throws Exception {
int onHeapCacheSize = randomIntBetween(10, 30);
int keyValueSize = 50;
Expand Down
Loading