
Commit aa50a60

[Tiered Caching] Expose a dynamic setting to disable/enable disk cache (#13373) (#13403)
* [Tiered Caching] Expose a dynamic setting to disable/enable disk cache
* Putting tiered cache settings behind feature flag
* Adding a changelog
* Addressing Sorabh's comments
* Putting new setting behind feature flag

---------

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>
Signed-off-by: Sagar <99425694+sgup432@users.noreply.github.com>
1 parent 8ed9823 commit aa50a60
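
As a usage note: the new toggle is a dynamic cluster setting, so it can be flipped at runtime without a node restart. Below is a minimal sketch of how a transport client could flip it, mirroring the approach used in the integration test further down. It assumes the pluggable-cache feature flag is enabled (so the setting is registered) and that a connected `Client` is available; the class and method names are illustrative, not part of this commit.

// Illustrative sketch (not part of this commit): toggling the disk tier of the tiered
// request cache via the dynamic setting introduced here.
import org.opensearch.action.admin.cluster.settings.ClusterUpdateSettingsRequest;
import org.opensearch.cache.common.tier.TieredSpilloverCacheSettings;
import org.opensearch.client.Client;
import org.opensearch.common.cache.CacheType;
import org.opensearch.common.settings.Settings;

public final class DiskCacheToggleSketch {
    private DiskCacheToggleSketch() {}

    static void setDiskCacheEnabled(Client client, boolean enabled) {
        // Resolve the concrete setting key for the indices request cache.
        String settingKey = TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP
            .get(CacheType.INDICES_REQUEST_CACHE)
            .getKey();
        ClusterUpdateSettingsRequest request = new ClusterUpdateSettingsRequest().transientSettings(
            Settings.builder().put(settingKey, enabled).build()
        );
        client.admin().cluster().updateSettings(request).actionGet();
    }
}

Note that entries already spilled to disk are not removed when the tier is disabled; they are only skipped on lookup until they are invalidated or cleared, as the integration test below verifies.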

File tree

7 files changed: +323 -28 lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
@@ -25,6 +25,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - [Streaming Indexing] Ensure support of the new transport by security plugin ([#13174](https://github.com/opensearch-project/OpenSearch/pull/13174))
 - Add cluster setting to dynamically configure the buckets for filter rewrite optimization. ([#13179](https://github.com/opensearch-project/OpenSearch/pull/13179))
 - Make search query counters dynamic to support all query types ([#12601](https://github.com/opensearch-project/OpenSearch/pull/12601))
+- [Tiered Caching] Add a dynamic setting to disable/enable disk cache. ([#13373](https://github.com/opensearch-project/OpenSearch/pull/13373))

 ### Dependencies
 - Bump `org.apache.commons:commons-configuration2` from 2.10.0 to 2.10.1 ([#12896](https://github.com/opensearch-project/OpenSearch/pull/12896))

modules/cache-common/src/internalClusterTest/java/org.opensearch.cache.common.tier/TieredSpilloverCacheIT.java

Lines changed: 115 additions & 0 deletions
@@ -425,6 +425,121 @@ public void testWithExplicitCacheClear() throws Exception {
         }, 1, TimeUnit.SECONDS);
     }

+    public void testWithDynamicDiskCacheSetting() throws Exception {
+        int onHeapCacheSizeInBytes = 10; // Keep it low so that all items are cached onto disk.
+        internalCluster().startNode(
+            Settings.builder()
+                .put(defaultSettings(onHeapCacheSizeInBytes + "b"))
+                .put(INDICES_CACHE_CLEAN_INTERVAL_SETTING.getKey(), new TimeValue(1))
+                .build()
+        );
+        Client client = client();
+        assertAcked(
+            client.admin()
+                .indices()
+                .prepareCreate("index")
+                .setMapping("k", "type=keyword")
+                .setSettings(
+                    Settings.builder()
+                        .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true)
+                        .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1)
+                        .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0)
+                        .put("index.refresh_interval", -1)
+                )
+                .get()
+        );
+        // Update the took-time policy to zero so that all entries are eligible to be cached on disk.
+        ClusterUpdateSettingsRequest updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
+            Settings.builder()
+                .put(
+                    TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(),
+                    new TimeValue(0, TimeUnit.MILLISECONDS)
+                )
+                .build()
+        );
+        assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get());
+        int numberOfIndexedItems = randomIntBetween(5, 10);
+        for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
+            indexRandom(true, client.prepareIndex("index").setSource("k" + iterator, "hello" + iterator));
+        }
+        ensureSearchable("index");
+        refreshAndWaitForReplication();
+        // Force merge the index to ensure there can be no background merges during the subsequent searches that would invalidate the cache.
+        ForceMergeResponse forceMergeResponse = client.admin().indices().prepareForceMerge("index").setFlush(true).get();
+        OpenSearchAssertions.assertAllSuccessful(forceMergeResponse);
+        long perQuerySizeInCacheInBytes = -1;
+        // Step 1: Hit some queries. We will see misses, and the queries will be cached (onto the disk cache) for
+        // subsequent requests.
+        for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
+            SearchResponse resp = client.prepareSearch("index")
+                .setRequestCache(true)
+                .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
+                .get();
+            if (perQuerySizeInCacheInBytes == -1) {
+                RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
+                perQuerySizeInCacheInBytes = requestCacheStats.getMemorySizeInBytes();
+            }
+            assertSearchResponse(resp);
+        }
+
+        RequestCacheStats requestCacheStats = getRequestCacheStats(client, "index");
+        assertEquals(numberOfIndexedItems * perQuerySizeInCacheInBytes, requestCacheStats.getMemorySizeInBytes());
+        assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
+        assertEquals(0, requestCacheStats.getHitCount());
+        assertEquals(0, requestCacheStats.getEvictions());
+
+        // Step 2: Hit the same queries again. We will see hits now.
+        for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
+            SearchResponse resp = client.prepareSearch("index")
+                .setRequestCache(true)
+                .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
+                .get();
+            assertSearchResponse(resp);
+        }
+        requestCacheStats = getRequestCacheStats(client, "index");
+        assertEquals(numberOfIndexedItems * perQuerySizeInCacheInBytes, requestCacheStats.getMemorySizeInBytes());
+        assertEquals(numberOfIndexedItems, requestCacheStats.getMissCount());
+        assertEquals(numberOfIndexedItems, requestCacheStats.getHitCount());
+        assertEquals(0, requestCacheStats.getEvictions());
+        long lastKnownHitCount = requestCacheStats.getHitCount();
+        long lastKnownMissCount = requestCacheStats.getMissCount();
+
+        // Step 3: Turn off the disk cache and hit the same queries again. We should not see hits now, as all queries
+        // were cached onto the disk cache.
+        updateSettingsRequest = new ClusterUpdateSettingsRequest().transientSettings(
+            Settings.builder()
+                .put(TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP.get(CacheType.INDICES_REQUEST_CACHE).getKey(), false)
+                .build()
+        );
+        assertAcked(internalCluster().client().admin().cluster().updateSettings(updateSettingsRequest).get());
+
+        for (int iterator = 0; iterator < numberOfIndexedItems; iterator++) {
+            SearchResponse resp = client.prepareSearch("index")
+                .setRequestCache(true)
+                .setQuery(QueryBuilders.termQuery("k" + iterator, "hello" + iterator))
+                .get();
+            assertSearchResponse(resp);
+        }
+        requestCacheStats = getRequestCacheStats(client, "index");
+        // Still shows disk cache entries, as an explicit clear or invalidation is required to clean up the disk cache.
+        assertEquals(numberOfIndexedItems * perQuerySizeInCacheInBytes, requestCacheStats.getMemorySizeInBytes());
+        assertEquals(lastKnownMissCount + numberOfIndexedItems, requestCacheStats.getMissCount());
+        assertEquals(0, lastKnownHitCount - requestCacheStats.getHitCount()); // No new hits are seen.
+        lastKnownMissCount = requestCacheStats.getMissCount();
+        lastKnownHitCount = requestCacheStats.getHitCount();
+
+        // Step 4: Invalidate entries via refresh.
+        // An explicit refresh invalidates the cache entries.
+        refreshAndWaitForReplication();
+        assertBusy(() -> {
+            // The explicit refresh should clear up the cache entries.
+            assertTrue(getRequestCacheStats(client, "index").getMemorySizeInBytes() == 0);
+        }, 1, TimeUnit.SECONDS);
+        requestCacheStats = getRequestCacheStats(client, "index");
+        assertEquals(0, lastKnownMissCount - requestCacheStats.getMissCount());
+        assertEquals(0, lastKnownHitCount - requestCacheStats.getHitCount());
+    }
+
     private RequestCacheStats getRequestCacheStats(Client client, String indexName) {
         return client.admin().indices().prepareStats(indexName).setRequestCache(true).get().getTotal().getRequestCache();
     }

modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCache.java

Lines changed: 49 additions & 25 deletions
@@ -27,8 +27,9 @@

 import java.io.IOException;
 import java.util.ArrayList;
-import java.util.Arrays;
+import java.util.Collections;
 import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
@@ -38,6 +39,8 @@
 import java.util.function.Function;
 import java.util.function.Predicate;

+import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP;
+
 /**
  * This cache spillover the evicted items from heap tier to disk tier. All the new items are first cached on heap
  * and the items evicted from on heap cache are moved to disk based cache. If disk based cache also gets full,
@@ -67,20 +70,23 @@ public class TieredSpilloverCache<K, V> implements ICache<K, V> {
     /**
      * Maintains caching tiers in ascending order of cache latency.
      */
-    private final List<ICache<K, V>> cacheList;
+    private final Map<ICache<K, V>, Boolean> caches;
     private final List<Predicate<V>> policies;

     TieredSpilloverCache(Builder<K, V> builder) {
         Objects.requireNonNull(builder.onHeapCacheFactory, "onHeap cache builder can't be null");
         Objects.requireNonNull(builder.diskCacheFactory, "disk cache builder can't be null");
+        Objects.requireNonNull(builder.cacheConfig, "cache config can't be null");
+        Objects.requireNonNull(builder.cacheConfig.getClusterSettings(), "cluster settings can't be null");
         this.removalListener = Objects.requireNonNull(builder.removalListener, "Removal listener can't be null");

         this.onHeapCache = builder.onHeapCacheFactory.create(
             new CacheConfig.Builder<K, V>().setRemovalListener(new RemovalListener<ICacheKey<K>, V>() {
                 @Override
                 public void onRemoval(RemovalNotification<ICacheKey<K>, V> notification) {
                     try (ReleasableLock ignore = writeLock.acquire()) {
-                        if (SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason())
+                        if (caches.get(diskCache)
+                            && SPILLOVER_REMOVAL_REASONS.contains(notification.getRemovalReason())
                             && evaluatePolicies(notification.getValue())) {
                             diskCache.put(notification.getKey(), notification.getValue());
                         } else {
@@ -103,9 +109,15 @@ && evaluatePolicies(notification.getValue())) {

         );
         this.diskCache = builder.diskCacheFactory.create(builder.cacheConfig, builder.cacheType, builder.cacheFactories);
-        this.cacheList = Arrays.asList(onHeapCache, diskCache);
+        Boolean isDiskCacheEnabled = DISK_CACHE_ENABLED_SETTING_MAP.get(builder.cacheType).get(builder.cacheConfig.getSettings());
+        LinkedHashMap<ICache<K, V>, Boolean> cacheListMap = new LinkedHashMap<>();
+        cacheListMap.put(onHeapCache, true);
+        cacheListMap.put(diskCache, isDiskCacheEnabled);
+        this.caches = Collections.synchronizedMap(cacheListMap);
         this.dimensionNames = builder.cacheConfig.getDimensionNames();
         this.policies = builder.policies; // Will never be null; builder initializes it to an empty list
+        builder.cacheConfig.getClusterSettings()
+            .addSettingsUpdateConsumer(DISK_CACHE_ENABLED_SETTING_MAP.get(builder.cacheType), this::enableDisableDiskCache);
     }

     // Package private for testing
@@ -118,6 +130,13 @@ ICache<K, V> getDiskCache() {
         return diskCache;
     }

+    // Package private for testing.
+    void enableDisableDiskCache(Boolean isDiskCacheEnabled) {
+        // When disk cache is disabled, we are not clearing up the disk cache entries yet as that should be part of
+        // separate cache/clear API.
+        this.caches.put(diskCache, isDiskCacheEnabled);
+    }
+
     @Override
     public V get(ICacheKey<K> key) {
         return getValueFromTieredCache().apply(key);
@@ -132,7 +151,6 @@ public void put(ICacheKey<K> key, V value) {

     @Override
     public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V> loader) throws Exception {
-
         V cacheValue = getValueFromTieredCache().apply(key);
         if (cacheValue == null) {
             // Add the value to the onHeap cache. We are calling computeIfAbsent which does another get inside.
@@ -151,19 +169,19 @@ public V computeIfAbsent(ICacheKey<K> key, LoadAwareCacheLoader<ICacheKey<K>, V>
     public void invalidate(ICacheKey<K> key) {
         // We are trying to invalidate the key from all caches though it would be present in only of them.
         // Doing this as we don't know where it is located. We could do a get from both and check that, but what will
-        // also trigger a hit/miss listener event, so ignoring it for now.
+        // also count hits/misses stats, so ignoring it for now.
         try (ReleasableLock ignore = writeLock.acquire()) {
-            for (ICache<K, V> cache : cacheList) {
-                cache.invalidate(key);
+            for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
+                cacheEntry.getKey().invalidate(key);
             }
         }
     }

     @Override
     public void invalidateAll() {
         try (ReleasableLock ignore = writeLock.acquire()) {
-            for (ICache<K, V> cache : cacheList) {
-                cache.invalidateAll();
+            for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
+                cacheEntry.getKey().invalidateAll();
             }
         }
     }
@@ -175,32 +193,39 @@ public void invalidateAll() {
     @SuppressWarnings({ "unchecked" })
     @Override
     public Iterable<ICacheKey<K>> keys() {
-        Iterable<ICacheKey<K>>[] iterables = (Iterable<ICacheKey<K>>[]) new Iterable<?>[] { onHeapCache.keys(), diskCache.keys() };
-        return new ConcatenatedIterables<ICacheKey<K>>(iterables);
+        List<Iterable<ICacheKey<K>>> iterableList = new ArrayList<>();
+        for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
+            iterableList.add(cacheEntry.getKey().keys());
+        }
+        Iterable<ICacheKey<K>>[] iterables = (Iterable<ICacheKey<K>>[]) iterableList.toArray(new Iterable<?>[0]);
+        return new ConcatenatedIterables<>(iterables);
     }

     @Override
     public long count() {
         long count = 0;
-        for (ICache<K, V> cache : cacheList) {
-            count += cache.count();
+        for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
+            // Count for all the tiers irrespective of whether they are enabled or not. As eventually
+            // this will turn to zero once cache is cleared up either via invalidation or manually.
+            count += cacheEntry.getKey().count();
         }
         return count;
     }

     @Override
     public void refresh() {
         try (ReleasableLock ignore = writeLock.acquire()) {
-            for (ICache<K, V> cache : cacheList) {
-                cache.refresh();
+            for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
+                cacheEntry.getKey().refresh();
             }
         }
     }

     @Override
     public void close() throws IOException {
-        for (ICache<K, V> cache : cacheList) {
-            cache.close();
+        for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
+            // Close all the caches here irrespective of whether they are enabled or not.
+            cacheEntry.getKey().close();
        }
     }

@@ -212,13 +237,12 @@ public ImmutableCacheStatsHolder stats() {
     private Function<ICacheKey<K>, V> getValueFromTieredCache() {
         return key -> {
             try (ReleasableLock ignore = readLock.acquire()) {
-                for (ICache<K, V> cache : cacheList) {
-                    V value = cache.get(key);
-                    if (value != null) {
-                        // update hit stats
-                        return value;
-                    } else {
-                        // update miss stats
+                for (Map.Entry<ICache<K, V>, Boolean> cacheEntry : caches.entrySet()) {
+                    if (cacheEntry.getValue()) {
+                        V value = cacheEntry.getKey().get(key);
+                        if (value != null) {
+                            return value;
+                        }
                     }
                 }
             }
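
For readers skimming the diff above, here is a standalone sketch of the tier bookkeeping it introduces: a LinkedHashMap keeps the tiers in insertion (latency) order so the on-heap tier is always consulted first, the Boolean value records whether a tier is currently enabled, and Collections.synchronizedMap keeps the flip from the settings-update consumer thread-safe. The names below are illustrative, not the production class.

import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

final class TierOrderSketch {
    public static void main(String[] args) {
        // Insertion order = lookup order: on-heap first, then disk.
        Map<String, Boolean> tiers = Collections.synchronizedMap(new LinkedHashMap<>());
        tiers.put("on-heap", true);  // always enabled
        tiers.put("disk", true);     // controlled by the new dynamic setting

        // A dynamic-setting update disables the disk tier without touching its contents.
        tiers.put("disk", false);

        // Lookup path mirrors getValueFromTieredCache(): iterate in order, skip disabled tiers.
        for (Map.Entry<String, Boolean> tier : tiers.entrySet()) {
            if (tier.getValue()) {
                System.out.println("would check tier: " + tier.getKey());
            }
        }
    }
}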

modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCachePlugin.java

Lines changed: 12 additions & 1 deletion
@@ -11,13 +11,16 @@
 import org.opensearch.common.cache.CacheType;
 import org.opensearch.common.cache.ICache;
 import org.opensearch.common.settings.Setting;
+import org.opensearch.common.settings.Settings;
+import org.opensearch.common.util.FeatureFlags;
 import org.opensearch.plugins.CachePlugin;
 import org.opensearch.plugins.Plugin;

 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;

+import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.DISK_CACHE_ENABLED_SETTING_MAP;
 import static org.opensearch.cache.common.tier.TieredSpilloverCacheSettings.TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;

 /**
@@ -30,10 +33,15 @@ public class TieredSpilloverCachePlugin extends Plugin implements CachePlugin {
     */
    public static final String TIERED_CACHE_SPILLOVER_PLUGIN_NAME = "tieredSpilloverCachePlugin";

+    private final Settings settings;
+
    /**
     * Default constructor
+     * @param settings settings
     */
-    public TieredSpilloverCachePlugin() {}
+    public TieredSpilloverCachePlugin(Settings settings) {
+        this.settings = settings;
+    }

    @Override
    public Map<String, ICache.Factory> getCacheFactoryMap() {
@@ -54,6 +62,9 @@ public List<Setting<?>> getSettings() {
                TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_STORE_NAME.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
            );
            settingList.add(TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP.get(cacheType));
+            if (FeatureFlags.PLUGGABLE_CACHE_SETTING.get(settings)) {
+                settingList.add(DISK_CACHE_ENABLED_SETTING_MAP.get(cacheType));
+            }
        }
        return settingList;
    }
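
A small, hypothetical sketch of how the feature-flag gate above behaves: when the pluggable-cache feature flag is enabled in the node settings passed to the plugin constructor, getSettings() also registers the per-cache-type disk-enable setting; otherwise it is omitted and the toggle cannot be set. The class name and main harness below are illustrative.

import org.opensearch.cache.common.tier.TieredSpilloverCachePlugin;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.FeatureFlags;

final class PluginSettingsSketch {
    public static void main(String[] args) {
        // Build node settings with the experimental pluggable-cache flag turned on
        // (key resolved from FeatureFlags to avoid hard-coding the property name).
        Settings nodeSettings = Settings.builder()
            .put(FeatureFlags.PLUGGABLE_CACHE_SETTING.getKey(), true)
            .build();
        TieredSpilloverCachePlugin plugin = new TieredSpilloverCachePlugin(nodeSettings);
        // With the flag on, the list includes the ".disk.store.enabled" setting per cache type.
        plugin.getSettings().forEach(setting -> System.out.println(setting.getKey()));
    }
}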

modules/cache-common/src/main/java/org/opensearch/cache/common/tier/TieredSpilloverCacheSettings.java

Lines changed: 21 additions & 1 deletion
@@ -42,6 +42,14 @@ public class TieredSpilloverCacheSettings {
        (key) -> Setting.simpleString(key, "", NodeScope)
    );

+    /**
+     * Setting to disable/enable disk cache dynamically.
+     */
+    public static final Setting.AffixSetting<Boolean> TIERED_SPILLOVER_DISK_CACHE_SETTING = Setting.suffixKeySetting(
+        TieredSpilloverCache.TieredSpilloverCacheFactory.TIERED_SPILLOVER_CACHE_NAME + ".disk.store.enabled",
+        (key) -> Setting.boolSetting(key, true, NodeScope, Setting.Property.Dynamic)
+    );
+
    /**
     * Setting defining the minimum took time for a query to be allowed into the disk cache.
     */
@@ -63,17 +71,29 @@
    public static final Map<CacheType, Setting<TimeValue>> TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP;

    /**
-     * Fetches concrete took time policy settings.
+     * Stores disk cache enabled settings for various cache types as these are dynamic so that can be registered and
+     * retrieved accordingly.
+     */
+    public static final Map<CacheType, Setting<Boolean>> DISK_CACHE_ENABLED_SETTING_MAP;
+
+    /**
+     * Fetches concrete took time policy and disk cache settings.
     */
    static {
        Map<CacheType, Setting<TimeValue>> concreteTookTimePolicySettingMap = new HashMap<>();
+        Map<CacheType, Setting<Boolean>> diskCacheSettingMap = new HashMap<>();
        for (CacheType cacheType : CacheType.values()) {
            concreteTookTimePolicySettingMap.put(
                cacheType,
                TIERED_SPILLOVER_DISK_TOOK_TIME_THRESHOLD.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
            );
+            diskCacheSettingMap.put(
+                cacheType,
+                TIERED_SPILLOVER_DISK_CACHE_SETTING.getConcreteSettingForNamespace(cacheType.getSettingPrefix())
+            );
        }
        TOOK_TIME_POLICY_CONCRETE_SETTINGS_MAP = concreteTookTimePolicySettingMap;
+        DISK_CACHE_ENABLED_SETTING_MAP = diskCacheSettingMap;
    }

    /**
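
To make the suffix-key mechanics above concrete, the following hypothetical snippet prints the fully qualified key registered for each cache type. The prefix comes from CacheType#getSettingPrefix(), so for the request cache the key is expected to look like indices.requests.cache.tiered_spillover.disk.store.enabled; treat the exact string as an assumption and verify against your version.

import org.opensearch.cache.common.tier.TieredSpilloverCacheSettings;
import org.opensearch.common.cache.CacheType;

final class ConcreteSettingKeySketch {
    public static void main(String[] args) {
        for (CacheType cacheType : CacheType.values()) {
            // Resolve the per-cache-type concrete setting from the affix (suffix-key) setting.
            String key = TieredSpilloverCacheSettings.TIERED_SPILLOVER_DISK_CACHE_SETTING
                .getConcreteSettingForNamespace(cacheType.getSettingPrefix())
                .getKey();
            System.out.println(cacheType + " -> " + key);
        }
    }
}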
