Skip to content

Commit 05855e4

Browse files
authored
Apply GCP Application Default authentication when using a Managed Service for Apache Kafka cluster. (#2925)
* Add logic to apply GCP Application Default authentication when using a Managed Service for Apache Kafka cluster * Tighten the regex for better matching on google clusters.
1 parent 9bf7840 commit 05855e4

File tree

1 file changed

+10
-4
lines changed

1 file changed

+10
-4
lines changed

v2/streaming-data-generator/src/main/java/com/google/cloud/teleport/v2/transforms/StreamingDataGeneratorWriteToKafka.java

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@
2525
import java.io.IOException;
2626
import java.nio.charset.StandardCharsets;
2727
import java.util.Arrays;
28+
import java.util.regex.Pattern;
2829
import org.apache.avro.Schema;
2930
import org.apache.avro.generic.GenericDatumReader;
3031
import org.apache.avro.generic.GenericDatumWriter;
@@ -49,6 +50,9 @@
4950
/** A {@link PTransform} that converts generatedMessages to write to Spanner table. */
5051
public final class StreamingDataGeneratorWriteToKafka {
5152

53+
private static final Pattern ManagedKafkaRegex =
54+
Pattern.compile("bootstrap\\..*\\.managedkafka\\..*\\.cloud\\.goog.*");
55+
5256
private StreamingDataGeneratorWriteToKafka() {}
5357

5458
/** Creates Kafka message with JSON (UTF-8) encoded Payload. */
@@ -179,13 +183,15 @@ public PDone expand(PCollection<byte[]> generatedMessages) {
179183
StreamingDataGenerator.OutputType.AVRO.name())));
180184
}
181185

182-
return kafkaMessages.apply(
183-
"writeSuccessMessages",
186+
KafkaIO.Write<Void, String> writeTransform =
184187
KafkaIO.<Void, String>write()
185188
.withBootstrapServers(getPipelineOptions().getBootstrapServer())
186189
.withTopic(getPipelineOptions().getKafkaTopic())
187-
.withValueSerializer(StringSerializer.class)
188-
.values());
190+
.withValueSerializer(StringSerializer.class);
191+
if (ManagedKafkaRegex.matcher(getPipelineOptions().getBootstrapServer()).matches()) {
192+
writeTransform = writeTransform.withGCPApplicationDefaultCredentials();
193+
}
194+
return kafkaMessages.apply("writeSuccessMessages", writeTransform.values());
189195
}
190196
}
191197
}

0 commit comments

Comments
 (0)