From c760594789139a686cefc3e51269c1e40ff4fef6 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Wed, 18 Oct 2023 14:37:07 -0700 Subject: [PATCH 1/9] added test entries metric for on heap --- .../index/cache/request/RequestCacheStats.java | 13 ++++++++++++- .../index/cache/request/ShardRequestCache.java | 11 +++++++++-- 2 files changed, 21 insertions(+), 3 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java b/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java index 24f68899c2ac7..5aa9f57b64032 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java +++ b/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java @@ -52,6 +52,7 @@ public class RequestCacheStats implements Writeable, ToXContentFragment { private long evictions; private long hitCount; private long missCount; + private long entries; // number of entries in the cache public RequestCacheStats() {} @@ -60,13 +61,15 @@ public RequestCacheStats(StreamInput in) throws IOException { evictions = in.readVLong(); hitCount = in.readVLong(); missCount = in.readVLong(); + entries = in.readVLong(); } - public RequestCacheStats(long memorySize, long evictions, long hitCount, long missCount) { + public RequestCacheStats(long memorySize, long evictions, long hitCount, long missCount, long entries) { // this.memorySize = memorySize; this.evictions = evictions; this.hitCount = hitCount; this.missCount = missCount; + this.entries = entries; } public void add(RequestCacheStats stats) { @@ -74,6 +77,7 @@ public void add(RequestCacheStats stats) { this.evictions += stats.evictions; this.hitCount += stats.hitCount; this.missCount += stats.missCount; + this.entries += stats.entries; } public long getMemorySizeInBytes() { @@ -96,12 +100,17 @@ public long getMissCount() { return this.missCount; } + public long getEntries() { + return this.entries; + } + @Override public void writeTo(StreamOutput out) throws IOException { out.writeVLong(memorySize); out.writeVLong(evictions); out.writeVLong(hitCount); out.writeVLong(missCount); + out.writeVLong(entries); } @Override @@ -111,6 +120,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(Fields.EVICTIONS, getEvictions()); builder.field(Fields.HIT_COUNT, getHitCount()); builder.field(Fields.MISS_COUNT, getMissCount()); + builder.field(Fields.ENTRIES, getEntries()); builder.endObject(); return builder; } @@ -127,5 +137,6 @@ static final class Fields { static final String EVICTIONS = "evictions"; static final String HIT_COUNT = "hit_count"; static final String MISS_COUNT = "miss_count"; + static final String ENTRIES = "entries"; } } diff --git a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java index 3194aee757fc4..d43418d63a6a6 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java +++ b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java @@ -65,7 +65,8 @@ public RequestCacheStats stats(TierType tierType) { statsHolder.get(tierType).totalMetric.count(), statsHolder.get(tierType).evictionsMetric.count(), statsHolder.get(tierType).hitCount.count(), - statsHolder.get(tierType).missCount.count() + statsHolder.get(tierType).missCount.count(), + statsHolder.get(tierType).entries.count() ); } @@ -74,17 +75,20 @@ public RequestCacheStats overallStats() { long totalEvictions = 0; long totalHits = 0; long totalMisses = 0; + long totalEntries = 0; for (TierType tierType : TierType.values()) { totalSize += statsHolder.get(tierType).totalMetric.count(); totalEvictions += statsHolder.get(tierType).evictionsMetric.count(); totalHits += statsHolder.get(tierType).hitCount.count(); totalMisses += statsHolder.get(tierType).missCount.count(); + totalEntries += statsHolder.get(tierType).entries.count(); } return new RequestCacheStats( totalSize, totalEvictions, totalHits, - totalMisses + totalMisses, + totalEntries ); } @@ -98,6 +102,7 @@ public void onMiss(TierType tierType) { public void onCached(Accountable key, BytesReference value, TierType tierType) { statsHolder.get(tierType).totalMetric.inc(key.ramBytesUsed() + value.ramBytesUsed()); + statsHolder.get(tierType).entries.inc(); } public void onRemoval(Accountable key, BytesReference value, boolean evicted, TierType tierType) { @@ -112,6 +117,7 @@ public void onRemoval(Accountable key, BytesReference value, boolean evicted, Ti dec += value.ramBytesUsed(); } statsHolder.get(tierType).totalMetric.dec(dec); + statsHolder.get(tierType).entries.dec(); } static class StatsHolder implements Serializable { @@ -120,5 +126,6 @@ static class StatsHolder implements Serializable { final CounterMetric totalMetric = new CounterMetric(); final CounterMetric hitCount = new CounterMetric(); final CounterMetric missCount = new CounterMetric(); + final CounterMetric entries = new CounterMetric(); } } From 4680ea7eac5275dec812181517e281266f77109a Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Wed, 18 Oct 2023 16:07:03 -0700 Subject: [PATCH 2/9] Added check for new entries field in IT test, added permissions allowing the domain to run --- .../opensearch/indices/IndicesRequestCacheIT.java | 14 ++++++++++++++ .../opensearch/indices/EhcacheDiskCachingTier.java | 2 -- .../org/opensearch/bootstrap/security.policy | 2 ++ 3 files changed, 16 insertions(+), 2 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index a1815d9be2daf..36e327ce11b6a 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -663,6 +663,7 @@ public void testCacheWithInvalidation() throws Exception { assertSearchResponse(resp); // Should expect hit as here as refresh didn't happen assertCacheState(client, "index", 1, 1); + assertNumCacheEntries(client, "index", 1); // Explicit refresh would invalidate cache refresh(); @@ -671,6 +672,8 @@ public void testCacheWithInvalidation() throws Exception { assertSearchResponse(resp); // Should expect miss as key has changed due to change in IndexReader.CacheKey (due to refresh) assertCacheState(client, "index", 1, 2); + assertNumCacheEntries(client, "index", 1); // Shouldn't it just be the most recent query, since the first one was invalidated? (prob invalidation isnt in yet) + // yeah - evictions = 0, its not in yet } private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) { @@ -690,4 +693,15 @@ private static void assertCacheState(Client client, String index, long expectedH } + private static void assertNumCacheEntries(Client client, String index, long expectedEntries) { + RequestCacheStats requestCacheStats = client.admin() + .indices() + .prepareStats(index) + .setRequestCache(true) + .get() + .getTotal() + .getRequestCache(); + assertEquals(expectedEntries, requestCacheStats.getEntries()); + } + } diff --git a/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java b/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java index 01fe6d491a58a..2967600ff612a 100644 --- a/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java +++ b/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java @@ -94,8 +94,6 @@ public EhcacheDiskCachingTier( } public void getManager() { - // based on https://stackoverflow.com/questions/53756412/ehcache-org-ehcache-statetransitionexception-persistence-directory-already-lo - // resolving double-initialization issue when using OpenSearchSingleNodeTestCase PersistentCacheManager oldCacheManager = cacheManagers.get(nodeId); if (oldCacheManager != null) { try { diff --git a/server/src/main/resources/org/opensearch/bootstrap/security.policy b/server/src/main/resources/org/opensearch/bootstrap/security.policy index 1fbfbb323e3af..cc7d83e07c6a1 100644 --- a/server/src/main/resources/org/opensearch/bootstrap/security.policy +++ b/server/src/main/resources/org/opensearch/bootstrap/security.policy @@ -192,6 +192,8 @@ grant { permission java.lang.RuntimePermission "createClassLoader"; permission java.lang.RuntimePermission "accessClassInPackage.sun.misc"; permission java.lang.RuntimePermission "getenv.*"; + permission java.lang.RuntimePermission "accessDeclaredMembers"; + permission java.lang.reflect.ReflectPermission "suppressAccessChecks"; permission java.io.FilePermission "disk_cache_tier", "read"; // change this to wherever we will put disk tier folder permission java.io.FilePermission "disk_cache_tier", "write"; permission java.io.FilePermission "disk_cache_tier", "delete"; From c300d8bf8d0ad79fc9674bdfdcbedd7d691d41df Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Fri, 20 Oct 2023 15:51:38 -0700 Subject: [PATCH 3/9] Initial implementation for tiered node request cache stats --- .../cache/request/RequestCacheStats.java | 120 ++++++++++++------ .../cache/request/ShardRequestCache.java | 46 +------ .../index/cache/request/StatsHolder.java | 99 +++++++++++++++ .../java/org/opensearch/indices/TierType.java | 15 ++- .../indices/IndicesRequestCacheTests.java | 8 +- 5 files changed, 200 insertions(+), 88 deletions(-) create mode 100644 server/src/main/java/org/opensearch/index/cache/request/StatsHolder.java diff --git a/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java b/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java index 5aa9f57b64032..5480dd5b01240 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java +++ b/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java @@ -38,8 +38,11 @@ import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.core.xcontent.ToXContentFragment; import org.opensearch.core.xcontent.XContentBuilder; +import org.opensearch.indices.TierType; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; /** * Request for the query cache statistics @@ -48,79 +51,117 @@ */ public class RequestCacheStats implements Writeable, ToXContentFragment { - private long memorySize; - private long evictions; - private long hitCount; - private long missCount; - private long entries; // number of entries in the cache - - public RequestCacheStats() {} + private Map map; + public RequestCacheStats() { + this.map = new HashMap<>(); + for (TierType tierType : TierType.values()) { + map.put(tierType.getStringValue(), new StatsHolder()); + // Every possible tier type must have counters, even if they are disabled. Then the counters report 0 + } + } public RequestCacheStats(StreamInput in) throws IOException { - memorySize = in.readVLong(); - evictions = in.readVLong(); - hitCount = in.readVLong(); - missCount = in.readVLong(); - entries = in.readVLong(); + // Any RequestCacheStats written to a stream should already have a counter for each possible tier type + this.map = in.readMap(StreamInput::readString, StatsHolder::new); // does it know to use the right constructor? does it rly need to be registered? + } + + public RequestCacheStats(TierType tierType, StatsHolder statsHolder) { + // Create a RequestCacheStats object with only one tier's statistics populated + this(); + map.put(tierType.getStringValue(), statsHolder); } - public RequestCacheStats(long memorySize, long evictions, long hitCount, long missCount, long entries) { // - this.memorySize = memorySize; - this.evictions = evictions; - this.hitCount = hitCount; - this.missCount = missCount; - this.entries = entries; + public RequestCacheStats(Map inputMap) { + // Create a RequestCacheStats with multiple tiers' statistics + this(); + for (TierType tierType : inputMap.keySet()) { + map.put(tierType.getStringValue(), inputMap.get(tierType)); + } } + // can prob eliminate some of these constructors + public void add(RequestCacheStats stats) { - this.memorySize += stats.memorySize; - this.evictions += stats.evictions; - this.hitCount += stats.hitCount; - this.missCount += stats.missCount; - this.entries += stats.entries; + for (String tier : stats.map.keySet()) { + map.get(tier).add(stats.map.get(tier)); + } + } + + private StatsHolder getTierStats(TierType tierType) { + return map.get(tierType.getStringValue()); + } + + // should these take in strings bc thats whats done in the xcontent builder? seems wasteful + public long getMemorySizeInBytes(TierType tierType) { + return getTierStats(tierType).totalMetric.count(); + } + + public ByteSizeValue getMemorySize(TierType tierType) { + return new ByteSizeValue(getMemorySizeInBytes(tierType)); } + public long getEvictions(TierType tierType) { + return getTierStats(tierType).evictionsMetric.count(); + } + + public long getHitCount(TierType tierType) { + return getTierStats(tierType).hitCount.count(); + } + + public long getMissCount(TierType tierType) { + return getTierStats(tierType).missCount.count(); + } + + public long getEntries(TierType tierType) { + return getTierStats(tierType).entries.count(); + } + + // By default, return on-heap stats if no tier is specified + public long getMemorySizeInBytes() { - return this.memorySize; + return getMemorySizeInBytes(TierType.ON_HEAP); } public ByteSizeValue getMemorySize() { - return new ByteSizeValue(memorySize); + return getMemorySize(TierType.ON_HEAP); } public long getEvictions() { - return this.evictions; + return getEvictions(TierType.ON_HEAP); } public long getHitCount() { - return this.hitCount; + return getHitCount(TierType.ON_HEAP); } public long getMissCount() { - return this.missCount; + return getMissCount(TierType.ON_HEAP); } public long getEntries() { - return this.entries; + return getEntries(TierType.ON_HEAP); } @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVLong(memorySize); - out.writeVLong(evictions); - out.writeVLong(hitCount); - out.writeVLong(missCount); - out.writeVLong(entries); + out.writeMap(this.map, StreamOutput::writeString, (o, v) -> v.writeTo(o)); // ? } @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(Fields.REQUEST_CACHE_STATS); - builder.humanReadableField(Fields.MEMORY_SIZE_IN_BYTES, Fields.MEMORY_SIZE, getMemorySize()); - builder.field(Fields.EVICTIONS, getEvictions()); - builder.field(Fields.HIT_COUNT, getHitCount()); - builder.field(Fields.MISS_COUNT, getMissCount()); - builder.field(Fields.ENTRIES, getEntries()); + // write on-heap stats outside of tiers object + getTierStats(TierType.ON_HEAP).toXContent(builder, params); + builder.startObject(Fields.TIERS); + for (TierType tierType : TierType.values()) { // fixed order + if (tierType != TierType.ON_HEAP) { + String tier = tierType.getStringValue(); + builder.startObject(tier); + map.get(tier).toXContent(builder, params); + builder.endObject(); + } + } + builder.endObject(); builder.endObject(); return builder; } @@ -132,6 +173,7 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws */ static final class Fields { static final String REQUEST_CACHE_STATS = "request_cache"; + static final String TIERS = "tiers"; static final String MEMORY_SIZE = "memory_size"; static final String MEMORY_SIZE_IN_BYTES = "memory_size_in_bytes"; static final String EVICTIONS = "evictions"; diff --git a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java index d43418d63a6a6..f8d77a4c0dbaf 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java +++ b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java @@ -33,11 +33,9 @@ package org.opensearch.index.cache.request; import org.apache.lucene.util.Accountable; -import org.opensearch.common.metrics.CounterMetric; import org.opensearch.core.common.bytes.BytesReference; import org.opensearch.indices.TierType; -import java.io.Serializable; import java.util.EnumMap; /** @@ -57,40 +55,11 @@ public ShardRequestCache() { public RequestCacheStats stats() { // TODO: Change RequestCacheStats to support disk tier stats. - return stats(TierType.ON_HEAP); + // Changing this function to return a RequestCacheStats with stats from all tiers. + //return stats(TierType.ON_HEAP); + return new RequestCacheStats(statsHolder); } - public RequestCacheStats stats(TierType tierType) { - return new RequestCacheStats( - statsHolder.get(tierType).totalMetric.count(), - statsHolder.get(tierType).evictionsMetric.count(), - statsHolder.get(tierType).hitCount.count(), - statsHolder.get(tierType).missCount.count(), - statsHolder.get(tierType).entries.count() - ); - } - - public RequestCacheStats overallStats() { - long totalSize = 0; - long totalEvictions = 0; - long totalHits = 0; - long totalMisses = 0; - long totalEntries = 0; - for (TierType tierType : TierType.values()) { - totalSize += statsHolder.get(tierType).totalMetric.count(); - totalEvictions += statsHolder.get(tierType).evictionsMetric.count(); - totalHits += statsHolder.get(tierType).hitCount.count(); - totalMisses += statsHolder.get(tierType).missCount.count(); - totalEntries += statsHolder.get(tierType).entries.count(); - } - return new RequestCacheStats( - totalSize, - totalEvictions, - totalHits, - totalMisses, - totalEntries - ); - } public void onHit(TierType tierType) { statsHolder.get(tierType).hitCount.inc(); @@ -119,13 +88,4 @@ public void onRemoval(Accountable key, BytesReference value, boolean evicted, Ti statsHolder.get(tierType).totalMetric.dec(dec); statsHolder.get(tierType).entries.dec(); } - - static class StatsHolder implements Serializable { - - final CounterMetric evictionsMetric = new CounterMetric(); - final CounterMetric totalMetric = new CounterMetric(); - final CounterMetric hitCount = new CounterMetric(); - final CounterMetric missCount = new CounterMetric(); - final CounterMetric entries = new CounterMetric(); - } } diff --git a/server/src/main/java/org/opensearch/index/cache/request/StatsHolder.java b/server/src/main/java/org/opensearch/index/cache/request/StatsHolder.java new file mode 100644 index 0000000000000..86f99871fea26 --- /dev/null +++ b/server/src/main/java/org/opensearch/index/cache/request/StatsHolder.java @@ -0,0 +1,99 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.cache.request; + +import org.opensearch.common.metrics.CounterMetric; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.core.common.io.stream.Writeable; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.core.xcontent.ToXContentFragment; +import org.opensearch.core.xcontent.XContentBuilder; + +import java.io.IOException; +import java.io.Serializable; + +public class StatsHolder implements Serializable, Writeable, ToXContentFragment { + final CounterMetric evictionsMetric; + final CounterMetric totalMetric; + final CounterMetric hitCount; + final CounterMetric missCount; + final CounterMetric entries; + + + public StatsHolder() { + this.evictionsMetric = new CounterMetric(); + this.totalMetric = new CounterMetric(); + this.hitCount = new CounterMetric(); + this.missCount = new CounterMetric(); + this.entries = new CounterMetric(); + } + + public StatsHolder(StreamInput in) throws IOException { + // Read and write the values of the counter metrics. They should always be positive + this.evictionsMetric = new CounterMetric(); + this.evictionsMetric.inc(in.readVLong()); + this.totalMetric = new CounterMetric(); + this.totalMetric.inc(in.readVLong()); + this.hitCount = new CounterMetric(); + this.hitCount.inc(in.readVLong()); + this.missCount = new CounterMetric(); + this.missCount.inc(in.readVLong()); + this.entries = new CounterMetric(); + this.entries.inc(in.readVLong()); + } + + @Override + public void writeTo(StreamOutput out) throws IOException { + out.writeVLong(evictionsMetric.count()); + out.writeVLong(totalMetric.count()); + out.writeVLong(hitCount.count()); + out.writeVLong(missCount.count()); + out.writeVLong(entries.count()); + } + + public void add(StatsHolder otherStats) { + // Add the argument's metrics to this object's metrics. + evictionsMetric.inc(otherStats.evictionsMetric.count()); + totalMetric.inc(otherStats.totalMetric.count()); + hitCount.inc(otherStats.hitCount.count()); + missCount.inc(otherStats.missCount.count()); + entries.inc(otherStats.entries.count()); + } + + public long getEvictions() { + return evictionsMetric.count(); + } + + public long getMemorySize() { + return totalMetric.count(); + } + + public long getHitCount() { + return hitCount.count(); + } + + public long getMissCount() { + return missCount.count(); + } + + public long getEntries() { + return entries.count(); + } + + @Override + public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + builder.humanReadableField(RequestCacheStats.Fields.MEMORY_SIZE_IN_BYTES, RequestCacheStats.Fields.MEMORY_SIZE, new ByteSizeValue(getMemorySize())); + builder.field(RequestCacheStats.Fields.EVICTIONS, getEvictions()); + builder.field(RequestCacheStats.Fields.HIT_COUNT, getHitCount()); + builder.field(RequestCacheStats.Fields.MISS_COUNT, getMissCount()); + builder.field(RequestCacheStats.Fields.ENTRIES, getEntries()); + return builder; + } +} diff --git a/server/src/main/java/org/opensearch/indices/TierType.java b/server/src/main/java/org/opensearch/indices/TierType.java index 9a286fd26151b..900df2fce57c5 100644 --- a/server/src/main/java/org/opensearch/indices/TierType.java +++ b/server/src/main/java/org/opensearch/indices/TierType.java @@ -10,6 +10,17 @@ public enum TierType { - ON_HEAP, - DISK; + ON_HEAP("on_heap"), + DISK("disk"); + + private final String stringValue; + + TierType(String stringValue) { + // Associate each TierType with a string representation, for use in API responses and elsewhere + this.stringValue = stringValue; + } + + public String getStringValue() { + return this.stringValue; + } } diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 2bd2bd53a787a..6e75a0e8102c2 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -196,8 +196,8 @@ public void testSpillover() throws Exception { assertEquals(maxNumInHeap * heapKeySize, requestCacheStats.stats().getMemorySizeInBytes()); // TODO: disk weight bytes assertEquals(1, requestCacheStats.stats().getEvictions()); - assertEquals(1, requestCacheStats.stats(TierType.DISK).getHitCount()); - assertEquals(maxNumInHeap + 1, requestCacheStats.stats(TierType.DISK).getMissCount()); + assertEquals(1, requestCacheStats.stats().getHitCount(TierType.DISK)); + assertEquals(maxNumInHeap + 1, requestCacheStats.stats().getMissCount(TierType.DISK)); assertEquals(0, requestCacheStats.stats().getHitCount()); assertEquals(maxNumInHeap + 2, requestCacheStats.stats().getMissCount()); assertEquals(maxNumInHeap, cache.tieredCacheHandler.count(TierType.ON_HEAP)); @@ -209,8 +209,8 @@ public void testSpillover() throws Exception { BytesReference firstValueAgain = cache.getOrCompute(entity, loader, reader, termBytesArr[0]); assertEquals(1, requestCacheStats.stats().getEvictions()); - assertEquals(2, requestCacheStats.stats(TierType.DISK).getHitCount()); - assertEquals(maxNumInHeap + 1, requestCacheStats.stats(TierType.DISK).getMissCount()); + assertEquals(2, requestCacheStats.stats().getHitCount(TierType.DISK)); + assertEquals(maxNumInHeap + 1, requestCacheStats.stats().getMissCount(TierType.DISK)); assertEquals(1, requestCacheStats.stats().getHitCount()); assertEquals(maxNumInHeap + 3, requestCacheStats.stats().getMissCount()); assertEquals(maxNumInHeap, cache.tieredCacheHandler.count(TierType.ON_HEAP)); From 9d8d433197b55dd5a4877485ec6ba4eb4dfcc8e3 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Fri, 20 Oct 2023 16:47:20 -0700 Subject: [PATCH 4/9] Added version checks to streaming functions --- .../cache/request/RequestCacheStats.java | 38 ++++++++++++++----- .../index/cache/request/StatsHolder.java | 21 ++++++---- .../TieredCacheSpilloverStrategyHandler.java | 3 +- 3 files changed, 44 insertions(+), 18 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java b/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java index 5480dd5b01240..4bdb91ba9e973 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java +++ b/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java @@ -32,6 +32,7 @@ package org.opensearch.index.cache.request; +import org.opensearch.Version; import org.opensearch.core.common.io.stream.StreamInput; import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.core.common.io.stream.Writeable; @@ -61,14 +62,25 @@ public RequestCacheStats() { } public RequestCacheStats(StreamInput in) throws IOException { - // Any RequestCacheStats written to a stream should already have a counter for each possible tier type - this.map = in.readMap(StreamInput::readString, StatsHolder::new); // does it know to use the right constructor? does it rly need to be registered? - } - - public RequestCacheStats(TierType tierType, StatsHolder statsHolder) { - // Create a RequestCacheStats object with only one tier's statistics populated this(); - map.put(tierType.getStringValue(), statsHolder); + if (in.getVersion().onOrAfter(Version.V_3_0_0)) { + this.map = in.readMap(StreamInput::readString, StatsHolder::new); // does it know to use the right constructor? does it rly need to be registered? + } else { + // objects from earlier versions only contain on-heap info, and do not have entries info + long memorySize = in.readVLong(); + long evictions = in.readVLong(); + long hitCount = in.readVLong(); + long missCount = in.readVLong(); + this.map.put( + TierType.ON_HEAP.getStringValue(), + new StatsHolder( + memorySize, + evictions, + hitCount, + missCount, + 0 + )); + } } public RequestCacheStats(Map inputMap) { @@ -91,7 +103,6 @@ private StatsHolder getTierStats(TierType tierType) { return map.get(tierType.getStringValue()); } - // should these take in strings bc thats whats done in the xcontent builder? seems wasteful public long getMemorySizeInBytes(TierType tierType) { return getTierStats(tierType).totalMetric.count(); } @@ -144,7 +155,16 @@ public long getEntries() { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeMap(this.map, StreamOutput::writeString, (o, v) -> v.writeTo(o)); // ? + if (out.getVersion().onOrAfter(Version.V_3_0_0)) { + out.writeMap(this.map, StreamOutput::writeString, (o, v) -> v.writeTo(o)); // ? + } else { + // Write only on-heap values, and don't write entries metric + StatsHolder heapStats = map.get(TierType.ON_HEAP.getStringValue()); + out.writeVLong(heapStats.getMemorySize()); + out.writeVLong(heapStats.getEvictions()); + out.writeVLong(heapStats.getHitCount()); + out.writeVLong(heapStats.getMissCount()); + } } @Override diff --git a/server/src/main/java/org/opensearch/index/cache/request/StatsHolder.java b/server/src/main/java/org/opensearch/index/cache/request/StatsHolder.java index 86f99871fea26..d506d8e78ec7a 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/StatsHolder.java +++ b/server/src/main/java/org/opensearch/index/cache/request/StatsHolder.java @@ -35,18 +35,25 @@ public StatsHolder() { this.entries = new CounterMetric(); } - public StatsHolder(StreamInput in) throws IOException { - // Read and write the values of the counter metrics. They should always be positive + public StatsHolder(long evictions, long memorySize, long hitCount, long missCount, long entries) { this.evictionsMetric = new CounterMetric(); - this.evictionsMetric.inc(in.readVLong()); + this.evictionsMetric.inc(evictions); this.totalMetric = new CounterMetric(); - this.totalMetric.inc(in.readVLong()); + this.totalMetric.inc(memorySize); this.hitCount = new CounterMetric(); - this.hitCount.inc(in.readVLong()); + this.hitCount.inc(hitCount); this.missCount = new CounterMetric(); - this.missCount.inc(in.readVLong()); + this.missCount.inc(missCount); this.entries = new CounterMetric(); - this.entries.inc(in.readVLong()); + this.entries.inc(entries); + } + + public StatsHolder(StreamInput in) throws IOException { + // Read and write the values of the counter metrics. They should always be positive + // This object is new, so we shouldn't need version checks for different behavior + this(in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong()); + // java forces us to do this in one line + // guaranteed to be evaluated in correct order (https://docs.oracle.com/javase/specs/jls/se7/html/jls-15.html#jls-15.7.4) } @Override diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java b/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java index db9d4ffe41447..b3b4134033c28 100644 --- a/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java +++ b/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java @@ -54,8 +54,6 @@ public V computeIfAbsent(K key, TieredCacheLoader loader) throws Exception V value = onHeapCachingTier.compute(key, loader); tieredCacheEventListener.onCached(key, value, TierType.ON_HEAP); return value; - } else { - //tieredCacheEventListener.onHit(key, cacheValue.value, cacheValue.source); // this double counts, see line 122 } return cacheValue.value; } @@ -105,6 +103,7 @@ public void onRemoval(RemovalNotification notification) { switch (notification.getTierType()) { case ON_HEAP: diskCachingTier.put(notification.getKey(), notification.getValue()); + break; default: break; From 7d0356214600dbd6052c5868f5c722c2794fe199 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Mon, 23 Oct 2023 12:37:29 -0700 Subject: [PATCH 5/9] Added UT for RequestCacheStats --- .../indices/IndicesRequestCacheIT.java | 43 ++++++--- .../index/cache/request/StatsHolder.java | 15 +-- .../cache/request/RequestCacheStatsTests.java | 95 +++++++++++++++++++ 3 files changed, 134 insertions(+), 19 deletions(-) create mode 100644 server/src/test/java/org/opensearch/index/cache/request/RequestCacheStatsTests.java diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 36e327ce11b6a..7e48fee7d1e6b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -34,6 +34,7 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; +import org.opensearch.action.IndicesRequestIT; import org.opensearch.action.admin.indices.alias.Alias; import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.opensearch.action.search.SearchResponse; @@ -43,6 +44,7 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.time.DateFormatter; import org.opensearch.common.util.FeatureFlags; +import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.index.cache.request.RequestCacheStats; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.aggregations.bucket.global.GlobalAggregationBuilder; @@ -636,16 +638,27 @@ public void testProfileDisableCache() throws Exception { public void testCacheWithInvalidation() throws Exception { Client client = client(); + //int heapSizeBytes = 2000; // enough to fit 2 queries, as each is 687 B + + Settings.Builder builder = Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + .put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(2000)); + // Why is it appending "index." to the beginning of the key?? + + String heapSizeBytes = builder.get(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey()); + System.out.println("Current cap = " + heapSizeBytes); + + client.admin().setSettings(Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(2000))); + assertAcked( client.admin() .indices() .prepareCreate("index") .setMapping("k", "type=keyword") .setSettings( - Settings.builder() - .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + builder ) .get() ); @@ -662,8 +675,9 @@ public void testCacheWithInvalidation() throws Exception { resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); assertSearchResponse(resp); // Should expect hit as here as refresh didn't happen - assertCacheState(client, "index", 1, 1); - assertNumCacheEntries(client, "index", 1); + assertCacheState(client, "index", 1, 1, TierType.ON_HEAP); + assertCacheState(client, "index", 0, 1, TierType.DISK); + assertNumCacheEntries(client, "index", 1, TierType.ON_HEAP); // Explicit refresh would invalidate cache refresh(); @@ -671,12 +685,13 @@ public void testCacheWithInvalidation() throws Exception { resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); assertSearchResponse(resp); // Should expect miss as key has changed due to change in IndexReader.CacheKey (due to refresh) - assertCacheState(client, "index", 1, 2); - assertNumCacheEntries(client, "index", 1); // Shouldn't it just be the most recent query, since the first one was invalidated? (prob invalidation isnt in yet) + assertCacheState(client, "index", 1, 2, TierType.ON_HEAP); + assertCacheState(client, "index", 0, 2, TierType.DISK); + assertNumCacheEntries(client, "index", 1, TierType.ON_HEAP); // Shouldn't it just be the most recent query, since the first one was invalidated? (prob invalidation isnt in yet) // yeah - evictions = 0, its not in yet } - private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) { + private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses, TierType tierType) { RequestCacheStats requestCacheStats = client.admin() .indices() .prepareStats(index) @@ -686,14 +701,18 @@ private static void assertCacheState(Client client, String index, long expectedH .getRequestCache(); // Check the hit count and miss count together so if they are not // correct we can see both values + System.out.println("mem size " + requestCacheStats.getMemorySize(tierType)); assertEquals( Arrays.asList(expectedHits, expectedMisses, 0L), - Arrays.asList(requestCacheStats.getHitCount(), requestCacheStats.getMissCount(), requestCacheStats.getEvictions()) + Arrays.asList(requestCacheStats.getHitCount(tierType), requestCacheStats.getMissCount(tierType), requestCacheStats.getEvictions(tierType)) ); + } + private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) { + assertCacheState(client, index, expectedHits, expectedMisses, TierType.ON_HEAP); } - private static void assertNumCacheEntries(Client client, String index, long expectedEntries) { + private static void assertNumCacheEntries(Client client, String index, long expectedEntries, TierType tierType) { RequestCacheStats requestCacheStats = client.admin() .indices() .prepareStats(index) @@ -701,7 +720,7 @@ private static void assertNumCacheEntries(Client client, String index, long expe .get() .getTotal() .getRequestCache(); - assertEquals(expectedEntries, requestCacheStats.getEntries()); + assertEquals(expectedEntries, requestCacheStats.getEntries(tierType)); } } diff --git a/server/src/main/java/org/opensearch/index/cache/request/StatsHolder.java b/server/src/main/java/org/opensearch/index/cache/request/StatsHolder.java index d506d8e78ec7a..c7d7447a68459 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/StatsHolder.java +++ b/server/src/main/java/org/opensearch/index/cache/request/StatsHolder.java @@ -20,26 +20,27 @@ import java.io.Serializable; public class StatsHolder implements Serializable, Writeable, ToXContentFragment { - final CounterMetric evictionsMetric; final CounterMetric totalMetric; + final CounterMetric evictionsMetric; final CounterMetric hitCount; final CounterMetric missCount; final CounterMetric entries; public StatsHolder() { - this.evictionsMetric = new CounterMetric(); this.totalMetric = new CounterMetric(); + this.evictionsMetric = new CounterMetric(); this.hitCount = new CounterMetric(); this.missCount = new CounterMetric(); this.entries = new CounterMetric(); } - public StatsHolder(long evictions, long memorySize, long hitCount, long missCount, long entries) { - this.evictionsMetric = new CounterMetric(); - this.evictionsMetric.inc(evictions); + public StatsHolder(long memorySize, long evictions, long hitCount, long missCount, long entries) { + // Switched argument order to match RequestCacheStats this.totalMetric = new CounterMetric(); this.totalMetric.inc(memorySize); + this.evictionsMetric = new CounterMetric(); + this.evictionsMetric.inc(evictions); this.hitCount = new CounterMetric(); this.hitCount.inc(hitCount); this.missCount = new CounterMetric(); @@ -58,8 +59,8 @@ public StatsHolder(StreamInput in) throws IOException { @Override public void writeTo(StreamOutput out) throws IOException { - out.writeVLong(evictionsMetric.count()); out.writeVLong(totalMetric.count()); + out.writeVLong(evictionsMetric.count()); out.writeVLong(hitCount.count()); out.writeVLong(missCount.count()); out.writeVLong(entries.count()); @@ -67,8 +68,8 @@ public void writeTo(StreamOutput out) throws IOException { public void add(StatsHolder otherStats) { // Add the argument's metrics to this object's metrics. - evictionsMetric.inc(otherStats.evictionsMetric.count()); totalMetric.inc(otherStats.totalMetric.count()); + evictionsMetric.inc(otherStats.evictionsMetric.count()); hitCount.inc(otherStats.hitCount.count()); missCount.inc(otherStats.missCount.count()); entries.inc(otherStats.entries.count()); diff --git a/server/src/test/java/org/opensearch/index/cache/request/RequestCacheStatsTests.java b/server/src/test/java/org/opensearch/index/cache/request/RequestCacheStatsTests.java new file mode 100644 index 0000000000000..0b1adbe0f87c7 --- /dev/null +++ b/server/src/test/java/org/opensearch/index/cache/request/RequestCacheStatsTests.java @@ -0,0 +1,95 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +package org.opensearch.index.cache.request; + +import org.opensearch.common.io.stream.BytesStreamOutput; +import org.opensearch.core.common.bytes.BytesReference; +import org.opensearch.core.common.io.stream.BytesStream; +import org.opensearch.core.common.io.stream.BytesStreamInput; +import org.opensearch.core.common.io.stream.StreamInput; +import org.opensearch.core.common.io.stream.StreamOutput; +import org.opensearch.indices.TierType; +import org.opensearch.test.OpenSearchTestCase; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +public class RequestCacheStatsTests extends OpenSearchTestCase { + public void testConstructorsAndAdd() throws Exception { + RequestCacheStats emptyStats = new RequestCacheStats(); + for (TierType tierType : TierType.values()) { + assertTierState(emptyStats, tierType, 0, 0, 0, 0, 0); + } + Map testHeapMap = new HashMap<>(); + testHeapMap.put(TierType.ON_HEAP, new StatsHolder( + 1, 2, 3, 4, 5 + )); + RequestCacheStats heapOnlyStats = new RequestCacheStats(testHeapMap); + for (TierType tierType : TierType.values()) { + if (tierType == TierType.ON_HEAP) { + assertTierState(heapOnlyStats, tierType, 1, 2, 3, 4, 5); + } else { + assertTierState(heapOnlyStats, tierType, 0, 0, 0, 0, 0); + } + } + + Map testBothTiersMap = new HashMap<>(); + testBothTiersMap.put(TierType.ON_HEAP, new StatsHolder( + 11, 12, 13, 14, 15 + )); + testBothTiersMap.put(TierType.DISK, new StatsHolder( + 6, 7, 8, 9, 10 + )); + RequestCacheStats bothTiersStats = new RequestCacheStats(testBothTiersMap); + assertTierState(bothTiersStats, TierType.ON_HEAP, 11, 12, 13, 14, 15); + assertTierState(bothTiersStats, TierType.DISK, 6, 7, 8, 9, 10); + + bothTiersStats.add(heapOnlyStats); + assertTierState(bothTiersStats, TierType.ON_HEAP, 12, 14, 16, 18, 20); + assertTierState(bothTiersStats, TierType.DISK, 6, 7, 8, 9, 10); + } + + public void testSerialization() throws Exception { + // This test also implicitly tests StreamHolder serialization + BytesStreamOutput os = new BytesStreamOutput(); + + Map testMap = new HashMap<>(); + testMap.put(TierType.ON_HEAP, new StatsHolder( + 11, 12, 13, 14, 15 + )); + testMap.put(TierType.DISK, new StatsHolder( + 6, 7, 8, 9, 10 + )); + RequestCacheStats stats = new RequestCacheStats(testMap); + stats.writeTo(os); + BytesStreamInput is = new BytesStreamInput(BytesReference.toBytes(os.bytes())); + RequestCacheStats deserialized = new RequestCacheStats(is); + + assertTierState(deserialized, TierType.ON_HEAP, 11, 12, 13, 14, 15); + assertTierState(deserialized, TierType.DISK, 6, 7, 8, 9, 10); + } + + private void assertTierState( + RequestCacheStats stats, + TierType tierType, + long memSize, + long evictions, + long hitCount, + long missCount, + long entries + ) { + assertEquals(memSize, stats.getMemorySizeInBytes(tierType)); + assertEquals(evictions, stats.getEvictions(tierType)); + assertEquals(hitCount, stats.getHitCount(tierType)); + assertEquals(missCount, stats.getMissCount(tierType)); + assertEquals(entries, stats.getEntries(tierType)); + } + +} From c5099f1d3c9bd453265b2e0fd05e6344810b1ade Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Mon, 23 Oct 2023 15:20:14 -0700 Subject: [PATCH 6/9] Added IT for disk tier stats, plus spotlessApply --- .../IndicesRequestCacheDiskTierIT.java | 88 +++++++++++++++++++ .../indices/IndicesRequestCacheIT.java | 72 ++++++++------- .../common/metrics/CounterMetric.java | 1 - .../cache/request/RequestCacheStats.java | 14 +-- .../cache/request/ShardRequestCache.java | 3 +- .../index/cache/request/StatsHolder.java | 9 +- .../org/opensearch/indices/CachingTier.java | 2 - .../indices/DummySerializableKey.java | 4 + .../indices/EhcacheDiskCachingTier.java | 74 +++++++++------- .../indices/EhcacheEventListener.java | 12 ++- .../indices/IndicesRequestCache.java | 7 +- .../TieredCacheSpilloverStrategyHandler.java | 1 + .../cache/request/RequestCacheStatsTests.java | 27 ++---- .../indices/IndicesRequestCacheTests.java | 2 +- 14 files changed, 204 insertions(+), 112 deletions(-) create mode 100644 server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java new file mode 100644 index 0000000000000..47bda50ca788a --- /dev/null +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java @@ -0,0 +1,88 @@ +/* + * SPDX-License-Identifier: Apache-2.0 + * + * The OpenSearch Contributors require contributions made to + * this file be licensed under the Apache-2.0 license or a + * compatible open source license. + */ + +/* + * Licensed to Elasticsearch under one or more contributor + * license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright + * ownership. Elasticsearch licenses this file to you under + * the Apache License, Version 2.0 (the "License"); you may + * not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +/* + * Modifications Copyright OpenSearch Contributors. See + * GitHub history for details. + */ + +package org.opensearch.indices; + +import org.opensearch.action.search.SearchResponse; +import org.opensearch.client.Client; +import org.opensearch.cluster.metadata.IndexMetadata; +import org.opensearch.common.settings.Settings; +import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.index.query.QueryBuilders; +import org.opensearch.test.OpenSearchIntegTestCase; + +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertAcked; +import static org.opensearch.test.hamcrest.OpenSearchAssertions.assertSearchResponse; + +// This is a separate file from IndicesRequestCacheIT because we only want to run our test +// on a node with a maximum request cache size that we set. + +@OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) +public class IndicesRequestCacheDiskTierIT extends OpenSearchIntegTestCase { + public void testDiskTierStats() throws Exception { + int heapSizeBytes = 1800; // enough to fit 2 queries, as each is 687 B + int requestSize = 687; // each request is 687 B + String node = internalCluster().startNode( + Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes)) + ); + Client client = client(node); + + Settings.Builder indicesSettingBuilder = Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0); + + assertAcked( + client.admin().indices().prepareCreate("index").setMapping("k", "type=keyword").setSettings(indicesSettingBuilder).get() + ); + indexRandom(true, client.prepareIndex("index").setSource("k", "hello")); + ensureSearchable("index"); + SearchResponse resp; + + int numOnDisk = 5; + int numRequests = heapSizeBytes / requestSize + numOnDisk; + for (int i = 0; i < numRequests; i++) { + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get(); + assertSearchResponse(resp); + IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false); + IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false); + System.out.println("request number " + i); + } + + System.out.println("Num requests = " + numRequests); + + // the first request, for "hello0", should have been evicted to the disk tier + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello0")).get(); + IndicesRequestCacheIT.assertCacheState(client, "index", 0, numRequests + 1, TierType.ON_HEAP, false); + IndicesRequestCacheIT.assertCacheState(client, "index", 1, numRequests, TierType.DISK, false); + } +} diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 7e48fee7d1e6b..2d25079305b17 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -34,7 +34,6 @@ import com.carrotsearch.randomizedtesting.annotations.ParametersFactory; -import org.opensearch.action.IndicesRequestIT; import org.opensearch.action.admin.indices.alias.Alias; import org.opensearch.action.admin.indices.forcemerge.ForceMergeResponse; import org.opensearch.action.search.SearchResponse; @@ -638,30 +637,13 @@ public void testProfileDisableCache() throws Exception { public void testCacheWithInvalidation() throws Exception { Client client = client(); - //int heapSizeBytes = 2000; // enough to fit 2 queries, as each is 687 B Settings.Builder builder = Settings.builder() .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) - .put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(2000)); - // Why is it appending "index." to the beginning of the key?? - - String heapSizeBytes = builder.get(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey()); - System.out.println("Current cap = " + heapSizeBytes); + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0); - client.admin().setSettings(Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(2000))); - - assertAcked( - client.admin() - .indices() - .prepareCreate("index") - .setMapping("k", "type=keyword") - .setSettings( - builder - ) - .get() - ); + assertAcked(client.admin().indices().prepareCreate("index").setMapping("k", "type=keyword").setSettings(builder).get()); indexRandom(true, client.prepareIndex("index").setSource("k", "hello")); ensureSearchable("index"); SearchResponse resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); @@ -675,8 +657,8 @@ public void testCacheWithInvalidation() throws Exception { resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); assertSearchResponse(resp); // Should expect hit as here as refresh didn't happen - assertCacheState(client, "index", 1, 1, TierType.ON_HEAP); - assertCacheState(client, "index", 0, 1, TierType.DISK); + assertCacheState(client, "index", 1, 1, TierType.ON_HEAP, false); + assertCacheState(client, "index", 0, 1, TierType.DISK, false); assertNumCacheEntries(client, "index", 1, TierType.ON_HEAP); // Explicit refresh would invalidate cache @@ -685,13 +667,21 @@ public void testCacheWithInvalidation() throws Exception { resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); assertSearchResponse(resp); // Should expect miss as key has changed due to change in IndexReader.CacheKey (due to refresh) - assertCacheState(client, "index", 1, 2, TierType.ON_HEAP); - assertCacheState(client, "index", 0, 2, TierType.DISK); - assertNumCacheEntries(client, "index", 1, TierType.ON_HEAP); // Shouldn't it just be the most recent query, since the first one was invalidated? (prob invalidation isnt in yet) + assertCacheState(client, "index", 1, 2, TierType.ON_HEAP, false); + assertCacheState(client, "index", 0, 2, TierType.DISK, false); + assertNumCacheEntries(client, "index", 1, TierType.ON_HEAP); // Shouldn't it just be the most recent query, since the first one was + // invalidated? (prob invalidation isnt in yet) // yeah - evictions = 0, its not in yet } - private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses, TierType tierType) { + protected static void assertCacheState( + Client client, + String index, + long expectedHits, + long expectedMisses, + TierType tierType, + boolean enforceZeroEvictions + ) { RequestCacheStats requestCacheStats = client.admin() .indices() .prepareStats(index) @@ -701,18 +691,32 @@ private static void assertCacheState(Client client, String index, long expectedH .getRequestCache(); // Check the hit count and miss count together so if they are not // correct we can see both values - System.out.println("mem size " + requestCacheStats.getMemorySize(tierType)); - assertEquals( - Arrays.asList(expectedHits, expectedMisses, 0L), - Arrays.asList(requestCacheStats.getHitCount(tierType), requestCacheStats.getMissCount(tierType), requestCacheStats.getEvictions(tierType)) - ); + ByteSizeValue memSize = requestCacheStats.getMemorySize(tierType); + if (memSize.getBytes() > 0) { + System.out.println("mem size " + memSize); + } + if (enforceZeroEvictions) { + assertEquals( + Arrays.asList(expectedHits, expectedMisses, 0L), + Arrays.asList( + requestCacheStats.getHitCount(tierType), + requestCacheStats.getMissCount(tierType), + requestCacheStats.getEvictions(tierType) + ) + ); + } else { + assertEquals( + Arrays.asList(expectedHits, expectedMisses), + Arrays.asList(requestCacheStats.getHitCount(tierType), requestCacheStats.getMissCount(tierType)) + ); + } } - private static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) { - assertCacheState(client, index, expectedHits, expectedMisses, TierType.ON_HEAP); + protected static void assertCacheState(Client client, String index, long expectedHits, long expectedMisses) { + assertCacheState(client, index, expectedHits, expectedMisses, TierType.ON_HEAP, true); } - private static void assertNumCacheEntries(Client client, String index, long expectedEntries, TierType tierType) { + protected static void assertNumCacheEntries(Client client, String index, long expectedEntries, TierType tierType) { RequestCacheStats requestCacheStats = client.admin() .indices() .prepareStats(index) diff --git a/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java b/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java index 2e5eae5ceebe0..5c48c1f772ff0 100644 --- a/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java +++ b/server/src/main/java/org/opensearch/common/metrics/CounterMetric.java @@ -32,7 +32,6 @@ package org.opensearch.common.metrics; -import java.io.Serializable; import java.util.concurrent.atomic.LongAdder; /** diff --git a/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java b/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java index 4bdb91ba9e973..961a000964e98 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java +++ b/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java @@ -53,6 +53,7 @@ public class RequestCacheStats implements Writeable, ToXContentFragment { private Map map; + public RequestCacheStats() { this.map = new HashMap<>(); for (TierType tierType : TierType.values()) { @@ -64,22 +65,15 @@ public RequestCacheStats() { public RequestCacheStats(StreamInput in) throws IOException { this(); if (in.getVersion().onOrAfter(Version.V_3_0_0)) { - this.map = in.readMap(StreamInput::readString, StatsHolder::new); // does it know to use the right constructor? does it rly need to be registered? + this.map = in.readMap(StreamInput::readString, StatsHolder::new); // does it know to use the right constructor? does it rly need + // to be registered? } else { // objects from earlier versions only contain on-heap info, and do not have entries info long memorySize = in.readVLong(); long evictions = in.readVLong(); long hitCount = in.readVLong(); long missCount = in.readVLong(); - this.map.put( - TierType.ON_HEAP.getStringValue(), - new StatsHolder( - memorySize, - evictions, - hitCount, - missCount, - 0 - )); + this.map.put(TierType.ON_HEAP.getStringValue(), new StatsHolder(memorySize, evictions, hitCount, missCount, 0)); } } diff --git a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java index f8d77a4c0dbaf..bfec09cf449da 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java +++ b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java @@ -56,11 +56,10 @@ public ShardRequestCache() { public RequestCacheStats stats() { // TODO: Change RequestCacheStats to support disk tier stats. // Changing this function to return a RequestCacheStats with stats from all tiers. - //return stats(TierType.ON_HEAP); + // return stats(TierType.ON_HEAP); return new RequestCacheStats(statsHolder); } - public void onHit(TierType tierType) { statsHolder.get(tierType).hitCount.inc(); } diff --git a/server/src/main/java/org/opensearch/index/cache/request/StatsHolder.java b/server/src/main/java/org/opensearch/index/cache/request/StatsHolder.java index c7d7447a68459..92d057ab8fd9c 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/StatsHolder.java +++ b/server/src/main/java/org/opensearch/index/cache/request/StatsHolder.java @@ -26,7 +26,6 @@ public class StatsHolder implements Serializable, Writeable, ToXContentFragment final CounterMetric missCount; final CounterMetric entries; - public StatsHolder() { this.totalMetric = new CounterMetric(); this.evictionsMetric = new CounterMetric(); @@ -49,7 +48,7 @@ public StatsHolder(long memorySize, long evictions, long hitCount, long missCoun this.entries.inc(entries); } - public StatsHolder(StreamInput in) throws IOException { + public StatsHolder(StreamInput in) throws IOException { // Read and write the values of the counter metrics. They should always be positive // This object is new, so we shouldn't need version checks for different behavior this(in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong()); @@ -97,7 +96,11 @@ public long getEntries() { @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { - builder.humanReadableField(RequestCacheStats.Fields.MEMORY_SIZE_IN_BYTES, RequestCacheStats.Fields.MEMORY_SIZE, new ByteSizeValue(getMemorySize())); + builder.humanReadableField( + RequestCacheStats.Fields.MEMORY_SIZE_IN_BYTES, + RequestCacheStats.Fields.MEMORY_SIZE, + new ByteSizeValue(getMemorySize()) + ); builder.field(RequestCacheStats.Fields.EVICTIONS, getEvictions()); builder.field(RequestCacheStats.Fields.HIT_COUNT, getHitCount()); builder.field(RequestCacheStats.Fields.MISS_COUNT, getMissCount()); diff --git a/server/src/main/java/org/opensearch/indices/CachingTier.java b/server/src/main/java/org/opensearch/indices/CachingTier.java index 6726167fe469d..85596929cfd6b 100644 --- a/server/src/main/java/org/opensearch/indices/CachingTier.java +++ b/server/src/main/java/org/opensearch/indices/CachingTier.java @@ -10,8 +10,6 @@ import org.opensearch.common.cache.RemovalListener; -import java.io.IOException; - /** * asdsadssa * @param diff --git a/server/src/main/java/org/opensearch/indices/DummySerializableKey.java b/server/src/main/java/org/opensearch/indices/DummySerializableKey.java index 7f2888f6e65f7..d69734f344d4e 100644 --- a/server/src/main/java/org/opensearch/indices/DummySerializableKey.java +++ b/server/src/main/java/org/opensearch/indices/DummySerializableKey.java @@ -14,6 +14,7 @@ public class DummySerializableKey implements Serializable { private Integer i; private String s; + public DummySerializableKey(Integer i, String s) { this.i = i; this.s = s; @@ -22,9 +23,11 @@ public DummySerializableKey(Integer i, String s) { public int getI() { return i; } + public String getS() { return s; } + @Override public boolean equals(Object o) { if (o == this) { @@ -36,6 +39,7 @@ public boolean equals(Object o) { DummySerializableKey other = (DummySerializableKey) o; return Objects.equals(this.i, other.i) && this.s.equals(other.s); } + @Override public final int hashCode() { int result = 11; diff --git a/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java b/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java index 2967600ff612a..7d96f587bac65 100644 --- a/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java +++ b/server/src/main/java/org/opensearch/indices/EhcacheDiskCachingTier.java @@ -8,18 +8,8 @@ package org.opensearch.indices; -import org.ehcache.PersistentCacheManager; -import org.ehcache.config.builders.CacheConfigurationBuilder; -import org.ehcache.config.builders.CacheEventListenerConfigurationBuilder; -import org.ehcache.config.builders.CacheManagerBuilder; -import org.ehcache.config.builders.PooledExecutionServiceConfigurationBuilder; -import org.ehcache.config.builders.ResourcePoolsBuilder; -import org.ehcache.config.units.MemoryUnit; -import org.ehcache.event.EventType; -import org.ehcache.impl.config.executor.PooledExecutionServiceConfiguration; import org.opensearch.common.ExponentiallyWeightedMovingAverage; import org.opensearch.common.cache.RemovalListener; -import org.ehcache.Cache; import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.io.PathUtils; import org.opensearch.common.metrics.CounterMetric; @@ -30,7 +20,21 @@ import java.util.Collections; import java.util.HashMap; -public class EhcacheDiskCachingTier implements DiskCachingTier, RemovalListener { +import org.ehcache.Cache; +import org.ehcache.PersistentCacheManager; +import org.ehcache.config.builders.CacheConfigurationBuilder; +import org.ehcache.config.builders.CacheEventListenerConfigurationBuilder; +import org.ehcache.config.builders.CacheManagerBuilder; +import org.ehcache.config.builders.PooledExecutionServiceConfigurationBuilder; +import org.ehcache.config.builders.ResourcePoolsBuilder; +import org.ehcache.config.units.MemoryUnit; +import org.ehcache.event.EventType; +import org.ehcache.impl.config.executor.PooledExecutionServiceConfiguration; + +public class EhcacheDiskCachingTier + implements + DiskCachingTier, + RemovalListener { public static HashMap cacheManagers = new HashMap<>(); // Because of the way test cases are set up, each node may try to instantiate several disk caching tiers. @@ -52,7 +56,7 @@ public class EhcacheDiskCachingTier implements DiskCachingTier removalListener; private ExponentiallyWeightedMovingAverage getTimeMillisEWMA; - private static final double GET_TIME_EWMA_ALPHA = 0.3; // This is the value used elsewhere in OpenSearch + private static final double GET_TIME_EWMA_ALPHA = 0.3; // This is the value used elsewhere in OpenSearch private static final int MIN_WRITE_THREADS = 0; private static final int MAX_WRITE_THREADS = 4; // Max number of threads for the PooledExecutionService which handles writes private static final String cacheAlias = "diskTier"; @@ -109,39 +113,42 @@ public void getManager() { // actual logging later } } - PooledExecutionServiceConfiguration threadConfig = PooledExecutionServiceConfigurationBuilder.newPooledExecutionServiceConfigurationBuilder() + PooledExecutionServiceConfiguration threadConfig = PooledExecutionServiceConfigurationBuilder + .newPooledExecutionServiceConfigurationBuilder() .defaultPool("default", MIN_WRITE_THREADS, MAX_WRITE_THREADS) .build(); - cacheManagers.put(nodeId, - CacheManagerBuilder.newCacheManagerBuilder() - .using(threadConfig) - .with(CacheManagerBuilder.persistence(diskCacheFP) - ).build(true) + cacheManagers.put( + nodeId, + CacheManagerBuilder.newCacheManagerBuilder().using(threadConfig).with(CacheManagerBuilder.persistence(diskCacheFP)).build(true) ); this.cacheManager = cacheManagers.get(nodeId); } private void createCache(long maxWeightInBytes) { // our EhcacheEventListener should receive events every time an entry is changed - CacheEventListenerConfigurationBuilder listenerConfig = CacheEventListenerConfigurationBuilder - .newEventListenerConfiguration(listener, - EventType.EVICTED, - EventType.EXPIRED, - EventType.REMOVED, - EventType.UPDATED, - EventType.CREATED) - .ordered().asynchronous(); + CacheEventListenerConfigurationBuilder listenerConfig = CacheEventListenerConfigurationBuilder.newEventListenerConfiguration( + listener, + EventType.EVICTED, + EventType.EXPIRED, + EventType.REMOVED, + EventType.UPDATED, + EventType.CREATED + ).ordered().asynchronous(); // ordered() has some performance penalty as compared to unordered(), we can also use synchronous() - cache = cacheManager.createCache(cacheAlias, + cache = cacheManager.createCache( + cacheAlias, CacheConfigurationBuilder.newCacheConfigurationBuilder( - EhcacheKey.class, BytesReference.class, ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B, false)) - .withService(listenerConfig)); + EhcacheKey.class, + BytesReference.class, + ResourcePoolsBuilder.newResourcePoolsBuilder().disk(maxWeightInBytes, MemoryUnit.B, false) + ).withService(listenerConfig) + ); } @Override - public BytesReference get(IndicesRequestCache.Key key) { + public BytesReference get(IndicesRequestCache.Key key) { // I don't think we need to do the future stuff as the cache is threadsafe // if (keystore.contains(key.hashCode()) { @@ -174,7 +181,8 @@ public void put(IndicesRequestCache.Key key, BytesReference value) { } @Override - public BytesReference computeIfAbsent(IndicesRequestCache.Key key, TieredCacheLoader loader) throws Exception { + public BytesReference computeIfAbsent(IndicesRequestCache.Key key, TieredCacheLoader loader) + throws Exception { return null; // should not need to fill out, Cache.computeIfAbsent is always used } @@ -193,7 +201,8 @@ public void invalidate(IndicesRequestCache.Key key) { } @Override - public BytesReference compute(IndicesRequestCache.Key key, TieredCacheLoader loader) throws Exception { + public BytesReference compute(IndicesRequestCache.Key key, TieredCacheLoader loader) + throws Exception { return null; // should not need to fill out, Cache.compute is always used } @@ -222,6 +231,7 @@ public int count() { protected void countInc() { count.inc(); } + protected void countDec() { count.dec(); } diff --git a/server/src/main/java/org/opensearch/indices/EhcacheEventListener.java b/server/src/main/java/org/opensearch/indices/EhcacheEventListener.java index 27872269ba77b..c72fb833a69b6 100644 --- a/server/src/main/java/org/opensearch/indices/EhcacheEventListener.java +++ b/server/src/main/java/org/opensearch/indices/EhcacheEventListener.java @@ -8,23 +8,26 @@ package org.opensearch.indices; -import org.ehcache.event.CacheEvent; -import org.ehcache.event.CacheEventListener; -import org.ehcache.event.EventType; import org.opensearch.common.cache.RemovalListener; import org.opensearch.common.cache.RemovalNotification; import org.opensearch.common.cache.RemovalReason; import org.opensearch.core.common.bytes.BytesReference; +import org.ehcache.event.CacheEvent; +import org.ehcache.event.CacheEventListener; +import org.ehcache.event.EventType; + public class EhcacheEventListener implements CacheEventListener { // Receives key-value pairs (EhcacheKey, BytesReference), but must transform into (Key, BytesReference) // to send removal notifications private final RemovalListener removalListener; private final EhcacheDiskCachingTier tier; + EhcacheEventListener(RemovalListener removalListener, EhcacheDiskCachingTier tier) { this.removalListener = removalListener; this.tier = tier; // needed to handle count changes } + @Override public void onEvent(CacheEvent event) { EhcacheKey ehcacheKey = event.getKey(); @@ -50,7 +53,8 @@ public void onEvent(CacheEvent e case EXPIRED: case REMOVED: reason = RemovalReason.INVALIDATED; - // this is probably fine for EXPIRED. We use cache.remove() to invalidate keys, but this might overlap with RemovalReason.EXPLICIT? + // this is probably fine for EXPIRED. We use cache.remove() to invalidate keys, but this might overlap with + // RemovalReason.EXPLICIT? break; case UPDATED: reason = RemovalReason.REPLACED; diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index d3084c8c2c231..860875024b18f 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -109,8 +109,10 @@ public final class IndicesRequestCache implements TieredCacheEventListener cache; - //private final TieredCacheHandler tieredCacheHandler; - public final TieredCacheSpilloverStrategyHandler tieredCacheHandler; // Change this back after done debugging serialization issues + // private final TieredCacheHandler tieredCacheHandler; + public final TieredCacheSpilloverStrategyHandler tieredCacheHandler; // Change this back after done debugging + // serialization issues + IndicesRequestCache(Settings settings, IndicesService indicesService) { this.size = INDICES_CACHE_QUERY_SIZE.get(settings); this.expire = INDICES_CACHE_QUERY_EXPIRE.exists(settings) ? INDICES_CACHE_QUERY_EXPIRE.get(settings) : null; @@ -140,6 +142,7 @@ public final class IndicesRequestCache implements TieredCacheEventListener> getValueFromTierCache() { return null; }; } + @Override public void closeDiskTier() { diskCachingTier.close(); diff --git a/server/src/test/java/org/opensearch/index/cache/request/RequestCacheStatsTests.java b/server/src/test/java/org/opensearch/index/cache/request/RequestCacheStatsTests.java index 0b1adbe0f87c7..6992b8a441c0a 100644 --- a/server/src/test/java/org/opensearch/index/cache/request/RequestCacheStatsTests.java +++ b/server/src/test/java/org/opensearch/index/cache/request/RequestCacheStatsTests.java @@ -10,14 +10,10 @@ import org.opensearch.common.io.stream.BytesStreamOutput; import org.opensearch.core.common.bytes.BytesReference; -import org.opensearch.core.common.io.stream.BytesStream; import org.opensearch.core.common.io.stream.BytesStreamInput; -import org.opensearch.core.common.io.stream.StreamInput; -import org.opensearch.core.common.io.stream.StreamOutput; import org.opensearch.indices.TierType; import org.opensearch.test.OpenSearchTestCase; -import java.io.IOException; import java.util.HashMap; import java.util.Map; @@ -28,9 +24,7 @@ public void testConstructorsAndAdd() throws Exception { assertTierState(emptyStats, tierType, 0, 0, 0, 0, 0); } Map testHeapMap = new HashMap<>(); - testHeapMap.put(TierType.ON_HEAP, new StatsHolder( - 1, 2, 3, 4, 5 - )); + testHeapMap.put(TierType.ON_HEAP, new StatsHolder(1, 2, 3, 4, 5)); RequestCacheStats heapOnlyStats = new RequestCacheStats(testHeapMap); for (TierType tierType : TierType.values()) { if (tierType == TierType.ON_HEAP) { @@ -41,12 +35,8 @@ public void testConstructorsAndAdd() throws Exception { } Map testBothTiersMap = new HashMap<>(); - testBothTiersMap.put(TierType.ON_HEAP, new StatsHolder( - 11, 12, 13, 14, 15 - )); - testBothTiersMap.put(TierType.DISK, new StatsHolder( - 6, 7, 8, 9, 10 - )); + testBothTiersMap.put(TierType.ON_HEAP, new StatsHolder(11, 12, 13, 14, 15)); + testBothTiersMap.put(TierType.DISK, new StatsHolder(6, 7, 8, 9, 10)); RequestCacheStats bothTiersStats = new RequestCacheStats(testBothTiersMap); assertTierState(bothTiersStats, TierType.ON_HEAP, 11, 12, 13, 14, 15); assertTierState(bothTiersStats, TierType.DISK, 6, 7, 8, 9, 10); @@ -57,16 +47,12 @@ public void testConstructorsAndAdd() throws Exception { } public void testSerialization() throws Exception { - // This test also implicitly tests StreamHolder serialization + // This test also implicitly tests StatsHolder serialization BytesStreamOutput os = new BytesStreamOutput(); Map testMap = new HashMap<>(); - testMap.put(TierType.ON_HEAP, new StatsHolder( - 11, 12, 13, 14, 15 - )); - testMap.put(TierType.DISK, new StatsHolder( - 6, 7, 8, 9, 10 - )); + testMap.put(TierType.ON_HEAP, new StatsHolder(11, 12, 13, 14, 15)); + testMap.put(TierType.DISK, new StatsHolder(6, 7, 8, 9, 10)); RequestCacheStats stats = new RequestCacheStats(testMap); stats.writeTo(os); BytesStreamInput is = new BytesStreamInput(BytesReference.toBytes(os.bytes())); @@ -91,5 +77,4 @@ private void assertTierState( assertEquals(missCount, stats.getMissCount(tierType)); assertEquals(entries, stats.getEntries(tierType)); } - } diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 6e75a0e8102c2..5df6b2154c4d8 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -151,7 +151,7 @@ public void testAddDirectToEhcache() throws Exception { String rKey = ((OpenSearchDirectoryReader) reader).getDelegatingCacheHelper().getDelegatingCacheKey().getId().toString(); IndicesRequestCache.Key key = cache.new Key(entity, termBytes, rKey); - BytesReference value = new BytesArray(new byte[]{0}); + BytesReference value = new BytesArray(new byte[] { 0 }); cache.tieredCacheHandler.getDiskCachingTier().put(key, value); BytesReference res = cache.tieredCacheHandler.getDiskCachingTier().get(key); From 64b4bb3acedf967eb9f397777889de96f855ff84 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Mon, 23 Oct 2023 15:25:45 -0700 Subject: [PATCH 7/9] cleaned up --- .../java/org/opensearch/indices/IndicesRequestCacheIT.java | 4 ---- .../opensearch/index/cache/request/RequestCacheStats.java | 5 +---- .../opensearch/index/cache/request/ShardRequestCache.java | 3 --- 3 files changed, 1 insertion(+), 11 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 2d25079305b17..a9b18c6f8364b 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -691,10 +691,6 @@ protected static void assertCacheState( .getRequestCache(); // Check the hit count and miss count together so if they are not // correct we can see both values - ByteSizeValue memSize = requestCacheStats.getMemorySize(tierType); - if (memSize.getBytes() > 0) { - System.out.println("mem size " + memSize); - } if (enforceZeroEvictions) { assertEquals( Arrays.asList(expectedHits, expectedMisses, 0L), diff --git a/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java b/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java index 961a000964e98..df5e5fee73cb5 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java +++ b/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java @@ -65,8 +65,7 @@ public RequestCacheStats() { public RequestCacheStats(StreamInput in) throws IOException { this(); if (in.getVersion().onOrAfter(Version.V_3_0_0)) { - this.map = in.readMap(StreamInput::readString, StatsHolder::new); // does it know to use the right constructor? does it rly need - // to be registered? + this.map = in.readMap(StreamInput::readString, StatsHolder::new); } else { // objects from earlier versions only contain on-heap info, and do not have entries info long memorySize = in.readVLong(); @@ -85,8 +84,6 @@ public RequestCacheStats(Map inputMap) { } } - // can prob eliminate some of these constructors - public void add(RequestCacheStats stats) { for (String tier : stats.map.keySet()) { map.get(tier).add(stats.map.get(tier)); diff --git a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java index bfec09cf449da..b72a4d08e1d99 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java +++ b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java @@ -54,9 +54,6 @@ public ShardRequestCache() { } public RequestCacheStats stats() { - // TODO: Change RequestCacheStats to support disk tier stats. - // Changing this function to return a RequestCacheStats with stats from all tiers. - // return stats(TierType.ON_HEAP); return new RequestCacheStats(statsHolder); } From 157ca6e9557cc5864c45c930be97efa6f17ea269 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Mon, 6 Nov 2023 09:44:27 -0800 Subject: [PATCH 8/9] Addressed comments besides moving IT tests into IndicesRequestCacheIT.java Signed-off-by: Peter Alfonsi --- .../IndicesRequestCacheDiskTierIT.java | 28 ++++++++++++++----- .../indices/IndicesRequestCacheIT.java | 20 ++++++++----- .../cache/request/RequestCacheStats.java | 18 ++++++------ 3 files changed, 42 insertions(+), 24 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java index 47bda50ca788a..e25fb0112ca01 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java @@ -37,6 +37,7 @@ import org.opensearch.cluster.metadata.IndexMetadata; import org.opensearch.common.settings.Settings; import org.opensearch.core.common.unit.ByteSizeValue; +import org.opensearch.index.cache.request.RequestCacheStats; import org.opensearch.index.query.QueryBuilders; import org.opensearch.test.OpenSearchIntegTestCase; @@ -49,8 +50,7 @@ @OpenSearchIntegTestCase.ClusterScope(scope = OpenSearchIntegTestCase.Scope.TEST, numDataNodes = 0) public class IndicesRequestCacheDiskTierIT extends OpenSearchIntegTestCase { public void testDiskTierStats() throws Exception { - int heapSizeBytes = 1800; // enough to fit 2 queries, as each is 687 B - int requestSize = 687; // each request is 687 B + int heapSizeBytes = 4729; String node = internalCluster().startNode( Settings.builder().put(IndicesRequestCache.INDICES_CACHE_QUERY_SIZE.getKey(), new ByteSizeValue(heapSizeBytes)) ); @@ -68,21 +68,35 @@ public void testDiskTierStats() throws Exception { ensureSearchable("index"); SearchResponse resp; + resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + 0)).get(); + int requestSize = (int) getCacheSizeBytes(client, "index", TierType.ON_HEAP); + System.out.println(requestSize); + assertTrue(heapSizeBytes > requestSize); + // If this fails, increase heapSizeBytes! We can't adjust it after getting the size of one query + // as the cache size setting is not dynamic + int numOnDisk = 5; int numRequests = heapSizeBytes / requestSize + numOnDisk; - for (int i = 0; i < numRequests; i++) { + for (int i = 1; i < numRequests; i++) { resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + i)).get(); assertSearchResponse(resp); IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false); IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false); - System.out.println("request number " + i); } - - System.out.println("Num requests = " + numRequests); - // the first request, for "hello0", should have been evicted to the disk tier resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello0")).get(); IndicesRequestCacheIT.assertCacheState(client, "index", 0, numRequests + 1, TierType.ON_HEAP, false); IndicesRequestCacheIT.assertCacheState(client, "index", 1, numRequests, TierType.DISK, false); } + + private long getCacheSizeBytes(Client client, String index, TierType tierType) { + RequestCacheStats requestCacheStats = client.admin() + .indices() + .prepareStats(index) + .setRequestCache(true) + .get() + .getTotal() + .getRequestCache(); + return requestCacheStats.getMemorySizeInBytes(tierType); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index a9b18c6f8364b..080b3017c4246 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -637,13 +637,19 @@ public void testProfileDisableCache() throws Exception { public void testCacheWithInvalidation() throws Exception { Client client = client(); - - Settings.Builder builder = Settings.builder() - .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) - .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) - .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0); - - assertAcked(client.admin().indices().prepareCreate("index").setMapping("k", "type=keyword").setSettings(builder).get()); + assertAcked( + client.admin() + .indices() + .prepareCreate("index") + .setMapping("k", "type=keyword") + .setSettings( + Settings.builder() + .put(IndicesRequestCache.INDEX_CACHE_REQUEST_ENABLED_SETTING.getKey(), true) + .put(IndexMetadata.SETTING_NUMBER_OF_SHARDS, 1) + .put(IndexMetadata.SETTING_NUMBER_OF_REPLICAS, 0) + ) + .get() + ); indexRandom(true, client.prepareIndex("index").setSource("k", "hello")); ensureSearchable("index"); SearchResponse resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello")).get(); diff --git a/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java b/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java index df5e5fee73cb5..34d91be1bc2c6 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java +++ b/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java @@ -52,18 +52,17 @@ */ public class RequestCacheStats implements Writeable, ToXContentFragment { - private Map map; - - public RequestCacheStats() { - this.map = new HashMap<>(); - for (TierType tierType : TierType.values()) { - map.put(tierType.getStringValue(), new StatsHolder()); + private Map map = new HashMap<>(){{ + for (TierType tierType : TierType.values()) + { + put(tierType.getStringValue(), new StatsHolder()); // Every possible tier type must have counters, even if they are disabled. Then the counters report 0 - } - } + }} + }; + + public RequestCacheStats() {} public RequestCacheStats(StreamInput in) throws IOException { - this(); if (in.getVersion().onOrAfter(Version.V_3_0_0)) { this.map = in.readMap(StreamInput::readString, StatsHolder::new); } else { @@ -78,7 +77,6 @@ public RequestCacheStats(StreamInput in) throws IOException { public RequestCacheStats(Map inputMap) { // Create a RequestCacheStats with multiple tiers' statistics - this(); for (TierType tierType : inputMap.keySet()) { map.put(tierType.getStringValue(), inputMap.get(tierType)); } From 81c038a80e85392ea11e729553c95858219b5389 Mon Sep 17 00:00:00 2001 From: Peter Alfonsi Date: Mon, 6 Nov 2023 11:07:08 -0800 Subject: [PATCH 9/9] Added/tested get time EWMA to non-heap tiers --- .../IndicesRequestCacheDiskTierIT.java | 13 +++++- .../indices/IndicesRequestCacheIT.java | 1 - .../cache/request/RequestCacheStats.java | 28 ++++++++----- .../cache/request/ShardRequestCache.java | 11 ++++- .../index/cache/request/StatsHolder.java | 40 +++++++++++++++++- .../AbstractIndexShardCacheEntity.java | 8 ++-- .../indices/IndicesRequestCache.java | 12 +++--- .../indices/TieredCacheEventListener.java | 4 +- .../TieredCacheSpilloverStrategyHandler.java | 12 +++++- .../cache/request/RequestCacheStatsTests.java | 41 ++++++++++++------- .../indices/IndicesRequestCacheTests.java | 1 - .../indices/IndicesServiceCloseTests.java | 4 +- 12 files changed, 127 insertions(+), 48 deletions(-) diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java index e25fb0112ca01..5063272ff5816 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheDiskTierIT.java @@ -70,7 +70,6 @@ public void testDiskTierStats() throws Exception { resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello" + 0)).get(); int requestSize = (int) getCacheSizeBytes(client, "index", TierType.ON_HEAP); - System.out.println(requestSize); assertTrue(heapSizeBytes > requestSize); // If this fails, increase heapSizeBytes! We can't adjust it after getting the size of one query // as the cache size setting is not dynamic @@ -82,6 +81,7 @@ public void testDiskTierStats() throws Exception { assertSearchResponse(resp); IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.ON_HEAP, false); IndicesRequestCacheIT.assertCacheState(client, "index", 0, i + 1, TierType.DISK, false); + assertPositiveEWMAForDisk(client, "index"); } // the first request, for "hello0", should have been evicted to the disk tier resp = client.prepareSearch("index").setRequestCache(true).setQuery(QueryBuilders.termQuery("k", "hello0")).get(); @@ -99,4 +99,15 @@ private long getCacheSizeBytes(Client client, String index, TierType tierType) { .getRequestCache(); return requestCacheStats.getMemorySizeInBytes(tierType); } + + private void assertPositiveEWMAForDisk(Client client, String index) { + RequestCacheStats requestCacheStats = client.admin() + .indices() + .prepareStats(index) + .setRequestCache(true) + .get() + .getTotal() + .getRequestCache(); + assertTrue(requestCacheStats.getTimeEWMA(TierType.DISK) > 0); + } } diff --git a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java index 080b3017c4246..edb6a963f89c0 100644 --- a/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java +++ b/server/src/internalClusterTest/java/org/opensearch/indices/IndicesRequestCacheIT.java @@ -43,7 +43,6 @@ import org.opensearch.common.settings.Settings; import org.opensearch.common.time.DateFormatter; import org.opensearch.common.util.FeatureFlags; -import org.opensearch.core.common.unit.ByteSizeValue; import org.opensearch.index.cache.request.RequestCacheStats; import org.opensearch.index.query.QueryBuilders; import org.opensearch.search.aggregations.bucket.global.GlobalAggregationBuilder; diff --git a/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java b/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java index 34d91be1bc2c6..a557438170f48 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java +++ b/server/src/main/java/org/opensearch/index/cache/request/RequestCacheStats.java @@ -52,12 +52,13 @@ */ public class RequestCacheStats implements Writeable, ToXContentFragment { - private Map map = new HashMap<>(){{ - for (TierType tierType : TierType.values()) + private Map map = new HashMap<>() { { - put(tierType.getStringValue(), new StatsHolder()); - // Every possible tier type must have counters, even if they are disabled. Then the counters report 0 - }} + for (TierType tierType : TierType.values()) { + put(tierType.getStringValue(), new StatsHolder()); + // Every possible tier type must have counters, even if they are disabled. Then the counters report 0 + } + } }; public RequestCacheStats() {} @@ -66,12 +67,12 @@ public RequestCacheStats(StreamInput in) throws IOException { if (in.getVersion().onOrAfter(Version.V_3_0_0)) { this.map = in.readMap(StreamInput::readString, StatsHolder::new); } else { - // objects from earlier versions only contain on-heap info, and do not have entries info + // objects from earlier versions only contain on-heap info, and do not have entries or getTime info long memorySize = in.readVLong(); long evictions = in.readVLong(); long hitCount = in.readVLong(); long missCount = in.readVLong(); - this.map.put(TierType.ON_HEAP.getStringValue(), new StatsHolder(memorySize, evictions, hitCount, missCount, 0)); + this.map.put(TierType.ON_HEAP.getStringValue(), new StatsHolder(memorySize, evictions, hitCount, missCount, 0, 0.0)); } } @@ -116,6 +117,10 @@ public long getEntries(TierType tierType) { return getTierStats(tierType).entries.count(); } + public double getTimeEWMA(TierType tierType) { + return getTierStats(tierType).getTimeEWMA; + } + // By default, return on-heap stats if no tier is specified public long getMemorySizeInBytes() { @@ -142,12 +147,14 @@ public long getEntries() { return getEntries(TierType.ON_HEAP); } + // no getTimeEWMA default as it'll always return 0 for on-heap + @Override public void writeTo(StreamOutput out) throws IOException { if (out.getVersion().onOrAfter(Version.V_3_0_0)) { out.writeMap(this.map, StreamOutput::writeString, (o, v) -> v.writeTo(o)); // ? } else { - // Write only on-heap values, and don't write entries metric + // Write only on-heap values, and don't write entries metric or getTimeEWMA StatsHolder heapStats = map.get(TierType.ON_HEAP.getStringValue()); out.writeVLong(heapStats.getMemorySize()); out.writeVLong(heapStats.getEvictions()); @@ -160,13 +167,13 @@ public void writeTo(StreamOutput out) throws IOException { public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { builder.startObject(Fields.REQUEST_CACHE_STATS); // write on-heap stats outside of tiers object - getTierStats(TierType.ON_HEAP).toXContent(builder, params); + getTierStats(TierType.ON_HEAP).toXContent(builder, params, false); // Heap tier doesn't write a getTime builder.startObject(Fields.TIERS); for (TierType tierType : TierType.values()) { // fixed order if (tierType != TierType.ON_HEAP) { String tier = tierType.getStringValue(); builder.startObject(tier); - map.get(tier).toXContent(builder, params); + map.get(tier).toXContent(builder, params, true); // Non-heap tiers write a getTime builder.endObject(); } } @@ -189,5 +196,6 @@ static final class Fields { static final String HIT_COUNT = "hit_count"; static final String MISS_COUNT = "miss_count"; static final String ENTRIES = "entries"; + static final String GET_TIME_EWMA = "get_time_ewma_millis"; } } diff --git a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java index b72a4d08e1d99..02ba13d19dd64 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java +++ b/server/src/main/java/org/opensearch/index/cache/request/ShardRequestCache.java @@ -57,12 +57,19 @@ public RequestCacheStats stats() { return new RequestCacheStats(statsHolder); } - public void onHit(TierType tierType) { + public void onHit(TierType tierType, double getTimeEWMA) { statsHolder.get(tierType).hitCount.inc(); + if (tierType == TierType.DISK) { + statsHolder.get(tierType).getTimeEWMA = getTimeEWMA; + } + } - public void onMiss(TierType tierType) { + public void onMiss(TierType tierType, double getTimeEWMA) { statsHolder.get(tierType).missCount.inc(); + if (tierType == TierType.DISK) { + statsHolder.get(tierType).getTimeEWMA = getTimeEWMA; + } } public void onCached(Accountable key, BytesReference value, TierType tierType) { diff --git a/server/src/main/java/org/opensearch/index/cache/request/StatsHolder.java b/server/src/main/java/org/opensearch/index/cache/request/StatsHolder.java index 92d057ab8fd9c..4d856e052fcfb 100644 --- a/server/src/main/java/org/opensearch/index/cache/request/StatsHolder.java +++ b/server/src/main/java/org/opensearch/index/cache/request/StatsHolder.java @@ -25,6 +25,7 @@ public class StatsHolder implements Serializable, Writeable, ToXContentFragment final CounterMetric hitCount; final CounterMetric missCount; final CounterMetric entries; + double getTimeEWMA; // CounterMetric is long, we need a double public StatsHolder() { this.totalMetric = new CounterMetric(); @@ -32,9 +33,10 @@ public StatsHolder() { this.hitCount = new CounterMetric(); this.missCount = new CounterMetric(); this.entries = new CounterMetric(); + this.getTimeEWMA = 0.0; } - public StatsHolder(long memorySize, long evictions, long hitCount, long missCount, long entries) { + public StatsHolder(long memorySize, long evictions, long hitCount, long missCount, long entries, double getTimeEWMA) { // Switched argument order to match RequestCacheStats this.totalMetric = new CounterMetric(); this.totalMetric.inc(memorySize); @@ -46,12 +48,13 @@ public StatsHolder(long memorySize, long evictions, long hitCount, long missCoun this.missCount.inc(missCount); this.entries = new CounterMetric(); this.entries.inc(entries); + this.getTimeEWMA = getTimeEWMA; } public StatsHolder(StreamInput in) throws IOException { // Read and write the values of the counter metrics. They should always be positive // This object is new, so we shouldn't need version checks for different behavior - this(in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong()); + this(in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readVLong(), in.readDouble()); // java forces us to do this in one line // guaranteed to be evaluated in correct order (https://docs.oracle.com/javase/specs/jls/se7/html/jls-15.html#jls-15.7.4) } @@ -63,6 +66,7 @@ public void writeTo(StreamOutput out) throws IOException { out.writeVLong(hitCount.count()); out.writeVLong(missCount.count()); out.writeVLong(entries.count()); + out.writeDouble(getTimeEWMA); } public void add(StatsHolder otherStats) { @@ -72,6 +76,18 @@ public void add(StatsHolder otherStats) { hitCount.inc(otherStats.hitCount.count()); missCount.inc(otherStats.missCount.count()); entries.inc(otherStats.entries.count()); + if (!otherStats.isEmpty()) { + getTimeEWMA = otherStats.getTimeEWMA; + } + + /* Adding two EWMAs is a bit tricky. If both stats are non-empty we can assume the newer one dominates. + add() is only called in CommonStats.java in two places: + 1) it's used to either add otherStats to a new (empty) RequestCacheStats + 2) it's used to add new stats to an existing RequestCacheStats + In both cases, the existing object is older, so we can assume otherStats's EWMA dominates. + It doesn't make sense to use the existing EWMA in case 1, and in case 2 the actual value + will be updated from the disk tier on the next hit/miss, so it's probably ok to use otherStats.getTimeEWMA. + */ } public long getEvictions() { @@ -94,8 +110,16 @@ public long getEntries() { return entries.count(); } + public double getTimeEWMA() { + return getTimeEWMA; + } + @Override public XContentBuilder toXContent(XContentBuilder builder, Params params) throws IOException { + return toXContent(builder, params, false); // By default do not write the getTime field + } + + public XContentBuilder toXContent(XContentBuilder builder, Params params, boolean includeGetTime) throws IOException { builder.humanReadableField( RequestCacheStats.Fields.MEMORY_SIZE_IN_BYTES, RequestCacheStats.Fields.MEMORY_SIZE, @@ -105,6 +129,18 @@ public XContentBuilder toXContent(XContentBuilder builder, Params params) throws builder.field(RequestCacheStats.Fields.HIT_COUNT, getHitCount()); builder.field(RequestCacheStats.Fields.MISS_COUNT, getMissCount()); builder.field(RequestCacheStats.Fields.ENTRIES, getEntries()); + if (includeGetTime) { + builder.field(RequestCacheStats.Fields.GET_TIME_EWMA, getTimeEWMA()); + } return builder; } + + private boolean isEmpty() { + return (getEvictions() == 0) + && (getMemorySize() == 0) + && (getHitCount() == 0) + && (getMissCount() == 0) + && (getEntries() == 0) + && (getTimeEWMA() == 0.0); + } } diff --git a/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java b/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java index 2eef16df2bb9a..6c066b25995a6 100644 --- a/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java +++ b/server/src/main/java/org/opensearch/indices/AbstractIndexShardCacheEntity.java @@ -56,13 +56,13 @@ public final void onCached(IndicesRequestCache.Key key, BytesReference value, Ti } @Override - public final void onHit(TierType tierType) { - stats().onHit(tierType); + public final void onHit(TierType tierType, double getTimeEWMA) { + stats().onHit(tierType, getTimeEWMA); } @Override - public final void onMiss(TierType tierType) { - stats().onMiss(tierType); + public final void onMiss(TierType tierType, double getTimeEWMA) { + stats().onMiss(tierType, getTimeEWMA); } @Override diff --git a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java index 860875024b18f..7f2d8768304e8 100644 --- a/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java +++ b/server/src/main/java/org/opensearch/indices/IndicesRequestCache.java @@ -151,8 +151,8 @@ void clear(CacheEntity entity) { } @Override - public void onMiss(Key key, TierType tierType) { - key.entity.onMiss(tierType); + public void onMiss(Key key, TierType tierType, double getTimeEWMA) { + key.entity.onMiss(tierType, getTimeEWMA); } @Override @@ -161,8 +161,8 @@ public void onRemoval(RemovalNotification notification) { } @Override - public void onHit(Key key, BytesReference value, TierType tierType) { - key.entity.onHit(tierType); + public void onHit(Key key, BytesReference value, TierType tierType, double getTimeEWMA) { + key.entity.onHit(tierType, getTimeEWMA); } @Override @@ -275,12 +275,12 @@ interface CacheEntity extends Accountable, Writeable { /** * Called each time this entity has a cache hit. */ - void onHit(TierType tierType); + void onHit(TierType tierType, double getTimeEWMA); /** * Called each time this entity has a cache miss. */ - void onMiss(TierType tierType); + void onMiss(TierType tierType, double getTimeEWMA); /** * Called when this entity instance is removed diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheEventListener.java b/server/src/main/java/org/opensearch/indices/TieredCacheEventListener.java index 084ac5a57e0d3..3634a6fa53543 100644 --- a/server/src/main/java/org/opensearch/indices/TieredCacheEventListener.java +++ b/server/src/main/java/org/opensearch/indices/TieredCacheEventListener.java @@ -12,11 +12,11 @@ public interface TieredCacheEventListener { - void onMiss(K key, TierType tierType); + void onMiss(K key, TierType tierType, double getTimeEWMA); void onRemoval(RemovalNotification notification); - void onHit(K key, V value, TierType tierType); + void onHit(K key, V value, TierType tierType, double getTimeEWMA); void onCached(K key, V value, TierType tierType); } diff --git a/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java b/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java index f02a193c7d354..e78678994a612 100644 --- a/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java +++ b/server/src/main/java/org/opensearch/indices/TieredCacheSpilloverStrategyHandler.java @@ -131,16 +131,24 @@ private Function> getValueFromTierCache() { return key -> { for (CachingTier cachingTier : cachingTierList) { V value = cachingTier.get(key); + double getTimeEWMA = getTimeEWMAIfDisk(cachingTier); if (value != null) { - tieredCacheEventListener.onHit(key, value, cachingTier.getTierType()); + tieredCacheEventListener.onHit(key, value, cachingTier.getTierType(), getTimeEWMA); return new CacheValue<>(value, cachingTier.getTierType()); } - tieredCacheEventListener.onMiss(key, cachingTier.getTierType()); + tieredCacheEventListener.onMiss(key, cachingTier.getTierType(), getTimeEWMA); } return null; }; } + private double getTimeEWMAIfDisk(CachingTier cachingTier) { + if (cachingTier.getTierType() == TierType.DISK) { + return ((DiskCachingTier) cachingTier).getTimeMillisEWMA(); + } + return 0.0; + } + @Override public void closeDiskTier() { diskCachingTier.close(); diff --git a/server/src/test/java/org/opensearch/index/cache/request/RequestCacheStatsTests.java b/server/src/test/java/org/opensearch/index/cache/request/RequestCacheStatsTests.java index 6992b8a441c0a..f1ae3dbc2273f 100644 --- a/server/src/test/java/org/opensearch/index/cache/request/RequestCacheStatsTests.java +++ b/server/src/test/java/org/opensearch/index/cache/request/RequestCacheStatsTests.java @@ -21,29 +21,38 @@ public class RequestCacheStatsTests extends OpenSearchTestCase { public void testConstructorsAndAdd() throws Exception { RequestCacheStats emptyStats = new RequestCacheStats(); for (TierType tierType : TierType.values()) { - assertTierState(emptyStats, tierType, 0, 0, 0, 0, 0); + assertTierState(emptyStats, tierType, 0, 0, 0, 0, 0, 0.0); } Map testHeapMap = new HashMap<>(); - testHeapMap.put(TierType.ON_HEAP, new StatsHolder(1, 2, 3, 4, 5)); + testHeapMap.put(TierType.ON_HEAP, new StatsHolder(1, 2, 3, 4, 5, 0.0)); RequestCacheStats heapOnlyStats = new RequestCacheStats(testHeapMap); for (TierType tierType : TierType.values()) { if (tierType == TierType.ON_HEAP) { - assertTierState(heapOnlyStats, tierType, 1, 2, 3, 4, 5); + assertTierState(heapOnlyStats, tierType, 1, 2, 3, 4, 5, 0.0); } else { - assertTierState(heapOnlyStats, tierType, 0, 0, 0, 0, 0); + assertTierState(heapOnlyStats, tierType, 0, 0, 0, 0, 0, 0.0); } } Map testBothTiersMap = new HashMap<>(); - testBothTiersMap.put(TierType.ON_HEAP, new StatsHolder(11, 12, 13, 14, 15)); - testBothTiersMap.put(TierType.DISK, new StatsHolder(6, 7, 8, 9, 10)); + testBothTiersMap.put(TierType.ON_HEAP, new StatsHolder(11, 12, 13, 14, 15, 0.0)); + testBothTiersMap.put(TierType.DISK, new StatsHolder(6, 7, 8, 9, 10, 25.0)); RequestCacheStats bothTiersStats = new RequestCacheStats(testBothTiersMap); - assertTierState(bothTiersStats, TierType.ON_HEAP, 11, 12, 13, 14, 15); - assertTierState(bothTiersStats, TierType.DISK, 6, 7, 8, 9, 10); + assertTierState(bothTiersStats, TierType.ON_HEAP, 11, 12, 13, 14, 15, 0.0); + assertTierState(bothTiersStats, TierType.DISK, 6, 7, 8, 9, 10, 25.0); bothTiersStats.add(heapOnlyStats); - assertTierState(bothTiersStats, TierType.ON_HEAP, 12, 14, 16, 18, 20); - assertTierState(bothTiersStats, TierType.DISK, 6, 7, 8, 9, 10); + assertTierState(bothTiersStats, TierType.ON_HEAP, 12, 14, 16, 18, 20, 0.0); + assertTierState(bothTiersStats, TierType.DISK, 6, 7, 8, 9, 10, 25.0); + + Map addEWMAMap = new HashMap<>(); + addEWMAMap.put(TierType.DISK, new StatsHolder(1, 1, 1, 1, 1, 16.0)); + RequestCacheStats addEWMAStats = new RequestCacheStats(addEWMAMap); + bothTiersStats.add(addEWMAStats); + assertTierState(bothTiersStats, TierType.ON_HEAP, 12, 14, 16, 18, 20, 0.0); + assertTierState(bothTiersStats, TierType.DISK, 7, 8, 9, 10, 11, 16.0); + // The new EWMA should be selected + } public void testSerialization() throws Exception { @@ -51,15 +60,15 @@ public void testSerialization() throws Exception { BytesStreamOutput os = new BytesStreamOutput(); Map testMap = new HashMap<>(); - testMap.put(TierType.ON_HEAP, new StatsHolder(11, 12, 13, 14, 15)); - testMap.put(TierType.DISK, new StatsHolder(6, 7, 8, 9, 10)); + testMap.put(TierType.ON_HEAP, new StatsHolder(11, 12, 13, 14, 15, 0.0)); + testMap.put(TierType.DISK, new StatsHolder(6, 7, 8, 9, 10, 25.0)); RequestCacheStats stats = new RequestCacheStats(testMap); stats.writeTo(os); BytesStreamInput is = new BytesStreamInput(BytesReference.toBytes(os.bytes())); RequestCacheStats deserialized = new RequestCacheStats(is); - assertTierState(deserialized, TierType.ON_HEAP, 11, 12, 13, 14, 15); - assertTierState(deserialized, TierType.DISK, 6, 7, 8, 9, 10); + assertTierState(deserialized, TierType.ON_HEAP, 11, 12, 13, 14, 15, 0.0); + assertTierState(deserialized, TierType.DISK, 6, 7, 8, 9, 10, 25.0); } private void assertTierState( @@ -69,12 +78,14 @@ private void assertTierState( long evictions, long hitCount, long missCount, - long entries + long entries, + double getTimeEWMA ) { assertEquals(memSize, stats.getMemorySizeInBytes(tierType)); assertEquals(evictions, stats.getEvictions(tierType)); assertEquals(hitCount, stats.getHitCount(tierType)); assertEquals(missCount, stats.getMissCount(tierType)); assertEquals(entries, stats.getEntries(tierType)); + assertEquals(getTimeEWMA, stats.getTimeEWMA(tierType), 0.01); } } diff --git a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java index 5df6b2154c4d8..31ff072a7ecba 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesRequestCacheTests.java @@ -180,7 +180,6 @@ public void testSpillover() throws Exception { TestEntity entity = new TestEntity(requestCacheStats, indexShard); Loader loader = new Loader(reader, 0); - System.out.println("On-heap cache size at start = " + requestCacheStats.stats().getMemorySizeInBytes()); BytesReference[] termBytesArr = new BytesReference[maxNumInHeap + 1]; for (int i = 0; i < maxNumInHeap + 1; i++) { diff --git a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java index 766d80a81b097..0b20bd07d647b 100644 --- a/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java +++ b/server/src/test/java/org/opensearch/indices/IndicesServiceCloseTests.java @@ -341,10 +341,10 @@ public Object getCacheIdentity() { } @Override - public void onHit(TierType tierType) {} + public void onHit(TierType tierType, double getTimeEWMA) {} @Override - public void onMiss(TierType tierType) {} + public void onMiss(TierType tierType, double getTimeEWMA) {} @Override public void onRemoval(RemovalNotification notification) {}