Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 12 additions & 12 deletions netty4-client/src/main/java/redis/netty4/RedisClientBase.java
Original file line number Diff line number Diff line change
Expand Up @@ -34,12 +34,9 @@ public static Promise<RedisClientBase> connect(String host, int port) {
new SimpleChannelInboundHandler<Reply<?>>() {
@Override
protected void channelRead0(ChannelHandlerContext channelHandlerContext, Reply<?> reply) throws Exception {
Promise<Reply> poll;
synchronized (client) {
poll = queue.poll();
if (poll == null) {
throw new IllegalStateException("Promise queue is empty, received reply");
}
Promise<Reply> poll = queue.poll();
if (poll == null) {
throw new IllegalStateException("Promise queue is empty, received reply");
}
poll.set(reply);
}
Expand All @@ -49,12 +46,15 @@ protected void channelRead0(ChannelHandlerContext channelHandlerContext, Reply<?
return promise;
}

public Promise<Reply> send(Command command) {
Promise<Reply> reply = new Promise<>();
synchronized (this) {
queue.add(reply);
socketChannel.write(command);
}
public Promise<Reply> send(final Command command) {
final Promise<Reply> reply = new Promise<>();
socketChannel.eventLoop().execute(new Runnable() {
@Override
public void run() {
queue.add(reply);
socketChannel.writeAndFlush(command);
}
});
return reply;
}
}
4 changes: 2 additions & 2 deletions netty4/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,12 @@
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.0.10.Final</version>
<version>4.0.19.Final</version>
</dependency>
<dependency>
<groupId>org.javassist</groupId>
<artifactId>javassist</artifactId>
<version>3.18.0-GA</version>
<version>3.18.1-GA</version>
</dependency>
<dependency>
<groupId>com.github.spullara.redis</groupId>
Expand Down
2 changes: 1 addition & 1 deletion netty4/src/main/java/redis/netty4/BulkReply.java
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ public BulkReply(byte[] bytes) {

public BulkReply(ByteBuf bytes) {
this.bytes = bytes;
capacity = bytes.capacity();
capacity = bytes == null ? -1 : bytes.readableBytes();
}

@Override
Expand Down
20 changes: 11 additions & 9 deletions netty4/src/main/java/redis/netty4/RedisReplyDecoder.java
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ public ByteBuf readBytes(ByteBuf is) throws IOException {
if (size == -1) {
return null;
}
ByteBuf buffer = is.readSlice(size);
ByteBuf buffer = is.readSlice(size).retain();
int cr = is.readByte();
int lf = is.readByte();
if (cr != CR || lf != LF) {
Expand Down Expand Up @@ -128,16 +128,18 @@ public void checkpoint() {
}

public MultiBulkReply decodeMultiBulkReply(ByteBuf is) throws IOException {
try {
if (reply == null) {
reply = new MultiBulkReply();
final MultiBulkReply multiBulkReply;
if (reply != null) {
multiBulkReply = reply;
} else {
multiBulkReply = new MultiBulkReply();
if (checkpointEnabled) {
reply = multiBulkReply;
checkpoint();
}
reply.read(this, is);
return reply;
} finally {
reply = null;
}
multiBulkReply.read(this, is);
reply = null;
return multiBulkReply;
}

}