-
Notifications
You must be signed in to change notification settings - Fork 4
Improve MCP tool vulnerability heuristics #9
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Open
sidhpurwala-huzaifa
wants to merge
1
commit into
main
Choose a base branch
from
codex/scan-repo-for-exposed-security-credentials
base: main
Could not load branches
Branch not found: {{ refName }}
Loading
Could not load tags
Nothing to show
Loading
Are you sure you want to change the base?
Some commits from the old base branch may be removed from the timeline,
and old review comments may become outdated.
Open
Changes from all commits
Commits
File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -16,7 +16,7 @@ | |
|
|
||
| import json | ||
| from collections.abc import Callable | ||
| from typing import Any, List, Dict | ||
| from typing import Any, Dict, Iterable, List | ||
|
|
||
| from .models import Finding, Severity | ||
| from .spec import SpecCheck | ||
|
|
@@ -48,14 +48,100 @@ def check_unauthenticated_access(tools: list[dict[str, Any]], spec: SpecCheck) - | |
| return _finding(spec, passed, details) | ||
|
|
||
|
|
||
| def _to_lower(value: Any) -> str: | ||
| """Best-effort lower-casing helper that tolerates non-string input.""" | ||
|
|
||
| if isinstance(value, str): | ||
| return value.lower() | ||
| return "" | ||
|
|
||
|
|
||
| def _schema_properties(schema: Any) -> dict[str, Any]: | ||
| """Return the JSON schema properties dictionary if present.""" | ||
|
|
||
| if not isinstance(schema, dict): | ||
| return {} | ||
| props = schema.get("properties") | ||
| if isinstance(props, dict): | ||
| return props | ||
| # Some schemas omit the explicit object type but still provide properties | ||
| if schema.get("type") == "object": | ||
| return props or {} | ||
| return {} | ||
|
|
||
|
|
||
| def _schema_required(schema: Any) -> set[str]: | ||
| """Extract the required parameter names from the schema.""" | ||
|
|
||
| if isinstance(schema, dict): | ||
| required = schema.get("required") | ||
| if isinstance(required, list): | ||
| return {name for name in required if isinstance(name, str)} | ||
| return set() | ||
|
|
||
|
|
||
| def _type_matches(definition: dict[str, Any], expected: str) -> bool: | ||
| """Check whether the definition declares the expected JSON type.""" | ||
|
|
||
| declared = definition.get("type") | ||
| if isinstance(declared, str): | ||
| return declared == expected | ||
| if isinstance(declared, list): | ||
| return expected in declared | ||
| return False | ||
|
|
||
|
|
||
| def _has_guardrails(definition: dict[str, Any]) -> bool: | ||
| """Determine whether a parameter definition has meaningful constraints.""" | ||
|
|
||
| if not isinstance(definition, dict): | ||
| return False | ||
|
|
||
| # Structural constraints count as guardrails | ||
| structural_keys = {"anyOf", "allOf", "oneOf", "if", "then", "else", "not", "dependentSchemas"} | ||
| if any(key in definition for key in structural_keys): | ||
| return True | ||
|
|
||
| constraint_keys: set[str] | ||
| if _type_matches(definition, "string"): | ||
| constraint_keys = { | ||
| "enum", | ||
| "const", | ||
| "pattern", | ||
| "format", | ||
| "contentEncoding", | ||
| "contentMediaType", | ||
| "minLength", | ||
| "maxLength", | ||
| } | ||
| elif _type_matches(definition, "integer") or _type_matches(definition, "number"): | ||
| constraint_keys = {"enum", "const", "minimum", "maximum", "exclusiveMinimum", "exclusiveMaximum", "multipleOf"} | ||
| elif _type_matches(definition, "boolean"): | ||
| constraint_keys = {"enum", "const"} | ||
| elif _type_matches(definition, "array"): | ||
| constraint_keys = {"enum", "const", "items", "minItems", "maxItems"} | ||
| else: | ||
| # Objects or unknown types – treat presence of enum/const as a constraint. | ||
| constraint_keys = {"enum", "const"} | ||
|
|
||
| return any(key in definition for key in constraint_keys) | ||
|
|
||
|
|
||
| def _risk_keywords(text: str, keywords: Iterable[str]) -> list[str]: | ||
| """Return the subset of keywords found in the provided text.""" | ||
|
|
||
| lowered = text.lower() | ||
| return sorted({kw for kw in keywords if kw in lowered}) | ||
|
|
||
|
|
||
| def check_dangerous_capabilities(tools: list[dict[str, Any]], spec: SpecCheck) -> Finding: | ||
| """ | ||
| X-01: Dangerous capability detection in tools. | ||
|
|
||
| Detect tools with dangerous capabilities that lack proper constraints. | ||
| Tools with risky keywords should have input validation constraints. | ||
| """ | ||
| risky_keywords = [ | ||
| risky_keywords = { | ||
| "exec", | ||
| "command", | ||
| "shell", | ||
|
|
@@ -72,32 +158,91 @@ def check_dangerous_capabilities(tools: list[dict[str, Any]], spec: SpecCheck) - | |
| "payment", | ||
| "admin", | ||
| "privilege", | ||
| ] | ||
| } | ||
| risky_parameter_keywords = { | ||
| "command", | ||
| "cmd", | ||
| "script", | ||
| "shell", | ||
| "path", | ||
| "filepath", | ||
| "file_path", | ||
| "file", | ||
| "uri", | ||
| "url", | ||
| "payload", | ||
| "prompt", | ||
| "query", | ||
| "template", | ||
| "body", | ||
| "code", | ||
| "sql", | ||
| } | ||
|
|
||
| metadata_risk_flags = {"dangerous", "allowDangerousOperations", "allowDangerousCommands"} | ||
|
|
||
| risky_tools: list[dict[str, Any]] = [] | ||
| for tool in tools: | ||
| name = (tool.get("name") or "").lower() | ||
| desc = (tool.get("description") or "").lower() | ||
|
|
||
| for tool in tools or []: | ||
| name = _to_lower(tool.get("name")) | ||
| desc = _to_lower(tool.get("description")) | ||
| schema = tool.get("inputSchema") or {} | ||
| props = _schema_properties(schema) | ||
| required_params = _schema_required(schema) | ||
|
|
||
| # Check if tool has constraints | ||
| has_constraints = False | ||
| if isinstance(schema, dict): | ||
| props = schema.get("properties") or {} | ||
| for _, prop in props.items() if isinstance(props, dict) else []: | ||
| if any( | ||
| k in prop | ||
| for k in ("enum", "pattern", "minimum", "maximum", "minLength", "maxLength") | ||
| ): | ||
| has_constraints = True | ||
| break | ||
| name_matches = _risk_keywords(f"{name} {desc}", risky_keywords) | ||
| metadata = tool.get("metadata") or {} | ||
| metadata_matches = [flag for flag in metadata_risk_flags if metadata.get(flag) or metadata.get(flag.lower())] | ||
|
|
||
| # If tool has risky keywords but no constraints, it's dangerous | ||
| if ( | ||
| any(keyword in name or keyword in desc for keyword in risky_keywords) | ||
| and not has_constraints | ||
| ): | ||
| risky_tools.append(tool) | ||
| tool_reasons: list[dict[str, Any]] = [] | ||
|
|
||
| if name_matches and not props: | ||
| tool_reasons.append({ | ||
| "kind": "missing_schema", | ||
| "keywords": name_matches, | ||
| "message": "Tool exposes dangerous capabilities but provides no input schema", | ||
| }) | ||
|
|
||
| for param_name, definition in props.items(): | ||
| if not isinstance(definition, dict): | ||
| continue | ||
|
|
||
| param_keywords = _risk_keywords(param_name, risky_parameter_keywords) | ||
| if not param_keywords: | ||
| param_desc = _to_lower(definition.get("description")) | ||
| param_keywords = _risk_keywords(param_desc, risky_parameter_keywords) | ||
|
|
||
| is_string_like = _type_matches(definition, "string") or _type_matches(definition, "array") | ||
|
|
||
| if (name_matches or metadata_matches or param_keywords) and is_string_like: | ||
| if not _has_guardrails(definition): | ||
| tool_reasons.append( | ||
| { | ||
| "kind": "unconstrained_parameter", | ||
| "parameter": param_name, | ||
| "required": param_name in required_params, | ||
| "keywords": param_keywords or name_matches, | ||
| "message": "Parameter accepts attacker-controlled strings without constraints", | ||
| } | ||
| ) | ||
|
|
||
| if metadata_matches: | ||
| tool_reasons.append( | ||
| { | ||
| "kind": "metadata_flag", | ||
| "flags": metadata_matches, | ||
| "message": "Tool metadata advertises dangerous behavior", | ||
| } | ||
| ) | ||
|
|
||
| if tool_reasons: | ||
| risky_tools.append( | ||
| { | ||
| "name": tool.get("name"), | ||
| "description": tool.get("description"), | ||
| "reasons": tool_reasons, | ||
| } | ||
| ) | ||
|
|
||
| passed = len(risky_tools) == 0 | ||
| details = json.dumps(risky_tools) | ||
|
|
@@ -269,40 +414,67 @@ def check_injection_fuzzing( | |
| "http://169.254.169.254/latest/meta-data/", | ||
| ] | ||
|
|
||
| observed_danger = [] | ||
| for tool in tools: | ||
| danger_indicators = [ | ||
| "uid=", | ||
| "gid=", | ||
| "linux", | ||
| "root:x:", | ||
| "etc/passwd", | ||
| "meta-data", | ||
| "stdout", | ||
| "stderr", | ||
| "returncode", | ||
| "command executed", | ||
| "execution result", | ||
| "listening on", | ||
| "connected to", | ||
| ] | ||
|
|
||
| observed_danger: list[dict[str, Any]] = [] | ||
|
|
||
| for tool in tools or []: | ||
| name = tool.get("name") | ||
| if not name: | ||
| continue | ||
|
|
||
| schema = tool.get("inputSchema") or {} | ||
| props = schema.get("properties") or {} | ||
| props = _schema_properties(schema) | ||
|
|
||
| # Find a string parameter to test | ||
| str_param = None | ||
| for param_name, param_def in props.items() if isinstance(props, dict) else []: | ||
| if param_def.get("type") == "string": | ||
| str_param = param_name | ||
| break | ||
| string_params = [ | ||
| param_name | ||
| for param_name, param_def in props.items() | ||
| if isinstance(param_def, dict) and _type_matches(param_def, "string") | ||
| ] | ||
|
|
||
| if not (name and str_param): | ||
| if not string_params: | ||
| continue | ||
|
|
||
| # Test each payload | ||
| for payload in payloads: | ||
| resp = send_recv_func("tools/call", {"name": name, "arguments": {str_param: payload}}) | ||
|
|
||
| # Check if response contains system information (dangerous) | ||
| text = json.dumps(resp).lower() | ||
| danger_indicators = [ | ||
| "uid=", | ||
| "linux", | ||
| "root:x:", | ||
| "etc/passwd", | ||
| "meta-data", | ||
| "insecure", | ||
| "stdout", | ||
| ] | ||
| if any(indicator in text for indicator in danger_indicators): | ||
| observed_danger.append({"tool": name, "payload": payload, "resp": resp}) | ||
| break # Stop testing this tool after first dangerous response | ||
| for param_name in string_params: | ||
| for payload in payloads: | ||
| try: | ||
| resp = send_recv_func( | ||
| "tools/call", | ||
| {"name": name, "arguments": {param_name: payload}}, | ||
| ) | ||
| except Exception: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe log something here, otherwise, tracking issues could be tricky |
||
| # Ignore transport or execution errors; these do not prove vulnerability | ||
| continue | ||
|
|
||
| text = json.dumps(resp).lower() | ||
| indicator = next((d for d in danger_indicators if d in text), None) | ||
| if indicator: | ||
| observed_danger.append( | ||
| { | ||
| "tool": name, | ||
| "parameter": param_name, | ||
| "payload": payload, | ||
| "indicator": indicator, | ||
| "resp": resp, | ||
| } | ||
| ) | ||
| break | ||
| if observed_danger and observed_danger[-1]["tool"] == name: | ||
| break # Stop after first dangerous finding per tool | ||
|
|
||
| passed = len(observed_danger) == 0 | ||
| details = json.dumps(observed_danger) | ||
|
|
||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Uh oh!
There was an error while loading. Please reload this page.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This check now returns a simplified structure instead of the full
toolobject (for example,tool.metadatais no longer included). Was this change intentional?We could preserve the previous behavior with something like: