Embedding System: Semantic Search Foundation

The embedding system in Agentflow provides a clean abstraction for converting text into vector representations, enabling semantic search and similarity-based retrieval across your agent's stored memories. This abstraction layer decouples your application logic from specific embedding providers, giving you the flexibility to switch between models and services without changing your code.

The Embedding Abstraction

Why Embeddings Matter

At the heart of modern AI memory systems lies a fundamental challenge: how do we find semantically related information in vast knowledge repositories? Traditional keyword search falls short because it can't understand meaning, context, or intent. This is where embeddings shine.

Embeddings are dense vector representations of text that capture semantic meaning in a numerical form. Similar concepts cluster together in this vector space, enabling:

  • Semantic similarity search: Find related memories even with different wording
  • Context-aware retrieval: Understand intent and nuance beyond keywords
  • Multimodal understanding: Bridge different types of content (text, code, etc.)
  • Efficient comparison: Use mathematical distance metrics for fast lookups

Consider a concrete example:

# Keyword search misses this connection
query = "debugging techniques"
memory = "I used print statements to trace the issue"  # No keyword match!

# Embedding-based search understands the semantic relationship
query_vector = embedding.embed(query)
memory_vector = embedding.embed(memory)
similarity = cosine_similarity(query_vector, memory_vector)  # High score!
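
The cosine_similarity helper above is illustrative rather than part of the interface; the math is just a dot product divided by the vectors' magnitudes. A minimal sketch in plain Python (in practice you would use numpy or let the vector store compute it):

import math

def cosine_similarity(a: list[float], b: list[float]) -> float:
    """Cosine of the angle between two vectors (1.0 = same direction)."""
    dot = sum(x * y for x, y in zip(a, b))
    norm_a = math.sqrt(sum(x * x for x in a))
    norm_b = math.sqrt(sum(x * x for x in b))
    return dot / (norm_a * norm_b)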

The BaseEmbedding Interface

Agentflow defines a simple but powerful interface that all embedding implementations must follow:

from agentflow.store.embedding import BaseEmbedding

# The interface (simplified):
class BaseEmbedding(ABC):
    async def aembed(self, text: str) -> list[float]:
        """Generate embedding vector for a single text."""

    async def aembed_batch(self, texts: list[str]) -> list[list[float]]:
        """Generate embedding vectors for multiple texts efficiently."""

    @property
    def dimension(self) -> int:
        """Return the dimensionality of embedding vectors."""

This abstraction provides several key benefits:

1. Provider Agnosticism

Switch between OpenAI, Cohere, Hugging Face, or custom models without changing application code:

# Development: Use OpenAI
embedding = OpenAIEmbedding(model="text-embedding-3-small")

# Production: Switch to custom model
embedding = CustomEmbedding(model_path="./fine-tuned-model")

# Store works with any implementation
store = QdrantStore(embedding=embedding, path="./data")

2. Performance Optimization

The batch interface enables efficient processing of multiple texts:

# Inefficient: One API call per text
embeddings = [await embedding.aembed(text) for text in texts]

# Efficient: Single batched API call
embeddings = await embedding.aembed_batch(texts)

3. Type Safety and Consistency

The dimension property ensures vector compatibility:

# Vector store can validate dimensions at setup time
assert embedding.dimension == expected_dimension

Architecture Philosophy

Separation of Concerns

The embedding system follows a clear separation of responsibilities:

┌─────────────────────────────────────────────┐
│              Application Layer              │
│       (Agents, Nodes, Business Logic)       │
└──────────────────────┬──────────────────────┘
                       │ Uses
                       ▼
┌─────────────────────────────────────────────┐
│          Storage Layer (BaseStore)          │
│          (QdrantStore, Mem0Store)           │
└──────────────────────┬──────────────────────┘
                       │ Delegates to
                       ▼
┌─────────────────────────────────────────────┐
│       Embedding Layer (BaseEmbedding)       │
│     (OpenAIEmbedding, CustomEmbedding)      │
└─────────────────────────────────────────────┘

Benefits of this architecture:

  • Single Responsibility: Each layer has a focused purpose
  • Testability: Mock embedding services for unit tests
  • Flexibility: Swap implementations without coupling
  • Optimization: Cache and batch at the right layer

Async-First Design

All embedding operations are asynchronous by default, with synchronous wrappers for compatibility:

# Async interface (preferred for performance)
vector = await embedding.aembed("Some text")
vectors = await embedding.aembed_batch(["Text 1", "Text 2"])

# Sync wrappers (for compatibility)
vector = embedding.embed("Some text")
vectors = embedding.embed_batch(["Text 1", "Text 2"])
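
The wrappers themselves ship with the library; as a rough mental model, a sync wrapper simply drives the async method to completion. A minimal sketch (illustrative only; SyncEmbeddingMixin is a hypothetical name, and it assumes no event loop is already running in the calling thread):

import asyncio

# Mixin meant to be combined with a class that defines aembed / aembed_batch
class SyncEmbeddingMixin:
    def embed(self, text: str) -> list[float]:
        # Run the async method to completion from synchronous code
        return asyncio.run(self.aembed(text))

    def embed_batch(self, texts: list[str]) -> list[list[float]]:
        return asyncio.run(self.aembed_batch(texts))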

This design choice enables:

  • Non-blocking operations: Multiple embedding requests can run concurrently
  • Better throughput: Batch operations utilize network efficiently
  • Resource efficiency: Don't block threads waiting for API responses

Embedding Strategies

Different use cases benefit from different embedding models and strategies:

Model Selection

Small Models: Fast and Efficient

# OpenAI's small model: 1536 dimensions
embedding = OpenAIEmbedding(model="text-embedding-3-small")

# Best for:
# - High-throughput applications
# - Cost-sensitive deployments
# - Real-time search requirements
# - General-purpose semantic search

Large Models: Maximum Accuracy

# OpenAI's large model: 3072 dimensions  
embedding = OpenAIEmbedding(model="text-embedding-3-large")

# Best for:
# - Precise semantic matching
# - Domain-specific applications
# - Quality over speed scenarios
# - Complex query understanding

Custom Models: Domain Specialization

# Fine-tuned model for a specific domain
class DomainEmbedding(BaseEmbedding):
    def __init__(self, model_path: str):
        self.model = load_custom_model(model_path)  # your own model loader
        self._dimension = 768  # Depends on your model

    async def aembed(self, text: str) -> list[float]:
        return await self.model.encode_async(text)

    @property
    def dimension(self) -> int:
        return self._dimension

# Best for:
# - Specialized vocabularies (medical, legal, etc.)
# - Language-specific optimization
# - On-premise deployment requirements
# - Cost reduction through self-hosting

Distance Metrics

Different distance metrics suit different embedding spaces:

Cosine Similarity (Most Common)

# Measures angle between vectors, normalized
store = QdrantStore(
    embedding=embedding,
    distance_metric=DistanceMetric.COSINE
)

# Best for:
# - Most embedding models (default choice)
# - Normalized vectors
# - Semantic similarity

Euclidean Distance

# Measures straight-line distance in vector space
store = QdrantStore(
    embedding=embedding,
    distance_metric=DistanceMetric.EUCLIDEAN
)

# Best for:
# - Unnormalized vectors
# - Magnitude-aware comparisons
# - Spatial relationships

Dot Product

# Measures vector alignment and magnitude
store = QdrantStore(
    embedding=embedding,
    distance_metric=DistanceMetric.DOT_PRODUCT
)

# Best for:
# - Performance-critical scenarios
# - Pre-normalized vectors
# - Maximum similarity scoring
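
To see how the metrics differ, consider two hypothetical vectors pointing in the same direction but with different magnitudes:

import math

a = [1.0, 2.0, 2.0]
b = [2.0, 4.0, 4.0]  # same direction as a, twice the magnitude

dot = sum(x * y for x, y in zip(a, b))                          # 18.0
euclidean = math.sqrt(sum((x - y) ** 2 for x, y in zip(a, b)))  # 3.0
cosine = dot / (3.0 * 6.0)                                      # 1.0  (|a| = 3, |b| = 6)

Cosine similarity treats a and b as identical because only direction matters, while Euclidean distance and dot product are sensitive to the difference in magnitude.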

Integration Patterns

Store Integration

The most common pattern is to inject embeddings into your store:

import os

from agentflow.store import QdrantStore
from agentflow.store.embedding import OpenAIEmbedding

# Create embedding service
embedding = OpenAIEmbedding(
    model="text-embedding-3-small",
    api_key=os.getenv("OPENAI_API_KEY")
)

# Store handles all embedding operations automatically
store = QdrantStore(
    embedding=embedding,
    path="./qdrant_data",
    distance_metric=DistanceMetric.COSINE
)

# Embeddings are generated transparently
await store.astore(config, "User loves technical documentation")
# ^ Text is automatically embedded before storage

results = await store.asearch(config, "documentation preferences")
# ^ Query is automatically embedded for similarity search

Custom Embedding Pipeline

For advanced use cases, you can control the embedding process:

from agentflow.store.embedding import BaseEmbedding

class PreprocessedEmbedding(BaseEmbedding):
    """Custom embedding with preprocessing pipeline."""

    def __init__(self, base_embedding: BaseEmbedding):
        self.base = base_embedding
        self._dimension = base_embedding.dimension

    async def aembed(self, text: str) -> list[float]:
        # Custom preprocessing
        cleaned = self._clean_text(text)
        chunked = self._chunk_if_needed(cleaned)

        # Generate embedding
        vector = await self.base.aembed(chunked)

        # Optional post-processing
        return self._normalize(vector)

    def _clean_text(self, text: str) -> str:
        """Remove special characters, normalize whitespace, etc."""
        return text.strip().lower()

    def _chunk_if_needed(self, text: str) -> str:
        """Truncate texts exceeding the model's context length."""
        if len(text) > 8000:  # Rough character limit; real chunking should be token-aware
            return text[:8000]
        return text

    def _normalize(self, vector: list[float]) -> list[float]:
        """L2 normalization for cosine similarity."""
        magnitude = sum(x**2 for x in vector) ** 0.5
        return [x / magnitude for x in vector]

    @property
    def dimension(self) -> int:
        return self._dimension

# Use the custom pipeline
custom_embedding = PreprocessedEmbedding(OpenAIEmbedding())
store = QdrantStore(embedding=custom_embedding, path="./data")

Caching for Performance

Add caching to reduce API calls and costs:

from collections import OrderedDict
import hashlib

class CachedEmbedding(BaseEmbedding):
    """Embedding service with an LRU cache keyed by a hash of the text."""

    def __init__(self, base_embedding: BaseEmbedding, cache_size: int = 1000):
        self.base = base_embedding
        self._dimension = base_embedding.dimension
        self._cache: OrderedDict[str, list[float]] = OrderedDict()
        self._cache_size = cache_size

    async def aembed(self, text: str) -> list[float]:
        # Use a hash of the text as the cache key
        cache_key = hashlib.md5(text.encode()).hexdigest()

        if cache_key in self._cache:
            # Mark as most recently used and return the cached vector
            self._cache.move_to_end(cache_key)
            return self._cache[cache_key]

        # Generate, then evict the least recently used entry if over capacity
        vector = await self.base.aembed(text)
        if len(self._cache) >= self._cache_size:
            self._cache.popitem(last=False)

        self._cache[cache_key] = vector
        return vector

    @property
    def dimension(self) -> int:
        return self._dimension
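
As with the preprocessing pipeline, the cache wraps any existing provider:

cached_embedding = CachedEmbedding(OpenAIEmbedding(model="text-embedding-3-small"))
store = QdrantStore(embedding=cached_embedding, path="./data")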

Implementation Guidelines

When implementing your own embedding service:

1. Handle API Errors Gracefully

import asyncio
import logging

logger = logging.getLogger(__name__)

class RobustEmbedding(BaseEmbedding):
    async def aembed(self, text: str) -> list[float]:
        max_retries = 3
        for attempt in range(max_retries):
            try:
                return await self._call_api(text)
            except RateLimitError:  # your provider's rate-limit exception
                if attempt < max_retries - 1:
                    await asyncio.sleep(2 ** attempt)  # Exponential backoff
                else:
                    raise
            except APIError as e:  # your provider's generic API exception
                logger.error(f"Embedding API error: {e}")
                raise

2. Optimize Batch Operations

class OptimizedEmbedding(BaseEmbedding):
    async def aembed_batch(self, texts: list[str]) -> list[list[float]]:
        # Process in chunks to avoid API limits
        chunk_size = 100  # API limit
        results = []

        for i in range(0, len(texts), chunk_size):
            chunk = texts[i:i + chunk_size]
            chunk_vectors = await self._api_batch_call(chunk)
            results.extend(chunk_vectors)

        return results

3. Validate Inputs and Outputs

class ValidatedEmbedding(BaseEmbedding):
    async def aembed(self, text: str) -> list[float]:
        if not text or not text.strip():
            raise ValueError("Text cannot be empty")

        if len(text) > self.max_length:  # max_length is set in __init__
            raise ValueError(f"Text exceeds max length of {self.max_length}")

        vector = await self._generate_embedding(text)

        # Validate output
        if len(vector) != self.dimension:
            raise RuntimeError(f"Expected {self.dimension} dimensions, got {len(vector)}")

        return vector

Performance Considerations

Batch Processing

Always use batch operations when processing multiple texts:

# ❌ Slow: N API calls
for memory in memories:
    vector = await embedding.aembed(memory.content)
    ...  # Store vector

# ✅ Fast: 1 API call
contents = [m.content for m in memories]
vectors = await embedding.aembed_batch(contents)
for memory, vector in zip(memories, vectors):
    ...  # Store vector

Async Concurrency

Leverage async for parallel processing:

# ❌ Sequential processing
results = []
for query in queries:
    vector = await embedding.aembed(query)
    search_results = await store.asearch_by_vector(vector)
    results.append(search_results)

# ✅ Concurrent processing
async def process_query(query: str):
    vector = await embedding.aembed(query)
    return await store.asearch_by_vector(vector)

results = await asyncio.gather(*[process_query(q) for q in queries])

Cost Optimization

Monitor and optimize embedding API costs:

class CostTrackingEmbedding(BaseEmbedding):
    def __init__(self, base: BaseEmbedding, cost_per_token: float = 0.0001):
        self.base = base
        self.cost_per_token = cost_per_token
        self.total_tokens = 0
        self.total_cost = 0.0
        self.call_count = 0

    async def aembed(self, text: str) -> list[float]:
        tokens = len(text.split())  # Rough estimate; use a tokenizer for accuracy
        self.total_tokens += tokens
        self.total_cost += tokens * self.cost_per_token
        self.call_count += 1

        return await self.base.aembed(text)

    def get_stats(self) -> dict:
        return {
            "total_tokens": self.total_tokens,
            "total_cost": self.total_cost,
            "average_tokens_per_call": self.total_tokens / max(1, self.call_count),
        }

Testing Strategies

Mock Embeddings for Tests

class MockEmbedding(BaseEmbedding):
    """Deterministic embedding for testing."""

    def __init__(self, dimension: int = 128):
        self._dimension = dimension

    async def aembed(self, text: str) -> list[float]:
        # Generate a deterministic vector from the text's hash
        import hashlib
        hash_value = int(hashlib.md5(text.encode()).hexdigest(), 16)
        return [float((hash_value >> i) % 2) for i in range(self.dimension)]

    async def aembed_batch(self, texts: list[str]) -> list[list[float]]:
        return [await self.aembed(text) for text in texts]

    @property
    def dimension(self) -> int:
        return self._dimension

# Use in tests
@pytest.fixture
def test_embedding():
    return MockEmbedding(dimension=128)

async def test_store_search(test_embedding):
    store = QdrantStore(embedding=test_embedding, path=":memory:")
    # Test without making real API calls

Conclusion

The embedding system in Agentflow provides a clean, efficient abstraction for semantic search that:

  • Decouples your application from specific embedding providers
  • Optimizes performance through async operations and batching
  • Enables flexible deployment strategies (cloud, self-hosted, hybrid)
  • Supports testing and development through mockable interfaces

By treating embeddings as a pluggable component, Agentflow gives you the freedom to choose the best embedding solution for your use case while maintaining clean, maintainable code. Whether you're using OpenAI's hosted models, running custom fine-tuned models, or experimenting with new embedding techniques, the BaseEmbedding interface ensures your application remains flexible and future-proof.