-
Notifications
You must be signed in to change notification settings - Fork 787
Open
Labels
defectSuspected defect such as a bug or regressionSuspected defect such as a bug or regression
Description
Observed behavior
The header key changes when received by the consumer the key observed in publisher is
Traceparent but when received by the consumer the header key changes to traceparent
Expected behavior
The header key must not changes when received by the consumer when sent from publisher the key is Traceparent it must preserve the key Traceparent when it is consumed by the consumer
Server and client version
nats:2.11-alpine
Host environment
No response
Steps to reproduce
func publish(ctx context.Context, js jetstream.JetStream) {
ctx, span := otel.Tracer("publisher").Start(ctx, "publish")
defer span.End()
header := make(nats.Header)
carrier := propagation.HeaderCarrier(header)
inOtel.GetTextMapPropagator().Inject(ctx, carrier)
log.Println("publisher headers:", header)
msgID := randomstring.HumanFriendlyEnglishString(16)
fut, err := js.PublishMsgAsync(
&nats.Msg{
Header: header,
Data: []byte("hello from sender"),
Subject: "events.download." + msgID,
},
jetstream.WithMsgID(msgID),
)
if err != nil {
err = fmt.Errorf("failed to PublishMsgAsync with err: %w", err)
log.Fatalln(err.Error())
}
log.Printf("published message to subject: %s", fut.Msg().Subject)
}
func consume(cs jetstream.Consumer) {
cc, err := cs.Consume(func(msg jetstream.Msg) {
carrier := propagation.HeaderCarrier(msg.Headers())
ctx := inOtel.GetTextMapPropagator().Extract(context.Background(), carrier)
ctx, span := otel.Tracer("consumer").Start(ctx, "received message")
defer span.End()
handleMessage(ctx, msg)
defer func() {
log.Println("acknowledging message")
if err := msg.Ack(); err != nil {
log.Printf("failed to acknowledge message with err: %s", err.Error())
return
}
log.Println("message acknowledged")
}()
})
if err != nil {
err = fmt.Errorf("failed to consume with err: %w", err)
log.Fatalln(err.Error())
}
defer func() {
log.Println("stopping consumer connection")
cc.Stop()
}()
}
func handleMessage(ctx context.Context, msg jetstream.Msg) {
ctx, span := otel.Tracer("consumer").Start(ctx, "handleMessage")
defer span.End()
log.Printf("message received from subject: %s, message data: %s", msg.Subject(), msg.Data())
}paivagustavo
Metadata
Metadata
Assignees
Labels
defectSuspected defect such as a bug or regressionSuspected defect such as a bug or regression