Skip to content

Commit 7ee5a2a

Browse files
authored
KAFKA-19446: Add Transaction Version to request builder [2/3] (#20868)
In this patch for KIP-1228, we add a new WriteTxnMarkersRequest builder constructor that includes the transaction versions for each marker. The transaction version is already stored in the metadata on the transaction coordinator. We now extract this version information and pass it to the builder via the TxnMarkerEntry, which propagates it to partition leaders. Since the TransactionVersion field is marked as ignorable, we always include it when building the request. The field will be automatically omitted without any errors, during serialization if the request version is < 2, ensuring backward compatibility with brokers that only support version 1. Reviewers: Justine Olshan <[email protected]>
1 parent 39c2d6f commit 7ee5a2a

File tree

7 files changed

+224
-36
lines changed

7 files changed

+224
-36
lines changed

clients/src/main/java/org/apache/kafka/common/requests/WriteTxnMarkersRequest.java

Lines changed: 34 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,17 +38,20 @@ public static class TxnMarkerEntry {
3838
private final int coordinatorEpoch;
3939
private final TransactionResult result;
4040
private final List<TopicPartition> partitions;
41+
private final short transactionVersion;
4142

4243
public TxnMarkerEntry(long producerId,
4344
short producerEpoch,
4445
int coordinatorEpoch,
4546
TransactionResult result,
46-
List<TopicPartition> partitions) {
47+
List<TopicPartition> partitions,
48+
short transactionVersion) {
4749
this.producerId = producerId;
4850
this.producerEpoch = producerEpoch;
4951
this.coordinatorEpoch = coordinatorEpoch;
5052
this.result = result;
5153
this.partitions = partitions;
54+
this.transactionVersion = transactionVersion;
5255
}
5356

5457
public long producerId() {
@@ -71,6 +74,10 @@ public List<TopicPartition> partitions() {
7174
return partitions;
7275
}
7376

77+
public short transactionVersion() {
78+
return transactionVersion;
79+
}
80+
7481
@Override
7582
public String toString() {
7683
return "TxnMarkerEntry{" +
@@ -79,6 +86,7 @@ public String toString() {
7986
", coordinatorEpoch=" + coordinatorEpoch +
8087
", result=" + result +
8188
", partitions=" + partitions +
89+
", transactionVersion=" + transactionVersion +
8290
'}';
8391
}
8492

@@ -91,12 +99,13 @@ public boolean equals(final Object o) {
9199
producerEpoch == that.producerEpoch &&
92100
coordinatorEpoch == that.coordinatorEpoch &&
93101
result == that.result &&
102+
transactionVersion == that.transactionVersion &&
94103
Objects.equals(partitions, that.partitions);
95104
}
96105

97106
@Override
98107
public int hashCode() {
99-
return Objects.hash(producerId, producerEpoch, coordinatorEpoch, result, partitions);
108+
return Objects.hash(producerId, producerEpoch, coordinatorEpoch, result, partitions, transactionVersion);
100109
}
101110
}
102111

@@ -109,6 +118,11 @@ public Builder(WriteTxnMarkersRequestData data) {
109118
this.data = data;
110119
}
111120

121+
/**
122+
* Creates a builder with the given markers. Transaction versions are read from each marker entry.
123+
*
124+
* @param markers the list of transaction marker entries
125+
*/
112126
public Builder(final List<TxnMarkerEntry> markers) {
113127
// version will be determined at build time based on broker capabilities
114128
super(ApiKeys.WRITE_TXN_MARKERS);
@@ -123,12 +137,18 @@ public Builder(final List<TxnMarkerEntry> markers) {
123137
topicMap.put(topicPartition.topic(), topic);
124138
}
125139

126-
dataMarkers.add(new WritableTxnMarker()
127-
.setProducerId(marker.producerId)
128-
.setProducerEpoch(marker.producerEpoch)
129-
.setCoordinatorEpoch(marker.coordinatorEpoch)
130-
.setTransactionResult(marker.transactionResult().id)
131-
.setTopics(new ArrayList<>(topicMap.values())));
140+
WritableTxnMarker writableMarker = new WritableTxnMarker()
141+
.setProducerId(marker.producerId)
142+
.setProducerEpoch(marker.producerEpoch)
143+
.setCoordinatorEpoch(marker.coordinatorEpoch)
144+
.setTransactionResult(marker.transactionResult().id)
145+
.setTopics(new ArrayList<>(topicMap.values()));
146+
147+
// Set transaction version from the marker entry (KIP-1228).
148+
// Serialization will automatically omit TransactionVersion field in version 1 since it's ignorable.
149+
writableMarker.setTransactionVersion((byte) marker.transactionVersion);
150+
151+
dataMarkers.add(writableMarker);
132152
}
133153
this.data = new WriteTxnMarkersRequestData().setMarkers(dataMarkers);
134154
}
@@ -178,12 +198,17 @@ public List<TxnMarkerEntry> markers() {
178198
topicPartitions.add(new TopicPartition(topic.name(), partitionIdx));
179199
}
180200
}
201+
// Read transactionVersion from raw marker data.
202+
// For request version 1, this field is set to 0 during deserialization since it's ignorable.
203+
short transactionVersion = markerEntry.transactionVersion();
204+
181205
markers.add(new TxnMarkerEntry(
182206
markerEntry.producerId(),
183207
markerEntry.producerEpoch(),
184208
markerEntry.coordinatorEpoch(),
185209
TransactionResult.forId(markerEntry.transactionResult()),
186-
topicPartitions)
210+
topicPartitions,
211+
transactionVersion)
187212
);
188213
}
189214
return markers;

clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -2826,7 +2826,7 @@ private EndTxnResponse createEndTxnResponse() {
28262826

28272827
private WriteTxnMarkersRequest createWriteTxnMarkersRequest(short version) {
28282828
List<TopicPartition> partitions = singletonList(new TopicPartition("topic", 73));
2829-
WriteTxnMarkersRequest.TxnMarkerEntry txnMarkerEntry = new WriteTxnMarkersRequest.TxnMarkerEntry(21L, (short) 42, 73, TransactionResult.ABORT, partitions);
2829+
WriteTxnMarkersRequest.TxnMarkerEntry txnMarkerEntry = new WriteTxnMarkersRequest.TxnMarkerEntry(21L, (short) 42, 73, TransactionResult.ABORT, partitions, (short) 0);
28302830
return new WriteTxnMarkersRequest.Builder(singletonList(txnMarkerEntry)).build(version);
28312831
}
28322832

clients/src/test/java/org/apache/kafka/common/requests/WriteTxnMarkersRequestTest.java

Lines changed: 149 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@
1717
package org.apache.kafka.common.requests;
1818

1919
import org.apache.kafka.common.TopicPartition;
20+
import org.apache.kafka.common.message.WriteTxnMarkersRequestData;
2021
import org.apache.kafka.common.protocol.ApiKeys;
22+
import org.apache.kafka.common.protocol.ByteBufferAccessor;
2123
import org.apache.kafka.common.protocol.Errors;
2224

2325
import org.junit.jupiter.api.BeforeEach;
@@ -27,6 +29,7 @@
2729
import java.util.List;
2830

2931
import static org.junit.jupiter.api.Assertions.assertEquals;
32+
import static org.junit.jupiter.api.Assertions.assertNotNull;
3033

3134
public class WriteTxnMarkersRequestTest {
3235

@@ -45,23 +48,51 @@ public void setUp() {
4548
markers = Collections.singletonList(
4649
new WriteTxnMarkersRequest.TxnMarkerEntry(
4750
PRODUCER_ID, PRODUCER_EPOCH, COORDINATOR_EPOCH,
48-
RESULT, Collections.singletonList(TOPIC_PARTITION))
51+
RESULT, Collections.singletonList(TOPIC_PARTITION), (short) 0)
4952
);
5053
}
5154

5255
@Test
5356
public void testConstructor() {
54-
WriteTxnMarkersRequest.Builder builder = new WriteTxnMarkersRequest.Builder(markers);
55-
for (short version : ApiKeys.WRITE_TXN_MARKERS.allVersions()) {
56-
WriteTxnMarkersRequest request = builder.build(version);
57-
assertEquals(1, request.markers().size());
58-
WriteTxnMarkersRequest.TxnMarkerEntry marker = request.markers().get(0);
59-
assertEquals(PRODUCER_ID, marker.producerId());
60-
assertEquals(PRODUCER_EPOCH, marker.producerEpoch());
61-
assertEquals(COORDINATOR_EPOCH, marker.coordinatorEpoch());
62-
assertEquals(RESULT, marker.transactionResult());
63-
assertEquals(Collections.singletonList(TOPIC_PARTITION), marker.partitions());
64-
}
57+
// We always set the transaction version in the request data using the arguments provided to the builder.
58+
// If the version doesn't support it, it will be omitted during serialization.
59+
60+
// Test constructor with transactionVersion = 2
61+
List<WriteTxnMarkersRequest.TxnMarkerEntry> markersWithVersion = Collections.singletonList(
62+
new WriteTxnMarkersRequest.TxnMarkerEntry(
63+
PRODUCER_ID, PRODUCER_EPOCH, COORDINATOR_EPOCH,
64+
RESULT, Collections.singletonList(TOPIC_PARTITION), (short) 2)
65+
);
66+
67+
// Build with request version 1.
68+
WriteTxnMarkersRequest.Builder builder = new WriteTxnMarkersRequest.Builder(markersWithVersion);
69+
WriteTxnMarkersRequest request = builder.build((short) 1);
70+
assertEquals(1, request.data().markers().size());
71+
WriteTxnMarkersRequestData.WritableTxnMarker dataMarker = request.data().markers().get(0);
72+
assertEquals(PRODUCER_ID, dataMarker.producerId());
73+
assertEquals(PRODUCER_EPOCH, dataMarker.producerEpoch());
74+
assertEquals(COORDINATOR_EPOCH, dataMarker.coordinatorEpoch());
75+
assertEquals(RESULT.id, dataMarker.transactionResult());
76+
assertEquals(1, dataMarker.topics().size());
77+
assertEquals(TOPIC_PARTITION.topic(), dataMarker.topics().get(0).name());
78+
assertEquals(Collections.singletonList(TOPIC_PARTITION.partition()), dataMarker.topics().get(0).partitionIndexes());
79+
// Verify TransactionVersion is set to 2 in the data irrespective of the request version
80+
assertEquals((byte) 2, dataMarker.transactionVersion());
81+
82+
// Build with request version 2
83+
WriteTxnMarkersRequest.Builder builderWithVersions = new WriteTxnMarkersRequest.Builder(markersWithVersion);
84+
WriteTxnMarkersRequest requestWithVersion = builderWithVersions.build((short) 2);
85+
assertEquals(1, requestWithVersion.data().markers().size());
86+
WriteTxnMarkersRequestData.WritableTxnMarker dataMarkerWithVersion = requestWithVersion.data().markers().get(0);
87+
assertEquals(PRODUCER_ID, dataMarkerWithVersion.producerId());
88+
assertEquals(PRODUCER_EPOCH, dataMarkerWithVersion.producerEpoch());
89+
assertEquals(COORDINATOR_EPOCH, dataMarkerWithVersion.coordinatorEpoch());
90+
assertEquals(RESULT.id, dataMarkerWithVersion.transactionResult());
91+
assertEquals(1, dataMarkerWithVersion.topics().size());
92+
assertEquals(TOPIC_PARTITION.topic(), dataMarkerWithVersion.topics().get(0).name());
93+
assertEquals(Collections.singletonList(TOPIC_PARTITION.partition()), dataMarkerWithVersion.topics().get(0).partitionIndexes());
94+
// Verify TransactionVersion is set to 2 in the data
95+
assertEquals((byte) 2, dataMarkerWithVersion.transactionVersion());
6596
}
6697

6798
@Test
@@ -79,4 +110,110 @@ public void testGetErrorResponse() {
79110
assertEquals(0, errorResponse.throttleTimeMs());
80111
}
81112
}
113+
114+
@Test
115+
public void testTransactionVersion() {
116+
// Test that TransactionVersion is set correctly and serialization handles it properly.
117+
List<WriteTxnMarkersRequest.TxnMarkerEntry> markersWithVersion = Collections.singletonList(
118+
new WriteTxnMarkersRequest.TxnMarkerEntry(
119+
PRODUCER_ID, PRODUCER_EPOCH, COORDINATOR_EPOCH,
120+
RESULT, Collections.singletonList(TOPIC_PARTITION), (short) 2)
121+
);
122+
WriteTxnMarkersRequest.Builder builder = new WriteTxnMarkersRequest.Builder(markersWithVersion);
123+
124+
// Test request version 2 - TransactionVersion should be included.
125+
WriteTxnMarkersRequest requestV2 = builder.build((short) 2);
126+
assertNotNull(requestV2);
127+
assertEquals(1, requestV2.markers().size());
128+
129+
// Verify TransactionVersion is set to 2 in the request data.
130+
assertEquals((byte) 2, requestV2.data().markers().get(0).transactionVersion());
131+
// Verify the request can be serialized for version 2 (TransactionVersion field included).
132+
// This should not throw an exception.
133+
ByteBufferAccessor serializedV2 = requestV2.serialize();
134+
assertNotNull(serializedV2, "Serialization should succeed without error for version 2");
135+
// Test deserialization for version 2 - verify TransactionVersion field was included during serialization.
136+
// Use the already serialized request and parse it back to verify the field is present.
137+
serializedV2.buffer().rewind();
138+
RequestAndSize requestAndSizeV2 = AbstractRequest.parseRequest(
139+
ApiKeys.WRITE_TXN_MARKERS, (short) 2, serializedV2);
140+
WriteTxnMarkersRequest parsedRequestV2 = (WriteTxnMarkersRequest) requestAndSizeV2.request;
141+
assertNotNull(parsedRequestV2);
142+
assertEquals(1, parsedRequestV2.markers().size());
143+
// After deserialization, TransactionVersion should be 2 because it was included during serialization.
144+
assertEquals((short) 2, parsedRequestV2.markers().get(0).transactionVersion());
145+
// Verify the data also shows 2 (since it was read from serialized bytes with the field).
146+
assertEquals((byte) 2, parsedRequestV2.data().markers().get(0).transactionVersion());
147+
148+
// Test request version 1 - TransactionVersion should be omitted (ignorable field).
149+
WriteTxnMarkersRequest requestV1 = builder.build((short) 1);
150+
assertNotNull(requestV1);
151+
assertEquals(1, requestV1.markers().size());
152+
153+
// Verify TransactionVersion is still set to 2 in the request data (even for version 1).
154+
// This is what the coordinator has when building the request - data() is used before serialization.
155+
// The field value is preserved in the data, but will be omitted during serialization.
156+
assertEquals((byte) 2, requestV1.data().markers().get(0).transactionVersion());
157+
// Verify the request can be serialized for version 1 (TransactionVersion field omitted).
158+
// This should not throw an exception even though TransactionVersion is set to 2
159+
// because the field is marked as ignorable.
160+
ByteBufferAccessor serializedV1 = requestV1.serialize();
161+
assertNotNull(serializedV1, "Serialization should succeed without error for version 1 even with TransactionVersion set");
162+
// Test deserialization for version 1 - verify TransactionVersion field was omitted during serialization.
163+
// Use the already serialized request and parse it back to verify the field is not present.
164+
serializedV1.buffer().rewind();
165+
RequestAndSize requestAndSizeV1 = AbstractRequest.parseRequest(
166+
ApiKeys.WRITE_TXN_MARKERS, (short) 1, serializedV1);
167+
WriteTxnMarkersRequest parsedRequestV1 = (WriteTxnMarkersRequest) requestAndSizeV1.request;
168+
assertNotNull(parsedRequestV1);
169+
assertEquals(1, parsedRequestV1.markers().size());
170+
// After deserialization, TransactionVersion should be 0 because it was omitted during serialization.
171+
// The field is not present in the serialized bytes for version 1, so it defaults to 0.
172+
assertEquals((short) 0, parsedRequestV1.markers().get(0).transactionVersion());
173+
// Verify the data also shows 0 (since it was read from serialized bytes without the field).
174+
assertEquals((byte) 0, parsedRequestV1.data().markers().get(0).transactionVersion());
175+
}
176+
177+
@Test
178+
public void testRequestWithMultipleMarkersDifferentTransactionVersions() {
179+
// Test building a request with two markers - one with tv1 and one with tv2
180+
// and verify that the right transaction versions are updated in the request data
181+
TopicPartition topicPartition1 = new TopicPartition("topic1", 0);
182+
TopicPartition topicPartition2 = new TopicPartition("topic2", 1);
183+
long producerId1 = 100L;
184+
long producerId2 = 200L;
185+
186+
List<WriteTxnMarkersRequest.TxnMarkerEntry> markersWithDifferentVersions = List.of(
187+
new WriteTxnMarkersRequest.TxnMarkerEntry(
188+
producerId1, PRODUCER_EPOCH, COORDINATOR_EPOCH,
189+
RESULT, Collections.singletonList(topicPartition1), (short) 1), // tv1
190+
new WriteTxnMarkersRequest.TxnMarkerEntry(
191+
producerId2, PRODUCER_EPOCH, COORDINATOR_EPOCH,
192+
RESULT, Collections.singletonList(topicPartition2), (short) 2) // tv2
193+
);
194+
195+
WriteTxnMarkersRequest.Builder builder = new WriteTxnMarkersRequest.Builder(markersWithDifferentVersions);
196+
WriteTxnMarkersRequest request = builder.build((short) 2);
197+
198+
assertNotNull(request);
199+
assertEquals(2, request.data().markers().size());
200+
201+
// Verify first marker has tv1 (transactionVersion = 1) in the request data
202+
WriteTxnMarkersRequestData.WritableTxnMarker dataMarker1 = request.data().markers().get(0);
203+
assertEquals(producerId1, dataMarker1.producerId());
204+
assertEquals((byte) 1, dataMarker1.transactionVersion());
205+
206+
// Verify second marker has tv2 (transactionVersion = 2) in the request data
207+
WriteTxnMarkersRequestData.WritableTxnMarker dataMarker2 = request.data().markers().get(1);
208+
assertEquals(producerId2, dataMarker2.producerId());
209+
assertEquals((byte) 2, dataMarker2.transactionVersion());
210+
211+
// Verify markers() method also returns correct transaction versions
212+
List<WriteTxnMarkersRequest.TxnMarkerEntry> markers = request.markers();
213+
assertEquals(2, markers.size());
214+
assertEquals((short) 1, markers.get(0).transactionVersion());
215+
assertEquals(producerId1, markers.get(0).producerId());
216+
assertEquals((short) 2, markers.get(1).transactionVersion());
217+
assertEquals(producerId2, markers.get(1).producerId());
218+
}
82219
}

core/src/main/scala/kafka/coordinator/transaction/TransactionMarkerChannelManager.scala

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -391,10 +391,21 @@ class TransactionMarkerChannelManager(
391391
}
392392

393393
val coordinatorEpoch = pendingCompleteTxn.coordinatorEpoch
394+
// Extract transaction version from metadata. In practice, clientTransactionVersion should never be null
395+
// (it's always set when loading from log or creating new metadata), but we check defensively.
396+
val transactionVersion = {
397+
val clientTransactionVersion = pendingCompleteTxn.txnMetadata.clientTransactionVersion()
398+
if (clientTransactionVersion != null) {
399+
clientTransactionVersion.featureLevel()
400+
} else {
401+
0.toShort
402+
}
403+
}
404+
394405
for ((broker: Option[Node], topicPartitions: immutable.Set[TopicPartition]) <- partitionsByDestination) {
395406
broker match {
396407
case Some(brokerNode) =>
397-
val marker = new TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, result, topicPartitions.toList.asJava)
408+
val marker = new TxnMarkerEntry(producerId, producerEpoch, coordinatorEpoch, result, topicPartitions.toList.asJava, transactionVersion)
398409
val pendingCompleteTxnAndMarker = PendingCompleteTxnAndMarkerEntry(pendingCompleteTxn, marker)
399410

400411
if (brokerNode == Node.noNode) {

0 commit comments

Comments
 (0)