# BaseStore: The Store Abstraction Layer
The BaseStore is the foundational abstraction that defines the contract for all long-term memory implementations in Agentflow. It provides an async-first interface that lets different storage backends, from vector databases to managed memory services, plug into the framework without changes to agent code.
## Architecture Philosophy
### The Abstraction Principle
Rather than locking you into a specific storage solution, Agentflow adopts a provider-agnostic approach to long-term memory. The BaseStore abstract base class defines a consistent API that different backends implement, allowing you to:
- Switch storage providers without changing agent code
- Experiment with different backends to find what works best
- Mix multiple stores for different use cases within the same application
- Build custom implementations tailored to specific requirements
```python
from agentflow.store import BaseStore, QdrantStore, Mem0Store

# embedding_service, mem0_config, config and content are assumed to be defined
# elsewhere, and MemoryType imported from Agentflow

# All implementations share the same interface
store_a: BaseStore = QdrantStore(embedding=embedding_service, path="./data")
store_b: BaseStore = Mem0Store(config=mem0_config)

# Same API works with any backend
memory_id = await store_a.astore(config, content, memory_type=MemoryType.EPISODIC)
results = await store_b.asearch(config, query="previous conversation")
```
### Design Principles
The BaseStore interface is built on several key principles that guide its architecture:
1. Async-First for Performance
All core operations are asynchronous by default, with synchronous wrappers provided for compatibility:
```python
# Async-first design (preferred)
memory_id = await store.astore(config, content)

# Sync wrapper available when needed
memory_id = store.store(config, content)
```
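For illustration, here is one way a synchronous wrapper can delegate to its async counterpart: run the coroutine on a fresh event loop with `asyncio.run`. `TinyStore` is a hypothetical stand-in, not Agentflow's implementation, and Agentflow's own wrappers may work differently.

```python
import asyncio
import uuid


class TinyStore:
    """Hypothetical store illustrating an async-first API with a sync wrapper."""

    def __init__(self) -> None:
        self._data: dict[str, str] = {}

    async def astore(self, config: dict, content: str) -> str:
        # The async method holds the real logic.
        memory_id = str(uuid.uuid4())
        self._data[memory_id] = content
        return memory_id

    def store(self, config: dict, content: str) -> str:
        # The sync wrapper drives the coroutine on a new event loop.
        return asyncio.run(self.astore(config, content))


store = TinyStore()
memory_id = store.store({"user_id": "alice"}, "User prefers dark mode")
print(store._data[memory_id])  # User prefers dark mode
```

Because `asyncio.run` raises `RuntimeError` when an event loop is already running in the current thread, a wrapper built this way only works from plain synchronous code; inside async code, call the `a`-prefixed method directly.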
2. Configuration-Driven Context
Every operation accepts a config dictionary that provides context about the user, thread, and application scope:
```python
config = {
    "user_id": "alice",
    "thread_id": "conversation_123",
    "app_id": "customer_support",
}

# Config flows through all operations
await store.astore(config, content)
await store.asearch(config, query)
```
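A backend has to turn this config into a scope. The sketch below shows one plausible rule, assuming the key names from the example above: keys that are present must match, and a key left out widens the scope (omitting `thread_id` matches all of a user's threads). The helper names `scope_from_config` and `in_scope` are hypothetical.

```python
from typing import Any

SCOPE_KEYS = ("app_id", "user_id", "thread_id")


def scope_from_config(config: dict[str, Any]) -> dict[str, Any]:
    """Keep only the scoping keys that are present and not None."""
    return {key: config[key] for key in SCOPE_KEYS if config.get(key) is not None}


def in_scope(record_scope: dict[str, Any], config: dict[str, Any]) -> bool:
    """A record matches when every scope key given in config has the same value on the record."""
    wanted = scope_from_config(config)
    return all(record_scope.get(key) == value for key, value in wanted.items())


record_scope = {"app_id": "customer_support", "user_id": "alice", "thread_id": "conversation_123"}
print(in_scope(record_scope, {"user_id": "alice"}))  # True
print(in_scope(record_scope, {"user_id": "bob"}))  # False
```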
3. Content Flexibility
The store accepts both raw strings and structured Message objects, so agent messages can be persisted without manual conversion:
```python
# Store string content
await store.astore(config, "User prefers technical documentation")

# Store Message objects directly
message = Message.from_text("Hello!", role="user")
await store.astore(config, message)
```
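Internally, a store usually needs plain text to embed or index. Here is a minimal sketch of that normalization step, using a hypothetical `FakeMessage` dataclass in place of Agentflow's `Message` and assuming only that a message exposes its text through a `text()` method (as `state.context[-1].text()` does later on this page). Keeping the role as extra metadata is a design choice of the sketch.

```python
from dataclasses import dataclass
from typing import Any


@dataclass
class FakeMessage:
    """Hypothetical stand-in for Message: a role plus text content."""

    role: str
    content: str

    def text(self) -> str:
        return self.content


def normalize_content(content: "str | FakeMessage") -> tuple[str, dict[str, Any]]:
    """Reduce either accepted content type to (text, extra_metadata)."""
    if isinstance(content, str):
        return content, {}
    # Keep the speaker role so it can be stored alongside the text.
    return content.text(), {"role": content.role}


print(normalize_content("User prefers technical documentation"))
print(normalize_content(FakeMessage(role="user", content="Hello!")))
```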
4. Rich Metadata Support
Every memory can be enriched with metadata, memory types, categories, and custom attributes:
```python
await store.astore(
    config=config,
    content="User solved bug using debugger",
    memory_type=MemoryType.EPISODIC,
    category="problem_solving",
    metadata={
        "difficulty": "medium",
        "tools_used": ["debugger", "logs"],
        "time_to_solve": "15_minutes",
    },
)
```
## Core Operations
The BaseStore defines a comprehensive set of operations that cover the entire memory lifecycle:
### Storage Operations
Store Individual Memories
```python
async def astore(
    self,
    config: dict[str, Any],
    content: str | Message,
    memory_type: MemoryType = MemoryType.EPISODIC,
    category: str = "general",
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> str:
    """Store a single memory and return its ID."""
```
The astore method is the primary way to persist knowledge. It returns a memory ID that can be used for future updates or deletions.
### Retrieval Operations
Search by Similarity
```python
async def asearch(
    self,
    config: dict[str, Any],
    query: str,
    memory_type: MemoryType | None = None,
    category: str | None = None,
    limit: int = 10,
    score_threshold: float | None = None,
    filters: dict[str, Any] | None = None,
    retrieval_strategy: RetrievalStrategy = RetrievalStrategy.SIMILARITY,
    distance_metric: DistanceMetric = DistanceMetric.COSINE,
    max_tokens: int = 4000,
    **kwargs,
) -> list[MemorySearchResult]:
    """Search for relevant memories based on semantic similarity."""
```
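The asearch method selects a retrieval approach through retrieval_strategy (similarity, temporal, hybrid, and the other RetrievalStrategy values) and narrows results with memory_type, category, score_threshold, and arbitrary filters; limit caps the number of results returned.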
Retrieve Specific Memories
```python
async def aget(
    self,
    config: dict[str, Any],
    memory_id: str,
    **kwargs,
) -> MemorySearchResult | None:
    """Get a specific memory by its ID."""

async def aget_all(
    self,
    config: dict[str, Any],
    limit: int = 100,
    **kwargs,
) -> list[MemorySearchResult]:
    """Get all memories for a given user/thread context."""
```
### Update and Delete Operations
Update Existing Memories
```python
async def aupdate(
    self,
    config: dict[str, Any],
    memory_id: str,
    content: str | Message,
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> Any:
    """Update content or metadata of an existing memory."""
```
Delete Memories
```python
async def adelete(
    self,
    config: dict[str, Any],
    memory_id: str,
    **kwargs,
) -> Any:
    """Delete a specific memory by ID."""

async def aforget_memory(
    self,
    config: dict[str, Any],
    **kwargs,
) -> Any:
    """Delete all memories for a user or thread context."""
```
### Resource Management
Setup and Cleanup
```python
async def asetup(self) -> Any:
    """Initialize the store (create collections, connections, etc.)."""

async def arelease(self) -> None:
    """Clean up resources (close connections, release handles)."""
```
## Memory Types and Schemas
The BaseStore works with well-defined data structures that provide type safety and consistency:
### MemoryType Enumeration
```python
from enum import Enum


class MemoryType(str, Enum):
    EPISODIC = "episodic"  # Specific experiences and events
    SEMANTIC = "semantic"  # Factual knowledge and insights
    PROCEDURAL = "procedural"  # Process and workflow knowledge
    ENTITY = "entity"  # Entity-specific information
    RELATIONSHIP = "relationship"  # Entity relationships
    DECLARATIVE = "declarative"  # Explicit facts and declarations
    CUSTOM = "custom"  # Custom memory types
```
### MemorySearchResult Model
```python
from datetime import datetime
from typing import Any

from pydantic import BaseModel


class MemorySearchResult(BaseModel):
    id: str  # Unique memory identifier
    content: str  # Memory content
    score: float  # Relevance/similarity score
    memory_type: MemoryType  # Type classification
    metadata: dict[str, Any]  # Additional metadata
    vector: list[float] | None  # Optional embedding vector
    user_id: str | None  # User context
    thread_id: str | None  # Thread context
    timestamp: datetime | None  # Creation/update time
```
### RetrievalStrategy Options
```python
from enum import Enum


class RetrievalStrategy(str, Enum):
    SIMILARITY = "similarity"  # Vector similarity search
    TEMPORAL = "temporal"  # Time-based retrieval
    RELEVANCE = "relevance"  # Relevance scoring
    HYBRID = "hybrid"  # Combined approaches
    GRAPH_TRAVERSAL = "graph_traversal"  # Knowledge graph navigation
```
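How a backend applies a strategy is implementation-specific. As one hypothetical illustration, here is a ranking function covering three of the strategies: `SIMILARITY` sorts by score, `TEMPORAL` by timestamp, and `HYBRID` blends the two with a tunable `recency_weight`. The linear blend, the `rank` helper and the trimmed enum copy are assumptions of this sketch, not Agentflow's behavior.

```python
from datetime import datetime
from enum import Enum


# Trimmed copy of the enum above, limited to the strategies this sketch handles.
class RetrievalStrategy(str, Enum):
    SIMILARITY = "similarity"
    TEMPORAL = "temporal"
    HYBRID = "hybrid"


Hit = tuple[float, datetime, str]  # (similarity score, timestamp, content)


def rank(hits: list[Hit], strategy: "RetrievalStrategy | str", recency_weight: float = 0.3) -> list[Hit]:
    strategy = RetrievalStrategy(strategy)  # accept the member or its string value
    if not hits:
        return []
    if strategy == RetrievalStrategy.SIMILARITY:
        return sorted(hits, key=lambda h: h[0], reverse=True)
    if strategy == RetrievalStrategy.TEMPORAL:
        return sorted(hits, key=lambda h: h[1], reverse=True)
    # HYBRID: scale timestamps to [0, 1] (oldest = 0, newest = 1), then blend with the score.
    oldest = min(h[1] for h in hits)
    newest = max(h[1] for h in hits)
    span = (newest - oldest).total_seconds() or 1.0

    def blended(h: Hit) -> float:
        recency = (h[1] - oldest).total_seconds() / span
        return (1 - recency_weight) * h[0] + recency_weight * recency

    return sorted(hits, key=blended, reverse=True)


hits = [
    (0.9, datetime(2024, 1, 1), "old but relevant"),
    (0.5, datetime(2024, 6, 1), "recent but weaker"),
]
for strategy in ("similarity", "temporal", "hybrid"):
    print(strategy, "->", rank(hits, strategy)[0][2])
```

With the default weight of 0.3, the hybrid ranking puts the recent hit first (0.65 versus 0.63); setting `recency_weight=0.0` reduces it to pure similarity ordering.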
## Implementation Guidelines
When implementing your own BaseStore subclass, follow these guidelines:
### Required Method Implementations
All abstract methods must be implemented:
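```python
import uuid
from typing import Any

from agentflow.store import BaseStore
# MemorySearchResult is the schema described above; import it from your Agentflow version


class MyCustomStore(BaseStore):
    async def asetup(self) -> Any:
        """Initialize your storage backend."""
        # Connect to the database, create schemas, etc.
        return None

    async def astore(self, config, content, memory_type=None, category="general", metadata=None, **kwargs) -> str:
        """Store memory and return its ID."""
        # Treat memory_type=None as MemoryType.EPISODIC, the base class default
        memory_id = str(uuid.uuid4())
        # Persist content, memory_type, category, metadata and the config scope under memory_id
        return memory_id

    async def asearch(self, config, query, **kwargs) -> list["MemorySearchResult"]:
        """Search for relevant memories."""
        results: list["MemorySearchResult"] = []
        # Query your backend and convert each hit into a MemorySearchResult
        return results

    # ... implement all other abstract methods
```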
### Configuration Handling
Parse and use the configuration dictionary consistently:
```python
async def astore(self, config, content, **kwargs):
    user_id = config.get("user_id")
    thread_id = config.get("thread_id")
    app_id = config.get("app_id")
    # Use these for scoping and filtering,
    # and store them with the memory for future retrieval
```
### Error Handling
Provide clear error messages and handle edge cases gracefully:
```python
async def aget(self, config, memory_id, **kwargs):
    if not memory_id:
        raise ValueError("memory_id cannot be empty")
    try:
        # _fetch_from_backend and _convert_to_search_result are your own helpers
        result = await self._fetch_from_backend(memory_id)
    except ConnectionError as e:
        # Chain the original error so the root cause stays in the traceback
        raise RuntimeError(f"Failed to connect to storage backend: {e}") from e
    if result is None:
        return None  # Memory not found
    return self._convert_to_search_result(result)
```
### Resource Management
Always implement proper cleanup:
```python
async def arelease(self):
    """Clean up all resources."""
    if self.client:
        await self.client.close()
    if self.connection_pool:
        await self.connection_pool.shutdown()
```
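Callers still have to make sure `arelease` actually runs. One way to guarantee that is to pair `asetup` and `arelease` in an async context manager built with `contextlib.asynccontextmanager`. `ConnectedStore` and `open_store` are hypothetical names for this sketch, not part of Agentflow.

```python
import asyncio
from contextlib import asynccontextmanager


class ConnectedStore:
    """Hypothetical store that tracks whether its connection is open."""

    def __init__(self) -> None:
        self.connected = False

    async def asetup(self) -> None:
        self.connected = True  # e.g. open the client, create collections

    async def arelease(self) -> None:
        self.connected = False  # e.g. close the client, drain pools


@asynccontextmanager
async def open_store(store):
    """Run asetup on entry and guarantee arelease on exit, even after an exception."""
    await store.asetup()
    try:
        yield store
    finally:
        await store.arelease()


async def main() -> tuple[bool, bool]:
    store = ConnectedStore()
    async with open_store(store) as opened:
        inside = opened.connected  # True while the block runs
    return inside, store.connected


async def crash() -> bool:
    store = ConnectedStore()
    try:
        async with open_store(store):
            raise ValueError("node failed")
    except ValueError:
        pass
    return store.connected  # released despite the error


print(asyncio.run(main()))  # (True, False)
print(asyncio.run(crash()))  # False
```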
## Backend Implementations
Agentflow provides two production-ready implementations of BaseStore:
### Vector Database: QdrantStore
Best for semantic search and similarity-based retrieval:
```python
from agentflow.store import QdrantStore
from agentflow.store.embedding import OpenAIEmbedding

store = QdrantStore(
    embedding=OpenAIEmbedding(),
    path="./qdrant_data",
)
```
Key Features:
- Local or cloud deployment
- Multiple distance metrics
- Rich filtering capabilities
- Efficient vector search
### Managed Service: Mem0Store
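Best for managed memory with built-in intelligence. It is constructed from a Mem0 configuration, as in the opening example (`Mem0Store(config=mem0_config)`).
Key Features:
- Managed infrastructure
- Built-in memory optimization
- Automatic deduplication
- Enterprise features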
## Integration Patterns
### Dependency Injection
The recommended way to use stores in agent nodes:
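```python
from injectq import Inject
from agentflow.store import BaseStore

# AgentState is Agentflow's graph state type; import it with your other Agentflow imports


async def knowledge_agent(
    state: AgentState,
    config: dict,
    store: BaseStore = Inject[BaseStore],
) -> AgentState:
    """Agent with injected store dependency."""
    # The store is injected automatically when the node runs inside the graph
    relevant = await store.asearch(config, query=state.context[-1].text())
    # Use the retrieved memories (for example, add them to the prompt) before returning
    return state
```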
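Because the store arrives as an ordinary parameter, a node can also be unit-tested without the DI container by passing a fake. The sketch below is hypothetical: `FakeStore` and the simplified `knowledge_node` (which takes a plain question string instead of `AgentState`) only illustrate the pattern.

```python
import asyncio
from typing import Any


class FakeStore:
    """Hypothetical test double: records queries and returns canned results."""

    def __init__(self, canned: list[str]) -> None:
        self.canned = canned
        self.queries: list[str] = []

    async def asearch(self, config: dict[str, Any], query: str, **kwargs: Any) -> list[str]:
        self.queries.append(query)
        return self.canned


async def knowledge_node(question: str, config: dict[str, Any], store) -> str:
    # The store is a parameter, so a test can hand in a fake instead of the DI-provided one.
    relevant = await store.asearch(config, query=question)
    return f"{question} | context: {'; '.join(relevant)}"


fake = FakeStore(["User prefers technical documentation"])
answer = asyncio.run(knowledge_node("How do I deploy?", {"user_id": "alice"}, fake))
print(answer)
```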
### Graph Configuration
Register your store during graph compilation:
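```python
from agentflow.graph import StateGraph
from agentflow.store import BaseStore, QdrantStore
from injectq import InjectQ

# Runs inside an async function; embedding_service, workflow (a StateGraph),
# initial_state and config are defined elsewhere

# Create and initialize the store instance
store = QdrantStore(embedding=embedding_service, path="./data")
await store.asetup()

# Create the DI container and bind the store to the BaseStore interface
di = InjectQ()
di.register(BaseStore, store)

# Compile the graph with DI
graph = workflow.compile(injector=di)

# The store is now available to all nodes
result = await graph.ainvoke(initial_state, config)
```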
## Performance Considerations
### Batch Operations
While BaseStore doesn't mandate batch operations, implementations should support them for efficiency:
```python
# Some implementations provide batch methods
if hasattr(store, "abatch_store"):
    batch_id = await store.abatch_store(
        config=config,
        content=["memory 1", "memory 2", "memory 3"],
    )
```
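When a backend offers no batch method, a caller can still avoid storing items strictly one after another by running `astore` calls concurrently with `asyncio.gather`, which returns results in input order. The hypothetical helper `store_many` and a toy store illustrate the fallback; note that the batch branch passes through whatever the backend's batch method returns, which in the example above is a batch ID rather than a list of memory IDs.

```python
import asyncio
from typing import Any


async def store_many(store: Any, config: dict[str, Any], contents: list[str]) -> Any:
    """Prefer the backend's batch method; otherwise issue concurrent astore calls."""
    batch = getattr(store, "abatch_store", None)
    if batch is not None:
        return await batch(config=config, content=contents)
    # gather() runs the coroutines concurrently and keeps results in input order
    return list(await asyncio.gather(*(store.astore(config, c) for c in contents)))


class OneAtATimeStore:
    """Toy store without abatch_store, so store_many takes the fallback path."""

    def __init__(self) -> None:
        self.calls = 0

    async def astore(self, config: dict[str, Any], content: str) -> str:
        self.calls += 1
        await asyncio.sleep(0)  # yield to the event loop, as real I/O would
        return f"id-{content}"


toy = OneAtATimeStore()
ids = asyncio.run(store_many(toy, {"user_id": "alice"}, ["memory 1", "memory 2", "memory 3"]))
print(ids)
```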
### Caching and Optimization
Consider implementing caching layers for frequently accessed memories:
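```python
import time
from typing import Any

from agentflow.store import BaseStore


class CachedStore(BaseStore):
    def __init__(self, backend: BaseStore, cache_ttl: int = 300):
        self.backend = backend
        self.cache: dict[str, tuple[float, Any]] = {}  # key -> (expiry time, results)
        self.cache_ttl = cache_ttl

    async def asearch(self, config, query, **kwargs):
        # Key on everything that can change the results: query, scope and search options
        scope = (config.get("app_id"), config.get("user_id"), config.get("thread_id"))
        cache_key = repr((query, scope, sorted(kwargs.items())))
        entry = self.cache.get(cache_key)
        if entry is not None and entry[0] > time.monotonic():
            return entry[1]  # Fresh cache hit
        results = await self.backend.asearch(config, query, **kwargs)
        self.cache[cache_key] = (time.monotonic() + self.cache_ttl, results)
        return results

    # The other BaseStore methods delegate to self.backend (omitted here);
    # write operations should also clear self.cache so stale results are not served
```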
## Conclusion
The BaseStore abstraction is designed to provide maximum flexibility while maintaining a consistent, intuitive API. By separating the interface from implementation, it allows:
- Freedom of choice in storage backends
- Easy testing through mock implementations
- Gradual migration between storage solutions
- Custom implementations for specialized needs
Whether you're using the built-in QdrantStore and Mem0Store implementations or building your own, the BaseStore contract ensures your agent code remains clean, testable, and portable across different storage backends.