Skip to content

Commit d6c2958

Browse files
committed
Addressing comments
1 parent 7c8c7c4 commit d6c2958

File tree

6 files changed

+15
-202
lines changed

6 files changed

+15
-202
lines changed

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/MySqlEventDeserializer.java

Lines changed: 1 addition & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -138,10 +138,7 @@ protected List<SchemaChangeEvent> deserializeSchemaChangeRecord(SourceRecord rec
138138
if (customParser == null) {
139139
customParser =
140140
new CustomMySqlAntlrDdlParser(
141-
includeComments,
142-
tinyInt1isBit,
143-
isTableIdCaseInsensitive,
144-
appendOnly);
141+
includeComments, tinyInt1isBit, isTableIdCaseInsensitive);
145142
tables = new Tables();
146143
}
147144

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomAlterTableParserListener.java

Lines changed: 11 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,6 @@ public class CustomAlterTableParserListener extends MySqlParserBaseListener {
6868
private CustomColumnDefinitionParserListener columnDefinitionListener;
6969
private TableEditor tableEditor;
7070
private boolean isTableIdCaseInsensitive;
71-
private final boolean appendOnly;
7271
private int parsingColumnIndex = STARTING_INDEX;
7372

7473
public CustomAlterTableParserListener(
@@ -77,22 +76,11 @@ public CustomAlterTableParserListener(
7776
LinkedList<SchemaChangeEvent> changes,
7877
boolean tinyInt1isBit,
7978
boolean isTableIdCaseInsensitive) {
80-
this(parser, listeners, changes, tinyInt1isBit, isTableIdCaseInsensitive, false);
81-
}
82-
83-
public CustomAlterTableParserListener(
84-
MySqlAntlrDdlParser parser,
85-
List<ParseTreeListener> listeners,
86-
LinkedList<SchemaChangeEvent> changes,
87-
boolean tinyInt1isBit,
88-
boolean isTableIdCaseInsensitive,
89-
boolean appendOnly) {
9079
this.parser = parser;
9180
this.listeners = listeners;
9281
this.changes = changes;
9382
this.tinyInt1isBit = tinyInt1isBit;
9483
this.isTableIdCaseInsensitive = isTableIdCaseInsensitive;
95-
this.appendOnly = appendOnly;
9684
}
9785

9886
@Override
@@ -105,12 +93,12 @@ public void exitCopyCreateTable(MySqlParser.CopyCreateTableContext ctx) {
10593
.overwriteTable(
10694
tableId,
10795
original.columns(),
108-
appendOnly ? Collections.emptyList() : original.primaryKeyColumnNames(),
96+
original.primaryKeyColumnNames(),
10997
original.defaultCharsetName());
11098
parser.signalCreateTable(tableId, ctx);
11199
Schema.Builder builder = Schema.newBuilder();
112100
original.columns().forEach(column -> builder.column(toCdcColumn(column)));
113-
if (!appendOnly && !original.primaryKeyColumnNames().isEmpty()) {
101+
if (!original.primaryKeyColumnNames().isEmpty()) {
114102
builder.primaryKey(original.primaryKeyColumnNames());
115103
}
116104
changes.add(new CreateTableEvent(toCdcTableId(tableId), builder.build()));
@@ -164,7 +152,7 @@ public void exitColumnCreateTable(MySqlParser.ColumnCreateTableContext ctx) {
164152

165153
Schema.Builder builder = Schema.newBuilder();
166154
tableEditor.columns().forEach(column -> builder.column(toCdcColumn(column)));
167-
if (!appendOnly && tableEditor.hasPrimaryKey()) {
155+
if (tableEditor.hasPrimaryKey()) {
168156
builder.primaryKey(tableEditor.primaryKeyColumnNames());
169157
}
170158
builder.comment(tableEditor.create().comment());
@@ -185,7 +173,7 @@ public void enterColumnDeclaration(MySqlParser.ColumnDeclarationContext ctx) {
185173
if (columnDefinitionListener == null) {
186174
columnDefinitionListener =
187175
new CustomColumnDefinitionParserListener(
188-
tableEditor, columnEditor, parser, listeners, appendOnly);
176+
tableEditor, columnEditor, parser, listeners);
189177
listeners.add(columnDefinitionListener);
190178
} else {
191179
columnDefinitionListener.setColumnEditor(columnEditor);
@@ -210,9 +198,7 @@ public void exitColumnDeclaration(MySqlParser.ColumnDeclarationContext ctx) {
210198
public void enterPrimaryKeyTableConstraint(MySqlParser.PrimaryKeyTableConstraintContext ctx) {
211199
parser.runIfNotNull(
212200
() -> {
213-
if (!appendOnly) {
214-
parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor);
215-
}
201+
parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor);
216202
},
217203
tableEditor);
218204
super.enterPrimaryKeyTableConstraint(ctx);
@@ -222,7 +208,7 @@ public void enterPrimaryKeyTableConstraint(MySqlParser.PrimaryKeyTableConstraint
222208
public void enterUniqueKeyTableConstraint(MySqlParser.UniqueKeyTableConstraintContext ctx) {
223209
parser.runIfNotNull(
224210
() -> {
225-
if (!appendOnly && !tableEditor.hasPrimaryKey()) {
211+
if (!tableEditor.hasPrimaryKey()) {
226212
parser.parsePrimaryIndexColumnNames(ctx.indexColumnNames(), tableEditor);
227213
}
228214
},
@@ -249,7 +235,7 @@ public void enterAlterByAddColumn(MySqlParser.AlterByAddColumnContext ctx) {
249235
ColumnEditor columnEditor = Column.editor().name(columnName);
250236
columnDefinitionListener =
251237
new CustomColumnDefinitionParserListener(
252-
tableEditor, columnEditor, parser, listeners, appendOnly);
238+
tableEditor, columnEditor, parser, listeners);
253239
listeners.add(columnDefinitionListener);
254240
super.exitAlterByAddColumn(ctx);
255241
}
@@ -305,7 +291,7 @@ public void enterAlterByAddColumns(MySqlParser.AlterByAddColumnsContext ctx) {
305291
}
306292
columnDefinitionListener =
307293
new CustomColumnDefinitionParserListener(
308-
tableEditor, columnEditors.get(0), parser, listeners, appendOnly);
294+
tableEditor, columnEditors.get(0), parser, listeners);
309295
listeners.add(columnDefinitionListener);
310296
super.enterAlterByAddColumns(ctx);
311297
}
@@ -356,7 +342,7 @@ public void enterAlterByChangeColumn(MySqlParser.AlterByChangeColumnContext ctx)
356342

357343
columnDefinitionListener =
358344
new CustomColumnDefinitionParserListener(
359-
tableEditor, columnEditor, parser, listeners, appendOnly);
345+
tableEditor, columnEditor, parser, listeners);
360346
listeners.add(columnDefinitionListener);
361347
super.enterAlterByChangeColumn(ctx);
362348
}
@@ -407,7 +393,7 @@ public void enterAlterByRenameColumn(MySqlParser.AlterByRenameColumnContext ctx)
407393
ColumnEditor columnEditor = Column.editor().name(oldColumnName);
408394
columnDefinitionListener =
409395
new CustomColumnDefinitionParserListener(
410-
tableEditor, columnEditor, parser, listeners, appendOnly);
396+
tableEditor, columnEditor, parser, listeners);
411397
listeners.add(columnDefinitionListener);
412398
super.enterAlterByRenameColumn(ctx);
413399
}
@@ -420,7 +406,7 @@ public void enterAlterByModifyColumn(MySqlParser.AlterByModifyColumnContext ctx)
420406

421407
columnDefinitionListener =
422408
new CustomColumnDefinitionParserListener(
423-
tableEditor, columnEditor, parser, listeners, appendOnly);
409+
tableEditor, columnEditor, parser, listeners);
424410
listeners.add(columnDefinitionListener);
425411
super.enterAlterByModifyColumn(ctx);
426412
}

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomColumnDefinitionParserListener.java

Lines changed: 1 addition & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -54,28 +54,17 @@ public class CustomColumnDefinitionParserListener extends MySqlParserBaseListene
5454
private final TableEditor tableEditor;
5555

5656
private final List<ParseTreeListener> listeners;
57-
private final boolean appendOnly;
5857

5958
public CustomColumnDefinitionParserListener(
6059
TableEditor tableEditor,
6160
ColumnEditor columnEditor,
6261
MySqlAntlrDdlParser parser,
6362
List<ParseTreeListener> listeners) {
64-
this(tableEditor, columnEditor, parser, listeners, false);
65-
}
66-
67-
public CustomColumnDefinitionParserListener(
68-
TableEditor tableEditor,
69-
ColumnEditor columnEditor,
70-
MySqlAntlrDdlParser parser,
71-
List<ParseTreeListener> listeners,
72-
boolean appendOnly) {
7363
this.tableEditor = tableEditor;
7464
this.columnEditor = columnEditor;
7565
this.parser = parser;
7666
this.dataTypeResolver = parser.dataTypeResolver();
7767
this.listeners = listeners;
78-
this.appendOnly = appendOnly;
7968
}
8069

8170
public void setColumnEditor(ColumnEditor columnEditor) {
@@ -122,9 +111,7 @@ public void enterPrimaryKeyColumnConstraint(MySqlParser.PrimaryKeyColumnConstrai
122111
// otherwise the statement can't be executed due to multiple primary key error
123112
optionalColumn.set(Boolean.FALSE);
124113
tableEditor.addColumn(columnEditor.create());
125-
if (!appendOnly) {
126-
tableEditor.setPrimaryKeyNames(columnEditor.name());
127-
}
114+
tableEditor.setPrimaryKeyNames(columnEditor.name());
128115
super.enterPrimaryKeyColumnConstraint(ctx);
129116
}
130117

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParser.java

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -37,23 +37,13 @@ public class CustomMySqlAntlrDdlParser extends MySqlAntlrDdlParser {
3737
private final LinkedList<SchemaChangeEvent> parsedEvents;
3838
private final boolean tinyInt1isBit;
3939
private boolean isTableIdCaseInsensitive;
40-
private final boolean appendOnly;
4140

4241
public CustomMySqlAntlrDdlParser(
4342
boolean includeComments, boolean tinyInt1isBit, boolean isTableIdCaseInsensitive) {
44-
this(includeComments, tinyInt1isBit, isTableIdCaseInsensitive, false);
45-
}
46-
47-
public CustomMySqlAntlrDdlParser(
48-
boolean includeComments,
49-
boolean tinyInt1isBit,
50-
boolean isTableIdCaseInsensitive,
51-
boolean appendOnly) {
5243
super(true, false, includeComments, null, Tables.TableFilter.includeAll());
5344
this.parsedEvents = new LinkedList<>();
5445
this.tinyInt1isBit = tinyInt1isBit;
5546
this.isTableIdCaseInsensitive = isTableIdCaseInsensitive;
56-
this.appendOnly = appendOnly;
5747
}
5848

5949
// Overriding this method because the BIT type requires default length dimension of 1.
@@ -294,7 +284,7 @@ protected DataTypeResolver initializeDataTypeResolver() {
294284
@Override
295285
protected AntlrDdlParserListener createParseTreeWalkerListener() {
296286
return new CustomMySqlAntlrDdlParserListener(
297-
this, parsedEvents, tinyInt1isBit, isTableIdCaseInsensitive, appendOnly);
287+
this, parsedEvents, tinyInt1isBit, isTableIdCaseInsensitive);
298288
}
299289

300290
public List<SchemaChangeEvent> getAndClearParsedEvents() {

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/main/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlAntlrDdlParserListener.java

Lines changed: 1 addition & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -78,27 +78,13 @@ public CustomMySqlAntlrDdlParserListener(
7878
LinkedList<SchemaChangeEvent> parsedEvents,
7979
boolean tinyInt1isBit,
8080
boolean isTableIdCaseInsensitive) {
81-
this(parser, parsedEvents, tinyInt1isBit, isTableIdCaseInsensitive, false);
82-
}
83-
84-
public CustomMySqlAntlrDdlParserListener(
85-
MySqlAntlrDdlParser parser,
86-
LinkedList<SchemaChangeEvent> parsedEvents,
87-
boolean tinyInt1isBit,
88-
boolean isTableIdCaseInsensitive,
89-
boolean appendOnly) {
9081
// initialize listeners
9182
listeners.add(new CreateAndAlterDatabaseParserListener(parser));
9283
listeners.add(new DropDatabaseParserListener(parser));
9384
listeners.add(new CreateTableParserListener(parser, listeners));
9485
listeners.add(
9586
new CustomAlterTableParserListener(
96-
parser,
97-
listeners,
98-
parsedEvents,
99-
tinyInt1isBit,
100-
isTableIdCaseInsensitive,
101-
appendOnly));
87+
parser, listeners, parsedEvents, tinyInt1isBit, isTableIdCaseInsensitive));
10288
listeners.add(new DropTableParserListener(parser));
10389
listeners.add(new RenameTableParserListener(parser));
10490
listeners.add(new TruncateTableParserListener(parser));

flink-cdc-connect/flink-cdc-pipeline-connectors/flink-cdc-pipeline-connector-mysql/src/test/java/org/apache/flink/cdc/connectors/mysql/source/parser/CustomMySqlDdlParserAppendOnlyTest.java

Lines changed: 0 additions & 133 deletions
This file was deleted.

0 commit comments

Comments
 (0)