From a541204fb25b07af759210011c464d3ee95a2f5b Mon Sep 17 00:00:00 2001 From: scottf Date: Wed, 3 Dec 2025 10:46:34 -0500 Subject: [PATCH 1/7] Lazy init executors so as not to keep making instances if the options getter is called repeatedly. --- src/main/java/io/nats/client/Options.java | 47 ++++++++++++++++------- 1 file changed, 33 insertions(+), 14 deletions(-) diff --git a/src/main/java/io/nats/client/Options.java b/src/main/java/io/nats/client/Options.java index c5151dafb..1cb90bb1b 100644 --- a/src/main/java/io/nats/client/Options.java +++ b/src/main/java/io/nats/client/Options.java @@ -704,10 +704,17 @@ public class Options { private final boolean trackAdvancedStats; private final boolean traceConnection; - private final ExecutorService executor; - private final ScheduledExecutorService scheduledExecutor; + private final ExecutorService configuredExecutor; + private final ScheduledExecutorService configuredScheduledExecutor; private final ThreadFactory connectThreadFactory; private final ThreadFactory callbackThreadFactory; + + // these are not final b/c they are lazy initialized + private ExecutorService resolvedExecutor; + private ScheduledExecutorService resolvedScheduledExecutor; + private ExecutorService resolvedConnectExecutor; + private ExecutorService resolvedCallbackExecutor; + private final ServerPool serverPool; private final DispatcherFactory dispatcherFactory; @@ -2106,8 +2113,8 @@ public Builder(Options o) { this.statisticsCollector = o.statisticsCollector; this.dataPortType = o.dataPortType; this.trackAdvancedStats = o.trackAdvancedStats; - this.executor = o.executor; - this.scheduledExecutor = o.scheduledExecutor; + this.executor = o.configuredExecutor; + this.scheduledExecutor = o.configuredScheduledExecutor; this.callbackThreadFactory = o.callbackThreadFactory; this.connectThreadFactory = o.connectThreadFactory; this.httpRequestInterceptors = o.httpRequestInterceptors; @@ -2178,8 +2185,8 @@ private Options(Builder b) { this.statisticsCollector = b.statisticsCollector; this.dataPortType = b.dataPortType; this.trackAdvancedStats = b.trackAdvancedStats; - this.executor = b.executor; - this.scheduledExecutor = b.scheduledExecutor; + this.configuredExecutor = b.executor; + this.configuredScheduledExecutor = b.scheduledExecutor; this.callbackThreadFactory = b.callbackThreadFactory; this.connectThreadFactory = b.connectThreadFactory; this.httpRequestInterceptors = b.httpRequestInterceptors; @@ -2204,7 +2211,10 @@ private Options(Builder b) { * @return the executor, see {@link Builder#executor(ExecutorService) executor()} in the builder doc */ public ExecutorService getExecutor() { - return this.executor == null ? _getInternalExecutor() : this.executor; + if (resolvedExecutor == null) { + resolvedExecutor = configuredExecutor == null ? _getInternalExecutor() : configuredExecutor; + } + return resolvedExecutor; } private ExecutorService _getInternalExecutor() { @@ -2220,7 +2230,10 @@ private ExecutorService _getInternalExecutor() { * @return the ScheduledExecutorService, see {@link Builder#scheduledExecutor(ScheduledExecutorService) scheduledExecutor()} in the builder doc */ public ScheduledExecutorService getScheduledExecutor() { - return this.scheduledExecutor == null ? _getInternalScheduledExecutor() : this.scheduledExecutor; + if (resolvedScheduledExecutor == null) { + resolvedScheduledExecutor = configuredScheduledExecutor == null ? _getInternalScheduledExecutor() : configuredScheduledExecutor; + } + return resolvedScheduledExecutor; } private ScheduledExecutorService _getInternalScheduledExecutor() { @@ -2239,8 +2252,11 @@ private ScheduledExecutorService _getInternalScheduledExecutor() { * @return the executor */ public ExecutorService getCallbackExecutor() { - return this.callbackThreadFactory == null ? - DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.callbackThreadFactory); + if (resolvedCallbackExecutor == null) { + resolvedCallbackExecutor = callbackThreadFactory == null ? + DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(callbackThreadFactory); + } + return resolvedCallbackExecutor; } /** @@ -2248,8 +2264,11 @@ public ExecutorService getCallbackExecutor() { * @return the executor */ public ExecutorService getConnectExecutor() { - return this.connectThreadFactory == null ? - DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.connectThreadFactory); + if (resolvedConnectExecutor == null) { + resolvedConnectExecutor = connectThreadFactory == null ? + DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(connectThreadFactory); + } + return resolvedConnectExecutor; } /** @@ -2257,7 +2276,7 @@ public ExecutorService getConnectExecutor() { * @return true if the executor is internal */ public boolean executorIsInternal() { - return this.executor == null; + return this.configuredExecutor == null; } /** @@ -2265,7 +2284,7 @@ public boolean executorIsInternal() { * @return true if the executor is internal */ public boolean scheduledExecutorIsInternal() { - return this.scheduledExecutor == null; + return this.configuredScheduledExecutor == null; } /** From 7b95db8a1123d18d4069f31a6ff1967b5f0bd05f Mon Sep 17 00:00:00 2001 From: scottf Date: Wed, 3 Dec 2025 11:31:03 -0500 Subject: [PATCH 2/7] Lazy init executors so as not to keep making instances if the options getter is called repeatedly. --- src/main/java/io/nats/client/Options.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/main/java/io/nats/client/Options.java b/src/main/java/io/nats/client/Options.java index 1cb90bb1b..fbf237fa6 100644 --- a/src/main/java/io/nats/client/Options.java +++ b/src/main/java/io/nats/client/Options.java @@ -2211,7 +2211,7 @@ private Options(Builder b) { * @return the executor, see {@link Builder#executor(ExecutorService) executor()} in the builder doc */ public ExecutorService getExecutor() { - if (resolvedExecutor == null) { + if (resolvedExecutor == null || resolvedExecutor.isShutdown()) { resolvedExecutor = configuredExecutor == null ? _getInternalExecutor() : configuredExecutor; } return resolvedExecutor; @@ -2230,7 +2230,7 @@ private ExecutorService _getInternalExecutor() { * @return the ScheduledExecutorService, see {@link Builder#scheduledExecutor(ScheduledExecutorService) scheduledExecutor()} in the builder doc */ public ScheduledExecutorService getScheduledExecutor() { - if (resolvedScheduledExecutor == null) { + if (resolvedScheduledExecutor == null || resolvedScheduledExecutor.isShutdown()) { resolvedScheduledExecutor = configuredScheduledExecutor == null ? _getInternalScheduledExecutor() : configuredScheduledExecutor; } return resolvedScheduledExecutor; @@ -2252,7 +2252,7 @@ private ScheduledExecutorService _getInternalScheduledExecutor() { * @return the executor */ public ExecutorService getCallbackExecutor() { - if (resolvedCallbackExecutor == null) { + if (resolvedCallbackExecutor == null || resolvedCallbackExecutor.isShutdown()) { resolvedCallbackExecutor = callbackThreadFactory == null ? DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(callbackThreadFactory); } @@ -2264,7 +2264,7 @@ public ExecutorService getCallbackExecutor() { * @return the executor */ public ExecutorService getConnectExecutor() { - if (resolvedConnectExecutor == null) { + if (resolvedConnectExecutor == null || resolvedConnectExecutor.isShutdown()) { resolvedConnectExecutor = connectThreadFactory == null ? DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(connectThreadFactory); } From f377dbee1466562325284fb711d45b9a11e82b57 Mon Sep 17 00:00:00 2001 From: scottf Date: Wed, 3 Dec 2025 12:06:32 -0500 Subject: [PATCH 3/7] shut down internal in options instance for better multi thread, cleanup and reuse --- src/main/java/io/nats/client/Options.java | 97 ++++++++++++++++--- .../io/nats/client/impl/NatsConnection.java | 70 +++++-------- 2 files changed, 106 insertions(+), 61 deletions(-) diff --git a/src/main/java/io/nats/client/Options.java b/src/main/java/io/nats/client/Options.java index fbf237fa6..d7ef0e065 100644 --- a/src/main/java/io/nats/client/Options.java +++ b/src/main/java/io/nats/client/Options.java @@ -36,6 +36,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import static io.nats.client.support.Encoding.*; @@ -2211,10 +2212,16 @@ private Options(Builder b) { * @return the executor, see {@link Builder#executor(ExecutorService) executor()} in the builder doc */ public ExecutorService getExecutor() { - if (resolvedExecutor == null || resolvedExecutor.isShutdown()) { - resolvedExecutor = configuredExecutor == null ? _getInternalExecutor() : configuredExecutor; + executorsLock.lock(); + try { + if (resolvedExecutor == null || resolvedExecutor.isShutdown()) { + resolvedExecutor = configuredExecutor == null ? _getInternalExecutor() : configuredExecutor; + } + return resolvedExecutor; + } + finally { + executorsLock.unlock(); } - return resolvedExecutor; } private ExecutorService _getInternalExecutor() { @@ -2230,10 +2237,16 @@ private ExecutorService _getInternalExecutor() { * @return the ScheduledExecutorService, see {@link Builder#scheduledExecutor(ScheduledExecutorService) scheduledExecutor()} in the builder doc */ public ScheduledExecutorService getScheduledExecutor() { - if (resolvedScheduledExecutor == null || resolvedScheduledExecutor.isShutdown()) { - resolvedScheduledExecutor = configuredScheduledExecutor == null ? _getInternalScheduledExecutor() : configuredScheduledExecutor; + executorsLock.lock(); + try { + if (resolvedScheduledExecutor == null || resolvedScheduledExecutor.isShutdown()) { + resolvedScheduledExecutor = configuredScheduledExecutor == null ? _getInternalScheduledExecutor() : configuredScheduledExecutor; + } + return resolvedScheduledExecutor; + } + finally { + executorsLock.unlock(); } - return resolvedScheduledExecutor; } private ScheduledExecutorService _getInternalScheduledExecutor() { @@ -2252,11 +2265,17 @@ private ScheduledExecutorService _getInternalScheduledExecutor() { * @return the executor */ public ExecutorService getCallbackExecutor() { - if (resolvedCallbackExecutor == null || resolvedCallbackExecutor.isShutdown()) { - resolvedCallbackExecutor = callbackThreadFactory == null ? - DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(callbackThreadFactory); + executorsLock.lock(); + try { + if (resolvedCallbackExecutor == null || resolvedCallbackExecutor.isShutdown()) { + resolvedCallbackExecutor = callbackThreadFactory == null ? + DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(callbackThreadFactory); + } + return resolvedCallbackExecutor; + } + finally { + executorsLock.unlock(); } - return resolvedCallbackExecutor; } /** @@ -2264,11 +2283,17 @@ public ExecutorService getCallbackExecutor() { * @return the executor */ public ExecutorService getConnectExecutor() { - if (resolvedConnectExecutor == null || resolvedConnectExecutor.isShutdown()) { - resolvedConnectExecutor = connectThreadFactory == null ? - DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(connectThreadFactory); + executorsLock.lock(); + try { + if (resolvedConnectExecutor == null || resolvedConnectExecutor.isShutdown()) { + resolvedConnectExecutor = connectThreadFactory == null ? + DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(connectThreadFactory); + } + return resolvedConnectExecutor; + } + finally { + executorsLock.unlock(); } - return resolvedConnectExecutor; } /** @@ -2303,6 +2328,50 @@ public boolean connectExecutorIsInternal() { return this.connectThreadFactory == null; } + private final ReentrantLock executorsLock = new ReentrantLock(); + + public void shutdownInternalExecutors() throws InterruptedException { + executorsLock.lock(); + try { + if (resolvedCallbackExecutor != null && callbackExecutorIsInternal()) { + // we don't just shut it down now to give any callbacks a chance to finish + ExecutorService es = resolvedCallbackExecutor; + resolvedCallbackExecutor = null; + es.shutdown(); + try { + //noinspection ResultOfMethodCallIgnored + es.awaitTermination(getConnectionTimeout().toNanos(), TimeUnit.NANOSECONDS); + } + finally { + es.shutdownNow(); + } + } + + if (resolvedConnectExecutor != null && connectExecutorIsInternal()) { + // There's no need to wait for running tasks since we're told to close + ExecutorService es = resolvedConnectExecutor; + resolvedConnectExecutor = null; + es.shutdownNow(); + } + + if (resolvedExecutor != null && executorIsInternal()) { + ExecutorService es = resolvedExecutor; + resolvedExecutor = null; + es.shutdownNow(); + } + + if (resolvedScheduledExecutor != null && scheduledExecutorIsInternal()) { + ScheduledExecutorService ses = resolvedScheduledExecutor; + resolvedScheduledExecutor = null; + ses.shutdownNow(); + } + + } + finally { + executorsLock.unlock(); + } + } + /** * the list of HttpRequest interceptors. * @return the list diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index 8dbe3bbd6..a8a2e75df 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -96,10 +96,12 @@ class NatsConnection implements Connection { private final AtomicBoolean blockPublishForDrain; private final AtomicBoolean tryingToConnect; - private final ExecutorService callbackRunner; - private final ExecutorService executor; - private final ExecutorService connectExecutor; - private final ScheduledExecutorService scheduledExecutor; + // these are not final so they can be nullified on close + private ExecutorService callbackExecutor; + private ExecutorService executor; + private ExecutorService connectExecutor; + private ScheduledExecutorService scheduledExecutor; + private final boolean advancedTracking; private final ServerPool serverPool; @@ -158,7 +160,7 @@ class NatsConnection implements Connection { timeTraceLogger.trace("creating executors"); this.executor = options.getExecutor(); - this.callbackRunner = options.getCallbackExecutor(); + this.callbackExecutor = options.getCallbackExecutor(); this.connectExecutor = options.getConnectExecutor(); this.scheduledExecutor = options.getScheduledExecutor(); @@ -875,37 +877,11 @@ void close(boolean checkDrainStatus, boolean forceClose) throws InterruptedExcep statusLock.unlock(); } - if (options.callbackExecutorIsInternal()) { - // Stop the error handling and connect executors - callbackRunner.shutdown(); - try { - // At this point in the flow, the connection is shutting down. - // There is really no use in giving this information to the developer, - // It's fair to say that an exception here anyway will practically never happen - // and if it did, the app is probably already frozen. - //noinspection ResultOfMethodCallIgnored - callbackRunner.awaitTermination(this.options.getConnectionTimeout().toNanos(), TimeUnit.NANOSECONDS); - } - finally { - callbackRunner.shutdownNow(); - } - } - - if (options.connectExecutorIsInternal()) { - // There's no need to wait for running tasks since we're told to close - connectExecutor.shutdownNow(); - } - - // The callbackRunner and connectExecutor always come from a factory, - // so we always shut them down. - // The executor and scheduledExecutor come from a factory only if - // the user does NOT supply them, so we shut them down in that case. - if (options.executorIsInternal()) { - executor.shutdownNow(); - } - if (options.scheduledExecutorIsInternal()) { - scheduledExecutor.shutdownNow(); - } + callbackExecutor = null; + executor = null; + connectExecutor = null; + scheduledExecutor = null; + options.shutdownInternalExecutors(); statusLock.lock(); try { @@ -918,7 +894,7 @@ void close(boolean checkDrainStatus, boolean forceClose) throws InterruptedExcep } boolean callbackRunnerIsShutdown() { - return callbackRunner == null || callbackRunner.isShutdown(); + return callbackExecutor == null || callbackExecutor.isShutdown(); } boolean executorIsShutdown() { @@ -1881,9 +1857,9 @@ void processOK() { } void processSlowConsumer(Consumer consumer) { - if (!this.callbackRunner.isShutdown()) { + if (!this.callbackExecutor.isShutdown()) { try { - this.callbackRunner.execute(() -> { + this.callbackExecutor.execute(() -> { try { options.getErrorListener().slowConsumerDetected(this, consumer); } @@ -1901,9 +1877,9 @@ void processSlowConsumer(Consumer consumer) { void processException(Exception exp) { this.statistics.incrementExceptionCount(); - if (!this.callbackRunner.isShutdown()) { + if (!this.callbackExecutor.isShutdown()) { try { - this.callbackRunner.execute(() -> { + this.callbackExecutor.execute(() -> { try { options.getErrorListener().exceptionOccurred(this, exp); } @@ -1929,9 +1905,9 @@ void processError(String errorText) { this.serverAuthErrors.put(currentServer, errorText); } - if (!this.callbackRunner.isShutdown()) { + if (!this.callbackExecutor.isShutdown()) { try { - this.callbackRunner.execute(() -> { + this.callbackExecutor.execute(() -> { try { options.getErrorListener().errorOccurred(this, errorText); } @@ -1951,9 +1927,9 @@ interface ErrorListenerCaller { } void executeCallback(ErrorListenerCaller elc) { - if (!this.callbackRunner.isShutdown()) { + if (!this.callbackExecutor.isShutdown()) { try { - this.callbackRunner.execute(() -> elc.call(this, options.getErrorListener())); + this.callbackExecutor.execute(() -> elc.call(this, options.getErrorListener())); } catch (RejectedExecutionException re) { // Timing with shutdown, let it go @@ -1976,11 +1952,11 @@ String uriDetail(NatsUri uri, NatsUri hostOrlast) { } void processConnectionEvent(Events type, String uriDetails) { - if (!this.callbackRunner.isShutdown()) { + if (!this.callbackExecutor.isShutdown()) { try { long time = System.currentTimeMillis(); for (ConnectionListener listener : connectionListeners) { - this.callbackRunner.execute(() -> { + this.callbackExecutor.execute(() -> { try { listener.connectionEvent(this, type, time, uriDetails); } catch (Exception ex) { From de12c4777229974c91a30da7e766454096c39e59 Mon Sep 17 00:00:00 2001 From: scottf Date: Wed, 3 Dec 2025 12:40:30 -0500 Subject: [PATCH 4/7] fixed the way shutdown state was reported --- src/main/java/io/nats/client/Options.java | 9 +++--- .../io/nats/client/impl/NatsConnection.java | 16 +++++----- .../client/impl/NatsConnectionImplTests.java | 32 +++++++++---------- 3 files changed, 28 insertions(+), 29 deletions(-) diff --git a/src/main/java/io/nats/client/Options.java b/src/main/java/io/nats/client/Options.java index d7ef0e065..0066b816d 100644 --- a/src/main/java/io/nats/client/Options.java +++ b/src/main/java/io/nats/client/Options.java @@ -2334,7 +2334,7 @@ public void shutdownInternalExecutors() throws InterruptedException { executorsLock.lock(); try { if (resolvedCallbackExecutor != null && callbackExecutorIsInternal()) { - // we don't just shut it down now to give any callbacks a chance to finish + // we don't just shutdownNow to give any callbacks a chance to finish ExecutorService es = resolvedCallbackExecutor; resolvedCallbackExecutor = null; es.shutdown(); @@ -2348,22 +2348,21 @@ public void shutdownInternalExecutors() throws InterruptedException { } if (resolvedConnectExecutor != null && connectExecutorIsInternal()) { - // There's no need to wait for running tasks since we're told to close ExecutorService es = resolvedConnectExecutor; resolvedConnectExecutor = null; - es.shutdownNow(); + es.shutdownNow(); // There's no need to wait... } if (resolvedExecutor != null && executorIsInternal()) { ExecutorService es = resolvedExecutor; resolvedExecutor = null; - es.shutdownNow(); + es.shutdownNow(); // There's no need to wait... } if (resolvedScheduledExecutor != null && scheduledExecutorIsInternal()) { ScheduledExecutorService ses = resolvedScheduledExecutor; resolvedScheduledExecutor = null; - ses.shutdownNow(); + ses.shutdownNow(); // There's no need to wait... } } diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index a8a2e75df..da2aa9fb0 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -893,20 +893,20 @@ void close(boolean checkDrainStatus, boolean forceClose) throws InterruptedExcep } } - boolean callbackRunnerIsShutdown() { - return callbackExecutor == null || callbackExecutor.isShutdown(); + boolean callbackRunnerClosed() { + return callbackExecutor == null; } - boolean executorIsShutdown() { - return executor == null || executor.isShutdown(); + boolean executorIsClosed() { + return executor == null; } - boolean connectExecutorIsShutdown() { - return connectExecutor == null || connectExecutor.isShutdown(); + boolean connectExecutorClosed() { + return connectExecutor == null; } - boolean scheduledExecutorIsShutdown() { - return scheduledExecutor == null || scheduledExecutor.isShutdown(); + boolean scheduledExecutorIsClosed() { + return scheduledExecutor == null; } // Should only be called from closeSocket or close diff --git a/src/test/java/io/nats/client/impl/NatsConnectionImplTests.java b/src/test/java/io/nats/client/impl/NatsConnectionImplTests.java index e12af837f..70d23c23a 100644 --- a/src/test/java/io/nats/client/impl/NatsConnectionImplTests.java +++ b/src/test/java/io/nats/client/impl/NatsConnectionImplTests.java @@ -67,41 +67,41 @@ private static void verifyInternalExecutors(Options options, NatsConnection nc) assertTrue(options.executorIsInternal()); assertTrue(options.scheduledExecutorIsInternal()); - assertFalse(nc.callbackRunnerIsShutdown()); - assertFalse(nc.connectExecutorIsShutdown()); + assertFalse(nc.callbackRunnerClosed()); + assertFalse(nc.connectExecutorClosed()); - assertFalse(nc.executorIsShutdown()); - assertFalse(nc.scheduledExecutorIsShutdown()); + assertFalse(nc.executorIsClosed()); + assertFalse(nc.scheduledExecutorIsClosed()); nc.subscribe("*"); Thread.sleep(1000); nc.close(); - assertTrue(nc.callbackRunnerIsShutdown()); - assertTrue(nc.connectExecutorIsShutdown()); + assertTrue(nc.callbackRunnerClosed()); + assertTrue(nc.connectExecutorClosed()); - assertTrue(nc.executorIsShutdown()); - assertTrue(nc.scheduledExecutorIsShutdown()); + assertTrue(nc.executorIsClosed()); + assertTrue(nc.scheduledExecutorIsClosed()); } private static void verifyExternalExecutors(Options options, NatsConnection nc) throws InterruptedException { assertFalse(options.executorIsInternal()); assertFalse(options.scheduledExecutorIsInternal()); - assertFalse(nc.callbackRunnerIsShutdown()); - assertFalse(nc.connectExecutorIsShutdown()); + assertFalse(nc.callbackRunnerClosed()); + assertFalse(nc.connectExecutorClosed()); - assertFalse(nc.executorIsShutdown()); - assertFalse(nc.scheduledExecutorIsShutdown()); + assertFalse(nc.executorIsClosed()); + assertFalse(nc.scheduledExecutorIsClosed()); nc.subscribe("*"); Thread.sleep(1000); nc.close(); - assertTrue(nc.callbackRunnerIsShutdown()); - assertTrue(nc.connectExecutorIsShutdown()); + assertTrue(nc.callbackRunnerClosed()); + assertTrue(nc.connectExecutorClosed()); - assertFalse(nc.executorIsShutdown()); - assertFalse(nc.scheduledExecutorIsShutdown()); + assertTrue(nc.executorIsClosed()); + assertTrue(nc.scheduledExecutorIsClosed()); } } From 818764fb23930a8de40093574281625b9689a8f1 Mon Sep 17 00:00:00 2001 From: scottf Date: Thu, 4 Dec 2025 06:41:24 -0500 Subject: [PATCH 5/7] refactor callback to remove 5 repeated blocks of the same code and properly null guard the callback executor --- .../io/nats/client/impl/MessageManager.java | 2 +- .../io/nats/client/impl/NatsConnection.java | 60 +++++-------------- .../nats/client/impl/PullMessageManager.java | 8 +-- .../nats/client/impl/PushMessageManager.java | 4 +- .../impl/SocketDataPortWithWriteTimeout.java | 2 +- .../impl/SocketDataPortBlockSimulator.java | 2 +- 6 files changed, 23 insertions(+), 55 deletions(-) diff --git a/src/main/java/io/nats/client/impl/MessageManager.java b/src/main/java/io/nats/client/impl/MessageManager.java index 79ef4baee..eac6c58ba 100644 --- a/src/main/java/io/nats/client/impl/MessageManager.java +++ b/src/main/java/io/nats/client/impl/MessageManager.java @@ -107,7 +107,7 @@ protected void trackJsMessage(Message msg) { protected void subTrackJsMessage(Message msg) {} protected void handleHeartbeatError() { - conn.executeCallback((c, el) -> el.heartbeatAlarm(c, sub, lastStreamSeq, lastConsumerSeq)); + conn.notifyErrorListener((c, el) -> el.heartbeatAlarm(c, sub, lastStreamSeq, lastConsumerSeq)); } protected void configureIdleHeartbeat(Duration configIdleHeartbeat, long configMessageAlarmTime) { diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index da2aa9fb0..478c0528e 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -574,7 +574,7 @@ void tryToConnect(NatsUri cur, NatsUri resolved, long now) { }; timeoutNanos = timeCheck(end, "reading info, version and upgrading to secure if necessary"); - Future future = this.connectExecutor.submit(connectTask); + Future future = connectExecutor.submit(connectTask); try { future.get(timeoutNanos, TimeUnit.NANOSECONDS); } @@ -1856,12 +1856,12 @@ void processOK() { this.statistics.incrementOkCount(); } - void processSlowConsumer(Consumer consumer) { - if (!this.callbackExecutor.isShutdown()) { + void makeCallback(Runnable r) { + if (callbackExecutor != null) { try { this.callbackExecutor.execute(() -> { try { - options.getErrorListener().slowConsumerDetected(this, consumer); + r.run(); } catch (Exception ex) { this.statistics.incrementExceptionCount(); @@ -1869,29 +1869,18 @@ void processSlowConsumer(Consumer consumer) { }); } catch (RejectedExecutionException re) { - // Timing with shutdown, let it go + // Timing with shutdown probably, let it go } } } + void processSlowConsumer(Consumer consumer) { + makeCallback(() -> options.getErrorListener().slowConsumerDetected(this, consumer)); + } + void processException(Exception exp) { this.statistics.incrementExceptionCount(); - - if (!this.callbackExecutor.isShutdown()) { - try { - this.callbackExecutor.execute(() -> { - try { - options.getErrorListener().exceptionOccurred(this, exp); - } - catch (Exception ex) { - this.statistics.incrementExceptionCount(); - } - }); - } - catch (RejectedExecutionException re) { - // Timing with shutdown, let it go - } - } + makeCallback(() -> options.getErrorListener().exceptionOccurred(this, exp)); } void processError(String errorText) { @@ -1905,36 +1894,15 @@ void processError(String errorText) { this.serverAuthErrors.put(currentServer, errorText); } - if (!this.callbackExecutor.isShutdown()) { - try { - this.callbackExecutor.execute(() -> { - try { - options.getErrorListener().errorOccurred(this, errorText); - } - catch (Exception ex) { - this.statistics.incrementExceptionCount(); - } - }); - } - catch (RejectedExecutionException re) { - // Timing with shutdown, let it go - } - } + makeCallback(() -> options.getErrorListener().errorOccurred(this, errorText)); } interface ErrorListenerCaller { void call(Connection conn, ErrorListener el); } - void executeCallback(ErrorListenerCaller elc) { - if (!this.callbackExecutor.isShutdown()) { - try { - this.callbackExecutor.execute(() -> elc.call(this, options.getErrorListener())); - } - catch (RejectedExecutionException re) { - // Timing with shutdown, let it go - } - } + void notifyErrorListener(ErrorListenerCaller elc) { + makeCallback(() -> elc.call(this, options.getErrorListener())); } String uriDetail(NatsUri uri) { @@ -1952,7 +1920,7 @@ String uriDetail(NatsUri uri, NatsUri hostOrlast) { } void processConnectionEvent(Events type, String uriDetails) { - if (!this.callbackExecutor.isShutdown()) { + if (callbackExecutor != null) { try { long time = System.currentTimeMillis(); for (ConnectionListener listener : connectionListeners) { diff --git a/src/main/java/io/nats/client/impl/PullMessageManager.java b/src/main/java/io/nats/client/impl/PullMessageManager.java index c1538803e..97d47dbe3 100644 --- a/src/main/java/io/nats/client/impl/PullMessageManager.java +++ b/src/main/java/io/nats/client/impl/PullMessageManager.java @@ -164,7 +164,7 @@ protected ManageResult manageStatus(Message msg) { case REQUEST_TIMEOUT_CODE: case NO_RESPONDERS_CODE: if (raiseStatusWarnings) { - conn.executeCallback((c, el) -> el.pullStatusWarning(c, sub, status)); + conn.notifyErrorListener((c, el) -> el.pullStatusWarning(c, sub, status)); } return STATUS_TERMINUS; @@ -174,7 +174,7 @@ protected ManageResult manageStatus(Message msg) { if (statMsg.startsWith(EXCEEDED_MAX_PREFIX) || statMsg.equals(SERVER_SHUTDOWN)) { if (raiseStatusWarnings) { - conn.executeCallback((c, el) -> el.pullStatusWarning(c, sub, status)); + conn.notifyErrorListener((c, el) -> el.pullStatusWarning(c, sub, status)); } return STATUS_HANDLED; } @@ -184,7 +184,7 @@ protected ManageResult manageStatus(Message msg) { || statMsg.equals(MESSAGE_SIZE_EXCEEDS_MAX_BYTES)) { if (raiseStatusWarnings) { - conn.executeCallback((c, el) -> el.pullStatusWarning(c, sub, status)); + conn.notifyErrorListener((c, el) -> el.pullStatusWarning(c, sub, status)); } return STATUS_TERMINUS; } @@ -197,7 +197,7 @@ protected ManageResult manageStatus(Message msg) { // All unknown 409s are errors, since that basically means the client is not aware of them. // These known ones are also errors: "Consumer Deleted" and "Consumer is push based" - conn.executeCallback((c, el) -> el.pullStatusError(c, sub, status)); + conn.notifyErrorListener((c, el) -> el.pullStatusError(c, sub, status)); return STATUS_ERROR; } diff --git a/src/main/java/io/nats/client/impl/PushMessageManager.java b/src/main/java/io/nats/client/impl/PushMessageManager.java index df281ea1b..5e4ac45b8 100644 --- a/src/main/java/io/nats/client/impl/PushMessageManager.java +++ b/src/main/java/io/nats/client/impl/PushMessageManager.java @@ -116,7 +116,7 @@ protected ManageResult manageStatus(Message msg) { } } - conn.executeCallback((c, el) -> el.unhandledStatus(c, sub, status)); + conn.notifyErrorListener((c, el) -> el.unhandledStatus(c, sub, status)); return STATUS_ERROR; } @@ -126,7 +126,7 @@ private void processFlowControl(String fcSubject, FlowControlSource source) { if (fcSubject != null && !fcSubject.equals(lastFcSubject)) { conn.publishInternal(fcSubject, null, null, null, false, false); lastFcSubject = fcSubject; // set after publish in case the pub fails - conn.executeCallback((c, el) -> el.flowControlProcessed(c, sub, fcSubject, source)); + conn.notifyErrorListener((c, el) -> el.flowControlProcessed(c, sub, fcSubject, source)); } } } diff --git a/src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java b/src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java index a62b186ff..f4eabf8c8 100644 --- a/src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java +++ b/src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java @@ -56,7 +56,7 @@ public void connect(@NonNull NatsConnection conn, @NonNull NatsUri nuri, long ti // if now is after when it was supposed to be done by if (NatsSystemClock.nanoTime() > writeMustBeDoneBy.get()) { writeWatchTask.shutdown(); // we don't need to repeat this, the connection is going to be closed - connection.executeCallback((c, el) -> el.socketWriteTimeout(c)); + connection.notifyErrorListener((c, el) -> el.socketWriteTimeout(c)); try { connection.forceReconnect(ForceReconnectOptions.FORCE_CLOSE_INSTANCE); } diff --git a/src/test/java/io/nats/client/impl/SocketDataPortBlockSimulator.java b/src/test/java/io/nats/client/impl/SocketDataPortBlockSimulator.java index dd921d6f2..b84c285ef 100644 --- a/src/test/java/io/nats/client/impl/SocketDataPortBlockSimulator.java +++ b/src/test/java/io/nats/client/impl/SocketDataPortBlockSimulator.java @@ -57,7 +57,7 @@ public void connect(@NonNull NatsConnection conn, @NonNull NatsUri nuri, long ti // if now is after when it was supposed to be done by if (NatsSystemClock.nanoTime() > writeMustBeDoneBy.get()) { writeWatchTask.shutdown(); // we don't need to repeat this, the connection is going to be closed - connection.executeCallback((c, el) -> el.socketWriteTimeout(c)); + connection.notifyErrorListener((c, el) -> el.socketWriteTimeout(c)); blocking.set(0); SIMULATE_SOCKET_BLOCK.set(0); try { From 9a1732a2a05fe22499229bedd2b5355bb6d13f97 Mon Sep 17 00:00:00 2001 From: scottf Date: Thu, 4 Dec 2025 06:52:52 -0500 Subject: [PATCH 6/7] refactor callback to remove 5 repeated blocks of the same code and properly null guard the callback executor --- .../io/nats/client/impl/NatsConnection.java | 20 +++++-------------- .../client/impl/NatsConnectionImplTests.java | 16 +++++++-------- 2 files changed, 13 insertions(+), 23 deletions(-) diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index 478c0528e..04b1b360e 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -893,21 +893,11 @@ void close(boolean checkDrainStatus, boolean forceClose) throws InterruptedExcep } } - boolean callbackRunnerClosed() { - return callbackExecutor == null; - } - - boolean executorIsClosed() { - return executor == null; - } - - boolean connectExecutorClosed() { - return connectExecutor == null; - } - - boolean scheduledExecutorIsClosed() { - return scheduledExecutor == null; - } + // these four *ExecutorIsClosed() are only used for tests + boolean callbackExecutorIsClosed() { return callbackExecutor == null; } + boolean executorIsClosed() { return executor == null; } + boolean connectExecutorIsClosed() { return connectExecutor == null; } + boolean scheduledExecutorIsClosed() { return scheduledExecutor == null; } // Should only be called from closeSocket or close void closeSocketImpl(boolean forceClose) { diff --git a/src/test/java/io/nats/client/impl/NatsConnectionImplTests.java b/src/test/java/io/nats/client/impl/NatsConnectionImplTests.java index 70d23c23a..8bd9259ea 100644 --- a/src/test/java/io/nats/client/impl/NatsConnectionImplTests.java +++ b/src/test/java/io/nats/client/impl/NatsConnectionImplTests.java @@ -67,8 +67,8 @@ private static void verifyInternalExecutors(Options options, NatsConnection nc) assertTrue(options.executorIsInternal()); assertTrue(options.scheduledExecutorIsInternal()); - assertFalse(nc.callbackRunnerClosed()); - assertFalse(nc.connectExecutorClosed()); + assertFalse(nc.callbackExecutorIsClosed()); + assertFalse(nc.connectExecutorIsClosed()); assertFalse(nc.executorIsClosed()); assertFalse(nc.scheduledExecutorIsClosed()); @@ -77,8 +77,8 @@ private static void verifyInternalExecutors(Options options, NatsConnection nc) Thread.sleep(1000); nc.close(); - assertTrue(nc.callbackRunnerClosed()); - assertTrue(nc.connectExecutorClosed()); + assertTrue(nc.callbackExecutorIsClosed()); + assertTrue(nc.connectExecutorIsClosed()); assertTrue(nc.executorIsClosed()); assertTrue(nc.scheduledExecutorIsClosed()); @@ -88,8 +88,8 @@ private static void verifyExternalExecutors(Options options, NatsConnection nc) assertFalse(options.executorIsInternal()); assertFalse(options.scheduledExecutorIsInternal()); - assertFalse(nc.callbackRunnerClosed()); - assertFalse(nc.connectExecutorClosed()); + assertFalse(nc.callbackExecutorIsClosed()); + assertFalse(nc.connectExecutorIsClosed()); assertFalse(nc.executorIsClosed()); assertFalse(nc.scheduledExecutorIsClosed()); @@ -98,8 +98,8 @@ private static void verifyExternalExecutors(Options options, NatsConnection nc) Thread.sleep(1000); nc.close(); - assertTrue(nc.callbackRunnerClosed()); - assertTrue(nc.connectExecutorClosed()); + assertTrue(nc.callbackExecutorIsClosed()); + assertTrue(nc.connectExecutorIsClosed()); assertTrue(nc.executorIsClosed()); assertTrue(nc.scheduledExecutorIsClosed()); From a5254c5bd3fd1149d1f654f3b6c0b5fa0a6abff2 Mon Sep 17 00:00:00 2001 From: scottf Date: Thu, 4 Dec 2025 07:22:34 -0500 Subject: [PATCH 7/7] self-review --- src/main/java/io/nats/client/Options.java | 78 ++++++++++--------- .../io/nats/client/impl/NatsConnection.java | 28 ++----- 2 files changed, 49 insertions(+), 57 deletions(-) diff --git a/src/main/java/io/nats/client/Options.java b/src/main/java/io/nats/client/Options.java index 0066b816d..abe09fcde 100644 --- a/src/main/java/io/nats/client/Options.java +++ b/src/main/java/io/nats/client/Options.java @@ -705,12 +705,15 @@ public class Options { private final boolean trackAdvancedStats; private final boolean traceConnection; - private final ExecutorService configuredExecutor; - private final ScheduledExecutorService configuredScheduledExecutor; - private final ThreadFactory connectThreadFactory; - private final ThreadFactory callbackThreadFactory; + private final ReentrantLock executorsLock; + + private final ExecutorService userExecutor; + private final ScheduledExecutorService userScheduledExecutor; + private final ThreadFactory userConnectThreadFactory; + private final ThreadFactory userCallbackThreadFactory; // these are not final b/c they are lazy initialized + // and nulled during shutdownInternalExecutors private ExecutorService resolvedExecutor; private ScheduledExecutorService resolvedScheduledExecutor; private ExecutorService resolvedConnectExecutor; @@ -857,10 +860,10 @@ public static class Builder { private ReadListener readListener = null; private StatisticsCollector statisticsCollector = null; private String dataPortType = DEFAULT_DATA_PORT_TYPE; - private ExecutorService executor; - private ScheduledExecutorService scheduledExecutor; - private ThreadFactory connectThreadFactory; - private ThreadFactory callbackThreadFactory; + private ExecutorService userExecutor; + private ScheduledExecutorService userScheduledExecutor; + private ThreadFactory userConnectThreadFactory; + private ThreadFactory userCallbackThreadFactory; private List> httpRequestInterceptors; private Proxy proxy; @@ -995,10 +998,10 @@ public Builder properties(Properties props) { classnameProperty(props, PROP_SERVERS_POOL_IMPLEMENTATION_CLASS, o -> this.serverPool = (ServerPool) o); classnameProperty(props, PROP_DISPATCHER_FACTORY_CLASS, o -> this.dispatcherFactory = (DispatcherFactory) o); - classnameProperty(props, PROP_EXECUTOR_SERVICE_CLASS, o -> this.executor = (ExecutorService) o); - classnameProperty(props, PROP_SCHEDULED_EXECUTOR_SERVICE_CLASS, o -> this.scheduledExecutor = (ScheduledExecutorService) o); - classnameProperty(props, PROP_CONNECT_THREAD_FACTORY_CLASS, o -> this.connectThreadFactory = (ThreadFactory) o); - classnameProperty(props, PROP_CALLBACK_THREAD_FACTORY_CLASS, o -> this.callbackThreadFactory = (ThreadFactory) o); + classnameProperty(props, PROP_EXECUTOR_SERVICE_CLASS, o -> this.userExecutor = (ExecutorService) o); + classnameProperty(props, PROP_SCHEDULED_EXECUTOR_SERVICE_CLASS, o -> this.userScheduledExecutor = (ScheduledExecutorService) o); + classnameProperty(props, PROP_CONNECT_THREAD_FACTORY_CLASS, o -> this.userConnectThreadFactory = (ThreadFactory) o); + classnameProperty(props, PROP_CALLBACK_THREAD_FACTORY_CLASS, o -> this.userCallbackThreadFactory = (ThreadFactory) o); return this; } @@ -1712,7 +1715,7 @@ public Builder statisticsCollector(StatisticsCollector collector) { * @return the Builder for chaining */ public Builder executor(ExecutorService executor) { - this.executor = executor; + this.userExecutor = executor; return this; } @@ -1725,7 +1728,7 @@ public Builder executor(ExecutorService executor) { * @return the Builder for chaining */ public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) { - this.scheduledExecutor = scheduledExecutor; + this.userScheduledExecutor = scheduledExecutor; return this; } @@ -1736,7 +1739,7 @@ public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) { * @return the Builder for chaining */ public Builder connectThreadFactory(ThreadFactory threadFactory) { - this.connectThreadFactory = threadFactory; + this.userConnectThreadFactory = threadFactory; return this; } @@ -1747,7 +1750,7 @@ public Builder connectThreadFactory(ThreadFactory threadFactory) { * @return the Builder for chaining */ public Builder callbackThreadFactory(ThreadFactory threadFactory) { - this.callbackThreadFactory = threadFactory; + this.userCallbackThreadFactory = threadFactory; return this; } @@ -2114,10 +2117,10 @@ public Builder(Options o) { this.statisticsCollector = o.statisticsCollector; this.dataPortType = o.dataPortType; this.trackAdvancedStats = o.trackAdvancedStats; - this.executor = o.configuredExecutor; - this.scheduledExecutor = o.configuredScheduledExecutor; - this.callbackThreadFactory = o.callbackThreadFactory; - this.connectThreadFactory = o.connectThreadFactory; + this.userExecutor = o.userExecutor; + this.userScheduledExecutor = o.userScheduledExecutor; + this.userCallbackThreadFactory = o.userCallbackThreadFactory; + this.userConnectThreadFactory = o.userConnectThreadFactory; this.httpRequestInterceptors = o.httpRequestInterceptors; this.proxy = o.proxy; @@ -2186,10 +2189,13 @@ private Options(Builder b) { this.statisticsCollector = b.statisticsCollector; this.dataPortType = b.dataPortType; this.trackAdvancedStats = b.trackAdvancedStats; - this.configuredExecutor = b.executor; - this.configuredScheduledExecutor = b.scheduledExecutor; - this.callbackThreadFactory = b.callbackThreadFactory; - this.connectThreadFactory = b.connectThreadFactory; + + executorsLock = new ReentrantLock(); + this.userExecutor = b.userExecutor; + this.userScheduledExecutor = b.userScheduledExecutor; + this.userCallbackThreadFactory = b.userCallbackThreadFactory; + this.userConnectThreadFactory = b.userConnectThreadFactory; + this.httpRequestInterceptors = b.httpRequestInterceptors; this.proxy = b.proxy; @@ -2215,7 +2221,7 @@ public ExecutorService getExecutor() { executorsLock.lock(); try { if (resolvedExecutor == null || resolvedExecutor.isShutdown()) { - resolvedExecutor = configuredExecutor == null ? _getInternalExecutor() : configuredExecutor; + resolvedExecutor = userExecutor == null ? _getInternalExecutor() : userExecutor; } return resolvedExecutor; } @@ -2240,7 +2246,7 @@ public ScheduledExecutorService getScheduledExecutor() { executorsLock.lock(); try { if (resolvedScheduledExecutor == null || resolvedScheduledExecutor.isShutdown()) { - resolvedScheduledExecutor = configuredScheduledExecutor == null ? _getInternalScheduledExecutor() : configuredScheduledExecutor; + resolvedScheduledExecutor = userScheduledExecutor == null ? _getInternalScheduledExecutor() : userScheduledExecutor; } return resolvedScheduledExecutor; } @@ -2268,8 +2274,8 @@ public ExecutorService getCallbackExecutor() { executorsLock.lock(); try { if (resolvedCallbackExecutor == null || resolvedCallbackExecutor.isShutdown()) { - resolvedCallbackExecutor = callbackThreadFactory == null ? - DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(callbackThreadFactory); + resolvedCallbackExecutor = userCallbackThreadFactory == null ? + DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(userCallbackThreadFactory); } return resolvedCallbackExecutor; } @@ -2286,8 +2292,8 @@ public ExecutorService getConnectExecutor() { executorsLock.lock(); try { if (resolvedConnectExecutor == null || resolvedConnectExecutor.isShutdown()) { - resolvedConnectExecutor = connectThreadFactory == null ? - DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(connectThreadFactory); + resolvedConnectExecutor = userConnectThreadFactory == null ? + DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(userConnectThreadFactory); } return resolvedConnectExecutor; } @@ -2301,7 +2307,7 @@ public ExecutorService getConnectExecutor() { * @return true if the executor is internal */ public boolean executorIsInternal() { - return this.configuredExecutor == null; + return this.userExecutor == null; } /** @@ -2309,7 +2315,7 @@ public boolean executorIsInternal() { * @return true if the executor is internal */ public boolean scheduledExecutorIsInternal() { - return this.configuredScheduledExecutor == null; + return this.userScheduledExecutor == null; } /** @@ -2317,7 +2323,7 @@ public boolean scheduledExecutorIsInternal() { * @return true if the executor is internal */ public boolean callbackExecutorIsInternal() { - return this.callbackThreadFactory == null; + return this.userCallbackThreadFactory == null; } /** @@ -2325,12 +2331,10 @@ public boolean callbackExecutorIsInternal() { * @return true if the executor is internal */ public boolean connectExecutorIsInternal() { - return this.connectThreadFactory == null; + return this.userConnectThreadFactory == null; } - private final ReentrantLock executorsLock = new ReentrantLock(); - - public void shutdownInternalExecutors() throws InterruptedException { + public void shutdownExecutors() throws InterruptedException { executorsLock.lock(); try { if (resolvedCallbackExecutor != null && callbackExecutorIsInternal()) { diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index 04b1b360e..7464b9293 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -881,7 +881,7 @@ void close(boolean checkDrainStatus, boolean forceClose) throws InterruptedExcep executor = null; connectExecutor = null; scheduledExecutor = null; - options.shutdownInternalExecutors(); + options.shutdownExecutors(); statusLock.lock(); try { @@ -1846,15 +1846,15 @@ void processOK() { this.statistics.incrementOkCount(); } - void makeCallback(Runnable r) { + void makeCallback(Runnable callback) { if (callbackExecutor != null) { try { - this.callbackExecutor.execute(() -> { + callbackExecutor.execute(() -> { try { - r.run(); + callback.run(); } catch (Exception ex) { - this.statistics.incrementExceptionCount(); + statistics.incrementExceptionCount(); } }); } @@ -1910,21 +1910,9 @@ String uriDetail(NatsUri uri, NatsUri hostOrlast) { } void processConnectionEvent(Events type, String uriDetails) { - if (callbackExecutor != null) { - try { - long time = System.currentTimeMillis(); - for (ConnectionListener listener : connectionListeners) { - this.callbackExecutor.execute(() -> { - try { - listener.connectionEvent(this, type, time, uriDetails); - } catch (Exception ex) { - this.statistics.incrementExceptionCount(); - } - }); - } - } catch (RejectedExecutionException re) { - // Timing with shutdown, let it go - } + long time = System.currentTimeMillis(); + for (ConnectionListener listener : connectionListeners) { + makeCallback(() -> listener.connectionEvent(this, type, time, uriDetails)); } }