Commit 60c4f0e

Enforce start time and make datadog client a singleton

1 parent 8ea4496 · commit 60c4f0e

2 files changed: +46 −9 lines changed


src/spark_history_mcp/common/datadog.py (40 additions, 3 deletions)
```diff
@@ -1,7 +1,8 @@
 import logging
-from datetime import datetime, timedelta
+from datetime import datetime
+from threading import Lock

-from datadog_api_client import Configuration, ThreadedApiClient, ApiClient
+from datadog_api_client import Configuration, ApiClient
 from datadog_api_client.v2.api.logs_api import LogsApi
 from datadog_api_client.v2.model.logs_list_request import LogsListRequest
 from datadog_api_client.v2.model.logs_list_request_page import LogsListRequestPage
@@ -29,7 +30,42 @@ class LogDD(BaseModel):
     pod_name: str = Field(description="Pod name where the logs has been emitted")


-class Datadog:
+class SingletonMeta(type):
+    """
+    This is a thread-safe implementation of Singleton.
+    """
+
+    _instances = {}
+
+    _lock: Lock = Lock()
+    """
+    We now have a lock object that will be used to synchronize threads during
+    first access to the Singleton.
+    """
+
+    def __call__(cls, *args, **kwargs):
+        """
+        Possible changes to the value of the `__init__` argument do not affect
+        the returned instance.
+        """
+        # Imagine that the program has just been launched. Since there is
+        # no Singleton instance yet, multiple threads can reach this point
+        # at almost the same time. The first of them will acquire the lock
+        # and proceed to create the instance, while the rest will wait here
+        # until the lock is released.
+        with cls._lock:
+            # The first thread to acquire the lock reaches this conditional,
+            # goes inside and creates the Singleton instance. Once it leaves
+            # the lock block, a thread that might have been waiting for the
+            # lock release may then enter this section. But since the Singleton
+            # field is already initialized, the thread won't create a new object.
+            if cls not in cls._instances:
+                instance = super().__call__(*args, **kwargs)
+                cls._instances[cls] = instance
+        return cls._instances[cls]
+
+
+class Datadog(metaclass=SingletonMeta):
     LIMIT_PER_QUERY_LOGS = 1000
     MAX_RETURN_LOGS = 100000

@@ -46,6 +82,7 @@ def __init__(self):
         app_key = vault_api.get_secret_kv_store(DATADOG_SECRET_KEYS, "dd_app_key")

         self.configuration = Configuration()
+        self.configuration.server_variables["site"] = "datadoghq.com"
         self.configuration.api_key["apiKeyAuth"] = api_key
         self.configuration.api_key["appKeyAuth"] = app_key
         self.configuration.enable_retry = True
```
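The metaclass above guarantees that constructing Datadog (or any class using SingletonMeta) always yields one shared instance, even when several threads hit the first construction concurrently. Here is a minimal, self-contained sketch of that behavior; FakeClient is a hypothetical stand-in for Datadog so the snippet runs without Vault access or API keys:

```python
from threading import Lock, Thread

class SingletonMeta(type):
    _instances = {}
    _lock: Lock = Lock()

    def __call__(cls, *args, **kwargs):
        # Serialize first access so two threads cannot both construct an instance.
        with cls._lock:
            if cls not in cls._instances:
                cls._instances[cls] = super().__call__(*args, **kwargs)
        return cls._instances[cls]

class FakeClient(metaclass=SingletonMeta):
    """Stand-in for the real Datadog client: no keys, no network."""
    def __init__(self):
        self.configured = True

results = []
threads = [Thread(target=lambda: results.append(FakeClient())) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# All eight threads received the exact same object.
assert all(client is results[0] for client in results)
print("singleton holds:", results[0] is FakeClient())
```

One trade-off worth noting: the lock is acquired on every call, not just the first, so each Datadog() lookup pays a small synchronization cost. For a client constructed a handful of times per process this is negligible, and it keeps the implementation simpler than double-checked locking.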

src/spark_history_mcp/tools/tools.py (6 additions, 6 deletions)
```diff
@@ -1363,8 +1363,8 @@ def get_spark_job_logs(
 @mcp.tool()
 def get_operator_logs(
     job_id: str,
+    start_time: datetime,
     status: Optional[str] = None,
-    start_time: Optional[datetime] = None,
     end_time: Optional[datetime] = None,
 ) -> List[LogDD]:
     """
@@ -1375,8 +1375,8 @@ def get_operator_logs(

     Args:
         job_id: The mortar/yoshi job ID to get logs for
+        start_time: start time to get logs from
         status: Optional status to filter logs by (e.g. error, warn, info)
-        start_time: Optional start time to get logs from
         end_time: Optional end time to get logs until (defaults to current time)

     Returns:
@@ -1400,8 +1400,8 @@ def get_operator_logs(
 @mcp.tool()
 def get_workflow_logs(
     workflow_id: str,
+    start_time: datetime,
     status: Optional[str] = None,
-    start_time: Optional[datetime] = None,
     end_time: Optional[datetime] = None,
 ) -> List[LogDD]:
     """
@@ -1415,8 +1415,8 @@ def get_workflow_logs(

     Args:
         workflow_id: The workflow ID to get logs for
+        start_time: start time to get logs from
         status: Optional status to filter logs by (e.g. error, warn, info)
-        start_time: Optional start time to get logs from
         end_time: Optional end time to get logs until (defaults to current time)

     Returns:
@@ -1441,8 +1441,8 @@ def get_workflow_logs(
 def get_admission_logs(
     job_id: str,
     app_id: str,
+    start_time: datetime,
     status: Optional[str] = None,
-    start_time: Optional[datetime] = None,
     end_time: Optional[datetime] = None,
 ) -> List[LogDD]:
     """
@@ -1457,8 +1457,8 @@ def get_admission_logs(
     Args:
         job_id: The job ID to get admission logs for
         app_id: The Spark application ID
+        start_time: start time to get logs from
         status: Optional status to filter logs by (e.g. error, warn, info)
-        start_time: Optional start time to get logs from
         end_time: Optional end time to get logs until (defaults to current time)

     Returns:
```
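In all three tools, start_time also moves ahead of status: a Python parameter without a default cannot follow parameters that have defaults, so enforcing the argument requires the reorder. A sketch of a call after this change; the job ID is illustrative and the direct function call stands in for an MCP tool invocation:

```python
from datetime import datetime, timedelta, timezone

# Query the last six hours of operator logs. Omitting start_time now
# raises a TypeError instead of silently querying an unbounded window.
start = datetime.now(timezone.utc) - timedelta(hours=6)

logs = get_operator_logs(
    job_id="example-job-123",  # hypothetical job ID
    start_time=start,          # required as of this commit
    status="error",            # still optional
)
for log in logs:
    print(log.pod_name)
```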
