diff --git a/src/main/java/io/nats/client/Options.java b/src/main/java/io/nats/client/Options.java index c5151dafb..abe09fcde 100644 --- a/src/main/java/io/nats/client/Options.java +++ b/src/main/java/io/nats/client/Options.java @@ -36,6 +36,7 @@ import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.locks.ReentrantLock; import java.util.function.Supplier; import static io.nats.client.support.Encoding.*; @@ -704,10 +705,20 @@ public class Options { private final boolean trackAdvancedStats; private final boolean traceConnection; - private final ExecutorService executor; - private final ScheduledExecutorService scheduledExecutor; - private final ThreadFactory connectThreadFactory; - private final ThreadFactory callbackThreadFactory; + private final ReentrantLock executorsLock; + + private final ExecutorService userExecutor; + private final ScheduledExecutorService userScheduledExecutor; + private final ThreadFactory userConnectThreadFactory; + private final ThreadFactory userCallbackThreadFactory; + + // these are not final b/c they are lazy initialized + // and nulled during shutdownInternalExecutors + private ExecutorService resolvedExecutor; + private ScheduledExecutorService resolvedScheduledExecutor; + private ExecutorService resolvedConnectExecutor; + private ExecutorService resolvedCallbackExecutor; + private final ServerPool serverPool; private final DispatcherFactory dispatcherFactory; @@ -849,10 +860,10 @@ public static class Builder { private ReadListener readListener = null; private StatisticsCollector statisticsCollector = null; private String dataPortType = DEFAULT_DATA_PORT_TYPE; - private ExecutorService executor; - private ScheduledExecutorService scheduledExecutor; - private ThreadFactory connectThreadFactory; - private ThreadFactory callbackThreadFactory; + private ExecutorService userExecutor; + private ScheduledExecutorService userScheduledExecutor; + private ThreadFactory userConnectThreadFactory; + private ThreadFactory userCallbackThreadFactory; private List> httpRequestInterceptors; private Proxy proxy; @@ -987,10 +998,10 @@ public Builder properties(Properties props) { classnameProperty(props, PROP_SERVERS_POOL_IMPLEMENTATION_CLASS, o -> this.serverPool = (ServerPool) o); classnameProperty(props, PROP_DISPATCHER_FACTORY_CLASS, o -> this.dispatcherFactory = (DispatcherFactory) o); - classnameProperty(props, PROP_EXECUTOR_SERVICE_CLASS, o -> this.executor = (ExecutorService) o); - classnameProperty(props, PROP_SCHEDULED_EXECUTOR_SERVICE_CLASS, o -> this.scheduledExecutor = (ScheduledExecutorService) o); - classnameProperty(props, PROP_CONNECT_THREAD_FACTORY_CLASS, o -> this.connectThreadFactory = (ThreadFactory) o); - classnameProperty(props, PROP_CALLBACK_THREAD_FACTORY_CLASS, o -> this.callbackThreadFactory = (ThreadFactory) o); + classnameProperty(props, PROP_EXECUTOR_SERVICE_CLASS, o -> this.userExecutor = (ExecutorService) o); + classnameProperty(props, PROP_SCHEDULED_EXECUTOR_SERVICE_CLASS, o -> this.userScheduledExecutor = (ScheduledExecutorService) o); + classnameProperty(props, PROP_CONNECT_THREAD_FACTORY_CLASS, o -> this.userConnectThreadFactory = (ThreadFactory) o); + classnameProperty(props, PROP_CALLBACK_THREAD_FACTORY_CLASS, o -> this.userCallbackThreadFactory = (ThreadFactory) o); return this; } @@ -1704,7 +1715,7 @@ public Builder statisticsCollector(StatisticsCollector collector) { * @return the Builder for chaining */ public Builder executor(ExecutorService executor) { - this.executor = executor; + this.userExecutor = executor; return this; } @@ -1717,7 +1728,7 @@ public Builder executor(ExecutorService executor) { * @return the Builder for chaining */ public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) { - this.scheduledExecutor = scheduledExecutor; + this.userScheduledExecutor = scheduledExecutor; return this; } @@ -1728,7 +1739,7 @@ public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) { * @return the Builder for chaining */ public Builder connectThreadFactory(ThreadFactory threadFactory) { - this.connectThreadFactory = threadFactory; + this.userConnectThreadFactory = threadFactory; return this; } @@ -1739,7 +1750,7 @@ public Builder connectThreadFactory(ThreadFactory threadFactory) { * @return the Builder for chaining */ public Builder callbackThreadFactory(ThreadFactory threadFactory) { - this.callbackThreadFactory = threadFactory; + this.userCallbackThreadFactory = threadFactory; return this; } @@ -2106,10 +2117,10 @@ public Builder(Options o) { this.statisticsCollector = o.statisticsCollector; this.dataPortType = o.dataPortType; this.trackAdvancedStats = o.trackAdvancedStats; - this.executor = o.executor; - this.scheduledExecutor = o.scheduledExecutor; - this.callbackThreadFactory = o.callbackThreadFactory; - this.connectThreadFactory = o.connectThreadFactory; + this.userExecutor = o.userExecutor; + this.userScheduledExecutor = o.userScheduledExecutor; + this.userCallbackThreadFactory = o.userCallbackThreadFactory; + this.userConnectThreadFactory = o.userConnectThreadFactory; this.httpRequestInterceptors = o.httpRequestInterceptors; this.proxy = o.proxy; @@ -2178,10 +2189,13 @@ private Options(Builder b) { this.statisticsCollector = b.statisticsCollector; this.dataPortType = b.dataPortType; this.trackAdvancedStats = b.trackAdvancedStats; - this.executor = b.executor; - this.scheduledExecutor = b.scheduledExecutor; - this.callbackThreadFactory = b.callbackThreadFactory; - this.connectThreadFactory = b.connectThreadFactory; + + executorsLock = new ReentrantLock(); + this.userExecutor = b.userExecutor; + this.userScheduledExecutor = b.userScheduledExecutor; + this.userCallbackThreadFactory = b.userCallbackThreadFactory; + this.userConnectThreadFactory = b.userConnectThreadFactory; + this.httpRequestInterceptors = b.httpRequestInterceptors; this.proxy = b.proxy; @@ -2204,7 +2218,16 @@ private Options(Builder b) { * @return the executor, see {@link Builder#executor(ExecutorService) executor()} in the builder doc */ public ExecutorService getExecutor() { - return this.executor == null ? _getInternalExecutor() : this.executor; + executorsLock.lock(); + try { + if (resolvedExecutor == null || resolvedExecutor.isShutdown()) { + resolvedExecutor = userExecutor == null ? _getInternalExecutor() : userExecutor; + } + return resolvedExecutor; + } + finally { + executorsLock.unlock(); + } } private ExecutorService _getInternalExecutor() { @@ -2220,7 +2243,16 @@ private ExecutorService _getInternalExecutor() { * @return the ScheduledExecutorService, see {@link Builder#scheduledExecutor(ScheduledExecutorService) scheduledExecutor()} in the builder doc */ public ScheduledExecutorService getScheduledExecutor() { - return this.scheduledExecutor == null ? _getInternalScheduledExecutor() : this.scheduledExecutor; + executorsLock.lock(); + try { + if (resolvedScheduledExecutor == null || resolvedScheduledExecutor.isShutdown()) { + resolvedScheduledExecutor = userScheduledExecutor == null ? _getInternalScheduledExecutor() : userScheduledExecutor; + } + return resolvedScheduledExecutor; + } + finally { + executorsLock.unlock(); + } } private ScheduledExecutorService _getInternalScheduledExecutor() { @@ -2239,8 +2271,17 @@ private ScheduledExecutorService _getInternalScheduledExecutor() { * @return the executor */ public ExecutorService getCallbackExecutor() { - return this.callbackThreadFactory == null ? - DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.callbackThreadFactory); + executorsLock.lock(); + try { + if (resolvedCallbackExecutor == null || resolvedCallbackExecutor.isShutdown()) { + resolvedCallbackExecutor = userCallbackThreadFactory == null ? + DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(userCallbackThreadFactory); + } + return resolvedCallbackExecutor; + } + finally { + executorsLock.unlock(); + } } /** @@ -2248,8 +2289,17 @@ public ExecutorService getCallbackExecutor() { * @return the executor */ public ExecutorService getConnectExecutor() { - return this.connectThreadFactory == null ? - DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.connectThreadFactory); + executorsLock.lock(); + try { + if (resolvedConnectExecutor == null || resolvedConnectExecutor.isShutdown()) { + resolvedConnectExecutor = userConnectThreadFactory == null ? + DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(userConnectThreadFactory); + } + return resolvedConnectExecutor; + } + finally { + executorsLock.unlock(); + } } /** @@ -2257,7 +2307,7 @@ public ExecutorService getConnectExecutor() { * @return true if the executor is internal */ public boolean executorIsInternal() { - return this.executor == null; + return this.userExecutor == null; } /** @@ -2265,7 +2315,7 @@ public boolean executorIsInternal() { * @return true if the executor is internal */ public boolean scheduledExecutorIsInternal() { - return this.scheduledExecutor == null; + return this.userScheduledExecutor == null; } /** @@ -2273,7 +2323,7 @@ public boolean scheduledExecutorIsInternal() { * @return true if the executor is internal */ public boolean callbackExecutorIsInternal() { - return this.callbackThreadFactory == null; + return this.userCallbackThreadFactory == null; } /** @@ -2281,7 +2331,48 @@ public boolean callbackExecutorIsInternal() { * @return true if the executor is internal */ public boolean connectExecutorIsInternal() { - return this.connectThreadFactory == null; + return this.userConnectThreadFactory == null; + } + + public void shutdownExecutors() throws InterruptedException { + executorsLock.lock(); + try { + if (resolvedCallbackExecutor != null && callbackExecutorIsInternal()) { + // we don't just shutdownNow to give any callbacks a chance to finish + ExecutorService es = resolvedCallbackExecutor; + resolvedCallbackExecutor = null; + es.shutdown(); + try { + //noinspection ResultOfMethodCallIgnored + es.awaitTermination(getConnectionTimeout().toNanos(), TimeUnit.NANOSECONDS); + } + finally { + es.shutdownNow(); + } + } + + if (resolvedConnectExecutor != null && connectExecutorIsInternal()) { + ExecutorService es = resolvedConnectExecutor; + resolvedConnectExecutor = null; + es.shutdownNow(); // There's no need to wait... + } + + if (resolvedExecutor != null && executorIsInternal()) { + ExecutorService es = resolvedExecutor; + resolvedExecutor = null; + es.shutdownNow(); // There's no need to wait... + } + + if (resolvedScheduledExecutor != null && scheduledExecutorIsInternal()) { + ScheduledExecutorService ses = resolvedScheduledExecutor; + resolvedScheduledExecutor = null; + ses.shutdownNow(); // There's no need to wait... + } + + } + finally { + executorsLock.unlock(); + } } /** diff --git a/src/main/java/io/nats/client/impl/MessageManager.java b/src/main/java/io/nats/client/impl/MessageManager.java index 79ef4baee..eac6c58ba 100644 --- a/src/main/java/io/nats/client/impl/MessageManager.java +++ b/src/main/java/io/nats/client/impl/MessageManager.java @@ -107,7 +107,7 @@ protected void trackJsMessage(Message msg) { protected void subTrackJsMessage(Message msg) {} protected void handleHeartbeatError() { - conn.executeCallback((c, el) -> el.heartbeatAlarm(c, sub, lastStreamSeq, lastConsumerSeq)); + conn.notifyErrorListener((c, el) -> el.heartbeatAlarm(c, sub, lastStreamSeq, lastConsumerSeq)); } protected void configureIdleHeartbeat(Duration configIdleHeartbeat, long configMessageAlarmTime) { diff --git a/src/main/java/io/nats/client/impl/NatsConnection.java b/src/main/java/io/nats/client/impl/NatsConnection.java index 8dbe3bbd6..7464b9293 100644 --- a/src/main/java/io/nats/client/impl/NatsConnection.java +++ b/src/main/java/io/nats/client/impl/NatsConnection.java @@ -96,10 +96,12 @@ class NatsConnection implements Connection { private final AtomicBoolean blockPublishForDrain; private final AtomicBoolean tryingToConnect; - private final ExecutorService callbackRunner; - private final ExecutorService executor; - private final ExecutorService connectExecutor; - private final ScheduledExecutorService scheduledExecutor; + // these are not final so they can be nullified on close + private ExecutorService callbackExecutor; + private ExecutorService executor; + private ExecutorService connectExecutor; + private ScheduledExecutorService scheduledExecutor; + private final boolean advancedTracking; private final ServerPool serverPool; @@ -158,7 +160,7 @@ class NatsConnection implements Connection { timeTraceLogger.trace("creating executors"); this.executor = options.getExecutor(); - this.callbackRunner = options.getCallbackExecutor(); + this.callbackExecutor = options.getCallbackExecutor(); this.connectExecutor = options.getConnectExecutor(); this.scheduledExecutor = options.getScheduledExecutor(); @@ -572,7 +574,7 @@ void tryToConnect(NatsUri cur, NatsUri resolved, long now) { }; timeoutNanos = timeCheck(end, "reading info, version and upgrading to secure if necessary"); - Future future = this.connectExecutor.submit(connectTask); + Future future = connectExecutor.submit(connectTask); try { future.get(timeoutNanos, TimeUnit.NANOSECONDS); } @@ -875,37 +877,11 @@ void close(boolean checkDrainStatus, boolean forceClose) throws InterruptedExcep statusLock.unlock(); } - if (options.callbackExecutorIsInternal()) { - // Stop the error handling and connect executors - callbackRunner.shutdown(); - try { - // At this point in the flow, the connection is shutting down. - // There is really no use in giving this information to the developer, - // It's fair to say that an exception here anyway will practically never happen - // and if it did, the app is probably already frozen. - //noinspection ResultOfMethodCallIgnored - callbackRunner.awaitTermination(this.options.getConnectionTimeout().toNanos(), TimeUnit.NANOSECONDS); - } - finally { - callbackRunner.shutdownNow(); - } - } - - if (options.connectExecutorIsInternal()) { - // There's no need to wait for running tasks since we're told to close - connectExecutor.shutdownNow(); - } - - // The callbackRunner and connectExecutor always come from a factory, - // so we always shut them down. - // The executor and scheduledExecutor come from a factory only if - // the user does NOT supply them, so we shut them down in that case. - if (options.executorIsInternal()) { - executor.shutdownNow(); - } - if (options.scheduledExecutorIsInternal()) { - scheduledExecutor.shutdownNow(); - } + callbackExecutor = null; + executor = null; + connectExecutor = null; + scheduledExecutor = null; + options.shutdownExecutors(); statusLock.lock(); try { @@ -917,21 +893,11 @@ void close(boolean checkDrainStatus, boolean forceClose) throws InterruptedExcep } } - boolean callbackRunnerIsShutdown() { - return callbackRunner == null || callbackRunner.isShutdown(); - } - - boolean executorIsShutdown() { - return executor == null || executor.isShutdown(); - } - - boolean connectExecutorIsShutdown() { - return connectExecutor == null || connectExecutor.isShutdown(); - } - - boolean scheduledExecutorIsShutdown() { - return scheduledExecutor == null || scheduledExecutor.isShutdown(); - } + // these four *ExecutorIsClosed() are only used for tests + boolean callbackExecutorIsClosed() { return callbackExecutor == null; } + boolean executorIsClosed() { return executor == null; } + boolean connectExecutorIsClosed() { return connectExecutor == null; } + boolean scheduledExecutorIsClosed() { return scheduledExecutor == null; } // Should only be called from closeSocket or close void closeSocketImpl(boolean forceClose) { @@ -1880,42 +1846,31 @@ void processOK() { this.statistics.incrementOkCount(); } - void processSlowConsumer(Consumer consumer) { - if (!this.callbackRunner.isShutdown()) { + void makeCallback(Runnable callback) { + if (callbackExecutor != null) { try { - this.callbackRunner.execute(() -> { + callbackExecutor.execute(() -> { try { - options.getErrorListener().slowConsumerDetected(this, consumer); + callback.run(); } catch (Exception ex) { - this.statistics.incrementExceptionCount(); + statistics.incrementExceptionCount(); } }); } catch (RejectedExecutionException re) { - // Timing with shutdown, let it go + // Timing with shutdown probably, let it go } } } + void processSlowConsumer(Consumer consumer) { + makeCallback(() -> options.getErrorListener().slowConsumerDetected(this, consumer)); + } + void processException(Exception exp) { this.statistics.incrementExceptionCount(); - - if (!this.callbackRunner.isShutdown()) { - try { - this.callbackRunner.execute(() -> { - try { - options.getErrorListener().exceptionOccurred(this, exp); - } - catch (Exception ex) { - this.statistics.incrementExceptionCount(); - } - }); - } - catch (RejectedExecutionException re) { - // Timing with shutdown, let it go - } - } + makeCallback(() -> options.getErrorListener().exceptionOccurred(this, exp)); } void processError(String errorText) { @@ -1929,36 +1884,15 @@ void processError(String errorText) { this.serverAuthErrors.put(currentServer, errorText); } - if (!this.callbackRunner.isShutdown()) { - try { - this.callbackRunner.execute(() -> { - try { - options.getErrorListener().errorOccurred(this, errorText); - } - catch (Exception ex) { - this.statistics.incrementExceptionCount(); - } - }); - } - catch (RejectedExecutionException re) { - // Timing with shutdown, let it go - } - } + makeCallback(() -> options.getErrorListener().errorOccurred(this, errorText)); } interface ErrorListenerCaller { void call(Connection conn, ErrorListener el); } - void executeCallback(ErrorListenerCaller elc) { - if (!this.callbackRunner.isShutdown()) { - try { - this.callbackRunner.execute(() -> elc.call(this, options.getErrorListener())); - } - catch (RejectedExecutionException re) { - // Timing with shutdown, let it go - } - } + void notifyErrorListener(ErrorListenerCaller elc) { + makeCallback(() -> elc.call(this, options.getErrorListener())); } String uriDetail(NatsUri uri) { @@ -1976,21 +1910,9 @@ String uriDetail(NatsUri uri, NatsUri hostOrlast) { } void processConnectionEvent(Events type, String uriDetails) { - if (!this.callbackRunner.isShutdown()) { - try { - long time = System.currentTimeMillis(); - for (ConnectionListener listener : connectionListeners) { - this.callbackRunner.execute(() -> { - try { - listener.connectionEvent(this, type, time, uriDetails); - } catch (Exception ex) { - this.statistics.incrementExceptionCount(); - } - }); - } - } catch (RejectedExecutionException re) { - // Timing with shutdown, let it go - } + long time = System.currentTimeMillis(); + for (ConnectionListener listener : connectionListeners) { + makeCallback(() -> listener.connectionEvent(this, type, time, uriDetails)); } } diff --git a/src/main/java/io/nats/client/impl/PullMessageManager.java b/src/main/java/io/nats/client/impl/PullMessageManager.java index c1538803e..97d47dbe3 100644 --- a/src/main/java/io/nats/client/impl/PullMessageManager.java +++ b/src/main/java/io/nats/client/impl/PullMessageManager.java @@ -164,7 +164,7 @@ protected ManageResult manageStatus(Message msg) { case REQUEST_TIMEOUT_CODE: case NO_RESPONDERS_CODE: if (raiseStatusWarnings) { - conn.executeCallback((c, el) -> el.pullStatusWarning(c, sub, status)); + conn.notifyErrorListener((c, el) -> el.pullStatusWarning(c, sub, status)); } return STATUS_TERMINUS; @@ -174,7 +174,7 @@ protected ManageResult manageStatus(Message msg) { if (statMsg.startsWith(EXCEEDED_MAX_PREFIX) || statMsg.equals(SERVER_SHUTDOWN)) { if (raiseStatusWarnings) { - conn.executeCallback((c, el) -> el.pullStatusWarning(c, sub, status)); + conn.notifyErrorListener((c, el) -> el.pullStatusWarning(c, sub, status)); } return STATUS_HANDLED; } @@ -184,7 +184,7 @@ protected ManageResult manageStatus(Message msg) { || statMsg.equals(MESSAGE_SIZE_EXCEEDS_MAX_BYTES)) { if (raiseStatusWarnings) { - conn.executeCallback((c, el) -> el.pullStatusWarning(c, sub, status)); + conn.notifyErrorListener((c, el) -> el.pullStatusWarning(c, sub, status)); } return STATUS_TERMINUS; } @@ -197,7 +197,7 @@ protected ManageResult manageStatus(Message msg) { // All unknown 409s are errors, since that basically means the client is not aware of them. // These known ones are also errors: "Consumer Deleted" and "Consumer is push based" - conn.executeCallback((c, el) -> el.pullStatusError(c, sub, status)); + conn.notifyErrorListener((c, el) -> el.pullStatusError(c, sub, status)); return STATUS_ERROR; } diff --git a/src/main/java/io/nats/client/impl/PushMessageManager.java b/src/main/java/io/nats/client/impl/PushMessageManager.java index df281ea1b..5e4ac45b8 100644 --- a/src/main/java/io/nats/client/impl/PushMessageManager.java +++ b/src/main/java/io/nats/client/impl/PushMessageManager.java @@ -116,7 +116,7 @@ protected ManageResult manageStatus(Message msg) { } } - conn.executeCallback((c, el) -> el.unhandledStatus(c, sub, status)); + conn.notifyErrorListener((c, el) -> el.unhandledStatus(c, sub, status)); return STATUS_ERROR; } @@ -126,7 +126,7 @@ private void processFlowControl(String fcSubject, FlowControlSource source) { if (fcSubject != null && !fcSubject.equals(lastFcSubject)) { conn.publishInternal(fcSubject, null, null, null, false, false); lastFcSubject = fcSubject; // set after publish in case the pub fails - conn.executeCallback((c, el) -> el.flowControlProcessed(c, sub, fcSubject, source)); + conn.notifyErrorListener((c, el) -> el.flowControlProcessed(c, sub, fcSubject, source)); } } } diff --git a/src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java b/src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java index a62b186ff..f4eabf8c8 100644 --- a/src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java +++ b/src/main/java/io/nats/client/impl/SocketDataPortWithWriteTimeout.java @@ -56,7 +56,7 @@ public void connect(@NonNull NatsConnection conn, @NonNull NatsUri nuri, long ti // if now is after when it was supposed to be done by if (NatsSystemClock.nanoTime() > writeMustBeDoneBy.get()) { writeWatchTask.shutdown(); // we don't need to repeat this, the connection is going to be closed - connection.executeCallback((c, el) -> el.socketWriteTimeout(c)); + connection.notifyErrorListener((c, el) -> el.socketWriteTimeout(c)); try { connection.forceReconnect(ForceReconnectOptions.FORCE_CLOSE_INSTANCE); } diff --git a/src/test/java/io/nats/client/impl/NatsConnectionImplTests.java b/src/test/java/io/nats/client/impl/NatsConnectionImplTests.java index e12af837f..8bd9259ea 100644 --- a/src/test/java/io/nats/client/impl/NatsConnectionImplTests.java +++ b/src/test/java/io/nats/client/impl/NatsConnectionImplTests.java @@ -67,41 +67,41 @@ private static void verifyInternalExecutors(Options options, NatsConnection nc) assertTrue(options.executorIsInternal()); assertTrue(options.scheduledExecutorIsInternal()); - assertFalse(nc.callbackRunnerIsShutdown()); - assertFalse(nc.connectExecutorIsShutdown()); + assertFalse(nc.callbackExecutorIsClosed()); + assertFalse(nc.connectExecutorIsClosed()); - assertFalse(nc.executorIsShutdown()); - assertFalse(nc.scheduledExecutorIsShutdown()); + assertFalse(nc.executorIsClosed()); + assertFalse(nc.scheduledExecutorIsClosed()); nc.subscribe("*"); Thread.sleep(1000); nc.close(); - assertTrue(nc.callbackRunnerIsShutdown()); - assertTrue(nc.connectExecutorIsShutdown()); + assertTrue(nc.callbackExecutorIsClosed()); + assertTrue(nc.connectExecutorIsClosed()); - assertTrue(nc.executorIsShutdown()); - assertTrue(nc.scheduledExecutorIsShutdown()); + assertTrue(nc.executorIsClosed()); + assertTrue(nc.scheduledExecutorIsClosed()); } private static void verifyExternalExecutors(Options options, NatsConnection nc) throws InterruptedException { assertFalse(options.executorIsInternal()); assertFalse(options.scheduledExecutorIsInternal()); - assertFalse(nc.callbackRunnerIsShutdown()); - assertFalse(nc.connectExecutorIsShutdown()); + assertFalse(nc.callbackExecutorIsClosed()); + assertFalse(nc.connectExecutorIsClosed()); - assertFalse(nc.executorIsShutdown()); - assertFalse(nc.scheduledExecutorIsShutdown()); + assertFalse(nc.executorIsClosed()); + assertFalse(nc.scheduledExecutorIsClosed()); nc.subscribe("*"); Thread.sleep(1000); nc.close(); - assertTrue(nc.callbackRunnerIsShutdown()); - assertTrue(nc.connectExecutorIsShutdown()); + assertTrue(nc.callbackExecutorIsClosed()); + assertTrue(nc.connectExecutorIsClosed()); - assertFalse(nc.executorIsShutdown()); - assertFalse(nc.scheduledExecutorIsShutdown()); + assertTrue(nc.executorIsClosed()); + assertTrue(nc.scheduledExecutorIsClosed()); } } diff --git a/src/test/java/io/nats/client/impl/SocketDataPortBlockSimulator.java b/src/test/java/io/nats/client/impl/SocketDataPortBlockSimulator.java index dd921d6f2..b84c285ef 100644 --- a/src/test/java/io/nats/client/impl/SocketDataPortBlockSimulator.java +++ b/src/test/java/io/nats/client/impl/SocketDataPortBlockSimulator.java @@ -57,7 +57,7 @@ public void connect(@NonNull NatsConnection conn, @NonNull NatsUri nuri, long ti // if now is after when it was supposed to be done by if (NatsSystemClock.nanoTime() > writeMustBeDoneBy.get()) { writeWatchTask.shutdown(); // we don't need to repeat this, the connection is going to be closed - connection.executeCallback((c, el) -> el.socketWriteTimeout(c)); + connection.notifyErrorListener((c, el) -> el.socketWriteTimeout(c)); blocking.set(0); SIMULATE_SOCKET_BLOCK.set(0); try {