diff --git a/.chloggen/revert_43054.yaml b/.chloggen/revert_43054.yaml
new file mode 100644
index 0000000000000..fcfe0a0a23d6b
--- /dev/null
+++ b/.chloggen/revert_43054.yaml
@@ -0,0 +1,30 @@
+# Use this changelog template to create an entry for release notes.
+
+# One of 'breaking', 'deprecation', 'new_component', 'enhancement', 'bug_fix'
+change_type: enhancement
+
+# The name of the component, or a single word describing the area of concern, (e.g. receiver/filelog)
+component: receiver/k8sobjects
+
+# A brief description of the change. Surround your text with quotes ("") if it needs to start with a backtick (`).
+note: Revert standby mode behavior on leader election loss. The receiver now fully shuts down when it loses the leader lease instead of entering standby mode.
+
+# Mandatory: One or more tracking issues related to the change. You can use the PR number here if no issue exists.
+issues: [43054, 44512]
+
+# (Optional) One or more lines of additional information to render under the primary note.
+# These lines will be padded with 2 spaces and then inserted directly into the document.
+# Use pipe (|) for multiline entries.
+subtext: |
+  This reverts the standby mode feature introduced in PR #43054. The receiver will now call Shutdown()
+  when the leader lease is lost, restoring the original behavior. This change also preserves the bug fix
+  for correct log record counting in watch mode.
+
+# If your change doesn't affect end users or the exported elements of any package,
+# you should instead start your pull request title with [chore] or use the "Skip Changelog" label.
+# Optional: The change log or logs in which this entry should be included.
+# e.g. '[user]' or '[user, api]'
+# Include 'user' if the change is relevant to end users.
+# Include 'api' if there is a change to a library API.
+# Default: '[user]'
+change_logs: [user]
diff --git a/receiver/k8sobjectsreceiver/receiver.go b/receiver/k8sobjectsreceiver/receiver.go
index 5aa337776d1b1..7e025a86c0bf3 100644
--- a/receiver/k8sobjectsreceiver/receiver.go
+++ b/receiver/k8sobjectsreceiver/receiver.go
@@ -38,7 +38,6 @@ type k8sobjectsreceiver struct {
 	obsrecv         *receiverhelper.ObsReport
 	mu              sync.Mutex
 	cancel          context.CancelFunc
-	wg              sync.WaitGroup
 }
 
 func newReceiver(params receiver.Settings, config *Config, consumer consumer.Logs) (receiver.Logs, error) {
@@ -133,6 +132,8 @@ func (kr *k8sobjectsreceiver) Start(ctx context.Context, host component.Host) er
 			return fmt.Errorf("the extension %T is not implement k8sleaderelector.LeaderElection", k8sLeaderElector)
 		}
 
+		// Register callbacks with the leader elector extension. These callbacks remain active
+		// for the lifetime of the receiver, allowing it to restart when leadership is regained.
 		elector.SetCallBackFuncs(
 			func(ctx context.Context) {
 				cctx, cancel := context.WithCancel(ctx)
@@ -142,12 +143,15 @@ func (kr *k8sobjectsreceiver) Start(ctx context.Context, host component.Host) er
 				}
 				kr.setting.Logger.Info("Object Receiver started as leader")
 			},
-			// onStoppedLeading: stop watches, but DO NOT shut the whole receiver down
 			func() {
-				kr.setting.Logger.Info("no longer leader, stopping watches")
-				kr.stopWatches()
-			},
-		)
+				// Shutdown on leader loss. The receiver will restart if leadership is regained
+				// since the callbacks remain registered with the leader elector extension.
+				kr.setting.Logger.Info("no longer leader, stopping")
+				err = kr.Shutdown(context.Background())
+				if err != nil {
+					kr.setting.Logger.Error("shutdown receiver error:", zap.Error(err))
+				}
+			})
 	} else {
 		cctx, cancel := context.WithCancel(ctx)
 		kr.cancel = cancel
@@ -160,37 +164,17 @@ func (kr *k8sobjectsreceiver) Start(ctx context.Context, host component.Host) er
 }
 
 func (kr *k8sobjectsreceiver) Shutdown(context.Context) error {
-	// Stop informers and wait for them to exit.
 	kr.setting.Logger.Info("Object Receiver stopped")
-	kr.stopWatches()
-
 	if kr.cancel != nil {
 		kr.cancel()
-		kr.cancel = nil
 	}
-	return nil
-}
 
-// stopWatches closes all informer stop channels (idempotently) and waits for their goroutines to exit.
-func (kr *k8sobjectsreceiver) stopWatches() {
 	kr.mu.Lock()
-	// Copy and clear the list under lock to avoid races on restart
-	chans := kr.stopperChanList
-	kr.stopperChanList = nil
-	kr.mu.Unlock()
-
-	if len(chans) == 0 {
-		return
-	}
-	for _, ch := range chans {
-		select {
-		case <-ch: // already closed
-		default:
-			close(ch)
-		}
+	for _, stopperChan := range kr.stopperChanList {
+		close(stopperChan)
 	}
-	// Now wait for all WG-tracked loops (both pull & watch) to exit
-	kr.wg.Wait()
+	kr.mu.Unlock()
+	return nil
 }
 
 func (kr *k8sobjectsreceiver) start(ctx context.Context, object *K8sObjectsConfig) {
@@ -225,9 +209,7 @@ func (kr *k8sobjectsreceiver) startPull(ctx context.Context, config *K8sObjectsC
 	stopperChan := make(chan struct{})
 	kr.mu.Lock()
 	kr.stopperChanList = append(kr.stopperChanList, stopperChan)
-	kr.wg.Add(1)
 	kr.mu.Unlock()
-	defer kr.wg.Done()
 	ticker := newTicker(ctx, config.Interval)
 	listOption := metav1.ListOptions{
 		FieldSelector: config.FieldSelector,
@@ -248,21 +230,15 @@ func (kr *k8sobjectsreceiver) startPull(ctx context.Context, config *K8sObjectsC
 				kr.setting.Logger.Error("error in pulling object",
 					zap.String("resource", config.gvr.String()),
 					zap.Error(err))
-				continue
-			}
-			if len(objects.Items) == 0 {
-				continue
+			} else if len(objects.Items) > 0 {
+				logs := pullObjectsToLogData(objects, time.Now(), config, kr.setting.BuildInfo.Version)
+				obsCtx := kr.obsrecv.StartLogsOp(ctx)
+				logRecordCount := logs.LogRecordCount()
+				err = kr.consumer.ConsumeLogs(obsCtx, logs)
+				kr.obsrecv.EndLogsOp(obsCtx, metadata.Type.String(), logRecordCount, err)
 			}
-			logs := pullObjectsToLogData(objects, time.Now(), config, kr.setting.BuildInfo.Version)
-			obsCtx := kr.obsrecv.StartLogsOp(ctx)
-			logRecordCount := logs.LogRecordCount()
-			err = kr.consumer.ConsumeLogs(obsCtx, logs)
-			kr.obsrecv.EndLogsOp(obsCtx, metadata.Type.String(), logRecordCount, err)
-
 		case <-stopperChan:
 			return
-		case <-ctx.Done():
-			return
 		}
 	}
 }
@@ -271,9 +247,7 @@ func (kr *k8sobjectsreceiver) startWatch(ctx context.Context, config *K8sObjects
 	stopperChan := make(chan struct{})
 	kr.mu.Lock()
 	kr.stopperChanList = append(kr.stopperChanList, stopperChan)
-	kr.wg.Add(1)
 	kr.mu.Unlock()
-	defer kr.wg.Done()
 
 	if kr.config.IncludeInitialState {
 		kr.sendInitialState(ctx, config, resource)
@@ -286,7 +260,6 @@ func (kr *k8sobjectsreceiver) startWatch(ctx context.Context, config *K8sObjects
 	})
 
 	cancelCtx, cancel := context.WithCancel(ctx)
-	defer cancel()
 	cfgCopy := *config
 	wait.UntilWithContext(cancelCtx, func(newCtx context.Context) {
 		resourceVersion, err := getResourceVersion(newCtx, &cfgCopy, resource)
@@ -374,10 +347,6 @@ func (kr *k8sobjectsreceiver) doWatch(ctx context.Context, config *K8sObjectsCon
 	res := watcher.ResultChan()
 	for {
 		select {
-		case <-ctx.Done():
-			kr.setting.Logger.Info("context canceled, stopping watch",
-				zap.String("resource", config.gvr.String()))
-			return true
 		case data, ok := <-res:
 			if data.Type == apiWatch.Error {
 				errObject := apierrors.FromObject(data.Object)
diff --git a/receiver/k8sobjectsreceiver/receiver_test.go b/receiver/k8sobjectsreceiver/receiver_test.go
index 04b4aaa011262..6aaa861ee02ab 100644
--- a/receiver/k8sobjectsreceiver/receiver_test.go
+++ b/receiver/k8sobjectsreceiver/receiver_test.go
@@ -516,247 +516,3 @@ func TestReceiverWithLeaderElection(t *testing.T) {
 	}, 20*time.Second, 100*time.Millisecond, "logs not collected")
 }
-
-func TestWatchWithLeaderElectionStandby(t *testing.T) {
-	t.Parallel()
-
-	fakeLeaderElection := &k8sleaderelectortest.FakeLeaderElection{}
-	fakeHost := &k8sleaderelectortest.FakeHost{FakeLeaderElection: fakeLeaderElection}
-	leaderElectorID := component.MustNewID("k8s_leader_elector")
-
-	mockClient := newMockDynamicClient()
-	mockClient.createPods(
-		generatePod("pod1", "default", map[string]any{"environment": "production"}, "1"),
-	)
-
-	rCfg := createDefaultConfig().(*Config)
-	rCfg.makeDynamicClient = mockClient.getMockDynamicClient
-	rCfg.makeDiscoveryClient = getMockDiscoveryClient
-	rCfg.ErrorMode = PropagateError
-	rCfg.IncludeInitialState = false
-	rCfg.Objects = []*K8sObjectsConfig{
-		{Name: "pods", Mode: WatchMode, Namespaces: []string{"default"}},
-	}
-	rCfg.K8sLeaderElector = &leaderElectorID
-
-	r, err := newReceiver(receivertest.NewNopSettings(metadata.Type), rCfg, consumertest.NewNop())
-	require.NoError(t, err)
-	kr := r.(*k8sobjectsreceiver)
-	sink := new(consumertest.LogsSink)
-	kr.consumer = sink
-
-	require.NoError(t, kr.Start(t.Context(), fakeHost))
-
-	// Become leader -> watches will start asynchronously
-	fakeLeaderElection.InvokeOnLeading()
-
-	// Give the watch time to establish (avoid list→watch gap)
-	time.Sleep(150 * time.Millisecond)
-
-	// Now create pods that should be observed by the active watch
-	mockClient.createPods(
-		generatePod("pod2", "default", map[string]any{"environment": "x"}, "2"),
-		generatePod("pod3", "default", map[string]any{"environment": "y"}, "3"),
-	)
-
-	require.Eventually(t, func() bool {
-		return sink.LogRecordCount() == 2
-	}, 5*time.Second, 50*time.Millisecond, "watch events not collected while leader")
-
-	// Standby
-	fakeLeaderElection.InvokeOnStopping()
-
-	// Create while in standby -> should NOT be delivered
-	mockClient.createPods(
-		generatePod("pod4", "default", map[string]any{"environment": "standby"}, "4"),
-	)
-	time.Sleep(150 * time.Millisecond)
-	assert.Equal(t, 2, sink.LogRecordCount(), "no events should be received while in standby")
-
-	// Resume
-	fakeLeaderElection.InvokeOnLeading()
-	time.Sleep(150 * time.Millisecond)
-
-	mockClient.createPods(
-		generatePod("pod5", "default", map[string]any{"environment": "resumed"}, "5"),
-	)
-
-	require.Eventually(t, func() bool {
-		return sink.LogRecordCount() == 3
-	}, 5*time.Second, 50*time.Millisecond, "watch did not resume after re-leading")
-
-	assert.NoError(t, kr.Shutdown(t.Context()))
-}
-
-func TestPullWithLeaderElectionStandby(t *testing.T) {
-	t.Parallel()
-
-	fakeLeaderElection := &k8sleaderelectortest.FakeLeaderElection{}
-	fakeHost := &k8sleaderelectortest.FakeHost{FakeLeaderElection: fakeLeaderElection}
-	leaderElectorID := component.MustNewID("k8s_leader_elector")
-
-	mockClient := newMockDynamicClient()
-	mockClient.createPods(generatePod("pod1", "default", map[string]any{"environment": "production"}, "1"))
-
-	rCfg := createDefaultConfig().(*Config)
-	rCfg.makeDynamicClient = mockClient.getMockDynamicClient
-	rCfg.makeDiscoveryClient = getMockDiscoveryClient
-	rCfg.ErrorMode = PropagateError
-	rCfg.Objects = []*K8sObjectsConfig{
-		{
-			Name:     "pods",
-			Mode:     PullMode,
-			Interval: 10 * time.Millisecond, // fast pull to make the test snappy
-		},
-	}
-	rCfg.K8sLeaderElector = &leaderElectorID
-
-	r, err := newReceiver(receivertest.NewNopSettings(metadata.Type), rCfg, consumertest.NewNop())
-	require.NoError(t, err)
-	kr := r.(*k8sobjectsreceiver)
-	sink := new(consumertest.LogsSink)
-	kr.consumer = sink
-
-	require.NoError(t, kr.Start(t.Context(), fakeHost))
-
-	// Become leader: pulls start
-	fakeLeaderElection.InvokeOnLeading()
-
-	// Expect at least one pull to have happened
-	require.Eventually(t, func() bool { return sink.LogRecordCount() >= 1 },
-		2*time.Second, 20*time.Millisecond, "pulls did not start while leader")
-
-	// Go standby: pulls stop
-	fakeLeaderElection.InvokeOnStopping()
-	countAtStandby := sink.LogRecordCount()
-
-	// Add more pods while in standby—should NOT increase count
-	mockClient.createPods(
-		generatePod("pod2", "default", map[string]any{"environment": "standby"}, "2"),
-	)
-	time.Sleep(100 * time.Millisecond)
-	assert.Equal(t, countAtStandby, sink.LogRecordCount(), "no pulls should occur while in standby")
-
-	// Regain leadership: pulls resume
-	fakeLeaderElection.InvokeOnLeading()
-
-	// Now the next pull should include the new pod(s)
-	require.Eventually(t, func() bool { return sink.LogRecordCount() > countAtStandby },
-		2*time.Second, 20*time.Millisecond, "pulls did not resume after re-leading")
-
-	assert.NoError(t, kr.Shutdown(t.Context()))
-}
-
-func TestWatchLeaderFlapDuringStartup_NoPanic(t *testing.T) {
-	t.Parallel()
-
-	fakeLE := &k8sleaderelectortest.FakeLeaderElection{}
-	fakeHost := &k8sleaderelectortest.FakeHost{FakeLeaderElection: fakeLE}
-	leaderElectorID := component.MustNewID("k8s_leader_elector")
-
-	mockClient := newMockDynamicClient()
-	mockClient.createPods(generatePod("pod1", "default", map[string]any{"env": "prod"}, "1"))
-
-	cfg := createDefaultConfig().(*Config)
-	cfg.makeDynamicClient = mockClient.getMockDynamicClient
-	cfg.makeDiscoveryClient = getMockDiscoveryClient
-	cfg.ErrorMode = PropagateError
-	cfg.IncludeInitialState = false
-	cfg.Objects = []*K8sObjectsConfig{
-		{Name: "pods", Mode: WatchMode, Namespaces: []string{"default"}},
-	}
-	cfg.K8sLeaderElector = &leaderElectorID
-
-	r, err := newReceiver(receivertest.NewNopSettings(metadata.Type), cfg, consumertest.NewNop())
-	require.NoError(t, err)
-	kr := r.(*k8sobjectsreceiver)
-
-	require.NoError(t, kr.Start(t.Context(), fakeHost))
-
-	// 1) Become leader once and wait until at least one worker is registered
-	fakeLE.InvokeOnLeading()
-	require.Eventually(t, func() bool {
-		kr.mu.Lock()
-		n := len(kr.stopperChanList)
-		kr.mu.Unlock()
-		return n > 0
-	}, 2*time.Second, 10*time.Millisecond, "worker not registered")
-
-	// 2) Flap leadership, but give a *tiny* breathing room between lead/stop
-	const loops = 100
-	done := make(chan struct{})
-	go func() {
-		defer close(done)
-		for range loops {
-			fakeLE.InvokeOnStopping()
-			// small gap so stopWatches() can complete and workers exit
-			time.Sleep(1 * time.Millisecond)
-			fakeLE.InvokeOnLeading()
-			// small gap so a worker can get started and reach the watch loop
-			time.Sleep(1 * time.Millisecond)
-		}
-	}()
-
-	select {
-	case <-done:
-	case <-time.After(10 * time.Second):
-		t.Fatal("leader flap goroutine timed out (possible deadlock)")
-	}
-
-	assert.NoError(t, kr.Shutdown(t.Context()))
-}
-
-func TestPullLeaderFlapDuringStartup_NoPanic(t *testing.T) {
-	t.Parallel()
-
-	fakeLE := &k8sleaderelectortest.FakeLeaderElection{}
-	fakeHost := &k8sleaderelectortest.FakeHost{FakeLeaderElection: fakeLE}
-	leaderElectorID := component.MustNewID("k8s_leader_elector")
-
-	mockClient := newMockDynamicClient()
-	mockClient.createPods(generatePod("pod1", "default", map[string]any{"env": "prod"}, "1"))
-
-	cfg := createDefaultConfig().(*Config)
-	cfg.makeDynamicClient = mockClient.getMockDynamicClient
-	cfg.makeDiscoveryClient = getMockDiscoveryClient
-	cfg.ErrorMode = PropagateError
-	cfg.Objects = []*K8sObjectsConfig{
-		{Name: "pods", Mode: PullMode, Interval: 5 * time.Millisecond},
-	}
-	cfg.K8sLeaderElector = &leaderElectorID
-
-	r, err := newReceiver(receivertest.NewNopSettings(metadata.Type), cfg, consumertest.NewNop())
-	require.NoError(t, err)
-	kr := r.(*k8sobjectsreceiver)
-
-	require.NoError(t, kr.Start(t.Context(), fakeHost))
-
-	// Ensure a worker is registered before flapping
-	fakeLE.InvokeOnLeading()
-	require.Eventually(t, func() bool {
-		kr.mu.Lock()
-		n := len(kr.stopperChanList)
-		kr.mu.Unlock()
-		return n > 0
-	}, 2*time.Second, 10*time.Millisecond)
-
-	const loops = 100
-	done := make(chan struct{})
-	go func() {
-		defer close(done)
-		for range loops {
-			fakeLE.InvokeOnStopping()
-			time.Sleep(1 * time.Millisecond)
-			fakeLE.InvokeOnLeading()
-			time.Sleep(1 * time.Millisecond)
-		}
-	}()
-
-	select {
-	case <-done:
-	case <-time.After(10 * time.Second):
-		t.Fatal("leader flap goroutine timed out (possible deadlock)")
-	}
-
-	assert.NoError(t, kr.Shutdown(t.Context()))
-}
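Reviewer note (not part of the patch): below is a minimal, self-contained Go sketch of the control flow this revert restores: both leader-election callbacks are registered once at Start; the on-stop callback calls Shutdown(), which closes every stopper channel; and the still-registered on-start callback can bring the workers back if the lease is regained. The leaderElector and miniReceiver types and all names in it are hypothetical stand-ins, not the k8sleaderelector extension's real API, and clearing the stopper list after Shutdown is a simplification added so this standalone demo can cycle leadership twice; the actual receiver code is exactly what the diff above shows.

package main

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// leaderElector is a hypothetical stand-in for the callback registration
// surface of the k8sleaderelector extension; it only stores the two callbacks.
type leaderElector struct {
	onStart func(context.Context)
	onStop  func()
}

func (e *leaderElector) SetCallBackFuncs(onStart func(context.Context), onStop func()) {
	e.onStart, e.onStop = onStart, onStop
}

// miniReceiver mirrors the pieces of k8sobjectsreceiver that matter here:
// a mutex-guarded list of stopper channels and a cancel func for the workers.
type miniReceiver struct {
	mu       sync.Mutex
	stoppers []chan struct{}
	cancel   context.CancelFunc
}

func (r *miniReceiver) startWorker(ctx context.Context, name string) {
	stop := make(chan struct{})
	r.mu.Lock()
	r.stoppers = append(r.stoppers, stop)
	r.mu.Unlock()
	go func() {
		// A real worker would pull or watch here; it exits on either signal.
		select {
		case <-stop:
		case <-ctx.Done():
		}
		fmt.Println(name, "worker exited")
	}()
}

// Shutdown follows the reverted behavior: cancel the worker context and close
// every registered stopper channel so all pull/watch loops exit.
func (r *miniReceiver) Shutdown(context.Context) error {
	if r.cancel != nil {
		r.cancel()
	}
	r.mu.Lock()
	for _, ch := range r.stoppers {
		close(ch)
	}
	// Simplification for this demo only (not in the diff): clear the list so a
	// second lose-the-lease cycle does not close already-closed channels.
	r.stoppers = nil
	r.mu.Unlock()
	return nil
}

func main() {
	r := &miniReceiver{}
	elector := &leaderElector{}

	// Register once; the callbacks stay registered for the receiver's lifetime,
	// so regaining the lease restarts the workers after a full Shutdown.
	elector.SetCallBackFuncs(
		func(ctx context.Context) {
			cctx, cancel := context.WithCancel(ctx)
			r.cancel = cancel
			r.startWorker(cctx, "pods")
			fmt.Println("started as leader")
		},
		func() {
			fmt.Println("no longer leader, stopping")
			if err := r.Shutdown(context.Background()); err != nil {
				fmt.Println("shutdown receiver error:", err)
			}
		},
	)

	// Simulate gaining, losing, and regaining the lease.
	elector.onStart(context.Background())
	elector.onStop()
	elector.onStart(context.Background())
	elector.onStop()
	time.Sleep(50 * time.Millisecond) // let the worker goroutines print
}

The point the diff's new comment makes is visible in this sketch: because SetCallBackFuncs is only called once, calling Shutdown on lease loss is not terminal; the next onStartedLeading invocation rebuilds the stopper list and starts the workers again.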