Skip to content

Commit d9e1907

Browse files
committed
Add pagination
1 parent 84cf11c commit d9e1907

File tree

2 files changed

+40
-30
lines changed

2 files changed

+40
-30
lines changed

src/spark_history_mcp/common/datadog.py

Lines changed: 37 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -1,14 +1,13 @@
11
import logging
22
from datetime import datetime, timedelta
33

4-
from datadog_api_client import Configuration, ThreadedApiClient
4+
from datadog_api_client import Configuration, ThreadedApiClient, ApiClient
55
from datadog_api_client.v2.api.logs_api import LogsApi
6-
from datadog_api_client.v2.model.log import Log
76
from datadog_api_client.v2.model.logs_list_request import LogsListRequest
87
from datadog_api_client.v2.model.logs_list_request_page import LogsListRequestPage
9-
from datadog_api_client.v2.model.logs_list_response import LogsListResponse
108
from datadog_api_client.v2.model.logs_query_filter import LogsQueryFilter
119
from datadog_api_client.v2.model.logs_sort import LogsSort
10+
from pydantic import BaseModel, Field
1211

1312
from spark_history_mcp.common.variable import (
1413
POD_NAMESPACE,
@@ -21,8 +20,17 @@
2120
DATADOG_SECRET_KEYS = f"k8s/{POD_NAMESPACE}/{POD_SERVICE_ACCOUNT}/datadog"
2221

2322

23+
class LogDD(BaseModel):
24+
timestamp: datetime = Field(description="Timestamp when the log has been emitted")
25+
message: str = Field(description="Log message")
26+
status: str = Field(description="Log level")
27+
host: str = Field(description="Host where the log has been emitted")
28+
pod_name: str = Field(description="Pod name where the log has been emitted")
29+
30+
2431
class Datadog:
25-
LOG_LIMIT = 1000
32+
LIMIT_PER_QUERY_LOGS = 1000
33+
MAX_RETURN_LOGS = 100000
2634

2735
def __init__(self):
2836
vault_api = VaultApi()
@@ -42,13 +50,10 @@ def __init__(self):
4250
self.configuration.enable_retry = True
4351
self.configuration.max_retries = 5
4452

45-
# TODO manage pagination
46-
# add yield on each page
47-
# see pagination on mcp
4853
def get_logs(
4954
self, index_names: list[str], query: str, _from: datetime, to: datetime
50-
) -> list[Log]:
51-
with ThreadedApiClient(self.configuration) as api_client:
55+
) -> list[LogDD]:
56+
with ApiClient(self.configuration) as api_client:
5257
logs_api_instance = LogsApi(api_client)
5358
request = LogsListRequest(
5459
filter=LogsQueryFilter(
@@ -59,28 +64,33 @@ def get_logs(
5964
),
6065
sort=LogsSort.TIMESTAMP_ASCENDING,
6166
page=LogsListRequestPage(
62-
limit=self.LOG_LIMIT,
67+
limit=self.LIMIT_PER_QUERY_LOGS,
6368
),
6469
)
6570
try:
66-
response: LogsListResponse = logs_api_instance.list_logs(
67-
body=request
68-
).get()
69-
70-
logs = []
71-
if response.data:
72-
for log in response.data:
73-
pod_name = next((tag for tag in log.attributes.get("tags", []) if tag.startswith('pod_name:')), None).replace('pod_name:','')
74-
logs.append(
75-
{
76-
"id": log.id,
77-
"timestamp": log.attributes.timestamp,
78-
"message": log.attributes.get("message", ""),
79-
"status": log.attributes.get("status", ""),
80-
"host": log.attributes.get("host", ""),
81-
"pod_name":pod_name,
82-
}
71+
logs: list[LogDD] = []
72+
# Use list_logs_with_pagination for automatic pagination
73+
for log in logs_api_instance.list_logs_with_pagination(body=request):
74+
pod_name = next(
75+
(
76+
tag
77+
for tag in log.attributes.get("tags", [])
78+
if tag.startswith("pod_name:")
79+
),
80+
None,
81+
).replace("pod_name:", "")
82+
logs.append(
83+
LogDD(
84+
timestamp=log.attributes.timestamp,
85+
message=log.attributes.get("message", ""),
86+
status=log.attributes.get("status", ""),
87+
host=log.attributes.get("host", ""),
88+
pod_name=pod_name,
8389
)
90+
)
91+
92+
if len(logs) >= self.MAX_RETURN_LOGS:
93+
break
8494

8595
return logs
8696
except Exception as e:

src/spark_history_mcp/tools/tools.py

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
import logging
55
from typing import Any, Dict, List, Optional
66

7-
from datadog_api_client.v2.model.log import Log
87
from yoshi_client.domains.data_eng_infra.shared.libs.py.yoshi_client import Job
98

109
from spark_history_mcp.core.app import mcp
@@ -22,7 +21,7 @@
2221
StageStatus,
2322
TaskMetricDistributions,
2423
)
25-
from ..common.datadog import Datadog
24+
from ..common.datadog import Datadog, LogDD
2625
from ..common.yoshi import Yoshi
2726

2827
from ..utils.utils import parallel_execute
@@ -1320,14 +1319,15 @@ def get_job_definition(job_id: str) -> Job:
13201319
return Yoshi(DATACENTER).get_job_definition(job_id)
13211320

13221321

1322+
# TODO: see how to add pagination at the MCP level
13231323
@mcp.tool()
13241324
def get_spark_job_logs(
13251325
job_id: str,
13261326
retry_attempt: int,
13271327
start_time: datetime,
13281328
end_time: Optional[datetime] = None,
13291329
status: Optional[str] = None,
1330-
) -> list[Log]:
1330+
) -> list[LogDD]:
13311331
"""
13321332
Get logs from DataDog for a Spark job execution.
13331333

0 commit comments

Comments
 (0)