Skip to content

Events

Event and streaming primitives for agent graph execution.

This module defines event types, content types, and the EventModel for structured streaming of execution updates, tool calls, state changes, messages, and errors in agent graphs.

Classes:

Name Description
Event

Enum for event sources (graph, node, tool, streaming).

EventType

Enum for event phases (start, progress, result, end, etc.).

ContentType

Enum for semantic content types (text, message, tool_call, etc.).

EventModel

Structured event chunk for streaming agent graph execution.

Attributes

Classes

ContentType

Bases: str, Enum

Enum for semantic content types in agent graph streaming.

Values

TEXT: Textual content. MESSAGE: Message content. REASONING: Reasoning content. TOOL_CALL: Tool call content. TOOL_RESULT: Tool result content. IMAGE: Image content. AUDIO: Audio content. VIDEO: Video content. DOCUMENT: Document content. DATA: Data content. STATE: State content. UPDATE: Update content. ERROR: Error content.

Attributes:

Name Type Description
AUDIO
DATA
DOCUMENT
ERROR
IMAGE
MESSAGE
REASONING
STATE
TEXT
TOOL_CALL
TOOL_RESULT
UPDATE
VIDEO
Source code in pyagenity/publisher/events.py
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
class ContentType(str, enum.Enum):
    """Enum for semantic content types in agent graph streaming.

    Values:
        TEXT: Textual content.
        MESSAGE: Message content.
        REASONING: Reasoning content.
        TOOL_CALL: Tool call content.
        TOOL_RESULT: Tool result content.
        IMAGE: Image content.
        AUDIO: Audio content.
        VIDEO: Video content.
        DOCUMENT: Document content.
        DATA: Data content.
        STATE: State content.
        UPDATE: Update content.
        ERROR: Error content.
    """

    TEXT = "text"
    MESSAGE = "message"
    REASONING = "reasoning"
    TOOL_CALL = "tool_call"
    TOOL_RESULT = "tool_result"
    IMAGE = "image"
    AUDIO = "audio"
    VIDEO = "video"
    DOCUMENT = "document"
    DATA = "data"
    STATE = "state"
    UPDATE = "update"
    ERROR = "error"

Attributes

AUDIO class-attribute instance-attribute
AUDIO = 'audio'
DATA class-attribute instance-attribute
DATA = 'data'
DOCUMENT class-attribute instance-attribute
DOCUMENT = 'document'
ERROR class-attribute instance-attribute
ERROR = 'error'
IMAGE class-attribute instance-attribute
IMAGE = 'image'
MESSAGE class-attribute instance-attribute
MESSAGE = 'message'
REASONING class-attribute instance-attribute
REASONING = 'reasoning'
STATE class-attribute instance-attribute
STATE = 'state'
TEXT class-attribute instance-attribute
TEXT = 'text'
TOOL_CALL class-attribute instance-attribute
TOOL_CALL = 'tool_call'
TOOL_RESULT class-attribute instance-attribute
TOOL_RESULT = 'tool_result'
UPDATE class-attribute instance-attribute
UPDATE = 'update'
VIDEO class-attribute instance-attribute
VIDEO = 'video'

Event

Bases: str, Enum

Enum for event sources in agent graph execution.

Values

GRAPH_EXECUTION: Event from graph execution. NODE_EXECUTION: Event from node execution. TOOL_EXECUTION: Event from tool execution. STREAMING: Event from streaming updates.

Attributes:

Name Type Description
GRAPH_EXECUTION
NODE_EXECUTION
STREAMING
TOOL_EXECUTION
Source code in pyagenity/publisher/events.py
24
25
26
27
28
29
30
31
32
33
34
35
36
37
class Event(str, enum.Enum):
    """Enum for event sources in agent graph execution.

    Values:
        GRAPH_EXECUTION: Event from graph execution.
        NODE_EXECUTION: Event from node execution.
        TOOL_EXECUTION: Event from tool execution.
        STREAMING: Event from streaming updates.
    """

    GRAPH_EXECUTION = "graph_execution"
    NODE_EXECUTION = "node_execution"
    TOOL_EXECUTION = "tool_execution"
    STREAMING = "streaming"

Attributes

GRAPH_EXECUTION class-attribute instance-attribute
GRAPH_EXECUTION = 'graph_execution'
NODE_EXECUTION class-attribute instance-attribute
NODE_EXECUTION = 'node_execution'
STREAMING class-attribute instance-attribute
STREAMING = 'streaming'
TOOL_EXECUTION class-attribute instance-attribute
TOOL_EXECUTION = 'tool_execution'

EventModel

Bases: BaseModel

Structured event chunk for streaming agent graph execution.

Represents a chunk of streamed data with event and content semantics, supporting both delta (incremental) and full content. Used for real-time streaming of execution updates, tool calls, state changes, messages, and errors.

Attributes:

Name Type Description
event Event

Type of the event source.

event_type EventType

Phase of the event (start, progress, end, update).

content str

Streamed textual content.

content_blocks list[ContentBlock] | None

Structured content blocks for multimodal streaming.

delta bool

True if this is a delta update (incremental).

delta_type Literal['text', 'json', 'binary'] | None

Type of delta when delta=True.

block_index int | None

Index of the content block this chunk applies to.

chunk_index int | None

Per-block chunk index for ordering.

byte_offset int | None

Byte offset for binary/media streaming.

data dict[str, Any]

Additional structured data.

content_type list[ContentType] | None

Semantic type of content.

sequence_id int

Monotonic sequence ID for stream ordering.

node_name str

Name of the node producing this chunk.

run_id str

Unique ID for this stream/run.

thread_id str | int

Thread ID for this execution.

timestamp float

UNIX timestamp of when chunk was created.

is_error bool

Marks this chunk as representing an error state.

metadata dict[str, Any]

Optional metadata for consumers.

Classes:

Name Description
Config

Pydantic configuration for EventModel.

Methods:

Name Description
default

Create a default EventModel instance with minimal required fields.

stream

Create a default EventModel instance for streaming updates.

Source code in pyagenity/publisher/events.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
class EventModel(BaseModel):
    """
    Structured event chunk for streaming agent graph execution.

    Represents a chunk of streamed data with event and content semantics, supporting both delta
    (incremental) and full content. Used for real-time streaming of execution updates, tool calls,
    state changes, messages, and errors.

    Attributes:
        event: Type of the event source.
        event_type: Phase of the event (start, progress, end, update).
        content: Streamed textual content.
        content_blocks: Structured content blocks for multimodal streaming.
        delta: True if this is a delta update (incremental).
        delta_type: Type of delta when delta=True.
        block_index: Index of the content block this chunk applies to.
        chunk_index: Per-block chunk index for ordering.
        byte_offset: Byte offset for binary/media streaming.
        data: Additional structured data.
        content_type: Semantic type of content.
        sequence_id: Monotonic sequence ID for stream ordering.
        node_name: Name of the node producing this chunk.
        run_id: Unique ID for this stream/run.
        thread_id: Thread ID for this execution.
        timestamp: UNIX timestamp of when chunk was created.
        is_error: Marks this chunk as representing an error state.
        metadata: Optional metadata for consumers.
    """

    # Event metadata
    event: Event = Field(..., description="Type of the event source")
    event_type: EventType = Field(
        ..., description="Phase of the event (start, progress, end, update)"
    )

    # Streamed content
    content: str = Field(default="", description="Streamed textual content")
    # Structured content blocks for multimodal/structured streaming
    content_blocks: list[ContentBlock] | None = Field(
        default=None, description="Structured content blocks carried by this event"
    )
    # Delta controls
    delta: bool = Field(default=False, description="True if this is a delta update (incremental)")
    delta_type: Literal["text", "json", "binary"] | None = Field(
        default=None, description="Type of delta when delta=True"
    )
    block_index: int | None = Field(
        default=None, description="Index of the content block this chunk applies to"
    )
    chunk_index: int | None = Field(default=None, description="Per-block chunk index for ordering")
    byte_offset: int | None = Field(
        default=None, description="Byte offset for binary/media streaming"
    )

    # Data payload
    data: dict[str, Any] = Field(default_factory=dict, description="Additional structured data")

    # Metadata
    content_type: list[ContentType] | None = Field(
        default=None, description="Semantic type of content"
    )
    sequence_id: int = Field(default=0, description="Monotonic sequence ID for stream ordering")
    node_name: str = Field(default="", description="Name of the node producing this chunk")
    run_id: str = Field(
        default_factory=lambda: str(uuid.uuid4()), description="Unique ID for this stream/run"
    )
    thread_id: str | int = Field(default="", description="Thread ID for this execution")
    timestamp: float = Field(
        default_factory=time.time, description="UNIX timestamp of when chunk was created"
    )
    is_error: bool = Field(
        default=False, description="Marks this chunk as representing an error state"
    )
    metadata: dict[str, Any] = Field(
        default_factory=dict, description="Optional metadata for consumers"
    )

    class Config:
        """Pydantic configuration for EventModel.

        Attributes:
            use_enum_values: Output enums as strings.
        """

        use_enum_values = True  # Output enums as strings

    @classmethod
    def default(
        cls,
        base_config: dict,
        data: dict[str, Any],
        content_type: list[ContentType],
        event: Event = Event.GRAPH_EXECUTION,
        event_type=EventType.START,
        node_name: str = "",
        extra: dict[str, Any] | None = None,
    ) -> "EventModel":
        """Create a default EventModel instance with minimal required fields.

        Args:
            base_config: Base configuration for the event (thread/run/timestamp/user).
            data: Structured data payload.
            content_type: Semantic type(s) of content.
            event: Event source type (default: GRAPH_EXECUTION).
            event_type: Event phase (default: START).
            node_name: Name of the node producing the event.
            extra: Additional metadata.

        Returns:
            EventModel: The created event model instance.
        """
        thread_id = base_config.get("thread_id", "")
        run_id = base_config.get("run_id", "")

        metadata = {
            "run_timestamp": base_config.get("timestamp", ""),
            "user_id": base_config.get("user_id"),
            "is_stream": base_config.get("is_stream", False),
        }
        if extra:
            metadata.update(extra)
        return cls(
            event=event,
            event_type=event_type,
            delta=False,
            content_type=content_type,
            data=data,
            thread_id=thread_id,
            node_name=node_name,
            run_id=run_id,
            metadata=metadata,
        )

    @classmethod
    def stream(
        cls,
        base_config: dict,
        node_name: str = "",
        extra: dict[str, Any] | None = None,
    ) -> "EventModel":
        """Create a default EventModel instance for streaming updates.

        Args:
            base_config: Base configuration for the event (thread/run/timestamp/user).
            node_name: Name of the node producing the event.
            extra: Additional metadata.

        Returns:
            EventModel: The created event model instance for streaming.
        """
        thread_id = base_config.get("thread_id", "")
        run_id = base_config.get("run_id", "")

        metadata = {
            "run_timestamp": base_config.get("timestamp", ""),
            "user_id": base_config.get("user_id"),
            "is_stream": base_config.get("is_stream", False),
        }
        if extra:
            metadata.update(extra)
        return cls(
            event=Event.STREAMING,
            event_type=EventType.UPDATE,
            delta=True,
            content_type=[ContentType.TEXT, ContentType.REASONING],
            data={},
            thread_id=thread_id,
            node_name=node_name,
            run_id=run_id,
            metadata=metadata,
        )

Attributes

block_index class-attribute instance-attribute
block_index = Field(default=None, description='Index of the content block this chunk applies to')
byte_offset class-attribute instance-attribute
byte_offset = Field(default=None, description='Byte offset for binary/media streaming')
chunk_index class-attribute instance-attribute
chunk_index = Field(default=None, description='Per-block chunk index for ordering')
content class-attribute instance-attribute
content = Field(default='', description='Streamed textual content')
content_blocks class-attribute instance-attribute
content_blocks = Field(default=None, description='Structured content blocks carried by this event')
content_type class-attribute instance-attribute
content_type = Field(default=None, description='Semantic type of content')
data class-attribute instance-attribute
data = Field(default_factory=dict, description='Additional structured data')
delta class-attribute instance-attribute
delta = Field(default=False, description='True if this is a delta update (incremental)')
delta_type class-attribute instance-attribute
delta_type = Field(default=None, description='Type of delta when delta=True')
event class-attribute instance-attribute
event = Field(..., description='Type of the event source')
event_type class-attribute instance-attribute
event_type = Field(..., description='Phase of the event (start, progress, end, update)')
is_error class-attribute instance-attribute
is_error = Field(default=False, description='Marks this chunk as representing an error state')
metadata class-attribute instance-attribute
metadata = Field(default_factory=dict, description='Optional metadata for consumers')
node_name class-attribute instance-attribute
node_name = Field(default='', description='Name of the node producing this chunk')
run_id class-attribute instance-attribute
run_id = Field(default_factory=lambda: str(uuid4()), description='Unique ID for this stream/run')
sequence_id class-attribute instance-attribute
sequence_id = Field(default=0, description='Monotonic sequence ID for stream ordering')
thread_id class-attribute instance-attribute
thread_id = Field(default='', description='Thread ID for this execution')
timestamp class-attribute instance-attribute
timestamp = Field(default_factory=time, description='UNIX timestamp of when chunk was created')

Classes

Config

Pydantic configuration for EventModel.

Attributes:

Name Type Description
use_enum_values

Output enums as strings.

Source code in pyagenity/publisher/events.py
173
174
175
176
177
178
179
180
class Config:
    """Pydantic configuration for EventModel.

    Attributes:
        use_enum_values: Output enums as strings.
    """

    use_enum_values = True  # Output enums as strings
Attributes
use_enum_values class-attribute instance-attribute
use_enum_values = True

Functions

default classmethod
default(base_config, data, content_type, event=Event.GRAPH_EXECUTION, event_type=EventType.START, node_name='', extra=None)

Create a default EventModel instance with minimal required fields.

Parameters:

Name Type Description Default
base_config
dict

Base configuration for the event (thread/run/timestamp/user).

required
data
dict[str, Any]

Structured data payload.

required
content_type
list[ContentType]

Semantic type(s) of content.

required
event
Event

Event source type (default: GRAPH_EXECUTION).

GRAPH_EXECUTION
event_type

Event phase (default: START).

START
node_name
str

Name of the node producing the event.

''
extra
dict[str, Any] | None

Additional metadata.

None

Returns:

Name Type Description
EventModel EventModel

The created event model instance.

Source code in pyagenity/publisher/events.py
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
@classmethod
def default(
    cls,
    base_config: dict,
    data: dict[str, Any],
    content_type: list[ContentType],
    event: Event = Event.GRAPH_EXECUTION,
    event_type=EventType.START,
    node_name: str = "",
    extra: dict[str, Any] | None = None,
) -> "EventModel":
    """Create a default EventModel instance with minimal required fields.

    Args:
        base_config: Base configuration for the event (thread/run/timestamp/user).
        data: Structured data payload.
        content_type: Semantic type(s) of content.
        event: Event source type (default: GRAPH_EXECUTION).
        event_type: Event phase (default: START).
        node_name: Name of the node producing the event.
        extra: Additional metadata.

    Returns:
        EventModel: The created event model instance.
    """
    thread_id = base_config.get("thread_id", "")
    run_id = base_config.get("run_id", "")

    metadata = {
        "run_timestamp": base_config.get("timestamp", ""),
        "user_id": base_config.get("user_id"),
        "is_stream": base_config.get("is_stream", False),
    }
    if extra:
        metadata.update(extra)
    return cls(
        event=event,
        event_type=event_type,
        delta=False,
        content_type=content_type,
        data=data,
        thread_id=thread_id,
        node_name=node_name,
        run_id=run_id,
        metadata=metadata,
    )
stream classmethod
stream(base_config, node_name='', extra=None)

Create a default EventModel instance for streaming updates.

Parameters:

Name Type Description Default
base_config
dict

Base configuration for the event (thread/run/timestamp/user).

required
node_name
str

Name of the node producing the event.

''
extra
dict[str, Any] | None

Additional metadata.

None

Returns:

Name Type Description
EventModel EventModel

The created event model instance for streaming.

Source code in pyagenity/publisher/events.py
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
@classmethod
def stream(
    cls,
    base_config: dict,
    node_name: str = "",
    extra: dict[str, Any] | None = None,
) -> "EventModel":
    """Create a default EventModel instance for streaming updates.

    Args:
        base_config: Base configuration for the event (thread/run/timestamp/user).
        node_name: Name of the node producing the event.
        extra: Additional metadata.

    Returns:
        EventModel: The created event model instance for streaming.
    """
    thread_id = base_config.get("thread_id", "")
    run_id = base_config.get("run_id", "")

    metadata = {
        "run_timestamp": base_config.get("timestamp", ""),
        "user_id": base_config.get("user_id"),
        "is_stream": base_config.get("is_stream", False),
    }
    if extra:
        metadata.update(extra)
    return cls(
        event=Event.STREAMING,
        event_type=EventType.UPDATE,
        delta=True,
        content_type=[ContentType.TEXT, ContentType.REASONING],
        data={},
        thread_id=thread_id,
        node_name=node_name,
        run_id=run_id,
        metadata=metadata,
    )

EventType

Bases: str, Enum

Enum for event phases in agent graph execution.

Values

START: Event marks start of execution. PROGRESS: Event marks progress update. RESULT: Event marks result produced. END: Event marks end of execution. UPDATE: Event marks update. ERROR: Event marks error. INTERRUPTED: Event marks interruption.

Attributes:

Name Type Description
END
ERROR
INTERRUPTED
PROGRESS
RESULT
START
UPDATE
Source code in pyagenity/publisher/events.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
class EventType(str, enum.Enum):
    """Enum for event phases in agent graph execution.

    Values:
        START: Event marks start of execution.
        PROGRESS: Event marks progress update.
        RESULT: Event marks result produced.
        END: Event marks end of execution.
        UPDATE: Event marks update.
        ERROR: Event marks error.
        INTERRUPTED: Event marks interruption.
    """

    START = "start"
    PROGRESS = "progress"
    RESULT = "result"
    END = "end"
    UPDATE = "update"
    ERROR = "error"
    INTERRUPTED = "interrupted"

Attributes

END class-attribute instance-attribute
END = 'end'
ERROR class-attribute instance-attribute
ERROR = 'error'
INTERRUPTED class-attribute instance-attribute
INTERRUPTED = 'interrupted'
PROGRESS class-attribute instance-attribute
PROGRESS = 'progress'
RESULT class-attribute instance-attribute
RESULT = 'result'
START class-attribute instance-attribute
START = 'start'
UPDATE class-attribute instance-attribute
UPDATE = 'update'