Advanced Topics¶
This guide covers advanced evaluation patterns, custom implementations, and best practices.
Custom Criteria¶
Creating Custom Criteria¶
Extend BaseCriterion for domain-specific evaluation:
from agentflow.evaluation import BaseCriterion, CriterionResult, CriterionConfig

# EvalCase, StepType, and TrajectoryCollector are referenced below; they are
# assumed to be importable from the same package (adjust to your install).
from agentflow.evaluation import EvalCase, StepType, TrajectoryCollector


class APICallCriterion(BaseCriterion):
    """Validates that specific APIs were called correctly."""

    name = "api_call_validation"
    description = "Ensures correct API endpoints were called"

    def __init__(
        self,
        required_apis: list[str],
        config: CriterionConfig | None = None,
    ):
        super().__init__(config)
        self.required_apis = required_apis

    async def evaluate(
        self,
        actual: TrajectoryCollector,
        expected: EvalCase,
    ) -> CriterionResult:
        # Extract API calls from trajectory
        api_calls = [
            step.name for step in actual.trajectory
            if step.step_type == StepType.TOOL and step.name in self.required_apis
        ]

        # Check coverage (deduplicate so repeated calls are not double-counted)
        covered = set(self.required_apis) & set(api_calls)
        missing = set(self.required_apis) - covered
        score = len(covered) / len(self.required_apis) if self.required_apis else 1.0

        return CriterionResult(
            criterion=self.name,
            score=score,
            passed=score >= self.threshold,
            threshold=self.threshold,
            details={
                "called": api_calls,
                "missing": list(missing),
                "coverage": f"{len(covered)}/{len(self.required_apis)}",
            },
        )
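Custom criteria can be exercised directly in a unit test before they are wired into a full evaluation run. A minimal sketch, assuming you already have a TrajectoryCollector captured from an agent run and the matching EvalCase (the collector, case, and API names below are placeholders):

import asyncio

async def check_api_coverage(collector: TrajectoryCollector, case: EvalCase) -> None:
    # Hypothetical API names; use the tools your agent actually exposes.
    criterion = APICallCriterion(required_apis=["search_flights", "book_ticket"])
    result = await criterion.evaluate(collector, case)
    assert result.passed, f"Missing API calls: {result.details['missing']}"

# asyncio.run(check_api_coverage(collector, case))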
Stateful Criteria¶
For criteria that need to maintain state across evaluations:
class PerformanceCriterion(BaseCriterion):
    """Tracks performance metrics across evaluations."""

    name = "performance"
    description = "Monitors response time and resource usage"

    def __init__(self, config: CriterionConfig | None = None):
        super().__init__(config)
        self.metrics: list[dict] = []

    async def evaluate(
        self,
        actual: TrajectoryCollector,
        expected: EvalCase,
    ) -> CriterionResult:
        # Calculate metrics (guard against missing timestamps)
        if actual.start_time and actual.end_time:
            duration = actual.end_time - actual.start_time
        else:
            duration = 0.0
        tool_count = len([s for s in actual.trajectory if s.step_type == StepType.TOOL])

        # Store for later analysis
        self.metrics.append({
            "duration": duration,
            "tool_count": tool_count,
            "eval_id": expected.eval_id,
        })

        # Score based on performance: full credit under 5s, linear decay to 0 at 15s
        score = 1.0 if duration < 5.0 else max(0.0, 1.0 - (duration - 5.0) / 10.0)

        return CriterionResult(
            criterion=self.name,
            score=score,
            passed=score >= self.threshold,
            threshold=self.threshold,
            details={
                "duration_seconds": duration,
                "tool_calls": tool_count,
            },
        )

    def get_stats(self) -> dict:
        """Get aggregate statistics."""
        if not self.metrics:
            return {}

        durations = [m["duration"] for m in self.metrics]
        return {
            "avg_duration": sum(durations) / len(durations),
            "max_duration": max(durations),
            "min_duration": min(durations),
            "total_evaluations": len(self.metrics),
        }
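Because the criterion instance accumulates state, keep a reference to it so you can pull aggregate numbers after a batch of evaluations. A minimal sketch, assuming pairs is a list of (TrajectoryCollector, EvalCase) tuples gathered by your own harness:

async def profile_run(pairs: list[tuple[TrajectoryCollector, EvalCase]]) -> dict:
    # `pairs` is a placeholder for trajectories and cases produced by your harness.
    perf = PerformanceCriterion()
    for collector, case in pairs:
        await perf.evaluate(collector, case)
    return perf.get_stats()  # {"avg_duration": ..., "max_duration": ..., ...}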
Multi-Agent Evaluation¶
Evaluating Agent Handoffs¶
class HandoffCriterion(BaseCriterion):
    """Validates agent-to-agent handoffs."""

    name = "handoff_validation"
    description = "Ensures proper handoffs between specialized agents"

    def __init__(
        self,
        expected_agent_sequence: list[str],
        config: CriterionConfig | None = None,
    ):
        super().__init__(config)
        self.expected_sequence = expected_agent_sequence

    async def evaluate(
        self,
        actual: TrajectoryCollector,
        expected: EvalCase,
    ) -> CriterionResult:
        # Extract agent transitions from trajectory
        agent_sequence = []
        current_agent = None
        for step in actual.trajectory:
            if step.step_type == StepType.NODE and "agent" in step.metadata:
                agent = step.metadata["agent"]
                if agent != current_agent:
                    agent_sequence.append(agent)
                    current_agent = agent

        # Compare sequences
        if len(agent_sequence) != len(self.expected_sequence):
            score = 0.5
        else:
            matches = sum(
                a == e for a, e in zip(agent_sequence, self.expected_sequence)
            )
            score = matches / len(self.expected_sequence)

        return CriterionResult(
            criterion=self.name,
            score=score,
            passed=score >= self.threshold,
            threshold=self.threshold,
            details={
                "expected_sequence": self.expected_sequence,
                "actual_sequence": agent_sequence,
            },
        )
RAG-Specific Evaluation¶
Source Citation Validation¶
import re

class CitationCriterion(BaseCriterion):
    """Validates that responses cite sources correctly."""

    name = "citation_validation"
    description = "Ensures claims are properly cited"

    async def evaluate(
        self,
        actual: TrajectoryCollector,
        expected: EvalCase,
    ) -> CriterionResult:
        response = actual.final_response

        # Extract citations (assuming [1], [2] format)
        citations = re.findall(r'\[(\d+)\]', response)

        # Extract retrieved documents from trajectory
        retrieved_docs = []
        for step in actual.trajectory:
            if step.step_type == StepType.TOOL and step.name == "retrieve_documents":
                result = step.metadata.get("result", [])
                retrieved_docs.extend(result)

        # Validate that every citation index points at a retrieved doc (1-based)
        valid_citations = [
            c for c in citations
            if 1 <= int(c) <= len(retrieved_docs)
        ]

        score = len(valid_citations) / len(citations) if citations else 1.0

        return CriterionResult(
            criterion=self.name,
            score=score,
            passed=score >= self.threshold,
            threshold=self.threshold,
            details={
                "total_citations": len(citations),
                "valid_citations": len(valid_citations),
                "retrieved_docs": len(retrieved_docs),
            },
        )
Context Relevance¶
class ContextRelevanceCriterion(BaseCriterion):
    """Evaluates relevance of retrieved context."""

    name = "context_relevance"
    description = "Measures how relevant retrieved documents are to the query"

    async def evaluate(
        self,
        actual: TrajectoryCollector,
        expected: EvalCase,
    ) -> CriterionResult:
        # Get query and retrieved docs
        query = expected.conversation[0].user_content.get_text()

        retrieved_docs = []
        for step in actual.trajectory:
            if step.step_type == StepType.TOOL and "retrieve" in step.name:
                docs = step.metadata.get("result", [])
                retrieved_docs.extend(docs)

        if not retrieved_docs:
            return CriterionResult.failure(
                self.name,
                0.0,
                self.threshold,
                details={"error": "No documents retrieved"},
            )

        # Use LLM to judge relevance
        from litellm import acompletion

        prompt = f"""Rate the relevance of these documents to the query on a scale of 0-1.
Query: {query}
Documents:
{chr(10).join(f"{i+1}. {doc}" for i, doc in enumerate(retrieved_docs))}
Return only a number between 0 and 1."""

        response = await acompletion(
            model=self.config.judge_model,
            messages=[{"role": "user", "content": prompt}],
        )

        # Clamp in case the judge returns a value outside [0, 1]
        score = max(0.0, min(1.0, float(response.choices[0].message.content.strip())))

        return CriterionResult(
            criterion=self.name,
            score=score,
            passed=score >= self.threshold,
            threshold=self.threshold,
            details={
                "num_docs": len(retrieved_docs),
            },
        )
Batch Processing¶
Evaluating Multiple Agents¶
async def evaluate_multiple_agents(
    agents: dict[str, CompiledGraph],
    eval_sets: dict[str, EvalSet],
    config: EvalConfig,
) -> dict[str, EvalReport]:
    """Evaluate multiple agents against their eval sets."""
    from agentflow.evaluation import AgentEvaluator

    reports = {}
    for agent_name, graph in agents.items():
        eval_set = eval_sets.get(agent_name)
        if not eval_set:
            continue

        evaluator = AgentEvaluator(graph, config)
        report = await evaluator.evaluate(eval_set)
        reports[agent_name] = report

    return reports
Comparative Analysis¶
def compare_agent_performance(reports: dict[str, EvalReport]):
    """Compare performance across multiple agents."""
    comparison = []

    for agent_name, report in reports.items():
        comparison.append({
            "agent": agent_name,
            "pass_rate": report.summary.pass_rate,
            "avg_score": report.summary.avg_score,
            "total_cases": report.summary.total_cases,
            "failed_cases": report.summary.failed_cases,
        })

    # Sort by pass rate
    comparison.sort(key=lambda x: x["pass_rate"], reverse=True)

    # Print comparison table
    print("\n{:<20} {:>10} {:>10} {:>10}".format(
        "Agent", "Pass Rate", "Avg Score", "Failed"
    ))
    print("-" * 60)
    for row in comparison:
        print("{:<20} {:>10.1%} {:>10.2f} {:>10}".format(
            row["agent"],
            row["pass_rate"],
            row["avg_score"],
            row["failed_cases"],
        ))

    return comparison
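Putting the two helpers together, a driver might look like the sketch below. The graphs and EvalSet objects (support_graph, billing_graph, support_set, billing_set) are placeholders for your own compiled agents and loaded eval sets:

import asyncio

async def run_comparison() -> None:
    reports = await evaluate_multiple_agents(
        agents={"support": support_graph, "billing": billing_graph},
        eval_sets={"support": support_set, "billing": billing_set},
        config=EvalConfig.default(),
    )
    compare_agent_performance(reports)

# asyncio.run(run_comparison())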
Regression Testing¶
Tracking Performance Over Time¶
import json
from pathlib import Path
from datetime import datetime

class RegressionTracker:
    """Track evaluation results over time."""

    def __init__(self, history_file: str = "eval_history.json"):
        self.history_file = Path(history_file)
        self.history = self._load_history()

    def _load_history(self) -> list[dict]:
        if self.history_file.exists():
            with open(self.history_file) as f:
                return json.load(f)
        return []

    def save_report(
        self,
        report: EvalReport,
        git_commit: str | None = None,
    ) -> None:
        """Save report to history."""
        entry = {
            "timestamp": datetime.now().isoformat(),
            "eval_set_id": report.eval_set_id,
            "pass_rate": report.summary.pass_rate,
            "avg_score": report.summary.avg_score,
            "failed_cases": report.summary.failed_cases,
            "git_commit": git_commit,
            "criterion_stats": {
                name: {
                    "avg_score": stats.avg_score,
                    "passed": stats.passed,
                }
                for name, stats in report.summary.criterion_stats.items()
            },
        }

        self.history.append(entry)
        with open(self.history_file, "w") as f:
            json.dump(self.history, f, indent=2)

    def check_regression(
        self,
        current_report: EvalReport,
        threshold: float = 0.05,
    ) -> dict:
        """Check if current results show regression."""
        if not self.history:
            return {"regression": False, "message": "No history to compare"}

        # Get previous results for same eval set
        previous = [
            h for h in self.history
            if h["eval_set_id"] == current_report.eval_set_id
        ]
        if not previous:
            return {"regression": False, "message": "No previous results"}

        last = previous[-1]

        # Compare pass rate
        current_pass_rate = current_report.summary.pass_rate
        previous_pass_rate = last["pass_rate"]
        diff = current_pass_rate - previous_pass_rate

        if diff < -threshold:
            return {
                "regression": True,
                "message": f"Pass rate decreased by {abs(diff)*100:.1f}%",
                "current": current_pass_rate,
                "previous": previous_pass_rate,
            }

        return {
            "regression": False,
            "improvement": diff > threshold,
            "diff": diff,
        }
Usage in CI¶
# ci_eval.py
import asyncio
import subprocess
import sys

from agentflow.evaluation import AgentEvaluator, EvalConfig

async def main():
    # `graph` is your compiled agent graph, built or imported elsewhere in this
    # script; RegressionTracker is the class defined in the previous section.
    # Run evaluation
    evaluator = AgentEvaluator(graph, EvalConfig.default())
    report = await evaluator.evaluate("tests/fixtures/main.evalset.json")

    # Track regression
    tracker = RegressionTracker()
    git_commit = subprocess.check_output(
        ["git", "rev-parse", "HEAD"]
    ).decode().strip()
    tracker.save_report(report, git_commit)

    # Check for regression
    regression = tracker.check_regression(report)
    if regression["regression"]:
        print(f"❌ REGRESSION DETECTED: {regression['message']}")
        sys.exit(1)
    elif regression.get("improvement"):
        print(f"✅ IMPROVEMENT: Pass rate increased by {regression['diff']*100:.1f}%")
    else:
        print("✅ No regression detected")

    # Require minimum pass rate
    if report.summary.pass_rate < 0.95:
        print(f"❌ Pass rate {report.summary.pass_rate*100:.1f}% below required 95%")
        sys.exit(1)

if __name__ == "__main__":
    asyncio.run(main())
Cost Optimization¶
Selective LLM-as-Judge¶
Only use expensive LLM criteria on failures:
async def smart_evaluate(
    evaluator: AgentEvaluator,
    eval_set: EvalSet,
) -> EvalReport:
    """Run fast criteria first, then LLM judge on failures."""
    # Phase 1: Fast deterministic criteria
    fast_config = EvalConfig(
        criteria={
            "trajectory_match": CriterionConfig(enabled=True),
            "response_match": CriterionConfig(enabled=True),
        }
    )
    fast_evaluator = AgentEvaluator(evaluator.graph, fast_config)
    report = await fast_evaluator.evaluate(eval_set)

    # Phase 2: LLM judge only on failures
    if report.failed_cases:
        llm_config = EvalConfig(
            criteria={
                "llm_judge": CriterionConfig(enabled=True),
            }
        )
        failed_eval_set = EvalSet(
            eval_set_id=eval_set.eval_set_id,
            name=f"{eval_set.name} (Failures)",
            eval_cases=[
                case for case in eval_set.eval_cases
                if case.eval_id in {r.eval_id for r in report.failed_cases}
            ],
        )
        llm_evaluator = AgentEvaluator(evaluator.graph, llm_config)
        llm_report = await llm_evaluator.evaluate(failed_eval_set)

        # Merge results
        # ... (implementation details)

    return report
Caching LLM Judgments¶
import hashlib
import json
from pathlib import Path

class CachedLLMJudge:
    """Cache LLM judge results to avoid redundant API calls."""

    def __init__(self, cache_dir: str = ".eval_cache"):
        self.cache_dir = Path(cache_dir)
        self.cache_dir.mkdir(exist_ok=True)

    def _get_cache_key(
        self,
        actual: str,
        expected: str,
        model: str,
    ) -> str:
        content = f"{actual}||{expected}||{model}"
        return hashlib.sha256(content.encode()).hexdigest()

    async def judge(
        self,
        actual: str,
        expected: str,
        model: str = "gpt-4o-mini",
    ) -> float:
        """Get cached judgment or call LLM."""
        cache_key = self._get_cache_key(actual, expected, model)
        cache_file = self.cache_dir / f"{cache_key}.json"

        # Check cache
        if cache_file.exists():
            with open(cache_file) as f:
                return json.load(f)["score"]

        # Call LLM
        from litellm import acompletion

        response = await acompletion(
            model=model,
            messages=[{
                "role": "user",
                "content": f"Rate similarity 0-1:\n\nActual: {actual}\n\nExpected: {expected}",
            }],
        )
        score = float(response.choices[0].message.content.strip())

        # Cache result
        with open(cache_file, "w") as f:
            json.dump({"score": score, "model": model}, f)

        return score
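A quick sketch of calling the cached judge from an async context; the actual and expected strings below are placeholders. Repeated calls with the same inputs and model are served from the on-disk cache:

import asyncio

async def demo_cached_judge() -> None:
    judge = CachedLLMJudge()
    score = await judge.judge(
        actual="The flight was booked for Tuesday.",
        expected="A flight booking was completed for Tuesday.",
    )
    print(f"similarity: {score:.2f}")

# asyncio.run(demo_cached_judge())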
Best Practices Summary¶
1. Start with Fast Criteria¶
# Good: Fast feedback
config = EvalConfig(
    criteria={
        "trajectory_match": CriterionConfig(enabled=True),
        "response_match": CriterionConfig(enabled=True),
    }
)
2. Use Appropriate Thresholds¶
# Critical functionality: strict threshold
"trajectory_match": CriterionConfig(threshold=0.95)
# Subjective quality: looser threshold
"llm_judge": CriterionConfig(threshold=0.65)
3. Organize Eval Sets by Purpose¶
tests/fixtures/
├── smoke_tests.evalset.json # Fast, catches major issues
├── integration.evalset.json # Full feature coverage
├── edge_cases.evalset.json # Unusual inputs
└── performance.evalset.json # Load/stress testing
4. Monitor Costs¶
# Track LLM usage
import litellm
litellm.set_verbose = True
# Use cheaper models for bulk testing
config = EvalConfig(
    criteria={
        "llm_judge": CriterionConfig(
            judge_model="gpt-4o-mini",  # Cheaper
        ),
    }
)
5. Version Control Eval Sets¶
# Track changes
git add tests/fixtures/*.evalset.json
git commit -m "Update eval sets for new feature"
# Tag releases
git tag -a eval-v1.0 -m "Baseline evaluation set"
6. Document Criteria Choices¶
# Document why you chose specific criteria
config = EvalConfig(
    criteria={
        # Critical: ensure correct APIs are called
        "trajectory_match": CriterionConfig(threshold=1.0),
        # Important: response should be relevant
        "response_match": CriterionConfig(threshold=0.7),
        # Nice to have: semantic quality
        "llm_judge": CriterionConfig(threshold=0.6),
    }
)