Skip to content

Replace Timer with scheduled tasks #1335

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Open
wants to merge 10 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 5 additions & 1 deletion build.gradle
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,11 @@ test {
showStandardStreams = true
}
if (isLocal) {
failFast = true
retry {
failOnPassedAfterRetry = false
maxFailures = 2
maxRetries = 2
}
}
else {
retry {
Expand Down
14 changes: 13 additions & 1 deletion src/main/java/io/nats/client/NatsSystemClock.java
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,29 @@

package io.nats.client;

public class NatsSystemClock {
public final class NatsSystemClock {
private static NatsSystemClockProvider PROVIDER = new NatsSystemClockProvider() {};

/**
* Set the provider. Null will reset to system default
* @param provider the provider
*/
public static void setProvider(final NatsSystemClockProvider provider) {
PROVIDER = provider == null ? new NatsSystemClockProvider() {} : provider;
}

/**
* Get the current milliseconds from the provider
* @return the milliseconds
*/
public static long currentTimeMillis() {
return PROVIDER.currentTimeMillis();
}

/**
* Get the current nano time from the provider
* @return the nano time
*/
public static long nanoTime() {
return PROVIDER.nanoTime();
}
Expand Down
43 changes: 42 additions & 1 deletion src/main/java/io/nats/client/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -504,6 +504,11 @@ public class Options {
* {@link Builder#executor(ExecutorService) executor}.
*/
public static final String PROP_EXECUTOR_SERVICE_CLASS = "executor.service.class";
/**
* Property used to set class name for the Executor Service (executor) class
* {@link Builder#executor(ExecutorService) executor}.
*/
public static final String PROP_SCHEDULED_EXECUTOR_SERVICE_CLASS = "scheduled.executor.service.class";
/**
* Property used to set class name for the Connect Thread Factory
* {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory}.
Expand Down Expand Up @@ -660,14 +665,15 @@ public class Options {
private final ErrorListener errorListener;
private final TimeTraceLogger timeTraceLogger;
private final ConnectionListener connectionListener;
private ReadListener readListener;
private final ReadListener readListener;
private final StatisticsCollector statisticsCollector;
private final String dataPortType;

private final boolean trackAdvancedStats;
private final boolean traceConnection;

private final ExecutorService executor;
private final ScheduledExecutorService scheduledExecutor;
private final ThreadFactory connectThreadFactory;
private final ThreadFactory callbackThreadFactory;
private final ServerPool serverPool;
Expand Down Expand Up @@ -808,6 +814,7 @@ public static class Builder {
private StatisticsCollector statisticsCollector = null;
private String dataPortType = DEFAULT_DATA_PORT_TYPE;
private ExecutorService executor;
private ScheduledExecutorService scheduledExecutor;
private ThreadFactory connectThreadFactory;
private ThreadFactory callbackThreadFactory;
private List<java.util.function.Consumer<HttpRequest>> httpRequestInterceptors;
Expand Down Expand Up @@ -939,6 +946,7 @@ public Builder properties(Properties props) {
classnameProperty(props, PROP_SERVERS_POOL_IMPLEMENTATION_CLASS, o -> this.serverPool = (ServerPool) o);
classnameProperty(props, PROP_DISPATCHER_FACTORY_CLASS, o -> this.dispatcherFactory = (DispatcherFactory) o);
classnameProperty(props, PROP_EXECUTOR_SERVICE_CLASS, o -> this.executor = (ExecutorService) o);
classnameProperty(props, PROP_SCHEDULED_EXECUTOR_SERVICE_CLASS, o -> this.scheduledExecutor = (ScheduledExecutorService) o);
classnameProperty(props, PROP_CONNECT_THREAD_FACTORY_CLASS, o -> this.connectThreadFactory = (ThreadFactory) o);
classnameProperty(props, PROP_CALLBACK_THREAD_FACTORY_CLASS, o -> this.callbackThreadFactory = (ThreadFactory) o);
return this;
Expand Down Expand Up @@ -1630,6 +1638,19 @@ public Builder executor(ExecutorService executor) {
return this;
}

/**
* Set the {@link ScheduledExecutorService ScheduledExecutorService} used to run scheduled task like
* heartbeat timers
* The default is a ScheduledThreadPoolExecutor that does not
* execute delayed tasks after shutdown and removes tasks on cancel;
* @param scheduledExecutor The ScheduledExecutorService to use for timer tasks
* @return the Builder for chaining
*/
public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) {
this.scheduledExecutor = scheduledExecutor;
return this;
}

/**
* Sets custom thread factory for the executor service
*
Expand Down Expand Up @@ -1910,6 +1931,17 @@ else if (useDefaultTls) {
new DefaultThreadFactory(threadPrefix));
}

if (this.scheduledExecutor == null) {
String threadPrefix = nullOrEmpty(this.connectionName) ? DEFAULT_THREAD_NAME_PREFIX : this.connectionName;
// the core pool size of 3 is chosen considering where we know the scheduler is used.
// 1. Ping timer, 2. cleanup timer, 3. SocketDataPortWithWriteTimeout
// Pull message managers also use a scheduler, but we don't even know if this will be consuming
ScheduledThreadPoolExecutor stpe = new ScheduledThreadPoolExecutor(3, new DefaultThreadFactory(threadPrefix));
stpe.setExecuteExistingDelayedTasksAfterShutdownPolicy(false);
stpe.setRemoveOnCancelPolicy(true);
this.scheduledExecutor = stpe;
}

if (socketReadTimeoutMillis > 0) {
long srtMin = pingInterval.toMillis() + MINIMUM_SOCKET_WRITE_TIMEOUT_GT_CONNECTION_TIMEOUT;
if (socketReadTimeoutMillis < srtMin) {
Expand Down Expand Up @@ -2014,6 +2046,7 @@ public Builder(Options o) {
this.dataPortType = o.dataPortType;
this.trackAdvancedStats = o.trackAdvancedStats;
this.executor = o.executor;
this.scheduledExecutor = o.scheduledExecutor;
this.callbackThreadFactory = o.callbackThreadFactory;
this.connectThreadFactory = o.connectThreadFactory;
this.httpRequestInterceptors = o.httpRequestInterceptors;
Expand Down Expand Up @@ -2082,6 +2115,7 @@ private Options(Builder b) {
this.dataPortType = b.dataPortType;
this.trackAdvancedStats = b.trackAdvancedStats;
this.executor = b.executor;
this.scheduledExecutor = b.scheduledExecutor;
this.callbackThreadFactory = b.callbackThreadFactory;
this.connectThreadFactory = b.connectThreadFactory;
this.httpRequestInterceptors = b.httpRequestInterceptors;
Expand All @@ -2107,6 +2141,13 @@ public ExecutorService getExecutor() {
return this.executor;
}

/**
* @return the ScheduledExecutorService, see {@link Builder#scheduledExecutor(ScheduledExecutorService) scheduledExecutor()} in the builder doc
*/
public ScheduledExecutorService getScheduledExecutor() {
return scheduledExecutor;
}

/**
* @return the callback executor, see {@link Builder#callbackThreadFactory(ThreadFactory) callbackThreadFactory()} in the builder doc
*/
Expand Down
74 changes: 20 additions & 54 deletions src/main/java/io/nats/client/impl/MessageManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,10 @@
import io.nats.client.PullRequestOptions;
import io.nats.client.SubscribeOptions;
import io.nats.client.support.NatsConstants;
import io.nats.client.support.ScheduledTask;

import java.time.Duration;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;

Expand All @@ -46,8 +45,8 @@ public enum ManageResult {MESSAGE, STATUS_HANDLED, STATUS_TERMINUS, STATUS_ERROR
protected boolean hb;
protected long idleHeartbeatSetting;
protected long alarmPeriodSettingNanos;
protected MmTimerTask heartbeatTimerTask;
protected Timer heartbeatTimer;
protected ScheduledTask heartbeatTask;
protected final AtomicLong currentAlarmPeriodNanos;

protected MessageManager(NatsConnection conn, SubscribeOptions so, boolean syncMode) {
stateChangeLock = new ReentrantLock();
Expand All @@ -62,6 +61,7 @@ protected MessageManager(NatsConnection conn, SubscribeOptions so, boolean syncM
idleHeartbeatSetting = 0;
alarmPeriodSettingNanos = 0;
lastMsgReceivedNanoTime = new AtomicLong(NatsSystemClock.nanoTime());
currentAlarmPeriodNanos = new AtomicLong();
}

protected boolean isSyncMode() { return syncMode; }
Expand Down Expand Up @@ -134,62 +134,29 @@ protected void updateLastMessageReceived() {
lastMsgReceivedNanoTime.set(NatsSystemClock.nanoTime());
}

class MmTimerTask extends TimerTask {
long alarmPeriodNanos;
final AtomicBoolean alive;

public MmTimerTask(long alarmPeriodNanos) {
this.alarmPeriodNanos = alarmPeriodNanos;
alive = new AtomicBoolean(true);
}

public void reuse() {
alive.getAndSet(true);
}

public void shutdown() {
alive.getAndSet(false);
}

@Override
public void run() {
if (alive.get() && !Thread.interrupted()) {
long sinceLast = NatsSystemClock.nanoTime() - lastMsgReceivedNanoTime.get();
if (alive.get() && sinceLast > alarmPeriodNanos) {
handleHeartbeatError();
}
}
}

@Override
public String toString() {
long sinceLastMillis = (NatsSystemClock.nanoTime() - lastMsgReceivedNanoTime.get()) / NatsConstants.NANOS_PER_MILLI;
return "MmTimerTask{" +
", alarmPeriod=" + (alarmPeriodNanos / NatsConstants.NANOS_PER_MILLI) +
"ms, alive=" + alive.get() +
", sinceLast=" + sinceLastMillis + "ms}";
}
}

protected void initOrResetHeartbeatTimer() {
stateChangeLock.lock();
try {
if (heartbeatTimer != null) {
if (heartbeatTask != null) {
// Same settings, just reuse the existing timer
if (heartbeatTimerTask.alarmPeriodNanos == alarmPeriodSettingNanos) {
heartbeatTimerTask.reuse();
if (currentAlarmPeriodNanos.get() == alarmPeriodSettingNanos) {
updateLastMessageReceived();
return;
}

// Replace timer since settings have changed
shutdownHeartbeatTimer();
}

// replacement or new comes here
heartbeatTimer = new Timer();
heartbeatTimerTask = new MmTimerTask(alarmPeriodSettingNanos);
long alarmPeriodSettingMillis = alarmPeriodSettingNanos / NatsConstants.NANOS_PER_MILLI;
heartbeatTimer.schedule(heartbeatTimerTask, alarmPeriodSettingMillis, alarmPeriodSettingMillis);
this.currentAlarmPeriodNanos.set(alarmPeriodSettingNanos);
heartbeatTask = new ScheduledTask(conn.getScheduledExecutor(), alarmPeriodSettingNanos, TimeUnit.NANOSECONDS,
() -> {
long sinceLast = NatsSystemClock.nanoTime() - lastMsgReceivedNanoTime.get();
if (sinceLast > currentAlarmPeriodNanos.get()) {
handleHeartbeatError();
}
});
updateLastMessageReceived();
}
finally {
Expand All @@ -200,12 +167,11 @@ protected void initOrResetHeartbeatTimer() {
protected void shutdownHeartbeatTimer() {
stateChangeLock.lock();
try {
if (heartbeatTimer != null) {
heartbeatTimerTask.shutdown();
heartbeatTimerTask = null;
heartbeatTimer.cancel();
heartbeatTimer = null;
if (heartbeatTask != null) {
heartbeatTask.shutdown();
heartbeatTask = null;
}
currentAlarmPeriodNanos.set(0);
}
finally {
stateChangeLock.unlock();
Expand Down
52 changes: 26 additions & 26 deletions src/main/java/io/nats/client/impl/NatsConnection.java
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,7 @@
import io.nats.client.*;
import io.nats.client.ConnectionListener.Events;
import io.nats.client.api.ServerInfo;
import io.nats.client.support.ByteArrayBuilder;
import io.nats.client.support.NatsRequestCompletableFuture;
import io.nats.client.support.NatsUri;
import io.nats.client.support.Validator;
import io.nats.client.support.*;

import java.io.IOException;
import java.net.InetAddress;
Expand Down Expand Up @@ -82,7 +79,8 @@ class NatsConnection implements Connection {
private final String mainInbox;
private final AtomicReference<NatsDispatcher> inboxDispatcher;
private final ReentrantLock inboxDispatcherLock;
private Timer timer;
private ScheduledTask pingTask;
private ScheduledTask cleanupTask;

private final AtomicBoolean needPing;

Expand All @@ -98,6 +96,7 @@ class NatsConnection implements Connection {
private final ExecutorService callbackRunner;
private final ExecutorService executor;
private final ExecutorService connectExecutor;
private final ScheduledExecutorService scheduledExecutor;
private final boolean advancedTracking;

private final ServerPool serverPool;
Expand Down Expand Up @@ -159,6 +158,7 @@ class NatsConnection implements Connection {
this.executor = options.getExecutor();
this.callbackRunner = options.getCallbackExecutor();
this.connectExecutor = options.getConnectExecutor();
this.scheduledExecutor = options.getScheduledExecutor();

timeTraceLogger.trace("creating reader and writer");
this.reader = new NatsConnectionReader(this);
Expand Down Expand Up @@ -595,35 +595,27 @@ void tryToConnect(NatsUri cur, NatsUri resolved, long now) {
pongFuture.get(timeoutNanos, TimeUnit.NANOSECONDS);
}

if (this.timer == null) {
if (pingTask == null) {
timeCheck(end, "starting ping and cleanup timers");
this.timer = new Timer("Nats Connection Timer");

long pingMillis = this.options.getPingInterval().toMillis();

if (pingMillis > 0) {
this.timer.schedule(new TimerTask() {
public void run() {
if (isConnected()) {
try {
softPing(); // The timer always uses the standard queue
}
catch (Exception e) {
// it's running in a thread, there is no point throwing here
}
pingTask = new ScheduledTask(scheduledExecutor, pingMillis, () -> {
if (isConnected()) {
try {
softPing(); // The timer always uses the standard queue
}
catch (Exception e) {
// it's running in a thread, there is no point throwing here
}
}
}, pingMillis, pingMillis);
});
}

long cleanMillis = this.options.getRequestCleanupInterval().toMillis();

if (cleanMillis > 0) {
this.timer.schedule(new TimerTask() {
public void run() {
cleanResponses(false);
}
}, cleanMillis, cleanMillis);
cleanupTask = new ScheduledTask(scheduledExecutor, cleanMillis, () -> cleanResponses(false));
}
}

Expand Down Expand Up @@ -826,9 +818,13 @@ void close(boolean checkDrainStatus, boolean forceClose) throws InterruptedExcep
this.dispatchers.clear();
this.subscribers.clear();

if (timer != null) {
timer.cancel();
timer = null;
if (pingTask != null) {
pingTask.shutdown();
pingTask = null;
}
if (cleanupTask != null) {
cleanupTask.shutdown();
cleanupTask = null;
}

cleanResponses(true);
Expand Down Expand Up @@ -1969,6 +1965,10 @@ ExecutorService getExecutor() {
return executor;
}

ScheduledExecutorService getScheduledExecutor() {
return scheduledExecutor;
}

void updateStatus(Status newStatus) {
Status oldStatus = this.status;

Expand Down
Loading
Loading