Qdrant store

Qdrant Vector Store Implementation for PyAgenity Framework

This module provides a modern, async-first implementation of BaseStore using Qdrant as the backend vector database. Supports both local and cloud Qdrant deployments with configurable embedding services.

Classes:

QdrantStore: Modern async-first Qdrant-based vector store implementation.

Functions:

create_cloud_qdrant_store: Create a cloud Qdrant store.
create_local_qdrant_store: Create a local Qdrant store.
create_remote_qdrant_store: Create a remote Qdrant store.

Attributes:

Name Type Description
logger
msg

Attributes

logger module-attribute

logger = getLogger(__name__)

msg module-attribute

msg = "Qdrant client not installed. Install with: pip install 'pyagenity[qdrant]'"
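
The wording of `msg` suggests the module guards its Qdrant import and raises this message when the optional dependency is missing. The sketch below shows that general pattern. `require_optional` is a hypothetical helper, not part of PyAgenity, and it uses `importlib` so that it runs whether or not Qdrant is installed; the real module may guard its import differently.

```python
import importlib
from types import ModuleType

# Same text as the module-level `msg` attribute above.
msg = "Qdrant client not installed. Install with: pip install 'pyagenity[qdrant]'"


def require_optional(module_name: str, message: str) -> ModuleType:
    """Import an optional dependency or raise ImportError with an install hint.

    Hypothetical helper for illustration only.
    """
    try:
        return importlib.import_module(module_name)
    except ImportError as exc:
        raise ImportError(message) from exc


# A stdlib module always imports; a missing package surfaces the install hint.
json_module = require_optional("json", msg)
try:
    require_optional("package_that_is_not_installed_xyz", msg)
    hint = None
except ImportError as exc:
    hint = str(exc)
```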

Classes

QdrantStore

Bases: BaseStore

Modern async-first Qdrant-based vector store implementation.

Features:

- Async-only operations for better performance
- Local and cloud Qdrant deployment support
- Configurable embedding services
- Efficient vector similarity search with multiple strategies
- Collection management with automatic creation
- Rich metadata filtering capabilities
- User and agent-scoped operations

Example
# Local Qdrant with OpenAI embeddings
# (the constructor parameter is named `embedding`, see __init__ below)
store = QdrantStore(path="./qdrant_data", embedding=OpenAIEmbeddingService())

# Remote Qdrant
store = QdrantStore(host="localhost", port=6333, embedding=OpenAIEmbeddingService())

# Cloud Qdrant
store = QdrantStore(
    url="https://xyz.qdrant.io",
    api_key="your-api-key",
    embedding=OpenAIEmbeddingService(),
)

Methods:

__init__: Initialize Qdrant vector store.
adelete: Delete a memory by ID.
aforget_memory: Delete all memories for a user or agent.
aget: Get a specific memory by ID.
aget_all: Get all memories for a user.
arelease: Clean up resources.
asearch: Search memories by content similarity.
asetup: Set up the store and ensure default collection exists.
astore: Store a new memory.
aupdate: Update an existing memory.
delete: Synchronous wrapper for adelete that runs the async implementation.
forget_memory: Delete memories for a user or agent.
get: Synchronous wrapper for aget that runs the async implementation.
get_all: Synchronous wrapper for aget_all that runs the async implementation.
release: Clean up any resources used by the store (override in subclasses if needed).
search: Synchronous wrapper for asearch that runs the async implementation.
setup: Synchronous setup method for the store.
store: Synchronous wrapper for astore that runs the async implementation.
update: Synchronous wrapper for aupdate that runs the async implementation.
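
The synchronous methods listed here are described as wrappers that run the async implementation. How `BaseStore` does this is not shown on this page. One common approach, sketched below with a hypothetical `TinyStore` class, is to drive the coroutine with `asyncio.run`; treat it as an illustration of the idea rather than the framework's actual code.

```python
import asyncio
from typing import Optional


class TinyStore:
    """Hypothetical stand-in for a store with async-first methods."""

    def __init__(self) -> None:
        self._items = {}

    async def aget(self, memory_id: str) -> Optional[str]:
        # A real store would await network or disk I/O here.
        await asyncio.sleep(0)
        return self._items.get(memory_id)

    def get(self, memory_id: str) -> Optional[str]:
        # Synchronous wrapper: run the coroutine to completion on a fresh event loop.
        # asyncio.run raises RuntimeError when called from inside a running loop,
        # so async callers should await aget directly instead.
        return asyncio.run(self.aget(memory_id))


store = TinyStore()
store._items["m1"] = "hello"
```

With this sketch, `store.get("m1")` returns the stored string from ordinary synchronous code, while async code keeps using `await store.aget("m1")`.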

Attributes:

Name Type Description
client
default_collection
embedding
Source code in pyagenity/store/qdrant_store.py
class QdrantStore(BaseStore):
    """
    Modern async-first Qdrant-based vector store implementation.

    Features:
    - Async-only operations for better performance
    - Local and cloud Qdrant deployment support
    - Configurable embedding services
    - Efficient vector similarity search with multiple strategies
    - Collection management with automatic creation
    - Rich metadata filtering capabilities
    - User and agent-scoped operations

    Example:
        ```python
        # Local Qdrant with OpenAI embeddings
        store = QdrantStore(path="./qdrant_data", embedding=OpenAIEmbeddingService())

        # Remote Qdrant
        store = QdrantStore(host="localhost", port=6333, embedding=OpenAIEmbeddingService())

        # Cloud Qdrant
        store = QdrantStore(
            url="https://xyz.qdrant.io",
            api_key="your-api-key",
            embedding=OpenAIEmbeddingService(),
        )
        ```
    """

    def __init__(
        self,
        embedding: BaseEmbedding,
        path: str | None = None,
        host: str | None = None,
        port: int | None = None,
        url: str | None = None,
        api_key: str | None = None,
        default_collection: str = "pyagenity_memories",
        distance_metric: DistanceMetric = DistanceMetric.COSINE,
        **kwargs: Any,
    ):
        """
        Initialize Qdrant vector store.

        Args:
            embedding: Service for generating embeddings
            path: Path for local Qdrant (file-based storage)
            host: Host for remote Qdrant server
            port: Port for remote Qdrant server
            url: URL for Qdrant cloud
            api_key: API key for Qdrant cloud
            default_collection: Default collection name
            distance_metric: Default distance metric
            **kwargs: Additional client parameters
        """
        self.embedding = embedding

        # Initialize async client
        if path:
            self.client = AsyncQdrantClient(path=path, **kwargs)
        elif url:
            self.client = AsyncQdrantClient(url=url, api_key=api_key, **kwargs)
        else:
            host = host or "localhost"
            port = port or 6333
            self.client = AsyncQdrantClient(host=host, port=port, api_key=api_key, **kwargs)

        # Cache for collection existence checks
        self._collection_cache = set()
        self._setup_lock = asyncio.Lock()

        self.default_collection = default_collection
        self._default_distance_metric = distance_metric

        logger.info(f"Initialized QdrantStore with config: path={path}, host={host}, url={url}")

    async def asetup(self) -> Any:
        """Set up the store and ensure default collection exists."""
        async with self._setup_lock:
            await self._ensure_collection_exists(self.default_collection)
        return True

    def _distance_metric_to_qdrant(self, metric: DistanceMetric) -> Distance:
        """Convert framework distance metric to Qdrant distance."""
        mapping = {
            DistanceMetric.COSINE: Distance.COSINE,
            DistanceMetric.EUCLIDEAN: Distance.EUCLID,
            DistanceMetric.DOT_PRODUCT: Distance.DOT,
            DistanceMetric.MANHATTAN: Distance.MANHATTAN,
        }
        return mapping.get(metric, Distance.COSINE)

    def _extract_config_values(self, config: dict[str, Any]) -> tuple[str | None, str | None, str]:
        """Extract user_id, thread_id, and collection from config."""
        user_id = config.get("user_id")
        thread_id = config.get("thread_id")
        collection = config.get("collection", self.default_collection)
        return user_id, thread_id, collection

    def _point_to_search_result(self, point) -> MemorySearchResult:
        """Convert Qdrant point to MemorySearchResult."""
        payload = getattr(point, "payload", {}) or {}

        # Extract content
        content = payload.get("content", "")

        # Convert memory_type string back to enum
        memory_type_str = payload.get("memory_type", "episodic")
        try:
            memory_type = MemoryType(memory_type_str)
        except ValueError:
            memory_type = MemoryType.EPISODIC

        # Parse timestamp
        timestamp_str = payload.get("timestamp")
        timestamp = None
        if timestamp_str:
            try:
                timestamp = datetime.fromisoformat(timestamp_str)
            except (ValueError, TypeError):
                timestamp = None

        return MemorySearchResult(
            id=str(point.id),
            content=content,
            score=float(getattr(point, "score", 1.0) or 0.0),
            memory_type=memory_type,
            metadata=payload,
            vector=getattr(point, "vector", None),
            user_id=payload.get("user_id"),
            thread_id=payload.get("thread_id")
            or payload.get("agent_id"),  # Support both thread_id and agent_id
            timestamp=timestamp,
        )

    def _build_qdrant_filter(
        self,
        user_id: str | None = None,
        thread_id: str | None = None,
        memory_type: MemoryType | None = None,
        category: str | None = None,
        filters: dict[str, Any] | None = None,
    ) -> Filter | None:
        """Build Qdrant filter from parameters."""
        conditions = []

        # Add user/agent filters
        if user_id:
            conditions.append(
                FieldCondition(
                    key="user_id",
                    match=MatchValue(value=user_id),
                ),
            )
        if thread_id:
            conditions.append(
                FieldCondition(
                    key="thread_id",
                    match=MatchValue(value=thread_id),
                ),
            )
        if memory_type:
            conditions.append(
                FieldCondition(
                    key="memory_type",
                    match=MatchValue(value=memory_type.value),
                )
            )
        if category:
            conditions.append(FieldCondition(key="category", match=MatchValue(value=category)))

        # Add custom filters
        if filters:
            for key, value in filters.items():
                if isinstance(value, str | int | bool):
                    conditions.append(FieldCondition(key=key, match=MatchValue(value=value)))

        return Filter(must=conditions) if conditions else None

    async def _ensure_collection_exists(self, collection: str) -> None:
        """Ensure collection exists, create if not."""
        if collection in self._collection_cache:
            return

        try:
            # Check if collection exists
            collections = await self.client.get_collections()
            existing_names = {col.name for col in collections.collections}

            if collection not in existing_names:
                # Create collection with vector configuration
                await self.client.create_collection(
                    collection_name=collection,
                    vectors_config=VectorParams(
                        size=self.embedding.dimension,
                        distance=self._distance_metric_to_qdrant(
                            self._default_distance_metric,
                        ),
                    ),
                )
                logger.info(f"Created collection: {collection}")

            self._collection_cache.add(collection)
        except Exception as e:
            logger.error(f"Error ensuring collection {collection} exists: {e}")
            raise

    def _prepare_content(self, content: str | Message) -> str:
        """Extract text content from string or Message."""
        if isinstance(content, Message):
            return content.text()
        return content

    def _create_memory_record(
        self,
        content: str | Message,
        user_id: str | None = None,
        thread_id: str | None = None,
        memory_type: MemoryType = MemoryType.EPISODIC,
        category: str = "general",
        metadata: dict[str, Any] | None = None,
    ) -> MemoryRecord:
        """Create a memory record from parameters."""
        text_content = self._prepare_content(content)

        if isinstance(content, Message):
            return MemoryRecord.from_message(
                content,
                user_id=user_id,
                thread_id=thread_id,
                additional_metadata=metadata,
            )

        return MemoryRecord(
            content=text_content,
            user_id=user_id,
            thread_id=thread_id,
            memory_type=memory_type,
            metadata=metadata or {},
            category=category,
        )

    # --- BaseStore abstract method implementations ---

    async def astore(
        self,
        config: dict[str, Any],
        content: str | Message,
        memory_type: MemoryType = MemoryType.EPISODIC,
        category: str = "general",
        metadata: dict[str, Any] | None = None,
        **kwargs,
    ) -> str:
        """Store a new memory."""
        user_id, thread_id, collection = self._extract_config_values(config)

        # Ensure collection exists
        await self._ensure_collection_exists(collection)

        # Create memory record
        record = self._create_memory_record(
            content=content,
            user_id=user_id,
            thread_id=thread_id,
            memory_type=memory_type,
            category=category,
            metadata=metadata,
        )

        # Generate embedding
        text_content = self._prepare_content(content)
        vector = await self.embedding.aembed(text_content)
        if not vector or len(vector) != self.embedding.dimension:
            raise ValueError("Embedding service returned invalid vector")

        # Prepare payload
        payload = {
            "content": record.content,
            "user_id": record.user_id,
            "thread_id": record.thread_id,
            "memory_type": record.memory_type.value,
            "category": record.category,
            "timestamp": record.timestamp.isoformat() if record.timestamp else None,
            **record.metadata,
        }

        # Create point
        point = PointStruct(
            id=record.id,
            vector=vector,
            payload=payload,
        )

        # Store in Qdrant
        await self.client.upsert(
            collection_name=collection,
            points=[point],
        )

        logger.debug(f"Stored memory {record.id} in collection {collection}")
        return record.id

    async def asearch(
        self,
        config: dict[str, Any],
        query: str,
        memory_type: MemoryType | None = None,
        category: str | None = None,
        limit: int = 10,
        score_threshold: float | None = None,
        filters: dict[str, Any] | None = None,
        retrieval_strategy: RetrievalStrategy = RetrievalStrategy.SIMILARITY,
        distance_metric: DistanceMetric = DistanceMetric.COSINE,
        max_tokens: int = 4000,
        **kwargs,
    ) -> list[MemorySearchResult]:
        """Search memories by content similarity."""
        user_id, thread_id, collection = self._extract_config_values(config)

        # Ensure collection exists
        await self._ensure_collection_exists(collection)

        # Generate query embedding
        query_vector = await self.embedding.aembed(query)
        if not query_vector or len(query_vector) != self.embedding.dimension:
            raise ValueError("Embedding service returned invalid vector")

        # Build filter
        search_filter = self._build_qdrant_filter(
            user_id=user_id,
            thread_id=thread_id,
            memory_type=memory_type,
            category=category,
            filters=filters,
        )

        # Perform search
        search_result = await self.client.search(
            collection_name=collection,
            query_vector=query_vector,
            query_filter=search_filter,
            limit=limit,
            score_threshold=score_threshold,
        )

        # Convert to search results
        results = [self._point_to_search_result(point) for point in search_result]

        logger.debug(f"Found {len(results)} memories for query in collection {collection}")
        return results

    async def aget(
        self,
        config: dict[str, Any],
        memory_id: str,
        **kwargs,
    ) -> MemorySearchResult | None:
        """Get a specific memory by ID."""
        user_id, thread_id, collection = self._extract_config_values(config)

        try:
            # Ensure collection exists
            await self._ensure_collection_exists(collection)

            # Get point by ID
            points = await self.client.retrieve(
                collection_name=collection,
                ids=[memory_id],
            )

            if not points:
                return None

            point = points[0]
            result = self._point_to_search_result(point)

            # Verify user/agent access if specified
            if user_id and result.user_id != user_id:
                return None
            if thread_id and result.thread_id != thread_id:
                return None

            return result

        except Exception as e:
            logger.error(f"Error retrieving memory {memory_id}: {e}")
            return None

    async def aget_all(
        self,
        config: dict[str, Any],
        limit: int = 100,
        **kwargs,
    ) -> list[MemorySearchResult]:
        """Get all memories for a user."""
        user_id, _, collection = self._extract_config_values(config)

        # Ensure collection exists
        await self._ensure_collection_exists(collection)

        # Build filter
        search_filter = self._build_qdrant_filter(
            user_id=user_id,
        )

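        # List matching points with scroll: no query vector is needed here,
        # and an empty query vector would not match the collection's dimension.
        points, _next_offset = await self.client.scroll(
            collection_name=collection,
            scroll_filter=search_filter,
            limit=limit,
            with_payload=True,
        )

        # Convert to search results
        results = [self._point_to_search_result(point) for point in points]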

        logger.debug(f"Found {len(results)} memories for query in collection {collection}")
        return results

    async def aupdate(
        self,
        config: dict[str, Any],
        memory_id: str,
        content: str | Message,
        metadata: dict[str, Any] | None = None,
        **kwargs,
    ) -> None:
        """Update an existing memory."""
        user_id, thread_id, collection = self._extract_config_values(config)

        # Get existing memory
        existing = await self.aget(config, memory_id)
        if not existing:
            raise ValueError(f"Memory {memory_id} not found")

        # Verify user/agent access if specified
        if user_id and existing.user_id != user_id:
            raise PermissionError("User does not have permission to update this memory")
        if thread_id and existing.thread_id != thread_id:
            raise PermissionError("Thread does not have permission to update this memory")

        # Prepare new content
        text_content = self._prepare_content(content)
        new_vector = await self.embedding.aembed(text_content)
        if not new_vector or len(new_vector) != self.embedding.dimension:
            raise ValueError("Embedding service returned invalid vector")

        # Update payload
        updated_metadata = {**existing.metadata}
        if metadata:
            updated_metadata.update(metadata)

        updated_payload = {
            "content": text_content,
            "user_id": existing.user_id,
            "thread_id": existing.thread_id,
            "memory_type": existing.memory_type.value,
            "category": updated_metadata.get("category", "general"),
            "timestamp": datetime.now().isoformat(),
            **updated_metadata,
        }

        # Create updated point
        point = PointStruct(
            id=memory_id,
            vector=new_vector,
            payload=updated_payload,
        )

        # Update in Qdrant
        await self.client.upsert(
            collection_name=collection,
            points=[point],
        )

        logger.debug(f"Updated memory {memory_id} in collection {collection}")

    async def adelete(
        self,
        config: dict[str, Any],
        memory_id: str,
        **kwargs,
    ) -> None:
        """Delete a memory by ID."""
        user_id, thread_id, collection = self._extract_config_values(config)

        # Verify memory exists and user has access
        existing = await self.aget(config, memory_id)
        if not existing:
            raise ValueError(f"Memory {memory_id} not found")

        # verify user/agent access if specified
        if user_id and existing.user_id != user_id:
            raise PermissionError("User does not have permission to delete this memory")
        if thread_id and existing.thread_id != thread_id:
            raise PermissionError("Thread does not have permission to delete this memory")

        # Delete from Qdrant
        await self.client.delete(
            collection_name=collection,
            points_selector=models.PointIdsList(points=[memory_id]),
        )

        logger.debug(f"Deleted memory {memory_id} from collection {collection}")

    async def aforget_memory(
        self,
        config: dict[str, Any],
        **kwargs,
    ) -> None:
        """Delete all memories for a user or agent."""
        user_id, agent_id, collection = self._extract_config_values(config)

        # Build filter for memories to delete
        delete_filter = self._build_qdrant_filter(user_id=user_id, thread_id=agent_id)

        if delete_filter:
            # Delete matching memories
            await self.client.delete(
                collection_name=collection,
                points_selector=models.FilterSelector(filter=delete_filter),
            )

            logger.info(
                f"Deleted all memories for user_id={user_id}, agent_id={agent_id} "
                f"in collection {collection}"
            )
        else:
            logger.warning("No user_id or agent_id specified for memory deletion")

    async def arelease(self) -> None:
        """Clean up resources."""
        if hasattr(self.client, "close"):
            await self.client.close()
        logger.info("QdrantStore resources released")

Attributes

client instance-attribute
client = AsyncQdrantClient(path=path, **kwargs)
default_collection instance-attribute
default_collection = default_collection
embedding instance-attribute
embedding = embedding

Functions

__init__
__init__(embedding, path=None, host=None, port=None, url=None, api_key=None, default_collection='pyagenity_memories', distance_metric=DistanceMetric.COSINE, **kwargs)

Initialize Qdrant vector store.

Parameters:

embedding (BaseEmbedding): Service for generating embeddings. Required.
path (str | None): Path for local Qdrant (file-based storage). Default: None.
host (str | None): Host for remote Qdrant server. Default: None, which falls back to "localhost" when neither path nor url is given.
port (int | None): Port for remote Qdrant server. Default: None, which falls back to 6333 when neither path nor url is given.
url (str | None): URL for Qdrant cloud. Default: None.
api_key (str | None): API key for Qdrant cloud. Default: None.
default_collection (str): Default collection name. Default: 'pyagenity_memories'.
distance_metric (DistanceMetric): Default distance metric. Default: COSINE.
**kwargs (Any): Additional client parameters. Default: {}.
Source code in pyagenity/store/qdrant_store.py
def __init__(
    self,
    embedding: BaseEmbedding,
    path: str | None = None,
    host: str | None = None,
    port: int | None = None,
    url: str | None = None,
    api_key: str | None = None,
    default_collection: str = "pyagenity_memories",
    distance_metric: DistanceMetric = DistanceMetric.COSINE,
    **kwargs: Any,
):
    """
    Initialize Qdrant vector store.

    Args:
        embedding: Service for generating embeddings
        path: Path for local Qdrant (file-based storage)
        host: Host for remote Qdrant server
        port: Port for remote Qdrant server
        url: URL for Qdrant cloud
        api_key: API key for Qdrant cloud
        default_collection: Default collection name
        distance_metric: Default distance metric
        **kwargs: Additional client parameters
    """
    self.embedding = embedding

    # Initialize async client
    if path:
        self.client = AsyncQdrantClient(path=path, **kwargs)
    elif url:
        self.client = AsyncQdrantClient(url=url, api_key=api_key, **kwargs)
    else:
        host = host or "localhost"
        port = port or 6333
        self.client = AsyncQdrantClient(host=host, port=port, api_key=api_key, **kwargs)

    # Cache for collection existence checks
    self._collection_cache = set()
    self._setup_lock = asyncio.Lock()

    self.default_collection = default_collection
    self._default_distance_metric = distance_metric

    logger.info(f"Initialized QdrantStore with config: path={path}, host={host}, url={url}")
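
The `if path / elif url / else` chain above means the connection arguments are not combined: `path` wins over `url`, and `url` wins over `host`/`port`. The sketch below mirrors that order with a hypothetical pure function, `choose_backend`, that returns plain dicts instead of constructing an `AsyncQdrantClient`, so the precedence can be checked without Qdrant installed.

```python
from typing import Optional


def choose_backend(
    path: Optional[str] = None,
    host: Optional[str] = None,
    port: Optional[int] = None,
    url: Optional[str] = None,
) -> dict:
    """Mirror the if/elif/else order of __init__ and report which arguments win.

    Hypothetical helper for illustration; it does not create a client.
    """
    if path:
        return {"mode": "local", "path": path}
    if url:
        return {"mode": "url", "url": url}
    # Neither path nor url: fall back to host/port with the same defaults as __init__.
    return {"mode": "host", "host": host or "localhost", "port": port or 6333}


local_wins = choose_backend(path="./qdrant_data", url="https://xyz.qdrant.io")
url_wins = choose_backend(url="https://xyz.qdrant.io", host="10.0.0.5")
defaults = choose_backend()
```

One consequence worth noting: passing both `path` and `url` silently ignores `url`, so callers should supply exactly one way of reaching Qdrant.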
adelete async
adelete(config, memory_id, **kwargs)

Delete a memory by ID.

Source code in pyagenity/store/qdrant_store.py
async def adelete(
    self,
    config: dict[str, Any],
    memory_id: str,
    **kwargs,
) -> None:
    """Delete a memory by ID."""
    user_id, thread_id, collection = self._extract_config_values(config)

    # Verify memory exists and user has access
    existing = await self.aget(config, memory_id)
    if not existing:
        raise ValueError(f"Memory {memory_id} not found")

    # verify user/agent access if specified
    if user_id and existing.user_id != user_id:
        raise PermissionError("User does not have permission to delete this memory")
    if thread_id and existing.thread_id != thread_id:
        raise PermissionError("Thread does not have permission to delete this memory")

    # Delete from Qdrant
    await self.client.delete(
        collection_name=collection,
        points_selector=models.PointIdsList(points=[memory_id]),
    )

    logger.debug(f"Deleted memory {memory_id} from collection {collection}")
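
Reading the source above together with `aget`, a caller whose `user_id` does not match the stored memory appears to receive `ValueError` ("not found") rather than `PermissionError`, because `aget` already returns `None` on an ownership mismatch. The sketch below reproduces that control flow with a hypothetical in-memory dict (`POINTS`) standing in for a Qdrant collection; it is a simplified model, not the library code.

```python
import asyncio
from typing import Optional

# Hypothetical in-memory data: memory ID -> payload, standing in for a collection.
POINTS = {"m1": {"user_id": "alice", "content": "likes tea"}}


async def aget(config: dict, memory_id: str) -> Optional[dict]:
    """Return the payload, or None when it is missing or owned by another user."""
    payload = POINTS.get(memory_id)
    if payload is None:
        return None
    user_id = config.get("user_id")
    if user_id and payload.get("user_id") != user_id:
        return None  # an ownership mismatch looks the same as "missing"
    return payload


async def adelete(config: dict, memory_id: str) -> None:
    existing = await aget(config, memory_id)
    if not existing:
        raise ValueError(f"Memory {memory_id} not found")
    # Same check as in the source; aget has already filtered mismatches out.
    if config.get("user_id") and existing.get("user_id") != config["user_id"]:
        raise PermissionError("User does not have permission to delete this memory")
    del POINTS[memory_id]


def outcome(config: dict, memory_id: str) -> str:
    """Run adelete and report the exception type name, or 'deleted'."""
    try:
        asyncio.run(adelete(config, memory_id))
    except (ValueError, PermissionError) as exc:
        return type(exc).__name__
    return "deleted"


wrong_user = outcome({"user_id": "bob"}, "m1")
right_user = outcome({"user_id": "alice"}, "m1")
```

Under these assumptions, callers that want to distinguish "missing" from "not yours" cannot rely on the exception type alone.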
aforget_memory async
aforget_memory(config, **kwargs)

Delete all memories for a user or agent.

Source code in pyagenity/store/qdrant_store.py
async def aforget_memory(
    self,
    config: dict[str, Any],
    **kwargs,
) -> None:
    """Delete all memories for a user or agent."""
    user_id, agent_id, collection = self._extract_config_values(config)

    # Build filter for memories to delete
    delete_filter = self._build_qdrant_filter(user_id=user_id, thread_id=agent_id)

    if delete_filter:
        # Delete matching memories
        await self.client.delete(
            collection_name=collection,
            points_selector=models.FilterSelector(filter=delete_filter),
        )

        logger.info(
            f"Deleted all memories for user_id={user_id}, agent_id={agent_id} "
            f"in collection {collection}"
        )
    else:
        logger.warning("No user_id or agent_id specified for memory deletion")
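
`aforget_memory` deletes by filter and does nothing (beyond a warning) when neither identifier is present in `config`. The sketch below models that behaviour with plain dicts: `build_filter` is a simplified stand-in for `_build_qdrant_filter`, and `forget` is a hypothetical function that applies the filter to a list instead of calling Qdrant.

```python
from typing import Optional


def build_filter(user_id: Optional[str] = None, thread_id: Optional[str] = None) -> Optional[dict]:
    """Plain-dict stand-in for _build_qdrant_filter: AND together the identifiers given."""
    conditions = []
    if user_id:
        conditions.append({"key": "user_id", "match": user_id})
    if thread_id:
        conditions.append({"key": "thread_id", "match": thread_id})
    return {"must": conditions} if conditions else None


def forget(points: list, config: dict) -> list:
    """Return the points that survive deletion; delete nothing when no filter can be built."""
    flt = build_filter(config.get("user_id"), config.get("thread_id"))
    if flt is None:
        return points  # mirrors the warning branch: no identifiers, no deletion

    def matches(point: dict) -> bool:
        # Every condition must hold, as with a Qdrant `must` clause.
        return all(point.get(c["key"]) == c["match"] for c in flt["must"])

    return [p for p in points if not matches(p)]


points = [
    {"id": 1, "user_id": "alice", "thread_id": "t1"},
    {"id": 2, "user_id": "alice", "thread_id": "t2"},
    {"id": 3, "user_id": "bob", "thread_id": "t1"},
]
after_user = [p["id"] for p in forget(points, {"user_id": "alice"})]
after_both = [p["id"] for p in forget(points, {"user_id": "alice", "thread_id": "t1"})]
after_none = forget(points, {})
```

Because the conditions are combined with AND, supplying both `user_id` and `thread_id` narrows the deletion to that one thread of that user.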
aget async
aget(config, memory_id, **kwargs)

Get a specific memory by ID.

Source code in pyagenity/store/qdrant_store.py
async def aget(
    self,
    config: dict[str, Any],
    memory_id: str,
    **kwargs,
) -> MemorySearchResult | None:
    """Get a specific memory by ID."""
    user_id, thread_id, collection = self._extract_config_values(config)

    try:
        # Ensure collection exists
        await self._ensure_collection_exists(collection)

        # Get point by ID
        points = await self.client.retrieve(
            collection_name=collection,
            ids=[memory_id],
        )

        if not points:
            return None

        point = points[0]
        result = self._point_to_search_result(point)

        # Verify user/agent access if specified
        if user_id and result.user_id != user_id:
            return None
        if thread_id and result.thread_id != thread_id:
            return None

        return result

    except Exception as e:
        logger.error(f"Error retrieving memory {memory_id}: {e}")
        return None
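
`aget` hands the retrieved point to `_point_to_search_result`, which tolerates malformed payloads by falling back to defaults. The sketch below isolates the two fallbacks shown in the class source. The `MemoryType` enum here is a hypothetical two-member stand-in: the `"episodic"` value is taken from the default string in the source, while `SEMANTIC` is assumed purely for illustration.

```python
from datetime import datetime
from enum import Enum
from typing import Optional


class MemoryType(Enum):
    """Hypothetical stand-in for the framework's MemoryType enum."""

    EPISODIC = "episodic"
    SEMANTIC = "semantic"  # assumed member, for illustration only


def parse_memory_type(raw: Optional[str]) -> MemoryType:
    """Convert the stored string back to an enum, defaulting to EPISODIC."""
    try:
        return MemoryType(raw or "episodic")
    except ValueError:
        return MemoryType.EPISODIC


def parse_timestamp(raw) -> Optional[datetime]:
    """Parse an ISO 8601 string; return None for missing or malformed values."""
    if not raw:
        return None
    try:
        return datetime.fromisoformat(raw)
    except (ValueError, TypeError):
        return None
```

A payload written by another tool, with an unknown memory type or a non-ISO timestamp, therefore still yields a result instead of raising.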
aget_all async
aget_all(config, limit=100, **kwargs)

Get all memories for a user.

Source code in pyagenity/store/qdrant_store.py
async def aget_all(
    self,
    config: dict[str, Any],
    limit: int = 100,
    **kwargs,
) -> list[MemorySearchResult]:
    """Get all memories for a user."""
    user_id, _, collection = self._extract_config_values(config)

    # Ensure collection exists
    await self._ensure_collection_exists(collection)

    # Build filter
    search_filter = self._build_qdrant_filter(
        user_id=user_id,
    )

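    # List matching points with scroll: no query vector is needed here,
    # and an empty query vector would not match the collection's dimension.
    points, _next_offset = await self.client.scroll(
        collection_name=collection,
        scroll_filter=search_filter,
        limit=limit,
        with_payload=True,
    )

    # Convert to search results
    results = [self._point_to_search_result(point) for point in points]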

    logger.debug(f"Found {len(results)} memories for query in collection {collection}")
    return results
arelease async
arelease()

Clean up resources.

Source code in pyagenity/store/qdrant_store.py
async def arelease(self) -> None:
    """Clean up resources."""
    if hasattr(self.client, "close"):
        await self.client.close()
    logger.info("QdrantStore resources released")
asearch async
asearch(config, query, memory_type=None, category=None, limit=10, score_threshold=None, filters=None, retrieval_strategy=RetrievalStrategy.SIMILARITY, distance_metric=DistanceMetric.COSINE, max_tokens=4000, **kwargs)

Search memories by content similarity.

Source code in pyagenity/store/qdrant_store.py
async def asearch(
    self,
    config: dict[str, Any],
    query: str,
    memory_type: MemoryType | None = None,
    category: str | None = None,
    limit: int = 10,
    score_threshold: float | None = None,
    filters: dict[str, Any] | None = None,
    retrieval_strategy: RetrievalStrategy = RetrievalStrategy.SIMILARITY,
    distance_metric: DistanceMetric = DistanceMetric.COSINE,
    max_tokens: int = 4000,
    **kwargs,
) -> list[MemorySearchResult]:
    """Search memories by content similarity."""
    user_id, thread_id, collection = self._extract_config_values(config)

    # Ensure collection exists
    await self._ensure_collection_exists(collection)

    # Generate query embedding
    query_vector = await self.embedding.aembed(query)
    if not query_vector or len(query_vector) != self.embedding.dimension:
        raise ValueError("Embedding service returned invalid vector")

    # Build filter
    search_filter = self._build_qdrant_filter(
        user_id=user_id,
        thread_id=thread_id,
        memory_type=memory_type,
        category=category,
        filters=filters,
    )

    # Perform search
    search_result = await self.client.search(
        collection_name=collection,
        query_vector=query_vector,
        query_filter=search_filter,
        limit=limit,
        score_threshold=score_threshold,
    )

    # Convert to search results
    results = [self._point_to_search_result(point) for point in search_result]

    logger.debug(f"Found {len(results)} memories for query in collection {collection}")
    return results
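The guard after `aembed` in the listing above rejects both empty vectors and vectors whose length differs from the declared dimension. A minimal, self-contained sketch of that check follows. `FakeEmbedding`, `embed_checked`, and `check` are hypothetical helpers written for this example and are not part of the library.

```python
import asyncio


class FakeEmbedding:
    """Hypothetical embedding service that returns a vector of a chosen length."""

    def __init__(self, dimension: int, produced: int) -> None:
        self.dimension = dimension  # length the store expects
        self._produced = produced   # length actually returned

    async def aembed(self, text: str) -> list:
        return [0.0] * self._produced


async def embed_checked(embedding: FakeEmbedding, text: str) -> list:
    """Apply the same validation that asearch performs on the query vector."""
    vector = await embedding.aembed(text)
    if not vector or len(vector) != embedding.dimension:
        raise ValueError("Embedding service returned invalid vector")
    return vector


def check(dimension: int, produced: int) -> str:
    """Report whether a vector of length `produced` passes the guard."""
    try:
        asyncio.run(embed_checked(FakeEmbedding(dimension, produced), "hello"))
    except ValueError:
        return "rejected"
    return "accepted"
```

A mismatch typically means the embedding model was changed after the collection was created, so failing early here is cheaper than a rejected request on the Qdrant side.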
asetup async
asetup()

Set up the store and ensure default collection exists.

Source code in pyagenity/store/qdrant_store.py
async def asetup(self) -> Any:
    """Set up the store and ensure default collection exists."""
    async with self._setup_lock:
        await self._ensure_collection_exists(self.default_collection)
    return True
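`asetup` wraps the collection check in `self._setup_lock`, so concurrent callers are serialized. The sketch below illustrates why that matters, under the assumption that the existence check and the creation are separate awaited steps. `SetupOnce` is a hypothetical stand-in; the real `_ensure_collection_exists` is not shown in this section and may behave differently.

```python
import asyncio


class SetupOnce:
    """Hypothetical store that counts how often a collection gets created."""

    def __init__(self) -> None:
        self._setup_lock = asyncio.Lock()
        self._existing = set()
        self.create_calls = 0

    async def _ensure_collection_exists(self, name: str) -> None:
        if name in self._existing:
            return
        # Yield to the event loop, standing in for a network round trip.
        await asyncio.sleep(0)
        self.create_calls += 1
        self._existing.add(name)

    async def asetup(self) -> bool:
        # Only one coroutine at a time may check and create the collection.
        async with self._setup_lock:
            await self._ensure_collection_exists("default")
        return True


async def run_concurrent(n: int):
    store = SetupOnce()
    results = await asyncio.gather(*(store.asetup() for _ in range(n)))
    return store.create_calls, results
```

Without the lock, several coroutines could pass the existence check before the first one finishes creating the collection.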
astore async
astore(config, content, memory_type=MemoryType.EPISODIC, category='general', metadata=None, **kwargs)

Store a new memory.

Source code in pyagenity/store/qdrant_store.py
async def astore(
    self,
    config: dict[str, Any],
    content: str | Message,
    memory_type: MemoryType = MemoryType.EPISODIC,
    category: str = "general",
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> str:
    """Store a new memory."""
    user_id, thread_id, collection = self._extract_config_values(config)

    # Ensure collection exists
    await self._ensure_collection_exists(collection)

    # Create memory record
    record = self._create_memory_record(
        content=content,
        user_id=user_id,
        thread_id=thread_id,
        memory_type=memory_type,
        category=category,
        metadata=metadata,
    )

    # Generate embedding
    text_content = self._prepare_content(content)
    vector = await self.embedding.aembed(text_content)
    if not vector or len(vector) != self.embedding.dimension:
        raise ValueError("Embedding service returned invalid vector")

    # Prepare payload
    payload = {
        "content": record.content,
        "user_id": record.user_id,
        "thread_id": record.thread_id,
        "memory_type": record.memory_type.value,
        "category": record.category,
        "timestamp": record.timestamp.isoformat() if record.timestamp else None,
        **record.metadata,
    }

    # Create point
    point = PointStruct(
        id=record.id,
        vector=vector,
        payload=payload,
    )

    # Store in Qdrant
    await self.client.upsert(
        collection_name=collection,
        points=[point],
    )

    logger.debug(f"Stored memory {record.id} in collection {collection}")
    return record.id
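In the payload literal above, `**record.metadata` is spread last, so by ordinary Python dict semantics a metadata key with the same name as a fixed field replaces that field. The sketch below reproduces only the literal. `build_payload` is a hypothetical helper, and the `memory_type` value is a placeholder string rather than a real `MemoryType` value.

```python
from datetime import datetime, timezone


def build_payload(content, user_id, thread_id, memory_type, category, timestamp, metadata):
    """Mirror the payload literal in astore: fixed fields first, metadata last."""
    return {
        "content": content,
        "user_id": user_id,
        "thread_id": thread_id,
        "memory_type": memory_type,
        "category": category,
        "timestamp": timestamp.isoformat() if timestamp else None,
        **metadata,  # later keys win over the fixed fields above
    }


payload = build_payload(
    content="User prefers dark mode",
    user_id="u1",
    thread_id="t1",
    memory_type="episodic",  # placeholder, not taken from the library
    category="general",
    timestamp=datetime(2024, 1, 1, tzinfo=timezone.utc),
    metadata={"source": "chat", "category": "preferences"},
)
```

Here `payload["category"]` ends up as `"preferences"`. Whether `_create_memory_record` filters such collisions beforehand is not shown in this section, so it may be safest to avoid metadata keys named `content`, `user_id`, `thread_id`, `memory_type`, `category`, or `timestamp`.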
aupdate async
aupdate(config, memory_id, content, metadata=None, **kwargs)

Update an existing memory.

Source code in pyagenity/store/qdrant_store.py
async def aupdate(
    self,
    config: dict[str, Any],
    memory_id: str,
    content: str | Message,
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> None:
    """Update an existing memory."""
    user_id, thread_id, collection = self._extract_config_values(config)

    # Get existing memory
    existing = await self.aget(config, memory_id)
    if not existing:
        raise ValueError(f"Memory {memory_id} not found")

    # Verify user/thread access if specified
    if user_id and existing.user_id != user_id:
        raise PermissionError("User does not have permission to update this memory")
    if thread_id and existing.thread_id != thread_id:
        raise PermissionError("Thread does not have permission to update this memory")

    # Prepare new content
    text_content = self._prepare_content(content)
    new_vector = await self.embedding.aembed(text_content)
    if not new_vector or len(new_vector) != self.embedding.dimension:
        raise ValueError("Embedding service returned invalid vector")

    # Update payload
    updated_metadata = {**existing.metadata}
    if metadata:
        updated_metadata.update(metadata)

    updated_payload = {
        "content": text_content,
        "user_id": existing.user_id,
        "thread_id": existing.thread_id,
        "memory_type": existing.memory_type.value,
        "category": updated_metadata.get("category", "general"),
        "timestamp": datetime.now().isoformat(),
        **updated_metadata,
    }

    # Create updated point
    point = PointStruct(
        id=memory_id,
        vector=new_vector,
        payload=updated_payload,
    )

    # Update in Qdrant
    await self.client.upsert(
        collection_name=collection,
        points=[point],
    )

    logger.debug(f"Updated memory {memory_id} in collection {collection}")
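The two ownership checks in `aupdate` only run when the corresponding value is present in `config`. A falsy `user_id` or `thread_id` skips its check. The sketch below isolates that logic as a pure function. `verify_access` and `is_allowed` are hypothetical names introduced for illustration.

```python
from typing import Optional


def verify_access(
    config_user_id: Optional[str],
    config_thread_id: Optional[str],
    owner_user_id: Optional[str],
    owner_thread_id: Optional[str],
) -> None:
    """Raise PermissionError using the same conditions as aupdate."""
    if config_user_id and owner_user_id != config_user_id:
        raise PermissionError("User does not have permission to update this memory")
    if config_thread_id and owner_thread_id != config_thread_id:
        raise PermissionError("Thread does not have permission to update this memory")


def is_allowed(config_user_id, config_thread_id, owner_user_id, owner_thread_id) -> bool:
    """Convenience wrapper that turns the exception into a boolean."""
    try:
        verify_access(config_user_id, config_thread_id, owner_user_id, owner_thread_id)
    except PermissionError:
        return False
    return True
```

A config that carries neither identifier passes both checks, so callers that rely on these checks for isolation should make sure the identifiers are always set.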
delete
delete(config, memory_id, **kwargs)

Synchronous wrapper for adelete that runs the async implementation.

Source code in pyagenity/store/base_store.py
def delete(self, config: dict[str, Any], memory_id: str, **kwargs) -> None:
    """Synchronous wrapper for `adelete` that runs the async implementation."""
    return run_coroutine(self.adelete(config, memory_id, **kwargs))
forget_memory
forget_memory(config, **kwargs)

Delete all memories for a user or agent.

Source code in pyagenity/store/base_store.py
def forget_memory(
    self,
    config: dict[str, Any],
    **kwargs,
) -> Any:
    """Delete all memories for a user or agent."""
    return run_coroutine(self.aforget_memory(config, **kwargs))
get
get(config, memory_id, **kwargs)

Synchronous wrapper for aget that runs the async implementation.

Source code in pyagenity/store/base_store.py
def get(self, config: dict[str, Any], memory_id: str, **kwargs) -> MemorySearchResult | None:
    """Synchronous wrapper for `aget` that runs the async implementation."""
    return run_coroutine(self.aget(config, memory_id, **kwargs))
get_all
get_all(config, limit=100, **kwargs)

Synchronous wrapper for aget_all that runs the async implementation.

Source code in pyagenity/store/base_store.py
def get_all(
    self,
    config: dict[str, Any],
    limit: int = 100,
    **kwargs,
) -> list[MemorySearchResult]:
    """Synchronous wrapper for `aget_all` that runs the async implementation."""
    return run_coroutine(self.aget_all(config, limit=limit, **kwargs))
release
release()

Clean up any resources used by the store (override in subclasses if needed).

Source code in pyagenity/store/base_store.py
def release(self) -> None:
    """Clean up any resources used by the store (override in subclasses if needed)."""
    return run_coroutine(self.arelease())
search
search(config, query, memory_type=None, category=None, limit=10, score_threshold=None, filters=None, retrieval_strategy=RetrievalStrategy.SIMILARITY, distance_metric=DistanceMetric.COSINE, max_tokens=4000, **kwargs)

Synchronous wrapper for asearch that runs the async implementation.

Source code in pyagenity/store/base_store.py
def search(
    self,
    config: dict[str, Any],
    query: str,
    memory_type: MemoryType | None = None,
    category: str | None = None,
    limit: int = 10,
    score_threshold: float | None = None,
    filters: dict[str, Any] | None = None,
    retrieval_strategy: RetrievalStrategy = RetrievalStrategy.SIMILARITY,
    distance_metric: DistanceMetric = DistanceMetric.COSINE,
    max_tokens: int = 4000,
    **kwargs,
) -> list[MemorySearchResult]:
    """Synchronous wrapper for `asearch` that runs the async implementation."""
    return run_coroutine(
        self.asearch(
            config,
            query,
            memory_type=memory_type,
            category=category,
            limit=limit,
            score_threshold=score_threshold,
            filters=filters,
            retrieval_strategy=retrieval_strategy,
            distance_metric=distance_metric,
            max_tokens=max_tokens,
            **kwargs,
        )
    )
setup
setup()

Synchronous setup method for the store.

Returns:

Type Description
Any

Implementation-defined setup result.

Source code in pyagenity/store/base_store.py
def setup(self) -> Any:
    """
    Synchronous setup method for the store.

    Returns:
        Any: Implementation-defined setup result.
    """
    return run_coroutine(self.asetup())
store
store(config, content, memory_type=MemoryType.EPISODIC, category='general', metadata=None, **kwargs)

Synchronous wrapper for astore that runs the async implementation.

Source code in pyagenity/store/base_store.py
def store(
    self,
    config: dict[str, Any],
    content: str | Message,
    memory_type: MemoryType = MemoryType.EPISODIC,
    category: str = "general",
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> str:
    """Synchronous wrapper for `astore` that runs the async implementation."""
    return run_coroutine(
        self.astore(
            config,
            content,
            memory_type=memory_type,
            category=category,
            metadata=metadata,
            **kwargs,
        )
    )
update
update(config, memory_id, content, metadata=None, **kwargs)

Synchronous wrapper for aupdate that runs the async implementation.

Source code in pyagenity/store/base_store.py
def update(
    self,
    config: dict[str, Any],
    memory_id: str,
    content: str | Message,
    metadata: dict[str, Any] | None = None,
    **kwargs,
) -> Any:
    """Synchronous wrapper for `aupdate` that runs the async implementation."""
    return run_coroutine(self.aupdate(config, memory_id, content, metadata=metadata, **kwargs))
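Every synchronous wrapper above delegates to `run_coroutine`, whose implementation is not shown on this page. The sketch below is one plausible way such a helper could work, not the library's actual code: it runs the coroutine directly when no event loop is active and falls back to a worker thread otherwise. `run_coroutine_sketch` and `add` are hypothetical names.

```python
import asyncio
import threading


def run_coroutine_sketch(coro):
    """Run a coroutine to completion from synchronous code (illustrative only)."""
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop is running in this thread, so asyncio.run is safe.
        return asyncio.run(coro)

    # A loop is already running here; asyncio.run would raise, so use a thread.
    outcome = {}

    def worker() -> None:
        try:
            outcome["value"] = asyncio.run(coro)
        except BaseException as exc:  # re-raised in the calling thread below
            outcome["error"] = exc

    thread = threading.Thread(target=worker)
    thread.start()
    thread.join()
    if "error" in outcome:
        raise outcome["error"]
    return outcome["value"]


async def add(a: int, b: int) -> int:
    return a + b


async def call_from_running_loop() -> int:
    # Blocks the current loop until the worker thread finishes.
    return run_coroutine_sketch(add(2, 3))
```

In this sketch the thread fallback blocks the calling event loop while it waits, so inside async code it is generally better to await the `a`-prefixed methods directly.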

Functions

create_cloud_qdrant_store

create_cloud_qdrant_store(url, api_key, embedding, **kwargs)

Create a cloud Qdrant store.

Source code in pyagenity/store/qdrant_store.py
def create_cloud_qdrant_store(
    url: str,
    api_key: str,
    embedding: BaseEmbedding,
    **kwargs,
) -> QdrantStore:
    """Create a cloud Qdrant store."""
    return QdrantStore(
        embedding=embedding,
        url=url,
        api_key=api_key,
        **kwargs,
    )

create_local_qdrant_store

create_local_qdrant_store(path, embedding, **kwargs)

Create a local Qdrant store.

Source code in pyagenity/store/qdrant_store.py
def create_local_qdrant_store(
    path: str,
    embedding: BaseEmbedding,
    **kwargs,
) -> QdrantStore:
    """Create a local Qdrant store."""
    return QdrantStore(
        embedding=embedding,
        path=path,
        **kwargs,
    )

create_remote_qdrant_store

create_remote_qdrant_store(host, port, embedding, **kwargs)

Create a remote Qdrant store.

Source code in pyagenity/store/qdrant_store.py
def create_remote_qdrant_store(
    host: str,
    port: int,
    embedding: BaseEmbedding,
    **kwargs,
) -> QdrantStore:
    """Create a remote Qdrant store."""
    return QdrantStore(
        embedding=embedding,
        host=host,
        port=port,
        **kwargs,
    )