Skip to content

hanzo-tools-core

The foundation package for all Hanzo tool implementations. Provides base classes, type definitions, and utilities for building MCP-compatible tools.

Installation

pip install hanzo-tools-core

Or as part of the full toolkit:

pip install hanzo-mcp[tools-all]

Overview

hanzo-tools-core provides:

  • BaseTool - Abstract base class for all tool implementations
  • FileSystemTool - Specialized base for file operations
  • ToolRegistry - Central registry for tool management
  • MCPResourceDocument - Structured response format
  • ToolContext - Runtime context for tool execution
  • PermissionManager - Path-based access control

Quick Start

Creating a Custom Tool

from hanzo_tools.core import BaseTool, ToolContext

class MyTool(BaseTool):
    """A custom tool implementation."""

    @property
    def name(self) -> str:
        """Name the tool is registered under."""
        return "my_tool"

    @property
    def description(self) -> str:
        """Description passed to the MCP server at registration."""
        return "Does something useful"

    async def call(self, ctx: ToolContext, message: str) -> str:
        """Execute the tool.

        Args:
            ctx: Tool execution context
            message: Input message to process

        Returns:
            Processed result
        """
        return f"Processed: {message}"

    def register(self, mcp_server):
        """Register with MCP server.

        Wraps ``call`` in a handler with an explicit ``message`` parameter and
        registers it under this tool's name and description.
        """
        @mcp_server.tool(name=self.name, description=self.description)
        async def handler(message: str) -> str:
            # NOTE(review): the ToolContext dataclass shown in the API
            # reference declares `mcp_ctx` as a required field, so a bare
            # ToolContext() looks like it would raise TypeError -- confirm how
            # the MCP context should be obtained here and pass it as mcp_ctx.
            ctx = ToolContext()
            return await self.call(ctx, message=message)

Using the Tool Registry

from hanzo_tools.core import ToolRegistry, BaseTool

# ToolRegistry is used through its class/static methods; `mcp_server` is the
# FastMCP server instance the tools are attached to.

# Register one tool (skipped if that tool has been disabled)
ToolRegistry.register_tool(mcp_server, MyTool())

# Or register a batch in one call:
#   ToolRegistry.register_tools(mcp_server, [MyTool(), OtherTool()])

# Enable/disable a tool by name; persist=False keeps the change in memory only
ToolRegistry.set_tool_enabled("my_tool", False, persist=False)

# Query a tool's state (tools are enabled unless explicitly disabled)
print(f"my_tool enabled: {ToolRegistry.is_tool_enabled('my_tool')}")

Base Classes

BaseTool

The abstract base class all tools must inherit from:

from abc import ABC, abstractmethod

class BaseTool(ABC):
    """Interface that every Hanzo tool implements."""

    @property
    @abstractmethod
    def name(self) -> str:
        """Return the unique identifier for the tool."""

    @property
    @abstractmethod
    def description(self) -> str:
        """Return the human-readable description shown to LLMs."""

    @abstractmethod
    async def call(self, ctx: ToolContext, **params) -> str:
        """Run the tool with the supplied keyword parameters."""

    @abstractmethod
    def register(self, mcp_server: FastMCP) -> None:
        """Attach the tool to an MCP server."""

FileSystemTool

Specialized base class for tools that operate on the filesystem:

import aiofiles

from hanzo_tools.core import FileSystemTool

class ReadTool(FileSystemTool):
    """Read file contents.

    Only ``name`` and ``call`` are shown here; a concrete tool must also
    implement the abstract ``description`` and ``register`` members.
    """

    @property
    def name(self) -> str:
        return "read"

    async def call(self, ctx, file_path: str) -> str:
        # Permission checking is explicit: ask the tool's PermissionManager
        # before touching the filesystem. (validate_path() returns a
        # ValidationResult, not a path, so it cannot be handed to open().)
        if not self.is_path_allowed(file_path):
            return f"Error: Permission denied: {file_path}"
        async with aiofiles.open(file_path) as f:
            return await f.read()

Response Format

MCPResourceDocument

Structured format for tool responses:

from hanzo_tools.core import MCPResourceDocument

# Create a document response: the payload goes in `data`
# (an optional `metadata` dict can be passed as well)
doc = MCPResourceDocument(
    data={
        "uri": "file:///path/to/file.py",
        "mime_type": "text/x-python",
        "text": "def hello(): pass",
    }
)

# Convert to JSON string for MCP (serializes {"result": <data>})
response = doc.to_json_string()

Context Management

ToolContext

Runtime context passed to every tool invocation:

from hanzo_tools.core import ToolContext

# Wrap the MCP context for a tool call; custom settings travel in `metadata`.
# `mcp_ctx` is the MCP context object supplied for the current call.
ctx = ToolContext(
    mcp_ctx=mcp_ctx,
    tool_name="my_tool",
    metadata={"enable_write": True, "timeout": 30.0},
)

# Access context in tool
async def call(self, ctx: ToolContext, **params):
    # Settings are read back with ToolContext.get(key, default)
    if ctx.get("enable_write", False):
        # Perform write operation
        ...

Permission Management

PermissionManager

Controls which paths tools can access:

from hanzo_tools.core import PermissionManager

pm = PermissionManager(
    allowed_paths=[
        "/home/user/project",
        "/tmp"
    ],
    # Deny patterns are substrings: a resolved path containing any of them is
    # rejected. Passing this argument replaces the built-in default patterns.
    deny_patterns=[
        "/home/user/project/.env",
        "/home/user/project/secrets"
    ]
)

# Check if path is allowed
if pm.is_path_allowed("/home/user/project/src/main.py"):
    # Safe to access
    ...

Decorators

auto_timeout

Automatically handle timeouts for long-running operations:

from hanzo_tools.core import auto_timeout

class SlowTool(BaseTool):
    @auto_timeout(seconds=30)
    async def call(self, ctx, **params):
        # The decorator above times this call out after 30 seconds
        return await slow_operation()

Error Handling

Tools should return error messages as strings rather than raising exceptions:

async def call(self, ctx, file_path: str) -> str:
    """Read ``file_path``, reporting any failure as an ``Error: ...`` string."""
    try:
        return await read_file(file_path)
    except FileNotFoundError:
        return f"Error: File not found: {file_path}"
    except PermissionError:
        return f"Error: Permission denied: {file_path}"
    except Exception as exc:
        # Catch-all so the caller always receives a string, never an exception
        return f"Error: {type(exc).__name__}: {exc}"

API Reference

BaseTool

Bases: ABC

Abstract base class for all Hanzo tools.

All tool packages must implement this interface to be compatible with the hanzo-mcp server and tool registry.

Example

class MyTool(BaseTool):
    @property
    def name(self) -> str:
        return "my_tool"

    @property
    def description(self) -> str:
        return "Does something useful"

    async def call(self, ctx, **params) -> str:
        return "Result"

    def register(self, mcp_server):
        @mcp_server.tool()
        async def my_tool(...):
            return await self.call(...)

name abstractmethod property

name: str

Get the tool name as it appears in the MCP server.

description abstractmethod property

description: str

Get detailed description of the tool's purpose and usage.

call abstractmethod async

call(ctx: Context, **params: Any) -> Any

Execute the tool with the given parameters.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `ctx` | `Context` | MCP context for the tool call | required |
| `**params` | `Any` | Tool parameters provided by the caller | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Any` | Tool execution result |

register abstractmethod

register(mcp_server: FastMCP) -> None

Register this tool with the MCP server.

Must create a wrapper function with explicit parameters that calls this tool's call method.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `mcp_server` | `FastMCP` | The FastMCP server instance | required |

FileSystemTool

Bases: BaseTool, ABC

Base class for filesystem-related tools.

Provides common functionality for working with files and directories, including permission checking and path validation.

Source code in hanzo_tools/core/base.py
class FileSystemTool(BaseTool, ABC):
    """Common base for tools that touch the filesystem.

    Holds a permission manager and exposes helpers for validating path
    parameters and checking paths against the permission settings.
    """

    def __init__(self, permission_manager: "PermissionManager | None" = None) -> None:
        """Store the permission manager, creating a default one when omitted.

        Args:
            permission_manager: Permission manager for access control
                (a ``PermissionManager()`` is created if None)
        """
        if permission_manager is not None:
            self.permission_manager = permission_manager
            return

        # Imported inside the method, only when a default manager is needed.
        from hanzo_tools.core.permissions import PermissionManager

        self.permission_manager = PermissionManager()

    def validate_path(self, path: str, param_name: str = "path") -> "ValidationResult":
        """Run ``validate_path_parameter`` on ``path`` and return its result."""
        from hanzo_tools.core.validation import validate_path_parameter

        outcome = validate_path_parameter(path, param_name)
        return outcome

    def is_path_allowed(self, path: str) -> bool:
        """Ask the permission manager whether ``path`` may be accessed."""
        manager = self.permission_manager
        return manager.is_path_allowed(path)

__init__

__init__(permission_manager: PermissionManager | None = None) -> None

Initialize filesystem tool.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `permission_manager` | `PermissionManager \| None` | Permission manager for access control (auto-created if None) | `None` |
Source code in hanzo_tools/core/base.py
def __init__(self, permission_manager: "PermissionManager | None" = None) -> None:
    """Store the permission manager, creating a default one when omitted.

    Args:
        permission_manager: Permission manager for access control
            (a ``PermissionManager()`` is created if None)
    """
    if permission_manager is not None:
        self.permission_manager = permission_manager
        return

    # Imported inside the method, only when a default manager is needed.
    from hanzo_tools.core.permissions import PermissionManager

    self.permission_manager = PermissionManager()

validate_path

validate_path(path: str, param_name: str = 'path') -> ValidationResult

Validate a path parameter.

Source code in hanzo_tools/core/base.py
def validate_path(self, path: str, param_name: str = "path") -> "ValidationResult":
    """Run ``validate_path_parameter`` on ``path`` and return its result."""
    from hanzo_tools.core.validation import validate_path_parameter

    outcome = validate_path_parameter(path, param_name)
    return outcome

is_path_allowed

is_path_allowed(path: str) -> bool

Check if a path is allowed according to permission settings.

Source code in hanzo_tools/core/base.py
def is_path_allowed(self, path: str) -> bool:
    """Ask the permission manager whether ``path`` may be accessed."""
    manager = self.permission_manager
    return manager.is_path_allowed(path)

ToolRegistry

Registry for Hanzo tools.

Provides functionality for registering tool implementations with an MCP server, with support for enable/disable states.

Source code in hanzo_tools/core/base.py
@final
class ToolRegistry:
    """Registry for Hanzo tools.

    Registers tool implementations with an MCP server and tracks per-tool
    enable/disable states. The states are stored at class level and can be
    persisted to ``~/.hanzo/mcp/tool_states.json``.
    """

    # Class-level storage for tool states: tool name -> enabled flag
    _enabled_tools: ClassVar[dict[str, bool]] = {}
    # Set after the first load attempt so the config file is read at most once
    _config_loaded: ClassVar[bool] = False

    @classmethod
    def _load_config(cls) -> None:
        """Load tool enable/disable states from config (first call only).

        Loading is best-effort: a missing, unreadable or malformed file is
        reported through the logger and leaves the current states untouched,
        so tools stay enabled by default.
        """
        if cls._config_loaded:
            return

        import json

        config_file = Path.home() / ".hanzo" / "mcp" / "tool_states.json"
        if config_file.exists():
            try:
                with open(config_file, encoding="utf-8") as f:
                    loaded = json.load(f)
            except (OSError, ValueError) as exc:
                # json.JSONDecodeError and UnicodeDecodeError are ValueErrors
                logger.warning(f"Ignoring unreadable tool state file {config_file}: {exc}")
            else:
                if isinstance(loaded, dict):
                    cls._enabled_tools = loaded
                else:
                    # Anything but a mapping would break the .get() lookups
                    logger.warning(f"Ignoring tool state file {config_file}: expected a JSON object")
        cls._config_loaded = True

    @classmethod
    def is_tool_enabled(cls, tool_name: str) -> bool:
        """Check if a tool is enabled.

        Args:
            tool_name: Name of the tool to look up

        Returns:
            The stored state, or True when no state is stored for the tool
        """
        cls._load_config()
        return cls._enabled_tools.get(tool_name, True)  # Enabled by default

    @classmethod
    def set_tool_enabled(cls, tool_name: str, enabled: bool, persist: bool = True) -> None:
        """Enable or disable a tool.

        Args:
            tool_name: Name of the tool to update
            enabled: New state for the tool
            persist: When True, write all states to the config file
        """
        import json

        cls._load_config()
        cls._enabled_tools[tool_name] = enabled

        if persist:
            config_file = Path.home() / ".hanzo" / "mcp" / "tool_states.json"
            config_file.parent.mkdir(parents=True, exist_ok=True)
            with open(config_file, "w", encoding="utf-8") as f:
                json.dump(cls._enabled_tools, f, indent=2)

    @staticmethod
    def register_tool(mcp_server: FastMCP, tool: BaseTool) -> None:
        """Register a tool with the MCP server.

        Disabled tools are skipped; either outcome is logged at debug level.

        Args:
            mcp_server: The FastMCP server instance
            tool: The tool to register
        """
        if ToolRegistry.is_tool_enabled(tool.name):
            tool.register(mcp_server)
            logger.debug(f"Registered tool: {tool.name}")
        else:
            logger.debug(f"Skipped disabled tool: {tool.name}")

    @staticmethod
    def register_tools(mcp_server: FastMCP, tools: list[BaseTool]) -> None:
        """Register multiple tools with the MCP server.

        Args:
            mcp_server: The FastMCP server instance
            tools: Tools to register, in order; disabled ones are skipped
        """
        for tool in tools:
            ToolRegistry.register_tool(mcp_server, tool)

is_tool_enabled classmethod

is_tool_enabled(tool_name: str) -> bool

Check if a tool is enabled.

Source code in hanzo_tools/core/base.py
@classmethod
def is_tool_enabled(cls, tool_name: str) -> bool:
    """Return the stored state for ``tool_name``; unknown tools count as enabled."""
    cls._load_config()
    states = cls._enabled_tools
    return states.get(tool_name, True)

set_tool_enabled classmethod

set_tool_enabled(tool_name: str, enabled: bool, persist: bool = True) -> None

Enable or disable a tool.

Source code in hanzo_tools/core/base.py
@classmethod
def set_tool_enabled(cls, tool_name: str, enabled: bool, persist: bool = True) -> None:
    """Record a tool's state and, if ``persist`` is set, write all states to disk."""
    import json

    cls._load_config()
    cls._enabled_tools[tool_name] = enabled

    if not persist:
        return

    state_dir = Path.home() / ".hanzo" / "mcp"
    state_dir.mkdir(parents=True, exist_ok=True)
    with open(state_dir / "tool_states.json", "w") as f:
        json.dump(cls._enabled_tools, f, indent=2)

register_tool staticmethod

register_tool(mcp_server: FastMCP, tool: BaseTool) -> None

Register a tool with the MCP server.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `mcp_server` | `FastMCP` | The FastMCP server instance | required |
| `tool` | `BaseTool` | The tool to register | required |
Source code in hanzo_tools/core/base.py
@staticmethod
def register_tool(mcp_server: FastMCP, tool: BaseTool) -> None:
    """Register ``tool`` with the MCP server unless it is disabled.

    Args:
        mcp_server: The FastMCP server instance
        tool: The tool to register
    """
    if not ToolRegistry.is_tool_enabled(tool.name):
        logger.debug(f"Skipped disabled tool: {tool.name}")
        return

    tool.register(mcp_server)
    logger.debug(f"Registered tool: {tool.name}")

register_tools staticmethod

register_tools(mcp_server: FastMCP, tools: list[BaseTool]) -> None

Register multiple tools with the MCP server.

Source code in hanzo_tools/core/base.py
@staticmethod
def register_tools(mcp_server: FastMCP, tools: list[BaseTool]) -> None:
    """Pass each tool in ``tools``, in order, to ``ToolRegistry.register_tool``."""
    for candidate in tools:
        ToolRegistry.register_tool(mcp_server, candidate)

MCPResourceDocument

Resource document returned by MCP tools.

Output format options:

  • to_json_string(): Clean JSON format (default for structured data)
  • to_readable_string(): Human-readable formatted text for display
  • to_dict(): Full dict structure with data/metadata

Source code in hanzo_tools/core/types.py
@dataclass
class MCPResourceDocument:
    """Resource document returned by MCP tools.

    Output format options:
    - to_json_string(): Clean JSON format (default for structured data)
    - to_readable_string(): Human-readable formatted text for display
    - to_dict(): Full dict structure with data/metadata
    """

    # Tool payload; to_readable_string() also copes with list or scalar values
    data: Dict[str, Any]
    # Optional extra information, rendered as a footer by to_readable_string()
    metadata: Optional[Dict[str, Any]] = None

    def to_dict(self) -> Dict[str, Any]:
        """Convert to dictionary format with data/metadata structure.

        Returns:
            ``{"data": ...}`` plus a ``"metadata"`` key when metadata is set
        """
        result = {"data": self.data}
        if self.metadata:
            result["metadata"] = self.metadata
        return result

    def to_json_string(self) -> str:
        """Convert to clean JSON string.

        Returns:
            The data serialized as ``{"result": data}`` with 2-space indent
        """
        # Return wrapped in "result" for consistency
        return json.dumps({"result": self.data}, indent=2)

    def to_readable_string(self) -> str:
        """Convert to human-readable formatted string for display.

        Optimized for readability in Claude Code output panels.

        The layout depends on the shape of ``self.data``:

        - dict with ``"results"``: header, up to 50 numbered results, pagination
        - dict with ``"output"``/``"stdout"``/``"stderr"``: status and streams
        - dict with ``"error"``: error message and optional details
        - any other dict: ``key: value`` lines (nested values as JSON)
        - list: up to 50 numbered items plus a count of the remainder
        - anything else: ``str(self.data)``

        A ``---`` footer with the metadata entries is appended when set.

        Returns:
            The formatted text, lines joined with newlines
        """
        # Entries beyond this many are not displayed; it is also the page
        # size used when computing the page count.
        max_items = 50
        lines: List[str] = []

        if isinstance(self.data, dict):
            # Handle search/find results with "results" array
            if "results" in self.data:
                results = self.data["results"]
                stats = self.data.get("stats", {})
                pagination = self.data.get("pagination", {})

                # Header with stats
                if stats:
                    query = stats.get("query", stats.get("pattern", ""))
                    total = stats.get("total", len(results))
                    time_ms = stats.get("time_ms", {})
                    if time_ms:
                        # time_ms is either one number or a dict of timings
                        if isinstance(time_ms, dict):
                            total_time = sum(time_ms.values())
                        else:
                            total_time = time_ms
                        lines.append(f"# Search: '{query}' ({total} results, {total_time}ms)")
                    else:
                        lines.append(f"# Found {total} results for '{query}'")
                else:
                    lines.append(f"# Found {len(results)} results")
                lines.append("")

                # Format each result
                for i, result in enumerate(results[:max_items], 1):
                    if isinstance(result, dict):
                        # Common patterns for search results
                        file_path = result.get("file", result.get("path", ""))
                        line_num = result.get("line", result.get("line_number", ""))
                        match_text = result.get("match", result.get("text", result.get("content", "")))
                        result_type = result.get("type", "")

                        if file_path:
                            loc = f"{file_path}:{line_num}" if line_num else file_path
                            lines.append(f"{i}. {loc}")
                            if match_text:
                                # Truncate long matches for readability
                                preview = match_text[:200] + "..." if len(match_text) > 200 else match_text
                                lines.append(f"   {preview}")
                            if result_type:
                                lines.append(f"   [{result_type}]")
                        else:
                            lines.append(f"{i}. {json.dumps(result, default=str)}")
                    else:
                        lines.append(f"{i}. {result}")

                # Show pagination info
                if pagination:
                    page = pagination.get("page", 1)
                    total = pagination.get("total", 0)
                    has_next = pagination.get("has_next", False)
                    if has_next and total > 0:
                        # Ceiling division: an exact multiple of the page size
                        # must not count an extra, empty page.
                        total_pages = (total + max_items - 1) // max_items
                        lines.append(f"\n... page {page} of {total_pages} ({total} total)")

            # Handle command execution results
            elif "output" in self.data or "stdout" in self.data or "stderr" in self.data:
                # Shell command output
                exit_code = self.data.get("exit_code", self.data.get("returncode", 0))
                stdout = self.data.get("output", self.data.get("stdout", ""))
                stderr = self.data.get("stderr", "")
                elapsed = self.data.get("elapsed", self.data.get("time_ms", ""))

                if exit_code == 0:
                    lines.append("✓ Command succeeded")
                else:
                    lines.append(f"✗ Command failed (exit {exit_code})")

                if elapsed:
                    # Numeric values get an "ms" suffix; others print verbatim
                    lines.append(f"Time: {elapsed}ms" if isinstance(elapsed, (int, float)) else f"Time: {elapsed}")
                lines.append("")

                if stdout:
                    lines.append(stdout.rstrip())
                if stderr:
                    lines.append("\n--- stderr ---")
                    lines.append(stderr.rstrip())

            # Handle error results
            elif "error" in self.data:
                lines.append(f"Error: {self.data['error']}")
                if "details" in self.data:
                    lines.append(f"Details: {self.data['details']}")

            # Generic dict - format as key-value pairs
            else:
                for key, value in self.data.items():
                    if isinstance(value, (dict, list)):
                        lines.append(f"{key}:")
                        lines.append(json.dumps(value, indent=2, default=str))
                    else:
                        lines.append(f"{key}: {value}")

        elif isinstance(self.data, list):
            # List data - format as numbered items
            for i, item in enumerate(self.data[:max_items], 1):
                if isinstance(item, dict):
                    lines.append(f"{i}. {json.dumps(item, default=str)}")
                else:
                    lines.append(f"{i}. {item}")
            if len(self.data) > max_items:
                lines.append(f"\n... {len(self.data) - max_items} more items")

        else:
            # Scalar or other - just convert to string
            lines.append(str(self.data))

        # Add metadata footer if present
        if self.metadata:
            lines.append("")
            lines.append("---")
            for key, value in self.metadata.items():
                lines.append(f"{key}: {value}")

        return "\n".join(lines)

to_dict

to_dict() -> Dict[str, Any]

Convert to dictionary format with data/metadata structure.

Source code in hanzo_tools/core/types.py
def to_dict(self) -> Dict[str, Any]:
    """Return ``{"data": ...}``, adding ``"metadata"`` only when it is non-empty."""
    if not self.metadata:
        return {"data": self.data}
    return {"data": self.data, "metadata": self.metadata}

to_json_string

to_json_string() -> str

Convert to clean JSON string.

Source code in hanzo_tools/core/types.py
def to_json_string(self) -> str:
    """Serialize the data as JSON under a top-level ``"result"`` key."""
    # The "result" wrapper keeps the output shape the same for every payload
    wrapped = {"result": self.data}
    return json.dumps(wrapped, indent=2)

to_readable_string

to_readable_string() -> str

Convert to human-readable formatted string for display.

Optimized for readability in Claude Code output panels.

Source code in hanzo_tools/core/types.py
def to_readable_string(self) -> str:
    """Convert to human-readable formatted string for display.

    Optimized for readability in Claude Code output panels.

    The layout depends on the shape of ``self.data``:

    - dict with ``"results"``: header, up to 50 numbered results, pagination
    - dict with ``"output"``/``"stdout"``/``"stderr"``: status and streams
    - dict with ``"error"``: error message and optional details
    - any other dict: ``key: value`` lines (nested values as JSON)
    - list: up to 50 numbered items plus a count of the remainder
    - anything else: ``str(self.data)``

    A ``---`` footer with the metadata entries is appended when set.

    Returns:
        The formatted text, lines joined with newlines
    """
    # Entries beyond this many are not displayed; it is also the page
    # size used when computing the page count.
    max_items = 50
    lines: List[str] = []

    if isinstance(self.data, dict):
        # Handle search/find results with "results" array
        if "results" in self.data:
            results = self.data["results"]
            stats = self.data.get("stats", {})
            pagination = self.data.get("pagination", {})

            # Header with stats
            if stats:
                query = stats.get("query", stats.get("pattern", ""))
                total = stats.get("total", len(results))
                time_ms = stats.get("time_ms", {})
                if time_ms:
                    # time_ms is either one number or a dict of timings
                    if isinstance(time_ms, dict):
                        total_time = sum(time_ms.values())
                    else:
                        total_time = time_ms
                    lines.append(f"# Search: '{query}' ({total} results, {total_time}ms)")
                else:
                    lines.append(f"# Found {total} results for '{query}'")
            else:
                lines.append(f"# Found {len(results)} results")
            lines.append("")

            # Format each result
            for i, result in enumerate(results[:max_items], 1):
                if isinstance(result, dict):
                    # Common patterns for search results
                    file_path = result.get("file", result.get("path", ""))
                    line_num = result.get("line", result.get("line_number", ""))
                    match_text = result.get("match", result.get("text", result.get("content", "")))
                    result_type = result.get("type", "")

                    if file_path:
                        loc = f"{file_path}:{line_num}" if line_num else file_path
                        lines.append(f"{i}. {loc}")
                        if match_text:
                            # Truncate long matches for readability
                            preview = match_text[:200] + "..." if len(match_text) > 200 else match_text
                            lines.append(f"   {preview}")
                        if result_type:
                            lines.append(f"   [{result_type}]")
                    else:
                        lines.append(f"{i}. {json.dumps(result, default=str)}")
                else:
                    lines.append(f"{i}. {result}")

            # Show pagination info
            if pagination:
                page = pagination.get("page", 1)
                total = pagination.get("total", 0)
                has_next = pagination.get("has_next", False)
                if has_next and total > 0:
                    # Ceiling division: an exact multiple of the page size
                    # must not count an extra, empty page.
                    total_pages = (total + max_items - 1) // max_items
                    lines.append(f"\n... page {page} of {total_pages} ({total} total)")

        # Handle command execution results
        elif "output" in self.data or "stdout" in self.data or "stderr" in self.data:
            # Shell command output
            exit_code = self.data.get("exit_code", self.data.get("returncode", 0))
            stdout = self.data.get("output", self.data.get("stdout", ""))
            stderr = self.data.get("stderr", "")
            elapsed = self.data.get("elapsed", self.data.get("time_ms", ""))

            if exit_code == 0:
                lines.append("✓ Command succeeded")
            else:
                lines.append(f"✗ Command failed (exit {exit_code})")

            if elapsed:
                # Numeric values get an "ms" suffix; others print verbatim
                lines.append(f"Time: {elapsed}ms" if isinstance(elapsed, (int, float)) else f"Time: {elapsed}")
            lines.append("")

            if stdout:
                lines.append(stdout.rstrip())
            if stderr:
                lines.append("\n--- stderr ---")
                lines.append(stderr.rstrip())

        # Handle error results
        elif "error" in self.data:
            lines.append(f"Error: {self.data['error']}")
            if "details" in self.data:
                lines.append(f"Details: {self.data['details']}")

        # Generic dict - format as key-value pairs
        else:
            for key, value in self.data.items():
                if isinstance(value, (dict, list)):
                    lines.append(f"{key}:")
                    lines.append(json.dumps(value, indent=2, default=str))
                else:
                    lines.append(f"{key}: {value}")

    elif isinstance(self.data, list):
        # List data - format as numbered items
        for i, item in enumerate(self.data[:max_items], 1):
            if isinstance(item, dict):
                lines.append(f"{i}. {json.dumps(item, default=str)}")
            else:
                lines.append(f"{i}. {item}")
        if len(self.data) > max_items:
            lines.append(f"\n... {len(self.data) - max_items} more items")

    else:
        # Scalar or other - just convert to string
        lines.append(str(self.data))

    # Add metadata footer if present
    if self.metadata:
        lines.append("")
        lines.append("---")
        for key, value in self.metadata.items():
            lines.append(f"{key}: {value}")

    return "\n".join(lines)

ToolContext

Extended context for tool execution.

Provides utilities for logging, progress reporting, and accessing the MCP context.

Source code in hanzo_tools/core/context.py
@dataclass
class ToolContext:
    """Per-call context handed to tools.

    Wraps the MCP context and adds logging helpers, progress reporting and a
    small key/value metadata store.
    """

    mcp_ctx: MCPContext
    tool_name: Optional[str] = None
    metadata: dict[str, Any] = field(default_factory=dict)

    async def set_tool_info(self, tool_name: str) -> None:
        """Remember ``tool_name`` so log lines can be prefixed with it."""
        self.tool_name = tool_name

    async def info(self, message: str) -> None:
        """Send ``message`` to ``logger.info`` with a ``[tool name]`` prefix."""
        label = self.tool_name or 'tool'
        logger.info(f"[{label}] {message}")

    async def warning(self, message: str) -> None:
        """Send ``message`` to ``logger.warning`` with a ``[tool name]`` prefix."""
        label = self.tool_name or 'tool'
        logger.warning(f"[{label}] {message}")

    async def error(self, message: str) -> None:
        """Send ``message`` to ``logger.error`` with a ``[tool name]`` prefix."""
        label = self.tool_name or 'tool'
        logger.error(f"[{label}] {message}")

    async def debug(self, message: str) -> None:
        """Send ``message`` to ``logger.debug`` with a ``[tool name]`` prefix."""
        label = self.tool_name or 'tool'
        logger.debug(f"[{label}] {message}")

    async def progress(self, current: int, total: int, message: str = "") -> None:
        """Log ``current``/``total`` and the percentage done as an info message."""
        # A non-positive total reports 0% instead of dividing by zero
        if total > 0:
            pct = current / total * 100
        else:
            pct = 0
        await self.info(f"Progress: {current}/{total} ({pct:.1f}%) {message}")

    def get(self, key: str, default: Any = None) -> Any:
        """Look up ``key`` in the metadata, falling back to ``default``."""
        store = self.metadata
        return store.get(key, default)

    def set(self, key: str, value: Any) -> None:
        """Store ``value`` in the metadata under ``key``."""
        store = self.metadata
        store[key] = value

set_tool_info async

set_tool_info(tool_name: str) -> None

Set the current tool name for logging.

Source code in hanzo_tools/core/context.py
async def set_tool_info(self, tool_name: str) -> None:
    """Remember ``tool_name`` so log lines can be prefixed with it."""
    setattr(self, "tool_name", tool_name)

info async

info(message: str) -> None

Log an info message.

Source code in hanzo_tools/core/context.py
async def info(self, message: str) -> None:
    """Send ``message`` to ``logger.info`` with a ``[tool name]`` prefix."""
    label = self.tool_name or 'tool'
    logger.info(f"[{label}] {message}")

warning async

warning(message: str) -> None

Log a warning message.

Source code in hanzo_tools/core/context.py
async def warning(self, message: str) -> None:
    """Send ``message`` to ``logger.warning`` with a ``[tool name]`` prefix."""
    label = self.tool_name or 'tool'
    logger.warning(f"[{label}] {message}")

error async

error(message: str) -> None

Log an error message.

Source code in hanzo_tools/core/context.py
async def error(self, message: str) -> None:
    """Send ``message`` to ``logger.error`` with a ``[tool name]`` prefix."""
    label = self.tool_name or 'tool'
    logger.error(f"[{label}] {message}")

debug async

debug(message: str) -> None

Log a debug message.

Source code in hanzo_tools/core/context.py
async def debug(self, message: str) -> None:
    """Send ``message`` to ``logger.debug`` with a ``[tool name]`` prefix."""
    label = self.tool_name or 'tool'
    logger.debug(f"[{label}] {message}")

progress async

progress(current: int, total: int, message: str = '') -> None

Report progress.

Source code in hanzo_tools/core/context.py
async def progress(self, current: int, total: int, message: str = "") -> None:
    """Log ``current``/``total`` and the percentage done as an info message."""
    # A non-positive total reports 0% instead of dividing by zero
    if total > 0:
        pct = current / total * 100
    else:
        pct = 0
    await self.info(f"Progress: {current}/{total} ({pct:.1f}%) {message}")

get

get(key: str, default: Any = None) -> Any

Get a metadata value.

Source code in hanzo_tools/core/context.py
def get(self, key: str, default: Any = None) -> Any:
    """Look up ``key`` in the metadata, falling back to ``default``."""
    store = self.metadata
    return store.get(key, default)

set

set(key: str, value: Any) -> None

Set a metadata value.

Source code in hanzo_tools/core/context.py
def set(self, key: str, value: Any) -> None:
    """Store ``value`` in the metadata under ``key``."""
    store = self.metadata
    store[key] = value

PermissionManager

Manages filesystem permissions for tools.

Controls which paths tools are allowed to access.

Source code in hanzo_tools/core/permissions.py
class PermissionManager:
    """Manages filesystem permissions for tools.

    Controls which paths tools are allowed to access.
    """

    def __init__(
        self,
        allowed_paths: Optional[list[str | Path]] = None,
        deny_patterns: Optional[list[str]] = None,
    ):
        """Initialize permission manager.

        Args:
            allowed_paths: List of allowed base paths (defaults to cwd)
            deny_patterns: Patterns to deny (e.g., '.git', 'node_modules')
        """
        self.allowed_paths: list[Path] = []
        if allowed_paths:
            for p in allowed_paths:
                self.allowed_paths.append(Path(p).resolve())
        else:
            self.allowed_paths.append(Path.cwd())

        self.deny_patterns = deny_patterns or [
            ".git",
            "__pycache__",
            ".pyc",
            "node_modules",
            ".env",
            ".secrets",
        ]

    def is_path_allowed(self, path: str | Path) -> bool:
        """Check if a path is allowed.

        Args:
            path: Path to check

        Returns:
            True if path is within allowed paths and not denied
        """
        try:
            resolved = Path(path).resolve()

            # Check if path is under any allowed path
            is_under_allowed = any(self._is_subpath(resolved, allowed) for allowed in self.allowed_paths)

            if not is_under_allowed:
                return False

            # Check deny patterns
            path_str = str(resolved)
            for pattern in self.deny_patterns:
                if pattern in path_str:
                    return False

            return True

        except Exception:
            return False

    def _is_subpath(self, path: Path, parent: Path) -> bool:
        """Check if path is under parent."""
        try:
            path.relative_to(parent)
            return True
        except ValueError:
            return False

    def add_allowed_path(self, path: str | Path) -> None:
        """Add a path to the allowed list."""
        self.allowed_paths.append(Path(path).resolve())

    def add_deny_pattern(self, pattern: str) -> None:
        """Add a deny pattern."""
        self.deny_patterns.append(pattern)

__init__

__init__(allowed_paths: Optional[list[str | Path]] = None, deny_patterns: Optional[list[str]] = None)

Initialize permission manager.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `allowed_paths` | `Optional[list[str \| Path]]` | List of allowed base paths (defaults to cwd) | `None` |
| `deny_patterns` | `Optional[list[str]]` | Patterns to deny (e.g., '.git', 'node_modules') | `None` |
Source code in hanzo_tools/core/permissions.py
def __init__(
    self,
    allowed_paths: Optional[list[str | Path]] = None,
    deny_patterns: Optional[list[str]] = None,
):
    """Initialize permission manager.

    Args:
        allowed_paths: List of allowed base paths (defaults to cwd)
        deny_patterns: Patterns to deny (e.g., '.git', 'node_modules')
    """
    self.allowed_paths: list[Path] = []
    if allowed_paths:
        for p in allowed_paths:
            self.allowed_paths.append(Path(p).resolve())
    else:
        self.allowed_paths.append(Path.cwd())

    self.deny_patterns = deny_patterns or [
        ".git",
        "__pycache__",
        ".pyc",
        "node_modules",
        ".env",
        ".secrets",
    ]

is_path_allowed

is_path_allowed(path: str | Path) -> bool

Check if a path is allowed.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `path` | `str \| Path` | Path to check | required |

Returns:

| Type | Description |
| --- | --- |
| `bool` | True if path is within allowed paths and not denied |

Source code in hanzo_tools/core/permissions.py
def is_path_allowed(self, path: str | Path) -> bool:
    """Check if a path is allowed.

    Args:
        path: Path to check

    Returns:
        True if path is within allowed paths and not denied
    """
    try:
        resolved = Path(path).resolve()

        # Check if path is under any allowed path
        is_under_allowed = any(self._is_subpath(resolved, allowed) for allowed in self.allowed_paths)

        if not is_under_allowed:
            return False

        # Check deny patterns
        path_str = str(resolved)
        for pattern in self.deny_patterns:
            if pattern in path_str:
                return False

        return True

    except Exception:
        return False

add_allowed_path

add_allowed_path(path: str | Path) -> None

Add a path to the allowed list.

Source code in hanzo_tools/core/permissions.py
def add_allowed_path(self, path: str | Path) -> None:
    """Add a path to the allowed list."""
    self.allowed_paths.append(Path(path).resolve())

add_deny_pattern

add_deny_pattern(pattern: str) -> None

Add a deny pattern.

Source code in hanzo_tools/core/permissions.py
def add_deny_pattern(self, pattern: str) -> None:
    """Append ``pattern`` to the list of deny patterns."""
    patterns = self.deny_patterns
    patterns.append(pattern)