stream()

client.stream() sends a list of messages to the agent graph and returns an AsyncGenerator that yields StreamChunk objects in real time as the server produces them. Use streaming when you want to show incremental output to the user — token by token, update by update — instead of waiting for the full response.

Endpoint: POST /v1/graph/stream
Source: src/endpoints/stream.ts


Signature

client.stream(
  messages: Message[],
  options?: StreamOptions
): AsyncGenerator<StreamChunk, void, unknown>

stream() is synchronous — it returns the generator immediately without making a network call. The HTTP request starts when you begin iterating with for await.
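The same lazy behaviour can be sketched with a plain async generator (an illustration only; fakeStream is a stand-in, not part of the client):

```typescript
// Illustration (not the real client): async generator functions return
// immediately; their body only runs once you start iterating.
let bodyStarted = false;

async function* fakeStream(): AsyncGenerator<string> {
  bodyStarted = true; // runs on the first next() call, not at creation time
  yield 'chunk-1';
  yield 'chunk-2';
}

const gen = fakeStream();               // returns at once; no body code has run
const startedAtCreation = bodyStarted;  // still false here

async function drain(): Promise<string[]> {
  const chunks: string[] = [];
  for await (const chunk of gen) {
    chunks.push(chunk);                 // body starts executing here
  }
  return chunks;
}
```

This is why you can construct the stream, pass it around, and only pay for the network call once a consumer begins iterating.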


Parameters

messages

Same as invoke(). An array of Message objects.

options (optional)

| Field | Type | Default | Description |
| --- | --- | --- | --- |
| initial_state | Record<string, any> | undefined | Initial state values to seed the graph. |
| config | Record<string, any> | undefined | Run config. Set configurable.thread_id for persistent state. |
| recursion_limit | number | 25 | Maximum recursion depth for the graph. |
| response_granularity | 'full' \| 'partial' \| 'low' | 'low' | Controls how much data is included in state/update events. Default is 'low' for streaming (unlike invoke(), where the default is 'full'). |

Return value: AsyncGenerator<StreamChunk>

The generator yields StreamChunk objects. Iteration ends when the server closes the stream.

StreamChunk

interface StreamChunk {
  event: StreamEventType | string;
  message?: Message | null;
  state?: AgentState | null;
  data?: any;
  thread_id?: string;
  run_id?: string;
  metadata?: StreamMetadata | Record<string, any>;
  timestamp?: number;
}

interface StreamMetadata {
  is_new_thread: boolean;
  thread_id: string;
  [key: string]: any;
}

StreamEventType

The event field on a StreamChunk is a string matching one of these values (exported as the StreamEventType enum):

| Event | Enum value | Description |
| --- | --- | --- |
| 'message' | StreamEventType.MESSAGE | A new or updated Message from the agent. The message field contains the full Message object. On streaming models this fires multiple times with partial token content (message.delta = true), followed by a final chunk with delta = false. |
| 'updates' | StreamEventType.UPDATES | The graph state was updated. The state field contains the updated AgentState. Only emitted when response_granularity is 'partial' or 'full'. |
| 'state' | StreamEventType.STATE | The complete current state snapshot. Only emitted when response_granularity is 'full'. |
| 'error' | StreamEventType.ERROR | An error occurred during graph execution. The data field contains the error details. |

Async iterator examples

Basic streaming

import { AgentFlowClient, Message, StreamEventType } from '@10xscale/agentflow-client';

const client = new AgentFlowClient({ baseUrl: 'http://localhost:8000' });

const stream = client.stream([
  Message.text_message('Write me a short poem about the ocean.'),
]);

for await (const chunk of stream) {
  if (chunk.event === StreamEventType.MESSAGE && chunk.message) {
    const textBlock = chunk.message.content.find(b => b.type === 'text');
    if (textBlock && 'text' in textBlock) {
      process.stdout.write(textBlock.text);
    }
  }
}
console.log('\n--- Stream complete ---');

Streaming with state updates

const stream = client.stream(
  [Message.text_message('Analyse this dataset.')],
  {
    config: { configurable: { thread_id: 'analysis-001' } },
    response_granularity: 'full',
  }
);

for await (const chunk of stream) {
  switch (chunk.event) {
    case StreamEventType.MESSAGE:
      if (chunk.message?.delta) {
        // Partial token — append to UI
        appendTokenToUI(chunk.message);
      } else if (chunk.message) {
        // Final message — replace partial content
        setFinalMessage(chunk.message);
      }
      break;

    case StreamEventType.UPDATES:
      console.log('State updated:', chunk.state);
      break;

    case StreamEventType.STATE:
      console.log('Full state snapshot:', chunk.state);
      break;

    case StreamEventType.ERROR:
      console.error('Stream error:', chunk.data);
      break;
  }
}

Streaming in a React component

import { useState } from 'react';
import { AgentFlowClient, Message, StreamEventType } from '@10xscale/agentflow-client';

const client = new AgentFlowClient({ baseUrl: 'http://localhost:8000' });

function ChatBox() {
  const [output, setOutput] = useState('');

  async function sendMessage(text: string) {
    setOutput('');

    const stream = client.stream([Message.text_message(text)]);

    for await (const chunk of stream) {
      if (chunk.event === StreamEventType.MESSAGE && chunk.message?.delta) {
        const textBlock = chunk.message.content.find(b => b.type === 'text');
        if (textBlock && 'text' in textBlock) {
          setOutput(prev => prev + textBlock.text);
        }
      }
    }
  }

  return (
    <div>
      <button onClick={() => sendMessage('Hello!')}>Send</button>
      <pre>{output}</pre>
    </div>
  );
}

Collecting the full response from a stream

If you want to wait for the complete stream but still use streaming internally:

async function streamToString(messages: Message[]): Promise<string> {
  const stream = client.stream(messages);
  let fullText = '';

  for await (const chunk of stream) {
    if (chunk.event === StreamEventType.MESSAGE && chunk.message) {
      for (const block of chunk.message.content) {
        if (block.type === 'text') {
          fullText += block.text;
        }
      }
    }
  }

  return fullText;
}

Stopping a stream mid-way

The generator supports early exit via break or return. Exiting the loop only stops iteration on the client; the server keeps executing the graph until you call stopGraph():

const stream = client.stream([Message.text_message('Tell me a very long story')]);
let wordCount = 0;

for await (const chunk of stream) {
  if (chunk.event === StreamEventType.MESSAGE && chunk.message) {
    const textBlock = chunk.message.content.find(b => b.type === 'text');
    if (textBlock && 'text' in textBlock) {
      wordCount += textBlock.text.split(' ').length;
    }
  }

  if (wordCount > 500) {
    // Stop iterating locally and request server-side stop
    const threadId = '...'; // from chunk.thread_id or your config
    await client.stopGraph(threadId);
    break;
  }
}

StreamRequest (advanced)

The shape of the request body sent to POST /v1/graph/stream:

interface StreamRequest {
  messages: any[];
  initial_state?: Record<string, any>;
  config?: Record<string, any>;
  recursion_limit?: number; // default 25
  response_granularity?: 'full' | 'partial' | 'low';
}

Stream format

The server sends the stream as NDJSON (newline-delimited JSON) over HTTP. Each line is a JSON object matching the StreamChunk shape. The client's parser also handles concatenated JSON objects (no newlines) for compatibility with some proxy configurations.
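A line-based parser along these lines can be sketched as follows (an illustration; parseNdjsonBuffer is a hypothetical helper, not the client's actual parser):

```typescript
// Sketch of an NDJSON buffer parser (not the client's real implementation).
// Complete lines are parsed; a trailing partial line is carried over so the
// caller can prepend it to the next network read.
function parseNdjsonBuffer(buffer: string): { chunks: unknown[]; remainder: string } {
  const lines = buffer.split('\n');
  const remainder = lines.pop() ?? ''; // last segment may be an incomplete line
  const chunks = lines
    .filter(line => line.trim().length > 0)
    .map(line => JSON.parse(line));
  return { chunks, remainder };
}
```

Each network read appends to an accumulating buffer; the remainder from one call becomes the prefix of the next, so a JSON object split across two reads is never parsed half-formed.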

Response header: Content-Type: text/event-stream

Example raw stream output:

{"event":"message","message":{"role":"assistant","content":[{"type":"text","text":"The"}],"delta":true},"thread_id":"abc123"}
{"event":"message","message":{"role":"assistant","content":[{"type":"text","text":" capital"}],"delta":true},"thread_id":"abc123"}
{"event":"message","message":{"role":"assistant","content":[{"type":"text","text":" of France is Paris."}],"delta":false},"thread_id":"abc123"}
{"event":"updates","state":{"messages":[...]},"thread_id":"abc123"}

Using curl

curl --no-buffer -X POST http://localhost:8000/v1/graph/stream \
  -H 'Content-Type: application/json' \
  -H 'Authorization: Bearer YOUR_TOKEN' \
  -d '{
    "messages": [{ "role": "user", "content": [{"type": "text", "text": "Hello"}], "message_id": "0" }],
    "response_granularity": "low"
  }'

invoke() vs stream() comparison

| Aspect | invoke() | stream() |
| --- | --- | --- |
| Network requests | One per loop iteration | One streaming connection |
| When result arrives | After full graph execution | Token by token in real time |
| Default response_granularity | 'full' | 'low' |
| Remote tool handling | Automatic loop | Manual: check for RemoteToolCallBlock in message chunks |
| Return type | Promise<InvokeResult> | AsyncGenerator<StreamChunk> |
| Best for | Background tasks, batch jobs | Chat UIs, real-time displays |

Common errors

| Error | Cause | Fix |
| --- | --- | --- |
| AgentFlowError with status 401 | Missing or invalid auth. | Set auth in the client config. |
| Stream stops mid-response | Server timeout or network issue. | Wrap the for await loop in try/catch and retry. |
| event: 'error' chunks | Graph execution error. | Read chunk.data for the error message and check server logs. |
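The retry advice above can be sketched as a small wrapper that re-creates the stream on failure (a sketch; consumeWithRetry, makeStream, and onChunk are illustrative names, not part of the client API):

```typescript
// Sketch of a retry wrapper for a streaming call. `makeStream` re-creates the
// stream on each attempt; `onChunk` receives every yielded chunk.
async function consumeWithRetry<T>(
  makeStream: () => AsyncGenerator<T>,
  onChunk: (chunk: T) => void,
  maxRetries = 2,
): Promise<void> {
  for (let attempt = 0; attempt <= maxRetries; attempt++) {
    try {
      for await (const chunk of makeStream()) {
        onChunk(chunk);
      }
      return; // stream completed without error
    } catch (err) {
      if (attempt === maxRetries) throw err; // out of retries
    }
  }
}
```

Note that a naive retry re-sends the whole request, so chunks already shown to the user may arrive again; deduplicate or reset the UI before each attempt.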

What you learned

  • stream() returns an AsyncGenerator — iterate it with for await.
  • StreamEventType.MESSAGE with delta: true is a partial token; delta: false is the final message.
  • StreamEventType.UPDATES and STATE require response_granularity: 'partial' or 'full'.
  • Use response_granularity: 'low' for the best streaming performance in chat UIs.
  • Use stopGraph() to cancel a running stream on the server.

Next step

See reference/client/threads to learn how to manage persistent conversation threads.