Description
Observed behavior
Given the following scenario:
- One consumer which runs in multiple replicas (e.g. a Kubernetes deployment with 2 replicas)
- Multiple messages already waiting to be delivered (for example 10)
- The consumer runs with Consume() or Messages() (same outcome) and uses a dummy handler which waits 28 seconds, then acks
- The JetStream server uses the default AckWait of 30 seconds (a config sketch follows this list)
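For clarity, the consumer setup this assumes looks roughly like the following, with AckWait written out explicitly instead of relying on the server default (sketch only; the repro snippets below leave it at the default):

cons, err := js.CreateOrUpdateConsumer(ctx, "events", jetstream.ConsumerConfig{
    Durable: "processor-1",
    AckWait: 30 * time.Second, // same value as the default, just explicit
})
if err != nil {
    log.Fatal(err)
}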
I observe the following:
- Replica 1 runs Consume() (or .Next())
- Replica 1 starts processing message 1
- Replica 2 runs Consume() (or .Next()) and does nothing because Replica 1 pre-buffered all 10 messages
- Replica 1 acks message 1 and starts processing message 2
- Due to AckWait being reached, the unacknowledged messages are redelivered, so Replica 2 starts processing message 2.
- Replicas 1 and 2 both process messages 3 to 8 because both pre-buffered all of them.
This effectively leads to 2 undesirable outcomes:
- Both replicas consume almost all messages (which should be fine given the consumers are idempotent), so adding more replicas does not increase consumption throughput.
- Consume calls the provided handler function with messages whose AckWait has already expired.
This is an edge case which only occurs when there are multiple replicas of the same consumer running, messages are slow to process and there are multiple messages waiting to be delivered.
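If I read the pull options correctly, limiting the client-side buffer per replica should at least reduce the pre-buffering effect; a rough sketch of what I mean (cons and the handler are the same as in the repro below, PullMaxMessages(1) is the only change, and I am assuming it applies here):

// Sketch: cap the per-replica buffer at 1 message so a single replica does not
// pre-fetch the whole backlog.
cc, err := cons.Consume(func(msg jetstream.Msg) {
    fmt.Println("got msg", string(msg.Data()))
    time.Sleep(28 * time.Second) // slow dummy handler
    msg.Ack()
}, jetstream.PullMaxMessages(1))
if err != nil {
    log.Fatal(err)
}
defer cc.Stop()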
Expected behavior
I would say there are 2 options:
- This is the expected behaviour and it would be useful to add it to the readme.
- Users of Consume() or Messages() should receive messages which still have the full AckWait to process them, instead of the effective AckWait depending on how long earlier messages in the same batch took to process.
If this is expected behaviour I would like to know if there is a simple way to overcome it. For now, the most reasonable option to me seems to be using Fetch() instead and creating a context with AckWait as the timeout right after, as sketched below.
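Roughly what I have in mind (sketch only, not tested; processMessage is a placeholder for the real handler and ackWait mirrors the consumer's AckWait):

ackWait := 30 * time.Second
for {
    // Request one message at a time so nothing sits in a client-side buffer
    // while a previous message is being processed.
    batch, err := cons.Fetch(1)
    if err != nil {
        log.Fatal(err)
    }
    for msg := range batch.Messages() {
        // The context expires together with the server-side AckWait.
        msgCtx, cancel := context.WithTimeout(ctx, ackWait)
        processMessage(msgCtx, msg) // placeholder for the real handler
        cancel()
        msg.Ack()
    }
    if err := batch.Error(); err != nil {
        log.Println("fetch error:", err)
    }
}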
Server and client version
server: v2.11.8
client: v1.45.0
Host environment
No response
Steps to reproduce
To replicate the observed behaviour you can use the following 2 snippets:
server: to create streams and consumers and send some messages:
package main

import (
    "context"
    "fmt"
    "log"
    "log/slog"
    "os"
    "os/signal"

    "github.com/nats-io/nats.go"
    "github.com/nats-io/nats.go/jetstream"
)

func main() {
    ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
    defer cancel()
    nc, err := nats.Connect("nats://localhost:4222")
    if err != nil {
        log.Fatal(err)
    }
    js, err := jetstream.New(nc)
    if err != nil {
        log.Fatal(err)
    }
    defer js.CleanupPublisher()
    stream, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
        Name:      "events",
        Retention: jetstream.InterestPolicy,
        Subjects:  []string{"events.>"},
    })
    if err != nil {
        log.Fatal(err)
    }
    // Durable pull consumer; AckWait is left at the 30 second default.
    _, err = stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{
        Durable: "processor-1",
    })
    if err != nil {
        log.Fatal(err)
    }
    for i := range 10 {
        _, err := js.Publish(ctx, "events.example", []byte(fmt.Sprint(i)))
        if err != nil {
            log.Fatal(err)
        }
    }
    slog.Info("Published events")
}
client: to use the consumer and ack messages after 28 seconds:
package main

import (
    "context"
    "fmt"
    "log"
    "os"
    "os/signal"
    "time"

    "github.com/nats-io/nats.go"
    "github.com/nats-io/nats.go/jetstream"
)

func main() {
    ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt)
    defer cancel()
    nc, err := nats.Connect("nats://localhost:4222")
    if err != nil {
        log.Fatal(err)
    }
    js, err := jetstream.New(nc)
    if err != nil {
        log.Fatal(err)
    }
    cons, err := js.CreateOrUpdateConsumer(ctx, "events", jetstream.ConsumerConfig{
        Durable: "processor-1",
    })
    if err != nil {
        log.Fatal(err)
    }
    // Dummy handler: processing takes 28 seconds, just under the 30 second AckWait.
    cc, err := cons.Consume(func(msg jetstream.Msg) {
        fmt.Println("got msg", string(msg.Data()))
        time.Sleep(28 * time.Second)
        msg.Ack()
        fmt.Println("acked msg", string(msg.Data()))
    })
    if err != nil {
        log.Fatal(err)
    }
    defer cc.Stop()
    // Wait for interrupt.
    <-ctx.Done()
}
To replicate: run a container with JetStream enabled (docker run -d --rm --name nats -p 4222:4222 nats:2.11 -js), run the first snippet once, then run the second snippet 2 times in parallel.