|
13 | 13 | import org.elasticsearch.logging.LogManager;
|
14 | 14 | import org.elasticsearch.logging.Logger;
|
15 | 15 | import org.elasticsearch.tasks.TaskInfo;
|
| 16 | +import org.elasticsearch.transport.TransportService; |
16 | 17 | import org.elasticsearch.xpack.core.async.AsyncStopRequest;
|
| 18 | +import org.elasticsearch.xpack.esql.plugin.EsqlPlugin; |
| 19 | +import org.elasticsearch.xpack.esql.plugin.QueryPragmas; |
17 | 20 |
|
18 | 21 | import java.util.Iterator;
|
19 | 22 | import java.util.List;
|
@@ -132,13 +135,21 @@ public void testStopQueryLocal() throws Exception {
|
132 | 135 |
|
133 | 136 | Tuple<Boolean, Boolean> includeCCSMetadata = randomIncludeCCSMetadata();
|
134 | 137 | boolean responseExpectMeta = includeCCSMetadata.v2();
|
135 |
| - |
136 |
| - final String asyncExecutionId = startAsyncQuery( |
| 138 | + // By default, ES|QL uses all workers in the esql_worker threadpool to execute drivers on data nodes. |
| 139 | + // If a node is both a data node and the coordinator, and all drivers are blocked by the allowEmitting latch, |
| 140 | + // there are no workers left to execute the final driver or fetch pages from remote clusters. |
| 141 | + // This can prevent remote clusters from being marked as successful on the coordinator, even if they |
| 142 | + // have completed. To avoid this, we cap the TASK_CONCURRENCY pragma below the pool's max size, reserving at least one worker for the final driver and page fetching. |
| 143 | + // A single worker is enough, because both of these tasks can pause and yield. |
| 144 | + var threadpool = cluster(LOCAL_CLUSTER).getInstance(TransportService.class).getThreadPool(); |
| 145 | + int maxEsqlWorkers = threadpool.info(EsqlPlugin.ESQL_WORKER_THREAD_POOL_NAME).getMax(); |
| 146 | + LOGGER.info("--> Launching async query"); |
| 147 | + final String asyncExecutionId = startAsyncQueryWithPragmas( |
137 | 148 | client(),
|
138 | 149 | "FROM blocking,*:logs-* | STATS total=sum(coalesce(const,v)) | LIMIT 1",
|
139 |
| - includeCCSMetadata.v1() |
| 150 | + includeCCSMetadata.v1(), |
| 151 | + Map.of(QueryPragmas.TASK_CONCURRENCY.getKey(), between(1, maxEsqlWorkers - 1)) |
140 | 152 | );
|
141 |
| - |
142 | 153 | try {
|
143 | 154 | // wait until we know that the local query against 'blocking' has started
|
144 | 155 | assertTrue(SimplePauseFieldPlugin.startEmitting.await(30, TimeUnit.SECONDS));
|
|
0 commit comments