Skip to content

Commit c7ae2e7

Browse files
committed
add s3 client class to interact with spark-history-server bucket
1 parent acf1113 commit c7ae2e7

File tree

1 file changed

+42
-0
lines changed

1 file changed

+42
-0
lines changed
Lines changed: 42 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,42 @@
1+
import os
2+
3+
import boto3
4+
5+
class S3Client:
    """Small wrapper around the boto3 S3 resource for one spark-history-server bucket.

    The bucket name is derived from the datacenter passed to the constructor
    and is stored in ``self.bucket_name``.
    """

    def __init__(self, datacenter: str):
        """Create the S3 resource and compute the bucket name.

        Args:
            datacenter: Datacenter identifier; every "." is replaced by "-"
                when building the bucket name.
        """
        self.client = boto3.resource("s3")
        # Computed outside the f-string: reusing the same quote character
        # inside an f-string expression is a syntax error before Python 3.12.
        bucket_suffix = datacenter.replace(".", "-")
        # e.g. dd-spark-history-server-us1-staging-dog
        self.bucket_name = f"dd-spark-history-server-{bucket_suffix}"

    def list_contents_by_prefix(self, prefix, bucket):
        """Return the keys of all objects in ``bucket`` whose key starts with ``prefix``.

        Args:
            prefix: Key prefix passed to the bucket's object filter.
            bucket: Name of the bucket to list.

        Returns:
            A list of object keys (empty when nothing matches).
        """
        target_bucket = self.client.Bucket(bucket)
        return [obj.key for obj in target_bucket.objects.filter(Prefix=prefix)]

    def copy_spark_events_logs(self, spark_app_id: str):
        """Copy a Spark application's event log under the indexed prefix.

        Looks for keys starting with ``spark_logs/<spark_app_id>`` and copies
        the first one to ``indexed_spark_logs/<basename of that key>`` in the
        same bucket. Nothing is copied when a key starting with
        ``indexed_spark_logs/<spark_app_id>`` already exists.

        Args:
            spark_app_id: Identifier of the Spark application.

        Returns:
            ``{"success": True}`` when the log is already indexed or has just
            been copied.

        Raises:
            Exception: With args ``(message, 404)`` when no source log matches.
        """
        src_prefix = f"spark_logs/{spark_app_id}"
        dst_prefix = "indexed_spark_logs/"

        # Don't copy if events are already indexed.
        if self.list_contents_by_prefix(dst_prefix + str(spark_app_id), self.bucket_name):
            return {"success": True}

        # Get the spark events log file to copy/index. This must go through
        # our own helper: the boto3 resource held in ``self.client`` has no
        # ``list_contents_by_prefix`` method.
        base_logs = self.list_contents_by_prefix(src_prefix, self.bucket_name)
        if not base_logs:
            raise Exception(f"Logs for {spark_app_id} not found. Is the job older than one month?", 404)

        # Copy the log file to the new prefix.
        # NOTE(review): only the first matching key is copied, and prefix
        # matching may also match other app ids sharing this prefix — confirm
        # that is intended.
        src_key = base_logs[0]
        dst_key = dst_prefix + os.path.basename(src_key)
        copy_source = {
            "Bucket": self.bucket_name,
            "Key": src_key,
        }

        bucket = self.client.Bucket(self.bucket_name)
        bucket.copy(copy_source, dst_key)

        return {"success": True}

0 commit comments

Comments
 (0)