Skip to content

Commit a2bb9af

Browse files
committed
#649 Fix PR suggestions - thanks @coderabbitai.
1 parent d1297ee commit a2bb9af

File tree

5 files changed

+16
-16
lines changed

5 files changed

+16
-16
lines changed

README.md

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -945,9 +945,9 @@ pramen.sources = [
945945
# [Optional] Set name for the Kafka key column
946946
#key.column.name = "kafka_key"
947947
948-
# The Kafka key serializer. Can be "none", "binary", "string", "avro".
949-
# When "avro", "key.naming.strategy" should be deined at the "schema.registry" section.
950-
# Default is "binary", but if "key.naming.strategy" is defined, "avro" is selected automatically.
948+
# The Kafka key serializer when key.naming.strategy is NOT defined. Can be "none", "binary", "string".
949+
# When key.naming.strategy IS defined in schema.registry, Avro deserialization is used automatically.
950+
# Default is "binary".
951951
#key.column.serializer = "none"
952952
953953
kafka {

pramen/core/src/main/scala/za/co/absa/pramen/core/model/QueryBuilder.scala

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ object QueryBuilder {
5353
case _ =>
5454
val parent = if (parentPath.isEmpty) "" else s" at $parentPath"
5555
if (prefix.isEmpty)
56-
throw new IllegalArgumentException(s"No options are specified for the query. Usually, it is one of: '$SQL_KEY', '$PATH_KEY', '$TABLE_KEY', '$DB_TABLE_KEY'$parent.")
56+
throw new IllegalArgumentException(s"No options are specified for the query. Usually, it is one of: '$SQL_KEY', '$PATH_KEY', '$TABLE_KEY', '$DB_TABLE_KEY', '$TOPIC_KEY'$parent.")
5757
else
58-
throw new IllegalArgumentException(s"No options are specified for the '$prefix' query. Usually, it is one of: '$p$SQL_KEY', '$p$PATH_KEY', '$p$TABLE_KEY', '$p$DB_TABLE_KEY'$parent.")
58+
throw new IllegalArgumentException(s"No options are specified for the '$prefix' query. Usually, it is one of: '$p$SQL_KEY', '$p$PATH_KEY', '$p$TABLE_KEY', '$p$DB_TABLE_KEY', '$p$TOPIC_KEY'$parent.")
5959
}
6060
}
6161
}

pramen/core/src/test/scala/za/co/absa/pramen/core/model/QueryBuilderSuite.scala

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ class QueryBuilderSuite extends AnyWordSpec {
5555
QueryBuilder.fromConfig(conf, "", "")
5656
}
5757

58-
assert(exception.getMessage == "No options are specified for the query. Usually, it is one of: 'sql', 'path', 'table', 'db.table'.")
58+
assert(exception.getMessage == "No options are specified for the query. Usually, it is one of: 'sql', 'path', 'table', 'db.table', 'topic'.")
5959
}
6060

6161
"throw an exception when the prefix is empty" in {
@@ -65,7 +65,7 @@ class QueryBuilderSuite extends AnyWordSpec {
6565
QueryBuilder.fromConfig(conf, "input", "")
6666
}
6767

68-
assert(exception.getMessage == "No options are specified for the 'input' query. Usually, it is one of: 'input.sql', 'input.path', 'input.table', 'input.db.table'.")
68+
assert(exception.getMessage == "No options are specified for the 'input' query. Usually, it is one of: 'input.sql', 'input.path', 'input.table', 'input.db.table', 'input.topic'.")
6969
}
7070

7171
"throw an exception when the prefix is empty and parent is specified" in {
@@ -75,7 +75,7 @@ class QueryBuilderSuite extends AnyWordSpec {
7575
QueryBuilder.fromConfig(conf, "input", "my.parent")
7676
}
7777

78-
assert(exception.getMessage == "No options are specified for the 'input' query. Usually, it is one of: 'input.sql', 'input.path', 'input.table', 'input.db.table' at my.parent.")
78+
assert(exception.getMessage == "No options are specified for the 'input' query. Usually, it is one of: 'input.sql', 'input.path', 'input.table', 'input.db.table', 'input.topic' at my.parent.")
7979
}
8080
}
8181
}

pramen/core/src/test/scala/za/co/absa/pramen/core/pipeline/SourceTableParserSuite.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -119,7 +119,7 @@ class SourceTableParserSuite extends AnyWordSpec {
119119
SourceTableParser.fromConfig(conf, "source.tables")
120120
}
121121

122-
assert(ex.getMessage.contains("No options are specified for the 'input' query. Usually, it is one of: 'input.sql', 'input.path', 'input.table', 'input.db.table' at source.tables[0]."))
122+
assert(ex.getMessage.contains("No options are specified for the 'input' query. Usually, it is one of: 'input.sql', 'input.path', 'input.table', 'input.db.table', 'input.topic' at source.tables[0]."))
123123
}
124124

125125
"throw an exception in case of duplicate entries" in {

pramen/extras/src/main/scala/za/co/absa/pramen/extras/source/KafkaAvroSource.scala

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -53,9 +53,9 @@ import java.time.LocalDate
5353
* # [Optional] Set name for the Kafka key column
5454
* key.column.name = "kafka_key"
5555
*
56-
* # The Kafka key serializer. Can be "none", "binary", "string", "avro".
57-
* # When "avro", "key.naming.strategy" should be deined at the "schema.registry" section.
58-
* # Default is "binary", but if "key.naming.strategy" is defined, "avro" is selected automatically.
56+
* # The Kafka key serializer when 'key.naming.strategy' is NOT defined. Can be "none", "binary", "string".
57+
* # When 'key.naming.strategy' IS defined in 'schema.registry', Avro deserialization is used automatically.
58+
* # Default is "binary".
5959
* #key.column.serializer = "none"
6060
*
6161
* kafka {
@@ -217,7 +217,7 @@ class KafkaAvroSource(sourceConfig: Config,
217217
col("timestampType").as("timestamp_type")
218218
))
219219

220-
val hasKey = keyColumnSerializer != "none"
220+
val hasKey = kafkaAvroConfig.keyNamingStrategy.isDefined || keyColumnSerializer != "none"
221221

222222
val df2 = kafkaAvroConfig.keyNamingStrategy match {
223223
case Some(keyNamingStrategy) =>
@@ -231,16 +231,16 @@ class KafkaAvroSource(sourceConfig: Config,
231231
case "binary" => df1.withColumn(tempKafkaKeyColumnName, col("key"))
232232
case "string" => df1.withColumn(tempKafkaKeyColumnName, col("key").cast(StringType))
233233
case "avro" => throw new IllegalArgumentException("For the 'avro' serializer of Kafka topic key, 'schema.registry.key.naming.strategy' needs to be set.")
234-
case x => throw new IllegalArgumentException("Unknown Kafka key serializer. Can be one of: none, binary, long, string, avro.")
234+
case x => throw new IllegalArgumentException(s"Unknown Kafka key serializer '$x'. Can be one of: none, binary, string, avro.")
235235
}
236236
}
237237

238238
val payloadFields = df2.select("data.*").schema.fieldNames.toSet
239239
if (payloadFields.contains(kafkaColumnName)) {
240-
log.warn(s"Payload field '$kafkaColumnName' conflicts with Kafka metadata struct name and will be replaced.")
240+
log.warn(s"Payload field '$kafkaColumnName' conflicts with reserved Kafka metadata struct name and will be replaced.")
241241
}
242242
if (payloadFields.contains(keyColumnName)) {
243-
log.warn(s"Payload field '$keyColumnName' conflicts with Kafka key column name and will be replaced.")
243+
log.warn(s"Payload field '$keyColumnName' conflicts with reserved Kafka key column name and will be removed.")
244244
}
245245

246246
// Put data fields to the root level of the schema, and if data struct already has kafka_key and kafka fields,

0 commit comments

Comments (0)