Skip to main content

Checkpointing and threads

By default every app.invoke call is stateless — each call starts with an empty AgentState. A checkpointer adds persistence: state is saved after each node run and restored at the start of the next call for the same thread.

The config dict

Every checkpointer method accepts a config dict. At minimum it must contain a thread_id:

config = {
"thread_id": "conv-abc123", # required — identifies the conversation
"user_id": "user-42", # optional — used by PgCheckpointer for row scoping
}

thread_id is the primary key for all state, message, and thread-info operations. user_id scopes data within a multi-tenant deployment and is used by PgCheckpointer when you query threads for a specific user.

Thread lifecycle

A thread is created on the first call and persists until you explicitly delete it. For InMemoryCheckpointer data is lost on process restart; PgCheckpointer survives restarts.

Attaching a checkpointer

Pass the checkpointer to graph.compile():

from agentflow.storage.checkpointer import InMemoryCheckpointer
from agentflow.core.state import AgentState, Message

checkpointer = InMemoryCheckpointer()
app = graph.compile(checkpointer=checkpointer)

Provide thread_id in every call. The first argument is always a dict with a "messages" key — not an AgentState object:

config = {"thread_id": "session-1", "recursion_limit": 10}

# Turn 1
res = app.invoke(
{"messages": [Message.text_message("What is 2 + 2?")]},
config=config,
)

# Turn 2 — same thread_id, resumes from saved state
res = app.invoke(
{"messages": [Message.text_message("Now multiply that by 3.")]},
config=config,
)

InMemoryCheckpointer

InMemoryCheckpointer stores everything in Python dicts guarded by asyncio.Lock objects. It is async-first — all operations are non-blocking coroutines. Sync wrappers (put_state, get_state, …) run the coroutines via run_coroutine().

from agentflow.storage.checkpointer import InMemoryCheckpointer

checkpointer = InMemoryCheckpointer()

Internal storage:

AttributeTypePurpose
_statesdict[str, StateT]Current agent state per thread key
_state_cachedict[str, StateT]Lightweight read cache for state
_generic_cachedict[str, (Any, float)]Optional key-value cache with optional TTL
_messagesdefaultdict[str, list[Message]]Message history per thread key
_threadsdict[str, dict]Thread metadata per thread key

Thread key is simply str(thread_id) — there is no user scoping in the in-memory implementation.

Use for: development, unit tests, and short-lived single-process deployments.
Do not use for: anything that requires state to survive a restart or be shared across multiple workers.


PgCheckpointer

PgCheckpointer uses PostgreSQL for durable storage and Redis for a low-latency read cache. It is designed for production multi-worker deployments.

Constructor signature

from agentflow.storage.checkpointer import PgCheckpointer

checkpointer = PgCheckpointer(
# --- PostgreSQL connection (pick one) ---
postgres_dsn="postgresql://user:pass@localhost/mydb", # DSN string
# OR
pg_pool=existing_asyncpg_pool, # pass an existing pool
# OR
pool_config={"min_size": 2, "max_size": 10}, # kwargs forwarded to asyncpg.create_pool

# --- Redis connection (pick one, optional) ---
redis_url="redis://localhost:6379/0",
# OR
redis=existing_redis_instance,
# OR
redis_pool=existing_connection_pool,
# OR
redis_pool_config={"max_connections": 20},

# --- Optional tuning ---
schema="public", # PostgreSQL schema (default: "public")
cache_ttl=86400, # Redis TTL in seconds (default: 24 h)
user_id_type="string", # Column type for user_id: "string" | "int" | "bigint"
release_resources=True, # Close pools on cleanup
)

Schema migration

On the first call to await checkpointer.asetup() (or checkpointer.setup()) the checkpointer:

  1. Checks if the tables already exist.
  2. Runs any pending migration steps to bring the schema to the current version.
  3. Creates tables if they do not exist.

You must call setup() once before using the checkpointer:

await checkpointer.asetup()
app = graph.compile(checkpointer=checkpointer)

Installing the extra

pip install "10xscale-agentflow[pg_checkpoint]"

This installs asyncpg and redis[asyncio].


Checkpointer API reference

All methods exist in both async (preferred) and sync (convenience wrapper) flavours. The sync wrappers call run_coroutine() internally.

State methods

Async methodSync methodDescription
aput_state(config, state)put_state(config, state)Persist the current AgentState for the thread
aget_state(config)get_state(config)Load the saved AgentState; returns None if not found
aclear_state(config)clear_state(config)Delete the persisted state for a thread
aput_state_cache(config, state)put_state_cache(config, state)Write to the fast read cache
aget_state_cache(config)get_state_cache(config)Read from the fast cache (Redis for PgCheckpointer)

The state cache is a thin Redis-backed read layer in PgCheckpointer (TTL-controlled). It avoids a Postgres round-trip on repeated reads for the same thread. For InMemoryCheckpointer both _states and _state_cache live in memory.

Message methods

Checkpointers also store the full message history for a thread independently of the compressed state:

Async methodSync methodDescription
aput_messages(config, messages, metadata)put_messages(...)Append one or more Message objects
aget_message(config, message_id)get_message(...)Retrieve a single message by ID
alist_messages(config, search, offset, limit)list_messages(...)Paginated and optionally full-text searched message list
adelete_message(config, message_id)delete_message(...)Remove a single message

Thread methods

Thread metadata (ThreadInfo) is stored separately from state:

Async methodSync methodDescription
aput_thread(config, thread_info)put_thread(...)Create or update thread metadata
aget_thread(config)get_thread(...)Get ThreadInfo for a thread
alist_threads(config, search, offset, limit)list_threads(...)Paginated thread listing scoped to user_id
aclean_thread(config)clean_thread(...)Delete all state, messages, and metadata for a thread

Generic cache methods

An optional key-value cache layer for arbitrary JSON values (e.g., pre-computed embeddings or rendered prompts):

# Store a value with a 10-minute TTL
await checkpointer.aput_cache_value("llm-cache", "prompt-hash-abc", {"response": "..."}, ttl_seconds=600)

# Retrieve it
value = await checkpointer.aget_cache_value("llm-cache", "prompt-hash-abc")

# Delete it
await checkpointer.aclear_cache_value("llm-cache", "prompt-hash-abc")

# List all keys in a namespace
keys = await checkpointer.alist_cache_keys("llm-cache", prefix="prompt-")

InMemoryCheckpointer supports TTL on the generic cache via expiry timestamps. PgCheckpointer uses Redis SETEX / GET / DEL / SCAN for these operations.


Direct checkpointer usage (outside the graph)

You can interact with a checkpointer directly — useful for admin scripts, migrations, or custom REST endpoints:

from agentflow.storage.checkpointer import InMemoryCheckpointer
from agentflow.core.state import AgentState, Message

cp = InMemoryCheckpointer()
config = {"thread_id": "t1", "user_id": "u1"}

# Save state manually
state = AgentState(context=[Message.text_message("Hello", role="user")])
await cp.aput_state(config, state)

# Load it back
loaded = await cp.aget_state(config)

# List all messages
messages = await cp.alist_messages(config, limit=50)

# Clean the thread
await cp.aclean_thread(config)

Reading thread state via REST

When running behind the API server the same data is exposed over HTTP:

# Read current state
GET /v1/threads/{thread_id}/state

# Overwrite state
PUT /v1/threads/{thread_id}/state

# Read message history
GET /v1/threads/{thread_id}/messages

# List threads for a user
GET /v1/threads

See REST API: Threads for request/response schemas.

Configuring via agentflow.json

In the API layer the checkpointer is declared in agentflow.json — the graph module stays storage-agnostic:

{
"agent": "graph.react:app",
"checkpointer": "graph.dependencies:my_checkpointer"
}

See Configure agentflow.json.


Choosing a checkpointer

CriterionInMemoryCheckpointerPgCheckpointer
State survives restartNoYes
Shared across workersNoYes
Multi-tenant user scopingNoYes
External dependenciesNonePostgreSQL + Redis
Setup requiredNoawait cp.asetup()
Best forDev, testsProduction