Context
Currently, the ParallelConsumerOptions class accepts JNDI names for custom implementations of the thread executor service and the thread factory (managedExecutorService and managedThreadFactory respectively). Java 21 offers virtual threads, which can be used in this scenario to improve performance and reduce resource usage. With JNDI names, the configuration looks like this:
var options = ParallelConsumerOptions.<String, String>builder()
        .managedExecutorService(JNDI_NAME_VIRTUAL_THREAD_EXECUTOR)
        .managedThreadFactory(JNDI_NAME_VIRTUAL_THREAD_FACTORY)
        .ordering(PARTITION)
        .maxConcurrency(20)
        .consumer(kafkaConsumer)
        .producer(kafkaProducer)
        .build();
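To illustrate why virtual threads are attractive here, the following is a small standalone sketch (Java 21 or later, no Parallel Consumer dependency) that runs many blocking tasks on a virtual-thread-per-task executor. The class and method names are hypothetical, and the sleep only stands in for blocking work such as a remote call made while processing a record.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class VirtualThreadSketch {

    /** Runs taskCount blocking tasks and returns how many of them ran on a virtual thread. */
    public static int runBlockingTasks(int taskCount) {
        int virtualCount = 0;
        // ExecutorService is AutoCloseable since Java 19; close() waits for submitted tasks.
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            List<Future<Boolean>> results = new ArrayList<>();
            for (int i = 0; i < taskCount; i++) {
                results.add(executor.submit(() -> {
                    Thread.sleep(Duration.ofMillis(50)); // stand-in for blocking I/O
                    return Thread.currentThread().isVirtual();
                }));
            }
            for (Future<Boolean> result : results) {
                if (result.get()) {
                    virtualCount++;
                }
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("Task execution failed", e);
        }
        return virtualCount;
    }

    public static void main(String[] args) {
        System.out.println(runBlockingTasks(1_000) + " tasks ran on virtual threads");
    }
}
```

Each submitted task gets its own virtual thread, so no pool size has to be tuned for the number of concurrent blocking calls.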
Problem
Supplying a different executor and thread factory through JNDI registration is not developer friendly today. It requires knowing how JNDI works and how to register custom objects so that an InitialContext lookup can find them.
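To make the friction concrete, here is a minimal sketch of what a JNDI lookup does in a plain Java SE application where no naming provider has been configured. The class name JndiLookupDemo and the JNDI name are hypothetical. That the library resolves the configured name through a standard InitialContext lookup is an assumption of this sketch, since the library internals are not shown here.

```java
import javax.naming.InitialContext;
import javax.naming.NamingException;

public class JndiLookupDemo {

    // Hypothetical JNDI name, used only for illustration.
    public static final String EXECUTOR_NAME = "pc/virtualThreadExecutor";

    /**
     * Attempts a standard JNDI lookup and reports what happened: the simple
     * class name of the resolved object, or of the NamingException thrown
     * when the lookup fails.
     */
    public static String lookupOutcome(String name) {
        try {
            Object resolved = new InitialContext().lookup(name);
            return resolved.getClass().getSimpleName();
        } catch (NamingException e) {
            return e.getClass().getSimpleName();
        }
    }

    public static void main(String[] args) {
        // On a plain JVM with no JNDI provider configured, this reports
        // NoInitialContextException.
        System.out.println(lookupOutcome(EXECUTOR_NAME));
    }
}
```

Outside an application server nothing is registered by default, so the application has to supply its own context factory before any name can be resolved.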
Workaround (how to make it work today)
Today, virtual threads can be used as follows.
- Create a custom JNDI context factory backed by a custom InitialContext subclass:
import java.util.Hashtable;
import java.util.concurrent.ConcurrentHashMap;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NameAlreadyBoundException;
import javax.naming.NameNotFoundException;
import javax.naming.NamingException;
import javax.naming.spi.InitialContextFactory;
import javax.naming.spi.InitialContextFactoryBuilder;
import javax.naming.spi.NamingManager;

// Assumes a logger field named "log" is available (for example via Lombok's @Slf4j).
public class CustomJndiContextFactory implements InitialContextFactory, InitialContextFactoryBuilder {

    private static final CustomJndiContextFactory INSTANCE = new CustomJndiContextFactory();
    private static final InMemoryContext GLOBAL_CONTEXT;

    static {
        try {
            GLOBAL_CONTEXT = new InMemoryContext();
        } catch (NamingException e) {
            throw new RuntimeException(e);
        }
    }

    private CustomJndiContextFactory() {
        // Singleton: use INSTANCE
    }

    @Override
    public InitialContextFactory createInitialContextFactory(Hashtable<?, ?> environment) {
        return INSTANCE;
    }

    @Override
    public Context getInitialContext(Hashtable<?, ?> environment) {
        return GLOBAL_CONTEXT;
    }

    // The builder can only be set once per JVM, so check before registering.
    public static synchronized void register() throws NamingException {
        if (!NamingManager.hasInitialContextFactoryBuilder()) {
            NamingManager.setInitialContextFactoryBuilder(INSTANCE);
            log.info("Custom JNDI Factory registered successfully.");
        } else {
            log.trace("JNDI Factory already set, skipping registration.");
        }
    }

    public static void bind(String name, Object obj) throws NamingException {
        GLOBAL_CONTEXT.bind(name, obj);
    }

    // Minimal in-memory context: only the operations needed here are overridden.
    static class InMemoryContext extends InitialContext {

        private final ConcurrentHashMap<String, Object> bindings = new ConcurrentHashMap<>();

        public InMemoryContext() throws NamingException {
            super(true); // lazy: do not try to resolve a default initial context
        }

        @Override
        public Object lookup(String name) throws NamingException {
            // Single read avoids a race between containsKey and get
            Object value = bindings.get(name);
            if (value == null) {
                throw new NameNotFoundException("JNDI name not found: " + name);
            }
            return value;
        }

        @Override
        public void bind(String name, Object obj) throws NamingException {
            // putIfAbsent makes the check-and-insert atomic
            if (bindings.putIfAbsent(name, obj) != null) {
                throw new NameAlreadyBoundException("JNDI name already bound: " + name);
            }
        }

        @Override
        public void rebind(String name, Object obj) {
            bindings.put(name, obj);
        }

        @Override
        public void unbind(String name) {
            bindings.remove(name);
        }
    }
}
- Bind the custom implementations using the class above:
private void bindExecutorService() {
    try {
        CustomJndiContextFactory.register();
        ExecutorService virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor();
        ThreadFactory virtualThreadFactory = Thread.ofVirtual().factory();
        // Make both objects resolvable under the names passed to the builder
        CustomJndiContextFactory.bind(JNDI_NAME_VIRTUAL_THREAD_EXECUTOR, virtualThreadExecutor);
        CustomJndiContextFactory.bind(JNDI_NAME_VIRTUAL_THREAD_FACTORY, virtualThreadFactory);
    } catch (NamingException e) {
        throw new RuntimeException("Failed to register Thread Pool Executors in JNDI", e);
    }
}
- As a final step, pass the JNDI names to the builder:
var options = ParallelConsumerOptions.<String, String>builder()
        .managedExecutorService(JNDI_NAME_VIRTUAL_THREAD_EXECUTOR)
        .managedThreadFactory(JNDI_NAME_VIRTUAL_THREAD_FACTORY)
        .ordering(PARTITION)
        .maxConcurrency(20)
        .consumer(kafkaConsumer)
        .producer(kafkaProducer)
        .build();
ParallelStreamProcessor<String, String> streamProcessor = ParallelStreamProcessor.createEosStreamProcessor(options);
Proposed Solution
Allow the builder (or the ParallelStreamProcessor factory method) to accept ExecutorService and ThreadFactory instances directly:
var options = ParallelConsumerOptions.<String, String>builder()
        .managedExecutorService(Executors.newVirtualThreadPerTaskExecutor())
        .managedThreadFactory(Thread.ofVirtual().factory())
        .ordering(PARTITION)
        .maxConcurrency(20)
        .consumer(kafkaConsumer)
        .producer(kafkaProducer)
        .build();
OR:
var options = ParallelConsumerOptions.<String, String>builder()
        .ordering(PARTITION)
        .maxConcurrency(20)
        .consumer(kafkaConsumer)
        .producer(kafkaProducer)
        .build();
ParallelStreamProcessor<String, String> streamProcessor = ParallelStreamProcessor.createEosStreamProcessor(
        options,
        Executors.newVirtualThreadPerTaskExecutor(),
        Thread.ofVirtual().factory());
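One way the library could support both styles is to prefer a directly supplied instance and fall back to the JNDI name only when no instance is given. The sketch below is purely illustrative: ExecutorResolver and its resolve method are hypothetical and are not part of the Parallel Consumer API.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.naming.InitialContext;
import javax.naming.NamingException;

public final class ExecutorResolver {

    private ExecutorResolver() {
        // Static helper, not meant to be instantiated
    }

    /**
     * Returns the supplied instance when present; otherwise resolves the
     * given JNDI name, which keeps the existing name-based behaviour.
     */
    public static ExecutorService resolve(ExecutorService instance, String jndiName) {
        if (instance != null) {
            return instance; // a direct instance wins, no JNDI involved
        }
        if (jndiName == null) {
            throw new IllegalArgumentException("Either an instance or a JNDI name is required");
        }
        try {
            return InitialContext.doLookup(jndiName);
        } catch (NamingException e) {
            throw new IllegalStateException("JNDI lookup failed for: " + jndiName, e);
        }
    }

    public static void main(String[] args) {
        // On Java 21+, Executors.newVirtualThreadPerTaskExecutor() could be passed instead.
        ExecutorService executor = Executors.newCachedThreadPool();
        try {
            ExecutorService resolved = resolve(executor, null);
            System.out.println(resolved == executor);
        } finally {
            executor.shutdown();
        }
    }
}
```

With this shape, existing users who rely on JNDI names would be unaffected, while new users could pass instances without registering anything.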