
Publishers

When to use this

Use a publisher when you need to observe graph execution in real time — for monitoring dashboards, analytics pipelines, or external stream processors. Publishers receive structured EventModel objects at every node transition, tool call, and streaming chunk.

Import paths

```python
from agentflow.runtime.publisher import BasePublisher, ConsolePublisher
from agentflow.runtime.publisher.events import Event, EventType, ContentType, EventModel

# Optional backends
from agentflow.runtime.publisher import RedisPublisher     # pip install redis
from agentflow.runtime.publisher import KafkaPublisher     # pip install aiokafka
from agentflow.runtime.publisher import RabbitMQPublisher  # pip install aio-pika
```

EventModel

The unit of data published to a publisher. Every significant moment in graph execution emits one.

```python
from agentflow.runtime.publisher.events import EventModel
```

| Field | Type | Description |
| --- | --- | --- |
| `event_id` | `str` | UUID identifying this event. |
| `event` | `Event` | Source of the event (graph, node, tool, streaming). |
| `event_type` | `EventType` | Phase of the event (start, progress, result, end, error…). |
| `content_type` | `ContentType` | Semantic type of the payload (text, tool_call, state…). |
| `node_name` | `str \| None` | Graph node that emitted the event. |
| `data` | `Any` | The event payload. |
| `content` | `ContentBlock \| None` | Content block if relevant. |
| `thread_id` | `str \| None` | Thread for this execution. |
| `run_id` | `str \| None` | Run for this execution. |
| `timestamp` | `datetime` | When the event was emitted. |
| `metadata` | `dict` | Additional context. |

Event — source enum

```python
from agentflow.runtime.publisher.events import Event
```

| Value | Description |
| --- | --- |
| `GRAPH_EXECUTION` | Emitted by the graph runner (start/end of full execution). |
| `NODE_EXECUTION` | Emitted at the start and end of each node. |
| `TOOL_EXECUTION` | Emitted before and after each tool call. |
| `STREAMING` | Emitted for each incremental streaming chunk from the LLM. |

EventType — phase enum

```python
from agentflow.runtime.publisher.events import EventType
```

| Value | When emitted |
| --- | --- |
| `START` | Execution begins. |
| `PROGRESS` | Intermediate update during streaming. |
| `RESULT` | A result is ready (tool result, LLM completion). |
| `END` | Execution ends normally. |
| `UPDATE` | State or data updated. |
| `ERROR` | An error occurred. |
| `INTERRUPTED` | Execution paused at an interrupt point. |

ContentType — payload type enum

```python
from agentflow.runtime.publisher.events import ContentType
```

| Value | When used |
| --- | --- |
| `TEXT` | Plain text output. |
| `MESSAGE` | Full message object. |
| `REASONING` | Extended thinking trace. |
| `TOOL_CALL` | Tool invocation request. |
| `TOOL_RESULT` | Tool execution result. |
| `IMAGE` | Image content. |
| `AUDIO` | Audio content. |
| `VIDEO` | Video content. |
| `DOCUMENT` | Document content. |
| `DATA` | Binary/structured data. |
| `STATE` | Graph state snapshot. |
| `UPDATE` | Incremental update. |
| `ERROR` | Error payload. |

BasePublisher

Abstract class. All publishers implement this interface.

```python
from agentflow.runtime.publisher import BasePublisher
```

Abstract methods

| Method | Signature | Description |
| --- | --- | --- |
| `publish` | `async (event: EventModel) -> Any` | Publish one event. Raises `RuntimeError` if the publisher is closed. |
| `close` | `async () -> None` | Release connections and resources. Idempotent. |
| `sync_close` | `() -> None` | Synchronous close for use in non-async shutdown handlers. |

Context manager

```python
async with ConsolePublisher() as publisher:
    app = graph.compile(publisher=publisher)
    await app.ainvoke(...)
# publisher is automatically closed on exit
```
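If you need more than one sink (say, console locally plus Redis in staging), a small fan-out wrapper keeps the `compile()` call unchanged. A minimal stdlib sketch, duck-typed against the interface above (in real code it would subclass `BasePublisher`):

```python
import asyncio


class FanOutPublisher:
    """Forward each event to several downstream publishers concurrently."""

    def __init__(self, *publishers):
        self.publishers = publishers
        self._is_closed = False

    async def publish(self, event):
        if self._is_closed:
            raise RuntimeError("Publisher is closed")
        # One slow sink should not serialize the others.
        await asyncio.gather(*(p.publish(event) for p in self.publishers))

    async def close(self):
        if not self._is_closed:
            await asyncio.gather(*(p.close() for p in self.publishers))
            self._is_closed = True
```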

ConsolePublisher

Prints events to stdout. For development and debugging only.

```python
from agentflow.runtime.publisher import ConsolePublisher

publisher = ConsolePublisher(config={
    "format": "json",
    "include_timestamp": True,
    "indent": 2,
})

app = graph.compile(publisher=publisher)
```

| Config key | Default | Description |
| --- | --- | --- |
| `format` | `"json"` | Output format. |
| `include_timestamp` | `True` | Include timestamp in output. |
| `indent` | `2` | JSON indentation. |

RedisPublisher

Publishes events to a Redis Pub/Sub channel or Redis Stream.

Optional dependency:

```shell
pip install 10xscale-agentflow[redis]
# or: pip install "redis>=4.2"
```

```python
from agentflow.runtime.publisher import RedisPublisher

publisher = RedisPublisher(config={
    "url": "redis://localhost:6379/0",
    "mode": "pubsub",               # or "stream"
    "channel": "agentflow.events",
    "stream": "agentflow.events",
    "maxlen": 10000,                # max stream length (stream mode only)
    "max_connections": 10,
    "socket_timeout": 5.0,
    "health_check_interval": 30,
})

app = graph.compile(publisher=publisher)
```

| Config key | Default | Description |
| --- | --- | --- |
| `url` | `redis://localhost:6379/0` | Redis connection URL. |
| `mode` | `pubsub` | `"pubsub"` for Pub/Sub or `"stream"` for Redis Streams. |
| `channel` | `agentflow.events` | Pub/Sub channel name. |
| `stream` | `agentflow.events` | Stream name for `"stream"` mode. |
| `maxlen` | `None` | Maximum stream entries (stream mode). Set for bounded streams. |
| `max_connections` | `10` | Connection pool size. |
| `socket_timeout` | `5.0` | Socket timeout in seconds. |
| `socket_connect_timeout` | `5.0` | Connection timeout in seconds. |
| `socket_keepalive` | `True` | TCP keepalive. |
| `health_check_interval` | `30` | Seconds between pool health checks. |
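On the consumer side, each message presumably arrives as the JSON-serialized event (verify against a captured message before relying on this shape). A minimal decoding sketch:

```python
import json


def decode_event(raw: bytes) -> dict:
    """Decode one Pub/Sub or Stream message back into an event dict.

    Assumes the publisher sends the JSON serialization of EventModel;
    inspect a real message from your deployment to confirm the shape.
    """
    return json.loads(raw.decode("utf-8"))
```

During development you can also eyeball the channel directly with `redis-cli SUBSCRIBE agentflow.events`.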

KafkaPublisher

Publishes events to an Apache Kafka topic.

Optional dependency:

```shell
pip install aiokafka
```

```python
from agentflow.runtime.publisher import KafkaPublisher

publisher = KafkaPublisher(config={
    "bootstrap_servers": "localhost:9092",
    "topic": "agentflow-events",
    "compression_type": "gzip",
})

app = graph.compile(publisher=publisher)
```
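Kafka only guarantees ordering within a partition, so a consumer that needs per-thread ordering can bucket decoded events by `thread_id` (or key messages by it at produce time, if your setup allows). A stdlib sketch of the consumer-side bucketing:

```python
from collections import defaultdict


def bucket_by_thread(events: list[dict]) -> dict[str, list[dict]]:
    """Group consumed event dicts by thread_id, preserving arrival order."""
    buckets: dict[str, list[dict]] = defaultdict(list)
    for event in events:
        buckets[event.get("thread_id") or "no-thread"].append(event)
    return dict(buckets)
```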

RabbitMQPublisher

Publishes events to a RabbitMQ exchange.

Optional dependency:

```shell
pip install aio-pika
```

```python
from agentflow.runtime.publisher import RabbitMQPublisher

publisher = RabbitMQPublisher(config={
    "url": "amqp://guest:guest@localhost:5672/",
    "exchange": "agentflow",
    "routing_key": "events",
})

app = graph.compile(publisher=publisher)
```
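The config above uses a single fixed routing key. If you instead bind a topic exchange, a structured key lets consumers bind selectively (e.g. only errors). The dotted pattern below is a hypothetical convention for illustration, not something the library emits:

```python
def routing_key(event_source: str, event_type: str) -> str:
    """Build a dotted topic key, e.g. 'events.tool_execution.error'."""
    return f"events.{event_source.lower()}.{event_type.lower()}"
```

With keys like these, a queue bound with the pattern `events.*.error` would receive only error events from every source.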

Writing a custom publisher

```python
from agentflow.runtime.publisher import BasePublisher
from agentflow.runtime.publisher.events import EventModel
import httpx


class WebhookPublisher(BasePublisher):

    def __init__(self, webhook_url: str, config: dict | None = None):
        super().__init__(config or {})
        self.webhook_url = webhook_url
        self._client: httpx.AsyncClient | None = None

    async def publish(self, event: EventModel) -> None:
        if self._is_closed:
            raise RuntimeError("Publisher is closed")
        if self._client is None:
            # Lazily create the client so the publisher can be constructed
            # outside a running event loop.
            self._client = httpx.AsyncClient()
        # mode="json" makes datetime and enum fields JSON-serializable.
        await self._client.post(self.webhook_url, json=event.model_dump(mode="json"))

    async def close(self) -> None:
        if not self._is_closed:
            if self._client:
                await self._client.aclose()
            self._is_closed = True

    def sync_close(self) -> None:
        # For non-async shutdown handlers; assumes no event loop is running.
        import asyncio
        asyncio.run(self.close())
```
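For unit tests, an in-memory publisher that simply records events is often all you need. Sketched here without the `BasePublisher` import so the example is self-contained; in real code, subclass `BasePublisher` as above:

```python
import asyncio


class MemoryPublisher:
    """Collects events in a list — handy for asserting on emitted events."""

    def __init__(self):
        self.events = []
        self._is_closed = False

    async def publish(self, event) -> None:
        if self._is_closed:
            raise RuntimeError("Publisher is closed")
        self.events.append(event)

    async def close(self) -> None:
        self._is_closed = True

    def sync_close(self) -> None:
        self._is_closed = True
```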

Common errors

| Error | Cause | Fix |
| --- | --- | --- |
| `RuntimeError: Cannot publish to closed publisher` | `publish()` called after `close()`. | Do not invoke the graph after closing the publisher; let the async context manager manage its lifetime. |
| `ImportError: redis` | `RedisPublisher` used without `redis` installed. | `pip install redis` |
| `ImportError: aiokafka` | `KafkaPublisher` used without `aiokafka` installed. | `pip install aiokafka` |
| `ImportError: aio-pika` | `RabbitMQPublisher` used without `aio-pika` installed. | `pip install aio-pika` |
| Events missing from channel | Publisher not passed to `graph.compile()`. | Add `publisher=my_publisher` to `compile()`. |