Closed
Labels
defect: Suspected defect such as a bug or regression
Description
Observed behavior
If we create a consumer with InactiveThreshold and the connection to the NATS server is unstable, the server drops the consumer, but the client-side consumer neither reports a disconnect nor gets recreated. After this silent disconnect we no longer receive messages.
Expected behavior
After the server drops the consumer, the client-side consumer should go through the disconnect / reconnect process.
Server and client version
nats server: v2.12.2
nats go client: v1.47.0
Host environment
No response
Steps to reproduce
Below is my test program with the steps to reproduce. My host is Linux, and the way I emulate a bad connection is specific to this OS. I think it can also be reproduced with Docker using the docker pause / unpause commands.
package main

import (
	"context"
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

const (
	subj       = "test-subj"
	streamName = "test-stream"
)

func main() {
	nc, err := nats.Connect("localhost:4222")
	if err != nil {
		panic(err)
	}
	js, err := jetstream.New(nc)
	if err != nil {
		panic(err)
	}
	ctx := context.Background()

	// Step 1: create a basic stream and a simple (non-durable) consumer.
	stream, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
		Name:     streamName,
		Subjects: []string{subj},
	})
	if err != nil {
		panic(err)
	}
	consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{})
	if err != nil {
		panic(err)
	}
	_, err = consumer.Consume(func(msg jetstream.Msg) {
		fmt.Printf("Received message: %v\n", string(msg.Data()))
		// Use a local err so the callback does not write to main's err.
		if err := msg.Ack(); err != nil {
			panic(err)
		}
	})
	if err != nil {
		panic(err)
	}

	// Step 2: check that the consumer receives messages.
	// Expect the test message to be printed.
	_, err = js.Publish(ctx, subj, []byte("Test message 1"))
	if err != nil {
		panic(err)
	}

	// Step 3: pause the program (emulating a bad network connection).
	// At first, nats stream info shows active consumers = 1.
	// After ~30 seconds it shows active consumers = 0.
	fmt.Println("Pause program now: ctrl + Z. Check status: nats stream info test-stream")
	time.Sleep(10 * time.Second)

	// Step 4: resume the program once nats stream info shows active consumers = 0.
	// I use the "fg" command.
	fmt.Println("Check nats status: nats stream info test-stream")

	// Step 5: send a second test message.
	// Expected: the message is received, or a disconnect is reported.
	// Actual: nothing happens!
	// After relaunching the program, this test message is received.
	_, err = js.Publish(ctx, subj, []byte("Test message 2"))
	if err != nil {
		panic(err)
	}

	// Block forever: context.Background() is never cancelled.
	<-ctx.Done()
}