Stream chunks

Stream chunk primitives for unified streaming data handling.

This module provides a unified StreamChunk class that can encapsulate different types of streaming data (Messages, EventModels, etc.) in a type-safe manner. This enables clean separation between conversation content and execution state while providing a consistent interface for streaming consumers.

Classes:

Name Description
StreamChunk

Unified wrapper for streaming data with type discrimination.

Classes

StreamChunk

Bases: BaseModel

Unified wrapper for different types of streaming data.

This class provides a single interface for handling various streaming chunk types (messages, events, state updates, errors) with type-safe discrimination.

Attributes:

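Name Type Description
event StreamEvent

The type of streaming chunk. Defaults to StreamEvent.MESSAGE.

message Message | None

Message payload, set for message chunks.

state AgentState | None

Agent state payload, set for state chunks.

data dict | None

Payload for chunk types without a dedicated field.

thread_id str | None

Optional thread identifier.

run_id str | None

Optional run identifier.

metadata dict | None

Optional additional metadata for the chunk.

timestamp float

UNIX timestamp of when the chunk was created.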
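
As a rough illustration of how a consumer might branch on the chunk type, the sketch below uses a stdlib dataclass as a stand-in for StreamChunk. `ChunkSketch`, `describe`, and the `"detail"` key are hypothetical names introduced only for this example, and the stand-in uses plain `str` and `dict` where the real model holds `Message` and `AgentState` objects.

```python
from dataclasses import dataclass, field
from enum import Enum
from typing import Optional
import time


class StreamEvent(str, Enum):
    STATE = "state"
    MESSAGE = "message"
    ERROR = "error"
    UPDATES = "updates"


@dataclass
class ChunkSketch:
    """Hypothetical stand-in for StreamChunk, not the real pydantic model."""

    event: StreamEvent = StreamEvent.MESSAGE
    message: Optional[str] = None  # the real field holds a Message
    state: Optional[dict] = None   # the real field holds an AgentState
    data: Optional[dict] = None
    metadata: Optional[dict] = None
    timestamp: float = field(default_factory=time.time)


def describe(chunk: ChunkSketch) -> str:
    """Pick the payload field that matches the chunk's event type."""
    if chunk.event == StreamEvent.MESSAGE:
        return f"message: {chunk.message}"
    if chunk.event == StreamEvent.STATE:
        return f"state keys: {sorted(chunk.state or {})}"
    if chunk.event == StreamEvent.ERROR:
        # "detail" is an assumed key; the source does not define an error shape.
        return f"error: {(chunk.data or {}).get('detail', 'unknown')}"
    return f"updates: {chunk.data}"
```

Because StreamEvent mixes in `str`, the comparisons in `describe` also succeed when `event` arrives as a plain string such as `"message"`, which is what a model configured with `use_enum_values = True` would store.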

Classes:

Name Description
Config

Pydantic configuration for StreamChunk.

Source code in agentflow/state/stream_chunks.py
class StreamChunk(BaseModel):
    """
    Unified wrapper for different types of streaming data.

    This class provides a single interface for handling various streaming chunk types
    (messages, events, state updates, errors) with type-safe discrimination.

    Attributes:
        event: The type of streaming chunk.
        message: Message payload, set for message chunks.
        state: Agent state payload, set for state chunks.
        data: Payload for chunk types without a dedicated field.
        thread_id: Optional thread identifier.
        run_id: Optional run identifier.
        metadata: Optional additional metadata for the chunk.
        timestamp: UNIX timestamp of when the chunk was created.
    """

    event: StreamEvent = StreamEvent.MESSAGE
    # data holders for different chunk types
    message: Message | None = None
    state: AgentState | None = None
    # Placeholder for other chunk types
    data: dict | None = None

    # Optional identifiers
    thread_id: str | None = None
    run_id: str | None = None
    # Optional metadata
    metadata: dict | None = None
    timestamp: float = Field(
        # Call datetime.now() per instance; passing datetime.now().timestamp
        # would bind one datetime created when the class body is executed.
        default_factory=lambda: datetime.now().timestamp(),
        description="UNIX timestamp of when chunk was created",
    )

    class Config:
        """Pydantic configuration for StreamChunk.

        Attributes:
            use_enum_values: Output enums as strings.
        """

        use_enum_values = True
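
The `timestamp` default is only meaningful if the factory reads the clock each time a chunk is created. The sketch below shows the difference using stdlib dataclasses rather than pydantic; `FrozenStamp` and `FreshStamp` are hypothetical names for illustration. A bound method such as `datetime.now().timestamp` belongs to one datetime created when the class body runs, so every instance gets the same value.

```python
import time
from dataclasses import dataclass, field
from datetime import datetime


@dataclass
class FrozenStamp:
    # datetime.now() runs once, when this class body is executed; the factory
    # is the bound method of that single datetime and always returns the
    # same number.
    timestamp: float = field(default_factory=datetime.now().timestamp)


@dataclass
class FreshStamp:
    # The lambda reads the clock again for every new instance.
    timestamp: float = field(default_factory=lambda: datetime.now().timestamp())


a = FrozenStamp()
c = FreshStamp()
time.sleep(0.05)
b = FrozenStamp()
d = FreshStamp()

print(a.timestamp == b.timestamp)  # frozen factory: identical values
print(d.timestamp > c.timestamp)   # fresh factory: later instance, later stamp
```

`time.time` is another callable that could serve as the factory, since it also returns a UNIX timestamp on each call.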

Attributes

data class-attribute instance-attribute
data = None
event class-attribute instance-attribute
event = MESSAGE
message class-attribute instance-attribute
message = None
metadata class-attribute instance-attribute
metadata = None
run_id class-attribute instance-attribute
run_id = None
state class-attribute instance-attribute
state = None
thread_id class-attribute instance-attribute
thread_id = None
timestamp class-attribute instance-attribute
timestamp = Field(default_factory=timestamp, description='UNIX timestamp of when chunk was created')

Classes

Config

Pydantic configuration for StreamChunk.

Attributes:

Name Type Description
use_enum_values

Output enums as strings.

Source code in agentflow/state/stream_chunks.py
class Config:
    """Pydantic configuration for StreamChunk.

    Attributes:
        use_enum_values: Output enums as strings.
    """

    use_enum_values = True
Attributes
use_enum_values class-attribute instance-attribute
use_enum_values = True

StreamEvent

Bases: str, Enum

Attributes:

Name Type Description
ERROR
MESSAGE
STATE
UPDATES
Source code in agentflow/state/stream_chunks.py
class StreamEvent(str, enum.Enum):
    STATE = "state"
    MESSAGE = "message"
    ERROR = "error"
    UPDATES = "updates"
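
Because StreamEvent derives from both `str` and `Enum`, its members behave like ordinary strings in comparisons and JSON encoding. A small stdlib-only sketch that redefines the enum locally so it runs on its own; `by_value` and `encoded` are illustrative names:

```python
import json
from enum import Enum


class StreamEvent(str, Enum):
    STATE = "state"
    MESSAGE = "message"
    ERROR = "error"
    UPDATES = "updates"


# Members compare equal to their string values.
is_message = StreamEvent.MESSAGE == "message"

# A raw string, such as one read from the wire, maps back to a member.
by_value = StreamEvent("error")

# json.dumps treats a member as a plain string.
encoded = json.dumps({"event": StreamEvent.STATE})
```

Here `is_message` is `True`, `by_value` is `StreamEvent.ERROR`, and `encoded` is `{"event": "state"}`. Passing a string that is not one of the four values to `StreamEvent(...)` raises `ValueError`.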

Attributes

ERROR class-attribute instance-attribute
ERROR = 'error'
MESSAGE class-attribute instance-attribute
MESSAGE = 'message'
STATE class-attribute instance-attribute
STATE = 'state'
UPDATES class-attribute instance-attribute
UPDATES = 'updates'