From e86088bf97f4d5dbc57f33c2f4b9eca8a034a0c2 Mon Sep 17 00:00:00 2001 From: Nikolaj Volgushev Date: Sun, 6 Jul 2025 20:22:27 +0200 Subject: [PATCH 01/11] CPS SPI POC --- server/src/main/java/module-info.java | 3 ++- .../search/CrossClusterSearchExtension.java | 24 +++++++++++++++++++ .../elasticsearch/node/NodeConstruction.java | 9 +++++++ 3 files changed, 35 insertions(+), 1 deletion(-) create mode 100644 server/src/main/java/org/elasticsearch/action/search/CrossClusterSearchExtension.java diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index ef3d5da1c9531..2df8cc7878a32 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -398,7 +398,8 @@ org.elasticsearch.settings.secure, org.elasticsearch.serverless.constants, org.elasticsearch.serverless.apifiltering, - org.elasticsearch.internal.security; + org.elasticsearch.internal.security, + org.elasticsearch.serverless.crossproject; exports org.elasticsearch.telemetry.tracing; exports org.elasticsearch.telemetry; diff --git a/server/src/main/java/org/elasticsearch/action/search/CrossClusterSearchExtension.java b/server/src/main/java/org/elasticsearch/action/search/CrossClusterSearchExtension.java new file mode 100644 index 0000000000000..62f0248e80103 --- /dev/null +++ b/server/src/main/java/org/elasticsearch/action/search/CrossClusterSearchExtension.java @@ -0,0 +1,24 @@ +/* + * Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one + * or more contributor license agreements. Licensed under the "Elastic License + * 2.0", the "GNU Affero General Public License v3.0 only", and the "Server Side + * Public License v 1"; you may not use this file except in compliance with, at + * your election, the "Elastic License 2.0", the "GNU Affero General Public + * License v3.0 only", or the "Server Side Public License, v 1". + */ + +package org.elasticsearch.action.search; + +public interface CrossClusterSearchExtension { + + boolean forceReconnect(); + + class Noop implements CrossClusterSearchExtension { + public Noop() {} + + @Override + public boolean forceReconnect() { + return false; + } + } +} diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index aaaf5383ba80f..844fc0c3580fe 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -23,6 +23,7 @@ import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService; import org.elasticsearch.action.ingest.ReservedPipelineAction; +import org.elasticsearch.action.search.CrossClusterSearchExtension; import org.elasticsearch.action.search.OnlinePrewarmingService; import org.elasticsearch.action.search.OnlinePrewarmingServiceProvider; import org.elasticsearch.action.search.SearchExecutionStatsCollector; @@ -1015,6 +1016,14 @@ public Map queryFields() { var reservedStateHandlerProviders = pluginsService.loadServiceProviders(ReservedStateHandlerProvider.class); + var crossClusterSearchExtension = pluginsService.loadSingletonServiceProvider( + CrossClusterSearchExtension.class, + CrossClusterSearchExtension.Noop::new + ); + + logger.info("Using cross-cluster search extension: [{}]", crossClusterSearchExtension.getClass().getName()); + logger.info("Cross-cluster search force reconnect: [{}]", crossClusterSearchExtension.forceReconnect()); + ActionModule actionModule = new ActionModule( settings, clusterModule.getIndexNameExpressionResolver(), From 423f224a6f223a968dc51f8152bd49d05e41f29c Mon Sep 17 00:00:00 2001 From: Nikolaj Volgushev Date: Mon, 7 Jul 2025 12:38:18 +0200 Subject: [PATCH 02/11] Clean up --- server/src/main/java/module-info.java | 3 +-- .../org/elasticsearch/node/NodeConstruction.java | 7 +++---- .../internal}/CrossClusterSearchExtension.java | 14 ++++++++------ 3 files changed, 12 insertions(+), 12 deletions(-) rename server/src/main/java/org/elasticsearch/{action/search => search/internal}/CrossClusterSearchExtension.java (63%) diff --git a/server/src/main/java/module-info.java b/server/src/main/java/module-info.java index 2df8cc7878a32..ef3d5da1c9531 100644 --- a/server/src/main/java/module-info.java +++ b/server/src/main/java/module-info.java @@ -398,8 +398,7 @@ org.elasticsearch.settings.secure, org.elasticsearch.serverless.constants, org.elasticsearch.serverless.apifiltering, - org.elasticsearch.internal.security, - org.elasticsearch.serverless.crossproject; + org.elasticsearch.internal.security; exports org.elasticsearch.telemetry.tracing; exports org.elasticsearch.telemetry; diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 844fc0c3580fe..8ea482099306f 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -23,7 +23,7 @@ import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService; import org.elasticsearch.action.ingest.ReservedPipelineAction; -import org.elasticsearch.action.search.CrossClusterSearchExtension; +import org.elasticsearch.search.internal.CrossClusterSearchExtension; import org.elasticsearch.action.search.OnlinePrewarmingService; import org.elasticsearch.action.search.OnlinePrewarmingServiceProvider; import org.elasticsearch.action.search.SearchExecutionStatsCollector; @@ -1018,11 +1018,10 @@ public Map queryFields() { var crossClusterSearchExtension = pluginsService.loadSingletonServiceProvider( CrossClusterSearchExtension.class, - CrossClusterSearchExtension.Noop::new + CrossClusterSearchExtension.Default::new ); - logger.info("Using cross-cluster search extension: [{}]", crossClusterSearchExtension.getClass().getName()); - logger.info("Cross-cluster search force reconnect: [{}]", crossClusterSearchExtension.forceReconnect()); + logger.info("Cross-cluster search force reconnect: [{}]", crossClusterSearchExtension.forceReconnectBehaviorSupplier().get()); ActionModule actionModule = new ActionModule( settings, diff --git a/server/src/main/java/org/elasticsearch/action/search/CrossClusterSearchExtension.java b/server/src/main/java/org/elasticsearch/search/internal/CrossClusterSearchExtension.java similarity index 63% rename from server/src/main/java/org/elasticsearch/action/search/CrossClusterSearchExtension.java rename to server/src/main/java/org/elasticsearch/search/internal/CrossClusterSearchExtension.java index 62f0248e80103..ad4932761b327 100644 --- a/server/src/main/java/org/elasticsearch/action/search/CrossClusterSearchExtension.java +++ b/server/src/main/java/org/elasticsearch/search/internal/CrossClusterSearchExtension.java @@ -7,18 +7,20 @@ * License v3.0 only", or the "Server Side Public License, v 1". */ -package org.elasticsearch.action.search; +package org.elasticsearch.search.internal; + +import java.util.function.Supplier; public interface CrossClusterSearchExtension { - boolean forceReconnect(); + Supplier forceReconnectBehaviorSupplier(); - class Noop implements CrossClusterSearchExtension { - public Noop() {} + class Default implements CrossClusterSearchExtension { + public Default() {} @Override - public boolean forceReconnect() { - return false; + public Supplier forceReconnectBehaviorSupplier() { + return () -> false; } } } From ff45bef015eef791d1d51b7c742db25240aa3b25 Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Mon, 7 Jul 2025 10:49:06 +0000 Subject: [PATCH 03/11] [CI] Auto commit changes from spotless --- .../src/main/java/org/elasticsearch/node/NodeConstruction.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 8ea482099306f..72388e71a5cd9 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -23,7 +23,6 @@ import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService; import org.elasticsearch.action.ingest.ReservedPipelineAction; -import org.elasticsearch.search.internal.CrossClusterSearchExtension; import org.elasticsearch.action.search.OnlinePrewarmingService; import org.elasticsearch.action.search.OnlinePrewarmingServiceProvider; import org.elasticsearch.action.search.SearchExecutionStatsCollector; @@ -204,6 +203,7 @@ import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchUtils; import org.elasticsearch.search.aggregations.support.AggregationUsageService; +import org.elasticsearch.search.internal.CrossClusterSearchExtension; import org.elasticsearch.shutdown.PluginShutdownService; import org.elasticsearch.snapshots.InternalSnapshotsInfoService; import org.elasticsearch.snapshots.RepositoryIntegrityHealthIndicatorService; From 28a622e19aec34ef21845873568055a22353a156 Mon Sep 17 00:00:00 2001 From: Nikolaj Volgushev Date: Mon, 7 Jul 2025 14:25:43 +0200 Subject: [PATCH 04/11] Inject extension into service --- .../action/search/TransportSearchAction.java | 2 ++ .../org/elasticsearch/node/NodeConstruction.java | 7 ++++--- .../org/elasticsearch/node/NodeServiceProvider.java | 7 +++++-- .../java/org/elasticsearch/search/SearchService.java | 12 +++++++++++- .../search/internal/CrossClusterSearchExtension.java | 4 ++-- .../snapshots/SnapshotResiliencyTests.java | 4 +++- .../main/java/org/elasticsearch/node/MockNode.java | 7 +++++-- 7 files changed, 32 insertions(+), 11 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index 69260bcac105c..be1a7c6eb09f9 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -215,6 +215,8 @@ public TransportSearchAction( this.searchResponseMetrics = searchResponseMetrics; this.client = client; this.usageService = usageService; + // just showing injection + logger.info("Refresh remote connections: {}", searchService.forceRefreshRemoteConnections()); } private Map buildPerIndexOriginalIndices( diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 8ea482099306f..965fa610ab705 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -23,7 +23,6 @@ import org.elasticsearch.action.bulk.IncrementalBulkService; import org.elasticsearch.action.datastreams.autosharding.DataStreamAutoShardingService; import org.elasticsearch.action.ingest.ReservedPipelineAction; -import org.elasticsearch.search.internal.CrossClusterSearchExtension; import org.elasticsearch.action.search.OnlinePrewarmingService; import org.elasticsearch.action.search.OnlinePrewarmingServiceProvider; import org.elasticsearch.action.search.SearchExecutionStatsCollector; @@ -204,6 +203,7 @@ import org.elasticsearch.search.SearchService; import org.elasticsearch.search.SearchUtils; import org.elasticsearch.search.aggregations.support.AggregationUsageService; +import org.elasticsearch.search.internal.CrossClusterSearchExtension; import org.elasticsearch.shutdown.PluginShutdownService; import org.elasticsearch.snapshots.InternalSnapshotsInfoService; import org.elasticsearch.snapshots.RepositoryIntegrityHealthIndicatorService; @@ -1021,7 +1021,7 @@ public Map queryFields() { CrossClusterSearchExtension.Default::new ); - logger.info("Cross-cluster search force reconnect: [{}]", crossClusterSearchExtension.forceReconnectBehaviorSupplier().get()); + logger.info("Cross-cluster search force reconnect: [{}]", crossClusterSearchExtension.forceRefreshRemoteConnections().get()); ActionModule actionModule = new ActionModule( settings, @@ -1213,7 +1213,8 @@ public Map queryFields() { circuitBreakerService, systemIndices.getExecutorSelector(), telemetryProvider.getTracer(), - onlinePrewarmingService + onlinePrewarmingService, + crossClusterSearchExtension ); final ShutdownPrepareService shutdownPrepareService = new ShutdownPrepareService(settings, httpServerTransport, terminationHandler); diff --git a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java index 8f013a8ee7dde..4b3a3017540e3 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java +++ b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java @@ -37,6 +37,7 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.fetch.FetchPhase; +import org.elasticsearch.search.internal.CrossClusterSearchExtension; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.threadpool.ThreadPool; @@ -135,7 +136,8 @@ SearchService newSearchService( CircuitBreakerService circuitBreakerService, ExecutorSelector executorSelector, Tracer tracer, - OnlinePrewarmingService onlinePrewarmingService + OnlinePrewarmingService onlinePrewarmingService, + CrossClusterSearchExtension crossClusterSearchExtension ) { return new SearchService( clusterService, @@ -147,7 +149,8 @@ SearchService newSearchService( circuitBreakerService, executorSelector, tracer, - onlinePrewarmingService + onlinePrewarmingService, + crossClusterSearchExtension ); } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index d485b53e7e409..33ad5492b4f84 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -104,6 +104,7 @@ import org.elasticsearch.search.fetch.subphase.ScriptFieldsContext.ScriptField; import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; import org.elasticsearch.search.internal.AliasFilter; +import org.elasticsearch.search.internal.CrossClusterSearchExtension; import org.elasticsearch.search.internal.InternalScrollSearchRequest; import org.elasticsearch.search.internal.LegacyReaderContext; import org.elasticsearch.search.internal.ReaderContext; @@ -368,6 +369,8 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final Tracer tracer; + private final CrossClusterSearchExtension crossClusterSearchExtension; + public SearchService( ClusterService clusterService, IndicesService indicesService, @@ -378,7 +381,8 @@ public SearchService( CircuitBreakerService circuitBreakerService, ExecutorSelector executorSelector, Tracer tracer, - OnlinePrewarmingService onlinePrewarmingService + OnlinePrewarmingService onlinePrewarmingService, + CrossClusterSearchExtension crossClusterSearchExtension ) { Settings settings = clusterService.getSettings(); this.threadPool = threadPool; @@ -444,6 +448,8 @@ public SearchService( clusterService.getClusterSettings() .addSettingsUpdateConsumer(MEMORY_ACCOUNTING_BUFFER_SIZE, newValue -> this.memoryAccountingBufferSize = newValue.getBytes()); prewarmingMaxPoolFactorThreshold = PREWARMING_THRESHOLD_THREADPOOL_SIZE_FACTOR_POOL_SIZE.get(settings); + + this.crossClusterSearchExtension = crossClusterSearchExtension; } public CircuitBreaker getCircuitBreaker() { @@ -458,6 +464,10 @@ private void setEnableSearchWorkerThreads(boolean enableSearchWorkerThreads) { } } + public boolean forceRefreshRemoteConnections() { + return crossClusterSearchExtension.forceRefreshRemoteConnections().get(); + } + private void setEnableQueryPhaseParallelCollection(boolean enableQueryPhaseParallelCollection) { this.enableQueryPhaseParallelCollection = enableQueryPhaseParallelCollection; } diff --git a/server/src/main/java/org/elasticsearch/search/internal/CrossClusterSearchExtension.java b/server/src/main/java/org/elasticsearch/search/internal/CrossClusterSearchExtension.java index ad4932761b327..92b59e04bf7c1 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/CrossClusterSearchExtension.java +++ b/server/src/main/java/org/elasticsearch/search/internal/CrossClusterSearchExtension.java @@ -13,13 +13,13 @@ public interface CrossClusterSearchExtension { - Supplier forceReconnectBehaviorSupplier(); + Supplier forceRefreshRemoteConnections(); class Default implements CrossClusterSearchExtension { public Default() {} @Override - public Supplier forceReconnectBehaviorSupplier() { + public Supplier forceRefreshRemoteConnections() { return () -> false; } } diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index ee99195766111..d4a18a9d6c889 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -186,6 +186,7 @@ import org.elasticsearch.search.SearchService; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.FetchPhase; +import org.elasticsearch.search.internal.CrossClusterSearchExtension; import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ClusterServiceUtils; @@ -2557,7 +2558,8 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() { new NoneCircuitBreakerService(), EmptySystemIndices.INSTANCE.getExecutorSelector(), Tracer.NOOP, - OnlinePrewarmingService.NOOP + OnlinePrewarmingService.NOOP, + new CrossClusterSearchExtension.Default() ); final SnapshotFilesProvider snapshotFilesProvider = new SnapshotFilesProvider(repositoriesService); diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index 4c2e0a3c6c047..11c67f035c4f8 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -42,6 +42,7 @@ import org.elasticsearch.search.MockSearchService; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.fetch.FetchPhase; +import org.elasticsearch.search.internal.CrossClusterSearchExtension; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ESTestCase; @@ -103,7 +104,8 @@ SearchService newSearchService( CircuitBreakerService circuitBreakerService, ExecutorSelector executorSelector, Tracer tracer, - OnlinePrewarmingService onlinePrewarmingService + OnlinePrewarmingService onlinePrewarmingService, + CrossClusterSearchExtension crossClusterSearchExtension ) { if (pluginsService.filterPlugins(MockSearchService.TestPlugin.class).findAny().isEmpty()) { return super.newSearchService( @@ -117,7 +119,8 @@ SearchService newSearchService( circuitBreakerService, executorSelector, tracer, - onlinePrewarmingService + onlinePrewarmingService, + crossClusterSearchExtension ); } From 54612a726df1e9b38f46e3bf070deed97a5dad08 Mon Sep 17 00:00:00 2001 From: Nikolaj Volgushev Date: Mon, 7 Jul 2025 14:38:05 +0200 Subject: [PATCH 05/11] Fix compile --- .../main/java/org/elasticsearch/search/MockSearchService.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java index 07ffb3ab9a4eb..6bfb365e52493 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java +++ b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java @@ -20,6 +20,7 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.fetch.FetchPhase; +import org.elasticsearch.search.internal.CrossClusterSearchExtension; import org.elasticsearch.search.internal.ReaderContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchRequest; @@ -98,7 +99,8 @@ public MockSearchService( circuitBreakerService, executorSelector, tracer, - onlinePrewarmingService + onlinePrewarmingService, + new CrossClusterSearchExtension.Default() ); } From 0855e6cc3bd744c36b51449a6e5ab70e86be9764 Mon Sep 17 00:00:00 2001 From: Nikolaj Volgushev Date: Tue, 8 Jul 2025 12:22:44 +0200 Subject: [PATCH 06/11] Simplify --- .../action/search/TransportSearchAction.java | 2 -- .../org/elasticsearch/node/NodeConstruction.java | 5 ++--- .../org/elasticsearch/node/NodeServiceProvider.java | 7 ++----- .../org/elasticsearch/search/SearchService.java | 12 +----------- .../internal/CrossClusterSearchExtension.java | 13 ++++++++----- 5 files changed, 13 insertions(+), 26 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java index be1a7c6eb09f9..69260bcac105c 100644 --- a/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java +++ b/server/src/main/java/org/elasticsearch/action/search/TransportSearchAction.java @@ -215,8 +215,6 @@ public TransportSearchAction( this.searchResponseMetrics = searchResponseMetrics; this.client = client; this.usageService = usageService; - // just showing injection - logger.info("Refresh remote connections: {}", searchService.forceRefreshRemoteConnections()); } private Map buildPerIndexOriginalIndices( diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 965fa610ab705..5425531edd9aa 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -1021,7 +1021,7 @@ public Map queryFields() { CrossClusterSearchExtension.Default::new ); - logger.info("Cross-cluster search force reconnect: [{}]", crossClusterSearchExtension.forceRefreshRemoteConnections().get()); + logger.info("Cross-cluster example: [{}]", crossClusterSearchExtension.example().getClass()); ActionModule actionModule = new ActionModule( settings, @@ -1213,8 +1213,7 @@ public Map queryFields() { circuitBreakerService, systemIndices.getExecutorSelector(), telemetryProvider.getTracer(), - onlinePrewarmingService, - crossClusterSearchExtension + onlinePrewarmingService ); final ShutdownPrepareService shutdownPrepareService = new ShutdownPrepareService(settings, httpServerTransport, terminationHandler); diff --git a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java index 4b3a3017540e3..8f013a8ee7dde 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java +++ b/server/src/main/java/org/elasticsearch/node/NodeServiceProvider.java @@ -37,7 +37,6 @@ import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.fetch.FetchPhase; -import org.elasticsearch.search.internal.CrossClusterSearchExtension; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.threadpool.ThreadPool; @@ -136,8 +135,7 @@ SearchService newSearchService( CircuitBreakerService circuitBreakerService, ExecutorSelector executorSelector, Tracer tracer, - OnlinePrewarmingService onlinePrewarmingService, - CrossClusterSearchExtension crossClusterSearchExtension + OnlinePrewarmingService onlinePrewarmingService ) { return new SearchService( clusterService, @@ -149,8 +147,7 @@ SearchService newSearchService( circuitBreakerService, executorSelector, tracer, - onlinePrewarmingService, - crossClusterSearchExtension + onlinePrewarmingService ); } diff --git a/server/src/main/java/org/elasticsearch/search/SearchService.java b/server/src/main/java/org/elasticsearch/search/SearchService.java index d6f604a369159..9c228468f1964 100644 --- a/server/src/main/java/org/elasticsearch/search/SearchService.java +++ b/server/src/main/java/org/elasticsearch/search/SearchService.java @@ -104,7 +104,6 @@ import org.elasticsearch.search.fetch.subphase.ScriptFieldsContext.ScriptField; import org.elasticsearch.search.fetch.subphase.highlight.HighlightBuilder; import org.elasticsearch.search.internal.AliasFilter; -import org.elasticsearch.search.internal.CrossClusterSearchExtension; import org.elasticsearch.search.internal.InternalScrollSearchRequest; import org.elasticsearch.search.internal.LegacyReaderContext; import org.elasticsearch.search.internal.ReaderContext; @@ -369,8 +368,6 @@ public class SearchService extends AbstractLifecycleComponent implements IndexEv private final Tracer tracer; - private final CrossClusterSearchExtension crossClusterSearchExtension; - public SearchService( ClusterService clusterService, IndicesService indicesService, @@ -381,8 +378,7 @@ public SearchService( CircuitBreakerService circuitBreakerService, ExecutorSelector executorSelector, Tracer tracer, - OnlinePrewarmingService onlinePrewarmingService, - CrossClusterSearchExtension crossClusterSearchExtension + OnlinePrewarmingService onlinePrewarmingService ) { Settings settings = clusterService.getSettings(); this.threadPool = threadPool; @@ -448,8 +444,6 @@ public SearchService( clusterService.getClusterSettings() .addSettingsUpdateConsumer(MEMORY_ACCOUNTING_BUFFER_SIZE, newValue -> this.memoryAccountingBufferSize = newValue.getBytes()); prewarmingMaxPoolFactorThreshold = PREWARMING_THRESHOLD_THREADPOOL_SIZE_FACTOR_POOL_SIZE.get(settings); - - this.crossClusterSearchExtension = crossClusterSearchExtension; } public CircuitBreaker getCircuitBreaker() { @@ -464,10 +458,6 @@ private void setEnableSearchWorkerThreads(boolean enableSearchWorkerThreads) { } } - public boolean forceRefreshRemoteConnections() { - return crossClusterSearchExtension.forceRefreshRemoteConnections().get(); - } - private void setEnableQueryPhaseParallelCollection(boolean enableQueryPhaseParallelCollection) { this.enableQueryPhaseParallelCollection = enableQueryPhaseParallelCollection; } diff --git a/server/src/main/java/org/elasticsearch/search/internal/CrossClusterSearchExtension.java b/server/src/main/java/org/elasticsearch/search/internal/CrossClusterSearchExtension.java index 92b59e04bf7c1..a333cfce04d01 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/CrossClusterSearchExtension.java +++ b/server/src/main/java/org/elasticsearch/search/internal/CrossClusterSearchExtension.java @@ -9,18 +9,21 @@ package org.elasticsearch.search.internal; -import java.util.function.Supplier; - public interface CrossClusterSearchExtension { - Supplier forceRefreshRemoteConnections(); + Example example(); + + interface Example { + + } class Default implements CrossClusterSearchExtension { public Default() {} @Override - public Supplier forceRefreshRemoteConnections() { - return () -> false; + public Example example() { + return new Example() { + }; } } } From ba1443b105321164269e8c036d369eaf2805f13a Mon Sep 17 00:00:00 2001 From: Nikolaj Volgushev Date: Tue, 8 Jul 2025 13:55:58 +0200 Subject: [PATCH 07/11] Missing --- .../src/main/java/org/elasticsearch/node/MockNode.java | 7 ++----- .../java/org/elasticsearch/search/MockSearchService.java | 4 +--- 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java index 11c67f035c4f8..4c2e0a3c6c047 100644 --- a/test/framework/src/main/java/org/elasticsearch/node/MockNode.java +++ b/test/framework/src/main/java/org/elasticsearch/node/MockNode.java @@ -42,7 +42,6 @@ import org.elasticsearch.search.MockSearchService; import org.elasticsearch.search.SearchService; import org.elasticsearch.search.fetch.FetchPhase; -import org.elasticsearch.search.internal.CrossClusterSearchExtension; import org.elasticsearch.tasks.TaskManager; import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ESTestCase; @@ -104,8 +103,7 @@ SearchService newSearchService( CircuitBreakerService circuitBreakerService, ExecutorSelector executorSelector, Tracer tracer, - OnlinePrewarmingService onlinePrewarmingService, - CrossClusterSearchExtension crossClusterSearchExtension + OnlinePrewarmingService onlinePrewarmingService ) { if (pluginsService.filterPlugins(MockSearchService.TestPlugin.class).findAny().isEmpty()) { return super.newSearchService( @@ -119,8 +117,7 @@ SearchService newSearchService( circuitBreakerService, executorSelector, tracer, - onlinePrewarmingService, - crossClusterSearchExtension + onlinePrewarmingService ); } diff --git a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java index 6bfb365e52493..07ffb3ab9a4eb 100644 --- a/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java +++ b/test/framework/src/main/java/org/elasticsearch/search/MockSearchService.java @@ -20,7 +20,6 @@ import org.elasticsearch.plugins.Plugin; import org.elasticsearch.script.ScriptService; import org.elasticsearch.search.fetch.FetchPhase; -import org.elasticsearch.search.internal.CrossClusterSearchExtension; import org.elasticsearch.search.internal.ReaderContext; import org.elasticsearch.search.internal.SearchContext; import org.elasticsearch.search.internal.ShardSearchRequest; @@ -99,8 +98,7 @@ public MockSearchService( circuitBreakerService, executorSelector, tracer, - onlinePrewarmingService, - new CrossClusterSearchExtension.Default() + onlinePrewarmingService ); } From 569b0cc4c984e2c0029bb9bccaa1b7a6e514a37a Mon Sep 17 00:00:00 2001 From: Nikolaj Volgushev Date: Wed, 9 Jul 2025 09:41:15 +0200 Subject: [PATCH 08/11] Fix test --- .../elasticsearch/snapshots/SnapshotResiliencyTests.java | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index d4a18a9d6c889..f247488d2a612 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -186,8 +186,7 @@ import org.elasticsearch.search.SearchService; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.FetchPhase; -import org.elasticsearch.search.internal.CrossClusterSearchExtension; -import org.elasticsearch.telemetry.TelemetryProvider; + import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; @@ -2558,8 +2557,7 @@ public RecyclerBytesStreamOutput newNetworkBytesStream() { new NoneCircuitBreakerService(), EmptySystemIndices.INSTANCE.getExecutorSelector(), Tracer.NOOP, - OnlinePrewarmingService.NOOP, - new CrossClusterSearchExtension.Default() + OnlinePrewarmingService.NOOP ); final SnapshotFilesProvider snapshotFilesProvider = new SnapshotFilesProvider(repositoriesService); From 6c7918f97ae32879391f6d70f76308157de403db Mon Sep 17 00:00:00 2001 From: elasticsearchmachine Date: Wed, 9 Jul 2025 07:49:39 +0000 Subject: [PATCH 09/11] [CI] Auto commit changes from spotless --- .../org/elasticsearch/snapshots/SnapshotResiliencyTests.java | 1 - 1 file changed, 1 deletion(-) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index f247488d2a612..24ef378acf78d 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -186,7 +186,6 @@ import org.elasticsearch.search.SearchService; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.FetchPhase; - import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; From c60a6915926b0801022afc9dc186640ecdc673d6 Mon Sep 17 00:00:00 2001 From: Nikolaj Volgushev Date: Wed, 9 Jul 2025 12:30:00 +0200 Subject: [PATCH 10/11] Fix --- .../org/elasticsearch/snapshots/SnapshotResiliencyTests.java | 1 + 1 file changed, 1 insertion(+) diff --git a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java index c451bbbabcacd..81f4f743d821a 100644 --- a/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java +++ b/server/src/test/java/org/elasticsearch/snapshots/SnapshotResiliencyTests.java @@ -186,6 +186,7 @@ import org.elasticsearch.search.SearchService; import org.elasticsearch.search.builder.SearchSourceBuilder; import org.elasticsearch.search.fetch.FetchPhase; +import org.elasticsearch.telemetry.TelemetryProvider; import org.elasticsearch.telemetry.tracing.Tracer; import org.elasticsearch.test.ClusterServiceUtils; import org.elasticsearch.test.ESTestCase; From 48c4af69c10039f799deb809622836aaf3b02a9a Mon Sep 17 00:00:00 2001 From: Nikolaj Volgushev Date: Thu, 10 Jul 2025 15:23:22 +0200 Subject: [PATCH 11/11] Tweak --- .../org/elasticsearch/node/NodeConstruction.java | 2 +- .../internal/CrossClusterSearchExtension.java | 14 +++++++++----- 2 files changed, 10 insertions(+), 6 deletions(-) diff --git a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java index 071ff3fae759a..50f8b00827ec8 100644 --- a/server/src/main/java/org/elasticsearch/node/NodeConstruction.java +++ b/server/src/main/java/org/elasticsearch/node/NodeConstruction.java @@ -1021,7 +1021,7 @@ public Map queryFields() { CrossClusterSearchExtension.Default::new ); - logger.info("Cross-cluster example: [{}]", crossClusterSearchExtension.example().getClass()); + logger.info("Cross-cluster example: [{}]", crossClusterSearchExtension.indicesExpressionRewriter().getClass()); ActionModule actionModule = new ActionModule( settings, diff --git a/server/src/main/java/org/elasticsearch/search/internal/CrossClusterSearchExtension.java b/server/src/main/java/org/elasticsearch/search/internal/CrossClusterSearchExtension.java index a333cfce04d01..313c037652057 100644 --- a/server/src/main/java/org/elasticsearch/search/internal/CrossClusterSearchExtension.java +++ b/server/src/main/java/org/elasticsearch/search/internal/CrossClusterSearchExtension.java @@ -9,20 +9,24 @@ package org.elasticsearch.search.internal; -public interface CrossClusterSearchExtension { +import org.elasticsearch.action.IndicesRequest; - Example example(); +public interface CrossClusterSearchExtension { - interface Example { + IndicesExpressionRewriter indicesExpressionRewriter(); + interface IndicesExpressionRewriter { + IndicesRequest.Replaceable rewrite(IndicesRequest.Replaceable request); } class Default implements CrossClusterSearchExtension { public Default() {} @Override - public Example example() { - return new Example() { + public IndicesExpressionRewriter indicesExpressionRewriter() { + return request -> { + // Default implementation does nothing, can be overridden by extensions + return request; }; } }