Skip to content

Checkpointer: The Agent's Session Memory

The checkpointer in PyAgenity serves as your agent's session memory—a sophisticated persistence layer that maintains the complete record of interactions, state transitions, and execution history. Unlike working memory (AgentState), which focuses on immediate context, checkpointers preserve the full conversational narrative for different purposes entirely.

The Session Memory Philosophy

Think of checkpointers as the difference between what you're thinking about right now versus what you might want to look back on later. Session memory serves several distinct purposes:

  • Conversation Continuity: Resume interactions exactly where they left off
  • User Experience: Provide conversation history in interfaces
  • Debugging & Analytics: Track agent behavior and decision paths
  • Audit & Compliance: Maintain comprehensive interaction records

The key insight is that session memory is not for the agent's immediate thinking—it's for persistence, recovery, and human-oriented use cases.

The Dual-Storage Architecture

PyAgenity implements a sophisticated dual-storage strategy that balances speed with durability:

┌─────────────────┐    Fast Access    ┌─────────────────┐
│   Redis Cache   │ ←──────────────── │  Active Agent   │
│   (Hot Layer)   │                   │   Execution     │
└─────────────────┘                   └─────────────────┘
         │                                      │
         │ Background Sync                     │ Immediate Persist
         ▼                                     ▼
┌─────────────────┐                   ┌─────────────────┐
│   PostgreSQL    │                   │   PostgreSQL    │
│  (Cold Layer)   │                   │  (Cold Layer)   │
└─────────────────┘                   └─────────────────┘

Why This Dual Approach?

The architecture reflects different temporal access patterns:

  • Active conversations need millisecond response times (Redis cache)
  • Historical data can tolerate moderate latency (PostgreSQL storage)
  • Data integrity requires durable persistence (PostgreSQL with transactions)
  • System recovery demands reliable state reconstruction
from pyagenity.checkpointer import PgCheckpointer

checkpointer = PgCheckpointer(
    postgres_dsn="postgresql://user:pass@localhost/db",
    redis_url="redis://localhost:6379",
    cache_ttl=86400  # 24-hour cache expiration
)

Understanding Persistence Granularity

Checkpointers operate at different levels of granularity, each serving specific use cases:

1. State Persistence: The Agent's Mental Snapshots

# Save the complete agent state
await checkpointer.aput_state(config, state)

# Retrieve agent state for conversation resumption  
recovered_state = await checkpointer.aget_state(config)

State persistence captures the agent's complete mental state at a given moment—context, summaries, execution metadata, and any custom state fields.

2. Message Persistence: The Interaction Chronicle

# Persist individual messages with metadata
await checkpointer.aput_messages(
    config, 
    messages=[tool_call_message, tool_result_message],
    metadata={"execution_step": 3, "node": "tool_executor"}
)

# Query conversation history
messages = await checkpointer.alist_messages(
    config,
    limit=50,
    search="weather query"
)

Message persistence maintains the detailed interaction history—every user input, assistant response, tool call, and system message with full metadata.

3. Thread Persistence: The Conversation Metadata

# Maintain thread-level information
thread_info = ThreadInfo(
    thread_id="conv_123",
    user_id="alice",
    thread_name="Weather Inquiry", 
    metadata={
        "tags": ["weather", "location_services"],
        "created_at": "2024-10-01T12:00:00Z"
    },
    updated_at="2024-10-01T12:30:00Z",
    run_id="run_456"
)
await checkpointer.aput_thread(config, thread_info)

Thread persistence captures conversation-level metadata—titles, tags, participants, and organizational information that helps manage multiple conversation streams.

The Caching Strategy: Speed Meets Durability

The brilliance of PyAgenity's checkpointer design lies in its intelligent caching strategy that optimizes for both performance and reliability.

Hot Path: Active Conversation Flow

# During active conversation, state flows through cache
config = {"thread_id": "active_conv", "user_id": "alice"}

# Fast retrieval from Redis cache
cached_state = await checkpointer.aget_state_cache(config)

if cached_state:
    # Continue with cached state (millisecond response)
    state = cached_state
else:
    # Cold start: load from PostgreSQL (acceptable latency)
    state = await checkpointer.aget_state(config)

Write-Through Pattern: Consistency Without Sacrificing Speed

# When updating state, both cache and database are updated
await checkpointer.aput_state(config, updated_state)

# This operation:
# 1. Immediately updates Redis cache (fast subsequent reads)
# 2. Persists to PostgreSQL (durability guarantee)
# 3. Sets appropriate TTL (cache management)

Cache Invalidation and Expiration

# Automatic cache management based on conversation patterns
checkpointer = PgCheckpointer(
    cache_ttl=86400,  # 24-hour expiration for inactive conversations
    max_cached_threads=1000  # LRU eviction for memory management
)

# Active conversations stay hot, inactive ones naturally expire

Checkpointer Implementations: Choosing the Right Strategy

PyAgenity provides different checkpointer implementations optimized for different deployment scenarios:

InMemoryCheckpointer: Development and Testing

from pyagenity.checkpointer import InMemoryCheckpointer

# Perfect for development, testing, and demos
checkpointer = InMemoryCheckpointer()

# Benefits:
# - Zero setup complexity
# - Immediate availability  
# - Perfect for unit tests
# - No external dependencies

# Limitations:
# - Not persistent across process restarts
# - Single-process only
# - Memory-limited scalability

When to use: Development, testing, demos, single-session applications

PgCheckpointer: Production-Ready Persistence

from pyagenity.checkpointer import PgCheckpointer

# Production-grade persistence with caching
checkpointer = PgCheckpointer(
    postgres_dsn="postgresql://user:pass@host:5432/db",
    redis_url="redis://cache-host:6379",
    user_id_type="string",  # or "int", "bigint" 
    cache_ttl=3600,
    release_resources=True  # Clean shutdown
)

# Benefits:
# - Full persistence across restarts
# - High-performance caching layer
# - ACID transaction guarantees
# - Multi-process and distributed support
# - Configurable ID types for integration

# Setup required:
await checkpointer.asetup()  # Initialize database schema

When to use: Production applications, multi-user systems, applications requiring durability

Configuration Patterns and Integration

Database Integration Patterns

# Pattern 1: Separate Database for Agent State
checkpointer = PgCheckpointer(
    postgres_dsn="postgresql://agent:pass@agent-db:5432/agent_state",
    redis_url="redis://agent-cache:6379/1"
)

# Pattern 2: Shared Database with Custom Schema  
checkpointer = PgCheckpointer(
    postgres_dsn="postgresql://app:pass@main-db:5432/app_db",
    schema="agent"  # Tables: agent_states, agent_messages, etc.
)

# Pattern 3: Connection Pool Reuse
existing_pool = await asyncpg.create_pool(dsn)
checkpointer = PgCheckpointer(pg_pool=existing_pool)

ID Type Configuration for System Integration

# Match your application's ID patterns
string_ids = PgCheckpointer(user_id_type="string")  # UUIDs, usernames
int_ids = PgCheckpointer(user_id_type="int")       # Auto-increment IDs  
bigint_ids = PgCheckpointer(user_id_type="bigint") # Large-scale systems

# Configuration automatically handles schema generation
await checkpointer.asetup()  # Creates appropriate column types

Dependency Injection and Framework Integration

One of PyAgenity's most elegant features is automatic checkpointer injection, making persistence seamless for node functions:

Automatic Injection in Node Functions

from injectq import Inject
from pyagenity.checkpointer import BaseCheckpointer

def audit_node(
    state: AgentState,
    config: dict,
    checkpointer: BaseCheckpointer = Inject[BaseCheckpointer]
) -> AgentState:
    """Node function with automatic checkpointer injection."""

    # Access checkpointer without manual wiring
    audit_message = Message.text_message(
        f"Decision made at step {state.execution_meta.step}",
        role="system"
    )

    # Log decision to persistent storage
    asyncio.create_task(
        checkpointer.aput_messages(config, [audit_message])
    )

    return state

Custom Analysis and Debugging

async def debug_conversation(
    thread_id: str,
    checkpointer: BaseCheckpointer = Inject[BaseCheckpointer]
):
    """Analyze conversation patterns for debugging."""

    config = {"thread_id": thread_id}

    # Get complete interaction history
    messages = await checkpointer.alist_messages(config, limit=1000)

    # Analyze patterns
    tool_calls = [msg for msg in messages if msg.tools_calls]
    errors = [msg for msg in messages if "error" in msg.text().lower()]

    print(f"Conversation analysis for {thread_id}:")
    print(f"- Total messages: {len(messages)}")
    print(f"- Tool calls: {len(tool_calls)}")  
    print(f"- Potential errors: {len(errors)}")

Advanced Usage Patterns

Conversation Branching and Forking

# Create conversation branches for "what-if" scenarios
original_config = {"thread_id": "main_conversation"}
branch_config = {"thread_id": "branch_experiment"}

# Fork current state to new branch
current_state = await checkpointer.aget_state(original_config)
await checkpointer.aput_state(branch_config, current_state)

# Experiment in branch without affecting main conversation

Cross-Session Analytics

async def analyze_user_patterns(user_id: str):
    """Analyze patterns across all user conversations."""

    # Query across multiple threads for a user
    user_threads = await checkpointer.alist_threads(
        {"user_id": user_id},
        limit=100
    )

    # Aggregate interaction patterns
    total_messages = 0
    common_topics = {}

    for thread_info in user_threads:
        thread_config = {
            "thread_id": thread_info.thread_id,
            "user_id": user_id
        }
        messages = await checkpointer.alist_messages(thread_config)
        total_messages += len(messages)

        # Extract and count topics
        for msg in messages:
            topics = extract_topics(msg.text())
            for topic in topics:
                common_topics[topic] = common_topics.get(topic, 0) + 1

    return {
        "total_conversations": len(user_threads),
        "total_messages": total_messages,
        "common_topics": common_topics
    }

Conversation Resume Patterns

async def resume_conversation(thread_id: str, user_id: str):
    """Resume a previous conversation seamlessly."""

    config = {"thread_id": thread_id, "user_id": user_id}

    # Retrieve previous state
    previous_state = await checkpointer.aget_state(config)

    if previous_state:
        # Continue from where we left off
        print(f"Resuming conversation with {len(previous_state.context)} messages")
        return previous_state
    else:
        # Start fresh conversation
        return AgentState()

Performance Optimization Strategies

Cache Warming Patterns

# Warm cache for expected active users
async def warm_cache_for_users(user_ids: List[str]):
    """Preload likely-to-be-accessed conversations into cache."""

    for user_id in user_ids:
        recent_threads = await checkpointer.alist_threads(
            {"user_id": user_id}, 
            limit=3  # Most recent conversations
        )

        # Load recent conversations into cache
        for thread_info in recent_threads:
            config = {"thread_id": thread_info.thread_id, "user_id": user_id}
            await checkpointer.aget_state_cache(config)

Batch Operations for Efficiency

# Batch message insertion for better performance
async def log_conversation_batch(config: dict, messages: List[Message]):
    """Efficiently persist multiple messages."""

    # Single database transaction for multiple messages
    await checkpointer.aput_messages(config, messages)

    # More efficient than individual puts:
    # for msg in messages:
    #     await checkpointer.aput_messages(config, [msg])  # Avoid this

Error Handling and Recovery

Graceful Degradation Patterns

async def resilient_state_retrieval(config: dict):
    """Retrieve state with graceful fallback handling."""

    try:
        # Try cache first (fastest)
        state = await checkpointer.aget_state_cache(config)
        if state:
            return state

        # Fall back to database
        return await checkpointer.aget_state(config)

    except ConnectionError:
        # Final fallback: fresh state with warning
        logger.warning(f"Checkpointer unavailable for {config}, starting fresh")
        return AgentState()

Recovery and Repair Operations

async def repair_conversation_integrity(thread_id: str):
    """Repair conversation state from message history."""

    config = {"thread_id": thread_id}

    # Retrieve all messages
    messages = await checkpointer.alist_messages(config, limit=10000)

    # Reconstruct state from message history
    reconstructed_state = AgentState()
    reconstructed_state.context = messages

    # Update stored state
    await checkpointer.aput_state(config, reconstructed_state)

    print(f"Repaired state for {thread_id} with {len(messages)} messages")

Best Practices and Patterns

Configuration Management

# Use environment-based configuration
checkpointer = PgCheckpointer(
    postgres_dsn=os.environ["DATABASE_URL"],
    redis_url=os.environ.get("CACHE_URL"),
    cache_ttl=int(os.environ.get("CACHE_TTL", "3600"))
)

Resource Management

# Always initialize schema in production
await checkpointer.asetup()

# Clean shutdown in application lifecycle
async def shutdown():
    await checkpointer.arelease()

Monitoring and Observability

# Add metrics collection
class MonitoredCheckpointer(PgCheckpointer):
    async def aput_state(self, config, state):
        start_time = time.time()
        result = await super().aput_state(config, state)

        metrics.histogram("checkpointer.put_state.duration", 
                         time.time() - start_time)
        metrics.counter("checkpointer.put_state.calls").inc()

        return result

Security Considerations

# Use connection pooling with proper credentials
checkpointer = PgCheckpointer(
    postgres_dsn="postgresql://agent_user:secure_pass@db:5432/agent_db",
    # Rotate credentials regularly
    # Use connection encryption (sslmode=require)
    # Limit database permissions to minimum required
)

When to Use Different Checkpointers

InMemoryCheckpointer: Development & Testing

Perfect for: - Local development and testing - Demo applications and prototypes
- Unit tests requiring isolation - Single-session applications

Avoid for: - Production environments - Multi-user applications - Long-running conversations requiring persistence

PgCheckpointer: Production Applications

Perfect for: - Production deployments - Multi-user systems - Applications requiring conversation resume - Systems needing audit trails - Scalable, distributed architectures

Consider for: - High-throughput applications (with appropriate tuning) - Applications with complex state that benefits from ACID guarantees - Systems requiring advanced querying of conversation history

Conclusion: Session Memory as a Strategic Asset

The checkpointer system in PyAgenity transforms conversation persistence from a technical necessity into a strategic asset. By providing:

  • Dual-storage architecture for optimal performance and durability
  • Automatic dependency injection for seamless integration
  • Multiple implementation strategies for different deployment needs
  • Rich querying capabilities for analytics and debugging

Checkpointers enable you to build agents that not only function reliably but also provide rich user experiences, detailed observability, and the foundation for advanced features like conversation analytics and cross-session intelligence.

The key insight is that session memory is not just about persistence—it's about enabling experiences that would be impossible with ephemeral interactions alone.