From 320bf8e0b05835074c732c68820459c143623ae3 Mon Sep 17 00:00:00 2001 From: Simeon Widdis Date: Fri, 19 Sep 2025 23:50:28 +0000 Subject: [PATCH 1/8] Implement simple pre-fetching for index enumerators Signed-off-by: Simeon Widdis --- .../scan/OpenSearchIndexEnumerator.java | 39 ++++++++++++++++--- 1 file changed, 34 insertions(+), 5 deletions(-) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java index 384ec17b470..9c54fc78c68 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java @@ -8,6 +8,8 @@ import java.util.Collections; import java.util.Iterator; import java.util.List; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; import lombok.EqualsAndHashCode; import lombok.ToString; import org.apache.calcite.linq4j.Enumerator; @@ -53,6 +55,9 @@ public class OpenSearchIndexEnumerator implements Enumerator { private ExprValue current; + private CompletableFuture nextBatchFuture; + private boolean isLastBatch = false; + public OpenSearchIndexEnumerator( OpenSearchClient client, List fields, @@ -69,14 +74,29 @@ public OpenSearchIndexEnumerator( if (!this.monitor.isHealthy()) { throw new NonFallbackCalciteException("insufficient resources to run the query, quit."); } + this.nextBatchFuture = CompletableFuture.supplyAsync(() -> client.search(request)); } private void fetchNextBatch() { - OpenSearchResponse response = client.search(request); - if (!response.isEmpty()) { - iterator = response.iterator(); - } else if (iterator == null) { - iterator = Collections.emptyIterator(); + try { + // Get results from the pre-fetched batch + OpenSearchResponse response = nextBatchFuture.get(); + + if (!response.isEmpty()) { + iterator = response.iterator(); + + // If we haven't hit the end, start pre-fetching next batch + if (!isLastBatch) { + nextBatchFuture = CompletableFuture.supplyAsync(() -> client.search(request)); + } + } else { + if (iterator == null) { + iterator = Collections.emptyIterator(); + } + isLastBatch = true; + } + } catch (InterruptedException | ExecutionException e) { + throw new NonFallbackCalciteException("Error fetching batch: " + e.getMessage()); } } @@ -98,6 +118,7 @@ private Object resolveForCalcite(ExprValue value, String rawPath) { @Override public boolean moveNext() { if (queryCount >= maxResponseSize) { + isLastBatch = true; return false; } @@ -114,17 +135,21 @@ public boolean moveNext() { queryCount++; return true; } else { + isLastBatch = true; return false; } } @Override public void reset() { + isLastBatch = false; + nextBatchFuture = CompletableFuture.supplyAsync(() -> client.search(request)); OpenSearchResponse response = client.search(request); if (!response.isEmpty()) { iterator = response.iterator(); } else { iterator = Collections.emptyIterator(); + isLastBatch = true; } queryCount = 0; } @@ -133,6 +158,10 @@ public void reset() { public void close() { iterator = Collections.emptyIterator(); queryCount = 0; + isLastBatch = true; + if (nextBatchFuture != null) { + nextBatchFuture.cancel(true); + } client.cleanup(request); } } From 08e40e6a037ea320086bf737b56d45a98d3bb9c4 Mon Sep 17 00:00:00 2001 From: Simeon Widdis Date: Wed, 24 Sep 2025 18:55:26 +0000 Subject: [PATCH 2/8] Fix the thread pool handling for 
background execution Signed-off-by: Simeon Widdis --- .../job/ScheduledAsyncQueryJobRunner.java | 5 +++-- .../job/ScheduledAsyncQueryJobRunnerTest.java | 5 +++-- .../sql/datasources/utils/Scheduler.java | 6 ++---- docs/dev/query-manager.md | 2 +- docs/user/admin/settings.rst | 21 +++++++++++++++++++ .../legacy/executor/AsyncRestExecutor.java | 6 ++---- .../cursor/CursorAsyncRestExecutor.java | 5 ++--- .../sql/legacy/plugin/RestSqlStatsAction.java | 3 ++- .../client/OpenSearchNodeClient.java | 2 +- .../executor/OpenSearchQueryManager.java | 3 ++- .../scan/OpenSearchIndexEnumerator.java | 19 +++++++++++++---- .../org/opensearch/sql/plugin/SQLPlugin.java | 20 +++++++++++++----- 12 files changed, 69 insertions(+), 28 deletions(-) diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunner.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunner.java index feacb615390..ab39ecd62d0 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunner.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunner.java @@ -12,7 +12,6 @@ import org.opensearch.jobscheduler.spi.ScheduledJobParameter; import org.opensearch.jobscheduler.spi.ScheduledJobRunner; import org.opensearch.plugins.Plugin; -import org.opensearch.sql.legacy.executor.AsyncRestExecutor; import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService; import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext; import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; @@ -21,6 +20,8 @@ import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.client.Client; +import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME; + /** * The job runner class for scheduling async query. 
* @@ -37,7 +38,7 @@ public class ScheduledAsyncQueryJobRunner implements ScheduledJobRunner { // Share SQL plugin thread pool private static final String ASYNC_QUERY_THREAD_POOL_NAME = - AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME; + SQL_WORKER_THREAD_POOL_NAME; private static final Logger LOGGER = LogManager.getLogger(ScheduledAsyncQueryJobRunner.class); private static final ScheduledAsyncQueryJobRunner INSTANCE = new ScheduledAsyncQueryJobRunner(); diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java index 30b242db816..a34ad909365 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java @@ -15,6 +15,7 @@ import static org.mockito.Mockito.spy; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME; import java.time.Instant; import org.apache.logging.log4j.LogManager; @@ -87,7 +88,7 @@ public void testRunJobWithCorrectParameter() { spyJobRunner.runJob(request, context); ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); - verify(threadPool.executor(AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME)) + verify(threadPool.executor(SQL_WORKER_THREAD_POOL_NAME)) .submit(captor.capture()); Runnable runnable = captor.getValue(); @@ -145,7 +146,7 @@ public void testDoRefreshThrowsException() { spyJobRunner.runJob(request, context); ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); - verify(threadPool.executor(AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME)) + verify(threadPool.executor(SQL_WORKER_THREAD_POOL_NAME)) .submit(captor.capture()); Runnable runnable = captor.getValue(); diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/utils/Scheduler.java b/datasources/src/main/java/org/opensearch/sql/datasources/utils/Scheduler.java index 1cf54ffd88d..062f1b1cd7b 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/utils/Scheduler.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/utils/Scheduler.java @@ -11,13 +11,11 @@ import org.opensearch.common.unit.TimeValue; import org.opensearch.threadpool.ThreadPool; import org.opensearch.transport.client.node.NodeClient; +import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME; -/** The scheduler which schedule the task run in sql-worker thread pool. */ +/** The scheduler which schedule the task run in sql_worker thread pool. */ @UtilityClass public class Scheduler { - - public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker"; - public static void schedule(NodeClient client, Runnable task) { ThreadPool threadPool = client.threadPool(); threadPool.schedule(withCurrentContext(task), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME); diff --git a/docs/dev/query-manager.md b/docs/dev/query-manager.md index 68c82d0ea7d..3f84610ceeb 100644 --- a/docs/dev/query-manager.md +++ b/docs/dev/query-manager.md @@ -31,4 +31,4 @@ Parser parse raw query as Statement and create AbstractPlan. Each AbstractPlan d ### Change of existing logic 1. Remove the schedule logic in NIO thread. After the change, a. Parser will be executed in NIO thread. - b. 
QueryManager decide query execution strategy. e.g. OpenSearchQueryManager schedule the QueryExecution running in **sql-worker** thread pool. \ No newline at end of file + b. QueryManager decide query execution strategy. e.g. OpenSearchQueryManager schedule the QueryExecution running in **sql_worker** thread pool. \ No newline at end of file diff --git a/docs/user/admin/settings.rst b/docs/user/admin/settings.rst index 9d5e45cd7b6..62c8f704b68 100644 --- a/docs/user/admin/settings.rst +++ b/docs/user/admin/settings.rst @@ -216,6 +216,27 @@ Result set:: "transient": {} } +Thread Pool Settings +==================== + +The SQL plugin is integrated with the `OpenSearch Thread Pool Settings `. +There are two thread pools which can be configured on cluster setup via `settings.yml`:: + + thread_pool: + sql_worker: + size: 30 + queue_size: 100 + sql_background_io: + size: 30 + queue_size: 1000 + +The ``sql_worker`` pool corresponds to compute resources related to running queries, such as compute-heavy evaluations on result sets. +This directly maps to the number of queries that can be run concurrently. +This is the primary pool you interact with externally. +``sql_background_io`` is a low-footprint pool for IO requests the plugin makes, +and can be used to partially the search load SQL places on your cluster for some types of expensive operations. +A ``sql_worker`` thread may spawn multiple background threads. + plugins.query.executionengine.spark.session.limit ================================================== diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/AsyncRestExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/AsyncRestExecutor.java index 2b16b584453..15580cf6477 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/AsyncRestExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/AsyncRestExecutor.java @@ -5,6 +5,8 @@ package org.opensearch.sql.legacy.executor; +import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME; + import java.io.IOException; import java.time.Duration; import java.util.Map; @@ -30,10 +32,6 @@ /** A RestExecutor wrapper to execute request asynchronously to avoid blocking transport thread. 
*/ public class AsyncRestExecutor implements RestExecutor { - - /** Custom thread pool name managed by OpenSearch */ - public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker"; - private static final Logger LOG = LogManager.getLogger(AsyncRestExecutor.class); /** diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorAsyncRestExecutor.java b/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorAsyncRestExecutor.java index 799aa55cf40..0a1e043b811 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorAsyncRestExecutor.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/executor/cursor/CursorAsyncRestExecutor.java @@ -5,6 +5,8 @@ package org.opensearch.sql.legacy.executor.cursor; +import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME; + import java.io.IOException; import java.time.Duration; import java.util.Map; @@ -24,9 +26,6 @@ import org.opensearch.transport.client.Client; public class CursorAsyncRestExecutor { - /** Custom thread pool name managed by OpenSearch */ - public static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker"; - private static final Logger LOG = LogManager.getLogger(CursorAsyncRestExecutor.class); /** Delegated rest executor to async */ diff --git a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlStatsAction.java b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlStatsAction.java index 32b4d17ecda..bbf1d351850 100644 --- a/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlStatsAction.java +++ b/legacy/src/main/java/org/opensearch/sql/legacy/plugin/RestSqlStatsAction.java @@ -6,6 +6,7 @@ package org.opensearch.sql.legacy.plugin; import static org.opensearch.core.rest.RestStatus.INTERNAL_SERVER_ERROR; +import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME; import com.google.common.collect.ImmutableList; import java.util.Arrays; @@ -90,7 +91,7 @@ protected Set responseParams() { private void schedule(NodeClient client, Runnable task) { ThreadPool threadPool = client.threadPool(); - threadPool.schedule(withCurrentContext(task), new TimeValue(0), "sql-worker"); + threadPool.schedule(withCurrentContext(task), new TimeValue(0), SQL_WORKER_THREAD_POOL_NAME); } private Runnable withCurrentContext(final Runnable task) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java index 152fe499d2e..2492cb945f8 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java @@ -218,7 +218,7 @@ public void cleanup(OpenSearchRequest request) { @Override public void schedule(Runnable task) { - // at that time, task already running the sql-worker ThreadPool. + // at that time, task already running the sql_worker ThreadPool. 
task.run(); } diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java index 76218d8295d..1ed5277c257 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchQueryManager.java @@ -21,7 +21,8 @@ public class OpenSearchQueryManager implements QueryManager { private final NodeClient nodeClient; - private static final String SQL_WORKER_THREAD_POOL_NAME = "sql-worker"; + public static final String SQL_WORKER_THREAD_POOL_NAME = "sql_worker"; + public static final String SQL_BACKGROUND_THREAD_POOL_NAME = "sql_background_io"; @Override public QueryId submit(AbstractPlan queryPlan) { diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java index 3d441ca55f7..c7d504618f5 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java @@ -5,11 +5,14 @@ package org.opensearch.sql.opensearch.storage.scan; +import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_BACKGROUND_THREAD_POOL_NAME; + import java.util.Collections; import java.util.Iterator; import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; import lombok.EqualsAndHashCode; import lombok.ToString; import org.apache.calcite.linq4j.Enumerator; @@ -33,6 +36,8 @@ public class OpenSearchIndexEnumerator implements Enumerator { /** OpenSearch client. */ private final OpenSearchClient client; + private final Executor backgroundExecutor; + private final List fields; /** Search request. 
*/ @@ -71,7 +76,6 @@ public OpenSearchIndexEnumerator( int maxResultWindow, OpenSearchRequest request, ResourceMonitor monitor) { - this.client = client; this.fields = fields; this.request = request; this.maxResponseSize = maxResponseSize; @@ -82,7 +86,12 @@ public OpenSearchIndexEnumerator( if (!this.monitor.isHealthy()) { throw new NonFallbackCalciteException("insufficient resources to run the query, quit."); } - this.nextBatchFuture = CompletableFuture.supplyAsync(() -> client.search(request)); + + this.client = client; + this.backgroundExecutor = + client.getNodeClient().threadPool().executor(SQL_BACKGROUND_THREAD_POOL_NAME); + this.nextBatchFuture = + CompletableFuture.supplyAsync(() -> client.search(request), backgroundExecutor); } private void fetchNextBatch() { @@ -101,7 +110,8 @@ private void fetchNextBatch() { // If we haven't hit the end, start pre-fetching next batch if (!isLastBatch && !fetchOnce) { - nextBatchFuture = CompletableFuture.supplyAsync(() -> client.search(request)); + nextBatchFuture = + CompletableFuture.supplyAsync(() -> client.search(request), backgroundExecutor); } } else { if (iterator == null) { @@ -157,7 +167,8 @@ public boolean moveNext() { @Override public void reset() { isLastBatch = false; - nextBatchFuture = CompletableFuture.supplyAsync(() -> client.search(request)); + nextBatchFuture = + CompletableFuture.supplyAsync(() -> client.search(request), backgroundExecutor); OpenSearchResponse response = client.search(request); if (!response.isEmpty()) { iterator = response.iterator(); diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 9653fe334a8..031429884f8 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -5,8 +5,9 @@ package org.opensearch.sql.plugin; -import static java.util.Collections.singletonList; import static org.opensearch.sql.datasource.model.DataSourceMetadata.defaultOpenSearchDataSourceMetadata; +import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_BACKGROUND_THREAD_POOL_NAME; +import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME; import static org.opensearch.sql.spark.data.constants.SparkConstants.SPARK_REQUEST_BUFFER_INDEX_NAME; import com.google.common.collect.ImmutableList; @@ -74,7 +75,6 @@ import org.opensearch.sql.datasources.transport.TransportPatchDataSourceAction; import org.opensearch.sql.datasources.transport.TransportUpdateDataSourceAction; import org.opensearch.sql.legacy.esdomain.LocalClusterState; -import org.opensearch.sql.legacy.executor.AsyncRestExecutor; import org.opensearch.sql.legacy.metrics.Metrics; import org.opensearch.sql.legacy.plugin.RestSqlAction; import org.opensearch.sql.legacy.plugin.RestSqlStatsAction; @@ -277,13 +277,23 @@ public ScheduledJobParser getJobParser() { @Override public List> getExecutorBuilders(Settings settings) { - return singletonList( + // The worker pool is the primary pool where most of the work is done. The background thread + // pool is a separate queue for asynchronous requests to other nodes. We keep them separate to + // prevent deadlocks during async fetches on small node counts. Tasks in the background pool + // should do no work except I/O to other services. 
+ return List.of( new FixedExecutorBuilder( settings, - AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME, + SQL_WORKER_THREAD_POOL_NAME, OpenSearchExecutors.allocatedProcessors(settings), 1000, - null)); + "thread_pool." + SQL_WORKER_THREAD_POOL_NAME), + new FixedExecutorBuilder( + settings, + SQL_BACKGROUND_THREAD_POOL_NAME, + OpenSearchExecutors.allocatedProcessors(settings), + 100, // No external API connects to this, we should have sane resource usage + "thread_pool." + SQL_BACKGROUND_THREAD_POOL_NAME)); } @Override From e38b70969dc66542a901246669b0127c9a2782fb Mon Sep 17 00:00:00 2001 From: Simeon Widdis Date: Wed, 24 Sep 2025 19:00:08 +0000 Subject: [PATCH 3/8] Fix a link Signed-off-by: Simeon Widdis --- docs/user/admin/settings.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/user/admin/settings.rst b/docs/user/admin/settings.rst index 62c8f704b68..9257aafa29c 100644 --- a/docs/user/admin/settings.rst +++ b/docs/user/admin/settings.rst @@ -219,7 +219,7 @@ Result set:: Thread Pool Settings ==================== -The SQL plugin is integrated with the `OpenSearch Thread Pool Settings `. +The SQL plugin is integrated with the `OpenSearch Thread Pool Settings `_. There are two thread pools which can be configured on cluster setup via `settings.yml`:: thread_pool: From 25b9ce5ceec99b7da1308976e1ed2d2e22d45eb5 Mon Sep 17 00:00:00 2001 From: Simeon Widdis Date: Thu, 25 Sep 2025 18:58:23 +0000 Subject: [PATCH 4/8] Implement background scanner handling optional missing NodeClients Signed-off-by: Simeon Widdis --- .../opensearch/client/OpenSearchClient.java | 3 +- .../client/OpenSearchNodeClient.java | 5 +- .../client/OpenSearchRestClient.java | 5 +- .../executor/OpenSearchExecutionEngine.java | 9 +- .../opensearch/storage/OpenSearchIndex.java | 34 +++- .../storage/scan/BackgroundSearchScanner.java | 177 ++++++++++++++++++ .../scan/OpenSearchIndexEnumerator.java | 81 ++------ .../client/OpenSearchRestClientTest.java | 3 +- .../OpenSearchDefaultImplementorTest.java | 11 +- .../storage/OpenSearchIndexTest.java | 4 + .../scan/BackgroundSearchScannerTest.java | 150 +++++++++++++++ .../org/opensearch/sql/plugin/SQLPlugin.java | 2 +- 12 files changed, 398 insertions(+), 86 deletions(-) create mode 100644 opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/BackgroundSearchScanner.java create mode 100644 opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/BackgroundSearchScannerTest.java diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java index 0261bc98120..68350c5a0fd 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchClient.java @@ -7,6 +7,7 @@ import java.util.List; import java.util.Map; +import java.util.Optional; import org.opensearch.action.search.CreatePitRequest; import org.opensearch.action.search.DeletePitRequest; import org.opensearch.sql.opensearch.mapping.IndexMapping; @@ -97,7 +98,7 @@ public interface OpenSearchClient { */ void schedule(Runnable task); - NodeClient getNodeClient(); + Optional getNodeClient(); /** * Create PIT for given indices diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java index 2492cb945f8..78cc845daf9 100644 --- 
a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchNodeClient.java @@ -11,6 +11,7 @@ import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.ExecutionException; import java.util.function.Function; import java.util.function.Predicate; @@ -223,8 +224,8 @@ public void schedule(Runnable task) { } @Override - public NodeClient getNodeClient() { - return client; + public Optional getNodeClient() { + return Optional.of(client); } @Override diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java index 87b171707bb..427eb7d6b03 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/client/OpenSearchRestClient.java @@ -13,6 +13,7 @@ import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import java.util.stream.Stream; import lombok.RequiredArgsConstructor; @@ -236,8 +237,8 @@ public void schedule(Runnable task) { } @Override - public NodeClient getNodeClient() { - throw new UnsupportedOperationException("Unsupported method."); + public Optional getNodeClient() { + return Optional.empty(); } @Override diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java index b5c2d6edccc..d13d52e3d5f 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/executor/OpenSearchExecutionEngine.java @@ -15,6 +15,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicReference; import org.apache.calcite.plan.RelOptUtil; import org.apache.calcite.rel.RelNode; @@ -47,13 +48,13 @@ import org.opensearch.sql.expression.function.BuiltinFunctionName; import org.opensearch.sql.expression.function.PPLFuncImpTable; import org.opensearch.sql.opensearch.client.OpenSearchClient; -import org.opensearch.sql.opensearch.client.OpenSearchNodeClient; import org.opensearch.sql.opensearch.executor.protector.ExecutionProtector; import org.opensearch.sql.opensearch.functions.DistinctCountApproxAggFunction; import org.opensearch.sql.opensearch.functions.GeoIpFunction; import org.opensearch.sql.opensearch.util.JdbcOpenSearchDataTypeConvertor; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.storage.TableScanOperator; +import org.opensearch.transport.client.node.NodeClient; /** OpenSearch execution engine implementation. 
*/ public class OpenSearchExecutionEngine implements ExecutionEngine { @@ -268,9 +269,9 @@ private void buildResultSet( /** Registers opensearch-dependent functions */ private void registerOpenSearchFunctions() { - if (client instanceof OpenSearchNodeClient) { - SqlUserDefinedFunction geoIpFunction = - new GeoIpFunction(client.getNodeClient()).toUDF("GEOIP"); + Optional nodeClient = client.getNodeClient(); + if (nodeClient.isPresent()) { + SqlUserDefinedFunction geoIpFunction = new GeoIpFunction(nodeClient.get()).toUDF("GEOIP"); PPLFuncImpTable.INSTANCE.registerExternalOperator(BuiltinFunctionName.GEOIP, geoIpFunction); } else { logger.info( diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java index c8b00c6daed..3ea7280b16b 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/OpenSearchIndex.java @@ -10,6 +10,7 @@ import java.util.LinkedHashMap; import java.util.Map; import java.util.Map.Entry; +import java.util.Optional; import java.util.function.Function; import java.util.stream.Collectors; import lombok.Getter; @@ -45,6 +46,7 @@ import org.opensearch.sql.planner.logical.LogicalPlan; import org.opensearch.sql.planner.physical.PhysicalPlan; import org.opensearch.sql.storage.read.TableScanBuilder; +import org.opensearch.transport.client.node.NodeClient; /** OpenSearch table (index) implementation. */ public class OpenSearchIndex extends AbstractOpenSearchTable { @@ -231,27 +233,43 @@ public static class OpenSearchDefaultImplementor extends DefaultImplementor nc = client.getNodeClient(); + if (nc.isEmpty()) { + throw new UnsupportedOperationException( + "Unable to run Machine Learning operators on clients outside of the local node"); + } return new MLCommonsOperator( - visitChild(node, context), - node.getAlgorithm(), - node.getArguments(), - client.getNodeClient()); + visitChild(node, context), node.getAlgorithm(), node.getArguments(), nc.get()); } @Override public PhysicalPlan visitAD(LogicalAD node, OpenSearchIndexScan context) { - return new ADOperator(visitChild(node, context), node.getArguments(), client.getNodeClient()); + Optional nc = client.getNodeClient(); + if (nc.isEmpty()) { + throw new UnsupportedOperationException( + "Unable to run Anomaly Detector operators on clients outside of the local node"); + } + return new ADOperator(visitChild(node, context), node.getArguments(), nc.get()); } @Override public PhysicalPlan visitML(LogicalML node, OpenSearchIndexScan context) { - return new MLOperator(visitChild(node, context), node.getArguments(), client.getNodeClient()); + Optional nc = client.getNodeClient(); + if (nc.isEmpty()) { + throw new UnsupportedOperationException( + "Unable to run Machine Learning operators on clients outside of the local node"); + } + return new MLOperator(visitChild(node, context), node.getArguments(), nc.get()); } @Override public PhysicalPlan visitEval(LogicalEval node, OpenSearchIndexScan context) { - return new OpenSearchEvalOperator( - visitChild(node, context), node.getExpressions(), client.getNodeClient()); + Optional nc = client.getNodeClient(); + if (nc.isEmpty()) { + throw new UnsupportedOperationException( + "Unable to run Eval operators on clients outside of the local node"); + } + return new OpenSearchEvalOperator(visitChild(node, context), node.getExpressions(), nc.get()); } } diff --git 
a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/BackgroundSearchScanner.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/BackgroundSearchScanner.java new file mode 100644 index 00000000000..4019346e055 --- /dev/null +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/BackgroundSearchScanner.java @@ -0,0 +1,177 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.storage.scan; + +import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_BACKGROUND_THREAD_POOL_NAME; + +import java.util.Collections; +import java.util.Iterator; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Executor; +import javax.annotation.Nullable; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.exception.NonFallbackCalciteException; +import org.opensearch.sql.opensearch.client.OpenSearchClient; +import org.opensearch.sql.opensearch.request.OpenSearchRequest; +import org.opensearch.sql.opensearch.response.OpenSearchResponse; + +/** + * Utility class for asynchronously scanning an index. This lets us send background requests to the + * index while we work on processing the previous batch. + * + *

+ * Lifecycle
+ *
+ * The typical usage pattern is:
+ *
+ *   1. Create scanner: new BackgroundSearchScanner(client)
+ *   2. Start initial scan: startScanning(request)
+ *   3. Fetch batches in a loop: fetchNextBatch(request, maxWindow)
+ *   4. Close scanner when done: close()
+ *
+ * Async vs Sync Behavior
+ *
+ * The scanner attempts to operate asynchronously when possible to improve performance:
+ *
+ *   • When async is available (client has thread pool access): the next batch is pre-fetched
+ *     while the current batch is being processed, which reduces latency between batches.
+ *   • When async is not available (client lacks thread pool access): the scanner falls back to
+ *     synchronous fetching, and each batch is fetched only when needed.
+ *
+ * Termination Conditions
+ *
+ * Scanning will stop when any of these conditions are met:
+ *
+ *   • An empty response is received (lastBatch = true)
+ *   • Response is an aggregation or count response (fetchOnce = true)
+ *   • Response size is less than maxResultWindow (fetchOnce = true)
+ * + * Note: This class should be explicitly closed when no longer needed to ensure proper resource + * cleanup. + */ +public class BackgroundSearchScanner { + private final OpenSearchClient client; + @Nullable private final Executor backgroundExecutor; + private CompletableFuture nextBatchFuture = null; + private boolean stopIteration = false; + + public BackgroundSearchScanner(OpenSearchClient client) { + this.client = client; + // We can only actually do the background operation if we have the ability to access the thread + // pool. Otherwise, fallback to synchronous fetch. + if (client.getNodeClient().isPresent()) { + this.backgroundExecutor = + client.getNodeClient().get().threadPool().executor(SQL_BACKGROUND_THREAD_POOL_NAME); + } else { + this.backgroundExecutor = null; + } + } + + private boolean isAsync() { + return backgroundExecutor != null; + } + + /** + * @return Whether the search scanner has fetched all batches + */ + public boolean isScanDone() { + return stopIteration; + } + + /** + * Initiates the scanning process. If async operations are available, this will trigger the first + * background fetch. + * + * @param request The OpenSearch request to execute + */ + public void startScanning(OpenSearchRequest request) { + if (isAsync()) { + nextBatchFuture = + CompletableFuture.supplyAsync(() -> client.search(request), backgroundExecutor); + } + } + + private OpenSearchResponse getCurrentResponse(OpenSearchRequest request) { + if (isAsync()) { + try { + return nextBatchFuture.get(); + } catch (InterruptedException | ExecutionException e) { + throw new NonFallbackCalciteException( + "Failed to fetch data from the index: the background task failed or interrupted.\n" + + " Inner error: " + + e.getMessage()); + } + } else { + return client.search(request); + } + } + + /** + * Fetches the next batch of results. If async is enabled and more batches are expected, this will + * also trigger the next background fetch. + * + * @param request The OpenSearch request to execute + * @param maxResultWindow Maximum number of results to fetch per batch + * @return SearchBatchResult containing the current batch's iterator and completion status + * @throws NonFallbackCalciteException if the background fetch fails or is interrupted + */ + public SearchBatchResult fetchNextBatch(OpenSearchRequest request, int maxResultWindow) { + OpenSearchResponse response = getCurrentResponse(request); + + // Determine if we need future batches + if (response.isAggregationResponse() + || response.isCountResponse() + || response.getHitsSize() < maxResultWindow) { + stopIteration = true; + } + + Iterator iterator; + if (!response.isEmpty()) { + iterator = response.iterator(); + + // Pre-fetch next batch if needed + if (!stopIteration && isAsync()) { + nextBatchFuture = + CompletableFuture.supplyAsync(() -> client.search(request), backgroundExecutor); + } + } else { + iterator = Collections.emptyIterator(); + stopIteration = true; + } + + return new SearchBatchResult(iterator, stopIteration); + } + + /** + * Resets the scanner to its initial state, allowing a new scan to begin. This clears all + * completion flags and initiates a new background fetch if async is enabled. + * + * @param request The OpenSearch request to execute + */ + public void reset(OpenSearchRequest request) { + stopIteration = false; + startScanning(request); + } + + /** + * Releases resources associated with this scanner. Cancels any pending background fetches and + * marks the scan as complete. 
The scanner cannot be reused after closing without calling reset(). + */ + public void close() { + stopIteration = true; + if (nextBatchFuture != null) { + nextBatchFuture.cancel(true); + } + } + + public record SearchBatchResult(Iterator iterator, boolean stopIteration) {} +} diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java index c7d504618f5..d0ec98724a8 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java @@ -5,14 +5,9 @@ package org.opensearch.sql.opensearch.storage.scan; -import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_BACKGROUND_THREAD_POOL_NAME; - import java.util.Collections; import java.util.Iterator; import java.util.List; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.ExecutionException; -import java.util.concurrent.Executor; import lombok.EqualsAndHashCode; import lombok.ToString; import org.apache.calcite.linq4j.Enumerator; @@ -36,7 +31,7 @@ public class OpenSearchIndexEnumerator implements Enumerator { /** OpenSearch client. */ private final OpenSearchClient client; - private final Executor backgroundExecutor; + private final BackgroundSearchScanner bgScanner; private final List fields; @@ -56,18 +51,12 @@ public class OpenSearchIndexEnumerator implements Enumerator { private final ResourceMonitor monitor; /** Number of rows returned. */ - private Integer queryCount; + private Integer queryCount = 0; /** Search response for current batch. */ private Iterator iterator; - private ExprValue current; - - private CompletableFuture nextBatchFuture; - private boolean isLastBatch = false; - - /** flag to indicate whether fetch more than one batch */ - private boolean fetchOnce = false; + private ExprValue current = null; public OpenSearchIndexEnumerator( OpenSearchClient client, @@ -76,52 +65,24 @@ public OpenSearchIndexEnumerator( int maxResultWindow, OpenSearchRequest request, ResourceMonitor monitor) { + if (!monitor.isHealthy()) { + throw new NonFallbackCalciteException("insufficient resources to run the query, quit."); + } + this.fields = fields; this.request = request; this.maxResponseSize = maxResponseSize; this.maxResultWindow = maxResultWindow; this.monitor = monitor; - this.queryCount = 0; - this.current = null; - if (!this.monitor.isHealthy()) { - throw new NonFallbackCalciteException("insufficient resources to run the query, quit."); - } - this.client = client; - this.backgroundExecutor = - client.getNodeClient().threadPool().executor(SQL_BACKGROUND_THREAD_POOL_NAME); - this.nextBatchFuture = - CompletableFuture.supplyAsync(() -> client.search(request), backgroundExecutor); + this.bgScanner = new BackgroundSearchScanner(client); + this.bgScanner.startScanning(request); } - private void fetchNextBatch() { - try { - OpenSearchResponse response = nextBatchFuture.get(); - // Start by determining whether we actually need future batches - if (response.isAggregationResponse() - || response.isCountResponse() - || response.getHitsSize() < maxResultWindow) { - // No need to fetch next batch if it's for an aggregation - // or the length of response hits is less than max result window size. 
- fetchOnce = true; - } - if (!response.isEmpty()) { - iterator = response.iterator(); - - // If we haven't hit the end, start pre-fetching next batch - if (!isLastBatch && !fetchOnce) { - nextBatchFuture = - CompletableFuture.supplyAsync(() -> client.search(request), backgroundExecutor); - } - } else { - if (iterator == null) { - iterator = Collections.emptyIterator(); - } - isLastBatch = true; - } - } catch (InterruptedException | ExecutionException e) { - throw new NonFallbackCalciteException("Error fetching batch: " + e.getMessage()); - } + private Iterator fetchNextBatch() { + BackgroundSearchScanner.SearchBatchResult result = + bgScanner.fetchNextBatch(request, maxResultWindow); + return result.iterator(); } @Override @@ -142,7 +103,6 @@ private Object resolveForCalcite(ExprValue value, String rawPath) { @Override public boolean moveNext() { if (queryCount >= maxResponseSize) { - isLastBatch = true; return false; } @@ -151,30 +111,26 @@ public boolean moveNext() { throw new NonFallbackCalciteException("insufficient resources to load next row, quit."); } - if (iterator == null || (!iterator.hasNext() && !fetchOnce)) { - fetchNextBatch(); + if (iterator == null || (!iterator.hasNext() && !this.bgScanner.isScanDone())) { + iterator = fetchNextBatch(); } if (iterator.hasNext()) { current = iterator.next(); queryCount++; return true; } else { - isLastBatch = true; return false; } } @Override public void reset() { - isLastBatch = false; - nextBatchFuture = - CompletableFuture.supplyAsync(() -> client.search(request), backgroundExecutor); + bgScanner.reset(request); OpenSearchResponse response = client.search(request); if (!response.isEmpty()) { iterator = response.iterator(); } else { iterator = Collections.emptyIterator(); - isLastBatch = true; } queryCount = 0; } @@ -183,10 +139,7 @@ public void reset() { public void close() { iterator = Collections.emptyIterator(); queryCount = 0; - isLastBatch = true; - if (nextBatchFuture != null) { - nextBatchFuture.cancel(true); - } + bgScanner.close(); if (request != null) { client.forceCleanup(request); request = null; diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java index 6be02c9d6f1..8cee2cf3f3a 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/client/OpenSearchRestClientTest.java @@ -29,6 +29,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Optional; import java.util.concurrent.atomic.AtomicBoolean; import lombok.SneakyThrows; import org.apache.commons.lang3.reflect.FieldUtils; @@ -527,7 +528,7 @@ void meta_with_IOException() throws IOException { @Test void ml_with_exception() { - assertThrows(UnsupportedOperationException.class, () -> client.getNodeClient()); + assertEquals(Optional.empty(), client.getNodeClient()); } private Map mockFieldMappings(String indexName, String mappings) diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDefaultImplementorTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDefaultImplementorTest.java index 85d0a4e94fa..1977a03a1be 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDefaultImplementorTest.java +++ 
b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchDefaultImplementorTest.java @@ -7,6 +7,7 @@ import static org.junit.jupiter.api.Assertions.assertNotNull; +import java.util.Optional; import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Answers; @@ -18,6 +19,7 @@ import org.opensearch.sql.planner.logical.LogicalML; import org.opensearch.sql.planner.logical.LogicalMLCommons; import org.opensearch.sql.planner.logical.LogicalPlan; +import org.opensearch.transport.client.node.NodeClient; @ExtendWith(MockitoExtension.class) public class OpenSearchDefaultImplementorTest { @@ -27,7 +29,8 @@ public class OpenSearchDefaultImplementorTest { @Test public void visitMachineLearning() { LogicalMLCommons node = Mockito.mock(LogicalMLCommons.class, Answers.RETURNS_DEEP_STUBS); - Mockito.when(node.getChild().get(0)).thenReturn(Mockito.mock(LogicalPlan.class)); + Mockito.when(node.getChild().getFirst()).thenReturn(Mockito.mock(LogicalPlan.class)); + Mockito.when(client.getNodeClient()).thenReturn(Optional.of(Mockito.mock(NodeClient.class))); OpenSearchIndex.OpenSearchDefaultImplementor implementor = new OpenSearchIndex.OpenSearchDefaultImplementor(client); assertNotNull(implementor.visitMLCommons(node, null)); @@ -36,7 +39,8 @@ public void visitMachineLearning() { @Test public void visitAD() { LogicalAD node = Mockito.mock(LogicalAD.class, Answers.RETURNS_DEEP_STUBS); - Mockito.when(node.getChild().get(0)).thenReturn(Mockito.mock(LogicalPlan.class)); + Mockito.when(node.getChild().getFirst()).thenReturn(Mockito.mock(LogicalPlan.class)); + Mockito.when(client.getNodeClient()).thenReturn(Optional.of(Mockito.mock(NodeClient.class))); OpenSearchIndex.OpenSearchDefaultImplementor implementor = new OpenSearchIndex.OpenSearchDefaultImplementor(client); assertNotNull(implementor.visitAD(node, null)); @@ -45,7 +49,8 @@ public void visitAD() { @Test public void visitML() { LogicalML node = Mockito.mock(LogicalML.class, Answers.RETURNS_DEEP_STUBS); - Mockito.when(node.getChild().get(0)).thenReturn(Mockito.mock(LogicalPlan.class)); + Mockito.when(node.getChild().getFirst()).thenReturn(Mockito.mock(LogicalPlan.class)); + Mockito.when(client.getNodeClient()).thenReturn(Optional.of(Mockito.mock(NodeClient.class))); OpenSearchIndex.OpenSearchDefaultImplementor implementor = new OpenSearchIndex.OpenSearchDefaultImplementor(client); assertNotNull(implementor.visitML(node, null)); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java index 6a4713dc917..7bba55955b2 100644 --- a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/OpenSearchIndexTest.java @@ -31,6 +31,7 @@ import com.google.common.collect.ImmutableMap; import java.util.HashMap; import java.util.Map; +import java.util.Optional; import java.util.stream.Collectors; import org.apache.commons.lang3.tuple.ImmutablePair; import org.apache.commons.lang3.tuple.Pair; @@ -38,6 +39,7 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.api.extension.ExtendWith; import org.mockito.Mock; +import org.mockito.Mockito; import org.mockito.junit.jupiter.MockitoExtension; import org.opensearch.common.unit.TimeValue; import org.opensearch.sql.ast.tree.Sort; @@ -58,6 +60,7 @@ import org.opensearch.sql.planner.logical.LogicalPlan; import 
org.opensearch.sql.planner.logical.LogicalPlanDSL; import org.opensearch.sql.planner.physical.PhysicalPlanDSL; +import org.opensearch.transport.client.node.NodeClient; @ExtendWith(MockitoExtension.class) class OpenSearchIndexTest { @@ -225,6 +228,7 @@ void implementRelationOperatorWithOptimization() { @Test void implementOtherLogicalOperators() { when(client.getIndexMaxResultWindows("test")).thenReturn(Map.of("test", 10000)); + when(client.getNodeClient()).thenReturn(Optional.of(Mockito.mock(NodeClient.class))); NamedExpression include = named("age", ref("age", INTEGER)); ReferenceExpression exclude = ref("name", STRING); ReferenceExpression dedupeField = ref("name", STRING); diff --git a/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/BackgroundSearchScannerTest.java b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/BackgroundSearchScannerTest.java new file mode 100644 index 00000000000..f4a7f297df9 --- /dev/null +++ b/opensearch/src/test/java/org/opensearch/sql/opensearch/storage/scan/BackgroundSearchScannerTest.java @@ -0,0 +1,150 @@ +/* + * Copyright OpenSearch Contributors + * SPDX-License-Identifier: Apache-2.0 + */ + +package org.opensearch.sql.opensearch.storage.scan; + +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.times; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +import java.util.Arrays; +import java.util.Collections; +import java.util.Optional; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.opensearch.sql.data.model.ExprValue; +import org.opensearch.sql.opensearch.client.OpenSearchClient; +import org.opensearch.sql.opensearch.request.OpenSearchRequest; +import org.opensearch.sql.opensearch.response.OpenSearchResponse; +import org.opensearch.threadpool.ThreadPool; +import org.opensearch.transport.client.node.NodeClient; + +class BackgroundSearchScannerTest { + private OpenSearchClient client; + private NodeClient nodeClient; + private ThreadPool threadPool; + private OpenSearchRequest request; + private BackgroundSearchScanner scanner; + private ExecutorService executor; + + @BeforeEach + void setUp() { + client = mock(OpenSearchClient.class); + nodeClient = mock(NodeClient.class); + threadPool = mock(ThreadPool.class); + request = mock(OpenSearchRequest.class); + executor = Executors.newSingleThreadExecutor(); + + when(client.getNodeClient()).thenReturn(Optional.of(nodeClient)); + when(nodeClient.threadPool()).thenReturn(threadPool); + when(threadPool.executor(any())).thenReturn(executor); + + scanner = new BackgroundSearchScanner(client); + } + + @Test + void testSyncFallbackWhenNoNodeClient() { + // Setup client without node client + OpenSearchClient syncClient = mock(OpenSearchClient.class); + when(syncClient.getNodeClient()).thenReturn(Optional.empty()); + scanner = new BackgroundSearchScanner(syncClient); + + OpenSearchResponse response = mockResponse(false, false, 10); + when(syncClient.search(request)).thenReturn(response); + + scanner.startScanning(request); + BackgroundSearchScanner.SearchBatchResult result = scanner.fetchNextBatch(request, 10); + + assertFalse( + result.stopIteration(), "Expected iteration to continue after fetching one full page"); + 
verify(syncClient, times(1)).search(request); + } + + @Test + void testCompleteScanWithMultipleBatches() { + // First batch: normal response + OpenSearchResponse response1 = mockResponse(false, false, 10); + // Second batch: empty response + OpenSearchResponse response2 = mockResponse(true, false, 5); + + when(client.search(request)).thenReturn(response1).thenReturn(response2); + + scanner.startScanning(request); + + // First batch + BackgroundSearchScanner.SearchBatchResult result1 = scanner.fetchNextBatch(request, 10); + assertFalse( + result1.stopIteration(), "Expected iteration to continue after fetching 10/15 results"); + assertTrue(result1.iterator().hasNext()); + + // Second batch + BackgroundSearchScanner.SearchBatchResult result2 = scanner.fetchNextBatch(request, 10); + assertTrue(result2.stopIteration()); + assertFalse(result2.iterator().hasNext()); + } + + @Test + void testFetchOnceForAggregationResponse() { + OpenSearchResponse response = mockResponse(false, true, 1); + when(client.search(request)).thenReturn(response); + + scanner.startScanning(request); + BackgroundSearchScanner.SearchBatchResult result = scanner.fetchNextBatch(request, 10); + + assertTrue(scanner.isScanDone()); + } + + @Test + void testFetchOnceWhenResultsBelowWindow() { + OpenSearchResponse response = mockResponse(false, false, 5); + when(client.search(request)).thenReturn(response); + + scanner.startScanning(request); + BackgroundSearchScanner.SearchBatchResult result = scanner.fetchNextBatch(request, 10); + + assertTrue(scanner.isScanDone()); + } + + @Test + void testReset() { + OpenSearchResponse response1 = mockResponse(false, false, 5); + OpenSearchResponse response2 = mockResponse(true, false, 0); + + when(client.search(request)).thenReturn(response1).thenReturn(response2); + + scanner.startScanning(request); + scanner.fetchNextBatch(request, 10); + scanner.fetchNextBatch(request, 10); + + assertTrue(scanner.isScanDone()); + + scanner.reset(request); + + assertFalse(scanner.isScanDone()); + } + + private OpenSearchResponse mockResponse(boolean isEmpty, boolean isAggregation, int numResults) { + OpenSearchResponse response = mock(OpenSearchResponse.class); + when(response.isEmpty()).thenReturn(isEmpty); + when(response.isAggregationResponse()).thenReturn(isAggregation); + + if (numResults > 0) { + ExprValue[] values = new ExprValue[numResults]; + Arrays.fill(values, mock(ExprValue.class)); + when(response.iterator()).thenReturn(Arrays.asList(values).iterator()); + } else { + when(response.iterator()).thenReturn(Collections.emptyIterator()); + } + + when(response.getHitsSize()).thenReturn(numResults); + return response; + } +} diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index 031429884f8..fad18965bdc 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -292,7 +292,7 @@ public List> getExecutorBuilders(Settings settings) { settings, SQL_BACKGROUND_THREAD_POOL_NAME, OpenSearchExecutors.allocatedProcessors(settings), - 100, // No external API connects to this, we should have sane resource usage + 1000, "thread_pool." 
+ SQL_BACKGROUND_THREAD_POOL_NAME)); } From a5a0fbff9bccb2f9cef1a4b9504c8fa73bc4184a Mon Sep 17 00:00:00 2001 From: Simeon Widdis Date: Thu, 25 Sep 2025 20:50:21 +0000 Subject: [PATCH 5/8] Tweak a sentence's wording Signed-off-by: Simeon Widdis --- docs/user/admin/settings.rst | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/user/admin/settings.rst b/docs/user/admin/settings.rst index 9257aafa29c..b4764cac289 100644 --- a/docs/user/admin/settings.rst +++ b/docs/user/admin/settings.rst @@ -234,7 +234,7 @@ The ``sql_worker`` pool corresponds to compute resources related to running quer This directly maps to the number of queries that can be run concurrently. This is the primary pool you interact with externally. ``sql_background_io`` is a low-footprint pool for IO requests the plugin makes, -and can be used to partially the search load SQL places on your cluster for some types of expensive operations. +and can be used to limit indirect load that SQL places on your cluster for Calcite-enabled operations. A ``sql_worker`` thread may spawn multiple background threads. plugins.query.executionengine.spark.session.limit From 7eb3a17f808d894b62a5d38d8c686884d3eda9cb Mon Sep 17 00:00:00 2001 From: Simeon Widdis Date: Fri, 26 Sep 2025 19:05:52 +0000 Subject: [PATCH 6/8] Set default background threads to same as search threads Signed-off-by: Simeon Widdis --- plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java index c9ffe4f6864..343a36880fc 100644 --- a/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java +++ b/plugin/src/main/java/org/opensearch/sql/plugin/SQLPlugin.java @@ -328,7 +328,8 @@ public List> getExecutorBuilders(Settings settings) { new FixedExecutorBuilder( settings, SQL_BACKGROUND_THREAD_POOL_NAME, - OpenSearchExecutors.allocatedProcessors(settings), + settings.getAsInt( + "thread_pool.search.size", OpenSearchExecutors.allocatedProcessors(settings)), 1000, "thread_pool." + SQL_BACKGROUND_THREAD_POOL_NAME)); } From c5eb5a092c3c82c045d35ea1c5cc5565faad00b6 Mon Sep 17 00:00:00 2001 From: Simeon Widdis Date: Tue, 30 Sep 2025 23:22:28 +0000 Subject: [PATCH 7/8] Revert to sql-worker Signed-off-by: Simeon Widdis --- .../org/opensearch/sql/datasources/utils/Scheduler.java | 2 +- docs/dev/query-manager.md | 2 +- docs/user/admin/settings.rst | 6 +++--- .../sql/opensearch/client/OpenSearchNodeClient.java | 2 +- .../sql/opensearch/executor/OpenSearchQueryManager.java | 2 +- 5 files changed, 7 insertions(+), 7 deletions(-) diff --git a/datasources/src/main/java/org/opensearch/sql/datasources/utils/Scheduler.java b/datasources/src/main/java/org/opensearch/sql/datasources/utils/Scheduler.java index 062f1b1cd7b..8aa96338948 100644 --- a/datasources/src/main/java/org/opensearch/sql/datasources/utils/Scheduler.java +++ b/datasources/src/main/java/org/opensearch/sql/datasources/utils/Scheduler.java @@ -13,7 +13,7 @@ import org.opensearch.transport.client.node.NodeClient; import static org.opensearch.sql.opensearch.executor.OpenSearchQueryManager.SQL_WORKER_THREAD_POOL_NAME; -/** The scheduler which schedule the task run in sql_worker thread pool. */ +/** The scheduler which schedule the task run in sql-worker thread pool. 
*/ @UtilityClass public class Scheduler { public static void schedule(NodeClient client, Runnable task) { diff --git a/docs/dev/query-manager.md b/docs/dev/query-manager.md index 3f84610ceeb..68c82d0ea7d 100644 --- a/docs/dev/query-manager.md +++ b/docs/dev/query-manager.md @@ -31,4 +31,4 @@ Parser parse raw query as Statement and create AbstractPlan. Each AbstractPlan d ### Change of existing logic 1. Remove the schedule logic in NIO thread. After the change, a. Parser will be executed in NIO thread. - b. QueryManager decide query execution strategy. e.g. OpenSearchQueryManager schedule the QueryExecution running in **sql_worker** thread pool. \ No newline at end of file + b. QueryManager decide query execution strategy. e.g. OpenSearchQueryManager schedule the QueryExecution running in **sql-worker** thread pool. \ No newline at end of file diff --git a/docs/user/admin/settings.rst b/docs/user/admin/settings.rst index b4764cac289..85cb9ed93ab 100644 --- a/docs/user/admin/settings.rst +++ b/docs/user/admin/settings.rst @@ -223,19 +223,19 @@ The SQL plugin is integrated with the `OpenSearch Thread Pool Settings Date: Tue, 30 Sep 2025 23:33:16 +0000 Subject: [PATCH 8/8] Simplify reset Signed-off-by: Simeon Widdis --- .../storage/scan/OpenSearchIndexEnumerator.java | 8 +------- 1 file changed, 1 insertion(+), 7 deletions(-) diff --git a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java index d0ec98724a8..e684d128914 100644 --- a/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java +++ b/opensearch/src/main/java/org/opensearch/sql/opensearch/storage/scan/OpenSearchIndexEnumerator.java @@ -17,7 +17,6 @@ import org.opensearch.sql.monitor.ResourceMonitor; import org.opensearch.sql.opensearch.client.OpenSearchClient; import org.opensearch.sql.opensearch.request.OpenSearchRequest; -import org.opensearch.sql.opensearch.response.OpenSearchResponse; /** * Supports a simple iteration over a collection for OpenSearch index @@ -126,12 +125,7 @@ public boolean moveNext() { @Override public void reset() { bgScanner.reset(request); - OpenSearchResponse response = client.search(request); - if (!response.isEmpty()) { - iterator = response.iterator(); - } else { - iterator = Collections.emptyIterator(); - } + iterator = bgScanner.fetchNextBatch(request, maxResultWindow).iterator(); queryCount = 0; }
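
The core of this series is a double-buffering prefetch: keep at most one search request in flight on a background executor while the caller consumes the previous batch, and fall back to a plain synchronous fetch when the client has no NodeClient (and therefore no thread pool). Below is a minimal standalone sketch of that pattern under those assumptions; the names PrefetchingFetcher and nextBatch and the Supplier-based fetch are illustrative only, not the plugin's API, and error handling and termination checks are simplified compared to BackgroundSearchScanner.

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.function.Supplier;

final class PrefetchingFetcher<T> {
  private final Supplier<List<T>> search;       // one call returns one batch from the data source
  private final Executor background;            // null means no thread pool is available
  private CompletableFuture<List<T>> inFlight;  // at most one pre-fetched batch

  PrefetchingFetcher(Supplier<List<T>> search, Executor background) {
    this.search = search;
    this.background = background;
    if (background != null) {
      // Start the first request immediately so it overlaps with caller-side setup.
      inFlight = CompletableFuture.supplyAsync(search, background);
    }
  }

  /** Returns the next batch; an empty list signals the end of the scan. */
  List<T> nextBatch() {
    if (background == null) {
      return search.get(); // synchronous fallback, analogous to the empty-NodeClient case
    }
    List<T> batch;
    try {
      batch = inFlight.get();
    } catch (InterruptedException | ExecutionException e) {
      throw new IllegalStateException("background fetch failed or was interrupted", e);
    }
    if (!batch.isEmpty()) {
      // Pre-fetch the following batch while the caller processes this one.
      inFlight = CompletableFuture.supplyAsync(search, background);
    }
    return batch;
  }

  void close() {
    if (inFlight != null) {
      inFlight.cancel(true); // best-effort cancellation of any pending request
    }
  }
}

A caller drains it with a loop such as: List<Row> batch; while (!(batch = fetcher.nextBatch()).isEmpty()) { process(batch); } fetcher.close(); — the same shape the enumerator's moveNext()/close() follow against the scanner.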