Skip to content

Record pool name in EsThread instances #116186

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.apache.lucene.search.ScoreDoc;
import org.elasticsearch.common.util.concurrent.AbstractRunnable;
import org.elasticsearch.common.util.concurrent.AtomicArray;
import org.elasticsearch.common.util.concurrent.EsExecutors;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.search.SearchPhaseResult;
import org.elasticsearch.search.SearchShardTarget;
Expand All @@ -21,6 +22,7 @@
import org.elasticsearch.search.internal.ShardSearchContextId;
import org.elasticsearch.search.rank.RankDoc;
import org.elasticsearch.search.rank.RankDocShardInfo;
import org.elasticsearch.threadpool.ThreadPool;
import org.elasticsearch.transport.Transport;

import java.util.ArrayList;
Expand Down Expand Up @@ -91,7 +93,7 @@ final class FetchSearchPhase extends SearchPhase {

@Override
public void run() {
context.execute(new AbstractRunnable() {
var fetchTask = new AbstractRunnable() {

@Override
protected void doRun() throws Exception {
Expand All @@ -102,7 +104,12 @@ protected void doRun() throws Exception {
public void onFailure(Exception e) {
context.onPhaseFailure(FetchSearchPhase.this, "", e);
}
});
};
if (Thread.currentThread() instanceof EsExecutors.EsThread esThread && ThreadPool.Names.SEARCH.equals(esThread.pool())) {
fetchTask.run();
return;
}
context.execute(fetchTask);
}

private void innerRun() throws Exception {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
import org.elasticsearch.common.settings.Setting.Property;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.unit.Processors;
import org.elasticsearch.core.Nullable;
import org.elasticsearch.core.SuppressForbidden;
import org.elasticsearch.node.Node;

Expand Down Expand Up @@ -326,7 +327,7 @@ public static String executorName(Thread thread) {
}

public static ThreadFactory daemonThreadFactory(Settings settings, String namePrefix) {
return createDaemonThreadFactory(threadName(settings, namePrefix), false);
return createDaemonThreadFactory(threadName(settings, namePrefix), null, false);
}

public static ThreadFactory daemonThreadFactory(String nodeName, String namePrefix) {
Expand All @@ -335,16 +336,16 @@ public static ThreadFactory daemonThreadFactory(String nodeName, String namePref

public static ThreadFactory daemonThreadFactory(String nodeName, String namePrefix, boolean isSystemThread) {
assert nodeName != null && false == nodeName.isEmpty();
return createDaemonThreadFactory(threadName(nodeName, namePrefix), isSystemThread);
return createDaemonThreadFactory(threadName(nodeName, namePrefix), namePrefix, isSystemThread);
}

public static ThreadFactory daemonThreadFactory(String name) {
assert name != null && name.isEmpty() == false;
return createDaemonThreadFactory(name, false);
return createDaemonThreadFactory(name, null, false);
}

private static ThreadFactory createDaemonThreadFactory(String namePrefix, boolean isSystemThread) {
return new EsThreadFactory(namePrefix, isSystemThread);
private static ThreadFactory createDaemonThreadFactory(String namePrefix, @Nullable String pool, boolean isSystemThread) {
return new EsThreadFactory(namePrefix, pool, isSystemThread);
}

static class EsThreadFactory implements ThreadFactory {
Expand All @@ -353,18 +354,20 @@ static class EsThreadFactory implements ThreadFactory {
final AtomicInteger threadNumber = new AtomicInteger(1);
final String namePrefix;
final boolean isSystem;
final String pool;

EsThreadFactory(String namePrefix, boolean isSystem) {
EsThreadFactory(String namePrefix, @Nullable String pool, boolean isSystem) {
this.namePrefix = namePrefix;
SecurityManager s = System.getSecurityManager();
group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
this.isSystem = isSystem;
this.pool = pool;
}

@Override
public Thread newThread(Runnable r) {
return AccessController.doPrivileged((PrivilegedAction<Thread>) () -> {
Thread t = new EsThread(group, r, namePrefix + "[T#" + threadNumber.getAndIncrement() + "]", 0, isSystem);
Thread t = new EsThread(group, r, namePrefix + "[T#" + threadNumber.getAndIncrement() + "]", 0, pool, isSystem);
t.setDaemon(true);
return t;
});
Expand All @@ -373,15 +376,23 @@ public Thread newThread(Runnable r) {

public static class EsThread extends Thread {
private final boolean isSystem;
@Nullable
private final String pool;

EsThread(ThreadGroup group, Runnable target, String name, long stackSize, boolean isSystem) {
EsThread(ThreadGroup group, Runnable target, String name, long stackSize, @Nullable String pool, boolean isSystem) {
super(group, target, name, stackSize);
this.isSystem = isSystem;
this.pool = pool;
}

public boolean isSystem() {
return isSystem;
}

@Nullable
public String pool() {
return pool;
}
}

/**
Expand Down