Skip to content

Commit 8cfaf17

Browse files
committed
Remove requirement to always use Kinesis Enhanced Fan Out (closes #14)
1 parent 9e44678 commit 8cfaf17

File tree

5 files changed

+120
-24
lines changed

5 files changed

+120
-24
lines changed

config/config.hocon.sample

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,13 @@
1515
"region": "eu-central-1"
1616
# Either TRIM_HORIZON or LATEST
1717
"initialPosition": "TRIM_HORIZON"
18+
19+
# Use either enhanced fan out or polling mode for retrieving records
20+
"retrievalMode": "FanOut"
21+
# "retrievalMode": {
22+
# "type": "Polling"
23+
# "maxRecords": 1000
24+
# }
1825
}
1926

2027
"storage" : {
Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,3 @@
1-
snowplow {}
1+
snowplow {
2+
input.retrievalMode = FAN_OUT
3+
}

modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/LoaderConfig.scala

Lines changed: 29 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -27,7 +27,7 @@ import io.circe.generic.extras.Configuration
2727
import LoaderConfig.{Purpose, Source}
2828

2929
import software.amazon.awssdk.regions.Region
30-
import software.amazon.kinesis.common.InitialPositionInStream
30+
import software.amazon.kinesis.common.{InitialPositionInStream, InitialPositionInStreamExtended}
3131

3232
case class LoaderConfig(name: String,
3333
id: UUID,
@@ -50,12 +50,12 @@ object LoaderConfig {
5050

5151
sealed trait InitPosition {
5252

53-
/** Turn it into fs2-aws-compatible structure */
54-
def unwrap: Either[InitialPositionInStream, Date] =
53+
/** Turn it into aws-compatible structure */
54+
def unwrap: InitialPositionInStreamExtended =
5555
this match {
56-
case InitPosition.Latest => InitialPositionInStream.LATEST.asLeft
57-
case InitPosition.TrimHorizon => InitialPositionInStream.TRIM_HORIZON.asLeft
58-
case InitPosition.AtTimestamp(date) => Date.from(date).asRight
56+
case InitPosition.Latest => InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.LATEST)
57+
case InitPosition.TrimHorizon => InitialPositionInStreamExtended.newInitialPosition(InitialPositionInStream.TRIM_HORIZON)
58+
case InitPosition.AtTimestamp(timestamp) => InitialPositionInStreamExtended.newInitialPositionAtTimestamp(Date.from(timestamp))
5959
}
6060
}
6161
object InitPosition {
@@ -103,9 +103,31 @@ object LoaderConfig {
103103
sealed trait Source extends Product with Serializable
104104
object Source {
105105

106-
case class Kinesis(appName: String, streamName: String, region: Region, initialPosition: InitPosition) extends Source
106+
case class Kinesis(appName: String,
107+
streamName: String,
108+
region: Region,
109+
initialPosition: InitPosition,
110+
retrievalMode: Kinesis.Retrieval) extends Source
107111
case class PubSub(projectId: String, subscriptionId: String) extends Source
108112

113+
object Kinesis {
114+
sealed trait Retrieval
115+
116+
object Retrieval {
117+
case class Polling(maxRecords: Int) extends Retrieval
118+
case object FanOut extends Retrieval
119+
120+
implicit val retrievalDecoder: Decoder[Retrieval] = {
121+
Decoder.decodeString.emap {
122+
case "FanOut" => FanOut.asRight
123+
case "Polling" => "retrieval mode Polling must provide the maxRecords option".asLeft
124+
case other =>
125+
s"retrieval mode $other is unknown. Choose from FanOut and Polling. Polling must provide a maxRecords option".asLeft
126+
}.or(deriveConfiguredDecoder[Retrieval])
127+
}
128+
}
129+
}
130+
109131
implicit def ioCirceConfigSourceDecoder: Decoder[Source] =
110132
deriveConfiguredDecoder[Source]
111133
}

modules/loader/src/main/scala/com/snowplowanalytics/snowplow/postgres/streaming/source.scala

Lines changed: 80 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -12,15 +12,16 @@
1212
*/
1313
package com.snowplowanalytics.snowplow.postgres.streaming
1414

15-
import java.util.Base64
15+
import java.util.{Base64, UUID}
16+
import java.net.InetAddress
1617
import java.nio.charset.StandardCharsets
1718

1819
import cats.implicits._
1920

2021
import cats.effect.{Blocker, ConcurrentEffect, ContextShift, Resource, Sync, Timer}
2122

2223
import fs2.Stream
23-
import fs2.aws.kinesis.{CommittableRecord, Kinesis, KinesisConsumerSettings}
24+
import fs2.aws.kinesis.{CommittableRecord, Kinesis}
2425

2526
import com.permutive.pubsub.consumer.grpc.{PubsubGoogleConsumer, PubsubGoogleConsumerConfig}
2627
import io.circe.Json
@@ -41,6 +42,12 @@ import com.google.pubsub.v1.PubsubMessage
4142
import com.permutive.pubsub.consumer.Model.{ProjectId, Subscription}
4243
import com.permutive.pubsub.consumer.decoder.MessageDecoder
4344

45+
import software.amazon.awssdk.regions.Region
46+
import software.amazon.kinesis.common.ConfigsBuilder
47+
import software.amazon.kinesis.coordinator.Scheduler
48+
import software.amazon.kinesis.processor.ShardRecordProcessorFactory
49+
import software.amazon.kinesis.retrieval.polling.PollingConfig
50+
import software.amazon.kinesis.retrieval.fanout.FanOutConfig
4451
import software.amazon.awssdk.services.cloudwatch.CloudWatchAsyncClient
4552
import software.amazon.awssdk.services.dynamodb.DynamoDbAsyncClient
4653
import software.amazon.awssdk.services.kinesis.KinesisAsyncClient
@@ -59,14 +66,13 @@ object source {
5966
*/
6067
def getSource[F[_]: ConcurrentEffect: ContextShift: Timer](blocker: Blocker, purpose: Purpose, config: Source): Stream[F, Either[BadData, Data]] =
6168
config match {
62-
case LoaderConfig.Source.Kinesis(appName, streamName, region, position) =>
69+
case kConfig: LoaderConfig.Source.Kinesis =>
6370
for {
64-
kinesisClient <- Stream.resource(makeKinesisClient[F])
65-
dynamoClient <- Stream.resource(makeDynamoDbClient[F])
66-
cloudWatchClient <- Stream.resource(makeCloudWatchClient[F])
67-
kinesis = Kinesis.create(kinesisClient, dynamoClient, cloudWatchClient, blocker)
68-
settings = KinesisConsumerSettings.apply(streamName, appName, region, initialPositionInStream = position.unwrap)
69-
record <- kinesis.readFromKinesisStream(settings)
71+
kinesisClient <- Stream.resource(makeKinesisClient[F](kConfig.region))
72+
dynamoClient <- Stream.resource(makeDynamoDbClient[F](kConfig.region))
73+
cloudWatchClient <- Stream.resource(makeCloudWatchClient[F](kConfig.region))
74+
kinesis = Kinesis.create(blocker, scheduler(kinesisClient, dynamoClient, cloudWatchClient, kConfig, _))
75+
record <- kinesis.readFromKinesisStream("THIS DOES NOTHING", "THIS DOES NOTHING")
7076
_ <- Stream.eval(record.checkpoint)
7177
} yield parseRecord(purpose, record)
7278
case LoaderConfig.Source.PubSub(projectId, subscriptionId) =>
@@ -135,12 +141,71 @@ object source {
135141
def pubsubOnFailedTerminate[F[_]: Sync](error: Throwable): F[Unit] =
136142
Sync[F].delay(logger.warn(s"Cannot terminate pubsub consumer properly\n${error.getMessage}"))
137143

138-
def makeKinesisClient[F[_]: Sync]: Resource[F, KinesisAsyncClient] =
139-
Resource.fromAutoCloseable(Sync[F].delay(KinesisAsyncClient.create()))
144+
def scheduler[F[_]: Sync](kinesisClient: KinesisAsyncClient,
145+
dynamoDbClient: DynamoDbAsyncClient,
146+
cloudWatchClient: CloudWatchAsyncClient,
147+
config: LoaderConfig.Source.Kinesis,
148+
recordProcessorFactory: ShardRecordProcessorFactory): F[Scheduler] =
149+
Sync[F].delay(UUID.randomUUID()).map { uuid =>
150+
val hostname = InetAddress.getLocalHost().getCanonicalHostName()
151+
152+
val configsBuilder =
153+
new ConfigsBuilder(config.streamName,
154+
config.appName,
155+
kinesisClient,
156+
dynamoDbClient,
157+
cloudWatchClient,
158+
s"$hostname:$uuid",
159+
recordProcessorFactory)
160+
161+
val retrievalConfig =
162+
configsBuilder
163+
.retrievalConfig
164+
.initialPositionInStreamExtended(config.initialPosition.unwrap)
165+
.retrievalSpecificConfig {
166+
config.retrievalMode match {
167+
case LoaderConfig.Source.Kinesis.Retrieval.FanOut =>
168+
new FanOutConfig(kinesisClient)
169+
case LoaderConfig.Source.Kinesis.Retrieval.Polling(maxRecords) =>
170+
new PollingConfig(config.streamName, kinesisClient).maxRecords(maxRecords)
171+
}
172+
}
173+
174+
new Scheduler(
175+
configsBuilder.checkpointConfig,
176+
configsBuilder.coordinatorConfig,
177+
configsBuilder.leaseManagementConfig,
178+
configsBuilder.lifecycleConfig,
179+
configsBuilder.metricsConfig,
180+
configsBuilder.processorConfig,
181+
retrievalConfig
182+
)
183+
}
184+
185+
def makeKinesisClient[F[_]: Sync](region: Region): Resource[F, KinesisAsyncClient] =
186+
Resource.fromAutoCloseable {
187+
Sync[F].delay {
188+
KinesisAsyncClient.builder()
189+
.region(region)
190+
.build
191+
}
192+
}
140193

141-
def makeDynamoDbClient[F[_]: Sync]: Resource[F, DynamoDbAsyncClient] =
142-
Resource.fromAutoCloseable(Sync[F].delay(DynamoDbAsyncClient.create()))
194+
def makeDynamoDbClient[F[_]: Sync](region: Region): Resource[F, DynamoDbAsyncClient] =
195+
Resource.fromAutoCloseable {
196+
Sync[F].delay {
197+
DynamoDbAsyncClient.builder()
198+
.region(region)
199+
.build
200+
}
201+
}
143202

144-
def makeCloudWatchClient[F[_]: Sync]: Resource[F, CloudWatchAsyncClient] =
145-
Resource.fromAutoCloseable(Sync[F].delay(CloudWatchAsyncClient.create()))
203+
def makeCloudWatchClient[F[_]: Sync](region: Region): Resource[F, CloudWatchAsyncClient] =
204+
Resource.fromAutoCloseable {
205+
Sync[F].delay {
206+
CloudWatchAsyncClient.builder()
207+
.region(region)
208+
.build
209+
}
210+
}
146211
}

modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/CliSpec.scala

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ class CliSpec extends Specification {
3434
val expected = LoaderConfig(
3535
"Acme Ltd. Snowplow Postgres",
3636
UUID.fromString("5c5e4353-4eeb-43da-98f8-2de6dc7fa947"),
37-
Source.Kinesis("acme-postgres-loader", "enriched-events", Region.EU_CENTRAL_1, InitPosition.TrimHorizon),
37+
Source.Kinesis("acme-postgres-loader", "enriched-events", Region.EU_CENTRAL_1, InitPosition.TrimHorizon, Source.Kinesis.Retrieval.FanOut),
3838
DBConfig(
3939
"localhost",
4040
5432,

0 commit comments

Comments
 (0)