Skip to content

Commit 7c8c7c4

Browse files
junmuzymuzammil
authored andcommitted
Preserving original data type
1 parent 0da8dcc commit 7c8c7c4

File tree

4 files changed

+102
-26
lines changed

4 files changed

+102
-26
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlDataSource.java

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -100,7 +100,9 @@ public MySqlSourceConfig getSourceConfig() {
100100

101101
@Override
102102
public SupportedMetadataColumn[] supportedMetadataColumns() {
103-
return new SupportedMetadataColumn[] {new OpTsMetadataColumn()};
103+
return new SupportedMetadataColumn[] {
104+
new OpTsMetadataColumn(), new RowKindMetadataColumn()
105+
};
104106
}
105107

106108
@Override

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java

Lines changed: 22 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -189,15 +189,27 @@ protected TableId getTableId(SourceRecord record) {
189189

190190
@Override
191191
protected Map<String, String> getMetadata(SourceRecord record) {
192+
return getMetadata(record, null);
193+
}
194+
195+
protected Map<String, String> getMetadata(SourceRecord record, String opType) {
192196
Map<String, String> metadataMap = new HashMap<>();
193197
readableMetadataList.forEach(
194198
(mySqlReadableMetadata -> {
195-
Object metadata = mySqlReadableMetadata.getConverter().read(record);
196-
if (mySqlReadableMetadata.equals(MySqlReadableMetadata.OP_TS)) {
199+
if (mySqlReadableMetadata.equals(MySqlReadableMetadata.ROW_KIND)) {
200+
if (appendOnly) {
201+
// In append-only mode, map row_kind to the original operation type
202+
metadataMap.put(mySqlReadableMetadata.getKey(), opType);
203+
}
204+
// Skip ROW_KIND in non-append-only mode since it throws
205+
// UnsupportedOperationException
206+
} else if (mySqlReadableMetadata.equals(MySqlReadableMetadata.OP_TS)) {
207+
Object metadata = mySqlReadableMetadata.getConverter().read(record);
197208
metadataMap.put(
198209
mySqlReadableMetadata.getKey(),
199210
String.valueOf(((TimestampData) metadata).getMillisecond()));
200211
} else {
212+
Object metadata = mySqlReadableMetadata.getConverter().read(record);
201213
metadataMap.put(mySqlReadableMetadata.getKey(), String.valueOf(metadata));
202214
}
203215
}));
@@ -211,18 +223,20 @@ public List<DataChangeEvent> deserializeDataChangeRecord(SourceRecord record) th
211223
return super.deserializeDataChangeRecord(record);
212224
}
213225

214-
// Append-only mode: convert all operations to INSERT events
226+
// Append-only mode: convert all operations to INSERT events, but preserve original
227+
// operation type
215228
Envelope.Operation op = Envelope.operationFor(record);
216229
TableId tableId = getTableId(record);
217230

218231
Struct value = (Struct) record.value();
219232
Schema valueSchema = record.valueSchema();
220-
Map<String, String> meta = getMetadata(record);
221233

222234
if (op == Envelope.Operation.CREATE || op == Envelope.Operation.READ) {
235+
Map<String, String> meta = getMetadata(record, "+I");
223236
RecordData after = extractAfterData(value, valueSchema);
224237
return Collections.singletonList(DataChangeEvent.insertEvent(tableId, after, meta));
225238
} else if (op == Envelope.Operation.DELETE) {
239+
Map<String, String> meta = getMetadata(record, "-D");
226240
// For DELETE: convert to INSERT with before data
227241
RecordData before = extractBeforeData(value, valueSchema);
228242
return Collections.singletonList(DataChangeEvent.insertEvent(tableId, before, meta));
@@ -233,13 +247,15 @@ public List<DataChangeEvent> deserializeDataChangeRecord(SourceRecord record) th
233247
List<DataChangeEvent> events = new ArrayList<>();
234248

235249
if (changelogMode == DebeziumChangelogMode.ALL) {
250+
Map<String, String> beforeMeta = getMetadata(record, "-U");
236251
// Generate INSERT event for before data (UPDATE_BEFORE -> INSERT)
237252
RecordData before = extractBeforeData(value, valueSchema);
238-
events.add(DataChangeEvent.insertEvent(tableId, before, meta));
253+
events.add(DataChangeEvent.insertEvent(tableId, before, beforeMeta));
239254
}
240255

256+
Map<String, String> afterMeta = getMetadata(record, "+U");
241257
// Generate INSERT event for after data (UPDATE_AFTER -> INSERT)
242-
events.add(DataChangeEvent.insertEvent(tableId, after, meta));
258+
events.add(DataChangeEvent.insertEvent(tableId, after, afterMeta));
243259
return events;
244260
} else {
245261
LOG.trace("Received {} operation, skip", op);
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,51 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
18+
package org.apache.flink.cdc.connectors.mysql.source;
19+
20+
import org.apache.flink.cdc.common.source.SupportedMetadataColumn;
21+
import org.apache.flink.cdc.common.types.DataType;
22+
import org.apache.flink.cdc.common.types.DataTypes;
23+
24+
import java.util.Map;
25+
26+
/** A {@link SupportedMetadataColumn} for row_kind. */
27+
public class RowKindMetadataColumn implements SupportedMetadataColumn {
28+
29+
@Override
30+
public String getName() {
31+
return "row_kind";
32+
}
33+
34+
@Override
35+
public DataType getType() {
36+
return DataTypes.STRING().notNull();
37+
}
38+
39+
@Override
40+
public Class<?> getJavaClass() {
41+
return String.class;
42+
}
43+
44+
@Override
45+
public Object read(Map<String, String> metadata) {
46+
if (metadata.containsKey("row_kind")) {
47+
return metadata.get("row_kind");
48+
}
49+
throw new IllegalArgumentException("row_kind doesn't exist in the metadata: " + metadata);
50+
}
51+
}

flink-cdc-e2e-tests/flink-cdc-pipeline-e2e-tests/src/test/java/org/apache/flink/cdc/pipeline/tests/MySqlToPaimonE2eITCase.java

Lines changed: 26 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -203,6 +203,11 @@ public void testReadChangelogAsAppendOnlyWithPaimon() throws Exception {
203203
+ " server-time-zone: UTC\n"
204204
+ " scan.read-changelog-as-append-only.enabled: true\n"
205205
+ " use.legacy.json.format: false\n"
206+
+ " metadata.list: row_kind\n"
207+
+ "\n"
208+
+ "transform:\n"
209+
+ " - source-table: '^\\.+.\\.+$'\n"
210+
+ " projection: \\*, row_kind AS __internal__op_type\n"
206211
+ "\n"
207212
+ "sink:\n"
208213
+ " type: paimon\n"
@@ -261,11 +266,11 @@ public void testReadChangelogAsAppendOnlyWithPaimon() throws Exception {
261266
database,
262267
"readChangelogAsAppendOnly",
263268
Arrays.asList(
264-
"1, One, Alice, 3.202, red, {\"key1\": \"value1\"}, null",
265-
"2, Two, Bob, 1.703, white, {\"key2\": \"value2\"}, null",
266-
"3, Three, Cecily, 4.105, red, {\"key3\": \"value3\"}, null",
267-
"4, Four, Derrida, 1.857, white, {\"key4\": \"value4\"}, null",
268-
"5, Five, Evelyn, 5.211, red, {\"K\": \"V\", \"k\": \"v\"}, null"));
269+
"1, One, Alice, 3.202, red, {\"key1\": \"value1\"}, null, +I",
270+
"2, Two, Bob, 1.703, white, {\"key2\": \"value2\"}, null, +I",
271+
"3, Three, Cecily, 4.105, red, {\"key3\": \"value3\"}, null, +I",
272+
"4, Four, Derrida, 1.857, white, {\"key4\": \"value4\"}, null, +I",
273+
"5, Five, Evelyn, 5.211, red, {\"K\": \"V\", \"k\": \"v\"}, null, +I"));
269274

270275
LOG.info("Begin incremental reading stage with append-only enabled.");
271276

@@ -301,23 +306,25 @@ public void testReadChangelogAsAppendOnlyWithPaimon() throws Exception {
301306
// after), plus delete records
302307
List<String> expectedAppendOnlyRecords =
303308
Arrays.asList(
304-
"1, One, Alice, 3.202, red, {\"key1\": \"value1\"}, null", // Original
305-
"2, Two, Bob, 1.703, white, {\"key2\": \"value2\"}, null", // Original
306-
"3, Three, Cecily, 4.105, red, {\"key3\": \"value3\"}, null", // Original
307-
"4, Four, Derrida, 1.857, white, {\"key4\": \"value4\"}, null", // Original
308-
"5, Five, Evelyn, 5.211, red, {\"K\": \"V\", \"k\": \"v\"}, null", // Original
309-
"6, Six, Ferris, 9.813, null, null, null", // Insert
310-
"7, Seven, Grace, 2.117, null, null, null", // Insert
311-
"1, One, Alice, 3.202, red, {\"key1\": \"value1\"}, null", // Update id=1
312-
// (before)
313-
"1, One, Alice Updated, 3.202, red, {\"key1\": \"value1\"}, null", // Update
309+
"1, One, Alice, 3.202, red, {\"key1\": \"value1\"}, null, +I", // Original
310+
"2, Two, Bob, 1.703, white, {\"key2\": \"value2\"}, null, +I", // Original
311+
"3, Three, Cecily, 4.105, red, {\"key3\": \"value3\"}, null, +I", // Original
312+
"4, Four, Derrida, 1.857, white, {\"key4\": \"value4\"}, null, +I", // Original
313+
"5, Five, Evelyn, 5.211, red, {\"K\": \"V\", \"k\": \"v\"}, null, +I", // Original
314+
"6, Six, Ferris, 9.813, null, null, null, +I", // Insert
315+
"7, Seven, Grace, 2.117, null, null, null, +I", // Insert
316+
"1, One, Alice, 3.202, red, {\"key1\": \"value1\"}, null, -U", // Update
314317
// id=1
315-
// (after)
316-
"2, Two, Bob, 1.703, white, {\"key2\": \"value2\"}, null", // Update id=2
317318
// (before)
318-
"2, Two, Bob, 2.0, white, {\"key2\": \"value2\"}, null", // Update id=2
319+
"1, One, Alice Updated, 3.202, red, {\"key1\": \"value1\"}, null, +U", // Update
320+
// id=1 (after)
321+
"2, Two, Bob, 1.703, white, {\"key2\": \"value2\"}, null, -U", // Update
322+
// id=2
323+
// (before)
324+
"2, Two, Bob, 2.0, white, {\"key2\": \"value2\"}, null, +U", // Update id=2
319325
// (after)
320-
"3, Three, Cecily, 4.105, red, {\"key3\": \"value3\"}, null" // Delete id=3
326+
"3, Three, Cecily, 4.105, red, {\"key3\": \"value3\"}, null, -D" // Delete
327+
// id=3
321328
);
322329

323330
validateSinkResult(

0 commit comments

Comments
 (0)