Accept instance params for managedExecutorService and managedThreadFactory as an alternative to JNDI implementation #860

Open
dougcavalheiro opened this issue Apr 4, 2025 · 0 comments

Context

Currently, the ParallelConsumerOptions class accepts a JNDI name for custom implementations of the executor service and the thread factory (managedExecutorService and managedThreadFactory, respectively). Java 21 offers virtual threads, which can be used in this scenario to achieve better performance and lower resource usage.

var options = ParallelConsumerOptions.<String, String>builder()
                .managedExecutorService(JNDI_NAME_VIRTUAL_THREAD_EXECUTOR)
                .managedThreadFactory(JNDI_NAME_VIRTUAL_THREAD_FACTORY)
                .ordering(PARTITION)                
                .maxConcurrency(20)
                .consumer(kafkaConsumer)
                .producer(kafkaProducer)
                .build();
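
For readers new to virtual threads, here is a minimal sketch of the kind of executor this issue wants to plug in. It assumes Java 21 or later; the class name VirtualThreadDemo and the 50 ms sleep (standing in for blocking I/O) are made up for the example and are not part of the library.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class VirtualThreadDemo {

    /** Runs taskCount blocking tasks, one virtual thread each, and counts how many ran on a virtual thread. */
    public static int runBlockingTasks(int taskCount) throws Exception {
        List<Future<Boolean>> results = new ArrayList<>();
        // ExecutorService is AutoCloseable since Java 19; close() waits for submitted tasks to finish.
        try (ExecutorService executor = Executors.newVirtualThreadPerTaskExecutor()) {
            for (int i = 0; i < taskCount; i++) {
                results.add(executor.submit(() -> {
                    Thread.sleep(Duration.ofMillis(50)); // simulated blocking call
                    return Thread.currentThread().isVirtual();
                }));
            }
        }
        int virtualCount = 0;
        for (Future<Boolean> result : results) {
            if (result.get()) {
                virtualCount++;
            }
        }
        return virtualCount;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(runBlockingTasks(1_000));
    }
}
```

Each task gets its own virtual thread, so a thousand concurrent blocking calls do not need a thousand platform threads.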

Problem

Providing a different executor and thread factory through JNDI registration is not developer friendly today. It requires knowing how JNDI works and how to register custom objects so that an InitialContext lookup can find them.
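
To illustrate the friction, here is a small sketch using only the JDK's javax.naming API. The class name JndiLookupDemo and the JNDI name are made up for the example. In a plain Java SE application with no naming provider configured, the lookup is expected to fail with NoInitialContextException, which is why the workaround described in this issue has to register its own context factory first.

```java
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.naming.NoInitialContextException;

public class JndiLookupDemo {

    /** Returns true if the lookup fails because no JNDI provider is configured. */
    public static boolean failsWithoutProvider(String jndiName) {
        try {
            // With no provider configured the constructor succeeds;
            // the missing provider only surfaces when lookup() is called.
            new InitialContext().lookup(jndiName);
            return false;
        } catch (NoInitialContextException e) {
            return true;
        } catch (NamingException e) {
            // Some other naming failure (e.g. a provider exists but the name is unbound).
            return false;
        }
    }

    public static void main(String[] args) {
        // Hypothetical JNDI name, used only for illustration.
        System.out.println(failsWithoutProvider("concurrent/virtualThreadExecutor"));
    }
}
```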

Workaround (how to make it work today)

Today, virtual threads can be used as follows.

  1. Create a custom JNDI context factory backed by a custom InitialContext subclass:
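import java.util.Hashtable;
import java.util.concurrent.ConcurrentHashMap;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NameAlreadyBoundException;
import javax.naming.NameNotFoundException;
import javax.naming.NamingException;
import javax.naming.spi.InitialContextFactory;
import javax.naming.spi.InitialContextFactoryBuilder;
import javax.naming.spi.NamingManager;

// `log` is assumed to be a logger field (e.g. generated by Lombok's @Slf4j); it is not shown in this snippet.
public class CustomJndiContextFactory implements InitialContextFactory, InitialContextFactoryBuilder {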
    private static final CustomJndiContextFactory INSTANCE = new CustomJndiContextFactory();
    private static final InMemoryContext GLOBAL_CONTEXT;

    static {
        try {
            GLOBAL_CONTEXT = new InMemoryContext();
        } catch (NamingException e) {
            throw new RuntimeException(e);
        }
    }

    private CustomJndiContextFactory() {
        // Singleton: the only instance is INSTANCE
    }

    @Override
    public InitialContextFactory createInitialContextFactory(Hashtable<?, ?> environment) {
        return INSTANCE;
    }

    @Override
    public Context getInitialContext(Hashtable<?, ?> environment) {
        return GLOBAL_CONTEXT;
    }

    public static void register() throws NamingException {
        if (!NamingManager.hasInitialContextFactoryBuilder()) {
            NamingManager.setInitialContextFactoryBuilder(INSTANCE);
            log.info("Custom JNDI Factory registered successfully.");
        } else {
            log.trace("JNDI Factory already set, skipping registration.");
        }
    }

    public static void bind(String name, Object obj) throws NamingException {
        GLOBAL_CONTEXT.bind(name, obj);
    }

    static class InMemoryContext extends InitialContext {
        private final ConcurrentHashMap<String, Object> bindings = new ConcurrentHashMap<>();

        public InMemoryContext() throws NamingException {
            super(true);
        }

        @Override
        public Object lookup(String name) throws NamingException {
            // Single read avoids a race between containsKey() and get()
            Object value = bindings.get(name);
            if (value == null) {
                throw new NameNotFoundException("JNDI name not found: " + name);
            }
            return value;
        }

        @Override
        public void bind(String name, Object obj) throws NamingException {
            // putIfAbsent makes the check-and-insert atomic
            if (bindings.putIfAbsent(name, obj) != null) {
                throw new NameAlreadyBoundException("JNDI name already bound: " + name);
            }
        }

        @Override
        public void rebind(String name, Object obj) {
            bindings.put(name, obj);
        }

        @Override
        public void unbind(String name) {
            bindings.remove(name);
        }
    }
}
  2. Bind the custom implementations using the class above:
private void bindExecutorService() {
    try {
        // Install the in-memory JNDI context before binding anything
        CustomJndiContextFactory.register();

        ExecutorService virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor();
        ThreadFactory virtualThreadFactory = Thread.ofVirtual().factory();

        CustomJndiContextFactory.bind(JNDI_NAME_VIRTUAL_THREAD_EXECUTOR, virtualThreadExecutor);
        CustomJndiContextFactory.bind(JNDI_NAME_VIRTUAL_THREAD_FACTORY, virtualThreadFactory);
    } catch (NamingException e) {
        throw new RuntimeException("Failed to register Thread Pool Executors in JNDI", e);
    }
}
  3. Finally, pass the JNDI names to the builder:
var options = ParallelConsumerOptions.<String, String>builder()
                .managedExecutorService(JNDI_NAME_VIRTUAL_THREAD_EXECUTOR)
                .managedThreadFactory(JNDI_NAME_VIRTUAL_THREAD_FACTORY)
                .ordering(PARTITION)                
                .maxConcurrency(20)
                .consumer(kafkaConsumer)
                .producer(kafkaProducer)
                .build();

ParallelStreamProcessor<String, String> streamProcessor = ParallelStreamProcessor.createEosStreamProcessor(options);

Proposed Solution

Allow the builder (or the parallel stream processor) to accept ExecutorService and ThreadFactory instances directly:

var options = ParallelConsumerOptions.<String, String>builder()
                .managedExecutorService(Executors.newVirtualThreadPerTaskExecutor())
                .managedThreadFactory(Thread.ofVirtual().factory())
                .ordering(PARTITION)                
                .maxConcurrency(20)
                .consumer(kafkaConsumer)
                .producer(kafkaProducer)
                .build();

Or pass them when creating the stream processor:

var options = ParallelConsumerOptions.<String, String>builder()
                .ordering(PARTITION)                
                .maxConcurrency(20)
                .consumer(kafkaConsumer)
                .producer(kafkaProducer)
                .build();

ParallelStreamProcessor<String, String> streamProcessor = ParallelStreamProcessor.createEosStreamProcessor(
        options,
        Executors.newVirtualThreadPerTaskExecutor(),
        Thread.ofVirtual().factory());
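
One way the library could support both styles side by side is sketched below. This is purely hypothetical: ExecutorSource, ofJndiName, ofInstance and resolve are invented names for illustration and are not part of the current ParallelConsumerOptions API. The idea is that an instance, when given, is used as-is, and a JNDI lookup happens only when a name was supplied instead.

```java
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/** Hypothetical helper: holds either a JNDI name or a ready-made executor instance. */
public final class ExecutorSource {
    private final String jndiName;          // existing style: resolved through JNDI
    private final ExecutorService instance; // proposed style: used as-is

    private ExecutorSource(String jndiName, ExecutorService instance) {
        this.jndiName = jndiName;
        this.instance = instance;
    }

    public static ExecutorSource ofJndiName(String jndiName) {
        return new ExecutorSource(Objects.requireNonNull(jndiName), null);
    }

    public static ExecutorSource ofInstance(ExecutorService executor) {
        return new ExecutorSource(null, Objects.requireNonNull(executor));
    }

    /** Returns the instance if one was given; otherwise performs the JNDI lookup. */
    public ExecutorService resolve() throws NamingException {
        if (instance != null) {
            return instance;
        }
        return (ExecutorService) new InitialContext().lookup(jndiName);
    }

    public static void main(String[] args) throws Exception {
        // Any ExecutorService works here; on Java 21+ this could be
        // Executors.newVirtualThreadPerTaskExecutor().
        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            ExecutorService resolved = ExecutorSource.ofInstance(pool).resolve();
            System.out.println(resolved == pool);
            System.out.println(resolved.submit(() -> "done").get());
        } finally {
            pool.shutdown();
        }
    }
}
```

Keeping the JNDI path intact means existing users are unaffected, while callers who already hold an executor skip the registration step entirely.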

@dougcavalheiro changed the title from "Accept instance params for managedExecutorService and managedThreadFactory to avoid JNDI implementation" to "Accept instance params for managedExecutorService and managedThreadFactory as an alternative to JNDI implementation" on Apr 4, 2025