Skip to content

Commit 4356daa

Browse files
committed
Warn about bad rows even in DummyStream sink (close #64)
1 parent 8a930ab commit 4356daa

File tree

9 files changed

+44
-10
lines changed

9 files changed

+44
-10
lines changed

config/config.kinesis.reference.hocon

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -74,6 +74,9 @@
7474
# Max size of the batch in bytes before emitting
7575
# Default is 5MB
7676
"maxBatchBytes": 5000000
77+
78+
# Only used when "type" is "Noop" or missing. How often to log the number of bad rows discarded. Default is 30 seconds.
79+
"reportPeriod": 10 seconds
7780
}
7881
}
7982

config/config.local.reference.hocon

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,9 @@
4242
"type": "Local"
4343
# Path for bad row sink.
4444
"path": "./tmp/bad"
45+
46+
# Only used when "type" is "Noop" or missing. How often to log the number of bad rows discarded. Default is 30 seconds.
47+
"reportPeriod": 10 seconds
4548
}
4649
}
4750

config/config.pubsub.reference.hocon

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,9 @@
6363
# The number of threads used internally by library to process the callback after message delivery
6464
# Default is 1
6565
"numCallbackExecutors": 1
66+
67+
# Only used when "type" is "Noop" or missing. How often to log the number of bad rows discarded. Default is 30 seconds.
68+
"reportPeriod": 10 seconds
6669
}
6770
}
6871

modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/DummyStreamSink.scala

Lines changed: 27 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,33 @@
1212
*/
1313
package com.snowplowanalytics.snowplow.postgres.streaming
1414

15-
import cats.effect.{Resource, Concurrent}
15+
import org.log4s.getLogger
16+
17+
import cats.effect.{Resource, Concurrent, Sync, Timer}
18+
import cats.effect.concurrent.Ref
19+
20+
import fs2.Stream
21+
22+
import scala.concurrent.duration.FiniteDuration
1623

1724
/**
 * Bad-row sink used when no real destination is configured: rows handed to it are
 * not forwarded anywhere. After this commit it also counts the rows it receives and
 * periodically logs how many were discarded.
 */
object DummyStreamSink {
18-
// Removed by this commit: the previous sink ignored its argument and returned unit,
// so discarded bad rows left no trace.
def create[F[_]: Concurrent]:Resource[F, StreamSink[F]] =
19-
Resource.pure[F, StreamSink[F]](_ => Concurrent[F].pure(()))
25+
/**
 * Builds the counting sink as a `Resource`.
 *
 * Acquisition allocates a counter starting at 0 and starts `reporter` via
 * `Concurrent[F].background`, so the reporter is tied to this resource's scope.
 * The yielded sink ignores the value it is given and only increments the counter.
 *
 * @param period how often the number of discarded rows is reported
 * @return a resource holding the sink
 *         (presumably `StreamSink[F]` is a function from a bad row to `F[Unit]` --
 *         its definition is not visible here; TODO confirm)
 */
def create[F[_]: Concurrent: Timer](period: FiniteDuration): Resource[F, StreamSink[F]] =
26+
for {
27+
counter <- Resource.eval(Ref.of(0))
28+
_ <- Concurrent[F].background(reporter(counter, period))
29+
} yield { _ =>
30+
counter.update(_ + 1)
31+
}
32+
33+
// Logger used by `reporter`; declared lazy, so it is created on first use.
lazy val logger = getLogger
34+
35+
/**
 * Reporting loop: on every `period` tick, reads the counter and resets it to 0 in one
 * `getAndSet`, then logs the value at info level if it is greater than zero.
 * Ticks with a zero count log nothing.
 *
 * NOTE(review): rows counted after the last tick but before this loop is stopped
 * appear never to be reported -- confirm whether a final flush on shutdown is wanted.
 *
 * @param counter number of rows discarded since the previous tick
 * @param period  interval between ticks; also interpolated into the log message
 */
private def reporter[F[_]: Sync: Timer](counter: Ref[F, Int], period: FiniteDuration): F[Unit] =
36+
Stream.awakeDelay[F](period)
37+
.evalMap(_ => counter.getAndSet(0))
38+
.evalMap { count =>
39+
if (count > 0) Sync[F].delay(logger.info(s"Discarded $count bad rows during the last $period"))
40+
else Sync[F].unit
41+
}
42+
.compile
43+
.drain
2044
}

modules/loader/src/main/resources/application.conf

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
}
2323
"bad": {
2424
"type": "Noop"
25+
"reportPeriod": 30 seconds
2526
"delayThreshold": 200 milliseconds
2627
"maxBatchSize": 500
2728
"maxBatchBytes": 5000000

modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -234,7 +234,7 @@ object LoaderConfig {
234234
sealed trait StreamSink extends Product with Serializable
235235
object StreamSink {
236236

237-
case object Noop extends StreamSink
237+
case class Noop(reportPeriod: FiniteDuration) extends StreamSink
238238

239239
case class Local(path: PathInfo) extends StreamSink
240240

modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/env/Environment.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -42,7 +42,7 @@ object Environment {
4242
blocker: Blocker) =
4343
for {
4444
badSink <- config.output.bad match {
45-
case LoaderConfig.StreamSink.Noop => DummyStreamSink.create
45+
case LoaderConfig.StreamSink.Noop(period) => DummyStreamSink.create(period)
4646
case c: LoaderConfig.StreamSink.Kinesis => KinesisSink.create(c, config.monitoring, config.backoffPolicy, blocker)
4747
case c: LoaderConfig.StreamSink.PubSub => PubSubSink.create(c, config.backoffPolicy)
4848
case c: LoaderConfig.StreamSink.Local => LocalSink.create(c, blocker)

modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/CliSpec.scala

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class CliSpec extends Specification {
5757
10,
5858
None
5959
),
60-
LoaderConfig.StreamSink.Noop
60+
LoaderConfig.StreamSink.Noop(30.seconds)
6161
),
6262
Purpose.Enriched,
6363
Monitoring(Monitoring.Metrics(true)),
@@ -136,7 +136,7 @@ class CliSpec extends Specification {
136136
10,
137137
None
138138
),
139-
LoaderConfig.StreamSink.Noop
139+
LoaderConfig.StreamSink.Noop(30.seconds)
140140
),
141141
Purpose.Enriched,
142142
Monitoring(Monitoring.Metrics(true)),
@@ -209,7 +209,7 @@ class CliSpec extends Specification {
209209
10,
210210
None
211211
),
212-
LoaderConfig.StreamSink.Noop
212+
LoaderConfig.StreamSink.Noop(30.seconds)
213213
),
214214
Purpose.Enriched,
215215
Monitoring(Monitoring.Metrics(true)),
@@ -271,7 +271,7 @@ class CliSpec extends Specification {
271271
10,
272272
None
273273
),
274-
LoaderConfig.StreamSink.Noop
274+
LoaderConfig.StreamSink.Noop(30.seconds)
275275
),
276276
Purpose.Enriched,
277277
Monitoring(Monitoring.Metrics(true)),

modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/LocalSourceSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ class LocalSourceSpec extends Database {
5757
10,
5858
None
5959
),
60-
LoaderConfig.StreamSink.Noop
60+
LoaderConfig.StreamSink.Noop(30.seconds)
6161
),
6262
Purpose.Enriched,
6363
Monitoring(Monitoring.Metrics(false)),

0 commit comments

Comments
 (0)