Commit d8a8cab

add s3 client class to interact with spark-history-server bucket

1 parent 60c4f0e
4 files changed: +103 -1 lines changed

mcp-server.sh

Lines changed: 1 addition & 1 deletion

@@ -2,4 +2,4 @@ export VAULT_ADDR=https://vault.us1.staging.dog
 vault login -method=oidc
 export VAULT_TOKEN=$(vault print token)

-uv run -m spark_history_mcp.core.main
+aws-vault exec sso-staging-engineering -- uv run -m spark_history_mcp.core.main
src/spark_history_mcp/common/decorators.py

Lines changed: 19 additions & 0 deletions

@@ -0,0 +1,19 @@
+import time
+
+
+def backoff_retry(delay=2, retries=3):
+    def decorator(func):
+        def wrapper(*args, **kwargs):
+            current_retry = 0
+            current_delay = delay
+            while current_retry < retries:
+                try:
+                    return func(*args, **kwargs)
+                except Exception as e:
+                    current_retry += 1
+                    if current_retry >= retries:
+                        raise e
+                    time.sleep(current_delay)
+                    current_delay *= 2
+        return wrapper
+    return decorator
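For reference, backoff_retry retries the wrapped call up to retries times and doubles the sleep between attempts (delay, 2*delay, 4*delay, ...), so the defaults sleep at most 2s + 4s = 6s before the final attempt. A minimal usage sketch, assuming the module path above; flaky_fetch is a hypothetical function, not part of this commit:

import random

from src.spark_history_mcp.common.decorators import backoff_retry


@backoff_retry(delay=1, retries=4)
def flaky_fetch():
    # Hypothetical call that fails ~70% of the time to exercise the retry path.
    if random.random() < 0.7:
        raise ConnectionError("transient failure")
    return "ok"


print(flaky_fetch())  # sleeps 1s, 2s, 4s between failed attempts; re-raises after a 4th failure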
src/spark_history_mcp/common/s3_client.py

Lines changed: 74 additions & 0 deletions

@@ -0,0 +1,74 @@
+import os
+
+import boto3
+import requests
+
+from src.spark_history_mcp.common.decorators import backoff_retry
+from src.spark_history_mcp.common.variable import POD_NAME
+
+
+class S3Client:
+    def __init__(self, datacenter: str):
+        self.client = boto3.resource("s3")
+        self.bucket_name = f"dd-spark-history-server-{datacenter.replace('.', '-')}"  # e.g. dd-spark-history-server-us1-staging-dog
+        self.dst_prefix = "indexed_spark_logs/"
+
+        shs_url_prefix = f"https://spark-history-server.{datacenter}"
+        if POD_NAME:
+            shs_url_prefix = "https://spark-history-server.spark.all-clusters.local-dc.fabric.dog:5554"
+        self.shs_url_prefix = shs_url_prefix
+
+    def list_contents_by_prefix(self, prefix, bucket):
+        b = self.client.Bucket(bucket)
+        keys = [obj.key for obj in b.objects.filter(Prefix=prefix)]
+
+        return keys
+
+    def is_spark_event_logs_already_indexed(self, spark_app_id: str) -> bool:
+        prefix = self.dst_prefix + str(spark_app_id)
+        if self.list_contents_by_prefix(prefix, self.bucket_name):
+            return True
+
+        return False
+
+    @backoff_retry(retries=5, delay=2)
+    def poll_spark_history_server(self, spark_app_id: str) -> None:
+        print("entered function")
+        full_url = f"{self.shs_url_prefix}/history/{spark_app_id}/jobs/"
+        try:
+            resp = requests.get(full_url, timeout=3)
+        except requests.exceptions.Timeout:
+            raise Exception(f"Spark History Server request timed out: {full_url}", 408)
+        except requests.exceptions.ConnectionError:
+            raise Exception("Spark History Server unavailable, please try again shortly", 503)
+
+        if resp.status_code == 404:
+            raise Exception(f"Spark History Server didn't finish parsing event logs: {full_url}", 404)
+
+        return None
+
+    def copy_spark_events_logs(self, spark_app_id: str) -> None:
+        # get spark events logs file to copy/index
+        src_prefix = f"spark_logs/{spark_app_id}"
+        base_logs = self.list_contents_by_prefix(src_prefix, self.bucket_name)
+        if not base_logs:
+            raise Exception(f"Logs for {spark_app_id} not found. Is the job older than one month?", 404)
+
+        # copy log file to new prefix
+        src_key = base_logs[0]
+        dst_key = self.dst_prefix + os.path.basename(src_key)
+        copy_source = {
+            'Bucket': self.bucket_name,
+            'Key': src_key
+        }
+
+        bucket = self.client.Bucket(self.bucket_name)
+        bucket.copy(copy_source, dst_key)
+
+        # poll SHS until event logs are parsed
+        try:
+            self.poll_spark_history_server(spark_app_id)
+        except Exception as e:
+            raise Exception(f"Error polling Spark History Server: {e}") from e
+
+        return None
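Taken together, copy_spark_events_logs copies the first object under spark_logs/<app_id> into indexed_spark_logs/, then polls the history server until the event logs are parsed; with the decorator settings above (retries=5, delay=2) the poll backs off for at most 2+4+8+16 = 30 seconds before giving up. A minimal caller sketch, assuming AWS credentials are available (e.g. via aws-vault, as in mcp-server.sh); the datacenter and app id values are illustrative only:

from src.spark_history_mcp.common.s3_client import S3Client

s3_client = S3Client(datacenter="us1.staging.dog")  # illustrative datacenter
app_id = "app-20250101000000-0001"  # hypothetical Spark application id

# Copy and index the event logs only if they were not indexed before.
if not s3_client.is_spark_event_logs_already_indexed(app_id):
    s3_client.copy_spark_events_logs(app_id)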

src/spark_history_mcp/tools/tools.py

Lines changed: 9 additions & 0 deletions

@@ -23,6 +23,7 @@
 )
 from ..common.datadog import Datadog, LogDD
 from ..common.yoshi import Yoshi
+from ..common.s3_client import S3Client

 from ..utils.utils import parallel_execute

@@ -156,6 +157,14 @@ def get_application(app_id: str, server: Optional[str] = None) -> ApplicationInf
     ctx = mcp.get_context()
     client = get_client_or_default(ctx, server, app_id)

+    # Index spark event logs if missing
+    s3_client = S3Client(datacenter=DATACENTER)
+    if not s3_client.is_spark_event_logs_already_indexed(app_id):
+        try:
+            s3_client.copy_spark_events_logs(app_id)
+        except Exception as e:
+            raise Exception(f"Failed to copy events logs for app_id {app_id}: {e}") from e
+
     return client.get_application(app_id)
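With this change, get_application transparently indexes the event logs the first time an application is requested. A sketch of the error a caller can expect when the source logs have expired; the app id is hypothetical and the import assumes the repository root is on sys.path, as in the absolute imports above:

from src.spark_history_mcp.tools.tools import get_application

try:
    get_application("app-20200101000000-0001")  # hypothetical id for a job older than one month
except Exception as e:
    # The tool wraps the S3Client error, roughly:
    # Failed to copy events logs for app_id app-20200101000000-0001:
    # ('Logs for app-20200101000000-0001 not found. Is the job older than one month?', 404)
    print(e)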