Skip to content

Commit 057e3ab

Browse files
committed
Fix table name substitution for creating schema history table.
1 parent 097af78 commit 057e3ab

File tree

1 file changed

+11
-4
lines changed

1 file changed

+11
-4
lines changed

sink-connector-lightweight/src/main/java/com/altinity/clickhouse/debezium/embedded/cdc/DebeziumJdbcStorageOperations.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -75,17 +75,24 @@ void createDatabaseForDebeziumStorage(Connection conn,
7575
* the properties.
7676
*/
7777
void createSchemaHistoryTable(Connection conn, Properties props) {
78-
String createSchemaHistoryTable = props.getProperty(
78+
79+
Pair<String, String> tableNameDatabaseName =
80+
getDebeziumOffsetStorageDatabaseName(props);
81+
String tableName = tableNameDatabaseName.getLeft();
82+
String dbName = tableNameDatabaseName.getRight();
83+
String createSchemaHistoryTableQuery = props.getProperty(
7984
JdbcOffsetBackingStoreConfig.OFFSET_STORAGE_PREFIX +
8085
JdbcSchemaHistoryConfig.PROP_TABLE_DDL.name());
81-
if (createSchemaHistoryTable == null ||
82-
createSchemaHistoryTable.isEmpty() == true) {
86+
String formattedCreateSchemaHistoryTableQuery = String.format(createSchemaHistoryTableQuery, dbName, dbName + "." + tableName);
87+
88+
if (createSchemaHistoryTableQuery == null ||
89+
createSchemaHistoryTableQuery.isEmpty() == true) {
8390
log.warn("Skipping creating schema history table as the query " +
8491
"was not provided in configuration");
8592
return;
8693
}
8794
try {
88-
new DBMetadata(props).executeSystemQuery(conn, createSchemaHistoryTable);
95+
new DBMetadata(props).executeSystemQuery(conn, formattedCreateSchemaHistoryTableQuery);
8996
} catch (Exception e) {
9097
log.error("Error creating schema history table", e);
9198
}

0 commit comments

Comments
 (0)