Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion controlplane/kubeadm/internal/controllers/inplace_trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,8 @@ func (r *KubeadmControlPlaneReconciler) triggerInPlaceUpdate(ctx context.Context
// Note: Intentionally using client.Patch (via hooks.MarkAsPending + patchHelper) instead of SSA. Otherwise we would
// have to ensure we preserve PendingHooksAnnotation on existing Machines in KCP and that would lead to race
// conditions when the Machine controller tries to remove the annotation and KCP adds it back.
if err := hooks.MarkAsPending(ctx, r.Client, desiredMachine, runtimehooksv1.UpdateMachine); err != nil {
// Note: This call will update the resourceVersion on desiredMachine, so that WaitForCacheToBeUpToDate also considers this change.
if err := hooks.MarkAsPending(ctx, r.Client, desiredMachine, true, runtimehooksv1.UpdateMachine); err != nil {
return errors.Wrapf(err, "failed to complete triggering in-place update for Machine %s", klog.KObj(machine))
}

Expand Down
4 changes: 2 additions & 2 deletions exp/topology/desiredstate/desired_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -649,7 +649,7 @@ func (g *generator) computeControlPlaneVersion(ctx context.Context, s *scope.Sco
// After BeforeClusterUpgrade unblocked the upgrade, consider the upgrade started.
// As a consequence, the system start tracking the intent of calling AfterClusterUpgrade once the upgrade is complete.
// Note: this also prevent the BeforeClusterUpgrade to be called again (until after the upgrade is completed).
if err := hooks.MarkAsPending(ctx, g.Client, s.Current.Cluster, runtimehooksv1.AfterClusterUpgrade); err != nil {
if err := hooks.MarkAsPending(ctx, g.Client, s.Current.Cluster, false, runtimehooksv1.AfterClusterUpgrade); err != nil {
return "", err
}
}
Expand Down Expand Up @@ -685,7 +685,7 @@ func (g *generator) computeControlPlaneVersion(ctx context.Context, s *scope.Sco
if machineDeploymentPendingUpgrade || machinePoolPendingUpgrade {
hooksToBeCalled = append(hooksToBeCalled, runtimehooksv1.BeforeWorkersUpgrade, runtimehooksv1.AfterWorkersUpgrade)
}
if err := hooks.MarkAsPending(ctx, g.Client, s.Current.Cluster, hooksToBeCalled...); err != nil {
if err := hooks.MarkAsPending(ctx, g.Client, s.Current.Cluster, false, hooksToBeCalled...); err != nil {
return "", err
}
}
Expand Down
12 changes: 6 additions & 6 deletions exp/topology/desiredstate/lifecycle_hooks.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (g *generator) callAfterControlPlaneUpgradeHook(ctx context.Context, s *sco
return false, err
}
if len(extensionHandlers) == 0 {
if err := hooks.MarkAsDone(ctx, g.Client, s.Current.Cluster, runtimehooksv1.AfterControlPlaneUpgrade); err != nil {
if err := hooks.MarkAsDone(ctx, g.Client, s.Current.Cluster, false, runtimehooksv1.AfterControlPlaneUpgrade); err != nil {
return false, err
}
return true, nil
Expand Down Expand Up @@ -220,7 +220,7 @@ func (g *generator) callAfterControlPlaneUpgradeHook(ctx context.Context, s *sco
)
return false, nil
}
if err := hooks.MarkAsDone(ctx, g.Client, s.Current.Cluster, runtimehooksv1.AfterControlPlaneUpgrade); err != nil {
if err := hooks.MarkAsDone(ctx, g.Client, s.Current.Cluster, false, runtimehooksv1.AfterControlPlaneUpgrade); err != nil {
return false, err
}

Expand Down Expand Up @@ -248,7 +248,7 @@ func (g *generator) callBeforeWorkersUpgradeHook(ctx context.Context, s *scope.S
return false, err
}
if len(extensionHandlers) == 0 {
if err := hooks.MarkAsDone(ctx, g.Client, s.Current.Cluster, runtimehooksv1.BeforeWorkersUpgrade); err != nil {
if err := hooks.MarkAsDone(ctx, g.Client, s.Current.Cluster, false, runtimehooksv1.BeforeWorkersUpgrade); err != nil {
return false, err
}
return true, nil
Expand Down Expand Up @@ -282,7 +282,7 @@ func (g *generator) callBeforeWorkersUpgradeHook(ctx context.Context, s *scope.S
)
return false, nil
}
if err := hooks.MarkAsDone(ctx, g.Client, s.Current.Cluster, runtimehooksv1.BeforeWorkersUpgrade); err != nil {
if err := hooks.MarkAsDone(ctx, g.Client, s.Current.Cluster, false, runtimehooksv1.BeforeWorkersUpgrade); err != nil {
return false, err
}

Expand Down Expand Up @@ -311,7 +311,7 @@ func (g *generator) callAfterWorkersUpgradeHook(ctx context.Context, s *scope.Sc
return false, err
}
if len(extensionHandlers) == 0 {
if err := hooks.MarkAsDone(ctx, g.Client, s.Current.Cluster, runtimehooksv1.AfterWorkersUpgrade); err != nil {
if err := hooks.MarkAsDone(ctx, g.Client, s.Current.Cluster, false, runtimehooksv1.AfterWorkersUpgrade); err != nil {
return false, err
}
return true, nil
Expand Down Expand Up @@ -344,7 +344,7 @@ func (g *generator) callAfterWorkersUpgradeHook(ctx context.Context, s *scope.Sc
)
return false, nil
}
if err := hooks.MarkAsDone(ctx, g.Client, s.Current.Cluster, runtimehooksv1.AfterWorkersUpgrade); err != nil {
if err := hooks.MarkAsDone(ctx, g.Client, s.Current.Cluster, false, runtimehooksv1.AfterWorkersUpgrade); err != nil {
return false, err
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,9 @@ func (r *Reconciler) completeInPlaceUpdate(ctx context.Context, s *scope) error
}
}

if err := hooks.MarkAsDone(ctx, r.Client, s.machine, runtimehooksv1.UpdateMachine); err != nil {
// Note: This call will not update the resourceVersion on machine, so that the patchHelper in the main
// Reconcile func won't get a conflict.
if err := hooks.MarkAsDone(ctx, r.Client, s.machine, false, runtimehooksv1.UpdateMachine); err != nil {
return err
}

Expand Down
4 changes: 2 additions & 2 deletions internal/controllers/topology/cluster/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,7 @@ func (r *Reconciler) reconcileDelete(ctx context.Context, s *scope.Scope) (ctrl.
return ctrl.Result{}, err
}
if len(extensionHandlers) == 0 {
if err := hooks.MarkAsOkToDelete(ctx, r.Client, cluster); err != nil {
if err := hooks.MarkAsOkToDelete(ctx, r.Client, cluster, false); err != nil {
return ctrl.Result{}, err
}
return ctrl.Result{}, nil
Expand Down Expand Up @@ -577,7 +577,7 @@ func (r *Reconciler) reconcileDelete(ctx context.Context, s *scope.Scope) (ctrl.
}
// The BeforeClusterDelete hook returned a non-blocking response. Now the cluster is ready to be deleted.
// Lets mark the cluster as `ok-to-delete`
if err := hooks.MarkAsOkToDelete(ctx, r.Client, cluster); err != nil {
if err := hooks.MarkAsOkToDelete(ctx, r.Client, cluster, false); err != nil {
return ctrl.Result{}, err
}
log.Info(fmt.Sprintf("Cluster deletion is unblocked by %s hook", runtimecatalog.HookName(runtimehooksv1.BeforeClusterDelete)))
Expand Down
8 changes: 4 additions & 4 deletions internal/controllers/topology/cluster/reconcile_state.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,7 +187,7 @@ func (r *Reconciler) callAfterHooks(ctx context.Context, s *scope.Scope) error {
func (r *Reconciler) callAfterControlPlaneInitialized(ctx context.Context, s *scope.Scope) error {
// If the cluster topology is being created then track to intent to call the AfterControlPlaneInitialized hook so that we can call it later.
if !s.Current.Cluster.Spec.InfrastructureRef.IsDefined() && !s.Current.Cluster.Spec.ControlPlaneRef.IsDefined() {
if err := hooks.MarkAsPending(ctx, r.Client, s.Current.Cluster, runtimehooksv1.AfterControlPlaneInitialized); err != nil {
if err := hooks.MarkAsPending(ctx, r.Client, s.Current.Cluster, false, runtimehooksv1.AfterControlPlaneInitialized); err != nil {
return err
}
}
Expand All @@ -211,7 +211,7 @@ func (r *Reconciler) callAfterControlPlaneInitialized(ctx context.Context, s *sc
return err
}
s.HookResponseTracker.Add(runtimehooksv1.AfterControlPlaneInitialized, hookResponse)
if err := hooks.MarkAsDone(ctx, r.Client, s.Current.Cluster, runtimehooksv1.AfterControlPlaneInitialized); err != nil {
if err := hooks.MarkAsDone(ctx, r.Client, s.Current.Cluster, false, runtimehooksv1.AfterControlPlaneInitialized); err != nil {
return err
}
}
Expand Down Expand Up @@ -259,7 +259,7 @@ func (r *Reconciler) callAfterClusterUpgrade(ctx context.Context, s *scope.Scope
return err
}
if len(extensionHandlers) == 0 {
return hooks.MarkAsDone(ctx, r.Client, s.Current.Cluster, runtimehooksv1.AfterClusterUpgrade)
return hooks.MarkAsDone(ctx, r.Client, s.Current.Cluster, false, runtimehooksv1.AfterClusterUpgrade)
}

// DeepCopy cluster because ConvertFrom has side effects like adding the conversion annotation.
Expand All @@ -285,7 +285,7 @@ func (r *Reconciler) callAfterClusterUpgrade(ctx context.Context, s *scope.Scope
}

// The hook is successfully called; we can remove this hook from the list of pending-hooks.
if err := hooks.MarkAsDone(ctx, r.Client, s.Current.Cluster, runtimehooksv1.AfterClusterUpgrade); err != nil {
if err := hooks.MarkAsDone(ctx, r.Client, s.Current.Cluster, false, runtimehooksv1.AfterClusterUpgrade); err != nil {
return err
}

Expand Down
73 changes: 47 additions & 26 deletions internal/hooks/tracking.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,24 +29,28 @@ import (

runtimev1 "sigs.k8s.io/cluster-api/api/runtime/v1beta2"
runtimecatalog "sigs.k8s.io/cluster-api/exp/runtime/catalog"
"sigs.k8s.io/cluster-api/util/patch"
)

// MarkAsPending adds to the object's PendingHooksAnnotation the intent to execute a hook after an operation completes.
// Usually this function is called when an operation is starting in order to track the intent to call an After<operation> hook later in the process.
func MarkAsPending(ctx context.Context, c client.Client, obj client.Object, hooks ...runtimecatalog.Hook) error {
func MarkAsPending(ctx context.Context, c client.Client, obj client.Object, updateResourceVersionOnObject bool, hooks ...runtimecatalog.Hook) error {
hookNames := []string{}
for _, hook := range hooks {
hookNames = append(hookNames, runtimecatalog.HookName(hook))
}

patchHelper, err := patch.NewHelper(obj, c)
if err != nil {
return errors.Wrapf(err, "failed to mark %q hook(s) as pending", strings.Join(hookNames, ","))
orig := obj.DeepCopyObject().(client.Object)

if changed := MarkObjectAsPending(obj, hooks...); !changed {
return nil
}

MarkObjectAsPending(obj, hooks...)
if err := patchHelper.Patch(ctx, obj); err != nil {
// In some cases it is preferred to not update resourceVersion in the input object,
// because this could lead to conflict errors e.g. when patching at the end of a reconcile loop.
if !updateResourceVersionOnObject {
obj = obj.DeepCopyObject().(client.Object)
}
if err := c.Patch(ctx, obj, client.MergeFrom(orig)); err != nil {
return errors.Wrapf(err, "failed to mark %q hook(s) as pending", strings.Join(hookNames, ","))
}

Expand All @@ -55,7 +59,7 @@ func MarkAsPending(ctx context.Context, c client.Client, obj client.Object, hook

// MarkObjectAsPending adds to the object's PendingHooksAnnotation the intent to execute a hook after an operation completes.
// Usually this function is called when an operation is starting in order to track the intent to call an After<operation> hook later in the process.
func MarkObjectAsPending(obj client.Object, hooks ...runtimecatalog.Hook) {
func MarkObjectAsPending(obj client.Object, hooks ...runtimecatalog.Hook) (changed bool) {
hookNames := []string{}
for _, hook := range hooks {
hookNames = append(hookNames, runtimecatalog.HookName(hook))
Expand All @@ -66,8 +70,16 @@ func MarkObjectAsPending(obj client.Object, hooks ...runtimecatalog.Hook) {
if annotations == nil {
annotations = map[string]string{}
}
annotations[runtimev1.PendingHooksAnnotation] = addToCommaSeparatedList(annotations[runtimev1.PendingHooksAnnotation], hookNames...)

newAnnotationValue := addToCommaSeparatedList(annotations[runtimev1.PendingHooksAnnotation], hookNames...)

if annotations[runtimev1.PendingHooksAnnotation] == newAnnotationValue {
return false
}

annotations[runtimev1.PendingHooksAnnotation] = newAnnotationValue
obj.SetAnnotations(annotations)
return true
}

// IsPending returns true if there is an intent to call a hook being tracked in the object's PendingHooksAnnotation.
Expand All @@ -83,30 +95,33 @@ func IsPending(hook runtimecatalog.Hook, obj client.Object) bool {
// MarkAsDone removes the intent to call a Hook from the object's PendingHooksAnnotation.
// Usually this func is called after all the registered extensions for the Hook returned an answer without requests
// to hold on to the object's lifecycle (retryAfterSeconds).
func MarkAsDone(ctx context.Context, c client.Client, obj client.Object, hooks ...runtimecatalog.Hook) error {
hookNames := []string{}
for _, hook := range hooks {
hookNames = append(hookNames, runtimecatalog.HookName(hook))
func MarkAsDone(ctx context.Context, c client.Client, obj client.Object, updateResourceVersionOnObject bool, hook runtimecatalog.Hook) error {
if !IsPending(hook, obj) {
return nil
}

patchHelper, err := patch.NewHelper(obj, c)
if err != nil {
return errors.Wrapf(err, "failed to mark %q hook(s) as done", strings.Join(hookNames, ","))
}
hookName := runtimecatalog.HookName(hook)

orig := obj.DeepCopyObject().(client.Object)

// Read the annotation of the objects and add the hook to the comma separated list
annotations := obj.GetAnnotations()
if annotations == nil {
annotations = map[string]string{}
}
annotations[runtimev1.PendingHooksAnnotation] = removeFromCommaSeparatedList(annotations[runtimev1.PendingHooksAnnotation], hookNames...)
annotations[runtimev1.PendingHooksAnnotation] = removeFromCommaSeparatedList(annotations[runtimev1.PendingHooksAnnotation], hookName)
if annotations[runtimev1.PendingHooksAnnotation] == "" {
delete(annotations, runtimev1.PendingHooksAnnotation)
}
obj.SetAnnotations(annotations)

if err := patchHelper.Patch(ctx, obj); err != nil {
return errors.Wrapf(err, "failed to mark %q hook(s) as done", strings.Join(hookNames, ","))
// In some cases it is preferred to not update resourceVersion in the input object,
// because this could lead to conflict errors e.g. when patching at the end of a reconcile loop.
if !updateResourceVersionOnObject {
obj = obj.DeepCopyObject().(client.Object)
}
if err := c.Patch(ctx, obj, client.MergeFrom(orig)); err != nil {
return errors.Wrapf(err, "failed to mark %q hook as done", hookName)
}

return nil
Expand All @@ -125,16 +140,17 @@ func IsOkToDelete(obj client.Object) bool {
}

// MarkAsOkToDelete adds the OkToDeleteAnnotation annotation to the object and patches it.
func MarkAsOkToDelete(ctx context.Context, c client.Client, obj client.Object) error {
func MarkAsOkToDelete(ctx context.Context, c client.Client, obj client.Object, updateResourceVersionOnObject bool) error {
if _, ok := obj.GetAnnotations()[runtimev1.OkToDeleteAnnotation]; ok {
return nil
}

gvk, err := apiutil.GVKForObject(obj, c.Scheme())
if err != nil {
return errors.Wrapf(err, "failed to mark %s as ok to delete: failed to get GVK for object", klog.KObj(obj))
}

patchHelper, err := patch.NewHelper(obj, c)
if err != nil {
return errors.Wrapf(err, "failed to mark %s %s as ok to delete", gvk.Kind, klog.KObj(obj))
}
orig := obj.DeepCopyObject().(client.Object)

annotations := obj.GetAnnotations()
if annotations == nil {
Expand All @@ -143,7 +159,12 @@ func MarkAsOkToDelete(ctx context.Context, c client.Client, obj client.Object) e
annotations[runtimev1.OkToDeleteAnnotation] = ""
obj.SetAnnotations(annotations)

if err := patchHelper.Patch(ctx, obj); err != nil {
// In some cases it is preferred to not update resourceVersion in the input object,
// because this could lead to conflict errors e.g. when patching at the end of a reconcile loop.
if !updateResourceVersionOnObject {
obj = obj.DeepCopyObject().(client.Object)
}
Comment on lines +164 to +166
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if !updateResourceVersionOnObject {
obj = obj.DeepCopyObject().(client.Object)
}
// In some cases it is preferred to not update resourceVersion in the input object,
// because this could lead to conflict errors e.g. when patching at the end of a reconcile loop.
if !updateResourceVersionOnObject {
obj = obj.DeepCopyObject().(client.Object)
}

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same in MarkAsDone/MarkAsOkToDelete

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added

if err := c.Patch(ctx, obj, client.MergeFrom(orig)); err != nil {
return errors.Wrapf(err, "failed to mark %s %s as ok to delete", gvk.Kind, klog.KObj(obj))
}

Expand Down
Loading