diff --git a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java index d7b847d835b83..9383476afa443 100644 --- a/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java +++ b/server/src/main/java/org/elasticsearch/action/search/FetchSearchPhase.java @@ -12,6 +12,7 @@ import org.apache.lucene.search.ScoreDoc; import org.elasticsearch.common.util.concurrent.AbstractRunnable; import org.elasticsearch.common.util.concurrent.AtomicArray; +import org.elasticsearch.common.util.concurrent.EsExecutors; import org.elasticsearch.core.Nullable; import org.elasticsearch.search.SearchPhaseResult; import org.elasticsearch.search.SearchShardTarget; @@ -21,6 +22,7 @@ import org.elasticsearch.search.internal.ShardSearchContextId; import org.elasticsearch.search.rank.RankDoc; import org.elasticsearch.search.rank.RankDocShardInfo; +import org.elasticsearch.threadpool.ThreadPool; import org.elasticsearch.transport.Transport; import java.util.ArrayList; @@ -91,7 +93,7 @@ final class FetchSearchPhase extends SearchPhase { @Override public void run() { - context.execute(new AbstractRunnable() { + var fetchTask = new AbstractRunnable() { @Override protected void doRun() throws Exception { @@ -102,7 +104,12 @@ protected void doRun() throws Exception { public void onFailure(Exception e) { context.onPhaseFailure(FetchSearchPhase.this, "", e); } - }); + }; + if (Thread.currentThread() instanceof EsExecutors.EsThread esThread && ThreadPool.Names.SEARCH.equals(esThread.pool())) { + fetchTask.run(); + return; + } + context.execute(fetchTask); } private void innerRun() throws Exception { diff --git a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java index 9120576815bac..ecc4a2f94cbb5 100644 --- a/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java +++ b/server/src/main/java/org/elasticsearch/common/util/concurrent/EsExecutors.java @@ -14,6 +14,7 @@ import org.elasticsearch.common.settings.Setting.Property; import org.elasticsearch.common.settings.Settings; import org.elasticsearch.common.unit.Processors; +import org.elasticsearch.core.Nullable; import org.elasticsearch.core.SuppressForbidden; import org.elasticsearch.node.Node; @@ -326,7 +327,7 @@ public static String executorName(Thread thread) { } public static ThreadFactory daemonThreadFactory(Settings settings, String namePrefix) { - return createDaemonThreadFactory(threadName(settings, namePrefix), false); + return createDaemonThreadFactory(threadName(settings, namePrefix), null, false); } public static ThreadFactory daemonThreadFactory(String nodeName, String namePrefix) { @@ -335,16 +336,16 @@ public static ThreadFactory daemonThreadFactory(String nodeName, String namePref public static ThreadFactory daemonThreadFactory(String nodeName, String namePrefix, boolean isSystemThread) { assert nodeName != null && false == nodeName.isEmpty(); - return createDaemonThreadFactory(threadName(nodeName, namePrefix), isSystemThread); + return createDaemonThreadFactory(threadName(nodeName, namePrefix), namePrefix, isSystemThread); } public static ThreadFactory daemonThreadFactory(String name) { assert name != null && name.isEmpty() == false; - return createDaemonThreadFactory(name, false); + return createDaemonThreadFactory(name, null, false); } - private static ThreadFactory createDaemonThreadFactory(String namePrefix, boolean isSystemThread) { - return new EsThreadFactory(namePrefix, isSystemThread); + private static ThreadFactory createDaemonThreadFactory(String namePrefix, @Nullable String pool, boolean isSystemThread) { + return new EsThreadFactory(namePrefix, pool, isSystemThread); } static class EsThreadFactory implements ThreadFactory { @@ -353,18 +354,20 @@ static class EsThreadFactory implements ThreadFactory { final AtomicInteger threadNumber = new AtomicInteger(1); final String namePrefix; final boolean isSystem; + final String pool; - EsThreadFactory(String namePrefix, boolean isSystem) { + EsThreadFactory(String namePrefix, @Nullable String pool, boolean isSystem) { this.namePrefix = namePrefix; SecurityManager s = System.getSecurityManager(); group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup(); this.isSystem = isSystem; + this.pool = pool; } @Override public Thread newThread(Runnable r) { return AccessController.doPrivileged((PrivilegedAction) () -> { - Thread t = new EsThread(group, r, namePrefix + "[T#" + threadNumber.getAndIncrement() + "]", 0, isSystem); + Thread t = new EsThread(group, r, namePrefix + "[T#" + threadNumber.getAndIncrement() + "]", 0, pool, isSystem); t.setDaemon(true); return t; }); @@ -373,15 +376,23 @@ public Thread newThread(Runnable r) { public static class EsThread extends Thread { private final boolean isSystem; + @Nullable + private final String pool; - EsThread(ThreadGroup group, Runnable target, String name, long stackSize, boolean isSystem) { + EsThread(ThreadGroup group, Runnable target, String name, long stackSize, @Nullable String pool, boolean isSystem) { super(group, target, name, stackSize); this.isSystem = isSystem; + this.pool = pool; } public boolean isSystem() { return isSystem; } + + @Nullable + public String pool() { + return pool; + } } /**