Skip to content

Commit 31eb6aa

Browse files
authored
feat: Add check_cluster_upgrades function (#117)
* feat: Add check_cluster_upgrades function This commit introduces a new function to check for Kubernetes version upgrades in all clusters. It compares the current cluster version with the latest stable version and reports which clusters need an upgrade. * fix: Handle no clusters found in check_cluster_upgrades This commit updates the check_cluster_upgrades function to handle the case where no clusters are discovered. It now returns a success status with a message indicating that no clusters were found, instead of an error. * fix: Improve cluster discovery in check_cluster_upgrades This commit modifies the _discover_clusters method in check_cluster_upgrades.py to provide more informative error messages when kubectl config get-contexts fails or returns invalid JSON. It also ensures that WDS clusters are correctly filtered out.
1 parent 06ceb1e commit 31eb6aa

File tree

3 files changed

+215
-5
lines changed

3 files changed

+215
-5
lines changed

src/shared/functions/__init__.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515
from src.shared.functions.multicluster_create import MultiClusterCreateFunction
1616
from src.shared.functions.multicluster_logs import MultiClusterLogsFunction
1717
from src.shared.functions.namespace_utils import NamespaceUtilsFunction
18+
from src.shared.functions.check_cluster_upgrades import CheckClusterUpgradesFunction
1819

1920

2021
def initialize_functions():
@@ -49,4 +50,7 @@ def initialize_functions():
4950
function_registry.register(GVRCDiscoveryFunction())
5051
function_registry.register(NamespaceUtilsFunction())
5152

53+
# Register cluster upgrade check function
54+
function_registry.register(CheckClusterUpgradesFunction())
55+
5256
# Add more function registrations here as they are created
Lines changed: 206 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,206 @@
1+
from __future__ import annotations
2+
3+
import asyncio
4+
import json
5+
from dataclasses import dataclass, field
6+
from typing import Any, Dict, List, Optional
7+
8+
import aiohttp
9+
10+
from src.shared.base_functions import BaseFunction
11+
12+
13+
@dataclass
14+
class ClusterUpgradeStatus:
15+
"""Represents the upgrade status of a single cluster."""
16+
17+
cluster_name: str
18+
current_version: str
19+
latest_version: str
20+
upgrade_needed: bool
21+
error: Optional[str] = None
22+
23+
24+
@dataclass
25+
class CheckClusterUpgradesOutput:
26+
"""Output for the check_cluster_upgrades function."""
27+
28+
status: str
29+
clusters: List[ClusterUpgradeStatus] = field(default_factory=list)
30+
summary: str = ""
31+
32+
33+
class CheckClusterUpgradesFunction(BaseFunction):
34+
"""
35+
Checks for available Kubernetes version upgrades for all clusters.
36+
"""
37+
38+
def __init__(self):
39+
super().__init__(
40+
name="check_cluster_upgrades",
41+
description="Checks for available Kubernetes version upgrades for all clusters.",
42+
)
43+
44+
async def execute(self, kubeconfig: str = "") -> Dict[str, Any]:
45+
"""
46+
Checks for available Kubernetes version upgrades for all clusters.
47+
48+
Args:
49+
kubeconfig: Path to the kubeconfig file.
50+
51+
Returns:
52+
A dictionary with the upgrade status of each cluster.
53+
"""
54+
try:
55+
latest_version = await self._get_latest_stable_k8s_version()
56+
if not latest_version:
57+
return {
58+
"status": "error",
59+
"error": "Could not fetch the latest stable Kubernetes version.",
60+
}
61+
62+
clusters = await self._discover_clusters(kubeconfig)
63+
if not clusters:
64+
return CheckClusterUpgradesOutput(
65+
status="success",
66+
summary="No clusters discovered.",
67+
).__dict__
68+
69+
upgrade_statuses = await asyncio.gather(
70+
*[
71+
self._get_cluster_upgrade_status(cluster, latest_version, kubeconfig)
72+
for cluster in clusters
73+
]
74+
)
75+
76+
output = CheckClusterUpgradesOutput(
77+
status="success",
78+
clusters=[status for status in upgrade_statuses if status],
79+
summary=f"Compared against the latest stable Kubernetes version: {latest_version}",
80+
)
81+
82+
return output.__dict__
83+
84+
except Exception as e:
85+
return {"status": "error", "error": str(e)}
86+
87+
async def _get_latest_stable_k8s_version(self) -> Optional[str]:
88+
"""Fetches the latest stable Kubernetes version from Google's GCS bucket."""
89+
url = "https://storage.googleapis.com/kubernetes-release/release/stable.txt"
90+
try:
91+
async with aiohttp.ClientSession() as session:
92+
async with session.get(url) as response:
93+
if response.status == 200:
94+
return (await response.text()).strip()
95+
except Exception:
96+
return None
97+
return None
98+
99+
async def _discover_clusters(self, kubeconfig: str) -> List[Dict[str, Any]]:
100+
"""Discover available clusters using kubectl."""
101+
cmd = ["kubectl", "config", "get-contexts", "-o", "json"]
102+
if kubeconfig:
103+
cmd.extend(["--kubeconfig", kubeconfig])
104+
105+
result = await self._run_command(cmd)
106+
if result["returncode"] != 0:
107+
# Return an empty list but log the error or make it accessible for debugging
108+
print(f"Error discovering clusters: {result['stderr']}")
109+
return []
110+
111+
try:
112+
contexts = json.loads(result["stdout"])["contexts"]
113+
return [
114+
{"name": context["name"], "context": context["name"]}
115+
for context in contexts
116+
if "wds" not in context["name"]
117+
]
118+
except (json.JSONDecodeError, KeyError) as e:
119+
print(f"Error parsing kubectl contexts: {e}")
120+
return []
121+
122+
async def _get_cluster_upgrade_status(
123+
self, cluster: Dict[str, Any], latest_version: str, kubeconfig: str
124+
) -> Optional[ClusterUpgradeStatus]:
125+
"""Gets the upgrade status of a single cluster."""
126+
cmd = [
127+
"kubectl",
128+
"get",
129+
"nodes",
130+
"-o",
131+
"json",
132+
"--context",
133+
cluster["context"],
134+
]
135+
if kubeconfig:
136+
cmd.extend(["--kubeconfig", kubeconfig])
137+
138+
result = await self._run_command(cmd)
139+
if result["returncode"] != 0:
140+
return ClusterUpgradeStatus(
141+
cluster_name=cluster["name"],
142+
current_version="N/A",
143+
latest_version=latest_version,
144+
upgrade_needed=False,
145+
error=result["stderr"],
146+
)
147+
148+
try:
149+
nodes = json.loads(result["stdout"])["items"]
150+
if not nodes:
151+
return ClusterUpgradeStatus(
152+
cluster_name=cluster["name"],
153+
current_version="N/A",
154+
latest_version=latest_version,
155+
upgrade_needed=False,
156+
error="No nodes found in the cluster.",
157+
)
158+
159+
# For simplicity, we'll use the version of the first node.
160+
# In a real-world scenario, you might want to check all nodes.
161+
kubelet_version = nodes[0]["status"]["nodeInfo"]["kubeletVersion"]
162+
163+
# Simple version comparison, assuming semantic versioning.
164+
# This might need to be more robust for production use.
165+
upgrade_needed = kubelet_version < latest_version
166+
167+
return ClusterUpgradeStatus(
168+
cluster_name=cluster["name"],
169+
current_version=kubelet_version,
170+
latest_version=latest_version,
171+
upgrade_needed=upgrade_needed,
172+
)
173+
except (json.JSONDecodeError, KeyError) as e:
174+
return ClusterUpgradeStatus(
175+
cluster_name=cluster["name"],
176+
current_version="N/A",
177+
latest_version=latest_version,
178+
upgrade_needed=False,
179+
error=f"Failed to parse node information: {e}",
180+
)
181+
182+
async def _run_command(self, cmd: List[str]) -> Dict[str, Any]:
183+
"""Run a shell command asynchronously."""
184+
process = await asyncio.create_subprocess_exec(
185+
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
186+
)
187+
stdout, stderr = await process.communicate()
188+
189+
return {
190+
"returncode": process.returncode,
191+
"stdout": stdout.decode(),
192+
"stderr": stderr.decode(),
193+
}
194+
195+
def get_schema(self) -> Dict[str, Any]:
196+
"""Define the JSON schema for function parameters."""
197+
return {
198+
"type": "object",
199+
"properties": {
200+
"kubeconfig": {
201+
"type": "string",
202+
"description": "Path to the kubeconfig file.",
203+
},
204+
},
205+
"required": [],
206+
}

tests/test_namespace_utils.py

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -125,7 +125,7 @@ async def test_list_namespaces(
125125

126126
with patch.object(namespace_function, "_run_command", return_value=mock_result):
127127
result = await namespace_function._list_namespaces(
128-
mock_clusters[0], None, False, "", False, None, "", ""
128+
mock_clusters[0], None, False, "", False, None, "", "", ""
129129
)
130130

131131
assert result["status"] == "success"
@@ -147,7 +147,7 @@ async def test_list_namespaces_with_filter(
147147

148148
with patch.object(namespace_function, "_run_command", return_value=mock_result):
149149
result = await namespace_function._list_namespaces(
150-
mock_clusters[0], ["default"], False, "", False, None, "", ""
150+
mock_clusters[0], ["default"], False, "", False, None, "", "", ""
151151
)
152152

153153
assert result["status"] == "success"
@@ -219,7 +219,7 @@ async def test_list_namespace_resources(self, namespace_function, mock_clusters)
219219
return_value=mock_resources,
220220
):
221221
result = await namespace_function._list_namespace_resources(
222-
mock_clusters[0], None, True, None, "", ""
222+
mock_clusters[0], None, True, None, "", "", ""
223223
)
224224

225225
assert result["status"] == "success"
@@ -261,7 +261,7 @@ def mock_run_command(cmd):
261261
namespace_function, "_run_command", side_effect=mock_run_command
262262
):
263263
resources = await namespace_function._get_namespace_resources(
264-
mock_clusters[0], "default", ["pods"], "", ""
264+
mock_clusters[0], "default", ["pods"], "", "", ""
265265
)
266266

267267
assert len(resources) == 1
@@ -345,7 +345,7 @@ async def test_execute_namespace_operation_error(
345345
):
346346
"""Test error handling in namespace operations."""
347347
result = await namespace_function._execute_namespace_operation(
348-
mock_clusters[0], "invalid", None, False, "", "", None, False, "", "table"
348+
mock_clusters[0], "invalid", None, False, "", "", None, False, "", "", "table"
349349
)
350350

351351
assert result["status"] == "error"

0 commit comments

Comments
 (0)