Skip to content

Commit d246c48

Browse files
authored
Merge pull request #51 from nomisRev/commit-batch-new-apis
Add commitBatchWithin
2 parents 781db44 + 81b4754 commit d246c48

File tree

21 files changed

+519
-142
lines changed

21 files changed

+519
-142
lines changed

.github/workflows/build.yaml

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -19,10 +19,20 @@ jobs:
1919

2020
steps:
2121
- name: Checkout the repo
22-
uses: actions/checkout@v3
22+
uses: actions/[email protected]
23+
with:
24+
fetch-depth: 0
2325

24-
- name: Run Check
25-
run: ./gradlew check
26+
- name: Set up Java
27+
uses: actions/[email protected]
28+
with:
29+
distribution: 'zulu'
30+
java-version: 11
31+
32+
- name: Build
33+
uses: gradle/[email protected]
34+
with:
35+
arguments: build --scan --full-stacktrace
2636

2737
- name: Bundle the build report
2838
if: failure()

.github/workflows/pr.yaml

Lines changed: 12 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -17,20 +17,20 @@ jobs:
1717

1818
steps:
1919
- name: Checkout the repo
20-
uses: actions/checkout@v3
20+
uses: actions/[email protected]
21+
with:
22+
fetch-depth: 0
23+
24+
- name: Set up Java
25+
uses: actions/[email protected]
26+
with:
27+
distribution: 'zulu'
28+
java-version: 11
2129

22-
- name: Restore Gradle cache
23-
id: cache
24-
uses: actions/[email protected]
30+
- name: Build
31+
uses: gradle/[email protected]
2532
with:
26-
path: |
27-
~/.gradle/caches
28-
~/.gradle/wrapper
29-
key: ${{ runner.os }}-check-${{ hashFiles('**/*.gradle*', '**/gradle-wrapper.properties') }}
30-
restore-keys: ${{ runner.os }}-check-
31-
32-
- name: Run Check
33-
run: ./gradlew check
33+
arguments: build --scan --full-stacktrace
3434

3535
- name: Bundle the build report
3636
if: failure()

build.gradle.kts

Lines changed: 2 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -34,11 +34,8 @@ dependencies {
3434
api(libs.kotlinx.coroutines.core)
3535
api(libs.kotlinx.coroutines.jdk8)
3636
api(libs.kafka.clients)
37-
38-
testImplementation(libs.kotest.runner.junit5)
39-
testImplementation(libs.kotest.property)
40-
testImplementation(libs.kotest.framework)
41-
testImplementation(libs.kotest.assertions)
37+
38+
testImplementation(libs.bundles.kotest)
4239
}
4340

4441
configure<KnitPluginExtension> {

guide/example/example-admin-01.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package example.exampleAdmin01
22

3-
import com.github.nomisRev.kafka.*
3+
import io.github.nomisRev.kafka.*
44
import java.util.Properties
55
import kotlinx.coroutines.runBlocking
66

guide/example/example-consumer-01.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package example.exampleConsumer01
22

3-
import com.github.nomisRev.kafka.*
3+
import io.github.nomisRev.kafka.*
44
import java.util.Properties
55
import kotlinx.coroutines.runBlocking
66

guide/example/example-producer-01.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package example.exampleProducer01
22

3-
import com.github.nomisRev.kafka.*
3+
import io.github.nomisRev.kafka.*
44
import java.util.Properties
55
import kotlinx.coroutines.runBlocking
66

guide/example/example-producer-02.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package example.exampleProducer02
22

3-
import com.github.nomisRev.kafka.*
3+
import io.github.nomisRev.kafka.*
44
import java.util.Properties
55
import kotlinx.coroutines.runBlocking
66

guide/example/example-readme-01.kt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
package example.exampleReadme01
22

3-
import com.github.nomisRev.kafka.*
3+
import io.github.nomisRev.kafka.*
44
import java.util.Properties
55
import kotlinx.coroutines.runBlocking
66

Lines changed: 21 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,6 @@
1-
import com.github.dockerjava.api.command.InspectContainerResponse
2-
import com.github.nomisRev.kafka.Admin
3-
import com.github.nomisRev.kafka.AdminSettings
4-
import com.github.nomisRev.kafka.await
5-
import kotlinx.coroutines.runBlocking
61
import org.testcontainers.containers.KafkaContainer
72
import org.testcontainers.utility.DockerImageName
3+
import java.lang.System.getProperty
84

95
/**
106
* A singleton `Kafka` Test Container.
@@ -30,27 +26,28 @@ import org.testcontainers.utility.DockerImageName
3026
* @see https://pawelpluta.com/optimise-testcontainers-for-better-tests-performance/
3127
*/
3228
class Kafka private constructor(imageName: DockerImageName) : KafkaContainer(imageName) {
33-
29+
3430
companion object {
31+
private val image: DockerImageName =
32+
if (getProperty("os.arch") == "aarch64") DockerImageName.parse("niciqy/cp-kafka-arm64:7.0.1")
33+
.asCompatibleSubstituteFor("confluentinc/cp-kafka")
34+
else DockerImageName.parse("confluentinc/cp-kafka:6.2.1")
35+
3536
val container: KafkaContainer by lazy {
36-
Kafka(DockerImageName.parse("confluentinc/cp-kafka:6.2.1"))
37-
.withReuse(true)
38-
.withNetwork(null)
39-
.withLabel("io.github.nomisrev.kafka", "fqn-testcontainers-reuse")
40-
.also { it.start() }
41-
}
42-
}
43-
44-
override fun containerIsStarted(containerInfo: InspectContainerResponse?, reused: Boolean) {
45-
super.containerIsStarted(containerInfo, reused)
46-
// If we're reusing the container, we want to reset the state of the container. We do this by
47-
// deleting all topics.
48-
// if (reused)
49-
runBlocking<Unit> {
50-
Admin(AdminSettings(bootstrapServers)).use { admin ->
51-
val names = admin.listTopics().listings().await()
52-
admin.deleteTopics(names.map { it.name() }).all().await()
53-
}
37+
Kafka(image).also { it.start() }
5438
}
5539
}
40+
41+
// override fun containerIsStarted(containerInfo: InspectContainerResponse?, reused: Boolean) {
42+
// super.containerIsStarted(containerInfo, reused)
43+
// // If we're reusing the container, we want to reset the state of the container. We do this by
44+
// // deleting all topics.
45+
// // if (reused)
46+
// runBlocking<Unit> {
47+
// Admin(AdminSettings(bootstrapServers)).use { admin ->
48+
// val names = admin.listTopics().listings().await()
49+
// admin.deleteTopics(names.map { it.name() }).all().await()
50+
// }
51+
// }
52+
// }
5653
}

guide/src/main/kotlin/main.kt

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
package io.github.nomisRev.kafka
2+
3+
import kotlinx.coroutines.Dispatchers
4+
import kotlinx.coroutines.coroutineScope
5+
import kotlinx.coroutines.flow.Flow
6+
import kotlinx.coroutines.flow.asFlow
7+
import kotlinx.coroutines.flow.collect
8+
import kotlinx.coroutines.flow.map
9+
import kotlinx.coroutines.flow.take
10+
import kotlinx.coroutines.launch
11+
import kotlinx.coroutines.runBlocking
12+
import org.apache.kafka.clients.admin.NewTopic
13+
import org.apache.kafka.clients.producer.ProducerRecord
14+
import org.apache.kafka.common.serialization.IntegerDeserializer
15+
import org.apache.kafka.common.serialization.IntegerSerializer
16+
import org.apache.kafka.common.serialization.StringDeserializer
17+
import org.apache.kafka.common.serialization.StringSerializer
18+
import java.util.UUID
19+
import kotlin.time.Duration.Companion.milliseconds
20+
21+
@JvmInline
22+
value class Key(val index: Int)
23+
24+
@JvmInline
25+
value class Message(val content: String)
26+
27+
fun main(): Unit = runBlocking(Dispatchers.Default) {
28+
val topicName = "test-topic"
29+
val msgCount = 10
30+
val kafka = Kafka.container
31+
32+
Admin(AdminSettings(kafka.bootstrapServers)).use { client ->
33+
client.createTopic(NewTopic(topicName, 1, 1))
34+
}
35+
36+
coroutineScope { // Run produces and consumer in a single scope
37+
launch(Dispatchers.IO) { // Send 20 messages, and then close the producer
38+
val settings: ProducerSettings<Key, Message> = ProducerSettings(
39+
kafka.bootstrapServers,
40+
IntegerSerializer().imap { key: Key -> key.index },
41+
StringSerializer().imap { msg: Message -> msg.content },
42+
Acks.All
43+
)
44+
(1..msgCount)
45+
.map { index -> ProducerRecord(topicName, Key(index), Message("msg: $index")) }
46+
.asFlow()
47+
.produce(settings)
48+
.collect(::println)
49+
}
50+
51+
launch(Dispatchers.IO) { // Consume 20 messages as a stream, and then close the consumer
52+
val settings: ConsumerSettings<Key, Message> = ConsumerSettings(
53+
kafka.bootstrapServers,
54+
IntegerDeserializer().map(::Key),
55+
StringDeserializer().map(::Message),
56+
groupId = UUID.randomUUID().toString(),
57+
autoOffsetReset = AutoOffsetReset.Earliest,
58+
enableAutoCommit = false
59+
)
60+
61+
KafkaConsumer(settings).asFlow()
62+
.subscribeTo(topicName)
63+
.tap { (key, value) -> println("$key -> $value") }
64+
.commitBatchWithin(settings, 3, 10.milliseconds)
65+
.take(4)
66+
.collect()
67+
}
68+
}
69+
}
70+
71+
fun <A> Flow<A>.tap(also: suspend (A) -> Unit): Flow<A> =
72+
map { it.also { also(it) } }

0 commit comments

Comments
 (0)