
Store

Modules:

Name Description
base_store

Simplified Async-First Base Store for PyAgenity Framework

embedding
mem0_store

Mem0 Long-Term Memory Store

qdrant_store

Qdrant Vector Store Implementation for PyAgenity Framework

store_schema

Classes:

Name Description
BaseEmbedding
BaseStore

Simplified async-first base class for memory stores in PyAgenity.

DistanceMetric

Supported distance metrics for vector similarity.

MemoryRecord

Comprehensive memory record for storage (Pydantic model).

MemorySearchResult

Result from a memory search operation (Pydantic model).

MemoryType

Types of memories that can be stored.

OpenAIEmbedding

Attributes

__all__ module-attribute

__all__ = ['BaseEmbedding', 'BaseStore', 'DistanceMetric', 'MemoryRecord', 'MemorySearchResult', 'MemoryType', 'OpenAIEmbedding']

Classes

BaseEmbedding

Bases: ABC

Methods:

Name Description
aembed

Generate embedding for a single text.

aembed_batch

Generate embeddings for a list of texts.

embed

Synchronous wrapper for aembed that runs the async implementation.

embed_batch

Synchronous wrapper for aembed_batch that runs the async implementation.

Attributes:

Name Type Description
dimension int

Number of dimensions in the embedding vectors produced by this backend.

Source code in pyagenity/store/embedding/base_embedding.py, lines 6-29
class BaseEmbedding(ABC):
    def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Synchronous wrapper for `aembed_batch` that runs the async implementation."""
        return run_coroutine(self.aembed_batch(texts))

    @abstractmethod
    async def aembed_batch(self, texts: list[str]) -> list[list[float]]:
        """Generate embeddings for a list of texts."""
        # pragma: no cover

    def embed(self, text: str) -> list[float]:
        """Synchronous wrapper for `aembed` that runs the async implementation."""
        return run_coroutine(self.aembed(text))

    @abstractmethod
    async def aembed(self, text: str) -> list[float]:
        """Generate embedding for a single text."""
        raise NotImplementedError

    @property
    @abstractmethod
    def dimension(self) -> int:
        """Synchronous wrapper for that runs the async implementation."""
        raise NotImplementedError

Attributes

dimension abstractmethod property
dimension

Number of dimensions in the embedding vectors produced by this backend.

Functions

aembed abstractmethod async
aembed(text)

Generate embedding for a single text.

Source code in pyagenity/store/embedding/base_embedding.py, lines 20-23
@abstractmethod
async def aembed(self, text: str) -> list[float]:
    """Generate embedding for a single text."""
    raise NotImplementedError
aembed_batch abstractmethod async
aembed_batch(texts)

Generate embeddings for a list of texts.

Source code in pyagenity/store/embedding/base_embedding.py, lines 11-13
@abstractmethod
async def aembed_batch(self, texts: list[str]) -> list[list[float]]:
    """Generate embeddings for a list of texts."""
embed
embed(text)

Synchronous wrapper for aembed that runs the async implementation.

Source code in pyagenity/store/embedding/base_embedding.py, lines 16-18
def embed(self, text: str) -> list[float]:
    """Synchronous wrapper for `aembed` that runs the async implementation."""
    return run_coroutine(self.aembed(text))
embed_batch
embed_batch(texts)

Synchronous wrapper for aembed_batch that runs the async implementation.

Source code in pyagenity/store/embedding/base_embedding.py, lines 7-9
def embed_batch(self, texts: list[str]) -> list[list[float]]:
    """Synchronous wrapper for `aembed_batch` that runs the async implementation."""
    return run_coroutine(self.aembed_batch(texts))
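
To make the contract concrete, here is a minimal sketch of a custom embedding backend. The HashEmbedding class is hypothetical (not part of PyAgenity) and its hash-derived vectors carry no semantic meaning; it only illustrates which members a subclass must provide. The import path assumes the package layout shown in the source paths above.

import hashlib

from pyagenity.store import BaseEmbedding


class HashEmbedding(BaseEmbedding):
    """Toy embedding backend: deterministic pseudo-vectors derived from a SHA-256 hash."""

    def __init__(self, dim: int = 8) -> None:
        self._dim = dim

    @property
    def dimension(self) -> int:
        # Number of components in each embedding vector.
        return self._dim

    async def aembed(self, text: str) -> list[float]:
        digest = hashlib.sha256(text.encode("utf-8")).digest()
        # Map the first `dim` bytes of the digest to floats in [0, 1].
        return [b / 255.0 for b in digest[: self._dim]]

    async def aembed_batch(self, texts: list[str]) -> list[list[float]]:
        return [await self.aembed(t) for t in texts]


embedder = HashEmbedding()
vector = embedder.embed("hello world")  # sync wrapper around aembed
assert len(vector) == embedder.dimension

The synchronous embed/embed_batch wrappers are inherited from BaseEmbedding, so a subclass only needs to implement the async methods and the dimension property.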

BaseStore

Bases: ABC

Simplified async-first base class for memory stores in PyAgenity.

This class provides a clean interface that supports:

- Vector stores (Qdrant, Pinecone, Chroma, etc.)
- Managed memory services (mem0, Zep, etc.)
- Graph databases (Neo4j, etc.)

Key Design Principles:

- Async-first for better performance
- Core CRUD operations only
- User and agent-centric operations
- Extensible filtering and metadata

Methods:

Name Description
adelete

Delete a memory by ID.

aforget_memory

Delete memories for a user or agent.

aget

Get a specific memory by ID.

aget_all

Get all memories for a user or agent, up to limit.

arelease

Clean up any resources used by the store (override in subclasses if needed).

asearch

Search memories by content similarity.

asetup

Asynchronous setup method for the store.

astore

Add a new memory.

aupdate

Update an existing memory.

delete

Synchronous wrapper for adelete that runs the async implementation.

forget_memory

Delete memories for a user or agent.

get

Synchronous wrapper for aget that runs the async implementation.

get_all

Synchronous wrapper for aget_all that runs the async implementation.

release

Clean up any resources used by the store (override in subclasses if needed).

search

Synchronous wrapper for asearch that runs the async implementation.

setup

Synchronous setup method for the store.

store

Synchronous wrapper for astore that runs the async implementation.

update

Synchronous wrapper for aupdate that runs the async implementation.

Source code in pyagenity/store/base_store.py, lines 23-276
class BaseStore(ABC):
    """
    Simplified async-first base class for memory stores in PyAgenity.

    This class provides a clean interface that supports:
    - Vector stores (Qdrant, Pinecone, Chroma, etc.)
    - Managed memory services (mem0, Zep, etc.)
    - Graph databases (Neo4j, etc.)

    Key Design Principles:
    - Async-first for better performance
    - Core CRUD operations only
    - User and agent-centric operations
    - Extensible filtering and metadata
    """

    def setup(self) -> Any:
        """
        Synchronous setup method for the store.

        Returns:
            Any: Implementation-defined setup result.
        """
        return run_coroutine(self.asetup())

    async def asetup(self) -> Any:
        """
        Asynchronous setup method for the store.

        Returns:
            Any: Implementation-defined setup result.
        """
        raise NotImplementedError

    # --- Core Memory Operations ---

    @abstractmethod
    async def astore(
        self,
        config: dict[str, Any],
        content: str | Message,
        memory_type: MemoryType = MemoryType.EPISODIC,
        category: str = "general",
        metadata: dict[str, Any] | None = None,
        **kwargs,
    ) -> str:
        """
        Add a new memory.

        Args:
            config: Runtime configuration (e.g. user/thread identifiers)
            content: The memory content
            memory_type: Type of memory (episodic, semantic, etc.)
            category: Memory category for organization
            metadata: Additional metadata
            **kwargs: Store-specific parameters

        Returns:
            Memory ID
        """
        raise NotImplementedError

    # --- Sync wrappers ---
    def store(
        self,
        config: dict[str, Any],
        content: str | Message,
        memory_type: MemoryType = MemoryType.EPISODIC,
        category: str = "general",
        metadata: dict[str, Any] | None = None,
        **kwargs,
    ) -> str:
        """Synchronous wrapper for `astore` that runs the async implementation."""
        return run_coroutine(
            self.astore(
                config,
                content,
                memory_type=memory_type,
                category=category,
                metadata=metadata,
                **kwargs,
            )
        )

    @abstractmethod
    async def asearch(
        self,
        config: dict[str, Any],
        query: str,
        memory_type: MemoryType | None = None,
        category: str | None = None,
        limit: int = 10,
        score_threshold: float | None = None,
        filters: dict[str, Any] | None = None,
        retrieval_strategy: RetrievalStrategy = RetrievalStrategy.SIMILARITY,
        distance_metric: DistanceMetric = DistanceMetric.COSINE,
        max_tokens: int = 4000,
        **kwargs,
    ) -> list[MemorySearchResult]:
        """
        Search memories by content similarity.

        Args:
            config: Runtime configuration (e.g. user/thread identifiers)
            query: Search query
            memory_type: Filter by memory type
            category: Filter by category
            limit: Maximum results
            score_threshold: Minimum similarity score
            filters: Additional filters
            **kwargs: Store-specific parameters

        Returns:
            List of matching memories
        """
        raise NotImplementedError

    def search(
        self,
        config: dict[str, Any],
        query: str,
        memory_type: MemoryType | None = None,
        category: str | None = None,
        limit: int = 10,
        score_threshold: float | None = None,
        filters: dict[str, Any] | None = None,
        retrieval_strategy: RetrievalStrategy = RetrievalStrategy.SIMILARITY,
        distance_metric: DistanceMetric = DistanceMetric.COSINE,
        max_tokens: int = 4000,
        **kwargs,
    ) -> list[MemorySearchResult]:
        """Synchronous wrapper for `asearch` that runs the async implementation."""
        return run_coroutine(
            self.asearch(
                config,
                query,
                memory_type=memory_type,
                category=category,
                limit=limit,
                score_threshold=score_threshold,
                filters=filters,
                retrieval_strategy=retrieval_strategy,
                distance_metric=distance_metric,
                max_tokens=max_tokens,
                **kwargs,
            )
        )

    @abstractmethod
    async def aget(
        self,
        config: dict[str, Any],
        memory_id: str,
        **kwargs,
    ) -> MemorySearchResult | None:
        """Get a specific memory by ID."""
        raise NotImplementedError

    @abstractmethod
    async def aget_all(
        self,
        config: dict[str, Any],
        limit: int = 100,
        **kwargs,
    ) -> list[MemorySearchResult]:
        """Get a specific memory by user_id."""
        raise NotImplementedError

    def get(self, config: dict[str, Any], memory_id: str, **kwargs) -> MemorySearchResult | None:
        """Synchronous wrapper for `aget` that runs the async implementation."""
        return run_coroutine(self.aget(config, memory_id, **kwargs))

    def get_all(
        self,
        config: dict[str, Any],
        limit: int = 100,
        **kwargs,
    ) -> list[MemorySearchResult]:
        """Synchronous wrapper for `aget` that runs the async implementation."""
        return run_coroutine(self.aget_all(config, limit=limit, **kwargs))

    @abstractmethod
    async def aupdate(
        self,
        config: dict[str, Any],
        memory_id: str,
        content: str | Message,
        metadata: dict[str, Any] | None = None,
        **kwargs,
    ) -> Any:
        """
        Update an existing memory.

        Args:
            memory_id: ID of memory to update
            content: New content
            metadata: New/additional metadata (optional)
            **kwargs: Store-specific parameters
        """
        raise NotImplementedError

    def update(
        self,
        config: dict[str, Any],
        memory_id: str,
        content: str | Message,
        metadata: dict[str, Any] | None = None,
        **kwargs,
    ) -> Any:
        """Synchronous wrapper for `aupdate` that runs the async implementation."""
        return run_coroutine(self.aupdate(config, memory_id, content, metadata=metadata, **kwargs))

    @abstractmethod
    async def adelete(
        self,
        config: dict[str, Any],
        memory_id: str,
        **kwargs,
    ) -> Any:
        """Delete a memory by ID."""
        raise NotImplementedError

    def delete(self, config: dict[str, Any], memory_id: str, **kwargs) -> None:
        """Synchronous wrapper for `adelete` that runs the async implementation."""
        return run_coroutine(self.adelete(config, memory_id, **kwargs))

    @abstractmethod
    async def aforget_memory(
        self,
        config: dict[str, Any],
        **kwargs,
    ) -> Any:
        """Delete a memory by for a user or agent."""
        raise NotImplementedError

    def forget_memory(
        self,
        config: dict[str, Any],
        **kwargs,
    ) -> Any:
        """Delete a memory by for a user or agent."""
        return run_coroutine(self.aforget_memory(config, **kwargs))

    # --- Cleanup and Resource Management ---

    async def arelease(self) -> None:
        """Clean up any resources used by the store (override in subclasses if needed)."""
        raise NotImplementedError

    def release(self) -> None:
        """Clean up any resources used by the store (override in subclasses if needed)."""
        return run_coroutine(self.arelease())

Functions

adelete abstractmethod async
adelete(config, memory_id, **kwargs)

Delete a memory by ID.

Source code in pyagenity/store/base_store.py, lines 237-245
@abstractmethod
async def adelete(
    self,
    config: dict[str, Any],
    memory_id: str,
    **kwargs,
) -> Any:
    """Delete a memory by ID."""
    raise NotImplementedError
aforget_memory abstractmethod async
aforget_memory(config, **kwargs)

Delete memories for a user or agent.

Source code in pyagenity/store/base_store.py, lines 251-258
@abstractmethod
async def aforget_memory(
    self,
    config: dict[str, Any],
    **kwargs,
) -> Any:
    """Delete a memory by for a user or agent."""
    raise NotImplementedError
aget abstractmethod async
aget(config, memory_id, **kwargs)

Get a specific memory by ID.

Source code in pyagenity/store/base_store.py, lines 173-181
@abstractmethod
async def aget(
    self,
    config: dict[str, Any],
    memory_id: str,
    **kwargs,
) -> MemorySearchResult | None:
    """Get a specific memory by ID."""
    raise NotImplementedError
aget_all abstractmethod async
aget_all(config, limit=100, **kwargs)

Get all memories for a user or agent, up to limit.

Source code in pyagenity/store/base_store.py, lines 183-191
@abstractmethod
async def aget_all(
    self,
    config: dict[str, Any],
    limit: int = 100,
    **kwargs,
) -> list[MemorySearchResult]:
    """Get a specific memory by user_id."""
    raise NotImplementedError
arelease async
arelease()

Clean up any resources used by the store (override in subclasses if needed).

Source code in pyagenity/store/base_store.py, lines 270-272
async def arelease(self) -> None:
    """Clean up any resources used by the store (override in subclasses if needed)."""
    raise NotImplementedError
asearch abstractmethod async
asearch(config, query, memory_type=None, category=None, limit=10, score_threshold=None, filters=None, retrieval_strategy=RetrievalStrategy.SIMILARITY, distance_metric=DistanceMetric.COSINE, max_tokens=4000, **kwargs)

Search memories by content similarity.

Parameters:

Name Type Description Default
config
dict[str, Any]

Runtime configuration (e.g. user/thread identifiers)

required
query
str

Search query

required
memory_type
MemoryType | None

Filter by memory type

None
category
str | None

Filter by category

None
limit
int

Maximum results

10
score_threshold
float | None

Minimum similarity score

None
filters
dict[str, Any] | None

Additional filters

None
**kwargs

Store-specific parameters

{}

Returns:

Type Description
list[MemorySearchResult]

List of matching memories

Source code in pyagenity/store/base_store.py, lines 108-140
@abstractmethod
async def asearch(
    self,
    config: dict[str, Any],
    query: str,
    memory_type: MemoryType | None = None,
    category: str | None = None,
    limit: int = 10,
    score_threshold: float | None = None,
    filters: dict[str, Any] | None = None,
    retrieval_strategy: RetrievalStrategy = RetrievalStrategy.SIMILARITY,
    distance_metric: DistanceMetric = DistanceMetric.COSINE,
    max_tokens: int = 4000,
    **kwargs,
) -> list[MemorySearchResult]:
    """
    Search memories by content similarity.

    Args:
        config: Runtime configuration (e.g. user/thread identifiers)
        query: Search query
        memory_type: Filter by memory type
        category: Filter by category
        limit: Maximum results
        score_threshold: Minimum similarity score
        filters: Additional filters
        **kwargs: Store-specific parameters

    Returns:
        List of matching memories
    """
    raise NotImplementedError
asetup async
asetup()

Asynchronous setup method for the store.

Returns:

Name Type Description
Any Any

Implementation-defined setup result.

Source code in pyagenity/store/base_store.py, lines 48-55
async def asetup(self) -> Any:
    """
    Asynchronous setup method for the store.

    Returns:
        Any: Implementation-defined setup result.
    """
    raise NotImplementedError
astore abstractmethod async
astore(config, content, memory_type=MemoryType.EPISODIC, category='general', metadata=None, **kwargs)

Add a new memory.

Parameters:

Name Type Description Default
config
dict[str, Any]

Runtime configuration (e.g. user/thread identifiers)

required
content
str | Message

The memory content

required
memory_type
MemoryType

Type of memory (episodic, semantic, etc.)

EPISODIC
category
str

Memory category for organization

'general'
metadata
dict[str, Any] | None

Additional metadata

None
**kwargs

Store-specific parameters

{}

Returns:

Type Description
str

Memory ID

Source code in pyagenity/store/base_store.py, lines 59-84
@abstractmethod
async def astore(
    self,
    config: dict[str, Any],
    content: str | Message,
    memory_type: MemoryType = MemoryType.EPISODIC,
    category: str = "general",
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> str:
    """
    Add a new memory.

    Args:
        config: Runtime configuration (e.g. user/thread identifiers)
        content: The memory content
        memory_type: Type of memory (episodic, semantic, etc.)
        category: Memory category for organization
        metadata: Additional metadata
        **kwargs: Store-specific parameters

    Returns:
        Memory ID
    """
    raise NotImplementedError
aupdate abstractmethod async
aupdate(config, memory_id, content, metadata=None, **kwargs)

Update an existing memory.

Parameters:

Name Type Description Default
memory_id
str

ID of memory to update

required
content
str | Message

New content

required
metadata
dict[str, Any] | None

New/additional metadata (optional)

None
**kwargs

Store-specific parameters

{}
Source code in pyagenity/store/base_store.py, lines 206-224
@abstractmethod
async def aupdate(
    self,
    config: dict[str, Any],
    memory_id: str,
    content: str | Message,
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> Any:
    """
    Update an existing memory.

    Args:
        memory_id: ID of memory to update
        content: New content
        metadata: New/additional metadata (optional)
        **kwargs: Store-specific parameters
    """
    raise NotImplementedError
delete
delete(config, memory_id, **kwargs)

Synchronous wrapper for adelete that runs the async implementation.

Source code in pyagenity/store/base_store.py, lines 247-249
def delete(self, config: dict[str, Any], memory_id: str, **kwargs) -> None:
    """Synchronous wrapper for `adelete` that runs the async implementation."""
    return run_coroutine(self.adelete(config, memory_id, **kwargs))
forget_memory
forget_memory(config, **kwargs)

Delete memories for a user or agent.

Source code in pyagenity/store/base_store.py, lines 260-266
def forget_memory(
    self,
    config: dict[str, Any],
    **kwargs,
) -> Any:
    """Delete a memory by for a user or agent."""
    return run_coroutine(self.aforget_memory(config, **kwargs))
get
get(config, memory_id, **kwargs)

Synchronous wrapper for aget that runs the async implementation.

Source code in pyagenity/store/base_store.py, lines 193-195
def get(self, config: dict[str, Any], memory_id: str, **kwargs) -> MemorySearchResult | None:
    """Synchronous wrapper for `aget` that runs the async implementation."""
    return run_coroutine(self.aget(config, memory_id, **kwargs))
get_all
get_all(config, limit=100, **kwargs)

Synchronous wrapper for aget_all that runs the async implementation.

Source code in pyagenity/store/base_store.py, lines 197-204
def get_all(
    self,
    config: dict[str, Any],
    limit: int = 100,
    **kwargs,
) -> list[MemorySearchResult]:
    """Synchronous wrapper for `aget` that runs the async implementation."""
    return run_coroutine(self.aget_all(config, limit=limit, **kwargs))
release
release()

Clean up any resources used by the store (override in subclasses if needed).

Source code in pyagenity/store/base_store.py, lines 274-276
def release(self) -> None:
    """Clean up any resources used by the store (override in subclasses if needed)."""
    return run_coroutine(self.arelease())
search
search(config, query, memory_type=None, category=None, limit=10, score_threshold=None, filters=None, retrieval_strategy=RetrievalStrategy.SIMILARITY, distance_metric=DistanceMetric.COSINE, max_tokens=4000, **kwargs)

Synchronous wrapper for asearch that runs the async implementation.

Source code in pyagenity/store/base_store.py, lines 142-171
def search(
    self,
    config: dict[str, Any],
    query: str,
    memory_type: MemoryType | None = None,
    category: str | None = None,
    limit: int = 10,
    score_threshold: float | None = None,
    filters: dict[str, Any] | None = None,
    retrieval_strategy: RetrievalStrategy = RetrievalStrategy.SIMILARITY,
    distance_metric: DistanceMetric = DistanceMetric.COSINE,
    max_tokens: int = 4000,
    **kwargs,
) -> list[MemorySearchResult]:
    """Synchronous wrapper for `asearch` that runs the async implementation."""
    return run_coroutine(
        self.asearch(
            config,
            query,
            memory_type=memory_type,
            category=category,
            limit=limit,
            score_threshold=score_threshold,
            filters=filters,
            retrieval_strategy=retrieval_strategy,
            distance_metric=distance_metric,
            max_tokens=max_tokens,
            **kwargs,
        )
    )
setup
setup()

Synchronous setup method for the store.

Returns:

Name Type Description
Any Any

Implementation-defined setup result.

Source code in pyagenity/store/base_store.py, lines 39-46
def setup(self) -> Any:
    """
    Synchronous setup method for the store.

    Returns:
        Any: Implementation-defined setup result.
    """
    return run_coroutine(self.asetup())
store
store(config, content, memory_type=MemoryType.EPISODIC, category='general', metadata=None, **kwargs)

Synchronous wrapper for astore that runs the async implementation.

Source code in pyagenity/store/base_store.py, lines 87-106
def store(
    self,
    config: dict[str, Any],
    content: str | Message,
    memory_type: MemoryType = MemoryType.EPISODIC,
    category: str = "general",
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> str:
    """Synchronous wrapper for `astore` that runs the async implementation."""
    return run_coroutine(
        self.astore(
            config,
            content,
            memory_type=memory_type,
            category=category,
            metadata=metadata,
            **kwargs,
        )
    )
update
update(config, memory_id, content, metadata=None, **kwargs)

Synchronous wrapper for aupdate that runs the async implementation.

Source code in pyagenity/store/base_store.py, lines 226-235
def update(
    self,
    config: dict[str, Any],
    memory_id: str,
    content: str | Message,
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> Any:
    """Synchronous wrapper for `aupdate` that runs the async implementation."""
    return run_coroutine(self.aupdate(config, memory_id, content, metadata=metadata, **kwargs))
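
As a concluding example, the sketch below shows one way a concrete backend could satisfy this interface: a purely in-memory store that uses naive substring matching instead of vector similarity. The InMemoryStore class is illustrative only; in particular, reading config["user_id"] is an assumption about what callers put into config, the simplified method signatures absorb the remaining options via **kwargs, and options such as retrieval_strategy and distance_metric are ignored.

from typing import Any
from uuid import uuid4

from pyagenity.store import BaseStore, MemorySearchResult, MemoryType


class InMemoryStore(BaseStore):
    """Illustrative dict-backed store; a real backend would embed content and search vectors."""

    def __init__(self) -> None:
        self._memories: dict[str, MemorySearchResult] = {}

    async def asetup(self) -> Any:
        return True  # nothing to initialise for an in-memory dict

    async def astore(self, config, content, memory_type=MemoryType.EPISODIC,
                     category="general", metadata=None, **kwargs) -> str:
        memory_id = str(uuid4())
        self._memories[memory_id] = MemorySearchResult(
            id=memory_id,
            content=content if isinstance(content, str) else content.text(),
            memory_type=memory_type,
            metadata={"category": category, **(metadata or {})},
            user_id=config.get("user_id"),  # assumption: config carries a user_id
        )
        return memory_id

    async def asearch(self, config, query, limit=10, **kwargs) -> list[MemorySearchResult]:
        # Naive substring match as a stand-in for vector similarity search.
        hits = [m for m in self._memories.values() if query.lower() in m.content.lower()]
        return hits[:limit]

    async def aget(self, config, memory_id, **kwargs) -> MemorySearchResult | None:
        return self._memories.get(memory_id)

    async def aget_all(self, config, limit=100, **kwargs) -> list[MemorySearchResult]:
        return list(self._memories.values())[:limit]

    async def aupdate(self, config, memory_id, content, metadata=None, **kwargs) -> Any:
        record = self._memories[memory_id]
        record.content = content if isinstance(content, str) else content.text()
        record.metadata.update(metadata or {})

    async def adelete(self, config, memory_id, **kwargs) -> Any:
        self._memories.pop(memory_id, None)

    async def aforget_memory(self, config, **kwargs) -> Any:
        # Drop every memory belonging to the user identified in config.
        user_id = config.get("user_id")
        self._memories = {k: m for k, m in self._memories.items() if m.user_id != user_id}

    async def arelease(self) -> None:
        self._memories.clear()


store = InMemoryStore()
config = {"user_id": "user-1"}
memory_id = store.store(config, "Prefers dark mode")  # sync wrapper -> astore
results = store.search(config, "dark", limit=5)       # sync wrapper -> asearch

In an async application the a-prefixed methods (astore, asearch, ...) should be awaited directly; the synchronous wrappers simply run them via run_coroutine.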

DistanceMetric

Bases: Enum

Supported distance metrics for vector similarity.

Attributes:

Name Type Description
COSINE
DOT_PRODUCT
EUCLIDEAN
MANHATTAN
Source code in pyagenity/store/store_schema.py, lines 21-27
class DistanceMetric(Enum):
    """Supported distance metrics for vector similarity."""

    COSINE = "cosine"
    EUCLIDEAN = "euclidean"
    DOT_PRODUCT = "dot_product"
    MANHATTAN = "manhattan"

Attributes

COSINE class-attribute instance-attribute
COSINE = 'cosine'
DOT_PRODUCT class-attribute instance-attribute
DOT_PRODUCT = 'dot_product'
EUCLIDEAN class-attribute instance-attribute
EUCLIDEAN = 'euclidean'
MANHATTAN class-attribute instance-attribute
MANHATTAN = 'manhattan'
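
The metric is passed through to search/asearch; whether a given backend honours every metric depends on the implementation. A small sketch, where store stands in for any concrete BaseStore implementation (such as the InMemoryStore sketch above) and the threshold is arbitrary:

from pyagenity.store import DistanceMetric

results = store.search(
    {"user_id": "user-1"},
    "favourite editor settings",
    distance_metric=DistanceMetric.DOT_PRODUCT,
    score_threshold=0.5,
)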

MemoryRecord

Bases: BaseModel

Comprehensive memory record for storage (Pydantic model).

Methods:

Name Description
from_message
validate_vector

Attributes:

Name Type Description
category str
content str
id str
memory_type MemoryType
metadata dict[str, Any]
thread_id str | None
timestamp datetime | None
user_id str | None
vector list[float] | None
Source code in pyagenity/store/store_schema.py, lines 65-113
class MemoryRecord(BaseModel):
    """Comprehensive memory record for storage (Pydantic model)."""

    id: str = Field(default_factory=lambda: str(uuid4()))
    content: str
    user_id: str | None = None
    thread_id: str | None = None
    memory_type: MemoryType = Field(default=MemoryType.EPISODIC)
    metadata: dict[str, Any] = Field(default_factory=dict)
    category: str = Field(default="general")
    vector: list[float] | None = None
    timestamp: datetime | None = Field(default_factory=datetime.now)

    @field_validator("vector")
    @classmethod
    def validate_vector(cls, v):
        if v is not None and (
            not isinstance(v, list) or any(not isinstance(x, (int | float)) for x in v)
        ):
            raise ValueError("vector must be list[float] or None")
        return v

    @classmethod
    def from_message(
        cls,
        message: Message,
        user_id: str | None = None,
        thread_id: str | None = None,
        vector: list[float] | None = None,
        additional_metadata: dict[str, Any] | None = None,
    ) -> "MemoryRecord":
        content = message.text()
        metadata = {
            "role": message.role,
            "message_id": str(message.message_id),
            "timestamp": message.timestamp.isoformat() if message.timestamp else None,
            "has_tool_calls": bool(message.tools_calls),
            "has_reasoning": bool(message.reasoning),
            "token_usage": message.usages.model_dump() if message.usages else None,
            **(additional_metadata or {}),
        }
        return cls(
            content=content,
            user_id=user_id,
            thread_id=thread_id,
            memory_type=MemoryType.EPISODIC,
            metadata=metadata,
            vector=vector,
        )

Attributes

category class-attribute instance-attribute
category = Field(default='general')
content instance-attribute
content
id class-attribute instance-attribute
id = Field(default_factory=lambda: str(uuid4()))
memory_type class-attribute instance-attribute
memory_type = Field(default=EPISODIC)
metadata class-attribute instance-attribute
metadata = Field(default_factory=dict)
thread_id class-attribute instance-attribute
thread_id = None
timestamp class-attribute instance-attribute
timestamp = Field(default_factory=now)
user_id class-attribute instance-attribute
user_id = None
vector class-attribute instance-attribute
vector = None

Functions

from_message classmethod
from_message(message, user_id=None, thread_id=None, vector=None, additional_metadata=None)
Source code in pyagenity/store/store_schema.py, lines 87-113
@classmethod
def from_message(
    cls,
    message: Message,
    user_id: str | None = None,
    thread_id: str | None = None,
    vector: list[float] | None = None,
    additional_metadata: dict[str, Any] | None = None,
) -> "MemoryRecord":
    content = message.text()
    metadata = {
        "role": message.role,
        "message_id": str(message.message_id),
        "timestamp": message.timestamp.isoformat() if message.timestamp else None,
        "has_tool_calls": bool(message.tools_calls),
        "has_reasoning": bool(message.reasoning),
        "token_usage": message.usages.model_dump() if message.usages else None,
        **(additional_metadata or {}),
    }
    return cls(
        content=content,
        user_id=user_id,
        thread_id=thread_id,
        memory_type=MemoryType.EPISODIC,
        metadata=metadata,
        vector=vector,
    )
validate_vector classmethod
validate_vector(v)
Source code in pyagenity/store/store_schema.py, lines 78-85
@field_validator("vector")
@classmethod
def validate_vector(cls, v):
    if v is not None and (
        not isinstance(v, list) or any(not isinstance(x, (int | float)) for x in v)
    ):
        raise ValueError("vector must be list[float] or None")
    return v
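
A MemoryRecord can be constructed directly, or built from a Message via from_message. A minimal sketch with illustrative field values:

from pyagenity.store import MemoryRecord, MemoryType

record = MemoryRecord(
    content="User prefers concise answers",
    user_id="user-1",
    thread_id="thread-42",
    memory_type=MemoryType.SEMANTIC,
    category="preferences",
    metadata={"source": "conversation"},
)
print(record.id)         # auto-generated UUID string
print(record.timestamp)  # defaults to datetime.now()

# The vector validator rejects anything that is not list[float] or None,
# e.g. MemoryRecord(content="x", vector="oops") raises a ValidationError.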

MemorySearchResult

Bases: BaseModel

Result from a memory search operation (Pydantic model).

Methods:

Name Description
validate_vector

Attributes:

Name Type Description
content str
id str
memory_type MemoryType
metadata dict[str, Any]
score float
thread_id str | None
timestamp datetime | None
user_id str | None
vector list[float] | None
Source code in pyagenity/store/store_schema.py, lines 42-62
class MemorySearchResult(BaseModel):
    """Result from a memory search operation (Pydantic model)."""

    id: str = Field(default_factory=lambda: str(uuid4()))
    content: str = Field(default="", description="Primary textual content of the memory")
    score: float = Field(default=0.0, ge=0.0, description="Similarity / relevance score")
    memory_type: MemoryType = Field(default=MemoryType.EPISODIC)
    metadata: dict[str, Any] = Field(default_factory=dict)
    vector: list[float] | None = Field(default=None)
    user_id: str | None = None
    thread_id: str | None = None
    timestamp: datetime | None = Field(default_factory=datetime.now)

    @field_validator("vector")
    @classmethod
    def validate_vector(cls, v):
        if v is not None and (
            not isinstance(v, list) or any(not isinstance(x, (int | float)) for x in v)
        ):
            raise ValueError("vector must be list[float] or None")
        return v

Attributes

content class-attribute instance-attribute
content = Field(default='', description='Primary textual content of the memory')
id class-attribute instance-attribute
id = Field(default_factory=lambda: str(uuid4()))
memory_type class-attribute instance-attribute
memory_type = Field(default=EPISODIC)
metadata class-attribute instance-attribute
metadata = Field(default_factory=dict)
score class-attribute instance-attribute
score = Field(default=0.0, ge=0.0, description='Similarity / relevance score')
thread_id class-attribute instance-attribute
thread_id = None
timestamp class-attribute instance-attribute
timestamp = Field(default_factory=now)
user_id class-attribute instance-attribute
user_id = None
vector class-attribute instance-attribute
vector = Field(default=None)

Functions

validate_vector classmethod
validate_vector(v)
Source code in pyagenity/store/store_schema.py, lines 55-62
@field_validator("vector")
@classmethod
def validate_vector(cls, v):
    if v is not None and (
        not isinstance(v, list) or any(not isinstance(x, (int | float)) for x in v)
    ):
        raise ValueError("vector must be list[float] or None")
    return v
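
Because results are plain Pydantic models, post-filtering and formatting are straightforward. A short sketch, where store stands in for any concrete BaseStore implementation and the 0.75 threshold is arbitrary:

results = store.search({"user_id": "user-1"}, "dark mode", limit=10)

# Keep only confident matches and build a context block for a prompt.
relevant = [r for r in results if r.score >= 0.75]
context = "\n".join(f"- ({r.memory_type.value}) {r.content}" for r in relevant)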

MemoryType

Bases: Enum

Types of memories that can be stored.

Attributes:

Name Type Description
CUSTOM
DECLARATIVE
ENTITY
EPISODIC
PROCEDURAL
RELATIONSHIP
SEMANTIC
Source code in pyagenity/store/store_schema.py, lines 30-39
class MemoryType(Enum):
    """Types of memories that can be stored."""

    EPISODIC = "episodic"  # Conversation memories
    SEMANTIC = "semantic"  # Facts and knowledge
    PROCEDURAL = "procedural"  # How-to knowledge
    ENTITY = "entity"  # Entity-based memories
    RELATIONSHIP = "relationship"  # Entity relationships
    CUSTOM = "custom"  # Custom memory types
    DECLARATIVE = "declarative"  # Explicit facts and events

Attributes

CUSTOM class-attribute instance-attribute
CUSTOM = 'custom'
DECLARATIVE class-attribute instance-attribute
DECLARATIVE = 'declarative'
ENTITY class-attribute instance-attribute
ENTITY = 'entity'
EPISODIC class-attribute instance-attribute
EPISODIC = 'episodic'
PROCEDURAL class-attribute instance-attribute
PROCEDURAL = 'procedural'
RELATIONSHIP class-attribute instance-attribute
RELATIONSHIP = 'relationship'
SEMANTIC class-attribute instance-attribute
SEMANTIC = 'semantic'
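
The type is chosen when a memory is written. A small sketch, where store stands in for any concrete BaseStore implementation:

from pyagenity.store import MemoryType

memory_id = store.store(
    {"user_id": "user-1"},
    "The user's company is based in Berlin",
    memory_type=MemoryType.SEMANTIC,
    category="facts",
)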

OpenAIEmbedding

Bases: BaseEmbedding

Methods:

Name Description
__init__
aembed
aembed_batch
embed

Synchronous wrapper for aembed that runs the async implementation.

embed_batch

Synchronous wrapper for aembed_batch that runs the async implementation.

Attributes:

Name Type Description
api_key
client
dimension int
model
Source code in pyagenity/store/embedding/openai_embedding.py, lines 19-75
class OpenAIEmbedding(BaseEmbedding):
    def __init__(
        self,
        model: str = "text-embedding-3-small",
        OPENAI_API_KEY: str | None = None,
    ) -> None:
        if not HAS_OPENAI:
            raise ImportError(
                "The 'openai' package is required for OpenAIEmbedding. "
                "Please install it via 'pip install openai'."
            )
        self.model = model
        if OPENAI_API_KEY:
            self.api_key = OPENAI_API_KEY
        elif "OPENAI_API_KEY" in os.environ:
            self.api_key = os.environ["OPENAI_API_KEY"]
        else:
            raise ValueError(
                "OpenAI API key must be provided via parameter or OPENAI_API_KEY env var"
            )

        self.client = AsyncOpenAI(
            api_key=self.api_key,
        )

    async def aembed_batch(self, texts: list[str]) -> list[list[float]]:
        try:
            response = await self.client.embeddings.create(
                input=texts,
                model=self.model,
            )
            return [data.embedding for data in response.data]
        except OpenAIError as e:
            raise RuntimeError(f"OpenAI API error: {e}") from e

    async def aembed(self, text: str) -> list[float]:
        try:
            response = await self.client.embeddings.create(
                input=text,
                model=self.model,
            )
            return response.data[0].embedding if response.data else []
        except OpenAIError as e:
            raise RuntimeError(f"OpenAI API error: {e}") from e

    @property
    def dimension(self) -> int:
        model_dimensions = {
            "text-embedding-3-small": 1536,
            "text-embedding-3-large": 1536,
            "text-embedding-3-xl": 1536,
            "text-embedding-4-base": 8192,
            "text-embedding-4-large": 8192,
        }
        if self.model in model_dimensions:
            return model_dimensions[self.model]
        raise ValueError(f"Unknown model '{self.model}'. Cannot determine dimension.")

Attributes

api_key instance-attribute
api_key = OPENAI_API_KEY
client instance-attribute
client = AsyncOpenAI(api_key=api_key)
dimension property
dimension
model instance-attribute
model = model

Functions

__init__
__init__(model='text-embedding-3-small', OPENAI_API_KEY=None)
Source code in pyagenity/store/embedding/openai_embedding.py, lines 20-42
def __init__(
    self,
    model: str = "text-embedding-3-small",
    OPENAI_API_KEY: str | None = None,
) -> None:
    if not HAS_OPENAI:
        raise ImportError(
            "The 'openai' package is required for OpenAIEmbedding. "
            "Please install it via 'pip install openai'."
        )
    self.model = model
    if OPENAI_API_KEY:
        self.api_key = OPENAI_API_KEY
    elif "OPENAI_API_KEY" in os.environ:
        self.api_key = os.environ["OPENAI_API_KEY"]
    else:
        raise ValueError(
            "OpenAI API key must be provided via parameter or OPENAI_API_KEY env var"
        )

    self.client = AsyncOpenAI(
        api_key=self.api_key,
    )
aembed async
aembed(text)
Source code in pyagenity/store/embedding/openai_embedding.py, lines 54-62
async def aembed(self, text: str) -> list[float]:
    try:
        response = await self.client.embeddings.create(
            input=text,
            model=self.model,
        )
        return response.data[0].embedding if response.data else []
    except OpenAIError as e:
        raise RuntimeError(f"OpenAI API error: {e}") from e
aembed_batch async
aembed_batch(texts)
Source code in pyagenity/store/embedding/openai_embedding.py, lines 44-52
async def aembed_batch(self, texts: list[str]) -> list[list[float]]:
    try:
        response = await self.client.embeddings.create(
            input=texts,
            model=self.model,
        )
        return [data.embedding for data in response.data]
    except OpenAIError as e:
        raise RuntimeError(f"OpenAI API error: {e}") from e
embed
embed(text)

Synchronous wrapper for aembed that runs the async implementation.

Source code in pyagenity/store/embedding/base_embedding.py, lines 16-18
def embed(self, text: str) -> list[float]:
    """Synchronous wrapper for `aembed` that runs the async implementation."""
    return run_coroutine(self.aembed(text))
embed_batch
embed_batch(texts)

Synchronous wrapper for aembed_batch that runs the async implementation.

Source code in pyagenity/store/embedding/base_embedding.py, lines 7-9
def embed_batch(self, texts: list[str]) -> list[list[float]]:
    """Synchronous wrapper for `aembed_batch` that runs the async implementation."""
    return run_coroutine(self.aembed_batch(texts))
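
Typical usage, assuming the openai package is installed and OPENAI_API_KEY is set in the environment (the texts are illustrative):

from pyagenity.store import OpenAIEmbedding

embedder = OpenAIEmbedding(model="text-embedding-3-small")  # falls back to the OPENAI_API_KEY env var

vector = embedder.embed("What did the user ask about pricing?")
vectors = embedder.embed_batch(["first memory", "second memory"])

print(embedder.dimension)  # 1536 for text-embedding-3-small
print(len(vector))

Inside an async application, call await embedder.aembed(...) / await embedder.aembed_batch(...) directly instead of the synchronous wrappers.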

Modules

base_store

Simplified Async-First Base Store for PyAgenity Framework

This module provides a clean, modern interface for memory stores with:

- Async-first design for better performance
- Core CRUD operations only
- Message-specific convenience methods
- Extensible for different backends (vector stores, managed services, etc.)

Classes:

Name Description
BaseStore

Simplified async-first base class for memory stores in PyAgenity.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute
logger = getLogger(__name__)

Classes

BaseStore

Bases: ABC

Simplified async-first base class for memory stores in PyAgenity.

This class provides a clean interface that supports: - Vector stores (Qdrant, Pinecone, Chroma, etc.) - Managed memory services (mem0, Zep, etc.) - Graph databases (Neo4j, etc.)

Key Design Principles: - Async-first for better performance - Core CRUD operations only - User and agent-centric operations - Extensible filtering and metadata

Methods:

Name Description
adelete

Delete a memory by ID.

aforget_memory

Delete a memory by for a user or agent.

aget

Get a specific memory by ID.

aget_all

Get a specific memory by user_id.

arelease

Clean up any resources used by the store (override in subclasses if needed).

asearch

Search memories by content similarity.

asetup

Asynchronous setup method for checkpointer.

astore

Add a new memory.

aupdate

Update an existing memory.

delete

Synchronous wrapper for adelete that runs the async implementation.

forget_memory

Delete a memory by for a user or agent.

get

Synchronous wrapper for aget that runs the async implementation.

get_all

Synchronous wrapper for aget that runs the async implementation.

release

Clean up any resources used by the store (override in subclasses if needed).

search

Synchronous wrapper for asearch that runs the async implementation.

setup

Synchronous setup method for checkpointer.

store

Synchronous wrapper for astore that runs the async implementation.

update

Synchronous wrapper for aupdate that runs the async implementation.

Source code in pyagenity/store/base_store.py
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
class BaseStore(ABC):
    """
    Simplified async-first base class for memory stores in PyAgenity.

    This class provides a clean interface that supports:
    - Vector stores (Qdrant, Pinecone, Chroma, etc.)
    - Managed memory services (mem0, Zep, etc.)
    - Graph databases (Neo4j, etc.)

    Key Design Principles:
    - Async-first for better performance
    - Core CRUD operations only
    - User and agent-centric operations
    - Extensible filtering and metadata
    """

    def setup(self) -> Any:
        """
        Synchronous setup method for checkpointer.

        Returns:
            Any: Implementation-defined setup result.
        """
        return run_coroutine(self.asetup())

    async def asetup(self) -> Any:
        """
        Asynchronous setup method for checkpointer.

        Returns:
            Any: Implementation-defined setup result.
        """
        raise NotImplementedError

    # --- Core Memory Operations ---

    @abstractmethod
    async def astore(
        self,
        config: dict[str, Any],
        content: str | Message,
        memory_type: MemoryType = MemoryType.EPISODIC,
        category: str = "general",
        metadata: dict[str, Any] | None = None,
        **kwargs,
    ) -> str:
        """
        Add a new memory.

        Args:
            content: The memory content
            user_id: User identifier
            agent_id: Agent identifier
            memory_type: Type of memory (episodic, semantic, etc.)
            category: Memory category for organization
            metadata: Additional metadata
            **kwargs: Store-specific parameters

        Returns:
            Memory ID
        """
        raise NotImplementedError

    # --- Sync wrappers ---
    def store(
        self,
        config: dict[str, Any],
        content: str | Message,
        memory_type: MemoryType = MemoryType.EPISODIC,
        category: str = "general",
        metadata: dict[str, Any] | None = None,
        **kwargs,
    ) -> str:
        """Synchronous wrapper for `astore` that runs the async implementation."""
        return run_coroutine(
            self.astore(
                config,
                content,
                memory_type=memory_type,
                category=category,
                metadata=metadata,
                **kwargs,
            )
        )

    @abstractmethod
    async def asearch(
        self,
        config: dict[str, Any],
        query: str,
        memory_type: MemoryType | None = None,
        category: str | None = None,
        limit: int = 10,
        score_threshold: float | None = None,
        filters: dict[str, Any] | None = None,
        retrieval_strategy: RetrievalStrategy = RetrievalStrategy.SIMILARITY,
        distance_metric: DistanceMetric = DistanceMetric.COSINE,
        max_tokens: int = 4000,
        **kwargs,
    ) -> list[MemorySearchResult]:
        """
        Search memories by content similarity.

        Args:
            query: Search query
            user_id: Filter by user
            agent_id: Filter by agent
            memory_type: Filter by memory type
            category: Filter by category
            limit: Maximum results
            score_threshold: Minimum similarity score
            filters: Additional filters
            **kwargs: Store-specific parameters

        Returns:
            List of matching memories
        """
        raise NotImplementedError

    def search(
        self,
        config: dict[str, Any],
        query: str,
        memory_type: MemoryType | None = None,
        category: str | None = None,
        limit: int = 10,
        score_threshold: float | None = None,
        filters: dict[str, Any] | None = None,
        retrieval_strategy: RetrievalStrategy = RetrievalStrategy.SIMILARITY,
        distance_metric: DistanceMetric = DistanceMetric.COSINE,
        max_tokens: int = 4000,
        **kwargs,
    ) -> list[MemorySearchResult]:
        """Synchronous wrapper for `asearch` that runs the async implementation."""
        return run_coroutine(
            self.asearch(
                config,
                query,
                memory_type=memory_type,
                category=category,
                limit=limit,
                score_threshold=score_threshold,
                filters=filters,
                retrieval_strategy=retrieval_strategy,
                distance_metric=distance_metric,
                max_tokens=max_tokens,
                **kwargs,
            )
        )

    @abstractmethod
    async def aget(
        self,
        config: dict[str, Any],
        memory_id: str,
        **kwargs,
    ) -> MemorySearchResult | None:
        """Get a specific memory by ID."""
        raise NotImplementedError

    @abstractmethod
    async def aget_all(
        self,
        config: dict[str, Any],
        limit: int = 100,
        **kwargs,
    ) -> list[MemorySearchResult]:
        """Get a specific memory by user_id."""
        raise NotImplementedError

    def get(self, config: dict[str, Any], memory_id: str, **kwargs) -> MemorySearchResult | None:
        """Synchronous wrapper for `aget` that runs the async implementation."""
        return run_coroutine(self.aget(config, memory_id, **kwargs))

    def get_all(
        self,
        config: dict[str, Any],
        limit: int = 100,
        **kwargs,
    ) -> list[MemorySearchResult]:
        """Synchronous wrapper for `aget` that runs the async implementation."""
        return run_coroutine(self.aget_all(config, limit=limit, **kwargs))

    @abstractmethod
    async def aupdate(
        self,
        config: dict[str, Any],
        memory_id: str,
        content: str | Message,
        metadata: dict[str, Any] | None = None,
        **kwargs,
    ) -> Any:
        """
        Update an existing memory.

        Args:
            memory_id: ID of memory to update
            content: New content (optional)
            metadata: New/additional metadata (optional)
            **kwargs: Store-specific parameters
        """
        raise NotImplementedError

    def update(
        self,
        config: dict[str, Any],
        memory_id: str,
        content: str | Message,
        metadata: dict[str, Any] | None = None,
        **kwargs,
    ) -> Any:
        """Synchronous wrapper for `aupdate` that runs the async implementation."""
        return run_coroutine(self.aupdate(config, memory_id, content, metadata=metadata, **kwargs))

    @abstractmethod
    async def adelete(
        self,
        config: dict[str, Any],
        memory_id: str,
        **kwargs,
    ) -> Any:
        """Delete a memory by ID."""
        raise NotImplementedError

    def delete(self, config: dict[str, Any], memory_id: str, **kwargs) -> None:
        """Synchronous wrapper for `adelete` that runs the async implementation."""
        return run_coroutine(self.adelete(config, memory_id, **kwargs))

    @abstractmethod
    async def aforget_memory(
        self,
        config: dict[str, Any],
        **kwargs,
    ) -> Any:
        """Delete a memory by for a user or agent."""
        raise NotImplementedError

    def forget_memory(
        self,
        config: dict[str, Any],
        **kwargs,
    ) -> Any:
        """Delete a memory by for a user or agent."""
        return run_coroutine(self.aforget_memory(config, **kwargs))

    # --- Cleanup and Resource Management ---

    async def arelease(self) -> None:
        """Clean up any resources used by the store (override in subclasses if needed)."""
        raise NotImplementedError

    def release(self) -> None:
        """Clean up any resources used by the store (override in subclasses if needed)."""
        return run_coroutine(self.arelease())
Functions
adelete abstractmethod async
adelete(config, memory_id, **kwargs)

Delete a memory by ID.

Source code in pyagenity/store/base_store.py
237
238
239
240
241
242
243
244
245
@abstractmethod
async def adelete(
    self,
    config: dict[str, Any],
    memory_id: str,
    **kwargs,
) -> Any:
    """Delete a memory by ID."""
    raise NotImplementedError
aforget_memory abstractmethod async
aforget_memory(config, **kwargs)

Delete a memory by for a user or agent.

Source code in pyagenity/store/base_store.py
251
252
253
254
255
256
257
258
@abstractmethod
async def aforget_memory(
    self,
    config: dict[str, Any],
    **kwargs,
) -> Any:
    """Delete a memory by for a user or agent."""
    raise NotImplementedError
aget abstractmethod async
aget(config, memory_id, **kwargs)

Get a specific memory by ID.

Source code in pyagenity/store/base_store.py
173
174
175
176
177
178
179
180
181
@abstractmethod
async def aget(
    self,
    config: dict[str, Any],
    memory_id: str,
    **kwargs,
) -> MemorySearchResult | None:
    """Get a specific memory by ID."""
    raise NotImplementedError
aget_all abstractmethod async
aget_all(config, limit=100, **kwargs)

Get a specific memory by user_id.

Source code in pyagenity/store/base_store.py
183
184
185
186
187
188
189
190
191
@abstractmethod
async def aget_all(
    self,
    config: dict[str, Any],
    limit: int = 100,
    **kwargs,
) -> list[MemorySearchResult]:
    """Get a specific memory by user_id."""
    raise NotImplementedError
arelease async
arelease()

Clean up any resources used by the store (override in subclasses if needed).

Source code in pyagenity/store/base_store.py
270
271
272
async def arelease(self) -> None:
    """Clean up any resources used by the store (override in subclasses if needed)."""
    raise NotImplementedError
asearch abstractmethod async
asearch(config, query, memory_type=None, category=None, limit=10, score_threshold=None, filters=None, retrieval_strategy=RetrievalStrategy.SIMILARITY, distance_metric=DistanceMetric.COSINE, max_tokens=4000, **kwargs)

Search memories by content similarity.

Parameters:

Name Type Description Default
query str

Search query

required
user_id

Filter by user

required
agent_id

Filter by agent

required
memory_type MemoryType | None

Filter by memory type

None
category str | None

Filter by category

None
limit int

Maximum results

10
score_threshold float | None

Minimum similarity score

None
filters dict[str, Any] | None

Additional filters

None
**kwargs

Store-specific parameters

{}

Returns:

Type Description
list[MemorySearchResult]

List of matching memories

Source code in pyagenity/store/base_store.py
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
@abstractmethod
async def asearch(
    self,
    config: dict[str, Any],
    query: str,
    memory_type: MemoryType | None = None,
    category: str | None = None,
    limit: int = 10,
    score_threshold: float | None = None,
    filters: dict[str, Any] | None = None,
    retrieval_strategy: RetrievalStrategy = RetrievalStrategy.SIMILARITY,
    distance_metric: DistanceMetric = DistanceMetric.COSINE,
    max_tokens: int = 4000,
    **kwargs,
) -> list[MemorySearchResult]:
    """
    Search memories by content similarity.

    Args:
        query: Search query
        user_id: Filter by user
        agent_id: Filter by agent
        memory_type: Filter by memory type
        category: Filter by category
        limit: Maximum results
        score_threshold: Minimum similarity score
        filters: Additional filters
        **kwargs: Store-specific parameters

    Returns:
        List of matching memories
    """
    raise NotImplementedError
asetup async
asetup()

Asynchronous setup method for checkpointer.

Returns:

Name Type Description
Any Any

Implementation-defined setup result.

Source code in pyagenity/store/base_store.py
48
49
50
51
52
53
54
55
async def asetup(self) -> Any:
    """
    Asynchronous setup method for checkpointer.

    Returns:
        Any: Implementation-defined setup result.
    """
    raise NotImplementedError
astore abstractmethod async
astore(config, content, memory_type=MemoryType.EPISODIC, category='general', metadata=None, **kwargs)

Add a new memory.

Parameters:

Name Type Description Default
content str | Message

The memory content

required
user_id

User identifier

required
agent_id

Agent identifier

required
memory_type MemoryType

Type of memory (episodic, semantic, etc.)

EPISODIC
category str

Memory category for organization

'general'
metadata dict[str, Any] | None

Additional metadata

None
**kwargs

Store-specific parameters

{}

Returns:

Type Description
str

Memory ID

Source code in pyagenity/store/base_store.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
@abstractmethod
async def astore(
    self,
    config: dict[str, Any],
    content: str | Message,
    memory_type: MemoryType = MemoryType.EPISODIC,
    category: str = "general",
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> str:
    """
    Add a new memory.

    Args:
        content: The memory content
        user_id: User identifier
        agent_id: Agent identifier
        memory_type: Type of memory (episodic, semantic, etc.)
        category: Memory category for organization
        metadata: Additional metadata
        **kwargs: Store-specific parameters

    Returns:
        Memory ID
    """
    raise NotImplementedError
aupdate abstractmethod async
aupdate(config, memory_id, content, metadata=None, **kwargs)

Update an existing memory.

Parameters:

Name Type Description Default
memory_id str

ID of memory to update

required
content str | Message

New content (optional)

required
metadata dict[str, Any] | None

New/additional metadata (optional)

None
**kwargs

Store-specific parameters

{}
Source code in pyagenity/store/base_store.py
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
@abstractmethod
async def aupdate(
    self,
    config: dict[str, Any],
    memory_id: str,
    content: str | Message,
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> Any:
    """
    Update an existing memory.

    Args:
        memory_id: ID of memory to update
        content: New content
        metadata: New/additional metadata (optional)
        **kwargs: Store-specific parameters
    """
    raise NotImplementedError
delete
delete(config, memory_id, **kwargs)

Synchronous wrapper for adelete that runs the async implementation.

Source code in pyagenity/store/base_store.py
247
248
249
def delete(self, config: dict[str, Any], memory_id: str, **kwargs) -> None:
    """Synchronous wrapper for `adelete` that runs the async implementation."""
    return run_coroutine(self.adelete(config, memory_id, **kwargs))
forget_memory
forget_memory(config, **kwargs)

Delete all memories for a user or agent.

Source code in pyagenity/store/base_store.py
260
261
262
263
264
265
266
def forget_memory(
    self,
    config: dict[str, Any],
    **kwargs,
) -> Any:
    """Delete a memory by for a user or agent."""
    return run_coroutine(self.aforget_memory(config, **kwargs))
get
get(config, memory_id, **kwargs)

Synchronous wrapper for aget that runs the async implementation.

Source code in pyagenity/store/base_store.py
193
194
195
def get(self, config: dict[str, Any], memory_id: str, **kwargs) -> MemorySearchResult | None:
    """Synchronous wrapper for `aget` that runs the async implementation."""
    return run_coroutine(self.aget(config, memory_id, **kwargs))
get_all
get_all(config, limit=100, **kwargs)

Synchronous wrapper for aget_all that runs the async implementation.

Source code in pyagenity/store/base_store.py
197
198
199
200
201
202
203
204
def get_all(
    self,
    config: dict[str, Any],
    limit: int = 100,
    **kwargs,
) -> list[MemorySearchResult]:
    """Synchronous wrapper for `aget` that runs the async implementation."""
    return run_coroutine(self.aget_all(config, limit=limit, **kwargs))
release
release()

Clean up any resources used by the store (override in subclasses if needed).

Source code in pyagenity/store/base_store.py
274
275
276
def release(self) -> None:
    """Clean up any resources used by the store (override in subclasses if needed)."""
    return run_coroutine(self.arelease())
search
search(config, query, memory_type=None, category=None, limit=10, score_threshold=None, filters=None, retrieval_strategy=RetrievalStrategy.SIMILARITY, distance_metric=DistanceMetric.COSINE, max_tokens=4000, **kwargs)

Synchronous wrapper for asearch that runs the async implementation.

Source code in pyagenity/store/base_store.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def search(
    self,
    config: dict[str, Any],
    query: str,
    memory_type: MemoryType | None = None,
    category: str | None = None,
    limit: int = 10,
    score_threshold: float | None = None,
    filters: dict[str, Any] | None = None,
    retrieval_strategy: RetrievalStrategy = RetrievalStrategy.SIMILARITY,
    distance_metric: DistanceMetric = DistanceMetric.COSINE,
    max_tokens: int = 4000,
    **kwargs,
) -> list[MemorySearchResult]:
    """Synchronous wrapper for `asearch` that runs the async implementation."""
    return run_coroutine(
        self.asearch(
            config,
            query,
            memory_type=memory_type,
            category=category,
            limit=limit,
            score_threshold=score_threshold,
            filters=filters,
            retrieval_strategy=retrieval_strategy,
            distance_metric=distance_metric,
            max_tokens=max_tokens,
            **kwargs,
        )
    )
setup
setup()

Synchronous setup method for the store.

Returns:

Name Type Description
Any Any

Implementation-defined setup result.

Source code in pyagenity/store/base_store.py
39
40
41
42
43
44
45
46
def setup(self) -> Any:
    """
    Synchronous setup method for the store.

    Returns:
        Any: Implementation-defined setup result.
    """
    return run_coroutine(self.asetup())
store
store(config, content, memory_type=MemoryType.EPISODIC, category='general', metadata=None, **kwargs)

Synchronous wrapper for astore that runs the async implementation.

Source code in pyagenity/store/base_store.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def store(
    self,
    config: dict[str, Any],
    content: str | Message,
    memory_type: MemoryType = MemoryType.EPISODIC,
    category: str = "general",
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> str:
    """Synchronous wrapper for `astore` that runs the async implementation."""
    return run_coroutine(
        self.astore(
            config,
            content,
            memory_type=memory_type,
            category=category,
            metadata=metadata,
            **kwargs,
        )
    )
update
update(config, memory_id, content, metadata=None, **kwargs)

Synchronous wrapper for aupdate that runs the async implementation.

Source code in pyagenity/store/base_store.py
226
227
228
229
230
231
232
233
234
235
def update(
    self,
    config: dict[str, Any],
    memory_id: str,
    content: str | Message,
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> Any:
    """Synchronous wrapper for `aupdate` that runs the async implementation."""
    return run_coroutine(self.aupdate(config, memory_id, content, metadata=metadata, **kwargs))
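
The asynchronous methods above form the primary contract; each sync wrapper simply delegates through run_coroutine. Below is a minimal usage sketch (illustrative, not taken from the library source) assuming a concrete BaseStore implementation such as Mem0Store and a per-call config carrying user_id and thread_id:

from pyagenity.store import BaseStore, MemoryType


async def remember_and_recall(store: BaseStore) -> None:
    # Per-call scoping information expected by concrete stores.
    config = {"user_id": "user-123", "thread_id": "thread-1"}

    # Persist a memory; per the BaseStore contract the memory id is returned.
    memory_id = await store.astore(
        config,
        "User prefers dark mode",
        memory_type=MemoryType.EPISODIC,
        category="preferences",
    )

    # Retrieve semantically similar memories.
    for result in await store.asearch(config, "Which UI theme does the user like?", limit=5):
        print(result.id, result.score, result.content)

    # Remove the memory again when it is no longer needed.
    await store.adelete(config, memory_id)


# Synchronous code can rely on the wrappers instead, e.g.
# store.store(config, "User prefers dark mode") and store.search(config, "UI theme").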

Functions

embedding

Modules:

Name Description
base_embedding
openai_embedding

Classes:

Name Description
BaseEmbedding
OpenAIEmbedding

Attributes

__all__ module-attribute
__all__ = ['BaseEmbedding', 'OpenAIEmbedding']

Classes

BaseEmbedding

Bases: ABC

Methods:

Name Description
aembed

Generate embedding for a single text.

aembed_batch

Generate embeddings for a list of texts.

embed

Synchronous wrapper for aembed that runs the async implementation.

embed_batch

Synchronous wrapper for aembed_batch that runs the async implementation.

Attributes:

Name Type Description
dimension int

Dimensionality of the embedding vectors produced by this model.

Source code in pyagenity/store/embedding/base_embedding.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class BaseEmbedding(ABC):
    def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Synchronous wrapper for `aembed_batch` that runs the async implementation."""
        return run_coroutine(self.aembed_batch(texts))

    @abstractmethod
    async def aembed_batch(self, texts: list[str]) -> list[list[float]]:
        """Generate embeddings for a list of texts."""
        # pragma: no cover

    def embed(self, text: str) -> list[float]:
        """Synchronous wrapper for `aembed` that runs the async implementation."""
        return run_coroutine(self.aembed(text))

    @abstractmethod
    async def aembed(self, text: str) -> list[float]:
        """Generate embedding for a single text."""
        raise NotImplementedError

    @property
    @abstractmethod
    def dimension(self) -> int:
        """Synchronous wrapper for that runs the async implementation."""
        raise NotImplementedError
Attributes
dimension abstractmethod property
dimension

Dimensionality of the embedding vectors produced by this model.

Functions
aembed abstractmethod async
aembed(text)

Generate embedding for a single text.

Source code in pyagenity/store/embedding/base_embedding.py
20
21
22
23
@abstractmethod
async def aembed(self, text: str) -> list[float]:
    """Generate embedding for a single text."""
    raise NotImplementedError
aembed_batch abstractmethod async
aembed_batch(texts)

Generate embeddings for a list of texts.

Source code in pyagenity/store/embedding/base_embedding.py
11
12
13
@abstractmethod
async def aembed_batch(self, texts: list[str]) -> list[list[float]]:
    """Generate embeddings for a list of texts."""
embed
embed(text)

Synchronous wrapper for aembed that runs the async implementation.

Source code in pyagenity/store/embedding/base_embedding.py
16
17
18
def embed(self, text: str) -> list[float]:
    """Synchronous wrapper for `aembed` that runs the async implementation."""
    return run_coroutine(self.aembed(text))
embed_batch
embed_batch(texts)

Synchronous wrapper for aembed_batch that runs the async implementation.

Source code in pyagenity/store/embedding/base_embedding.py
7
8
9
def embed_batch(self, texts: list[str]) -> list[list[float]]:
    """Synchronous wrapper for `aembed_batch` that runs the async implementation."""
    return run_coroutine(self.aembed_batch(texts))
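
To implement a custom embedder, only aembed, aembed_batch and dimension need to be provided; the sync wrappers are inherited. The following is a hypothetical toy subclass (not part of the library), useful only for tests or offline runs:

import hashlib

from pyagenity.store import BaseEmbedding


class HashEmbedding(BaseEmbedding):
    """Toy embedder mapping text to pseudo-vectors derived from a SHA-256 digest."""

    def __init__(self, dim: int = 32) -> None:
        self._dim = dim

    async def aembed(self, text: str) -> list[float]:
        digest = hashlib.sha256(text.encode("utf-8")).digest()
        # Cycle through the digest bytes and scale them into [0, 1).
        return [digest[i % len(digest)] / 255.0 for i in range(self._dim)]

    async def aembed_batch(self, texts: list[str]) -> list[list[float]]:
        return [await self.aembed(text) for text in texts]

    @property
    def dimension(self) -> int:
        return self._dim


# The inherited sync wrappers work unchanged:
# HashEmbedding().embed("hello") -> list of 32 floats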
OpenAIEmbedding

Bases: BaseEmbedding

Methods:

Name Description
__init__
aembed
aembed_batch
embed

Synchronous wrapper for aembed that runs the async implementation.

embed_batch

Synchronous wrapper for aembed_batch that runs the async implementation.

Attributes:

Name Type Description
api_key
client
dimension int
model
Source code in pyagenity/store/embedding/openai_embedding.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class OpenAIEmbedding(BaseEmbedding):
    def __init__(
        self,
        model: str = "text-embedding-3-small",
        OPENAI_API_KEY: str | None = None,
    ) -> None:
        if not HAS_OPENAI:
            raise ImportError(
                "The 'openai' package is required for OpenAIEmbedding. "
                "Please install it via 'pip install openai'."
            )
        self.model = model
        if OPENAI_API_KEY:
            self.api_key = OPENAI_API_KEY
        elif "OPENAI_API_KEY" in os.environ:
            self.api_key = os.environ["OPENAI_API_KEY"]
        else:
            raise ValueError(
                "OpenAI API key must be provided via parameter or OPENAI_API_KEY env var"
            )

        self.client = AsyncOpenAI(
            api_key=self.api_key,
        )

    async def aembed_batch(self, texts: list[str]) -> list[list[float]]:
        try:
            response = await self.client.embeddings.create(
                input=texts,
                model=self.model,
            )
            return [data.embedding for data in response.data]
        except OpenAIError as e:
            raise RuntimeError(f"OpenAI API error: {e}") from e

    async def aembed(self, text: str) -> list[float]:
        try:
            response = await self.client.embeddings.create(
                input=text,
                model=self.model,
            )
            return response.data[0].embedding if response.data else []
        except OpenAIError as e:
            raise RuntimeError(f"OpenAI API error: {e}") from e

    @property
    def dimension(self) -> int:
        model_dimensions = {
            "text-embedding-3-small": 1536,
            "text-embedding-3-large": 1536,
            "text-embedding-3-xl": 1536,
            "text-embedding-4-base": 8192,
            "text-embedding-4-large": 8192,
        }
        if self.model in model_dimensions:
            return model_dimensions[self.model]
        raise ValueError(f"Unknown model '{self.model}'. Cannot determine dimension.")
Attributes
api_key instance-attribute
api_key = OPENAI_API_KEY
client instance-attribute
client = AsyncOpenAI(api_key=api_key)
dimension property
dimension
model instance-attribute
model = model
Functions
__init__
__init__(model='text-embedding-3-small', OPENAI_API_KEY=None)
Source code in pyagenity/store/embedding/openai_embedding.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def __init__(
    self,
    model: str = "text-embedding-3-small",
    OPENAI_API_KEY: str | None = None,
) -> None:
    if not HAS_OPENAI:
        raise ImportError(
            "The 'openai' package is required for OpenAIEmbedding. "
            "Please install it via 'pip install openai'."
        )
    self.model = model
    if OPENAI_API_KEY:
        self.api_key = OPENAI_API_KEY
    elif "OPENAI_API_KEY" in os.environ:
        self.api_key = os.environ["OPENAI_API_KEY"]
    else:
        raise ValueError(
            "OpenAI API key must be provided via parameter or OPENAI_API_KEY env var"
        )

    self.client = AsyncOpenAI(
        api_key=self.api_key,
    )
aembed async
aembed(text)
Source code in pyagenity/store/embedding/openai_embedding.py
54
55
56
57
58
59
60
61
62
async def aembed(self, text: str) -> list[float]:
    try:
        response = await self.client.embeddings.create(
            input=text,
            model=self.model,
        )
        return response.data[0].embedding if response.data else []
    except OpenAIError as e:
        raise RuntimeError(f"OpenAI API error: {e}") from e
aembed_batch async
aembed_batch(texts)
Source code in pyagenity/store/embedding/openai_embedding.py
44
45
46
47
48
49
50
51
52
async def aembed_batch(self, texts: list[str]) -> list[list[float]]:
    try:
        response = await self.client.embeddings.create(
            input=texts,
            model=self.model,
        )
        return [data.embedding for data in response.data]
    except OpenAIError as e:
        raise RuntimeError(f"OpenAI API error: {e}") from e
embed
embed(text)

Synchronous wrapper for aembed that runs the async implementation.

Source code in pyagenity/store/embedding/base_embedding.py
16
17
18
def embed(self, text: str) -> list[float]:
    """Synchronous wrapper for `aembed` that runs the async implementation."""
    return run_coroutine(self.aembed(text))
embed_batch
embed_batch(texts)

Synchronous wrapper for aembed_batch that runs the async implementation.

Source code in pyagenity/store/embedding/base_embedding.py
7
8
9
def embed_batch(self, texts: list[str]) -> list[list[float]]:
    """Synchronous wrapper for `aembed_batch` that runs the async implementation."""
    return run_coroutine(self.aembed_batch(texts))
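
A short usage sketch, assuming the optional openai package is installed and OPENAI_API_KEY is set in the environment:

import asyncio

from pyagenity.store import OpenAIEmbedding


async def main() -> None:
    embedder = OpenAIEmbedding(model="text-embedding-3-small")

    # Embed a single string.
    vector = await embedder.aembed("PyAgenity long-term memory")
    print(len(vector))  # 1536 for text-embedding-3-small

    # Embed several strings in one API call.
    vectors = await embedder.aembed_batch(["first note", "second note"])
    print(len(vectors), len(vectors[0]))


asyncio.run(main())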

Modules

base_embedding

Classes:

Name Description
BaseEmbedding
Classes
BaseEmbedding

Bases: ABC

Methods:

Name Description
aembed

Generate embedding for a single text.

aembed_batch

Generate embeddings for a list of texts.

embed

Synchronous wrapper for aembed that runs the async implementation.

embed_batch

Synchronous wrapper for aembed_batch that runs the async implementation.

Attributes:

Name Type Description
dimension int

Dimensionality of the embedding vectors produced by this model.

Source code in pyagenity/store/embedding/base_embedding.py
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class BaseEmbedding(ABC):
    def embed_batch(self, texts: list[str]) -> list[list[float]]:
        """Synchronous wrapper for `aembed_batch` that runs the async implementation."""
        return run_coroutine(self.aembed_batch(texts))

    @abstractmethod
    async def aembed_batch(self, texts: list[str]) -> list[list[float]]:
        """Generate embeddings for a list of texts."""
        # pragma: no cover

    def embed(self, text: str) -> list[float]:
        """Synchronous wrapper for `aembed` that runs the async implementation."""
        return run_coroutine(self.aembed(text))

    @abstractmethod
    async def aembed(self, text: str) -> list[float]:
        """Generate embedding for a single text."""
        raise NotImplementedError

    @property
    @abstractmethod
    def dimension(self) -> int:
        """Synchronous wrapper for that runs the async implementation."""
        raise NotImplementedError
Attributes
dimension abstractmethod property
dimension

Dimensionality of the embedding vectors produced by this model.

Functions
aembed abstractmethod async
aembed(text)

Generate embedding for a single text.

Source code in pyagenity/store/embedding/base_embedding.py
20
21
22
23
@abstractmethod
async def aembed(self, text: str) -> list[float]:
    """Generate embedding for a single text."""
    raise NotImplementedError
aembed_batch abstractmethod async
aembed_batch(texts)

Generate embeddings for a list of texts.

Source code in pyagenity/store/embedding/base_embedding.py
11
12
13
@abstractmethod
async def aembed_batch(self, texts: list[str]) -> list[list[float]]:
    """Generate embeddings for a list of texts."""
embed
embed(text)

Synchronous wrapper for aembed that runs the async implementation.

Source code in pyagenity/store/embedding/base_embedding.py
16
17
18
def embed(self, text: str) -> list[float]:
    """Synchronous wrapper for `aembed` that runs the async implementation."""
    return run_coroutine(self.aembed(text))
embed_batch
embed_batch(texts)

Synchronous wrapper for aembed_batch that runs the async implementation.

Source code in pyagenity/store/embedding/base_embedding.py
7
8
9
def embed_batch(self, texts: list[str]) -> list[list[float]]:
    """Synchronous wrapper for `aembed_batch` that runs the async implementation."""
    return run_coroutine(self.aembed_batch(texts))
Functions
openai_embedding

Classes:

Name Description
OpenAIEmbedding

Attributes:

Name Type Description
HAS_OPENAI
Attributes
HAS_OPENAI module-attribute
HAS_OPENAI = True
Classes
OpenAIEmbedding

Bases: BaseEmbedding

Methods:

Name Description
__init__
aembed
aembed_batch
embed

Synchronous wrapper for aembed that runs the async implementation.

embed_batch

Synchronous wrapper for aembed_batch that runs the async implementation.

Attributes:

Name Type Description
api_key
client
dimension int
model
Source code in pyagenity/store/embedding/openai_embedding.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
class OpenAIEmbedding(BaseEmbedding):
    def __init__(
        self,
        model: str = "text-embedding-3-small",
        OPENAI_API_KEY: str | None = None,
    ) -> None:
        if not HAS_OPENAI:
            raise ImportError(
                "The 'openai' package is required for OpenAIEmbedding. "
                "Please install it via 'pip install openai'."
            )
        self.model = model
        if OPENAI_API_KEY:
            self.api_key = OPENAI_API_KEY
        elif "OPENAI_API_KEY" in os.environ:
            self.api_key = os.environ["OPENAI_API_KEY"]
        else:
            raise ValueError(
                "OpenAI API key must be provided via parameter or OPENAI_API_KEY env var"
            )

        self.client = AsyncOpenAI(
            api_key=self.api_key,
        )

    async def aembed_batch(self, texts: list[str]) -> list[list[float]]:
        try:
            response = await self.client.embeddings.create(
                input=texts,
                model=self.model,
            )
            return [data.embedding for data in response.data]
        except OpenAIError as e:
            raise RuntimeError(f"OpenAI API error: {e}") from e

    async def aembed(self, text: str) -> list[float]:
        try:
            response = await self.client.embeddings.create(
                input=text,
                model=self.model,
            )
            return response.data[0].embedding if response.data else []
        except OpenAIError as e:
            raise RuntimeError(f"OpenAI API error: {e}") from e

    @property
    def dimension(self) -> int:
        model_dimensions = {
            "text-embedding-3-small": 1536,
            "text-embedding-3-large": 1536,
            "text-embedding-3-xl": 1536,
            "text-embedding-4-base": 8192,
            "text-embedding-4-large": 8192,
        }
        if self.model in model_dimensions:
            return model_dimensions[self.model]
        raise ValueError(f"Unknown model '{self.model}'. Cannot determine dimension.")
Attributes
api_key instance-attribute
api_key = OPENAI_API_KEY
client instance-attribute
client = AsyncOpenAI(api_key=api_key)
dimension property
dimension
model instance-attribute
model = model
Functions
__init__
__init__(model='text-embedding-3-small', OPENAI_API_KEY=None)
Source code in pyagenity/store/embedding/openai_embedding.py
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
def __init__(
    self,
    model: str = "text-embedding-3-small",
    OPENAI_API_KEY: str | None = None,
) -> None:
    if not HAS_OPENAI:
        raise ImportError(
            "The 'openai' package is required for OpenAIEmbedding. "
            "Please install it via 'pip install openai'."
        )
    self.model = model
    if OPENAI_API_KEY:
        self.api_key = OPENAI_API_KEY
    elif "OPENAI_API_KEY" in os.environ:
        self.api_key = os.environ["OPENAI_API_KEY"]
    else:
        raise ValueError(
            "OpenAI API key must be provided via parameter or OPENAI_API_KEY env var"
        )

    self.client = AsyncOpenAI(
        api_key=self.api_key,
    )
aembed async
aembed(text)
Source code in pyagenity/store/embedding/openai_embedding.py
54
55
56
57
58
59
60
61
62
async def aembed(self, text: str) -> list[float]:
    try:
        response = await self.client.embeddings.create(
            input=text,
            model=self.model,
        )
        return response.data[0].embedding if response.data else []
    except OpenAIError as e:
        raise RuntimeError(f"OpenAI API error: {e}") from e
aembed_batch async
aembed_batch(texts)
Source code in pyagenity/store/embedding/openai_embedding.py
44
45
46
47
48
49
50
51
52
async def aembed_batch(self, texts: list[str]) -> list[list[float]]:
    try:
        response = await self.client.embeddings.create(
            input=texts,
            model=self.model,
        )
        return [data.embedding for data in response.data]
    except OpenAIError as e:
        raise RuntimeError(f"OpenAI API error: {e}") from e
embed
embed(text)

Synchronous wrapper for aembed that runs the async implementation.

Source code in pyagenity/store/embedding/base_embedding.py
16
17
18
def embed(self, text: str) -> list[float]:
    """Synchronous wrapper for `aembed` that runs the async implementation."""
    return run_coroutine(self.aembed(text))
embed_batch
embed_batch(texts)

Synchronous wrapper for aembed_batch that runs the async implementation.

Source code in pyagenity/store/embedding/base_embedding.py
7
8
9
def embed_batch(self, texts: list[str]) -> list[list[float]]:
    """Synchronous wrapper for `aembed_batch` that runs the async implementation."""
    return run_coroutine(self.aembed_batch(texts))

mem0_store

Mem0 Long-Term Memory Store

Async-first implementation of BaseStore that uses the mem0 library as a managed long-term memory layer. In PyAgenity we treat the graph state as short-term (ephemeral per run / session) memory and a store implementation as long-term, durable memory. This module wires Mem0 so that:

  • astore / asearch / etc. map to Mem0's add, search, get_all, update, delete.
  • We maintain a generated UUID (framework memory id) separate from the Mem0 internal id.
  • Metadata is enriched to retain memory type, category, timestamps and app scoping.
  • The public async methods satisfy the BaseStore contract (astore, abatch_store, asearch, aget, aupdate, adelete, aforget_memory and arelease).
Design notes:

Mem0 (>= 0.2.x / 2025 spec) still exposes synchronous Python APIs. We off-load blocking calls to a thread executor to keep the interface awaitable. Where Mem0 does not support an operation directly (e.g. fetching by a custom memory id), we fall back to scanning get_all for the user. For batch insertion we parallelise add operations with gather while bounding concurrency (a simple semaphore) to avoid thread explosion.

The store interprets the supplied config mapping passed to every method as: {"user_id": str | None, "thread_id": str | None, "app_id": str | None}. thread_id is stored into metadata under agent_id for backward compatibility with earlier implementations where agent_id served a similar role.

Prerequisite: install mem0.

pip install mem0ai
Optional vector DB / embedder / LLM configuration should be supplied through Mem0's native configuration structure (see the upstream docs on memory configuration and vector store configuration). You can also use the helper factory function create_mem0_store_with_qdrant for quick Qdrant backing.
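
A minimal end-to-end sketch (illustrative; all identifiers are placeholders). It assumes mem0ai is installed and that Mem0 can build a working configuration in your environment, e.g. via OPENAI_API_KEY for its default embedder/LLM:

import asyncio

from pyagenity.store.mem0_store import Mem0Store


async def main() -> None:
    # An empty dict defers to Mem0's defaults; pass your own Mem0 config dict
    # to customise the vector store, embedder or LLM.
    store = Mem0Store(config={}, app_id="demo_app")

    # Every call carries the scoping ids described above.
    config = {"user_id": "user-42", "thread_id": "thread-1"}

    await store.astore(config, "Prefers concise answers", category="preferences")

    for result in await store.asearch(config, "How should answers be phrased?", limit=3):
        print(result.score, result.content)

    await store.arelease()


asyncio.run(main())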

Classes:

Name Description
Mem0Store

Mem0 implementation of long-term memory.

Functions:

Name Description
create_mem0_store

Factory for a basic Mem0 long-term store.

create_mem0_store_with_qdrant

Factory producing a Mem0Store configured for Qdrant backing.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute
logger = getLogger(__name__)

Classes

Mem0Store

Bases: BaseStore

Mem0 implementation of long-term memory.

Primary responsibilities:

  • Persist memories (episodic by default) across graph invocations
  • Retrieve semantically similar memories to augment state
  • Provide CRUD lifecycle aligned with BaseStore async API

Unlike in-memory state, these memories survive process restarts as they are managed by Mem0's configured vector / persistence layer.

Methods:

Name Description
__init__
adelete
aforget_memory
aget
aget_all
arelease
asearch
asetup

Asynchronous setup method for the store.

astore
aupdate
delete

Synchronous wrapper for adelete that runs the async implementation.

forget_memory

Delete all memories for a user or agent.

generate_framework_id
get

Synchronous wrapper for aget that runs the async implementation.

get_all

Synchronous wrapper for aget_all that runs the async implementation.

release

Clean up any resources used by the store (override in subclasses if needed).

search

Synchronous wrapper for asearch that runs the async implementation.

setup

Synchronous setup method for the store.

store

Synchronous wrapper for astore that runs the async implementation.

update

Synchronous wrapper for aupdate that runs the async implementation.

Attributes:

Name Type Description
app_id
config
Source code in pyagenity/store/mem0_store.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
class Mem0Store(BaseStore):
    """Mem0 implementation of long-term memory.

    Primary responsibilities:
    * Persist memories (episodic by default) across graph invocations
    * Retrieve semantically similar memories to augment state
    * Provide CRUD lifecycle aligned with ``BaseStore`` async API

    Unlike in-memory state, these memories survive process restarts as they are
    managed by Mem0's configured vector / persistence layer.
    """

    def __init__(
        self,
        config: MemoryConfig | dict,
        app_id: str | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__()
        self.config = config
        self.app_id = app_id or "pyagenity_app"
        self._client = None  # Lazy initialization

        logger.info(
            "Initialized Mem0Store (long-term) app=%s",
            self.app_id,
        )

    async def _get_client(self) -> AsyncMemory:
        """Lazy initialization of AsyncMemory client."""
        if self._client is None:
            try:
                # Prefer explicit config via Memory.from_config when supplied; fallback to defaults
                if isinstance(self.config, dict):
                    self._client = await AsyncMemory.from_config(self.config)
                elif isinstance(self.config, MemoryConfig):
                    self._client = AsyncMemory(config=self.config)
                else:
                    self._client = AsyncMemory()
            except Exception as e:  # pragma: no cover - defensive
                logger.error(f"Failed to initialize Mem0 client: {e}")
                raise
        return self._client

    # ---------------------------------------------------------------------
    # Internal helpers
    # ---------------------------------------------------------------------

    def _extract_ids(self, config: dict[str, Any]) -> tuple[str, str | None, str | None]:
        """Extract user_id, thread_id, app_id from per-call config with fallbacks."""
        user_id = config.get("user_id")
        thread_id = config.get("thread_id")
        app_id = config.get("app_id") or self.app_id

        # if user id and thread id are not provided, we cannot proceed
        if not user_id:
            raise ValueError("user_id must be provided in config")

        if not thread_id:
            raise ValueError("thread_id must be provided in config")

        return user_id, thread_id, app_id

    def _create_result(
        self,
        raw: dict[str, Any],
        user_id: str,
    ) -> MemorySearchResult:
        # check user id belongs to the user
        if raw.get("user_id") != user_id:
            raise ValueError("Memory user_id does not match the requested user_id")

        metadata = raw.get("metadata", {}) or {}
        # Ensure memory_type enum mapping
        memory_type_val = metadata.get("memory_type", MemoryType.EPISODIC.value)
        try:
            memory_type = MemoryType(memory_type_val)
        except ValueError:
            memory_type = MemoryType.EPISODIC

        return MemorySearchResult(
            id=metadata.get("memory_id", str(raw.get("id", uuid4()))),
            content=raw.get("memory") or raw.get("data", ""),
            score=float(raw.get("score", 0.0) or 0.0),
            memory_type=memory_type,
            metadata=metadata,
            user_id=user_id,
            thread_id=metadata.get("run_id"),
        )

    def _iter_results(self, response: Any) -> Iterable[dict[str, Any]]:
        if isinstance(response, list):
            for item in response:
                if isinstance(item, dict):
                    yield item
        elif isinstance(response, dict) and "results" in response:
            for item in response["results"]:
                if isinstance(item, dict):
                    yield item
        else:  # pragma: no cover
            logger.debug("Unexpected Mem0 response type: %s", type(response))

    async def generate_framework_id(self) -> str:
        generated_id = InjectQ.get_instance().try_get("generated_id", str(uuid4()))
        if isinstance(generated_id, Awaitable):
            generated_id = await generated_id
        return generated_id

    # ------------------------------------------------------------------
    # BaseStore required async operations
    # ------------------------------------------------------------------

    async def astore(
        self,
        config: dict[str, Any],
        content: str | Message,
        memory_type: MemoryType = MemoryType.EPISODIC,
        category: str = "general",
        metadata: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> Any:
        text = content.text() if isinstance(content, Message) else str(content)
        if not text.strip():
            raise ValueError("Content cannot be empty")

        user_id, thread_id, app_id = self._extract_ids(config)

        mem_meta = {
            "memory_type": memory_type.value,
            "category": category,
            "created_at": datetime.now().isoformat(),
            **(metadata or {}),
        }

        client = await self._get_client()
        result = await client.add(  # type: ignore
            messages=[{"role": "user", "content": text}],
            user_id=user_id,
            agent_id=app_id,
            run_id=thread_id,
            metadata=mem_meta,
        )

        logger.debug("Stored memory for user=%s thread=%s id=%s", user_id, thread_id, result)

        return result

    async def asearch(
        self,
        config: dict[str, Any],
        query: str,
        memory_type: MemoryType | None = None,
        category: str | None = None,
        limit: int = 10,
        score_threshold: float | None = None,
        filters: dict[str, Any] | None = None,
        retrieval_strategy=None,  # Unused for Mem0; kept for signature parity
        distance_metric=None,  # Unused
        max_tokens: int = 4000,
        **kwargs: Any,
    ) -> list[MemorySearchResult]:
        user_id, thread_id, app_id = self._extract_ids(config)

        client = await self._get_client()
        result = await client.search(  # type: ignore
            query=query,
            user_id=user_id,
            agent_id=app_id,
            limit=limit,
            filters=filters,
            threshold=score_threshold,
        )

        if "original_results" not in result:
            logger.warning("Mem0 search response missing 'original_results': %s", result)
            return []

        if "relations" in result:
            logger.warning(
                "Mem0 search response contains 'relations', which is not supported yet: %s",
                result,
            )

        out: list[MemorySearchResult] = [
            self._create_result(raw, user_id) for raw in result["original_results"]
        ]

        logger.debug(
            "Searched memories for user=%s thread=%s query=%s found=%d",
            user_id,
            thread_id,
            query,
            len(out),
        )
        return out

    async def aget(
        self,
        config: dict[str, Any],
        memory_id: str,
        **kwargs: Any,
    ) -> MemorySearchResult | None:
        user_id, _, _ = self._extract_ids(config)
        # If we stored mapping use that user id instead (authoritative)

        client = await self._get_client()
        result = await client.get(  # type: ignore
            memory_id=memory_id,
        )

        return self._create_result(result, user_id) if result else None

    async def aget_all(
        self,
        config: dict[str, Any],
        limit: int = 100,
        **kwargs: Any,
    ) -> list[MemorySearchResult]:
        user_id, thread_id, app_id = self._extract_ids(config)

        client = await self._get_client()
        result = await client.get_all(  # type: ignore
            user_id=user_id,
            agent_id=app_id,
            limit=limit,
        )

        if "results" not in result:
            logger.warning("Mem0 get_all response missing 'results': %s", result)
            return []

        if "relations" in result:
            logger.warning(
                "Mem0 get_all response contains 'relations', which is not supported yet: %s",
                result,
            )

        out: list[MemorySearchResult] = [
            self._create_result(raw, user_id) for raw in result["results"]
        ]

        logger.debug(
            "Fetched all memories for user=%s thread=%s count=%d",
            user_id,
            thread_id,
            len(out),
        )
        return out

    async def aupdate(
        self,
        config: dict[str, Any],
        memory_id: str,
        content: str | Message,
        metadata: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> Any:
        existing = await self.aget(config, memory_id)
        if not existing:
            raise ValueError(f"Memory {memory_id} not found")

        # user_id obtained for potential permission checks (not used by Mem0 update directly)

        new_text = content.text() if isinstance(content, Message) else str(content)
        updated_meta = {**(existing.metadata or {}), **(metadata or {})}
        updated_meta["updated_at"] = datetime.now().isoformat()

        client = await self._get_client()
        res = await client.update(  # type: ignore
            memory_id=existing.id,
            data=new_text,
        )

        logger.debug("Updated memory %s via recreate", memory_id)
        return res

    async def adelete(
        self,
        config: dict[str, Any],
        memory_id: str,
        **kwargs: Any,
    ) -> Any:
        user_id, _, _ = self._extract_ids(config)
        existing = await self.aget(config, memory_id)
        if not existing:
            logger.warning("Memory %s not found for deletion", memory_id)
            return {
                "deleted": False,
                "reason": "not_found",
            }

        if existing.user_id != user_id:
            raise ValueError("Cannot delete memory belonging to a different user")

        client = await self._get_client()
        res = await client.delete(  # type: ignore
            memory_id=existing.id,
        )

        logger.debug("Deleted memory %s for user %s", memory_id, user_id)
        return res

    async def aforget_memory(
        self,
        config: dict[str, Any],
        **kwargs: Any,
    ) -> Any:
        # Delete all memories for a user
        user_id, _, _ = self._extract_ids(config)
        client = await self._get_client()
        res = await client.delete_all(user_id=user_id)  # type: ignore
        logger.debug("Forgot all memories for user %s", user_id)
        return res

    async def arelease(self) -> None:
        logger.info("Mem0Store released resources")
Attributes
app_id instance-attribute
app_id = app_id or 'pyagenity_app'
config instance-attribute
config = config
Functions
__init__
__init__(config, app_id=None, **kwargs)
Source code in pyagenity/store/mem0_store.py
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
def __init__(
    self,
    config: MemoryConfig | dict,
    app_id: str | None = None,
    **kwargs: Any,
) -> None:
    super().__init__()
    self.config = config
    self.app_id = app_id or "pyagenity_app"
    self._client = None  # Lazy initialization

    logger.info(
        "Initialized Mem0Store (long-term) app=%s",
        self.app_id,
    )
adelete async
adelete(config, memory_id, **kwargs)
Source code in pyagenity/store/mem0_store.py
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
async def adelete(
    self,
    config: dict[str, Any],
    memory_id: str,
    **kwargs: Any,
) -> Any:
    user_id, _, _ = self._extract_ids(config)
    existing = await self.aget(config, memory_id)
    if not existing:
        logger.warning("Memory %s not found for deletion", memory_id)
        return {
            "deleted": False,
            "reason": "not_found",
        }

    if existing.user_id != user_id:
        raise ValueError("Cannot delete memory belonging to a different user")

    client = await self._get_client()
    res = await client.delete(  # type: ignore
        memory_id=existing.id,
    )

    logger.debug("Deleted memory %s for user %s", memory_id, user_id)
    return res
aforget_memory async
aforget_memory(config, **kwargs)
Source code in pyagenity/store/mem0_store.py
363
364
365
366
367
368
369
370
371
372
373
async def aforget_memory(
    self,
    config: dict[str, Any],
    **kwargs: Any,
) -> Any:
    # Delete all memories for a user
    user_id, _, _ = self._extract_ids(config)
    client = await self._get_client()
    res = await client.delete_all(user_id=user_id)  # type: ignore
    logger.debug("Forgot all memories for user %s", user_id)
    return res
aget async
aget(config, memory_id, **kwargs)
Source code in pyagenity/store/mem0_store.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
async def aget(
    self,
    config: dict[str, Any],
    memory_id: str,
    **kwargs: Any,
) -> MemorySearchResult | None:
    user_id, _, _ = self._extract_ids(config)
    # If we stored mapping use that user id instead (authoritative)

    client = await self._get_client()
    result = await client.get(  # type: ignore
        memory_id=memory_id,
    )

    return self._create_result(result, user_id) if result else None
aget_all async
aget_all(config, limit=100, **kwargs)
Source code in pyagenity/store/mem0_store.py
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
async def aget_all(
    self,
    config: dict[str, Any],
    limit: int = 100,
    **kwargs: Any,
) -> list[MemorySearchResult]:
    user_id, thread_id, app_id = self._extract_ids(config)

    client = await self._get_client()
    result = await client.get_all(  # type: ignore
        user_id=user_id,
        agent_id=app_id,
        limit=limit,
    )

    if "results" not in result:
        logger.warning("Mem0 get_all response missing 'results': %s", result)
        return []

    if "relations" in result:
        logger.warning(
            "Mem0 get_all response contains 'relations', which is not supported yet: %s",
            result,
        )

    out: list[MemorySearchResult] = [
        self._create_result(raw, user_id) for raw in result["results"]
    ]

    logger.debug(
        "Fetched all memories for user=%s thread=%s count=%d",
        user_id,
        thread_id,
        len(out),
    )
    return out
arelease async
arelease()
Source code in pyagenity/store/mem0_store.py
375
376
async def arelease(self) -> None:
    logger.info("Mem0Store released resources")
asearch async
asearch(config, query, memory_type=None, category=None, limit=10, score_threshold=None, filters=None, retrieval_strategy=None, distance_metric=None, max_tokens=4000, **kwargs)
Source code in pyagenity/store/mem0_store.py
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
async def asearch(
    self,
    config: dict[str, Any],
    query: str,
    memory_type: MemoryType | None = None,
    category: str | None = None,
    limit: int = 10,
    score_threshold: float | None = None,
    filters: dict[str, Any] | None = None,
    retrieval_strategy=None,  # Unused for Mem0; kept for signature parity
    distance_metric=None,  # Unused
    max_tokens: int = 4000,
    **kwargs: Any,
) -> list[MemorySearchResult]:
    user_id, thread_id, app_id = self._extract_ids(config)

    client = await self._get_client()
    result = await client.search(  # type: ignore
        query=query,
        user_id=user_id,
        agent_id=app_id,
        limit=limit,
        filters=filters,
        threshold=score_threshold,
    )

    if "original_results" not in result:
        logger.warning("Mem0 search response missing 'original_results': %s", result)
        return []

    if "relations" in result:
        logger.warning(
            "Mem0 search response contains 'relations', which is not supported yet: %s",
            result,
        )

    out: list[MemorySearchResult] = [
        self._create_result(raw, user_id) for raw in result["original_results"]
    ]

    logger.debug(
        "Searched memories for user=%s thread=%s query=%s found=%d",
        user_id,
        thread_id,
        query,
        len(out),
    )
    return out
asetup async
asetup()

Asynchronous setup method for the store.

Returns:

Name Type Description
Any Any

Implementation-defined setup result.

Source code in pyagenity/store/base_store.py
48
49
50
51
52
53
54
55
async def asetup(self) -> Any:
    """
    Asynchronous setup method for the store.

    Returns:
        Any: Implementation-defined setup result.
    """
    raise NotImplementedError
astore async
astore(config, content, memory_type=MemoryType.EPISODIC, category='general', metadata=None, **kwargs)
Source code in pyagenity/store/mem0_store.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
async def astore(
    self,
    config: dict[str, Any],
    content: str | Message,
    memory_type: MemoryType = MemoryType.EPISODIC,
    category: str = "general",
    metadata: dict[str, Any] | None = None,
    **kwargs: Any,
) -> Any:
    text = content.text() if isinstance(content, Message) else str(content)
    if not text.strip():
        raise ValueError("Content cannot be empty")

    user_id, thread_id, app_id = self._extract_ids(config)

    mem_meta = {
        "memory_type": memory_type.value,
        "category": category,
        "created_at": datetime.now().isoformat(),
        **(metadata or {}),
    }

    client = await self._get_client()
    result = await client.add(  # type: ignore
        messages=[{"role": "user", "content": text}],
        user_id=user_id,
        agent_id=app_id,
        run_id=thread_id,
        metadata=mem_meta,
    )

    logger.debug("Stored memory for user=%s thread=%s id=%s", user_id, thread_id, result)

    return result
aupdate async
aupdate(config, memory_id, content, metadata=None, **kwargs)
Source code in pyagenity/store/mem0_store.py
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
async def aupdate(
    self,
    config: dict[str, Any],
    memory_id: str,
    content: str | Message,
    metadata: dict[str, Any] | None = None,
    **kwargs: Any,
) -> Any:
    existing = await self.aget(config, memory_id)
    if not existing:
        raise ValueError(f"Memory {memory_id} not found")

    # user_id obtained for potential permission checks (not used by Mem0 update directly)

    new_text = content.text() if isinstance(content, Message) else str(content)
    updated_meta = {**(existing.metadata or {}), **(metadata or {})}
    updated_meta["updated_at"] = datetime.now().isoformat()

    client = await self._get_client()
    res = await client.update(  # type: ignore
        memory_id=existing.id,
        data=new_text,
    )

    logger.debug("Updated memory %s via recreate", memory_id)
    return res
delete
delete(config, memory_id, **kwargs)

Synchronous wrapper for adelete that runs the async implementation.

Source code in pyagenity/store/base_store.py
247
248
249
def delete(self, config: dict[str, Any], memory_id: str, **kwargs) -> None:
    """Synchronous wrapper for `adelete` that runs the async implementation."""
    return run_coroutine(self.adelete(config, memory_id, **kwargs))
forget_memory
forget_memory(config, **kwargs)

Delete all memories for a user or agent.

Source code in pyagenity/store/base_store.py
260
261
262
263
264
265
266
def forget_memory(
    self,
    config: dict[str, Any],
    **kwargs,
) -> Any:
    """Delete a memory by for a user or agent."""
    return run_coroutine(self.aforget_memory(config, **kwargs))
generate_framework_id async
generate_framework_id()
Source code in pyagenity/store/mem0_store.py
163
164
165
166
167
async def generate_framework_id(self) -> str:
    generated_id = InjectQ.get_instance().try_get("generated_id", str(uuid4()))
    if isinstance(generated_id, Awaitable):
        generated_id = await generated_id
    return generated_id
get
get(config, memory_id, **kwargs)

Synchronous wrapper for aget that runs the async implementation.

Source code in pyagenity/store/base_store.py
193
194
195
def get(self, config: dict[str, Any], memory_id: str, **kwargs) -> MemorySearchResult | None:
    """Synchronous wrapper for `aget` that runs the async implementation."""
    return run_coroutine(self.aget(config, memory_id, **kwargs))
get_all
get_all(config, limit=100, **kwargs)

Synchronous wrapper for aget_all that runs the async implementation.

Source code in pyagenity/store/base_store.py
197
198
199
200
201
202
203
204
def get_all(
    self,
    config: dict[str, Any],
    limit: int = 100,
    **kwargs,
) -> list[MemorySearchResult]:
    """Synchronous wrapper for `aget` that runs the async implementation."""
    return run_coroutine(self.aget_all(config, limit=limit, **kwargs))
release
release()

Clean up any resources used by the store (override in subclasses if needed).

Source code in pyagenity/store/base_store.py
274
275
276
def release(self) -> None:
    """Clean up any resources used by the store (override in subclasses if needed)."""
    return run_coroutine(self.arelease())
search
search(config, query, memory_type=None, category=None, limit=10, score_threshold=None, filters=None, retrieval_strategy=RetrievalStrategy.SIMILARITY, distance_metric=DistanceMetric.COSINE, max_tokens=4000, **kwargs)

Synchronous wrapper for asearch that runs the async implementation.

Source code in pyagenity/store/base_store.py
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def search(
    self,
    config: dict[str, Any],
    query: str,
    memory_type: MemoryType | None = None,
    category: str | None = None,
    limit: int = 10,
    score_threshold: float | None = None,
    filters: dict[str, Any] | None = None,
    retrieval_strategy: RetrievalStrategy = RetrievalStrategy.SIMILARITY,
    distance_metric: DistanceMetric = DistanceMetric.COSINE,
    max_tokens: int = 4000,
    **kwargs,
) -> list[MemorySearchResult]:
    """Synchronous wrapper for `asearch` that runs the async implementation."""
    return run_coroutine(
        self.asearch(
            config,
            query,
            memory_type=memory_type,
            category=category,
            limit=limit,
            score_threshold=score_threshold,
            filters=filters,
            retrieval_strategy=retrieval_strategy,
            distance_metric=distance_metric,
            max_tokens=max_tokens,
            **kwargs,
        )
    )
setup
setup()

Synchronous setup method for the store.

Returns:

Name Type Description
Any Any

Implementation-defined setup result.

Source code in pyagenity/store/base_store.py
39
40
41
42
43
44
45
46
def setup(self) -> Any:
    """
    Synchronous setup method for the store.

    Returns:
        Any: Implementation-defined setup result.
    """
    return run_coroutine(self.asetup())
store
store(config, content, memory_type=MemoryType.EPISODIC, category='general', metadata=None, **kwargs)

Synchronous wrapper for astore that runs the async implementation.

Source code in pyagenity/store/base_store.py
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
def store(
    self,
    config: dict[str, Any],
    content: str | Message,
    memory_type: MemoryType = MemoryType.EPISODIC,
    category: str = "general",
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> str:
    """Synchronous wrapper for `astore` that runs the async implementation."""
    return run_coroutine(
        self.astore(
            config,
            content,
            memory_type=memory_type,
            category=category,
            metadata=metadata,
            **kwargs,
        )
    )
update
update(config, memory_id, content, metadata=None, **kwargs)

Synchronous wrapper for aupdate that runs the async implementation.

Source code in pyagenity/store/base_store.py
226
227
228
229
230
231
232
233
234
235
def update(
    self,
    config: dict[str, Any],
    memory_id: str,
    content: str | Message,
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> Any:
    """Synchronous wrapper for `aupdate` that runs the async implementation."""
    return run_coroutine(self.aupdate(config, memory_id, content, metadata=metadata, **kwargs))

Functions

create_mem0_store
create_mem0_store(config, user_id='default_user', thread_id=None, app_id='pyagenity_app')

Factory for a basic Mem0 long-term store.

Source code in pyagenity/store/mem0_store.py
382
383
384
385
386
387
388
389
390
391
392
393
394
def create_mem0_store(
    config: dict[str, Any],
    user_id: str = "default_user",
    thread_id: str | None = None,
    app_id: str = "pyagenity_app",
) -> Mem0Store:
    """Factory for a basic Mem0 long-term store."""
    return Mem0Store(
        config=config,
        default_user_id=user_id,
        default_thread_id=thread_id,
        app_id=app_id,
    )
create_mem0_store_with_qdrant
create_mem0_store_with_qdrant(qdrant_url, qdrant_api_key=None, collection_name='pyagenity_memories', embedding_model='text-embedding-ada-002', llm_model='gpt-4o-mini', app_id='pyagenity_app', **kwargs)

Factory producing a Mem0Store configured for Qdrant backing.

Source code in pyagenity/store/mem0_store.py
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
def create_mem0_store_with_qdrant(
    qdrant_url: str,
    qdrant_api_key: str | None = None,
    collection_name: str = "pyagenity_memories",
    embedding_model: str = "text-embedding-ada-002",
    llm_model: str = "gpt-4o-mini",
    app_id: str = "pyagenity_app",
    **kwargs: Any,
) -> Mem0Store:
    """Factory producing a Mem0Store configured for Qdrant backing."""
    config = {
        "vector_store": {
            "provider": "qdrant",
            "config": {
                "collection_name": collection_name,
                "url": qdrant_url,
                "api_key": qdrant_api_key,
                **kwargs.get("vector_store_config", {}),
            },
        },
        "embedder": {
            "provider": kwargs.get("embedder_provider", "openai"),
            "config": {"model": embedding_model, **kwargs.get("embedder_config", {})},
        },
        "llm": {
            "provider": kwargs.get("llm_provider", "openai"),
            "config": {"model": llm_model, **kwargs.get("llm_config", {})},
        },
    }
    return create_mem0_store(
        config=config,
        app_id=app_id,
    )
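
Usage sketch for the Qdrant-backed factory; the URL, collection name and model names below are placeholders for your own deployment:

from pyagenity.store.mem0_store import create_mem0_store_with_qdrant

# Build a Mem0Store whose vectors live in a Qdrant collection.
store = create_mem0_store_with_qdrant(
    qdrant_url="http://localhost:6333",
    collection_name="pyagenity_memories",
    embedding_model="text-embedding-3-small",
    llm_model="gpt-4o-mini",
    app_id="demo_app",
)

# The returned Mem0Store exposes the usual BaseStore API, e.g.:
# await store.astore({"user_id": "u1", "thread_id": "t1"}, "note to remember")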

qdrant_store

Qdrant Vector Store Implementation for PyAgenity Framework

This module provides a modern, async-first implementation of BaseStore using Qdrant as the backend vector database. Supports both local and cloud Qdrant deployments with configurable embedding services.

Classes:

Name Description
QdrantStore

Modern async-first Qdrant-based vector store implementation.

Functions:

Name Description
create_cloud_qdrant_store

Create a cloud Qdrant store.

create_local_qdrant_store

Create a local Qdrant store.

create_remote_qdrant_store

Create a remote Qdrant store.

Attributes:

Name Type Description
logger
msg

Attributes

logger module-attribute
logger = getLogger(__name__)
msg module-attribute
msg = "Qdrant client not installed. Install with: pip install 'pyagenity[qdrant]'"

Classes

QdrantStore

Bases: BaseStore

Modern async-first Qdrant-based vector store implementation.

Features:

  • Async-only operations for better performance
  • Local and cloud Qdrant deployment support
  • Configurable embedding services
  • Efficient vector similarity search with multiple strategies
  • Collection management with automatic creation
  • Rich metadata filtering capabilities
  • User and agent-scoped operations

Example
# Local Qdrant with OpenAI embeddings
store = QdrantStore(path="./qdrant_data", embedding=OpenAIEmbedding())

# Remote Qdrant
store = QdrantStore(host="localhost", port=6333, embedding=OpenAIEmbedding())

# Cloud Qdrant
store = QdrantStore(
    url="https://xyz.qdrant.io",
    api_key="your-api-key",
    embedding=OpenAIEmbedding(),
)

Methods:

Name Description
__init__

Initialize Qdrant vector store.

adelete

Delete a memory by ID.

aforget_memory

Delete all memories for a user or agent.

aget

Get a specific memory by ID.

aget_all

Get all memories for a user.

arelease

Clean up resources.

asearch

Search memories by content similarity.

asetup

Set up the store and ensure default collection exists.

astore

Store a new memory.

aupdate

Update an existing memory.

delete

Synchronous wrapper for adelete that runs the async implementation.

forget_memory

Delete all memories for a user or agent.

get

Synchronous wrapper for aget that runs the async implementation.

get_all

Synchronous wrapper for aget_all that runs the async implementation.

release

Clean up any resources used by the store (override in subclasses if needed).

search

Synchronous wrapper for asearch that runs the async implementation.

setup

Synchronous setup method for the store.

store

Synchronous wrapper for astore that runs the async implementation.

update

Synchronous wrapper for aupdate that runs the async implementation.

Attributes:

Name Type Description
client
default_collection
embedding
Source code in pyagenity/store/qdrant_store.py
class QdrantStore(BaseStore):
    """
    Modern async-first Qdrant-based vector store implementation.

    Features:
    - Async-only operations for better performance
    - Local and cloud Qdrant deployment support
    - Configurable embedding services
    - Efficient vector similarity search with multiple strategies
    - Collection management with automatic creation
    - Rich metadata filtering capabilities
    - User and agent-scoped operations

    Example:
        ```python
        # Local Qdrant with OpenAI embeddings
        store = QdrantStore(path="./qdrant_data", embedding=OpenAIEmbedding())

        # Remote Qdrant
        store = QdrantStore(host="localhost", port=6333, embedding=OpenAIEmbedding())

        # Cloud Qdrant
        store = QdrantStore(
            url="https://xyz.qdrant.io",
            api_key="your-api-key",
            embedding=OpenAIEmbedding(),
        )
        ```
    """

    def __init__(
        self,
        embedding: BaseEmbedding,
        path: str | None = None,
        host: str | None = None,
        port: int | None = None,
        url: str | None = None,
        api_key: str | None = None,
        default_collection: str = "pyagenity_memories",
        distance_metric: DistanceMetric = DistanceMetric.COSINE,
        **kwargs: Any,
    ):
        """
        Initialize Qdrant vector store.

        Args:
            embedding: Service for generating embeddings
            path: Path for local Qdrant (file-based storage)
            host: Host for remote Qdrant server
            port: Port for remote Qdrant server
            url: URL for Qdrant cloud
            api_key: API key for Qdrant cloud
            default_collection: Default collection name
            distance_metric: Default distance metric
            **kwargs: Additional client parameters
        """
        self.embedding = embedding

        # Initialize async client
        if path:
            self.client = AsyncQdrantClient(path=path, **kwargs)
        elif url:
            self.client = AsyncQdrantClient(url=url, api_key=api_key, **kwargs)
        else:
            host = host or "localhost"
            port = port or 6333
            self.client = AsyncQdrantClient(host=host, port=port, api_key=api_key, **kwargs)

        # Cache for collection existence checks
        self._collection_cache = set()
        self._setup_lock = asyncio.Lock()

        self.default_collection = default_collection
        self._default_distance_metric = distance_metric

        logger.info(f"Initialized QdrantStore with config: path={path}, host={host}, url={url}")

    async def asetup(self) -> Any:
        """Set up the store and ensure default collection exists."""
        async with self._setup_lock:
            await self._ensure_collection_exists(self.default_collection)
        return True

    def _distance_metric_to_qdrant(self, metric: DistanceMetric) -> Distance:
        """Convert framework distance metric to Qdrant distance."""
        mapping = {
            DistanceMetric.COSINE: Distance.COSINE,
            DistanceMetric.EUCLIDEAN: Distance.EUCLID,
            DistanceMetric.DOT_PRODUCT: Distance.DOT,
            DistanceMetric.MANHATTAN: Distance.MANHATTAN,
        }
        return mapping.get(metric, Distance.COSINE)

    def _extract_config_values(self, config: dict[str, Any]) -> tuple[str | None, str | None, str]:
        """Extract user_id, thread_id, and collection from config."""
        user_id = config.get("user_id")
        thread_id = config.get("thread_id")
        collection = config.get("collection", self.default_collection)
        return user_id, thread_id, collection

    def _point_to_search_result(self, point) -> MemorySearchResult:
        """Convert Qdrant point to MemorySearchResult."""
        payload = getattr(point, "payload", {}) or {}

        # Extract content
        content = payload.get("content", "")

        # Convert memory_type string back to enum
        memory_type_str = payload.get("memory_type", "episodic")
        try:
            memory_type = MemoryType(memory_type_str)
        except ValueError:
            memory_type = MemoryType.EPISODIC

        # Parse timestamp
        timestamp_str = payload.get("timestamp")
        timestamp = None
        if timestamp_str:
            try:
                timestamp = datetime.fromisoformat(timestamp_str)
            except (ValueError, TypeError):
                timestamp = None

        return MemorySearchResult(
            id=str(point.id),
            content=content,
            score=float(getattr(point, "score", 1.0) or 0.0),
            memory_type=memory_type,
            metadata=payload,
            vector=getattr(point, "vector", None),
            user_id=payload.get("user_id"),
            thread_id=payload.get("thread_id")
            or payload.get("agent_id"),  # Support both thread_id and agent_id
            timestamp=timestamp,
        )

    def _build_qdrant_filter(
        self,
        user_id: str | None = None,
        thread_id: str | None = None,
        memory_type: MemoryType | None = None,
        category: str | None = None,
        filters: dict[str, Any] | None = None,
    ) -> Filter | None:
        """Build Qdrant filter from parameters."""
        conditions = []

        # Add user/agent filters
        if user_id:
            conditions.append(
                FieldCondition(
                    key="user_id",
                    match=MatchValue(value=user_id),
                ),
            )
        if thread_id:
            conditions.append(
                FieldCondition(
                    key="thread_id",
                    match=MatchValue(value=thread_id),
                ),
            )
        if memory_type:
            conditions.append(
                FieldCondition(
                    key="memory_type",
                    match=MatchValue(value=memory_type.value),
                )
            )
        if category:
            conditions.append(FieldCondition(key="category", match=MatchValue(value=category)))

        # Add custom filters
        if filters:
            for key, value in filters.items():
                if isinstance(value, str | int | bool):
                    conditions.append(FieldCondition(key=key, match=MatchValue(value=value)))

        return Filter(must=conditions) if conditions else None

    async def _ensure_collection_exists(self, collection: str) -> None:
        """Ensure collection exists, create if not."""
        if collection in self._collection_cache:
            return

        try:
            # Check if collection exists
            collections = await self.client.get_collections()
            existing_names = {col.name for col in collections.collections}

            if collection not in existing_names:
                # Create collection with vector configuration
                await self.client.create_collection(
                    collection_name=collection,
                    vectors_config=VectorParams(
                        size=self.embedding.dimension,
                        distance=self._distance_metric_to_qdrant(
                            self._default_distance_metric,
                        ),
                    ),
                )
                logger.info(f"Created collection: {collection}")

            self._collection_cache.add(collection)
        except Exception as e:
            logger.error(f"Error ensuring collection {collection} exists: {e}")
            raise

    def _prepare_content(self, content: str | Message) -> str:
        """Extract text content from string or Message."""
        if isinstance(content, Message):
            return content.text()
        return content

    def _create_memory_record(
        self,
        content: str | Message,
        user_id: str | None = None,
        thread_id: str | None = None,
        memory_type: MemoryType = MemoryType.EPISODIC,
        category: str = "general",
        metadata: dict[str, Any] | None = None,
    ) -> MemoryRecord:
        """Create a memory record from parameters."""
        text_content = self._prepare_content(content)

        if isinstance(content, Message):
            return MemoryRecord.from_message(
                content,
                user_id=user_id,
                thread_id=thread_id,
                additional_metadata=metadata,
            )

        return MemoryRecord(
            content=text_content,
            user_id=user_id,
            thread_id=thread_id,
            memory_type=memory_type,
            metadata=metadata or {},
            category=category,
        )

    # --- BaseStore abstract method implementations ---

    async def astore(
        self,
        config: dict[str, Any],
        content: str | Message,
        memory_type: MemoryType = MemoryType.EPISODIC,
        category: str = "general",
        metadata: dict[str, Any] | None = None,
        **kwargs,
    ) -> str:
        """Store a new memory."""
        user_id, thread_id, collection = self._extract_config_values(config)

        # Ensure collection exists
        await self._ensure_collection_exists(collection)

        # Create memory record
        record = self._create_memory_record(
            content=content,
            user_id=user_id,
            thread_id=thread_id,
            memory_type=memory_type,
            category=category,
            metadata=metadata,
        )

        # Generate embedding
        text_content = self._prepare_content(content)
        vector = await self.embedding.aembed(text_content)
        if not vector or len(vector) != self.embedding.dimension:
            raise ValueError("Embedding service returned invalid vector")

        # Prepare payload
        payload = {
            "content": record.content,
            "user_id": record.user_id,
            "thread_id": record.thread_id,
            "memory_type": record.memory_type.value,
            "category": record.category,
            "timestamp": record.timestamp.isoformat() if record.timestamp else None,
            **record.metadata,
        }

        # Create point
        point = PointStruct(
            id=record.id,
            vector=vector,
            payload=payload,
        )

        # Store in Qdrant
        await self.client.upsert(
            collection_name=collection,
            points=[point],
        )

        logger.debug(f"Stored memory {record.id} in collection {collection}")
        return record.id

    async def asearch(
        self,
        config: dict[str, Any],
        query: str,
        memory_type: MemoryType | None = None,
        category: str | None = None,
        limit: int = 10,
        score_threshold: float | None = None,
        filters: dict[str, Any] | None = None,
        retrieval_strategy: RetrievalStrategy = RetrievalStrategy.SIMILARITY,
        distance_metric: DistanceMetric = DistanceMetric.COSINE,
        max_tokens: int = 4000,
        **kwargs,
    ) -> list[MemorySearchResult]:
        """Search memories by content similarity."""
        user_id, thread_id, collection = self._extract_config_values(config)

        # Ensure collection exists
        await self._ensure_collection_exists(collection)

        # Generate query embedding
        query_vector = await self.embedding.aembed(query)
        if not query_vector or len(query_vector) != self.embedding.dimension:
            raise ValueError("Embedding service returned invalid vector")

        # Build filter
        search_filter = self._build_qdrant_filter(
            user_id=user_id,
            thread_id=thread_id,
            memory_type=memory_type,
            category=category,
            filters=filters,
        )

        # Perform search
        search_result = await self.client.search(
            collection_name=collection,
            query_vector=query_vector,
            query_filter=search_filter,
            limit=limit,
            score_threshold=score_threshold,
        )

        # Convert to search results
        results = [self._point_to_search_result(point) for point in search_result]

        logger.debug(f"Found {len(results)} memories for query in collection {collection}")
        return results

    async def aget(
        self,
        config: dict[str, Any],
        memory_id: str,
        **kwargs,
    ) -> MemorySearchResult | None:
        """Get a specific memory by ID."""
        user_id, thread_id, collection = self._extract_config_values(config)

        try:
            # Ensure collection exists
            await self._ensure_collection_exists(collection)

            # Get point by ID
            points = await self.client.retrieve(
                collection_name=collection,
                ids=[memory_id],
            )

            if not points:
                return None

            point = points[0]
            result = self._point_to_search_result(point)

            # Verify user/agent access if specified
            if user_id and result.user_id != user_id:
                return None
            if thread_id and result.thread_id != thread_id:
                return None

            return result

        except Exception as e:
            logger.error(f"Error retrieving memory {memory_id}: {e}")
            return None

    async def aget_all(
        self,
        config: dict[str, Any],
        limit: int = 100,
        **kwargs,
    ) -> list[MemorySearchResult]:
        """Get all memories for a user."""
        user_id, _, collection = self._extract_config_values(config)

        # Ensure collection exists
        await self._ensure_collection_exists(collection)

        # Build filter
        search_filter = self._build_qdrant_filter(
            user_id=user_id,
        )

        # Perform search
        search_result = await self.client.search(
            collection_name=collection,
            query_vector=[],
            query_filter=search_filter,
            limit=limit,
        )

        # Convert to search results
        results = [self._point_to_search_result(point) for point in search_result]

        logger.debug(f"Found {len(results)} memories for query in collection {collection}")
        return results

    async def aupdate(
        self,
        config: dict[str, Any],
        memory_id: str,
        content: str | Message,
        metadata: dict[str, Any] | None = None,
        **kwargs,
    ) -> None:
        """Update an existing memory."""
        user_id, thread_id, collection = self._extract_config_values(config)

        # Get existing memory
        existing = await self.aget(config, memory_id)
        if not existing:
            raise ValueError(f"Memory {memory_id} not found")

        # Verify user/agent access if specified
        if user_id and existing.user_id != user_id:
            raise PermissionError("User does not have permission to update this memory")
        if thread_id and existing.thread_id != thread_id:
            raise PermissionError("Thread does not have permission to update this memory")

        # Prepare new content
        text_content = self._prepare_content(content)
        new_vector = await self.embedding.aembed(text_content)
        if not new_vector or len(new_vector) != self.embedding.dimension:
            raise ValueError("Embedding service returned invalid vector")

        # Update payload
        updated_metadata = {**existing.metadata}
        if metadata:
            updated_metadata.update(metadata)

        updated_payload = {
            "content": text_content,
            "user_id": existing.user_id,
            "thread_id": existing.thread_id,
            "memory_type": existing.memory_type.value,
            "category": updated_metadata.get("category", "general"),
            "timestamp": datetime.now().isoformat(),
            **updated_metadata,
        }

        # Create updated point
        point = PointStruct(
            id=memory_id,
            vector=new_vector,
            payload=updated_payload,
        )

        # Update in Qdrant
        await self.client.upsert(
            collection_name=collection,
            points=[point],
        )

        logger.debug(f"Updated memory {memory_id} in collection {collection}")

    async def adelete(
        self,
        config: dict[str, Any],
        memory_id: str,
        **kwargs,
    ) -> None:
        """Delete a memory by ID."""
        user_id, thread_id, collection = self._extract_config_values(config)

        # Verify memory exists and user has access
        existing = await self.aget(config, memory_id)
        if not existing:
            raise ValueError(f"Memory {memory_id} not found")

        # verify user/agent access if specified
        if user_id and existing.user_id != user_id:
            raise PermissionError("User does not have permission to delete this memory")
        if thread_id and existing.thread_id != thread_id:
            raise PermissionError("Thread does not have permission to delete this memory")

        # Delete from Qdrant
        await self.client.delete(
            collection_name=collection,
            points_selector=models.PointIdsList(points=[memory_id]),
        )

        logger.debug(f"Deleted memory {memory_id} from collection {collection}")

    async def aforget_memory(
        self,
        config: dict[str, Any],
        **kwargs,
    ) -> None:
        """Delete all memories for a user or agent."""
        user_id, agent_id, collection = self._extract_config_values(config)

        # Build filter for memories to delete
        delete_filter = self._build_qdrant_filter(user_id=user_id, thread_id=agent_id)

        if delete_filter:
            # Delete matching memories
            await self.client.delete(
                collection_name=collection,
                points_selector=models.FilterSelector(filter=delete_filter),
            )

            logger.info(
                f"Deleted all memories for user_id={user_id}, agent_id={agent_id} "
                f"in collection {collection}"
            )
        else:
            logger.warning("No user_id or agent_id specified for memory deletion")

    async def arelease(self) -> None:
        """Clean up resources."""
        if hasattr(self.client, "close"):
            await self.client.close()
        logger.info("QdrantStore resources released")
Attributes
client instance-attribute
client = AsyncQdrantClient(path=path, **kwargs)
default_collection instance-attribute
default_collection = default_collection
embedding instance-attribute
embedding = embedding
Functions
__init__
__init__(embedding, path=None, host=None, port=None, url=None, api_key=None, default_collection='pyagenity_memories', distance_metric=DistanceMetric.COSINE, **kwargs)

Initialize Qdrant vector store.

Parameters:

Name Type Description Default
embedding BaseEmbedding

Service for generating embeddings

required
path str | None

Path for local Qdrant (file-based storage)

None
host str | None

Host for remote Qdrant server

None
port int | None

Port for remote Qdrant server

None
url str | None

URL for Qdrant cloud

None
api_key str | None

API key for Qdrant cloud

None
default_collection str

Default collection name

'pyagenity_memories'
distance_metric DistanceMetric

Default distance metric

COSINE
**kwargs Any

Additional client parameters

{}
Source code in pyagenity/store/qdrant_store.py
def __init__(
    self,
    embedding: BaseEmbedding,
    path: str | None = None,
    host: str | None = None,
    port: int | None = None,
    url: str | None = None,
    api_key: str | None = None,
    default_collection: str = "pyagenity_memories",
    distance_metric: DistanceMetric = DistanceMetric.COSINE,
    **kwargs: Any,
):
    """
    Initialize Qdrant vector store.

    Args:
        embedding: Service for generating embeddings
        path: Path for local Qdrant (file-based storage)
        host: Host for remote Qdrant server
        port: Port for remote Qdrant server
        url: URL for Qdrant cloud
        api_key: API key for Qdrant cloud
        default_collection: Default collection name
        distance_metric: Default distance metric
        **kwargs: Additional client parameters
    """
    self.embedding = embedding

    # Initialize async client
    if path:
        self.client = AsyncQdrantClient(path=path, **kwargs)
    elif url:
        self.client = AsyncQdrantClient(url=url, api_key=api_key, **kwargs)
    else:
        host = host or "localhost"
        port = port or 6333
        self.client = AsyncQdrantClient(host=host, port=port, api_key=api_key, **kwargs)

    # Cache for collection existence checks
    self._collection_cache = set()
    self._setup_lock = asyncio.Lock()

    self.default_collection = default_collection
    self._default_distance_metric = distance_metric

    logger.info(f"Initialized QdrantStore with config: path={path}, host={host}, url={url}")
adelete async
adelete(config, memory_id, **kwargs)

Delete a memory by ID.

Source code in pyagenity/store/qdrant_store.py
async def adelete(
    self,
    config: dict[str, Any],
    memory_id: str,
    **kwargs,
) -> None:
    """Delete a memory by ID."""
    user_id, thread_id, collection = self._extract_config_values(config)

    # Verify memory exists and user has access
    existing = await self.aget(config, memory_id)
    if not existing:
        raise ValueError(f"Memory {memory_id} not found")

    # verify user/agent access if specified
    if user_id and existing.user_id != user_id:
        raise PermissionError("User does not have permission to delete this memory")
    if thread_id and existing.thread_id != thread_id:
        raise PermissionError("Thread does not have permission to delete this memory")

    # Delete from Qdrant
    await self.client.delete(
        collection_name=collection,
        points_selector=models.PointIdsList(points=[memory_id]),
    )

    logger.debug(f"Deleted memory {memory_id} from collection {collection}")
aforget_memory async
aforget_memory(config, **kwargs)

Delete all memories for a user or agent.

Source code in pyagenity/store/qdrant_store.py
async def aforget_memory(
    self,
    config: dict[str, Any],
    **kwargs,
) -> None:
    """Delete all memories for a user or agent."""
    user_id, agent_id, collection = self._extract_config_values(config)

    # Build filter for memories to delete
    delete_filter = self._build_qdrant_filter(user_id=user_id, thread_id=agent_id)

    if delete_filter:
        # Delete matching memories
        await self.client.delete(
            collection_name=collection,
            points_selector=models.FilterSelector(filter=delete_filter),
        )

        logger.info(
            f"Deleted all memories for user_id={user_id}, agent_id={agent_id} "
            f"in collection {collection}"
        )
    else:
        logger.warning("No user_id or agent_id specified for memory deletion")
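
A short sketch of wiping one user's memories. The config keys mirror _extract_config_values (user_id, thread_id, collection); the identifier is illustrative:

```python
# Sketch: remove every memory stored for one user in the default collection.
async def wipe_user(store, user_id: str) -> None:
    await store.aforget_memory({"user_id": user_id})
```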
aget async
aget(config, memory_id, **kwargs)

Get a specific memory by ID.

Source code in pyagenity/store/qdrant_store.py
async def aget(
    self,
    config: dict[str, Any],
    memory_id: str,
    **kwargs,
) -> MemorySearchResult | None:
    """Get a specific memory by ID."""
    user_id, thread_id, collection = self._extract_config_values(config)

    try:
        # Ensure collection exists
        await self._ensure_collection_exists(collection)

        # Get point by ID
        points = await self.client.retrieve(
            collection_name=collection,
            ids=[memory_id],
        )

        if not points:
            return None

        point = points[0]
        result = self._point_to_search_result(point)

        # Verify user/agent access if specified
        if user_id and result.user_id != user_id:
            return None
        if thread_id and result.thread_id != thread_id:
            return None

        return result

    except Exception as e:
        logger.error(f"Error retrieving memory {memory_id}: {e}")
        return None
aget_all async
aget_all(config, limit=100, **kwargs)

Get all memories for a user.

Source code in pyagenity/store/qdrant_store.py
async def aget_all(
    self,
    config: dict[str, Any],
    limit: int = 100,
    **kwargs,
) -> list[MemorySearchResult]:
    """Get all memories for a user."""
    user_id, _, collection = self._extract_config_values(config)

    # Ensure collection exists
    await self._ensure_collection_exists(collection)

    # Build filter
    search_filter = self._build_qdrant_filter(
        user_id=user_id,
    )

    # Perform search
    search_result = await self.client.search(
        collection_name=collection,
        query_vector=[],
        query_filter=search_filter,
        limit=limit,
    )

    # Convert to search results
    results = [self._point_to_search_result(point) for point in search_result]

    logger.debug(f"Found {len(results)} memories for query in collection {collection}")
    return results
arelease async
arelease()

Clean up resources.

Source code in pyagenity/store/qdrant_store.py
async def arelease(self) -> None:
    """Clean up resources."""
    if hasattr(self.client, "close"):
        await self.client.close()
    logger.info("QdrantStore resources released")
asearch async
asearch(config, query, memory_type=None, category=None, limit=10, score_threshold=None, filters=None, retrieval_strategy=RetrievalStrategy.SIMILARITY, distance_metric=DistanceMetric.COSINE, max_tokens=4000, **kwargs)

Search memories by content similarity.

Source code in pyagenity/store/qdrant_store.py
async def asearch(
    self,
    config: dict[str, Any],
    query: str,
    memory_type: MemoryType | None = None,
    category: str | None = None,
    limit: int = 10,
    score_threshold: float | None = None,
    filters: dict[str, Any] | None = None,
    retrieval_strategy: RetrievalStrategy = RetrievalStrategy.SIMILARITY,
    distance_metric: DistanceMetric = DistanceMetric.COSINE,
    max_tokens: int = 4000,
    **kwargs,
) -> list[MemorySearchResult]:
    """Search memories by content similarity."""
    user_id, thread_id, collection = self._extract_config_values(config)

    # Ensure collection exists
    await self._ensure_collection_exists(collection)

    # Generate query embedding
    query_vector = await self.embedding.aembed(query)
    if not query_vector or len(query_vector) != self.embedding.dimension:
        raise ValueError("Embedding service returned invalid vector")

    # Build filter
    search_filter = self._build_qdrant_filter(
        user_id=user_id,
        thread_id=thread_id,
        memory_type=memory_type,
        category=category,
        filters=filters,
    )

    # Perform search
    search_result = await self.client.search(
        collection_name=collection,
        query_vector=query_vector,
        query_filter=search_filter,
        limit=limit,
        score_threshold=score_threshold,
    )

    # Convert to search results
    results = [self._point_to_search_result(point) for point in search_result]

    logger.debug(f"Found {len(results)} memories for query in collection {collection}")
    return results
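
A usage sketch for asearch: the config dict scopes results to a user and thread, and extra exact-match payload conditions ride along via filters. Identifiers and the threshold are illustrative:

```python
from pyagenity.store import MemoryType

async def recall(store):
    config = {"user_id": "user-123", "thread_id": "thread-42"}
    results = await store.asearch(
        config,
        query="preferred programming language",
        memory_type=MemoryType.SEMANTIC,
        limit=5,
        score_threshold=0.4,                   # drop low-similarity matches
        filters={"category": "preferences"},   # exact-match payload filter
    )
    for r in results:
        print(f"{r.score:.2f} {r.content}")
```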
asetup async
asetup()

Set up the store and ensure default collection exists.

Source code in pyagenity/store/qdrant_store.py
async def asetup(self) -> Any:
    """Set up the store and ensure default collection exists."""
    async with self._setup_lock:
        await self._ensure_collection_exists(self.default_collection)
    return True
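
Typically called once at application startup so the default collection exists before the first read or write; a minimal sketch:

```python
# Sketch: run setup before first use (or call store.setup() from sync code).
async def init_store(store) -> None:
    await store.asetup()   # creates the default collection if it is missing
```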
astore async
astore(config, content, memory_type=MemoryType.EPISODIC, category='general', metadata=None, **kwargs)

Store a new memory.

Source code in pyagenity/store/qdrant_store.py
async def astore(
    self,
    config: dict[str, Any],
    content: str | Message,
    memory_type: MemoryType = MemoryType.EPISODIC,
    category: str = "general",
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> str:
    """Store a new memory."""
    user_id, thread_id, collection = self._extract_config_values(config)

    # Ensure collection exists
    await self._ensure_collection_exists(collection)

    # Create memory record
    record = self._create_memory_record(
        content=content,
        user_id=user_id,
        thread_id=thread_id,
        memory_type=memory_type,
        category=category,
        metadata=metadata,
    )

    # Generate embedding
    text_content = self._prepare_content(content)
    vector = await self.embedding.aembed(text_content)
    if not vector or len(vector) != self.embedding.dimension:
        raise ValueError("Embedding service returned invalid vector")

    # Prepare payload
    payload = {
        "content": record.content,
        "user_id": record.user_id,
        "thread_id": record.thread_id,
        "memory_type": record.memory_type.value,
        "category": record.category,
        "timestamp": record.timestamp.isoformat() if record.timestamp else None,
        **record.metadata,
    }

    # Create point
    point = PointStruct(
        id=record.id,
        vector=vector,
        payload=payload,
    )

    # Store in Qdrant
    await self.client.upsert(
        collection_name=collection,
        points=[point],
    )

    logger.debug(f"Stored memory {record.id} in collection {collection}")
    return record.id
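
A sketch of storing a plain-text memory; the return value is the generated MemoryRecord id. Identifiers and the example text are illustrative:

```python
from pyagenity.store import MemoryType

async def remember(store) -> str:
    config = {"user_id": "user-123", "thread_id": "thread-42"}
    memory_id = await store.astore(
        config,
        "The user prefers concise answers.",
        memory_type=MemoryType.SEMANTIC,
        category="preferences",
        metadata={"source": "conversation"},
    )
    return memory_id   # UUID string from the underlying MemoryRecord
```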
aupdate async
aupdate(config, memory_id, content, metadata=None, **kwargs)

Update an existing memory.

Source code in pyagenity/store/qdrant_store.py
async def aupdate(
    self,
    config: dict[str, Any],
    memory_id: str,
    content: str | Message,
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> None:
    """Update an existing memory."""
    user_id, thread_id, collection = self._extract_config_values(config)

    # Get existing memory
    existing = await self.aget(config, memory_id)
    if not existing:
        raise ValueError(f"Memory {memory_id} not found")

    # Verify user/agent access if specified
    if user_id and existing.user_id != user_id:
        raise PermissionError("User does not have permission to update this memory")
    if thread_id and existing.thread_id != thread_id:
        raise PermissionError("Thread does not have permission to update this memory")

    # Prepare new content
    text_content = self._prepare_content(content)
    new_vector = await self.embedding.aembed(text_content)
    if not new_vector or len(new_vector) != self.embedding.dimension:
        raise ValueError("Embedding service returned invalid vector")

    # Update payload
    updated_metadata = {**existing.metadata}
    if metadata:
        updated_metadata.update(metadata)

    updated_payload = {
        "content": text_content,
        "user_id": existing.user_id,
        "thread_id": existing.thread_id,
        "memory_type": existing.memory_type.value,
        "category": updated_metadata.get("category", "general"),
        "timestamp": datetime.now().isoformat(),
        **updated_metadata,
    }

    # Create updated point
    point = PointStruct(
        id=memory_id,
        vector=new_vector,
        payload=updated_payload,
    )

    # Update in Qdrant
    await self.client.upsert(
        collection_name=collection,
        points=[point],
    )

    logger.debug(f"Updated memory {memory_id} in collection {collection}")
delete
delete(config, memory_id, **kwargs)

Synchronous wrapper for adelete that runs the async implementation.

Source code in pyagenity/store/base_store.py
def delete(self, config: dict[str, Any], memory_id: str, **kwargs) -> None:
    """Synchronous wrapper for `adelete` that runs the async implementation."""
    return run_coroutine(self.adelete(config, memory_id, **kwargs))
forget_memory
forget_memory(config, **kwargs)

Delete all memories for a user or agent.

Source code in pyagenity/store/base_store.py
def forget_memory(
    self,
    config: dict[str, Any],
    **kwargs,
) -> Any:
    """Delete a memory by for a user or agent."""
    return run_coroutine(self.aforget_memory(config, **kwargs))
get
get(config, memory_id, **kwargs)

Synchronous wrapper for aget that runs the async implementation.

Source code in pyagenity/store/base_store.py
def get(self, config: dict[str, Any], memory_id: str, **kwargs) -> MemorySearchResult | None:
    """Synchronous wrapper for `aget` that runs the async implementation."""
    return run_coroutine(self.aget(config, memory_id, **kwargs))
get_all
get_all(config, limit=100, **kwargs)

Synchronous wrapper for aget_all that runs the async implementation.

Source code in pyagenity/store/base_store.py
def get_all(
    self,
    config: dict[str, Any],
    limit: int = 100,
    **kwargs,
) -> list[MemorySearchResult]:
    """Synchronous wrapper for `aget` that runs the async implementation."""
    return run_coroutine(self.aget_all(config, limit=limit, **kwargs))
release
release()

Clean up any resources used by the store (override in subclasses if needed).

Source code in pyagenity/store/base_store.py
def release(self) -> None:
    """Clean up any resources used by the store (override in subclasses if needed)."""
    return run_coroutine(self.arelease())
search
search(config, query, memory_type=None, category=None, limit=10, score_threshold=None, filters=None, retrieval_strategy=RetrievalStrategy.SIMILARITY, distance_metric=DistanceMetric.COSINE, max_tokens=4000, **kwargs)

Synchronous wrapper for asearch that runs the async implementation.

Source code in pyagenity/store/base_store.py
def search(
    self,
    config: dict[str, Any],
    query: str,
    memory_type: MemoryType | None = None,
    category: str | None = None,
    limit: int = 10,
    score_threshold: float | None = None,
    filters: dict[str, Any] | None = None,
    retrieval_strategy: RetrievalStrategy = RetrievalStrategy.SIMILARITY,
    distance_metric: DistanceMetric = DistanceMetric.COSINE,
    max_tokens: int = 4000,
    **kwargs,
) -> list[MemorySearchResult]:
    """Synchronous wrapper for `asearch` that runs the async implementation."""
    return run_coroutine(
        self.asearch(
            config,
            query,
            memory_type=memory_type,
            category=category,
            limit=limit,
            score_threshold=score_threshold,
            filters=filters,
            retrieval_strategy=retrieval_strategy,
            distance_metric=distance_metric,
            max_tokens=max_tokens,
            **kwargs,
        )
    )
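
For code that is not async, the synchronous wrappers cover the same operations; a brief sketch assuming store is an already-constructed BaseStore subclass and the identifiers are illustrative:

```python
# Sketch: synchronous wrappers delegate to the async API via run_coroutine.
config = {"user_id": "user-123"}

store.setup()
memory_id = store.store(config, "User lives in Berlin.")
hits = store.search(config, "where does the user live?", limit=3)
store.delete(config, memory_id)
store.release()   # close underlying resources when done
```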
setup
setup()

Synchronous setup method for the store.

Returns:

Name Type Description
Any Any

Implementation-defined setup result.

Source code in pyagenity/store/base_store.py
def setup(self) -> Any:
    """
    Synchronous setup method for the store.

    Returns:
        Any: Implementation-defined setup result.
    """
    return run_coroutine(self.asetup())
store
store(config, content, memory_type=MemoryType.EPISODIC, category='general', metadata=None, **kwargs)

Synchronous wrapper for astore that runs the async implementation.

Source code in pyagenity/store/base_store.py
def store(
    self,
    config: dict[str, Any],
    content: str | Message,
    memory_type: MemoryType = MemoryType.EPISODIC,
    category: str = "general",
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> str:
    """Synchronous wrapper for `astore` that runs the async implementation."""
    return run_coroutine(
        self.astore(
            config,
            content,
            memory_type=memory_type,
            category=category,
            metadata=metadata,
            **kwargs,
        )
    )
update
update(config, memory_id, content, metadata=None, **kwargs)

Synchronous wrapper for aupdate that runs the async implementation.

Source code in pyagenity/store/base_store.py
def update(
    self,
    config: dict[str, Any],
    memory_id: str,
    content: str | Message,
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> Any:
    """Synchronous wrapper for `aupdate` that runs the async implementation."""
    return run_coroutine(self.aupdate(config, memory_id, content, metadata=metadata, **kwargs))

Functions

create_cloud_qdrant_store
create_cloud_qdrant_store(url, api_key, embedding, **kwargs)

Create a cloud Qdrant store.

Source code in pyagenity/store/qdrant_store.py
def create_cloud_qdrant_store(
    url: str,
    api_key: str,
    embedding: BaseEmbedding,
    **kwargs,
) -> QdrantStore:
    """Create a cloud Qdrant store."""
    return QdrantStore(
        embedding=embedding,
        url=url,
        api_key=api_key,
        **kwargs,
    )
create_local_qdrant_store
create_local_qdrant_store(path, embedding, **kwargs)

Create a local Qdrant store.

Source code in pyagenity/store/qdrant_store.py
def create_local_qdrant_store(
    path: str,
    embedding: BaseEmbedding,
    **kwargs,
) -> QdrantStore:
    """Create a local Qdrant store."""
    return QdrantStore(
        embedding=embedding,
        path=path,
        **kwargs,
    )
create_remote_qdrant_store
create_remote_qdrant_store(host, port, embedding, **kwargs)

Create a remote Qdrant store.

Source code in pyagenity/store/qdrant_store.py
def create_remote_qdrant_store(
    host: str,
    port: int,
    embedding: BaseEmbedding,
    **kwargs,
) -> QdrantStore:
    """Create a remote Qdrant store."""
    return QdrantStore(
        embedding=embedding,
        host=host,
        port=port,
        **kwargs,
    )
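
An end-to-end sketch combining one of the factories above with the async API. The local path and example text are illustrative, and OpenAIEmbedding() is assumed to read its credentials from the environment:

```python
import asyncio

from pyagenity.store import OpenAIEmbedding
from pyagenity.store.qdrant_store import create_local_qdrant_store

async def main() -> None:
    store = create_local_qdrant_store("./qdrant_data", OpenAIEmbedding())
    await store.asetup()

    config = {"user_id": "user-123"}
    await store.astore(config, "The user is allergic to peanuts.")
    for hit in await store.asearch(config, "food allergies", limit=3):
        print(f"{hit.score:.2f} {hit.content}")

    await store.arelease()

asyncio.run(main())
```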

store_schema

Classes:

Name Description
DistanceMetric

Supported distance metrics for vector similarity.

MemoryRecord

Comprehensive memory record for storage (Pydantic model).

MemorySearchResult

Result from a memory search operation (Pydantic model).

MemoryType

Types of memories that can be stored.

RetrievalStrategy

Memory retrieval strategies.

Classes

DistanceMetric

Bases: Enum

Supported distance metrics for vector similarity.

Attributes:

Name Type Description
COSINE
DOT_PRODUCT
EUCLIDEAN
MANHATTAN
Source code in pyagenity/store/store_schema.py
class DistanceMetric(Enum):
    """Supported distance metrics for vector similarity."""

    COSINE = "cosine"
    EUCLIDEAN = "euclidean"
    DOT_PRODUCT = "dot_product"
    MANHATTAN = "manhattan"
Attributes
COSINE class-attribute instance-attribute
COSINE = 'cosine'
DOT_PRODUCT class-attribute instance-attribute
DOT_PRODUCT = 'dot_product'
EUCLIDEAN class-attribute instance-attribute
EUCLIDEAN = 'euclidean'
MANHATTAN class-attribute instance-attribute
MANHATTAN = 'manhattan'
MemoryRecord

Bases: BaseModel

Comprehensive memory record for storage (Pydantic model).

Methods:

Name Description
from_message
validate_vector

Attributes:

Name Type Description
category str
content str
id str
memory_type MemoryType
metadata dict[str, Any]
thread_id str | None
timestamp datetime | None
user_id str | None
vector list[float] | None
Source code in pyagenity/store/store_schema.py
class MemoryRecord(BaseModel):
    """Comprehensive memory record for storage (Pydantic model)."""

    id: str = Field(default_factory=lambda: str(uuid4()))
    content: str
    user_id: str | None = None
    thread_id: str | None = None
    memory_type: MemoryType = Field(default=MemoryType.EPISODIC)
    metadata: dict[str, Any] = Field(default_factory=dict)
    category: str = Field(default="general")
    vector: list[float] | None = None
    timestamp: datetime | None = Field(default_factory=datetime.now)

    @field_validator("vector")
    @classmethod
    def validate_vector(cls, v):
        if v is not None and (
            not isinstance(v, list) or any(not isinstance(x, (int | float)) for x in v)
        ):
            raise ValueError("vector must be list[float] or None")
        return v

    @classmethod
    def from_message(
        cls,
        message: Message,
        user_id: str | None = None,
        thread_id: str | None = None,
        vector: list[float] | None = None,
        additional_metadata: dict[str, Any] | None = None,
    ) -> "MemoryRecord":
        content = message.text()
        metadata = {
            "role": message.role,
            "message_id": str(message.message_id),
            "timestamp": message.timestamp.isoformat() if message.timestamp else None,
            "has_tool_calls": bool(message.tools_calls),
            "has_reasoning": bool(message.reasoning),
            "token_usage": message.usages.model_dump() if message.usages else None,
            **(additional_metadata or {}),
        }
        return cls(
            content=content,
            user_id=user_id,
            thread_id=thread_id,
            memory_type=MemoryType.EPISODIC,
            metadata=metadata,
            vector=vector,
        )
Attributes
category class-attribute instance-attribute
category = Field(default='general')
content instance-attribute
content
id class-attribute instance-attribute
id = Field(default_factory=lambda: str(uuid4()))
memory_type class-attribute instance-attribute
memory_type = Field(default=EPISODIC)
metadata class-attribute instance-attribute
metadata = Field(default_factory=dict)
thread_id class-attribute instance-attribute
thread_id = None
timestamp class-attribute instance-attribute
timestamp = Field(default_factory=now)
user_id class-attribute instance-attribute
user_id = None
vector class-attribute instance-attribute
vector = None
Functions
from_message classmethod
from_message(message, user_id=None, thread_id=None, vector=None, additional_metadata=None)
Source code in pyagenity/store/store_schema.py
@classmethod
def from_message(
    cls,
    message: Message,
    user_id: str | None = None,
    thread_id: str | None = None,
    vector: list[float] | None = None,
    additional_metadata: dict[str, Any] | None = None,
) -> "MemoryRecord":
    content = message.text()
    metadata = {
        "role": message.role,
        "message_id": str(message.message_id),
        "timestamp": message.timestamp.isoformat() if message.timestamp else None,
        "has_tool_calls": bool(message.tools_calls),
        "has_reasoning": bool(message.reasoning),
        "token_usage": message.usages.model_dump() if message.usages else None,
        **(additional_metadata or {}),
    }
    return cls(
        content=content,
        user_id=user_id,
        thread_id=thread_id,
        memory_type=MemoryType.EPISODIC,
        metadata=metadata,
        vector=vector,
    )
validate_vector classmethod
validate_vector(v)
Source code in pyagenity/store/store_schema.py
@field_validator("vector")
@classmethod
def validate_vector(cls, v):
    if v is not None and (
        not isinstance(v, list) or any(not isinstance(x, (int | float)) for x in v)
    ):
        raise ValueError("vector must be list[float] or None")
    return v
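
A sketch of building a record by hand; id and timestamp are filled in by the field defaults shown above, and the example values are illustrative:

```python
from pyagenity.store import MemoryRecord, MemoryType

record = MemoryRecord(
    content="The user prefers metric units.",
    user_id="user-123",
    memory_type=MemoryType.SEMANTIC,
    category="preferences",
    metadata={"source": "settings"},
)
print(record.id, record.timestamp)   # generated UUID and creation time
```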
MemorySearchResult

Bases: BaseModel

Result from a memory search operation (Pydantic model).

Methods:

Name Description
validate_vector

Attributes:

Name Type Description
content str
id str
memory_type MemoryType
metadata dict[str, Any]
score float
thread_id str | None
timestamp datetime | None
user_id str | None
vector list[float] | None
Source code in pyagenity/store/store_schema.py
class MemorySearchResult(BaseModel):
    """Result from a memory search operation (Pydantic model)."""

    id: str = Field(default_factory=lambda: str(uuid4()))
    content: str = Field(default="", description="Primary textual content of the memory")
    score: float = Field(default=0.0, ge=0.0, description="Similarity / relevance score")
    memory_type: MemoryType = Field(default=MemoryType.EPISODIC)
    metadata: dict[str, Any] = Field(default_factory=dict)
    vector: list[float] | None = Field(default=None)
    user_id: str | None = None
    thread_id: str | None = None
    timestamp: datetime | None = Field(default_factory=datetime.now)

    @field_validator("vector")
    @classmethod
    def validate_vector(cls, v):
        if v is not None and (
            not isinstance(v, list) or any(not isinstance(x, (int | float)) for x in v)
        ):
            raise ValueError("vector must be list[float] or None")
        return v
Attributes
content class-attribute instance-attribute
content = Field(default='', description='Primary textual content of the memory')
id class-attribute instance-attribute
id = Field(default_factory=lambda: str(uuid4()))
memory_type class-attribute instance-attribute
memory_type = Field(default=EPISODIC)
metadata class-attribute instance-attribute
metadata = Field(default_factory=dict)
score class-attribute instance-attribute
score = Field(default=0.0, ge=0.0, description='Similarity / relevance score')
thread_id class-attribute instance-attribute
thread_id = None
timestamp class-attribute instance-attribute
timestamp = Field(default_factory=now)
user_id class-attribute instance-attribute
user_id = None
vector class-attribute instance-attribute
vector = Field(default=None)
Functions
validate_vector classmethod
validate_vector(v)
Source code in pyagenity/store/store_schema.py
@field_validator("vector")
@classmethod
def validate_vector(cls, v):
    if v is not None and (
        not isinstance(v, list) or any(not isinstance(x, (int | float)) for x in v)
    ):
        raise ValueError("vector must be list[float] or None")
    return v
MemoryType

Bases: Enum

Types of memories that can be stored.

Attributes:

Name Type Description
CUSTOM
DECLARATIVE
ENTITY
EPISODIC
PROCEDURAL
RELATIONSHIP
SEMANTIC
Source code in pyagenity/store/store_schema.py
class MemoryType(Enum):
    """Types of memories that can be stored."""

    EPISODIC = "episodic"  # Conversation memories
    SEMANTIC = "semantic"  # Facts and knowledge
    PROCEDURAL = "procedural"  # How-to knowledge
    ENTITY = "entity"  # Entity-based memories
    RELATIONSHIP = "relationship"  # Entity relationships
    CUSTOM = "custom"  # Custom memory types
    DECLARATIVE = "declarative"  # Explicit facts and events
Attributes
CUSTOM class-attribute instance-attribute
CUSTOM = 'custom'
DECLARATIVE class-attribute instance-attribute
DECLARATIVE = 'declarative'
ENTITY class-attribute instance-attribute
ENTITY = 'entity'
EPISODIC class-attribute instance-attribute
EPISODIC = 'episodic'
PROCEDURAL class-attribute instance-attribute
PROCEDURAL = 'procedural'
RELATIONSHIP class-attribute instance-attribute
RELATIONSHIP = 'relationship'
SEMANTIC class-attribute instance-attribute
SEMANTIC = 'semantic'
RetrievalStrategy

Bases: Enum

Memory retrieval strategies.

Attributes:

Name Type Description
GRAPH_TRAVERSAL
HYBRID
RELEVANCE
SIMILARITY
TEMPORAL
Source code in pyagenity/store/store_schema.py
class RetrievalStrategy(Enum):
    """Memory retrieval strategies."""

    SIMILARITY = "similarity"  # Vector similarity search
    TEMPORAL = "temporal"  # Time-based retrieval
    RELEVANCE = "relevance"  # Relevance scoring
    HYBRID = "hybrid"  # Combined approaches
    GRAPH_TRAVERSAL = "graph_traversal"  # Knowledge graph navigation
Attributes
GRAPH_TRAVERSAL class-attribute instance-attribute
GRAPH_TRAVERSAL = 'graph_traversal'
HYBRID class-attribute instance-attribute
HYBRID = 'hybrid'
RELEVANCE class-attribute instance-attribute
RELEVANCE = 'relevance'
SIMILARITY class-attribute instance-attribute
SIMILARITY = 'similarity'
TEMPORAL class-attribute instance-attribute
TEMPORAL = 'temporal'
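
These enums parameterize search/asearch on any store. Note that, per the QdrantStore source above, only similarity search is currently executed regardless of the requested strategy, so the other values are accepted but not yet differentiated there. A sketch, assuming store is a configured store instance and the identifiers are illustrative:

```python
from pyagenity.store import DistanceMetric, MemoryType
from pyagenity.store.store_schema import RetrievalStrategy

results = store.search(
    {"user_id": "user-123"},
    "project deadlines",
    memory_type=MemoryType.EPISODIC,
    retrieval_strategy=RetrievalStrategy.SIMILARITY,
    distance_metric=DistanceMetric.COSINE,
    limit=10,
)
```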