
Streaming

AgentFlow gives you three execution modes: invoke (synchronous, waits for completion), stream (a synchronous generator), and astream (an async generator). Streaming is essential for chat UIs where users expect to see words appear as the model produces them.


How it works

Use invoke when you need the full result before proceeding. Use stream / astream when the client should see partial responses immediately.
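To make the difference concrete, here is a library-free sketch (plain Python, no AgentFlow imports; `generate_tokens` is a stand-in for model output, not part of the library):

```python
# Library-free sketch contrasting the two consumption styles.

def generate_tokens():
    """Pretend model that produces output incrementally."""
    yield from ["Once", " upon", " a", " time"]

def invoke_style() -> str:
    # invoke-style: block until everything is produced, return the whole result
    return "".join(generate_tokens())

def stream_style():
    # stream-style: hand each piece to the caller as soon as it exists
    for token in generate_tokens():
        yield token

full = invoke_style()            # one final string
partial = list(stream_style())   # the same text, piece by piece
```

Both paths yield the same text; streaming just surfaces it incrementally, which is what lets a UI render words as they arrive.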


StreamChunk

Every streaming event is a StreamChunk Pydantic model:

from agentflow.core.state.stream_chunks import StreamChunk, StreamEvent

class StreamChunk(BaseModel):
    event: StreamEvent        # "message" | "state" | "error" | "updates"
    message: Message | None   # populated for StreamEvent.MESSAGE
    state: AgentState | None  # populated for StreamEvent.STATE
    data: dict | None         # populated for StreamEvent.ERROR / UPDATES
    thread_id: str | None
    run_id: str | None
    metadata: dict | None
    timestamp: float          # UNIX timestamp

StreamEvent values

| StreamEvent | Value | When sent | Populated field |
|---|---|---|---|
| StreamEvent.MESSAGE | "message" | Each model output message | chunk.message |
| StreamEvent.STATE | "state" | After each node completes | chunk.state |
| StreamEvent.ERROR | "error" | Execution error | chunk.data |
| StreamEvent.UPDATES | "updates" | Custom node-level updates | chunk.data |

ResponseGranularity

Both stream() and astream() accept a response_granularity parameter to control what is included in each StreamChunk:

from agentflow.utils import ResponseGranularity

| Value | Description |
|---|---|
| ResponseGranularity.LOW | Only the latest messages (default) |
| ResponseGranularity.PARTIAL | Context, context_summary, and latest messages |
| ResponseGranularity.FULL | Complete state and messages |

Synchronous streaming

app.stream() is a synchronous generator. Use it from non-async code:

from agentflow.core.state import Message
from agentflow.utils import ResponseGranularity
from agentflow.core.state.stream_chunks import StreamEvent

for chunk in app.stream(
    {"messages": [Message.text_message("Tell me a short story.")]},
    config={"thread_id": "stream-1", "recursion_limit": 10},
    response_granularity=ResponseGranularity.LOW,
):
    if chunk.event == StreamEvent.MESSAGE and chunk.message is not None:
        print(chunk.message.text(), end="", flush=True)

print() # trailing newline

Asynchronous streaming

app.astream() is an async generator. Use it inside async code (e.g., FastAPI, async tests):

import asyncio
from agentflow.core.state import Message
from agentflow.utils import ResponseGranularity
from agentflow.core.state.stream_chunks import StreamEvent

async def main():
    inp = {"messages": [Message.text_message("Call get_weather for Tokyo.")]}
    config = {"thread_id": "astream-1", "recursion_limit": 10}

    async for chunk in app.astream(inp, config, ResponseGranularity.LOW):
        if chunk.event == StreamEvent.MESSAGE and chunk.message is not None:
            print(chunk.message.text(), end="", flush=True)
        elif chunk.event == StreamEvent.STATE and chunk.state is not None:
            print(f"\n[state received, step={chunk.state.execution_meta.step}]")

asyncio.run(main())

Inspecting all chunk types

async for chunk in app.astream(inp, config):
    match chunk.event:
        case StreamEvent.MESSAGE:
            # new message from the model or a tool
            print("message:", chunk.message.text())
        case StreamEvent.STATE:
            # node completed: full or partial state
            print("state step:", chunk.state.execution_meta.step)
        case StreamEvent.ERROR:
            print("error:", chunk.data)
        case StreamEvent.UPDATES:
            print("updates:", chunk.data)

invoke and ainvoke

For non-streaming use, app.invoke() and app.ainvoke() return a plain dict:

# Sync
result = app.invoke(
    {"messages": [Message.text_message("Hello!")]},
    config={"thread_id": "t1"},
    response_granularity=ResponseGranularity.LOW,
)
messages = result["messages"]  # list of Message

# Async
result = await app.ainvoke(
    {"messages": [Message.text_message("Hello!")]},
    config={"thread_id": "t1"},
)

The returned dict keys depend on response_granularity:

| Granularity | Keys present |
|---|---|
| LOW | messages |
| PARTIAL | messages, context, context_summary |
| FULL | messages, state |
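
The key sets from this table can be encoded as a small validation helper. This is an illustrative sketch, not part of AgentFlow; the key sets come from the table above:

```python
# Illustrative helper (not part of AgentFlow): check that an invoke()/ainvoke()
# result dict carries the keys promised by the chosen granularity.

EXPECTED_KEYS = {
    "LOW": {"messages"},
    "PARTIAL": {"messages", "context", "context_summary"},
    "FULL": {"messages", "state"},
}

def check_result_keys(result: dict, granularity: str) -> bool:
    """Return True if every key promised by `granularity` is present."""
    return EXPECTED_KEYS[granularity] <= result.keys()
```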

Stopping a running stream

Call app.stop() / app.astop() to request cancellation. The graph checks a stop flag after each node and exits cleanly:

# stop from another coroutine / thread
await app.astop({"thread_id": "stream-1"})
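
The cooperative-cancellation pattern described above can be sketched without AgentFlow: a worker checks a shared flag between steps and exits cleanly when it is set. All names here are illustrative, standing in for the graph's internal stop flag:

```python
import asyncio

async def run_graph(stop: asyncio.Event, steps: int = 10) -> list[int]:
    """Toy 'graph' that checks a stop flag after each node, like app.stop()."""
    completed = []
    for step in range(steps):
        await asyncio.sleep(0)      # stand-in for node work
        completed.append(step)
        if stop.is_set():           # cooperative check between nodes
            break
    return completed

async def main() -> list[int]:
    stop = asyncio.Event()
    task = asyncio.create_task(run_graph(stop))
    await asyncio.sleep(0)          # let at least one node run
    stop.set()                      # request cancellation, like app.astop()
    return await task

completed_steps = asyncio.run(main())
```

Because the flag is only checked between nodes, the current node always finishes before the run stops, which is why the graph can exit with a consistent state.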

Via the REST API:

POST /v1/graph/stop
Content-Type: application/json

{"thread_id": "stream-1"}

Streaming via the REST API

POST /v1/graph/stream returns a server-sent events (SSE) response. Each event is a JSON-encoded StreamChunk:

curl -N -X POST http://127.0.0.1:8000/v1/graph/stream \
  -H "Content-Type: application/json" \
  -d '{
    "messages": [{"role": "user", "content": "Tell me a short story."}],
    "config": {"thread_id": "rest-stream-1"}
  }'

Response:

data: {"event": "message", "message": {"role": "assistant", "content": [{"type": "text", "text": "Once"}]}, ...}
data: {"event": "message", "message": {"role": "assistant", "content": [{"type": "text", "text": " upon a time"}]}, ...}
data: {"event": "state", "state": {...}, ...}

Streaming in TypeScript

AgentFlowClient.stream returns an async iterator of StreamChunk:

import { AgentFlowClient, Message, StreamEvent } from "@10xscale/agentflow-client";

const client = new AgentFlowClient({ baseUrl: "http://127.0.0.1:8000" });

for await (const chunk of client.stream(
  [Message.textMessage("Tell me a short story.")],
  { config: { threadId: "ts-stream-1" } },
)) {
  if (chunk.event === StreamEvent.MESSAGE && chunk.message) {
    process.stdout.write(chunk.message.text ?? "");
  }
}
console.log();