Accept instance params for managedExecutorService and managedThreadFactory as an alternative to JNDI implementation #860

Open
@dougcavalheiro

Context

Currently, the ParallelConsumerOptions class accepts a JNDI name for custom implementations of the executor service and the thread factory (managedExecutorService and managedThreadFactory respectively). Java 21 offers virtual threads, which can be used in this scenario to achieve better performance and lower resource usage.

var options = ParallelConsumerOptions.<String, String>builder()
                .managedExecutorService(JNDI_NAME_VIRTUAL_THREAD_EXECUTOR)
                .managedThreadFactory(JNDI_NAME_VIRTUAL_THREAD_FACTORY)
                .ordering(PARTITION)                
                .maxConcurrency(20)
                .consumer(kafkaConsumer)
                .producer(kafkaProducer)
                .build();

Problem

Providing a different executor and thread factory through JNDI registration is not developer-friendly today. It requires knowing how JNDI works and how to register custom objects so that an InitialContext lookup can find them.
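To illustrate the friction, here is a minimal sketch of what happens in a plain Java SE application with no JNDI provider configured. The class name `JndiLookupDemo` and the name `demo/executor` are made up for this example, and it assumes there is no `jndi.properties` file or `java.naming.factory.initial` system property on the classpath.

```java
import javax.naming.InitialContext;
import javax.naming.NamingException;
import javax.naming.NoInitialContextException;

public class JndiLookupDemo {

    /**
     * Returns true when the lookup fails because no initial context
     * factory is configured, which is the default outside an app server.
     */
    static boolean lookupFailsWithoutProvider(String name) {
        try {
            new InitialContext().lookup(name);
            return false; // a provider was configured and the name resolved
        } catch (NoInitialContextException e) {
            return true;  // no InitialContextFactory available
        } catch (NamingException e) {
            return false; // a provider exists but the lookup failed for another reason
        }
    }

    public static void main(String[] args) {
        // "demo/executor" is a hypothetical JNDI name
        System.out.println(lookupFailsWithoutProvider("demo/executor"));
    }
}
```

Under those assumptions, any JNDI name passed to the builder cannot be resolved until the application installs its own context factory, which is what the workaround below does.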

Workaround (how to make it work today)

Today, using virtual threads requires the following steps.

  1. Create a custom JNDI Context Factory using a custom InitialContext class:
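import java.util.Hashtable;
import java.util.concurrent.ConcurrentHashMap;
import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NameAlreadyBoundException;
import javax.naming.NameNotFoundException;
import javax.naming.NamingException;
import javax.naming.spi.InitialContextFactory;
import javax.naming.spi.InitialContextFactoryBuilder;
import javax.naming.spi.NamingManager;

// Note: `log` below is assumed to be an SLF4J-style logger field (for example via Lombok's @Slf4j).
public class CustomJndiContextFactory implements InitialContextFactory, InitialContextFactoryBuilder {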
    private static final CustomJndiContextFactory INSTANCE = new CustomJndiContextFactory();
    private static final InMemoryContext GLOBAL_CONTEXT;

    static {
        try {
            GLOBAL_CONTEXT = new InMemoryContext();
        } catch (NamingException e) {
            throw new RuntimeException(e);
        }
    }

    private CustomJndiContextFactory() {
        // Singleton: use INSTANCE and the static register()/bind() helpers
    }

    @Override
    public InitialContextFactory createInitialContextFactory(Hashtable<?, ?> environment) {
        return INSTANCE;
    }

    @Override
    public Context getInitialContext(Hashtable<?, ?> environment) {
        return GLOBAL_CONTEXT;
    }

    public static void register() throws NamingException {
        if (!NamingManager.hasInitialContextFactoryBuilder()) {
            NamingManager.setInitialContextFactoryBuilder(INSTANCE);
            log.info("Custom JNDI Factory registered successfully.");
        } else {
            log.trace("JNDI Factory already set, skipping registration.");
        }
    }

    public static void bind(String name, Object obj) throws NamingException {
        GLOBAL_CONTEXT.bind(name, obj);
    }

    static class InMemoryContext extends InitialContext {
        private final ConcurrentHashMap<String, Object> bindings = new ConcurrentHashMap<>();

        public InMemoryContext() throws NamingException {
            super(true);
        }

        @Override
        public Object lookup(String name) throws NamingException {
            // A single get() avoids a check-then-act race with a concurrent unbind()
            Object bound = bindings.get(name);
            if (bound == null) {
                throw new NameNotFoundException("JNDI name not found: " + name);
            }
            return bound;
        }

        @Override
        public void bind(String name, Object obj) throws NamingException {
            // putIfAbsent makes the "already bound" check atomic
            if (bindings.putIfAbsent(name, obj) != null) {
                throw new NameAlreadyBoundException("JNDI name already bound: " + name);
            }
        }

        @Override
        public void rebind(String name, Object obj) {
            bindings.put(name, obj);
        }

        @Override
        public void unbind(String name) {
            bindings.remove(name);
        }
    }
}
  2. Bind the custom implementations using the class above:
private void bindExecutorService() {
    try {
        // Install the in-memory JNDI context before binding anything
        CustomJndiContextFactory.register();

        ExecutorService virtualThreadExecutor = Executors.newVirtualThreadPerTaskExecutor();
        ThreadFactory virtualThreadFactory = Thread.ofVirtual().factory();

        CustomJndiContextFactory.bind(JNDI_NAME_VIRTUAL_THREAD_EXECUTOR, virtualThreadExecutor);
        CustomJndiContextFactory.bind(JNDI_NAME_VIRTUAL_THREAD_FACTORY, virtualThreadFactory);
    } catch (NamingException e) {
        throw new RuntimeException("Failed to register the virtual thread executor and factory in JNDI", e);
    }
}
  3. As a final step, pass the JNDI names to the builder:
var options = ParallelConsumerOptions.<String, String>builder()
                .managedExecutorService(JNDI_NAME_VIRTUAL_THREAD_EXECUTOR)
                .managedThreadFactory(JNDI_NAME_VIRTUAL_THREAD_FACTORY)
                .ordering(PARTITION)                
                .maxConcurrency(20)
                .consumer(kafkaConsumer)
                .producer(kafkaProducer)
                .build();

ParallelStreamProcessor<String, String> streamProcessor = ParallelStreamProcessor.createEosStreamProcessor(options);

Proposed Solution

Allow the builder (or the ParallelStreamProcessor factory method) to accept ExecutorService and ThreadFactory instances directly, as an alternative to JNDI names:

var options = ParallelConsumerOptions.<String, String>builder()
                .managedExecutorService(Executors.newVirtualThreadPerTaskExecutor())
                .managedThreadFactory(Thread.ofVirtual().factory())
                .ordering(PARTITION)                
                .maxConcurrency(20)
                .consumer(kafkaConsumer)
                .producer(kafkaProducer)
                .build();

Or pass the instances when creating the stream processor:

var options = ParallelConsumerOptions.<String, String>builder()
                .ordering(PARTITION)                
                .maxConcurrency(20)
                .consumer(kafkaConsumer)
                .producer(kafkaProducer)
                .build();

ParallelStreamProcessor<String, String> streamProcessor = ParallelStreamProcessor.createEosStreamProcessor(options, Executors.newVirtualThreadPerTaskExecutor(), Thread.ofVirtual().factory());
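
One possible way to support both styles internally is a small holder that carries either a JNDI name or a ready-made instance and resolves it on demand. This is only a sketch of the idea, not how the library is implemented: `ExecutorSource`, `ofJndiName`, `ofInstance` and `resolve` are hypothetical names invented for this example.

```java
import java.util.Objects;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import javax.naming.InitialContext;
import javax.naming.NamingException;

/** Hypothetical holder: either a JNDI name or a direct ExecutorService instance. */
public final class ExecutorSource {
    private final String jndiName;          // set when configured by name
    private final ExecutorService instance; // set when configured by instance

    private ExecutorSource(String jndiName, ExecutorService instance) {
        this.jndiName = jndiName;
        this.instance = instance;
    }

    /** Existing behaviour: resolve the executor through JNDI. */
    public static ExecutorSource ofJndiName(String jndiName) {
        return new ExecutorSource(Objects.requireNonNull(jndiName), null);
    }

    /** Proposed behaviour: use the given instance directly. */
    public static ExecutorSource ofInstance(ExecutorService instance) {
        return new ExecutorSource(null, Objects.requireNonNull(instance));
    }

    /** Returns the instance if one was supplied, otherwise performs the JNDI lookup. */
    public ExecutorService resolve() throws NamingException {
        if (instance != null) {
            return instance;
        }
        return InitialContext.doLookup(jndiName);
    }

    public static void main(String[] args) throws NamingException {
        // On Java 21 this could be Executors.newVirtualThreadPerTaskExecutor()
        ExecutorService pool = Executors.newSingleThreadExecutor();
        System.out.println(ExecutorSource.ofInstance(pool).resolve() == pool);
        pool.shutdown();
    }
}
```

With something along these lines, the JNDI path stays backward compatible, while callers who already hold an executor skip the context registration entirely. The same pattern would apply to the ThreadFactory.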
