Execution & Streaming Runtime¶
Once a `StateGraph` is compiled into a `CompiledGraph`, you gain a uniform API for synchronous, asynchronous, and streaming execution, plus lifecycle management (stop, resume, background tasks, persistence, publishing).
Entry Points¶
| Method | Mode | Use Case |
|---|---|---|
| `invoke(input, config, granularity)` | Sync blocking | CLI scripts, tests, small batch tasks |
| `ainvoke(input, config, granularity)` | Async | Web handlers, async services |
| `stream(input, config, granularity)` | Sync generator | Progressive output in non-async contexts |
| `astream(input, config, granularity)` | Async generator | Live UIs, websockets, server-sent events |
All methods accept `input_data` containing an initial `messages` list for new runs, plus optional additional payload keys.
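For example, the async entry point can be awaited from a handler or script. A minimal sketch (the `Message` import path is an assumption):

```python
import asyncio

from pyagenity.utils import Message  # import path assumed

async def main():
    res = await app.ainvoke(
        {"messages": [Message.text_message("Hi there")]},
        config={"thread_id": "t1"},
    )
    print(res["messages"][-1].text())

asyncio.run(main())
```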
Response Granularity¶
`ResponseGranularity` controls output detail:

| Level | Contents |
|---|---|
| LOW | Final messages only |
| PARTIAL | Messages + context summary + core metadata |
| FULL | Entire final state (all fields), messages |
Choose LOW for chat responses, FULL for debugging or persistence workflows.
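A quick sketch of requesting a specific granularity (import paths assumed; the parameter name follows the table above):

```python
from pyagenity.utils import Message, ResponseGranularity  # import paths assumed

res = app.invoke(
    {"messages": [Message.text_message("Hello")]},
    config={"thread_id": "t1"},
    granularity=ResponseGranularity.LOW,  # parameter name as listed in the table above
)
```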
Streaming Semantics¶
When a node returns a `ModelResponseConverter` (e.g. a LiteLLM wrapper) in streaming mode:

- Interim partial messages are emitted with `delta=True`
- Tool call deltas surface early so the UI can reflect pending tool execution
- The final aggregated message (same logical turn) is emitted with `delta=False`

Applications should accumulate content from deltas keyed by `message_id`, or display it incrementally.
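A minimal accumulation sketch, assuming each chunk exposes `delta`, `message_id`, and `text()` as described above:

```python
async def consume(app, input_data, config):
    buffers: dict[str, str] = {}
    async for chunk in app.astream(input_data, config=config):
        if chunk.delta:
            # Append partial content keyed by the originating message
            buffers[chunk.message_id] = buffers.get(chunk.message_id, "") + chunk.text()
        else:
            # The final aggregated message for this logical turn supersedes its deltas
            buffers.pop(chunk.message_id, None)
            print(chunk.text())
```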
Background Tasks¶
The `BackgroundTaskManager` (available via DI) can schedule async functions that should not block the main reasoning loop, e.g. telemetry flushes, vector store indexing, or summarisation.
Pattern:
```python
from injectq import Inject

from pyagenity.utils.background_task_manager import BackgroundTaskManager

async def summarizer(state): ...

async def node(state, config, tasks: BackgroundTaskManager = Inject[BackgroundTaskManager]):
    # Schedule without awaiting, so the main loop is not blocked
    tasks.create_task(summarizer(state))
    return state
```
Ensure background tasks are idempotent or reference stable state snapshots to avoid race conditions.
Stop & Interrupt Control¶
PyAgenity provides flexible execution control for human-in-the-loop workflows:
| Mechanism | When Applied | Purpose | Response Time |
|---|---|---|---|
| `stop(config)` / `astop(config)` | Runtime | Politely request that the current thread halt | Next node boundary |
| `interrupt_before=[..]` | Compile time | Force a pause before specific nodes | Immediate (before node execution) |
| `interrupt_after=[..]` | Compile time | Force a pause after specific nodes | Immediate (after node completion) |
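A hedged compile-time sketch (the node name "tools" is illustrative):

```python
# Pause for approval before the hypothetical "tools" node runs.
app = graph.compile(
    checkpointer=checkpointer,   # required so the paused state can be persisted
    interrupt_before=["tools"],
)
```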
Execution State During Interrupts¶
The `AgentState.execution_meta` field tracks pause/resume state:
```python
from pyagenity.state import ExecutionStatus

# Check interrupt status
if state.execution_meta.is_interrupted():
    print(f"Status: {state.execution_meta.status}")  # INTERRUPTED_BEFORE or INTERRUPTED_AFTER
    print(f"Node: {state.execution_meta.interrupted_node}")
    print(f"Reason: {state.execution_meta.interrupt_reason}")

# Resume execution
state.clear_interrupt()  # usually handled automatically during invoke/ainvoke
```
Resume Behavior¶
An interrupted run resumes with the same `thread_id`:
- Checkpointer restores saved state and execution metadata
- Input data merged with existing context (additive, not replacement)
- Execution continues from the interruption point
- Interrupt flags automatically cleared
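In practice, resuming is just another call on the same `thread_id`. A minimal sketch:

```python
config = {"thread_id": "t1"}

# First call pauses at the configured interrupt point...
app.invoke({"messages": [Message.text_message("Run the tool")]}, config=config)

# ...and a follow-up call with the same thread_id merges the new input
# into the saved context and continues from the interruption point.
app.invoke({"messages": [Message.text_message("Approved, continue")]}, config=config)
```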
Integration with Streaming¶
Interrupts work seamlessly with streaming execution:
```python
# Streaming with interrupt handling
config = {"thread_id": "interactive-session"}

async for chunk in app.astream(input_data, config=config):
    if chunk.event_type == "interrupted":
        print(f"⏸️ Paused: {chunk.metadata.get('status')}")
        # Handle the interrupt (e.g. get user approval)
        approval = await get_user_approval()
        if approval:
            # Resume streaming on the same thread
            async for resume_chunk in app.astream(
                {"messages": [Message.text_message("User approved")]},
                config=config,
            ):
                print(f"▶️ {resume_chunk.content}")
        else:
            await app.astop(config)  # Cancel execution
        break
    else:
        print(f"📤 {chunk.content}")
```
Key Implementation Notes:

- Interrupts require a checkpointer for state persistence
- Thread IDs must be consistent between pause and resume
- Stop requests are checked at node boundaries (not mid-node)
- Event publishers emit `INTERRUPTED` event types for monitoring
For comprehensive interrupt strategies, approval workflows, and debugging patterns, see Human-in-the-Loop & Interrupts.
Checkpointing & Persistence¶
If a checkpointer is supplied during compile, each step can persist state (strategy depends on implementation: in-memory, Postgres/Redis, etc.). This enables:
- Resumable conversations
- Auditing / replay
- External analytics enrichment
For high-frequency streaming, you may checkpoint only on node completion (implementation detail of specific checkpointer).
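A minimal compile sketch; the `InMemoryCheckpointer` name and import path are assumptions, so substitute your actual implementation:

```python
from pyagenity.checkpointer import InMemoryCheckpointer  # name and path assumed

app = graph.compile(checkpointer=InMemoryCheckpointer())

# State for thread "t1" is now persisted as the run progresses and can be resumed later.
res = app.invoke({"messages": [Message.text_message("Hello")]}, config={"thread_id": "t1"})
```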
Event Publishing¶
A `BasePublisher` implementation receives structured events (start, node_enter, node_exit, message_delta, error, complete).
Use publishers to drive:
- Live dashboards
- Audit logs
- Metrics pipelines
Chain with callbacks (DI: `CallbackManager`) for custom instrumentation or tracing.
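An illustrative publisher sketch; the exact `BasePublisher` contract (import path, method name, event shape) is an assumption here, so check the publisher module for the real interface:

```python
from pyagenity.publisher import BasePublisher  # import path assumed

class MetricsPublisher(BasePublisher):
    """Forward structured events to a metrics pipeline (hypothetical sink)."""

    async def publish(self, event):  # method name assumed
        emit_metric(event.event_type)  # hypothetical metrics helper
```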
Execution Metadata¶
`AgentState.execution_meta` tracks:

| Field | Meaning |
|---|---|
| `current_node` | Node about to run or just completed (depending on phase) |
| `step` | Incrementing counter (used for recursion limit enforcement) |
| `status` | Running / Completed / Error / Interrupted |
| `error` | Error detail if failed |
| interrupted flags | Pause control for manual resume |
Nodes should not mutate these internals directly; use the helper methods (`advance_step()`, `set_current_node()`).
Error Handling¶
Uncaught node exceptions propagate: the publisher emits an error event and the state is marked errored. Strategies:

- Wrap fragile IO in retries
- Convert recoverable faults to messages and continue
- Use `Command(goto=...)` for fallback branches (see the sketch below)
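A hedged fallback sketch; the `Command` import path, the `call_flaky_service` helper, and the "fallback" node name are all illustrative:

```python
from pyagenity.utils import Command  # import path assumed

async def fragile_node(state, config):
    try:
        state = await call_flaky_service(state)  # hypothetical fragile IO call
    except TimeoutError:
        # Route to a registered recovery branch instead of failing the run
        return Command(goto="fallback")
    return state
```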
Performance Considerations¶
| Concern | Guidance |
|---|---|
| Large context growth | Summarize into `context_summary` periodically |
| Tool latency | Parallelize independent tools (future enhancement) or cache by args |
| Excessive checkpoint writes | Batch, or checkpoint every N steps via a config flag |
| High token cost | Trim old messages or use memory store integration |
Minimal Execution Example¶
```python
from pyagenity.utils import Message  # import path assumed

res = app.invoke({"messages": [Message.text_message("Hello")]}, config={"thread_id": "t1"})
for msg in res["messages"]:
    print(msg.text())
```
Streaming variant:

```python
for chunk in app.stream(
    {"messages": [Message.text_message("Explain quantum dots")]},
    config={"thread_id": "t2"},
):
    if chunk.delta:
        print(chunk.text(), end="", flush=True)
```
Next: Advanced Patterns (`advanced.md`).