
Publisher

Publisher module for PyAgenity events.

This module provides publishers that handle the delivery of events to various outputs, such as console, Redis, Kafka, and RabbitMQ. Publishers are primarily used for logging and monitoring agent behavior, enabling real-time tracking of performance, usage, and debugging in agent graphs.

Key components:

- BasePublisher: Abstract base class for all publishers, defining the interface for publishing events.
- ConsolePublisher: Default publisher that outputs events to the console for development and debugging.
- Optional publishers: RedisPublisher, KafkaPublisher, and RabbitMQPublisher, available only if their dependencies are installed.

Usage:

- Import publishers: from pyagenity.publisher import ConsolePublisher, RedisPublisher (if available).
- Instantiate and use in CompiledGraph: graph.compile(publisher=ConsolePublisher()).
- Events are emitted as EventModel instances during graph execution, including node starts, completions, and errors.

Dependencies for optional publishers:

- RedisPublisher: requires redis.asyncio (pip install redis).
- KafkaPublisher: requires aiokafka (pip install aiokafka).
- RabbitMQPublisher: requires aio_pika (pip install aio-pika).

For more details, see the individual publisher classes and the PyAgenity documentation.

Modules:

- base_publisher
- console_publisher: Console publisher implementation for debugging and testing.
- events: Event and streaming primitives for agent graph execution.
- kafka_publisher: Kafka publisher implementation (optional dependency).
- publish
- rabbitmq_publisher: RabbitMQ publisher implementation (optional dependency).
- redis_publisher: Redis publisher implementation (optional dependency).

Classes:

- BasePublisher: Abstract base class for event publishers.
- ConsolePublisher: Publisher that prints events to the console for debugging and testing.

Attributes:

- KafkaPublisher (module attribute) = None
- RabbitMQPublisher (module attribute) = None
- RedisPublisher (module attribute) = None
- __all__ (module attribute) = ['BasePublisher', 'ConsolePublisher']

Classes

BasePublisher

Bases: ABC

Abstract base class for event publishers.

This class defines the interface for publishing events. Subclasses should implement the publish, close, and sync_close methods to provide specific publishing logic.

Attributes:

- config: Configuration dictionary for the publisher.

Methods:

- __init__: Initialize the publisher with the given configuration.
- close: Close the publisher and release any resources.
- publish: Publish an event.
- sync_close: Close the publisher and release any resources (synchronous version).

Source code in pyagenity/publisher/base_publisher.py
class BasePublisher(ABC):
    """Abstract base class for event publishers.

    This class defines the interface for publishing events. Subclasses should implement
    the publish, close, and sync_close methods to provide specific publishing logic.

    Attributes:
        config: Configuration dictionary for the publisher.
    """

    def __init__(self, config: dict[str, Any]):
        """Initialize the publisher with the given configuration.

        Args:
            config: Configuration dictionary for the publisher.
        """
        self.config = config

    @abstractmethod
    async def publish(self, event: EventModel) -> Any:
        """Publish an event.

        Args:
            event: The event to publish.

        Returns:
            The result of the publish operation.
        """
        raise NotImplementedError

    @abstractmethod
    async def close(self):
        """Close the publisher and release any resources.

        This method should be overridden by subclasses to provide specific cleanup logic.
        It will be called externally.
        """
        raise NotImplementedError

    @abstractmethod
    def sync_close(self):
        """Close the publisher and release any resources (synchronous version).

        This method should be overridden by subclasses to provide specific cleanup logic.
        It will be called externally.
        """
        raise NotImplementedError

Attributes

config instance-attribute
config = config

Functions

__init__
__init__(config)

Initialize the publisher with the given configuration.

Parameters:

- config (dict[str, Any], required): Configuration dictionary for the publisher.
Source code in pyagenity/publisher/base_publisher.py
def __init__(self, config: dict[str, Any]):
    """Initialize the publisher with the given configuration.

    Args:
        config: Configuration dictionary for the publisher.
    """
    self.config = config
close abstractmethod async
close()

Close the publisher and release any resources.

This method should be overridden by subclasses to provide specific cleanup logic. It will be called externally.

Source code in pyagenity/publisher/base_publisher.py
@abstractmethod
async def close(self):
    """Close the publisher and release any resources.

    This method should be overridden by subclasses to provide specific cleanup logic.
    It will be called externally.
    """
    raise NotImplementedError
publish abstractmethod async
publish(event)

Publish an event.

Parameters:

- event (EventModel, required): The event to publish.

Returns:

- Any: The result of the publish operation.

Source code in pyagenity/publisher/base_publisher.py
@abstractmethod
async def publish(self, event: EventModel) -> Any:
    """Publish an event.

    Args:
        event: The event to publish.

    Returns:
        The result of the publish operation.
    """
    raise NotImplementedError
sync_close abstractmethod
sync_close()

Close the publisher and release any resources (synchronous version).

This method should be overridden by subclasses to provide specific cleanup logic. It will be called externally.

Source code in pyagenity/publisher/base_publisher.py
@abstractmethod
def sync_close(self):
    """Close the publisher and release any resources (synchronous version).

    This method should be overridden by subclasses to provide specific cleanup logic.
    It will be called externally.
    """
    raise NotImplementedError

ConsolePublisher

Bases: BasePublisher

Publisher that prints events to the console for debugging and testing.

This publisher is useful for development and debugging purposes, as it outputs event information to the standard output.

Attributes:

- format: Output format ('json' by default).
- include_timestamp: Whether to include timestamp (True by default).
- indent: Indentation for output (2 by default).

Methods:

- __init__: Initialize the ConsolePublisher with the given configuration.
- close: Close the publisher and release any resources.
- publish: Publish an event to the console.
- sync_close: Synchronously close the publisher and release any resources.

Source code in pyagenity/publisher/console_publisher.py
class ConsolePublisher(BasePublisher):
    """Publisher that prints events to the console for debugging and testing.

    This publisher is useful for development and debugging purposes, as it outputs event information
    to the standard output.

    Attributes:
        format: Output format ('json' by default).
        include_timestamp: Whether to include timestamp (True by default).
        indent: Indentation for output (2 by default).
    """

    def __init__(self, config: dict[str, Any] | None = None):
        """Initialize the ConsolePublisher with the given configuration.

        Args:
            config: Configuration dictionary. Supported keys:
                - format: Output format (default: 'json').
                - include_timestamp: Whether to include timestamp (default: True).
                - indent: Indentation for output (default: 2).
        """
        super().__init__(config or {})
        self.format = config.get("format", "json") if config else "json"
        self.include_timestamp = config.get("include_timestamp", True) if config else True
        self.indent = config.get("indent", 2) if config else 2

    async def publish(self, event: EventModel) -> Any:
        """Publish an event to the console.

        Args:
            event: The event to publish.

        Returns:
            None
        """
        msg = f"{event.timestamp} -> Source: {event.node_name}.{event.event_type}:"
        msg += f"-> Payload: {event.data}"
        msg += f" -> {event.metadata}"
        print(msg)  # noqa: T201

    async def close(self):
        """Close the publisher and release any resources.

        ConsolePublisher does not require cleanup, but this method is provided for
        interface compatibility.
        """
        logger.debug("ConsolePublisher closed")

    def sync_close(self):
        """Synchronously close the publisher and release any resources.

        ConsolePublisher does not require cleanup, but this method is provided for
        interface compatibility.
        """
        logger.debug("ConsolePublisher sync closed")

Attributes

config instance-attribute
config = config
format instance-attribute
format = get('format', 'json') if config else 'json'
include_timestamp instance-attribute
include_timestamp = get('include_timestamp', True) if config else True
indent instance-attribute
indent = get('indent', 2) if config else 2

Functions

__init__
__init__(config=None)

Initialize the ConsolePublisher with the given configuration.

Parameters:

- config (dict[str, Any] | None, default None): Configuration dictionary. Supported keys: format (default 'json'), include_timestamp (default True), indent (default 2).
Source code in pyagenity/publisher/console_publisher.py
def __init__(self, config: dict[str, Any] | None = None):
    """Initialize the ConsolePublisher with the given configuration.

    Args:
        config: Configuration dictionary. Supported keys:
            - format: Output format (default: 'json').
            - include_timestamp: Whether to include timestamp (default: True).
            - indent: Indentation for output (default: 2).
    """
    super().__init__(config or {})
    self.format = config.get("format", "json") if config else "json"
    self.include_timestamp = config.get("include_timestamp", True) if config else True
    self.indent = config.get("indent", 2) if config else 2
close async
close()

Close the publisher and release any resources.

ConsolePublisher does not require cleanup, but this method is provided for interface compatibility.

Source code in pyagenity/publisher/console_publisher.py
async def close(self):
    """Close the publisher and release any resources.

    ConsolePublisher does not require cleanup, but this method is provided for
    interface compatibility.
    """
    logger.debug("ConsolePublisher closed")
publish async
publish(event)

Publish an event to the console.

Parameters:

- event (EventModel, required): The event to publish.

Returns:

- None (annotated as Any).

Source code in pyagenity/publisher/console_publisher.py
async def publish(self, event: EventModel) -> Any:
    """Publish an event to the console.

    Args:
        event: The event to publish.

    Returns:
        None
    """
    msg = f"{event.timestamp} -> Source: {event.node_name}.{event.event_type}:"
    msg += f"-> Payload: {event.data}"
    msg += f" -> {event.metadata}"
    print(msg)  # noqa: T201
sync_close
sync_close()

Synchronously close the publisher and release any resources.

ConsolePublisher does not require cleanup, but this method is provided for interface compatibility.

Source code in pyagenity/publisher/console_publisher.py
def sync_close(self):
    """Synchronously close the publisher and release any resources.

    ConsolePublisher does not require cleanup, but this method is provided for
    interface compatibility.
    """
    logger.debug("ConsolePublisher sync closed")

Modules

base_publisher

Classes:

- BasePublisher: Abstract base class for event publishers.


console_publisher

Console publisher implementation for debugging and testing.

This module provides a publisher that outputs events to the console for development and debugging purposes.

Classes:

- ConsolePublisher: Publisher that prints events to the console for debugging and testing.

Attributes:

- logger (module attribute) = getLogger(__name__)


events

Event and streaming primitives for agent graph execution.

This module defines event types, content types, and the EventModel for structured streaming of execution updates, tool calls, state changes, messages, and errors in agent graphs.

Classes:

- Event: Enum for event sources (graph, node, tool, streaming).
- EventType: Enum for event phases (start, progress, result, end, etc.).
- ContentType: Enum for semantic content types (text, message, tool_call, etc.).
- EventModel: Structured event chunk for streaming agent graph execution.

Classes

ContentType

Bases: str, Enum

Enum for semantic content types in agent graph streaming.

Values:

- TEXT: Textual content.
- MESSAGE: Message content.
- REASONING: Reasoning content.
- TOOL_CALL: Tool call content.
- TOOL_RESULT: Tool result content.
- IMAGE: Image content.
- AUDIO: Audio content.
- VIDEO: Video content.
- DOCUMENT: Document content.
- DATA: Data content.
- STATE: State content.
- UPDATE: Update content.
- ERROR: Error content.

Attributes: AUDIO, DATA, DOCUMENT, ERROR, IMAGE, MESSAGE, REASONING, STATE, TEXT, TOOL_CALL, TOOL_RESULT, UPDATE, VIDEO
Source code in pyagenity/publisher/events.py
class ContentType(str, enum.Enum):
    """Enum for semantic content types in agent graph streaming.

    Values:
        TEXT: Textual content.
        MESSAGE: Message content.
        REASONING: Reasoning content.
        TOOL_CALL: Tool call content.
        TOOL_RESULT: Tool result content.
        IMAGE: Image content.
        AUDIO: Audio content.
        VIDEO: Video content.
        DOCUMENT: Document content.
        DATA: Data content.
        STATE: State content.
        UPDATE: Update content.
        ERROR: Error content.
    """

    TEXT = "text"
    MESSAGE = "message"
    REASONING = "reasoning"
    TOOL_CALL = "tool_call"
    TOOL_RESULT = "tool_result"
    IMAGE = "image"
    AUDIO = "audio"
    VIDEO = "video"
    DOCUMENT = "document"
    DATA = "data"
    STATE = "state"
    UPDATE = "update"
    ERROR = "error"
Attributes
AUDIO class-attribute instance-attribute
AUDIO = 'audio'
DATA class-attribute instance-attribute
DATA = 'data'
DOCUMENT class-attribute instance-attribute
DOCUMENT = 'document'
ERROR class-attribute instance-attribute
ERROR = 'error'
IMAGE class-attribute instance-attribute
IMAGE = 'image'
MESSAGE class-attribute instance-attribute
MESSAGE = 'message'
REASONING class-attribute instance-attribute
REASONING = 'reasoning'
STATE class-attribute instance-attribute
STATE = 'state'
TEXT class-attribute instance-attribute
TEXT = 'text'
TOOL_CALL class-attribute instance-attribute
TOOL_CALL = 'tool_call'
TOOL_RESULT class-attribute instance-attribute
TOOL_RESULT = 'tool_result'
UPDATE class-attribute instance-attribute
UPDATE = 'update'
VIDEO class-attribute instance-attribute
VIDEO = 'video'
Event

Bases: str, Enum

Enum for event sources in agent graph execution.

Values:

- GRAPH_EXECUTION: Event from graph execution.
- NODE_EXECUTION: Event from node execution.
- TOOL_EXECUTION: Event from tool execution.
- STREAMING: Event from streaming updates.

Attributes: GRAPH_EXECUTION, NODE_EXECUTION, STREAMING, TOOL_EXECUTION
Source code in pyagenity/publisher/events.py
class Event(str, enum.Enum):
    """Enum for event sources in agent graph execution.

    Values:
        GRAPH_EXECUTION: Event from graph execution.
        NODE_EXECUTION: Event from node execution.
        TOOL_EXECUTION: Event from tool execution.
        STREAMING: Event from streaming updates.
    """

    GRAPH_EXECUTION = "graph_execution"
    NODE_EXECUTION = "node_execution"
    TOOL_EXECUTION = "tool_execution"
    STREAMING = "streaming"
Attributes
GRAPH_EXECUTION class-attribute instance-attribute
GRAPH_EXECUTION = 'graph_execution'
NODE_EXECUTION class-attribute instance-attribute
NODE_EXECUTION = 'node_execution'
STREAMING class-attribute instance-attribute
STREAMING = 'streaming'
TOOL_EXECUTION class-attribute instance-attribute
TOOL_EXECUTION = 'tool_execution'
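Because Event and ContentType subclass str, their members compare equal to raw string values and round-trip from them, which keeps serialized events readable. The snippet redefines the enums locally (Event in full, ContentType as a subset) so it runs without pyagenity installed.

```python
import enum


class Event(str, enum.Enum):
    GRAPH_EXECUTION = "graph_execution"
    NODE_EXECUTION = "node_execution"
    TOOL_EXECUTION = "tool_execution"
    STREAMING = "streaming"


# Subset of ContentType, enough to show the same str-enum behavior.
class ContentType(str, enum.Enum):
    TEXT = "text"
    TOOL_CALL = "tool_call"
    ERROR = "error"


# str-enum members compare equal to plain strings...
print(Event.NODE_EXECUTION == "node_execution")  # True
# ...and round-trip from raw values, e.g. when deserializing events.
print(ContentType("tool_call") is ContentType.TOOL_CALL)  # True
```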
EventModel

Bases: BaseModel

Structured event chunk for streaming agent graph execution.

Represents a chunk of streamed data with event and content semantics, supporting both delta (incremental) and full content. Used for real-time streaming of execution updates, tool calls, state changes, messages, and errors.

Attributes:

- event (Event): Type of the event source.
- event_type (EventType): Phase of the event (start, progress, end, update).
- content (str): Streamed textual content.
- content_blocks (list[ContentBlock] | None): Structured content blocks for multimodal streaming.
- delta (bool): True if this is a delta update (incremental).
- delta_type (Literal['text', 'json', 'binary'] | None): Type of delta when delta=True.
- block_index (int | None): Index of the content block this chunk applies to.
- chunk_index (int | None): Per-block chunk index for ordering.
- byte_offset (int | None): Byte offset for binary/media streaming.
- data (dict[str, Any]): Additional structured data.
- content_type (list[ContentType] | None): Semantic type of content.
- sequence_id (int): Monotonic sequence ID for stream ordering.
- node_name (str): Name of the node producing this chunk.
- run_id (str): Unique ID for this stream/run.
- thread_id (str | int): Thread ID for this execution.
- timestamp (float): UNIX timestamp of when chunk was created.
- is_error (bool): Marks this chunk as representing an error state.
- metadata (dict[str, Any]): Optional metadata for consumers.

Classes:

- Config: Pydantic configuration for EventModel.

Methods:

- default: Create a default EventModel instance with minimal required fields.
- stream: Create a default EventModel instance for streaming updates.

Source code in pyagenity/publisher/events.py
class EventModel(BaseModel):
    """
    Structured event chunk for streaming agent graph execution.

    Represents a chunk of streamed data with event and content semantics, supporting both delta
    (incremental) and full content. Used for real-time streaming of execution updates, tool calls,
    state changes, messages, and errors.

    Attributes:
        event: Type of the event source.
        event_type: Phase of the event (start, progress, end, update).
        content: Streamed textual content.
        content_blocks: Structured content blocks for multimodal streaming.
        delta: True if this is a delta update (incremental).
        delta_type: Type of delta when delta=True.
        block_index: Index of the content block this chunk applies to.
        chunk_index: Per-block chunk index for ordering.
        byte_offset: Byte offset for binary/media streaming.
        data: Additional structured data.
        content_type: Semantic type of content.
        sequence_id: Monotonic sequence ID for stream ordering.
        node_name: Name of the node producing this chunk.
        run_id: Unique ID for this stream/run.
        thread_id: Thread ID for this execution.
        timestamp: UNIX timestamp of when chunk was created.
        is_error: Marks this chunk as representing an error state.
        metadata: Optional metadata for consumers.
    """

    # Event metadata
    event: Event = Field(..., description="Type of the event source")
    event_type: EventType = Field(
        ..., description="Phase of the event (start, progress, end, update)"
    )

    # Streamed content
    content: str = Field(default="", description="Streamed textual content")
    # Structured content blocks for multimodal/structured streaming
    content_blocks: list[ContentBlock] | None = Field(
        default=None, description="Structured content blocks carried by this event"
    )
    # Delta controls
    delta: bool = Field(default=False, description="True if this is a delta update (incremental)")
    delta_type: Literal["text", "json", "binary"] | None = Field(
        default=None, description="Type of delta when delta=True"
    )
    block_index: int | None = Field(
        default=None, description="Index of the content block this chunk applies to"
    )
    chunk_index: int | None = Field(default=None, description="Per-block chunk index for ordering")
    byte_offset: int | None = Field(
        default=None, description="Byte offset for binary/media streaming"
    )

    # Data payload
    data: dict[str, Any] = Field(default_factory=dict, description="Additional structured data")

    # Metadata
    content_type: list[ContentType] | None = Field(
        default=None, description="Semantic type of content"
    )
    sequence_id: int = Field(default=0, description="Monotonic sequence ID for stream ordering")
    node_name: str = Field(default="", description="Name of the node producing this chunk")
    run_id: str = Field(
        default_factory=lambda: str(uuid.uuid4()), description="Unique ID for this stream/run"
    )
    thread_id: str | int = Field(default="", description="Thread ID for this execution")
    timestamp: float = Field(
        default_factory=time.time, description="UNIX timestamp of when chunk was created"
    )
    is_error: bool = Field(
        default=False, description="Marks this chunk as representing an error state"
    )
    metadata: dict[str, Any] = Field(
        default_factory=dict, description="Optional metadata for consumers"
    )

    class Config:
        """Pydantic configuration for EventModel.

        Attributes:
            use_enum_values: Output enums as strings.
        """

        use_enum_values = True  # Output enums as strings

    @classmethod
    def default(
        cls,
        base_config: dict,
        data: dict[str, Any],
        content_type: list[ContentType],
        event: Event = Event.GRAPH_EXECUTION,
        event_type=EventType.START,
        node_name: str = "",
        extra: dict[str, Any] | None = None,
    ) -> "EventModel":
        """Create a default EventModel instance with minimal required fields.

        Args:
            base_config: Base configuration for the event (thread/run/timestamp/user).
            data: Structured data payload.
            content_type: Semantic type(s) of content.
            event: Event source type (default: GRAPH_EXECUTION).
            event_type: Event phase (default: START).
            node_name: Name of the node producing the event.
            extra: Additional metadata.

        Returns:
            EventModel: The created event model instance.
        """
        thread_id = base_config.get("thread_id", "")
        run_id = base_config.get("run_id", "")

        metadata = {
            "run_timestamp": base_config.get("timestamp", ""),
            "user_id": base_config.get("user_id"),
            "is_stream": base_config.get("is_stream", False),
        }
        if extra:
            metadata.update(extra)
        return cls(
            event=event,
            event_type=event_type,
            delta=False,
            content_type=content_type,
            data=data,
            thread_id=thread_id,
            node_name=node_name,
            run_id=run_id,
            metadata=metadata,
        )

    @classmethod
    def stream(
        cls,
        base_config: dict,
        node_name: str = "",
        extra: dict[str, Any] | None = None,
    ) -> "EventModel":
        """Create a default EventModel instance for streaming updates.

        Args:
            base_config: Base configuration for the event (thread/run/timestamp/user).
            node_name: Name of the node producing the event.
            extra: Additional metadata.

        Returns:
            EventModel: The created event model instance for streaming.
        """
        thread_id = base_config.get("thread_id", "")
        run_id = base_config.get("run_id", "")

        metadata = {
            "run_timestamp": base_config.get("timestamp", ""),
            "user_id": base_config.get("user_id"),
            "is_stream": base_config.get("is_stream", False),
        }
        if extra:
            metadata.update(extra)
        return cls(
            event=Event.STREAMING,
            event_type=EventType.UPDATE,
            delta=True,
            content_type=[ContentType.TEXT, ContentType.REASONING],
            data={},
            thread_id=thread_id,
            node_name=node_name,
            run_id=run_id,
            metadata=metadata,
        )
Attributes
block_index class-attribute instance-attribute
block_index = Field(default=None, description='Index of the content block this chunk applies to')
byte_offset class-attribute instance-attribute
byte_offset = Field(default=None, description='Byte offset for binary/media streaming')
chunk_index class-attribute instance-attribute
chunk_index = Field(default=None, description='Per-block chunk index for ordering')
content class-attribute instance-attribute
content = Field(default='', description='Streamed textual content')
content_blocks class-attribute instance-attribute
content_blocks = Field(default=None, description='Structured content blocks carried by this event')
content_type class-attribute instance-attribute
content_type = Field(default=None, description='Semantic type of content')
data class-attribute instance-attribute
data = Field(default_factory=dict, description='Additional structured data')
delta class-attribute instance-attribute
delta = Field(default=False, description='True if this is a delta update (incremental)')
delta_type class-attribute instance-attribute
delta_type = Field(default=None, description='Type of delta when delta=True')
event class-attribute instance-attribute
event = Field(..., description='Type of the event source')
event_type class-attribute instance-attribute
event_type = Field(..., description='Phase of the event (start, progress, end, update)')
is_error class-attribute instance-attribute
is_error = Field(default=False, description='Marks this chunk as representing an error state')
metadata class-attribute instance-attribute
metadata = Field(default_factory=dict, description='Optional metadata for consumers')
node_name class-attribute instance-attribute
node_name = Field(default='', description='Name of the node producing this chunk')
run_id class-attribute instance-attribute
run_id = Field(default_factory=lambda: str(uuid4()), description='Unique ID for this stream/run')
sequence_id class-attribute instance-attribute
sequence_id = Field(default=0, description='Monotonic sequence ID for stream ordering')
thread_id class-attribute instance-attribute
thread_id = Field(default='', description='Thread ID for this execution')
timestamp class-attribute instance-attribute
timestamp = Field(default_factory=time.time, description='UNIX timestamp of when chunk was created')
Classes
Config

Pydantic configuration for EventModel.

Attributes:

Name Type Description
use_enum_values

Output enums as strings.

Source code in pyagenity/publisher/events.py
class Config:
    """Pydantic configuration for EventModel.

    Attributes:
        use_enum_values: Output enums as strings.
    """

    use_enum_values = True  # Output enums as strings
Attributes
use_enum_values class-attribute instance-attribute
use_enum_values = True
Functions
default classmethod
default(base_config, data, content_type, event=Event.GRAPH_EXECUTION, event_type=EventType.START, node_name='', extra=None)

Create a default EventModel instance with minimal required fields.

Parameters:

Name Type Description Default
base_config dict

Base configuration for the event (thread/run/timestamp/user).

required
data dict[str, Any]

Structured data payload.

required
content_type list[ContentType]

Semantic type(s) of content.

required
event Event

Event source type (default: GRAPH_EXECUTION).

GRAPH_EXECUTION
event_type

Event phase (default: START).

START
node_name str

Name of the node producing the event.

''
extra dict[str, Any] | None

Additional metadata.

None

Returns:

Name Type Description
EventModel EventModel

The created event model instance.

Source code in pyagenity/publisher/events.py
@classmethod
def default(
    cls,
    base_config: dict,
    data: dict[str, Any],
    content_type: list[ContentType],
    event: Event = Event.GRAPH_EXECUTION,
    event_type=EventType.START,
    node_name: str = "",
    extra: dict[str, Any] | None = None,
) -> "EventModel":
    """Create a default EventModel instance with minimal required fields.

    Args:
        base_config: Base configuration for the event (thread/run/timestamp/user).
        data: Structured data payload.
        content_type: Semantic type(s) of content.
        event: Event source type (default: GRAPH_EXECUTION).
        event_type: Event phase (default: START).
        node_name: Name of the node producing the event.
        extra: Additional metadata.

    Returns:
        EventModel: The created event model instance.
    """
    thread_id = base_config.get("thread_id", "")
    run_id = base_config.get("run_id", "")

    metadata = {
        "run_timestamp": base_config.get("timestamp", ""),
        "user_id": base_config.get("user_id"),
        "is_stream": base_config.get("is_stream", False),
    }
    if extra:
        metadata.update(extra)
    return cls(
        event=event,
        event_type=event_type,
        delta=False,
        content_type=content_type,
        data=data,
        thread_id=thread_id,
        node_name=node_name,
        run_id=run_id,
        metadata=metadata,
    )
stream classmethod
stream(base_config, node_name='', extra=None)

Create a default EventModel instance for streaming updates.

Parameters:

Name Type Description Default
base_config dict

Base configuration for the event (thread/run/timestamp/user).

required
node_name str

Name of the node producing the event.

''
extra dict[str, Any] | None

Additional metadata.

None

Returns:

Name Type Description
EventModel EventModel

The created event model instance for streaming.

Source code in pyagenity/publisher/events.py
@classmethod
def stream(
    cls,
    base_config: dict,
    node_name: str = "",
    extra: dict[str, Any] | None = None,
) -> "EventModel":
    """Create a default EventModel instance for streaming updates.

    Args:
        base_config: Base configuration for the event (thread/run/timestamp/user).
        node_name: Name of the node producing the event.
        extra: Additional metadata.

    Returns:
        EventModel: The created event model instance for streaming.
    """
    thread_id = base_config.get("thread_id", "")
    run_id = base_config.get("run_id", "")

    metadata = {
        "run_timestamp": base_config.get("timestamp", ""),
        "user_id": base_config.get("user_id"),
        "is_stream": base_config.get("is_stream", False),
    }
    if extra:
        metadata.update(extra)
    return cls(
        event=Event.STREAMING,
        event_type=EventType.UPDATE,
        delta=True,
        content_type=[ContentType.TEXT, ContentType.REASONING],
        data={},
        thread_id=thread_id,
        node_name=node_name,
        run_id=run_id,
        metadata=metadata,
    )
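
Consumers typically reassemble the delta chunks produced by `stream()` by ordering on `sequence_id` and concatenating `content`. A minimal sketch, with plain dicts standing in for `EventModel` instances:

```python
def reassemble(chunks: list[dict]) -> str:
    # Order delta chunks by their monotonic sequence_id, then join the text.
    ordered = sorted(chunks, key=lambda c: c["sequence_id"])
    return "".join(c["content"] for c in ordered)


text = reassemble([
    {"sequence_id": 2, "content": "world"},
    {"sequence_id": 1, "content": "hello, "},
])
```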
EventType

Bases: str, Enum

Enum for event phases in agent graph execution.

Values

START: Event marks start of execution.
PROGRESS: Event marks progress update.
RESULT: Event marks result produced.
END: Event marks end of execution.
UPDATE: Event marks update.
ERROR: Event marks error.
INTERRUPTED: Event marks interruption.

Attributes:

Name Type Description
END
ERROR
INTERRUPTED
PROGRESS
RESULT
START
UPDATE
Source code in pyagenity/publisher/events.py
class EventType(str, enum.Enum):
    """Enum for event phases in agent graph execution.

    Values:
        START: Event marks start of execution.
        PROGRESS: Event marks progress update.
        RESULT: Event marks result produced.
        END: Event marks end of execution.
        UPDATE: Event marks update.
        ERROR: Event marks error.
        INTERRUPTED: Event marks interruption.
    """

    START = "start"
    PROGRESS = "progress"
    RESULT = "result"
    END = "end"
    UPDATE = "update"
    ERROR = "error"
    INTERRUPTED = "interrupted"
Attributes
END class-attribute instance-attribute
END = 'end'
ERROR class-attribute instance-attribute
ERROR = 'error'
INTERRUPTED class-attribute instance-attribute
INTERRUPTED = 'interrupted'
PROGRESS class-attribute instance-attribute
PROGRESS = 'progress'
RESULT class-attribute instance-attribute
RESULT = 'result'
START class-attribute instance-attribute
START = 'start'
UPDATE class-attribute instance-attribute
UPDATE = 'update'
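
Because `EventType` subclasses both `str` and `Enum`, members compare equal to their string values and serialize cleanly to JSON, which is what `use_enum_values` relies on. A self-contained sketch reproducing the enum from the source above:

```python
import enum
import json


class EventType(str, enum.Enum):
    START = "start"
    PROGRESS = "progress"
    RESULT = "result"
    END = "end"
    UPDATE = "update"
    ERROR = "error"
    INTERRUPTED = "interrupted"


# str subclassing makes members usable anywhere a plain string is expected,
# including as JSON values.
payload = json.dumps({"event_type": EventType.START})
```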

kafka_publisher

Kafka publisher implementation (optional dependency).

Uses aiokafka to publish events to a Kafka topic.

Dependency: aiokafka Not installed by default; install extra: pip install pyagenity[kafka].

Classes:

Name Description
KafkaPublisher

Publish events to a Kafka topic using aiokafka.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute
logger = getLogger(__name__)

Classes

KafkaPublisher

Bases: BasePublisher

Publish events to a Kafka topic using aiokafka.

This class provides an asynchronous interface for publishing events to a Kafka topic. It uses the aiokafka library to handle the producer operations. The publisher is lazily initialized and can be reused for multiple publishes.

Attributes:

Name Type Description
bootstrap_servers str

Kafka bootstrap servers.

topic str

Kafka topic to publish to.

client_id str | None

Client ID for the producer.

_producer

Lazy-loaded Kafka producer instance.

Methods:

Name Description
__init__

Initialize the KafkaPublisher.

close

Close the Kafka producer.

publish

Publish an event to the Kafka topic.

sync_close

Synchronously close the Kafka producer.

Source code in pyagenity/publisher/kafka_publisher.py
class KafkaPublisher(BasePublisher):
    """Publish events to a Kafka topic using aiokafka.

    This class provides an asynchronous interface for publishing events to a Kafka topic.
    It uses the aiokafka library to handle the producer operations. The publisher is
    lazily initialized and can be reused for multiple publishes.

    Attributes:
        bootstrap_servers: Kafka bootstrap servers.
        topic: Kafka topic to publish to.
        client_id: Client ID for the producer.
        _producer: Lazy-loaded Kafka producer instance.
    """

    def __init__(self, config: dict[str, Any] | None = None):
        """Initialize the KafkaPublisher.

        Args:
            config: Configuration dictionary. Supported keys:
                - bootstrap_servers: Kafka bootstrap servers (default: "localhost:9092").
                - topic: Kafka topic to publish to (default: "pyagenity.events").
                - client_id: Client ID for the producer.
        """
        super().__init__(config or {})
        self.bootstrap_servers: str = self.config.get("bootstrap_servers", "localhost:9092")
        self.topic: str = self.config.get("topic", "pyagenity.events")
        self.client_id: str | None = self.config.get("client_id")
        self._producer = None  # type: ignore[var-annotated]

    async def _get_producer(self):
        """Get or create the Kafka producer instance.

        This method lazily initializes the producer if it hasn't been created yet.
        It imports aiokafka and starts the producer.

        Returns:
            The initialized producer instance.

        Raises:
            RuntimeError: If the 'aiokafka' package is not installed.
        """
        if self._producer is not None:
            return self._producer

        try:
            aiokafka = importlib.import_module("aiokafka")
        except Exception as exc:
            raise RuntimeError(
                "KafkaPublisher requires the 'aiokafka' package. Install with "
                "'pip install pyagenity[kafka]' or 'pip install aiokafka'."
            ) from exc

        producer_cls = aiokafka.AIOKafkaProducer
        self._producer = producer_cls(
            bootstrap_servers=self.bootstrap_servers,
            client_id=self.client_id,
        )
        await self._producer.start()
        return self._producer

    async def publish(self, event: EventModel) -> Any:
        """Publish an event to the Kafka topic.

        Args:
            event: The event to publish.

        Returns:
            The result of the send_and_wait operation.
        """
        producer = await self._get_producer()
        payload = json.dumps(event.model_dump()).encode("utf-8")
        return await producer.send_and_wait(self.topic, payload)

    async def close(self):
        """Close the Kafka producer.

        Stops the producer and cleans up resources. Errors during stopping are logged
        but do not raise exceptions.
        """
        if self._producer is None:
            return

        try:
            await self._producer.stop()
        except Exception:
            logger.debug("KafkaPublisher close encountered an error", exc_info=True)
        finally:
            self._producer = None

    def sync_close(self):
        """Synchronously close the Kafka producer.

        This method runs the async close in a new event loop. If called within an
        active event loop, it logs a warning and skips the operation.
        """
        try:
            asyncio.run(self.close())
        except RuntimeError:
            logger.warning("sync_close called within an active event loop; skipping.")
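
The lazy-import pattern used by `_get_producer` — defer importing the optional dependency until first use, and raise an actionable error when it is missing — can be sketched generically. The helper name and messages below are illustrative, not part of the PyAgenity API (`json` stands in for the optional package):

```python
import importlib


def load_optional(module_name: str, install_hint: str):
    # Import the optional dependency on first use; fail with install guidance.
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise RuntimeError(
            f"This feature requires the '{module_name}' package. {install_hint}"
        ) from exc


json_mod = load_optional("json", "Part of the standard library.")
```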
Attributes
bootstrap_servers instance-attribute
bootstrap_servers = get('bootstrap_servers', 'localhost:9092')
client_id instance-attribute
client_id = get('client_id')
config instance-attribute
config = config
topic instance-attribute
topic = get('topic', 'pyagenity.events')
Functions
__init__
__init__(config=None)

Initialize the KafkaPublisher.

Parameters:

Name Type Description Default
config dict[str, Any] | None

Configuration dictionary. Supported keys: - bootstrap_servers: Kafka bootstrap servers (default: "localhost:9092"). - topic: Kafka topic to publish to (default: "pyagenity.events"). - client_id: Client ID for the producer.

None
Source code in pyagenity/publisher/kafka_publisher.py
def __init__(self, config: dict[str, Any] | None = None):
    """Initialize the KafkaPublisher.

    Args:
        config: Configuration dictionary. Supported keys:
            - bootstrap_servers: Kafka bootstrap servers (default: "localhost:9092").
            - topic: Kafka topic to publish to (default: "pyagenity.events").
            - client_id: Client ID for the producer.
    """
    super().__init__(config or {})
    self.bootstrap_servers: str = self.config.get("bootstrap_servers", "localhost:9092")
    self.topic: str = self.config.get("topic", "pyagenity.events")
    self.client_id: str | None = self.config.get("client_id")
    self._producer = None  # type: ignore[var-annotated]
close async
close()

Close the Kafka producer.

Stops the producer and cleans up resources. Errors during stopping are logged but do not raise exceptions.

Source code in pyagenity/publisher/kafka_publisher.py
async def close(self):
    """Close the Kafka producer.

    Stops the producer and cleans up resources. Errors during stopping are logged
    but do not raise exceptions.
    """
    if self._producer is None:
        return

    try:
        await self._producer.stop()
    except Exception:
        logger.debug("KafkaPublisher close encountered an error", exc_info=True)
    finally:
        self._producer = None
publish async
publish(event)

Publish an event to the Kafka topic.

Parameters:

Name Type Description Default
event EventModel

The event to publish.

required

Returns:

Type Description
Any

The result of the send_and_wait operation.

Source code in pyagenity/publisher/kafka_publisher.py
async def publish(self, event: EventModel) -> Any:
    """Publish an event to the Kafka topic.

    Args:
        event: The event to publish.

    Returns:
        The result of the send_and_wait operation.
    """
    producer = await self._get_producer()
    payload = json.dumps(event.model_dump()).encode("utf-8")
    return await producer.send_and_wait(self.topic, payload)
sync_close
sync_close()

Synchronously close the Kafka producer.

This method runs the async close in a new event loop. If called within an active event loop, it logs a warning and skips the operation.

Source code in pyagenity/publisher/kafka_publisher.py
def sync_close(self):
    """Synchronously close the Kafka producer.

    This method runs the async close in a new event loop. If called within an
    active event loop, it logs a warning and skips the operation.
    """
    try:
        asyncio.run(self.close())
    except RuntimeError:
        logger.warning("sync_close called within an active event loop; skipping.")
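
The guard in `sync_close` works because `asyncio.run` refuses to start when the calling thread already has a running event loop. A minimal sketch of the same pattern (the return values are added purely for illustration):

```python
import asyncio


async def _close() -> None:
    # Stand-in for the async producer shutdown.
    await asyncio.sleep(0)


def sync_close() -> str:
    try:
        asyncio.run(_close())
        return "closed"
    except RuntimeError:
        # Called from inside a running loop: skip rather than deadlock.
        return "skipped"
```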

publish

Functions:

Name Description
publish_event

Publish an event asynchronously using the background task manager.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute
logger = getLogger(__name__)

Functions

publish_event
publish_event(event, publisher=Inject[BasePublisher], task_manager=Inject[BackgroundTaskManager])

Publish an event asynchronously using the background task manager.

Parameters:

Name Type Description Default
event
EventModel

The event to publish.

required
publisher
BasePublisher | None

The publisher instance (injected).

Inject[BasePublisher]
task_manager
BackgroundTaskManager

The background task manager (injected).

Inject[BackgroundTaskManager]
Source code in pyagenity/publisher/publish.py
def publish_event(
    event: EventModel,
    publisher: BasePublisher | None = Inject[BasePublisher],
    task_manager: BackgroundTaskManager = Inject[BackgroundTaskManager],
) -> None:
    """Publish an event asynchronously using the background task manager.

    Args:
        event: The event to publish.
        publisher: The publisher instance (injected).
        task_manager: The background task manager (injected).
    """
    # Store the task to prevent it from being garbage collected
    task_manager.create_task(_publish_event_task(event, publisher))

rabbitmq_publisher

RabbitMQ publisher implementation (optional dependency).

Uses aio-pika to publish events to an exchange with a routing key.

Dependency: aio-pika Not installed by default; install extra: pip install pyagenity[rabbitmq].

Classes:

Name Description
RabbitMQPublisher

Publish events to RabbitMQ using aio-pika.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute
logger = getLogger(__name__)

Classes

RabbitMQPublisher

Bases: BasePublisher

Publish events to RabbitMQ using aio-pika.

Attributes:

Name Type Description
url str

RabbitMQ connection URL.

exchange str

Exchange name.

routing_key str

Routing key for messages.

exchange_type str

Type of exchange.

declare bool

Whether to declare the exchange.

durable bool

Whether the exchange is durable.

_conn

Connection instance.

_channel

Channel instance.

_exchange

Exchange instance.

Methods:

Name Description
__init__

Initialize the RabbitMQPublisher.

close

Close the RabbitMQ connection and channel.

publish

Publish an event to RabbitMQ.

sync_close

Synchronously close the RabbitMQ connection.

Source code in pyagenity/publisher/rabbitmq_publisher.py
class RabbitMQPublisher(BasePublisher):
    """Publish events to RabbitMQ using aio-pika.

    Attributes:
        url: RabbitMQ connection URL.
        exchange: Exchange name.
        routing_key: Routing key for messages.
        exchange_type: Type of exchange.
        declare: Whether to declare the exchange.
        durable: Whether the exchange is durable.
        _conn: Connection instance.
        _channel: Channel instance.
        _exchange: Exchange instance.
    """

    def __init__(self, config: dict[str, Any] | None = None):
        """Initialize the RabbitMQPublisher.

        Args:
            config: Configuration dictionary. Supported keys:
                - url: RabbitMQ URL (default: "amqp://guest:guest@localhost/").
                - exchange: Exchange name (default: "pyagenity.events").
                - routing_key: Routing key (default: "pyagenity.events").
                - exchange_type: Exchange type (default: "topic").
                - declare: Whether to declare exchange (default: True).
                - durable: Whether exchange is durable (default: True).
        """
        super().__init__(config or {})
        self.url: str = self.config.get("url", "amqp://guest:guest@localhost/")
        self.exchange: str = self.config.get("exchange", "pyagenity.events")
        self.routing_key: str = self.config.get("routing_key", "pyagenity.events")
        self.exchange_type: str = self.config.get("exchange_type", "topic")
        self.declare: bool = self.config.get("declare", True)
        self.durable: bool = self.config.get("durable", True)

        self._conn = None  # type: ignore[var-annotated]
        self._channel = None  # type: ignore[var-annotated]
        self._exchange = None  # type: ignore[var-annotated]

    async def _ensure(self):
        """Ensure the connection, channel, and exchange are initialized."""
        if self._exchange is not None:
            return

        try:
            aio_pika = importlib.import_module("aio_pika")
        except Exception as exc:
            raise RuntimeError(
                "RabbitMQPublisher requires the 'aio-pika' package. Install with "
                "'pip install pyagenity[rabbitmq]' or 'pip install aio-pika'."
            ) from exc

        # Connect and declare exchange if needed
        self._conn = await aio_pika.connect_robust(self.url)
        self._channel = await self._conn.channel()

        if self.declare:
            ex_type = getattr(
                aio_pika.ExchangeType,
                self.exchange_type.upper(),
                aio_pika.ExchangeType.TOPIC,
            )
            self._exchange = await self._channel.declare_exchange(
                self.exchange, ex_type, durable=self.durable
            )
        else:
            # Fall back to default exchange
            self._exchange = self._channel.default_exchange

    async def publish(self, event: EventModel) -> Any:
        """Publish an event to RabbitMQ.

        Args:
            event: The event to publish.

        Returns:
            True on success.
        """
        await self._ensure()
        payload = json.dumps(event.model_dump()).encode("utf-8")

        aio_pika = importlib.import_module("aio_pika")
        message = aio_pika.Message(body=payload)
        if self._exchange is None:
            raise RuntimeError("RabbitMQPublisher exchange not initialized")
        await self._exchange.publish(message, routing_key=self.routing_key)
        return True

    async def close(self):
        """Close the RabbitMQ connection and channel."""
        try:
            if self._channel is not None:
                await self._channel.close()
        except Exception:
            logger.debug("RabbitMQPublisher channel close error", exc_info=True)
        finally:
            self._channel = None

        try:
            if self._conn is not None:
                await self._conn.close()
        except Exception:
            logger.debug("RabbitMQPublisher connection close error", exc_info=True)
        finally:
            self._conn = None
            self._exchange = None

    def sync_close(self):
        """Synchronously close the RabbitMQ connection."""
        try:
            asyncio.run(self.close())
        except RuntimeError:
            logger.warning("sync_close called within an active event loop; skipping.")
Attributes
config instance-attribute
config = config
declare instance-attribute
declare = get('declare', True)
durable instance-attribute
durable = get('durable', True)
exchange instance-attribute
exchange = get('exchange', 'pyagenity.events')
exchange_type instance-attribute
exchange_type = get('exchange_type', 'topic')
routing_key instance-attribute
routing_key = get('routing_key', 'pyagenity.events')
url instance-attribute
url = get('url', 'amqp://guest:guest@localhost/')
Functions
__init__
__init__(config=None)

Initialize the RabbitMQPublisher.

Parameters:

Name Type Description Default
config dict[str, Any] | None

Configuration dictionary. Supported keys: - url: RabbitMQ URL (default: "amqp://guest:guest@localhost/"). - exchange: Exchange name (default: "pyagenity.events"). - routing_key: Routing key (default: "pyagenity.events"). - exchange_type: Exchange type (default: "topic"). - declare: Whether to declare exchange (default: True). - durable: Whether exchange is durable (default: True).

None
Source code in pyagenity/publisher/rabbitmq_publisher.py
def __init__(self, config: dict[str, Any] | None = None):
    """Initialize the RabbitMQPublisher.

    Args:
        config: Configuration dictionary. Supported keys:
            - url: RabbitMQ URL (default: "amqp://guest:guest@localhost/").
            - exchange: Exchange name (default: "pyagenity.events").
            - routing_key: Routing key (default: "pyagenity.events").
            - exchange_type: Exchange type (default: "topic").
            - declare: Whether to declare exchange (default: True).
            - durable: Whether exchange is durable (default: True).
    """
    super().__init__(config or {})
    self.url: str = self.config.get("url", "amqp://guest:guest@localhost/")
    self.exchange: str = self.config.get("exchange", "pyagenity.events")
    self.routing_key: str = self.config.get("routing_key", "pyagenity.events")
    self.exchange_type: str = self.config.get("exchange_type", "topic")
    self.declare: bool = self.config.get("declare", True)
    self.durable: bool = self.config.get("durable", True)

    self._conn = None  # type: ignore[var-annotated]
    self._channel = None  # type: ignore[var-annotated]
    self._exchange = None  # type: ignore[var-annotated]
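The constructor above merges user-supplied keys over built-in defaults. A minimal standalone sketch of that defaults pattern (it does not require pyagenity or aio_pika; the key names match the documented config):

```python
# Defaults mirroring RabbitMQPublisher.__init__ (key names from the docs above).
RABBITMQ_DEFAULTS = {
    "url": "amqp://guest:guest@localhost/",
    "exchange": "pyagenity.events",
    "routing_key": "pyagenity.events",
    "exchange_type": "topic",
    "declare": True,
    "durable": True,
}


def resolve_rabbitmq_config(config=None):
    """Merge a user config over the defaults, as __init__ does via dict.get()."""
    merged = dict(RABBITMQ_DEFAULTS)
    merged.update(config or {})
    return merged


# Only overridden keys change; everything else keeps its default.
cfg = resolve_rabbitmq_config({"url": "amqp://user:secret@broker.internal/"})
```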
close async
close()

Close the RabbitMQ connection and channel.

Source code in pyagenity/publisher/rabbitmq_publisher.py
async def close(self):
    """Close the RabbitMQ connection and channel."""
    try:
        if self._channel is not None:
            await self._channel.close()
    except Exception:
        logger.debug("RabbitMQPublisher channel close error", exc_info=True)
    finally:
        self._channel = None

    try:
        if self._conn is not None:
            await self._conn.close()
    except Exception:
        logger.debug("RabbitMQPublisher connection close error", exc_info=True)
    finally:
        self._conn = None
        self._exchange = None
publish async
publish(event)

Publish an event to RabbitMQ.

Parameters:

Name Type Description Default
event EventModel

The event to publish.

required

Returns:

Type Description
Any

True on success.

Source code in pyagenity/publisher/rabbitmq_publisher.py
async def publish(self, event: EventModel) -> Any:
    """Publish an event to RabbitMQ.

    Args:
        event: The event to publish.

    Returns:
        True on success.
    """
    await self._ensure()
    payload = json.dumps(event.model_dump()).encode("utf-8")

    aio_pika = importlib.import_module("aio_pika")
    message = aio_pika.Message(body=payload)
    if self._exchange is None:
        raise RuntimeError("RabbitMQPublisher exchange not initialized")
    await self._exchange.publish(message, routing_key=self.routing_key)
    return True
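The serialization path in publish is `json.dumps(event.model_dump()).encode("utf-8")`. A standalone sketch of that round trip, with a plain dict standing in for `EventModel.model_dump()` (the fields shown are illustrative, not the real event schema):

```python
import json

# Stand-in for EventModel.model_dump(); field names here are hypothetical.
event_dict = {"event": "node_started", "node": "planner", "seq": 1}

# Same wire format as RabbitMQPublisher.publish: JSON text encoded as UTF-8 bytes.
payload = json.dumps(event_dict).encode("utf-8")

# A consumer reverses the encoding to recover the original structure.
decoded = json.loads(payload.decode("utf-8"))
```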
sync_close
sync_close()

Synchronously close the RabbitMQ connection.

Source code in pyagenity/publisher/rabbitmq_publisher.py
def sync_close(self):
    """Synchronously close the RabbitMQ connection."""
    try:
        asyncio.run(self.close())
    except RuntimeError:
        logger.warning("sync_close called within an active event loop; skipping.")
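The pattern in sync_close relies on `asyncio.run` raising `RuntimeError` when invoked from inside a running event loop. A self-contained sketch of that behavior (the trivial coroutine here stands in for the real async close):

```python
import asyncio


async def close():
    # Stand-in for the real async close (which shuts down channel/connection).
    return "closed"


def sync_close():
    """Mirror the sync_close pattern: asyncio.run raises RuntimeError when
    called from within a running event loop, so that case is caught and skipped."""
    try:
        return asyncio.run(close())
    except RuntimeError:
        # Already inside an event loop; skip rather than deadlock.
        return None


result = sync_close()  # safe here: no event loop is running
```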

redis_publisher

Redis publisher implementation (optional dependency).

This publisher uses the redis-py asyncio client to publish events via:

- Pub/Sub channels (default), or
- Redis Streams (XADD) when configured with mode="stream".

Dependency: redis>=4.2 (provides redis.asyncio). Not installed by default; install extra: pip install pyagenity[redis].
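For Streams mode, a hypothetical configuration might look like the following. The key names come from the documented config; the values are illustrative only:

```python
# Illustrative RedisPublisher config selecting Streams mode with a capped length.
stream_config = {
    "url": "redis://localhost:6379/0",
    "mode": "stream",             # use XADD instead of Pub/Sub
    "stream": "pyagenity.events",
    "maxlen": 10_000,             # approximate trim (XADD MAXLEN ~)
}
```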

Classes:

Name Description
RedisPublisher

Publish events to Redis via Pub/Sub channel or Stream.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute
logger = getLogger(__name__)

Classes

RedisPublisher

Bases: BasePublisher

Publish events to Redis via Pub/Sub channel or Stream.

Attributes:

Name Type Description
url str

Redis URL.

mode str

Publishing mode ('pubsub' or 'stream').

channel str

Pub/Sub channel name.

stream str

Stream name.

maxlen int | None

Max length for streams.

encoding str

Encoding for messages.

_redis

Redis client instance.

Methods:

Name Description
__init__

Initialize the RedisPublisher.

close

Close the Redis client.

publish

Publish an event to Redis.

sync_close

Synchronously close the Redis client.

Source code in pyagenity/publisher/redis_publisher.py
class RedisPublisher(BasePublisher):
    """Publish events to Redis via Pub/Sub channel or Stream.

    Attributes:
        url: Redis URL.
        mode: Publishing mode ('pubsub' or 'stream').
        channel: Pub/Sub channel name.
        stream: Stream name.
        maxlen: Max length for streams.
        encoding: Encoding for messages.
        _redis: Redis client instance.
    """

    def __init__(self, config: dict[str, Any] | None = None):
        """Initialize the RedisPublisher.

        Args:
            config: Configuration dictionary. Supported keys:
                - url: Redis URL (default: "redis://localhost:6379/0").
                - mode: Publishing mode ('pubsub' or 'stream', default: 'pubsub').
                - channel: Pub/Sub channel name (default: "pyagenity.events").
                - stream: Stream name (default: "pyagenity.events").
                - maxlen: Max length for streams.
                - encoding: Encoding (default: "utf-8").
        """
        super().__init__(config or {})
        self.url: str = self.config.get("url", "redis://localhost:6379/0")
        self.mode: str = self.config.get("mode", "pubsub")
        self.channel: str = self.config.get("channel", "pyagenity.events")
        self.stream: str = self.config.get("stream", "pyagenity.events")
        self.maxlen: int | None = self.config.get("maxlen")
        self.encoding: str = self.config.get("encoding", "utf-8")

        # Lazy import & connect on first use to avoid ImportError at import-time.
        self._redis = None  # type: ignore[var-annotated]

    async def _get_client(self):
        """Get or create the Redis client.

        Returns:
            The Redis client instance.

        Raises:
            RuntimeError: If connection fails.
        """
        if self._redis is not None:
            return self._redis

        try:
            redis_asyncio = importlib.import_module("redis.asyncio")
        except Exception as exc:  # ImportError and others
            raise RuntimeError(
                "RedisPublisher requires the 'redis' package. Install with "
                "'pip install pyagenity[redis]' or 'pip install redis'."
            ) from exc

        try:
            self._redis = redis_asyncio.from_url(
                self.url, encoding=self.encoding, decode_responses=False
            )
        except Exception as exc:
            raise RuntimeError(f"RedisPublisher failed to connect to Redis at {self.url}") from exc

        return self._redis

    async def publish(self, event: EventModel) -> Any:
        """Publish an event to Redis.

        Args:
            event: The event to publish.

        Returns:
            The result of the publish operation.
        """
        client = await self._get_client()
        payload = json.dumps(event.model_dump()).encode(self.encoding)

        if self.mode == "stream":
            # XADD to stream
            fields = {"data": payload}
            if self.maxlen is not None:
                return await client.xadd(self.stream, fields, maxlen=self.maxlen, approximate=True)
            return await client.xadd(self.stream, fields)

        # Default: Pub/Sub channel
        return await client.publish(self.channel, payload)

    async def close(self):
        """Close the Redis client."""
        if self._redis is not None:
            try:
                await self._redis.close()
                await self._redis.connection_pool.disconnect(inuse_connections=True)
            except Exception:  # best-effort close
                logger.debug("RedisPublisher close encountered an error", exc_info=True)
            finally:
                self._redis = None

    def sync_close(self):
        """Synchronously close the Redis client."""
        try:
            asyncio.run(self.close())
        except RuntimeError:
            # Already in an event loop; skip closing here.
            logger.warning("sync_close called within an active event loop; skipping.")
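The mode dispatch in publish above (XADD for streams, PUBLISH for Pub/Sub) can be exercised without a live Redis. This synchronous sketch uses a minimal fake client to show which call each mode produces; the fake's method shapes follow the calls made above, not the full redis-py API:

```python
import json


class FakeRedis:
    """Minimal stand-in for the redis.asyncio client, recording calls made."""

    def __init__(self):
        self.calls = []

    def xadd(self, stream, fields, **kw):
        self.calls.append(("xadd", stream, fields, kw))
        return "1-0"  # a stream entry ID, as XADD would return

    def publish(self, channel, payload):
        self.calls.append(("publish", channel, payload))
        return 1  # number of subscribers that received the message


def dispatch(client, mode, payload, *, channel="pyagenity.events",
             stream="pyagenity.events", maxlen=None):
    """Synchronous sketch of RedisPublisher.publish's mode dispatch."""
    if mode == "stream":
        fields = {"data": payload}
        if maxlen is not None:
            return client.xadd(stream, fields, maxlen=maxlen, approximate=True)
        return client.xadd(stream, fields)
    # Default: Pub/Sub channel
    return client.publish(channel, payload)


client = FakeRedis()
payload = json.dumps({"event": "done"}).encode("utf-8")
dispatch(client, "pubsub", payload)
dispatch(client, "stream", payload, maxlen=1000)
```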
Attributes
channel instance-attribute
channel = get('channel', 'pyagenity.events')
config instance-attribute
config = config
encoding instance-attribute
encoding = get('encoding', 'utf-8')
maxlen instance-attribute
maxlen = get('maxlen')
mode instance-attribute
mode = get('mode', 'pubsub')
stream instance-attribute
stream = get('stream', 'pyagenity.events')
url instance-attribute
url = get('url', 'redis://localhost:6379/0')
Functions
__init__
__init__(config=None)

Initialize the RedisPublisher.

Parameters:

Name Type Description Default
config dict[str, Any] | None

Configuration dictionary. Supported keys:

- url: Redis URL (default: "redis://localhost:6379/0").
- mode: Publishing mode ('pubsub' or 'stream', default: 'pubsub').
- channel: Pub/Sub channel name (default: "pyagenity.events").
- stream: Stream name (default: "pyagenity.events").
- maxlen: Max length for streams.
- encoding: Encoding (default: "utf-8").

None
Source code in pyagenity/publisher/redis_publisher.py
def __init__(self, config: dict[str, Any] | None = None):
    """Initialize the RedisPublisher.

    Args:
        config: Configuration dictionary. Supported keys:
            - url: Redis URL (default: "redis://localhost:6379/0").
            - mode: Publishing mode ('pubsub' or 'stream', default: 'pubsub').
            - channel: Pub/Sub channel name (default: "pyagenity.events").
            - stream: Stream name (default: "pyagenity.events").
            - maxlen: Max length for streams.
            - encoding: Encoding (default: "utf-8").
    """
    super().__init__(config or {})
    self.url: str = self.config.get("url", "redis://localhost:6379/0")
    self.mode: str = self.config.get("mode", "pubsub")
    self.channel: str = self.config.get("channel", "pyagenity.events")
    self.stream: str = self.config.get("stream", "pyagenity.events")
    self.maxlen: int | None = self.config.get("maxlen")
    self.encoding: str = self.config.get("encoding", "utf-8")

    # Lazy import & connect on first use to avoid ImportError at import-time.
    self._redis = None  # type: ignore[var-annotated]
close async
close()

Close the Redis client.

Source code in pyagenity/publisher/redis_publisher.py
async def close(self):
    """Close the Redis client."""
    if self._redis is not None:
        try:
            await self._redis.close()
            await self._redis.connection_pool.disconnect(inuse_connections=True)
        except Exception:  # best-effort close
            logger.debug("RedisPublisher close encountered an error", exc_info=True)
        finally:
            self._redis = None
publish async
publish(event)

Publish an event to Redis.

Parameters:

Name Type Description Default
event EventModel

The event to publish.

required

Returns:

Type Description
Any

The result of the publish operation.

Source code in pyagenity/publisher/redis_publisher.py
async def publish(self, event: EventModel) -> Any:
    """Publish an event to Redis.

    Args:
        event: The event to publish.

    Returns:
        The result of the publish operation.
    """
    client = await self._get_client()
    payload = json.dumps(event.model_dump()).encode(self.encoding)

    if self.mode == "stream":
        # XADD to stream
        fields = {"data": payload}
        if self.maxlen is not None:
            return await client.xadd(self.stream, fields, maxlen=self.maxlen, approximate=True)
        return await client.xadd(self.stream, fields)

    # Default: Pub/Sub channel
    return await client.publish(self.channel, payload)
sync_close
sync_close()

Synchronously close the Redis client.

Source code in pyagenity/publisher/redis_publisher.py
def sync_close(self):
    """Synchronously close the Redis client."""
    try:
        asyncio.run(self.close())
    except RuntimeError:
        # Already in an event loop; skip closing here.
        logger.warning("sync_close called within an active event loop; skipping.")