Skip to content
Open
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ This project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.htm
- The `go.opentelemetry.io/contrib/detectors/autodetect` package is added to automatically compose user defined `resource.Detector`s at runtime. (#7522)
- Add the `WithLoggerProviderOptions`, `WithMeterProviderOptions` and `WithTracerProviderOptions` options to `NewSDK` to allow passing custom options to providers in `go.opentelemetry.io/contrib/otelconf`. (#7552)
- Added V2 version of AWS EC2 detector `go.opentelemetry.io/contrib/detectors/aws/ec2/v2` due to deprecation of `github.com/aws/aws-sdk-go`. (#6961)
- Handle the `WithMessageEvents` option for `Transport` in `go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp`. (#7513)

<!-- Released section -->
<!-- Don't change this section unless doing release -->
Expand Down
9 changes: 5 additions & 4 deletions instrumentation/net/http/otelhttp/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,15 +152,16 @@ const (
WriteEvents
)

// WithMessageEvents configures the Handler to record the specified events
// WithMessageEvents configures the Handler or Transport to record the specified events
// (span.AddEvent) on spans. By default only summary attributes are added at the
// end of the request.
//
// Valid events are:
// - ReadEvents: Record the number of bytes read after every http.Request.Body.Read
// using the ReadBytesKey
// - WriteEvents: Record the number of bytes written after every http.ResponeWriter.Write
// using the WriteBytesKey
// using the ReadBytesKey for Handler. For Transport, it will record the number of
// bytes read after every http.Response.Body.Read.
// - WriteEvents: Record the number of bytes written after every http.ResponseWriter.Write
// using the WriteBytesKey for Handler. Ignored for Transport.
func WithMessageEvents(events ...event) Option {
return optionFunc(func(c *config) {
for _, e := range events {
Expand Down
16 changes: 12 additions & 4 deletions instrumentation/net/http/otelhttp/transport.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ type Transport struct {
spanNameFormatter func(string, *http.Request) string
clientTrace func(context.Context) *httptrace.ClientTrace
metricAttributesFn func(*http.Request) []attribute.KeyValue
readEvent bool

semconv semconv.HTTPClient
}
Expand Down Expand Up @@ -74,6 +75,7 @@ func (t *Transport) applyConfig(c *config) {
t.clientTrace = c.ClientTrace
t.semconv = semconv.NewHTTPClient(c.Meter)
t.metricAttributesFn = c.MetricAttributesFn
t.readEvent = c.ReadEvent
}

func defaultTransportFormatter(_ string, r *http.Request) string {
Expand Down Expand Up @@ -153,7 +155,7 @@ func (t *Transport) RoundTrip(r *http.Request) (*http.Response, error) {
t.semconv.RecordResponseSize(ctx, n, metricOpts)
}

res.Body = newWrappedBody(span, readRecordFunc, res.Body)
res.Body = newWrappedBody(span, readRecordFunc, res.Body, t.readEvent)
}

// Use floating point division here for higher precision (instead of Millisecond method).
Expand Down Expand Up @@ -198,17 +200,17 @@ func (t *Transport) metricAttributesFromRequest(r *http.Request) []attribute.Key
// newWrappedBody returns a new and appropriately scoped *wrappedBody as an
// io.ReadCloser. If the passed body implements io.Writer, the returned value
// will implement io.ReadWriteCloser.
func newWrappedBody(span trace.Span, record func(n int64), body io.ReadCloser) io.ReadCloser {
func newWrappedBody(span trace.Span, record func(n int64), body io.ReadCloser, readEvent bool) io.ReadCloser {
// The successful protocol switch responses will have a body that
// implement an io.ReadWriteCloser. Ensure this interface type continues
// to be satisfied if that is the case.
if _, ok := body.(io.ReadWriteCloser); ok {
return &wrappedBody{span: span, record: record, body: body}
return &wrappedBody{span: span, record: record, body: body, readEvent: readEvent}
}

// Remove the implementation of the io.ReadWriteCloser and only implement
// the io.ReadCloser.
return struct{ io.ReadCloser }{&wrappedBody{span: span, record: record, body: body}}
return struct{ io.ReadCloser }{&wrappedBody{span: span, record: record, body: body, readEvent: readEvent}}
}

// wrappedBody is the response body type returned by the transport
Expand All @@ -225,6 +227,8 @@ type wrappedBody struct {
record func(n int64)
body io.ReadCloser
read atomic.Int64

readEvent bool
}

var _ io.ReadWriteCloser = &wrappedBody{}
Expand All @@ -244,6 +248,10 @@ func (wb *wrappedBody) Read(b []byte) (int, error) {
// Record the number of bytes read
wb.read.Add(int64(n))

if wb.readEvent {
wb.span.AddEvent("read", trace.WithAttributes(ReadBytesKey.Int64(int64(n))))
}

switch err {
case nil:
// nothing to do here but fall through to the return
Expand Down
80 changes: 70 additions & 10 deletions instrumentation/net/http/otelhttp/transport_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ func TestWrappedBodyRead(t *testing.T) {
s := new(span)
called := false
record := func(numBytes int64) { called = true }
wb := newWrappedBody(s, record, readCloser{})
wb := newWrappedBody(s, record, readCloser{}, false)
n, err := wb.Read([]byte{})
assert.Equal(t, readSize, n, "wrappedBody returned wrong bytes")
assert.NoError(t, err)
Expand All @@ -276,7 +276,7 @@ func TestWrappedBodyReadEOFError(t *testing.T) {
called = true
numRecorded = numBytes
}
wb := newWrappedBody(s, record, readCloser{readErr: io.EOF})
wb := newWrappedBody(s, record, readCloser{readErr: io.EOF}, false)
n, err := wb.Read([]byte{})
assert.Equal(t, readSize, n, "wrappedBody returned wrong bytes")
assert.Equal(t, io.EOF, err)
Expand All @@ -290,7 +290,7 @@ func TestWrappedBodyReadError(t *testing.T) {
called := false
record := func(int64) { called = true }
expectedErr := errors.New("test")
wb := newWrappedBody(s, record, readCloser{readErr: expectedErr})
wb := newWrappedBody(s, record, readCloser{readErr: expectedErr}, false)
n, err := wb.Read([]byte{})
assert.Equal(t, readSize, n, "wrappedBody returned wrong bytes")
assert.Equal(t, expectedErr, err)
Expand All @@ -302,7 +302,7 @@ func TestWrappedBodyClose(t *testing.T) {
s := new(span)
called := false
record := func(int64) { called = true }
wb := newWrappedBody(s, record, readCloser{})
wb := newWrappedBody(s, record, readCloser{}, false)
assert.NoError(t, wb.Close())
s.assert(t, true, nil, codes.Unset, "")
assert.True(t, called, "record should have been called")
Expand All @@ -311,7 +311,7 @@ func TestWrappedBodyClose(t *testing.T) {
func TestWrappedBodyClosePanic(t *testing.T) {
s := new(span)
var body io.ReadCloser
wb := newWrappedBody(s, func(n int64) {}, body)
wb := newWrappedBody(s, func(n int64) {}, body, false)
assert.NotPanics(t, func() { wb.Close() }, "nil body should not panic on close")
}

Expand All @@ -320,7 +320,7 @@ func TestWrappedBodyCloseError(t *testing.T) {
called := false
record := func(int64) { called = true }
expectedErr := errors.New("test")
wb := newWrappedBody(s, record, readCloser{closeErr: expectedErr})
wb := newWrappedBody(s, record, readCloser{closeErr: expectedErr}, false)
assert.Equal(t, expectedErr, wb.Close())
s.assert(t, true, nil, codes.Unset, "")
assert.True(t, called, "record should have been called")
Expand All @@ -339,12 +339,12 @@ func (rwc readWriteCloser) Write([]byte) (int, error) {
}

func TestNewWrappedBodyReadWriteCloserImplementation(t *testing.T) {
wb := newWrappedBody(nil, func(n int64) {}, readWriteCloser{})
wb := newWrappedBody(nil, func(n int64) {}, readWriteCloser{}, false)
assert.Implements(t, (*io.ReadWriteCloser)(nil), wb)
}

func TestNewWrappedBodyReadCloserImplementation(t *testing.T) {
wb := newWrappedBody(nil, func(n int64) {}, readCloser{})
wb := newWrappedBody(nil, func(n int64) {}, readCloser{}, false)
assert.Implements(t, (*io.ReadCloser)(nil), wb)

_, ok := wb.(io.ReadWriteCloser)
Expand All @@ -355,7 +355,7 @@ func TestWrappedBodyWrite(t *testing.T) {
s := new(span)
var rwc io.ReadWriteCloser
assert.NotPanics(t, func() {
rwc = newWrappedBody(s, func(n int64) {}, readWriteCloser{}).(io.ReadWriteCloser)
rwc = newWrappedBody(s, func(n int64) {}, readWriteCloser{}, false).(io.ReadWriteCloser)
})

n, err := rwc.Write([]byte{})
Expand All @@ -373,7 +373,7 @@ func TestWrappedBodyWriteError(t *testing.T) {
func(n int64) {},
readWriteCloser{
writeErr: expectedErr,
}).(io.ReadWriteCloser)
}, false).(io.ReadWriteCloser)
})
n, err := rwc.Write([]byte{})
assert.Equal(t, writeSize, n, "wrappedBody returned wrong bytes")
Expand Down Expand Up @@ -617,6 +617,66 @@ func TestTransportErrorStatus(t *testing.T) {
}
}

func TestTransportWithMessageEventsReadEvents(t *testing.T) {
// Prepare tracing stuff.
spanRecorder := tracetest.NewSpanRecorder()
provider := sdktrace.NewTracerProvider(sdktrace.WithSpanProcessor(spanRecorder))

content := []byte("Hello, world!")
ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
_, err := w.Write(content)
assert.NoError(t, err)
}))
defer ts.Close()

// Create our Transport and make request.
tr := NewTransport(
http.DefaultTransport,
WithTracerProvider(provider),
WithMessageEvents(ReadEvents),
)
c := http.Client{Transport: tr}
r, err := http.NewRequest(http.MethodGet, ts.URL, nil)
if err != nil {
t.Fatal(err)
}
Comment on lines +640 to +642
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not require.NoError?

res, err := c.Do(r) // nolint:bodyclose // False-positive.
require.NoError(t, err)
defer func() { assert.NoError(t, res.Body.Close()) }()

_, err = io.ReadAll(res.Body)
require.NoError(t, err)

// Check span.
spans := spanRecorder.Ended()
if len(spans) != 1 {
t.Fatalf("expected 1 span; got: %d", len(spans))
}
Comment on lines +652 to +654
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use `require.Len`` instead?

span := spans[0]

events := span.Events()
require.NotEmpty(t, events, "expected span to have events")

var readEvent sdktrace.Event
for _, ev := range events {
if ev.Name == "read" {
readEvent = ev
break
}
}
require.NotEmpty(t, readEvent, "expected span to have a 'read' event")

var readBytesAttr attribute.KeyValue
for _, attr := range readEvent.Attributes {
if attr.Key == ReadBytesKey {
readBytesAttr = attr
break
}
}
require.NotEmpty(t, readBytesAttr, "expected span to have a read bytes attribute")
require.Positive(t, readBytesAttr.Value.AsInt64(), "expected read bytes attribute to have a non-zero int64 value")
}

func TestTransportRequestWithTraceContext(t *testing.T) {
spanRecorder := tracetest.NewSpanRecorder()
provider := sdktrace.NewTracerProvider(
Expand Down