Skip to content

Commit 78f5a4d

Browse files
authored
fix: populate populateLogicalConverterRegistry in Task
1 parent 09d2a7f commit 78f5a4d

File tree

7 files changed

+70
-14
lines changed

7 files changed

+70
-14
lines changed

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkConnector.java

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,8 +73,6 @@ public void start(Map<String, String> properties) {
7373
logger.trace("connector.start()");
7474
configProperties = properties;
7575
config = new BigQuerySinkConfig(properties);
76-
DebeziumLogicalConverters.initialize(config);
77-
KafkaLogicalConverters.initialize(config);
7876
}
7977

8078
@Override

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/BigQuerySinkTask.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@
3737
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
3838
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkTaskConfig;
3939
import com.wepay.kafka.connect.bigquery.convert.SchemaConverter;
40+
import com.wepay.kafka.connect.bigquery.convert.logicaltype.DebeziumLogicalConverters;
41+
import com.wepay.kafka.connect.bigquery.convert.logicaltype.KafkaLogicalConverters;
4042
import com.wepay.kafka.connect.bigquery.exception.BigQueryStorageWriteApiConnectException;
4143
import com.wepay.kafka.connect.bigquery.exception.ConversionConnectException;
4244
import com.wepay.kafka.connect.bigquery.utils.PartitionedTableId;
@@ -419,11 +421,17 @@ boolean isRunning() {
419421
return !stopped;
420422
}
421423

424+
private static void populateLogicalConverterRegistry(BigQuerySinkTaskConfig config) {
425+
DebeziumLogicalConverters.initialize(config);
426+
KafkaLogicalConverters.initialize(config);
427+
}
428+
422429
@Override
423430
public void start(Map<String, String> properties) {
424431
logger.trace("task.start()");
425432
stopped = false;
426433
config = new BigQuerySinkTaskConfig(properties);
434+
populateLogicalConverterRegistry(config);
427435
autoCreateTables = config.getBoolean(BigQuerySinkConfig.TABLE_CREATE_CONFIG);
428436

429437
useStorageApi = config.getBoolean(BigQuerySinkConfig.USE_STORAGE_WRITE_API_CONFIG);

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/logicaltype/DebeziumLogicalConverters.java

Lines changed: 26 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525

2626
import com.google.cloud.bigquery.Field;
2727
import com.google.cloud.bigquery.LegacySQLTypeName;
28+
import com.google.common.annotations.VisibleForTesting;
2829
import com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig;
2930
import com.wepay.kafka.connect.bigquery.exception.ConversionConnectException;
3031
import io.debezium.data.VariableScaleDecimal;
@@ -52,14 +53,32 @@ public class DebeziumLogicalConverters {
5253
private static final int MICROS_IN_SEC = 1000000;
5354
private static final int MICROS_IN_MILLI = 1000;
5455

56+
/**
57+
* Initialize the LogicalConverterRegistry with the DebeziumLogicalConverters.
58+
*
59+
* @param config the configuration to use.
60+
*/
5561
public static void initialize(final BigQuerySinkConfig config) {
56-
LogicalConverterRegistry.register(Date.SCHEMA_NAME, new DateConverter());
57-
LogicalConverterRegistry.register(MicroTime.SCHEMA_NAME, new MicroTimeConverter());
58-
LogicalConverterRegistry.register(MicroTimestamp.SCHEMA_NAME, new MicroTimestampConverter());
59-
LogicalConverterRegistry.register(Time.SCHEMA_NAME, new TimeConverter());
60-
LogicalConverterRegistry.register(ZonedTimestamp.SCHEMA_NAME, new ZonedTimestampConverter());
61-
LogicalConverterRegistry.register(Timestamp.SCHEMA_NAME, new TimestampConverter(config.getShouldConvertDebeziumTimestampToInteger()));
62-
LogicalConverterRegistry.register(VariableScaleDecimal.LOGICAL_NAME, new VariableScaleDecimalConverter(config.getVariableScaleDecimalHandlingMode()));
62+
LogicalConverterRegistry.registerIfAbsent(Date.SCHEMA_NAME, new DateConverter());
63+
LogicalConverterRegistry.registerIfAbsent(MicroTime.SCHEMA_NAME, new MicroTimeConverter());
64+
LogicalConverterRegistry.registerIfAbsent(MicroTimestamp.SCHEMA_NAME, new MicroTimestampConverter());
65+
LogicalConverterRegistry.registerIfAbsent(Time.SCHEMA_NAME, new TimeConverter());
66+
LogicalConverterRegistry.registerIfAbsent(ZonedTimestamp.SCHEMA_NAME, new ZonedTimestampConverter());
67+
LogicalConverterRegistry.registerIfAbsent(Timestamp.SCHEMA_NAME, new TimestampConverter(config.getShouldConvertDebeziumTimestampToInteger()));
68+
LogicalConverterRegistry.registerIfAbsent(VariableScaleDecimal.LOGICAL_NAME, new VariableScaleDecimalConverter(config.getVariableScaleDecimalHandlingMode()));
69+
}
70+
71+
/**
72+
* Remove the DebeziumLogicalConverters from the LogicalConverterRegistry.
73+
*/
74+
public static void remove() {
75+
LogicalConverterRegistry.unregister(Date.SCHEMA_NAME);
76+
LogicalConverterRegistry.unregister(MicroTime.SCHEMA_NAME);
77+
LogicalConverterRegistry.unregister(MicroTimestamp.SCHEMA_NAME);
78+
LogicalConverterRegistry.unregister(Time.SCHEMA_NAME);
79+
LogicalConverterRegistry.unregister(ZonedTimestamp.SCHEMA_NAME);
80+
LogicalConverterRegistry.unregister(Timestamp.SCHEMA_NAME);
81+
LogicalConverterRegistry.unregister(VariableScaleDecimal.LOGICAL_NAME);
6382
}
6483

6584
private DebeziumLogicalConverters() {

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/logicaltype/KafkaLogicalConverters.java

Lines changed: 16 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -52,11 +52,23 @@ public class KafkaLogicalConverters {
5252
private static final int MAX_NUMERIC_PRECISION = 38;
5353
private static final int MAX_NUMERIC_SCALE = 9;
5454

55+
/**
56+
* Initialize the LogicalConverterRegistry with the KafkaLogicalConverters.
57+
*
58+
* @param config the configuration to use.
59+
*/
5560
public static void initialize(final BigQuerySinkConfig config) {
56-
LogicalConverterRegistry.register(Date.LOGICAL_NAME, new DateConverter());
57-
LogicalConverterRegistry.register(Decimal.LOGICAL_NAME, new DecimalConverter(config.getDecimalHandlingMode()));
58-
LogicalConverterRegistry.register(Timestamp.LOGICAL_NAME, new TimestampConverter());
59-
LogicalConverterRegistry.register(Time.LOGICAL_NAME, new TimeConverter());
61+
LogicalConverterRegistry.registerIfAbsent(Date.LOGICAL_NAME, new DateConverter());
62+
LogicalConverterRegistry.registerIfAbsent(Decimal.LOGICAL_NAME, new DecimalConverter(config.getDecimalHandlingMode()));
63+
LogicalConverterRegistry.registerIfAbsent(Timestamp.LOGICAL_NAME, new TimestampConverter());
64+
LogicalConverterRegistry.registerIfAbsent(Time.LOGICAL_NAME, new TimeConverter());
65+
}
66+
67+
public static void remove() {
68+
LogicalConverterRegistry.unregister(Date.LOGICAL_NAME);
69+
LogicalConverterRegistry.unregister(Decimal.LOGICAL_NAME);
70+
LogicalConverterRegistry.unregister(Timestamp.LOGICAL_NAME);
71+
LogicalConverterRegistry.unregister(Time.LOGICAL_NAME);
6072
}
6173

6274
private KafkaLogicalConverters() {

kcbq-connector/src/main/java/com/wepay/kafka/connect/bigquery/convert/logicaltype/LogicalConverterRegistry.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -79,7 +79,7 @@ public static LogicalTypeConverter getConverter(String logicalTypeName) {
7979
* Determines if a converter is registered with the logical type name.
8080
*
8181
* @param typeName the logical type name.
82-
* @return }{@code true} if there is a converter registered, {@code false} otherwise.
82+
* @return {@code true} if there is a converter registered, {@code false} otherwise.
8383
*/
8484
public static boolean isRegisteredLogicalType(String typeName) {
8585
return typeName != null && converterMap.containsKey(typeName);

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/convert/BigQueryRecordConverterTest.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@
5555
import org.apache.kafka.connect.data.Struct;
5656
import org.apache.kafka.connect.data.Timestamp;
5757
import org.apache.kafka.connect.sink.SinkRecord;
58+
import org.junit.jupiter.api.AfterAll;
5859
import org.junit.jupiter.api.BeforeEach;
5960
import org.junit.jupiter.api.Test;
6061
import org.junit.jupiter.params.ParameterizedTest;
@@ -71,8 +72,17 @@ public class BigQueryRecordConverterTest {
7172
void resetValues() {
7273
shouldConvertDouble = true;
7374
useStorageWriteApiConfig = false;
75+
DebeziumLogicalConverters.remove();
76+
KafkaLogicalConverters.remove();
7477
}
7578

79+
@AfterAll
80+
static void cleanUpConverters() {
81+
DebeziumLogicalConverters.remove();
82+
KafkaLogicalConverters.remove();
83+
}
84+
85+
7686
private static SinkRecord spoofSinkRecord(Schema schema, Object struct, boolean isKey) {
7787
if (isKey) {
7888
return new SinkRecord(null, 0, schema, struct, null, null, 0);

kcbq-connector/src/test/java/com/wepay/kafka/connect/bigquery/convert/BigQuerySchemaConverterTest.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
import org.apache.kafka.connect.data.Schema;
4343
import org.apache.kafka.connect.data.SchemaBuilder;
4444
import org.apache.kafka.connect.data.Timestamp;
45+
import org.junit.jupiter.api.AfterAll;
4546
import org.junit.jupiter.api.BeforeEach;
4647
import org.junit.jupiter.api.Test;
4748
import org.junit.jupiter.params.ParameterizedTest;
@@ -63,6 +64,14 @@ public class BigQuerySchemaConverterTest {
6364
void resetValues() {
6465
allFieldsNullable = false;
6566
sanitizeFieldNames = false;
67+
DebeziumLogicalConverters.remove();
68+
KafkaLogicalConverters.remove();
69+
}
70+
71+
@AfterAll
72+
static void cleanUpConverters() {
73+
DebeziumLogicalConverters.remove();
74+
KafkaLogicalConverters.remove();
6675
}
6776

6877
private static BigQuerySinkConfig testingConfig(boolean convertDebeziumTimestamp, BigQuerySinkConfig.DecimalHandlingMode varibaleScaleDecimalMode, BigQuerySinkConfig.DecimalHandlingMode decimalMode) {

0 commit comments

Comments
 (0)