Skip to content

Add a dynamic setting to change skip_cache_factor and min_frequency for querycache #18351

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
- Added File Cache Pinning ([#17617](https://github.com/opensearch-project/OpenSearch/issues/13648))
- Support consumer reset in Resume API for pull-based ingestion. This PR includes a breaking change for the experimental pull-based ingestion feature. ([#18332](https://github.com/opensearch-project/OpenSearch/pull/18332))
- Add FIPS build tooling ([#4254](https://github.com/opensearch-project/security/issues/4254))
- Add dynamic settings to change skip_cache_factor and min_frequency for the query cache ([#18351](https://github.com/opensearch-project/OpenSearch/pull/18351))

### Changed
- Create generic DocRequest to better categorize ActionRequests ([#18269](https://github.com/opensearch-project/OpenSearch/pull/18269))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,6 +597,10 @@ private void applyChanges(UpdateTask task, ClusterState previousClusterState, Cl
logger.debug("completed calling listeners of cluster state for version {}", newClusterState.version());
}

/**
 * Exposes the {@link ClusterSettings} instance held by this service.
 *
 * @return the cluster settings held in the {@code clusterSettings} field
 */
public ClusterSettings clusterSettings() {
    return this.clusterSettings;
}

protected void connectToNodesAndWait(ClusterState newClusterState) {
// can't wait for an ActionFuture on the cluster applier thread, but we do want to block the thread here, so use a CountDownLatch.
final CountDownLatch countDownLatch = new CountDownLatch(1);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -297,6 +297,9 @@ public void apply(Settings value, Settings current, Settings previous) {
IndicesQueryCache.INDICES_CACHE_QUERY_SIZE_SETTING,
IndicesQueryCache.INDICES_CACHE_QUERY_COUNT_SETTING,
IndicesQueryCache.INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING,
IndicesQueryCache.INDICES_QUERIES_CACHE_SKIP_CACHE_FACTOR,
IndicesQueryCache.INDICES_QUERY_CACHE_MIN_FREQUENCY,
IndicesQueryCache.INDICES_QUERY_CACHE_COSTLY_MIN_FREQUENCY,
IndicesService.CLUSTER_DEFAULT_INDEX_MAX_MERGE_AT_ONCE_SETTING,
IndicesService.CLUSTER_DEFAULT_INDEX_REFRESH_INTERVAL_SETTING,
IndicesService.CLUSTER_MINIMUM_INDEX_REFRESH_INTERVAL_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ReferenceManager;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
import org.apache.lucene.store.AlreadyClosedException;
import org.apache.lucene.store.BufferedChecksumIndexInput;
import org.apache.lucene.store.ChecksumIndexInput;
Expand Down Expand Up @@ -187,6 +186,7 @@
import org.opensearch.index.warmer.ShardIndexWarmerService;
import org.opensearch.index.warmer.WarmerStats;
import org.opensearch.indices.IndexingMemoryController;
import org.opensearch.indices.IndicesQueryCache;
import org.opensearch.indices.IndicesService;
import org.opensearch.indices.RemoteStoreSettings;
import org.opensearch.indices.cluster.IndicesClusterStateService;
Expand Down Expand Up @@ -495,7 +495,7 @@ public boolean shouldCache(Query query) {
}
};
} else {
cachingPolicy = new UsageTrackingQueryCachingPolicy();
cachingPolicy = new IndicesQueryCache.OpenseachUsageTrackingQueryCachingPolicy(clusterApplierService.clusterSettings());
}
indexShardOperationPermits = new IndexShardOperationPermits(shardId, threadPool);
readerWrapper = indexReaderWrapper;
Expand Down Expand Up @@ -5577,4 +5577,5 @@ public void stopRefreshTask() {
/**
 * Returns the refresh task currently held by this shard.
 *
 * @return the value of the {@code refreshTask} field
 */
public AsyncShardRefreshTask getRefreshTask() {
    return this.refreshTask;
}

}
115 changes: 114 additions & 1 deletion server/src/main/java/org/opensearch/indices/IndicesQueryCache.java
Original file line number Diff line number Diff line change
Expand Up @@ -35,15 +35,20 @@
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.DisjunctionMaxQuery;
import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.LRUQueryCache;
import org.apache.lucene.search.MultiTermQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryCache;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.ScorerSupplier;
import org.apache.lucene.search.UsageTrackingQueryCachingPolicy;
import org.apache.lucene.search.Weight;
import org.opensearch.common.annotation.PublicApi;
import org.opensearch.common.lucene.ShardCoreKeyMap;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Setting;
import org.opensearch.common.settings.Setting.Property;
import org.opensearch.common.settings.Settings;
Expand Down Expand Up @@ -91,6 +96,33 @@ public class IndicesQueryCache implements QueryCache, Closeable {
Property.NodeScope
);

/**
 * Node-scoped, dynamically updatable setting {@code indices.queries.cache.skip_cache_factor}
 * (default {@code 10}, minimum {@code 1}).
 * <p>
 * The two-argument constructor reads it and forwards the value to the underlying cache via
 * {@code setSkipCacheFactor}, unless the all-segments setting is enabled, in which case a fixed
 * factor of {@code 1f} is used instead.
 */
public static final Setting<Float> INDICES_QUERIES_CACHE_SKIP_CACHE_FACTOR = Setting.floatSetting(
"indices.queries.cache.skip_cache_factor",
10,
1,
Property.NodeScope,
Property.Dynamic
);

/**
 * Node-scoped, dynamically updatable setting {@code indices.queries.cache.min_frequency}
 * (default {@code 5}, minimum {@code 1}).
 * <p>
 * {@code OpenseachUsageTrackingQueryCachingPolicy#minFrequencyToCache} returns this value for
 * queries it does not classify as costly; {@code BooleanQuery} and {@code DisjunctionMaxQuery}
 * get one less, floored at 1.
 */
public static final Setting<Integer> INDICES_QUERY_CACHE_MIN_FREQUENCY = Setting.intSetting(
"indices.queries.cache.min_frequency",
5,
1,
Property.NodeScope,
Property.Dynamic
);

/**
 * Node-scoped, dynamically updatable setting {@code indices.queries.cache.costly_min_frequency}
 * (default {@code 2}, minimum {@code 1}).
 * <p>
 * {@code OpenseachUsageTrackingQueryCachingPolicy#minFrequencyToCache} returns this value for
 * queries it classifies as costly (multi-term queries, their constant-score wrappers, and point queries).
 */
public static final Setting<Integer> INDICES_QUERY_CACHE_COSTLY_MIN_FREQUENCY = Setting.intSetting(
"indices.queries.cache.costly_min_frequency",
2,
1,
Property.NodeScope,
Property.Dynamic
);

private final LRUQueryCache cache;
private final ShardCoreKeyMap shardKeyMap = new ShardCoreKeyMap();
private final Map<ShardId, Stats> shardStats = new ConcurrentHashMap<>();
Expand All @@ -101,18 +133,40 @@ public class IndicesQueryCache implements QueryCache, Closeable {
// See onDocIdSetEviction for more info
private final Map<Object, StatsAndCount> stats2 = Collections.synchronizedMap(new IdentityHashMap<>());

/**
 * Constructor kept for compatibility with existing callers of the public API.
 * <p>
 * Delegates to the two-argument constructor with a {@code null} {@code ClusterSettings}; in that
 * case the skip-cache factor is read once from {@code settings} and no dynamic update consumer
 * is registered.
 *
 * @param settings node settings used to size and configure the cache
 */
public IndicesQueryCache(Settings settings) {
this(settings, null);
}

/**
 * Creates the node-level query cache.
 * <p>
 * Size, maximum filter count and skip-cache factor are read from {@code settings}. When the
 * all-segments setting is enabled the cache is built with a fixed skip-cache factor of {@code 1f}
 * and the configured factor is not applied. Otherwise the configured factor is applied and, if
 * {@code clusterSettings} is available, an update consumer is registered so later changes to
 * {@code indices.queries.cache.skip_cache_factor} take effect at runtime.
 *
 * @param settings        node settings used to size and configure the cache
 * @param clusterSettings cluster settings used to subscribe to dynamic updates; may be {@code null},
 *                        in which case the skip-cache factor stays at its initial value
 */
public IndicesQueryCache(Settings settings, ClusterSettings clusterSettings) {
    final ByteSizeValue size = INDICES_CACHE_QUERY_SIZE_SETTING.get(settings);
    final int count = INDICES_CACHE_QUERY_COUNT_SETTING.get(settings);
    final float skipCacheFactor = INDICES_QUERIES_CACHE_SKIP_CACHE_FACTOR.get(settings);
    // Single startup message; it carries everything the older size/count-only message reported.
    logger.debug("using [node] query cache with size [{}] max filter count [{}] skipCacheFactor [{}]", size, count, skipCacheFactor);
    if (INDICES_QUERIES_CACHE_ALL_SEGMENTS_SETTING.get(settings)) {
        // All-segments mode: every segment is eligible and the skip-cache factor is pinned to 1f.
        cache = new OpenSearchLRUQueryCache(count, size.getBytes(), context -> true, 1f);
    } else {
        cache = new OpenSearchLRUQueryCache(count, size.getBytes());
        cache.setSkipCacheFactor(skipCacheFactor);
        if (clusterSettings != null) {
            clusterSettings.addSettingsUpdateConsumer(INDICES_QUERIES_CACHE_SKIP_CACHE_FACTOR, this::setSkipCacheFactor);
        } else {
            logger.warn("clusterSettings is null, so {} is not dynamic", INDICES_QUERIES_CACHE_SKIP_CACHE_FACTOR.getKey());
        }
    }
    sharedRamBytesUsed = 0;
}

/**
 * Applies a new skip-cache factor to the underlying cache and logs the transition at debug level.
 *
 * @param skipCacheFactor the factor to hand to the underlying cache
 */
public void setSkipCacheFactor(float skipCacheFactor) {
    final String settingKey = INDICES_QUERIES_CACHE_SKIP_CACHE_FACTOR.getKey();
    final float previousFactor = this.cache.getSkipCacheFactor();
    logger.debug("set cluster settings {} {} -> {}", settingKey, previousFactor, skipCacheFactor);
    this.cache.setSkipCacheFactor(skipCacheFactor);
}

/** Get usage statistics for the given shard. */
public QueryCacheStats getStats(ShardId shard) {
final Map<ShardId, QueryCacheStats> stats = new HashMap<>();
Expand Down Expand Up @@ -393,4 +447,63 @@ protected void onMiss(Object readerCoreKey, Query filter) {
shardStats.missCount += 1;
}
}

/**
 * Usage-tracking caching policy for OpenSearch whose frequency thresholds can be changed at runtime.
 * <p>
 * The two thresholds are initialised from {@link ClusterSettings} and kept in {@code volatile}
 * fields that are overwritten by the settings-update consumers registered in the constructor.
 * <p>
 * NOTE(review): the consumers are registered on the supplied {@code ClusterSettings} and nothing in
 * this class removes them again - confirm the intended lifetime when one policy is created per shard.
 */
public static class OpenseachUsageTrackingQueryCachingPolicy extends UsageTrackingQueryCachingPolicy {
    private volatile int minFrequency;
    private volatile int minFrequencyForCostly;

    /**
     * Reads the initial thresholds and subscribes to later updates of both settings.
     *
     * @param clusterSettings source of the current values and of update notifications
     */
    public OpenseachUsageTrackingQueryCachingPolicy(ClusterSettings clusterSettings) {
        this.minFrequency = clusterSettings.get(INDICES_QUERY_CACHE_MIN_FREQUENCY);
        this.minFrequencyForCostly = clusterSettings.get(INDICES_QUERY_CACHE_COSTLY_MIN_FREQUENCY);
        clusterSettings.addSettingsUpdateConsumer(INDICES_QUERY_CACHE_MIN_FREQUENCY, this::setMinFrequency);
        clusterSettings.addSettingsUpdateConsumer(INDICES_QUERY_CACHE_COSTLY_MIN_FREQUENCY, this::setMinFrequencyForCostly);
    }

    /**
     * Returns the costly threshold for costly queries; otherwise the regular threshold, reduced by
     * one for {@code BooleanQuery} and {@code DisjunctionMaxQuery}, and never less than 1.
     */
    @Override
    protected int minFrequencyToCache(Query query) {
        if (isCostly(query)) {
            return minFrequencyForCostly;
        }
        final boolean compound = query instanceof BooleanQuery || query instanceof DisjunctionMaxQuery;
        // Exactly one read of the volatile field, whichever branch is taken.
        final int threshold = compound ? this.minFrequency - 1 : this.minFrequency;
        return Math.max(1, threshold);
    }

    /**
     * Mirrors Lucene's non-public {@code UsageTrackingQueryCachingPolicy.isCostly}: multi-term
     * queries, their constant-score wrappers (matched by simple class name) and point queries.
     * Re-implemented here because Lucene does not expose it; it can be extended if needed.
     */
    private boolean isCostly(Query query) {
        if (query instanceof MultiTermQuery) {
            return true;
        }
        final String simpleName = query.getClass().getSimpleName();
        if (simpleName.equals("MultiTermQueryConstantScoreBlendedWrapper")) {
            return true;
        }
        if (simpleName.equals("MultiTermQueryConstantScoreWrapper")) {
            return true;
        }
        return isPointQuery(query);
    }

    /**
     * Mirrors Lucene's {@code UsageTrackingQueryCachingPolicy.isPointQuery}: walks the class
     * hierarchy up to (excluding) {@code Query} looking for a simple name of the form {@code Point*Query}.
     */
    private boolean isPointQuery(Query query) {
        Class<?> current = query.getClass();
        while (current != Query.class) {
            final String name = current.getSimpleName();
            if (name.startsWith("Point") && name.endsWith("Query")) {
                return true;
            }
            current = current.getSuperclass();
        }
        return false;
    }

    /** Replaces the threshold used for queries that are not classified as costly. */
    public void setMinFrequency(int minFrequency) {
        this.minFrequency = minFrequency;
    }

    /** Replaces the threshold used for queries that are classified as costly. */
    public void setMinFrequencyForCostly(int minFrequencyForCostly) {
        this.minFrequencyForCostly = minFrequencyForCostly;
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ public IndicesService(
}
return Optional.of(new IndexShardCacheEntity(indexService.getShardOrNull(shardId.id())));
}), cacheService, threadPool, clusterService, nodeEnv);
this.indicesQueryCache = new IndicesQueryCache(settings);
this.indicesQueryCache = new IndicesQueryCache(settings, clusterService.getClusterSettings());
this.mapperRegistry = mapperRegistry;
this.namedWriteableRegistry = namedWriteableRegistry;
indexingMemoryController = new IndexingMemoryController(
Expand Down Expand Up @@ -1195,7 +1195,7 @@ public synchronized void verifyIndexMetadata(IndexMetadata metadata, IndexMetada
IndicesFieldDataCache indicesFieldDataCache = new IndicesFieldDataCache(settings, new IndexFieldDataCache.Listener() {
});
closeables.add(indicesFieldDataCache);
IndicesQueryCache indicesQueryCache = new IndicesQueryCache(settings);
IndicesQueryCache indicesQueryCache = new IndicesQueryCache(settings, clusterService.getClusterSettings());
closeables.add(indicesQueryCache);
// this will also fail if some plugin fails etc. which is nice since we can verify that early
final IndexService service = createIndexService(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,31 +33,49 @@
package org.opensearch.indices;

import org.apache.lucene.document.Document;
import org.apache.lucene.document.IntPoint;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.LeafReaderContext;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.BooleanClause;
import org.apache.lucene.search.BooleanQuery;
import org.apache.lucene.search.ConstantScoreScorer;
import org.apache.lucene.search.ConstantScoreWeight;
import org.apache.lucene.search.DocIdSetIterator;
import org.apache.lucene.search.Explanation;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.MatchAllDocsQuery;
import org.apache.lucene.search.MultiTermQuery;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.QueryCachingPolicy;
import org.apache.lucene.search.QueryVisitor;
import org.apache.lucene.search.ScoreMode;
import org.apache.lucene.search.Scorer;
import org.apache.lucene.search.ScorerSupplier;
import org.apache.lucene.search.TermInSetQuery;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TotalHitCountCollector;
import org.apache.lucene.search.Weight;
import org.apache.lucene.store.Directory;
import org.apache.lucene.util.BytesRef;
import org.opensearch.common.lucene.index.OpenSearchDirectoryReader;
import org.opensearch.common.settings.ClusterSettings;
import org.opensearch.common.settings.Settings;
import org.opensearch.common.util.io.IOUtils;
import org.opensearch.core.index.shard.ShardId;
import org.opensearch.index.cache.query.QueryCacheStats;
import org.opensearch.index.mapper.KeywordFieldMapper;
import org.opensearch.test.OpenSearchTestCase;

import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

import static org.opensearch.indices.IndicesQueryCache.OpenseachUsageTrackingQueryCachingPolicy;
import static org.apache.lucene.search.MultiTermQuery.CONSTANT_SCORE_BLENDED_REWRITE;
import static org.apache.lucene.search.MultiTermQuery.CONSTANT_SCORE_REWRITE;

public class IndicesQueryCacheTests extends OpenSearchTestCase {

Expand Down Expand Up @@ -492,4 +510,105 @@ public void testDelegatesCount() throws Exception {
cache.onClose(shard);
cache.close();
}

/**
 * Verifies that changing the policy's minimum frequency and the cache's skip-cache factor at
 * runtime changes what the query cache stores for subsequent searches.
 * <p>
 * NOTE(review): the policy and the cache are built on two separate {@code ClusterSettings}
 * instances, and the test drives both through their setters rather than through a settings update.
 */
public void testDynamicChangeSettings() throws IOException {
Directory dir = newDirectory();
IndexWriterConfig conf = newIndexWriterConfig();
// buffer limit well above the number of documents indexed below
// (presumably so they end up in one segment - TODO confirm)
conf.setMaxBufferedDocs(100000);
IndexWriter w = new IndexWriter(dir, conf);
// Lucene will not cache a segment holding fewer than 10,000 docs, so index 10,001
for (int i = 0; i < 10001; i++) {
Document document = new Document();
document.add(new IntPoint("age", i));
final BytesRef binaryValue = new BytesRef(String.valueOf(i));
document.add(new KeywordFieldMapper.KeywordField("name", binaryValue, KeywordFieldMapper.Defaults.FIELD_TYPE));
w.addDocument(document);
}
DirectoryReader r = DirectoryReader.open(w);
w.close();
ShardId shard = new ShardId("index", "_na_", 0);
r = OpenSearchDirectoryReader.wrap(r, shard);

IndexSearcher searcher = new IndexSearcher(r);
// empty settings: the policy and the cache both start from the setting defaults
Settings settings = Settings.builder().build();
OpenseachUsageTrackingQueryCachingPolicy queryCachingPolicy = new OpenseachUsageTrackingQueryCachingPolicy(
new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
);

searcher.setQueryCachingPolicy(queryCachingPolicy);
ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
IndicesQueryCache cache = new IndicesQueryCache(settings, clusterSettings);
searcher.setQueryCache(cache);
final TotalHitCountCollector collector = new TotalHitCountCollector();

// part 1: lowering min_frequency at runtime
{
BooleanQuery.Builder booleanQuery = new BooleanQuery.Builder();
booleanQuery.add(IntPoint.newRangeQuery("age", 1, 9999), BooleanClause.Occur.FILTER);
booleanQuery.add(new TermQuery(new Term("name", "1")), BooleanClause.Occur.FILTER);
// first search with the default thresholds: nothing may be cached yet
searcher.search(booleanQuery.build(), collector);
QueryCacheStats stats = cache.getStats(shard);
assertEquals(0L, stats.getCacheSize());
assertEquals(0L, stats.getCacheCount());
assertEquals(0L, stats.getHitCount());
assertEquals(6L, stats.getMissCount());

// lower the threshold through the setter, then run the same query again
queryCachingPolicy.setMinFrequency(2);
searcher.search(booleanQuery.build(), collector);
stats = cache.getStats(shard);
// exactly one entry is cached after the second search
assertEquals(1L, stats.getCacheSize());
assertEquals(1L, stats.getCacheCount());
}

// part 2: raising the skip-cache factor at runtime
{
// min_frequency was lowered to 2 above; raise it to 4 so the compound query
// (threshold min_frequency - 1 = 3) is not itself cached after the two searches below.
// The range clause is a point query and uses the costly threshold (default 2) instead.
queryCachingPolicy.setMinFrequency(4);
BooleanQuery.Builder booleanQuery = new BooleanQuery.Builder();
booleanQuery.add(IntPoint.newRangeQuery("age", 2, 9999), BooleanClause.Occur.FILTER);
booleanQuery.add(new TermQuery(new Term("name", "3")), BooleanClause.Occur.MUST);
// presumably large enough that the broad range clause is no longer skipped relative to the
// selective term clause - see Lucene's LRUQueryCache skip-cache-factor documentation
cache.setSkipCacheFactor(20000);
searcher.search(booleanQuery.build(), collector);
searcher.search(booleanQuery.build(), collector);
QueryCacheStats stats = cache.getStats(shard);
// one more entry than after part 1 (1 -> 2); the intent is that only the range clause was
// cached, which the raised skipCacheFactor makes possible
assertEquals(2L, stats.getCacheSize());
assertEquals(2L, stats.getCacheCount());
assertEquals(0L, stats.getHitCount());
}
IOUtils.close(r, dir);
cache.onClose(shard);
cache.close(); // this triggers some assertions
}

/**
 * Checks that {@code minFrequencyToCache} returns the costly threshold for each query shape the
 * policy classifies as costly, and the regular threshold for queries it does not.
 */
public void testCostlyMinFrequencyToCache() throws IOException {
    Settings settings = Settings.builder().build();
    OpenseachUsageTrackingQueryCachingPolicy queryCachingPolicy = new OpenseachUsageTrackingQueryCachingPolicy(
        new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)
    );
    // Use distinct, non-default values so the two thresholds cannot be confused with each other.
    int minFrequencyForCostly = 3;
    int minFrequency = 7;
    queryCachingPolicy.setMinFrequencyForCostly(minFrequencyForCostly);
    queryCachingPolicy.setMinFrequency(minFrequency);

    // MultiTermQuery itself (TermInSetQuery built with a rewrite method)
    List<BytesRef> terms = new ArrayList<>();
    terms.add(new BytesRef("foo"));
    TermInSetQuery termInSetQuery = new TermInSetQuery(MultiTermQuery.DOC_VALUES_REWRITE, "field", terms);
    assertEquals(minFrequencyForCostly, queryCachingPolicy.minFrequencyToCache(termInSetQuery));

    // result of the constant-score blended rewrite (MultiTermQueryConstantScoreBlendedWrapper)
    Query query = CONSTANT_SCORE_BLENDED_REWRITE.rewrite(null, termInSetQuery);
    assertEquals(minFrequencyForCostly, queryCachingPolicy.minFrequencyToCache(query));

    // result of the constant-score rewrite (MultiTermQueryConstantScoreWrapper)
    query = CONSTANT_SCORE_REWRITE.rewrite(null, termInSetQuery);
    assertEquals(minFrequencyForCostly, queryCachingPolicy.minFrequencyToCache(query));

    // point query
    query = IntPoint.newRangeQuery("age", 2, 9999);
    assertEquals(minFrequencyForCostly, queryCachingPolicy.minFrequencyToCache(query));

    // contrast: a plain term query is not costly and gets the regular threshold
    TermQuery termQuery = new TermQuery(new Term("field", "foo"));
    assertEquals(minFrequency, queryCachingPolicy.minFrequencyToCache(termQuery));

    // contrast: a compound query is not costly and gets the regular threshold minus one
    BooleanQuery booleanQuery = new BooleanQuery.Builder().add(termQuery, BooleanClause.Occur.FILTER).build();
    assertEquals(minFrequency - 1, queryCachingPolicy.minFrequencyToCache(booleanQuery));
}
}
Loading