Skip to content

Commit 0fd9957

Browse files
committed
Do not use the common pool, but use our own fallback
This way the default will make it harder to get users in a bad situation and provide a helper class that users might like to re-use.
1 parent 62c31fb commit 0fd9957

File tree

3 files changed

+83
-21
lines changed

3 files changed

+83
-21
lines changed
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
/*
2+
* BSD 2-Clause License
3+
*
4+
* Copyright (c) 2023, Swat.engineering
5+
*
6+
* Redistribution and use in source and binary forms, with or without
7+
* modification, are permitted provided that the following conditions are met:
8+
*
9+
* 1. Redistributions of source code must retain the above copyright notice, this
10+
* list of conditions and the following disclaimer.
11+
*
12+
* 2. Redistributions in binary form must reproduce the above copyright notice,
13+
* this list of conditions and the following disclaimer in the documentation
14+
* and/or other materials provided with the distribution.
15+
*
16+
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS"
17+
* AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
18+
* IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE ARE
19+
* DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT HOLDER OR CONTRIBUTORS BE LIABLE
20+
* FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
21+
* DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR
22+
* SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER
23+
* CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY,
24+
* OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
25+
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
26+
*/
27+
package engineering.swat.watch;
28+
29+
import java.util.concurrent.ExecutorService;
30+
import java.util.concurrent.LinkedBlockingQueue;
31+
import java.util.concurrent.ThreadFactory;
32+
import java.util.concurrent.ThreadPoolExecutor;
33+
import java.util.concurrent.TimeUnit;
34+
import java.util.concurrent.atomic.AtomicInteger;
35+
36+
/**
37+
* Build thread pools that even when not properly shutdown, will still not prevent the termination of the JVM.
38+
*/
39+
public class DaemonThreadPool {
40+
private DaemonThreadPool() {}
41+
42+
/**
43+
* Generate a thread pool that will reuse threads, clear them after a while, but constrain the total amount of threads.
44+
* @param name name of the thread pool
45+
* @param maxThreads the maximum amount of threads to start in this pool, after this things will get queued.
46+
*/
47+
public static ExecutorService buildConstrainedCached(String name, int maxThreads) {
48+
return new ThreadPoolExecutor(0, maxThreads,
49+
60, TimeUnit.SECONDS,
50+
new LinkedBlockingQueue<>(),
51+
buildFactory(name)
52+
);
53+
}
54+
55+
private static ThreadFactory buildFactory(String name) {
56+
return new ThreadFactory() {
57+
private final AtomicInteger id = new AtomicInteger(0);
58+
private final ThreadGroup group = new ThreadGroup(name);
59+
@Override
60+
public Thread newThread(Runnable r) {
61+
var t = new Thread(group, r, name + "-" + id.getAndIncrement());
62+
t.setDaemon(true);
63+
return t;
64+
}
65+
};
66+
}
67+
68+
69+
70+
}

src/main/java/engineering/swat/watch/Watch.java

Lines changed: 11 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,14 +30,14 @@
3030
import java.nio.file.Files;
3131
import java.nio.file.LinkOption;
3232
import java.nio.file.Path;
33-
import java.util.concurrent.CompletableFuture;
3433
import java.util.concurrent.Executor;
3534
import java.util.function.BiConsumer;
3635
import java.util.function.Consumer;
3736
import java.util.function.Predicate;
3837

3938
import org.apache.logging.log4j.LogManager;
4039
import org.apache.logging.log4j.Logger;
40+
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
4141

4242
import engineering.swat.watch.impl.EventHandlingWatch;
4343
import engineering.swat.watch.impl.jdk.JDKDirectoryWatch;
@@ -58,7 +58,9 @@ public class Watch {
5858
private final Path path;
5959
private final WatchScope scope;
6060
private volatile Approximation approximateOnOverflow = Approximation.ALL;
61-
private volatile Executor executor = CompletableFuture::runAsync;
61+
62+
private static final Executor FALLBACK_EXECUTOR = DaemonThreadPool.buildConstrainedCached("JavaWatch-internal-handler",Runtime.getRuntime().availableProcessors());
63+
private volatile @MonotonicNonNull Executor executor = null;
6264

6365
private static final BiConsumer<EventHandlingWatch, WatchEvent> EMPTY_HANDLER = (w, e) -> {};
6466
private volatile BiConsumer<EventHandlingWatch, WatchEvent> eventHandler = EMPTY_HANDLER;
@@ -162,11 +164,14 @@ Watch filter(Predicate<WatchEvent> predicate) {
162164

163165
/**
164166
* Optionally configure the executor in which the {@link #on(Consumer)} callbacks are scheduled.
165-
* If not defined, every task will be scheduled on the {@link java.util.concurrent.ForkJoinPool#commonPool()}.
167+
* Make sure to consider the termination of the threadpool, it should be after the close of the active watch.
166168
* @param callbackHandler worker pool to use
167169
* @return this for optional method chaining
168170
*/
169171
public Watch withExecutor(Executor callbackHandler) {
172+
if (callbackHandler == null) {
173+
throw new IllegalArgumentException("null is allowed");
174+
}
170175
this.executor = callbackHandler;
171176
return this;
172177
}
@@ -197,6 +202,9 @@ public ActiveWatch start() throws IOException {
197202
if (this.eventHandler == EMPTY_HANDLER) {
198203
throw new IllegalStateException("There is no onEvent handler defined");
199204
}
205+
if (executor == null) {
206+
executor = FALLBACK_EXECUTOR;
207+
}
200208

201209
var h = applyApproximateOnOverflow();
202210

src/main/java/engineering/swat/watch/impl/jdk/JDKPoller.java

Lines changed: 2 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -45,12 +45,7 @@
4545
import java.util.concurrent.ConcurrentHashMap;
4646
import java.util.concurrent.ExecutionException;
4747
import java.util.concurrent.ExecutorService;
48-
import java.util.concurrent.Executors;
49-
import java.util.concurrent.LinkedBlockingQueue;
50-
import java.util.concurrent.ThreadFactory;
51-
import java.util.concurrent.ThreadPoolExecutor;
5248
import java.util.concurrent.TimeUnit;
53-
import java.util.concurrent.atomic.AtomicInteger;
5449
import java.util.function.Consumer;
5550

5651
import org.apache.logging.log4j.Level;
@@ -59,6 +54,7 @@
5954

6055
import com.sun.nio.file.ExtendedWatchEventModifier;
6156

57+
import engineering.swat.watch.DaemonThreadPool;
6258
import engineering.swat.watch.impl.mac.MacWatchService;
6359
import engineering.swat.watch.impl.util.SubscriptionKey;
6460

@@ -75,19 +71,7 @@ private JDKPoller() {}
7571
* We have to be a bit careful with registering too many paths in parallel
7672
* Linux can be thrown into a deadlock if you try to start 1000 threads and then do a register at the same time.
7773
*/
78-
private static final ExecutorService registerPool = new ThreadPoolExecutor(
79-
0, Runtime.getRuntime().availableProcessors(),
80-
10, TimeUnit.SECONDS,
81-
new LinkedBlockingQueue<>(), new ThreadFactory() {
82-
private final AtomicInteger id = new AtomicInteger(0);
83-
private final ThreadGroup group = new ThreadGroup("registry pool");
84-
@Override
85-
public Thread newThread(Runnable r) {
86-
var t = new Thread(group, r, "JavaWatch-registry-pool-" + id.incrementAndGet());
87-
t.setDaemon(true);
88-
return t;
89-
}
90-
});
74+
private static final ExecutorService registerPool = DaemonThreadPool.buildConstrainedCached("JavaWatch-rate-limit-registry", Runtime.getRuntime().availableProcessors());
9175

9276
static {
9377
try {

0 commit comments

Comments
 (0)