diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java index 126c03eb..fcac55a6 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java @@ -37,6 +37,8 @@ import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig; import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig; import com.wepay.kafka.connect.bigquery.convert.SchemaConverter; +import com.wepay.kafka.connect.bigquery.convert.logicaltype.DebeziumLogicalConverters; +import com.wepay.kafka.connect.bigquery.convert.logicaltype.KafkaLogicalConverters; import com.wepay.kafka.connect.bigquery.exception.BigQueryStorageWriteApiConnectException; import com.wepay.kafka.connect.bigquery.exception.ConversionConnectException; import com.wepay.kafka.connect.bigquery.utils.PartitionedTableId; @@ -424,6 +426,9 @@ public void start(Map properties) { logger.trace("task.start()"); stopped = false; config = new BigQuerySinkTaskConfig(properties); + DebeziumLogicalConverters.initialize(config); + KafkaLogicalConverters.initialize(config); + autoCreateTables = config.getBoolean(BigQuerySinkConfig.TABLE_CREATE_CONFIG); useStorageApi = config.getBoolean(BigQuerySinkConfig.USE_STORAGE_WRITE_API_CONFIG);