From b942496dfd1bf94e80f645fe0d2205905b095402 Mon Sep 17 00:00:00 2001 From: Erik Gustafsson Date: Mon, 1 Dec 2025 10:29:26 +0100 Subject: [PATCH] Ensure converters are initialized in all tasks on all workers --- .../com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java index 126c03eb..fcac55a6 100644 --- a/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java +++ b/kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java @@ -37,6 +37,8 @@ import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig; import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig; import com.wepay.kafka.connect.bigquery.convert.SchemaConverter; +import com.wepay.kafka.connect.bigquery.convert.logicaltype.DebeziumLogicalConverters; +import com.wepay.kafka.connect.bigquery.convert.logicaltype.KafkaLogicalConverters; import com.wepay.kafka.connect.bigquery.exception.BigQueryStorageWriteApiConnectException; import com.wepay.kafka.connect.bigquery.exception.ConversionConnectException; import com.wepay.kafka.connect.bigquery.utils.PartitionedTableId; @@ -424,6 +426,9 @@ public void start(Map properties) { logger.trace("task.start()"); stopped = false; config = new BigQuerySinkTaskConfig(properties); + DebeziumLogicalConverters.initialize(config); + KafkaLogicalConverters.initialize(config); + autoCreateTables = config.getBoolean(BigQuerySinkConfig.TABLE_CREATE_CONFIG); useStorageApi = config.getBoolean(BigQuerySinkConfig.USE_STORAGE_WRITE_API_CONFIG);