Skip to content

Commit 224aebe

Browse files
committed
Added k8s container deployment
1 parent a0d5226 commit 224aebe

File tree

2 files changed

+187
-36
lines changed

2 files changed

+187
-36
lines changed

internal/app/container_handler.go

Lines changed: 40 additions & 31 deletions
Original file line numberDiff line numberDiff line change
@@ -300,19 +300,19 @@ func (h *ContainerHandler) idleAppShutdown(ctx context.Context) {
300300
func (h *ContainerHandler) healthChecker(ctx context.Context) {
301301
time.Sleep(60 * time.Second) // wait for 1 minute to let the app start up
302302
h.Debug().Msgf("Health checker started for app %s", h.app.Id)
303+
fullHash, err := h.getAppHash()
304+
if err != nil {
305+
h.Error().Err(err).Msgf("Error getting app hash for %s", h.app.Id)
306+
return
307+
}
308+
containerName := container.GenContainerName(h.app.Id, h.manager, fullHash)
303309
for range h.healthCheckTicker.C {
304-
err := h.WaitForHealth(h.containerConfig.StatusHealthAttempts)
310+
err := h.WaitForHealth(h.containerConfig.StatusHealthAttempts, containerName)
305311
if err == nil {
306312
continue
307313
}
308314
h.Info().Msgf("Health check failed for app %s: %s", h.app.Id, err)
309315

310-
fullHash, err := h.getAppHash()
311-
if err != nil {
312-
h.Error().Err(err).Msgf("Error getting app hash for %s", h.app.Id)
313-
break
314-
}
315-
316316
if h.app.notifyClose != nil {
317317
// Notify the server to close the app so that it gets reinitialized on next API call
318318
h.app.notifyClose <- h.app.AppPathDomain()
@@ -696,7 +696,7 @@ func (h *ContainerHandler) DevReload(ctx context.Context, dryRun bool) error {
696696
h.hostNamePort = hostNamePort
697697

698698
if h.health != "" {
699-
err = h.WaitForHealth(h.containerConfig.HealthAttemptsAfterStartup)
699+
err = h.WaitForHealth(h.containerConfig.HealthAttemptsAfterStartup, containerName)
700700
if err != nil {
701701
logs, _ := h.manager.GetContainerLogs(ctx, containerName)
702702
return fmt.Errorf("error waiting for health: %w. Logs\n %s", err, logs)
@@ -706,22 +706,38 @@ func (h *ContainerHandler) DevReload(ctx context.Context, dryRun bool) error {
706706
return nil
707707
}
708708

709-
func (h *ContainerHandler) WaitForHealth(attempts int) error {
709+
func (h *ContainerHandler) WaitForHealth(attempts int, containerName container.ContainerName) error {
710710
client := &http.Client{
711711
Timeout: time.Duration(h.containerConfig.HealthTimeoutSecs) * time.Second,
712712
}
713713

714714
var err error
715715
var resp *http.Response
716+
var hostNamePort string
717+
var running bool
718+
sleepMillis := 50
716719
for attempt := 1; attempt <= attempts; attempt++ {
720+
hostNamePort, running, err = h.manager.GetContainerState(context.Background(), containerName)
721+
if err != nil {
722+
return fmt.Errorf("error getting running containers: %w", err)
723+
}
724+
if running {
725+
h.currentState = ContainerStateRunning
726+
h.hostNamePort = hostNamePort
727+
} else {
728+
h.currentState = ContainerStateUnknown
729+
h.hostNamePort = ""
730+
}
731+
717732
var proxyUrl *url.URL
718733
proxyUrl, err = url.Parse(h.GetProxyUrl())
719-
if err != nil || proxyUrl.Host == "" {
734+
if err != nil || !running || proxyUrl.Host == "" {
720735
if err == nil {
721736
err = fmt.Errorf("could not find container proxy url")
722737
}
723-
sleepSecs := math.Min(float64(attempt), 5)
724-
time.Sleep(time.Duration(sleepSecs) * time.Second)
738+
sleepMillis *= 2
739+
sleepTimeMillis := math.Min(float64(sleepMillis), 5000)
740+
time.Sleep(time.Duration(sleepTimeMillis) * time.Millisecond)
725741
continue
726742
}
727743
if !h.stripAppPath {
@@ -744,8 +760,9 @@ func (h *ContainerHandler) WaitForHealth(attempts int) error {
744760
}
745761

746762
h.Debug().Msgf("Attempt %d failed on %s : status %s err %s", attempt, proxyUrl, statusCode, err)
747-
sleepSecs := math.Min(float64(attempt), 5)
748-
time.Sleep(time.Duration(sleepSecs) * time.Second)
763+
sleepMillis *= 2
764+
sleepTimeMillis := math.Min(float64(sleepMillis), 5000)
765+
time.Sleep(time.Duration(sleepTimeMillis) * time.Millisecond)
749766
}
750767

751768
h.Error().Msgf("Health check failed for app %s after %d attempts: %v", h.app.Id, attempts, err)
@@ -814,6 +831,7 @@ func (h *ContainerHandler) ProdReload(ctx context.Context, dryRun bool) error {
814831
}
815832

816833
if hostNamePort != "" {
834+
// Container is present, make sure it is in the correct state
817835
h.stateLock.Lock()
818836
defer h.stateLock.Unlock()
819837

@@ -825,17 +843,8 @@ func (h *ContainerHandler) ProdReload(ctx context.Context, dryRun bool) error {
825843
return fmt.Errorf("error starting container: %w", err)
826844
}
827845

828-
// Fetch port number after starting the container
829-
hostNamePort, running, err = h.manager.GetContainerState(ctx, containerName)
830-
if err != nil {
831-
return fmt.Errorf("error getting running containers: %w", err)
832-
}
833-
if hostNamePort == "" || !running {
834-
return fmt.Errorf("container not running after starting")
835-
}
836-
h.hostNamePort = hostNamePort
837846
if h.health != "" {
838-
err = h.WaitForHealth(h.containerConfig.HealthAttemptsAfterStartup)
847+
err = h.WaitForHealth(h.containerConfig.HealthAttemptsAfterStartup, containerName)
839848
if err != nil {
840849
return fmt.Errorf("error waiting for health: %w", err)
841850
}
@@ -910,6 +919,13 @@ func (h *ContainerHandler) ProdReload(ctx context.Context, dryRun bool) error {
910919
return fmt.Errorf("error starting container after update: %w", err)
911920
}
912921

922+
if h.health != "" {
923+
err = h.WaitForHealth(h.containerConfig.HealthAttemptsAfterStartup, containerName)
924+
if err != nil {
925+
return fmt.Errorf("error waiting for health: %w", err)
926+
}
927+
}
928+
913929
hostNamePort, running, err := h.manager.GetContainerState(ctx, containerName)
914930
if err != nil {
915931
return fmt.Errorf("error getting running containers: %w", err)
@@ -920,13 +936,6 @@ func (h *ContainerHandler) ProdReload(ctx context.Context, dryRun bool) error {
920936
h.currentState = ContainerStateRunning
921937
h.hostNamePort = hostNamePort
922938

923-
if h.health != "" {
924-
err = h.WaitForHealth(h.containerConfig.HealthAttemptsAfterStartup)
925-
if err != nil {
926-
return fmt.Errorf("error waiting for health: %w", err)
927-
}
928-
}
929-
930939
return nil
931940
}
932941

internal/container/kubernetes.go

Lines changed: 147 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -10,9 +10,16 @@ import (
1010
"time"
1111

1212
"github.com/openrundev/openrun/internal/types"
13+
apps "k8s.io/api/apps/v1"
14+
core "k8s.io/api/core/v1"
15+
apierrors "k8s.io/apimachinery/pkg/api/errors"
16+
meta "k8s.io/apimachinery/pkg/apis/meta/v1"
17+
18+
"k8s.io/apimachinery/pkg/util/intstr"
1319
"k8s.io/client-go/kubernetes"
1420
"k8s.io/client-go/rest"
1521
"k8s.io/client-go/tools/clientcmd"
22+
"k8s.io/client-go/util/retry"
1623
)
1724

1825
type KubernetesContainerManager struct {
@@ -22,6 +29,11 @@ type KubernetesContainerManager struct {
2229
restConfig *rest.Config
2330
}
2431

32+
func sanitizeContainerName(name string) string {
33+
name = sanitizeName(name)
34+
return name[:60] // max length for a Kubernetes object name is 63
35+
}
36+
2537
func NewKubernetesContainerManager(logger *types.Logger, config *types.ServerConfig) (*KubernetesContainerManager, error) {
2638
cfg, err := loadConfig()
2739
if err != nil {
@@ -108,30 +120,82 @@ func (k *KubernetesContainerManager) BuildImage(ctx context.Context, imgName Ima
108120
}
109121

110122
func (k *KubernetesContainerManager) GetContainerState(ctx context.Context, name ContainerName) (string, bool, error) {
111-
return "", false, nil
123+
name = ContainerName(sanitizeContainerName(string(name)))
124+
svc, err := k.clientSet.CoreV1().
125+
Services(k.config.Kubernetes.Namespace).
126+
Get(ctx, string(name), meta.GetOptions{})
127+
if err != nil {
128+
if apierrors.IsNotFound(err) {
129+
return "", false, nil
130+
}
131+
return "", false, fmt.Errorf("get service %s/%s: %w", k.config.Kubernetes.Namespace, string(name), err)
132+
}
133+
if len(svc.Spec.Ports) == 0 {
134+
return "", false, fmt.Errorf("service %s/%s has no ports", k.config.Kubernetes.Namespace, string(name))
135+
}
136+
137+
svcPort := svc.Spec.Ports[0].Port
138+
hostNamePort := fmt.Sprintf("%s.%s.svc.cluster.local:%d", svc.Name, svc.Namespace, svcPort)
139+
140+
// --- Get Deployment & ready pods ---
141+
dep, err := k.clientSet.AppsV1().
142+
Deployments(k.config.Kubernetes.Namespace).
143+
Get(ctx, string(name), meta.GetOptions{})
144+
if err != nil {
145+
return "", false, fmt.Errorf("get deployment %s/%s: %w", k.config.Kubernetes.Namespace, string(name), err)
146+
}
147+
148+
return hostNamePort, dep.Status.ReadyReplicas > 0, nil
112149
}
113150

114151
func (k *KubernetesContainerManager) SupportsInPlaceContainerUpdate() bool {
115-
return true
152+
return false
116153
}
117154

118155
func (k *KubernetesContainerManager) InPlaceContainerUpdate(ctx context.Context, appEntry *types.AppEntry, containerName ContainerName,
119156
imageName ImageName, port int64, envMap map[string]string, mountArgs []string,
120157
containerOptions map[string]string) error {
121-
return nil
158+
// in place upgrade will make it difficult to do atomic upgrade across multiple apps. Instead create a new
159+
// service/deployment during deployment, that will work similar to the way containers are managed in non-k8s scenario.
160+
return fmt.Errorf("in place container update is not supported for kubernetes container manager")
122161
}
123162

124163
func (k *KubernetesContainerManager) StartContainer(ctx context.Context, name ContainerName) error {
125-
return nil
164+
name = ContainerName(sanitizeContainerName(string(name)))
165+
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
166+
scale, err := k.clientSet.AppsV1().Deployments(k.config.Kubernetes.Namespace).GetScale(ctx, string(name), meta.GetOptions{})
167+
if err != nil {
168+
return err
169+
}
170+
scale.Spec.Replicas = 1
171+
_, err = k.clientSet.AppsV1().Deployments(k.config.Kubernetes.Namespace).UpdateScale(ctx, string(name), scale, meta.UpdateOptions{})
172+
return err
173+
})
126174
}
127175

128176
func (k *KubernetesContainerManager) StopContainer(ctx context.Context, name ContainerName) error {
129-
return nil
177+
name = ContainerName(sanitizeContainerName(string(name)))
178+
return retry.RetryOnConflict(retry.DefaultRetry, func() error {
179+
scale, err := k.clientSet.AppsV1().Deployments(k.config.Kubernetes.Namespace).GetScale(ctx, string(name), meta.GetOptions{})
180+
if err != nil {
181+
return err
182+
}
183+
scale.Spec.Replicas = 0 // scale down to zero
184+
_, err = k.clientSet.AppsV1().Deployments(k.config.Kubernetes.Namespace).UpdateScale(ctx, string(name), scale, meta.UpdateOptions{})
185+
return err
186+
})
130187
}
131188

132189
func (k *KubernetesContainerManager) RunContainer(ctx context.Context, appEntry *types.AppEntry, containerName ContainerName,
133190
imageName ImageName, port int64, envMap map[string]string, mountArgs []string,
134191
containerOptions map[string]string) error {
192+
imageName = ImageName(k.config.Registry.URL + "/" + string(imageName))
193+
containerName = ContainerName(sanitizeContainerName(string(containerName)))
194+
hostNamePort, err := k.createDeployment(ctx, string(containerName), string(imageName), int32(port))
195+
if err != nil {
196+
return fmt.Errorf("create app: %w", err)
197+
}
198+
k.Logger.Info().Msgf("created app service %s with host name port %s", containerName, hostNamePort)
135199
return nil
136200
}
137201

@@ -146,3 +210,81 @@ func (k *KubernetesContainerManager) VolumeExists(ctx context.Context, name Volu
146210
// VolumeCreate is currently a no-op for the Kubernetes manager; it always
// reports success without creating anything.
func (k *KubernetesContainerManager) VolumeCreate(ctx context.Context, name VolumeName) error {
	return nil
}
213+
214+
// createDeployment creates a Deployment + Service and returns the Service URL.
215+
func (k *KubernetesContainerManager) createDeployment(ctx context.Context, name, image string, port int32) (string, error) {
216+
labels := map[string]string{"app": name}
217+
replicas := int32(1) // min = max = 1
218+
219+
dep := &apps.Deployment{
220+
ObjectMeta: meta.ObjectMeta{
221+
Name: name,
222+
Namespace: k.config.Kubernetes.Namespace,
223+
Labels: labels,
224+
},
225+
Spec: apps.DeploymentSpec{
226+
Replicas: &replicas,
227+
Selector: &meta.LabelSelector{
228+
MatchLabels: labels,
229+
},
230+
Template: core.PodTemplateSpec{
231+
ObjectMeta: meta.ObjectMeta{
232+
Labels: labels,
233+
},
234+
Spec: core.PodSpec{
235+
Containers: []core.Container{
236+
{
237+
Name: name,
238+
Image: image,
239+
Ports: []core.ContainerPort{
240+
{
241+
ContainerPort: port,
242+
Protocol: core.ProtocolTCP,
243+
},
244+
},
245+
},
246+
},
247+
},
248+
},
249+
},
250+
}
251+
252+
if _, err := k.clientSet.AppsV1().Deployments(k.config.Kubernetes.Namespace).Create(ctx, dep, meta.CreateOptions{}); err != nil {
253+
return "", fmt.Errorf("create deployment: %w", err)
254+
}
255+
256+
svc := &core.Service{
257+
ObjectMeta: meta.ObjectMeta{
258+
Name: name,
259+
Namespace: k.config.Kubernetes.Namespace,
260+
Labels: labels,
261+
},
262+
Spec: core.ServiceSpec{
263+
Type: core.ServiceTypeClusterIP,
264+
Selector: labels,
265+
Ports: []core.ServicePort{
266+
{
267+
Name: "http",
268+
Port: port,
269+
TargetPort: intstr.FromInt(int(port)),
270+
Protocol: core.ProtocolTCP,
271+
},
272+
},
273+
},
274+
}
275+
276+
svc, err := k.clientSet.CoreV1().Services(k.config.Kubernetes.Namespace).Create(ctx, svc, meta.CreateOptions{})
277+
if err != nil {
278+
return "", fmt.Errorf("create service: %w", err)
279+
}
280+
281+
if len(svc.Spec.Ports) == 0 {
282+
return "", fmt.Errorf("service has no ports")
283+
}
284+
285+
// In-cluster DNS URL
286+
servicePort := svc.Spec.Ports[0].Port
287+
url := fmt.Sprintf("%s.%s.svc.cluster.local:%d", svc.Name, svc.Namespace, servicePort)
288+
289+
return url, nil
290+
}

0 commit comments

Comments
 (0)