Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
21 changes: 12 additions & 9 deletions internal/healthcheck/extension.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package healthcheck // import "github.com/open-telemetry/opentelemetry-collector

import (
"context"
"sync"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
Expand Down Expand Up @@ -32,6 +33,7 @@ type HealthCheckExtension struct {
eventCh chan *eventSourcePair
readyCh chan struct{}
host component.Host
shutdownOnce sync.Once
}

var (
Expand Down Expand Up @@ -110,17 +112,18 @@ func (hc *HealthCheckExtension) Start(ctx context.Context, host component.Host)

// Shutdown implements the component.Component interface.
//
// The teardown runs at most once, guarded by hc.shutdownOnce, so a repeated
// call (for example an explicit Shutdown followed by a deferred one) does not
// close hc.eventCh a second time, which would panic. Only the call that
// actually performs the teardown returns the combined subcomponent errors;
// any later call returns nil.
func (hc *HealthCheckExtension) Shutdown(ctx context.Context) error {
	var err error
	hc.shutdownOnce.Do(func() {
		// Preemptively send the stopped event, so it can be exported before shutdown
		componentstatus.ReportStatus(hc.host, componentstatus.NewEvent(componentstatus.StatusStopped))

		close(hc.eventCh)
		hc.aggregator.Close()

		// Shut down every subcomponent even if an earlier one fails, and
		// report all failures together.
		for _, comp := range hc.subcomponents {
			err = multierr.Append(err, comp.Shutdown(ctx))
		}
	})
	return err
}

Expand Down
27 changes: 24 additions & 3 deletions internal/healthcheck/extension_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package healthcheck

import (
"context"
"fmt"
"io"
"net"
Expand Down Expand Up @@ -32,6 +33,13 @@ func TestComponentStatus(t *testing.T) {
cfg.GRPCConfig.NetAddr.Endpoint = testutil.GetAvailableLocalAddress(t)
cfg.UseV2 = true
ext := NewHealthCheckExtension(t.Context(), *cfg, extensiontest.NewNopSettings(extensiontest.NopType))
defer func() {
// Use Background context for shutdown in defer to avoid cancellation issues
//nolint:usetesting // defer functions may run after test context is cancelled
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
require.NoError(t, ext.Shutdown(ctx))
}()

// Status before Start will be StatusNone
st, ok := ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise)
Expand Down Expand Up @@ -80,7 +88,9 @@ func TestComponentStatus(t *testing.T) {
}, time.Second, 10*time.Millisecond)

require.NoError(t, ext.NotReady())
require.NoError(t, ext.Shutdown(t.Context()))
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
defer cancel()
require.NoError(t, ext.Shutdown(ctx))

// Events sent after shutdown will be discarded
for _, id := range traces.InstanceIDs() {
Expand Down Expand Up @@ -111,18 +121,26 @@ func TestNotifyConfig(t *testing.T) {
cfg.HTTPConfig.Config.Path = "/config"

ext := NewHealthCheckExtension(t.Context(), *cfg, extensiontest.NewNopSettings(extensiontest.NopType))
defer func() {
// Use Background context for shutdown in defer to avoid cancellation issues
//nolint:usetesting // defer functions may run after test context is cancelled
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
require.NoError(t, ext.Shutdown(ctx))
}()

require.NoError(t, ext.Start(t.Context(), componenttest.NewNopHost()))
t.Cleanup(func() { require.NoError(t, ext.Shutdown(t.Context())) })

client := &http.Client{}
defer client.CloseIdleConnections()
url := fmt.Sprintf("http://%s/config", endpoint)

var resp *http.Response

resp, err = client.Get(url)
require.NoError(t, err)
assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode)
require.NoError(t, resp.Body.Close())

require.NoError(t, ext.NotifyConfig(t.Context(), confMap))

Expand All @@ -132,6 +150,7 @@ func TestNotifyConfig(t *testing.T) {

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
assert.JSONEq(t, string(confJSON), string(body))
}

Expand All @@ -151,6 +170,8 @@ func TestShutdown(t *testing.T) {
// Get address already in use here
require.Error(t, ext.Start(t.Context(), componenttest.NewNopHost()))

require.NoError(t, ext.Shutdown(t.Context()))
ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second)
defer cancel()
require.NoError(t, ext.Shutdown(ctx))
})
}
17 changes: 15 additions & 2 deletions internal/healthcheck/internal/grpc/grpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package grpc

import (
"context"
"sync"
"testing"
"time"
Expand Down Expand Up @@ -699,7 +700,13 @@ func TestCheck(t *testing.T) {
status.NewAggregator(internalhelpers.ErrPriority(tc.componentHealthSettings)),
)
require.NoError(t, server.Start(t.Context(), componenttest.NewNopHost()))
t.Cleanup(func() { require.NoError(t, server.Shutdown(t.Context())) })
t.Cleanup(func() {
// Use Background context for cleanup to avoid cancellation issues
//nolint:usetesting // cleanup functions may run after test context is cancelled
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
require.NoError(t, server.Shutdown(ctx))
})

cc, err := grpc.NewClient(
addr,
Expand Down Expand Up @@ -1535,7 +1542,13 @@ func TestWatch(t *testing.T) {
status.NewAggregator(internalhelpers.ErrPriority(tc.componentHealthSettings)),
)
require.NoError(t, server.Start(t.Context(), componenttest.NewNopHost()))
t.Cleanup(func() { require.NoError(t, server.Shutdown(t.Context())) })
t.Cleanup(func() {
// Use Background context for cleanup to avoid cancellation issues
//nolint:usetesting // cleanup functions may run after test context is cancelled
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
require.NoError(t, server.Shutdown(ctx))
})

cc, err := grpc.NewClient(
addr,
Expand Down
19 changes: 15 additions & 4 deletions internal/healthcheck/internal/grpc/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ package grpc // import "github.com/open-telemetry/opentelemetry-collector-contri
import (
"context"
"errors"
"sync"

"go.opentelemetry.io/collector/component"
"go.opentelemetry.io/collector/component/componentstatus"
Expand All @@ -24,6 +25,7 @@ type Server struct {
componentHealthConfig *common.ComponentHealthConfig
telemetry component.TelemetrySettings
doneCh chan struct{}
doneOnce sync.Once
}

var _ component.Component = (*Server)(nil)
Expand Down Expand Up @@ -58,11 +60,13 @@ func (s *Server) Start(ctx context.Context, host component.Host) error {
healthpb.RegisterHealthServer(s.grpcServer, s)
ln, err := s.config.NetAddr.Listen(context.Background())
if err != nil {
// Server never started, ensure doneCh is closed so shutdown doesn't block
s.doneOnce.Do(func() { close(s.doneCh) })
return err
}

go func() {
defer close(s.doneCh)
defer s.doneOnce.Do(func() { close(s.doneCh) })

if err = s.grpcServer.Serve(ln); err != nil && !errors.Is(err, grpc.ErrServerStopped) {
componentstatus.ReportStatus(host, componentstatus.NewPermanentErrorEvent(err))
Expand All @@ -73,11 +77,18 @@ func (s *Server) Start(ctx context.Context, host component.Host) error {
}

// Shutdown implements the component.Component interface.
func (s *Server) Shutdown(context.Context) error {
func (s *Server) Shutdown(ctx context.Context) error {
if s.grpcServer == nil {
return nil
}
// Stop the server - this will eventually release the port even if context times out
s.grpcServer.GracefulStop()
<-s.doneCh
return nil
select {
case <-s.doneCh:
return nil
case <-ctx.Done():
// Context timed out, but server is stopping. Force stop to ensure port is released.
s.grpcServer.Stop()
return ctx.Err()
}
}
16 changes: 13 additions & 3 deletions internal/healthcheck/internal/http/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ type Server struct {
aggregator *status.Aggregator
startTimestamp time.Time
doneWg sync.WaitGroup
doneCh chan struct{}
doneOnce sync.Once
}

var (
Expand All @@ -53,6 +55,7 @@ func NewServer(
telemetry: telemetry,
mux: http.NewServeMux(),
aggregator: aggregator,
doneCh: make(chan struct{}),
}

if legacyConfig.UseV2 {
Expand Down Expand Up @@ -93,12 +96,15 @@ func (s *Server) Start(ctx context.Context, host component.Host) error {

ln, err := s.httpConfig.ToListener(ctx)
if err != nil {
// Server never started, ensure doneCh is closed so shutdown doesn't block
s.doneOnce.Do(func() { close(s.doneCh) })
return fmt.Errorf("failed to bind to address %s: %w", s.httpConfig.Endpoint, err)
}

s.doneWg.Add(1)
go func() {
defer s.doneWg.Done()
defer s.doneOnce.Do(func() { close(s.doneCh) })

if err = s.httpServer.Serve(ln); !errors.Is(err, http.ErrServerClosed) && err != nil {
componentstatus.ReportStatus(host, componentstatus.NewPermanentErrorEvent(err))
Expand All @@ -109,13 +115,17 @@ func (s *Server) Start(ctx context.Context, host component.Host) error {
}

// Shutdown implements the component.Component interface.
func (s *Server) Shutdown(context.Context) error {
func (s *Server) Shutdown(ctx context.Context) error {
if s.httpServer == nil {
return nil
}
s.httpServer.Close()
s.doneWg.Wait()
return nil
select {
case <-s.doneCh:
return nil
case <-ctx.Done():
return ctx.Err()
}
}

// NotifyConfig implements the extension.ConfigWatcher interface.
Expand Down
40 changes: 32 additions & 8 deletions internal/healthcheck/internal/http/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package http

import (
"context"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -2945,7 +2946,13 @@ func TestStatus(t *testing.T) {
)

require.NoError(t, server.Start(t.Context(), componenttest.NewNopHost()))
defer func() { require.NoError(t, server.Shutdown(t.Context())) }()
defer func() {
// Use Background context for shutdown in defer to avoid cancellation issues
//nolint:usetesting // defer functions may run after test context is cancelled
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
require.NoError(t, server.Shutdown(ctx))
}()

var url string
if tc.legacyConfig.UseV2 {
Expand All @@ -2955,6 +2962,7 @@ func TestStatus(t *testing.T) {
}

client := &http.Client{}
defer client.CloseIdleConnections()

for _, ts := range tc.teststeps {
if ts.step != nil {
Expand All @@ -2968,22 +2976,30 @@ func TestStatus(t *testing.T) {

var err error
var resp *http.Response
var body []byte

if ts.eventually {
assert.EventuallyWithT(t, func(tt *assert.CollectT) {
resp, err = client.Get(stepURL)
require.NoError(tt, err)
assert.Equal(tt, ts.expectedStatusCode, resp.StatusCode)
localResp, localErr := client.Get(stepURL)
require.NoError(tt, localErr)
defer localResp.Body.Close()
assert.Equal(tt, ts.expectedStatusCode, localResp.StatusCode)
}, time.Second, 10*time.Millisecond)
// Make a final request to get the body for assertions
resp, err = client.Get(stepURL)
require.NoError(t, err)
body, err = io.ReadAll(resp.Body)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
} else {
resp, err = client.Get(stepURL)
require.NoError(t, err)
body, err = io.ReadAll(resp.Body)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
assert.Equal(t, ts.expectedStatusCode, resp.StatusCode)
}

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)

assert.Contains(t, string(body), ts.expectedBody)

if ts.expectedComponentStatus != nil {
Expand Down Expand Up @@ -3123,9 +3139,16 @@ func TestConfig(t *testing.T) {
)

require.NoError(t, server.Start(t.Context(), componenttest.NewNopHost()))
defer func() { require.NoError(t, server.Shutdown(t.Context())) }()
defer func() {
// Use Background context for shutdown in defer to avoid cancellation issues
//nolint:usetesting // defer functions may run after test context is cancelled
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
require.NoError(t, server.Shutdown(ctx))
}()

client := &http.Client{}
defer client.CloseIdleConnections()
url := fmt.Sprintf("http://%s%s", tc.config.Endpoint, tc.config.Config.Path)

if tc.setup != nil {
Expand All @@ -3138,6 +3161,7 @@ func TestConfig(t *testing.T) {

body, err := io.ReadAll(resp.Body)
require.NoError(t, err)
require.NoError(t, resp.Body.Close())
assert.Equal(t, tc.expectedBody, body)
})
}
Expand Down
Loading