Background Task Manager

When to use this

Use BackgroundTaskManager when a node needs to do slow I/O (send a webhook, update a CRM, flush telemetry) without delaying the response to the caller. The graph returns immediately; the background task runs concurrently and is cleaned up on shutdown.
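The fire-and-forget pattern described above can be sketched with plain asyncio. This is an illustration of the idea only, not agentflow code: slow_io, handle_request, and the _background set are hypothetical names, and the set plays the role a task manager would play by holding references to running tasks.

```python
import asyncio

# Strong references keep in-flight tasks from being garbage-collected.
_background: set[asyncio.Task] = set()


async def slow_io(log: list[str]) -> None:
    """Hypothetical slow side effect, e.g. sending a webhook."""
    await asyncio.sleep(0.05)
    log.append("webhook sent")


async def handle_request(log: list[str]) -> str:
    """Schedule the slow work and return without awaiting it."""
    task = asyncio.create_task(slow_io(log))
    _background.add(task)
    task.add_done_callback(_background.discard)
    return "response"


async def main() -> tuple[str, list[str], list[str]]:
    log: list[str] = []
    response = await handle_request(log)
    log_at_return = list(log)          # nothing logged yet: the caller was not delayed
    await asyncio.gather(*_background)  # drain outstanding work before exiting
    return response, log_at_return, log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The final gather stands in for the drain step that a shutdown routine performs; without it, the event loop could close while the side effect is still pending.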

Import path

from agentflow.utils.background_task_manager import BackgroundTaskManager, TaskMetadata
# also available from the top-level package:
from agentflow.utils import BackgroundTaskManager

Getting the instance in a node

BackgroundTaskManager is registered in the dependency container automatically by StateGraph. Declare it as a parameter and the framework injects it:

from agentflow.utils.background_task_manager import BackgroundTaskManager

async def my_node(
    state,
    config: dict,
    task_manager: BackgroundTaskManager,  # auto-injected
) -> ...:
    # send_webhook is your own async function; calling it produces the coroutine object
    task_manager.create_task(
        send_webhook(config.get("user_id")),
        name="send_webhook",
        timeout=15.0,
    )
    return state

BackgroundTaskManager

Constructor

manager = BackgroundTaskManager(default_shutdown_timeout=30.0)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| default_shutdown_timeout | float | 30.0 | Seconds to wait when draining tasks during shutdown(). |

create_task

task = manager.create_task(
    coro,
    name="my_task",
    timeout=None,
    context=None,
)

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| coro | Coroutine | required | The coroutine to execute in the background. |
| name | str | "background_task" | Human-readable label. Appears in logs and get_task_info(). |
| timeout | float \| None | None | Cancel the task after this many seconds. Logs a warning on timeout. |
| context | dict \| None | None | Extra key/value pairs attached to log entries and task info for debugging. |

Returns asyncio.Task.
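One way a per-task timeout with a logged warning could work is to wrap the coroutine in asyncio.wait_for. The sketch below assumes that approach; the source does not say how BackgroundTaskManager implements it, and run_with_timeout is a hypothetical helper.

```python
import asyncio
import logging
from typing import Any, Coroutine, Optional

logger = logging.getLogger("background_sketch")


async def run_with_timeout(
    coro: Coroutine[Any, Any, Any], name: str, timeout: Optional[float]
) -> Any:
    """Await coro, cancelling it and logging a warning if it exceeds timeout."""
    try:
        # timeout=None means "no limit", matching the documented default.
        return await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        logger.warning("task %s timed out after %ss", name, timeout)
        return None


async def demo() -> tuple:
    fast = asyncio.create_task(
        run_with_timeout(asyncio.sleep(0.01, result="ok"), "fast", 1.0)
    )
    slow = asyncio.create_task(
        run_with_timeout(asyncio.sleep(5, result="late"), "slow", 0.05)
    )
    return await fast, await slow


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

The slow task is cancelled after 0.05 seconds and yields None, while the fast task returns its result normally.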

get_task_count

n = manager.get_task_count()  # → int

Number of tasks that are still running.

get_task_info

infos = manager.get_task_info()  # → list[dict]

Each dict contains:

| Key | Type | Description |
| --- | --- | --- |
| name | str | Task name. |
| age_seconds | float | Seconds since the task was created. |
| timeout | float \| None | Configured timeout. |
| context | dict | Context passed at creation. |
| done | bool | Whether the task has finished. |
| cancelled | bool | Whether the task was cancelled (only meaningful when done=True). |
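The keys above are enough to build a simple health check. The sketch below filters a list shaped like the get_task_info() result for tasks that are still running past an age threshold; find_overdue and the sample data are hypothetical, and only the key names come from the table.

```python
def find_overdue(infos: list[dict], max_age: float) -> list[str]:
    """Return names of unfinished tasks older than max_age seconds."""
    return [
        info["name"]
        for info in infos
        if not info["done"] and info["age_seconds"] > max_age
    ]


# Hand-written sample in the documented shape (not real output).
sample_infos = [
    {"name": "send_webhook", "age_seconds": 2.5, "timeout": 15.0,
     "context": {}, "done": False, "cancelled": False},
    {"name": "flush_telemetry", "age_seconds": 120.0, "timeout": None,
     "context": {"batch": 7}, "done": False, "cancelled": False},
    {"name": "update_crm", "age_seconds": 300.0, "timeout": 30.0,
     "context": {}, "done": True, "cancelled": True},
]

if __name__ == "__main__":
    print(find_overdue(sample_infos, max_age=60.0))
```

In a real node or health endpoint, the list would come from manager.get_task_info() instead of the sample.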

wait_for_all

await manager.wait_for_all(timeout=30.0, return_exceptions=False)

Wait for all tracked tasks to complete. Logs a warning if timeout is exceeded.

| Parameter | Type | Default | Description |
| --- | --- | --- | --- |
| timeout | float \| None | None | Max seconds to wait. None means wait forever. |
| return_exceptions | bool | False | If True, exceptions are returned as results rather than raised. |
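The return_exceptions flag reads like the one on asyncio.gather. Assuming wait_for_all follows the same semantics (the source does not confirm this), the difference can be seen with gather directly; ok and boom are hypothetical coroutines.

```python
import asyncio


async def ok() -> str:
    return "ok"


async def boom() -> str:
    raise ValueError("boom")


async def gather_collecting() -> list:
    # Exceptions come back as ordinary items in the result list.
    return await asyncio.gather(ok(), boom(), return_exceptions=True)


async def gather_raising() -> str:
    # The first exception propagates to the awaiting caller.
    try:
        await asyncio.gather(ok(), boom(), return_exceptions=False)
    except ValueError as exc:
        return f"raised: {exc}"
    return "no error"


if __name__ == "__main__":
    print(asyncio.run(gather_collecting()))
    print(asyncio.run(gather_raising()))
```

Collecting exceptions is usually the safer choice during teardown, because one failed background task then does not hide the outcome of the others.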

cancel_all

await manager.cancel_all()

Cancel every tracked task immediately. Does not wait for cancellation to propagate.
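"Does not wait for cancellation to propagate" matches how asyncio cancellation works in general: Task.cancel() only requests cancellation, and the task is not marked cancelled until it next runs. The stdlib-only sketch below shows that gap; worker is a hypothetical stand-in for a background task.

```python
import asyncio


async def worker() -> None:
    await asyncio.sleep(10)


async def demo() -> tuple[bool, bool]:
    task = asyncio.create_task(worker())
    await asyncio.sleep(0)      # let the worker start and reach its sleep
    task.cancel()               # request cancellation; not yet delivered
    right_after_cancel = task.cancelled()
    # Awaiting the task gives the event loop a chance to deliver the cancellation.
    await asyncio.gather(task, return_exceptions=True)
    return right_after_cancel, task.cancelled()


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If code after cancel_all() depends on tasks having actually stopped, it needs a follow-up wait such as wait_for_all() or shutdown().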

shutdown

stats = await manager.shutdown(timeout=30.0)

Graceful shutdown: cancels all tasks, waits up to timeout seconds, then force-cancels any remaining. Returns a stats dict:

| Key | Type | Description |
| --- | --- | --- |
| status | str | "completed" or "timed_out". |
| initial_tasks | int | Number of tasks at shutdown start. |
| completed_tasks | int | Tasks that finished cleanly. |
| remaining_tasks | int | Tasks still alive after timeout. |
| duration_seconds | float | Total shutdown duration. |
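The stats dict is convenient for a single log line at teardown. The helper below is a hypothetical sketch that relies only on the keys listed above; the example values are made up.

```python
def summarize_shutdown(stats: dict) -> str:
    """Format the shutdown stats dict as a one-line summary."""
    line = (
        f"shutdown {stats['status']}: "
        f"{stats['completed_tasks']}/{stats['initial_tasks']} tasks finished "
        f"in {stats['duration_seconds']:.1f}s"
    )
    if stats["remaining_tasks"]:
        line += f", {stats['remaining_tasks']} still alive"
    return line


# Made-up example in the documented shape.
example_stats = {
    "status": "timed_out",
    "initial_tasks": 5,
    "completed_tasks": 3,
    "remaining_tasks": 2,
    "duration_seconds": 30.0,
}

if __name__ == "__main__":
    print(summarize_shutdown(example_stats))
```

A non-zero remaining_tasks value is the signal worth alerting on, since it means work was abandoned at the timeout.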

TaskMetadata

Dataclass holding per-task tracking info. Its contents are exposed through get_task_info().

| Field | Type | Description |
| --- | --- | --- |
| name | str | Task name. |
| created_at | float | Unix timestamp of creation. |
| timeout | float \| None | Configured timeout. |
| context | dict \| None | Extra context. |

Lifecycle and shutdown

BackgroundTaskManager is created once per StateGraph during __init__. It is passed to CompiledGraph at compile(). When you call app.aclose(), the graph calls task_manager.shutdown(timeout=shutdown_timeout), which drains or cancels any running background tasks before the process exits.

app = graph.compile(shutdown_timeout=30.0)  # shutdown_timeout flows to BackgroundTaskManager

# In your process teardown:
await app.aclose() # or await app.astop()
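To make sure teardown runs even when the serving code raises, the aclose() call can sit in a finally block. The sketch below uses a hypothetical StandInApp in place of a compiled graph; only the aclose() method name comes from the text above.

```python
import asyncio


class StandInApp:
    """Hypothetical stand-in for a compiled graph; records whether aclose() ran."""

    def __init__(self) -> None:
        self.closed = False

    async def aclose(self) -> None:
        self.closed = True


async def serve(app: StandInApp, fail: bool = False) -> None:
    try:
        if fail:
            raise RuntimeError("request loop crashed")
        await asyncio.sleep(0)  # placeholder for real request handling
    finally:
        # Runs on normal exit and on error, so background tasks get shut down.
        await app.aclose()


if __name__ == "__main__":
    app = StandInApp()
    asyncio.run(serve(app))
    print(app.closed)
```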

Common errors

| Error | Cause | Fix |
| --- | --- | --- |
| Background task never starts | Coroutine function passed instead of coroutine object. | Call the function: create_task(send(x)), not create_task(send). |
| task_manager is None in node | Node is not inside a compiled graph. | Ensure the node function runs inside a graph that was compiled. |
| Task silently fails, no log | Exception swallowed. | Check for background_task_manager.tasks_failed metric; errors are logged at ERROR level. |
| Tasks run after graph has conceptually "finished" | aclose() not called. | Always call await app.aclose() on process exit. |
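The first row's distinction between a coroutine function and a coroutine object can be checked with the standard inspect module. The sketch below is a hypothetical debugging aid, not part of agentflow; send and classify are illustrative names.

```python
import inspect


async def send(x: int) -> int:
    return x


def classify(arg: object) -> str:
    """Report whether arg is a coroutine object, a coroutine function, or neither."""
    if inspect.iscoroutine(arg):
        return "coroutine object"
    if inspect.iscoroutinefunction(arg):
        return "coroutine function"
    return "other"


if __name__ == "__main__":
    coro = send(1)          # calling the function creates the coroutine object
    print(classify(coro))
    print(classify(send))   # the bare function: nothing has been created to run
    coro.close()            # avoid a "never awaited" warning in this demo
```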