Skip to content

Commit c7c54d9

Browse files
committed
Re-introduce the beats handlers worker group to separata the Beats protocol processing from the boss group that accepts and listed to new sockets
1 parent e24f339 commit c7c54d9

File tree

1 file changed

+9
-5
lines changed

1 file changed

+9
-5
lines changed

src/main/java/org/logstash/beats/Server.java

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,9 +1,11 @@
11
package org.logstash.beats;
22

33
import io.netty.bootstrap.ServerBootstrap;
4-
import io.netty.buffer.ByteBufAllocator;
5-
import io.netty.buffer.PooledByteBufAllocator;
6-
import io.netty.channel.*;
4+
import io.netty.channel.Channel;
5+
import io.netty.channel.ChannelHandlerContext;
6+
import io.netty.channel.ChannelInitializer;
7+
import io.netty.channel.ChannelOption;
8+
import io.netty.channel.ChannelPipeline;
79
import io.netty.channel.nio.NioEventLoopGroup;
810
import io.netty.channel.socket.SocketChannel;
911
import io.netty.channel.socket.nio.NioServerSocketChannel;
@@ -114,6 +116,7 @@ private class BeatsInitializer extends ChannelInitializer<SocketChannel> {
114116
private final int IDLESTATE_WRITER_IDLE_TIME_SECONDS = 5;
115117

116118
private final EventExecutorGroup idleExecutorGroup;
119+
private final EventExecutorGroup beatsHandlerExecutorGroup;
117120
private final IMessageListener localMessageListener;
118121
private final int localClientInactivityTimeoutSeconds;
119122

@@ -122,6 +125,7 @@ private class BeatsInitializer extends ChannelInitializer<SocketChannel> {
122125
this.localMessageListener = messageListener;
123126
this.localClientInactivityTimeoutSeconds = clientInactivityTimeoutSeconds;
124127
idleExecutorGroup = new DefaultEventExecutorGroup(DEFAULT_IDLESTATEHANDLER_THREAD);
128+
beatsHandlerExecutorGroup = new DefaultEventExecutorGroup(beatsHandlerThread);
125129
}
126130

127131
public void initChannel(SocketChannel socket){
@@ -137,8 +141,8 @@ public void initChannel(SocketChannel socket){
137141
pipeline.addLast(CONNECTION_HANDLER, new ConnectionHandler());
138142
pipeline.addLast(new FlowLimiterHandler());
139143
pipeline.addLast(new ThunderingGuardHandler());
140-
pipeline.addLast(new BeatsParser());
141-
pipeline.addLast(new BeatsHandler(localMessageListener));
144+
pipeline.addLast(beatsHandlerExecutorGroup, new BeatsParser());
145+
pipeline.addLast(beatsHandlerExecutorGroup, new BeatsHandler(localMessageListener));
142146
}
143147

144148

0 commit comments

Comments
 (0)