The foundation package for all Hanzo tool implementations. Provides base classes, type definitions, and utilities for building MCP-compatible tools.
Installation
pip install hanzo-tools-core
Or as part of the full toolkit:
pip install hanzo-mcp[tools-all]
Overview
hanzo-tools-core provides:
- BaseTool - Abstract base class for all tool implementations
- FileSystemTool - Specialized base for file operations
- ToolRegistry - Central registry for tool management
- MCPResourceDocument - Structured response format
- ToolContext - Runtime context for tool execution
- PermissionManager - Path-based access control
Quick Start
from hanzo_tools.core import BaseTool, ToolContext
class MyTool(BaseTool):
"""A custom tool implementation."""
@property
def name(self) -> str:
return "my_tool"
@property
def description(self) -> str:
return "Does something useful"
async def call(self, ctx: ToolContext, message: str) -> str:
"""Execute the tool.
Args:
ctx: Tool execution context
message: Input message to process
Returns:
Processed result
"""
return f"Processed: {message}"
def register(self, mcp_server):
"""Register with MCP server."""
@mcp_server.tool(name=self.name, description=self.description)
async def handler(message: str) -> str:
ctx = ToolContext()
return await self.call(ctx, message=message)
from hanzo_tools.core import ToolRegistry, BaseTool
# Register tools
registry = ToolRegistry()
registry.register(MyTool())
# Get a tool by name
tool = registry.get("my_tool")
# List all registered tools
for name, tool in registry.items():
print(f"{name}: {tool.description}")
Base Classes
The abstract base class all tools must inherit from:
from abc import ABC, abstractmethod
class BaseTool(ABC):
"""Abstract base class for all Hanzo tools."""
@property
@abstractmethod
def name(self) -> str:
"""Unique identifier for the tool."""
...
@property
@abstractmethod
def description(self) -> str:
"""Human-readable description shown to LLMs."""
...
@abstractmethod
async def call(self, ctx: ToolContext, **params) -> str:
"""Execute the tool with given parameters."""
...
@abstractmethod
def register(self, mcp_server: FastMCP) -> None:
"""Register the tool with an MCP server."""
...
Specialized base class for tools that operate on the filesystem:
from hanzo_tools.core import FileSystemTool
class ReadTool(FileSystemTool):
"""Read file contents."""
@property
def name(self) -> str:
return "read"
async def call(self, ctx, file_path: str) -> str:
# Automatic path validation and permission checking
validated_path = self.validate_path(file_path)
async with aiofiles.open(validated_path) as f:
return await f.read()
MCPResourceDocument
Structured format for tool responses:
from hanzo_tools.core import MCPResourceDocument
# Create a document response
doc = MCPResourceDocument(
uri="file:///path/to/file.py",
mime_type="text/x-python",
text="def hello(): pass"
)
# Convert to JSON string for MCP
response = doc.to_json_string()
Context Management
ToolContext
Runtime context passed to every tool invocation:
from hanzo_tools.core import ToolContext, create_tool_context
# Create context with custom settings
ctx = create_tool_context(
allowed_paths=["/home/user/project"],
enable_write=True,
timeout=30.0
)
# Access context in tool
async def call(self, ctx: ToolContext, **params):
if ctx.enable_write:
# Perform write operation
...
Permission Management
PermissionManager
Controls which paths tools can access:
from hanzo_tools.core import PermissionManager
pm = PermissionManager(
allowed_paths=[
"/home/user/project",
"/tmp"
],
denied_paths=[
"/home/user/project/.env",
"/home/user/project/secrets"
]
)
# Check if path is allowed
if pm.is_allowed("/home/user/project/src/main.py"):
# Safe to access
...
Decorators
auto_timeout
Automatically handle timeouts for long-running operations:
from hanzo_tools.core import auto_timeout
class SlowTool(BaseTool):
@auto_timeout(seconds=30)
async def call(self, ctx, **params):
# Will automatically timeout after 30 seconds
result = await slow_operation()
return result
Error Handling
Tools should return error messages as strings rather than raising exceptions:
async def call(self, ctx, file_path: str) -> str:
try:
content = await read_file(file_path)
return content
except FileNotFoundError:
return f"Error: File not found: {file_path}"
except PermissionError:
return f"Error: Permission denied: {file_path}"
except Exception as e:
return f"Error: {type(e).__name__}: {str(e)}"
API Reference
Bases: ABC
Abstract base class for all Hanzo tools.
All tool packages must implement this interface to be compatible
with the hanzo-mcp server and tool registry.
Example
class MyTool(BaseTool):
@property
def name(self) -> str:
return "my_tool"
@property
def description(self) -> str:
return "Does something useful"
async def call(self, ctx, **params) -> str:
return "Result"
def register(self, mcp_server):
@mcp_server.tool()
async def my_tool(...):
return await self.call(...)
Get the tool name as it appears in the MCP server.
Get detailed description of the tool's purpose and usage.
call(ctx: Context, **params: Any) -> Any
Execute the tool with the given parameters.
Parameters:
| Name |
Type |
Description |
Default |
ctx
|
Context
|
MCP context for the tool call
|
required
|
**params
|
Any
|
Tool parameters provided by the caller
|
{}
|
Returns:
register(mcp_server: FastMCP) -> None
Register this tool with the MCP server.
Must create a wrapper function with explicit parameters
that calls this tool's call method.
Parameters:
| Name |
Type |
Description |
Default |
mcp_server
|
FastMCP
|
The FastMCP server instance
|
required
|
Bases: BaseTool, ABC
Base class for filesystem-related tools.
Provides common functionality for working with files and directories,
including permission checking and path validation.
Source code in hanzo_tools/core/base.py
| class FileSystemTool(BaseTool, ABC):
"""Base class for filesystem-related tools.
Provides common functionality for working with files and directories,
including permission checking and path validation.
"""
def __init__(self, permission_manager: "PermissionManager | None" = None) -> None:
"""Initialize filesystem tool.
Args:
permission_manager: Permission manager for access control (auto-created if None)
"""
if permission_manager is None:
from hanzo_tools.core.permissions import PermissionManager
permission_manager = PermissionManager()
self.permission_manager = permission_manager
def validate_path(self, path: str, param_name: str = "path") -> "ValidationResult":
"""Validate a path parameter."""
from hanzo_tools.core.validation import validate_path_parameter
return validate_path_parameter(path, param_name)
def is_path_allowed(self, path: str) -> bool:
"""Check if a path is allowed according to permission settings."""
return self.permission_manager.is_path_allowed(path)
|
Initialize filesystem tool.
Parameters:
| Name |
Type |
Description |
Default |
permission_manager
|
PermissionManager | None
|
Permission manager for access control (auto-created if None)
|
None
|
Source code in hanzo_tools/core/base.py
| def __init__(self, permission_manager: "PermissionManager | None" = None) -> None:
"""Initialize filesystem tool.
Args:
permission_manager: Permission manager for access control (auto-created if None)
"""
if permission_manager is None:
from hanzo_tools.core.permissions import PermissionManager
permission_manager = PermissionManager()
self.permission_manager = permission_manager
|
validate_path(path: str, param_name: str = 'path') -> ValidationResult
Validate a path parameter.
Source code in hanzo_tools/core/base.py
| def validate_path(self, path: str, param_name: str = "path") -> "ValidationResult":
"""Validate a path parameter."""
from hanzo_tools.core.validation import validate_path_parameter
return validate_path_parameter(path, param_name)
|
is_path_allowed(path: str) -> bool
Check if a path is allowed according to permission settings.
Source code in hanzo_tools/core/base.py
| def is_path_allowed(self, path: str) -> bool:
"""Check if a path is allowed according to permission settings."""
return self.permission_manager.is_path_allowed(path)
|
Registry for Hanzo tools.
Provides functionality for registering tool implementations
with an MCP server, with support for enable/disable states.
Source code in hanzo_tools/core/base.py
| @final
class ToolRegistry:
"""Registry for Hanzo tools.
Provides functionality for registering tool implementations
with an MCP server, with support for enable/disable states.
"""
# Class-level storage for tool states
_enabled_tools: ClassVar[dict[str, bool]] = {}
_config_loaded: ClassVar[bool] = False
@classmethod
def _load_config(cls) -> None:
"""Load tool enable/disable states from config."""
if cls._config_loaded:
return
import json
config_file = Path.home() / ".hanzo" / "mcp" / "tool_states.json"
if config_file.exists():
try:
with open(config_file) as f:
cls._enabled_tools = json.load(f)
except Exception:
pass
cls._config_loaded = True
@classmethod
def is_tool_enabled(cls, tool_name: str) -> bool:
"""Check if a tool is enabled."""
cls._load_config()
return cls._enabled_tools.get(tool_name, True) # Enabled by default
@classmethod
def set_tool_enabled(cls, tool_name: str, enabled: bool, persist: bool = True) -> None:
"""Enable or disable a tool."""
import json
cls._load_config()
cls._enabled_tools[tool_name] = enabled
if persist:
config_file = Path.home() / ".hanzo" / "mcp" / "tool_states.json"
config_file.parent.mkdir(parents=True, exist_ok=True)
with open(config_file, "w") as f:
json.dump(cls._enabled_tools, f, indent=2)
@staticmethod
def register_tool(mcp_server: FastMCP, tool: BaseTool) -> None:
"""Register a tool with the MCP server.
Args:
mcp_server: The FastMCP server instance
tool: The tool to register
"""
if ToolRegistry.is_tool_enabled(tool.name):
tool.register(mcp_server)
logger.debug(f"Registered tool: {tool.name}")
else:
logger.debug(f"Skipped disabled tool: {tool.name}")
@staticmethod
def register_tools(mcp_server: FastMCP, tools: list[BaseTool]) -> None:
"""Register multiple tools with the MCP server."""
for tool in tools:
ToolRegistry.register_tool(mcp_server, tool)
|
is_tool_enabled(tool_name: str) -> bool
Check if a tool is enabled.
Source code in hanzo_tools/core/base.py
| @classmethod
def is_tool_enabled(cls, tool_name: str) -> bool:
"""Check if a tool is enabled."""
cls._load_config()
return cls._enabled_tools.get(tool_name, True) # Enabled by default
|
set_tool_enabled(tool_name: str, enabled: bool, persist: bool = True) -> None
Enable or disable a tool.
Source code in hanzo_tools/core/base.py
| @classmethod
def set_tool_enabled(cls, tool_name: str, enabled: bool, persist: bool = True) -> None:
"""Enable or disable a tool."""
import json
cls._load_config()
cls._enabled_tools[tool_name] = enabled
if persist:
config_file = Path.home() / ".hanzo" / "mcp" / "tool_states.json"
config_file.parent.mkdir(parents=True, exist_ok=True)
with open(config_file, "w") as f:
json.dump(cls._enabled_tools, f, indent=2)
|
register_tool(mcp_server: FastMCP, tool: BaseTool) -> None
Register a tool with the MCP server.
Parameters:
| Name |
Type |
Description |
Default |
mcp_server
|
FastMCP
|
The FastMCP server instance
|
required
|
tool
|
BaseTool
|
|
required
|
Source code in hanzo_tools/core/base.py
| @staticmethod
def register_tool(mcp_server: FastMCP, tool: BaseTool) -> None:
"""Register a tool with the MCP server.
Args:
mcp_server: The FastMCP server instance
tool: The tool to register
"""
if ToolRegistry.is_tool_enabled(tool.name):
tool.register(mcp_server)
logger.debug(f"Registered tool: {tool.name}")
else:
logger.debug(f"Skipped disabled tool: {tool.name}")
|
register_tools(mcp_server: FastMCP, tools: list[BaseTool]) -> None
Register multiple tools with the MCP server.
Source code in hanzo_tools/core/base.py
| @staticmethod
def register_tools(mcp_server: FastMCP, tools: list[BaseTool]) -> None:
"""Register multiple tools with the MCP server."""
for tool in tools:
ToolRegistry.register_tool(mcp_server, tool)
|
Resource document returned by MCP tools.
Output format options:
- to_json_string(): Clean JSON format (default for structured data)
- to_readable_string(): Human-readable formatted text for display
- to_dict(): Full dict structure with data/metadata
Source code in hanzo_tools/core/types.py
| @dataclass
class MCPResourceDocument:
"""Resource document returned by MCP tools.
Output format options:
- to_json_string(): Clean JSON format (default for structured data)
- to_readable_string(): Human-readable formatted text for display
- to_dict(): Full dict structure with data/metadata
"""
data: Dict[str, Any]
metadata: Optional[Dict[str, Any]] = None
def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary format with data/metadata structure."""
result = {"data": self.data}
if self.metadata:
result["metadata"] = self.metadata
return result
def to_json_string(self) -> str:
"""Convert to clean JSON string."""
# Return wrapped in "result" for consistency
return json.dumps({"result": self.data}, indent=2)
def to_readable_string(self) -> str:
"""Convert to human-readable formatted string for display.
Optimized for readability in Claude Code output panels.
"""
lines: List[str] = []
if isinstance(self.data, dict):
# Handle search/find results with "results" array
if "results" in self.data:
results = self.data["results"]
stats = self.data.get("stats", {})
pagination = self.data.get("pagination", {})
# Header with stats
if stats:
query = stats.get("query", stats.get("pattern", ""))
total = stats.get("total", len(results))
time_ms = stats.get("time_ms", {})
if time_ms:
if isinstance(time_ms, dict):
total_time = sum(time_ms.values())
else:
total_time = time_ms
lines.append(f"# Search: '{query}' ({total} results, {total_time}ms)")
else:
lines.append(f"# Found {total} results for '{query}'")
else:
lines.append(f"# Found {len(results)} results")
lines.append("")
# Format each result
for i, result in enumerate(results[:50], 1):
if isinstance(result, dict):
# Common patterns for search results
file_path = result.get("file", result.get("path", ""))
line_num = result.get("line", result.get("line_number", ""))
match_text = result.get("match", result.get("text", result.get("content", "")))
result_type = result.get("type", "")
if file_path:
loc = f"{file_path}:{line_num}" if line_num else file_path
lines.append(f"{i}. {loc}")
if match_text:
# Truncate long matches for readability
preview = match_text[:200] + "..." if len(match_text) > 200 else match_text
lines.append(f" {preview}")
if result_type:
lines.append(f" [{result_type}]")
else:
lines.append(f"{i}. {json.dumps(result, default=str)}")
else:
lines.append(f"{i}. {result}")
# Show pagination info
if pagination:
page = pagination.get("page", 1)
total = pagination.get("total", 0)
has_next = pagination.get("has_next", False)
if has_next and total > 0:
total_pages = (total // 50) + 1
lines.append(f"\n... page {page} of {total_pages} ({total} total)")
# Handle command execution results
elif "output" in self.data or "stdout" in self.data or "stderr" in self.data:
# Shell command output
exit_code = self.data.get("exit_code", self.data.get("returncode", 0))
stdout = self.data.get("output", self.data.get("stdout", ""))
stderr = self.data.get("stderr", "")
elapsed = self.data.get("elapsed", self.data.get("time_ms", ""))
if exit_code == 0:
lines.append(f"✓ Command succeeded")
else:
lines.append(f"✗ Command failed (exit {exit_code})")
if elapsed:
lines.append(f"Time: {elapsed}ms" if isinstance(elapsed, (int, float)) else f"Time: {elapsed}")
lines.append("")
if stdout:
lines.append(stdout.rstrip())
if stderr:
lines.append("\n--- stderr ---")
lines.append(stderr.rstrip())
# Handle error results
elif "error" in self.data:
lines.append(f"Error: {self.data['error']}")
if "details" in self.data:
lines.append(f"Details: {self.data['details']}")
# Generic dict - format as key-value pairs
else:
for key, value in self.data.items():
if isinstance(value, (dict, list)):
lines.append(f"{key}:")
lines.append(json.dumps(value, indent=2, default=str))
else:
lines.append(f"{key}: {value}")
elif isinstance(self.data, list):
# List data - format as numbered items
for i, item in enumerate(self.data[:50], 1):
if isinstance(item, dict):
lines.append(f"{i}. {json.dumps(item, default=str)}")
else:
lines.append(f"{i}. {item}")
if len(self.data) > 50:
lines.append(f"\n... {len(self.data) - 50} more items")
else:
# Scalar or other - just convert to string
lines.append(str(self.data))
# Add metadata footer if present
if self.metadata:
lines.append("")
lines.append("---")
for key, value in self.metadata.items():
lines.append(f"{key}: {value}")
return "\n".join(lines)
|
to_dict() -> Dict[str, Any]
Convert to dictionary format with data/metadata structure.
Source code in hanzo_tools/core/types.py
| def to_dict(self) -> Dict[str, Any]:
"""Convert to dictionary format with data/metadata structure."""
result = {"data": self.data}
if self.metadata:
result["metadata"] = self.metadata
return result
|
Convert to clean JSON string.
Source code in hanzo_tools/core/types.py
| def to_json_string(self) -> str:
"""Convert to clean JSON string."""
# Return wrapped in "result" for consistency
return json.dumps({"result": self.data}, indent=2)
|
to_readable_string() -> str
Convert to human-readable formatted string for display.
Optimized for readability in Claude Code output panels.
Source code in hanzo_tools/core/types.py
| def to_readable_string(self) -> str:
"""Convert to human-readable formatted string for display.
Optimized for readability in Claude Code output panels.
"""
lines: List[str] = []
if isinstance(self.data, dict):
# Handle search/find results with "results" array
if "results" in self.data:
results = self.data["results"]
stats = self.data.get("stats", {})
pagination = self.data.get("pagination", {})
# Header with stats
if stats:
query = stats.get("query", stats.get("pattern", ""))
total = stats.get("total", len(results))
time_ms = stats.get("time_ms", {})
if time_ms:
if isinstance(time_ms, dict):
total_time = sum(time_ms.values())
else:
total_time = time_ms
lines.append(f"# Search: '{query}' ({total} results, {total_time}ms)")
else:
lines.append(f"# Found {total} results for '{query}'")
else:
lines.append(f"# Found {len(results)} results")
lines.append("")
# Format each result
for i, result in enumerate(results[:50], 1):
if isinstance(result, dict):
# Common patterns for search results
file_path = result.get("file", result.get("path", ""))
line_num = result.get("line", result.get("line_number", ""))
match_text = result.get("match", result.get("text", result.get("content", "")))
result_type = result.get("type", "")
if file_path:
loc = f"{file_path}:{line_num}" if line_num else file_path
lines.append(f"{i}. {loc}")
if match_text:
# Truncate long matches for readability
preview = match_text[:200] + "..." if len(match_text) > 200 else match_text
lines.append(f" {preview}")
if result_type:
lines.append(f" [{result_type}]")
else:
lines.append(f"{i}. {json.dumps(result, default=str)}")
else:
lines.append(f"{i}. {result}")
# Show pagination info
if pagination:
page = pagination.get("page", 1)
total = pagination.get("total", 0)
has_next = pagination.get("has_next", False)
if has_next and total > 0:
total_pages = (total // 50) + 1
lines.append(f"\n... page {page} of {total_pages} ({total} total)")
# Handle command execution results
elif "output" in self.data or "stdout" in self.data or "stderr" in self.data:
# Shell command output
exit_code = self.data.get("exit_code", self.data.get("returncode", 0))
stdout = self.data.get("output", self.data.get("stdout", ""))
stderr = self.data.get("stderr", "")
elapsed = self.data.get("elapsed", self.data.get("time_ms", ""))
if exit_code == 0:
lines.append(f"✓ Command succeeded")
else:
lines.append(f"✗ Command failed (exit {exit_code})")
if elapsed:
lines.append(f"Time: {elapsed}ms" if isinstance(elapsed, (int, float)) else f"Time: {elapsed}")
lines.append("")
if stdout:
lines.append(stdout.rstrip())
if stderr:
lines.append("\n--- stderr ---")
lines.append(stderr.rstrip())
# Handle error results
elif "error" in self.data:
lines.append(f"Error: {self.data['error']}")
if "details" in self.data:
lines.append(f"Details: {self.data['details']}")
# Generic dict - format as key-value pairs
else:
for key, value in self.data.items():
if isinstance(value, (dict, list)):
lines.append(f"{key}:")
lines.append(json.dumps(value, indent=2, default=str))
else:
lines.append(f"{key}: {value}")
elif isinstance(self.data, list):
# List data - format as numbered items
for i, item in enumerate(self.data[:50], 1):
if isinstance(item, dict):
lines.append(f"{i}. {json.dumps(item, default=str)}")
else:
lines.append(f"{i}. {item}")
if len(self.data) > 50:
lines.append(f"\n... {len(self.data) - 50} more items")
else:
# Scalar or other - just convert to string
lines.append(str(self.data))
# Add metadata footer if present
if self.metadata:
lines.append("")
lines.append("---")
for key, value in self.metadata.items():
lines.append(f"{key}: {value}")
return "\n".join(lines)
|
Extended context for tool execution.
Provides utilities for logging, progress reporting,
and accessing the MCP context.
Source code in hanzo_tools/core/context.py
| @dataclass
class ToolContext:
"""Extended context for tool execution.
Provides utilities for logging, progress reporting,
and accessing the MCP context.
"""
mcp_ctx: MCPContext
tool_name: Optional[str] = None
metadata: dict[str, Any] = field(default_factory=dict)
async def set_tool_info(self, tool_name: str) -> None:
"""Set the current tool name for logging."""
self.tool_name = tool_name
async def info(self, message: str) -> None:
"""Log an info message."""
logger.info(f"[{self.tool_name or 'tool'}] {message}")
async def warning(self, message: str) -> None:
"""Log a warning message."""
logger.warning(f"[{self.tool_name or 'tool'}] {message}")
async def error(self, message: str) -> None:
"""Log an error message."""
logger.error(f"[{self.tool_name or 'tool'}] {message}")
async def debug(self, message: str) -> None:
"""Log a debug message."""
logger.debug(f"[{self.tool_name or 'tool'}] {message}")
async def progress(self, current: int, total: int, message: str = "") -> None:
"""Report progress."""
pct = (current / total * 100) if total > 0 else 0
await self.info(f"Progress: {current}/{total} ({pct:.1f}%) {message}")
def get(self, key: str, default: Any = None) -> Any:
"""Get a metadata value."""
return self.metadata.get(key, default)
def set(self, key: str, value: Any) -> None:
"""Set a metadata value."""
self.metadata[key] = value
|
set_tool_info
async
set_tool_info(tool_name: str) -> None
Set the current tool name for logging.
Source code in hanzo_tools/core/context.py
| async def set_tool_info(self, tool_name: str) -> None:
"""Set the current tool name for logging."""
self.tool_name = tool_name
|
info
async
info(message: str) -> None
Log an info message.
Source code in hanzo_tools/core/context.py
| async def info(self, message: str) -> None:
"""Log an info message."""
logger.info(f"[{self.tool_name or 'tool'}] {message}")
|
warning
async
warning(message: str) -> None
Log a warning message.
Source code in hanzo_tools/core/context.py
| async def warning(self, message: str) -> None:
"""Log a warning message."""
logger.warning(f"[{self.tool_name or 'tool'}] {message}")
|
error
async
error(message: str) -> None
Log an error message.
Source code in hanzo_tools/core/context.py
| async def error(self, message: str) -> None:
"""Log an error message."""
logger.error(f"[{self.tool_name or 'tool'}] {message}")
|
debug
async
debug(message: str) -> None
Log a debug message.
Source code in hanzo_tools/core/context.py
| async def debug(self, message: str) -> None:
"""Log a debug message."""
logger.debug(f"[{self.tool_name or 'tool'}] {message}")
|
progress
async
progress(current: int, total: int, message: str = '') -> None
Report progress.
Source code in hanzo_tools/core/context.py
| async def progress(self, current: int, total: int, message: str = "") -> None:
"""Report progress."""
pct = (current / total * 100) if total > 0 else 0
await self.info(f"Progress: {current}/{total} ({pct:.1f}%) {message}")
|
get
get(key: str, default: Any = None) -> Any
Get a metadata value.
Source code in hanzo_tools/core/context.py
| def get(self, key: str, default: Any = None) -> Any:
"""Get a metadata value."""
return self.metadata.get(key, default)
|
set
set(key: str, value: Any) -> None
Set a metadata value.
Source code in hanzo_tools/core/context.py
| def set(self, key: str, value: Any) -> None:
"""Set a metadata value."""
self.metadata[key] = value
|
Manages filesystem permissions for tools.
Controls which paths tools are allowed to access.
Source code in hanzo_tools/core/permissions.py
| class PermissionManager:
"""Manages filesystem permissions for tools.
Controls which paths tools are allowed to access.
"""
def __init__(
self,
allowed_paths: Optional[list[str | Path]] = None,
deny_patterns: Optional[list[str]] = None,
):
"""Initialize permission manager.
Args:
allowed_paths: List of allowed base paths (defaults to cwd)
deny_patterns: Patterns to deny (e.g., '.git', 'node_modules')
"""
self.allowed_paths: list[Path] = []
if allowed_paths:
for p in allowed_paths:
self.allowed_paths.append(Path(p).resolve())
else:
self.allowed_paths.append(Path.cwd())
self.deny_patterns = deny_patterns or [
".git",
"__pycache__",
".pyc",
"node_modules",
".env",
".secrets",
]
def is_path_allowed(self, path: str | Path) -> bool:
"""Check if a path is allowed.
Args:
path: Path to check
Returns:
True if path is within allowed paths and not denied
"""
try:
resolved = Path(path).resolve()
# Check if path is under any allowed path
is_under_allowed = any(self._is_subpath(resolved, allowed) for allowed in self.allowed_paths)
if not is_under_allowed:
return False
# Check deny patterns
path_str = str(resolved)
for pattern in self.deny_patterns:
if pattern in path_str:
return False
return True
except Exception:
return False
def _is_subpath(self, path: Path, parent: Path) -> bool:
"""Check if path is under parent."""
try:
path.relative_to(parent)
return True
except ValueError:
return False
def add_allowed_path(self, path: str | Path) -> None:
"""Add a path to the allowed list."""
self.allowed_paths.append(Path(path).resolve())
def add_deny_pattern(self, pattern: str) -> None:
"""Add a deny pattern."""
self.deny_patterns.append(pattern)
|
__init__(allowed_paths: Optional[list[str | Path]] = None, deny_patterns: Optional[list[str]] = None)
Initialize permission manager.
Parameters:
| Name |
Type |
Description |
Default |
allowed_paths
|
Optional[list[str | Path]]
|
List of allowed base paths (defaults to cwd)
|
None
|
deny_patterns
|
Optional[list[str]]
|
Patterns to deny (e.g., '.git', 'node_modules')
|
None
|
Source code in hanzo_tools/core/permissions.py
| def __init__(
self,
allowed_paths: Optional[list[str | Path]] = None,
deny_patterns: Optional[list[str]] = None,
):
"""Initialize permission manager.
Args:
allowed_paths: List of allowed base paths (defaults to cwd)
deny_patterns: Patterns to deny (e.g., '.git', 'node_modules')
"""
self.allowed_paths: list[Path] = []
if allowed_paths:
for p in allowed_paths:
self.allowed_paths.append(Path(p).resolve())
else:
self.allowed_paths.append(Path.cwd())
self.deny_patterns = deny_patterns or [
".git",
"__pycache__",
".pyc",
"node_modules",
".env",
".secrets",
]
|
is_path_allowed(path: str | Path) -> bool
Check if a path is allowed.
Parameters:
| Name |
Type |
Description |
Default |
path
|
str | Path
|
|
required
|
Returns:
| Type |
Description |
bool
|
True if path is within allowed paths and not denied
|
Source code in hanzo_tools/core/permissions.py
| def is_path_allowed(self, path: str | Path) -> bool:
"""Check if a path is allowed.
Args:
path: Path to check
Returns:
True if path is within allowed paths and not denied
"""
try:
resolved = Path(path).resolve()
# Check if path is under any allowed path
is_under_allowed = any(self._is_subpath(resolved, allowed) for allowed in self.allowed_paths)
if not is_under_allowed:
return False
# Check deny patterns
path_str = str(resolved)
for pattern in self.deny_patterns:
if pattern in path_str:
return False
return True
except Exception:
return False
|
add_allowed_path(path: str | Path) -> None
Add a path to the allowed list.
Source code in hanzo_tools/core/permissions.py
| def add_allowed_path(self, path: str | Path) -> None:
"""Add a path to the allowed list."""
self.allowed_paths.append(Path(path).resolve())
|
add_deny_pattern(pattern: str) -> None
Add a deny pattern.
Source code in hanzo_tools/core/permissions.py
| def add_deny_pattern(self, pattern: str) -> None:
"""Add a deny pattern."""
self.deny_patterns.append(pattern)
|