Authentication Guide¶
This guide covers implementing authentication in your AgentFlow application using JWT or custom authentication backends.
Table of Contents¶
- Overview
- No Authentication
- JWT Authentication
- Custom Authentication
- BaseAuth Interface
- Best Practices
- Examples
Overview¶
AgentFlow supports three authentication modes:
- No Authentication - For development or internal APIs
- JWT Authentication - Built-in JWT token validation
- Custom Authentication - Implement your own auth logic
Authentication is configured in agentflow.json:
No Authentication¶
Configuration¶
agentflow.json:
Usage¶
All API endpoints will be accessible without authentication.
# No auth header required
curl http://localhost:8000/ping
curl -X POST http://localhost:8000/threads
When to Use¶
- ✅ Development and testing
- ✅ Internal APIs behind a firewall
- ✅ APIs with alternative security (API Gateway, VPN)
- ❌ Public-facing production APIs
- ❌ APIs handling sensitive data
JWT Authentication¶
Configuration¶
Step 1: Configure agentflow.json
Step 2: Set Environment Variables
.env:
Supported Algorithms¶
| Algorithm | Type | Description |
|---|---|---|
| HS256 | HMAC | SHA-256 (recommended for single server) |
| HS384 | HMAC | SHA-384 |
| HS512 | HMAC | SHA-512 |
| RS256 | RSA | SHA-256 (for distributed systems) |
| RS384 | RSA | SHA-384 |
| RS512 | RSA | SHA-512 |
| ES256 | ECDSA | SHA-256 |
| ES384 | ECDSA | SHA-384 |
| ES512 | ECDSA | SHA-512 |
Generating Secrets¶
For HS256 (symmetric):
# Python
python -c "import secrets; print(secrets.token_urlsafe(32))"
# OpenSSL
openssl rand -base64 32
For RS256 (asymmetric):
# Generate private key
openssl genrsa -out private.pem 2048
# Generate public key
openssl rsa -in private.pem -outform PEM -pubout -out public.pem
# Use private key content as JWT_SECRET_KEY
cat private.pem
Creating JWT Tokens¶
Python example:
import jwt
from datetime import datetime, timedelta
def create_token(user_id: str, username: str) -> str:
payload = {
"user_id": user_id,
"username": username,
"exp": datetime.utcnow() + timedelta(hours=24),
"iat": datetime.utcnow()
}
token = jwt.encode(
payload,
"your-secret-key",
algorithm="HS256"
)
return token
# Usage
token = create_token("user123", "john_doe")
print(f"Token: {token}")
Node.js example:
const jwt = require('jsonwebtoken');
function createToken(userId, username) {
const payload = {
user_id: userId,
username: username,
exp: Math.floor(Date.now() / 1000) + (24 * 60 * 60), // 24 hours
iat: Math.floor(Date.now() / 1000)
};
return jwt.sign(payload, 'your-secret-key', { algorithm: 'HS256' });
}
const token = createToken('user123', 'john_doe');
console.log(`Token: ${token}`);
Using JWT Tokens¶
With curl:
# Create a thread
curl -X POST http://localhost:8000/threads \
-H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..." \
-H "Content-Type: application/json"
# Send a message
curl -X POST http://localhost:8000/threads/abc123/messages \
-H "Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..." \
-H "Content-Type: application/json" \
-d '{"content": "Hello"}'
With Python requests:
import requests
token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
headers = {
"Authorization": f"Bearer {token}",
"Content-Type": "application/json"
}
# Create thread
response = requests.post(
"http://localhost:8000/threads",
headers=headers
)
thread_id = response.json()["thread_id"]
# Send message
response = requests.post(
f"http://localhost:8000/threads/{thread_id}/messages",
headers=headers,
json={"content": "Hello, AI!"}
)
print(response.json())
With JavaScript fetch:
const token = "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9...";
const headers = {
"Authorization": `Bearer ${token}`,
"Content-Type": "application/json"
};
// Create thread
fetch("http://localhost:8000/threads", {
method: "POST",
headers: headers
})
.then(res => res.json())
.then(data => {
const threadId = data.thread_id;
// Send message
return fetch(`http://localhost:8000/threads/${threadId}/messages`, {
method: "POST",
headers: headers,
body: JSON.stringify({ content: "Hello, AI!" })
});
})
.then(res => res.json())
.then(data => console.log(data));
JWT Token Structure¶
A JWT consists of three parts: Header, Payload, and Signature.
Header:
Payload (claims):
{
"user_id": "user123",
"username": "john_doe",
"email": "john@example.com",
"roles": ["user", "admin"],
"exp": 1735689600, // Expiration time
"iat": 1735603200 // Issued at
}
Signature:
Token Validation¶
The JWT middleware automatically validates:
- ✅ Token signature
- ✅ Token expiration (exp claim)
- ✅ Token format
Error Responses¶
Missing token:
Status: 401 UnauthorizedInvalid token:
Status: 401 UnauthorizedExpired token:
Status: 401 UnauthorizedCustom Authentication¶
Overview¶
Implement custom authentication for: - OAuth 2.0 / OpenID Connect - API keys - Firebase Authentication - Auth0 - Custom database authentication - Multi-factor authentication
Configuration¶
agentflow.json:
Implementation¶
auth/custom.py:
from agentflow_cli import BaseAuth
from fastapi import Response, HTTPException
from fastapi.security import HTTPAuthorizationCredentials
from typing import Any
class MyAuthBackend(BaseAuth):
def authenticate(
self,
res: Response,
credential: HTTPAuthorizationCredentials
) -> dict[str, Any] | None:
"""
Authenticate user based on credentials.
Args:
res: FastAPI Response object (for setting cookies, headers)
credential: HTTPAuthorizationCredentials with token
Returns:
dict with user info including 'user_id', or raises HTTPException
"""
token = credential.credentials
# Your authentication logic here
user = self.verify_token(token)
if not user:
raise HTTPException(
status_code=401,
detail="Invalid authentication credentials"
)
# Return user information
# This will be merged with the graph config
return {
"user_id": user["id"],
"username": user["username"],
"email": user["email"],
"roles": user["roles"]
}
def verify_token(self, token: str) -> dict | None:
"""Implement your token verification logic."""
# Example: Query database, call external API, etc.
pass
BaseAuth Interface¶
Abstract Method¶
from abc import ABC, abstractmethod
from typing import Any
from fastapi import Response
from fastapi.security import HTTPAuthorizationCredentials
class BaseAuth(ABC):
@abstractmethod
def authenticate(
self,
res: Response,
credential: HTTPAuthorizationCredentials
) -> dict[str, Any] | None:
"""
Authenticate the user based on credentials.
Returns:
- Empty dict {} if no authentication required
- Dict with user info if authentication successful
- Raises HTTPException if authentication fails
The returned dict should contain at least:
- user_id: Unique user identifier
Optional fields:
- username: User's username
- email: User's email
- roles: List of user roles
- Any other user-specific data
These fields will be merged with the graph config,
making them available throughout your agent graph.
"""
raise NotImplementedError
Return Values¶
No authentication required:
Authentication successful:
return {
"user_id": "user123",
"username": "john_doe",
"email": "john@example.com",
"roles": ["user", "premium"],
"subscription": "pro"
}
Authentication failed:
Best Practices¶
Security¶
-
Use strong secrets:
-
Never commit secrets:
-
Use environment-specific secrets:
-
Rotate secrets regularly:
-
Use HTTPS in production:
- JWT tokens should only be transmitted over HTTPS
- Configure SSL/TLS on your server or load balancer
Token Management¶
-
Set appropriate expiration:
-
Include required claims:
-
Implement token refresh:
-
Validate all claims:
Error Handling¶
-
Provide clear error messages:
-
Log authentication failures:
-
Rate limit authentication attempts:
Examples¶
Firebase Authentication¶
# auth/firebase.py
from agentflow_cli import BaseAuth
from fastapi import Response, HTTPException
from fastapi.security import HTTPAuthorizationCredentials
import firebase_admin
from firebase_admin import credentials, auth
# Initialize Firebase
cred = credentials.Certificate("firebase-credentials.json")
firebase_admin.initialize_app(cred)
class FirebaseAuth(BaseAuth):
def authenticate(
self,
res: Response,
credential: HTTPAuthorizationCredentials
) -> dict:
try:
# Verify Firebase ID token
decoded_token = auth.verify_id_token(credential.credentials)
uid = decoded_token['uid']
return {
"user_id": uid,
"email": decoded_token.get('email'),
"email_verified": decoded_token.get('email_verified'),
"name": decoded_token.get('name')
}
except Exception as e:
raise HTTPException(401, f"Invalid Firebase token: {e}")
agentflow.json:
API Key Authentication¶
# auth/api_key.py
from agentflow_cli import BaseAuth
from fastapi import Response, HTTPException
from fastapi.security import HTTPAuthorizationCredentials
import hashlib
class APIKeyAuth(BaseAuth):
def __init__(self):
# In production, load from database
self.api_keys = {
"hashed_key_1": {
"user_id": "user1",
"name": "Service Account 1",
"permissions": ["read", "write"]
},
"hashed_key_2": {
"user_id": "user2",
"name": "Service Account 2",
"permissions": ["read"]
}
}
def authenticate(
self,
res: Response,
credential: HTTPAuthorizationCredentials
) -> dict:
# Hash the provided API key
api_key = credential.credentials
key_hash = hashlib.sha256(api_key.encode()).hexdigest()
# Look up in database
user_data = self.api_keys.get(key_hash)
if not user_data:
raise HTTPException(401, "Invalid API key")
return {
"user_id": user_data["user_id"],
"name": user_data["name"],
"permissions": user_data["permissions"]
}
OAuth 2.0 Authentication¶
# auth/oauth.py
from agentflow_cli import BaseAuth
from fastapi import Response, HTTPException
from fastapi.security import HTTPAuthorizationCredentials
import requests
class OAuth2Auth(BaseAuth):
def __init__(self):
self.oauth_server = "https://oauth.example.com"
def authenticate(
self,
res: Response,
credential: HTTPAuthorizationCredentials
) -> dict:
# Verify token with OAuth server
response = requests.get(
f"{self.oauth_server}/userinfo",
headers={"Authorization": f"Bearer {credential.credentials}"}
)
if response.status_code != 200:
raise HTTPException(401, "Invalid OAuth token")
user_info = response.json()
return {
"user_id": user_info["sub"],
"email": user_info["email"],
"name": user_info["name"],
"picture": user_info.get("picture")
}
Database Authentication¶
# auth/database.py
from agentflow_cli import BaseAuth
from fastapi import Response, HTTPException
from fastapi.security import HTTPAuthorizationCredentials
from sqlalchemy.orm import Session
import jwt
class DatabaseAuth(BaseAuth):
def __init__(self):
self.db = self.get_db_connection()
self.secret_key = "your-secret-key"
def authenticate(
self,
res: Response,
credential: HTTPAuthorizationCredentials
) -> dict:
try:
# Decode JWT
payload = jwt.decode(
credential.credentials,
self.secret_key,
algorithms=["HS256"]
)
user_id = payload["user_id"]
# Query database
user = self.db.query(User).filter(User.id == user_id).first()
if not user or not user.is_active:
raise HTTPException(401, "User not found or inactive")
return {
"user_id": user.id,
"username": user.username,
"email": user.email,
"roles": [role.name for role in user.roles],
"permissions": user.get_permissions()
}
except jwt.ExpiredSignatureError:
raise HTTPException(401, "Token has expired")
except jwt.InvalidTokenError:
raise HTTPException(401, "Invalid token")
def get_db_connection(self) -> Session:
# Implement your database connection
pass
Multi-Factor Authentication¶
# auth/mfa.py
from agentflow_cli import BaseAuth
from fastapi import Response, HTTPException
from fastapi.security import HTTPAuthorizationCredentials
import pyotp
class MFAAuth(BaseAuth):
def authenticate(
self,
res: Response,
credential: HTTPAuthorizationCredentials
) -> dict:
# Token format: "jwt_token:mfa_code"
try:
jwt_token, mfa_code = credential.credentials.split(":")
except ValueError:
raise HTTPException(401, "Invalid token format. Expected: jwt:mfa_code")
# Verify JWT
user_data = self.verify_jwt(jwt_token)
# Verify MFA code
totp = pyotp.TOTP(user_data["mfa_secret"])
if not totp.verify(mfa_code):
raise HTTPException(401, "Invalid MFA code")
return {
"user_id": user_data["user_id"],
"username": user_data["username"],
"mfa_verified": True
}
def verify_jwt(self, token: str) -> dict:
# Implement JWT verification
pass
Testing Authentication¶
Testing with curl¶
# No auth
curl http://localhost:8000/ping
# JWT auth
TOKEN="eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9..."
curl -H "Authorization: Bearer $TOKEN" http://localhost:8000/threads
# API key
curl -H "Authorization: Bearer your-api-key" http://localhost:8000/threads
Testing with pytest¶
# tests/test_auth.py
import pytest
from fastapi.testclient import TestClient
from app.main import app
import jwt
from datetime import datetime, timedelta
client = TestClient(app)
def create_test_token(user_id="test_user"):
payload = {
"user_id": user_id,
"exp": datetime.utcnow() + timedelta(hours=1)
}
return jwt.encode(payload, "test-secret", algorithm="HS256")
def test_no_auth_fails():
response = client.post("/threads")
assert response.status_code == 401
def test_invalid_token_fails():
headers = {"Authorization": "Bearer invalid_token"}
response = client.post("/threads", headers=headers)
assert response.status_code == 401
def test_valid_token_succeeds():
token = create_test_token()
headers = {"Authorization": f"Bearer {token}"}
response = client.post("/threads", headers=headers)
assert response.status_code == 200
def test_expired_token_fails():
payload = {
"user_id": "test_user",
"exp": datetime.utcnow() - timedelta(hours=1) # Expired
}
token = jwt.encode(payload, "test-secret", algorithm="HS256")
headers = {"Authorization": f"Bearer {token}"}
response = client.post("/threads", headers=headers)
assert response.status_code == 401
Additional Resources¶
- JWT.io - JWT debugger and documentation
- Configuration Guide - Complete configuration reference
- Deployment Guide - Production deployment strategies
- FastAPI Security - FastAPI security documentation