Skip to content

Base store

Simplified Async-First Base Store for PyAgenity Framework

This module provides a clean, modern interface for memory stores with: - Async-first design for better performance - Core CRUD operations only - Message-specific convenience methods - Extensible for different backends (vector stores, managed services, etc.)

Classes:

Name Description
BaseStore

Simplified async-first base class for memory stores in PyAgenity.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute

logger = getLogger(__name__)

Classes

BaseStore

Bases: ABC

Simplified async-first base class for memory stores in PyAgenity.

This class provides a clean interface that supports: - Vector stores (Qdrant, Pinecone, Chroma, etc.) - Managed memory services (mem0, Zep, etc.) - Graph databases (Neo4j, etc.)

Key Design Principles: - Async-first for better performance - Core CRUD operations only - User and agent-centric operations - Extensible filtering and metadata

Methods:

Name Description
adelete

Delete a memory by ID.

aforget_memory

Delete a memory by for a user or agent.

aget

Get a specific memory by ID.

aget_all

Get a specific memory by user_id.

arelease

Clean up any resources used by the store (override in subclasses if needed).

asearch

Search memories by content similarity.

asetup

Asynchronous setup method for checkpointer.

astore

Add a new memory.

aupdate

Update an existing memory.

delete

Synchronous wrapper for adelete that runs the async implementation.

forget_memory

Delete a memory by for a user or agent.

get

Synchronous wrapper for aget that runs the async implementation.

get_all

Synchronous wrapper for aget that runs the async implementation.

release

Clean up any resources used by the store (override in subclasses if needed).

search

Synchronous wrapper for asearch that runs the async implementation.

setup

Synchronous setup method for checkpointer.

store

Synchronous wrapper for astore that runs the async implementation.

update

Synchronous wrapper for aupdate that runs the async implementation.

Source code in pyagenity/store/base_store.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
class BaseStore(ABC):
    """
    Simplified async-first base class for memory stores in PyAgenity.

    This class provides a clean interface that supports:
    - Vector stores (Qdrant, Pinecone, Chroma, etc.)
    - Managed memory services (mem0, Zep, etc.)
    - Graph databases (Neo4j, etc.)

    Key Design Principles:
    - Async-first for better performance
    - Core CRUD operations only
    - User and agent-centric operations
    - Extensible filtering and metadata
    """

    def setup(self) -> Any:
        """
        Synchronous setup method for checkpointer.

        Returns:
            Any: Implementation-defined setup result.
        """
        return run_coroutine(self.asetup())

    async def asetup(self) -> Any:
        """
        Asynchronous setup method for checkpointer.

        Returns:
            Any: Implementation-defined setup result.
        """
        raise NotImplementedError

    # --- Core Memory Operations ---

    @abstractmethod
    async def astore(
        self,
        config: dict[str, Any],
        content: str | Message,
        memory_type: MemoryType = MemoryType.EPISODIC,
        category: str = "general",
        metadata: dict[str, Any] | None = None,
        **kwargs,
    ) -> str:
        """
        Add a new memory.

        Args:
            content: The memory content
            user_id: User identifier
            agent_id: Agent identifier
            memory_type: Type of memory (episodic, semantic, etc.)
            category: Memory category for organization
            metadata: Additional metadata
            **kwargs: Store-specific parameters

        Returns:
            Memory ID
        """
        raise NotImplementedError

    # --- Sync wrappers ---
    def store(
        self,
        config: dict[str, Any],
        content: str | Message,
        memory_type: MemoryType = MemoryType.EPISODIC,
        category: str = "general",
        metadata: dict[str, Any] | None = None,
        **kwargs,
    ) -> str:
        """Synchronous wrapper for `astore` that runs the async implementation."""
        return run_coroutine(
            self.astore(
                config,
                content,
                memory_type=memory_type,
                category=category,
                metadata=metadata,
                **kwargs,
            )
        )

    @abstractmethod
    async def asearch(
        self,
        config: dict[str, Any],
        query: str,
        memory_type: MemoryType | None = None,
        category: str | None = None,
        limit: int = 10,
        score_threshold: float | None = None,
        filters: dict[str, Any] | None = None,
        retrieval_strategy: RetrievalStrategy = RetrievalStrategy.SIMILARITY,
        distance_metric: DistanceMetric = DistanceMetric.COSINE,
        max_tokens: int = 4000,
        **kwargs,
    ) -> list[MemorySearchResult]:
        """
        Search memories by content similarity.

        Args:
            query: Search query
            user_id: Filter by user
            agent_id: Filter by agent
            memory_type: Filter by memory type
            category: Filter by category
            limit: Maximum results
            score_threshold: Minimum similarity score
            filters: Additional filters
            **kwargs: Store-specific parameters

        Returns:
            List of matching memories
        """
        raise NotImplementedError

    def search(
        self,
        config: dict[str, Any],
        query: str,
        memory_type: MemoryType | None = None,
        category: str | None = None,
        limit: int = 10,
        score_threshold: float | None = None,
        filters: dict[str, Any] | None = None,
        retrieval_strategy: RetrievalStrategy = RetrievalStrategy.SIMILARITY,
        distance_metric: DistanceMetric = DistanceMetric.COSINE,
        max_tokens: int = 4000,
        **kwargs,
    ) -> list[MemorySearchResult]:
        """Synchronous wrapper for `asearch` that runs the async implementation."""
        return run_coroutine(
            self.asearch(
                config,
                query,
                memory_type=memory_type,
                category=category,
                limit=limit,
                score_threshold=score_threshold,
                filters=filters,
                retrieval_strategy=retrieval_strategy,
                distance_metric=distance_metric,
                max_tokens=max_tokens,
                **kwargs,
            )
        )

    @abstractmethod
    async def aget(
        self,
        config: dict[str, Any],
        memory_id: str,
        **kwargs,
    ) -> MemorySearchResult | None:
        """Get a specific memory by ID."""
        raise NotImplementedError

    @abstractmethod
    async def aget_all(
        self,
        config: dict[str, Any],
        limit: int = 100,
        **kwargs,
    ) -> list[MemorySearchResult]:
        """Get a specific memory by user_id."""
        raise NotImplementedError

    def get(self, config: dict[str, Any], memory_id: str, **kwargs) -> MemorySearchResult | None:
        """Synchronous wrapper for `aget` that runs the async implementation."""
        return run_coroutine(self.aget(config, memory_id, **kwargs))

    def get_all(
        self,
        config: dict[str, Any],
        limit: int = 100,
        **kwargs,
    ) -> list[MemorySearchResult]:
        """Synchronous wrapper for `aget` that runs the async implementation."""
        return run_coroutine(self.aget_all(config, limit=limit, **kwargs))

    @abstractmethod
    async def aupdate(
        self,
        config: dict[str, Any],
        memory_id: str,
        content: str | Message,
        metadata: dict[str, Any] | None = None,
        **kwargs,
    ) -> Any:
        """
        Update an existing memory.

        Args:
            memory_id: ID of memory to update
            content: New content (optional)
            metadata: New/additional metadata (optional)
            **kwargs: Store-specific parameters
        """
        raise NotImplementedError

    def update(
        self,
        config: dict[str, Any],
        memory_id: str,
        content: str | Message,
        metadata: dict[str, Any] | None = None,
        **kwargs,
    ) -> Any:
        """Synchronous wrapper for `aupdate` that runs the async implementation."""
        return run_coroutine(self.aupdate(config, memory_id, content, metadata=metadata, **kwargs))

    @abstractmethod
    async def adelete(
        self,
        config: dict[str, Any],
        memory_id: str,
        **kwargs,
    ) -> Any:
        """Delete a memory by ID."""
        raise NotImplementedError

    def delete(self, config: dict[str, Any], memory_id: str, **kwargs) -> None:
        """Synchronous wrapper for `adelete` that runs the async implementation."""
        return run_coroutine(self.adelete(config, memory_id, **kwargs))

    @abstractmethod
    async def aforget_memory(
        self,
        config: dict[str, Any],
        **kwargs,
    ) -> Any:
        """Delete a memory by for a user or agent."""
        raise NotImplementedError

    def forget_memory(
        self,
        config: dict[str, Any],
        **kwargs,
    ) -> Any:
        """Delete a memory by for a user or agent."""
        return run_coroutine(self.aforget_memory(config, **kwargs))

    # --- Cleanup and Resource Management ---

    async def arelease(self) -> None:
        """Clean up any resources used by the store (override in subclasses if needed)."""
        raise NotImplementedError

    def release(self) -> None:
        """Clean up any resources used by the store (override in subclasses if needed)."""
        return run_coroutine(self.arelease())

Functions

adelete abstractmethod async
adelete(config, memory_id, **kwargs)

Delete a memory by ID.

Source code in pyagenity/store/base_store.py
237
238
239
240
241
242
243
244
245
@abstractmethod
async def adelete(
    self,
    config: dict[str, Any],
    memory_id: str,
    **kwargs,
) -> Any:
    """Delete a memory by ID."""
    raise NotImplementedError
aforget_memory abstractmethod async
aforget_memory(config, **kwargs)

Delete a memory by for a user or agent.

Source code in pyagenity/store/base_store.py
251
252
253
254
255
256
257
258
@abstractmethod
async def aforget_memory(
    self,
    config: dict[str, Any],
    **kwargs,
) -> Any:
    """Delete a memory by for a user or agent."""
    raise NotImplementedError
aget abstractmethod async
aget(config, memory_id, **kwargs)

Get a specific memory by ID.

Source code in pyagenity/store/base_store.py
173
174
175
176
177
178
179
180
181
@abstractmethod
async def aget(
    self,
    config: dict[str, Any],
    memory_id: str,
    **kwargs,
) -> MemorySearchResult | None:
    """Get a specific memory by ID."""
    raise NotImplementedError
aget_all abstractmethod async
aget_all(config, limit=100, **kwargs)

Get a specific memory by user_id.

Source code in pyagenity/store/base_store.py
183
184
185
186
187
188
189
190
191
@abstractmethod
async def aget_all(
    self,
    config: dict[str, Any],
    limit: int = 100,
    **kwargs,
) -> list[MemorySearchResult]:
    """Get a specific memory by user_id."""
    raise NotImplementedError
arelease async
arelease()

Clean up any resources used by the store (override in subclasses if needed).

Source code in pyagenity/store/base_store.py
270
271
272
async def arelease(self) -> None:
    """Clean up any resources used by the store (override in subclasses if needed)."""
    raise NotImplementedError
asearch abstractmethod async
asearch(config, query, memory_type=None, category=None, limit=10, score_threshold=None, filters=None, retrieval_strategy=RetrievalStrategy.SIMILARITY, distance_metric=DistanceMetric.COSINE, max_tokens=4000, **kwargs)

Search memories by content similarity.

Parameters:

Name Type Description Default
query
str

Search query

required
user_id

Filter by user

required
agent_id

Filter by agent

required
memory_type
MemoryType | None

Filter by memory type

None
category
str | None

Filter by category

None
limit
int

Maximum results

10
score_threshold
float | None

Minimum similarity score

None
filters
dict[str, Any] | None

Additional filters

None
**kwargs

Store-specific parameters

{}

Returns:

Type Description
list[MemorySearchResult]

List of matching memories

Source code in pyagenity/store/base_store.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
@abstractmethod
async def asearch(
    self,
    config: dict[str, Any],
    query: str,
    memory_type: MemoryType | None = None,
    category: str | None = None,
    limit: int = 10,
    score_threshold: float | None = None,
    filters: dict[str, Any] | None = None,
    retrieval_strategy: RetrievalStrategy = RetrievalStrategy.SIMILARITY,
    distance_metric: DistanceMetric = DistanceMetric.COSINE,
    max_tokens: int = 4000,
    **kwargs,
) -> list[MemorySearchResult]:
    """
    Search memories by content similarity.

    Args:
        query: Search query
        user_id: Filter by user
        agent_id: Filter by agent
        memory_type: Filter by memory type
        category: Filter by category
        limit: Maximum results
        score_threshold: Minimum similarity score
        filters: Additional filters
        **kwargs: Store-specific parameters

    Returns:
        List of matching memories
    """
    raise NotImplementedError
asetup async
asetup()

Asynchronous setup method for checkpointer.

Returns:

Name Type Description
Any Any

Implementation-defined setup result.

Source code in pyagenity/store/base_store.py
48
49
50
51
52
53
54
55
async def asetup(self) -> Any:
    """
    Asynchronous setup method for checkpointer.

    Returns:
        Any: Implementation-defined setup result.
    """
    raise NotImplementedError
astore abstractmethod async
astore(config, content, memory_type=MemoryType.EPISODIC, category='general', metadata=None, **kwargs)

Add a new memory.

Parameters:

Name Type Description Default
content
str | Message

The memory content

required
user_id

User identifier

required
agent_id

Agent identifier

required
memory_type
MemoryType

Type of memory (episodic, semantic, etc.)

EPISODIC
category
str

Memory category for organization

'general'
metadata
dict[str, Any] | None

Additional metadata

None
**kwargs

Store-specific parameters

{}

Returns:

Type Description
str

Memory ID

Source code in pyagenity/store/base_store.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@abstractmethod
async def astore(
    self,
    config: dict[str, Any],
    content: str | Message,
    memory_type: MemoryType = MemoryType.EPISODIC,
    category: str = "general",
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> str:
    """
    Add a new memory.

    Args:
        content: The memory content
        user_id: User identifier
        agent_id: Agent identifier
        memory_type: Type of memory (episodic, semantic, etc.)
        category: Memory category for organization
        metadata: Additional metadata
        **kwargs: Store-specific parameters

    Returns:
        Memory ID
    """
    raise NotImplementedError
aupdate abstractmethod async
aupdate(config, memory_id, content, metadata=None, **kwargs)

Update an existing memory.

Parameters:

Name Type Description Default
memory_id
str

ID of memory to update

required
content
str | Message

New content (optional)

required
metadata
dict[str, Any] | None

New/additional metadata (optional)

None
**kwargs

Store-specific parameters

{}
Source code in pyagenity/store/base_store.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
@abstractmethod
async def aupdate(
    self,
    config: dict[str, Any],
    memory_id: str,
    content: str | Message,
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> Any:
    """
    Update an existing memory.

    Args:
        memory_id: ID of memory to update
        content: New content (optional)
        metadata: New/additional metadata (optional)
        **kwargs: Store-specific parameters
    """
    raise NotImplementedError
delete
delete(config, memory_id, **kwargs)

Synchronous wrapper for adelete that runs the async implementation.

Source code in pyagenity/store/base_store.py
247
248
249
def delete(self, config: dict[str, Any], memory_id: str, **kwargs) -> None:
    """Synchronous wrapper for `adelete` that runs the async implementation."""
    return run_coroutine(self.adelete(config, memory_id, **kwargs))
forget_memory
forget_memory(config, **kwargs)

Delete a memory by for a user or agent.

Source code in pyagenity/store/base_store.py
260
261
262
263
264
265
266
def forget_memory(
    self,
    config: dict[str, Any],
    **kwargs,
) -> Any:
    """Delete a memory by for a user or agent."""
    return run_coroutine(self.aforget_memory(config, **kwargs))
get
get(config, memory_id, **kwargs)

Synchronous wrapper for aget that runs the async implementation.

Source code in pyagenity/store/base_store.py
193
194
195
def get(self, config: dict[str, Any], memory_id: str, **kwargs) -> MemorySearchResult | None:
    """Synchronous wrapper for `aget` that runs the async implementation."""
    return run_coroutine(self.aget(config, memory_id, **kwargs))
get_all
get_all(config, limit=100, **kwargs)

Synchronous wrapper for aget that runs the async implementation.

Source code in pyagenity/store/base_store.py
197
198
199
200
201
202
203
204
def get_all(
    self,
    config: dict[str, Any],
    limit: int = 100,
    **kwargs,
) -> list[MemorySearchResult]:
    """Synchronous wrapper for `aget` that runs the async implementation."""
    return run_coroutine(self.aget_all(config, limit=limit, **kwargs))
release
release()

Clean up any resources used by the store (override in subclasses if needed).

Source code in pyagenity/store/base_store.py
274
275
276
def release(self) -> None:
    """Clean up any resources used by the store (override in subclasses if needed)."""
    return run_coroutine(self.arelease())
search
search(config, query, memory_type=None, category=None, limit=10, score_threshold=None, filters=None, retrieval_strategy=RetrievalStrategy.SIMILARITY, distance_metric=DistanceMetric.COSINE, max_tokens=4000, **kwargs)

Synchronous wrapper for asearch that runs the async implementation.

Source code in pyagenity/store/base_store.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def search(
    self,
    config: dict[str, Any],
    query: str,
    memory_type: MemoryType | None = None,
    category: str | None = None,
    limit: int = 10,
    score_threshold: float | None = None,
    filters: dict[str, Any] | None = None,
    retrieval_strategy: RetrievalStrategy = RetrievalStrategy.SIMILARITY,
    distance_metric: DistanceMetric = DistanceMetric.COSINE,
    max_tokens: int = 4000,
    **kwargs,
) -> list[MemorySearchResult]:
    """Synchronous wrapper for `asearch` that runs the async implementation."""
    return run_coroutine(
        self.asearch(
            config,
            query,
            memory_type=memory_type,
            category=category,
            limit=limit,
            score_threshold=score_threshold,
            filters=filters,
            retrieval_strategy=retrieval_strategy,
            distance_metric=distance_metric,
            max_tokens=max_tokens,
            **kwargs,
        )
    )
setup
setup()

Synchronous setup method for checkpointer.

Returns:

Name Type Description
Any Any

Implementation-defined setup result.

Source code in pyagenity/store/base_store.py
39
40
41
42
43
44
45
46
def setup(self) -> Any:
    """
    Synchronous setup method for checkpointer.

    Returns:
        Any: Implementation-defined setup result.
    """
    return run_coroutine(self.asetup())
store
store(config, content, memory_type=MemoryType.EPISODIC, category='general', metadata=None, **kwargs)

Synchronous wrapper for astore that runs the async implementation.

Source code in pyagenity/store/base_store.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def store(
    self,
    config: dict[str, Any],
    content: str | Message,
    memory_type: MemoryType = MemoryType.EPISODIC,
    category: str = "general",
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> str:
    """Synchronous wrapper for `astore` that runs the async implementation."""
    return run_coroutine(
        self.astore(
            config,
            content,
            memory_type=memory_type,
            category=category,
            metadata=metadata,
            **kwargs,
        )
    )
update
update(config, memory_id, content, metadata=None, **kwargs)

Synchronous wrapper for aupdate that runs the async implementation.

Source code in pyagenity/store/base_store.py
226
227
228
229
230
231
232
233
234
235
def update(
    self,
    config: dict[str, Any],
    memory_id: str,
    content: str | Message,
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> Any:
    """Synchronous wrapper for `aupdate` that runs the async implementation."""
    return run_coroutine(self.aupdate(config, memory_id, content, metadata=metadata, **kwargs))

Functions