Publisher
Publisher module for PyAgenity events.
This module provides publishers that handle the delivery of events to various outputs, such as console, Redis, Kafka, and RabbitMQ. Publishers are primarily used for logging and monitoring agent behavior, enabling real-time tracking of performance, usage, and debugging in agent graphs.
Key components: - BasePublisher: Abstract base class for all publishers, defining the interface for publishing event - ConsolePublisher: Default publisher that outputs events to the console for development and debugging - Optional publishers: RedisPublisher, KafkaPublisher, RabbitMQPublisher, which are available only if their dependencies are installed
Usage: - Import publishers: from pyagenity.publisher import ConsolePublisher, RedisPublisher (if available) - Instantiate and use in CompiledGraph: graph.compile(publisher=ConsolePublisher()). - Events are emitted as EventModel instances during graph execution, including node starts, completions, and errors.
Dependencies for optional publishers: - RedisPublisher: Requires 'redis.asyncio' (install via pip install redis). - KafkaPublisher: Requires 'aiokafka' (install via pip install aiokafka). - RabbitMQPublisher: Requires 'aio_pika' (install via pip install aio-pika).
For more details, see the individual publisher classes and the PyAgenity documentation.
Modules:
Name | Description |
---|---|
base_publisher |
|
console_publisher |
Console publisher implementation for debugging and testing. |
events |
Event and streaming primitives for agent graph execution. |
kafka_publisher |
Kafka publisher implementation (optional dependency). |
publish |
|
rabbitmq_publisher |
RabbitMQ publisher implementation (optional dependency). |
redis_publisher |
Redis publisher implementation (optional dependency). |
Classes:
Name | Description |
---|---|
BasePublisher |
Abstract base class for event publishers. |
ConsolePublisher |
Publisher that prints events to the console for debugging and testing. |
Attributes¶
Classes¶
BasePublisher
¶
Bases: ABC
Abstract base class for event publishers.
This class defines the interface for publishing events. Subclasses should implement the publish, close, and sync_close methods to provide specific publishing logic.
Attributes:
Name | Type | Description |
---|---|---|
config |
Configuration dictionary for the publisher. |
Methods:
Name | Description |
---|---|
__init__ |
Initialize the publisher with the given configuration. |
close |
Close the publisher and release any resources. |
publish |
Publish an event. |
sync_close |
Close the publisher and release any resources (synchronous version). |
Source code in pyagenity/publisher/base_publisher.py
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
|
Attributes¶
Functions¶
__init__
¶
__init__(config)
Initialize the publisher with the given configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
|
dict[str, Any]
|
Configuration dictionary for the publisher. |
required |
Source code in pyagenity/publisher/base_publisher.py
17 18 19 20 21 22 23 |
|
close
abstractmethod
async
¶
close()
Close the publisher and release any resources.
This method should be overridden by subclasses to provide specific cleanup logic. It will be called externally.
Source code in pyagenity/publisher/base_publisher.py
37 38 39 40 41 42 43 44 |
|
publish
abstractmethod
async
¶
publish(event)
Publish an event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
|
EventModel
|
The event to publish. |
required |
Returns:
Type | Description |
---|---|
Any
|
The result of the publish operation. |
Source code in pyagenity/publisher/base_publisher.py
25 26 27 28 29 30 31 32 33 34 35 |
|
sync_close
abstractmethod
¶
sync_close()
Close the publisher and release any resources (synchronous version).
This method should be overridden by subclasses to provide specific cleanup logic. It will be called externally.
Source code in pyagenity/publisher/base_publisher.py
46 47 48 49 50 51 52 53 |
|
ConsolePublisher
¶
Bases: BasePublisher
Publisher that prints events to the console for debugging and testing.
This publisher is useful for development and debugging purposes, as it outputs event information to the standard output.
Attributes:
Name | Type | Description |
---|---|---|
format |
Output format ('json' by default). |
|
include_timestamp |
Whether to include timestamp (True by default). |
|
indent |
Indentation for output (2 by default). |
Methods:
Name | Description |
---|---|
__init__ |
Initialize the ConsolePublisher with the given configuration. |
close |
Close the publisher and release any resources. |
publish |
Publish an event to the console. |
sync_close |
Synchronously close the publisher and release any resources. |
Source code in pyagenity/publisher/console_publisher.py
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
|
Attributes¶
include_timestamp
instance-attribute
¶
include_timestamp = get('include_timestamp', True) if config else True
Functions¶
__init__
¶
__init__(config=None)
Initialize the ConsolePublisher with the given configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
|
dict[str, Any] | None
|
Configuration dictionary. Supported keys: - format: Output format (default: 'json'). - include_timestamp: Whether to include timestamp (default: True). - indent: Indentation for output (default: 2). |
None
|
Source code in pyagenity/publisher/console_publisher.py
29 30 31 32 33 34 35 36 37 38 39 40 41 |
|
close
async
¶
close()
Close the publisher and release any resources.
ConsolePublisher does not require cleanup, but this method is provided for interface compatibility.
Source code in pyagenity/publisher/console_publisher.py
57 58 59 60 61 62 63 |
|
publish
async
¶
publish(event)
Publish an event to the console.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
|
EventModel
|
The event to publish. |
required |
Returns:
Type | Description |
---|---|
Any
|
None |
Source code in pyagenity/publisher/console_publisher.py
43 44 45 46 47 48 49 50 51 52 53 54 55 |
|
sync_close
¶
sync_close()
Synchronously close the publisher and release any resources.
ConsolePublisher does not require cleanup, but this method is provided for interface compatibility.
Source code in pyagenity/publisher/console_publisher.py
65 66 67 68 69 70 71 |
|
Modules¶
base_publisher
¶
Classes:
Name | Description |
---|---|
BasePublisher |
Abstract base class for event publishers. |
Classes¶
BasePublisher
¶
Bases: ABC
Abstract base class for event publishers.
This class defines the interface for publishing events. Subclasses should implement the publish, close, and sync_close methods to provide specific publishing logic.
Attributes:
Name | Type | Description |
---|---|---|
config |
Configuration dictionary for the publisher. |
Methods:
Name | Description |
---|---|
__init__ |
Initialize the publisher with the given configuration. |
close |
Close the publisher and release any resources. |
publish |
Publish an event. |
sync_close |
Close the publisher and release any resources (synchronous version). |
Source code in pyagenity/publisher/base_publisher.py
7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 |
|
Attributes¶
Functions¶
__init__
¶__init__(config)
Initialize the publisher with the given configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
¶ |
dict[str, Any]
|
Configuration dictionary for the publisher. |
required |
Source code in pyagenity/publisher/base_publisher.py
17 18 19 20 21 22 23 |
|
close
abstractmethod
async
¶close()
Close the publisher and release any resources.
This method should be overridden by subclasses to provide specific cleanup logic. It will be called externally.
Source code in pyagenity/publisher/base_publisher.py
37 38 39 40 41 42 43 44 |
|
publish
abstractmethod
async
¶publish(event)
Publish an event.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event
¶ |
EventModel
|
The event to publish. |
required |
Returns:
Type | Description |
---|---|
Any
|
The result of the publish operation. |
Source code in pyagenity/publisher/base_publisher.py
25 26 27 28 29 30 31 32 33 34 35 |
|
sync_close
abstractmethod
¶sync_close()
Close the publisher and release any resources (synchronous version).
This method should be overridden by subclasses to provide specific cleanup logic. It will be called externally.
Source code in pyagenity/publisher/base_publisher.py
46 47 48 49 50 51 52 53 |
|
console_publisher
¶
Console publisher implementation for debugging and testing.
This module provides a publisher that outputs events to the console for development and debugging purposes.
Classes:
Name | Description |
---|---|
ConsolePublisher |
Publisher that prints events to the console for debugging and testing. |
Attributes:
Name | Type | Description |
---|---|---|
logger |
|
Attributes¶
Classes¶
ConsolePublisher
¶
Bases: BasePublisher
Publisher that prints events to the console for debugging and testing.
This publisher is useful for development and debugging purposes, as it outputs event information to the standard output.
Attributes:
Name | Type | Description |
---|---|---|
format |
Output format ('json' by default). |
|
include_timestamp |
Whether to include timestamp (True by default). |
|
indent |
Indentation for output (2 by default). |
Methods:
Name | Description |
---|---|
__init__ |
Initialize the ConsolePublisher with the given configuration. |
close |
Close the publisher and release any resources. |
publish |
Publish an event to the console. |
sync_close |
Synchronously close the publisher and release any resources. |
Source code in pyagenity/publisher/console_publisher.py
17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 |
|
Attributes¶
include_timestamp
instance-attribute
¶include_timestamp = get('include_timestamp', True) if config else True
Functions¶
__init__
¶__init__(config=None)
Initialize the ConsolePublisher with the given configuration.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
¶ |
dict[str, Any] | None
|
Configuration dictionary. Supported keys: - format: Output format (default: 'json'). - include_timestamp: Whether to include timestamp (default: True). - indent: Indentation for output (default: 2). |
None
|
Source code in pyagenity/publisher/console_publisher.py
29 30 31 32 33 34 35 36 37 38 39 40 41 |
|
close
async
¶close()
Close the publisher and release any resources.
ConsolePublisher does not require cleanup, but this method is provided for interface compatibility.
Source code in pyagenity/publisher/console_publisher.py
57 58 59 60 61 62 63 |
|
publish
async
¶publish(event)
Publish an event to the console.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event
¶ |
EventModel
|
The event to publish. |
required |
Returns:
Type | Description |
---|---|
Any
|
None |
Source code in pyagenity/publisher/console_publisher.py
43 44 45 46 47 48 49 50 51 52 53 54 55 |
|
sync_close
¶sync_close()
Synchronously close the publisher and release any resources.
ConsolePublisher does not require cleanup, but this method is provided for interface compatibility.
Source code in pyagenity/publisher/console_publisher.py
65 66 67 68 69 70 71 |
|
events
¶
Event and streaming primitives for agent graph execution.
This module defines event types, content types, and the EventModel for structured streaming of execution updates, tool calls, state changes, messages, and errors in agent graphs.
Classes:
Name | Description |
---|---|
Event |
Enum for event sources (graph, node, tool, streaming). |
EventType |
Enum for event phases (start, progress, result, end, etc.). |
ContentType |
Enum for semantic content types (text, message, tool_call, etc.). |
EventModel |
Structured event chunk for streaming agent graph execution. |
Attributes¶
Classes¶
ContentType
¶
Bases: str
, Enum
Enum for semantic content types in agent graph streaming.
Values
TEXT: Textual content. MESSAGE: Message content. REASONING: Reasoning content. TOOL_CALL: Tool call content. TOOL_RESULT: Tool result content. IMAGE: Image content. AUDIO: Audio content. VIDEO: Video content. DOCUMENT: Document content. DATA: Data content. STATE: State content. UPDATE: Update content. ERROR: Error content.
Attributes:
Name | Type | Description |
---|---|---|
AUDIO |
|
|
DATA |
|
|
DOCUMENT |
|
|
ERROR |
|
|
IMAGE |
|
|
MESSAGE |
|
|
REASONING |
|
|
STATE |
|
|
TEXT |
|
|
TOOL_CALL |
|
|
TOOL_RESULT |
|
|
UPDATE |
|
|
VIDEO |
|
Source code in pyagenity/publisher/events.py
62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 |
|
Attributes¶
Event
¶
Bases: str
, Enum
Enum for event sources in agent graph execution.
Values
GRAPH_EXECUTION: Event from graph execution. NODE_EXECUTION: Event from node execution. TOOL_EXECUTION: Event from tool execution. STREAMING: Event from streaming updates.
Attributes:
Name | Type | Description |
---|---|---|
GRAPH_EXECUTION |
|
|
NODE_EXECUTION |
|
|
STREAMING |
|
|
TOOL_EXECUTION |
|
Source code in pyagenity/publisher/events.py
24 25 26 27 28 29 30 31 32 33 34 35 36 37 |
|
Attributes¶
EventModel
¶
Bases: BaseModel
Structured event chunk for streaming agent graph execution.
Represents a chunk of streamed data with event and content semantics, supporting both delta (incremental) and full content. Used for real-time streaming of execution updates, tool calls, state changes, messages, and errors.
Attributes:
Name | Type | Description |
---|---|---|
event |
Event
|
Type of the event source. |
event_type |
EventType
|
Phase of the event (start, progress, end, update). |
content |
str
|
Streamed textual content. |
content_blocks |
list[ContentBlock] | None
|
Structured content blocks for multimodal streaming. |
delta |
bool
|
True if this is a delta update (incremental). |
delta_type |
Literal['text', 'json', 'binary'] | None
|
Type of delta when delta=True. |
block_index |
int | None
|
Index of the content block this chunk applies to. |
chunk_index |
int | None
|
Per-block chunk index for ordering. |
byte_offset |
int | None
|
Byte offset for binary/media streaming. |
data |
dict[str, Any]
|
Additional structured data. |
content_type |
list[ContentType] | None
|
Semantic type of content. |
sequence_id |
int
|
Monotonic sequence ID for stream ordering. |
node_name |
str
|
Name of the node producing this chunk. |
run_id |
str
|
Unique ID for this stream/run. |
thread_id |
str | int
|
Thread ID for this execution. |
timestamp |
float
|
UNIX timestamp of when chunk was created. |
is_error |
bool
|
Marks this chunk as representing an error state. |
metadata |
dict[str, Any]
|
Optional metadata for consumers. |
Classes:
Name | Description |
---|---|
Config |
Pydantic configuration for EventModel. |
Methods:
Name | Description |
---|---|
default |
Create a default EventModel instance with minimal required fields. |
stream |
Create a default EventModel instance for streaming updates. |
Source code in pyagenity/publisher/events.py
96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 |
|
Attributes¶
block_index
class-attribute
instance-attribute
¶block_index = Field(default=None, description='Index of the content block this chunk applies to')
byte_offset
class-attribute
instance-attribute
¶byte_offset = Field(default=None, description='Byte offset for binary/media streaming')
chunk_index
class-attribute
instance-attribute
¶chunk_index = Field(default=None, description='Per-block chunk index for ordering')
content
class-attribute
instance-attribute
¶content = Field(default='', description='Streamed textual content')
content_blocks
class-attribute
instance-attribute
¶content_blocks = Field(default=None, description='Structured content blocks carried by this event')
content_type
class-attribute
instance-attribute
¶content_type = Field(default=None, description='Semantic type of content')
data
class-attribute
instance-attribute
¶data = Field(default_factory=dict, description='Additional structured data')
delta
class-attribute
instance-attribute
¶delta = Field(default=False, description='True if this is a delta update (incremental)')
delta_type
class-attribute
instance-attribute
¶delta_type = Field(default=None, description='Type of delta when delta=True')
event
class-attribute
instance-attribute
¶event = Field(..., description='Type of the event source')
event_type
class-attribute
instance-attribute
¶event_type = Field(..., description='Phase of the event (start, progress, end, update)')
is_error
class-attribute
instance-attribute
¶is_error = Field(default=False, description='Marks this chunk as representing an error state')
metadata
class-attribute
instance-attribute
¶metadata = Field(default_factory=dict, description='Optional metadata for consumers')
node_name
class-attribute
instance-attribute
¶node_name = Field(default='', description='Name of the node producing this chunk')
run_id
class-attribute
instance-attribute
¶run_id = Field(default_factory=lambda: str(uuid4()), description='Unique ID for this stream/run')
sequence_id
class-attribute
instance-attribute
¶sequence_id = Field(default=0, description='Monotonic sequence ID for stream ordering')
thread_id
class-attribute
instance-attribute
¶thread_id = Field(default='', description='Thread ID for this execution')
timestamp
class-attribute
instance-attribute
¶timestamp = Field(default_factory=time, description='UNIX timestamp of when chunk was created')
Classes¶
Config
¶Pydantic configuration for EventModel.
Attributes:
Name | Type | Description |
---|---|---|
use_enum_values |
Output enums as strings. |
Source code in pyagenity/publisher/events.py
173 174 175 176 177 178 179 180 |
|
Functions¶
default
classmethod
¶default(base_config, data, content_type, event=Event.GRAPH_EXECUTION, event_type=EventType.START, node_name='', extra=None)
Create a default EventModel instance with minimal required fields.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
base_config
¶ |
dict
|
Base configuration for the event (thread/run/timestamp/user). |
required |
data
¶ |
dict[str, Any]
|
Structured data payload. |
required |
content_type
¶ |
list[ContentType]
|
Semantic type(s) of content. |
required |
event
¶ |
Event
|
Event source type (default: GRAPH_EXECUTION). |
GRAPH_EXECUTION
|
event_type
¶ |
Event phase (default: START). |
START
|
|
node_name
¶ |
str
|
Name of the node producing the event. |
''
|
extra
¶ |
dict[str, Any] | None
|
Additional metadata. |
None
|
Returns:
Name | Type | Description |
---|---|---|
EventModel |
EventModel
|
The created event model instance. |
Source code in pyagenity/publisher/events.py
182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 |
|
stream
classmethod
¶stream(base_config, node_name='', extra=None)
Create a default EventModel instance for streaming updates.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
base_config
¶ |
dict
|
Base configuration for the event (thread/run/timestamp/user). |
required |
node_name
¶ |
str
|
Name of the node producing the event. |
''
|
extra
¶ |
dict[str, Any] | None
|
Additional metadata. |
None
|
Returns:
Name | Type | Description |
---|---|---|
EventModel |
EventModel
|
The created event model instance for streaming. |
Source code in pyagenity/publisher/events.py
229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 |
|
EventType
¶
Bases: str
, Enum
Enum for event phases in agent graph execution.
Values
START: Event marks start of execution. PROGRESS: Event marks progress update. RESULT: Event marks result produced. END: Event marks end of execution. UPDATE: Event marks update. ERROR: Event marks error. INTERRUPTED: Event marks interruption.
Attributes:
Name | Type | Description |
---|---|---|
END |
|
|
ERROR |
|
|
INTERRUPTED |
|
|
PROGRESS |
|
|
RESULT |
|
|
START |
|
|
UPDATE |
|
Source code in pyagenity/publisher/events.py
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 |
|
Attributes¶
kafka_publisher
¶
Kafka publisher implementation (optional dependency).
Uses aiokafka to publish events to a Kafka topic.
Dependency: aiokafka
Not installed by default; install extra: pip install pyagenity[kafka]
.
Classes:
Name | Description |
---|---|
KafkaPublisher |
Publish events to a Kafka topic using aiokafka. |
Attributes:
Name | Type | Description |
---|---|---|
logger |
|
Attributes¶
Classes¶
KafkaPublisher
¶
Bases: BasePublisher
Publish events to a Kafka topic using aiokafka.
This class provides an asynchronous interface for publishing events to a Kafka topic. It uses the aiokafka library to handle the producer operations. The publisher is lazily initialized and can be reused for multiple publishes.
Attributes:
Name | Type | Description |
---|---|---|
bootstrap_servers |
str
|
Kafka bootstrap servers. |
topic |
str
|
Kafka topic to publish to. |
client_id |
str | None
|
Client ID for the producer. |
_producer |
Lazy-loaded Kafka producer instance. |
Methods:
Name | Description |
---|---|
__init__ |
Initialize the KafkaPublisher. |
close |
Close the Kafka producer. |
publish |
Publish an event to the Kafka topic. |
sync_close |
Synchronously close the Kafka producer. |
Source code in pyagenity/publisher/kafka_publisher.py
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
|
Attributes¶
bootstrap_servers
instance-attribute
¶bootstrap_servers = get('bootstrap_servers', 'localhost:9092')
Functions¶
__init__
¶__init__(config=None)
Initialize the KafkaPublisher.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
¶ |
dict[str, Any] | None
|
Configuration dictionary. Supported keys: - bootstrap_servers: Kafka bootstrap servers (default: "localhost:9092"). - topic: Kafka topic to publish to (default: "pyagenity.events"). - client_id: Client ID for the producer. |
None
|
Source code in pyagenity/publisher/kafka_publisher.py
38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
|
close
async
¶close()
Close the Kafka producer.
Stops the producer and cleans up resources. Errors during stopping are logged but do not raise exceptions.
Source code in pyagenity/publisher/kafka_publisher.py
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
|
publish
async
¶publish(event)
Publish an event to the Kafka topic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event
¶ |
EventModel
|
The event to publish. |
required |
Returns:
Type | Description |
---|---|
Any
|
The result of the send_and_wait operation. |
Source code in pyagenity/publisher/kafka_publisher.py
84 85 86 87 88 89 90 91 92 93 94 95 |
|
sync_close
¶sync_close()
Synchronously close the Kafka producer.
This method runs the async close in a new event loop. If called within an active event loop, it logs a warning and skips the operation.
Source code in pyagenity/publisher/kafka_publisher.py
113 114 115 116 117 118 119 120 121 122 |
|
publish
¶
Functions:
Name | Description |
---|---|
publish_event |
Publish an event asynchronously using the background task manager. |
Attributes:
Name | Type | Description |
---|---|---|
logger |
|
Attributes¶
Classes¶
Functions¶
publish_event
¶
publish_event(event, publisher=Inject[BasePublisher], task_manager=Inject[BackgroundTaskManager])
Publish an event asynchronously using the background task manager.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
|
EventModel
|
The event to publish. |
required |
|
BasePublisher | None
|
The publisher instance (injected). |
Inject[BasePublisher]
|
|
BackgroundTaskManager
|
The background task manager (injected). |
Inject[BackgroundTaskManager]
|
Source code in pyagenity/publisher/publish.py
31 32 33 34 35 36 37 38 39 40 41 42 43 44 |
|
rabbitmq_publisher
¶
RabbitMQ publisher implementation (optional dependency).
Uses aio-pika to publish events to an exchange with a routing key.
Dependency: aio-pika
Not installed by default; install extra: pip install pyagenity[rabbitmq]
.
Classes:
Name | Description |
---|---|
RabbitMQPublisher |
Publish events to RabbitMQ using aio-pika. |
Attributes:
Name | Type | Description |
---|---|---|
logger |
|
Attributes¶
Classes¶
RabbitMQPublisher
¶
Bases: BasePublisher
Publish events to RabbitMQ using aio-pika.
Attributes:
Name | Type | Description |
---|---|---|
url |
str
|
RabbitMQ connection URL. |
exchange |
str
|
Exchange name. |
routing_key |
str
|
Routing key for messages. |
exchange_type |
str
|
Type of exchange. |
declare |
bool
|
Whether to declare the exchange. |
durable |
bool
|
Whether the exchange is durable. |
_conn |
Connection instance. |
|
_channel |
Channel instance. |
|
_exchange |
Exchange instance. |
Methods:
Name | Description |
---|---|
__init__ |
Initialize the RabbitMQPublisher. |
close |
Close the RabbitMQ connection and channel. |
publish |
Publish an event to RabbitMQ. |
sync_close |
Synchronously close the RabbitMQ connection. |
Source code in pyagenity/publisher/rabbitmq_publisher.py
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 |
|
Attributes¶
Functions¶
__init__
¶__init__(config=None)
Initialize the RabbitMQPublisher.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
¶ |
dict[str, Any] | None
|
Configuration dictionary. Supported keys: - url: RabbitMQ URL (default: "amqp://guest:guest@localhost/"). - exchange: Exchange name (default: "pyagenity.events"). - routing_key: Routing key (default: "pyagenity.events"). - exchange_type: Exchange type (default: "topic"). - declare: Whether to declare exchange (default: True). - durable: Whether exchange is durable (default: True). |
None
|
Source code in pyagenity/publisher/rabbitmq_publisher.py
39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
|
close
async
¶close()
Close the RabbitMQ connection and channel.
Source code in pyagenity/publisher/rabbitmq_publisher.py
112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 |
|
publish
async
¶publish(event)
Publish an event to RabbitMQ.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event
¶ |
EventModel
|
The event to publish. |
required |
Returns:
Type | Description |
---|---|
Any
|
True on success. |
Source code in pyagenity/publisher/rabbitmq_publisher.py
93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 |
|
sync_close
¶sync_close()
Synchronously close the RabbitMQ connection.
Source code in pyagenity/publisher/rabbitmq_publisher.py
131 132 133 134 135 136 |
|
redis_publisher
¶
Redis publisher implementation (optional dependency).
This publisher uses the redis-py asyncio client to publish events via: - Pub/Sub channels (default), or - Redis Streams (XADD) when configured with mode="stream".
Dependency: redis>=4.2 (provides redis.asyncio).
Not installed by default; install extra: pip install pyagenity[redis]
.
Classes:
Name | Description |
---|---|
RedisPublisher |
Publish events to Redis via Pub/Sub channel or Stream. |
Attributes:
Name | Type | Description |
---|---|---|
logger |
|
Attributes¶
Classes¶
RedisPublisher
¶
Bases: BasePublisher
Publish events to Redis via Pub/Sub channel or Stream.
Attributes:
Name | Type | Description |
---|---|---|
url |
str
|
Redis URL. |
mode |
str
|
Publishing mode ('pubsub' or 'stream'). |
channel |
str
|
Pub/Sub channel name. |
stream |
str
|
Stream name. |
maxlen |
int | None
|
Max length for streams. |
encoding |
str
|
Encoding for messages. |
_redis |
Redis client instance. |
Methods:
Name | Description |
---|---|
__init__ |
Initialize the RedisPublisher. |
close |
Close the Redis client. |
publish |
Publish an event to Redis. |
sync_close |
Synchronously close the Redis client. |
Source code in pyagenity/publisher/redis_publisher.py
27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 |
|
Attributes¶
Functions¶
__init__
¶__init__(config=None)
Initialize the RedisPublisher.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
config
¶ |
dict[str, Any] | None
|
Configuration dictionary. Supported keys: - url: Redis URL (default: "redis://localhost:6379/0"). - mode: Publishing mode ('pubsub' or 'stream', default: 'pubsub'). - channel: Pub/Sub channel name (default: "pyagenity.events"). - stream: Stream name (default: "pyagenity.events"). - maxlen: Max length for streams. - encoding: Encoding (default: "utf-8"). |
None
|
Source code in pyagenity/publisher/redis_publisher.py
40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 |
|
close
async
¶close()
Close the Redis client.
Source code in pyagenity/publisher/redis_publisher.py
114 115 116 117 118 119 120 121 122 123 |
|
publish
async
¶publish(event)
Publish an event to Redis.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
event
¶ |
EventModel
|
The event to publish. |
required |
Returns:
Type | Description |
---|---|
Any
|
The result of the publish operation. |
Source code in pyagenity/publisher/redis_publisher.py
92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 |
|
sync_close
¶sync_close()
Synchronously close the Redis client.
Source code in pyagenity/publisher/redis_publisher.py
125 126 127 128 129 130 131 |
|