Callbacks: Interception and Flow Control

Callbacks in PyAgenity are an interception mechanism: they let you hook into the execution flow of your agent graphs at critical decision points. Rather than simply observing events, callbacks let you participate in, modify, and control the execution process as it unfolds.

Understanding the Interception Pattern

Think of callbacks as strategic checkpoints placed throughout your agent's thinking process. When your agent is about to call a tool, query an AI model, or execute any external operation, PyAgenity pauses and gives your callback system the opportunity to:

  • Validate inputs before they're processed
  • Transform or enrich data as it flows through the system
  • Implement custom logic for error recovery and handling
  • Modify outputs before they're returned to the agent
  • Apply security policies and business rules consistently

This creates a layered architecture where your core agent logic remains clean and focused, while cross-cutting concerns like validation, logging, security, and transformation are handled elegantly through the callback system.
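The interception pattern described above can be sketched in plain Python. This is a minimal illustration, not PyAgenity's implementation: `run_with_hooks` and the sample hooks are hypothetical names, and a single hook per phase is assumed to keep the example short.

```python
import asyncio
from typing import Any, Awaitable, Callable, Optional


# Hypothetical stand-in for a framework's dispatch logic (an assumption,
# not PyAgenity code): one hook before, one after, one on error.
async def run_with_hooks(
    operation: Callable[[dict], Awaitable[Any]],
    input_data: dict,
    before: Callable[[dict], Awaitable[dict]],
    after: Callable[[dict, Any], Awaitable[Any]],
    on_error: Callable[[dict, Exception], Awaitable[Optional[Any]]],
) -> Any:
    prepared = await before(input_data)  # may validate or rewrite the input
    try:
        result = await operation(prepared)
    except Exception as exc:
        recovery = await on_error(prepared, exc)
        if recovery is None:  # None means "no recovery": re-raise the error
            raise
        return recovery
    return await after(prepared, result)  # may rewrite the output


async def add_request_id(data: dict) -> dict:
    return {**data, "request_id": "req-1"}


async def uppercase_output(data: dict, output: Any) -> Any:
    return str(output).upper()


async def fallback_on_timeout(data: dict, error: Exception) -> Optional[Any]:
    return "fallback" if isinstance(error, TimeoutError) else None


async def echo_tool(data: dict) -> str:
    return f"{data['query']} ({data['request_id']})"


async def failing_tool(data: dict) -> str:
    raise TimeoutError("simulated timeout")


def demo() -> tuple:
    hooks = (add_request_id, uppercase_output, fallback_on_timeout)
    ok = asyncio.run(run_with_hooks(echo_tool, {"query": "hi"}, *hooks))
    recovered = asyncio.run(run_with_hooks(failing_tool, {"query": "hi"}, *hooks))
    return ok, recovered
```

Calling `demo()` runs one successful call, where the before and after hooks both apply, and one failing call, where the error hook supplies a replacement result. The core operation (`echo_tool`) contains no validation or recovery logic of its own.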

Callback Lifecycle and Flow

The callback system operates around three fundamental moments in any operation:

Before Invoke: The Preparation Phase

from datetime import datetime, timezone

from pyagenity.utils.callbacks import register_before_invoke, InvocationType, CallbackContext

async def validate_tool_input(context: CallbackContext, input_data: dict) -> dict:
    """Validate and potentially modify tool inputs before execution."""
    if context.function_name == "database_query":
        # Apply security validations (a naive keyword check, shown for illustration only)
        if "DROP" in input_data.get("query", "").upper():
            raise ValueError("Dangerous SQL operations not allowed")

        # Add audit context; metadata may be None, so fall back to an empty dict
        metadata = context.metadata or {}
        input_data["audit_user"] = metadata.get("user_id", "unknown")
        # datetime.utcnow() is deprecated since Python 3.12; use an aware UTC timestamp
        input_data["timestamp"] = datetime.now(timezone.utc).isoformat()

    return input_data

# Register for tool invocations
register_before_invoke(InvocationType.TOOL, validate_tool_input)

Before any tool, AI model, or MCP function is called, PyAgenity executes all registered before_invoke callbacks. This is your opportunity to:

  • Validate inputs according to business rules
  • Add contextual information or metadata
  • Transform data formats or apply normalization
  • Implement rate limiting or quota checks
  • Log invocation attempts for audit trails

After Invoke: The Processing Phase

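from typing import Any

from pyagenity.utils.callbacks import CallbackContext, InvocationType, register_after_invoke

# calculate_confidence and get_clarification_prompt are application-defined helpers
async def enrich_ai_response(context: CallbackContext, input_data: dict, output_data: Any) -> Any: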
    """Enrich AI responses with additional context and formatting."""
    if context.invocation_type == InvocationType.AI:
        # Add confidence scoring based on response characteristics
        response_text = str(output_data)
        confidence_score = calculate_confidence(response_text)

        # Transform the response if needed
        if confidence_score < 0.7:
            enhanced_response = await get_clarification_prompt(response_text, input_data)
            return enhanced_response

    return output_data

register_after_invoke(InvocationType.AI, enrich_ai_response)

After successful execution, after_invoke callbacks process the results. This phase enables:

  • Response validation and quality assessment
  • Data transformation and formatting
  • Adding computed metadata or enrichment
  • Implementing caching strategies
  • Logging successful operations

On Error: The Recovery Phase

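from pyagenity.utils.callbacks import CallbackContext, InvocationType, register_on_error
from pyagenity.utils.message import Message

# AuthenticationError and retry_with_backoff are assumed to be defined by your application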

async def handle_tool_errors(context: CallbackContext, input_data: dict, error: Exception) -> Message | None:
    """Implement intelligent error recovery for tool failures."""
    if context.function_name == "external_api_call":
        if isinstance(error, TimeoutError):
            # Implement retry logic with backoff
            return await retry_with_backoff(context, input_data, max_retries=3)

        elif isinstance(error, AuthenticationError):
            # Generate helpful error message for the agent
            return Message.from_text(
                "The external service authentication failed. "
                "Please check the API credentials and try again.",
                role="tool"
            )

    # Return None to propagate the error normally
    return None

register_on_error(InvocationType.TOOL, handle_tool_errors)

When operations fail, on_error callbacks provide sophisticated error handling:

  • Implementing retry strategies with exponential backoff
  • Converting technical errors into actionable agent messages
  • Logging failures for monitoring and debugging
  • Providing fallback responses or alternative data sources
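The exponential-backoff retry strategy mentioned above can be sketched as follows. This is an illustrative helper, not part of PyAgenity: the parameters `base_delay` and `retry_on` are assumptions, and unlike the `retry_with_backoff(context, input_data, max_retries=3)` call in the earlier example, this version takes a zero-argument coroutine function so that it stays self-contained.

```python
import asyncio
from typing import Any, Awaitable, Callable


# Hypothetical helper (an assumption, not a PyAgenity API).
async def retry_with_backoff(
    operation: Callable[[], Awaitable[Any]],
    max_retries: int = 3,
    base_delay: float = 0.5,
    retry_on: tuple = (TimeoutError, ConnectionError),
) -> Any:
    """Make up to `max_retries` attempts, doubling the delay after each failure."""
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")
    for attempt in range(max_retries):
        try:
            return await operation()
        except retry_on:
            if attempt == max_retries - 1:
                raise  # out of attempts: let the caller see the error
            # Delays grow as base_delay * 1, 2, 4, ...
            await asyncio.sleep(base_delay * (2 ** attempt))


def demo() -> tuple:
    calls = 0

    async def flaky() -> str:
        # Fails twice with a retryable error, then succeeds.
        nonlocal calls
        calls += 1
        if calls < 3:
            raise TimeoutError("simulated timeout")
        return "ok"

    result = asyncio.run(retry_with_backoff(flaky, max_retries=3, base_delay=0.01))
    return result, calls
```

Errors outside `retry_on` are not caught, so a permission or validation failure propagates immediately instead of being retried.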

Invocation Types and Context

PyAgenity distinguishes between three types of operations that can trigger callbacks:

AI Invocations

These occur when your agent calls language models for reasoning, planning, or text generation:

import time

# estimate_tokens and log_ai_usage are application-defined helpers
async def monitor_ai_usage(context: CallbackContext, input_data: dict) -> dict:
    """Track AI usage patterns and costs."""
    if context.invocation_type == InvocationType.AI:
        # Log token usage and costs
        estimated_tokens = estimate_tokens(input_data.get("messages", []))
        log_ai_usage(context.node_name, estimated_tokens)

        # Add usage tracking to metadata
        input_data["usage_tracking"] = {
            "node": context.node_name,
            "estimated_tokens": estimated_tokens,
            "timestamp": time.time()
        }

    return input_data

Tool Invocations

These trigger when your agent executes functions, APIs, or external services:

from datetime import datetime, timezone

async def secure_tool_access(context: CallbackContext, input_data: dict) -> dict:
    """Apply security policies to tool invocations."""
    # metadata may be None, so fall back to an empty dict
    metadata = context.metadata or {}
    user_permissions = metadata.get("user_permissions", [])

    # Check if user has permission for this tool
    if context.function_name not in user_permissions:
        raise PermissionError(f"User not authorized to use {context.function_name}")

    # Add security context
    input_data["security_context"] = {
        "user_id": metadata.get("user_id"),
        "permissions": user_permissions,
        "access_time": datetime.now(timezone.utc).isoformat()
    }

    return input_data

MCP (Model Context Protocol) Invocations

These handle calls to external MCP services for specialized capabilities:

async def optimize_mcp_calls(context: CallbackContext, input_data: dict) -> dict:
    """Optimize and cache MCP service calls."""
    if context.invocation_type == InvocationType.MCP:
        # Check cache first
        cache_key = generate_cache_key(context.function_name, input_data)
        cached_result = await get_from_cache(cache_key)

        if cached_result is not None:  # an empty but valid cached value still counts as a hit
            # Return cached result wrapped as appropriate response
            return create_cached_response(cached_result)

    return input_data
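The `generate_cache_key` helper used above is not defined in this guide. One possible implementation, offered as a sketch under the assumption that the input data is JSON-serializable (or can be stringified), hashes the function name together with a canonical form of the arguments:

```python
import hashlib
import json


# Hypothetical helper (an assumption, not a PyAgenity API).
def generate_cache_key(function_name: str, input_data: dict) -> str:
    """Build a deterministic cache key from a function name and its arguments."""
    # sort_keys makes the key independent of dict insertion order;
    # default=str is a lossy fallback for values json cannot encode natively.
    payload = json.dumps(input_data, sort_keys=True, default=str)
    digest = hashlib.sha256(payload.encode("utf-8")).hexdigest()
    return f"{function_name}:{digest}"


key_a = generate_cache_key("search", {"query": "cats", "limit": 5})
key_b = generate_cache_key("search", {"limit": 5, "query": "cats"})
key_c = generate_cache_key("search", {"query": "dogs", "limit": 5})
```

Here `key_a` and `key_b` are identical because only the key order differs, while `key_c` differs because the arguments do. Keeping the function name as a readable prefix makes it easier to inspect or invalidate entries per function.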

Callback Context and Metadata

Each callback receives a rich CallbackContext that provides detailed information about the current operation:

from dataclasses import dataclass
from typing import Any

@dataclass
class CallbackContext:
    invocation_type: InvocationType  # AI, TOOL, or MCP
    node_name: str                   # Name of the executing node
    function_name: str | None        # Specific function being called
    metadata: dict[str, Any] | None  # Additional context data

This context enables callbacks to make intelligent decisions about how to handle different operations:

async def adaptive_callback(context: CallbackContext, input_data: dict) -> dict:
    """Apply different logic based on context."""

    # Different handling based on node type
    if context.node_name == "research_node":
        input_data = await apply_research_policies(input_data)
    elif context.node_name == "decision_node":
        input_data = await add_decision_context(input_data)

    # Function-specific logic
    if context.function_name == "web_search":
        input_data = await sanitize_search_query(input_data)

    # Access custom metadata (metadata may be None, so fall back to an empty dict)
    user_context = (context.metadata or {}).get("user_context", {})
    if user_context.get("debug_mode"):
        input_data["debug"] = True

    return input_data

Advanced Callback Patterns

Chained Transformations

Multiple callbacks of the same type are executed in registration order, allowing for sophisticated data pipelines:

from datetime import datetime, timezone

# First callback: Basic validation
async def validate_input(context: CallbackContext, input_data: dict) -> dict:
    if not input_data.get("required_field"):
        raise ValueError("Missing required field")
    return input_data

# Second callback: Data enrichment
async def enrich_input(context: CallbackContext, input_data: dict) -> dict:
    input_data["enriched_at"] = datetime.now(timezone.utc).isoformat()
    input_data["enriched_by"] = "callback_system"
    return input_data

# Third callback: Format transformation
async def transform_format(context: CallbackContext, input_data: dict) -> dict:
    # Convert to expected format
    return transform_to_service_format(input_data)

# Register in order
register_before_invoke(InvocationType.TOOL, validate_input)
register_before_invoke(InvocationType.TOOL, enrich_input)
register_before_invoke(InvocationType.TOOL, transform_format)
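Why registration order matters can be seen in a small standalone sketch. It assumes, as the pipeline description above suggests, that each callback receives the value returned by the previous one; `run_chain` and the two sample hooks are hypothetical names and not part of PyAgenity.

```python
import asyncio
from typing import Awaitable, Callable, List

BeforeHook = Callable[[dict], Awaitable[dict]]


# Hypothetical dispatcher (an assumption, not PyAgenity code).
async def run_chain(hooks: List[BeforeHook], input_data: dict) -> dict:
    """Apply hooks in list order; each hook gets the previous hook's return value."""
    current = input_data
    for hook in hooks:
        current = await hook(current)
    return current


async def strip_query(data: dict) -> dict:
    return {**data, "query": data["query"].strip()}


async def tag_source(data: dict) -> dict:
    # Depends on the query as it looks at this point in the chain.
    return {**data, "source": f"cleaned:{data['query']}"}


def demo(hooks: List[BeforeHook]) -> dict:
    return asyncio.run(run_chain(hooks, {"query": "  cats  "}))


in_order = demo([strip_query, tag_source])
reversed_order = demo([tag_source, strip_query])
```

With `strip_query` first, `tag_source` sees the cleaned query; with the order reversed, it records the untrimmed one. An exception raised by an early hook also stops the later hooks from running, which is why validation is usually registered first.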

Conditional Logic with Context Awareness

import os

async def context_aware_processor(context: CallbackContext, input_data: dict) -> dict:
    """Apply different processing based on runtime context."""

    # Environment-based logic
    if os.getenv("ENVIRONMENT") == "production":
        input_data = await apply_production_safeguards(input_data)
    else:
        input_data = await add_debug_information(input_data)

    # User role-based logic (metadata may be None, so fall back to an empty dict)
    user_role = (context.metadata or {}).get("user_role", "guest")
    if user_role == "admin":
        input_data["admin_privileges"] = True
    elif user_role == "guest":
        input_data = await apply_guest_restrictions(input_data)

    return input_data

Error Recovery Strategies

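import asyncio

# retry_operation, attempt_data_repair, execute_with_fixed_data, and ValidationError
# are assumed to be defined by your application
async def intelligent_error_recovery(
    context: CallbackContext,
    input_data: dict,
    error: Exception
) -> Message | None:
    """Implement sophisticated error recovery patterns."""

    # Network-related errors
    if isinstance(error, (ConnectionError, TimeoutError)):
        # metadata may be None; create it so the retry count can be stored
        if context.metadata is None:
            context.metadata = {}
        retry_count = context.metadata.get("retry_count", 0)
        if retry_count < 3:
            # Update metadata for next retry
            context.metadata["retry_count"] = retry_count + 1
            await asyncio.sleep(2 ** retry_count)  # Exponential backoff: 1s, 2s, 4s
            return await retry_operation(context, input_data)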

    # Data validation errors
    elif isinstance(error, ValidationError):
        # Try to fix common issues automatically
        fixed_data = await attempt_data_repair(input_data, error)
        if fixed_data:
            return await execute_with_fixed_data(context, fixed_data)

    # Service-specific errors
    elif context.function_name == "external_api":
        # Generate informative error message for the agent
        return Message.from_text(
            f"External API call failed: {error}. "
            "Consider using alternative data sources or simplified queries.",
            role="tool"
        )

    return None  # Let the error propagate

Integration with Agent Graphs

Callbacks integrate seamlessly with your graph construction, providing consistent behavior across all nodes:

from pyagenity.utils.callbacks import (
    InvocationType,
    default_callback_manager,
    register_after_invoke,
    register_before_invoke,
    register_on_error,
)
from pyagenity.graph import StateGraph

# Set up callbacks
register_before_invoke(InvocationType.TOOL, security_validator)
register_after_invoke(InvocationType.AI, response_enhancer)
register_on_error(InvocationType.MCP, error_recovery_handler)

# Create graph with callback integration
graph = StateGraph(AgentState)
graph.add_node("researcher", research_node)
graph.add_node("analyzer", analysis_node)
graph.add_node("tools", ToolNode([web_search, data_processor]))

# Compile with callback manager
compiled_graph = graph.compile(
    checkpointer=checkpointer,
    callback_manager=default_callback_manager  # Uses registered callbacks
)

# All tool calls, AI invocations, and MCP calls will now use your callbacks
result = await compiled_graph.invoke(
    {"messages": [user_message]},
    config={"user_id": "user123", "permissions": ["web_search", "data_processor"]}
)

Testing and Debugging Callbacks

Callbacks can significantly impact your agent's behavior, making testing crucial:

from pyagenity.utils.callbacks import CallbackContext, CallbackManager, InvocationType

async def test_callback_behavior():
    """Test callback system with controlled inputs."""

    # Create isolated callback manager for testing
    test_callback_manager = CallbackManager()

    # Register test callbacks
    test_callback_manager.register_before_invoke(
        InvocationType.TOOL, 
        test_input_validator
    )

    # Create test context
    test_context = CallbackContext(
        invocation_type=InvocationType.TOOL,
        node_name="test_node",
        function_name="test_function",
        metadata={"test": True}
    )

    # Test the callback
    test_input = {"query": "test query"}
    result = await test_callback_manager.execute_before_invoke(
        test_context, 
        test_input
    )

    assert result["query"] == "test query"
    assert "processed_by_callback" in result

import logging

logger = logging.getLogger(__name__)

# Debug callback with logging
async def debug_callback(context: CallbackContext, input_data: dict) -> dict:
    """Debug callback that logs all interactions."""
    logger.info(f"Callback triggered: {context.invocation_type}")
    logger.info(f"Node: {context.node_name}, Function: {context.function_name}")
    logger.info(f"Input data keys: {list(input_data.keys())}")
    return input_data

The callback system turns PyAgenity from a simple execution engine into a controllable platform where every tool, AI, and MCP operation can be monitored, modified, and managed according to your requirements. Placing callbacks at the right points in your agent workflows keeps validation, security, and error handling consistent across nodes while the core agent logic stays focused.