Skip to content

Commit b3074e5

Browse files
committed
IPROTO-265 Remove additional byte[] allocations for nested readers/writers
* Add new subWriter method to implement to allow reusing encoder instances * Add some common default methods to the TagWriter/TagReader interfaces * Add common way to write a fixed varint of 5 bytes
1 parent beff6d3 commit b3074e5

File tree

10 files changed

+481
-162
lines changed

10 files changed

+481
-162
lines changed

core/src/main/java/org/infinispan/protostream/ProtobufUtil.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ public static byte[] toWrappedByteArray(ImmutableSerializationContext ctx, Objec
143143
}
144144

145145
public static byte[] toWrappedByteArray(ImmutableSerializationContext ctx, Object t, int bufferSize) throws IOException {
146-
ByteArrayOutputStream baos = new ByteArrayOutputStream(bufferSize);
146+
ByteArrayOutputStream baos = new ByteArrayOutputStreamEx(bufferSize);
147147
WrappedMessage.write(ctx, TagWriterImpl.newInstanceNoBuffer(ctx, baos), t);
148148
return baos.toByteArray();
149149
}
@@ -155,7 +155,7 @@ public static ByteBuffer toWrappedByteBuffer(ImmutableSerializationContext ctx,
155155
}
156156

157157
public static void toWrappedStream(ImmutableSerializationContext ctx, OutputStream out, Object t) throws IOException {
158-
toWrappedStream(ctx, out, t, DEFAULT_STREAM_BUFFER_SIZE);
158+
WrappedMessage.write(ctx, TagWriterImpl.newInstance(ctx, out), t);
159159
}
160160

161161
public static void toWrappedStream(ImmutableSerializationContext ctx, OutputStream out, Object t, int bufferSize) throws IOException {

core/src/main/java/org/infinispan/protostream/TagReader.java

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,9 @@ public interface TagReader extends RawProtoStreamReader {
3333

3434
boolean readBool() throws IOException;
3535

36-
int readEnum() throws IOException;
36+
default int readEnum() throws IOException {
37+
return readInt32();
38+
}
3739

3840
/**
3941
* Reads a {@code string} value.
@@ -50,29 +52,48 @@ public interface TagReader extends RawProtoStreamReader {
5052
*/
5153
ByteBuffer readByteBuffer() throws IOException;
5254

53-
double readDouble() throws IOException;
55+
/**
56+
* Similar to {@link #readByteArray()} except that the reader impl may optimize creation of a sub TagReader from
57+
* itself, possibly avoiding byte[] allocations
58+
* @return a new TagReader
59+
*/
60+
TagReader subReaderFromArray() throws IOException;
61+
62+
default double readDouble() throws IOException {
63+
return Double.longBitsToDouble(readFixed64());
64+
}
5465

55-
float readFloat() throws IOException;
66+
default float readFloat() throws IOException {
67+
return Float.intBitsToFloat(readFixed32());
68+
}
5669

5770
long readInt64() throws IOException;
5871

59-
long readUInt64() throws IOException;
72+
default long readUInt64() throws IOException {
73+
return readInt64();
74+
}
6075

6176
long readSInt64() throws IOException;
6277

6378
long readFixed64() throws IOException;
6479

65-
long readSFixed64() throws IOException;
80+
default long readSFixed64() throws IOException {
81+
return readFixed64();
82+
}
6683

6784
int readInt32() throws IOException;
6885

69-
int readUInt32() throws IOException;
86+
default int readUInt32() throws IOException {
87+
return readInt32();
88+
}
7089

7190
int readSInt32() throws IOException;
7291

7392
int readFixed32() throws IOException;
7493

75-
int readSFixed32() throws IOException;
94+
default int readSFixed32() throws IOException {
95+
return readFixed32();
96+
}
7697

7798
/**
7899
* Sets a limit (based on the length of the length delimited value) when entering an embedded message.

core/src/main/java/org/infinispan/protostream/TagWriter.java

Lines changed: 53 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,19 @@ public interface TagWriter extends RawProtoStreamWriter {
1414
// start low level ops
1515
void flush() throws IOException;
1616

17-
void writeTag(int number, int wireType) throws IOException;
17+
/**
18+
* Invoke after done with writer, this implies a flush if necessary
19+
* It is necessary to invoke this on a writer returned from {@link #subWriter(int)} to actually push the data
20+
*/
21+
void close() throws IOException;
1822

19-
void writeTag(int number, WireType wireType) throws IOException;
23+
default void writeTag(int number, int wireType) throws IOException {
24+
writeVarint32(WireType.makeTag(number, wireType));
25+
}
26+
27+
default void writeTag(int number, WireType wireType) throws IOException {
28+
writeVarint32(WireType.makeTag(number, wireType));
29+
}
2030

2131
void writeVarint32(int value) throws IOException;
2232

@@ -28,38 +38,70 @@ public interface TagWriter extends RawProtoStreamWriter {
2838
// start high level ops
2939
void writeString(int number, String value) throws IOException;
3040

31-
void writeInt32(int number, int value) throws IOException;
41+
default void writeInt32(int number, int value) throws IOException {
42+
if (value >= 0) {
43+
writeUInt32(number, value);
44+
} else {
45+
writeUInt64(number, value);
46+
}
47+
}
3248

3349
void writeUInt32(int number, int value) throws IOException;
3450

35-
void writeSInt32(int number, int value) throws IOException;
51+
default void writeSInt32(int number, int value) throws IOException {
52+
// Roll the bits in order to move the sign bit from position 31 to position 0, to reduce the wire length of negative numbers.
53+
writeUInt32(number, (value << 1) ^ (value >> 31));
54+
}
3655

3756
void writeFixed32(int number, int value) throws IOException;
3857

39-
void writeSFixed32(int number, int value) throws IOException;
58+
default void writeSFixed32(int number, int value) throws IOException {
59+
writeFixed32(number, value);
60+
}
4061

4162
void writeInt64(int number, long value) throws IOException;
4263

4364
void writeUInt64(int number, long value) throws IOException;
4465

45-
void writeSInt64(int number, long value) throws IOException;
66+
default void writeSInt64(int number, long value) throws IOException {
67+
// Roll the bits in order to move the sign bit from position 63 to position 0, to reduce the wire length of negative numbers.
68+
writeUInt64(number, (value << 1) ^ (value >> 63));
69+
}
4670

4771
void writeFixed64(int number, long value) throws IOException;
4872

49-
void writeSFixed64(int number, long value) throws IOException;
73+
default void writeSFixed64(int number, long value) throws IOException {
74+
writeFixed64(number, value);
75+
}
5076

51-
void writeEnum(int number, int value) throws IOException;
77+
default void writeEnum(int number, int value) throws IOException {
78+
writeInt32(number, value);
79+
}
5280

5381
void writeBool(int number, boolean value) throws IOException;
5482

55-
void writeDouble(int number, double value) throws IOException;
83+
default void writeDouble(int number, double value) throws IOException {
84+
writeFixed64(number, Double.doubleToRawLongBits(value));
85+
}
5686

57-
void writeFloat(int number, float value) throws IOException;
87+
default void writeFloat(int number, float value) throws IOException {
88+
writeFixed32(number, Float.floatToRawIntBits(value));
89+
}
5890

5991
void writeBytes(int number, ByteBuffer value) throws IOException;
6092

61-
void writeBytes(int number, byte[] value) throws IOException;
93+
default void writeBytes(int number, byte[] value) throws IOException {
94+
writeBytes(number, value, 0, value.length);
95+
}
6296

6397
void writeBytes(int number, byte[] value, int offset, int length) throws IOException;
6498
// end high level ops
99+
100+
/**
101+
* Used to write a sub message that can be optimized by implementation. When the sub writer is complete, flush
102+
* should be invoked to ensure
103+
* @return
104+
* @throws IOException
105+
*/
106+
TagWriter subWriter(int number, boolean nested) throws IOException;
65107
}

core/src/main/java/org/infinispan/protostream/WrappedMessage.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -296,15 +296,13 @@ private static void writeMessage(ImmutableSerializationContext ctx, TagWriter ou
296296
if (t.getClass().isEnum()) {
297297
((EnumMarshallerDelegate) marshallerDelegate).encode(WRAPPED_ENUM, (Enum) t, out);
298298
} else {
299-
ByteArrayOutputStreamEx buffer = new ByteArrayOutputStreamEx();
300-
TagWriterImpl nestedCtx = TagWriterImpl.newInstanceNoBuffer(ctx, buffer);
301-
marshallerDelegate.marshall(nestedCtx, null, t);
302-
nestedCtx.flush();
303-
out.writeBytes(WRAPPED_MESSAGE, buffer.getByteBuffer());
299+
TagWriter nestedWriter = out.subWriter(WRAPPED_MESSAGE, false);
300+
marshallerDelegate.marshall((ProtobufTagMarshaller.WriteContext) nestedWriter, null, t);
301+
nestedWriter.close();
304302
}
305303
}
306304
}
307-
out.flush();
305+
out.close();
308306
}
309307

310308
private static void writeContainer(ImmutableSerializationContext ctx, TagWriter out, BaseMarshallerDelegate marshallerDelegate, Object container) throws IOException {
@@ -355,7 +353,7 @@ private static <T> T readMessage(ImmutableSerializationContext ctx, TagReader in
355353
String typeName = null;
356354
Integer typeId = null;
357355
int enumValue = -1;
358-
byte[] messageBytes = null;
356+
TagReader messageReader = null;
359357
Object value = null;
360358
int fieldCount = 0;
361359
int expectedFieldCount = 1;
@@ -398,7 +396,7 @@ private static <T> T readMessage(ImmutableSerializationContext ctx, TagReader in
398396
}
399397
case WRAPPED_MESSAGE << WireType.TAG_TYPE_NUM_BITS | WireType.WIRETYPE_LENGTH_DELIMITED: {
400398
expectedFieldCount = 2;
401-
messageBytes = in.readByteArray();
399+
messageReader = in.subReaderFromArray();
402400
break;
403401
}
404402
case WRAPPED_STRING << WireType.TAG_TYPE_NUM_BITS | WireType.WIRETYPE_LENGTH_DELIMITED: {
@@ -514,7 +512,7 @@ private static <T> T readMessage(ImmutableSerializationContext ctx, TagReader in
514512
}
515513
}
516514

517-
if (value == null && typeName == null && typeId == null && messageBytes == null) {
515+
if (value == null && typeName == null && typeId == null && messageReader == null) {
518516
return null;
519517
}
520518

@@ -533,10 +531,9 @@ private static <T> T readMessage(ImmutableSerializationContext ctx, TagReader in
533531
typeName = ctx.getDescriptorByTypeId(typeId).getFullName();
534532
}
535533
BaseMarshallerDelegate marshallerDelegate = ((SerializationContextImpl) ctx).getMarshallerDelegate(typeName);
536-
if (messageBytes != null) {
534+
if (messageReader != null) {
537535
// it's a Message type
538-
TagReaderImpl nestedInput = TagReaderImpl.newInstance(ctx, messageBytes);
539-
return (T) marshallerDelegate.unmarshall(nestedInput, null);
536+
return (T) marshallerDelegate.unmarshall((ProtobufTagMarshaller.ReadContext) messageReader, null);
540537
} else {
541538
// it's an Enum
542539
EnumMarshaller marshaller = (EnumMarshaller) marshallerDelegate.getMarshaller();

core/src/main/java/org/infinispan/protostream/annotations/impl/GeneratedMarshallerBase.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import java.io.IOException;
44

55
import org.infinispan.protostream.ProtobufTagMarshaller;
6+
import org.infinispan.protostream.TagWriter;
67
import org.infinispan.protostream.impl.BaseMarshallerDelegate;
78
import org.infinispan.protostream.impl.ByteArrayOutputStreamEx;
89
import org.infinispan.protostream.impl.Log;
@@ -46,6 +47,17 @@ protected final <T> void writeNestedMessage(BaseMarshallerDelegate<T> marshaller
4647
throw log.maxNestedMessageDepth(maxNestedMessageDepth, message.getClass());
4748
}
4849

50+
if (ctx instanceof TagWriter) {
51+
TagWriter nestedWriter = ((TagWriter) ctx).subWriter(fieldNumber, true);
52+
marshallerDelegate.marshall((ProtobufTagMarshaller.WriteContext) nestedWriter, null, message);
53+
nestedWriter.close();
54+
} else {
55+
handleNonTagWriter(marshallerDelegate, ctx, fieldNumber, message);
56+
}
57+
}
58+
59+
private <T> void handleNonTagWriter(BaseMarshallerDelegate<T> marshallerDelegate, ProtobufTagMarshaller.WriteContext ctx,
60+
int fieldNumber, T message) throws IOException {
4961
ByteArrayOutputStreamEx baos = new ByteArrayOutputStreamEx();
5062
TagWriterImpl nested = TagWriterImpl.newNestedInstance(ctx, baos);
5163
writeMessage(marshallerDelegate, nested, message);

core/src/main/java/org/infinispan/protostream/impl/ByteArrayOutputStreamEx.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -21,4 +21,14 @@ public ByteArrayOutputStreamEx(int size) {
2121
public synchronized ByteBuffer getByteBuffer() {
2222
return ByteBuffer.wrap(buf, 0, count);
2323
}
24+
25+
public int skipFixedVarint() {
26+
int prev = count;
27+
count += 5;
28+
return prev;
29+
}
30+
31+
public void writePositiveFixedVarint(int pos) {
32+
TagWriterImpl.writePositiveFixedVarint(buf, pos, count - pos - 5);
33+
}
2434
}

0 commit comments

Comments
 (0)