Skip to content

Commit f967a72

Browse files
authored
[Tiered Caching] Handle query execution exception (opensearch-project#19000)
Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>
1 parent 646f937 commit f967a72

File tree

3 files changed

+91
-20
lines changed

3 files changed

+91
-20
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
1919

2020
### Fixed
2121
- Fix flaky tests in CloseIndexIT by addressing cluster state synchronization issues ([#18878](https://github.com/opensearch-project/OpenSearch/issues/18878))
22+
- [Tiered Caching] Handle query execution exception ([#19000](https://github.com/opensearch-project/OpenSearch/issues/19000))
2223

2324
### Security
2425

modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java

Lines changed: 25 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -348,28 +348,33 @@ private Tuple<V, Tuple<Boolean, Boolean>> compute(
348348
boolean wasCacheMiss = false;
349349
boolean wasRejectedByPolicy = false;
350350
BiFunction<Tuple<Tuple<ICacheKey<K>, V>, Boolean>, Throwable, Void> handler = (pairInfo, ex) -> {
351-
Tuple<ICacheKey<K>, V> pair = pairInfo.v1();
352-
boolean rejectedByPolicy = pairInfo.v2();
353-
if (pair != null && !rejectedByPolicy) {
354-
boolean didAddToCache = false;
355-
try (ReleasableLock ignore = writeLock.acquire()) {
356-
onHeapCache.put(pair.v1(), pair.v2());
357-
didAddToCache = true;
358-
} catch (Exception e) {
359-
// TODO: Catch specific exceptions to know whether this resulted from cache or underlying removal
360-
// listeners/stats. Needs better exception handling at underlying layers.For now swallowing
361-
// exception.
362-
logger.warn("Exception occurred while putting item onto heap cache", e);
363-
}
364-
if (didAddToCache) {
365-
updateStatsOnPut(TIER_DIMENSION_VALUE_ON_HEAP, key, pair.v2());
366-
}
367-
} else {
368-
if (ex != null) {
369-
logger.warn("Exception occurred while trying to compute the value", ex);
351+
try {
352+
if (pairInfo != null) {
353+
Tuple<ICacheKey<K>, V> pair = pairInfo.v1();
354+
boolean rejectedByPolicy = pairInfo.v2();
355+
if (pair != null && !rejectedByPolicy) {
356+
boolean didAddToCache = false;
357+
try (ReleasableLock ignore = writeLock.acquire()) {
358+
onHeapCache.put(pair.v1(), pair.v2());
359+
didAddToCache = true;
360+
} catch (Exception e) {
361+
// TODO: Catch specific exceptions to know whether this resulted from cache or underlying removal
362+
// listeners/stats. Needs better exception handling at underlying layers.For now swallowing
363+
// exception.
364+
logger.warn("Exception occurred while putting item onto heap cache", e);
365+
}
366+
if (didAddToCache) {
367+
updateStatsOnPut(TIER_DIMENSION_VALUE_ON_HEAP, key, pair.v2());
368+
}
369+
}
370+
} else {
371+
if (ex != null) {
372+
logger.warn("Exception occurred while trying to compute the value", ex);
373+
}
370374
}
375+
} finally {
376+
completableFutureMap.remove(key);// Remove key from map as not needed anymore.
371377
}
372-
completableFutureMap.remove(key);// Remove key from map as not needed anymore.
373378
return null;
374379
};
375380
V value = null;

modules/cache-common/src/test/java/org/opensearch/cache/common/tier/TieredSpilloverCacheTests.java

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.opensearch.common.settings.Setting;
3131
import org.opensearch.common.settings.Settings;
3232
import org.opensearch.common.unit.TimeValue;
33+
import org.opensearch.core.tasks.TaskCancelledException;
3334
import org.opensearch.env.NodeEnvironment;
3435
import org.opensearch.test.OpenSearchTestCase;
3536
import org.junit.Before;
@@ -89,6 +90,70 @@ public void setup() {
8990
clusterSettings.registerSetting(DISK_CACHE_ENABLED_SETTING_MAP.get(CacheType.INDICES_REQUEST_CACHE));
9091
}
9192

93+
public void testComputeIfAbsentWhenTheQueryThrowsAnException() throws Exception {
94+
int onHeapCacheSize = randomIntBetween(10, 30);
95+
int keyValueSize = 50;
96+
97+
MockCacheRemovalListener<String, String> removalListener = new MockCacheRemovalListener<>();
98+
TieredSpilloverCache<String, String> tieredSpilloverCache = initializeTieredSpilloverCache(
99+
keyValueSize,
100+
randomIntBetween(1, 4),
101+
removalListener,
102+
Settings.builder()
103+
.put(
104+
TieredSpilloverCacheSettings.TIERED_SPILLOVER_ONHEAP_STORE_SIZE.getConcreteSettingForNamespace(
105+
CacheType.INDICES_REQUEST_CACHE.getSettingPrefix()
106+
).getKey(),
107+
onHeapCacheSize * keyValueSize + "b"
108+
)
109+
.build(),
110+
0,
111+
1
112+
);
113+
ICacheKey<String> key = getICacheKey(UUID.randomUUID().toString());
114+
LoadAwareCacheLoader<ICacheKey<String>, String> tieredCacheLoader = new LoadAwareCacheLoader<>() {
115+
boolean isLoaded = false;
116+
117+
@Override
118+
public String load(ICacheKey<String> key) {
119+
isLoaded = true;
120+
throw new TaskCancelledException("Query cancelled!");
121+
}
122+
123+
@Override
124+
public boolean isLoaded() {
125+
return isLoaded;
126+
}
127+
};
128+
// With this call, we expect an exception from the underlying loader which eventually causes the below call to result into
129+
// exception.
130+
try {
131+
tieredSpilloverCache.computeIfAbsent(key, tieredCacheLoader);
132+
} catch (Exception ex) {
133+
assertEquals(TaskCancelledException.class, ex.getCause().getClass());
134+
assertEquals("Query cancelled!", ex.getCause().getMessage());
135+
}
136+
// We will call computeIfAbsent again with the same key, but this time the underlying loader should run fine and we should get back
137+
// the response.
138+
String expectedRespone = "Cool response!";
139+
LoadAwareCacheLoader<ICacheKey<String>, String> tieredCacheLoaderWithNoException = new LoadAwareCacheLoader<>() {
140+
boolean isLoaded = false;
141+
142+
@Override
143+
public String load(ICacheKey<String> key) {
144+
isLoaded = true;
145+
return expectedRespone;
146+
}
147+
148+
@Override
149+
public boolean isLoaded() {
150+
return isLoaded;
151+
}
152+
};
153+
String value = tieredSpilloverCache.computeIfAbsent(key, tieredCacheLoaderWithNoException);
154+
assertEquals(expectedRespone, value);
155+
}
156+
92157
public void testComputeIfAbsentWithoutAnyOnHeapCacheEviction() throws Exception {
93158
int onHeapCacheSize = randomIntBetween(10, 30);
94159
int keyValueSize = 50;

0 commit comments

Comments
 (0)