Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
159 changes: 125 additions & 34 deletions src/main/java/io/nats/client/Options.java
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Supplier;

import static io.nats.client.support.Encoding.*;
Expand Down Expand Up @@ -704,10 +705,20 @@ public class Options {
private final boolean trackAdvancedStats;
private final boolean traceConnection;

private final ExecutorService executor;
private final ScheduledExecutorService scheduledExecutor;
private final ThreadFactory connectThreadFactory;
private final ThreadFactory callbackThreadFactory;
private final ReentrantLock executorsLock;

private final ExecutorService userExecutor;
private final ScheduledExecutorService userScheduledExecutor;
private final ThreadFactory userConnectThreadFactory;
private final ThreadFactory userCallbackThreadFactory;

// these are not final b/c they are lazy initialized
// and nulled during shutdownInternalExecutors
private ExecutorService resolvedExecutor;
private ScheduledExecutorService resolvedScheduledExecutor;
private ExecutorService resolvedConnectExecutor;
private ExecutorService resolvedCallbackExecutor;

private final ServerPool serverPool;
private final DispatcherFactory dispatcherFactory;

Expand Down Expand Up @@ -849,10 +860,10 @@ public static class Builder {
private ReadListener readListener = null;
private StatisticsCollector statisticsCollector = null;
private String dataPortType = DEFAULT_DATA_PORT_TYPE;
private ExecutorService executor;
private ScheduledExecutorService scheduledExecutor;
private ThreadFactory connectThreadFactory;
private ThreadFactory callbackThreadFactory;
private ExecutorService userExecutor;
private ScheduledExecutorService userScheduledExecutor;
private ThreadFactory userConnectThreadFactory;
private ThreadFactory userCallbackThreadFactory;
private List<java.util.function.Consumer<HttpRequest>> httpRequestInterceptors;
private Proxy proxy;

Expand Down Expand Up @@ -987,10 +998,10 @@ public Builder properties(Properties props) {

classnameProperty(props, PROP_SERVERS_POOL_IMPLEMENTATION_CLASS, o -> this.serverPool = (ServerPool) o);
classnameProperty(props, PROP_DISPATCHER_FACTORY_CLASS, o -> this.dispatcherFactory = (DispatcherFactory) o);
classnameProperty(props, PROP_EXECUTOR_SERVICE_CLASS, o -> this.executor = (ExecutorService) o);
classnameProperty(props, PROP_SCHEDULED_EXECUTOR_SERVICE_CLASS, o -> this.scheduledExecutor = (ScheduledExecutorService) o);
classnameProperty(props, PROP_CONNECT_THREAD_FACTORY_CLASS, o -> this.connectThreadFactory = (ThreadFactory) o);
classnameProperty(props, PROP_CALLBACK_THREAD_FACTORY_CLASS, o -> this.callbackThreadFactory = (ThreadFactory) o);
classnameProperty(props, PROP_EXECUTOR_SERVICE_CLASS, o -> this.userExecutor = (ExecutorService) o);
classnameProperty(props, PROP_SCHEDULED_EXECUTOR_SERVICE_CLASS, o -> this.userScheduledExecutor = (ScheduledExecutorService) o);
classnameProperty(props, PROP_CONNECT_THREAD_FACTORY_CLASS, o -> this.userConnectThreadFactory = (ThreadFactory) o);
classnameProperty(props, PROP_CALLBACK_THREAD_FACTORY_CLASS, o -> this.userCallbackThreadFactory = (ThreadFactory) o);
return this;
}

Expand Down Expand Up @@ -1704,7 +1715,7 @@ public Builder statisticsCollector(StatisticsCollector collector) {
* @return the Builder for chaining
*/
public Builder executor(ExecutorService executor) {
this.executor = executor;
this.userExecutor = executor;
return this;
}

Expand All @@ -1717,7 +1728,7 @@ public Builder executor(ExecutorService executor) {
* @return the Builder for chaining
*/
public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) {
this.scheduledExecutor = scheduledExecutor;
this.userScheduledExecutor = scheduledExecutor;
return this;
}

Expand All @@ -1728,7 +1739,7 @@ public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) {
* @return the Builder for chaining
*/
public Builder connectThreadFactory(ThreadFactory threadFactory) {
this.connectThreadFactory = threadFactory;
this.userConnectThreadFactory = threadFactory;
return this;
}

Expand All @@ -1739,7 +1750,7 @@ public Builder connectThreadFactory(ThreadFactory threadFactory) {
* @return the Builder for chaining
*/
public Builder callbackThreadFactory(ThreadFactory threadFactory) {
this.callbackThreadFactory = threadFactory;
this.userCallbackThreadFactory = threadFactory;
return this;
}

Expand Down Expand Up @@ -2106,10 +2117,10 @@ public Builder(Options o) {
this.statisticsCollector = o.statisticsCollector;
this.dataPortType = o.dataPortType;
this.trackAdvancedStats = o.trackAdvancedStats;
this.executor = o.executor;
this.scheduledExecutor = o.scheduledExecutor;
this.callbackThreadFactory = o.callbackThreadFactory;
this.connectThreadFactory = o.connectThreadFactory;
this.userExecutor = o.userExecutor;
this.userScheduledExecutor = o.userScheduledExecutor;
this.userCallbackThreadFactory = o.userCallbackThreadFactory;
this.userConnectThreadFactory = o.userConnectThreadFactory;
this.httpRequestInterceptors = o.httpRequestInterceptors;
this.proxy = o.proxy;

Expand Down Expand Up @@ -2178,10 +2189,13 @@ private Options(Builder b) {
this.statisticsCollector = b.statisticsCollector;
this.dataPortType = b.dataPortType;
this.trackAdvancedStats = b.trackAdvancedStats;
this.executor = b.executor;
this.scheduledExecutor = b.scheduledExecutor;
this.callbackThreadFactory = b.callbackThreadFactory;
this.connectThreadFactory = b.connectThreadFactory;

executorsLock = new ReentrantLock();
this.userExecutor = b.userExecutor;
this.userScheduledExecutor = b.userScheduledExecutor;
this.userCallbackThreadFactory = b.userCallbackThreadFactory;
this.userConnectThreadFactory = b.userConnectThreadFactory;

this.httpRequestInterceptors = b.httpRequestInterceptors;
this.proxy = b.proxy;

Expand All @@ -2204,7 +2218,16 @@ private Options(Builder b) {
* @return the executor, see {@link Builder#executor(ExecutorService) executor()} in the builder doc
*/
public ExecutorService getExecutor() {
return this.executor == null ? _getInternalExecutor() : this.executor;
executorsLock.lock();
try {
if (resolvedExecutor == null || resolvedExecutor.isShutdown()) {
resolvedExecutor = userExecutor == null ? _getInternalExecutor() : userExecutor;
}
return resolvedExecutor;
}
finally {
executorsLock.unlock();
}
}

private ExecutorService _getInternalExecutor() {
Expand All @@ -2220,7 +2243,16 @@ private ExecutorService _getInternalExecutor() {
* @return the ScheduledExecutorService, see {@link Builder#scheduledExecutor(ScheduledExecutorService) scheduledExecutor()} in the builder doc
*/
public ScheduledExecutorService getScheduledExecutor() {
return this.scheduledExecutor == null ? _getInternalScheduledExecutor() : this.scheduledExecutor;
executorsLock.lock();
try {
if (resolvedScheduledExecutor == null || resolvedScheduledExecutor.isShutdown()) {
resolvedScheduledExecutor = userScheduledExecutor == null ? _getInternalScheduledExecutor() : userScheduledExecutor;
}
return resolvedScheduledExecutor;
}
finally {
executorsLock.unlock();
}
}

private ScheduledExecutorService _getInternalScheduledExecutor() {
Expand All @@ -2239,49 +2271,108 @@ private ScheduledExecutorService _getInternalScheduledExecutor() {
* @return the executor
*/
public ExecutorService getCallbackExecutor() {
return this.callbackThreadFactory == null ?
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.callbackThreadFactory);
executorsLock.lock();
try {
if (resolvedCallbackExecutor == null || resolvedCallbackExecutor.isShutdown()) {
resolvedCallbackExecutor = userCallbackThreadFactory == null ?
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(userCallbackThreadFactory);
}
return resolvedCallbackExecutor;
}
finally {
executorsLock.unlock();
}
}

/**
* the connect executor, see {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory()} in the builder doc
* @return the executor
*/
public ExecutorService getConnectExecutor() {
return this.connectThreadFactory == null ?
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.connectThreadFactory);
executorsLock.lock();
try {
if (resolvedConnectExecutor == null || resolvedConnectExecutor.isShutdown()) {
resolvedConnectExecutor = userConnectThreadFactory == null ?
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(userConnectThreadFactory);
}
return resolvedConnectExecutor;
}
finally {
executorsLock.unlock();
}
}

/**
* whether the general executor is the internal one versus a user supplied one
* @return true if the executor is internal
*/
public boolean executorIsInternal() {
return this.executor == null;
return this.userExecutor == null;
}

/**
* whether the scheduled executor is the internal one versus a user supplied one
* @return true if the executor is internal
*/
public boolean scheduledExecutorIsInternal() {
return this.scheduledExecutor == null;
return this.userScheduledExecutor == null;
}

/**
* whether the callback executor is the internal one versus a user supplied one
* @return true if the executor is internal
*/
public boolean callbackExecutorIsInternal() {
return this.callbackThreadFactory == null;
return this.userCallbackThreadFactory == null;
}

/**
* whether the connect executor is the internal one versus a user supplied one
* @return true if the executor is internal
*/
public boolean connectExecutorIsInternal() {
return this.connectThreadFactory == null;
return this.userConnectThreadFactory == null;
}

public void shutdownExecutors() throws InterruptedException {
executorsLock.lock();
try {
if (resolvedCallbackExecutor != null && callbackExecutorIsInternal()) {
// we don't just shutdownNow to give any callbacks a chance to finish
ExecutorService es = resolvedCallbackExecutor;
resolvedCallbackExecutor = null;
es.shutdown();
try {
//noinspection ResultOfMethodCallIgnored
es.awaitTermination(getConnectionTimeout().toNanos(), TimeUnit.NANOSECONDS);
}
finally {
es.shutdownNow();
}
}

if (resolvedConnectExecutor != null && connectExecutorIsInternal()) {
ExecutorService es = resolvedConnectExecutor;
resolvedConnectExecutor = null;
es.shutdownNow(); // There's no need to wait...
}

if (resolvedExecutor != null && executorIsInternal()) {
ExecutorService es = resolvedExecutor;
resolvedExecutor = null;
es.shutdownNow(); // There's no need to wait...
}

if (resolvedScheduledExecutor != null && scheduledExecutorIsInternal()) {
ScheduledExecutorService ses = resolvedScheduledExecutor;
resolvedScheduledExecutor = null;
ses.shutdownNow(); // There's no need to wait...
}

}
finally {
executorsLock.unlock();
}
}

/**
Expand Down
2 changes: 1 addition & 1 deletion src/main/java/io/nats/client/impl/MessageManager.java
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ protected void trackJsMessage(Message msg) {
protected void subTrackJsMessage(Message msg) {}

protected void handleHeartbeatError() {
conn.executeCallback((c, el) -> el.heartbeatAlarm(c, sub, lastStreamSeq, lastConsumerSeq));
conn.notifyErrorListener((c, el) -> el.heartbeatAlarm(c, sub, lastStreamSeq, lastConsumerSeq));
}

protected void configureIdleHeartbeat(Duration configIdleHeartbeat, long configMessageAlarmTime) {
Expand Down
Loading
Loading