Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
274 changes: 223 additions & 51 deletions src/mcp_scanner/security_checks.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

import json
from collections.abc import Callable
from typing import Any, List, Dict
from typing import Any, Dict, Iterable, List

from .models import Finding, Severity
from .spec import SpecCheck
Expand Down Expand Up @@ -48,14 +48,100 @@ def check_unauthenticated_access(tools: list[dict[str, Any]], spec: SpecCheck) -
return _finding(spec, passed, details)


def _to_lower(value: Any) -> str:
"""Best-effort lower-casing helper that tolerates non-string input."""

if isinstance(value, str):
return value.lower()
return ""


def _schema_properties(schema: Any) -> dict[str, Any]:
"""Return the JSON schema properties dictionary if present."""

if not isinstance(schema, dict):
return {}
props = schema.get("properties")
if isinstance(props, dict):
return props
# Some schemas omit the explicit object type but still provide properties
if schema.get("type") == "object":
return props or {}
return {}


def _schema_required(schema: Any) -> set[str]:
"""Extract the required parameter names from the schema."""

if isinstance(schema, dict):
required = schema.get("required")
if isinstance(required, list):
return {name for name in required if isinstance(name, str)}
return set()


def _type_matches(definition: dict[str, Any], expected: str) -> bool:
"""Check whether the definition declares the expected JSON type."""

declared = definition.get("type")
if isinstance(declared, str):
return declared == expected
if isinstance(declared, list):
return expected in declared
return False


def _has_guardrails(definition: dict[str, Any]) -> bool:
"""Determine whether a parameter definition has meaningful constraints."""

if not isinstance(definition, dict):
return False

# Structural constraints count as guardrails
structural_keys = {"anyOf", "allOf", "oneOf", "if", "then", "else", "not", "dependentSchemas"}
if any(key in definition for key in structural_keys):
return True

constraint_keys: set[str]
if _type_matches(definition, "string"):
constraint_keys = {
"enum",
"const",
"pattern",
"format",
"contentEncoding",
"contentMediaType",
"minLength",
"maxLength",
}
elif _type_matches(definition, "integer") or _type_matches(definition, "number"):
constraint_keys = {"enum", "const", "minimum", "maximum", "exclusiveMinimum", "exclusiveMaximum", "multipleOf"}
elif _type_matches(definition, "boolean"):
constraint_keys = {"enum", "const"}
elif _type_matches(definition, "array"):
constraint_keys = {"enum", "const", "items", "minItems", "maxItems"}
else:
# Objects or unknown types – treat presence of enum/const as a constraint.
constraint_keys = {"enum", "const"}

return any(key in definition for key in constraint_keys)


def _risk_keywords(text: str, keywords: Iterable[str]) -> list[str]:
"""Return the subset of keywords found in the provided text."""

lowered = text.lower()
return sorted({kw for kw in keywords if kw in lowered})


def check_dangerous_capabilities(tools: list[dict[str, Any]], spec: SpecCheck) -> Finding:
"""
X-01: Dangerous capability detection in tools.

Detect tools with dangerous capabilities that lack proper constraints.
Tools with risky keywords should have input validation constraints.
"""
risky_keywords = [
risky_keywords = {
"exec",
"command",
"shell",
Expand All @@ -72,32 +158,91 @@ def check_dangerous_capabilities(tools: list[dict[str, Any]], spec: SpecCheck) -
"payment",
"admin",
"privilege",
]
}
risky_parameter_keywords = {
"command",
"cmd",
"script",
"shell",
"path",
"filepath",
"file_path",
"file",
"uri",
"url",
"payload",
"prompt",
"query",
"template",
"body",
"code",
"sql",
}

metadata_risk_flags = {"dangerous", "allowDangerousOperations", "allowDangerousCommands"}

risky_tools: list[dict[str, Any]] = []
for tool in tools:
name = (tool.get("name") or "").lower()
desc = (tool.get("description") or "").lower()

for tool in tools or []:
name = _to_lower(tool.get("name"))
desc = _to_lower(tool.get("description"))
schema = tool.get("inputSchema") or {}
props = _schema_properties(schema)
required_params = _schema_required(schema)

# Check if tool has constraints
has_constraints = False
if isinstance(schema, dict):
props = schema.get("properties") or {}
for _, prop in props.items() if isinstance(props, dict) else []:
if any(
k in prop
for k in ("enum", "pattern", "minimum", "maximum", "minLength", "maxLength")
):
has_constraints = True
break
name_matches = _risk_keywords(f"{name} {desc}", risky_keywords)
metadata = tool.get("metadata") or {}
metadata_matches = [flag for flag in metadata_risk_flags if metadata.get(flag) or metadata.get(flag.lower())]

# If tool has risky keywords but no constraints, it's dangerous
if (
any(keyword in name or keyword in desc for keyword in risky_keywords)
and not has_constraints
):
risky_tools.append(tool)
tool_reasons: list[dict[str, Any]] = []

if name_matches and not props:
tool_reasons.append({
"kind": "missing_schema",
"keywords": name_matches,
"message": "Tool exposes dangerous capabilities but provides no input schema",
})

for param_name, definition in props.items():
if not isinstance(definition, dict):
continue

param_keywords = _risk_keywords(param_name, risky_parameter_keywords)
if not param_keywords:
param_desc = _to_lower(definition.get("description"))
param_keywords = _risk_keywords(param_desc, risky_parameter_keywords)

is_string_like = _type_matches(definition, "string") or _type_matches(definition, "array")

if (name_matches or metadata_matches or param_keywords) and is_string_like:
if not _has_guardrails(definition):
tool_reasons.append(
{
"kind": "unconstrained_parameter",
"parameter": param_name,
"required": param_name in required_params,
"keywords": param_keywords or name_matches,
"message": "Parameter accepts attacker-controlled strings without constraints",
}
)

if metadata_matches:
tool_reasons.append(
{
"kind": "metadata_flag",
"flags": metadata_matches,
"message": "Tool metadata advertises dangerous behavior",
}
)

if tool_reasons:
risky_tools.append(
Copy link
Contributor

@ccronca ccronca Nov 4, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check now returns a simplified structure instead of the full tool object (for example, tool.metadata is no longer included). Was this change intentional?

We could preserve the previous behavior with something like:

risky_tools.append({
      **tool, 
      "reasons": tool_reasons, 
  })

{
"name": tool.get("name"),
"description": tool.get("description"),
"reasons": tool_reasons,
}
)

passed = len(risky_tools) == 0
details = json.dumps(risky_tools)
Expand Down Expand Up @@ -269,40 +414,67 @@ def check_injection_fuzzing(
"http://169.254.169.254/latest/meta-data/",
]

observed_danger = []
for tool in tools:
danger_indicators = [
"uid=",
"gid=",
"linux",
"root:x:",
"etc/passwd",
"meta-data",
"stdout",
"stderr",
"returncode",
"command executed",
"execution result",
"listening on",
"connected to",
]

observed_danger: list[dict[str, Any]] = []

for tool in tools or []:
name = tool.get("name")
if not name:
continue

schema = tool.get("inputSchema") or {}
props = schema.get("properties") or {}
props = _schema_properties(schema)

# Find a string parameter to test
str_param = None
for param_name, param_def in props.items() if isinstance(props, dict) else []:
if param_def.get("type") == "string":
str_param = param_name
break
string_params = [
param_name
for param_name, param_def in props.items()
if isinstance(param_def, dict) and _type_matches(param_def, "string")
]

if not (name and str_param):
if not string_params:
continue

# Test each payload
for payload in payloads:
resp = send_recv_func("tools/call", {"name": name, "arguments": {str_param: payload}})

# Check if response contains system information (dangerous)
text = json.dumps(resp).lower()
danger_indicators = [
"uid=",
"linux",
"root:x:",
"etc/passwd",
"meta-data",
"insecure",
"stdout",
]
if any(indicator in text for indicator in danger_indicators):
observed_danger.append({"tool": name, "payload": payload, "resp": resp})
break # Stop testing this tool after first dangerous response
for param_name in string_params:
for payload in payloads:
try:
resp = send_recv_func(
"tools/call",
{"name": name, "arguments": {param_name: payload}},
)
except Exception:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe log something here, otherwise, tracking issues could be tricky

# Ignore transport or execution errors; these do not prove vulnerability
continue

text = json.dumps(resp).lower()
indicator = next((d for d in danger_indicators if d in text), None)
if indicator:
observed_danger.append(
{
"tool": name,
"parameter": param_name,
"payload": payload,
"indicator": indicator,
"resp": resp,
}
)
break
if observed_danger and observed_danger[-1]["tool"] == name:
break # Stop after first dangerous finding per tool

passed = len(observed_danger) == 0
details = json.dumps(observed_danger)
Expand Down
Loading
Loading