Skip to content

Commit 9e27e46

Browse files
[Tiered Caching] Bug fix for IndicesRequestCache StaleKey management (opensearch-project#13070)
* Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update CHANGELOG.md Signed-off-by: Kiran Prakash <awskiran@amazon.com> * revert Signed-off-by: Kiran Prakash <awskiran@amazon.com> * revert Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * code comments only Signed-off-by: Kiran Prakash <awskiran@amazon.com> * docs changes Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update CHANGELOG.md Signed-off-by: Kiran Prakash <awskiran@amazon.com> * revert 
catching AlreadyClosedException Signed-off-by: Kiran Prakash <awskiran@amazon.com> * assert Signed-off-by: Kiran Prakash <awskiran@amazon.com> * conflicts Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * address comments Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * address conflicts Signed-off-by: Kiran Prakash <awskiran@amazon.com> * spotless apply Signed-off-by: Kiran Prakash <awskiran@amazon.com> * address comments Signed-off-by: Kiran Prakash <awskiran@amazon.com> * update code comments Signed-off-by: Kiran Prakash <awskiran@amazon.com> * address bug & add tests Signed-off-by: Kiran Prakash <awskiran@amazon.com> --------- Signed-off-by: Kiran Prakash <awskiran@amazon.com> (cherry picked from commit db361ec)
1 parent 1d360ce commit 9e27e46

File tree

3 files changed

+675
-397
lines changed

3 files changed

+675
-397
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6262
- Fix issue with feature flags where default value may not be honored ([#12849](https://github.com/opensearch-project/OpenSearch/pull/12849))
6363
- Enabled mockTelemetryPlugin for IT and fixed OOM issues ([#13054](https://github.com/opensearch-project/OpenSearch/pull/13054))
6464
- Fix implement mark() and markSupported() in class FilterStreamInput ([#13098](https://github.com/opensearch-project/OpenSearch/pull/13098))
65+
- Fix IndicesRequestCache Stale calculation ([#13070](https://github.com/opensearch-project/OpenSearch/pull/13070))
6566
- Fix snapshot _status API to return correct status for partial snapshots ([#12812](https://github.com/opensearch-project/OpenSearch/pull/12812))
6667
- Improve the error messages for _stats with closed indices ([#13012](https://github.com/opensearch-project/OpenSearch/pull/13012))
6768
- Ignore BaseRestHandler unconsumed content check as it's always consumed. ([#13290](https://github.com/opensearch-project/OpenSearch/pull/13290))

server/src/main/java/org/opensearch/indices/IndicesRequestCache.java

Lines changed: 79 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,7 @@
4646
import org.opensearch.common.cache.LoadAwareCacheLoader;
4747
import org.opensearch.common.cache.RemovalListener;
4848
import org.opensearch.common.cache.RemovalNotification;
49+
import org.opensearch.common.cache.RemovalReason;
4950
import org.opensearch.common.cache.policy.CachedQueryResult;
5051
import org.opensearch.common.cache.serializer.BytesReferenceSerializer;
5152
import org.opensearch.common.cache.service.CacheService;
@@ -208,11 +209,24 @@ void clear(CacheEntity entity) {
208209
public void onRemoval(RemovalNotification<Key, BytesReference> notification) {
209210
// In case this event happens for an old shard, we can safely ignore this as we don't keep track for old
210211
// shards as part of request cache.
212+
<<<<<<< HEAD
211213
Key key = notification.getKey();
212214
cacheEntityLookup.apply(key.shardId).ifPresent(entity -> entity.onRemoval(notification));
213215
cacheCleanupManager.updateCleanupKeyToCountMapOnCacheEviction(
214216
new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId)
215217
);
218+
=======
219+
// Pass a new removal notification containing Key rather than ICacheKey<Key> to the CacheEntity for backwards compatibility.
220+
Key key = notification.getKey().key;
221+
RemovalNotification<Key, BytesReference> newNotification = new RemovalNotification<>(
222+
key,
223+
notification.getValue(),
224+
notification.getRemovalReason()
225+
);
226+
cacheEntityLookup.apply(key.shardId).ifPresent(entity -> entity.onRemoval(newNotification));
227+
CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId);
228+
cacheCleanupManager.updateStaleCountOnEntryRemoval(cleanupKey, newNotification);
229+
>>>>>>> db361ecead1 ([Tiered Caching] Bug fix for IndicesRequestCache StaleKey management (#13070))
216230
}
217231

218232
BytesReference getOrCompute(
@@ -241,10 +255,11 @@ BytesReference getOrCompute(
241255
OpenSearchDirectoryReader.addReaderCloseListener(reader, cleanupKey);
242256
}
243257
}
244-
cacheCleanupManager.updateCleanupKeyToCountMapOnCacheInsertion(cleanupKey);
258+
cacheCleanupManager.updateStaleCountOnCacheInsert(cleanupKey);
245259
} else {
246260
cacheEntity.onHit();
247261
}
262+
248263
return value;
249264
}
250265

@@ -477,7 +492,7 @@ void enqueueCleanupKey(CleanupKey cleanupKey) {
477492
*
478493
* @param cleanupKey the CleanupKey to be updated in the map
479494
*/
480-
private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) {
495+
private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) {
481496
if (stalenessThreshold == 0.0 || cleanupKey.entity == null) {
482497
return;
483498
}
@@ -493,8 +508,29 @@ private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) {
493508
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new HashMap<>()).merge(cleanupKey.readerCacheKeyId, 1, Integer::sum);
494509
}
495510

496-
private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) {
497-
if (stalenessThreshold == 0.0 || cleanupKey.entity == null) {
511+
/**
512+
* Handles the eviction of a cache entry.
513+
*
514+
* <p>This method is called when an entry is evicted from the cache.
515+
* We consider all removal notifications except those with the reason {@code REPLACED}.
516+
* {@link #incrementStaleKeysCount} would have removed the entries from the map and increment the {@link #staleKeysCount}
517+
* Hence we decrement {@link #staleKeysCount} if we do not find the shardId or readerCacheKeyId in the map.
518+
* Skip decrementing staleKeysCount if we find both the shardId and the readerCacheKeyId in the map, since the entry would not yet have been accounted for in the staleKeysCount.
519+
*
520+
* @param cleanupKey the CleanupKey that has been evicted from the cache
521+
* @param notification RemovalNotification of the cache entry evicted
522+
*/
523+
private void updateStaleCountOnEntryRemoval(CleanupKey cleanupKey, RemovalNotification<Key, BytesReference> notification) {
524+
if (notification.getRemovalReason() == RemovalReason.REPLACED) {
525+
// The reason of the notification is REPLACED when a cache entry's value is updated, since replacing an entry
526+
// does not affect the staleness count, we skip such notifications.
527+
return;
528+
}
529+
if (cleanupKey.entity == null) {
530+
// entity will only be null when the shard is closed/deleted
531+
// we would have accounted this in staleKeysCount when the closing/deletion of shard would have closed the associated
532+
// readers
533+
staleKeysCount.decrementAndGet();
498534
return;
499535
}
500536
IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity();
@@ -504,23 +540,41 @@ private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) {
504540
}
505541
ShardId shardId = indexShard.shardId();
506542

507-
cleanupKeyToCountMap.computeIfPresent(shardId, (shard, keyCountMap) -> {
508-
keyCountMap.computeIfPresent(cleanupKey.readerCacheKeyId, (key, currentValue) -> {
509-
// decrement the stale key count
543+
cleanupKeyToCountMap.compute(shardId, (key, readerCacheKeyMap) -> {
544+
if (readerCacheKeyMap == null || !readerCacheKeyMap.containsKey(cleanupKey.readerCacheKeyId)) {
545+
// If ShardId is not present or readerCacheKeyId is not present
546+
// it should have already been accounted for and hence been removed from this map
547+
// so decrement staleKeysCount
510548
staleKeysCount.decrementAndGet();
511-
int newValue = currentValue - 1;
512-
// Remove the key if the new value is zero by returning null; otherwise, update with the new value.
513-
return newValue == 0 ? null : newValue;
514-
});
515-
return keyCountMap;
549+
// Return the current map
550+
return readerCacheKeyMap;
551+
} else {
552+
// If it is in the map, it is not stale yet.
553+
// Proceed to adjust the count for the readerCacheKeyId in the map
554+
// but do not decrement the staleKeysCount
555+
Integer count = readerCacheKeyMap.get(cleanupKey.readerCacheKeyId);
556+
// this should never be null
557+
assert (count != null && count >= 0);
558+
// Reduce the count by 1
559+
int newCount = count - 1;
560+
if (newCount > 0) {
561+
// Update the map with the new count
562+
readerCacheKeyMap.put(cleanupKey.readerCacheKeyId, newCount);
563+
} else {
564+
// Remove the readerCacheKeyId entry if new count is zero
565+
readerCacheKeyMap.remove(cleanupKey.readerCacheKeyId);
566+
}
567+
// If after modification, the readerCacheKeyMap is empty, we return null to remove the ShardId entry
568+
return readerCacheKeyMap.isEmpty() ? null : readerCacheKeyMap;
569+
}
516570
});
517571
}
518572

519573
/**
520574
* Updates the count of stale keys in the cache.
521575
* This method is called when a CleanupKey is added to the keysToClean set.
522576
*
523-
* It increments the staleKeysCount by the count of the CleanupKey in the cleanupKeyToCountMap.
577+
* <p>It increments the staleKeysCount by the count of the CleanupKey in the cleanupKeyToCountMap.
524578
* If the CleanupKey's readerCacheKeyId is null or the CleanupKey's entity is not open, it increments the staleKeysCount
525579
* by the total count of keys associated with the CleanupKey's ShardId in the cleanupKeyToCountMap and removes the ShardId from the map.
526580
*
@@ -538,7 +592,7 @@ private void incrementStaleKeysCount(CleanupKey cleanupKey) {
538592
ShardId shardId = indexShard.shardId();
539593

540594
// Using computeIfPresent to atomically operate on the countMap for a given shardId
541-
cleanupKeyToCountMap.computeIfPresent(shardId, (key, countMap) -> {
595+
cleanupKeyToCountMap.computeIfPresent(shardId, (currentShardId, countMap) -> {
542596
if (cleanupKey.readerCacheKeyId == null) {
543597
// Aggregate and add to staleKeysCount atomically if readerCacheKeyId is null
544598
int totalSum = countMap.values().stream().mapToInt(Integer::intValue).sum();
@@ -547,18 +601,19 @@ private void incrementStaleKeysCount(CleanupKey cleanupKey) {
547601
return null;
548602
} else {
549603
// Update staleKeysCount based on specific readerCacheKeyId, then remove it from the countMap
550-
countMap.computeIfPresent(cleanupKey.readerCacheKeyId, (k, v) -> {
551-
staleKeysCount.addAndGet(v);
604+
countMap.computeIfPresent(cleanupKey.readerCacheKeyId, (readerCacheKey, count) -> {
605+
staleKeysCount.addAndGet(count);
552606
// Return null to remove the key after updating staleKeysCount
553607
return null;
554608
});
555-
556609
// Check if countMap is empty after removal to decide if we need to remove the shardId entry
557610
if (countMap.isEmpty()) {
558-
return null; // Returning null removes the entry for shardId
611+
// Returning null removes the entry for shardId
612+
return null;
559613
}
560614
}
561-
return countMap; // Return the modified countMap to keep the mapping
615+
// Return the modified countMap to retain updates
616+
return countMap;
562617
});
563618
}
564619

@@ -673,6 +728,11 @@ public void close() {
673728
this.cacheCleaner.close();
674729
}
675730

731+
// for testing: returns the cleanupKeyToCountMap reference itself (no copy is made),
// so callers see the same map instance that the stale-count bookkeeping updates.
732+
ConcurrentMap<ShardId, HashMap<String, Integer>> getCleanupKeyToCountMap() {
733+
return cleanupKeyToCountMap;
734+
}
735+
676736
private final class IndicesRequestCacheCleaner implements Runnable, Releasable {
677737

678738
private final IndicesRequestCacheCleanupManager cacheCleanupManager;

0 commit comments

Comments
 (0)