Jetstream: no consumer disconnect / reconnect on unstable connection with NATS server #1972

@Arlet2

Description

Observed behavior

If we create a consumer with InactiveThreshold and the connection to the NATS server is unstable, the server drops this consumer, but the client-side consumer is neither stopped nor recreated. After this silent removal we no longer receive any messages.

Expected behavior

After the server drops the consumer, the client-side consumer should go through a disconnect / reconnect process (or be recreated) instead of silently stopping.

Server and client version

nats server: v2.12.2
nats go client: v1.47.0

Host environment

No response

Steps to reproduce

Below is my test program with the steps to reproduce. My host is Linux, and the way I emulate a bad connection (suspending the process from the shell) is specific to this OS. I think it can also be reproduced with Docker using the docker pause / docker unpause commands.

package main

import (
	"context"
	"fmt"
	"time"

	"github.com/nats-io/nats.go"
	"github.com/nats-io/nats.go/jetstream"
)

const (
	subj       = "test-subj"
	streamName = "test-stream"
)

func main() {
	nc, err := nats.Connect("localhost:4222")
	if err != nil {
		panic(err)
	}

	js, err := jetstream.New(nc)
	if err != nil {
		panic(err)
	}

	ctx := context.Background()

	// Step 1: create the stream and a simple non-durable (ephemeral) consumer
	stream, err := js.CreateOrUpdateStream(ctx, jetstream.StreamConfig{
		Name:     streamName,
		Subjects: []string{subj},
	})
	if err != nil {
		panic(err)
	}

	consumer, err := stream.CreateOrUpdateConsumer(ctx, jetstream.ConsumerConfig{})
	if err != nil {
		panic(err)
	}

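	_, err = consumer.Consume(func(msg jetstream.Msg) {
		fmt.Printf("Received message: %v\n", string(msg.Data()))

		// Use a local err: this callback runs on another goroutine, so
		// assigning to the outer err would be a data race.
		if err := msg.Ack(); err != nil {
			panic(err)
		}
	})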
	if err != nil {
		panic(err)
	}

	// Step 2: check that the consumer receives messages. Expect the test message to be printed.
	_, err = js.Publish(ctx, subj, []byte("Test message 1"))
	if err != nil {
		panic(err)
	}

	// Step 3: suspend the program (emulating a bad network connection).
	// You have 10 seconds to press ctrl + Z.
	// At first, `nats stream info test-stream` shows 1 active consumer;
	// after ~30 seconds it shows 0 active consumers.
	fmt.Println("Pause program now: ctrl + Z. Check status: nats stream info test-stream")
	time.Sleep(10 * time.Second)

	// Step 4: resume the program (I use the "fg" shell command) once
	// nats stream info shows 0 active consumers.
	fmt.Println("Check nats status: nats stream info test-stream")

	// Step 5: publish a second test message.
	// Expected: the consumer receives this message, or at least reports a disconnect.
	// Actual: nothing happens.
	// After restarting the program, this test message is delivered.
	_, err = js.Publish(ctx, subj, []byte("Test message 2"))
	if err != nil {
		panic(err)
	}

	// Block forever: context.Background() is never canceled, so Done() never fires.
	<-ctx.Done()
}

Labels

defect: Suspected defect such as a bug or regression
