diff --git a/internal/healthcheck/extension.go b/internal/healthcheck/extension.go index 34cbd793e9437..5442021792f8a 100644 --- a/internal/healthcheck/extension.go +++ b/internal/healthcheck/extension.go @@ -5,6 +5,7 @@ package healthcheck // import "github.com/open-telemetry/opentelemetry-collector import ( "context" + "sync" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" @@ -32,6 +33,7 @@ type HealthCheckExtension struct { eventCh chan *eventSourcePair readyCh chan struct{} host component.Host + shutdownOnce sync.Once } var ( @@ -110,17 +112,18 @@ func (hc *HealthCheckExtension) Start(ctx context.Context, host component.Host) // Shutdown implements the component.Component interface. func (hc *HealthCheckExtension) Shutdown(ctx context.Context) error { - // Preemptively send the stopped event, so it can be exported before shutdown - componentstatus.ReportStatus(hc.host, componentstatus.NewEvent(componentstatus.StatusStopped)) - - close(hc.eventCh) - hc.aggregator.Close() - var err error - for _, comp := range hc.subcomponents { - err = multierr.Append(err, comp.Shutdown(ctx)) - } + hc.shutdownOnce.Do(func() { + // Preemptively send the stopped event, so it can be exported before shutdown + componentstatus.ReportStatus(hc.host, componentstatus.NewEvent(componentstatus.StatusStopped)) + + close(hc.eventCh) + hc.aggregator.Close() + for _, comp := range hc.subcomponents { + err = multierr.Append(err, comp.Shutdown(ctx)) + } + }) return err } diff --git a/internal/healthcheck/extension_test.go b/internal/healthcheck/extension_test.go index 1551778827077..03efe20749c00 100644 --- a/internal/healthcheck/extension_test.go +++ b/internal/healthcheck/extension_test.go @@ -4,6 +4,7 @@ package healthcheck import ( + "context" "fmt" "io" "net" @@ -32,6 +33,13 @@ func TestComponentStatus(t *testing.T) { cfg.GRPCConfig.NetAddr.Endpoint = testutil.GetAvailableLocalAddress(t) cfg.UseV2 = true ext := NewHealthCheckExtension(t.Context(), *cfg, extensiontest.NewNopSettings(extensiontest.NopType)) + defer func() { + // Use Background context for shutdown in defer to avoid cancellation issues + //nolint:usetesting // defer functions may run after test context is cancelled + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + require.NoError(t, ext.Shutdown(ctx)) + }() // Status before Start will be StatusNone st, ok := ext.aggregator.AggregateStatus(status.ScopeAll, status.Concise) @@ -80,7 +88,9 @@ func TestComponentStatus(t *testing.T) { }, time.Second, 10*time.Millisecond) require.NoError(t, ext.NotReady()) - require.NoError(t, ext.Shutdown(t.Context())) + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + defer cancel() + require.NoError(t, ext.Shutdown(ctx)) // Events sent after shutdown will be discarded for _, id := range traces.InstanceIDs() { @@ -111,11 +121,18 @@ func TestNotifyConfig(t *testing.T) { cfg.HTTPConfig.Config.Path = "/config" ext := NewHealthCheckExtension(t.Context(), *cfg, extensiontest.NewNopSettings(extensiontest.NopType)) + defer func() { + // Use Background context for shutdown in defer to avoid cancellation issues + //nolint:usetesting // defer functions may run after test context is cancelled + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + require.NoError(t, ext.Shutdown(ctx)) + }() require.NoError(t, ext.Start(t.Context(), componenttest.NewNopHost())) - t.Cleanup(func() { require.NoError(t, ext.Shutdown(t.Context())) }) client := &http.Client{} + defer client.CloseIdleConnections() url := fmt.Sprintf("http://%s/config", endpoint) var resp *http.Response @@ -123,6 +140,7 @@ func TestNotifyConfig(t *testing.T) { resp, err = client.Get(url) require.NoError(t, err) assert.Equal(t, http.StatusServiceUnavailable, resp.StatusCode) + require.NoError(t, resp.Body.Close()) require.NoError(t, ext.NotifyConfig(t.Context(), confMap)) @@ -132,6 +150,7 @@ func TestNotifyConfig(t *testing.T) { body, err := io.ReadAll(resp.Body) require.NoError(t, err) + require.NoError(t, resp.Body.Close()) assert.JSONEq(t, string(confJSON), string(body)) } @@ -151,6 +170,8 @@ func TestShutdown(t *testing.T) { // Get address already in use here require.Error(t, ext.Start(t.Context(), componenttest.NewNopHost())) - require.NoError(t, ext.Shutdown(t.Context())) + ctx, cancel := context.WithTimeout(t.Context(), 10*time.Second) + defer cancel() + require.NoError(t, ext.Shutdown(ctx)) }) } diff --git a/internal/healthcheck/internal/grpc/grpc_test.go b/internal/healthcheck/internal/grpc/grpc_test.go index 33a46060c9b97..5f427dacc2c4e 100644 --- a/internal/healthcheck/internal/grpc/grpc_test.go +++ b/internal/healthcheck/internal/grpc/grpc_test.go @@ -4,6 +4,7 @@ package grpc import ( + "context" "sync" "testing" "time" @@ -699,7 +700,13 @@ func TestCheck(t *testing.T) { status.NewAggregator(internalhelpers.ErrPriority(tc.componentHealthSettings)), ) require.NoError(t, server.Start(t.Context(), componenttest.NewNopHost())) - t.Cleanup(func() { require.NoError(t, server.Shutdown(t.Context())) }) + t.Cleanup(func() { + // Use Background context for cleanup to avoid cancellation issues + //nolint:usetesting // cleanup functions may run after test context is cancelled + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + require.NoError(t, server.Shutdown(ctx)) + }) cc, err := grpc.NewClient( addr, @@ -1535,7 +1542,13 @@ func TestWatch(t *testing.T) { status.NewAggregator(internalhelpers.ErrPriority(tc.componentHealthSettings)), ) require.NoError(t, server.Start(t.Context(), componenttest.NewNopHost())) - t.Cleanup(func() { require.NoError(t, server.Shutdown(t.Context())) }) + t.Cleanup(func() { + // Use Background context for cleanup to avoid cancellation issues + //nolint:usetesting // cleanup functions may run after test context is cancelled + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + require.NoError(t, server.Shutdown(ctx)) + }) cc, err := grpc.NewClient( addr, diff --git a/internal/healthcheck/internal/grpc/server.go b/internal/healthcheck/internal/grpc/server.go index 670ca061bec9f..a43d78a002a63 100644 --- a/internal/healthcheck/internal/grpc/server.go +++ b/internal/healthcheck/internal/grpc/server.go @@ -6,6 +6,7 @@ package grpc // import "github.com/open-telemetry/opentelemetry-collector-contri import ( "context" "errors" + "sync" "go.opentelemetry.io/collector/component" "go.opentelemetry.io/collector/component/componentstatus" @@ -24,6 +25,7 @@ type Server struct { componentHealthConfig *common.ComponentHealthConfig telemetry component.TelemetrySettings doneCh chan struct{} + doneOnce sync.Once } var _ component.Component = (*Server)(nil) @@ -58,11 +60,13 @@ func (s *Server) Start(ctx context.Context, host component.Host) error { healthpb.RegisterHealthServer(s.grpcServer, s) ln, err := s.config.NetAddr.Listen(context.Background()) if err != nil { + // Server never started, ensure doneCh is closed so shutdown doesn't block + s.doneOnce.Do(func() { close(s.doneCh) }) return err } go func() { - defer close(s.doneCh) + defer s.doneOnce.Do(func() { close(s.doneCh) }) if err = s.grpcServer.Serve(ln); err != nil && !errors.Is(err, grpc.ErrServerStopped) { componentstatus.ReportStatus(host, componentstatus.NewPermanentErrorEvent(err)) @@ -73,11 +77,18 @@ func (s *Server) Start(ctx context.Context, host component.Host) error { } // Shutdown implements the component.Component interface. -func (s *Server) Shutdown(context.Context) error { +func (s *Server) Shutdown(ctx context.Context) error { if s.grpcServer == nil { return nil } + // Stop the server - this will eventually release the port even if context times out s.grpcServer.GracefulStop() - <-s.doneCh - return nil + select { + case <-s.doneCh: + return nil + case <-ctx.Done(): + // Context timed out, but server is stopping. Force stop to ensure port is released. + s.grpcServer.Stop() + return ctx.Err() + } } diff --git a/internal/healthcheck/internal/http/server.go b/internal/healthcheck/internal/http/server.go index 138d8b48d2f55..f3c030368d060 100644 --- a/internal/healthcheck/internal/http/server.go +++ b/internal/healthcheck/internal/http/server.go @@ -34,6 +34,8 @@ type Server struct { aggregator *status.Aggregator startTimestamp time.Time doneWg sync.WaitGroup + doneCh chan struct{} + doneOnce sync.Once } var ( @@ -53,6 +55,7 @@ func NewServer( telemetry: telemetry, mux: http.NewServeMux(), aggregator: aggregator, + doneCh: make(chan struct{}), } if legacyConfig.UseV2 { @@ -93,12 +96,15 @@ func (s *Server) Start(ctx context.Context, host component.Host) error { ln, err := s.httpConfig.ToListener(ctx) if err != nil { + // Server never started, ensure doneCh is closed so shutdown doesn't block + s.doneOnce.Do(func() { close(s.doneCh) }) return fmt.Errorf("failed to bind to address %s: %w", s.httpConfig.Endpoint, err) } s.doneWg.Add(1) go func() { defer s.doneWg.Done() + defer s.doneOnce.Do(func() { close(s.doneCh) }) if err = s.httpServer.Serve(ln); !errors.Is(err, http.ErrServerClosed) && err != nil { componentstatus.ReportStatus(host, componentstatus.NewPermanentErrorEvent(err)) @@ -109,13 +115,17 @@ func (s *Server) Start(ctx context.Context, host component.Host) error { } // Shutdown implements the component.Component interface. -func (s *Server) Shutdown(context.Context) error { +func (s *Server) Shutdown(ctx context.Context) error { if s.httpServer == nil { return nil } s.httpServer.Close() - s.doneWg.Wait() - return nil + select { + case <-s.doneCh: + return nil + case <-ctx.Done(): + return ctx.Err() + } } // NotifyConfig implements the extension.ConfigWatcher interface. diff --git a/internal/healthcheck/internal/http/server_test.go b/internal/healthcheck/internal/http/server_test.go index b30399c6c8bda..5721e37d2fc75 100644 --- a/internal/healthcheck/internal/http/server_test.go +++ b/internal/healthcheck/internal/http/server_test.go @@ -4,6 +4,7 @@ package http import ( + "context" "encoding/json" "fmt" "io" @@ -2945,7 +2946,13 @@ func TestStatus(t *testing.T) { ) require.NoError(t, server.Start(t.Context(), componenttest.NewNopHost())) - defer func() { require.NoError(t, server.Shutdown(t.Context())) }() + defer func() { + // Use Background context for shutdown in defer to avoid cancellation issues + //nolint:usetesting // defer functions may run after test context is cancelled + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + require.NoError(t, server.Shutdown(ctx)) + }() var url string if tc.legacyConfig.UseV2 { @@ -2955,6 +2962,7 @@ func TestStatus(t *testing.T) { } client := &http.Client{} + defer client.CloseIdleConnections() for _, ts := range tc.teststeps { if ts.step != nil { @@ -2968,22 +2976,30 @@ func TestStatus(t *testing.T) { var err error var resp *http.Response + var body []byte if ts.eventually { assert.EventuallyWithT(t, func(tt *assert.CollectT) { - resp, err = client.Get(stepURL) - require.NoError(tt, err) - assert.Equal(tt, ts.expectedStatusCode, resp.StatusCode) + localResp, localErr := client.Get(stepURL) + require.NoError(tt, localErr) + defer localResp.Body.Close() + assert.Equal(tt, ts.expectedStatusCode, localResp.StatusCode) }, time.Second, 10*time.Millisecond) + // Make a final request to get the body for assertions + resp, err = client.Get(stepURL) + require.NoError(t, err) + body, err = io.ReadAll(resp.Body) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) } else { resp, err = client.Get(stepURL) require.NoError(t, err) + body, err = io.ReadAll(resp.Body) + require.NoError(t, err) + require.NoError(t, resp.Body.Close()) assert.Equal(t, ts.expectedStatusCode, resp.StatusCode) } - body, err := io.ReadAll(resp.Body) - require.NoError(t, err) - assert.Contains(t, string(body), ts.expectedBody) if ts.expectedComponentStatus != nil { @@ -3123,9 +3139,16 @@ func TestConfig(t *testing.T) { ) require.NoError(t, server.Start(t.Context(), componenttest.NewNopHost())) - defer func() { require.NoError(t, server.Shutdown(t.Context())) }() + defer func() { + // Use Background context for shutdown in defer to avoid cancellation issues + //nolint:usetesting // defer functions may run after test context is cancelled + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + require.NoError(t, server.Shutdown(ctx)) + }() client := &http.Client{} + defer client.CloseIdleConnections() url := fmt.Sprintf("http://%s%s", tc.config.Endpoint, tc.config.Config.Path) if tc.setup != nil { @@ -3138,6 +3161,7 @@ func TestConfig(t *testing.T) { body, err := io.ReadAll(resp.Body) require.NoError(t, err) + require.NoError(t, resp.Body.Close()) assert.Equal(t, tc.expectedBody, body) }) }