Publisher: Real-time Agent Observability

The publisher system in PyAgenity provides real-time visibility into your agent's execution, transforming what was once a black box into a transparent, observable process. Rather than simply logging events after the fact, the publisher system creates live streams of execution data that enable monitoring, debugging, analytics, and real-time decision making.

Understanding Event-Driven Observability

Traditional logging systems capture what happened after it's over. PyAgenity's publisher system captures what's happening as it happens, creating a continuous stream of execution events that flow from your agent graphs to whatever destination you choose—console output, message queues, databases, monitoring systems, or custom analytics platforms.

Think of it as the nervous system of your AI application: every decision, every tool call, every state change, every error generates events that flow through the publisher pipeline, giving you unprecedented insight into your agent's behavior and performance.

Event Model: The Foundation of Observability

Every observable action in PyAgenity is captured as a structured EventModel that contains rich metadata about what's happening:

from pyagenity.publisher.events import EventModel, Event, EventType, ContentType

# Events are automatically generated during execution
event = EventModel(
    event=Event.NODE_EXECUTION,           # Source: graph, node, tool, or streaming
    event_type=EventType.START,           # Phase: start, progress, result, end, error
    content="Processing user query...",    # Human-readable content
    content_type=ContentType.TEXT,        # Semantic type of content
    node_name="research_agent",           # Which node is executing
    run_id="run_12345",                   # Unique execution identifier
    thread_id="thread_abc",               # Conversation thread
    sequence_id=1,                        # Ordering within the stream
    timestamp=1638360000.0,               # When this occurred
    metadata={                            # Additional context
        "user_id": "user_123",
        "query_type": "research",
        "estimated_duration": 5.2
    }
)

This rich event model enables sophisticated analysis, filtering, and routing based on any combination of attributes, making it possible to build powerful monitoring and analytics systems on top of your agent execution.
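
For instance, a consumer can filter the stream on any attribute combination. A minimal sketch, using plain dicts as stand-ins for EventModel instances (real events expose the same fields as attributes):

```python
# Filter a stream of events on arbitrary attribute combinations.
# Plain dicts stand in for EventModel instances here.

def matches(event: dict, **criteria) -> bool:
    """Return True if the event has every requested attribute value."""
    return all(event.get(key) == value for key, value in criteria.items())

events = [
    {"event": "node_execution", "event_type": "start", "node_name": "research_agent"},
    {"event": "node_execution", "event_type": "error", "node_name": "research_agent"},
    {"event": "tool_execution", "event_type": "start", "node_name": "tools"},
]

errors = [e for e in events if matches(e, event_type="error")]
research = [e for e in events if matches(e, node_name="research_agent")]
```

The same predicate composes naturally with routing: pass different criteria per destination.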

Event Sources and Types

PyAgenity generates events from four primary sources, each providing different levels of granularity:

Graph Execution Events

These provide the highest-level view of your agent's operation:

# Automatic graph-level events include:
Event.GRAPH_EXECUTION + EventType.START     # Agent conversation begins
Event.GRAPH_EXECUTION + EventType.PROGRESS  # Moving between nodes
Event.GRAPH_EXECUTION + EventType.RESULT    # Final response generated
Event.GRAPH_EXECUTION + EventType.END       # Conversation complete
Event.GRAPH_EXECUTION + EventType.ERROR     # Graph-level failures

Graph events help you understand the overall flow and performance of your agent conversations, including duration, success rates, and flow patterns.
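
As a concrete illustration, duration and success rate fall out of pairing each START event with its matching END or ERROR event. A sketch over simplified `(run_id, event_type, timestamp)` tuples, not a PyAgenity API:

```python
# Derive per-run duration and an overall success rate from
# graph-level lifecycle events.

def summarize_runs(events):
    starts, durations, failed = {}, {}, set()
    for run_id, event_type, ts in events:
        if event_type == "start":
            starts[run_id] = ts
        elif event_type == "end" and run_id in starts:
            durations[run_id] = ts - starts[run_id]
        elif event_type == "error":
            failed.add(run_id)
    total = len(starts)
    success_rate = (total - len(failed)) / total if total else 0.0
    return durations, success_rate

durations, rate = summarize_runs([
    ("run_1", "start", 0.0), ("run_1", "end", 2.5),
    ("run_2", "start", 1.0), ("run_2", "error", 1.2),
])
```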

Node Execution Events

These track individual node operations within your graph:

# Node execution lifecycle events:
Event.NODE_EXECUTION + EventType.START      # Node begins processing
Event.NODE_EXECUTION + EventType.PROGRESS   # Node internal progress
Event.NODE_EXECUTION + EventType.RESULT     # Node produces output
Event.NODE_EXECUTION + EventType.END        # Node completes
Event.NODE_EXECUTION + EventType.ERROR      # Node encounters error

Node events are crucial for identifying bottlenecks, understanding decision flows, and debugging issues in specific parts of your agent logic.

Tool Execution Events

These capture all tool and function calls:

# Tool execution events provide detailed operational insights:
Event.TOOL_EXECUTION + EventType.START      # Tool call initiated
Event.TOOL_EXECUTION + EventType.PROGRESS   # Tool processing
Event.TOOL_EXECUTION + EventType.RESULT     # Tool returns data
Event.TOOL_EXECUTION + EventType.END        # Tool call complete
Event.TOOL_EXECUTION + EventType.ERROR      # Tool call fails

Tool events enable monitoring of external service calls, API usage, performance analysis, and error tracking for all your agent's external interactions.

Streaming Events

These capture real-time content generation:

# Streaming events for real-time content delivery:
Event.STREAMING + EventType.START           # Stream begins
Event.STREAMING + EventType.PROGRESS        # Content chunks
Event.STREAMING + EventType.END             # Stream complete
Event.STREAMING + EventType.INTERRUPTED     # Stream stopped

Streaming events enable real-time UI updates, progressive content delivery, and live monitoring of content generation processes.
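
For example, a UI consumer typically appends each PROGRESS chunk to a buffer until the stream ends or is interrupted. A sketch, assuming chunk payloads arrive as plain strings:

```python
def assemble_stream(events):
    """Accumulate streamed content chunks until the stream ends or is interrupted."""
    buffer = []
    for event_type, content in events:
        if event_type == "progress":
            buffer.append(content)
        elif event_type in ("end", "interrupted"):
            break
    return "".join(buffer)

text = assemble_stream([
    ("start", ""),
    ("progress", "Hello, "),
    ("progress", "world"),
    ("end", ""),
])
```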

Content Types and Semantic Understanding

Events carry semantic information about their content through the ContentType enum, enabling intelligent processing and routing:

from pyagenity.publisher.events import ContentType

# Text and messaging content
ContentType.TEXT         # Plain text content
ContentType.MESSAGE      # Structured message content
ContentType.REASONING    # Agent reasoning/thinking content

# Tool and function content
ContentType.TOOL_CALL    # Tool invocation details
ContentType.TOOL_RESULT  # Tool execution results

# Multimedia content
ContentType.IMAGE        # Image content or references
ContentType.AUDIO        # Audio content or references
ContentType.VIDEO        # Video content or references
ContentType.DOCUMENT     # Document content or references

# System content
ContentType.STATE        # Agent state information
ContentType.UPDATE       # General update notifications
ContentType.ERROR        # Error information
ContentType.DATA         # Structured data payloads

This semantic typing enables sophisticated event processing, such as routing error events to monitoring systems while sending reasoning content to debugging interfaces.
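
Such routing is naturally expressed as a dispatch table keyed on content type. A sketch using string keys in place of the ContentType enum:

```python
# Route events to destination-specific handlers based on content type.
routed = {"monitoring": [], "debugging": [], "default": []}

handlers = {
    "error": lambda e: routed["monitoring"].append(e),
    "reasoning": lambda e: routed["debugging"].append(e),
}

def dispatch(event: dict):
    """Send the event to its content-type handler, or a default sink."""
    handler = handlers.get(event["content_type"],
                           lambda e: routed["default"].append(e))
    handler(event)

for ev in [{"content_type": "error", "content": "boom"},
           {"content_type": "reasoning", "content": "considering options..."},
           {"content_type": "text", "content": "hi"}]:
    dispatch(ev)
```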

Publisher Implementations

PyAgenity provides multiple publisher implementations for different use cases:

Console Publisher: Development and Debugging

from pyagenity.publisher.console_publisher import ConsolePublisher

# Simple console output for development
console_publisher = ConsolePublisher({
    "format": "json",           # Output format: json or text
    "include_timestamp": True,  # Include timestamps
    "indent": 2                 # JSON indentation
})

# Configure your graph to use console publishing
compiled_graph = graph.compile(
    checkpointer=checkpointer,
    publisher=console_publisher
)

# Now all execution events will be printed to console
result = await compiled_graph.invoke(
    {"messages": [user_message]},
    config={"user_id": "user_123"}
)

Console output provides immediate feedback during development:

{
  "event": "node_execution",
  "event_type": "start",
  "node_name": "research_agent",
  "content": "Beginning research phase...",
  "timestamp": 1638360000.0,
  "metadata": {
    "user_id": "user_123",
    "query": "What are the latest AI developments?"
  }
}

Redis Publisher: Distributed Systems

from pyagenity.publisher.redis_publisher import RedisPublisher

# Publish to Redis streams for distributed processing
redis_publisher = RedisPublisher({
    "redis_url": "redis://localhost:6379",
    "stream_name": "agent_events",
    "max_len": 10000  # Keep last 10k events
})

Redis publishing enables:

- Multiple consumers processing events
- Event persistence and replay
- Distributed monitoring systems
- Real-time dashboards across services
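
A consumer reading those stream entries gets byte-keyed field maps back from Redis. A small decode helper, assuming (hypothetically) that the publisher stores each event as a JSON string under a `data` field:

```python
import json

def decode_stream_entry(fields: dict) -> dict:
    """Convert a raw Redis stream entry's byte fields back into an event dict.

    Assumes the publisher serialized the event as JSON under a `data` field;
    adjust the field name to match your RedisPublisher's actual schema.
    """
    decoded = {k.decode(): v.decode() for k, v in fields.items()}
    return json.loads(decoded["data"])

# Shape of an entry as returned by e.g. redis-py's xread/xrange:
raw = {b"data": b'{"event": "node_execution", "event_type": "start"}'}
event = decode_stream_entry(raw)
```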

Kafka Publisher: Enterprise Event Streaming

from pyagenity.publisher.kafka_publisher import KafkaPublisher

# Enterprise-grade event streaming
kafka_publisher = KafkaPublisher({
    "bootstrap_servers": ["localhost:9092"],
    "topic": "agent_execution_events",
    "key_serializer": "json",
    "value_serializer": "json"
})

Kafka publishing provides:

- High-throughput event processing
- Event durability and replication
- Complex event processing pipelines
- Integration with analytics platforms

RabbitMQ Publisher: Flexible Messaging

from pyagenity.publisher.rabbitmq_publisher import RabbitMQPublisher

# Flexible messaging with routing
rabbitmq_publisher = RabbitMQPublisher({
    "connection_url": "amqp://localhost:5672",
    "exchange": "agent_events",
    "routing_key": "execution.{node_name}",  # Dynamic routing
    "durable": True
})

RabbitMQ enables:

- Sophisticated routing patterns
- Multiple subscriber types
- Guaranteed delivery
- Load balancing across consumers
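
The dynamic routing key above pairs with AMQP topic-exchange bindings, where `*` matches exactly one dot-separated word and `#` matches zero or more. A minimal matcher sketch showing how a binding like `execution.*` selects per-node events:

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """AMQP-style topic match: '*' = exactly one word, '#' = zero or more words."""
    def match(p, k):
        if not p:
            return not k
        if p[0] == "#":
            # '#' may absorb any number of words, including none
            return any(match(p[1:], k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        return (p[0] == "*" or p[0] == k[0]) and match(p[1:], k[1:])
    return match(pattern.split("."), routing_key.split("."))
```

A queue bound with `execution.*` would therefore receive `execution.planner` but not `execution.planner.start`; bind with `execution.#` to receive both.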

Event Processing Patterns

The publisher system enables powerful event processing patterns:

Real-time Monitoring Dashboard

from pyagenity.publisher.events import EventModel, EventType

class AgentMonitor:
    def __init__(self):
        self.active_runs = {}
        self.performance_metrics = {}

    async def monitor_events(self):
        """Process events in real-time for dashboard updates."""
        async for event in self.event_stream():
            await self.process_event(event)

    async def process_event(self, event: EventModel):
        """Update monitoring metrics based on incoming events."""

        # Track active executions
        if event.event_type == EventType.START:
            self.active_runs[event.run_id] = {
                "start_time": event.timestamp,
                "node_name": event.node_name,
                "status": "running"
            }

        # Calculate performance metrics
        elif event.event_type == EventType.END:
            if event.run_id in self.active_runs:
                duration = event.timestamp - self.active_runs[event.run_id]["start_time"]
                node_name = event.node_name

                if node_name not in self.performance_metrics:
                    self.performance_metrics[node_name] = []

                self.performance_metrics[node_name].append(duration)
                del self.active_runs[event.run_id]

        # Track errors
        elif event.event_type == EventType.ERROR:
            await self.handle_error_event(event)

    async def handle_error_event(self, event: EventModel):
        """Handle error events with alerting."""
        error_data = {
            "node": event.node_name,
            "error": event.content,
            "timestamp": event.timestamp,
            "run_id": event.run_id
        }

        # Send alert if error rate is high
        recent_errors = await self.get_recent_error_rate(event.node_name)
        if recent_errors > 0.1:  # > 10% error rate
            await self.send_alert(f"High error rate in {event.node_name}: {recent_errors:.2%}")
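
The `get_recent_error_rate` helper above is left abstract; one way to implement it is a sliding time window over recent per-node outcomes (a sketch, assuming the monitor records one outcome per completed event):

```python
import time
from collections import deque

class SlidingErrorRate:
    """Track error rate per node over a fixed time window."""

    def __init__(self, window_seconds: float = 300.0):
        self.window = window_seconds
        self.outcomes = {}  # node_name -> deque of (timestamp, is_error)

    def record(self, node_name, is_error, timestamp=None):
        ts = timestamp if timestamp is not None else time.time()
        self.outcomes.setdefault(node_name, deque()).append((ts, is_error))

    def rate(self, node_name, now=None):
        now = now if now is not None else time.time()
        window = self.outcomes.get(node_name, deque())
        # Drop outcomes that have aged out of the window
        while window and window[0][0] < now - self.window:
            window.popleft()
        if not window:
            return 0.0
        return sum(1 for _, err in window if err) / len(window)

tracker = SlidingErrorRate(window_seconds=60.0)
tracker.record("research_agent", False, timestamp=100.0)
tracker.record("research_agent", True, timestamp=110.0)
tracker.record("research_agent", False, timestamp=120.0)
```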

Event-Driven Analytics

import time

from pyagenity.publisher.events import Event, EventModel, EventType

class AgentAnalytics:
    def __init__(self):
        self.tool_usage = {}
        self.conversation_patterns = {}
        self.user_behavior = {}

    async def analyze_events(self):
        """Continuous analytics processing."""
        async for event in self.event_stream():
            await self.update_analytics(event)

    async def update_analytics(self, event: EventModel):
        """Update analytics based on event patterns."""

        # Tool usage analytics
        if event.event == Event.TOOL_EXECUTION and event.event_type == EventType.START:
            tool_name = event.metadata.get("function_name")
            if tool_name:
                self.tool_usage[tool_name] = self.tool_usage.get(tool_name, 0) + 1

        # Conversation flow analysis
        if event.event == Event.NODE_EXECUTION:
            user_id = event.metadata.get("user_id")
            if user_id:
                if user_id not in self.conversation_patterns:
                    self.conversation_patterns[user_id] = []

                self.conversation_patterns[user_id].append({
                    "node": event.node_name,
                    "timestamp": event.timestamp,
                    "type": event.event_type
                })

        # Generate insights periodically
        if sum(self.tool_usage.values()) % 100 == 0:  # Every 100 tool calls
            await self.generate_insights()

    async def generate_insights(self):
        """Generate actionable insights from collected data."""
        # Most used tools
        popular_tools = sorted(self.tool_usage.items(), key=lambda x: x[1], reverse=True)

        # Conversation patterns
        avg_conversation_length = sum(
            len(pattern) for pattern in self.conversation_patterns.values()
        ) / len(self.conversation_patterns) if self.conversation_patterns else 0

        insights = {
            "popular_tools": popular_tools[:5],
            "avg_conversation_length": avg_conversation_length,
            "total_conversations": len(self.conversation_patterns),
            "timestamp": time.time()
        }

        await self.store_insights(insights)

Custom Event Filtering and Routing

from pyagenity.publisher.events import ContentType, Event, EventModel, EventType

class EventRouter:
    def __init__(self):
        self.routes = {
            "errors": self.handle_errors,
            "performance": self.handle_performance,
            "content": self.handle_content,
            "tools": self.handle_tools
        }

    async def route_events(self):
        """Route events to appropriate handlers."""
        async for event in self.event_stream():
            # Route error events
            if event.event_type == EventType.ERROR:
                await self.routes["errors"](event)

            # Route performance events
            elif event.event in [Event.NODE_EXECUTION, Event.GRAPH_EXECUTION]:
                await self.routes["performance"](event)

            # Route tool events
            elif event.event == Event.TOOL_EXECUTION:
                await self.routes["tools"](event)

            # Route content events
            elif event.content_type in [ContentType.TEXT, ContentType.MESSAGE]:
                await self.routes["content"](event)

    async def handle_errors(self, event: EventModel):
        """Specialized error handling."""
        # Send to error monitoring system
        await self.send_to_monitoring(event)

        # Log critical errors
        if event.metadata.get("severity") == "critical":
            await self.alert_on_call_team(event)

    async def handle_performance(self, event: EventModel):
        """Performance monitoring."""
        # Track execution times
        if event.event_type == EventType.END:
            duration = event.metadata.get("duration")
            if duration and duration > 10.0:  # > 10 seconds
                await self.log_slow_operation(event)

    async def handle_tools(self, event: EventModel):
        """Tool usage tracking."""
        # Track API costs and usage
        if event.event_type == EventType.END:
            cost = event.metadata.get("api_cost", 0)
            await self.update_cost_tracking(event.metadata.get("function_name"), cost)

Integration with Agent Graphs

Publishers integrate seamlessly with your graph construction, providing consistent observability across all execution patterns:

from pyagenity.graph import StateGraph
from pyagenity.publisher.console_publisher import ConsolePublisher

# Create your publisher
publisher = ConsolePublisher({"format": "json", "indent": 2})

# Build your graph
graph = StateGraph(AgentState)
graph.add_node("planner", planning_agent)
graph.add_node("researcher", research_agent)
graph.add_node("tools", ToolNode([web_search, calculator]))

# Set up conditional flows
graph.add_conditional_edges("planner", routing_logic, {
    "research": "researcher",
    "calculate": "tools",
    END: END
})

# Compile with publisher for complete observability
compiled_graph = graph.compile(
    checkpointer=checkpointer,
    publisher=publisher  # All events will be published
)

# Execute with full observability
async for chunk in compiled_graph.astream(
    {"messages": [user_message]},
    config={"user_id": "user_123", "session_id": "session_456"}
):
    # Both the chunks and the published events provide insight
    # Chunks show what the user sees
    # Events show how the agent is thinking and operating
    print(f"User sees: {chunk}")
    # Meanwhile, events are flowing to your monitoring systems

Advanced Event Customization

You can extend the event system with custom metadata and routing:

from pyagenity.publisher.events import Event, EventModel, EventType
from pyagenity.publisher.publish import publish_event

# Custom event generation
async def custom_node_with_events(state: AgentState, config: dict):
    """Node that generates custom observability events."""

    # Generate custom start event
    start_event = EventModel(
        event=Event.NODE_EXECUTION,
        event_type=EventType.START,
        content="Beginning custom analysis...",
        node_name="custom_analyzer",
        run_id=config.get("run_id"),
        thread_id=config.get("thread_id"),
        metadata={
            "analysis_type": "sentiment",
            "data_source": config.get("data_source"),
            "user_tier": config.get("user_tier", "free"),
            "expected_duration": 2.5
        }
    )
    publish_event(start_event)

    # Perform analysis with progress events, collecting each step's output
    results = []
    for step in ["preprocessing", "analysis", "postprocessing"]:
        progress_event = EventModel(
            event=Event.NODE_EXECUTION,
            event_type=EventType.PROGRESS,
            content=f"Executing {step}...",
            node_name="custom_analyzer",
            run_id=config.get("run_id"),
            metadata={"step": step, "progress": get_progress_percentage()}
        )
        publish_event(progress_event)

        # Do the actual work for this step
        results.append(await perform_analysis_step(step, state))

    # Generate completion event
    end_event = EventModel(
        event=Event.NODE_EXECUTION,
        event_type=EventType.END,
        content="Analysis complete",
        node_name="custom_analyzer",
        run_id=config.get("run_id"),
        metadata={
            "results_count": len(results),
            "confidence_score": calculate_confidence(results),
            "processing_time": get_processing_time()
        }
    )
    publish_event(end_event)

    return results
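
Rather than hand-writing start/end/error events in every node, the same pattern can be factored into a decorator. A sketch with a simple `emit` callback standing in for PyAgenity's publishing machinery:

```python
import asyncio
import functools
import time

def observed(node_name, emit):
    """Wrap an async node so START/END/ERROR events are emitted automatically."""
    def decorator(fn):
        @functools.wraps(fn)
        async def wrapper(*args, **kwargs):
            emit({"event_type": "start", "node_name": node_name, "ts": time.time()})
            try:
                result = await fn(*args, **kwargs)
            except Exception as exc:
                emit({"event_type": "error", "node_name": node_name, "content": str(exc)})
                raise
            emit({"event_type": "end", "node_name": node_name, "ts": time.time()})
            return result
        return wrapper
    return decorator

events = []

@observed("custom_analyzer", events.append)
async def analyze(state):
    # Stand-in for real analysis work
    return {"sentiment": "positive"}

result = asyncio.run(analyze({}))
```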

Production Monitoring Strategies

For production deployments, combine multiple publishers and processing strategies:

import asyncio

from pyagenity.publisher.console_publisher import ConsolePublisher
from pyagenity.publisher.kafka_publisher import KafkaPublisher
from pyagenity.publisher.redis_publisher import RedisPublisher

class ProductionMonitoring:
    def __init__(self):
        # Multiple publishers for different purposes
        self.console_publisher = ConsolePublisher({"format": "json"})  # Development
        self.kafka_publisher = KafkaPublisher({  # Production analytics
            "bootstrap_servers": ["kafka1:9092", "kafka2:9092"],
            "topic": "agent_events_prod"
        })
        self.redis_publisher = RedisPublisher({  # Real-time dashboards
            "redis_url": "redis://redis-cluster:6379",
            "stream_name": "live_agent_events"
        })

        # Health metrics
        self.health_metrics = {
            "total_events": 0,
            "error_count": 0,
            "avg_response_time": 0.0,
            "active_sessions": set()
        }

    async def setup_monitoring(self, graph):
        """Set up comprehensive monitoring for production."""

        # Use composite publisher for multiple destinations
        composite_publisher = CompositePublisher([
            self.kafka_publisher,   # Long-term analytics
            self.redis_publisher,   # Real-time monitoring
        ])

        return graph.compile(
            checkpointer=production_checkpointer,
            publisher=composite_publisher
        )

    async def monitor_health(self):
        """Continuous health monitoring."""
        while True:
            # Check error rates
            error_rate = self.health_metrics["error_count"] / max(
                self.health_metrics["total_events"], 1
            )

            if error_rate > 0.05:  # > 5% error rate
                await self.alert_operations_team(f"High error rate: {error_rate:.2%}")

            # Check response times
            if self.health_metrics["avg_response_time"] > 30.0:  # > 30 seconds
                await self.alert_performance_issue(
                    f"Slow response time: {self.health_metrics['avg_response_time']:.1f}s"
                )

            await asyncio.sleep(60)  # Check every minute
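
The `CompositePublisher` used above simply fans each event out to every wrapped publisher. If your PyAgenity version does not ship one, a minimal fan-out wrapper is straightforward to write (a sketch; the `publish` method name is an assumption about the publisher interface):

```python
import asyncio

class CompositePublisher:
    """Fan each event out to every wrapped publisher.

    Assumes each publisher exposes an async publish(event) method;
    adapt the method name to your actual publisher interface.
    """

    def __init__(self, publishers):
        self.publishers = list(publishers)

    async def publish(self, event):
        for publisher in self.publishers:
            await publisher.publish(event)

# Usage with stub publishers that record what they receive:
class _Recorder:
    def __init__(self):
        self.events = []

    async def publish(self, event):
        self.events.append(event)

a, b = _Recorder(), _Recorder()
composite = CompositePublisher([a, b])
asyncio.run(composite.publish({"event_type": "start"}))
```

Publishing sequentially keeps ordering deterministic per destination; swap the loop for `asyncio.gather` if you prefer concurrent delivery and can tolerate partial failures.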

The publisher system transforms PyAgenity agents from opaque processes into fully observable, monitorable, and analytically rich systems. By providing real-time insight into every aspect of agent execution—from high-level conversation flows to individual tool calls—publishers enable you to build production-ready AI systems with the observability and control needed for enterprise deployment.