
Commit 8ea4496

Add logs mcp tool for operator, workflow and admission
1 parent d9e1907 commit 8ea4496

File tree

2 files changed: +104 -13 lines


src/spark_history_mcp/common/datadog.py

Lines changed: 2 additions & 0 deletions
@@ -25,6 +25,7 @@ class LogDD(BaseModel):
     message: str = Field(description="Log message")
     status: str = Field(description="Log level")
     host: str = Field(description="Host where the logs has been emitted")
+    service: str = Field(description="Service where the logs has been emitted")
     pod_name: str = Field(description="Pod name where the logs has been emitted")
 
 
@@ -85,6 +86,7 @@ def get_logs(
                 message=log.attributes.get("message", ""),
                 status=log.attributes.get("status", ""),
                 host=log.attributes.get("host", ""),
+                service=log.attributes.get("service", ""),
                 pod_name=pod_name,
             )
         )
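With the new field in place, every log entry carries the service that emitted it, so a caller that mixes results from several tools can split them back apart. A minimal sketch of that grouping, assuming LogDD as defined above; the sample records are illustrative, not real Datadog output:

    from collections import defaultdict

    from pydantic import BaseModel, Field


    # LogDD as it stands after this commit (field descriptions shortened).
    class LogDD(BaseModel):
        message: str = Field(description="Log message")
        status: str = Field(description="Log level")
        host: str = Field(description="Host")
        service: str = Field(description="Service")  # field added by this commit
        pod_name: str = Field(description="Pod name")


    # Illustrative entries; real ones come back from Datadog().get_logs(...).
    logs = [
        LogDD(message="driver pod submitted", status="info", host="h1",
              service="spark-operator", pod_name="job-123-driver"),
        LogDD(message="workflow started", status="info", host="h2",
              service="spark_gateway-worker", pod_name="gateway-worker-0"),
    ]

    by_service = defaultdict(list)
    for entry in logs:
        by_service[entry.service].append(entry)

    for service, entries in by_service.items():
        print(service, [e.message for e in entries])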

src/spark_history_mcp/tools/tools.py

Lines changed: 102 additions & 13 deletions
@@ -1341,7 +1341,7 @@ def get_spark_job_logs(
         retry_attempt: The retry attempt number of the mortar/yoshi job
         start_time: Start time of the mortar/yoshi job
         end_time: Optional end time of the mortar/yoshi job (defaults to current time if the stop_timestamp is not available on the mortar/yoshi job)
-        status: Optional status to filter logs by (can be error, warn, info)
+        status: Optional status to filter logs by (e.g. error, warn, info)
 
     Returns:
         The matching log entries from DataDog for the specified job
@@ -1354,40 +1354,129 @@ def get_spark_job_logs(
         query += f" status:{status}"
 
     return Datadog().get_logs(
-        index_names=["data-eng", "dd-events"], query=query, _from=start_time, to=end_time
+        index_names=["data-eng", "dd-events"],
+        query=query,
+        _from=start_time,
+        to=end_time,
     )
 
 
-# @mcp.tool()
+@mcp.tool()
 def get_operator_logs(
     job_id: str,
     status: Optional[str] = None,
     start_time: Optional[datetime] = None,
     end_time: Optional[datetime] = None,
-):
-    # can be merged with get_job_logs???
-    pass
+) -> List[LogDD]:
+    """
+    Get logs from the Spark operator for a specific mortar/yoshi job.
+
+    Retrieves logs from DataDog matching the given mortar/yoshi job ID and optional filters.
+    The logs come from the Spark operator service, which handles job submissions and lifecycle.
+
+    Args:
+        job_id: The mortar/yoshi job ID to get logs for
+        status: Optional status to filter logs by (e.g. error, warn, info)
+        start_time: Optional start time to get logs from
+        end_time: Optional end time to get logs until (defaults to current time)
+
+    Returns:
+        List[LogDD]: List of matching log entries from DataDog for the specified job
+    """
+    if end_time is None:
+        end_time = datetime.now()
+
+    query = f"service:spark-operator {job_id}"
+    if status is not None:
+        query += f" status:{status}"
+
+    return Datadog().get_logs(
+        index_names=["mortar"],
+        query=query,
+        _from=start_time,
+        to=end_time,
+    )
 
 
-# @mcp.tool()
+@mcp.tool()
 def get_workflow_logs(
     workflow_id: str,
     status: Optional[str] = None,
     start_time: Optional[datetime] = None,
     end_time: Optional[datetime] = None,
-):
-    pass
+) -> List[LogDD]:
+    """
+    Get logs from the Spark gateway worker for a specific workflow.
+
+    Retrieves logs from DataDog matching the given workflow ID and optional filters.
+    The logs come from the Spark gateway worker service, which manages workflow execution.
+
+    The Spark gateway is used to create the SparkApplication CR in Kubernetes from the yoshi job
+    definition. This CR is then used by the Spark operator to run the Spark application.
+
+    Args:
+        workflow_id: The workflow ID to get logs for
+        status: Optional status to filter logs by (e.g. error, warn, info)
+        start_time: Optional start time to get logs from
+        end_time: Optional end time to get logs until (defaults to current time)
+
+    Returns:
+        List[LogDD]: List of matching log entries from DataDog for the specified workflow
+    """
+    if end_time is None:
+        end_time = datetime.now()
+
+    query = f"service:spark_gateway-worker @WorkflowID:{workflow_id}"
+    if status is not None:
+        query += f" status:{status}"
+
+    return Datadog().get_logs(
+        index_names=["mortar"],
+        query=query,
+        _from=start_time,
+        to=end_time,
+    )
 
 
-# @mcp.tool()
+@mcp.tool()
 def get_admission_logs(
     job_id: str,
+    app_id: str,
     status: Optional[str] = None,
     start_time: Optional[datetime] = None,
    end_time: Optional[datetime] = None,
-):
-    pass
+) -> List[LogDD]:
+    """
+    Get admission controller and kueue service logs for a specific mortar/yoshi job and Spark application.
+
+    Retrieves logs from DataDog matching the given mortar/yoshi job ID and Spark application ID from the
+    gateway admission controller and kueue job services, which handle job admission and queuing.
+
+    The Spark gateway calls these admission services to select the Kubernetes cluster on which the Spark
+    application must be submitted.
+
+    Args:
+        job_id: The job ID to get admission logs for
+        app_id: The Spark application ID
+        status: Optional status to filter logs by (e.g. error, warn, info)
+        start_time: Optional start time to get logs from
+        end_time: Optional end time to get logs until (defaults to current time)
+
+    Returns:
+        List[LogDD]: List of matching log entries from DataDog from the admission services
+    """
+    if end_time is None:
+        end_time = datetime.now()
+
+    query = f"(service:gateway-admission-controller {job_id}) OR (service:kueue-job-service spark_app_name:{app_id})"
+    if status is not None:
+        query += f" status:{status}"
+
+    return Datadog().get_logs(
+        index_names=["mortar"],
+        query=query,
+        _from=start_time,
+        to=end_time,
+    )
 
 
 # @mcp.tool()
 def get_oom_metrics_job(job_id: str):
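All three new tools follow the same shape: a service-scoped Datadog search query, an optional status facet, and a time window whose end defaults to now. A condensed sketch of that shared pattern; the build_query and resolve_end helpers and the sample IDs are illustrative, not part of the commit:

    from datetime import datetime
    from typing import Optional


    def build_query(base: str, status: Optional[str] = None) -> str:
        # Mirrors the pattern in the tools above: start from a service-scoped
        # search string and append the optional status facet.
        return f"{base} status:{status}" if status is not None else base


    def resolve_end(end_time: Optional[datetime]) -> datetime:
        # Each tool defaults the end of the search window to the current time.
        return end_time if end_time is not None else datetime.now()


    # The base queries introduced by this commit, with made-up IDs:
    print(build_query("service:spark-operator job-123"))
    print(build_query("service:spark_gateway-worker @WorkflowID:wf-456", "error"))
    print(build_query(
        "(service:gateway-admission-controller job-123)"
        " OR (service:kueue-job-service spark_app_name:app-789)",
        status="warn",
    ))
    print(resolve_end(None))

Note that get_spark_job_logs searches the data-eng and dd-events indexes while the three new tools all search mortar, so the index list stays a per-tool constant rather than a parameter.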
