Skip to content

Commit 9cdc0b3

Browse files
committed
Make database thread pool size configurable (close #60)
1 parent 70e7990 commit 9cdc0b3

File tree

7 files changed

+27
-15
lines changed

7 files changed

+27
-15
lines changed

config/config.kinesis.reference.hocon

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,9 @@
5151
# Maximum number of connections database pool is allowed to reach
5252
# Default is 10
5353
"maxConnections": 10
54+
# Size of the thread pool for blocking database operations
55+
# Default is value of "maxConnections"
56+
"threadPoolSize": 10
5457
}
5558

5659
# Events that fail validation are written to specified stream.

config/config.local.reference.hocon

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,9 @@
3030
# Maximum number of connections database pool is allowed to reach
3131
# Default is 10
3232
"maxConnections": 10
33+
# Size of the thread pool for blocking database operations
34+
# Default is value of "maxConnections"
35+
"threadPoolSize": 10
3336
}
3437

3538
# Events that fail validation are written to specified stream.

config/config.pubsub.reference.hocon

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,9 @@
3737
# Maximum number of connections database pool is allowed to reach
3838
# Default is 10
3939
"maxConnections": 10
40+
# Size of the thread pool for blocking database operations
41+
# Default is value of "maxConnections"
42+
"threadPoolSize": 10
4043
}
4144

4245
# Events that fail validation are written to specified stream.

modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/config/DBConfig.scala

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,8 @@ case class DBConfig(host: String,
2424
password: String, // TODO: can be EC2 store
2525
sslMode: String,
2626
schema: String,
27-
maxConnections: Option[Int]
27+
maxConnections: Int,
28+
threadPoolSize: Option[Int]
2829
) {
2930
def getJdbc: JdbcUri =
3031
JdbcUri(host, port, database, sslMode.toLowerCase().replace('_', '-'))
@@ -46,7 +47,7 @@ object DBConfig {
4647
config.setJdbcUrl(dbConfig.getJdbc.toString)
4748
config.setUsername(dbConfig.username)
4849
config.setPassword(dbConfig.password)
49-
dbConfig.maxConnections.foreach(config.setMaximumPoolSize)
50+
config.setMaximumPoolSize(dbConfig.maxConnections)
5051
config
5152
}
5253

modules/common/src/main/scala/com/snowplowanalytics/snowplow/postgres/resources.scala

Lines changed: 3 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ object resources {
3838
def initialize[F[_]: Concurrent: Clock: ContextShift](postgres: DBConfig, iglu: Client[F, Json]) =
3939
for {
4040
blocker <- Blocker[F]
41-
xa <- resources.getTransactor[F](DBConfig.hikariConfig(postgres), blocker)
41+
xa <- resources.getTransactor[F](DBConfig.hikariConfig(postgres), blocker, postgres.threadPoolSize)
4242
state <- Resource.eval(initializeState(postgres.schema, iglu, xa))
4343
} yield (blocker, xa, state)
4444

@@ -58,14 +58,8 @@ object resources {
5858
} yield state
5959

6060
/** Get a HikariCP transactor */
61-
def getTransactor[F[_]: Async: ContextShift](config: HikariConfig, be: Blocker): Resource[F, HikariTransactor[F]] = {
62-
val threadPoolSize = {
63-
// This could be made configurable, but these are sensible defaults and unlikely to be critical for tuning throughput.
64-
// Exceeding availableProcessors could lead to unnecessary context switching.
65-
// Exceeding the connection pool size is unnecessary, because that is limit of the app's parallelism.
66-
val maxPoolSize = if (config.getMaximumPoolSize > 0) config.getMaximumPoolSize else 10
67-
Math.min(maxPoolSize, Runtime.getRuntime.availableProcessors)
68-
}
61+
def getTransactor[F[_]: Async: ContextShift](config: HikariConfig, be: Blocker, threadPoolSizeOpt: Option[Int] = None): Resource[F, HikariTransactor[F]] = {
62+
val threadPoolSize = threadPoolSizeOpt.getOrElse(config.getMaximumPoolSize)
6963
logger.debug(s"Using thread pool of size $threadPoolSize for Hikari transactor")
7064

7165
for {

modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/CliSpec.scala

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -54,7 +54,8 @@ class CliSpec extends Specification {
5454
"mysecretpassword",
5555
"REQUIRE",
5656
"atomic",
57-
Some(10)
57+
10,
58+
None
5859
),
5960
LoaderConfig.StreamSink.Noop
6061
),
@@ -91,6 +92,7 @@ class CliSpec extends Specification {
9192
"mysecretpassword",
9293
"REQUIRE",
9394
"atomic",
95+
10,
9496
Some(10)
9597
),
9698
LoaderConfig.StreamSink.Kinesis(
@@ -131,7 +133,8 @@ class CliSpec extends Specification {
131133
"mysecretpassword",
132134
"REQUIRE",
133135
"atomic",
134-
Some(10)
136+
10,
137+
None
135138
),
136139
LoaderConfig.StreamSink.Noop
137140
),
@@ -165,6 +168,7 @@ class CliSpec extends Specification {
165168
"mysecretpassword",
166169
"REQUIRE",
167170
"atomic",
171+
10,
168172
Some(10)
169173
),
170174
LoaderConfig.StreamSink.PubSub(
@@ -202,7 +206,8 @@ class CliSpec extends Specification {
202206
"mysecretpassword",
203207
"REQUIRE",
204208
"atomic",
205-
Some(10)
209+
10,
210+
None
206211
),
207212
LoaderConfig.StreamSink.Noop
208213
),
@@ -232,6 +237,7 @@ class CliSpec extends Specification {
232237
"mysecretpassword",
233238
"REQUIRE",
234239
"atomic",
240+
10,
235241
Some(10)
236242
),
237243
LoaderConfig.StreamSink.Local(PathInfo(Path.fromString("./tmp/bad").get, PathType.Relative))
@@ -262,7 +268,8 @@ class CliSpec extends Specification {
262268
"mysecretpassword",
263269
"REQUIRE",
264270
"atomic",
265-
Some(10)
271+
10,
272+
None
266273
),
267274
LoaderConfig.StreamSink.Noop
268275
),

modules/loader/src/test/scala/com.snowplowanalytics.snowplow.postgres/config/LocalSourceSpec.scala

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ class LocalSourceSpec extends Database {
5454
"mysecretpassword",
5555
"allow",
5656
"public",
57+
10,
5758
None
5859
),
5960
LoaderConfig.StreamSink.Noop

0 commit comments

Comments
 (0)