Skip to content

Commit db361ec

Browse files
[Tiered Caching] Bug fix for IndicesRequestCache StaleKey management (#13070)
* Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update CHANGELOG.md Signed-off-by: Kiran Prakash <awskiran@amazon.com> * revert Signed-off-by: Kiran Prakash <awskiran@amazon.com> * revert Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * code comments only Signed-off-by: Kiran Prakash <awskiran@amazon.com> * docs changes Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update CHANGELOG.md Signed-off-by: Kiran Prakash <awskiran@amazon.com> * revert 
catching AlreadyClosedException Signed-off-by: Kiran Prakash <awskiran@amazon.com> * assert Signed-off-by: Kiran Prakash <awskiran@amazon.com> * conflicts Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * address comments Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * address conflicts Signed-off-by: Kiran Prakash <awskiran@amazon.com> * spotless apply Signed-off-by: Kiran Prakash <awskiran@amazon.com> * address comments Signed-off-by: Kiran Prakash <awskiran@amazon.com> * update code comments Signed-off-by: Kiran Prakash <awskiran@amazon.com> * address bug & add tests Signed-off-by: Kiran Prakash <awskiran@amazon.com> --------- Signed-off-by: Kiran Prakash <awskiran@amazon.com>
1 parent 76e4c86 commit db361ec

File tree

3 files changed

+486
-515
lines changed

3 files changed

+486
-515
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,6 +67,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6767
- Fix from and size parameter can be negative when searching ([#13047](https://github.com/opensearch-project/OpenSearch/pull/13047))
6868
- Enabled mockTelemetryPlugin for IT and fixed OOM issues ([#13054](https://github.com/opensearch-project/OpenSearch/pull/13054))
6969
- Fix implement mark() and markSupported() in class FilterStreamInput ([#13098](https://github.com/opensearch-project/OpenSearch/pull/13098))
70+
- Fix IndicesRequestCache Stale calculation ([#13070](https://github.com/opensearch-project/OpenSearch/pull/13070))
7071
- Fix snapshot _status API to return correct status for partial snapshots ([#12812](https://github.com/opensearch-project/OpenSearch/pull/12812))
7172
- Improve the error messages for _stats with closed indices ([#13012](https://github.com/opensearch-project/OpenSearch/pull/13012))
7273
- Ignore BaseRestHandler unconsumed content check as it's always consumed. ([#13290](https://github.com/opensearch-project/OpenSearch/pull/13290))

server/src/main/java/org/opensearch/indices/IndicesRequestCache.java

Lines changed: 68 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.opensearch.common.cache.LoadAwareCacheLoader;
4848
import org.opensearch.common.cache.RemovalListener;
4949
import org.opensearch.common.cache.RemovalNotification;
50+
import org.opensearch.common.cache.RemovalReason;
5051
import org.opensearch.common.cache.policy.CachedQueryResult;
5152
import org.opensearch.common.cache.serializer.BytesReferenceSerializer;
5253
import org.opensearch.common.cache.service.CacheService;
@@ -216,19 +217,16 @@ void clear(CacheEntity entity) {
216217
public void onRemoval(RemovalNotification<ICacheKey<Key>, BytesReference> notification) {
217218
// In case this event happens for an old shard, we can safely ignore this as we don't keep track for old
218219
// shards as part of request cache.
219-
220220
// Pass a new removal notification containing Key rather than ICacheKey<Key> to the CacheEntity for backwards compatibility.
221221
Key key = notification.getKey().key;
222222
RemovalNotification<Key, BytesReference> newNotification = new RemovalNotification<>(
223223
key,
224224
notification.getValue(),
225225
notification.getRemovalReason()
226226
);
227-
228227
cacheEntityLookup.apply(key.shardId).ifPresent(entity -> entity.onRemoval(newNotification));
229-
cacheCleanupManager.updateCleanupKeyToCountMapOnCacheEviction(
230-
new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId)
231-
);
228+
CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId);
229+
cacheCleanupManager.updateStaleCountOnEntryRemoval(cleanupKey, newNotification);
232230
}
233231

234232
private ICacheKey<Key> getICacheKey(Key key) {
@@ -272,10 +270,11 @@ BytesReference getOrCompute(
272270
OpenSearchDirectoryReader.addReaderCloseListener(reader, cleanupKey);
273271
}
274272
}
275-
cacheCleanupManager.updateCleanupKeyToCountMapOnCacheInsertion(cleanupKey);
273+
cacheCleanupManager.updateStaleCountOnCacheInsert(cleanupKey);
276274
} else {
277275
cacheEntity.onHit();
278276
}
277+
279278
return value;
280279
}
281280

@@ -508,7 +507,7 @@ void enqueueCleanupKey(CleanupKey cleanupKey) {
508507
*
509508
* @param cleanupKey the CleanupKey to be updated in the map
510509
*/
511-
private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) {
510+
private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) {
512511
if (stalenessThreshold == 0.0 || cleanupKey.entity == null) {
513512
return;
514513
}
@@ -524,8 +523,29 @@ private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) {
524523
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new HashMap<>()).merge(cleanupKey.readerCacheKeyId, 1, Integer::sum);
525524
}
526525

527-
private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) {
528-
if (stalenessThreshold == 0.0 || cleanupKey.entity == null) {
526+
/**
527+
* Handles the eviction of a cache entry.
528+
*
529+
* <p>This method is called when an entry is evicted from the cache.
530+
* We consider all removal notifications except those with the reason REPLACED.
531+
* {@link #incrementStaleKeysCount} would have removed the entries from the map and incremented the {@link #staleKeysCount}.
532+
* Hence we decrement {@link #staleKeysCount} if we do not find the shardId or readerCacheKeyId in the map.
533+
* Skip decrementing staleKeysCount if we find both the shardId and the readerCacheKeyId in the map, since the entry is not stale yet and has therefore not been accounted for in staleKeysCount.
534+
*
535+
* @param cleanupKey the CleanupKey that has been evicted from the cache
536+
* @param notification RemovalNotification of the cache entry evicted
537+
*/
538+
private void updateStaleCountOnEntryRemoval(CleanupKey cleanupKey, RemovalNotification<Key, BytesReference> notification) {
539+
if (notification.getRemovalReason() == RemovalReason.REPLACED) {
540+
// The reason of the notification is REPLACED when a cache entry's value is updated, since replacing an entry
541+
// does not affect the staleness count, we skip such notifications.
542+
return;
543+
}
544+
if (cleanupKey.entity == null) {
545+
// entity will only be null when the shard is closed/deleted
546+
// we would have accounted for this in staleKeysCount when the closing/deletion of the shard closed the associated
547+
// readers
548+
staleKeysCount.decrementAndGet();
529549
return;
530550
}
531551
IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity();
@@ -535,23 +555,41 @@ private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) {
535555
}
536556
ShardId shardId = indexShard.shardId();
537557

538-
cleanupKeyToCountMap.computeIfPresent(shardId, (shard, keyCountMap) -> {
539-
keyCountMap.computeIfPresent(cleanupKey.readerCacheKeyId, (key, currentValue) -> {
540-
// decrement the stale key count
558+
cleanupKeyToCountMap.compute(shardId, (key, readerCacheKeyMap) -> {
559+
if (readerCacheKeyMap == null || !readerCacheKeyMap.containsKey(cleanupKey.readerCacheKeyId)) {
560+
// If ShardId is not present or readerCacheKeyId is not present
561+
// it should have already been accounted for and hence been removed from this map
562+
// so decrement staleKeysCount
541563
staleKeysCount.decrementAndGet();
542-
int newValue = currentValue - 1;
543-
// Remove the key if the new value is zero by returning null; otherwise, update with the new value.
544-
return newValue == 0 ? null : newValue;
545-
});
546-
return keyCountMap;
564+
// Return the current map
565+
return readerCacheKeyMap;
566+
} else {
567+
// If it is in the map, it is not stale yet.
568+
// Proceed to adjust the count for the readerCacheKeyId in the map
569+
// but do not decrement the staleKeysCount
570+
Integer count = readerCacheKeyMap.get(cleanupKey.readerCacheKeyId);
571+
// this should never be null
572+
assert (count != null && count >= 0);
573+
// Reduce the count by 1
574+
int newCount = count - 1;
575+
if (newCount > 0) {
576+
// Update the map with the new count
577+
readerCacheKeyMap.put(cleanupKey.readerCacheKeyId, newCount);
578+
} else {
579+
// Remove the readerCacheKeyId entry if new count is zero
580+
readerCacheKeyMap.remove(cleanupKey.readerCacheKeyId);
581+
}
582+
// If after modification, the readerCacheKeyMap is empty, we return null to remove the ShardId entry
583+
return readerCacheKeyMap.isEmpty() ? null : readerCacheKeyMap;
584+
}
547585
});
548586
}
549587

550588
/**
551589
* Updates the count of stale keys in the cache.
552590
* This method is called when a CleanupKey is added to the keysToClean set.
553591
*
554-
* It increments the staleKeysCount by the count of the CleanupKey in the cleanupKeyToCountMap.
592+
* <p>It increments the staleKeysCount by the count of the CleanupKey in the cleanupKeyToCountMap.
555593
* If the CleanupKey's readerCacheKeyId is null or the CleanupKey's entity is not open, it increments the staleKeysCount
556594
* by the total count of keys associated with the CleanupKey's ShardId in the cleanupKeyToCountMap and removes the ShardId from the map.
557595
*
@@ -569,7 +607,7 @@ private void incrementStaleKeysCount(CleanupKey cleanupKey) {
569607
ShardId shardId = indexShard.shardId();
570608

571609
// Using computeIfPresent to atomically operate on the countMap for a given shardId
572-
cleanupKeyToCountMap.computeIfPresent(shardId, (key, countMap) -> {
610+
cleanupKeyToCountMap.computeIfPresent(shardId, (currentShardId, countMap) -> {
573611
if (cleanupKey.readerCacheKeyId == null) {
574612
// Aggregate and add to staleKeysCount atomically if readerCacheKeyId is null
575613
int totalSum = countMap.values().stream().mapToInt(Integer::intValue).sum();
@@ -578,18 +616,19 @@ private void incrementStaleKeysCount(CleanupKey cleanupKey) {
578616
return null;
579617
} else {
580618
// Update staleKeysCount based on specific readerCacheKeyId, then remove it from the countMap
581-
countMap.computeIfPresent(cleanupKey.readerCacheKeyId, (k, v) -> {
582-
staleKeysCount.addAndGet(v);
619+
countMap.computeIfPresent(cleanupKey.readerCacheKeyId, (readerCacheKey, count) -> {
620+
staleKeysCount.addAndGet(count);
583621
// Return null to remove the key after updating staleKeysCount
584622
return null;
585623
});
586-
587624
// Check if countMap is empty after removal to decide if we need to remove the shardId entry
588625
if (countMap.isEmpty()) {
589-
return null; // Returning null removes the entry for shardId
626+
// Returning null removes the entry for shardId
627+
return null;
590628
}
591629
}
592-
return countMap; // Return the modified countMap to keep the mapping
630+
// Return the modified countMap to retain updates
631+
return countMap;
593632
});
594633
}
595634

@@ -715,6 +754,11 @@ public void close() {
715754
this.cacheCleaner.close();
716755
}
717756

757+
// for testing
758+
ConcurrentMap<ShardId, HashMap<String, Integer>> getCleanupKeyToCountMap() {
759+
return cleanupKeyToCountMap;
760+
}
761+
718762
private final class IndicesRequestCacheCleaner implements Runnable, Releasable {
719763

720764
private final IndicesRequestCacheCleanupManager cacheCleanupManager;

0 commit comments

Comments
 (0)