
Checkpointers

When to use this

A checkpointer persists graph state between requests so that conversations survive server restarts, threads can be paused and resumed, and multi-turn exchanges stay coherent. Without a checkpointer, the graph uses an in-memory default that is reset on every call.

Import paths

from agentflow.storage.checkpointer import BaseCheckpointer, InMemoryCheckpointer
# Optional — requires asyncpg
from agentflow.storage.checkpointer import PgCheckpointer

BaseCheckpointer[StateT]

Abstract base class for all checkpointer implementations. Every operation is exposed as an async method with a matching sync wrapper.

Abstract async methods

Each abstract method must be implemented by a subclass:

Method            Signature                                                  Description
asetup            async () -> Any                                            Initialise the storage backend (create tables, connect pools…).
aput_state        async (config, state) -> StateT                            Persist a state snapshot for the thread in config["thread_id"].
aget_state        async (config) -> StateT | None                            Load the latest state snapshot for the thread.
aclear_state      async (config) -> Any                                      Delete all state for the thread.
aput_state_cache  async (config, state) -> Any                               Write to the fast cache (Redis or in-memory).
aget_state_cache  async (config) -> StateT | None                            Read from the fast cache.
aput_messages     async (config, messages) -> Any                            Append messages for the thread.
aget_messages     async (config, search, offset, limit) -> list[Message]     List messages.
aget_message      async (config, message_id) -> Message | None               Fetch a single message by ID.
adelete_message   async (config, message_id) -> Any                          Delete a single message.
aget_threads      async (config, search, offset, limit) -> list[ThreadInfo]  List threads.
aget_thread       async (config) -> ThreadInfo | None                        Get thread metadata.
adelete_thread    async (config) -> Any                                      Delete a thread and all its state and messages.
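
The list methods take search, offset, and limit arguments, but this page does not define their exact semantics. The sketch below shows one plausible interpretation for a custom backend: filter first, then paginate. The page_messages helper, the case-insensitive substring match, and the default limit are all assumptions made for illustration and are not part of agentflow.

```python
from __future__ import annotations


def page_messages(
    messages: list[str],
    search: str | None = None,
    offset: int = 0,
    limit: int = 20,
) -> list[str]:
    """Filter, then paginate. The substring match is an assumption."""
    if search:
        needle = search.lower()
        messages = [m for m in messages if needle in m.lower()]
    # Slicing past the end of a list is safe in Python: it returns fewer items.
    return messages[offset : offset + limit]


history = ["hello", "order status?", "your order shipped", "thanks"]
first_page = page_messages(history, offset=0, limit=2)
order_hits = page_messages(history, search="order")
```

Applying the filter before the offset keeps page boundaries stable for a given search term.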

Sync wrappers

Every async axxx() method has a sync xxx() wrapper that calls asyncio.run(). Use the wrappers only from non-async contexts (e.g. a management script), because asyncio.run() raises RuntimeError when it is called from a running event loop:

checkpointer.put_state(config, state)
checkpointer.get_state(config)
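
To make the wrapper pattern concrete, here is a minimal sketch of a sync method delegating to its async counterpart through asyncio.run(). ToyCheckpointer and its _run helper are hypothetical stand-ins written for this example; they are not the agentflow classes, and the real wrappers may differ in detail.

```python
from __future__ import annotations

import asyncio


class ToyCheckpointer:
    """Hypothetical stand-in used only to illustrate the wrapper pattern."""

    def __init__(self) -> None:
        self._states: dict[str, dict] = {}

    async def aput_state(self, config: dict, state: dict) -> dict:
        self._states[config["thread_id"]] = state
        return state

    async def aget_state(self, config: dict) -> dict | None:
        return self._states.get(config["thread_id"])

    @staticmethod
    def _run(coro):
        # asyncio.run() starts a fresh event loop for this one call.
        try:
            return asyncio.run(coro)
        except RuntimeError:
            coro.close()  # avoid a "never awaited" warning if the loop check failed
            raise

    def put_state(self, config: dict, state: dict) -> dict:
        return self._run(self.aput_state(config, state))

    def get_state(self, config: dict) -> dict | None:
        return self._run(self.aget_state(config))


async def call_sync_wrapper_inside_loop(cp: ToyCheckpointer) -> str:
    # Calling a sync wrapper from a coroutine fails: a loop is already running.
    try:
        cp.get_state({"thread_id": "t1"})
    except RuntimeError as exc:
        return type(exc).__name__
    return "ok"


cp = ToyCheckpointer()
cp.put_state({"thread_id": "t1"}, {"step": 1})
restored = cp.get_state({"thread_id": "t1"})
loop_result = asyncio.run(call_sync_wrapper_inside_loop(cp))
```

Inside request handlers or any other coroutine, await the axxx() methods directly instead.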

Wiring into a graph

from agentflow.storage.checkpointer import InMemoryCheckpointer

app = graph.compile(checkpointer=InMemoryCheckpointer())

InMemoryCheckpointer

In-process dictionary-based storage. Zero dependencies.

from agentflow.storage.checkpointer import InMemoryCheckpointer

checkpointer = InMemoryCheckpointer()
app = graph.compile(checkpointer=checkpointer)

When to use:

  • Unit tests and CI.
  • Local development when you don't need state to survive a restart.
  • Ephemeral single-process jobs.

When NOT to use:

  • Any multi-process or multi-worker deployment.
  • Production applications where conversation history must survive crashes.

Storage behaviour

Storage       Key        Data
_states       thread_id  Latest serialised state snapshot.
_state_cache  thread_id  Hot cache for the running execution.
_messages     thread_id  Ordered message list.
_threads      thread_id  Thread metadata (name, created_at, etc.).

All access is guarded by per-bucket asyncio.Lock instances for safe concurrent use within a single process.
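
The per-bucket locking idea can be sketched as follows. This is an illustration of the technique only: LockedBuckets, its attribute names, and its methods are hypothetical and are not taken from the InMemoryCheckpointer source. Each bucket has its own asyncio.Lock, so a slow update to messages does not block state writes.

```python
from __future__ import annotations

import asyncio


class LockedBuckets:
    """Hypothetical sketch: one dict and one asyncio.Lock per bucket."""

    def __init__(self) -> None:
        self._states: dict[str, dict] = {}
        self._messages: dict[str, list[str]] = {}
        self._states_lock = asyncio.Lock()
        self._messages_lock = asyncio.Lock()

    async def put_state(self, thread_id: str, state: dict) -> None:
        async with self._states_lock:
            self._states[thread_id] = state

    async def append_messages(self, thread_id: str, messages: list[str]) -> None:
        async with self._messages_lock:
            # Read-modify-write with an await point in the middle. Without the
            # lock, a second task could run here and its update would be lost.
            current = list(self._messages.get(thread_id, []))
            await asyncio.sleep(0)
            self._messages[thread_id] = current + list(messages)

    async def get_messages(self, thread_id: str) -> list[str]:
        async with self._messages_lock:
            return list(self._messages.get(thread_id, []))


async def main() -> list[str]:
    store = LockedBuckets()  # created inside the running loop
    await asyncio.gather(
        store.append_messages("t1", ["a", "b"]),
        store.append_messages("t1", ["c", "d"]),
    )
    return await store.get_messages("t1")


messages = asyncio.run(main())
```

An asyncio.Lock coordinates tasks on one event loop only. It offers no protection across processes, which is why this storage model does not suit multi-worker deployments.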


PgCheckpointer

PostgreSQL-backed checkpointer with optional Redis caching. Production-grade.

Optional dependency

Requires asyncpg. Install with:

pip install asyncpg

Redis caching is optional but recommended for high-traffic deployments:

pip install redis

from agentflow.storage.checkpointer import PgCheckpointer

checkpointer = PgCheckpointer(
    postgres_dsn="postgresql://user:pass@localhost:5432/mydb",
    redis_url="redis://localhost:6379/0",  # optional
)

# Call from within an async function; a bare top-level await is a syntax error in a script.
await checkpointer.asetup()  # creates tables if they don't exist
app = graph.compile(checkpointer=checkpointer)

Constructor parameters

Parameter     Type                   Description
postgres_dsn  str | None             PostgreSQL DSN. Required unless pg_pool is provided.
pg_pool       asyncpg.Pool | None    Pre-created asyncpg connection pool.
pool_config   dict | None            Config passed to asyncpg.create_pool() (min_size, max_size, etc.).
redis_url     str | None             Redis URL for caching.
redis         Redis | None           Pre-created Redis instance.
redis_pool    ConnectionPool | None  Pre-created Redis connection pool.
cache_ttl     int                    Redis cache TTL in seconds. Default: 86400 (24 hours).
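
This page does not describe how PgCheckpointer combines Redis with PostgreSQL. As a rough mental model for cache_ttl, the sketch below shows a generic read-through cache whose entries expire after a TTL. TTLCache and load_state are hypothetical names, and a plain dict stands in for both Redis and the database. Treat it as an illustration of the pattern and not as the library's implementation.

```python
from __future__ import annotations

import time
from typing import Callable


class TTLCache:
    """In-memory stand-in for a Redis cache with per-entry expiry."""

    def __init__(self, ttl_seconds: int = 86400,
                 clock: Callable[[], float] = time.monotonic) -> None:
        self._ttl = ttl_seconds
        self._clock = clock
        self._entries: dict[str, tuple[float, dict]] = {}

    def put(self, key: str, value: dict) -> None:
        self._entries[key] = (self._clock() + self._ttl, value)

    def get(self, key: str) -> dict | None:
        entry = self._entries.get(key)
        if entry is None:
            return None
        expires_at, value = entry
        if self._clock() >= expires_at:
            del self._entries[key]  # expired: drop it and report a miss
            return None
        return value


def load_state(thread_id: str, cache: TTLCache, database: dict) -> dict | None:
    """Read-through: try the cache, fall back to the durable store, repopulate."""
    cached = cache.get(thread_id)
    if cached is not None:
        return cached
    state = database.get(thread_id)
    if state is not None:
        cache.put(thread_id, state)
    return state


now = [0.0]  # fake clock so the example does not need to sleep
cache = TTLCache(ttl_seconds=10, clock=lambda: now[0])
database = {"t1": {"step": 3}}

first = load_state("t1", cache, database)   # miss: read from database, fill cache
database["t1"] = {"step": 4}                # durable store changes behind the cache
second = load_state("t1", cache, database)  # hit: served from cache
now[0] = 11.0
third = load_state("t1", cache, database)   # entry expired: read from database again
```

A longer TTL reduces database reads but lets entries linger in the cache for longer after their last use.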

ID types

PgCheckpointer adapts its schema based on the id_type registered by the compiled graph's id_generator:

id_type  SQL column type
string   VARCHAR(255)
int      SERIAL
bigint   BIGSERIAL

This is set automatically. You do not set it directly.

Schema migration

asetup() creates the required tables if they do not exist. It is idempotent — safe to call on every startup.


Writing a custom checkpointer

from typing import Any
from agentflow.storage.checkpointer import BaseCheckpointer
from agentflow.core.state import AgentState, Message

class DynamoDBCheckpointer(BaseCheckpointer):

    async def asetup(self) -> Any:
        # Create DynamoDB tables
        ...

    async def aput_state(self, config: dict, state: AgentState) -> AgentState:
        thread_id = config["thread_id"]
        # Serialise and PUT to DynamoDB
        ...
        return state

    async def aget_state(self, config: dict) -> AgentState | None:
        thread_id = config["thread_id"]
        # GET from DynamoDB and deserialise
        ...

    async def aclear_state(self, config: dict) -> Any:
        ...

    async def aput_state_cache(self, config: dict, state: AgentState) -> Any:
        ...  # in-memory dict, no external call

    async def aget_state_cache(self, config: dict) -> AgentState | None:
        ...

    async def aput_messages(self, config: dict, messages: list[Message]) -> Any:
        ...

    async def aget_messages(self, config: dict, search: str | None, offset: int, limit: int) -> list[Message]:
        ...

    async def aget_message(self, config: dict, message_id: str) -> Message | None:
        ...

    async def adelete_message(self, config: dict, message_id: str) -> Any:
        ...

    async def aget_threads(self, config: dict, search: str | None, offset: int, limit: int):
        ...

    async def aget_thread(self, config: dict):
        ...

    async def adelete_thread(self, config: dict) -> Any:
        ...

Config dictionary

All checkpointer methods accept a config dict. The required key is:

config = {"thread_id": "session-abc123"}

Additional keys used internally:

  • user_id — scopes message searches by user.
  • run_id — tracks a specific invocation.
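
A custom checkpointer may want to fail early with a clear message when thread_id is missing, instead of surfacing a bare KeyError from deep inside a storage call. The thread_id_from helper below is a hypothetical convenience written for this example and is not part of agentflow.

```python
def thread_id_from(config: dict):
    """Return config["thread_id"], or raise a descriptive error if it is absent."""
    thread_id = config.get("thread_id")
    if thread_id is None:
        raise ValueError("config must include a 'thread_id' key")
    return thread_id


config = {"thread_id": "session-abc123"}
thread_id = thread_id_from(config)

try:
    thread_id_from({})
    missing_error = None
except ValueError as exc:
    missing_error = str(exc)
```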

Common errors

Error                        Cause                                              Fix
StorageError                 Unrecoverable PostgreSQL error.                    Check Postgres logs and DSN config.
TransientStorageError        Temporary Postgres failure.                        Automatically retried by the framework.
ImportError: asyncpg         PgCheckpointer used without asyncpg installed.     Run pip install asyncpg.
State lost between requests  Using InMemoryCheckpointer with multiple workers.  Switch to PgCheckpointer or ensure a single-process deployment.
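
The framework retries transient failures on its own, so application code normally does not need to. For readers writing a custom backend, the sketch below shows what a retry with exponential backoff typically looks like. The exception classes are local stand-ins defined for this example because the import path of the real ones is not given on this page. The with_retries helper, the attempt count, and the delays are all assumptions.

```python
import asyncio


class StorageError(Exception):
    """Local stand-in for an unrecoverable storage failure."""


class TransientStorageError(Exception):
    """Local stand-in for a temporary, retryable storage failure."""


async def with_retries(operation, attempts: int = 3, base_delay: float = 0.01):
    """Await operation(), retrying only on TransientStorageError."""
    for attempt in range(1, attempts + 1):
        try:
            return await operation()
        except TransientStorageError:
            if attempt == attempts:
                raise  # out of attempts: let the caller see the failure
            # Exponential backoff: base_delay, then 2x, then 4x, ...
            await asyncio.sleep(base_delay * 2 ** (attempt - 1))


calls = {"count": 0}


async def flaky_load() -> dict:
    # Fails twice, then succeeds, to simulate a brief outage.
    calls["count"] += 1
    if calls["count"] < 3:
        raise TransientStorageError("connection reset")
    return {"step": 1}


result = asyncio.run(with_retries(flaky_load))
```

Only the transient error is caught. Anything else, including the StorageError stand-in, propagates immediately so that unrecoverable problems are not masked by retries.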