3 changes: 2 additions & 1 deletion TESTING.md
@@ -30,7 +30,8 @@ task install
task start-spark-bg && task start-mcp-bg && task start-inspector-bg

# Opens http://localhost:6274 automatically in your browser
-# When done: task stop-all
+# When done
+task stop-all
```

**Alternative** (if you prefer manual control):
2 changes: 1 addition & 1 deletion config.yaml
@@ -2,7 +2,7 @@
servers:
  local:
    default: true # if server name is not provided in tool calls, this Spark History Server is used
-    url: "http://localhost:18080"
+    url: "https://spark-history-server.us1.staging.dog/"
    # Optional authentication (can also use environment variables).
    # auth:
    #   username: ${SHS_SERVERS_LOCAL_AUTH_USERNAME}
5 changes: 5 additions & 0 deletions mcp-server.sh
@@ -0,0 +1,5 @@
#!/usr/bin/env bash
# Log in to Vault via OIDC and expose the token to the MCP server process.
export VAULT_ADDR=https://vault.us1.staging.dog
vault login -method=oidc
export VAULT_TOKEN=$(vault print token)

# Run the MCP server locally with staging AWS credentials from aws-vault.
aws-vault exec sso-staging-engineering -- uv run -m spark_history_mcp.core.main
10 changes: 9 additions & 1 deletion pyproject.toml
@@ -27,7 +27,11 @@ dependencies = [
"boto3~=1.34",
"pydantic-settings>=2.9.1",
"requests[socks]>=2.31.0",
"pysocks>=1.7.1"
"pysocks>=1.7.1",
"yoshi-client",
"dd-internal-authentication",
"datadog-api-client>=2.45.0",
"httpx-retries>=0.4.5",
]

[build-system]
@@ -103,3 +107,7 @@ name = "pypi-test"
url = "https://test.pypi.org/simple/"
publish-url = "https://test.pypi.org/legacy/"
explicit = true

[tool.uv.sources]
yoshi-client = { url = "https://binaries.ddbuild.io/dd-source/python/yoshi_client-0.0.76422691-py3-none-any.whl" }
dd-internal-authentication = { url = "https://binaries.ddbuild.io/dd-source/python/dd_internal_authentication-0.0.79707356-py3-none-any.whl" }
Empty file.
137 changes: 137 additions & 0 deletions src/spark_history_mcp/common/datadog.py
@@ -0,0 +1,137 @@
import logging
from datetime import datetime
from threading import Lock

from datadog_api_client import Configuration, ApiClient
from datadog_api_client.v2.api.logs_api import LogsApi
from datadog_api_client.v2.model.logs_list_request import LogsListRequest
from datadog_api_client.v2.model.logs_list_request_page import LogsListRequestPage
from datadog_api_client.v2.model.logs_query_filter import LogsQueryFilter
from datadog_api_client.v2.model.logs_sort import LogsSort
from pydantic import BaseModel, Field

from spark_history_mcp.common.variable import (
    POD_NAMESPACE,
    POD_SERVICE_ACCOUNT,
)
from spark_history_mcp.common.vault import VaultApi

logger = logging.getLogger(__name__)

DATADOG_SECRET_KEYS = f"k8s/{POD_NAMESPACE}/{POD_SERVICE_ACCOUNT}/datadog"


class LogDD(BaseModel):
    timestamp: datetime = Field(description="Timestamp when the log was emitted")
    message: str = Field(description="Log message")
    status: str = Field(description="Log level")
    host: str = Field(description="Host where the log was emitted")
    service: str = Field(description="Service where the log was emitted")
    pod_name: str = Field(description="Pod name where the log was emitted")


class SingletonMeta(type):
    """
    A thread-safe implementation of Singleton.
    """

    _instances = {}

    _lock: Lock = Lock()
    """
    Lock object used to synchronize threads during first access to the Singleton.
    """

    def __call__(cls, *args, **kwargs):
        """
        Possible changes to the values of the `__init__` arguments do not affect
        the returned instance.
        """
        # When the program has just been launched there is no Singleton
        # instance yet, so multiple threads can reach this point almost at the
        # same time. The first of them acquires the lock and proceeds further,
        # while the rest wait here.
        with cls._lock:
            # The first thread to acquire the lock reaches this conditional,
            # goes inside and creates the Singleton instance. Once it leaves
            # the lock block, a thread that was waiting on the lock may enter
            # this section, but since the Singleton field is already
            # initialized, it won't create a new object.
            if cls not in cls._instances:
                instance = super().__call__(*args, **kwargs)
                cls._instances[cls] = instance
        return cls._instances[cls]


class Datadog(metaclass=SingletonMeta):
    LIMIT_PER_QUERY_LOGS = 1000
    MAX_RETURN_LOGS = 100000

    def __init__(self):
        vault_api = VaultApi()

        logger.info(
            f"Retrieving Datadog API key with {DATADOG_SECRET_KEYS}: dd_api_key"
        )
        api_key = vault_api.get_secret_kv_store(DATADOG_SECRET_KEYS, "dd_api_key")
        logger.info(
            f"Retrieving Datadog application key with {DATADOG_SECRET_KEYS}: dd_app_key"
        )
        app_key = vault_api.get_secret_kv_store(DATADOG_SECRET_KEYS, "dd_app_key")

        self.configuration = Configuration()
        self.configuration.server_variables["site"] = "datadoghq.com"
        self.configuration.api_key["apiKeyAuth"] = api_key
        self.configuration.api_key["appKeyAuth"] = app_key
        self.configuration.enable_retry = True
        self.configuration.max_retries = 5

    def get_logs(
        self, index_names: list[str], query: str, _from: datetime, to: datetime
    ) -> list[LogDD]:
        with ApiClient(self.configuration) as api_client:
            logs_api_instance = LogsApi(api_client)
            request = LogsListRequest(
                filter=LogsQueryFilter(
                    query=query,
                    indexes=index_names,
                    _from=_from.isoformat(),
                    to=to.isoformat(),
                ),
                sort=LogsSort.TIMESTAMP_ASCENDING,
                page=LogsListRequestPage(
                    limit=self.LIMIT_PER_QUERY_LOGS,
                ),
            )
            try:
                logs: list[LogDD] = []
                # Use list_logs_with_pagination for automatic pagination
                for log in logs_api_instance.list_logs_with_pagination(body=request):
                    # Extract the pod name from the tags, guarding against logs
                    # that carry no pod_name tag.
                    pod_name_tag = next(
                        (
                            tag
                            for tag in log.attributes.get("tags", [])
                            if tag.startswith("pod_name:")
                        ),
                        None,
                    )
                    pod_name = (
                        pod_name_tag.replace("pod_name:", "") if pod_name_tag else ""
                    )
                    logs.append(
                        LogDD(
                            timestamp=log.attributes.timestamp,
                            message=log.attributes.get("message", ""),
                            status=log.attributes.get("status", ""),
                            host=log.attributes.get("host", ""),
                            service=log.attributes.get("service", ""),
                            pod_name=pod_name,
                        )
                    )

                    if len(logs) >= self.MAX_RETURN_LOGS:
                        break

                return logs
            except Exception as e:
                logger.error(f"Error retrieving logs: {e}")
                raise
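For orientation, a minimal usage sketch of the class above. The index name, query, and time window are placeholder values for illustration, not ones mandated by the server; secret-path resolution assumes the defaults from `variable.py`.

```python
from datetime import datetime, timedelta, timezone

from spark_history_mcp.common.datadog import Datadog

# SingletonMeta guarantees repeated construction returns the same configured
# client, so Vault is only consulted for the API/app keys once per process.
dd = Datadog()
assert dd is Datadog()

now = datetime.now(timezone.utc)
# Placeholder index/query values; adjust to the logs actually needed.
logs = dd.get_logs(
    index_names=["main"],
    query="service:spark-history-server status:error",
    _from=now - timedelta(hours=1),
    to=now,
)
for log in logs:
    print(log.timestamp, log.pod_name, log.message)
```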
19 changes: 19 additions & 0 deletions src/spark_history_mcp/common/decorators.py
@@ -0,0 +1,19 @@
import functools
import time


def backoff_retry(delay=2, retries=3):
    """Retry the wrapped function with exponential backoff between attempts."""

    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            current_retry = 0
            current_delay = delay
            while current_retry < retries:
                try:
                    return func(*args, **kwargs)
                except Exception:
                    current_retry += 1
                    if current_retry >= retries:
                        # Out of attempts: propagate the original exception.
                        raise
                    time.sleep(current_delay)
                    current_delay *= 2

        return wrapper

    return decorator
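A small usage sketch: with the defaults (`delay=2`, `retries=3`) a persistently failing call is attempted three times, sleeping 2s and then 4s between attempts before the final exception propagates. The decorated function below is hypothetical.

```python
from spark_history_mcp.common.decorators import backoff_retry


@backoff_retry(delay=2, retries=3)
def fetch_flaky_endpoint():
    # Hypothetical transiently failing call; after the last attempt the
    # original exception is re-raised to the caller.
    raise TimeoutError("still warming up")
```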
74 changes: 74 additions & 0 deletions src/spark_history_mcp/common/s3_client.py
@@ -0,0 +1,74 @@
import os

import boto3
import requests

from spark_history_mcp.common.decorators import backoff_retry
from spark_history_mcp.common.variable import POD_NAME


class S3Client:
    def __init__(self, datacenter: str):
        self.client = boto3.resource("s3")
        # e.g. dd-spark-history-server-us1-staging-dog
        self.bucket_name = f"dd-spark-history-server-{datacenter.replace('.', '-')}"
        self.dst_prefix = "indexed_spark_logs/"

        shs_url_prefix = f"https://spark-history-server.{datacenter}"
        if POD_NAME:
            shs_url_prefix = "https://spark-history-server.spark.all-clusters.local-dc.fabric.dog:5554"
        self.shs_url_prefix = shs_url_prefix

    def list_contents_by_prefix(self, prefix, bucket):
        b = self.client.Bucket(bucket)
        keys = [obj.key for obj in b.objects.filter(Prefix=prefix)]

        return keys

    def is_spark_event_logs_already_indexed(self, spark_app_id: str) -> bool:
        prefix = self.dst_prefix + str(spark_app_id)
        if self.list_contents_by_prefix(prefix, self.bucket_name):
            return True

        return False

    @backoff_retry(retries=5, delay=2)
    def poll_spark_history_server(self, spark_app_id: str) -> None:
        full_url = f"{self.shs_url_prefix}/history/{spark_app_id}/jobs/"
        try:
            resp = requests.get(full_url, timeout=3)
        except requests.exceptions.Timeout:
            raise Exception(f"Spark History Server request timed out: {full_url}", 408)
        except requests.exceptions.ConnectionError:
            raise Exception(
                "Spark History Server unavailable, please try again shortly", 503
            )

        if resp.status_code == 404:
            raise Exception(
                f"Spark History Server didn't finish parsing event logs: {full_url}",
                404,
            )

    def copy_spark_events_logs(self, spark_app_id: str) -> None:
        # get the Spark event logs file to copy/index
        src_prefix = f"spark_logs/{spark_app_id}"
        base_logs = self.list_contents_by_prefix(src_prefix, self.bucket_name)
        if not base_logs:
            raise Exception(
                f"Logs for {spark_app_id} not found. Is the job older than one month?",
                404,
            )

        # copy the log file to the new prefix
        src_key = base_logs[0]
        dst_key = self.dst_prefix + os.path.basename(src_key)
        copy_source = {"Bucket": self.bucket_name, "Key": src_key}

        bucket = self.client.Bucket(self.bucket_name)
        bucket.copy(copy_source, dst_key)

        # poll SHS until the event logs are parsed
        try:
            self.poll_spark_history_server(spark_app_id)
        except Exception as e:
            raise Exception(f"Error polling Spark History Server: {e}") from e
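A sketch of the intended indexing flow, assuming the `us1.staging.dog` datacenter used elsewhere in this change; the application ID is a placeholder.

```python
from spark_history_mcp.common.s3_client import S3Client

# Resolves to bucket dd-spark-history-server-us1-staging-dog.
s3 = S3Client(datacenter="us1.staging.dog")
app_id = "spark-0123456789abcdef"  # placeholder Spark application ID

# Copy the event logs under indexed_spark_logs/ only when they have not been
# indexed yet, then wait (with backoff) for the History Server to parse them.
if not s3.is_spark_event_logs_already_indexed(app_id):
    s3.copy_spark_events_logs(app_id)
```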
5 changes: 5 additions & 0 deletions src/spark_history_mcp/common/variable.py
@@ -0,0 +1,5 @@
import os

POD_NAME = os.getenv("POD_NAME")
POD_NAMESPACE = os.getenv("POD_NAMESPACE", "spark")
POD_SERVICE_ACCOUNT = os.getenv("POD_SERVICE_ACCOUNT", "mcp-spark-history-server")
77 changes: 77 additions & 0 deletions src/spark_history_mcp/common/vault.py
@@ -0,0 +1,77 @@
import logging
import os

import httpx
from httpx_retries import Retry, RetryTransport

from spark_history_mcp.common.variable import POD_NAME

logger = logging.getLogger(__name__)


class JWT:
    def __init__(self, audience: str, datacenter: str):
        self.audience = audience

        from dd_internal_authentication.libs.py.dd_internal_authentication.dd_internal_authentication.client import (
            JWTDDToolAuthClientTokenManager,
            JWTInternalServiceAuthClientTokenManager,
        )

        if POD_NAME:
            logger.info("Using internal service auth client")
            self.token_manager = JWTInternalServiceAuthClientTokenManager(
                issuer="sycamore"
            )
        else:
            logger.info("Using internal ddtool auth client")
            self.token_manager = JWTDDToolAuthClientTokenManager.instance(
                name=self.audience, datacenter=datacenter
            )

    def get_token(self) -> str:
        return str(self.token_manager.get_token(self.audience))


class VaultApi:
    VAULT_URL = os.environ.get("VAULT_ADDR", "http://127.0.0.1:8658/vault/agent")
    DEFAULT_TIMEOUT_SECONDS = 30

    def __init__(self):
        self.transport = RetryTransport(retry=Retry(total=5, backoff_factor=0.5))

    def get_secret_kv_store(self, secret_path: str, key: str) -> str | None:
        """
        Fetch a secret from the kv Vault store.
        docs: https://datadoghq.atlassian.net/wiki/spaces/RUNTIME/pages/2701559033/Vault#Application-(v1)-versus-KV-(v2)-stores
        :param secret_path: path to the secret
        :param key: key to fetch from the secret
        :return: the secret data for the given key
        """
        logger.info(
            "Fetching secret from Vault",
            extra={"kv_backend": "kv", "path": secret_path},
        )

        headers = {"X-Vault-Request": "true"}
        if POD_NAME is None:
            headers["X-Vault-Token"] = os.getenv("VAULT_TOKEN")
        with httpx.Client(transport=self.transport) as client:
            response = client.get(
                f"{self.VAULT_URL}/v1/kv/data/{secret_path}",
                headers=headers,
                timeout=self.DEFAULT_TIMEOUT_SECONDS,
            )
            response.raise_for_status()

            # kv-v2 has a nested structure, see
            # https://www.vaultproject.io/api/secret/kv/kv-v2#read-secret-version for an example
            secret_data = response.json().get("data", {}).get("data", None)

            if not secret_data:
                logger.warning(
                    f"Could not find secret data in kv-v2 store for path: {secret_path}"
                )
                return None

            return secret_data.get(key, None)
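For illustration, fetching the same Datadog keys that `datadog.py` reads; the path assumes the default `POD_NAMESPACE`/`POD_SERVICE_ACCOUNT` values from `variable.py`. Outside a pod (`POD_NAME` unset), `VAULT_TOKEN` must be exported first, as `mcp-server.sh` does.

```python
from spark_history_mcp.common.vault import VaultApi

vault = VaultApi()
# kv-v2 path equivalent to DATADOG_SECRET_KEYS with the default namespace and
# service account.
secret_path = "k8s/spark/mcp-spark-history-server/datadog"
api_key = vault.get_secret_kv_store(secret_path, "dd_api_key")
app_key = vault.get_secret_kv_store(secret_path, "dd_app_key")
```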
28 changes: 28 additions & 0 deletions src/spark_history_mcp/common/yoshi.py
@@ -0,0 +1,28 @@

from yoshi_client.domains.data_eng_infra.shared.libs.py.yoshi_client import (
    ApiClient,
    Configuration,
    Job,
    JobApi,
)

from spark_history_mcp.common.variable import POD_NAME
from spark_history_mcp.common.vault import JWT


class Yoshi:
    AUDIENCE = "rapid-data-eng-infra"

    def __init__(self, datacenter: str):
        host = f"https://yoshi.{datacenter}"
        if POD_NAME:
            host = f"https://yoshi.{self.AUDIENCE}.all-clusters.local-dc.fabric.dog:8443"
        self.configuration = Configuration(
            host=host,
            access_token=JWT(audience=self.AUDIENCE, datacenter=datacenter).get_token(),
        )

    def get_job_definition(self, job_id: str) -> Job:
        with ApiClient(self.configuration) as api_client:
            job_api = JobApi(api_client)
            return job_api.v2_get_job(job_id=job_id)
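A minimal usage sketch, again assuming the staging datacenter; the job ID is a placeholder.

```python
from spark_history_mcp.common.yoshi import Yoshi

yoshi = Yoshi(datacenter="us1.staging.dog")
# Placeholder job ID; returns the Job definition from the Yoshi API.
job = yoshi.get_job_definition(job_id="example-job-id")
```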