Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
228 changes: 154 additions & 74 deletions pkg/cloudprovider/cloudprovider.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"context"
_ "embed"
"fmt"
"log"
"slices"
"strings"
"sync"
Expand Down Expand Up @@ -56,6 +57,8 @@ const (
labelsKey = "capacity.cluster-autoscaler.kubernetes.io/labels"
taintsKey = "capacity.cluster-autoscaler.kubernetes.io/taints"
maxPodsKey = "capacity.cluster-autoscaler.kubernetes.io/maxPods"

machineAnnotation = "cluster.x-k8s.io/machine"
)

func NewCloudProvider(ctx context.Context, kubeClient client.Client, machineProvider machine.Provider, machineDeploymentProvider machinedeployment.Provider) *CloudProvider {
Expand Down Expand Up @@ -89,67 +92,13 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *karpv1.NodeClaim)
return nil, fmt.Errorf("cannot satisfy create, NodeClaim is nil")
}

nodeClass, err := c.resolveNodeClassFromNodeClaim(ctx, nodeClaim)
if err != nil {
return nil, fmt.Errorf("cannot satisfy create, unable to resolve NodeClass from NodeClaim %q: %w", nodeClaim.Name, err)
}

instanceTypes, err := c.findInstanceTypesForNodeClass(ctx, nodeClass)
machineDeployment, machine, err := c.provisionMachine(ctx, nodeClaim)
if err != nil {
return nil, fmt.Errorf("cannot satisfy create, unable to get instance types for NodeClass %q of NodeClaim %q: %w", nodeClass.Name, nodeClaim.Name, err)
}

// identify which fit requirements
compatibleInstanceTypes := filterCompatibleInstanceTypes(instanceTypes, nodeClaim)
if len(compatibleInstanceTypes) == 0 {
return nil, cloudprovider.NewInsufficientCapacityError(fmt.Errorf("cannot satisfy create, no compatible instance types found"))
}

// TODO (elmiko) if multiple instance types are found to be compatible we need to select one.
// for now, we sort by resource name and take the first in the list. In the future, this should
// be an option or something more useful like minimum size or cost.
slices.SortFunc(compatibleInstanceTypes, func(a, b *ClusterAPIInstanceType) int {
return cmp.Compare(strings.ToLower(a.Name), strings.ToLower(b.Name))
})
selectedInstanceType := compatibleInstanceTypes[0]

// once scalable resource is identified, increase replicas
machineDeployment, err := c.machineDeploymentProvider.Get(ctx, selectedInstanceType.MachineDeploymentName, selectedInstanceType.MachineDeploymentNamespace)
if err != nil {
return nil, fmt.Errorf("cannot satisfy create, unable to find MachineDeployment %q for InstanceType %q: %w", selectedInstanceType.MachineDeploymentName, selectedInstanceType.Name, err)
}
originalReplicas := *machineDeployment.Spec.Replicas
machineDeployment.Spec.Replicas = ptr.To(originalReplicas + 1)
if err := c.machineDeploymentProvider.Update(ctx, machineDeployment); err != nil {
return nil, fmt.Errorf("cannot satisfy create, unable to update MachineDeployment %q replicas: %w", machineDeployment.Name, err)
}

// TODO (elmiko) it would be nice to have a more elegant solution to the asynchronous machine creation.
// Initially, it appeared that we could have a Machine controller which could reconcile new Machines and
// then associate them with NodeClaims by using a sentinel value for the Provider ID. But, this may not
// work as we expect since the karpenter core can use the Provider ID as a key into one of its internal caches.
// For now, the method of waiting for the Machine seemed straightforward although it does make the `Create` method a blocking call.
// Try to find an unclaimed Machine resource for 1 minute.
machine, err := c.pollForUnclaimedMachineInMachineDeploymentWithTimeout(ctx, machineDeployment, time.Minute)
if err != nil {
// unable to find a Machine for the NodeClaim, this could be due to timeout or error, but the replica count needs to be reset.
// TODO (elmiko) this could probably use improvement to make it more resilient to errors.
machineDeployment.Spec.Replicas = ptr.To(originalReplicas)
if err := c.machineDeploymentProvider.Update(ctx, machineDeployment); err != nil {
return nil, fmt.Errorf("cannot satisfy create, error while recovering from failure to find an unclaimed Machine: %w", err)
}
return nil, fmt.Errorf("cannot satisfy create, unable to find an unclaimed Machine for MachineDeployment %q: %w", machineDeployment.Name, err)
return nil, err
}

// now that we have a Machine for the NodeClaim, we label it as a karpenter member
labels := machine.GetLabels()
labels[providers.NodePoolMemberLabel] = ""
machine.SetLabels(labels)
if err := c.machineProvider.Update(ctx, machine); err != nil {
// if we can't update the Machine with the member label, we need to unwind the addition
// TODO (elmiko) add more logic here to fix the error, if we are in this state it's not clear how to fix,
// since we have a Machine, we should be reducing the replicas and annotating the Machine for deletion.
return nil, fmt.Errorf("cannot satisfy create, unable to label Machine %q as a member: %w", machine.Name, err)
if machine.Spec.ProviderID == nil {
return nil, fmt.Errorf("cannot satisfy create, waiting for Machine %q to have ProviderID", machine.Name)
}

// fill out nodeclaim with details
Expand All @@ -164,15 +113,29 @@ func (c *CloudProvider) Delete(ctx context.Context, nodeClaim *karpv1.NodeClaim)
c.accessLock.Lock()
defer c.accessLock.Unlock()

if len(nodeClaim.Status.ProviderID) == 0 {
return fmt.Errorf("NodeClaim %q does not have a provider ID, cannot delete", nodeClaim.Name)
}
var machine *capiv1beta1.Machine
var err error

// find machine
machine, err := c.machineProvider.Get(ctx, nodeClaim.Status.ProviderID)
if err != nil {
return fmt.Errorf("error finding Machine with provider ID %q to Delete NodeClaim %q: %w", nodeClaim.Status.ProviderID, nodeClaim.Name, err)
if len(nodeClaim.Status.ProviderID) != 0 {
machine, err = c.machineProvider.GetByProviderID(ctx, nodeClaim.Status.ProviderID)
if err != nil {
return fmt.Errorf("error finding Machine with provider ID %q to Delete NodeClaim %q: %w", nodeClaim.Status.ProviderID, nodeClaim.Name, err)
}
} else if machineAnno, ok := nodeClaim.Annotations[machineAnnotation]; ok {
machineNamespace, machineName, err := parseMachineAnnotation(machineAnno)
if err != nil {
return fmt.Errorf("error parsing machine annotation: %w", err)
}

machine, err = c.machineProvider.Get(ctx, machineName, machineNamespace)
if err != nil {
return fmt.Errorf("error finding Machine %q in namespace %s to Delete NodeClaim %q: %w", machineName, machineNamespace, nodeClaim.Name, err)
}
} else {
return fmt.Errorf("NodeClaim %q does not have a provider ID or Machine annotations, cannot delete", nodeClaim.Name)
}

if machine == nil {
return cloudprovider.NewNodeClaimNotFoundError(fmt.Errorf("unable to find Machine with provider ID %q to Delete NodeClaim %q", nodeClaim.Status.ProviderID, nodeClaim.Name))
}
Expand Down Expand Up @@ -225,7 +188,7 @@ func (c *CloudProvider) Get(ctx context.Context, providerID string) (*karpv1.Nod
return nil, fmt.Errorf("no providerID supplied to Get, cannot continue")
}

machine, err := c.machineProvider.Get(ctx, providerID)
machine, err := c.machineProvider.GetByProviderID(ctx, providerID)
if err != nil {
return nil, fmt.Errorf("error getting Machine: %w", err)
}
Expand Down Expand Up @@ -318,6 +281,113 @@ func (c *CloudProvider) RepairPolicies() []cloudprovider.RepairPolicy {
return []cloudprovider.RepairPolicy{}
}

// ProvisionMachine ensures a CAPI Machine exists for the given NodeClaim.
// It creates the Machine if missing, or "gets" it to confirm the asynchronous provisioning status.
func (c *CloudProvider) provisionMachine(ctx context.Context, nodeClaim *karpv1.NodeClaim) (*capiv1beta1.MachineDeployment, *capiv1beta1.Machine, error) {
machineAnno, ok := nodeClaim.Annotations[machineAnnotation]
if !ok {
return c.createMachine(ctx, nodeClaim)
}

machineNamespace, machineName, err := parseMachineAnnotation(machineAnno)
if err != nil {
return nil, nil, fmt.Errorf("error parsing machine annotation: %w", err)
}

machine, err := c.machineProvider.Get(ctx, machineName, machineNamespace)
if err != nil {
return nil, nil, fmt.Errorf("failed to get NodeClaim's Machine %s : %w", machineName, err)
}

machineDeployment, err := c.machineDeploymentFromMachine(ctx, machine)
if err != nil {
return nil, nil, fmt.Errorf("failed to get NodeClaim's MachineDeployment %s : %w", machineName, err)
}

return machineDeployment, machine, nil
}

// createMachine resolves a compatible instance type for the NodeClaim, scales
// up the backing MachineDeployment by one replica, waits for an unclaimed
// Machine to appear, labels it as a karpenter node pool member, and binds the
// Machine to the NodeClaim through the machine annotation.
//
// If no unclaimed Machine appears in time, the replica increase is rolled back
// on a best-effort basis before the original error is returned.
func (c *CloudProvider) createMachine(ctx context.Context, nodeClaim *karpv1.NodeClaim) (*capiv1beta1.MachineDeployment, *capiv1beta1.Machine, error) {
	nodeClass, err := c.resolveNodeClassFromNodeClaim(ctx, nodeClaim)
	if err != nil {
		return nil, nil, fmt.Errorf("cannot satisfy create, unable to resolve NodeClass from NodeClaim %q: %w", nodeClaim.Name, err)
	}

	instanceTypes, err := c.findInstanceTypesForNodeClass(ctx, nodeClass)
	if err != nil {
		return nil, nil, fmt.Errorf("cannot satisfy create, unable to get instance types for NodeClass %q of NodeClaim %q: %w", nodeClass.Name, nodeClaim.Name, err)
	}

	// identify which fit requirements
	compatibleInstanceTypes := filterCompatibleInstanceTypes(instanceTypes, nodeClaim)
	if len(compatibleInstanceTypes) == 0 {
		return nil, nil, cloudprovider.NewInsufficientCapacityError(fmt.Errorf("cannot satisfy create, no compatible instance types found"))
	}

	// TODO (elmiko) if multiple instance types are found to be compatible we need to select one.
	// for now, we sort by resource name and take the first in the list. In the future, this should
	// be an option or something more useful like minimum size or cost.
	slices.SortFunc(compatibleInstanceTypes, func(a, b *ClusterAPIInstanceType) int {
		return cmp.Compare(strings.ToLower(a.Name), strings.ToLower(b.Name))
	})
	selectedInstanceType := compatibleInstanceTypes[0]

	// once scalable resource is identified, increase replicas
	machineDeployment, err := c.machineDeploymentProvider.Get(ctx, selectedInstanceType.MachineDeploymentName, selectedInstanceType.MachineDeploymentNamespace)
	if err != nil {
		return nil, nil, fmt.Errorf("cannot satisfy create, unable to find MachineDeployment %q for InstanceType %q: %w", selectedInstanceType.MachineDeploymentName, selectedInstanceType.Name, err)
	}
	originalReplicas := *machineDeployment.Spec.Replicas
	machineDeployment.Spec.Replicas = ptr.To(originalReplicas + 1)
	if err := c.machineDeploymentProvider.Update(ctx, machineDeployment); err != nil {
		return nil, nil, fmt.Errorf("cannot satisfy create, unable to update MachineDeployment %q replicas: %w", machineDeployment.Name, err)
	}

	// TODO (elmiko) it would be nice to have a more elegant solution to the asynchronous machine creation.
	// Initially, it appeared that we could have a Machine controller which could reconcile new Machines and
	// then associate them with NodeClaims by using a sentinel value for the Provider ID. But, this may not
	// work as we expect since the karpenter core can use the Provider ID as a key into one of its internal caches.
	// For now, the method of waiting for the Machine seemed straightforward although it does make the `Create` method a blocking call.
	// Try to find an unclaimed Machine resource for 1 minute.
	machine, pollErr := c.pollForUnclaimedMachineInMachineDeploymentWithTimeout(ctx, machineDeployment, time.Minute)
	if pollErr != nil {
		// unable to find a Machine for the NodeClaim, this could be due to timeout or error,
		// but the replica count needs to be reset. Re-fetch the MachineDeployment so the
		// rollback applies to its latest revision; if any rollback step fails we log it and
		// still surface the original polling error. Note: previously this rollback ran in a
		// defer that dereferenced a nil MachineDeployment when the re-fetch failed.
		// TODO (elmiko) this could probably use improvement to make it more resilient to errors.
		mdName := machineDeployment.Name
		if freshMD, getErr := c.machineDeploymentProvider.Get(ctx, selectedInstanceType.MachineDeploymentName, selectedInstanceType.MachineDeploymentNamespace); getErr != nil {
			log.Printf("error while recovering from failure to find an unclaimed Machine, unable to find MachineDeployment %q for InstanceType %q: %v", selectedInstanceType.MachineDeploymentName, selectedInstanceType.Name, getErr)
		} else {
			freshMD.Spec.Replicas = ptr.To(originalReplicas)
			if updateErr := c.machineDeploymentProvider.Update(ctx, freshMD); updateErr != nil {
				log.Printf("error while recovering from failure to find an unclaimed Machine: %v", updateErr)
			}
		}
		return nil, nil, fmt.Errorf("cannot satisfy create, unable to find an unclaimed Machine for MachineDeployment %q: %w", mdName, pollErr)
	}

	// now that we have a Machine for the NodeClaim, we label it as a karpenter member
	labels := machine.GetLabels()
	if labels == nil {
		// GetLabels may return nil; assigning into a nil map would panic
		labels = map[string]string{}
	}
	labels[providers.NodePoolMemberLabel] = ""
	machine.SetLabels(labels)
	if err := c.machineProvider.Update(ctx, machine); err != nil {
		// if we can't update the Machine with the member label, we need to unwind the addition
		// TODO (elmiko) add more logic here to fix the error, if we are in this state it's not clear how to fix,
		// since we have a Machine, we should be reducing the replicas and annotating the Machine for deletion.
		return nil, nil, fmt.Errorf("cannot satisfy create, unable to label Machine %q as a member: %w", machine.Name, err)
	}

	// Bind the NodeClaim with this machine.
	if nodeClaim.Annotations == nil {
		// guard against a nil annotations map to avoid a panic on assignment
		nodeClaim.Annotations = map[string]string{}
	}
	nodeClaim.Annotations[machineAnnotation] = fmt.Sprintf("%s/%s", machine.Namespace, machine.Name)
	if err := c.kubeClient.Update(ctx, nodeClaim); err != nil {
		return nil, nil, fmt.Errorf("cannot satisfy create, unable to update NodeClaim annotations %q: %w", nodeClaim.Name, err)
	}

	return machineDeployment, machine, nil
}

func (c *CloudProvider) machineDeploymentFromMachine(ctx context.Context, machine *capiv1beta1.Machine) (*capiv1beta1.MachineDeployment, error) {
mdName, found := machine.GetLabels()[capiv1beta1.MachineDeploymentNameLabel]
if !found {
Expand Down Expand Up @@ -421,15 +491,8 @@ func (c *CloudProvider) pollForUnclaimedMachineInMachineDeploymentWithTimeout(ct
return false, nil
}

// find the first machine with a provider id
for i, m := range machineList {
if m.Spec.ProviderID != nil {
machine = machineList[i]
return true, nil
}
}

return false, nil
machine = machineList[0]
return true, nil
})
if err != nil {
return nil, fmt.Errorf("error polling for an unclaimed Machine in MachineDeployment %q: %w", machineDeployment.Name, err)
Expand Down Expand Up @@ -669,3 +732,20 @@ func zoneLabelFromLabels(labels map[string]string) string {

return zone
}

// parseMachineAnnotation decodes a machine annotation value of the form
// "namespace/name" into its two components. It rejects values without exactly
// one separator and values whose namespace or name is empty after trimming.
func parseMachineAnnotation(annotationValue string) (string, string, error) {
	nsPart, namePart, found := strings.Cut(annotationValue, "/")
	// the original value must contain exactly one "/": a missing separator or a
	// second "/" in the remainder are both malformed
	if !found || strings.Contains(namePart, "/") {
		return "", "", fmt.Errorf("invalid machine annotations '%s'. Expected 'namespace/name'", annotationValue)
	}

	namespace, name := strings.TrimSpace(nsPart), strings.TrimSpace(namePart)
	if namespace == "" || name == "" {
		return "", "", fmt.Errorf("invalid machine format '%s'. Namespace and name cannot be empty", annotationValue)
	}

	return namespace, name, nil
}
4 changes: 2 additions & 2 deletions pkg/cloudprovider/cloudprovider_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ var _ = Describe("CloudProvider.Delete method", func() {
nodeClaim := karpv1.NodeClaim{}
nodeClaim.Name = "some-node-claim"
err := provider.Delete(context.Background(), &nodeClaim)
Expect(err).To(MatchError(fmt.Errorf("NodeClaim %q does not have a provider ID, cannot delete", nodeClaim.Name)))
Expect(err).To(MatchError(fmt.Errorf("NodeClaim %q does not have a provider ID or Machine annotations, cannot delete", nodeClaim.Name)))
})

It("returns an error when the referenced Machine is not found", func() {
Expand Down Expand Up @@ -193,7 +193,7 @@ var _ = Describe("CloudProvider.Delete method", func() {
Expect(err).ToNot(HaveOccurred())

Eventually(func() map[string]string {
m, err := provider.machineProvider.Get(context.Background(), providerID)
m, err := provider.machineProvider.GetByProviderID(context.Background(), providerID)
Expect(err).ToNot(HaveOccurred())
return m.GetAnnotations()
}).Should(HaveKey(capiv1beta1.DeleteMachineAnnotation))
Expand Down
16 changes: 13 additions & 3 deletions pkg/providers/machine/machine.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,8 @@ import (
)

type Provider interface {
Get(context.Context, string) (*capiv1beta1.Machine, error)
Get(context.Context, string, string) (*capiv1beta1.Machine, error)
GetByProviderID(context.Context, string) (*capiv1beta1.Machine, error)
List(context.Context, *metav1.LabelSelector) ([]*capiv1beta1.Machine, error)
IsDeleting(*capiv1beta1.Machine) bool
AddDeleteAnnotation(context.Context, *capiv1beta1.Machine) error
Expand All @@ -45,10 +46,19 @@ func NewDefaultProvider(_ context.Context, kubeClient client.Client) *DefaultPro
}
}

// Get returns the Machine with the given name in the given namespace, or an
// error if the lookup fails. Unlike GetByProviderID, this looks the Machine up
// directly by its object key.
func (p *DefaultProvider) Get(ctx context.Context, name string, namespace string) (*capiv1beta1.Machine, error) {
	machine := &capiv1beta1.Machine{}
	if err := p.kubeClient.Get(ctx, client.ObjectKey{Name: name, Namespace: namespace}, machine); err != nil {
		// return a nil Machine on any error so callers never see a partial object
		return nil, err
	}
	return machine, nil
}

// GetByProviderID returns the Machine indicated by the supplied Provider ID or nil if not found.
// Because Get is used with a provider ID, it may return a Machine that does not have
// a label for node pool membership.
func (p *DefaultProvider) Get(ctx context.Context, providerID string) (*capiv1beta1.Machine, error) {
func (p *DefaultProvider) GetByProviderID(ctx context.Context, providerID string) (*capiv1beta1.Machine, error) {
machineList := &capiv1beta1.MachineList{}
err := p.kubeClient.List(ctx, machineList)
if err != nil {
Expand Down
12 changes: 6 additions & 6 deletions pkg/providers/machine/machine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ var _ = Describe("Machine DefaultProvider.Get method", func() {
})

It("returns nil when there are no Machines present in API", func() {
machine, err := provider.Get(context.Background(), "")
machine, err := provider.GetByProviderID(context.Background(), "")
Expect(err).ToNot(HaveOccurred())
Expect(machine).To(BeNil())
})
Expand All @@ -86,7 +86,7 @@ var _ = Describe("Machine DefaultProvider.Get method", func() {
machine := newMachine("karpenter-1", "karpenter-cluster", true)
Expect(cl.Create(context.Background(), machine)).To(Succeed())

machine, err := provider.Get(context.Background(), "clusterapi://the-wrong-provider-id")
machine, err := provider.GetByProviderID(context.Background(), "clusterapi://the-wrong-provider-id")
Expect(err).ToNot(HaveOccurred())
Expect(machine).To(BeNil())
})
Expand All @@ -98,7 +98,7 @@ var _ = Describe("Machine DefaultProvider.Get method", func() {
Expect(cl.Create(context.Background(), machine)).To(Succeed())

providerID := *machine.Spec.ProviderID
machine, err := provider.Get(context.Background(), providerID)
machine, err := provider.GetByProviderID(context.Background(), providerID)
Expect(err).ToNot(HaveOccurred())
Expect(machine).Should(HaveField("Name", "karpenter-2"))
})
Expand All @@ -110,7 +110,7 @@ var _ = Describe("Machine DefaultProvider.Get method", func() {
Expect(cl.Create(context.Background(), machine)).To(Succeed())

providerID := *machine.Spec.ProviderID
machine, err := provider.Get(context.Background(), providerID)
machine, err := provider.GetByProviderID(context.Background(), providerID)
Expect(err).ToNot(HaveOccurred())
Expect(machine).Should(HaveField("Name", "karpenter-2"))
})
Expand Down Expand Up @@ -213,7 +213,7 @@ var _ = Describe("Machine DefaultProvider.AddDeleteAnnotation method", func() {
Expect(err).ToNot(HaveOccurred())

Eventually(func() map[string]string {
m, err := provider.Get(context.Background(), *machine.Spec.ProviderID)
m, err := provider.GetByProviderID(context.Background(), *machine.Spec.ProviderID)
Expect(err).ToNot(HaveOccurred())
return m.GetAnnotations()
}).Should(HaveKey(capiv1beta1.DeleteMachineAnnotation))
Expand Down Expand Up @@ -263,7 +263,7 @@ var _ = Describe("Machine DefaultProvider.RemoveDeleteAnnotation method", func()
Expect(err).ToNot(HaveOccurred())

Eventually(func() map[string]string {
m, err := provider.Get(context.Background(), *machine.Spec.ProviderID)
m, err := provider.GetByProviderID(context.Background(), *machine.Spec.ProviderID)
Expect(err).ToNot(HaveOccurred())
return m.GetAnnotations()
}).ShouldNot(HaveKey(capiv1beta1.DeleteMachineAnnotation))
Expand Down