Skip to content

Commit f7d5f1e

Browse files
authored
Merge pull request #1492 from nats-io/lazy-executors
Lazy initialize Options executors
2 parents bae1321 + a5254c5 commit f7d5f1e

File tree

8 files changed

+185
-172
lines changed

8 files changed

+185
-172
lines changed

src/main/java/io/nats/client/Options.java

Lines changed: 125 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,7 @@
3636
import java.util.*;
3737
import java.util.concurrent.*;
3838
import java.util.concurrent.atomic.AtomicInteger;
39+
import java.util.concurrent.locks.ReentrantLock;
3940
import java.util.function.Supplier;
4041

4142
import static io.nats.client.support.Encoding.*;
@@ -704,10 +705,20 @@ public class Options {
704705
private final boolean trackAdvancedStats;
705706
private final boolean traceConnection;
706707

707-
private final ExecutorService executor;
708-
private final ScheduledExecutorService scheduledExecutor;
709-
private final ThreadFactory connectThreadFactory;
710-
private final ThreadFactory callbackThreadFactory;
708+
private final ReentrantLock executorsLock;
709+
710+
private final ExecutorService userExecutor;
711+
private final ScheduledExecutorService userScheduledExecutor;
712+
private final ThreadFactory userConnectThreadFactory;
713+
private final ThreadFactory userCallbackThreadFactory;
714+
715+
// these are not final b/c they are lazy initialized
716+
// and nulled during shutdownInternalExecutors
717+
private ExecutorService resolvedExecutor;
718+
private ScheduledExecutorService resolvedScheduledExecutor;
719+
private ExecutorService resolvedConnectExecutor;
720+
private ExecutorService resolvedCallbackExecutor;
721+
711722
private final ServerPool serverPool;
712723
private final DispatcherFactory dispatcherFactory;
713724

@@ -849,10 +860,10 @@ public static class Builder {
849860
private ReadListener readListener = null;
850861
private StatisticsCollector statisticsCollector = null;
851862
private String dataPortType = DEFAULT_DATA_PORT_TYPE;
852-
private ExecutorService executor;
853-
private ScheduledExecutorService scheduledExecutor;
854-
private ThreadFactory connectThreadFactory;
855-
private ThreadFactory callbackThreadFactory;
863+
private ExecutorService userExecutor;
864+
private ScheduledExecutorService userScheduledExecutor;
865+
private ThreadFactory userConnectThreadFactory;
866+
private ThreadFactory userCallbackThreadFactory;
856867
private List<java.util.function.Consumer<HttpRequest>> httpRequestInterceptors;
857868
private Proxy proxy;
858869

@@ -987,10 +998,10 @@ public Builder properties(Properties props) {
987998

988999
classnameProperty(props, PROP_SERVERS_POOL_IMPLEMENTATION_CLASS, o -> this.serverPool = (ServerPool) o);
9891000
classnameProperty(props, PROP_DISPATCHER_FACTORY_CLASS, o -> this.dispatcherFactory = (DispatcherFactory) o);
990-
classnameProperty(props, PROP_EXECUTOR_SERVICE_CLASS, o -> this.executor = (ExecutorService) o);
991-
classnameProperty(props, PROP_SCHEDULED_EXECUTOR_SERVICE_CLASS, o -> this.scheduledExecutor = (ScheduledExecutorService) o);
992-
classnameProperty(props, PROP_CONNECT_THREAD_FACTORY_CLASS, o -> this.connectThreadFactory = (ThreadFactory) o);
993-
classnameProperty(props, PROP_CALLBACK_THREAD_FACTORY_CLASS, o -> this.callbackThreadFactory = (ThreadFactory) o);
1001+
classnameProperty(props, PROP_EXECUTOR_SERVICE_CLASS, o -> this.userExecutor = (ExecutorService) o);
1002+
classnameProperty(props, PROP_SCHEDULED_EXECUTOR_SERVICE_CLASS, o -> this.userScheduledExecutor = (ScheduledExecutorService) o);
1003+
classnameProperty(props, PROP_CONNECT_THREAD_FACTORY_CLASS, o -> this.userConnectThreadFactory = (ThreadFactory) o);
1004+
classnameProperty(props, PROP_CALLBACK_THREAD_FACTORY_CLASS, o -> this.userCallbackThreadFactory = (ThreadFactory) o);
9941005
return this;
9951006
}
9961007

@@ -1704,7 +1715,7 @@ public Builder statisticsCollector(StatisticsCollector collector) {
17041715
* @return the Builder for chaining
17051716
*/
17061717
public Builder executor(ExecutorService executor) {
1707-
this.executor = executor;
1718+
this.userExecutor = executor;
17081719
return this;
17091720
}
17101721

@@ -1717,7 +1728,7 @@ public Builder executor(ExecutorService executor) {
17171728
* @return the Builder for chaining
17181729
*/
17191730
public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) {
1720-
this.scheduledExecutor = scheduledExecutor;
1731+
this.userScheduledExecutor = scheduledExecutor;
17211732
return this;
17221733
}
17231734

@@ -1728,7 +1739,7 @@ public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) {
17281739
* @return the Builder for chaining
17291740
*/
17301741
public Builder connectThreadFactory(ThreadFactory threadFactory) {
1731-
this.connectThreadFactory = threadFactory;
1742+
this.userConnectThreadFactory = threadFactory;
17321743
return this;
17331744
}
17341745

@@ -1739,7 +1750,7 @@ public Builder connectThreadFactory(ThreadFactory threadFactory) {
17391750
* @return the Builder for chaining
17401751
*/
17411752
public Builder callbackThreadFactory(ThreadFactory threadFactory) {
1742-
this.callbackThreadFactory = threadFactory;
1753+
this.userCallbackThreadFactory = threadFactory;
17431754
return this;
17441755
}
17451756

@@ -2106,10 +2117,10 @@ public Builder(Options o) {
21062117
this.statisticsCollector = o.statisticsCollector;
21072118
this.dataPortType = o.dataPortType;
21082119
this.trackAdvancedStats = o.trackAdvancedStats;
2109-
this.executor = o.executor;
2110-
this.scheduledExecutor = o.scheduledExecutor;
2111-
this.callbackThreadFactory = o.callbackThreadFactory;
2112-
this.connectThreadFactory = o.connectThreadFactory;
2120+
this.userExecutor = o.userExecutor;
2121+
this.userScheduledExecutor = o.userScheduledExecutor;
2122+
this.userCallbackThreadFactory = o.userCallbackThreadFactory;
2123+
this.userConnectThreadFactory = o.userConnectThreadFactory;
21132124
this.httpRequestInterceptors = o.httpRequestInterceptors;
21142125
this.proxy = o.proxy;
21152126

@@ -2178,10 +2189,13 @@ private Options(Builder b) {
21782189
this.statisticsCollector = b.statisticsCollector;
21792190
this.dataPortType = b.dataPortType;
21802191
this.trackAdvancedStats = b.trackAdvancedStats;
2181-
this.executor = b.executor;
2182-
this.scheduledExecutor = b.scheduledExecutor;
2183-
this.callbackThreadFactory = b.callbackThreadFactory;
2184-
this.connectThreadFactory = b.connectThreadFactory;
2192+
2193+
executorsLock = new ReentrantLock();
2194+
this.userExecutor = b.userExecutor;
2195+
this.userScheduledExecutor = b.userScheduledExecutor;
2196+
this.userCallbackThreadFactory = b.userCallbackThreadFactory;
2197+
this.userConnectThreadFactory = b.userConnectThreadFactory;
2198+
21852199
this.httpRequestInterceptors = b.httpRequestInterceptors;
21862200
this.proxy = b.proxy;
21872201

@@ -2204,7 +2218,16 @@ private Options(Builder b) {
22042218
* @return the executor, see {@link Builder#executor(ExecutorService) executor()} in the builder doc
22052219
*/
22062220
public ExecutorService getExecutor() {
2207-
return this.executor == null ? _getInternalExecutor() : this.executor;
2221+
executorsLock.lock();
2222+
try {
2223+
if (resolvedExecutor == null || resolvedExecutor.isShutdown()) {
2224+
resolvedExecutor = userExecutor == null ? _getInternalExecutor() : userExecutor;
2225+
}
2226+
return resolvedExecutor;
2227+
}
2228+
finally {
2229+
executorsLock.unlock();
2230+
}
22082231
}
22092232

22102233
private ExecutorService _getInternalExecutor() {
@@ -2220,7 +2243,16 @@ private ExecutorService _getInternalExecutor() {
22202243
* @return the ScheduledExecutorService, see {@link Builder#scheduledExecutor(ScheduledExecutorService) scheduledExecutor()} in the builder doc
22212244
*/
22222245
public ScheduledExecutorService getScheduledExecutor() {
2223-
return this.scheduledExecutor == null ? _getInternalScheduledExecutor() : this.scheduledExecutor;
2246+
executorsLock.lock();
2247+
try {
2248+
if (resolvedScheduledExecutor == null || resolvedScheduledExecutor.isShutdown()) {
2249+
resolvedScheduledExecutor = userScheduledExecutor == null ? _getInternalScheduledExecutor() : userScheduledExecutor;
2250+
}
2251+
return resolvedScheduledExecutor;
2252+
}
2253+
finally {
2254+
executorsLock.unlock();
2255+
}
22242256
}
22252257

22262258
private ScheduledExecutorService _getInternalScheduledExecutor() {
@@ -2239,49 +2271,108 @@ private ScheduledExecutorService _getInternalScheduledExecutor() {
22392271
* @return the executor
22402272
*/
22412273
public ExecutorService getCallbackExecutor() {
2242-
return this.callbackThreadFactory == null ?
2243-
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.callbackThreadFactory);
2274+
executorsLock.lock();
2275+
try {
2276+
if (resolvedCallbackExecutor == null || resolvedCallbackExecutor.isShutdown()) {
2277+
resolvedCallbackExecutor = userCallbackThreadFactory == null ?
2278+
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(userCallbackThreadFactory);
2279+
}
2280+
return resolvedCallbackExecutor;
2281+
}
2282+
finally {
2283+
executorsLock.unlock();
2284+
}
22442285
}
22452286

22462287
/**
22472288
* the connect executor, see {@link Builder#connectThreadFactory(ThreadFactory) connectThreadFactory()} in the builder doc
22482289
* @return the executor
22492290
*/
22502291
public ExecutorService getConnectExecutor() {
2251-
return this.connectThreadFactory == null ?
2252-
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(this.connectThreadFactory);
2292+
executorsLock.lock();
2293+
try {
2294+
if (resolvedConnectExecutor == null || resolvedConnectExecutor.isShutdown()) {
2295+
resolvedConnectExecutor = userConnectThreadFactory == null ?
2296+
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(userConnectThreadFactory);
2297+
}
2298+
return resolvedConnectExecutor;
2299+
}
2300+
finally {
2301+
executorsLock.unlock();
2302+
}
22532303
}
22542304

22552305
/**
22562306
* whether the general executor is the internal one versus a user supplied one
22572307
* @return true if the executor is internal
22582308
*/
22592309
public boolean executorIsInternal() {
2260-
return this.executor == null;
2310+
return this.userExecutor == null;
22612311
}
22622312

22632313
/**
22642314
* whether the scheduled executor is the internal one versus a user supplied one
22652315
* @return true if the executor is internal
22662316
*/
22672317
public boolean scheduledExecutorIsInternal() {
2268-
return this.scheduledExecutor == null;
2318+
return this.userScheduledExecutor == null;
22692319
}
22702320

22712321
/**
22722322
* whether the callback executor is the internal one versus a user supplied one
22732323
* @return true if the executor is internal
22742324
*/
22752325
public boolean callbackExecutorIsInternal() {
2276-
return this.callbackThreadFactory == null;
2326+
return this.userCallbackThreadFactory == null;
22772327
}
22782328

22792329
/**
22802330
* whether the connect executor is the internal one versus a user supplied one
22812331
* @return true if the executor is internal
22822332
*/
22832333
public boolean connectExecutorIsInternal() {
2284-
return this.connectThreadFactory == null;
2334+
return this.userConnectThreadFactory == null;
2335+
}
2336+
2337+
public void shutdownExecutors() throws InterruptedException {
2338+
executorsLock.lock();
2339+
try {
2340+
if (resolvedCallbackExecutor != null && callbackExecutorIsInternal()) {
2341+
// we don't just shutdownNow to give any callbacks a chance to finish
2342+
ExecutorService es = resolvedCallbackExecutor;
2343+
resolvedCallbackExecutor = null;
2344+
es.shutdown();
2345+
try {
2346+
//noinspection ResultOfMethodCallIgnored
2347+
es.awaitTermination(getConnectionTimeout().toNanos(), TimeUnit.NANOSECONDS);
2348+
}
2349+
finally {
2350+
es.shutdownNow();
2351+
}
2352+
}
2353+
2354+
if (resolvedConnectExecutor != null && connectExecutorIsInternal()) {
2355+
ExecutorService es = resolvedConnectExecutor;
2356+
resolvedConnectExecutor = null;
2357+
es.shutdownNow(); // There's no need to wait...
2358+
}
2359+
2360+
if (resolvedExecutor != null && executorIsInternal()) {
2361+
ExecutorService es = resolvedExecutor;
2362+
resolvedExecutor = null;
2363+
es.shutdownNow(); // There's no need to wait...
2364+
}
2365+
2366+
if (resolvedScheduledExecutor != null && scheduledExecutorIsInternal()) {
2367+
ScheduledExecutorService ses = resolvedScheduledExecutor;
2368+
resolvedScheduledExecutor = null;
2369+
ses.shutdownNow(); // There's no need to wait...
2370+
}
2371+
2372+
}
2373+
finally {
2374+
executorsLock.unlock();
2375+
}
22852376
}
22862377

22872378
/**

src/main/java/io/nats/client/impl/MessageManager.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ protected void trackJsMessage(Message msg) {
107107
protected void subTrackJsMessage(Message msg) {}
108108

109109
protected void handleHeartbeatError() {
110-
conn.executeCallback((c, el) -> el.heartbeatAlarm(c, sub, lastStreamSeq, lastConsumerSeq));
110+
conn.notifyErrorListener((c, el) -> el.heartbeatAlarm(c, sub, lastStreamSeq, lastConsumerSeq));
111111
}
112112

113113
protected void configureIdleHeartbeat(Duration configIdleHeartbeat, long configMessageAlarmTime) {

0 commit comments

Comments
 (0)