Stream API - Real-time Streaming from AgentFlow¶
This document explains how to use the `stream` method for real-time streaming of responses from the AgentFlow API.
Overview¶
The `stream` method provides real-time streaming of responses from the agent graph over HTTP streaming in NDJSON (newline-delimited JSON) format. Instead of waiting for the entire response, as `invoke` does, `stream` yields chunks as they arrive from the server, enabling responsive, real-time user interfaces.
Key Differences from invoke¶
| Aspect | `invoke` | `stream` |
|---|---|---|
| Response pattern | Waits for the entire result | Yields chunks in real time |
| Data structure | Single response object | Multiple `StreamChunk` objects |
| Use case | Batch processing | Real-time UI updates, chat interfaces |
| Return type | `Promise<InvokeResult>` | `AsyncGenerator<StreamChunk>` |
| Tool execution | Automatic loop handling | Manual handling if needed |
| Memory usage | Higher (loads all data) | Lower (processes incrementally) |
| Callback support | Yes (`onPartialResult`) | No (use a `for await` loop) |
Architecture¶
Flow Diagram¶
```text
Client.stream()
    ↓
Endpoint.stream()  [streaming starts]
    ↓
POST /v1/graph/stream  (HTTP streaming)
    ↓
ReadableStream receives NDJSON chunks
    ↓
Parse NDJSON line by line
    ↓
For each complete line:
  - Parse the JSON into a StreamChunk
  - Yield the chunk to the generator
    ↓
Consumer receives chunks via a for-await loop
    ↓
Process/render based on event type:
  - 'message': AI/user message arrived
  - 'updates': state/context updated
  - 'state':   graph state changed
  - 'error':   an error occurred
```
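To make the NDJSON step concrete, here is a minimal sketch of how a client can turn a `ReadableStream` of bytes into parsed chunks. The buffering logic below (split on newlines, keep the trailing partial line) is an assumption about what the SDK does internally, not its actual source:

```typescript
// Minimal NDJSON parsing sketch (assumed internals, not the SDK source)
async function* parseNdjson(body: ReadableStream<Uint8Array>): AsyncGenerator<any> {
  const reader = body.getReader();
  const decoder = new TextDecoder();
  let buffer = '';
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    buffer += decoder.decode(value, { stream: true });
    const lines = buffer.split('\n');
    buffer = lines.pop() ?? ''; // keep the last, possibly incomplete, line
    for (const line of lines) {
      if (line.trim()) yield JSON.parse(line); // one StreamChunk per line
    }
  }
  if (buffer.trim()) yield JSON.parse(buffer); // flush any final partial line
}
```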
Stream Chunk Structure¶
Each chunk yielded from the stream has this structure:
```typescript
interface StreamChunk {
  event: StreamEventType | string;  // Type of event: 'message', 'updates', 'state', 'error'
  message?: Message | null;         // For 'message' events
  state?: AgentState | null;        // For 'updates'/'state' events
  data?: any;                       // For other event data
  thread_id?: string;               // Conversation thread ID
  run_id?: string;                  // Execution run ID
  metadata?: Record<string, any>;   // Metadata (node, function_name, status, etc.)
  timestamp?: number;               // UNIX timestamp
}
```
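Because most of these fields are optional, a small type guard can save repeated null checks downstream. `isMessageChunk` below is an illustrative helper, not part of the SDK:

```typescript
// Hypothetical helper: narrows a chunk to one that actually carries a message
function isMessageChunk(
  chunk: StreamChunk
): chunk is StreamChunk & { message: Message } {
  return chunk.event === 'message' && chunk.message != null;
}

// Usage: if (isMessageChunk(chunk)) { renderMessage(chunk.message); }
```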
Stream Event Types¶
```typescript
enum StreamEventType {
  MESSAGE = 'message',   // New message from the agent or the user
  UPDATES = 'updates',   // State/context updates
  STATE = 'state',       // State update
  ERROR = 'error'        // An error occurred
}
```
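Note that `StreamChunk.event` is typed `StreamEventType | string`, so a newer server may emit event types an older client does not know about. A defensive handler skips unknown events instead of throwing; `handleChunk` is an illustrative sketch:

```typescript
// Sketch: handle known events, ignore unknown ones for forward compatibility
function handleChunk(chunk: StreamChunk) {
  switch (chunk.event) {
    case StreamEventType.MESSAGE:
      // render the message
      break;
    case StreamEventType.UPDATES:
    case StreamEventType.STATE:
      // merge the new state
      break;
    case StreamEventType.ERROR:
      // surface the error
      break;
    default:
      // Unknown event type from a newer server: skip rather than fail
      break;
  }
}
```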
Usage¶
Basic Streaming Example¶
```typescript
import { AgentFlowClient, Message } from 'agentflow-react';

const client = new AgentFlowClient({
  baseUrl: 'http://127.0.0.1:8000',
  authToken: 'your-token',
  debug: false
});

// Create a message
const messages = [Message.text_message('Hello, what can you do?', 'user')];

// Stream the response
const stream = client.stream(messages, {
  response_granularity: 'low',
  recursion_limit: 25
});

// Iterate over stream chunks
for await (const chunk of stream) {
  console.log('Event:', chunk.event);
  console.log('Chunk:', chunk);

  switch (chunk.event) {
    case 'message':
      // Handle a message (could be an assistant response or a user message)
      if (chunk.message) {
        console.log(`[${chunk.message.role}]: ${chunk.message.content}`);
      }
      break;
    case 'updates':
      // Handle state updates
      if (chunk.state) {
        console.log('State updated:', chunk.state);
      }
      break;
    case 'error':
      // Handle errors
      console.error('Error:', chunk.data);
      break;
  }
}

console.log('Stream completed');
```
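Continuing the example above: chunks also carry optional `thread_id` and `run_id` fields (see the chunk structure), which are useful for correlating client logs with server-side runs. A small sketch:

```typescript
// Sketch: capture thread/run identifiers from the stream for logging
let threadId: string | undefined;
let runId: string | undefined;

for await (const chunk of client.stream(messages)) {
  threadId ??= chunk.thread_id; // remember the conversation thread once seen
  runId ??= chunk.run_id;       // remember the execution run once seen
}

console.log(`Finished run ${runId} on thread ${threadId}`);
```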
React Chat Component Example¶
```tsx
import { useEffect, useRef, useState } from 'react';
import { AgentFlowClient, Message } from 'agentflow-react';

function ChatComponent() {
  const [messages, setMessages] = useState<Message[]>([]);
  const [isStreaming, setIsStreaming] = useState(false);
  const clientRef = useRef<AgentFlowClient>();

  useEffect(() => {
    clientRef.current = new AgentFlowClient({
      baseUrl: 'http://127.0.0.1:8000',
      debug: false
    });
  }, []);

  async function handleSendMessage(text: string) {
    if (!clientRef.current) return;

    // Add the user message
    const userMsg = Message.text_message(text, 'user');
    setMessages(prev => [...prev, userMsg]);
    setIsStreaming(true);

    try {
      // Create a streaming request with all previous messages
      const stream = clientRef.current.stream(
        [...messages, userMsg],
        {
          response_granularity: 'low',
          recursion_limit: 25
        }
      );

      let currentAssistantMessage: Message | null = null;

      for await (const chunk of stream) {
        if (chunk.event === 'message' && chunk.message) {
          const msg = chunk.message;

          if (msg.role === 'assistant') {
            if (!currentAssistantMessage) {
              // New assistant message: add it to state
              currentAssistantMessage = msg;
              setMessages(prev => [...prev, msg]);
            } else {
              // Update the existing assistant message in place
              currentAssistantMessage = msg;
              setMessages(prev => {
                const updated = [...prev];
                updated[updated.length - 1] = msg;
                return updated;
              });
            }
          }
        }
      }
    } catch (error) {
      console.error('Streaming error:', error);
    } finally {
      setIsStreaming(false);
    }
  }

  return (
    <div>
      {messages.map((msg, idx) => (
        <div key={idx} className={msg.role}>
          {/* Render message content */}
        </div>
      ))}
      {isStreaming && <div>Streaming...</div>}
    </div>
  );
}

export default ChatComponent;
```
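One subtlety in the component above: `handleSendMessage` reads `messages` from the render in which it was created, so two sends in quick succession can build the request from stale history. Mirroring the list in a ref is one common fix; the lines below are a sketch of the idea, placed inside `ChatComponent`:

```tsx
// Sketch: keep a ref in sync with state so async handlers see fresh history
const messagesRef = useRef<Message[]>([]);
useEffect(() => {
  messagesRef.current = messages;
}, [messages]);

// ...then inside handleSendMessage, build the request from the ref:
// const history = [...messagesRef.current, userMsg];
```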
Advanced: Stream with Tool Execution¶
For scenarios where the server sends remote tool calls during streaming, you can handle them manually.
⚠️ Note: Remote tool calls are only for browser-level APIs. Most tools should be defined in your Python backend. See Tools Guide.
```typescript
import { AgentFlowClient, Message, StreamEventType } from 'agentflow-react';

async function streamWithToolExecution(client: AgentFlowClient, userMessage: Message) {
  const stream = client.stream([userMessage], {
    response_granularity: 'low'
  });

  const allChunks: any[] = [];

  for await (const chunk of stream) {
    allChunks.push(chunk);

    if (chunk.event === StreamEventType.MESSAGE && chunk.message) {
      const msg = chunk.message;

      // Check whether the message contains remote tool calls
      const hasToolCalls = msg.content?.some(
        (block: any) => block.type === 'remote_tool_call'
      );

      if (hasToolCalls && client.toolExecutor) {
        console.log('Tool calls detected in message');
        // Tool execution would be handled here if needed
      }
    }
  }

  return allChunks;
}
```
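If you need to inspect the calls themselves, you can filter the message's content blocks before handing them to your executor. The block schema beyond the `type` field is not documented here, so it is treated as opaque in this sketch:

```typescript
// Collect remote tool call blocks from a message (block shape treated as opaque)
function extractRemoteToolCalls(msg: Message): any[] {
  const blocks = Array.isArray(msg.content) ? msg.content : [];
  return blocks.filter((block: any) => block.type === 'remote_tool_call');
}
```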
Stream with Error Handling¶
```typescript
async function streamWithErrorHandling(
  client: AgentFlowClient,
  messages: Message[]
) {
  try {
    const stream = client.stream(messages, {
      response_granularity: 'partial',
      recursion_limit: 25
    });

    for await (const chunk of stream) {
      if (chunk.event === 'error') {
        console.error('Stream error:', chunk.data);
        // Handle the error appropriately
        break;
      }
      // Process other events
      console.log('Received:', chunk.event);
    }
  } catch (error) {
    if (error instanceof Error) {
      if (error.message.includes('timeout')) {
        console.error('Stream timeout');
      } else {
        console.error('Stream error:', error.message);
      }
    }
  }
}
```
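A common extension is retrying transient failures with exponential backoff. The policy below (three attempts, doubling delay) is an illustrative choice, not something the SDK provides:

```typescript
// Sketch: retry a stream on transient failures (illustrative policy)
async function streamWithRetry(
  client: AgentFlowClient,
  messages: Message[],
  maxAttempts = 3
) {
  for (let attempt = 1; attempt <= maxAttempts; attempt++) {
    try {
      for await (const chunk of client.stream(messages)) {
        if (chunk.event === 'error') throw new Error(String(chunk.data));
        console.log('Received:', chunk.event);
      }
      return; // stream completed cleanly
    } catch (error) {
      if (attempt === maxAttempts) throw error;
      // Exponential backoff before the next attempt
      await new Promise(resolve => setTimeout(resolve, 2 ** attempt * 1000));
    }
  }
}
```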
Cancelling a Stream¶
Note that an `AbortController` only cancels the stream if its signal is wired into the underlying request; the reliable, client-side way to stop is simply to `break` out of the `for await` loop, which closes the generator and the HTTP connection with it:

```typescript
async function streamWithCancellation(
  client: AgentFlowClient,
  messages: Message[]
) {
  // Breaking out of a for-await loop calls the generator's return(),
  // which closes the underlying HTTP stream.
  const deadline = Date.now() + 30_000; // give up after 30 seconds

  try {
    const stream = client.stream(messages, {
      response_granularity: 'low'
    });

    for await (const chunk of stream) {
      console.log('Chunk:', chunk.event);

      // Cancel once the first message arrives, or when the deadline passes
      if (chunk.event === 'message' || Date.now() > deadline) {
        console.log('Stream cancelled');
        break;
      }
    }
  } catch (error) {
    console.error('Error:', error);
  }
}
```
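In a UI, cancellation is usually user-driven rather than time-driven. A simple pattern is a flag checked inside the loop; this is a sketch, with a `console.log` standing in for a real UI update:

```typescript
import { AgentFlowClient, Message } from 'agentflow-react';

// Sketch: user-driven cancellation via a shared flag
let cancelled = false;

export function cancelStream() {
  cancelled = true; // wire this to a Cancel button
}

export async function runStream(client: AgentFlowClient, messages: Message[]) {
  cancelled = false;
  for await (const chunk of client.stream(messages)) {
    if (cancelled) break;                // leaving the loop closes the stream
    console.log('Render:', chunk.event); // stand-in for a real UI update
  }
}
```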
Configuration Options¶
When calling `stream`, you can provide options:
```typescript
stream(
  messages: Message[],
  options?: {
    initial_state?: Record<string, any>;  // Initial state for the graph
    config?: Record<string, any>;         // Graph config
    recursion_limit?: number;             // Max iterations (default: 25)
    response_granularity?: 'full' | 'partial' | 'low'; // Level of detail
  }
)
```
Response Granularity¶
- 'full': Complete detailed responses
- 'partial': Intermediate updates during processing
- 'low': Minimal updates, optimized for streaming (recommended)
Performance Considerations¶
- Memory efficient: the stream processes data incrementally, without loading the entire response into memory
- Responsive UI: chunks arrive as soon as they are generated, enabling real-time UI updates
- Network streaming: uses HTTP/1.1 chunked encoding for efficient data transfer
- NDJSON format: each line is a complete JSON object, easily parseable line by line
Common Patterns¶
Update UI on Each Message Chunk¶
```typescript
for await (const chunk of stream) {
  if (chunk.event === 'message' && chunk.message) {
    updateChatUI(chunk.message);
  }
}
```
Collect All Chunks Then Process¶
```typescript
const chunks: StreamChunk[] = [];

for await (const chunk of stream) {
  chunks.push(chunk);
}

// Process all chunks at once
processAllChunks(chunks);
```
React Hook for Streaming¶
```typescript
import { useState } from 'react';
import { AgentFlowClient, Message, StreamChunk } from 'agentflow-react';

function useStreamInvoke() {
  const [chunks, setChunks] = useState<StreamChunk[]>([]);
  const [isLoading, setIsLoading] = useState(false);

  const startStream = async (
    client: AgentFlowClient,
    messages: Message[]
  ) => {
    setIsLoading(true);
    setChunks([]);
    try {
      const stream = client.stream(messages);
      for await (const chunk of stream) {
        setChunks(prev => [...prev, chunk]);
      }
    } finally {
      setIsLoading(false);
    }
  };

  return { chunks, isLoading, startStream };
}
```
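A component can then consume the hook like this; the wiring (button, client prop, message text) is illustrative, and the imports match the earlier examples:

```tsx
// Illustrative usage of the hook above
function StreamLog({ client }: { client: AgentFlowClient }) {
  const { chunks, isLoading, startStream } = useStreamInvoke();

  return (
    <div>
      <button
        disabled={isLoading}
        onClick={() => startStream(client, [Message.text_message('Hi', 'user')])}
      >
        Send
      </button>
      <ul>
        {chunks.map((chunk, idx) => (
          <li key={idx}>{chunk.event}</li>
        ))}
      </ul>
    </div>
  );
}
```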
Debugging¶
Enable debug logging to see stream details:
```typescript
const client = new AgentFlowClient({
  baseUrl: 'http://127.0.0.1:8000',
  debug: true // Enables console logging
});

const stream = client.stream(messages);
for await (const chunk of stream) {
  // Debug logs will show in the console
}
```
Comparison with Invoke¶
Use invoke when:
- You need the entire result at once
- You want automatic tool execution loop handling
- You have callback-based patterns
- The response is relatively small
Use stream when:
- Building chat/conversational interfaces
- You want real-time streaming responses
- You need responsive UIs with incremental updates
- Handling large responses efficiently
- Network bandwidth is a concern
- You prefer async generator patterns
API Reference¶
Method Signature¶
```typescript
stream(
  messages: Message[],
  options?: {
    initial_state?: Record<string, any>;
    config?: Record<string, any>;
    recursion_limit?: number;
    response_granularity?: 'full' | 'partial' | 'low';
  }
): AsyncGenerator<StreamChunk, void, unknown>
```
Endpoint¶
- URL: `/v1/graph/stream`
- Method: `POST`
- Content-Type: `application/json`
- Response: `application/json` (NDJSON format)
- Streaming: Yes (HTTP/1.1 chunked)
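For low-level debugging you can hit the endpoint directly and watch the raw NDJSON lines arrive. The request body shape and the `Bearer` auth scheme below are assumptions inferred from the options above, not a documented wire format:

```typescript
// Debugging sketch: call the stream endpoint directly (body shape is assumed)
const response = await fetch('http://127.0.0.1:8000/v1/graph/stream', {
  method: 'POST',
  headers: {
    'Content-Type': 'application/json',
    Authorization: 'Bearer your-token' // auth scheme assumed
  },
  body: JSON.stringify({
    messages: [],               // serialized Message objects (shape assumed)
    response_granularity: 'low'
  })
});
console.log(response.status, response.headers.get('content-type'));
```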
Error Handling¶
The stream will throw errors for:

- Network failures
- HTTP errors (non-2xx status)
- Timeouts (default: 5 minutes)
- Invalid JSON in the stream
Wrap in try-catch to handle these gracefully.
Migration from Invoke to Stream¶
If you're using callbacks with invoke:
```typescript
// Before (with invoke)
await client.invoke(messages, {
  onPartialResult: (partial) => {
    console.log('Partial:', partial.messages);
  }
});

// After (with stream)
for await (const chunk of client.stream(messages)) {
  if (chunk.event === 'message') {
    console.log('Message:', chunk.message);
  }
}
```
Troubleshooting¶
Stream stops without completion¶
Check that:

1. The network connection is up
2. The server is running and healthy
3. The authorization token is valid
4. The recursion limit has not been exceeded
No chunks received¶
Verify that:

1. The server is streaming (not hanging)
2. The response format is valid NDJSON
3. The timeout is not too short
4. The initial request is correct
Memory usage increasing¶
Ensure that:

1. You are not storing all chunks unnecessarily
2. The for-await loop completes properly
3. There are no infinite loops in chunk processing
Examples Repository¶
See the `examples/` directory for complete working examples:

- `examples/stream-basic.ts` - Simple streaming example
- `examples/stream-react.tsx` - React component example
- `examples/stream-chat.ts` - Chat application pattern
See Also¶
- React Integration - Using stream in React applications with hooks
- React Examples - Complete React streaming components
- Stream Quick Reference - Quick reference for stream events
- API Reference - Complete stream API documentation
- Invoke Usage Guide - Alternative synchronous API
- Tools Guide - Using tools with streaming
- TypeScript Types - Type definitions for streaming
- Troubleshooting - Common streaming issues