diff --git a/netty4-client/src/main/java/redis/netty4/RedisClientBase.java b/netty4-client/src/main/java/redis/netty4/RedisClientBase.java index a24770e..3c7f47a 100644 --- a/netty4-client/src/main/java/redis/netty4/RedisClientBase.java +++ b/netty4-client/src/main/java/redis/netty4/RedisClientBase.java @@ -34,12 +34,9 @@ public static Promise connect(String host, int port) { new SimpleChannelInboundHandler>() { @Override protected void channelRead0(ChannelHandlerContext channelHandlerContext, Reply reply) throws Exception { - Promise poll; - synchronized (client) { - poll = queue.poll(); - if (poll == null) { - throw new IllegalStateException("Promise queue is empty, received reply"); - } + Promise poll = queue.poll(); + if (poll == null) { + throw new IllegalStateException("Promise queue is empty, received reply"); } poll.set(reply); } @@ -49,12 +46,15 @@ protected void channelRead0(ChannelHandlerContext channelHandlerContext, Reply send(Command command) { - Promise reply = new Promise<>(); - synchronized (this) { - queue.add(reply); - socketChannel.write(command); - } + public Promise send(final Command command) { + final Promise reply = new Promise<>(); + socketChannel.eventLoop().execute(new Runnable() { + @Override + public void run() { + queue.add(reply); + socketChannel.writeAndFlush(command); + } + }); return reply; } } diff --git a/netty4/pom.xml b/netty4/pom.xml index c6e89fc..09364e5 100644 --- a/netty4/pom.xml +++ b/netty4/pom.xml @@ -14,12 +14,12 @@ io.netty netty-all - 4.0.10.Final + 4.0.19.Final org.javassist javassist - 3.18.0-GA + 3.18.1-GA com.github.spullara.redis diff --git a/netty4/src/main/java/redis/netty4/BulkReply.java b/netty4/src/main/java/redis/netty4/BulkReply.java index a106644..103be5b 100644 --- a/netty4/src/main/java/redis/netty4/BulkReply.java +++ b/netty4/src/main/java/redis/netty4/BulkReply.java @@ -28,7 +28,7 @@ public BulkReply(byte[] bytes) { public BulkReply(ByteBuf bytes) { this.bytes = bytes; - capacity = bytes.capacity(); + capacity = bytes == null ? -1 : bytes.readableBytes(); } @Override diff --git a/netty4/src/main/java/redis/netty4/RedisReplyDecoder.java b/netty4/src/main/java/redis/netty4/RedisReplyDecoder.java index f9e0ed4..dd2c78e 100644 --- a/netty4/src/main/java/redis/netty4/RedisReplyDecoder.java +++ b/netty4/src/main/java/redis/netty4/RedisReplyDecoder.java @@ -46,7 +46,7 @@ public ByteBuf readBytes(ByteBuf is) throws IOException { if (size == -1) { return null; } - ByteBuf buffer = is.readSlice(size); + ByteBuf buffer = is.readSlice(size).retain(); int cr = is.readByte(); int lf = is.readByte(); if (cr != CR || lf != LF) { @@ -128,16 +128,18 @@ public void checkpoint() { } public MultiBulkReply decodeMultiBulkReply(ByteBuf is) throws IOException { - try { - if (reply == null) { - reply = new MultiBulkReply(); + final MultiBulkReply multiBulkReply; + if (reply != null) { + multiBulkReply = reply; + } else { + multiBulkReply = new MultiBulkReply(); + if (checkpointEnabled) { + reply = multiBulkReply; checkpoint(); } - reply.read(this, is); - return reply; - } finally { - reply = null; } + multiBulkReply.read(this, is); + reply = null; + return multiBulkReply; } - }