Skip to content

Jetstream: key header changes when received by the consumer #1968

@Alturino

Description

@Alturino

Observed behavior

The header key changes between the publisher and the consumer: the key observed in the publisher is
`Traceparent`, but when the message is received by the consumer the header key has changed to `traceparent`.

Expected behavior

The header key must not change when the message is received by the consumer: when the publisher sends the key `Traceparent`, the consumer must receive the key `Traceparent` unchanged.

Server and client version

nats:2.11-alpine

Host environment

No response

Steps to reproduce

// publish opens a "publish" span, injects that span's context into a fresh
// NATS header through the configured text-map propagator, logs the header,
// and hands a single message to JetStream for asynchronous publishing.
//
// The subject is "events.download." followed by the ID returned from
// randomstring.HumanFriendlyEnglishString(16); the same ID is passed as the
// JetStream message ID. If PublishMsgAsync returns an error the process exits
// through log.Fatalln.
//
// NOTE(review): propagation.HeaderCarrier is presumably OpenTelemetry's
// http.Header-backed carrier, in which case the key casing logged here is
// decided by its Set method — confirm against the propagation package docs.
func publish(ctx context.Context, js jetstream.JetStream) {
	ctx, span := otel.Tracer("publisher").Start(ctx, "publish")
	defer span.End()

	// The propagator writes straight into hdr through the carrier view.
	hdr := make(nats.Header)
	inOtel.GetTextMapPropagator().Inject(ctx, propagation.HeaderCarrier(hdr))
	log.Println("publisher headers:", hdr)

	msgID := randomstring.HumanFriendlyEnglishString(16)
	outgoing := &nats.Msg{
		Subject: "events.download." + msgID,
		Header:  hdr,
		Data:    []byte("hello from sender"),
	}

	ackFuture, err := js.PublishMsgAsync(outgoing, jetstream.WithMsgID(msgID))
	if err != nil {
		log.Fatalln(fmt.Errorf("failed to PublishMsgAsync with err: %w", err).Error())
	}
	log.Printf("published message to subject: %s", ackFuture.Msg().Subject)
}
// consume registers a message handler on cs and exits the process through
// log.Fatalln if the registration fails.
//
// For each message the handler extracts the propagated context from the
// message headers, starts a "received message" span from it, passes the
// message to handleMessage, and then acknowledges the message, logging the
// outcome of the acknowledgement.
//
// NOTE(review): the consume context is stopped as the last step of this
// function. If cs.Consume returns without blocking, delivery is stopped right
// after it has been set up — confirm this is what the caller expects.
func consume(cs jetstream.Consumer) {
	onMessage := func(msg jetstream.Msg) {
		headers := propagation.HeaderCarrier(msg.Headers())
		parent := inOtel.GetTextMapPropagator().Extract(context.Background(), headers)

		spanCtx, span := otel.Tracer("consumer").Start(parent, "received message")
		defer span.End()

		handleMessage(spanCtx, msg)

		// Acknowledge once handling has returned; span.End still runs last.
		log.Println("acknowledging message")
		if ackErr := msg.Ack(); ackErr != nil {
			log.Printf("failed to acknowledge message with err: %s", ackErr.Error())
			return
		}
		log.Println("message acknowledged")
	}

	consumeCtx, err := cs.Consume(onMessage)
	if err != nil {
		log.Fatalln(fmt.Errorf("failed to consume with err: %w", err).Error())
	}

	log.Println("stopping consumer connection")
	consumeCtx.Stop()
}
// handleMessage starts a "handleMessage" span under ctx and logs the subject
// and payload of msg. The span is ended when the function returns.
func handleMessage(ctx context.Context, msg jetstream.Msg) {
	// The derived context is not needed here; only the span is kept.
	_, span := otel.Tracer("consumer").Start(ctx, "handleMessage")
	defer span.End()

	subject, payload := msg.Subject(), msg.Data()
	log.Printf("message received from subject: %s, message data: %s", subject, payload)
}

Metadata

Metadata

Assignees

Labels

defect: Suspected defect such as a bug or regression

Type

No type

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions