Skip to content

Commit c31602e

Browse files
committed
Add a dynamic setting to change skip_cache_factor and min_frequency for querycache
Signed-off-by: kkewwei <kewei.11@bytedance.com> Signed-off-by: kkewwei <kkewwei@163.com> test
1 parent d723db8 commit c31602e

File tree

8 files changed

+193
-4
lines changed

8 files changed

+193
-4
lines changed

CHANGELOG.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
2828
- Use QueryCoordinatorContext for the rewrite in validate API. ([#18272](https://github.com/opensearch-project/OpenSearch/pull/18272))
2929
- Upgrade crypto kms plugin dependencies for AWS SDK v2.x. ([#18268](https://github.com/opensearch-project/OpenSearch/pull/18268))
3030
- Add support for `matched_fields` with the unified highlighter ([#18164](https://github.com/opensearch-project/OpenSearch/issues/18164))
31+
- Add a dynamic setting to change skip_cache_factor and min_frequency for querycache ([#18274]( ([#18274](URL_ADDRESS.com/opensearch-project/OpenSearch/pull/18274))
3132

3233
### Changed
3334
- Create generic DocRequest to better categorize ActionRequests ([#18269](https://github.com/opensearch-project/OpenSearch/pull/18269)))

server/src/main/java/org/opensearch/common/settings/ClusterSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,6 +297,7 @@ public void apply(Settings value, Settings current, Settings previous) {
297297
IndicesQueryCache.INDICES_CACHE_QUERY_SIZE_SETTING,
298298
IndicesQueryCache.INDICES_CACHE_QUERY_COUNT_SETTING,
299299
IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING,
300+
IndicesQueryCache.INDICES_QUERIES_CACHE_SKIP_CACHE_FACTOR,
300301
IndicesService.CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING,
301302
IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING,
302303
IndicesService.CLUSTER_MINIMUM_INDEX_REFRESH_INTERVAL_SETTING,

server/src/main/java/org/opensearch/common/settings/IndexScopedSettings.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -185,6 +185,7 @@ public final class IndexScopedSettings extends AbstractScopedSettings {
185185
IndexSettings.INDEX_SEARCH_IDLE_AFTER,
186186
IndexSettings.INDEX_SEARCH_THROTTLED,
187187
IndexSettings.INDEX_UNREFERENCED_FILE_CLEANUP,
188+
IndexSettings.INDEX_QUERY_CACHE_MIN_FREQUENCY,
188189
IndexFieldDataService.INDEX_FIELDDATA_CACHE_KEY,
189190
FieldMapper.IGNORE_MALFORMED_SETTING,
190191
FieldMapper.COERCE_SETTING,

server/src/main/java/org/opensearch/index/IndexSettings.java

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -783,6 +783,14 @@ public static IndexMergePolicy fromString(String text) {
783783
Property.IndexScope
784784
);
785785

786+
public static final Setting<Integer> INDEX_QUERY_CACHE_MIN_FREQUENCY = Setting.intSetting(
787+
"index.query_cache.min_frequency",
788+
5,
789+
1,
790+
Property.Dynamic,
791+
Property.IndexScope
792+
);
793+
786794
private final Index index;
787795
private final Version version;
788796
private final Logger logger;
@@ -832,6 +840,7 @@ public static IndexMergePolicy fromString(String text) {
832840
private final RemoteStorePathStrategy remoteStorePathStrategy;
833841
private final boolean isTranslogMetadataEnabled;
834842
private volatile boolean allowDerivedField;
843+
private volatile int queryCacheMinFrequency;
835844

836845
/**
837846
* The maximum age of a retention lease before it is considered expired.
@@ -1079,6 +1088,7 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
10791088
setEnableFuzzySetForDocId(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_ENABLED_SETTING));
10801089
setDocIdFuzzySetFalsePositiveProbability(scopedSettings.get(INDEX_DOC_ID_FUZZY_SET_FALSE_POSITIVE_PROBABILITY_SETTING));
10811090
isCompositeIndex = scopedSettings.get(StarTreeIndexSettings.IS_COMPOSITE_INDEX_SETTING);
1091+
queryCacheMinFrequency = scopedSettings.get(INDEX_QUERY_CACHE_MIN_FREQUENCY);
10821092
scopedSettings.addSettingsUpdateConsumer(
10831093
TieredMergePolicyProvider.INDEX_COMPOUND_FORMAT_SETTING,
10841094
tieredMergePolicyProvider::setNoCFSRatio
@@ -1201,6 +1211,15 @@ public IndexSettings(final IndexMetadata indexMetadata, final Settings nodeSetti
12011211
IndexMetadata.INDEX_REMOTE_TRANSLOG_REPOSITORY_SETTING,
12021212
this::setRemoteStoreTranslogRepository
12031213
);
1214+
scopedSettings.addSettingsUpdateConsumer(INDEX_QUERY_CACHE_MIN_FREQUENCY, this::setQueryCacheMinFrequency);
1215+
}
1216+
1217+
private void setQueryCacheMinFrequency(int queryCacheMinFrequency) {
1218+
this.queryCacheMinFrequency = queryCacheMinFrequency;
1219+
}
1220+
1221+
public int getQueryCacheMinFrequency() {
1222+
return queryCacheMinFrequency;
12041223
}
12051224

12061225
private void setSearchIdleAfter(TimeValue searchIdleAfter) {

server/src/main/java/org/opensearch/index/shard/IndexShard.java

Lines changed: 53 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,10 +43,14 @@
4343
import org.apache.lucene.index.LeafReader;
4444
import org.apache.lucene.index.SegmentInfos;
4545
import org.apache.lucene.index.Term;
46+
import org.apache.lucene.search.BooleanQuery;
47+
import org.apache.lucene.search.DisjunctionMaxQuery;
48+
import org.apache.lucene.search.MultiTermQuery;
4649
import org.apache.lucene.search.Query;
4750
import org.apache.lucene.search.QueryCachingPolicy;
4851
import org.apache.lucene.search.ReferenceManager;
4952
import org.apache.lucene.search.Sort;
53+
import org.apache.lucene.search.TermInSetQuery;
5054
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
5155
import org.apache.lucene.store.AlreadyClosedException;
5256
import org.apache.lucene.store.BufferedChecksumIndexInput;
@@ -495,7 +499,7 @@ public boolean shouldCache(Query query) {
495499
}
496500
};
497501
} else {
498-
cachingPolicy = new UsageTrackingQueryCachingPolicy();
502+
cachingPolicy = new OpenseachUsageTrackingQueryCachingPolicy(() -> indexSettings.getQueryCacheMinFrequency());
499503
}
500504
indexShardOperationPermits = new IndexShardOperationPermits(shardId, threadPool);
501505
readerWrapper = indexReaderWrapper;
@@ -5578,4 +5582,52 @@ public void stopRefreshTask() {
55785582
public AsyncShardRefreshTask getRefreshTask() {
55795583
return refreshTask;
55805584
}
5585+
5586+
/**
5587+
* Custom caching policy for Opensearch.
5588+
*/
5589+
public static class OpenseachUsageTrackingQueryCachingPolicy extends UsageTrackingQueryCachingPolicy {
5590+
private Supplier<Integer> queryCacheMinFrequency;
5591+
5592+
public OpenseachUsageTrackingQueryCachingPolicy(Supplier<Integer> queryCacheMinFrequency) {
5593+
this.queryCacheMinFrequency = queryCacheMinFrequency;
5594+
}
5595+
5596+
@Override
5597+
protected int minFrequencyToCache(Query query) {
5598+
if (isCostly(query)) {
5599+
return 2;
5600+
}
5601+
int minFrequency = queryCacheMinFrequency.get();
5602+
if (query instanceof BooleanQuery || query instanceof DisjunctionMaxQuery) {
5603+
--minFrequency;
5604+
}
5605+
5606+
return Math.max(1, minFrequency);
5607+
}
5608+
5609+
/**
5610+
* Same to Lucene's UsageTrackingQueryCachingPolicy.isCostly, it's not public in Lucene.
5611+
* Given that lucene doesn't give the desired extensibility at this point.
5612+
* Also, we can extension point if needed.
5613+
*/
5614+
private boolean isCostly(Query query) {
5615+
return query instanceof MultiTermQuery
5616+
|| query.getClass().getSimpleName().equals("MultiTermQueryConstantScoreBlendedWrapper")
5617+
|| query.getClass().getSimpleName().equals("MultiTermQueryConstantScoreWrapper")
5618+
|| query instanceof TermInSetQuery
5619+
|| isPointQuery(query);
5620+
}
5621+
5622+
// Same to Lucene's UsageTrackingQueryCachingPolicy.isPointQuery
5623+
private boolean isPointQuery(Query query) {
5624+
for (Class<?> clazz = query.getClass(); clazz != Query.class; clazz = clazz.getSuperclass()) {
5625+
final String simpleName = clazz.getSimpleName();
5626+
if (simpleName.startsWith("Point") && simpleName.endsWith("Query")) {
5627+
return true;
5628+
}
5629+
}
5630+
return false;
5631+
}
5632+
}
55815633
}

server/src/main/java/org/opensearch/indices/IndicesQueryCache.java

Lines changed: 36 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@
4444
import org.apache.lucene.search.Weight;
4545
import org.opensearch.common.annotation.PublicApi;
4646
import org.opensearch.common.lucene.ShardCoreKeyMap;
47+
import org.opensearch.common.settings.ClusterSettings;
4748
import org.opensearch.common.settings.Setting;
4849
import org.opensearch.common.settings.Setting.Property;
4950
import org.opensearch.common.settings.Settings;
@@ -91,6 +92,15 @@ public class IndicesQueryCache implements QueryCache, Closeable {
9192
Property.NodeScope
9293
);
9394

95+
// dynamic change the skipCacheFactor for query_cache
96+
public static final Setting<Float> INDICES_QUERIES_CACHE_SKIP_CACHE_FACTOR = Setting.floatSetting(
97+
"indices.queries.cache.skip_cache_factor",
98+
10,
99+
1,
100+
Property.NodeScope,
101+
Property.Dynamic
102+
);
103+
94104
private final LRUQueryCache cache;
95105
private final ShardCoreKeyMap shardKeyMap = new ShardCoreKeyMap();
96106
private final Map<ShardId, Stats> shardStats = new ConcurrentHashMap<>();
@@ -101,18 +111,43 @@ public class IndicesQueryCache implements QueryCache, Closeable {
101111
// See onDocIdSetEviction for more info
102112
private final Map<Object, StatsAndCount> stats2 = Collections.synchronizedMap(new IdentityHashMap<>());
103113

114+
// Compatible for public api
104115
public IndicesQueryCache(Settings settings) {
116+
this(settings, null);
117+
}
118+
119+
public IndicesQueryCache(Settings settings, ClusterSettings clusterSettings) {
105120
final ByteSizeValue size = INDICES_CACHE_QUERY_SIZE_SETTING.get(settings);
106121
final int count = INDICES_CACHE_QUERY_COUNT_SETTING.get(settings);
107-
logger.debug("using [node] query cache with size [{}] max filter count [{}]", size, count);
122+
float skipCacheFactor = INDICES_QUERIES_CACHE_SKIP_CACHE_FACTOR.get(settings);
123+
logger.debug("using [node] query cache with size [{}] max filter count [{}] skipCacheFactor [{}]", size, count, skipCacheFactor);
108124
if (INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING.get(settings)) {
109125
cache = new OpenSearchLRUQueryCache(count, size.getBytes(), context -> true, 1f);
110126
} else {
111127
cache = new OpenSearchLRUQueryCache(count, size.getBytes());
128+
cache.setSkipCacheFactor(skipCacheFactor);
129+
if (clusterSettings != null) {
130+
clusterSettings.addSettingsUpdateConsumer(INDICES_QUERIES_CACHE_SKIP_CACHE_FACTOR, this::setSkipCacheFactor);
131+
} else {
132+
logger.warn("clusterSettings is null, so {} is not dynamic", INDICES_QUERIES_CACHE_SKIP_CACHE_FACTOR.getKey());
133+
}
112134
}
113135
sharedRamBytesUsed = 0;
114136
}
115137

138+
public void setSkipCacheFactor(float skipCacheFactor) {
139+
if (skipCacheFactor < 1) {
140+
throw new IllegalArgumentException("skipCacheFactor must be no less than 1, get " + skipCacheFactor);
141+
}
142+
logger.debug(
143+
"set cluster settings {} {} -> {}",
144+
INDICES_QUERIES_CACHE_SKIP_CACHE_FACTOR.getKey(),
145+
this.cache.getSkipCacheFactor(),
146+
skipCacheFactor
147+
);
148+
cache.setSkipCacheFactor(skipCacheFactor);
149+
}
150+
116151
/** Get usage statistics for the given shard. */
117152
public QueryCacheStats getStats(ShardId shard) {
118153
final Map<ShardId, QueryCacheStats> stats = new HashMap<>();

server/src/main/java/org/opensearch/indices/IndicesService.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -471,7 +471,7 @@ public IndicesService(
471471
}
472472
return Optional.of(new IndexShardCacheEntity(indexService.getShardOrNull(shardId.id())));
473473
}), cacheService, threadPool, clusterService, nodeEnv);
474-
this.indicesQueryCache = new IndicesQueryCache(settings);
474+
this.indicesQueryCache = new IndicesQueryCache(settings, clusterService.getClusterSettings());
475475
this.mapperRegistry = mapperRegistry;
476476
this.namedWriteableRegistry = namedWriteableRegistry;
477477
indexingMemoryController = new IndexingMemoryController(
@@ -1195,7 +1195,7 @@ public synchronized void verifyIndexMetadata(IndexMetadata metadata, IndexMetada
11951195
IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() {
11961196
});
11971197
closeables.add(indicesFieldDataCache);
1198-
IndicesQueryCache indicesQueryCache = new IndicesQueryCache(settings);
1198+
IndicesQueryCache indicesQueryCache = new IndicesQueryCache(settings, clusterService.getClusterSettings());
11991199
closeables.add(indicesQueryCache);
12001200
// this will also fail if some plugin fails etc. which is nice since we can verify that early
12011201
final IndexService service = createIndexService(

server/src/test/java/org/opensearch/indices/IndicesQueryCacheTests.java

Lines changed: 80 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,9 +33,14 @@
3333
package org.opensearch.indices;
3434

3535
import org.apache.lucene.document.Document;
36+
import org.apache.lucene.document.IntPoint;
3637
import org.apache.lucene.index.DirectoryReader;
3738
import org.apache.lucene.index.IndexWriter;
39+
import org.apache.lucene.index.IndexWriterConfig;
3840
import org.apache.lucene.index.LeafReaderContext;
41+
import org.apache.lucene.index.Term;
42+
import org.apache.lucene.search.BooleanClause;
43+
import org.apache.lucene.search.BooleanQuery;
3944
import org.apache.lucene.search.ConstantScoreScorer;
4045
import org.apache.lucene.search.ConstantScoreWeight;
4146
import org.apache.lucene.search.DocIdSetIterator;
@@ -48,16 +53,23 @@
4853
import org.apache.lucene.search.ScoreMode;
4954
import org.apache.lucene.search.Scorer;
5055
import org.apache.lucene.search.ScorerSupplier;
56+
import org.apache.lucene.search.TermQuery;
57+
import org.apache.lucene.search.TotalHitCountCollector;
5158
import org.apache.lucene.search.Weight;
5259
import org.apache.lucene.store.Directory;
60+
import org.apache.lucene.util.BytesRef;
5361
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
62+
import org.opensearch.common.settings.ClusterSettings;
5463
import org.opensearch.common.settings.Settings;
5564
import org.opensearch.common.util.io.IOUtils;
5665
import org.opensearch.core.index.shard.ShardId;
5766
import org.opensearch.index.cache.query.QueryCacheStats;
67+
import org.opensearch.index.mapper.KeywordFieldMapper;
68+
import org.opensearch.index.shard.IndexShard;
5869
import org.opensearch.test.OpenSearchTestCase;
5970

6071
import java.io.IOException;
72+
import java.util.concurrent.atomic.AtomicInteger;
6173

6274
public class IndicesQueryCacheTests extends OpenSearchTestCase {
6375

@@ -492,4 +504,72 @@ public void testDelegatesCount() throws Exception {
492504
cache.onClose(shard);
493505
cache.close();
494506
}
507+
508+
public void testDynamicChangeSettings() throws IOException {
509+
Directory dir = newDirectory();
510+
IndexWriterConfig conf = newIndexWriterConfig();
511+
conf.setMaxBufferedDocs(100000);
512+
IndexWriter w = new IndexWriter(dir, conf);
513+
// lucene will not cache segment whose docs is smaller than 10,000
514+
for (int i = 0; i < 10001; i++) {
515+
Document document = new Document();
516+
document.add(new IntPoint("age", i));
517+
final BytesRef binaryValue = new BytesRef(String.valueOf(i));
518+
document.add(new KeywordFieldMapper.KeywordField("name", binaryValue, KeywordFieldMapper.Defaults.FIELD_TYPE));
519+
w.addDocument(document);
520+
}
521+
DirectoryReader r = DirectoryReader.open(w);
522+
w.close();
523+
ShardId shard = new ShardId("index", "_na_", 0);
524+
r = OpenSearchDirectoryReader.wrap(r, shard);
525+
526+
AtomicInteger queryCacheMinFrequency = new AtomicInteger(5);
527+
IndexSearcher searcher = new IndexSearcher(r);
528+
searcher.setQueryCachingPolicy(new IndexShard.OpenseachUsageTrackingQueryCachingPolicy(() -> queryCacheMinFrequency.get()));
529+
Settings settings = Settings.builder().build();
530+
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
531+
IndicesQueryCache cache = new IndicesQueryCache(settings, clusterSettings);
532+
searcher.setQueryCache(cache);
533+
final TotalHitCountCollector collector = new TotalHitCountCollector();
534+
535+
// test changing queryCacheMinFrequency
536+
{
537+
BooleanQuery.Builder booleanQuery = new BooleanQuery.Builder();
538+
booleanQuery.add(IntPoint.newRangeQuery("age", 1, 9999), BooleanClause.Occur.FILTER);
539+
booleanQuery.add(new TermQuery(new Term("name", "1")), BooleanClause.Occur.FILTER);
540+
searcher.search(booleanQuery.build(), collector);
541+
QueryCacheStats stats = cache.getStats(shard);
542+
assertEquals(0L, stats.getCacheSize());
543+
assertEquals(0L, stats.getCacheCount());
544+
assertEquals(0L, stats.getHitCount());
545+
assertEquals(6L, stats.getMissCount());
546+
547+
queryCacheMinFrequency.set(2);
548+
searcher.search(booleanQuery.build(), collector);
549+
stats = cache.getStats(shard);
550+
// ensure result is cached
551+
assertEquals(1L, stats.getCacheSize());
552+
assertEquals(1L, stats.getCacheCount());
553+
}
554+
555+
// test changing skipCacheFactor
556+
{
557+
// make sure the TermInSetQuery can be cached, because queryCacheMinFrequency is 2
558+
queryCacheMinFrequency.set(4);
559+
BooleanQuery.Builder booleanQuery = new BooleanQuery.Builder();
560+
booleanQuery.add(IntPoint.newRangeQuery("age", 2, 9999), BooleanClause.Occur.FILTER);
561+
booleanQuery.add(new TermQuery(new Term("name", "3")), BooleanClause.Occur.MUST);
562+
cache.setSkipCacheFactor(20000);
563+
searcher.search(booleanQuery.build(), collector);
564+
searcher.search(booleanQuery.build(), collector);
565+
QueryCacheStats stats = cache.getStats(shard);
566+
// only range can be cached, assert the range query has been cached because of skipCacheFactor
567+
assertEquals(2L, stats.getCacheSize());
568+
assertEquals(2L, stats.getCacheCount());
569+
assertEquals(0L, stats.getHitCount());
570+
}
571+
IOUtils.close(r, dir);
572+
cache.onClose(shard);
573+
cache.close(); // this triggers some assertions
574+
}
495575
}

0 commit comments

Comments
 (0)