Mem0 store

Mem0 Long-Term Memory Store

Async-first implementation of BaseStore that uses the mem0 library as a managed long-term memory layer. In PyAgenity, the graph state is treated as short-term memory (ephemeral, scoped to a run or session) and a store implementation as long-term, durable memory. This module wires up Mem0 so that:

  • astore / asearch / etc. map to Mem0's add, search, get_all, update, delete.
  • We maintain a generated UUID (framework memory id) separate from the Mem0 internal id.
  • Metadata is enriched to retain memory type, category, timestamps and app scoping.
  • The public async methods satisfy the BaseStore contract (astore, abatch_store, asearch, aget, aupdate, adelete, aforget_memory and arelease).

Design notes:

Mem0 (>= 0.2.x / 2025 spec) still exposes synchronous Python APIs, so blocking calls are off-loaded to a thread executor to keep the interface awaitable. Where Mem0 does not support an operation directly (e.g. fetching by a custom memory id), we fall back to scanning get_all for the user. For batch insertion, add operations are parallelised with gather while concurrency is bounded by a simple semaphore to avoid thread explosion. Note that the Mem0Store source shown on this page obtains an AsyncMemory client lazily and awaits its methods directly.
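
The executor off-loading and bounded fan-out described in the design notes can be sketched with the standard library alone. This is only an illustration of the pattern, not code taken from this module: blocking_add is a hypothetical stand-in for a synchronous client call, and the concurrency limit of 4 is an arbitrary assumption.

```python
import asyncio
import time


def blocking_add(text: str) -> dict:
    """Hypothetical stand-in for a synchronous client call (not a Mem0 API)."""
    time.sleep(0.01)  # simulate blocking I/O
    return {"stored": text}


async def bounded_batch_add(texts: list[str], max_concurrency: int = 4) -> list[dict]:
    """Run blocking calls in worker threads, at most `max_concurrency` at a time."""
    semaphore = asyncio.Semaphore(max_concurrency)

    async def add_one(text: str) -> dict:
        async with semaphore:
            # asyncio.to_thread runs the blocking function in the default executor.
            return await asyncio.to_thread(blocking_add, text)

    # gather returns results in the same order as its inputs.
    return await asyncio.gather(*(add_one(t) for t in texts))
```

From synchronous code this could be driven with asyncio.run(bounded_batch_add(["a", "b", "c"])). The semaphore caps how many worker threads are busy at once, which is the concern the design notes call "thread explosion".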

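The store interprets the config mapping passed to every method as: {"user_id": str, "thread_id": str, "app_id": str | None}. Both user_id and thread_id are required (a ValueError is raised if either is missing), while app_id falls back to the store's own app_id. In the source shown on this page, thread_id is passed to Mem0 as run_id and app_id as agent_id; earlier implementations stored thread_id under agent_id, where it served a similar role.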
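
A minimal sketch of how a per-call config could be turned into Mem0 scoping keywords, following the _extract_ids and astore source listed on this page (user_id as user_id, app_id as agent_id, thread_id as run_id). The helper name to_mem0_scope is hypothetical and does not exist in the module.

```python
from typing import Any

DEFAULT_APP_ID = "pyagenity_app"  # same default that Mem0Store uses for app_id


def to_mem0_scope(config: dict[str, Any], default_app_id: str = DEFAULT_APP_ID) -> dict[str, str]:
    """Hypothetical helper: validate a per-call config and map it to scoping keywords."""
    user_id = config.get("user_id")
    thread_id = config.get("thread_id")
    if not user_id:
        raise ValueError("user_id must be provided in config")
    if not thread_id:
        raise ValueError("thread_id must be provided in config")
    return {
        "user_id": user_id,
        "agent_id": config.get("app_id") or default_app_id,  # app scope
        "run_id": thread_id,  # conversation / thread scope
    }
```

Calling it with only a user_id raises ValueError, which mirrors the store's behaviour of rejecting calls that lack either identifier.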

Prerequisite: install mem0.

pip install mem0ai

Optional vector DB / embedder / LLM configuration should be supplied through Mem0's native configuration structure (see the upstream docs on memory configuration and vector store configuration). You can also use the helper factory function create_mem0_store_with_qdrant for a quick Qdrant-backed setup.

Classes:

Name Description
Mem0Store

Mem0 implementation of long-term memory.

Functions:

Name Description
create_mem0_store

Factory for a basic Mem0 long-term store.

create_mem0_store_with_qdrant

Factory producing a Mem0Store configured for Qdrant backing.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute

logger = getLogger(__name__)

Classes

Mem0Store

Bases: BaseStore

Mem0 implementation of long-term memory.

Primary responsibilities:

  • Persist memories (episodic by default) across graph invocations
  • Retrieve semantically similar memories to augment state
  • Provide CRUD lifecycle aligned with BaseStore async API

Unlike in-memory state, these memories survive process restarts as they are managed by Mem0's configured vector / persistence layer.

Methods:

Name Description
__init__
adelete
aforget_memory
aget
aget_all
arelease
asearch
asetup

Asynchronous setup method for the store.

astore
aupdate
delete

Synchronous wrapper for adelete that runs the async implementation.

forget_memory

Delete memories for a user or agent.

generate_framework_id
get

Synchronous wrapper for aget that runs the async implementation.

get_all

Synchronous wrapper for aget_all that runs the async implementation.

release

Clean up any resources used by the store (override in subclasses if needed).

search

Synchronous wrapper for asearch that runs the async implementation.

setup

Synchronous setup method for the store.

store

Synchronous wrapper for astore that runs the async implementation.

update

Synchronous wrapper for aupdate that runs the async implementation.

Attributes:

Name Type Description
app_id
config
Source code in pyagenity/store/mem0_store.py
class Mem0Store(BaseStore):
    """Mem0 implementation of long-term memory.

    Primary responsibilities:
    * Persist memories (episodic by default) across graph invocations
    * Retrieve semantically similar memories to augment state
    * Provide CRUD lifecycle aligned with ``BaseStore`` async API

    Unlike in-memory state, these memories survive process restarts as they are
    managed by Mem0's configured vector / persistence layer.
    """

    def __init__(
        self,
        config: MemoryConfig | dict,
        app_id: str | None = None,
        **kwargs: Any,
    ) -> None:
        super().__init__()
        self.config = config
        self.app_id = app_id or "pyagenity_app"
        self._client = None  # Lazy initialization

        logger.info(
            "Initialized Mem0Store (long-term) app=%s",
            self.app_id,
        )

    async def _get_client(self) -> AsyncMemory:
        """Lazy initialization of AsyncMemory client."""
        if self._client is None:
            try:
                # Prefer explicit config via AsyncMemory.from_config when a dict is supplied;
                # fall back to library defaults when no usable config is given
                if isinstance(self.config, dict):
                    self._client = await AsyncMemory.from_config(self.config)
                elif isinstance(self.config, MemoryConfig):
                    self._client = AsyncMemory(config=self.config)
                else:
                    self._client = AsyncMemory()
            except Exception as e:  # pragma: no cover - defensive
                logger.error(f"Failed to initialize Mem0 client: {e}")
                raise
        return self._client

    # ---------------------------------------------------------------------
    # Internal helpers
    # ---------------------------------------------------------------------

    def _extract_ids(self, config: dict[str, Any]) -> tuple[str, str | None, str | None]:
        """Extract user_id, thread_id, app_id from per-call config with fallbacks."""
        user_id = config.get("user_id")
        thread_id = config.get("thread_id")
        app_id = config.get("app_id") or self.app_id

        # if user id and thread id are not provided, we cannot proceed
        if not user_id:
            raise ValueError("user_id must be provided in config")

        if not thread_id:
            raise ValueError("thread_id must be provided in config")

        return user_id, thread_id, app_id

    def _create_result(
        self,
        raw: dict[str, Any],
        user_id: str,
    ) -> MemorySearchResult:
        # Ensure the memory belongs to the requesting user
        if raw.get("user_id") != user_id:
            raise ValueError("Memory user_id does not match the requested user_id")

        metadata = raw.get("metadata", {}) or {}
        # Ensure memory_type enum mapping
        memory_type_val = metadata.get("memory_type", MemoryType.EPISODIC.value)
        try:
            memory_type = MemoryType(memory_type_val)
        except ValueError:
            memory_type = MemoryType.EPISODIC

        return MemorySearchResult(
            id=metadata.get("memory_id", str(raw.get("id", uuid4()))),
            content=raw.get("memory") or raw.get("data", ""),
            score=float(raw.get("score", 0.0) or 0.0),
            memory_type=memory_type,
            metadata=metadata,
            user_id=user_id,
            thread_id=metadata.get("run_id"),
        )

    def _iter_results(self, response: Any) -> Iterable[dict[str, Any]]:
        if isinstance(response, list):
            for item in response:
                if isinstance(item, dict):
                    yield item
        elif isinstance(response, dict) and "results" in response:
            for item in response["results"]:
                if isinstance(item, dict):
                    yield item
        else:  # pragma: no cover
            logger.debug("Unexpected Mem0 response type: %s", type(response))

    async def generate_framework_id(self) -> str:
        generated_id = InjectQ.get_instance().try_get("generated_id", str(uuid4()))
        if isinstance(generated_id, Awaitable):
            generated_id = await generated_id
        return generated_id

    # ------------------------------------------------------------------
    # BaseStore required async operations
    # ------------------------------------------------------------------

    async def astore(
        self,
        config: dict[str, Any],
        content: str | Message,
        memory_type: MemoryType = MemoryType.EPISODIC,
        category: str = "general",
        metadata: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> Any:
        text = content.text() if isinstance(content, Message) else str(content)
        if not text.strip():
            raise ValueError("Content cannot be empty")

        user_id, thread_id, app_id = self._extract_ids(config)

        mem_meta = {
            "memory_type": memory_type.value,
            "category": category,
            "created_at": datetime.now().isoformat(),
            **(metadata or {}),
        }

        client = await self._get_client()
        result = await client.add(  # type: ignore
            messages=[{"role": "user", "content": text}],
            user_id=user_id,
            agent_id=app_id,
            run_id=thread_id,
            metadata=mem_meta,
        )

        logger.debug("Stored memory for user=%s thread=%s id=%s", user_id, thread_id, result)

        return result

    async def asearch(
        self,
        config: dict[str, Any],
        query: str,
        memory_type: MemoryType | None = None,
        category: str | None = None,
        limit: int = 10,
        score_threshold: float | None = None,
        filters: dict[str, Any] | None = None,
        retrieval_strategy=None,  # Unused for Mem0; kept for signature parity
        distance_metric=None,  # Unused
        max_tokens: int = 4000,
        **kwargs: Any,
    ) -> list[MemorySearchResult]:
        user_id, thread_id, app_id = self._extract_ids(config)

        client = await self._get_client()
        result = await client.search(  # type: ignore
            query=query,
            user_id=user_id,
            agent_id=app_id,
            limit=limit,
            filters=filters,
            threshold=score_threshold,
        )

        if "original_results" not in result:
            logger.warning("Mem0 search response missing 'original_results': %s", result)
            return []

        if "relations" in result:
            logger.warning(
                "Mem0 search response contains 'relations', which is not supported yet: %s",
                result,
            )

        out: list[MemorySearchResult] = [
            self._create_result(raw, user_id) for raw in result["original_results"]
        ]

        logger.debug(
            "Searched memories for user=%s thread=%s query=%s found=%d",
            user_id,
            thread_id,
            query,
            len(out),
        )
        return out

    async def aget(
        self,
        config: dict[str, Any],
        memory_id: str,
        **kwargs: Any,
    ) -> MemorySearchResult | None:
        user_id, _, _ = self._extract_ids(config)
        # Ownership is verified in _create_result, which raises if the user_id differs

        client = await self._get_client()
        result = await client.get(  # type: ignore
            memory_id=memory_id,
        )

        return self._create_result(result, user_id) if result else None

    async def aget_all(
        self,
        config: dict[str, Any],
        limit: int = 100,
        **kwargs: Any,
    ) -> list[MemorySearchResult]:
        user_id, thread_id, app_id = self._extract_ids(config)

        client = await self._get_client()
        result = await client.get_all(  # type: ignore
            user_id=user_id,
            agent_id=app_id,
            limit=limit,
        )

        if "results" not in result:
            logger.warning("Mem0 get_all response missing 'results': %s", result)
            return []

        if "relations" in result:
            logger.warning(
                "Mem0 get_all response contains 'relations', which is not supported yet: %s",
                result,
            )

        out: list[MemorySearchResult] = [
            self._create_result(raw, user_id) for raw in result["results"]
        ]

        logger.debug(
            "Fetched all memories for user=%s thread=%s count=%d",
            user_id,
            thread_id,
            len(out),
        )
        return out

    async def aupdate(
        self,
        config: dict[str, Any],
        memory_id: str,
        content: str | Message,
        metadata: dict[str, Any] | None = None,
        **kwargs: Any,
    ) -> Any:
        existing = await self.aget(config, memory_id)
        if not existing:
            raise ValueError(f"Memory {memory_id} not found")

        # aget() has already verified that the memory belongs to the requesting user.

        new_text = content.text() if isinstance(content, Message) else str(content)
        # NOTE: the merged metadata is built here but not forwarded; only the text is updated.
        updated_meta = {**(existing.metadata or {}), **(metadata or {})}
        updated_meta["updated_at"] = datetime.now().isoformat()

        client = await self._get_client()
        res = await client.update(  # type: ignore
            memory_id=existing.id,
            data=new_text,
        )

        logger.debug("Updated memory %s", memory_id)
        return res

    async def adelete(
        self,
        config: dict[str, Any],
        memory_id: str,
        **kwargs: Any,
    ) -> Any:
        user_id, _, _ = self._extract_ids(config)
        existing = await self.aget(config, memory_id)
        if not existing:
            logger.warning("Memory %s not found for deletion", memory_id)
            return {
                "deleted": False,
                "reason": "not_found",
            }

        if existing.user_id != user_id:
            raise ValueError("Cannot delete memory belonging to a different user")

        client = await self._get_client()
        res = await client.delete(  # type: ignore
            memory_id=existing.id,
        )

        logger.debug("Deleted memory %s for user %s", memory_id, user_id)
        return res

    async def aforget_memory(
        self,
        config: dict[str, Any],
        **kwargs: Any,
    ) -> Any:
        # Delete all memories for a user
        user_id, _, _ = self._extract_ids(config)
        client = await self._get_client()
        res = await client.delete_all(user_id=user_id)  # type: ignore
        logger.debug("Forgot all memories for user %s", user_id)
        return res

    async def arelease(self) -> None:
        logger.info("Mem0Store released resources")
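
The way _create_result tolerates a missing or unknown memory_type can be shown in isolation. The sketch below is an assumption-laden stand-in: its MemoryType enum is hypothetical (only the EPISODIC member is confirmed by the source above; the string values and the SEMANTIC member are invented for illustration), and resolve_memory_type is not a function of the module.

```python
from enum import Enum
from typing import Any


class MemoryType(Enum):
    """Hypothetical stand-in; only the EPISODIC member is confirmed by the source."""

    EPISODIC = "episodic"  # assumed string value
    SEMANTIC = "semantic"  # assumed member, for illustration only


def resolve_memory_type(metadata: dict[str, Any]) -> MemoryType:
    """Return the enum member for metadata['memory_type'], defaulting to EPISODIC."""
    value = metadata.get("memory_type", MemoryType.EPISODIC.value)
    try:
        return MemoryType(value)
    except ValueError:
        # Unknown values fall back to EPISODIC instead of failing the whole result.
        return MemoryType.EPISODIC
```

The fallback means a record written by another tool, or with a value that is no longer valid, still comes back as a usable result rather than raising during a search.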

Attributes

app_id instance-attribute
app_id = app_id or 'pyagenity_app'
config instance-attribute
config = config

Functions

__init__
__init__(config, app_id=None, **kwargs)
Source code in pyagenity/store/mem0_store.py
def __init__(
    self,
    config: MemoryConfig | dict,
    app_id: str | None = None,
    **kwargs: Any,
) -> None:
    super().__init__()
    self.config = config
    self.app_id = app_id or "pyagenity_app"
    self._client = None  # Lazy initialization

    logger.info(
        "Initialized Mem0Store (long-term) app=%s",
        self.app_id,
    )
adelete async
adelete(config, memory_id, **kwargs)
Source code in pyagenity/store/mem0_store.py
async def adelete(
    self,
    config: dict[str, Any],
    memory_id: str,
    **kwargs: Any,
) -> Any:
    user_id, _, _ = self._extract_ids(config)
    existing = await self.aget(config, memory_id)
    if not existing:
        logger.warning("Memory %s not found for deletion", memory_id)
        return {
            "deleted": False,
            "reason": "not_found",
        }

    if existing.user_id != user_id:
        raise ValueError("Cannot delete memory belonging to a different user")

    client = await self._get_client()
    res = await client.delete(  # type: ignore
        memory_id=existing.id,
    )

    logger.debug("Deleted memory %s for user %s", memory_id, user_id)
    return res
aforget_memory async
aforget_memory(config, **kwargs)
Source code in pyagenity/store/mem0_store.py
async def aforget_memory(
    self,
    config: dict[str, Any],
    **kwargs: Any,
) -> Any:
    # Delete all memories for a user
    user_id, _, _ = self._extract_ids(config)
    client = await self._get_client()
    res = await client.delete_all(user_id=user_id)  # type: ignore
    logger.debug("Forgot all memories for user %s", user_id)
    return res
aget async
aget(config, memory_id, **kwargs)
Source code in pyagenity/store/mem0_store.py
async def aget(
    self,
    config: dict[str, Any],
    memory_id: str,
    **kwargs: Any,
) -> MemorySearchResult | None:
    user_id, _, _ = self._extract_ids(config)
    # Ownership is verified in _create_result, which raises if the user_id differs

    client = await self._get_client()
    result = await client.get(  # type: ignore
        memory_id=memory_id,
    )

    return self._create_result(result, user_id) if result else None
aget_all async
aget_all(config, limit=100, **kwargs)
Source code in pyagenity/store/mem0_store.py
async def aget_all(
    self,
    config: dict[str, Any],
    limit: int = 100,
    **kwargs: Any,
) -> list[MemorySearchResult]:
    user_id, thread_id, app_id = self._extract_ids(config)

    client = await self._get_client()
    result = await client.get_all(  # type: ignore
        user_id=user_id,
        agent_id=app_id,
        limit=limit,
    )

    if "results" not in result:
        logger.warning("Mem0 get_all response missing 'results': %s", result)
        return []

    if "relations" in result:
        logger.warning(
            "Mem0 get_all response contains 'relations', which is not supported yet: %s",
            result,
        )

    out: list[MemorySearchResult] = [
        self._create_result(raw, user_id) for raw in result["results"]
    ]

    logger.debug(
        "Fetched all memories for user=%s thread=%s count=%d",
        user_id,
        thread_id,
        len(out),
    )
    return out
arelease async
arelease()
Source code in pyagenity/store/mem0_store.py
async def arelease(self) -> None:
    logger.info("Mem0Store released resources")
asearch async
asearch(config, query, memory_type=None, category=None, limit=10, score_threshold=None, filters=None, retrieval_strategy=None, distance_metric=None, max_tokens=4000, **kwargs)
Source code in pyagenity/store/mem0_store.py
async def asearch(
    self,
    config: dict[str, Any],
    query: str,
    memory_type: MemoryType | None = None,
    category: str | None = None,
    limit: int = 10,
    score_threshold: float | None = None,
    filters: dict[str, Any] | None = None,
    retrieval_strategy=None,  # Unused for Mem0; kept for signature parity
    distance_metric=None,  # Unused
    max_tokens: int = 4000,
    **kwargs: Any,
) -> list[MemorySearchResult]:
    user_id, thread_id, app_id = self._extract_ids(config)

    client = await self._get_client()
    result = await client.search(  # type: ignore
        query=query,
        user_id=user_id,
        agent_id=app_id,
        limit=limit,
        filters=filters,
        threshold=score_threshold,
    )

    if "original_results" not in result:
        logger.warning("Mem0 search response missing 'original_results': %s", result)
        return []

    if "relations" in result:
        logger.warning(
            "Mem0 search response contains 'relations', which is not supported yet: %s",
            result,
        )

    out: list[MemorySearchResult] = [
        self._create_result(raw, user_id) for raw in result["original_results"]
    ]

    logger.debug(
        "Searched memories for user=%s thread=%s query=%s found=%d",
        user_id,
        thread_id,
        query,
        len(out),
    )
    return out
asetup async
asetup()

Asynchronous setup method for the store.

Returns:

Name Type Description
Any Any

Implementation-defined setup result.

Source code in pyagenity/store/base_store.py
async def asetup(self) -> Any:
    """
    Asynchronous setup method for the store.

    Returns:
        Any: Implementation-defined setup result.
    """
    raise NotImplementedError
astore async
astore(config, content, memory_type=MemoryType.EPISODIC, category='general', metadata=None, **kwargs)
Source code in pyagenity/store/mem0_store.py
async def astore(
    self,
    config: dict[str, Any],
    content: str | Message,
    memory_type: MemoryType = MemoryType.EPISODIC,
    category: str = "general",
    metadata: dict[str, Any] | None = None,
    **kwargs: Any,
) -> Any:
    text = content.text() if isinstance(content, Message) else str(content)
    if not text.strip():
        raise ValueError("Content cannot be empty")

    user_id, thread_id, app_id = self._extract_ids(config)

    mem_meta = {
        "memory_type": memory_type.value,
        "category": category,
        "created_at": datetime.now().isoformat(),
        **(metadata or {}),
    }

    client = await self._get_client()
    result = await client.add(  # type: ignore
        messages=[{"role": "user", "content": text}],
        user_id=user_id,
        agent_id=app_id,
        run_id=thread_id,
        metadata=mem_meta,
    )

    logger.debug("Stored memory for user=%s thread=%s id=%s", user_id, thread_id, result)

    return result
aupdate async
aupdate(config, memory_id, content, metadata=None, **kwargs)
Source code in pyagenity/store/mem0_store.py
async def aupdate(
    self,
    config: dict[str, Any],
    memory_id: str,
    content: str | Message,
    metadata: dict[str, Any] | None = None,
    **kwargs: Any,
) -> Any:
    existing = await self.aget(config, memory_id)
    if not existing:
        raise ValueError(f"Memory {memory_id} not found")

    # aget() has already verified that the memory belongs to the requesting user.

    new_text = content.text() if isinstance(content, Message) else str(content)
    # NOTE: the merged metadata is built here but not forwarded; only the text is updated.
    updated_meta = {**(existing.metadata or {}), **(metadata or {})}
    updated_meta["updated_at"] = datetime.now().isoformat()

    client = await self._get_client()
    res = await client.update(  # type: ignore
        memory_id=existing.id,
        data=new_text,
    )

    logger.debug("Updated memory %s", memory_id)
    return res
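
The metadata merge performed inside aupdate can be sketched on its own. In the listing above, the merged dictionary is built but only memory_id and data are passed to client.update, so the sketch shows what the merge produces rather than what reaches Mem0. The function name merge_metadata is hypothetical.

```python
from datetime import datetime
from typing import Any, Optional


def merge_metadata(
    existing: Optional[dict[str, Any]],
    new: Optional[dict[str, Any]],
) -> dict[str, Any]:
    """Merge metadata so that keys in `new` override keys in `existing`."""
    # Later unpacking wins, so caller-supplied values replace stored ones.
    merged = {**(existing or {}), **(new or {})}
    merged["updated_at"] = datetime.now().isoformat()
    return merged
```

Because updated_at is assigned after the merge, a caller cannot override it by passing its own updated_at key.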
delete
delete(config, memory_id, **kwargs)

Synchronous wrapper for adelete that runs the async implementation.

Source code in pyagenity/store/base_store.py
def delete(self, config: dict[str, Any], memory_id: str, **kwargs) -> Any:
    """Synchronous wrapper for `adelete` that runs the async implementation."""
    return run_coroutine(self.adelete(config, memory_id, **kwargs))
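
The synchronous wrappers all follow the same shape: build the coroutine from the async method and hand it to run_coroutine. The source of run_coroutine is not shown on this page, so the sketch below substitutes a deliberately simple, hypothetical run_sync built on asyncio.run; the real helper may behave differently, for example when an event loop is already running.

```python
import asyncio
from typing import Any, Coroutine


def run_sync(coro: Coroutine[Any, Any, Any]) -> Any:
    """Hypothetical simplification: run a coroutine to completion and return its result.

    asyncio.run raises RuntimeError if an event loop is already running in this
    thread, so this sketch only suits plain synchronous callers.
    """
    return asyncio.run(coro)


async def adelete(memory_id: str) -> dict:
    """Toy async operation standing in for a store method."""
    await asyncio.sleep(0)
    return {"deleted": True, "id": memory_id}


def delete(memory_id: str) -> dict:
    """Synchronous wrapper that delegates to the async implementation."""
    return run_sync(adelete(memory_id))
```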
forget_memory
forget_memory(config, **kwargs)

Delete memories for a user or agent.

Source code in pyagenity/store/base_store.py
def forget_memory(
    self,
    config: dict[str, Any],
    **kwargs,
) -> Any:
    """Delete memories for a user or agent."""
    return run_coroutine(self.aforget_memory(config, **kwargs))
generate_framework_id async
generate_framework_id()
Source code in pyagenity/store/mem0_store.py
async def generate_framework_id(self) -> str:
    generated_id = InjectQ.get_instance().try_get("generated_id", str(uuid4()))
    if isinstance(generated_id, Awaitable):
        generated_id = await generated_id
    return generated_id
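
generate_framework_id looks up a "generated_id" entry in the InjectQ container, falls back to a fresh UUID string, and awaits the value if it turns out to be awaitable. The sketch below reproduces only that resolve-or-await logic with the container lookup left out; resolve_id and make_id are hypothetical names.

```python
import asyncio
import inspect
from typing import Any
from uuid import uuid4


async def resolve_id(candidate: Any = None) -> str:
    """Return `candidate` (awaiting it if needed), or a fresh UUID string when it is None."""
    if candidate is None:
        return str(uuid4())
    if inspect.isawaitable(candidate):
        # Coroutines, tasks and futures are resolved before use.
        candidate = await candidate
    return str(candidate)


async def make_id() -> str:
    """Toy async ID provider (hypothetical)."""
    return "id-from-coroutine"
```

Accepting both plain values and awaitables lets an application register either a constant, a synchronous factory result, or a coroutine as its ID source.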
get
get(config, memory_id, **kwargs)

Synchronous wrapper for aget that runs the async implementation.

Source code in pyagenity/store/base_store.py
def get(self, config: dict[str, Any], memory_id: str, **kwargs) -> MemorySearchResult | None:
    """Synchronous wrapper for `aget` that runs the async implementation."""
    return run_coroutine(self.aget(config, memory_id, **kwargs))
get_all
get_all(config, limit=100, **kwargs)

Synchronous wrapper for aget_all that runs the async implementation.

Source code in pyagenity/store/base_store.py
def get_all(
    self,
    config: dict[str, Any],
    limit: int = 100,
    **kwargs,
) -> list[MemorySearchResult]:
    """Synchronous wrapper for `aget_all` that runs the async implementation."""
    return run_coroutine(self.aget_all(config, limit=limit, **kwargs))
release
release()

Clean up any resources used by the store (override in subclasses if needed).

Source code in pyagenity/store/base_store.py
def release(self) -> None:
    """Clean up any resources used by the store (override in subclasses if needed)."""
    return run_coroutine(self.arelease())
search
search(config, query, memory_type=None, category=None, limit=10, score_threshold=None, filters=None, retrieval_strategy=RetrievalStrategy.SIMILARITY, distance_metric=DistanceMetric.COSINE, max_tokens=4000, **kwargs)

Synchronous wrapper for asearch that runs the async implementation.

Source code in pyagenity/store/base_store.py
def search(
    self,
    config: dict[str, Any],
    query: str,
    memory_type: MemoryType | None = None,
    category: str | None = None,
    limit: int = 10,
    score_threshold: float | None = None,
    filters: dict[str, Any] | None = None,
    retrieval_strategy: RetrievalStrategy = RetrievalStrategy.SIMILARITY,
    distance_metric: DistanceMetric = DistanceMetric.COSINE,
    max_tokens: int = 4000,
    **kwargs,
) -> list[MemorySearchResult]:
    """Synchronous wrapper for `asearch` that runs the async implementation."""
    return run_coroutine(
        self.asearch(
            config,
            query,
            memory_type=memory_type,
            category=category,
            limit=limit,
            score_threshold=score_threshold,
            filters=filters,
            retrieval_strategy=retrieval_strategy,
            distance_metric=distance_metric,
            max_tokens=max_tokens,
            **kwargs,
        )
    )
setup
setup()

Synchronous setup method for the store.

Returns:

Name Type Description
Any Any

Implementation-defined setup result.

Source code in pyagenity/store/base_store.py
def setup(self) -> Any:
    """
    Synchronous setup method for the store.

    Returns:
        Any: Implementation-defined setup result.
    """
    return run_coroutine(self.asetup())
store
store(config, content, memory_type=MemoryType.EPISODIC, category='general', metadata=None, **kwargs)

Synchronous wrapper for astore that runs the async implementation.

Source code in pyagenity/store/base_store.py
def store(
    self,
    config: dict[str, Any],
    content: str | Message,
    memory_type: MemoryType = MemoryType.EPISODIC,
    category: str = "general",
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> str:
    """Synchronous wrapper for `astore` that runs the async implementation."""
    return run_coroutine(
        self.astore(
            config,
            content,
            memory_type=memory_type,
            category=category,
            metadata=metadata,
            **kwargs,
        )
    )
update
update(config, memory_id, content, metadata=None, **kwargs)

Synchronous wrapper for aupdate that runs the async implementation.

Source code in pyagenity/store/base_store.py
def update(
    self,
    config: dict[str, Any],
    memory_id: str,
    content: str | Message,
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> Any:
    """Synchronous wrapper for `aupdate` that runs the async implementation."""
    return run_coroutine(self.aupdate(config, memory_id, content, metadata=metadata, **kwargs))

Functions

create_mem0_store

create_mem0_store(config, user_id='default_user', thread_id=None, app_id='pyagenity_app')

Factory for a basic Mem0 long-term store.

Source code in pyagenity/store/mem0_store.py
def create_mem0_store(
    config: dict[str, Any],
    user_id: str = "default_user",
    thread_id: str | None = None,
    app_id: str = "pyagenity_app",
) -> Mem0Store:
    """Factory for a basic Mem0 long-term store."""
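    return Mem0Store(
        config=config,
        # NOTE: Mem0Store.__init__ absorbs the next two arguments via **kwargs and does
        # not use them; user_id and thread_id must still be supplied in each call's config.
        default_user_id=user_id,
        default_thread_id=thread_id,
        app_id=app_id,
    )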

create_mem0_store_with_qdrant

create_mem0_store_with_qdrant(qdrant_url, qdrant_api_key=None, collection_name='pyagenity_memories', embedding_model='text-embedding-ada-002', llm_model='gpt-4o-mini', app_id='pyagenity_app', **kwargs)

Factory producing a Mem0Store configured for Qdrant backing.

Source code in pyagenity/store/mem0_store.py
def create_mem0_store_with_qdrant(
    qdrant_url: str,
    qdrant_api_key: str | None = None,
    collection_name: str = "pyagenity_memories",
    embedding_model: str = "text-embedding-ada-002",
    llm_model: str = "gpt-4o-mini",
    app_id: str = "pyagenity_app",
    **kwargs: Any,
) -> Mem0Store:
    """Factory producing a Mem0Store configured for Qdrant backing."""
    config = {
        "vector_store": {
            "provider": "qdrant",
            "config": {
                "collection_name": collection_name,
                "url": qdrant_url,
                "api_key": qdrant_api_key,
                **kwargs.get("vector_store_config", {}),
            },
        },
        "embedder": {
            "provider": kwargs.get("embedder_provider", "openai"),
            "config": {"model": embedding_model, **kwargs.get("embedder_config", {})},
        },
        "llm": {
            "provider": kwargs.get("llm_provider", "openai"),
            "config": {"model": llm_model, **kwargs.get("llm_config", {})},
        },
    }
    return create_mem0_store(
        config=config,
        app_id=app_id,
    )
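
One consequence of the dictionary unpacking in this factory is worth spelling out: keys passed through embedder_config, llm_config or vector_store_config are unpacked last, so they override the values derived from the named parameters. The sketch below rebuilds only the "embedder" section to show this; embedder_section is a hypothetical helper, and the "custom-model" and "dims" values in the usage note are placeholders rather than real settings.

```python
from typing import Any


def embedder_section(
    embedding_model: str = "text-embedding-ada-002",
    **kwargs: Any,
) -> dict[str, Any]:
    """Rebuild only the "embedder" part of the factory's config dict."""
    return {
        "provider": kwargs.get("embedder_provider", "openai"),
        # Keys from embedder_config are unpacked last, so they win on conflict.
        "config": {"model": embedding_model, **kwargs.get("embedder_config", {})},
    }
```

For instance, embedder_section(embedder_config={"model": "custom-model", "dims": 8}) yields a config whose model is "custom-model", not the embedding_model default.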