Callbacks: Interception and Flow Control¶
Callbacks in PyAgenity provide a powerful interception mechanism that allows you to hook into the execution flow of your agent graphs at critical decision points. Rather than simply observing events, callbacks enable you to actively participate in, modify, and control the execution process as it unfolds.
Understanding the Interception Pattern¶
Think of callbacks as strategic checkpoints placed throughout your agent's thinking process. When your agent is about to call a tool, query an AI model, or execute any external operation, PyAgenity pauses and gives your callback system the opportunity to:
- Validate inputs before they're processed
- Transform or enrich data as it flows through the system
- Implement custom logic for error recovery and handling
- Modify outputs before they're returned to the agent
- Apply security policies and business rules consistently
This creates a layered architecture where your core agent logic remains clean and focused, while cross-cutting concerns like validation, logging, security, and transformation are handled elegantly through the callback system.
Callback Lifecycle and Flow¶
The callback system operates around three fundamental moments in any operation:
Before Invoke: The Preparation Phase¶
from datetime import datetime

from pyagenity.utils.callbacks import register_before_invoke, InvocationType, CallbackContext

async def validate_tool_input(context: CallbackContext, input_data: dict) -> dict:
    """Validate and potentially modify tool inputs before execution."""
    if context.function_name == "database_query":
        # Apply security validations
        if "DROP" in input_data.get("query", "").upper():
            raise ValueError("Dangerous SQL operations not allowed")
    # Add audit context
    input_data["audit_user"] = context.metadata.get("user_id", "unknown")
    input_data["timestamp"] = datetime.utcnow().isoformat()
    return input_data

# Register for tool invocations
register_before_invoke(InvocationType.TOOL, validate_tool_input)
Before any tool, AI model, or MCP function is called, PyAgenity executes all registered `before_invoke` callbacks. This is your opportunity to:
- Validate inputs according to business rules
- Add contextual information or metadata
- Transform data formats or apply normalization
- Implement rate limiting or quota checks (see the sketch after this list)
- Log invocation attempts for audit trails
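For instance, the rate-limiting check mentioned above could be a `before_invoke` callback that rejects calls once a per-user quota is exhausted. The following is a minimal sketch; the in-memory counter, the 30-calls-per-minute limit, and the `user_id` metadata key are illustrative assumptions, not part of PyAgenity's API:

```python
import time
from collections import defaultdict

from pyagenity.utils.callbacks import CallbackContext, InvocationType, register_before_invoke

# Hypothetical in-memory counter; a real deployment would likely use Redis or similar
_call_history: dict[str, list[float]] = defaultdict(list)
MAX_CALLS_PER_MINUTE = 30  # illustrative quota


async def enforce_tool_quota(context: CallbackContext, input_data: dict) -> dict:
    """Reject tool calls once a per-user quota is exhausted (sketch)."""
    user_id = (context.metadata or {}).get("user_id", "anonymous")
    now = time.time()
    # Keep only the calls made within the last 60 seconds
    recent = [t for t in _call_history[user_id] if now - t < 60]
    if len(recent) >= MAX_CALLS_PER_MINUTE:
        raise RuntimeError(f"Rate limit exceeded for user {user_id}")
    recent.append(now)
    _call_history[user_id] = recent
    return input_data


register_before_invoke(InvocationType.TOOL, enforce_tool_quota)
```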
After Invoke: The Processing Phase¶
from typing import Any

from pyagenity.utils.callbacks import register_after_invoke

async def enrich_ai_response(context: CallbackContext, input_data: dict, output_data: Any) -> Any:
    """Enrich AI responses with additional context and formatting."""
    if context.invocation_type == InvocationType.AI:
        # Add confidence scoring based on response characteristics
        response_text = str(output_data)
        confidence_score = calculate_confidence(response_text)
        # Transform the response if needed
        if confidence_score < 0.7:
            enhanced_response = await get_clarification_prompt(response_text, input_data)
            return enhanced_response
    return output_data

register_after_invoke(InvocationType.AI, enrich_ai_response)
After successful execution, `after_invoke` callbacks process the results. This phase enables:
- Response validation and quality assessment
- Data transformation and formatting
- Adding computed metadata or enrichment
- Implementing caching strategies (see the sketch after this list)
- Logging successful operations
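As a sketch of the caching idea above, an `after_invoke` callback can store successful tool results keyed by their inputs so later lookups can reuse them. The in-memory dictionary and the `make_cache_key` helper are illustrative assumptions, not PyAgenity APIs:

```python
import hashlib
import json

from pyagenity.utils.callbacks import CallbackContext, InvocationType, register_after_invoke

# Hypothetical in-memory cache; swap for Redis or similar in production
_result_cache: dict[str, object] = {}


def make_cache_key(function_name: str | None, input_data: dict) -> str:
    """Build a stable key from the function name and its inputs (sketch)."""
    payload = json.dumps({"fn": function_name, "input": input_data}, sort_keys=True, default=str)
    return hashlib.sha256(payload.encode()).hexdigest()


async def cache_tool_results(context: CallbackContext, input_data: dict, output_data):
    """Store successful tool outputs so later calls can reuse them."""
    _result_cache[make_cache_key(context.function_name, input_data)] = output_data
    return output_data


register_after_invoke(InvocationType.TOOL, cache_tool_results)
```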
On Error: The Recovery Phase¶
from pyagenity.utils.callbacks import register_on_error
from pyagenity.utils.message import Message

async def handle_tool_errors(context: CallbackContext, input_data: dict, error: Exception) -> Message | None:
    """Implement intelligent error recovery for tool failures."""
    if context.function_name == "external_api_call":
        if isinstance(error, TimeoutError):
            # Implement retry logic with backoff
            return await retry_with_backoff(context, input_data, max_retries=3)
        elif isinstance(error, AuthenticationError):  # AuthenticationError comes from your API client
            # Generate helpful error message for the agent
            return Message.from_text(
                "The external service authentication failed. "
                "Please check the API credentials and try again.",
                role="tool"
            )
    # Return None to propagate the error normally
    return None

register_on_error(InvocationType.TOOL, handle_tool_errors)
When operations fail, `on_error` callbacks provide sophisticated error handling:
- Implementing retry strategies with exponential backoff
- Converting technical errors into actionable agent messages
- Logging failures for monitoring and debugging
- Providing fallback responses or alternative data sources (see the sketch after this list)
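The fallback idea above can be as simple as answering from a secondary source when the live one fails. In this minimal sketch, the `live_price_feed` tool name and the stale-data store are purely illustrative:

```python
from pyagenity.utils.callbacks import CallbackContext, InvocationType, register_on_error
from pyagenity.utils.message import Message

# Hypothetical store of last known results, populated elsewhere in the application
_last_known_results: dict[str, str] = {}


async def fall_back_to_stale_data(context: CallbackContext, input_data: dict, error: Exception) -> Message | None:
    """When a live data source fails, answer with the last known value instead."""
    if context.function_name == "live_price_feed":  # illustrative tool name
        stale = _last_known_results.get("live_price_feed")
        if stale:
            return Message.from_text(
                f"Live data unavailable ({error}); returning last known value: {stale}",
                role="tool",
            )
    return None  # propagate anything we cannot handle


register_on_error(InvocationType.TOOL, fall_back_to_stale_data)
```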
Invocation Types and Context¶
PyAgenity distinguishes between three types of operations that can trigger callbacks:
AI Invocations¶
These occur when your agent calls language models for reasoning, planning, or text generation:
import time

async def monitor_ai_usage(context: CallbackContext, input_data: dict) -> dict:
    """Track AI usage patterns and costs."""
    if context.invocation_type == InvocationType.AI:
        # Log token usage and costs
        estimated_tokens = estimate_tokens(input_data.get("messages", []))
        log_ai_usage(context.node_name, estimated_tokens)
        # Add usage tracking to metadata
        input_data["usage_tracking"] = {
            "node": context.node_name,
            "estimated_tokens": estimated_tokens,
            "timestamp": time.time()
        }
    return input_data
Tool Invocations¶
These trigger when your agent executes functions, APIs, or external services:
from datetime import datetime

async def secure_tool_access(context: CallbackContext, input_data: dict) -> dict:
    """Apply security policies to tool invocations."""
    user_permissions = context.metadata.get("user_permissions", [])
    # Check if user has permission for this tool
    if context.function_name not in user_permissions:
        raise PermissionError(f"User not authorized to use {context.function_name}")
    # Add security context
    input_data["security_context"] = {
        "user_id": context.metadata.get("user_id"),
        "permissions": user_permissions,
        "access_time": datetime.utcnow().isoformat()
    }
    return input_data
MCP (Model Context Protocol) Invocations¶
These handle calls to external MCP services for specialized capabilities:
async def optimize_mcp_calls(context: CallbackContext, input_data: dict) -> dict:
    """Optimize and cache MCP service calls."""
    if context.invocation_type == InvocationType.MCP:
        # Check cache first
        cache_key = generate_cache_key(context.function_name, input_data)
        cached_result = await get_from_cache(cache_key)
        if cached_result:
            # Return cached result wrapped as appropriate response
            return create_cached_response(cached_result)
    return input_data
Callback Context and Metadata¶
Each callback receives a rich `CallbackContext` that provides detailed information about the current operation:
@dataclass
class CallbackContext:
    invocation_type: InvocationType   # AI, TOOL, or MCP
    node_name: str                    # Name of the executing node
    function_name: str | None         # Specific function being called
    metadata: dict[str, Any] | None   # Additional context data
This context enables callbacks to make intelligent decisions about how to handle different operations:
async def adaptive_callback(context: CallbackContext, input_data: dict) -> dict:
    """Apply different logic based on context."""
    # Different handling based on node type
    if context.node_name == "research_node":
        input_data = await apply_research_policies(input_data)
    elif context.node_name == "decision_node":
        input_data = await add_decision_context(input_data)
    # Function-specific logic
    if context.function_name == "web_search":
        input_data = await sanitize_search_query(input_data)
    # Access custom metadata
    user_context = context.metadata.get("user_context", {})
    if user_context.get("debug_mode"):
        input_data["debug"] = True
    return input_data
Advanced Callback Patterns¶
Chained Transformations¶
Multiple callbacks of the same type are executed in registration order, allowing for sophisticated data pipelines:
from datetime import datetime

# First callback: Basic validation
async def validate_input(context: CallbackContext, input_data: dict) -> dict:
    if not input_data.get("required_field"):
        raise ValueError("Missing required field")
    return input_data

# Second callback: Data enrichment
async def enrich_input(context: CallbackContext, input_data: dict) -> dict:
    input_data["enriched_at"] = datetime.utcnow().isoformat()
    input_data["enriched_by"] = "callback_system"
    return input_data

# Third callback: Format transformation
async def transform_format(context: CallbackContext, input_data: dict) -> dict:
    # Convert to expected format
    return transform_to_service_format(input_data)

# Register in order
register_before_invoke(InvocationType.TOOL, validate_input)
register_before_invoke(InvocationType.TOOL, enrich_input)
register_before_invoke(InvocationType.TOOL, transform_format)
Conditional Logic with Context Awareness¶
import os

async def context_aware_processor(context: CallbackContext, input_data: dict) -> dict:
    """Apply different processing based on runtime context."""
    # Environment-based logic
    if os.getenv("ENVIRONMENT") == "production":
        input_data = await apply_production_safeguards(input_data)
    else:
        input_data = await add_debug_information(input_data)
    # User role-based logic
    user_role = context.metadata.get("user_role", "guest")
    if user_role == "admin":
        input_data["admin_privileges"] = True
    elif user_role == "guest":
        input_data = await apply_guest_restrictions(input_data)
    return input_data
Error Recovery Strategies¶
import asyncio

async def intelligent_error_recovery(
    context: CallbackContext,
    input_data: dict,
    error: Exception
) -> Message | None:
    """Implement sophisticated error recovery patterns."""
    # Network-related errors
    if isinstance(error, (ConnectionError, TimeoutError)):
        retry_count = context.metadata.get("retry_count", 0)
        if retry_count < 3:
            # Update metadata for next retry
            context.metadata["retry_count"] = retry_count + 1
            await asyncio.sleep(2 ** retry_count)  # Exponential backoff
            return await retry_operation(context, input_data)
    # Data validation errors
    elif isinstance(error, ValidationError):  # e.g. your schema library's validation error
        # Try to fix common issues automatically
        fixed_data = await attempt_data_repair(input_data, error)
        if fixed_data:
            return await execute_with_fixed_data(context, fixed_data)
    # Service-specific errors
    elif context.function_name == "external_api":
        # Generate informative error message for the agent
        return Message.from_text(
            f"External API call failed: {error}. "
            "Consider using alternative data sources or simplified queries.",
            role="tool"
        )
    return None  # Let the error propagate
Integration with Agent Graphs¶
Callbacks integrate seamlessly with your graph construction, providing consistent behavior across all nodes:
from pyagenity.utils.callbacks import (
    InvocationType,
    default_callback_manager,
    register_after_invoke,
    register_before_invoke,
    register_on_error,
)
from pyagenity.graph import StateGraph

# Set up callbacks
register_before_invoke(InvocationType.TOOL, security_validator)
register_after_invoke(InvocationType.AI, response_enhancer)
register_on_error(InvocationType.MCP, error_recovery_handler)

# Create graph with callback integration
graph = StateGraph(AgentState)
graph.add_node("researcher", research_node)
graph.add_node("analyzer", analysis_node)
graph.add_node("tools", ToolNode([web_search, data_processor]))

# Compile with callback manager
compiled_graph = graph.compile(
    checkpointer=checkpointer,
    callback_manager=default_callback_manager  # Uses registered callbacks
)

# All tool calls, AI invocations, and MCP calls will now use your callbacks
result = await compiled_graph.invoke(
    {"messages": [user_message]},
    config={"user_id": "user123", "permissions": ["web_search", "data_processor"]}
)
Testing and Debugging Callbacks¶
Callbacks can significantly impact your agent's behavior, making testing crucial:
from pyagenity.utils.callbacks import CallbackContext, CallbackManager, InvocationType

async def test_callback_behavior():
    """Test callback system with controlled inputs."""
    # Create isolated callback manager for testing
    test_callback_manager = CallbackManager()

    # Register test callbacks
    test_callback_manager.register_before_invoke(
        InvocationType.TOOL,
        test_input_validator
    )

    # Create test context
    test_context = CallbackContext(
        invocation_type=InvocationType.TOOL,
        node_name="test_node",
        function_name="test_function",
        metadata={"test": True}
    )

    # Test the callback
    test_input = {"query": "test query"}
    result = await test_callback_manager.execute_before_invoke(
        test_context,
        test_input
    )
    assert result["query"] == "test query"
    assert "processed_by_callback" in result
import logging

logger = logging.getLogger(__name__)

# Debug callback with logging
async def debug_callback(context: CallbackContext, input_data: dict) -> dict:
    """Debug callback that logs all interactions."""
    logger.info(f"Callback triggered: {context.invocation_type}")
    logger.info(f"Node: {context.node_name}, Function: {context.function_name}")
    logger.info(f"Input data keys: {list(input_data.keys())}")
    return input_data
The callback system transforms PyAgenity from a simple execution engine into a controllable platform where every operation can be monitored, modified, and managed according to your requirements. By strategically placing callbacks throughout your agent workflows, you create robust, secure, and maintainable AI systems that adapt to complex real-world conditions.