|
43 | 43 | import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED; |
44 | 44 | import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_CHUNK_KEY_COLUMN; |
45 | 45 | import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED; |
| 46 | +import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED; |
46 | 47 | import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES; |
47 | 48 | import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TABLES_EXCLUDE; |
48 | 49 | import static org.apache.flink.cdc.connectors.mysql.source.MySqlDataSourceOptions.TREAT_TINYINT1_AS_BOOLEAN_ENABLED; |
@@ -269,14 +270,68 @@ public void testOptionalOption() { |
269 | 270 | .contains( |
270 | 271 | TREAT_TINYINT1_AS_BOOLEAN_ENABLED, |
271 | 272 | PARSE_ONLINE_SCHEMA_CHANGES, |
272 | | - SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED); |
| 273 | + SCAN_BINLOG_NEWLY_ADDED_TABLE_ENABLED, |
| 274 | + SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED); |
273 | 275 |
|
274 | 276 | MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context); |
275 | 277 | assertThat(dataSource.getSourceConfig().isTreatTinyInt1AsBoolean()).isFalse(); |
276 | 278 | assertThat(dataSource.getSourceConfig().isParseOnLineSchemaChanges()).isTrue(); |
277 | 279 | assertThat(dataSource.getSourceConfig().isAssignUnboundedChunkFirst()).isTrue(); |
278 | 280 | } |
279 | 281 |
|
| 282 | + @Test |
| 283 | + void testScanReadChangelogAsAppendOnlyEnabledDefault() { |
| 284 | + inventoryDatabase.createAndInitialize(); |
| 285 | + Map<String, String> options = new HashMap<>(); |
| 286 | + options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost()); |
| 287 | + options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort())); |
| 288 | + options.put(USERNAME.key(), TEST_USER); |
| 289 | + options.put(PASSWORD.key(), TEST_PASSWORD); |
| 290 | + options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*"); |
| 291 | + Factory.Context context = new MockContext(Configuration.fromMap(options)); |
| 292 | + |
| 293 | + MySqlDataSourceFactory factory = new MySqlDataSourceFactory(); |
| 294 | + MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context); |
| 295 | + |
| 296 | + assertThat(dataSource.getSourceConfig().isScanReadChangelogAsAppendOnly()).isFalse(); |
| 297 | + } |
| 298 | + |
| 299 | + @Test |
| 300 | + void testScanReadChangelogAsAppendOnlyEnabledTrue() { |
| 301 | + inventoryDatabase.createAndInitialize(); |
| 302 | + Map<String, String> options = new HashMap<>(); |
| 303 | + options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost()); |
| 304 | + options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort())); |
| 305 | + options.put(USERNAME.key(), TEST_USER); |
| 306 | + options.put(PASSWORD.key(), TEST_PASSWORD); |
| 307 | + options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*"); |
| 308 | + options.put(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED.key(), "true"); |
| 309 | + Factory.Context context = new MockContext(Configuration.fromMap(options)); |
| 310 | + |
| 311 | + MySqlDataSourceFactory factory = new MySqlDataSourceFactory(); |
| 312 | + MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context); |
| 313 | + |
| 314 | + assertThat(dataSource.getSourceConfig().isScanReadChangelogAsAppendOnly()).isTrue(); |
| 315 | + } |
| 316 | + |
| 317 | + @Test |
| 318 | + void testScanReadChangelogAsAppendOnlyEnabledFalse() { |
| 319 | + inventoryDatabase.createAndInitialize(); |
| 320 | + Map<String, String> options = new HashMap<>(); |
| 321 | + options.put(HOSTNAME.key(), MYSQL_CONTAINER.getHost()); |
| 322 | + options.put(PORT.key(), String.valueOf(MYSQL_CONTAINER.getDatabasePort())); |
| 323 | + options.put(USERNAME.key(), TEST_USER); |
| 324 | + options.put(PASSWORD.key(), TEST_PASSWORD); |
| 325 | + options.put(TABLES.key(), inventoryDatabase.getDatabaseName() + ".prod\\.*"); |
| 326 | + options.put(SCAN_READ_CHANGELOG_AS_APPEND_ONLY_ENABLED.key(), "false"); |
| 327 | + Factory.Context context = new MockContext(Configuration.fromMap(options)); |
| 328 | + |
| 329 | + MySqlDataSourceFactory factory = new MySqlDataSourceFactory(); |
| 330 | + MySqlDataSource dataSource = (MySqlDataSource) factory.createDataSource(context); |
| 331 | + |
| 332 | + assertThat(dataSource.getSourceConfig().isScanReadChangelogAsAppendOnly()).isFalse(); |
| 333 | + } |
| 334 | + |
280 | 335 | @Test |
281 | 336 | void testPrefixRequireOption() { |
282 | 337 | inventoryDatabase.createAndInitialize(); |
|
0 commit comments