@@ -21,6 +21,7 @@ import (
2121 "context"
2222 _ "embed"
2323 "fmt"
24+ "log"
2425 "slices"
2526 "strings"
2627 "sync"
@@ -56,6 +57,9 @@ const (
5657 labelsKey = "capacity.cluster-autoscaler.kubernetes.io/labels"
5758 taintsKey = "capacity.cluster-autoscaler.kubernetes.io/taints"
5859 maxPodsKey = "capacity.cluster-autoscaler.kubernetes.io/maxPods"
60+
61+ machineAnnotation = "cluster-autoscaler.kubernetes.io/machine"
62+ machineNamespaceAnnotation = "cluster-autoscaler.kubernetes.io/machine-namespace"
5963)
6064
6165func NewCloudProvider (ctx context.Context , kubeClient client.Client , machineProvider machine.Provider , machineDeploymentProvider machinedeployment.Provider ) * CloudProvider {
@@ -89,67 +93,13 @@ func (c *CloudProvider) Create(ctx context.Context, nodeClaim *karpv1.NodeClaim)
8993 return nil , fmt .Errorf ("cannot satisfy create, NodeClaim is nil" )
9094 }
9195
92- nodeClass , err := c .resolveNodeClassFromNodeClaim (ctx , nodeClaim )
93- if err != nil {
94- return nil , fmt .Errorf ("cannot satisfy create, unable to resolve NodeClass from NodeClaim %q: %w" , nodeClaim .Name , err )
95- }
96-
97- instanceTypes , err := c .findInstanceTypesForNodeClass (ctx , nodeClass )
98- if err != nil {
99- return nil , fmt .Errorf ("cannot satisfy create, unable to get instance types for NodeClass %q of NodeClaim %q: %w" , nodeClass .Name , nodeClaim .Name , err )
100- }
101-
102- // identify which fit requirements
103- compatibleInstanceTypes := filterCompatibleInstanceTypes (instanceTypes , nodeClaim )
104- if len (compatibleInstanceTypes ) == 0 {
105- return nil , cloudprovider .NewInsufficientCapacityError (fmt .Errorf ("cannot satisfy create, no compatible instance types found" ))
106- }
107-
108- // TODO (elmiko) if multiple instance types are found to be compatible we need to select one.
109- // for now, we sort by resource name and take the first in the list. In the future, this should
110- // be an option or something more useful like minimum size or cost.
111- slices .SortFunc (compatibleInstanceTypes , func (a , b * ClusterAPIInstanceType ) int {
112- return cmp .Compare (strings .ToLower (a .Name ), strings .ToLower (b .Name ))
113- })
114- selectedInstanceType := compatibleInstanceTypes [0 ]
115-
116- // once scalable resource is identified, increase replicas
117- machineDeployment , err := c .machineDeploymentProvider .Get (ctx , selectedInstanceType .MachineDeploymentName , selectedInstanceType .MachineDeploymentNamespace )
96+ machineDeployment , machine , err := c .createOrGetMachine (ctx , nodeClaim )
11897 if err != nil {
119- return nil , fmt .Errorf ("cannot satisfy create, unable to find MachineDeployment %q for InstanceType %q: %w" , selectedInstanceType .MachineDeploymentName , selectedInstanceType .Name , err )
120- }
121- originalReplicas := * machineDeployment .Spec .Replicas
122- machineDeployment .Spec .Replicas = ptr .To (originalReplicas + 1 )
123- if err := c .machineDeploymentProvider .Update (ctx , machineDeployment ); err != nil {
124- return nil , fmt .Errorf ("cannot satisfy create, unable to update MachineDeployment %q replicas: %w" , machineDeployment .Name , err )
125- }
126-
127- // TODO (elmiko) it would be nice to have a more elegant solution to the asynchronous machine creation.
128- // Initially, it appeared that we could have a Machine controller which could reconcile new Machines and
129- // then associate them with NodeClaims by using a sentinel value for the Provider ID. But, this may not
130- // work as we expect since the karpenter core can use the Provider ID as a key into one of its internal caches.
131- // For now, the method of waiting for the Machine seemed straightforward although it does make the `Create` method a blocking call.
132- // Try to find an unclaimed Machine resource for 1 minute.
133- machine , err := c .pollForUnclaimedMachineInMachineDeploymentWithTimeout (ctx , machineDeployment , time .Minute )
134- if err != nil {
135- // unable to find a Machine for the NodeClaim, this could be due to timeout or error, but the replica count needs to be reset.
136- // TODO (elmiko) this could probably use improvement to make it more resilient to errors.
137- machineDeployment .Spec .Replicas = ptr .To (originalReplicas )
138- if err := c .machineDeploymentProvider .Update (ctx , machineDeployment ); err != nil {
139- return nil , fmt .Errorf ("cannot satisfy create, error while recovering from failure to find an unclaimed Machine: %w" , err )
140- }
141- return nil , fmt .Errorf ("cannot satisfy create, unable to find an unclaimed Machine for MachineDeployment %q: %w" , machineDeployment .Name , err )
98+ return nil , err
14299 }
143100
144- // now that we have a Machine for the NodeClaim, we label it as a karpenter member
145- labels := machine .GetLabels ()
146- labels [providers .NodePoolMemberLabel ] = ""
147- machine .SetLabels (labels )
148- if err := c .machineProvider .Update (ctx , machine ); err != nil {
149- // if we can't update the Machine with the member label, we need to unwind the addition
150- // TODO (elmiko) add more logic here to fix the error, if we are in this state it's not clear how to fix,
151- // since we have a Machine, we should be reducing the replicas and annotating the Machine for deletion.
152- return nil , fmt .Errorf ("cannot satisfy create, unable to label Machine %q as a member: %w" , machine .Name , err )
101+ if machine .Spec .ProviderID == nil {
102+ return nil , fmt .Errorf ("cannot satisfy create, waiting for Machine %q to have ProviderID" , machine .Name )
153103 }
154104
155105 // fill out nodeclaim with details
@@ -164,15 +114,25 @@ func (c *CloudProvider) Delete(ctx context.Context, nodeClaim *karpv1.NodeClaim)
164114 c .accessLock .Lock ()
165115 defer c .accessLock .Unlock ()
166116
167- if len (nodeClaim .Status .ProviderID ) == 0 {
168- return fmt .Errorf ("NodeClaim %q does not have a provider ID, cannot delete" , nodeClaim .Name )
169- }
117+ var machine * capiv1beta1.Machine
118+ var err error
170119
171120 // find machine
172- machine , err := c .machineProvider .Get (ctx , nodeClaim .Status .ProviderID )
173- if err != nil {
174- return fmt .Errorf ("error finding Machine with provider ID %q to Delete NodeClaim %q: %w" , nodeClaim .Status .ProviderID , nodeClaim .Name , err )
121+ if len (nodeClaim .Status .ProviderID ) != 0 {
122+ machine , err = c .machineProvider .GetByProviderID (ctx , nodeClaim .Status .ProviderID )
123+ if err != nil {
124+ return fmt .Errorf ("error finding Machine with provider ID %q to Delete NodeClaim %q: %w" , nodeClaim .Status .ProviderID , nodeClaim .Name , err )
125+ }
126+ } else if machineName , ok := nodeClaim .Annotations [machineAnnotation ]; ok {
127+ machineNs := nodeClaim .Annotations [machineNamespaceAnnotation ]
128+ machine , err = c .machineProvider .Get (ctx , machineName , machineNs )
129+ if err != nil {
130+ return fmt .Errorf ("error finding Machine %q in namespace %s to Delete NodeClaim %q: %w" , machineName , machineNs , nodeClaim .Name , err )
131+ }
132+ } else {
133+ return fmt .Errorf ("NodeClaim %q does not have a provider ID or Machine annotations, cannot delete" , nodeClaim .Name )
175134 }
135+
176136 if machine == nil {
177137 return cloudprovider .NewNodeClaimNotFoundError (fmt .Errorf ("unable to find Machine with provider ID %q to Delete NodeClaim %q" , nodeClaim .Status .ProviderID , nodeClaim .Name ))
178138 }
@@ -225,7 +185,7 @@ func (c *CloudProvider) Get(ctx context.Context, providerID string) (*karpv1.Nod
225185 return nil , fmt .Errorf ("no providerID supplied to Get, cannot continue" )
226186 }
227187
228- machine , err := c .machineProvider .Get (ctx , providerID )
188+ machine , err := c .machineProvider .GetByProviderID (ctx , providerID )
229189 if err != nil {
230190 return nil , fmt .Errorf ("error getting Machine: %w" , err )
231191 }
@@ -318,6 +278,111 @@ func (c *CloudProvider) RepairPolicies() []cloudprovider.RepairPolicy {
318278 return []cloudprovider.RepairPolicy {}
319279}
320280
281+ func (c * CloudProvider ) createOrGetMachine (ctx context.Context , nodeClaim * karpv1.NodeClaim ) (* capiv1beta1.MachineDeployment , * capiv1beta1.Machine , error ) {
282+ machineName , ok := nodeClaim .Annotations [machineAnnotation ]
283+ if ! ok {
284+ return c .createMachine (ctx , nodeClaim )
285+ }
286+
287+ machineNamespace := nodeClaim .Annotations [machineNamespaceAnnotation ]
288+ machine , err := c .machineProvider .Get (ctx , machineName , machineNamespace )
289+ if err != nil {
290+ return nil , nil , fmt .Errorf ("failed to get NodeClaim's Machine %s : %w" , machineName , err )
291+ }
292+
293+ machineDeployment , err := c .machineDeploymentFromMachine (ctx , machine )
294+ if err != nil {
295+ return nil , nil , fmt .Errorf ("failed to get NodeClaim's MachineDeployment %s : %w" , machineName , err )
296+ }
297+
298+ return machineDeployment , machine , nil
299+ }
300+
301+ func (c * CloudProvider ) createMachine (ctx context.Context , nodeClaim * karpv1.NodeClaim ) (* capiv1beta1.MachineDeployment , * capiv1beta1.Machine , error ) {
302+ nodeClass , err := c .resolveNodeClassFromNodeClaim (ctx , nodeClaim )
303+ if err != nil {
304+ return nil , nil , fmt .Errorf ("cannot satisfy create, unable to resolve NodeClass from NodeClaim %q: %w" , nodeClaim .Name , err )
305+ }
306+
307+ instanceTypes , err := c .findInstanceTypesForNodeClass (ctx , nodeClass )
308+ if err != nil {
309+ return nil , nil , fmt .Errorf ("cannot satisfy create, unable to get instance types for NodeClass %q of NodeClaim %q: %w" , nodeClass .Name , nodeClaim .Name , err )
310+ }
311+
312+ // identify which fit requirements
313+ compatibleInstanceTypes := filterCompatibleInstanceTypes (instanceTypes , nodeClaim )
314+ if len (compatibleInstanceTypes ) == 0 {
315+ return nil , nil , cloudprovider .NewInsufficientCapacityError (fmt .Errorf ("cannot satisfy create, no compatible instance types found" ))
316+ }
317+
318+ // TODO (elmiko) if multiple instance types are found to be compatible we need to select one.
319+ // for now, we sort by resource name and take the first in the list. In the future, this should
320+ // be an option or something more useful like minimum size or cost.
321+ slices .SortFunc (compatibleInstanceTypes , func (a , b * ClusterAPIInstanceType ) int {
322+ return cmp .Compare (strings .ToLower (a .Name ), strings .ToLower (b .Name ))
323+ })
324+ selectedInstanceType := compatibleInstanceTypes [0 ]
325+
326+ // once scalable resource is identified, increase replicas
327+ machineDeployment , err := c .machineDeploymentProvider .Get (ctx , selectedInstanceType .MachineDeploymentName , selectedInstanceType .MachineDeploymentNamespace )
328+ if err != nil {
329+ return nil , nil , fmt .Errorf ("cannot satisfy create, unable to find MachineDeployment %q for InstanceType %q: %w" , selectedInstanceType .MachineDeploymentName , selectedInstanceType .Name , err )
330+ }
331+ originalReplicas := * machineDeployment .Spec .Replicas
332+ machineDeployment .Spec .Replicas = ptr .To (originalReplicas + 1 )
333+ if err := c .machineDeploymentProvider .Update (ctx , machineDeployment ); err != nil {
334+ return nil , nil , fmt .Errorf ("cannot satisfy create, unable to update MachineDeployment %q replicas: %w" , machineDeployment .Name , err )
335+ }
336+
337+ // TODO (elmiko) it would be nice to have a more elegant solution to the asynchronous machine creation.
338+ // Initially, it appeared that we could have a Machine controller which could reconcile new Machines and
339+ // then associate them with NodeClaims by using a sentinel value for the Provider ID. But, this may not
340+ // work as we expect since the karpenter core can use the Provider ID as a key into one of its internal caches.
341+ // For now, the method of waiting for the Machine seemed straightforward although it does make the `Create` method a blocking call.
342+ // Try to find an unclaimed Machine resource for 1 minute.
343+ machine , err := c .pollForUnclaimedMachineInMachineDeploymentWithTimeout (ctx , machineDeployment , time .Minute )
344+ if err != nil {
345+ // unable to find a Machine for the NodeClaim, this could be due to timeout or error, but the replica count needs to be reset.
346+ // TODO (elmiko) this could probably use improvement to make it more resilient to errors.
347+ defer func () {
348+ machineDeployment , err = c .machineDeploymentProvider .Get (ctx , selectedInstanceType .MachineDeploymentName , selectedInstanceType .MachineDeploymentNamespace )
349+ if err != nil {
350+ log .Println (fmt .Errorf ("error while recovering from failure to find an unclaimed Machine, unable to find MachineDeployment %q for InstanceType %q: %w" , selectedInstanceType .MachineDeploymentName , selectedInstanceType .Name , err ))
351+ }
352+
353+ machineDeployment .Spec .Replicas = ptr .To (originalReplicas )
354+ if err = c .machineDeploymentProvider .Update (ctx , machineDeployment ); err != nil {
355+ log .Println (fmt .Errorf ("error while recovering from failure to find an unclaimed Machine: %w" , err ))
356+ }
357+ }()
358+
359+ return nil , nil , fmt .Errorf ("cannot satisfy create, unable to find an unclaimed Machine for MachineDeployment %q: %w" , machineDeployment .Name , err )
360+ }
361+
362+ // now that we have a Machine for the NodeClaim, we label it as a karpenter member
363+ labels := machine .GetLabels ()
364+ labels [providers .NodePoolMemberLabel ] = ""
365+ machine .SetLabels (labels )
366+ if err := c .machineProvider .Update (ctx , machine ); err != nil {
367+ // if we can't update the Machine with the member label, we need to unwind the addition
368+ // TODO (elmiko) add more logic here to fix the error, if we are in this state it's not clear how to fix,
369+ // since we have a Machine, we should be reducing the replicas and annotating the Machine for deletion.
370+ return nil , nil , fmt .Errorf ("cannot satisfy create, unable to label Machine %q as a member: %w" , machine .Name , err )
371+ }
372+
373+ // Infrastructure instance is not assigned yet to the machine, which can take time.
374+ // Bind the NodeClaim with this machine and return an error until it is assigned
375+ if machine .Spec .ProviderID == nil {
376+ nodeClaim .Annotations [machineAnnotation ] = machine .Name
377+ nodeClaim .Annotations [machineNamespaceAnnotation ] = machine .Namespace
378+ if err = c .kubeClient .Update (ctx , nodeClaim ); err != nil {
379+ return nil , nil , fmt .Errorf ("cannot satisfy create, unable to update NodeClaim annotaitons %q: %w" , nodeClaim .Name , err )
380+ }
381+ }
382+
383+ return machineDeployment , machine , nil
384+ }
385+
321386func (c * CloudProvider ) machineDeploymentFromMachine (ctx context.Context , machine * capiv1beta1.Machine ) (* capiv1beta1.MachineDeployment , error ) {
322387 mdName , found := machine .GetLabels ()[capiv1beta1 .MachineDeploymentNameLabel ]
323388 if ! found {
@@ -421,15 +486,8 @@ func (c *CloudProvider) pollForUnclaimedMachineInMachineDeploymentWithTimeout(ct
421486 return false , nil
422487 }
423488
424- // find the first machine with a provider id
425- for i , m := range machineList {
426- if m .Spec .ProviderID != nil {
427- machine = machineList [i ]
428- return true , nil
429- }
430- }
431-
432- return false , nil
489+ machine = machineList [0 ]
490+ return true , nil
433491 })
434492 if err != nil {
435493 return nil , fmt .Errorf ("error polling for an unclaimed Machine in MachineDeployment %q: %w" , machineDeployment .Name , err )
0 commit comments