Skip to content

Commit 82a0ca5

Browse files
committed
Use direct executor for KPL callback (close #73)
1 parent 26d384f commit 82a0ca5

File tree

2 files changed

+9
-10
lines changed

2 files changed

+9
-10
lines changed

modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/env/Environment.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ object Environment {
4343
for {
4444
badSink <- config.output.bad match {
4545
case LoaderConfig.StreamSink.Noop(period) => DummyStreamSink.create(period)
46-
case c: LoaderConfig.StreamSink.Kinesis => KinesisSink.create(c, config.monitoring, config.backoffPolicy, blocker)
46+
case c: LoaderConfig.StreamSink.Kinesis => KinesisSink.create(c, config.monitoring, config.backoffPolicy)
4747
case c: LoaderConfig.StreamSink.PubSub => PubSubSink.create(c, config.backoffPolicy)
4848
case c: LoaderConfig.StreamSink.Local => LocalSink.create(c, blocker)
4949
}

modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/env/kinesis/KinesisSink.scala

Lines changed: 8 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ import scala.concurrent.ExecutionContext
1919
import scala.jdk.FutureConverters._
2020
import scala.util.{Success, Failure}
2121

22-
import cats.effect.{Async, Blocker, ContextShift, Resource, Sync, Timer}
22+
import cats.effect.{Async, ContextShift, Resource, Sync, Timer}
2323
import cats.implicits._
2424

2525
import fs2.aws.internal.{KinesisProducerClientImpl, KinesisProducerClient}
@@ -32,7 +32,7 @@ import software.amazon.awssdk.services.kinesis.model.DescribeStreamRequest
3232
import software.amazon.awssdk.services.kinesis.model.StreamStatus
3333
import software.amazon.awssdk.regions.Region
3434

35-
import com.google.common.util.concurrent.{Futures, ListenableFuture, FutureCallback}
35+
import com.google.common.util.concurrent.{Futures, ListenableFuture, FutureCallback, MoreExecutors}
3636

3737
import org.log4s.getLogger
3838

@@ -44,8 +44,8 @@ object KinesisSink {
4444

4545
lazy val logger = getLogger
4646

47-
def create[F[_]: Async: Timer: ContextShift](config: LoaderConfig.StreamSink.Kinesis, monitoring: Monitoring, backoffPolicy: BackoffPolicy, blocker: Blocker): Resource[F, StreamSink[F]] =
48-
mkProducer(config, monitoring).map(writeToKinesis(config.streamName, backoffPolicy, blocker))
47+
def create[F[_]: Async: Timer: ContextShift](config: LoaderConfig.StreamSink.Kinesis, monitoring: Monitoring, backoffPolicy: BackoffPolicy): Resource[F, StreamSink[F]] =
48+
mkProducer(config, monitoring).map(writeToKinesis(config.streamName, backoffPolicy))
4949

5050
private def mkProducer[F[_]: Sync](config: LoaderConfig.StreamSink.Kinesis, monitoring: Monitoring): Resource[F, KinesisProducerClient[F]] =
5151
Resource.eval(
@@ -66,15 +66,14 @@ object KinesisSink {
6666
}
6767

6868
private def writeToKinesis[F[_]: Async: Timer: ContextShift](streamName: String,
69-
backoffPolicy: BackoffPolicy,
70-
blocker: Blocker)
69+
backoffPolicy: BackoffPolicy)
7170
(producer: KinesisProducerClient[F])
7271
(data: Array[Byte]): F[Unit] = {
7372
val res = for {
7473
byteBuffer <- Async[F].delay(ByteBuffer.wrap(data))
7574
partitionKey <- Async[F].delay(UUID.randomUUID().toString)
7675
cb <- producer.putData(streamName, partitionKey, byteBuffer)
77-
cbRes <- registerCallback(cb, blocker)
76+
cbRes <- registerCallback(cb)
7877
_ <- ContextShift[F].shift
7978
} yield cbRes
8079
res.retryingOnFailuresAndAllErrors(
@@ -85,7 +84,7 @@ object KinesisSink {
8584
).void
8685
}
8786

88-
private def registerCallback[F[_]: Async](f: ListenableFuture[UserRecordResult], blocker: Blocker): F[UserRecordResult] =
87+
private def registerCallback[F[_]: Async](f: ListenableFuture[UserRecordResult]): F[UserRecordResult] =
8988
Async[F].async[UserRecordResult] { cb =>
9089
Futures.addCallback(
9190
f,
@@ -94,7 +93,7 @@ object KinesisSink {
9493

9594
override def onSuccess(result: UserRecordResult): Unit = cb(Right(result))
9695
},
97-
(command: Runnable) => blocker.blockingContext.execute(command)
96+
MoreExecutors.directExecutor
9897
)
9998
}
10099

0 commit comments

Comments
 (0)