Background Task Manager¶
Overview¶
The BackgroundTaskManager is a core component in Agentflow that handles asynchronous background tasks throughout the agent execution lifecycle. It ensures proper task tracking, error logging, and graceful cleanup during shutdown.
The BackgroundTaskManager is automatically created and registered in the dependency injection container (InjectQ) when you create a StateGraph, making it available to all nodes and utilities that need to spawn background work.
Key Features¶
- Automatic task tracking: All background tasks are registered and monitored
- Error logging: Exceptions in background tasks are caught and logged without crashing the main flow
- Timeout support: Set per-task timeouts to prevent runaway operations
- Graceful shutdown: Cancel and wait for all tasks during application cleanup
- Metadata tracking: Track task names, creation time, and context for debugging
- Async context manager: Use async with for automatic cleanup
How It Works¶
When a StateGraph is initialized:
- A BackgroundTaskManager instance is created
- It's bound to the InjectQ dependency injection container
- Any function that needs to create background tasks can inject it
- During shutdown, the manager ensures all tasks are properly cleaned up
Example: Event Publishing¶
The most common use case in Agentflow is event publishing. The publish_event function uses BackgroundTaskManager to publish events without blocking node execution:
import logging

from injectq import Inject
from agentflow.publisher.events import EventModel
from agentflow.publisher.base_publisher import BasePublisher
from agentflow.utils.background_task_manager import BackgroundTaskManager

logger = logging.getLogger(__name__)
async def _publish_event_task(
    event: EventModel,
    publisher: BasePublisher | None,
) -> None:
    """Publish an event asynchronously."""
    if publisher:
        try:
            await publisher.publish(event)
        except Exception as e:
            logger.error("Failed to publish event: %s", e)


def publish_event(
    event: EventModel,
    publisher: BasePublisher | None = Inject[BasePublisher],
    task_manager: BackgroundTaskManager = Inject[BackgroundTaskManager],
) -> None:
    """Publish an event in the background without blocking."""
    # Create background task with name and context for observability
    task_manager.create_task(
        _publish_event_task(event, publisher),
        name="publish_event",
        context={
            "event": event.event,
            "thread_id": event.thread_id,
        }
    )
This pattern ensures that:
- Event publishing doesn't block agent execution
- Failed publishes are logged but don't crash the agent
- Tasks are tracked and cleaned up during shutdown
Using BackgroundTaskManager¶
Basic Usage with Dependency Injection¶
The recommended way to use BackgroundTaskManager is through dependency injection:
import asyncio

from injectq import Inject
from agentflow.utils.background_task_manager import BackgroundTaskManager
from agentflow.state import AgentState
async def my_background_work(data: str):
    """Some async work that runs in the background."""
    await asyncio.sleep(2)
    print(f"Processed: {data}")


async def my_node(
    state: AgentState,
    config: dict,
    task_manager: BackgroundTaskManager = Inject[BackgroundTaskManager],
):
    """Node that spawns background work."""
    # Spawn a background task
    task_manager.create_task(
        my_background_work(state.data.get("input", "")),
        name="process_data",
        context={"thread_id": config.get("thread_id")}
    )
    # Return immediately without waiting
    return state
Creating Tasks with Metadata¶
Provide meaningful names and context to make debugging easier:
task_manager.create_task(
send_notification(user_id, message),
name="send_notification",
timeout=10.0, # Auto-cancel after 10 seconds
context={
"user_id": user_id,
"notification_type": "email",
"thread_id": config["thread_id"],
}
)
Standalone Usage (Advanced)¶
If you need a BackgroundTaskManager outside of a graph context:
import asyncio

from agentflow.utils.background_task_manager import BackgroundTaskManager


async def main():
    # Create manager with 30-second shutdown timeout
    manager = BackgroundTaskManager(default_shutdown_timeout=30.0)

    # Create some background tasks
    manager.create_task(
        fetch_data_async("https://api.example.com"),
        name="fetch_api_data",
        timeout=5.0
    )

    # Do other work...
    await asyncio.sleep(1)

    # Gracefully shutdown and wait for tasks
    stats = await manager.shutdown()
    print(f"Shutdown stats: {stats}")


asyncio.run(main())
Using as Context Manager¶
For automatic cleanup:
async def process_batch():
    async with BackgroundTaskManager() as manager:
        for item in batch:
            manager.create_task(
                process_item(item),
                name=f"process_{item.id}"
            )

        # Do other work...
        await asyncio.sleep(2)

    # All tasks automatically cleaned up when exiting context
Task Lifecycle¶
1. Task Creation¶
task = task_manager.create_task(
my_coroutine(),
name="my_task",
timeout=30.0,
context={"user": "alice"}
)
When a task is created:
- It's added to the internal tracking set
- Metadata (name, creation time, timeout, context) is recorded
- A done callback is registered for cleanup
- If timeout is set, an automatic cancellation is scheduled
2. Task Execution¶
The task runs in the background:
- Normal completion → logged at DEBUG level
- Exception → logged at ERROR level with full traceback
- Cancellation → logged at DEBUG level
- Timeout → logged at WARNING level, task is cancelled
3. Task Completion¶
When a task completes (success, error, or cancellation):
- It's removed from the tracking set
- Metadata is cleaned up
- Metrics are updated (if enabled)
- Done callback logs the outcome
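To make this lifecycle concrete, here is a minimal, self-contained sketch of the tracking pattern described in steps 1-3: a tracking set, per-task metadata, an optional timeout wrapper, and a done callback. It illustrates the general asyncio pattern, not Agentflow's actual implementation:

import asyncio
import logging
import time

logger = logging.getLogger(__name__)


class MiniTaskTracker:
    """Simplified stand-in for illustration only (not the real BackgroundTaskManager)."""

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task] = set()
        self._metadata: dict[asyncio.Task, dict] = {}

    def create_task(self, coro, name="background_task", timeout=None, context=None):
        # Wrap the coroutine so it is cancelled automatically after `timeout` seconds
        if timeout is not None:
            coro = asyncio.wait_for(coro, timeout)
        task = asyncio.create_task(coro, name=name)
        # Track the task and record its metadata
        self._tasks.add(task)
        self._metadata[task] = {
            "name": name,
            "created": time.monotonic(),
            "timeout": timeout,
            "context": context or {},
        }
        # Register a done callback for cleanup and logging
        task.add_done_callback(self._on_done)
        return task

    def _on_done(self, task: asyncio.Task) -> None:
        meta = self._metadata.pop(task, {})
        self._tasks.discard(task)
        if task.cancelled():
            logger.debug("Task %s cancelled", meta.get("name"))
            return
        exc = task.exception()
        if isinstance(exc, asyncio.TimeoutError):
            logger.warning("Task %s timed out after %ss", meta.get("name"), meta.get("timeout"))
        elif exc is not None:
            logger.error("Task %s failed", meta.get("name"), exc_info=exc)
        else:
            logger.debug("Task %s completed", meta.get("name"))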
4. Shutdown¶
During graceful shutdown, the manager:
1. Cancels all active tasks
2. Waits up to timeout seconds for tasks to finish
3. Force-cancels any remaining tasks after timeout
4. Returns shutdown statistics
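Continuing the illustrative MiniTaskTracker sketch above, this shutdown sequence can be expressed with plain asyncio primitives. This is a hedged approximation rather than the real implementation; the statistics keys mirror those documented below:

    async def shutdown(self, timeout: float = 30.0) -> dict:
        """Cancel everything, wait up to `timeout` seconds, and report what happened."""
        start = time.monotonic()
        tasks = list(self._tasks)

        # 1. Cancel all active tasks
        for task in tasks:
            task.cancel()

        # 2. Wait up to `timeout` seconds for them to finish unwinding
        done, pending = set(), set()
        if tasks:
            done, pending = await asyncio.wait(tasks, timeout=timeout)

        # 3./4. Anything still pending stays cancelled; report the outcome
        return {
            "status": "completed" if not pending else "timeout",
            "initial_tasks": len(tasks),
            "completed_tasks": len(done),
            "remaining_tasks": len(pending),
            "duration_seconds": time.monotonic() - start,
        }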
Monitoring and Debugging¶
Get Active Task Count¶
active_count = task_manager.get_task_count()
print(f"Currently running {active_count} background tasks")
Get Detailed Task Information¶
task_info = task_manager.get_task_info()
for task in task_info:
    print(f"Task: {task['name']}")
    print(f"  Age: {task['age_seconds']:.2f}s")
    print(f"  Timeout: {task['timeout']}s")
    print(f"  Context: {task['context']}")
    print(f"  Done: {task['done']}")
Manually Cancel All Tasks¶
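A hedged sketch; the method name cancel_all() is an assumption and may differ in your Agentflow version:

# NOTE: cancel_all() is an assumed name for illustration; check the
# BackgroundTaskManager API in your Agentflow version for the exact helper.
await task_manager.cancel_all()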
Wait for All Tasks¶
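Similarly, a hedged sketch for waiting on outstanding work; wait_for_all() is an assumed name:

# NOTE: wait_for_all() is an assumed name for illustration; check the
# BackgroundTaskManager API in your Agentflow version for the exact helper.
await task_manager.wait_for_all(timeout=10.0)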
Integration with StateGraph¶
The BackgroundTaskManager is automatically integrated into every StateGraph:
from agentflow.graph import StateGraph
# Create a graph
graph = StateGraph()
# ... add nodes and edges ...
# Compile the graph
app = graph.compile(shutdown_timeout=30.0)
# The graph's BackgroundTaskManager is now registered in InjectQ
# All nodes can inject it via: task_manager: BackgroundTaskManager = Inject[BackgroundTaskManager]
# During shutdown, the manager is cleaned up
stats = await app.aclose()
print(stats["background_tasks"])
# {
# "status": "completed",
# "initial_tasks": 5,
# "completed_tasks": 5,
# "remaining_tasks": 0,
# "duration_seconds": 2.3
# }
The shutdown_timeout you pass to compile() is used by the BackgroundTaskManager to determine how long to wait for tasks during shutdown.
Shutdown Statistics¶
When you call await app.aclose() or await manager.shutdown(), you get detailed statistics:
{
"status": "completed", # or "timeout" or "already_shutdown"
"initial_tasks": 5, # Number of tasks when shutdown started
"completed_tasks": 5, # Number that finished cleanly
"remaining_tasks": 0, # Number still running (if timeout)
"duration_seconds": 2.3 # How long shutdown took
}
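For example, you can inspect these statistics to flag an incomplete shutdown. This usage sketch relies only on the keys shown above:

stats = await manager.shutdown()
if stats["status"] == "timeout":
    logger.warning(
        "Shutdown left %d background task(s) running after %.1fs",
        stats["remaining_tasks"],
        stats["duration_seconds"],
    )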
Best Practices¶
1. Always Provide Meaningful Names¶
# ✅ GOOD
task_manager.create_task(
send_email(user),
name="send_welcome_email"
)
# ❌ BAD
task_manager.create_task(send_email(user)) # Generic "background_task" name
2. Use Timeouts for External Calls¶
# ✅ GOOD - Timeout prevents hanging forever
task_manager.create_task(
fetch_external_api(),
name="fetch_api",
timeout=10.0
)
# ❌ RISKY - No timeout, could hang indefinitely
task_manager.create_task(
fetch_external_api(),
name="fetch_api"
)
3. Include Context for Debugging¶
# ✅ GOOD - Context helps with debugging
task_manager.create_task(
process_order(order_id),
name="process_order",
context={
"order_id": order_id,
"user_id": user_id,
"thread_id": config["thread_id"]
}
)
4. Don't Block on Background Tasks¶
# ✅ GOOD - Fire and forget
task_manager.create_task(analytics_track(event))
return state
# ❌ BAD - Defeats the purpose of background tasks
task = task_manager.create_task(analytics_track(event))
await task # Don't do this!
5. Handle Errors in the Coroutine¶
# ✅ GOOD - Handle errors gracefully
async def robust_background_work():
    try:
        await do_something_risky()
    except Exception as e:
        logger.error("Background work failed: %s", e)
        # Optionally send to error tracking service
        await report_error(e)

task_manager.create_task(robust_background_work())
Common Patterns¶
Pattern 1: Non-Blocking Notifications¶
async def my_node(
    state: AgentState,
    config: dict,
    task_manager: BackgroundTaskManager = Inject[BackgroundTaskManager],
):
    """Process data and send notification without blocking."""
    result = await process_data(state.data)

    # Send notification in background
    task_manager.create_task(
        send_notification(result),
        name="send_notification",
        timeout=5.0
    )

    # Return immediately
    state.data["result"] = result
    return state
Pattern 2: Background Metrics Collection¶
async def track_metrics(node_name: str, duration: float, config: dict):
    """Send metrics to external service."""
    try:
        await metrics_service.record({
            "node": node_name,
            "duration": duration,
            "thread_id": config.get("thread_id"),
            "timestamp": time.time()
        })
    except Exception as e:
        logger.warning("Failed to track metrics: %s", e)


async def my_node(
    state: AgentState,
    config: dict,
    task_manager: BackgroundTaskManager = Inject[BackgroundTaskManager],
):
    """Node that tracks its execution metrics."""
    start = time.time()

    # Do the actual work
    result = await process(state)

    # Track metrics in background
    duration = time.time() - start
    task_manager.create_task(
        track_metrics("my_node", duration, config),
        name="track_metrics",
        timeout=3.0
    )

    return result
Pattern 3: Parallel Data Fetching¶
async def fetch_all_data(task_manager: BackgroundTaskManager):
    """Fetch multiple data sources in parallel."""
    results = {}

    async def fetch_and_store(key: str, url: str):
        data = await fetch(url)
        results[key] = data

    # Create multiple background tasks
    task_manager.create_task(
        fetch_and_store("users", "https://api/users"),
        name="fetch_users",
        timeout=5.0
    )
    task_manager.create_task(
        fetch_and_store("products", "https://api/products"),
        name="fetch_products",
        timeout=5.0
    )

    # Wait a bit for results
    await asyncio.sleep(1)
    return results
Troubleshooting¶
Issue: Tasks Not Completing¶
Symptom: shutdown() times out with remaining tasks
Solutions:
1. Check for blocking operations (use async versions)
2. Increase the shutdown timeout
3. Add timeouts to individual tasks
4. Check the task info to see what's stuck:
info = task_manager.get_task_info()
for task in info:
    if not task['done']:
        print(f"Stuck task: {task['name']}, age: {task['age_seconds']:.1f}s")
Issue: Too Many Background Tasks¶
Symptom: High memory usage, slow shutdown
Solution: Check the task count periodically:
count = task_manager.get_task_count()
if count > 100:
    logger.warning("Too many background tasks: %d", count)
Issue: Missing Error Logs¶
Symptom: Background tasks failing silently
Solution: Task exceptions are logged at the ERROR level. Ensure logging is configured:
import logging
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
See Also¶
- Graceful Shutdown - Learn how BackgroundTaskManager integrates with shutdown
- Publisher - Event publishing uses BackgroundTaskManager
- Dependency Injection - How to inject BackgroundTaskManager
- Async Patterns - Best practices for async code