Skip to content

Commit f531d4d

Browse files
kiranprakash154Peter Alfonsi
authored and
Peter Alfonsi
committed
[Tiered Caching] Bug fix for IndicesRequestCache StaleKey management (opensearch-project#13070)
* Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update CHANGELOG.md Signed-off-by: Kiran Prakash <awskiran@amazon.com> * revert Signed-off-by: Kiran Prakash <awskiran@amazon.com> * revert Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * code comments only Signed-off-by: Kiran Prakash <awskiran@amazon.com> * docs changes Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update CHANGELOG.md Signed-off-by: Kiran Prakash <awskiran@amazon.com> * revert 
catching AlreadyClosedException Signed-off-by: Kiran Prakash <awskiran@amazon.com> * assert Signed-off-by: Kiran Prakash <awskiran@amazon.com> * conflicts Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCacheTests.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * address comments Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * Update IndicesRequestCache.java Signed-off-by: Kiran Prakash <awskiran@amazon.com> * address conflicts Signed-off-by: Kiran Prakash <awskiran@amazon.com> * spotless apply Signed-off-by: Kiran Prakash <awskiran@amazon.com> * address comments Signed-off-by: Kiran Prakash <awskiran@amazon.com> * update code comments Signed-off-by: Kiran Prakash <awskiran@amazon.com> * address bug & add tests Signed-off-by: Kiran Prakash <awskiran@amazon.com> --------- Signed-off-by: Kiran Prakash <awskiran@amazon.com>
1 parent cd7a96e commit f531d4d

File tree

3 files changed

+490
-514
lines changed

3 files changed

+490
-514
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
6868
- Fix from and size parameter can be negative when searching ([#13047](https://github.com/opensearch-project/OpenSearch/pull/13047))
6969
- Enabled mockTelemetryPlugin for IT and fixed OOM issues ([#13054](https://github.com/opensearch-project/OpenSearch/pull/13054))
7070
- Fix implement mark() and markSupported() in class FilterStreamInput ([#13098](https://github.com/opensearch-project/OpenSearch/pull/13098))
71+
- Fix IndicesRequestCache Stale calculation ([#13070](https://github.com/opensearch-project/OpenSearch/pull/13070))
7172
- Fix snapshot _status API to return correct status for partial snapshots ([#12812](https://github.com/opensearch-project/OpenSearch/pull/12812))
7273
- Improve the error messages for _stats with closed indices ([#13012](https://github.com/opensearch-project/OpenSearch/pull/13012))
7374
- Ignore BaseRestHandler unconsumed content check as it's always consumed. ([#13290](https://github.com/opensearch-project/OpenSearch/pull/13290))

server/src/main/java/org/opensearch/indices/IndicesRequestCache.java

Lines changed: 72 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@
4747
import org.opensearch.common.cache.LoadAwareCacheLoader;
4848
import org.opensearch.common.cache.RemovalListener;
4949
import org.opensearch.common.cache.RemovalNotification;
50+
import org.opensearch.common.cache.RemovalReason;
5051
import org.opensearch.common.cache.policy.CachedQueryResult;
5152
import org.opensearch.common.cache.serializer.BytesReferenceSerializer;
5253
import org.opensearch.common.cache.service.CacheService;
@@ -216,12 +217,11 @@ void clear(CacheEntity entity) {
216217
public void onRemoval(RemovalNotification<ICacheKey<Key>, BytesReference> notification) {
217218
// In case this event happens for an old shard, we can safely ignore this as we don't keep track for old
218219
// shards as part of request cache.
219-
220+
// Pass a new removal notification containing Key rather than ICacheKey<Key> to the CacheEntity for backwards compatibility.
220221
Key key = notification.getKey().key;
221222
cacheEntityLookup.apply(key.shardId).ifPresent(entity -> entity.onRemoval(notification));
222-
cacheCleanupManager.updateCleanupKeyToCountMapOnCacheEviction(
223-
new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId)
224-
);
223+
CleanupKey cleanupKey = new CleanupKey(cacheEntityLookup.apply(key.shardId).orElse(null), key.readerCacheKeyId);
224+
cacheCleanupManager.updateStaleCountOnEntryRemoval(cleanupKey, notification);
225225
}
226226

227227
private ICacheKey<Key> getICacheKey(Key key) {
@@ -265,10 +265,11 @@ BytesReference getOrCompute(
265265
OpenSearchDirectoryReader.addReaderCloseListener(reader, cleanupKey);
266266
}
267267
}
268-
cacheCleanupManager.updateCleanupKeyToCountMapOnCacheInsertion(cleanupKey);
268+
cacheCleanupManager.updateStaleCountOnCacheInsert(cleanupKey);
269269
} else {
270270
cacheEntity.onHit();
271271
}
272+
272273
return value;
273274
}
274275

@@ -501,7 +502,7 @@ void enqueueCleanupKey(CleanupKey cleanupKey) {
501502
*
502503
* @param cleanupKey the CleanupKey to be updated in the map
503504
*/
504-
private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) {
505+
private void updateStaleCountOnCacheInsert(CleanupKey cleanupKey) {
505506
if (stalenessThreshold == 0.0 || cleanupKey.entity == null) {
506507
return;
507508
}
@@ -517,8 +518,32 @@ private void updateCleanupKeyToCountMapOnCacheInsertion(CleanupKey cleanupKey) {
517518
cleanupKeyToCountMap.computeIfAbsent(shardId, k -> new HashMap<>()).merge(cleanupKey.readerCacheKeyId, 1, Integer::sum);
518519
}
519520

520-
private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) {
521-
if (stalenessThreshold == 0.0 || cleanupKey.entity == null) {
521+
/**
522+
* Handles the eviction of a cache entry.
523+
*
524+
* <p>This method is called when an entry is evicted from the cache.
525+
* We consider all removal notifications except those with the reason REPLACED.
526+
* {@link #incrementStaleKeysCount} would have removed the entries from the map and incremented the {@link #staleKeysCount}.
527+
* Hence we decrement {@link #staleKeysCount} if we do not find the shardId or readerCacheKeyId in the map.
528+
* Skip decrementing staleKeysCount if we find the shardId or readerCacheKeyId in the map, since the entry would not yet have been accounted for in the staleKeysCount by {@link #incrementStaleKeysCount}.
529+
*
530+
* @param cleanupKey the CleanupKey that has been evicted from the cache
531+
* @param notification RemovalNotification of the cache entry evicted
532+
*/
533+
private void updateStaleCountOnEntryRemoval(
534+
CleanupKey cleanupKey,
535+
RemovalNotification<ICacheKey<Key>, BytesReference> notification
536+
) {
537+
if (notification.getRemovalReason() == RemovalReason.REPLACED) {
538+
// The reason of the notification is REPLACED when a cache entry's value is updated, since replacing an entry
539+
// does not affect the staleness count, we skip such notifications.
540+
return;
541+
}
542+
if (cleanupKey.entity == null) {
543+
// entity will only be null when the shard is closed/deleted
544+
// we would have accounted for this in staleKeysCount when closing/deleting the shard closed the associated
545+
// readers
546+
staleKeysCount.decrementAndGet();
522547
return;
523548
}
524549
IndexShard indexShard = (IndexShard) cleanupKey.entity.getCacheIdentity();
@@ -528,23 +553,41 @@ private void updateCleanupKeyToCountMapOnCacheEviction(CleanupKey cleanupKey) {
528553
}
529554
ShardId shardId = indexShard.shardId();
530555

531-
cleanupKeyToCountMap.computeIfPresent(shardId, (shard, keyCountMap) -> {
532-
keyCountMap.computeIfPresent(cleanupKey.readerCacheKeyId, (key, currentValue) -> {
533-
// decrement the stale key count
556+
cleanupKeyToCountMap.compute(shardId, (key, readerCacheKeyMap) -> {
557+
if (readerCacheKeyMap == null || !readerCacheKeyMap.containsKey(cleanupKey.readerCacheKeyId)) {
558+
// If ShardId is not present or readerCacheKeyId is not present
559+
// it should have already been accounted for and hence been removed from this map
560+
// so decrement staleKeysCount
534561
staleKeysCount.decrementAndGet();
535-
int newValue = currentValue - 1;
536-
// Remove the key if the new value is zero by returning null; otherwise, update with the new value.
537-
return newValue == 0 ? null : newValue;
538-
});
539-
return keyCountMap;
562+
// Return the current map
563+
return readerCacheKeyMap;
564+
} else {
565+
// If it is in the map, it is not stale yet.
566+
// Proceed to adjust the count for the readerCacheKeyId in the map
567+
// but do not decrement the staleKeysCount
568+
Integer count = readerCacheKeyMap.get(cleanupKey.readerCacheKeyId);
569+
// this should never be null
570+
assert (count != null && count >= 0);
571+
// Reduce the count by 1
572+
int newCount = count - 1;
573+
if (newCount > 0) {
574+
// Update the map with the new count
575+
readerCacheKeyMap.put(cleanupKey.readerCacheKeyId, newCount);
576+
} else {
577+
// Remove the readerCacheKeyId entry if new count is zero
578+
readerCacheKeyMap.remove(cleanupKey.readerCacheKeyId);
579+
}
580+
// If after modification, the readerCacheKeyMap is empty, we return null to remove the ShardId entry
581+
return readerCacheKeyMap.isEmpty() ? null : readerCacheKeyMap;
582+
}
540583
});
541584
}
542585

543586
/**
544587
* Updates the count of stale keys in the cache.
545588
* This method is called when a CleanupKey is added to the keysToClean set.
546589
*
547-
* It increments the staleKeysCount by the count of the CleanupKey in the cleanupKeyToCountMap.
590+
* <p>It increments the staleKeysCount by the count of the CleanupKey in the cleanupKeyToCountMap.
548591
* If the CleanupKey's readerCacheKeyId is null or the CleanupKey's entity is not open, it increments the staleKeysCount
549592
* by the total count of keys associated with the CleanupKey's ShardId in the cleanupKeyToCountMap and removes the ShardId from the map.
550593
*
@@ -562,7 +605,7 @@ private void incrementStaleKeysCount(CleanupKey cleanupKey) {
562605
ShardId shardId = indexShard.shardId();
563606

564607
// Using computeIfPresent to atomically operate on the countMap for a given shardId
565-
cleanupKeyToCountMap.computeIfPresent(shardId, (key, countMap) -> {
608+
cleanupKeyToCountMap.computeIfPresent(shardId, (currentShardId, countMap) -> {
566609
if (cleanupKey.readerCacheKeyId == null) {
567610
// Aggregate and add to staleKeysCount atomically if readerCacheKeyId is null
568611
int totalSum = countMap.values().stream().mapToInt(Integer::intValue).sum();
@@ -571,18 +614,19 @@ private void incrementStaleKeysCount(CleanupKey cleanupKey) {
571614
return null;
572615
} else {
573616
// Update staleKeysCount based on specific readerCacheKeyId, then remove it from the countMap
574-
countMap.computeIfPresent(cleanupKey.readerCacheKeyId, (k, v) -> {
575-
staleKeysCount.addAndGet(v);
617+
countMap.computeIfPresent(cleanupKey.readerCacheKeyId, (readerCacheKey, count) -> {
618+
staleKeysCount.addAndGet(count);
576619
// Return null to remove the key after updating staleKeysCount
577620
return null;
578621
});
579-
580622
// Check if countMap is empty after removal to decide if we need to remove the shardId entry
581623
if (countMap.isEmpty()) {
582-
return null; // Returning null removes the entry for shardId
624+
// Returning null removes the entry for shardId
625+
return null;
583626
}
584627
}
585-
return countMap; // Return the modified countMap to keep the mapping
628+
// Return the modified countMap to retain updates
629+
return countMap;
586630
});
587631
}
588632

@@ -708,6 +752,11 @@ public void close() {
708752
this.cacheCleaner.close();
709753
}
710754

755+
// for testing
756+
ConcurrentMap<ShardId, HashMap<String, Integer>> getCleanupKeyToCountMap() {
757+
return cleanupKeyToCountMap;
758+
}
759+
711760
private final class IndicesRequestCacheCleaner implements Runnable, Releasable {
712761

713762
private final IndicesRequestCacheCleanupManager cacheCleanupManager;

0 commit comments

Comments
 (0)