Skip to content

Background task manager

Background task manager for async operations in PyAgenity.

This module provides BackgroundTaskManager, which tracks and manages asyncio background tasks, ensuring proper cleanup and error logging.

Classes:

Name Description
BackgroundTaskManager

Manages asyncio background tasks for agent operations.

TaskMetadata

Metadata for tracking background tasks.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute

logger = getLogger(__name__)

Classes

BackgroundTaskManager

Manages asyncio background tasks for agent operations.

Tracks created tasks, ensures cleanup, and logs errors from background execution. Enhanced with cancellation, timeouts, and metadata tracking.

Methods:

Name Description
__init__

Initialize the BackgroundTaskManager.

cancel_all

Cancel all tracked background tasks.

create_task

Create and track a background asyncio task.

get_task_count

Get the number of active background tasks.

get_task_info

Get information about all active tasks.

wait_for_all

Wait for all tracked background tasks to complete.

Source code in pyagenity/utils/background_task_manager.py
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
class BackgroundTaskManager:
    """
    Manages asyncio background tasks for agent operations.

    Tracks created tasks, ensures cleanup, and logs errors from background execution.
    Enhanced with cancellation, timeouts, and metadata tracking.
    """

    def __init__(self):
        """
        Initialize the BackgroundTaskManager.
        """
        self._tasks: set[asyncio.Task] = set()
        self._task_metadata: dict[asyncio.Task, TaskMetadata] = {}

    def create_task(
        self,
        coro: Coroutine,
        *,
        name: str = "background_task",
        timeout: float | None = None,
        context: dict[str, Any] | None = None,
    ) -> asyncio.Task:
        """
        Create and track a background asyncio task.

        Args:
            coro (Coroutine): The coroutine to run in the background.
            name (str): Human-readable name for the task.
            timeout (Optional[float]): Timeout in seconds for the task.
            context (Optional[dict]): Additional context for logging.

        Returns:
            asyncio.Task: The created task.
        """
        metrics.counter("background_task_manager.tasks_created").inc()

        task = asyncio.create_task(coro, name=name)
        metadata = TaskMetadata(
            name=name, created_at=time.time(), timeout=timeout, context=context or {}
        )

        self._tasks.add(task)
        self._task_metadata[task] = metadata
        task.add_done_callback(self._task_done_callback)

        # Set up timeout if specified
        if timeout:
            self._setup_timeout(task, timeout)

        logger.debug(
            "Created background task: %s (timeout=%s)",
            name,
            timeout,
            extra={"task_context": context},
        )

        return task

    def _setup_timeout(self, task: asyncio.Task, timeout: float) -> None:
        """Set up timeout cancellation for a task."""

        async def timeout_canceller():
            try:
                await asyncio.sleep(timeout)
                if not task.done():
                    metadata = self._task_metadata.get(task)
                    task_name = metadata.name if metadata else "unknown"
                    logger.warning(
                        "Background task '%s' timed out after %s seconds", task_name, timeout
                    )
                    task.cancel()
                    metrics.counter("background_task_manager.tasks_timed_out").inc()
            except asyncio.CancelledError:
                pass  # Parent task was cancelled, this is expected

        # Create the timeout task but don't track it (avoid recursive tracking)
        timeout_task = asyncio.create_task(timeout_canceller())
        # Add a callback to clean up the timeout task reference
        timeout_task.add_done_callback(lambda t: None)

    def _task_done_callback(self, task: asyncio.Task) -> None:
        """
        Remove completed task and log exceptions if any.

        Args:
            task (asyncio.Task): The completed asyncio task.
        """
        metadata = self._task_metadata.pop(task, None)
        self._tasks.discard(task)

        task_name = metadata.name if metadata else "unknown"
        duration = time.time() - metadata.created_at if metadata else 0.0

        try:
            task.result()  # raises if task failed
            metrics.counter("background_task_manager.tasks_completed").inc()
            logger.debug(
                "Background task '%s' completed successfully (duration=%.2fs)",
                task_name,
                duration,
                extra={"task_context": metadata.context if metadata else {}},
            )
        except asyncio.CancelledError:
            metrics.counter("background_task_manager.tasks_cancelled").inc()
            logger.debug("Background task '%s' was cancelled", task_name)
        except Exception as e:
            metrics.counter("background_task_manager.tasks_failed").inc()
            error_msg = (
                f"Background task raised an exception - {task_name}: {e} (duration={duration:.2f}s)"
            )
            logger.error(
                error_msg,
                exc_info=e,
                extra={"task_context": metadata.context if metadata else {}},
            )

    async def cancel_all(self) -> None:
        """
        Cancel all tracked background tasks.

        Returns:
            None
        """
        if not self._tasks:
            return

        logger.info("Cancelling %d background tasks...", len(self._tasks))

        for task in self._tasks.copy():
            if not task.done():
                task.cancel()

        # Wait a short time for cancellations to process
        await asyncio.sleep(0.1)

    async def wait_for_all(
        self, timeout: float | None = None, return_exceptions: bool = False
    ) -> None:
        """
        Wait for all tracked background tasks to complete.

        Args:
            timeout (float | None): Maximum time to wait in seconds.
            return_exceptions (bool): If True, exceptions are returned as results instead of raised.

        Returns:
            None
        """
        if not self._tasks:
            return

        logger.info("Waiting for %d background tasks to finish...", len(self._tasks))

        try:
            if timeout:
                await asyncio.wait_for(
                    asyncio.gather(*self._tasks, return_exceptions=return_exceptions),
                    timeout=timeout,
                )
            else:
                await asyncio.gather(*self._tasks, return_exceptions=return_exceptions)
            logger.info("All background tasks finished.")
        except TimeoutError:
            logger.warning("Timeout waiting for background tasks, some may still be running")
            metrics.counter("background_task_manager.wait_timeout").inc()

    def get_task_count(self) -> int:
        """Get the number of active background tasks."""
        return len(self._tasks)

    def get_task_info(self) -> list[dict[str, Any]]:
        """Get information about all active tasks."""
        current_time = time.time()
        return [
            {
                "name": metadata.name,
                "age_seconds": current_time - metadata.created_at,
                "timeout": metadata.timeout,
                "context": metadata.context,
                "done": task.done(),
                "cancelled": task.cancelled() if task.done() else False,
            }
            for task, metadata in self._task_metadata.items()
        ]

Functions

__init__
__init__()

Initialize the BackgroundTaskManager.

Source code in pyagenity/utils/background_task_manager.py
39
40
41
42
43
44
def __init__(self):
    """
    Initialize the BackgroundTaskManager.
    """
    self._tasks: set[asyncio.Task] = set()
    self._task_metadata: dict[asyncio.Task, TaskMetadata] = {}
cancel_all async
cancel_all()

Cancel all tracked background tasks.

Returns:

Type Description
None

None

Source code in pyagenity/utils/background_task_manager.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
async def cancel_all(self) -> None:
    """
    Cancel all tracked background tasks.

    Returns:
        None
    """
    if not self._tasks:
        return

    logger.info("Cancelling %d background tasks...", len(self._tasks))

    for task in self._tasks.copy():
        if not task.done():
            task.cancel()

    # Wait a short time for cancellations to process
    await asyncio.sleep(0.1)
create_task
create_task(coro, *, name='background_task', timeout=None, context=None)

Create and track a background asyncio task.

Parameters:

Name Type Description Default
coro
Coroutine

The coroutine to run in the background.

required
name
str

Human-readable name for the task.

'background_task'
timeout
Optional[float]

Timeout in seconds for the task.

None
context
Optional[dict]

Additional context for logging.

None

Returns:

Type Description
Task

asyncio.Task: The created task.

Source code in pyagenity/utils/background_task_manager.py
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
def create_task(
    self,
    coro: Coroutine,
    *,
    name: str = "background_task",
    timeout: float | None = None,
    context: dict[str, Any] | None = None,
) -> asyncio.Task:
    """
    Create and track a background asyncio task.

    Args:
        coro (Coroutine): The coroutine to run in the background.
        name (str): Human-readable name for the task.
        timeout (Optional[float]): Timeout in seconds for the task.
        context (Optional[dict]): Additional context for logging.

    Returns:
        asyncio.Task: The created task.
    """
    metrics.counter("background_task_manager.tasks_created").inc()

    task = asyncio.create_task(coro, name=name)
    metadata = TaskMetadata(
        name=name, created_at=time.time(), timeout=timeout, context=context or {}
    )

    self._tasks.add(task)
    self._task_metadata[task] = metadata
    task.add_done_callback(self._task_done_callback)

    # Set up timeout if specified
    if timeout:
        self._setup_timeout(task, timeout)

    logger.debug(
        "Created background task: %s (timeout=%s)",
        name,
        timeout,
        extra={"task_context": context},
    )

    return task
get_task_count
get_task_count()

Get the number of active background tasks.

Source code in pyagenity/utils/background_task_manager.py
198
199
200
def get_task_count(self) -> int:
    """Get the number of active background tasks."""
    return len(self._tasks)
get_task_info
get_task_info()

Get information about all active tasks.

Source code in pyagenity/utils/background_task_manager.py
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def get_task_info(self) -> list[dict[str, Any]]:
    """Get information about all active tasks."""
    current_time = time.time()
    return [
        {
            "name": metadata.name,
            "age_seconds": current_time - metadata.created_at,
            "timeout": metadata.timeout,
            "context": metadata.context,
            "done": task.done(),
            "cancelled": task.cancelled() if task.done() else False,
        }
        for task, metadata in self._task_metadata.items()
    ]
wait_for_all async
wait_for_all(timeout=None, return_exceptions=False)

Wait for all tracked background tasks to complete.

Parameters:

Name Type Description Default
timeout
float | None

Maximum time to wait in seconds.

None
return_exceptions
bool

If True, exceptions are returned as results instead of raised.

False

Returns:

Type Description
None

None

Source code in pyagenity/utils/background_task_manager.py
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
async def wait_for_all(
    self, timeout: float | None = None, return_exceptions: bool = False
) -> None:
    """
    Wait for all tracked background tasks to complete.

    Args:
        timeout (float | None): Maximum time to wait in seconds.
        return_exceptions (bool): If True, exceptions are returned as results instead of raised.

    Returns:
        None
    """
    if not self._tasks:
        return

    logger.info("Waiting for %d background tasks to finish...", len(self._tasks))

    try:
        if timeout:
            await asyncio.wait_for(
                asyncio.gather(*self._tasks, return_exceptions=return_exceptions),
                timeout=timeout,
            )
        else:
            await asyncio.gather(*self._tasks, return_exceptions=return_exceptions)
        logger.info("All background tasks finished.")
    except TimeoutError:
        logger.warning("Timeout waiting for background tasks, some may still be running")
        metrics.counter("background_task_manager.wait_timeout").inc()

TaskMetadata dataclass

Metadata for tracking background tasks.

Methods:

Name Description
__init__

Attributes:

Name Type Description
context dict[str, Any] | None
created_at float
name str
timeout float | None
Source code in pyagenity/utils/background_task_manager.py
21
22
23
24
25
26
27
28
@dataclass
class TaskMetadata:
    """Metadata for tracking background tasks."""

    name: str
    created_at: float
    timeout: float | None = None
    context: dict[str, Any] | None = None

Attributes

context class-attribute instance-attribute
context = None
created_at instance-attribute
created_at
name instance-attribute
name
timeout class-attribute instance-attribute
timeout = None

Functions

__init__
__init__(name, created_at, timeout=None, context=None)

Modules