
Commit e3730bf

committed
add s3 client class to interact with spark-history-server bucket
1 parent acf1113 commit e3730bf

2 files changed, +49 -0 lines changed

src/spark_history_mcp/common/s3_client.py

Lines changed: 43 additions & 0 deletions
@@ -0,0 +1,43 @@
import os

import boto3


class S3Client:
    def __init__(self, datacenter: str):
        self.client = boto3.resource("s3")
        # e.g. dd-spark-history-server-us1-staging-dog
        self.bucket_name = f"dd-spark-history-server-{datacenter.replace('.', '-')}"
        self.dst_prefix = "indexed_spark_logs/"

    def list_contents_by_prefix(self, prefix, bucket):
        # list all object keys under the given prefix in the given bucket
        b = self.client.Bucket(bucket)
        keys = [obj.key for obj in b.objects.filter(Prefix=prefix)]

        return keys

    def is_spark_event_logs_already_indexed(self, spark_app_id: str) -> bool:
        prefix = self.dst_prefix + str(spark_app_id)
        if self.list_contents_by_prefix(prefix, self.bucket_name):
            return True

        return False

    def copy_spark_events_logs(self, spark_app_id: str):
        # get the spark event logs file to copy/index
        src_prefix = f"spark_logs/{spark_app_id}"
        base_logs = self.list_contents_by_prefix(src_prefix, self.bucket_name)
        if not base_logs:
            raise Exception(f"Logs for {spark_app_id} not found. Is the job older than one month?", 404)

        # copy the log file to the new prefix
        src_key = base_logs[0]
        dst_key = self.dst_prefix + os.path.basename(src_key)
        copy_source = {
            'Bucket': self.bucket_name,
            'Key': src_key
        }

        bucket = self.client.Bucket(self.bucket_name)
        bucket.copy(copy_source, dst_key)

        return {"success": True}
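
For reference, a minimal usage sketch of the new class, assuming the package is importable as spark_history_mcp and the file lives at src/spark_history_mcp/common/s3_client.py (consistent with the relative import in tools.py below); the application id is an illustrative placeholder, not a value from this commit, and the datacenter string mirrors the example in the comment above.

from spark_history_mcp.common.s3_client import S3Client

# hypothetical datacenter and Spark application id, for illustration only
s3 = S3Client(datacenter="us1.staging.dog")
app_id = "spark-0123456789abcdef"

if not s3.is_spark_event_logs_already_indexed(app_id):
    # copies the first object under spark_logs/<app_id> into indexed_spark_logs/
    print(s3.copy_spark_events_logs(app_id))  # {"success": True}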

src/spark_history_mcp/tools/tools.py

Lines changed: 6 additions & 0 deletions
@@ -22,6 +22,7 @@
     TaskMetricDistributions,
 )
 from ..common.yoshi import Yoshi
+from ..common.s3_client import S3Client
 
 from ..utils.utils import parallel_execute
 
@@ -155,6 +156,11 @@ def get_application(app_id: str, server: Optional[str] = None) -> ApplicationInfo:
     ctx = mcp.get_context()
     client = get_client_or_default(ctx, server, app_id)
 
+    # Index spark event logs if missing
+    s3_client = S3Client(datacenter=DATACENTER)
+    if not s3_client.is_spark_event_logs_already_indexed(app_id):
+        s3_client.copy_spark_events_logs(app_id)
+
     return client.get_application(app_id)
 
 
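A note on the new code path: the hunk references a DATACENTER constant that is not defined in the lines shown here, so it is presumably available elsewhere in tools.py or its configuration. As a hedged sketch only, one common way such a constant could be supplied; the environment-variable name and default below are assumptions, not from this commit.

import os

# assumption: the datacenter comes from the environment; the name and default are illustrative
DATACENTER = os.environ.get("DATACENTER", "us1.staging.dog")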