Skip to content

Commit f6523ee

Browse files
Piotr Limanowskipeel
authored andcommitted
Add support for nullable fields (closes #6)
Previously all the fields with nullable values were discarded as invalid. However there is a wide range of fields that cannot be treated as non-nullable. For example for enrichment failures bad row, we get following structure (trimmed): ``` column_name | is_nullable | data_type -------------------------------------------+-------------+----------------------------- processor.artifact | NO | character varying processor.version | NO | character varying failure.messages | YES | jsonb failure.timestamp | YES | timestamp without time zone payload.enriched.app_id | YES | character varying payload.enriched.base_currency | YES | character varying ... ``` Where only processor is non-nullable. Therefore enrichment failures would not be written with evaluations like: ``` payload.enriched.br_cookies -> Some(null) -> BigInt -> Left(Invalid type BIGINT for value Some(null)) ``` This PR allows marking fields nullable by default. We might want to consider a smarter approach at some point.
1 parent 39f3eb1 commit f6523ee

File tree

2 files changed

+2
-2
lines changed
  • modules/common/src
    • main/scala/com/snowplowanalytics/snowplow/postgres/shredding
    • test/scala/com/snowplowanalytics/snowplow/postgres/streaming

2 files changed

+2
-2
lines changed

modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/shredding/transform.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ object transform {
185185

186186
def cast(json: Option[Json], dataType: Type): Either[String, Option[Value]] = {
187187
val error = s"Invalid type ${dataType.ddl} for value $json".asLeft[Option[Value]]
188-
json match {
188+
json.filterNot(_.isNull) match {
189189
case Some(j) =>
190190
dataType match {
191191
case Type.Uuid =>

modules/common/src/test/scala/com/snowplowanalytics/snowplow/postgres/streaming/sinkspec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,7 +62,7 @@ class sinkspec extends Database {
6262
}
6363

6464
"sink a single self-describing JSON" >> {
65-
val row = json"""{"schema":"iglu:com.getvero/bounced/jsonschema/1-0-0","data":{"bounce_type":"one"}}"""
65+
val row = json"""{"schema":"iglu:com.getvero/bounced/jsonschema/1-0-0","data":{"bounce_type":"one","bounce_code":null}}"""
6666
val json = SelfDescribingData.parse(row).getOrElse(throw new RuntimeException("Invalid SelfDescribingData"))
6767
val stream = Stream.emit[IO, Data](Data.SelfDescribing(json))
6868

0 commit comments

Comments
 (0)