Skip to content

mcp proxy + live activity log #23

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 24 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,15 +11,20 @@ classifiers = [
dependencies = [
"mcp[cli]>=1.6.0",
"rich>=14.0.0",
"pyjson5>=1.6.8",
"aiofiles>=23.1.0",
"types-aiofiles",
"pyjson5>=1.6.8",
"pydantic>=2.11.2",
"lark>=1.1.9",
"psutil>=5.9.0",
"invariant-ai>=0.3",
"fastapi>=0.115.12",
"uvicorn>=0.34.2",
"invariant-sdk>=0.0.11",
"pyyaml>=6.0.2",
"regex>=2024.11.6",
"aiohttp>=3.11.16",
"rapidfuzz>=3.13.0",
"rapidfuzz>=3.13.0"
]

[project.scripts]
Expand All @@ -35,6 +40,7 @@ packages = ["mcp_scan"]
[project.optional-dependencies]
test = [
"pytest>=7.4.0",
"pytest-lazy-fixtures>=1.1.2",
"anyio>=4.0.0"
]
dev = [
Expand Down
48 changes: 44 additions & 4 deletions src/mcp_scan/StorageFile.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,11 @@
from datetime import datetime

import rich
import yaml # type: ignore
from pydantic import ValidationError

from mcp_scan_server.models import GuardrailConfig

from .models import Entity, ScannedEntities, ScannedEntity, entity_type_to_str, hash_entity
from .utils import upload_whitelist_entry

Expand All @@ -19,16 +22,22 @@ class StorageFile:
def __init__(self, path: str):
logger.debug("Initializing StorageFile with path: %s", path)
self.path = os.path.expanduser(path)

logger.debug("Expanded path: %s", self.path)
# if path is a file
self.scanned_entities: ScannedEntities = ScannedEntities({})
self.whitelist: dict[str, str] = {}
self.guardrails_config: GuardrailConfig = GuardrailConfig({})

if os.path.isfile(path):
msg = f"Legacy storage file detected at {path}, converting to new format"
logger.info(msg)
rich.print(f"[bold]{msg}[/bold]")
if os.path.isfile(self.path):
rich.print(f"[bold]Legacy storage file detected at {self.path}, converting to new format[/bold]")
# legacy format
with open(self.path, "r") as f:
legacy_data = json.load(f)
if "__whitelist" in legacy_data:
self.whitelist = legacy_data["__whitelist"]
del legacy_data["__whitelist"]

try:
logger.debug("Loading legacy format file")
with open(path) as f:
Expand All @@ -52,6 +61,7 @@ def __init__(self, path: str):
if os.path.exists(path) and os.path.isdir(path):
logger.debug("Path exists and is a directory: %s", path)
scanned_entities_path = os.path.join(path, "scanned_entities.json")

if os.path.exists(scanned_entities_path):
logger.debug("Loading scanned entities from: %s", scanned_entities_path)
with open(scanned_entities_path) as f:
Expand All @@ -69,6 +79,23 @@ def __init__(self, path: str):
self.whitelist = json.load(f)
logger.info("Successfully loaded whitelist with %d entries", len(self.whitelist))

guardrails_config_path = os.path.join(self.path, "guardrails_config.yml")
if os.path.exists(guardrails_config_path):
with open(guardrails_config_path, "r") as f:
try:
guardrails_config_data = yaml.safe_load(f.read()) or {}
self.guardrails_config = GuardrailConfig.model_validate(guardrails_config_data)
except yaml.YAMLError as e:
rich.print(
f"[bold red]Could not parse guardrails config file "
f"{guardrails_config_path}: {e}[/bold red]"
)
except ValidationError as e:
rich.print(
f"[bold red]Could not validate guardrails config file "
f"{guardrails_config_path}: {e}[/bold red]"
)

def reset_whitelist(self) -> None:
logger.info("Resetting whitelist")
self.whitelist = {}
Expand Down Expand Up @@ -139,6 +166,19 @@ def is_whitelisted(self, entity: Entity) -> bool:
logger.debug("Checking if entity %s is whitelisted: %s", entity.name, result)
return result

def create_guardrails_config(self) -> str:
"""
If the guardrails config file does not exist, create it with default values.

Returns the path to the guardrails config file.
"""
guardrails_config_path = os.path.join(self.path, "guardrails_config.yml")
if not os.path.exists(guardrails_config_path):
with open(guardrails_config_path, "w") as f:
if self.guardrails_config is not None:
f.write(self.guardrails_config.model_dump_yaml())
return guardrails_config_path

def save(self) -> None:
logger.info("Saving storage data to %s", self.path)
try:
Expand Down
Loading