Skip to content

Commit a5254c5

Browse files
committed
self-review
1 parent 9a1732a commit a5254c5

File tree

2 files changed

+49
-57
lines changed

2 files changed

+49
-57
lines changed

src/main/java/io/nats/client/Options.java

Lines changed: 41 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -705,12 +705,15 @@ public class Options {
705705
private final boolean trackAdvancedStats;
706706
private final boolean traceConnection;
707707

708-
private final ExecutorService configuredExecutor;
709-
private final ScheduledExecutorService configuredScheduledExecutor;
710-
private final ThreadFactory connectThreadFactory;
711-
private final ThreadFactory callbackThreadFactory;
708+
private final ReentrantLock executorsLock;
709+
710+
private final ExecutorService userExecutor;
711+
private final ScheduledExecutorService userScheduledExecutor;
712+
private final ThreadFactory userConnectThreadFactory;
713+
private final ThreadFactory userCallbackThreadFactory;
712714

713715
// these are not final b/c they are lazy initialized
716+
// and nulled during shutdownInternalExecutors
714717
private ExecutorService resolvedExecutor;
715718
private ScheduledExecutorService resolvedScheduledExecutor;
716719
private ExecutorService resolvedConnectExecutor;
@@ -857,10 +860,10 @@ public static class Builder {
857860
private ReadListener readListener = null;
858861
private StatisticsCollector statisticsCollector = null;
859862
private String dataPortType = DEFAULT_DATA_PORT_TYPE;
860-
private ExecutorService executor;
861-
private ScheduledExecutorService scheduledExecutor;
862-
private ThreadFactory connectThreadFactory;
863-
private ThreadFactory callbackThreadFactory;
863+
private ExecutorService userExecutor;
864+
private ScheduledExecutorService userScheduledExecutor;
865+
private ThreadFactory userConnectThreadFactory;
866+
private ThreadFactory userCallbackThreadFactory;
864867
private List<java.util.function.Consumer<HttpRequest>> httpRequestInterceptors;
865868
private Proxy proxy;
866869

@@ -995,10 +998,10 @@ public Builder properties(Properties props) {
995998

996999
classnameProperty(props, PROP_SERVERS_POOL_IMPLEMENTATION_CLASS, o -> this.serverPool = (ServerPool) o);
9971000
classnameProperty(props, PROP_DISPATCHER_FACTORY_CLASS, o -> this.dispatcherFactory = (DispatcherFactory) o);
998-
classnameProperty(props, PROP_EXECUTOR_SERVICE_CLASS, o -> this.executor = (ExecutorService) o);
999-
classnameProperty(props, PROP_SCHEDULED_EXECUTOR_SERVICE_CLASS, o -> this.scheduledExecutor = (ScheduledExecutorService) o);
1000-
classnameProperty(props, PROP_CONNECT_THREAD_FACTORY_CLASS, o -> this.connectThreadFactory = (ThreadFactory) o);
1001-
classnameProperty(props, PROP_CALLBACK_THREAD_FACTORY_CLASS, o -> this.callbackThreadFactory = (ThreadFactory) o);
1001+
classnameProperty(props, PROP_EXECUTOR_SERVICE_CLASS, o -> this.userExecutor = (ExecutorService) o);
1002+
classnameProperty(props, PROP_SCHEDULED_EXECUTOR_SERVICE_CLASS, o -> this.userScheduledExecutor = (ScheduledExecutorService) o);
1003+
classnameProperty(props, PROP_CONNECT_THREAD_FACTORY_CLASS, o -> this.userConnectThreadFactory = (ThreadFactory) o);
1004+
classnameProperty(props, PROP_CALLBACK_THREAD_FACTORY_CLASS, o -> this.userCallbackThreadFactory = (ThreadFactory) o);
10021005
return this;
10031006
}
10041007

@@ -1712,7 +1715,7 @@ public Builder statisticsCollector(StatisticsCollector collector) {
17121715
* @return the Builder for chaining
17131716
*/
17141717
public Builder executor(ExecutorService executor) {
1715-
this.executor = executor;
1718+
this.userExecutor = executor;
17161719
return this;
17171720
}
17181721

@@ -1725,7 +1728,7 @@ public Builder executor(ExecutorService executor) {
17251728
* @return the Builder for chaining
17261729
*/
17271730
public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) {
1728-
this.scheduledExecutor = scheduledExecutor;
1731+
this.userScheduledExecutor = scheduledExecutor;
17291732
return this;
17301733
}
17311734

@@ -1736,7 +1739,7 @@ public Builder scheduledExecutor(ScheduledExecutorService scheduledExecutor) {
17361739
* @return the Builder for chaining
17371740
*/
17381741
public Builder connectThreadFactory(ThreadFactory threadFactory) {
1739-
this.connectThreadFactory = threadFactory;
1742+
this.userConnectThreadFactory = threadFactory;
17401743
return this;
17411744
}
17421745

@@ -1747,7 +1750,7 @@ public Builder connectThreadFactory(ThreadFactory threadFactory) {
17471750
* @return the Builder for chaining
17481751
*/
17491752
public Builder callbackThreadFactory(ThreadFactory threadFactory) {
1750-
this.callbackThreadFactory = threadFactory;
1753+
this.userCallbackThreadFactory = threadFactory;
17511754
return this;
17521755
}
17531756

@@ -2114,10 +2117,10 @@ public Builder(Options o) {
21142117
this.statisticsCollector = o.statisticsCollector;
21152118
this.dataPortType = o.dataPortType;
21162119
this.trackAdvancedStats = o.trackAdvancedStats;
2117-
this.executor = o.configuredExecutor;
2118-
this.scheduledExecutor = o.configuredScheduledExecutor;
2119-
this.callbackThreadFactory = o.callbackThreadFactory;
2120-
this.connectThreadFactory = o.connectThreadFactory;
2120+
this.userExecutor = o.userExecutor;
2121+
this.userScheduledExecutor = o.userScheduledExecutor;
2122+
this.userCallbackThreadFactory = o.userCallbackThreadFactory;
2123+
this.userConnectThreadFactory = o.userConnectThreadFactory;
21212124
this.httpRequestInterceptors = o.httpRequestInterceptors;
21222125
this.proxy = o.proxy;
21232126

@@ -2186,10 +2189,13 @@ private Options(Builder b) {
21862189
this.statisticsCollector = b.statisticsCollector;
21872190
this.dataPortType = b.dataPortType;
21882191
this.trackAdvancedStats = b.trackAdvancedStats;
2189-
this.configuredExecutor = b.executor;
2190-
this.configuredScheduledExecutor = b.scheduledExecutor;
2191-
this.callbackThreadFactory = b.callbackThreadFactory;
2192-
this.connectThreadFactory = b.connectThreadFactory;
2192+
2193+
executorsLock = new ReentrantLock();
2194+
this.userExecutor = b.userExecutor;
2195+
this.userScheduledExecutor = b.userScheduledExecutor;
2196+
this.userCallbackThreadFactory = b.userCallbackThreadFactory;
2197+
this.userConnectThreadFactory = b.userConnectThreadFactory;
2198+
21932199
this.httpRequestInterceptors = b.httpRequestInterceptors;
21942200
this.proxy = b.proxy;
21952201

@@ -2215,7 +2221,7 @@ public ExecutorService getExecutor() {
22152221
executorsLock.lock();
22162222
try {
22172223
if (resolvedExecutor == null || resolvedExecutor.isShutdown()) {
2218-
resolvedExecutor = configuredExecutor == null ? _getInternalExecutor() : configuredExecutor;
2224+
resolvedExecutor = userExecutor == null ? _getInternalExecutor() : userExecutor;
22192225
}
22202226
return resolvedExecutor;
22212227
}
@@ -2240,7 +2246,7 @@ public ScheduledExecutorService getScheduledExecutor() {
22402246
executorsLock.lock();
22412247
try {
22422248
if (resolvedScheduledExecutor == null || resolvedScheduledExecutor.isShutdown()) {
2243-
resolvedScheduledExecutor = configuredScheduledExecutor == null ? _getInternalScheduledExecutor() : configuredScheduledExecutor;
2249+
resolvedScheduledExecutor = userScheduledExecutor == null ? _getInternalScheduledExecutor() : userScheduledExecutor;
22442250
}
22452251
return resolvedScheduledExecutor;
22462252
}
@@ -2268,8 +2274,8 @@ public ExecutorService getCallbackExecutor() {
22682274
executorsLock.lock();
22692275
try {
22702276
if (resolvedCallbackExecutor == null || resolvedCallbackExecutor.isShutdown()) {
2271-
resolvedCallbackExecutor = callbackThreadFactory == null ?
2272-
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(callbackThreadFactory);
2277+
resolvedCallbackExecutor = userCallbackThreadFactory == null ?
2278+
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(userCallbackThreadFactory);
22732279
}
22742280
return resolvedCallbackExecutor;
22752281
}
@@ -2286,8 +2292,8 @@ public ExecutorService getConnectExecutor() {
22862292
executorsLock.lock();
22872293
try {
22882294
if (resolvedConnectExecutor == null || resolvedConnectExecutor.isShutdown()) {
2289-
resolvedConnectExecutor = connectThreadFactory == null ?
2290-
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(connectThreadFactory);
2295+
resolvedConnectExecutor = userConnectThreadFactory == null ?
2296+
DEFAULT_SINGLE_THREAD_EXECUTOR.get() : Executors.newSingleThreadExecutor(userConnectThreadFactory);
22912297
}
22922298
return resolvedConnectExecutor;
22932299
}
@@ -2301,36 +2307,34 @@ public ExecutorService getConnectExecutor() {
23012307
* @return true if the executor is internal
23022308
*/
23032309
public boolean executorIsInternal() {
2304-
return this.configuredExecutor == null;
2310+
return this.userExecutor == null;
23052311
}
23062312

23072313
/**
23082314
* whether the scheduled executor is the internal one versus a user supplied one
23092315
* @return true if the executor is internal
23102316
*/
23112317
public boolean scheduledExecutorIsInternal() {
2312-
return this.configuredScheduledExecutor == null;
2318+
return this.userScheduledExecutor == null;
23132319
}
23142320

23152321
/**
23162322
* whether the callback executor is the internal one versus a user supplied one
23172323
* @return true if the executor is internal
23182324
*/
23192325
public boolean callbackExecutorIsInternal() {
2320-
return this.callbackThreadFactory == null;
2326+
return this.userCallbackThreadFactory == null;
23212327
}
23222328

23232329
/**
23242330
* whether the connect executor is the internal one versus a user supplied one
23252331
* @return true if the executor is internal
23262332
*/
23272333
public boolean connectExecutorIsInternal() {
2328-
return this.connectThreadFactory == null;
2334+
return this.userConnectThreadFactory == null;
23292335
}
23302336

2331-
private final ReentrantLock executorsLock = new ReentrantLock();
2332-
2333-
public void shutdownInternalExecutors() throws InterruptedException {
2337+
public void shutdownExecutors() throws InterruptedException {
23342338
executorsLock.lock();
23352339
try {
23362340
if (resolvedCallbackExecutor != null && callbackExecutorIsInternal()) {

src/main/java/io/nats/client/impl/NatsConnection.java

Lines changed: 8 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -881,7 +881,7 @@ void close(boolean checkDrainStatus, boolean forceClose) throws InterruptedExcep
881881
executor = null;
882882
connectExecutor = null;
883883
scheduledExecutor = null;
884-
options.shutdownInternalExecutors();
884+
options.shutdownExecutors();
885885

886886
statusLock.lock();
887887
try {
@@ -1846,15 +1846,15 @@ void processOK() {
18461846
this.statistics.incrementOkCount();
18471847
}
18481848

1849-
void makeCallback(Runnable r) {
1849+
void makeCallback(Runnable callback) {
18501850
if (callbackExecutor != null) {
18511851
try {
1852-
this.callbackExecutor.execute(() -> {
1852+
callbackExecutor.execute(() -> {
18531853
try {
1854-
r.run();
1854+
callback.run();
18551855
}
18561856
catch (Exception ex) {
1857-
this.statistics.incrementExceptionCount();
1857+
statistics.incrementExceptionCount();
18581858
}
18591859
});
18601860
}
@@ -1910,21 +1910,9 @@ String uriDetail(NatsUri uri, NatsUri hostOrlast) {
19101910
}
19111911

19121912
void processConnectionEvent(Events type, String uriDetails) {
1913-
if (callbackExecutor != null) {
1914-
try {
1915-
long time = System.currentTimeMillis();
1916-
for (ConnectionListener listener : connectionListeners) {
1917-
this.callbackExecutor.execute(() -> {
1918-
try {
1919-
listener.connectionEvent(this, type, time, uriDetails);
1920-
} catch (Exception ex) {
1921-
this.statistics.incrementExceptionCount();
1922-
}
1923-
});
1924-
}
1925-
} catch (RejectedExecutionException re) {
1926-
// Timing with shutdown, let it go
1927-
}
1913+
long time = System.currentTimeMillis();
1914+
for (ConnectionListener listener : connectionListeners) {
1915+
makeCallback(() -> listener.connectionEvent(this, type, time, uriDetails));
19281916
}
19291917
}
19301918

0 commit comments

Comments
 (0)