Utils

Unified utility exports for TAF agent graphs.

This module re-exports core utility symbols for agent graph construction, message handling, callback management, reducers, and constants. Import from this module for a stable, unified surface of agent utilities.

Main Exports
  • Message and content blocks (Message, TextBlock, ToolCallBlock, etc.)
  • Callback management (CallbackManager, register_before_invoke, etc.)
  • Command and callable utilities (Command, call_sync_or_async)
  • Reducers (add_messages, replace_messages, append_items, replace_value)
  • Constants (START, END, ExecutionState, etc.)
  • Converter (convert_messages)

Modules:

Name Description
background_task_manager

Background task manager for async operations in TAF.

callable_utils

Utilities for calling sync or async functions in TAF.

callbacks

Callback system for TAF.

command

Command API for AgentGraph in TAF.

constants

Constants and enums for TAF agent graph execution and messaging.

converter

Message conversion utilities for TAF agent graphs.

id_generator

ID Generator Module

logging

Centralized logging configuration for TAF.

metrics

Lightweight metrics instrumentation utilities.

thread_info

Thread metadata and status tracking for agent graphs.

thread_name_generator

Thread name generation utilities for AI agent conversations.

Classes:

Name Description
AfterInvokeCallback

Abstract base class for after_invoke callbacks.

AsyncIDGenerator

ID generator that produces UUID version 4 strings asynchronously.

BackgroundTaskManager

Manages asyncio background tasks for agent operations.

BaseIDGenerator

Abstract base class for ID generation strategies.

BeforeInvokeCallback

Abstract base class for before_invoke callbacks.

BigIntIDGenerator

ID generator that produces big integer IDs based on current time in nanoseconds.

CallbackContext

Context information passed to callbacks.

CallbackManager

Manages registration and execution of callbacks for different invocation types.

Command

Command object that combines state updates with control flow.

DefaultIDGenerator

Default ID generator that returns empty strings.

ExecutionState

Graph execution states for agent workflows.

HexIDGenerator

ID generator that produces hexadecimal strings.

IDType

Enumeration of supported ID types.

IntIDGenerator

ID generator that produces 32-bit random integers.

InvocationType

Types of invocations that can trigger callbacks.

OnErrorCallback

Abstract base class for on_error callbacks.

ResponseGranularity

Response granularity options for agent graph outputs.

ShortIDGenerator

ID generator that produces short alphanumeric strings.

StorageLevel

Message storage levels for agent state persistence.

TaskMetadata

Metadata for tracking background tasks.

ThreadInfo

Metadata and status for a thread in agent execution.

TimestampIDGenerator

ID generator that produces integer IDs based on current time in microseconds.

UUIDGenerator

ID generator that produces UUID version 4 strings.

Functions:

Name Description
add_messages

Adds messages to the list, avoiding duplicates by message_id.

append_items

Appends items to a list, avoiding duplicates by item.id.

call_sync_or_async

Call a function that may be sync or async, returning its result.

configure_logging

Configures the root logger for the TAF project.

convert_messages

Convert system prompts, agent state, and extra messages to a list of dicts for

generate_dummy_thread_name

Generate a meaningful English name for an AI chat thread.

register_after_invoke

Register an after_invoke callback on the global callback manager.

register_before_invoke

Register a before_invoke callback on the global callback manager.

register_on_error

Register an on_error callback on the global callback manager.

replace_messages

Replaces the entire message list with a new one.

replace_value

Replaces a value with another.

run_coroutine

Run an async coroutine from a sync context safely.
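
The reducer entries above (add_messages, replace_messages) can be illustrated with a small, self-contained sketch. It is not the library's implementation: `Msg`, `add_messages_sketch`, and `replace_messages_sketch` are hypothetical names, and keeping the existing message when a duplicate message_id arrives is an assumption, since the summary only says that duplicates are avoided.

```python
from dataclasses import dataclass


@dataclass
class Msg:
    """Hypothetical stand-in for the library's Message type."""

    message_id: str
    content: str


def add_messages_sketch(left: list[Msg], right: list[Msg]) -> list[Msg]:
    """Append messages from `right` whose message_id is not already present."""
    seen = {m.message_id for m in left}
    merged = list(left)
    for msg in right:
        if msg.message_id not in seen:
            seen.add(msg.message_id)
            merged.append(msg)
    return merged


def replace_messages_sketch(left: list[Msg], right: list[Msg]) -> list[Msg]:
    """Discard the existing list and keep only the new one."""
    return list(right)
```

Both helpers return a new list instead of mutating their inputs, which keeps a reducer safe to call repeatedly on the same state.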

Attributes:

Name Type Description
END Literal['__end__']
START Literal['__start__']
default_callback_manager

Attributes

END module-attribute

END = '__end__'

START module-attribute

START = '__start__'

__all__ module-attribute

__all__ = ['END', 'START', 'AfterInvokeCallback', 'AsyncIDGenerator', 'BackgroundTaskManager', 'BaseIDGenerator', 'BeforeInvokeCallback', 'BigIntIDGenerator', 'CallbackContext', 'CallbackManager', 'Command', 'DefaultIDGenerator', 'ExecutionState', 'HexIDGenerator', 'IDType', 'IntIDGenerator', 'InvocationType', 'OnErrorCallback', 'ResponseGranularity', 'ShortIDGenerator', 'StorageLevel', 'TaskMetadata', 'ThreadInfo', 'TimestampIDGenerator', 'UUIDGenerator', 'add_messages', 'append_items', 'call_sync_or_async', 'configure_logging', 'convert_messages', 'default_callback_manager', 'generate_dummy_thread_name', 'register_after_invoke', 'register_before_invoke', 'register_on_error', 'replace_messages', 'replace_value', 'run_coroutine']

default_callback_manager module-attribute

default_callback_manager = CallbackManager()

Classes

AfterInvokeCallback

Bases: ABC

Abstract base class for after_invoke callbacks.

Called after the AI model, tool, or MCP function is invoked. Allows for output validation and modification.

Methods:

Name Description
__call__

Execute the after_invoke callback.

Source code in agentflow/utils/callbacks.py
class AfterInvokeCallback[T, R](ABC):
    """Abstract base class for after_invoke callbacks.

    Called after the AI model, tool, or MCP function is invoked.
    Allows for output validation and modification.
    """

    @abstractmethod
    async def __call__(self, context: CallbackContext, input_data: T, output_data: Any) -> Any | R:
        """Execute the after_invoke callback.

        Args:
            context: Context information about the invocation
            input_data: The original input data that was sent
            output_data: The output data returned from the invocation

        Returns:
            Modified output data (can be same type or different type)

        Raises:
            Exception: If validation fails or modification cannot be performed
        """
        ...

Functions

__call__ abstractmethod async
__call__(context, input_data, output_data)

Execute the after_invoke callback.

Parameters:

Name Type Description Default
context
CallbackContext

Context information about the invocation

required
input_data
T

The original input data that was sent

required
output_data
Any

The output data returned from the invocation

required

Returns:

Type Description
Any | R

Modified output data (can be same type or different type)

Raises:

Type Description
Exception

If validation fails or modification cannot be performed

Source code in agentflow/utils/callbacks.py
@abstractmethod
async def __call__(self, context: CallbackContext, input_data: T, output_data: Any) -> Any | R:
    """Execute the after_invoke callback.

    Args:
        context: Context information about the invocation
        input_data: The original input data that was sent
        output_data: The output data returned from the invocation

    Returns:
        Modified output data (can be same type or different type)

    Raises:
        Exception: If validation fails or modification cannot be performed
    """
    ...
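
As a rough illustration of the interface above, the sketch below subclasses a stand-in base class with the same `__call__` shape. `Ctx`, `AfterInvokeSketch`, and `TruncateOutput` are hypothetical names used so the example runs without the library; in real code the base class would presumably be AfterInvokeCallback and the context a CallbackContext.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any


@dataclass
class Ctx:
    """Hypothetical stand-in for CallbackContext (only node_name is modelled)."""

    node_name: str


class AfterInvokeSketch(ABC):
    """Stand-in mirroring the documented __call__ signature."""

    @abstractmethod
    async def __call__(self, context: Ctx, input_data: Any, output_data: Any) -> Any: ...


class TruncateOutput(AfterInvokeSketch):
    """Hypothetical callback: cap string outputs at max_chars characters."""

    def __init__(self, max_chars: int) -> None:
        self.max_chars = max_chars

    async def __call__(self, context: Ctx, input_data: Any, output_data: Any) -> Any:
        # Validation: fail loudly when the invocation produced nothing.
        if output_data is None:
            raise ValueError(f"{context.node_name}: invocation returned no output")
        # Modification: shorten strings, pass every other type through unchanged.
        if isinstance(output_data, str):
            return output_data[: self.max_chars]
        return output_data
```

Because `__call__` is a coroutine function, calling the instance returns a coroutine that has to be awaited, for example `await TruncateOutput(200)(ctx, prompt, reply)`.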

AsyncIDGenerator

Bases: BaseIDGenerator

ID generator that produces UUID version 4 strings asynchronously.

UUIDs are 128-bit identifiers that are virtually guaranteed to be unique across space and time. The generated strings are 36 characters long (32 hexadecimal digits + 4 hyphens in the format xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx). This generator provides an asynchronous interface for generating UUIDs.

Methods:

Name Description
generate

Asynchronously generate a new UUID4 string.

Attributes:

Name Type Description
id_type IDType

Return the type of ID generated by this generator.

Source code in agentflow/utils/id_generator.py
class AsyncIDGenerator(BaseIDGenerator):
    """
    ID generator that produces UUID version 4 strings asynchronously.

    UUIDs are 128-bit identifiers that are virtually guaranteed to be unique
    across space and time. The generated strings are 36 characters long
    (32 hexadecimal digits + 4 hyphens in the format xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx).
    This generator provides an asynchronous interface for generating UUIDs.
    """

    @property
    def id_type(self) -> IDType:
        """
        Return the type of ID generated by this generator.

        Returns:
            IDType: The type of ID (STRING).
        """
        return IDType.STRING

    async def generate(self) -> str:
        """
        Asynchronously generate a new UUID4 string.

        Returns:
            str: A 36-character UUID string (e.g., '550e8400-e29b-41d4-a716-446655440000').
        """
        # Simulate async operation (e.g., if fetching from an external service)
        return str(uuid.uuid4())

Attributes

id_type property
id_type

Return the type of ID generated by this generator.

Returns:

Name Type Description
IDType IDType

The type of ID (STRING).

Functions

generate async
generate()

Asynchronously generate a new UUID4 string.

Returns:

Name Type Description
str str

A 36-character UUID string (e.g., '550e8400-e29b-41d4-a716-446655440000').

Source code in agentflow/utils/id_generator.py
async def generate(self) -> str:
    """
    Asynchronously generate a new UUID4 string.

    Returns:
        str: A 36-character UUID string (e.g., '550e8400-e29b-41d4-a716-446655440000').
    """
    # Simulate async operation (e.g., if fetching from an external service)
    return str(uuid.uuid4())
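
A short usage sketch may help here: because generate is declared with async def, calling it returns a coroutine, not a string. `AsyncUUIDSketch` and `make_ids` are hypothetical names; the stand-in class only copies the generate body shown above so that the example runs on its own.

```python
import asyncio
import uuid


class AsyncUUIDSketch:
    """Stand-in with the same generate() shape as AsyncIDGenerator."""

    async def generate(self) -> str:
        return str(uuid.uuid4())


async def make_ids(count: int) -> list[str]:
    gen = AsyncUUIDSketch()
    # generate() returns a coroutine, so each call must be awaited.
    return [await gen.generate() for _ in range(count)]
```

From synchronous code, `asyncio.run(make_ids(3))` would drive the coroutine and return three 36-character strings.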

BackgroundTaskManager

Manages asyncio background tasks for agent operations.

Tracks created tasks, removes them from tracking when they finish, and logs errors raised during background execution. Also supports cancellation, per-task timeouts, and per-task metadata.

Methods:

Name Description
__init__

Initialize the BackgroundTaskManager.

cancel_all

Cancel all tracked background tasks.

create_task

Create and track a background asyncio task.

get_task_count

Get the number of active background tasks.

get_task_info

Get information about all active tasks.

wait_for_all

Wait for all tracked background tasks to complete.

Source code in agentflow/utils/background_task_manager.py
class BackgroundTaskManager:
    """
    Manages asyncio background tasks for agent operations.

    Tracks created tasks, ensures cleanup, and logs errors from background execution.
    Enhanced with cancellation, timeouts, and metadata tracking.
    """

    def __init__(self):
        """
        Initialize the BackgroundTaskManager.
        """
        self._tasks: set[asyncio.Task] = set()
        self._task_metadata: dict[asyncio.Task, TaskMetadata] = {}

    def create_task(
        self,
        coro: Coroutine,
        *,
        name: str = "background_task",
        timeout: float | None = None,
        context: dict[str, Any] | None = None,
    ) -> asyncio.Task:
        """
        Create and track a background asyncio task.

        Args:
            coro (Coroutine): The coroutine to run in the background.
            name (str): Human-readable name for the task.
            timeout (Optional[float]): Timeout in seconds for the task.
            context (Optional[dict]): Additional context for logging.

        Returns:
            asyncio.Task: The created task.
        """
        metrics.counter("background_task_manager.tasks_created").inc()

        task = asyncio.create_task(coro, name=name)
        metadata = TaskMetadata(
            name=name, created_at=time.time(), timeout=timeout, context=context or {}
        )

        self._tasks.add(task)
        self._task_metadata[task] = metadata
        task.add_done_callback(self._task_done_callback)

        # Set up timeout if specified
        if timeout:
            self._setup_timeout(task, timeout)

        logger.debug(
            "Created background task: %s (timeout=%s)",
            name,
            timeout,
            extra={"task_context": context},
        )

        return task

    def _setup_timeout(self, task: asyncio.Task, timeout: float) -> None:
        """Set up timeout cancellation for a task."""

        async def timeout_canceller():
            try:
                await asyncio.sleep(timeout)
                if not task.done():
                    metadata = self._task_metadata.get(task)
                    task_name = metadata.name if metadata else "unknown"
                    logger.warning(
                        "Background task '%s' timed out after %s seconds", task_name, timeout
                    )
                    task.cancel()
                    metrics.counter("background_task_manager.tasks_timed_out").inc()
            except asyncio.CancelledError:
                pass  # Parent task was cancelled, this is expected

        # Create the timeout task but don't track it (avoid recursive tracking)
        timeout_task = asyncio.create_task(timeout_canceller())
        # Add a callback to clean up the timeout task reference
        timeout_task.add_done_callback(lambda t: None)

    def _task_done_callback(self, task: asyncio.Task) -> None:
        """
        Remove completed task and log exceptions if any.

        Args:
            task (asyncio.Task): The completed asyncio task.
        """
        metadata = self._task_metadata.pop(task, None)
        self._tasks.discard(task)

        task_name = metadata.name if metadata else "unknown"
        duration = time.time() - metadata.created_at if metadata else 0.0

        try:
            task.result()  # raises if task failed
            metrics.counter("background_task_manager.tasks_completed").inc()
            logger.debug(
                "Background task '%s' completed successfully (duration=%.2fs)",
                task_name,
                duration,
                extra={"task_context": metadata.context if metadata else {}},
            )
        except asyncio.CancelledError:
            metrics.counter("background_task_manager.tasks_cancelled").inc()
            logger.debug("Background task '%s' was cancelled", task_name)
        except Exception as e:
            metrics.counter("background_task_manager.tasks_failed").inc()
            error_msg = (
                f"Background task raised an exception - {task_name}: {e} (duration={duration:.2f}s)"
            )
            logger.error(
                error_msg,
                exc_info=e,
                extra={"task_context": metadata.context if metadata else {}},
            )

    async def cancel_all(self) -> None:
        """
        Cancel all tracked background tasks.

        Returns:
            None
        """
        if not self._tasks:
            return

        logger.info("Cancelling %d background tasks...", len(self._tasks))

        for task in self._tasks.copy():
            if not task.done():
                task.cancel()

        # Wait a short time for cancellations to process
        await asyncio.sleep(0.1)

    async def wait_for_all(
        self, timeout: float | None = None, return_exceptions: bool = False
    ) -> None:
        """
        Wait for all tracked background tasks to complete.

        Args:
            timeout (float | None): Maximum time to wait in seconds.
            return_exceptions (bool): If True, exceptions are returned as results instead of raised.

        Returns:
            None
        """
        if not self._tasks:
            return

        logger.info("Waiting for %d background tasks to finish...", len(self._tasks))

        try:
            if timeout:
                await asyncio.wait_for(
                    asyncio.gather(*self._tasks, return_exceptions=return_exceptions),
                    timeout=timeout,
                )
            else:
                await asyncio.gather(*self._tasks, return_exceptions=return_exceptions)
            logger.info("All background tasks finished.")
        except TimeoutError:
            logger.warning("Timeout waiting for background tasks, some may still be running")
            metrics.counter("background_task_manager.wait_timeout").inc()

    def get_task_count(self) -> int:
        """Get the number of active background tasks."""
        return len(self._tasks)

    def get_task_info(self) -> list[dict[str, Any]]:
        """Get information about all active tasks."""
        current_time = time.time()
        return [
            {
                "name": metadata.name,
                "age_seconds": current_time - metadata.created_at,
                "timeout": metadata.timeout,
                "context": metadata.context,
                "done": task.done(),
                "cancelled": task.cancelled() if task.done() else False,
            }
            for task, metadata in self._task_metadata.items()
        ]

Functions

__init__
__init__()

Initialize the BackgroundTaskManager.

Source code in agentflow/utils/background_task_manager.py
def __init__(self):
    """
    Initialize the BackgroundTaskManager.
    """
    self._tasks: set[asyncio.Task] = set()
    self._task_metadata: dict[asyncio.Task, TaskMetadata] = {}

cancel_all async
cancel_all()

Cancel all tracked background tasks.

Returns:

Type Description
None

None

Source code in agentflow/utils/background_task_manager.py
async def cancel_all(self) -> None:
    """
    Cancel all tracked background tasks.

    Returns:
        None
    """
    if not self._tasks:
        return

    logger.info("Cancelling %d background tasks...", len(self._tasks))

    for task in self._tasks.copy():
        if not task.done():
            task.cancel()

    # Wait a short time for cancellations to process
    await asyncio.sleep(0.1)

create_task
create_task(coro, *, name='background_task', timeout=None, context=None)

Create and track a background asyncio task.

Parameters:

Name Type Description Default
coro
Coroutine

The coroutine to run in the background.

required
name
str

Human-readable name for the task.

'background_task'
timeout
Optional[float]

Timeout in seconds for the task.

None
context
Optional[dict]

Additional context for logging.

None

Returns:

Type Description
Task

The created asyncio.Task.

Source code in agentflow/utils/background_task_manager.py
def create_task(
    self,
    coro: Coroutine,
    *,
    name: str = "background_task",
    timeout: float | None = None,
    context: dict[str, Any] | None = None,
) -> asyncio.Task:
    """
    Create and track a background asyncio task.

    Args:
        coro (Coroutine): The coroutine to run in the background.
        name (str): Human-readable name for the task.
        timeout (Optional[float]): Timeout in seconds for the task.
        context (Optional[dict]): Additional context for logging.

    Returns:
        asyncio.Task: The created task.
    """
    metrics.counter("background_task_manager.tasks_created").inc()

    task = asyncio.create_task(coro, name=name)
    metadata = TaskMetadata(
        name=name, created_at=time.time(), timeout=timeout, context=context or {}
    )

    self._tasks.add(task)
    self._task_metadata[task] = metadata
    task.add_done_callback(self._task_done_callback)

    # Set up timeout if specified
    if timeout:
        self._setup_timeout(task, timeout)

    logger.debug(
        "Created background task: %s (timeout=%s)",
        name,
        timeout,
        extra={"task_context": context},
    )

    return task
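
The timeout handling that create_task delegates to a helper task can be sketched in isolation. This is a simplified variant, not the library's code: `cancel_after` and `demo_timeout` are hypothetical names, and unlike the listing this version also stops the watchdog once the watched task finishes and returns the watchdog so the caller can hold a reference to it.

```python
import asyncio


def cancel_after(task: asyncio.Task, timeout: float) -> asyncio.Task:
    """Start a watchdog that cancels `task` if it is still running after `timeout` seconds."""

    async def watchdog() -> None:
        await asyncio.sleep(timeout)
        if not task.done():
            task.cancel()

    watchdog_task = asyncio.create_task(watchdog())
    # Stop the watchdog early once the watched task finishes on its own.
    task.add_done_callback(lambda _t: watchdog_task.cancel())
    return watchdog_task


async def demo_timeout() -> tuple[bool, bool]:
    slow = asyncio.create_task(asyncio.sleep(10))
    fast = asyncio.create_task(asyncio.sleep(0))
    # Hold references so the watchdog tasks are not garbage-collected mid-flight.
    watchdogs = [cancel_after(slow, 0.01), cancel_after(fast, 10)]
    await asyncio.gather(slow, fast, return_exceptions=True)
    return slow.cancelled(), fast.cancelled()
```

In this sketch the slow task is cancelled by its watchdog, while the fast task completes normally and its watchdog is cancelled instead.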

get_task_count
get_task_count()

Get the number of active background tasks.

Source code in agentflow/utils/background_task_manager.py
def get_task_count(self) -> int:
    """Get the number of active background tasks."""
    return len(self._tasks)

get_task_info
get_task_info()

Get information about all active tasks.

Source code in agentflow/utils/background_task_manager.py
def get_task_info(self) -> list[dict[str, Any]]:
    """Get information about all active tasks."""
    current_time = time.time()
    return [
        {
            "name": metadata.name,
            "age_seconds": current_time - metadata.created_at,
            "timeout": metadata.timeout,
            "context": metadata.context,
            "done": task.done(),
            "cancelled": task.cancelled() if task.done() else False,
        }
        for task, metadata in self._task_metadata.items()
    ]

wait_for_all async
wait_for_all(timeout=None, return_exceptions=False)

Wait for all tracked background tasks to complete.

Parameters:

Name Type Description Default
timeout
float | None

Maximum time to wait in seconds.

None
return_exceptions
bool

If True, exceptions are returned as results instead of raised.

False

Returns:

Type Description
None

None

Source code in agentflow/utils/background_task_manager.py
async def wait_for_all(
    self, timeout: float | None = None, return_exceptions: bool = False
) -> None:
    """
    Wait for all tracked background tasks to complete.

    Args:
        timeout (float | None): Maximum time to wait in seconds.
        return_exceptions (bool): If True, exceptions are returned as results instead of raised.

    Returns:
        None
    """
    if not self._tasks:
        return

    logger.info("Waiting for %d background tasks to finish...", len(self._tasks))

    try:
        if timeout:
            await asyncio.wait_for(
                asyncio.gather(*self._tasks, return_exceptions=return_exceptions),
                timeout=timeout,
            )
        else:
            await asyncio.gather(*self._tasks, return_exceptions=return_exceptions)
        logger.info("All background tasks finished.")
    except TimeoutError:
        logger.warning("Timeout waiting for background tasks, some may still be running")
        metrics.counter("background_task_manager.wait_timeout").inc()
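
The core tracking idea behind this class (hold a strong reference to every task, drop it from a done-callback, and gather whatever is left) can be reduced to a few lines. The sketch below is a deliberately smaller stand-in under that assumption: `TaskTrackerSketch` and `demo_tracker` are hypothetical names, and metadata, metrics, logging, and timeouts are left out.

```python
import asyncio


class TaskTrackerSketch:
    """Hypothetical, reduced tracker: strong references plus a done-callback."""

    def __init__(self) -> None:
        self._tasks: set[asyncio.Task] = set()

    def create_task(self, coro, *, name: str = "background_task") -> asyncio.Task:
        task = asyncio.create_task(coro, name=name)
        self._tasks.add(task)  # keep the task alive while it runs
        task.add_done_callback(self._tasks.discard)  # drop it once finished
        return task

    def count(self) -> int:
        return len(self._tasks)

    async def wait_for_all(self) -> None:
        if self._tasks:
            await asyncio.gather(*self._tasks, return_exceptions=True)


async def demo_tracker() -> tuple[int, int]:
    tracker = TaskTrackerSketch()
    tracker.create_task(asyncio.sleep(0.01), name="short_sleep")
    tracker.create_task(asyncio.sleep(0.02), name="longer_sleep")
    before = tracker.count()
    await tracker.wait_for_all()
    # Yield once more so any pending done-callbacks have certainly run.
    await asyncio.sleep(0)
    return before, tracker.count()
```

Holding the tasks in a set matters because the event loop keeps only weak references to tasks, so an otherwise unreferenced background task may be garbage-collected before it finishes.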

BaseIDGenerator

Bases: ABC

Abstract base class for ID generation strategies.

All ID generators must implement the id_type property and generate method.

Methods:

Name Description
generate

Generate a new unique ID.

Attributes:

Name Type Description
id_type IDType

Return the type of ID generated by this generator.

Source code in agentflow/utils/id_generator.py
class BaseIDGenerator(ABC):
    """Abstract base class for ID generation strategies.

    All ID generators must implement the id_type property and generate method.
    """

    @property
    @abstractmethod
    def id_type(self) -> IDType:
        """Return the type of ID generated by this generator.

        Returns:
            IDType: The type of ID (STRING, INTEGER, or BIGINT).
        """
        raise NotImplementedError("id_type method must be implemented")

    @abstractmethod
    def generate(self) -> str | int | Awaitable[str | int]:
        """Generate a new unique ID.

        Returns:
            str | int: A new unique identifier of the appropriate type.
        """
        raise NotImplementedError("generate method must be implemented")

Attributes

id_type abstractmethod property
id_type

Return the type of ID generated by this generator.

Returns:

Name Type Description
IDType IDType

The type of ID (STRING, INTEGER, or BIGINT).

Functions

generate abstractmethod
generate()

Generate a new unique ID.

Returns:

Type Description
str | int | Awaitable[str | int]

A new unique identifier of the appropriate type, or an awaitable that resolves to one.

Source code in agentflow/utils/id_generator.py
@abstractmethod
def generate(self) -> str | int | Awaitable[str | int]:
    """Generate a new unique ID.

    Returns:
        str | int: A new unique identifier of the appropriate type.
    """
    raise NotImplementedError("generate method must be implemented")
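
Since generate may return either a value or an awaitable, calling code has to handle both cases. The sketch below shows one way to do that with `inspect.isawaitable`. `IDGeneratorSketch`, `CounterIDs`, `AsyncPrefixedIDs`, and `next_id` are hypothetical names, and the stand-in base class omits id_type for brevity; how the library itself resolves the two cases is not shown in this section.

```python
import asyncio
import inspect
import itertools
from abc import ABC, abstractmethod
from collections.abc import Awaitable


class IDGeneratorSketch(ABC):
    """Stand-in for BaseIDGenerator (id_type omitted for brevity)."""

    @abstractmethod
    def generate(self) -> "str | int | Awaitable[str | int]": ...


class CounterIDs(IDGeneratorSketch):
    """Hypothetical synchronous generator: 1, 2, 3, ..."""

    def __init__(self) -> None:
        self._counter = itertools.count(1)

    def generate(self) -> int:
        return next(self._counter)


class AsyncPrefixedIDs(IDGeneratorSketch):
    """Hypothetical asynchronous generator: 'id-1', 'id-2', ..."""

    def __init__(self) -> None:
        self._counter = itertools.count(1)

    async def generate(self) -> str:
        return f"id-{next(self._counter)}"


async def next_id(gen: IDGeneratorSketch) -> "str | int":
    result = gen.generate()
    # Await only when the generator returned an awaitable.
    if inspect.isawaitable(result):
        result = await result
    return result
```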

BeforeInvokeCallback

Bases: ABC

Abstract base class for before_invoke callbacks.

Called before the AI model, tool, or MCP function is invoked. Allows for input validation and modification.

Methods:

Name Description
__call__

Execute the before_invoke callback.

Source code in agentflow/utils/callbacks.py
class BeforeInvokeCallback[T, R](ABC):
    """Abstract base class for before_invoke callbacks.

    Called before the AI model, tool, or MCP function is invoked.
    Allows for input validation and modification.
    """

    @abstractmethod
    async def __call__(self, context: CallbackContext, input_data: T) -> T | R:
        """Execute the before_invoke callback.

        Args:
            context: Context information about the invocation
            input_data: The input data about to be sent to the invocation

        Returns:
            Modified input data (can be same type or different type)

        Raises:
            Exception: If validation fails or modification cannot be performed
        """
        ...

Functions

__call__ abstractmethod async
__call__(context, input_data)

Execute the before_invoke callback.

Parameters:

Name Type Description Default
context
CallbackContext

Context information about the invocation

required
input_data
T

The input data about to be sent to the invocation

required

Returns:

Type Description
T | R

Modified input data (can be same type or different type)

Raises:

Type Description
Exception

If validation fails or modification cannot be performed

Source code in agentflow/utils/callbacks.py
@abstractmethod
async def __call__(self, context: CallbackContext, input_data: T) -> T | R:
    """Execute the before_invoke callback.

    Args:
        context: Context information about the invocation
        input_data: The input data about to be sent to the invocation

    Returns:
        Modified input data (can be same type or different type)

    Raises:
        Exception: If validation fails or modification cannot be performed
    """
    ...
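
Input validation with this interface might look like the sketch below. `Ctx`, `BeforeInvokeSketch`, and `RequireNonEmptyPrompt` are hypothetical stand-ins so the example runs without the library, and treating the input as a plain string is an assumption, since the real type parameter T can be anything.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class Ctx:
    """Hypothetical stand-in for CallbackContext (only node_name is modelled)."""

    node_name: str


class BeforeInvokeSketch(ABC):
    """Stand-in mirroring the documented __call__ signature."""

    @abstractmethod
    async def __call__(self, context: Ctx, input_data: str) -> str: ...


class RequireNonEmptyPrompt(BeforeInvokeSketch):
    """Hypothetical callback: trim whitespace and reject empty prompts."""

    async def __call__(self, context: Ctx, input_data: str) -> str:
        cleaned = input_data.strip()
        if not cleaned:
            # Raising aborts the invocation, matching the documented Raises section.
            raise ValueError(f"{context.node_name}: empty input")
        return cleaned
```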

BigIntIDGenerator

Bases: BaseIDGenerator

ID generator that produces big integer IDs based on current time in nanoseconds.

Generates IDs by multiplying the current Unix timestamp (in seconds) by 1e9 and truncating to an integer. The resulting large integers sort by creation time. Typical size is 19-20 digits.

Methods:

Name Description
generate

Generate a new big integer ID based on current nanoseconds.

Attributes:

Name Type Description
id_type IDType

Source code in agentflow/utils/id_generator.py
class BigIntIDGenerator(BaseIDGenerator):
    """ID generator that produces big integer IDs based on current time in nanoseconds.

    Generates IDs by multiplying current Unix timestamp by 1e9, resulting in
    large integers that are sortable by creation time. Typical size is 19-20 digits.
    """

    @property
    def id_type(self) -> IDType:
        return IDType.BIGINT

    def generate(self) -> int:
        """Generate a new big integer ID based on current nanoseconds.

        Returns:
            int: A large integer (19-20 digits) representing nanoseconds since Unix epoch.
        """
        # Use current time in nanoseconds for higher uniqueness
        return int(time.time() * 1_000_000_000)

Attributes

id_type property
id_type

Functions

generate
generate()

Generate a new big integer ID based on current nanoseconds.

Returns:

Name Type Description
int int

A large integer (19-20 digits) representing nanoseconds since Unix epoch.

Source code in agentflow/utils/id_generator.py
def generate(self) -> int:
    """Generate a new big integer ID based on current nanoseconds.

    Returns:
        int: A large integer (19-20 digits) representing nanoseconds since Unix epoch.
    """
    # Use current time in nanoseconds for higher uniqueness
    return int(time.time() * 1_000_000_000)
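
The arithmetic above is easy to try out. One caveat worth knowing: `time.time()` returns a float, and a double near today's epoch values can only resolve steps of roughly a quarter of a microsecond, so two calls made in very quick succession can return the same ID. The sketch below reproduces the documented expression and, purely as a comparison and not as the library's code, shows `time.time_ns()`, which returns integer nanoseconds directly. `bigint_id` and `bigint_id_ns` are hypothetical names.

```python
import time


def bigint_id() -> int:
    """Same arithmetic as the documented generate(): seconds * 1e9, truncated."""
    return int(time.time() * 1_000_000_000)


def bigint_id_ns() -> int:
    """Alternative (not the library's code): integer nanoseconds from the clock."""
    return time.time_ns()


first = bigint_id()
second = bigint_id()
digits = len(str(first))  # number of decimal digits in a generated ID
```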

CallbackContext dataclass

Context information passed to callbacks.

Methods:

Name Description
__init__

Attributes:

Name Type Description
function_name str | None
invocation_type InvocationType
metadata dict[str, Any] | None
node_name str

Source code in agentflow/utils/callbacks.py
@dataclass
class CallbackContext:
    """Context information passed to callbacks."""

    invocation_type: InvocationType
    node_name: str
    function_name: str | None = None
    metadata: dict[str, Any] | None = None

Attributes

function_name class-attribute instance-attribute
function_name = None
invocation_type instance-attribute
invocation_type
metadata class-attribute instance-attribute
metadata = None
node_name instance-attribute
node_name

Functions

__init__
__init__(invocation_type, node_name, function_name=None, metadata=None)

CallbackManager

Manages registration and execution of callbacks for different invocation types.

Handles before_invoke, after_invoke, and on_error callbacks for AI, TOOL, and MCP invocations.
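
The registration side of this design, one callback list per invocation type, can be sketched without the library. Everything here is a stand-in: `Kind` plays the role of InvocationType (its member values are assumptions), `Ctx` copies the CallbackContext fields, and `RegistrySketch.counts` is a hypothetical helper whose output format is not taken from get_callback_counts.

```python
from dataclasses import dataclass
from enum import Enum
from typing import Any, Callable


class Kind(Enum):
    """Stand-in for InvocationType; the member values here are assumptions."""

    AI = "ai"
    TOOL = "tool"
    MCP = "mcp"


@dataclass
class Ctx:
    """Stand-in with the same fields as CallbackContext."""

    invocation_type: Kind
    node_name: str
    function_name: "str | None" = None
    metadata: "dict[str, Any] | None" = None


class RegistrySketch:
    def __init__(self) -> None:
        # One list per invocation type, created up front so lookups never fail.
        self._before: dict[Kind, list[Callable]] = {kind: [] for kind in Kind}

    def register_before_invoke(self, kind: Kind, callback: Callable) -> None:
        self._before[kind].append(callback)

    def counts(self) -> dict[str, int]:
        return {kind.name: len(cbs) for kind, cbs in self._before.items()}
```

Pre-populating every key means registering for any known invocation type is a plain list append, at the cost of a KeyError if a type outside the enum is ever passed.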

Methods:

Name Description
__init__

Initialize the CallbackManager with empty callback registries.

clear_callbacks

Clear callbacks for a specific invocation type or all types.

execute_after_invoke

Execute all after_invoke callbacks for the given context.

execute_before_invoke

Execute all before_invoke callbacks for the given context.

execute_on_error

Execute all on_error callbacks for the given context.

get_callback_counts

Get count of registered callbacks by type for debugging.

register_after_invoke

Register an after_invoke callback for a specific invocation type.

register_before_invoke

Register a before_invoke callback for a specific invocation type.

register_on_error

Register an on_error callback for a specific invocation type.

Source code in agentflow/utils/callbacks.py
class CallbackManager:
    """
    Manages registration and execution of callbacks for different invocation types.

    Handles before_invoke, after_invoke, and on_error callbacks for AI, TOOL, and MCP invocations.
    """

    def __init__(self):
        """
        Initialize the CallbackManager with empty callback registries.
        """
        self._before_callbacks: dict[InvocationType, list[BeforeInvokeCallbackType]] = {
            InvocationType.AI: [],
            InvocationType.TOOL: [],
            InvocationType.MCP: [],
        }
        self._after_callbacks: dict[InvocationType, list[AfterInvokeCallbackType]] = {
            InvocationType.AI: [],
            InvocationType.TOOL: [],
            InvocationType.MCP: [],
        }
        self._error_callbacks: dict[InvocationType, list[OnErrorCallbackType]] = {
            InvocationType.AI: [],
            InvocationType.TOOL: [],
            InvocationType.MCP: [],
        }

    def register_before_invoke(
        self, invocation_type: InvocationType, callback: BeforeInvokeCallbackType
    ) -> None:
        """
        Register a before_invoke callback for a specific invocation type.

        Args:
            invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
            callback (BeforeInvokeCallbackType): The callback to register.
        """
        self._before_callbacks[invocation_type].append(callback)

    def register_after_invoke(
        self, invocation_type: InvocationType, callback: AfterInvokeCallbackType
    ) -> None:
        """
        Register an after_invoke callback for a specific invocation type.

        Args:
            invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
            callback (AfterInvokeCallbackType): The callback to register.
        """
        self._after_callbacks[invocation_type].append(callback)

    def register_on_error(
        self, invocation_type: InvocationType, callback: OnErrorCallbackType
    ) -> None:
        """
        Register an on_error callback for a specific invocation type.

        Args:
            invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
            callback (OnErrorCallbackType): The callback to register.
        """
        self._error_callbacks[invocation_type].append(callback)

    async def execute_before_invoke(self, context: CallbackContext, input_data: Any) -> Any:
        """
        Execute all before_invoke callbacks for the given context.

        Args:
            context (CallbackContext): Context information about the invocation.
            input_data (Any): The input data to be validated or modified.

        Returns:
            Any: The modified input data after all callbacks.

        Raises:
            Exception: If any callback fails.
        """
        current_data = input_data

        for callback in self._before_callbacks[context.invocation_type]:
            try:
                if isinstance(callback, BeforeInvokeCallback):
                    current_data = await callback(context, current_data)
                elif callable(callback):
                    result = callback(context, current_data)
                    if hasattr(result, "__await__"):
                        current_data = await result
                    else:
                        current_data = result
            except Exception as e:
                await self.execute_on_error(context, input_data, e)
                raise

        return current_data

    async def execute_after_invoke(
        self, context: CallbackContext, input_data: Any, output_data: Any
    ) -> Any:
        """
        Execute all after_invoke callbacks for the given context.

        Args:
            context (CallbackContext): Context information about the invocation.
            input_data (Any): The original input data sent to the invocation.
            output_data (Any): The output data returned from the invocation.

        Returns:
            Any: The modified output data after all callbacks.

        Raises:
            Exception: If any callback fails.
        """
        current_output = output_data

        for callback in self._after_callbacks[context.invocation_type]:
            try:
                if isinstance(callback, AfterInvokeCallback):
                    current_output = await callback(context, input_data, current_output)
                elif callable(callback):
                    result = callback(context, input_data, current_output)
                    if hasattr(result, "__await__"):
                        current_output = await result
                    else:
                        current_output = result
            except Exception as e:
                await self.execute_on_error(context, input_data, e)
                raise

        return current_output

    async def execute_on_error(
        self, context: CallbackContext, input_data: Any, error: Exception
    ) -> Message | None:
        """
        Execute all on_error callbacks for the given context.

        Args:
            context (CallbackContext): Context information about the invocation.
            input_data (Any): The input data that caused the error.
            error (Exception): The exception that occurred.

        Returns:
            Message | None: Recovery value from callbacks, or None if not handled.
        """
        recovery_value = None

        for callback in self._error_callbacks[context.invocation_type]:
            try:
                result = None
                if isinstance(callback, OnErrorCallback):
                    result = await callback(context, input_data, error)
                elif callable(callback):
                    result = callback(context, input_data, error)
                    if hasattr(result, "__await__"):
                        result = await result  # type: ignore

                if isinstance(result, Message) or result is None:
                    recovery_value = result
            except Exception as exc:
                logger.exception("Error callback failed: %s", exc)
                continue

        return recovery_value

    def clear_callbacks(self, invocation_type: InvocationType | None = None) -> None:
        """
        Clear callbacks for a specific invocation type or all types.

        Args:
            invocation_type (InvocationType | None): The invocation type to clear, or None for all.
        """
        if invocation_type:
            self._before_callbacks[invocation_type].clear()
            self._after_callbacks[invocation_type].clear()
            self._error_callbacks[invocation_type].clear()
        else:
            for inv_type in InvocationType:
                self._before_callbacks[inv_type].clear()
                self._after_callbacks[inv_type].clear()
                self._error_callbacks[inv_type].clear()

    def get_callback_counts(self) -> dict[str, dict[str, int]]:
        """
        Get count of registered callbacks by type for debugging.

        Returns:
            dict[str, dict[str, int]]: Counts of callbacks for each invocation type.
        """
        return {
            inv_type.value: {
                "before_invoke": len(self._before_callbacks[inv_type]),
                "after_invoke": len(self._after_callbacks[inv_type]),
                "on_error": len(self._error_callbacks[inv_type]),
            }
            for inv_type in InvocationType
        }
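The registry layout above, one callback list per invocation type, can be illustrated with a small stand-in. This is a minimal sketch rather than the library's class: `MiniRegistry` and `run_before` are hypothetical names, and the callbacks here are synchronous and take a single argument to keep the example short.

```python
from enum import Enum
from typing import Any, Callable


class InvocationType(Enum):  # mirrors the enum documented on this page
    AI = "ai"
    TOOL = "tool"
    MCP = "mcp"


class MiniRegistry:
    """Hypothetical stand-in: one list of callbacks per invocation type."""

    def __init__(self) -> None:
        self._before: dict[InvocationType, list[Callable[[Any], Any]]] = {
            inv_type: [] for inv_type in InvocationType
        }

    def register_before_invoke(
        self, invocation_type: InvocationType, callback: Callable[[Any], Any]
    ) -> None:
        self._before[invocation_type].append(callback)

    def run_before(self, invocation_type: InvocationType, data: Any) -> Any:
        # Only callbacks registered for this invocation type are applied.
        for callback in self._before[invocation_type]:
            data = callback(data)
        return data


registry = MiniRegistry()
registry.register_before_invoke(InvocationType.TOOL, str.upper)

tool_input = registry.run_before(InvocationType.TOOL, "lookup")  # callback applies
ai_input = registry.run_before(InvocationType.AI, "lookup")  # nothing registered for AI
```

A callback registered for TOOL never runs for AI or MCP invocations, which is why each of the three registries is keyed by `InvocationType`.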

Functions

__init__
__init__()

Initialize the CallbackManager with empty callback registries.

Source code in agentflow/utils/callbacks.py
def __init__(self):
    """
    Initialize the CallbackManager with empty callback registries.
    """
    self._before_callbacks: dict[InvocationType, list[BeforeInvokeCallbackType]] = {
        InvocationType.AI: [],
        InvocationType.TOOL: [],
        InvocationType.MCP: [],
    }
    self._after_callbacks: dict[InvocationType, list[AfterInvokeCallbackType]] = {
        InvocationType.AI: [],
        InvocationType.TOOL: [],
        InvocationType.MCP: [],
    }
    self._error_callbacks: dict[InvocationType, list[OnErrorCallbackType]] = {
        InvocationType.AI: [],
        InvocationType.TOOL: [],
        InvocationType.MCP: [],
    }
clear_callbacks
clear_callbacks(invocation_type=None)

Clear callbacks for a specific invocation type or all types.

Parameters:

Name Type Description Default
invocation_type
InvocationType | None

The invocation type to clear, or None for all.

None
Source code in agentflow/utils/callbacks.py
def clear_callbacks(self, invocation_type: InvocationType | None = None) -> None:
    """
    Clear callbacks for a specific invocation type or all types.

    Args:
        invocation_type (InvocationType | None): The invocation type to clear, or None for all.
    """
    if invocation_type:
        self._before_callbacks[invocation_type].clear()
        self._after_callbacks[invocation_type].clear()
        self._error_callbacks[invocation_type].clear()
    else:
        for inv_type in InvocationType:
            self._before_callbacks[inv_type].clear()
            self._after_callbacks[inv_type].clear()
            self._error_callbacks[inv_type].clear()
execute_after_invoke async
execute_after_invoke(context, input_data, output_data)

Execute all after_invoke callbacks for the given context.

Parameters:

Name Type Description Default
context
CallbackContext

Context information about the invocation.

required
input_data
Any

The original input data sent to the invocation.

required
output_data
Any

The output data returned from the invocation.

required

Returns:

Name Type Description
Any Any

The modified output data after all callbacks.

Raises:

Type Description
Exception

If any callback fails.

Source code in agentflow/utils/callbacks.py
async def execute_after_invoke(
    self, context: CallbackContext, input_data: Any, output_data: Any
) -> Any:
    """
    Execute all after_invoke callbacks for the given context.

    Args:
        context (CallbackContext): Context information about the invocation.
        input_data (Any): The original input data sent to the invocation.
        output_data (Any): The output data returned from the invocation.

    Returns:
        Any: The modified output data after all callbacks.

    Raises:
        Exception: If any callback fails.
    """
    current_output = output_data

    for callback in self._after_callbacks[context.invocation_type]:
        try:
            if isinstance(callback, AfterInvokeCallback):
                current_output = await callback(context, input_data, current_output)
            elif callable(callback):
                result = callback(context, input_data, current_output)
                if hasattr(result, "__await__"):
                    current_output = await result
                else:
                    current_output = result
        except Exception as e:
            await self.execute_on_error(context, input_data, e)
            raise

    return current_output
execute_before_invoke async
execute_before_invoke(context, input_data)

Execute all before_invoke callbacks for the given context.

Parameters:

Name Type Description Default
context
CallbackContext

Context information about the invocation.

required
input_data
Any

The input data to be validated or modified.

required

Returns:

Name Type Description
Any Any

The modified input data after all callbacks.

Raises:

Type Description
Exception

If any callback fails.

Source code in agentflow/utils/callbacks.py
async def execute_before_invoke(self, context: CallbackContext, input_data: Any) -> Any:
    """
    Execute all before_invoke callbacks for the given context.

    Args:
        context (CallbackContext): Context information about the invocation.
        input_data (Any): The input data to be validated or modified.

    Returns:
        Any: The modified input data after all callbacks.

    Raises:
        Exception: If any callback fails.
    """
    current_data = input_data

    for callback in self._before_callbacks[context.invocation_type]:
        try:
            if isinstance(callback, BeforeInvokeCallback):
                current_data = await callback(context, current_data)
            elif callable(callback):
                result = callback(context, current_data)
                if hasattr(result, "__await__"):
                    current_data = await result
                else:
                    current_data = result
        except Exception as e:
            await self.execute_on_error(context, input_data, e)
            raise

    return current_data
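One detail of the loop above is easy to miss: when a callback raises, the error handlers receive the original `input_data`, not the partially transformed value, and the exception is then re-raised. The sketch below reproduces only that shape; `run_before`, `on_error`, `add_prefix`, and `reject` are hypothetical names, and the callbacks are synchronous for brevity.

```python
import asyncio
from typing import Any, Callable

seen_by_error_handler: list[Any] = []


async def on_error(context: dict, input_data: Any, error: Exception) -> None:
    # Record what the error handler was given.
    seen_by_error_handler.append(input_data)


def add_prefix(context: dict, data: str) -> str:
    return "prefix:" + data


def reject(context: dict, data: str) -> str:
    raise ValueError(f"rejected {data!r}")


async def run_before(
    callbacks: list[Callable[[dict, Any], Any]], context: dict, input_data: Any
) -> Any:
    """Simplified copy of the loop shape above (sync callbacks only)."""
    current = input_data
    for callback in callbacks:
        try:
            current = callback(context, current)
        except Exception as exc:
            # The handler sees the original input, not `current`.
            await on_error(context, input_data, exc)
            raise
    return current


async def main() -> str:
    try:
        await run_before([add_prefix, reject], {}, "query")
    except ValueError as exc:
        return str(exc)
    return "no error"


message = asyncio.run(main())
```

Here `reject` fails on the already prefixed value, while the error handler is passed the untouched `"query"`.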
execute_on_error async
execute_on_error(context, input_data, error)

Execute all on_error callbacks for the given context.

Parameters:

Name Type Description Default
context
CallbackContext

Context information about the invocation.

required
input_data
Any

The input data that caused the error.

required
error
Exception

The exception that occurred.

required

Returns:

Type Description
Message | None

Recovery value from callbacks, or None if not handled.

Source code in agentflow/utils/callbacks.py
async def execute_on_error(
    self, context: CallbackContext, input_data: Any, error: Exception
) -> Message | None:
    """
    Execute all on_error callbacks for the given context.

    Args:
        context (CallbackContext): Context information about the invocation.
        input_data (Any): The input data that caused the error.
        error (Exception): The exception that occurred.

    Returns:
        Message | None: Recovery value from callbacks, or None if not handled.
    """
    recovery_value = None

    for callback in self._error_callbacks[context.invocation_type]:
        try:
            result = None
            if isinstance(callback, OnErrorCallback):
                result = await callback(context, input_data, error)
            elif callable(callback):
                result = callback(context, input_data, error)
                if hasattr(result, "__await__"):
                    result = await result  # type: ignore

            if isinstance(result, Message) or result is None:
                recovery_value = result
        except Exception as exc:
            logger.exception("Error callback failed: %s", exc)
            continue

    return recovery_value
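Read literally, the loop above keeps the result of the last callback that returned either a `Message` or `None`, ignores any other return type, and logs and skips callbacks that raise. The sketch below mirrors that selection rule under stated assumptions: `Message` is a hypothetical dataclass stand-in for the library type, and the handlers take only the error to stay short.

```python
import asyncio
import logging
from dataclasses import dataclass
from typing import Any, Callable, Optional

logger = logging.getLogger("callback_sketch")
logger.addHandler(logging.NullHandler())  # keep the example quiet


@dataclass
class Message:  # hypothetical stand-in for the library's Message type
    text: str


async def collect_recovery(
    callbacks: list[Callable[[Exception], Any]], error: Exception
) -> Optional[Message]:
    """Mirror of the selection rule in the loop above."""
    recovery = None
    for callback in callbacks:
        try:
            result = callback(error)
            if hasattr(result, "__await__"):
                result = await result
            if isinstance(result, Message) or result is None:
                recovery = result  # later callbacks overwrite earlier ones
        except Exception:
            logger.exception("Error callback failed")
            continue
    return recovery


def recover(error: Exception) -> Message:
    return Message(text=f"recovered from {error}")


def returns_string(error: Exception) -> str:
    return "ignored: not a Message"


def broken(error: Exception) -> None:
    raise RuntimeError("handler bug")


def returns_none(error: Exception) -> None:
    return None


err = ValueError("boom")
kept = asyncio.run(collect_recovery([recover, returns_string, broken], err))
reset = asyncio.run(collect_recovery([recover, returns_none], err))
```

In this sketch a later handler that returns `None` clears an earlier recovery value, so registration order matters when several error callbacks are attached to the same invocation type.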
get_callback_counts
get_callback_counts()

Get count of registered callbacks by type for debugging.

Returns:

Type Description
dict[str, dict[str, int]]

Counts of callbacks for each invocation type.

Source code in agentflow/utils/callbacks.py
def get_callback_counts(self) -> dict[str, dict[str, int]]:
    """
    Get count of registered callbacks by type for debugging.

    Returns:
        dict[str, dict[str, int]]: Counts of callbacks for each invocation type.
    """
    return {
        inv_type.value: {
            "before_invoke": len(self._before_callbacks[inv_type]),
            "after_invoke": len(self._after_callbacks[inv_type]),
            "on_error": len(self._error_callbacks[inv_type]),
        }
        for inv_type in InvocationType
    }
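To make the shape of the returned dictionary concrete, the following sketch builds the same nested structure from three plain registries. The module-level `before`, `after`, and `errors` dictionaries are stand-ins for the manager's private attributes, and the lambdas are placeholder callbacks.

```python
from enum import Enum


class InvocationType(Enum):  # mirrors the enum documented on this page
    AI = "ai"
    TOOL = "tool"
    MCP = "mcp"


# Stand-ins for the three private registries.
before = {inv_type: [] for inv_type in InvocationType}
after = {inv_type: [] for inv_type in InvocationType}
errors = {inv_type: [] for inv_type in InvocationType}

before[InvocationType.AI].append(lambda ctx, data: data)
errors[InvocationType.TOOL].append(lambda ctx, data, err: None)

# Same comprehension shape as get_callback_counts: keyed by the enum's
# string value, then by callback kind.
counts = {
    inv_type.value: {
        "before_invoke": len(before[inv_type]),
        "after_invoke": len(after[inv_type]),
        "on_error": len(errors[inv_type]),
    }
    for inv_type in InvocationType
}
```

The outer keys are the enum values (`"ai"`, `"tool"`, `"mcp"`), not the enum members, so the result can be logged or serialized directly.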
register_after_invoke
register_after_invoke(invocation_type, callback)

Register an after_invoke callback for a specific invocation type.

Parameters:

Name Type Description Default
invocation_type
InvocationType

The type of invocation (AI, TOOL, MCP).

required
callback
AfterInvokeCallbackType

The callback to register.

required
Source code in agentflow/utils/callbacks.py
def register_after_invoke(
    self, invocation_type: InvocationType, callback: AfterInvokeCallbackType
) -> None:
    """
    Register an after_invoke callback for a specific invocation type.

    Args:
        invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
        callback (AfterInvokeCallbackType): The callback to register.
    """
    self._after_callbacks[invocation_type].append(callback)
register_before_invoke
register_before_invoke(invocation_type, callback)

Register a before_invoke callback for a specific invocation type.

Parameters:

Name Type Description Default
invocation_type
InvocationType

The type of invocation (AI, TOOL, MCP).

required
callback
BeforeInvokeCallbackType

The callback to register.

required
Source code in agentflow/utils/callbacks.py
def register_before_invoke(
    self, invocation_type: InvocationType, callback: BeforeInvokeCallbackType
) -> None:
    """
    Register a before_invoke callback for a specific invocation type.

    Args:
        invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
        callback (BeforeInvokeCallbackType): The callback to register.
    """
    self._before_callbacks[invocation_type].append(callback)
register_on_error
register_on_error(invocation_type, callback)

Register an on_error callback for a specific invocation type.

Parameters:

Name Type Description Default
invocation_type
InvocationType

The type of invocation (AI, TOOL, MCP).

required
callback
OnErrorCallbackType

The callback to register.

required
Source code in agentflow/utils/callbacks.py
def register_on_error(
    self, invocation_type: InvocationType, callback: OnErrorCallbackType
) -> None:
    """
    Register an on_error callback for a specific invocation type.

    Args:
        invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
        callback (OnErrorCallbackType): The callback to register.
    """
    self._error_callbacks[invocation_type].append(callback)

Command

Command object that combines state updates with control flow.

Allows nodes to update agent state and direct graph execution to specific nodes or graphs. Similar to LangGraph's Command API.

Methods:

Name Description
__init__

Initialize a Command object.

__repr__

Return a string representation of the Command object.

Attributes:

Name Type Description
PARENT
goto
graph
state
update
Source code in agentflow/utils/command.py
class Command[StateT: AgentState]:
    """
    Command object that combines state updates with control flow.

    Allows nodes to update agent state and direct graph execution to specific nodes or graphs.
    Similar to LangGraph's Command API.
    """

    PARENT = "PARENT"

    def __init__(
        self,
        update: Union["StateT", None, Message, str, "BaseConverter"] = None,
        goto: str | None = None,
        graph: str | None = None,
        state: StateT | None = None,
    ):
        """
        Initialize a Command object.

        Args:
            update (StateT | None | Message | str | BaseConverter): State update to apply.
            goto (str | None): Next node to execute (node name or END).
            graph (str | None): Which graph to navigate to (None for current, PARENT for parent).
            state (StateT | None): Optional agent state to attach.
        """
        self.update = update
        self.goto = goto
        self.graph = graph
        self.state = state

    def __repr__(self) -> str:
        """
        Return a string representation of the Command object.

        Returns:
            str: String representation of the Command.
        """
        return (
            f"Command(update={self.update}, goto={self.goto}, \n"
            f" graph={self.graph}, state={self.state})"
        )
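A node that both updates state and chooses the next step might look like the sketch below. It is illustrative only: the `Command` dataclass is a stand-in with the same four fields, `END` is given a placeholder value (the real constant comes from the constants module), and `review_node` and the `"rewrite"` node name are hypothetical.

```python
from dataclasses import dataclass
from typing import Any, Optional

END = "END"  # placeholder value for this sketch only


@dataclass
class Command:  # hypothetical stand-in with the same four fields
    update: Any = None
    goto: Optional[str] = None
    graph: Optional[str] = None
    state: Any = None


def review_node(draft: str) -> Command:
    """Route to a rewrite step when the draft is short, otherwise finish."""
    if len(draft.split()) < 5:
        return Command(update="Draft too short, rewriting.", goto="rewrite")
    return Command(update="Draft accepted.", goto=END)


short = review_node("Too short")
done = review_node("This draft has more than five words")
```

Returning one object that carries both the update and the routing decision keeps the node's control flow in a single place instead of splitting it between the node and a separate conditional edge.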

Attributes

PARENT class-attribute instance-attribute
PARENT = 'PARENT'
goto instance-attribute
goto = goto
graph instance-attribute
graph = graph
state instance-attribute
state = state
update instance-attribute
update = update

Functions

__init__
__init__(update=None, goto=None, graph=None, state=None)

Initialize a Command object.

Parameters:

Name Type Description Default
update
StateT | None | Message | str | BaseConverter

State update to apply.

None
goto
str | None

Next node to execute (node name or END).

None
graph
str | None

Which graph to navigate to (None for current, PARENT for parent).

None
state
StateT | None

Optional agent state to attach.

None
Source code in agentflow/utils/command.py
def __init__(
    self,
    update: Union["StateT", None, Message, str, "BaseConverter"] = None,
    goto: str | None = None,
    graph: str | None = None,
    state: StateT | None = None,
):
    """
    Initialize a Command object.

    Args:
        update (StateT | None | Message | str | BaseConverter): State update to apply.
        goto (str | None): Next node to execute (node name or END).
        graph (str | None): Which graph to navigate to (None for current, PARENT for parent).
        state (StateT | None): Optional agent state to attach.
    """
    self.update = update
    self.goto = goto
    self.graph = graph
    self.state = state
__repr__
__repr__()

Return a string representation of the Command object.

Returns:

Name Type Description
str str

String representation of the Command.

Source code in agentflow/utils/command.py
def __repr__(self) -> str:
    """
    Return a string representation of the Command object.

    Returns:
        str: String representation of the Command.
    """
    return (
        f"Command(update={self.update}, goto={self.goto}, \n"
        f" graph={self.graph}, state={self.state})"
    )

DefaultIDGenerator

Bases: BaseIDGenerator

Default ID generator that returns empty strings.

This generator is a placeholder that defers to the framework's default ID generation (typically UUID-based). It currently returns an empty string, which signals the framework to use its default UUID-based generator. If the framework is not configured for UUID generation, it falls back to UUID4.

Methods:

Name Description
generate

Generate a default ID (currently empty string).

Attributes:

Name Type Description
id_type IDType
Source code in agentflow/utils/id_generator.py
class DefaultIDGenerator(BaseIDGenerator):
    """Default ID generator that returns empty strings.

    This generator is intended as a placeholder that can be configured
    to use framework defaults (typically UUID-based). Currently returns
    empty strings. If empty string is returned, the framework will use its default
    UUID-based generator. If the framework is not configured to use
    UUID generation, it will fall back to UUID4.
    """

    @property
    def id_type(self) -> IDType:
        return IDType.STRING

    def generate(self) -> str:
        """Generate a default ID (currently empty string).

        If empty string is returned, the framework will use its default
        UUID-based generator. If the framework is not configured to use
        UUID generation, it will fall back to UUID4.

        Returns:
            str: An empty string (framework will substitute with UUID).
        """
        # An empty string tells the framework to use its default generator,
        # which is UUID-based. If the framework is not configured for UUID
        # generation, UUID4 is used as the fallback.
        return ""

Attributes

id_type property
id_type

Functions

generate
generate()

Generate a default ID (currently empty string).

If an empty string is returned, the framework uses its default UUID-based generator. If the framework is not configured for UUID generation, it falls back to UUID4.

Returns:

Name Type Description
str str

An empty string (framework will substitute with UUID).

Source code in agentflow/utils/id_generator.py
def generate(self) -> str:
    """Generate a default ID (currently empty string).

    If empty string is returned, the framework will use its default
    UUID-based generator. If the framework is not configured to use
    UUID generation, it will fall back to UUID4.

    Returns:
        str: An empty string (framework will substitute with UUID).
    """
    # An empty string tells the framework to use its default generator,
    # which is UUID-based. If the framework is not configured for UUID
    # generation, UUID4 is used as the fallback.
    return ""
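The substitution described in the docstring can be pictured as a small helper on the consuming side. This is a sketch of the idea, not the framework's code: `resolve_id` is a hypothetical name, and the fallback shown is a plain `uuid.uuid4()` string.

```python
import uuid


def resolve_id(generated: str) -> str:
    """Hypothetical helper: substitute a UUID4 string when the generator returns ''."""
    return generated or str(uuid.uuid4())


placeholder = resolve_id("")  # empty string, so a UUID4 is substituted
explicit = resolve_id("order-123")  # non-empty IDs pass through unchanged
```

Treating the empty string as "no opinion" lets a custom generator be swapped in later without the caller having to handle a missing generator as a special case.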

ExecutionState

Bases: StrEnum

Graph execution states for agent workflows.

Values

RUNNING: Execution is in progress.
PAUSED: Execution is paused.
COMPLETED: Execution completed successfully.
ERROR: Execution encountered an error.
INTERRUPTED: Execution was interrupted.
ABORTED: Execution was aborted.
IDLE: Execution is idle.

Attributes:

Name Type Description
ABORTED
COMPLETED
ERROR
IDLE
INTERRUPTED
PAUSED
RUNNING
Source code in agentflow/utils/constants.py
class ExecutionState(StrEnum):
    """
    Graph execution states for agent workflows.

    Values:
        RUNNING: Execution is in progress.
        PAUSED: Execution is paused.
        COMPLETED: Execution completed successfully.
        ERROR: Execution encountered an error.
        INTERRUPTED: Execution was interrupted.
        ABORTED: Execution was aborted.
        IDLE: Execution is idle.
    """

    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    ERROR = "error"
    INTERRUPTED = "interrupted"
    ABORTED = "aborted"
    IDLE = "idle"

Attributes

ABORTED class-attribute instance-attribute
ABORTED = 'aborted'
COMPLETED class-attribute instance-attribute
COMPLETED = 'completed'
ERROR class-attribute instance-attribute
ERROR = 'error'
IDLE class-attribute instance-attribute
IDLE = 'idle'
INTERRUPTED class-attribute instance-attribute
INTERRUPTED = 'interrupted'
PAUSED class-attribute instance-attribute
PAUSED = 'paused'
RUNNING class-attribute instance-attribute
RUNNING = 'running'
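Because the members carry plain string values, a state read back from storage or a JSON payload can be turned into a member by value lookup and compared against strings directly. The sketch below uses a `str`/`Enum` mixin so it also runs on Python versions without `StrEnum`; the `TERMINAL` set and `is_finished` helper are assumptions made for illustration, not part of the library.

```python
from enum import Enum


class ExecutionState(str, Enum):  # the library uses StrEnum; mixin used here for portability
    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    ERROR = "error"
    INTERRUPTED = "interrupted"
    ABORTED = "aborted"
    IDLE = "idle"


# Assumption for this sketch: these three states are treated as final.
TERMINAL = {ExecutionState.COMPLETED, ExecutionState.ERROR, ExecutionState.ABORTED}


def is_finished(raw_state: str) -> bool:
    """Parse a stored string into a member, then test membership."""
    return ExecutionState(raw_state) in TERMINAL


paused_matches_string = ExecutionState.PAUSED == "paused"
```

Value lookup raises `ValueError` for an unknown string, which surfaces corrupted or outdated state values early.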

HexIDGenerator

Bases: BaseIDGenerator

ID generator that produces hexadecimal strings.

Generates cryptographically secure random hex strings of 32 characters (representing 16 random bytes). Each character is a hexadecimal digit (0-9, a-f).

Methods:

Name Description
generate

Generate a new 32-character hexadecimal string.

Attributes:

Name Type Description
id_type IDType
Source code in agentflow/utils/id_generator.py
class HexIDGenerator(BaseIDGenerator):
    """ID generator that produces hexadecimal strings.

    Generates cryptographically secure random hex strings of 32 characters
    (representing 16 random bytes). Each character is a hexadecimal digit (0-9, a-f).
    """

    @property
    def id_type(self) -> IDType:
        return IDType.STRING

    def generate(self) -> str:
        """Generate a new 32-character hexadecimal string.

        Returns:
            str: A 32-character hex string (e.g., '1a2b3c4d5e6f7890abcdef1234567890').
        """
        return secrets.token_hex(16)

Attributes

id_type property
id_type

Functions

generate
generate()

Generate a new 32-character hexadecimal string.

Returns:

Name Type Description
str str

A 32-character hex string (e.g., '1a2b3c4d5e6f7890abcdef1234567890').

Source code in agentflow/utils/id_generator.py
def generate(self) -> str:
    """Generate a new 32-character hexadecimal string.

    Returns:
        str: A 32-character hex string (e.g., '1a2b3c4d5e6f7890abcdef1234567890').
    """
    return secrets.token_hex(16)
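The relationship between the byte count and the string length can be checked directly with the standard library call used above: 16 random bytes encode to 32 lowercase hexadecimal characters. The variable names below are illustrative.

```python
import secrets
import string

hex_id = secrets.token_hex(16)  # 16 random bytes, two hex characters per byte

# Every character is a lowercase hex digit, and the string decodes back to 16 bytes.
is_lower_hex = all(ch in string.digits + "abcdef" for ch in hex_id)
raw_bytes = bytes.fromhex(hex_id)
```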

IDType

Bases: StrEnum

Enumeration of supported ID types.

Attributes:

Name Type Description
BIGINT
INTEGER
STRING
Source code in agentflow/utils/id_generator.py
class IDType(enum.StrEnum):
    """Enumeration of supported ID types."""

    STRING = "string"  # String-based IDs
    INTEGER = "integer"  # Integer-based IDs
    BIGINT = "bigint"  # Big integer IDs

Attributes

BIGINT class-attribute instance-attribute
BIGINT = 'bigint'
INTEGER class-attribute instance-attribute
INTEGER = 'integer'
STRING class-attribute instance-attribute
STRING = 'string'

IntIDGenerator

Bases: BaseIDGenerator

ID generator that produces 32-bit random integers.

Generates cryptographically secure random integers using secrets.randbits(32). Values range from 0 to 2^32 - 1 (4,294,967,295).

Methods:

Name Description
generate

Generate a new 32-bit random integer.

Attributes:

Name Type Description
id_type IDType
Source code in agentflow/utils/id_generator.py
class IntIDGenerator(BaseIDGenerator):
    """ID generator that produces 32-bit random integers.

    Generates cryptographically secure random integers using secrets.randbits(32).
    Values range from 0 to 2^32 - 1 (4,294,967,295).
    """

    @property
    def id_type(self) -> IDType:
        return IDType.INTEGER

    def generate(self) -> int:
        """Generate a new 32-bit random integer.

        Returns:
            int: A random integer between 0 and 4,294,967,295 (inclusive).
        """
        return secrets.randbits(32)

Attributes

id_type property
id_type

Functions

generate
generate()

Generate a new 32-bit random integer.

Returns:

Name Type Description
int int

A random integer between 0 and 4,294,967,295 (inclusive).

Source code in agentflow/utils/id_generator.py
def generate(self) -> int:
    """Generate a new 32-bit random integer.

    Returns:
        int: A random integer between 0 and 4,294,967,295 (inclusive).
    """
    return secrets.randbits(32)
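A 32-bit space is small enough that collisions become plausible at moderate volumes. As a rough guide, the sketch below applies the standard birthday-bound approximation to the range stated above; `collision_probability` is a hypothetical helper, and the estimate assumes IDs are drawn uniformly and independently.

```python
import math
import secrets

ID_SPACE = 2**32  # number of distinct values secrets.randbits(32) can return


def collision_probability(n_ids: int, space: int = ID_SPACE) -> float:
    """Birthday-bound approximation: 1 - exp(-n(n-1) / (2 * space))."""
    return 1.0 - math.exp(-n_ids * (n_ids - 1) / (2 * space))


sample = secrets.randbits(32)  # always in [0, 2**32 - 1]
p_small = collision_probability(1_000)  # on the order of 1e-4
p_large = collision_probability(77_163)  # close to one half
```

Under this approximation, a store that accumulates tens of thousands of such IDs should either check for duplicates on insert or use one of the wider generators.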

InvocationType

Bases: Enum

Types of invocations that can trigger callbacks.

Attributes:

Name Type Description
AI
MCP
TOOL
Source code in agentflow/utils/callbacks.py
class InvocationType(Enum):
    """Types of invocations that can trigger callbacks."""

    AI = "ai"
    TOOL = "tool"
    MCP = "mcp"

Attributes

AI class-attribute instance-attribute
AI = 'ai'
MCP class-attribute instance-attribute
MCP = 'mcp'
TOOL class-attribute instance-attribute
TOOL = 'tool'

OnErrorCallback

Bases: ABC

Abstract base class for on_error callbacks.

Called when an error occurs during invocation. Allows for error handling and logging.

Methods:

Name Description
__call__

Execute the on_error callback.

Source code in agentflow/utils/callbacks.py
class OnErrorCallback(ABC):
    """Abstract base class for on_error callbacks.

    Called when an error occurs during invocation.
    Allows for error handling and logging.
    """

    @abstractmethod
    async def __call__(
        self, context: CallbackContext, input_data: Any, error: Exception
    ) -> Any | None:
        """Execute the on_error callback.

        Args:
            context: Context information about the invocation
            input_data: The input data that caused the error
            error: The exception that occurred

        Returns:
            Optional recovery value or None to re-raise the error

        Raises:
            Exception: If error handling fails or if the error should be re-raised
        """
        ...

Functions

__call__ abstractmethod async
__call__(context, input_data, error)

Execute the on_error callback.

Parameters:

Name Type Description Default
context
CallbackContext

Context information about the invocation

required
input_data
Any

The input data that caused the error

required
error
Exception

The exception that occurred

required

Returns:

Type Description
Any | None

Optional recovery value or None to re-raise the error

Raises:

Type Description
Exception

If error handling fails or if the error should be re-raised

Source code in agentflow/utils/callbacks.py
@abstractmethod
async def __call__(
    self, context: CallbackContext, input_data: Any, error: Exception
) -> Any | None:
    """Execute the on_error callback.

    Args:
        context: Context information about the invocation
        input_data: The input data that caused the error
        error: The exception that occurred

    Returns:
        Optional recovery value or None to re-raise the error

    Raises:
        Exception: If error handling fails or if the error should be re-raised
    """
    ...
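A concrete subclass might look like the sketch below. It is illustrative only: the base class is a local stand-in that mirrors the signature above, a plain dict stands in for CallbackContext, and FallbackOnTimeout is a hypothetical name. The one behavior taken from the docstring is the return contract: a value is treated as a recovery result, and None signals that the error should be re-raised.

```python
from __future__ import annotations

import asyncio
from abc import ABC, abstractmethod
from typing import Any


# Stand-in for the abstract base shown above (the real one lives in
# agentflow/utils/callbacks.py). A plain dict replaces CallbackContext here.
class OnErrorCallback(ABC):
    @abstractmethod
    async def __call__(self, context: Any, input_data: Any, error: Exception) -> Any | None:
        ...


class FallbackOnTimeout(OnErrorCallback):
    """Hypothetical callback: recover from timeouts, re-raise everything else."""

    def __init__(self, fallback: Any) -> None:
        self.fallback = fallback
        self.seen: list[Exception] = []

    async def __call__(self, context: Any, input_data: Any, error: Exception) -> Any | None:
        self.seen.append(error)  # keep a record for logging or inspection
        if isinstance(error, TimeoutError):
            return self.fallback  # recovery value
        return None  # per the docstring, None means the error is re-raised


callback = FallbackOnTimeout(fallback={"content": "try again later"})
recovered = asyncio.run(callback({"node": "demo"}, "input", TimeoutError("slow")))
ignored = asyncio.run(callback({"node": "demo"}, "input", ValueError("bad")))
print(recovered, ignored)
```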

ResponseGranularity

Bases: StrEnum

Response granularity options for agent graph outputs.

Values

FULL: State, latest messages.
PARTIAL: Context, summary, latest messages.
LOW: Only latest messages.

Attributes:

Name Type Description
FULL
LOW
PARTIAL
Source code in agentflow/utils/constants.py
class ResponseGranularity(StrEnum):
    """
    Response granularity options for agent graph outputs.

    Values:
        FULL: State, latest messages.
        PARTIAL: Context, summary, latest messages.
        LOW: Only latest messages.
    """

    FULL = "full"
    PARTIAL = "partial"
    LOW = "low"

Attributes

FULL class-attribute instance-attribute
FULL = 'full'
LOW class-attribute instance-attribute
LOW = 'low'
PARTIAL class-attribute instance-attribute
PARTIAL = 'partial'

ShortIDGenerator

Bases: BaseIDGenerator

ID generator that produces short alphanumeric strings.

Generates 8-character strings using uppercase/lowercase letters and digits. Each character is randomly chosen from 62 possible characters (26 + 26 + 10). Total possible combinations: 62^8 ≈ 2.18 x 10^14.

Methods:

Name Description
generate

Generate a new 8-character alphanumeric string.

Attributes:

Name Type Description
id_type IDType
Source code in agentflow/utils/id_generator.py
class ShortIDGenerator(BaseIDGenerator):
    """ID generator that produces short alphanumeric strings.

    Generates 8-character strings using uppercase/lowercase letters and digits.
    Each character is randomly chosen from 62 possible characters (26 + 26 + 10).
    Total possible combinations: 62^8 ≈ 2.18 x 10^14.
    """

    @property
    def id_type(self) -> IDType:
        return IDType.STRING

    def generate(self) -> str:
        """Generate a new 8-character alphanumeric string.

        Returns:
            str: An 8-character string containing letters and digits
                 (e.g., 'Ab3XyZ9k').
        """
        alphabet = string.ascii_letters + string.digits
        return "".join(secrets.choice(alphabet) for _ in range(8))

Attributes

id_type property
id_type

Functions

generate
generate()

Generate a new 8-character alphanumeric string.

Returns:

Name Type Description
str str

An 8-character string containing letters and digits (e.g., 'Ab3XyZ9k').

Source code in agentflow/utils/id_generator.py
def generate(self) -> str:
    """Generate a new 8-character alphanumeric string.

    Returns:
        str: An 8-character string containing letters and digits
             (e.g., 'Ab3XyZ9k').
    """
    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(8))
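The size of the ID space quoted above can be checked directly, and the same number gives a rough feel for collision risk. The sketch below reuses the construction from generate() and adds a birthday-bound approximation; collision_probability is a hypothetical helper introduced for illustration. Under that approximation the chance of at least one collision is about 0.2% for one million IDs and about 20% for ten million.

```python
import math
import secrets
import string

ALPHABET = string.ascii_letters + string.digits  # 26 + 26 + 10 = 62 symbols
ID_LENGTH = 8
SPACE = len(ALPHABET) ** ID_LENGTH  # 62**8 = 218_340_105_584_896


def collision_probability(n_ids: int, space: int = SPACE) -> float:
    """Birthday-bound approximation: p ~ 1 - exp(-n(n-1) / (2 * space))."""
    return -math.expm1(-n_ids * (n_ids - 1) / (2 * space))


def short_id() -> str:
    """Same construction as generate() above."""
    return "".join(secrets.choice(ALPHABET) for _ in range(ID_LENGTH))


print(short_id())
print(f"{SPACE:.3e}")
for n in (10_000, 1_000_000, 10_000_000):
    print(n, f"{collision_probability(n):.4%}")
```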

StorageLevel

Message storage levels for agent state persistence.

Attributes:

Name Type Description
ALL

Save everything including tool calls.

MEDIUM

Only AI and human messages.

LOW

Only first human and last AI message.

Source code in agentflow/utils/constants.py
class StorageLevel:
    """
    Message storage levels for agent state persistence.

    Attributes:
        ALL: Save everything including tool calls.
        MEDIUM: Only AI and human messages.
        LOW: Only first human and last AI message.
    """

    ALL = "all"
    MEDIUM = "medium"
    LOW = "low"

Attributes

ALL class-attribute instance-attribute
ALL = 'all'
LOW class-attribute instance-attribute
LOW = 'low'
MEDIUM class-attribute instance-attribute
MEDIUM = 'medium'

TaskMetadata dataclass

Metadata for tracking background tasks.

Methods:

Name Description
__init__

Attributes:

Name Type Description
context dict[str, Any] | None
created_at float
name str
timeout float | None
Source code in agentflow/utils/background_task_manager.py
@dataclass
class TaskMetadata:
    """Metadata for tracking background tasks."""

    name: str
    created_at: float
    timeout: float | None = None
    context: dict[str, Any] | None = None

Attributes

context class-attribute instance-attribute
context = None
created_at instance-attribute
created_at
name instance-attribute
name
timeout class-attribute instance-attribute
timeout = None

Functions

__init__
__init__(name, created_at, timeout=None, context=None)

ThreadInfo

Bases: BaseModel

Metadata and status for a thread in agent execution.

Attributes:

Name Type Description
thread_id int | str

Unique identifier for the thread.

thread_name str | None

Optional name for the thread.

user_id int | str | None

Optional user identifier associated with the thread.

metadata dict[str, Any] | None

Optional metadata for the thread.

updated_at datetime | None

Timestamp of last update.

stop_requested bool

Whether a stop has been requested for the thread.

run_id str | None

Optional run identifier for the thread execution.

Example

ThreadInfo(thread_id=1, thread_name="main", user_id=42)

Source code in agentflow/utils/thread_info.py
class ThreadInfo(BaseModel):
    """
    Metadata and status for a thread in agent execution.

    Attributes:
        thread_id (int | str): Unique identifier for the thread.
        thread_name (str | None): Optional name for the thread.
        user_id (int | str | None): Optional user identifier associated with the thread.
        metadata (dict[str, Any] | None): Optional metadata for the thread.
        updated_at (datetime | None): Timestamp of last update.
        stop_requested (bool): Whether a stop has been requested for the thread.
        run_id (str | None): Optional run identifier for the thread execution.

    Example:
        >>> ThreadInfo(thread_id=1, thread_name="main", user_id=42)
    """

    thread_id: int | str
    thread_name: str | None = None
    user_id: int | str | None = None
    metadata: dict[str, Any] | None = None
    updated_at: datetime | None = None
    run_id: str | None = None

Attributes

metadata class-attribute instance-attribute
metadata = None
run_id class-attribute instance-attribute
run_id = None
thread_id instance-attribute
thread_id
thread_name class-attribute instance-attribute
thread_name = None
updated_at class-attribute instance-attribute
updated_at = None
user_id class-attribute instance-attribute
user_id = None

TimestampIDGenerator

Bases: BaseIDGenerator

ID generator that produces integer IDs based on current time in microseconds.

Generates IDs by multiplying current Unix timestamp by 1e6, resulting in integers that are sortable by creation time. Typical size is 16-17 digits.

Methods:

Name Description
generate

Generate a new integer ID based on current microseconds.

Attributes:

Name Type Description
id_type IDType
Source code in agentflow/utils/id_generator.py
class TimestampIDGenerator(BaseIDGenerator):
    """ID generator that produces integer IDs based on current time in microseconds.

    Generates IDs by multiplying current Unix timestamp by 1e6, resulting in
    integers that are sortable by creation time. Typical size is 16-17 digits.
    """

    @property
    def id_type(self) -> IDType:
        return IDType.INTEGER

    def generate(self) -> int:
        """Generate a new integer ID based on current microseconds.

        Returns:
            int: An integer (16-17 digits) representing microseconds since Unix epoch.
        """
        return int(time.time() * 1000000)

Attributes

id_type property
id_type

Functions

generate
generate()

Generate a new integer ID based on current microseconds.

Returns:

Name Type Description
int int

An integer (16-17 digits) representing microseconds since Unix epoch.

Source code in agentflow/utils/id_generator.py
def generate(self) -> int:
    """Generate a new integer ID based on current microseconds.

    Returns:
        int: An integer (16-17 digits) representing microseconds since Unix epoch.
    """
    return int(time.time() * 1000000)
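The sortability claim can be illustrated with the same expression used in generate(). This is a sketch, not a guarantee: time.time() reads the wall clock, which is not monotonic, and two calls made within the clock's resolution may return the same value, so the example sleeps briefly between calls.

```python
import time


def timestamp_id() -> int:
    """Same expression as generate() above: microseconds since the Unix epoch."""
    return int(time.time() * 1_000_000)


first = timestamp_id()
time.sleep(0.001)  # let the clock advance between the two calls
second = timestamp_id()

print(len(str(first)))  # digit count of the ID
print(sorted([second, first]) == [first, second])  # numeric order follows creation order
```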

UUIDGenerator

Bases: BaseIDGenerator

ID generator that produces UUID version 4 strings.

UUIDs are 128-bit identifiers that are virtually guaranteed to be unique across space and time. The generated strings are 36 characters long (32 hexadecimal digits + 4 hyphens in the format xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx).

Methods:

Name Description
generate

Generate a new UUID4 string.

Attributes:

Name Type Description
id_type IDType
Source code in agentflow/utils/id_generator.py
class UUIDGenerator(BaseIDGenerator):
    """ID generator that produces UUID version 4 strings.

    UUIDs are 128-bit identifiers that are virtually guaranteed to be unique
    across space and time. The generated strings are 36 characters long
    (32 hexadecimal digits + 4 hyphens in the format xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx).
    """

    @property
    def id_type(self) -> IDType:
        return IDType.STRING

    def generate(self) -> str:
        """Generate a new UUID4 string.

        Returns:
            str: A 36-character UUID string (e.g., '550e8400-e29b-41d4-a716-446655440000').
        """
        return str(uuid.uuid4())

Attributes

id_type property
id_type

Functions

generate
generate()

Generate a new UUID4 string.

Returns:

Name Type Description
str str

A 36-character UUID string (e.g., '550e8400-e29b-41d4-a716-446655440000').

Source code in agentflow/utils/id_generator.py
def generate(self) -> str:
    """Generate a new UUID4 string.

    Returns:
        str: A 36-character UUID string (e.g., '550e8400-e29b-41d4-a716-446655440000').
    """
    return str(uuid.uuid4())
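The format described above can be verified with the standard library alone. The short sketch below generates a value the same way generate() does and parses it back; in a version 4 UUID, 122 of the 128 bits are random and the remaining 6 encode the version and variant.

```python
import uuid

value = str(uuid.uuid4())  # same call as generate() above
parsed = uuid.UUID(value)  # raises ValueError if the string is malformed

print(value)
print(len(value), value.count("-"), parsed.version)
```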

Functions

add_messages

add_messages(left, right)

Adds messages to the list, skipping any message in right whose message_id already appears in left or whose delta flag is not False.

Parameters:

Name Type Description Default

left

list[Message]

Existing list of messages.

required

right

list[Message]

New messages to add.

required

Returns:

Type Description
list[Message]

list[Message]: Combined list with unique messages.

Example

>>> add_messages([msg1], [msg2, msg1])
[msg1, msg2]

Source code in agentflow/state/reducers.py
def add_messages(left: list[Message], right: list[Message]) -> list[Message]:
    """
    Adds messages to the list, avoiding duplicates by message_id.

    Args:
        left (list[Message]): Existing list of messages.
        right (list[Message]): New messages to add.

    Returns:
        list[Message]: Combined list with unique messages.

    Example:
        >>> add_messages([msg1], [msg2, msg1])
        [msg1, msg2]
    """
    left_ids = {msg.message_id for msg in left}
    right = [msg for msg in right if msg.message_id not in left_ids and msg.delta is False]
    return left + right
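The reducer's filtering rules are easiest to see on a small input. In the sketch below, Msg is a hypothetical stand-in for Message that carries only the two fields the reducer reads, and the function body is copied from the source above. Two details follow from that body: messages with delta set to True are dropped, and duplicates that occur only within right are not removed, because the check is made against left alone.

```python
from dataclasses import dataclass


@dataclass
class Msg:
    """Hypothetical stand-in for Message with only the fields the reducer reads."""

    message_id: str
    delta: bool = False


def add_messages(left: list[Msg], right: list[Msg]) -> list[Msg]:
    # Same logic as the source above.
    left_ids = {msg.message_id for msg in left}
    right = [msg for msg in right if msg.message_id not in left_ids and msg.delta is False]
    return left + right


msg1, msg2 = Msg("m1"), Msg("m2")
chunk = Msg("m3", delta=True)  # delta=True entries are filtered out

merged = add_messages([msg1], [msg2, msg1, chunk])
print([m.message_id for m in merged])  # ['m1', 'm2']

# Duplicates inside `right` itself survive: only `left` is consulted.
repeated = add_messages([], [msg2, msg2])
print([m.message_id for m in repeated])  # ['m2', 'm2']
```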

append_items

append_items(left, right)

Appends items to a list, avoiding duplicates by item.id.

Parameters:

Name Type Description Default

left

list

Existing list of items (must have .id attribute).

required

right

list

New items to add.

required

Returns:

Name Type Description
list list

Combined list with unique items.

Example

>>> append_items([item1], [item2, item1])
[item1, item2]

Source code in agentflow/state/reducers.py
def append_items(left: list, right: list) -> list:
    """
    Appends items to a list, avoiding duplicates by item.id.

    Args:
        left (list): Existing list of items (must have .id attribute).
        right (list): New items to add.

    Returns:
        list: Combined list with unique items.

    Example:
        >>> append_items([item1], [item2, item1])
        [item1, item2]
    """
    left_ids = {item.id for item in left}
    right = [item for item in right if item.id not in left_ids]
    return left + right
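The requirement that items expose an .id attribute matters in practice: plain dicts with an "id" key do not qualify. The sketch below copies the function body from the source above and uses SimpleNamespace objects as hypothetical items.

```python
from types import SimpleNamespace


def append_items(left: list, right: list) -> list:
    # Same logic as the source above.
    left_ids = {item.id for item in left}
    right = [item for item in right if item.id not in left_ids]
    return left + right


item1, item2 = SimpleNamespace(id=1), SimpleNamespace(id=2)
combined = append_items([item1], [item2, item1])
print([item.id for item in combined])  # [1, 2]

rejected = False
try:
    append_items([{"id": 1}], [])  # dicts have keys, not an .id attribute
except AttributeError:
    rejected = True
print(rejected)  # True
```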

call_sync_or_async async

call_sync_or_async(func, *args, **kwargs)

Call a function that may be sync or async, returning its result.

If the function is synchronous, it runs in a thread pool to avoid blocking the event loop. If the result is awaitable, it is awaited before returning.

Parameters:

Name Type Description Default

func

Callable[..., Any]

The function to call.

required

*args

Positional arguments for the function.

()

**kwargs

Keyword arguments for the function.

{}

Returns:

Name Type Description
Any Any

The result of the function call, awaited if necessary.

Source code in agentflow/utils/callable_utils.py
async def call_sync_or_async(func: Callable[..., Any], *args, **kwargs) -> Any:
    """
    Call a function that may be sync or async, returning its result.

    If the function is synchronous, it runs in a thread pool to avoid blocking
    the event loop. If the result is awaitable, it is awaited before returning.

    Args:
        func (Callable[..., Any]): The function to call.
        *args: Positional arguments for the function.
        **kwargs: Keyword arguments for the function.

    Returns:
        Any: The result of the function call, awaited if necessary.
    """
    if _is_async_callable(func):
        return await func(*args, **kwargs)

    # Call sync function in a thread pool
    result = await asyncio.to_thread(func, *args, **kwargs)
    # If the result is awaitable, await it
    if inspect.isawaitable(result):
        return await result
    return result
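The dispatch described above can be sketched in a self-contained form. One assumption is labeled in the code: the private _is_async_callable helper is not shown on this page, so inspect.iscoroutinefunction stands in for it and may not cover every case the real helper does (for example, objects with an async __call__). The sketch also confirms that a synchronous function runs on a worker thread rather than on the event loop's thread.

```python
import asyncio
import inspect
import threading
from typing import Any, Callable


async def call_sync_or_async(func: Callable[..., Any], *args, **kwargs) -> Any:
    # Assumption: inspect.iscoroutinefunction stands in for the private
    # _is_async_callable helper, whose body is not shown above.
    if inspect.iscoroutinefunction(func):
        return await func(*args, **kwargs)
    # Sync function: run it in a worker thread so the event loop stays free.
    result = await asyncio.to_thread(func, *args, **kwargs)
    if inspect.isawaitable(result):
        return await result
    return result


def sync_add(a: int, b: int) -> int:
    return a + b


async def async_add(a: int, b: int) -> int:
    await asyncio.sleep(0)
    return a + b


def worker_thread_name() -> str:
    return threading.current_thread().name


async def main() -> tuple[int, int, bool]:
    loop_thread = threading.current_thread().name
    ran_elsewhere = await call_sync_or_async(worker_thread_name) != loop_thread
    return (
        await call_sync_or_async(sync_add, 1, 2),
        await call_sync_or_async(async_add, b=4, a=3),
        ran_elsewhere,
    )


results = asyncio.run(main())
print(results)  # (3, 7, True)
```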

configure_logging

configure_logging(level=logging.INFO, format_string=None, handler=None)

Configures the top-level 'agentflow' logger for the TAF project (not Python's global root logger).

This function sets up logging for all modules under the 'agentflow' namespace. It ensures that logs are formatted consistently and sent to the appropriate handler.

Parameters:

Name Type Description Default

level

int

Logging level (e.g., logging.INFO, logging.DEBUG). Defaults to logging.INFO.

INFO

format_string

str

Custom format string for log messages. If None, uses a default format: "[%(asctime)s] %(levelname)-8s %(name)s: %(message)s".

None

handler

Handler

Custom logging handler. If None, uses StreamHandler to stdout.

None

Returns:

Type Description
None

None

Example

>>> configure_logging(level=logging.DEBUG)
>>> logger = logging.getLogger("agentflow.module")
>>> logger.info("This is an info message.")

Source code in agentflow/utils/logging.py
def configure_logging(
    level: int = logging.INFO,
    format_string: str | None = None,
    handler: logging.Handler | None = None,
) -> None:
    """
    Configures the root logger for the TAF project.

    This function sets up logging for all modules under the 'agentflow' namespace.
    It ensures that logs are formatted consistently and sent to the appropriate handler.

    Args:
        level (int, optional): Logging level (e.g., logging.INFO, logging.DEBUG).
            Defaults to logging.INFO.
        format_string (str, optional): Custom format string for log messages.
            If None, uses a default format: "[%(asctime)s] %(levelname)-8s %(name)s: %(message)s".
        handler (logging.Handler, optional): Custom logging handler. If None,
            uses StreamHandler to stdout.

    Returns:
        None

    Raises:
        None

    Example:
        >>> configure_logging(level=logging.DEBUG)
        >>> logger = logging.getLogger("agentflow.module")
        >>> logger.info("This is an info message.")
    """
    if format_string is None:
        format_string = "[%(asctime)s] %(levelname)-8s %(name)s: %(message)s"

    if handler is None:
        handler = logging.StreamHandler(sys.stdout)

    formatter = logging.Formatter(format_string)
    handler.setFormatter(formatter)

    # Configure root logger for agentflow
    root_logger = logging.getLogger("agentflow")
    root_logger.setLevel(level)

    # Only add handler if none exists to avoid duplicates
    if not root_logger.handlers:
        root_logger.addHandler(handler)

    # Prevent propagation to avoid duplicate logs
    root_logger.propagate = False
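Two consequences of the body above are worth seeing in action: a custom handler receives records from every logger under the 'agentflow' namespace, and a second call changes the level but does not attach a second handler, because a handler is added only when none exists. The sketch below uses a condensed local copy of the function so it runs without the package, and writes to an in-memory buffer instead of stdout.

```python
import io
import logging
import sys


def configure_logging(level=logging.INFO, format_string=None, handler=None) -> None:
    # Condensed local copy of the function body shown above.
    if format_string is None:
        format_string = "[%(asctime)s] %(levelname)-8s %(name)s: %(message)s"
    if handler is None:
        handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(logging.Formatter(format_string))
    root_logger = logging.getLogger("agentflow")
    root_logger.setLevel(level)
    if not root_logger.handlers:
        root_logger.addHandler(handler)
    root_logger.propagate = False


buffer = io.StringIO()
configure_logging(
    level=logging.DEBUG,
    format_string="%(levelname)s %(name)s: %(message)s",
    handler=logging.StreamHandler(buffer),
)
log = logging.getLogger("agentflow.module")
log.debug("hello")

# Second call: the level changes, but the original handler stays in place.
configure_logging(level=logging.WARNING)
log.info("dropped")  # below WARNING, so it is filtered out
log.warning("kept")

print(buffer.getvalue(), end="")
print(len(logging.getLogger("agentflow").handlers))
```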

convert_messages

convert_messages(system_prompts, state=None, extra_messages=None)

Convert system prompts, agent state, and extra messages to a list of dicts for LLM/tool payloads.

Parameters:

Name Type Description Default

system_prompts

list[dict[str, Any]]

List of system prompt dicts.

required

state

AgentState | None

Optional agent state containing context and summary.

None

extra_messages

list[Message] | None

Optional extra messages to include.

None

Returns:

Type Description
list[dict[str, Any]]

list[dict[str, Any]]: List of message dicts for payloads.

Raises:

Type Description
ValueError

If system_prompts is None.

Source code in agentflow/utils/converter.py
def convert_messages(
    system_prompts: list[dict[str, Any]],
    state: Union["AgentState", None] = None,
    extra_messages: list[Message] | None = None,
) -> list[dict[str, Any]]:
    """
    Convert system prompts, agent state, and extra messages to a list of dicts for
    LLM/tool payloads.

    Args:
        system_prompts (list[dict[str, Any]]): List of system prompt dicts.
        state (AgentState | None): Optional agent state containing context and summary.
        extra_messages (list[Message] | None): Optional extra messages to include.

    Returns:
        list[dict[str, Any]]: List of message dicts for payloads.

    Raises:
        ValueError: If system_prompts is None.
    """
    if system_prompts is None:
        logger.error("System prompts are None")
        raise ValueError("System prompts cannot be None")

    res = []
    res += system_prompts

    if state and state.context_summary:
        summary = {
            "role": "assistant",
            "content": state.context_summary if state.context_summary else "",
        }
        res.append(summary)

    if state and state.context:
        for msg in state.context:
            formatted = _convert_dict(msg)
            if formatted:
                res.append(formatted)

    if extra_messages:
        for msg in extra_messages:
            formatted = _convert_dict(msg)
            if formatted:
                res.append(formatted)

    logger.debug("Number of Converted messages: %s", len(res))
    return res
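The ordering of the resulting payload (system prompts, then the summary as an assistant message, then context, then extra messages) can be illustrated with stand-ins. Everything named Fake* below is hypothetical, and to_payload is an assumed replacement for the private _convert_dict helper, whose behavior is not shown on this page; only the control flow mirrors the source above.

```python
from dataclasses import dataclass, field
from typing import Any, Optional


@dataclass
class FakeMessage:
    """Hypothetical stand-in for Message."""

    role: str
    content: str


@dataclass
class FakeState:
    """Hypothetical stand-in for AgentState with the two fields read above."""

    context: list = field(default_factory=list)
    context_summary: Optional[str] = None


def to_payload(msg: FakeMessage) -> dict[str, Any]:
    # Assumption: stands in for the private _convert_dict helper.
    return {"role": msg.role, "content": msg.content}


def convert_messages(system_prompts, state=None, extra_messages=None):
    # Control flow mirrors the source above; logging is omitted.
    if system_prompts is None:
        raise ValueError("System prompts cannot be None")
    res = list(system_prompts)
    if state and state.context_summary:
        res.append({"role": "assistant", "content": state.context_summary})
    if state and state.context:
        res.extend(to_payload(m) for m in state.context)
    if extra_messages:
        res.extend(to_payload(m) for m in extra_messages)
    return res


state = FakeState(
    context=[FakeMessage("user", "What is TAF?")],
    context_summary="Earlier: user asked about setup.",
)
payload = convert_messages(
    [{"role": "system", "content": "You are helpful."}],
    state=state,
    extra_messages=[FakeMessage("user", "And callbacks?")],
)
print([m["role"] for m in payload])  # ['system', 'assistant', 'user', 'user']
```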

generate_dummy_thread_name

generate_dummy_thread_name(separator='-')

Generate a meaningful English name for an AI chat thread.

Parameters:

Name Type Description Default

separator

str

String to separate words (default: "-").

'-'

Returns:

Name Type Description
str str

A meaningful thread name like 'thoughtful-dialogue', 'exploring-ideas', or 'deep-dive'.

Example

>>> generate_dummy_thread_name()
'creative-exploration'

Source code in agentflow/utils/thread_name_generator.py
def generate_dummy_thread_name(separator: str = "-") -> str:
    """
    Generate a meaningful English name for an AI chat thread.

    Args:
        separator (str): String to separate words (default: "-").

    Returns:
        str: A meaningful thread name like 'thoughtful-dialogue', 'exploring-ideas', or 'deep-dive'.

    Example:
        >>> generate_dummy_thread_name()
        'creative-exploration'
    """
    generator = AIThreadNameGenerator()
    return generator.generate_name(separator)

register_after_invoke

register_after_invoke(invocation_type, callback)

Register an after_invoke callback on the global callback manager.

Parameters:

Name Type Description Default

invocation_type

InvocationType

The type of invocation (AI, TOOL, MCP).

required

callback

AfterInvokeCallbackType

The callback to register.

required
Source code in agentflow/utils/callbacks.py
def register_after_invoke(
    invocation_type: InvocationType, callback: AfterInvokeCallbackType
) -> None:
    """
    Register an after_invoke callback on the global callback manager.

    Args:
        invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
        callback (AfterInvokeCallbackType): The callback to register.
    """
    default_callback_manager.register_after_invoke(invocation_type, callback)

register_before_invoke

register_before_invoke(invocation_type, callback)

Register a before_invoke callback on the global callback manager.

Parameters:

Name Type Description Default

invocation_type

InvocationType

The type of invocation (AI, TOOL, MCP).

required

callback

BeforeInvokeCallbackType

The callback to register.

required
Source code in agentflow/utils/callbacks.py
def register_before_invoke(
    invocation_type: InvocationType, callback: BeforeInvokeCallbackType
) -> None:
    """
    Register a before_invoke callback on the global callback manager.

    Args:
        invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
        callback (BeforeInvokeCallbackType): The callback to register.
    """
    default_callback_manager.register_before_invoke(invocation_type, callback)

register_on_error

register_on_error(invocation_type, callback)

Register an on_error callback on the global callback manager.

Parameters:

Name Type Description Default

invocation_type

InvocationType

The type of invocation (AI, TOOL, MCP).

required

callback

OnErrorCallbackType

The callback to register.

required
Source code in agentflow/utils/callbacks.py
def register_on_error(invocation_type: InvocationType, callback: OnErrorCallbackType) -> None:
    """
    Register an on_error callback on the global callback manager.

    Args:
        invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
        callback (OnErrorCallbackType): The callback to register.
    """
    default_callback_manager.register_on_error(invocation_type, callback)

replace_messages

replace_messages(left, right)

Replaces the entire message list with a new one.

Parameters:

Name Type Description Default

left

list[Message]

Existing list of messages (ignored).

required

right

list[Message]

New list of messages.

required

Returns:

Type Description
list[Message]

list[Message]: The new message list.

Example

>>> replace_messages([msg1], [msg2])
[msg2]

Source code in agentflow/state/reducers.py
def replace_messages(left: list[Message], right: list[Message]) -> list[Message]:
    """
    Replaces the entire message list with a new one.

    Args:
        left (list[Message]): Existing list of messages (ignored).
        right (list[Message]): New list of messages.

    Returns:
        list[Message]: The new message list.

    Example:
        >>> replace_messages([msg1], [msg2])
        [msg2]
    """
    return right

replace_value

replace_value(left, right)

Replaces a value with another.

Parameters:

Name Type Description Default

left

Existing value (ignored).

required

right

New value to use.

required

Returns:

Name Type Description
Any

The new value.

Example

>>> replace_value(1, 2)
2

Source code in agentflow/state/reducers.py
def replace_value(left, right):
    """
    Replaces a value with another.

    Args:
        left: Existing value (ignored).
        right: New value to use.

    Returns:
        Any: The new value.

    Example:
        >>> replace_value(1, 2)
        2
    """
    return right

run_coroutine

run_coroutine(func)

Run an async coroutine from a sync context safely.

Source code in agentflow/utils/callable_utils.py
def run_coroutine(func: Coroutine) -> Any:
    """Run an async coroutine from a sync context safely."""
    # Always try to get/create an event loop and use thread-safe execution
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop running, create one
        return asyncio.run(func)

    # Loop is running, use thread-safe execution
    fut = asyncio.run_coroutine_threadsafe(func, loop)
    return fut.result()
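A typical use is calling async code from a plain synchronous entry point. The sketch below uses a local copy of the function and exercises only the branch where no event loop is running. The other branch blocks on fut.result(); if the caller is itself running on that loop's thread, the loop cannot make progress while it waits, so that path is left out here. fetch_answer and sync_entry_point are hypothetical names.

```python
import asyncio
from typing import Any, Coroutine


def run_coroutine(func: Coroutine) -> Any:
    # Local copy of the function shown above.
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop running on this thread: create one and run to completion.
        return asyncio.run(func)
    fut = asyncio.run_coroutine_threadsafe(func, loop)
    return fut.result()


async def fetch_answer() -> int:
    await asyncio.sleep(0.01)
    return 42


def sync_entry_point() -> int:
    # Plain synchronous code: no event loop is running on this thread.
    return run_coroutine(fetch_answer())


answer = sync_entry_point()
print(answer)
```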

Modules

background_task_manager

Background task manager for async operations in TAF.

This module provides BackgroundTaskManager, which tracks and manages asyncio background tasks, ensuring proper cleanup and error logging.

Classes:

Name Description
BackgroundTaskManager

Manages asyncio background tasks for agent operations.

TaskMetadata

Metadata for tracking background tasks.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute
logger = getLogger(__name__)

Classes

BackgroundTaskManager

Manages asyncio background tasks for agent operations.

Tracks created tasks, ensures cleanup, and logs errors from background execution. Enhanced with cancellation, timeouts, and metadata tracking.

Methods:

Name Description
__init__

Initialize the BackgroundTaskManager.

cancel_all

Cancel all tracked background tasks.

create_task

Create and track a background asyncio task.

get_task_count

Get the number of active background tasks.

get_task_info

Get information about all active tasks.

wait_for_all

Wait for all tracked background tasks to complete.

Source code in agentflow/utils/background_task_manager.py
class BackgroundTaskManager:
    """
    Manages asyncio background tasks for agent operations.

    Tracks created tasks, ensures cleanup, and logs errors from background execution.
    Enhanced with cancellation, timeouts, and metadata tracking.
    """

    def __init__(self):
        """
        Initialize the BackgroundTaskManager.
        """
        self._tasks: set[asyncio.Task] = set()
        self._task_metadata: dict[asyncio.Task, TaskMetadata] = {}

    def create_task(
        self,
        coro: Coroutine,
        *,
        name: str = "background_task",
        timeout: float | None = None,
        context: dict[str, Any] | None = None,
    ) -> asyncio.Task:
        """
        Create and track a background asyncio task.

        Args:
            coro (Coroutine): The coroutine to run in the background.
            name (str): Human-readable name for the task.
            timeout (Optional[float]): Timeout in seconds for the task.
            context (Optional[dict]): Additional context for logging.

        Returns:
            asyncio.Task: The created task.
        """
        metrics.counter("background_task_manager.tasks_created").inc()

        task = asyncio.create_task(coro, name=name)
        metadata = TaskMetadata(
            name=name, created_at=time.time(), timeout=timeout, context=context or {}
        )

        self._tasks.add(task)
        self._task_metadata[task] = metadata
        task.add_done_callback(self._task_done_callback)

        # Set up timeout if specified
        if timeout:
            self._setup_timeout(task, timeout)

        logger.debug(
            "Created background task: %s (timeout=%s)",
            name,
            timeout,
            extra={"task_context": context},
        )

        return task

    def _setup_timeout(self, task: asyncio.Task, timeout: float) -> None:
        """Set up timeout cancellation for a task."""

        async def timeout_canceller():
            try:
                await asyncio.sleep(timeout)
                if not task.done():
                    metadata = self._task_metadata.get(task)
                    task_name = metadata.name if metadata else "unknown"
                    logger.warning(
                        "Background task '%s' timed out after %s seconds", task_name, timeout
                    )
                    task.cancel()
                    metrics.counter("background_task_manager.tasks_timed_out").inc()
            except asyncio.CancelledError:
                pass  # Parent task was cancelled, this is expected

        # Create the timeout task but don't track it (avoid recursive tracking)
        timeout_task = asyncio.create_task(timeout_canceller())
        # Add a callback to clean up the timeout task reference
        timeout_task.add_done_callback(lambda t: None)

    def _task_done_callback(self, task: asyncio.Task) -> None:
        """
        Remove completed task and log exceptions if any.

        Args:
            task (asyncio.Task): The completed asyncio task.
        """
        metadata = self._task_metadata.pop(task, None)
        self._tasks.discard(task)

        task_name = metadata.name if metadata else "unknown"
        duration = time.time() - metadata.created_at if metadata else 0.0

        try:
            task.result()  # raises if task failed
            metrics.counter("background_task_manager.tasks_completed").inc()
            logger.debug(
                "Background task '%s' completed successfully (duration=%.2fs)",
                task_name,
                duration,
                extra={"task_context": metadata.context if metadata else {}},
            )
        except asyncio.CancelledError:
            metrics.counter("background_task_manager.tasks_cancelled").inc()
            logger.debug("Background task '%s' was cancelled", task_name)
        except Exception as e:
            metrics.counter("background_task_manager.tasks_failed").inc()
            error_msg = (
                f"Background task raised an exception - {task_name}: {e} (duration={duration:.2f}s)"
            )
            logger.error(
                error_msg,
                exc_info=e,
                extra={"task_context": metadata.context if metadata else {}},
            )

    async def cancel_all(self) -> None:
        """
        Cancel all tracked background tasks.

        Returns:
            None
        """
        if not self._tasks:
            return

        logger.info("Cancelling %d background tasks...", len(self._tasks))

        for task in self._tasks.copy():
            if not task.done():
                task.cancel()

        # Wait a short time for cancellations to process
        await asyncio.sleep(0.1)

    async def wait_for_all(
        self, timeout: float | None = None, return_exceptions: bool = False
    ) -> None:
        """
        Wait for all tracked background tasks to complete.

        Args:
            timeout (float | None): Maximum time to wait in seconds.
            return_exceptions (bool): If True, exceptions are returned as results instead of raised.

        Returns:
            None
        """
        if not self._tasks:
            return

        logger.info("Waiting for %d background tasks to finish...", len(self._tasks))

        try:
            if timeout:
                await asyncio.wait_for(
                    asyncio.gather(*self._tasks, return_exceptions=return_exceptions),
                    timeout=timeout,
                )
            else:
                await asyncio.gather(*self._tasks, return_exceptions=return_exceptions)
            logger.info("All background tasks finished.")
        except TimeoutError:
            logger.warning("Timeout waiting for background tasks, some may still be running")
            metrics.counter("background_task_manager.wait_timeout").inc()

    def get_task_count(self) -> int:
        """Get the number of active background tasks."""
        return len(self._tasks)

    def get_task_info(self) -> list[dict[str, Any]]:
        """Get information about all active tasks."""
        current_time = time.time()
        return [
            {
                "name": metadata.name,
                "age_seconds": current_time - metadata.created_at,
                "timeout": metadata.timeout,
                "context": metadata.context,
                "done": task.done(),
                "cancelled": task.cancelled() if task.done() else False,
            }
            for task, metadata in self._task_metadata.items()
        ]
Functions
__init__
__init__()

Initialize the BackgroundTaskManager.

Source code in agentflow/utils/background_task_manager.py
def __init__(self):
    """
    Initialize the BackgroundTaskManager.
    """
    self._tasks: set[asyncio.Task] = set()
    self._task_metadata: dict[asyncio.Task, TaskMetadata] = {}
cancel_all async
cancel_all()

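Cancel all tracked background tasks that have not finished yet, then pause for 0.1 seconds so the event loop can process the cancellations.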

Returns:

Type Description
None

None

Source code in agentflow/utils/background_task_manager.py
async def cancel_all(self) -> None:
    """
    Cancel all tracked background tasks.

    Returns:
        None
    """
    if not self._tasks:
        return

    logger.info("Cancelling %d background tasks...", len(self._tasks))

    for task in self._tasks.copy():
        if not task.done():
            task.cancel()

    # Wait a short time for cancellations to process
    await asyncio.sleep(0.1)
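The cancellation pattern in cancel_all can be tried in isolation. The following is a minimal sketch that uses only asyncio and does not import the library; the names worker and demo_cancel_all are hypothetical. It illustrates that task.cancel() only requests cancellation, and that the task is actually finished once the event loop gets a chance to run, which is what the short sleep provides.

```python
import asyncio


async def worker() -> None:
    # Hypothetical stand-in for a long-running background job.
    await asyncio.sleep(60)


async def demo_cancel_all() -> tuple[bool, bool]:
    tasks = {asyncio.create_task(worker()) for _ in range(3)}
    await asyncio.sleep(0)  # let the workers start

    for task in tasks.copy():
        if not task.done():
            task.cancel()  # only requests cancellation

    done_before = all(t.done() for t in tasks)
    # Yield to the event loop so CancelledError is delivered to each task.
    await asyncio.sleep(0.1)
    cancelled_after = all(t.cancelled() for t in tasks)
    return done_before, cancelled_after


if __name__ == "__main__":
    print(asyncio.run(demo_cancel_all()))
```

A task that catches CancelledError and needs longer than the fixed pause to clean up could still be running when the pause ends. Awaiting asyncio.gather(*tasks, return_exceptions=True) after cancelling is a common alternative when a caller needs to be sure every task has finished.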
create_task
create_task(coro, *, name='background_task', timeout=None, context=None)

Create and track a background asyncio task.

Parameters:

Name Type Description Default
coro Coroutine

The coroutine to run in the background.

required
name str

Human-readable name for the task.

'background_task'
timeout Optional[float]

Timeout in seconds for the task.

None
context Optional[dict]

Additional context for logging.

None

Returns:

Type Description
Task

The created task.

Source code in agentflow/utils/background_task_manager.py
def create_task(
    self,
    coro: Coroutine,
    *,
    name: str = "background_task",
    timeout: float | None = None,
    context: dict[str, Any] | None = None,
) -> asyncio.Task:
    """
    Create and track a background asyncio task.

    Args:
        coro (Coroutine): The coroutine to run in the background.
        name (str): Human-readable name for the task.
        timeout (Optional[float]): Timeout in seconds for the task.
        context (Optional[dict]): Additional context for logging.

    Returns:
        asyncio.Task: The created task.
    """
    metrics.counter("background_task_manager.tasks_created").inc()

    task = asyncio.create_task(coro, name=name)
    metadata = TaskMetadata(
        name=name, created_at=time.time(), timeout=timeout, context=context or {}
    )

    self._tasks.add(task)
    self._task_metadata[task] = metadata
    task.add_done_callback(self._task_done_callback)

    # Set up timeout if specified
    if timeout:
        self._setup_timeout(task, timeout)

    logger.debug(
        "Created background task: %s (timeout=%s)",
        name,
        timeout,
        extra={"task_context": context},
    )

    return task
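The bookkeeping in create_task follows a common asyncio pattern: keep a strong reference to each task and drop it in a done callback. The sketch below reproduces that pattern with hypothetical stand-ins (Meta, TinyTaskTracker, demo_tracking) and standard-library calls only. It leaves out metrics, logging, and the timeout handling done by _setup_timeout, which this page does not show.

```python
import asyncio
import time
from dataclasses import dataclass, field
from typing import Any


@dataclass
class Meta:
    # Hypothetical stand-in for TaskMetadata.
    name: str
    created_at: float
    context: dict[str, Any] = field(default_factory=dict)


class TinyTaskTracker:
    """Hypothetical, simplified stand-in for the manager shown above."""

    def __init__(self) -> None:
        self._meta: dict[asyncio.Task, Meta] = {}
        self.finished: list[str] = []

    def create_task(self, coro, *, name: str = "background_task") -> asyncio.Task:
        task = asyncio.create_task(coro, name=name)
        # The dict holds a strong reference, so the task is not garbage
        # collected while it is still running.
        self._meta[task] = Meta(name=name, created_at=time.time())
        task.add_done_callback(self._on_done)
        return task

    def _on_done(self, task: asyncio.Task) -> None:
        # Drop the bookkeeping entry once the task has finished.
        meta = self._meta.pop(task, None)
        self.finished.append(meta.name if meta else "unknown")

    def count(self) -> int:
        return len(self._meta)


async def demo_tracking() -> tuple[int, int, list[str]]:
    tracker = TinyTaskTracker()
    task = tracker.create_task(asyncio.sleep(0.01), name="flush_cache")
    before = tracker.count()
    await task
    await asyncio.sleep(0)  # yield once so any pending done callbacks have run
    return before, tracker.count(), tracker.finished
```

Calling asyncio.run(demo_tracking()) shows the tracked count going from one to zero once the task completes.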
get_task_count
get_task_count()

Get the number of active background tasks.

Source code in agentflow/utils/background_task_manager.py
def get_task_count(self) -> int:
    """Get the number of active background tasks."""
    return len(self._tasks)
get_task_info
get_task_info()

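Get information about all active tasks. Returns one dictionary per tracked task with the keys name, age_seconds, timeout, context, done, and cancelled.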

Source code in agentflow/utils/background_task_manager.py
def get_task_info(self) -> list[dict[str, Any]]:
    """Get information about all active tasks."""
    current_time = time.time()
    return [
        {
            "name": metadata.name,
            "age_seconds": current_time - metadata.created_at,
            "timeout": metadata.timeout,
            "context": metadata.context,
            "done": task.done(),
            "cancelled": task.cancelled() if task.done() else False,
        }
        for task, metadata in self._task_metadata.items()
    ]
wait_for_all async
wait_for_all(timeout=None, return_exceptions=False)

Wait for all tracked background tasks to complete.

Parameters:

Name Type Description Default
timeout float | None

Maximum time to wait in seconds.

None
return_exceptions bool

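If True, exceptions raised by tasks are collected by asyncio.gather instead of being re-raised. The gathered results are discarded, since this method returns None.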

False

Returns:

Type Description
None

None

Source code in agentflow/utils/background_task_manager.py
async def wait_for_all(
    self, timeout: float | None = None, return_exceptions: bool = False
) -> None:
    """
    Wait for all tracked background tasks to complete.

    Args:
        timeout (float | None): Maximum time to wait in seconds.
        return_exceptions (bool): If True, exceptions are returned as results instead of raised.

    Returns:
        None
    """
    if not self._tasks:
        return

    logger.info("Waiting for %d background tasks to finish...", len(self._tasks))

    try:
        if timeout:
            await asyncio.wait_for(
                asyncio.gather(*self._tasks, return_exceptions=return_exceptions),
                timeout=timeout,
            )
        else:
            await asyncio.gather(*self._tasks, return_exceptions=return_exceptions)
        logger.info("All background tasks finished.")
    except TimeoutError:
        logger.warning("Timeout waiting for background tasks, some may still be running")
        metrics.counter("background_task_manager.wait_timeout").inc()
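The timeout branch combines asyncio.wait_for with asyncio.gather. Under standard asyncio semantics, wait_for cancels the awaitable it wraps when the timeout expires, and cancelling a gather cancels the awaitables it is still waiting on. The sketch below is a standalone illustration of that behaviour, not library code; slow and demo_wait_timeout are hypothetical names.

```python
import asyncio


async def slow() -> str:
    # Hypothetical task that runs far longer than the timeout.
    await asyncio.sleep(60)
    return "done"


async def demo_wait_timeout() -> tuple[bool, bool]:
    task = asyncio.create_task(slow())
    timed_out = False
    try:
        await asyncio.wait_for(asyncio.gather(task), timeout=0.05)
    except asyncio.TimeoutError:  # same class as builtin TimeoutError on 3.11+
        timed_out = True
    await asyncio.sleep(0)  # yield once before inspecting the task
    return timed_out, task.cancelled()
```

In this sketch the pending task ends up cancelled after the timeout rather than left running. Note also that the source above tests `if timeout:`, so a timeout of 0 is treated the same as None.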
TaskMetadata dataclass

Metadata for tracking background tasks.

Methods:

Name Description
__init__

Attributes:

Name Type Description
context dict[str, Any] | None
created_at float
name str
timeout float | None
Source code in agentflow/utils/background_task_manager.py
@dataclass
class TaskMetadata:
    """Metadata for tracking background tasks."""

    name: str
    created_at: float
    timeout: float | None = None
    context: dict[str, Any] | None = None
Attributes
context class-attribute instance-attribute
context = None
created_at instance-attribute
created_at
name instance-attribute
name
timeout class-attribute instance-attribute
timeout = None
Functions
__init__
__init__(name, created_at, timeout=None, context=None)

Modules

callable_utils

Utilities for calling sync or async functions in TAF.

This module provides helpers to detect async callables and to invoke functions that may be synchronous or asynchronous, handling thread pool execution and awaitables.

Functions:

Name Description
call_sync_or_async

Call a function that may be sync or async, returning its result.

run_coroutine

Run an async coroutine from a sync context safely.

Functions

call_sync_or_async async
call_sync_or_async(func, *args, **kwargs)

Call a function that may be sync or async, returning its result.

If the function is synchronous, it runs in a thread pool to avoid blocking the event loop. If the result is awaitable, it is awaited before returning.

Parameters:

Name Type Description Default
func Callable[..., Any]

The function to call.

required
*args

Positional arguments for the function.

()
**kwargs

Keyword arguments for the function.

{}

Returns:

Name Type Description
Any Any

The result of the function call, awaited if necessary.

Source code in agentflow/utils/callable_utils.py
async def call_sync_or_async(func: Callable[..., Any], *args, **kwargs) -> Any:
    """
    Call a function that may be sync or async, returning its result.

    If the function is synchronous, it runs in a thread pool to avoid blocking
    the event loop. If the result is awaitable, it is awaited before returning.

    Args:
        func (Callable[..., Any]): The function to call.
        *args: Positional arguments for the function.
        **kwargs: Keyword arguments for the function.

    Returns:
        Any: The result of the function call, awaited if necessary.
    """
    if _is_async_callable(func):
        return await func(*args, **kwargs)

    # Call sync function in a thread pool
    result = await asyncio.to_thread(func, *args, **kwargs)
    # If the result is awaitable, await it
    if inspect.isawaitable(result):
        return await result
    return result
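The dispatch logic above can be exercised without the library. This is a minimal sketch under one stated assumption: inspect.iscoroutinefunction stands in for the private _is_async_callable helper, whose implementation is not shown on this page. The names call_either, add, add_async, and demo_call_either are hypothetical.

```python
import asyncio
import inspect
from typing import Any, Callable


async def call_either(func: Callable[..., Any], *args: Any, **kwargs: Any) -> Any:
    # Assumption: inspect.iscoroutinefunction approximates _is_async_callable.
    if inspect.iscoroutinefunction(func):
        return await func(*args, **kwargs)
    # Sync callables run in a worker thread so the event loop stays responsive.
    result = await asyncio.to_thread(func, *args, **kwargs)
    # A sync callable may still hand back an awaitable, e.g. a coroutine object.
    if inspect.isawaitable(result):
        return await result
    return result


def add(a: int, b: int) -> int:
    return a + b


async def add_async(a: int, b: int) -> int:
    await asyncio.sleep(0)
    return a + b


async def demo_call_either() -> tuple[int, int, int]:
    return (
        await call_either(add, 1, 2),                    # plain sync function
        await call_either(add_async, 3, 4),              # coroutine function
        await call_either(lambda: add_async(5, 6)),      # sync call returning a coroutine
    )
```

The third case is the reason for the isawaitable check: the lambda is synchronous, but its return value still has to be awaited.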
run_coroutine
run_coroutine(func)

Run an async coroutine from a sync context safely.

Source code in agentflow/utils/callable_utils.py
def run_coroutine(func: Coroutine) -> Any:
    """Run an async coroutine from a sync context safely."""
    # Always try to get/create an event loop and use thread-safe execution
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No loop running, create one
        return asyncio.run(func)

    # Loop is running, use thread-safe execution
    fut = asyncio.run_coroutine_threadsafe(func, loop)
    return fut.result()
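A short sketch of the first branch, calling a coroutine from plain synchronous code with no event loop running. The names run_from_sync and compute are hypothetical, and the stand-in deliberately refuses to run when a loop is already active. The reason is that asyncio.get_running_loop() only succeeds in the thread that is running the loop, so blocking on fut.result() in that same thread would keep the loop from making progress. Treat calls to run_coroutine from inside a running loop with caution.

```python
import asyncio
from typing import Any, Coroutine


def run_from_sync(coro: Coroutine) -> Any:
    # Hypothetical stand-in that covers only the "no running loop" branch.
    try:
        asyncio.get_running_loop()
    except RuntimeError:
        # No loop in this thread: asyncio.run creates one, runs coro, closes it.
        return asyncio.run(coro)
    coro.close()  # avoid a "coroutine was never awaited" warning
    raise RuntimeError("call this from a thread that has no running event loop")


async def compute() -> int:
    await asyncio.sleep(0)
    return 42


result = run_from_sync(compute())
```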

callbacks

Callback system for TAF.

This module provides a comprehensive callback framework that allows users to define their own validation logic and custom behavior at key points in the execution flow:

  • before_invoke: Called before AI/TOOL/MCP invocation for input validation and modification
  • after_invoke: Called after AI/TOOL/MCP invocation for output validation and modification
  • on_error: Called when errors occur during invocation for error handling and logging

The system is generic and type-safe, supporting different callback types for different invocation contexts.

Classes:

Name Description
AfterInvokeCallback

Abstract base class for after_invoke callbacks.

BeforeInvokeCallback

Abstract base class for before_invoke callbacks.

CallbackContext

Context information passed to callbacks.

CallbackManager

Manages registration and execution of callbacks for different invocation types.

InvocationType

Types of invocations that can trigger callbacks.

OnErrorCallback

Abstract base class for on_error callbacks.

Functions:

Name Description
register_after_invoke

Register an after_invoke callback on the global callback manager.

register_before_invoke

Register a before_invoke callback on the global callback manager.

register_on_error

Register an on_error callback on the global callback manager.

Attributes:

Name Type Description
AfterInvokeCallbackType
BeforeInvokeCallbackType
OnErrorCallbackType
default_callback_manager
logger

Attributes

AfterInvokeCallbackType module-attribute
AfterInvokeCallbackType = Union[AfterInvokeCallback[Any, Any], Callable[[CallbackContext, Any, Any], Union[Any, Awaitable[Any]]]]
BeforeInvokeCallbackType module-attribute
BeforeInvokeCallbackType = Union[BeforeInvokeCallback[Any, Any], Callable[[CallbackContext, Any], Union[Any, Awaitable[Any]]]]
OnErrorCallbackType module-attribute
OnErrorCallbackType = Union[OnErrorCallback, Callable[[CallbackContext, Any, Exception], Union[Any | None, Awaitable[Any | None]]]]
default_callback_manager module-attribute
default_callback_manager = CallbackManager()
logger module-attribute
logger = getLogger(__name__)

Classes

AfterInvokeCallback

Bases: ABC

Abstract base class for after_invoke callbacks.

Called after the AI model, tool, or MCP function is invoked. Allows for output validation and modification.

Methods:

Name Description
__call__

Execute the after_invoke callback.

Source code in agentflow/utils/callbacks.py
class AfterInvokeCallback[T, R](ABC):
    """Abstract base class for after_invoke callbacks.

    Called after the AI model, tool, or MCP function is invoked.
    Allows for output validation and modification.
    """

    @abstractmethod
    async def __call__(self, context: CallbackContext, input_data: T, output_data: Any) -> Any | R:
        """Execute the after_invoke callback.

        Args:
            context: Context information about the invocation
            input_data: The original input data that was sent
            output_data: The output data returned from the invocation

        Returns:
            Modified output data (can be same type or different type)

        Raises:
            Exception: If validation fails or modification cannot be performed
        """
        ...
Functions
__call__ abstractmethod async
__call__(context, input_data, output_data)

Execute the after_invoke callback.

Parameters:

Name Type Description Default
context CallbackContext

Context information about the invocation

required
input_data T

The original input data that was sent

required
output_data Any

The output data returned from the invocation

required

Returns:

Type Description
Any | R

Modified output data (can be same type or different type)

Raises:

Type Description
Exception

If validation fails or modification cannot be performed

Source code in agentflow/utils/callbacks.py
@abstractmethod
async def __call__(self, context: CallbackContext, input_data: T, output_data: Any) -> Any | R:
    """Execute the after_invoke callback.

    Args:
        context: Context information about the invocation
        input_data: The original input data that was sent
        output_data: The output data returned from the invocation

    Returns:
        Modified output data (can be same type or different type)

    Raises:
        Exception: If validation fails or modification cannot be performed
    """
    ...
BeforeInvokeCallback

Bases: ABC

Abstract base class for before_invoke callbacks.

Called before the AI model, tool, or MCP function is invoked. Allows for input validation and modification.

Methods:

Name Description
__call__

Execute the before_invoke callback.

Source code in agentflow/utils/callbacks.py
class BeforeInvokeCallback[T, R](ABC):
    """Abstract base class for before_invoke callbacks.

    Called before the AI model, tool, or MCP function is invoked.
    Allows for input validation and modification.
    """

    @abstractmethod
    async def __call__(self, context: CallbackContext, input_data: T) -> T | R:
        """Execute the before_invoke callback.

        Args:
            context: Context information about the invocation
            input_data: The input data about to be sent to the invocation

        Returns:
            Modified input data (can be same type or different type)

        Raises:
            Exception: If validation fails or modification cannot be performed
        """
        ...
Functions
__call__ abstractmethod async
__call__(context, input_data)

Execute the before_invoke callback.

Parameters:

Name Type Description Default
context CallbackContext

Context information about the invocation

required
input_data T

The input data about to be sent to the invocation

required

Returns:

Type Description
T | R

Modified input data (can be same type or different type)

Raises:

Type Description
Exception

If validation fails or modification cannot be performed

Source code in agentflow/utils/callbacks.py
@abstractmethod
async def __call__(self, context: CallbackContext, input_data: T) -> T | R:
    """Execute the before_invoke callback.

    Args:
        context: Context information about the invocation
        input_data: The input data about to be sent to the invocation

    Returns:
        Modified input data (can be same type or different type)

    Raises:
        Exception: If validation fails or modification cannot be performed
    """
    ...
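A class-based before_invoke callback is an object with an async __call__ that returns the (possibly modified) input or raises to reject it. The sketch below shows the shape of such a subclass using hypothetical stand-ins (Ctx for CallbackContext, BeforeHook for BeforeInvokeCallback) so that it runs without the library. It uses typing.Generic instead of the newer class syntax shown above to stay compatible with older Python versions.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any, Generic, Optional, TypeVar

T = TypeVar("T")
R = TypeVar("R")


@dataclass
class Ctx:
    # Hypothetical stand-in for CallbackContext (invocation_type omitted).
    node_name: str
    function_name: Optional[str] = None


class BeforeHook(ABC, Generic[T, R]):
    # Hypothetical stand-in for BeforeInvokeCallback.
    @abstractmethod
    async def __call__(self, context: Ctx, input_data: T) -> Any:
        ...


class StripWhitespace(BeforeHook[str, str]):
    """Validate and normalise text before it is sent to the invocation."""

    async def __call__(self, context: Ctx, input_data: str) -> str:
        cleaned = input_data.strip()
        if not cleaned:
            # Raising signals that validation failed.
            raise ValueError(f"empty input for node {context.node_name!r}")
        return cleaned


async def demo_before_hook(text: str) -> str:
    hook = StripWhitespace()
    return await hook(Ctx(node_name="agent"), text)
```

With the real classes, an instance like this would be passed to register_before_invoke together with an InvocationType.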
CallbackContext dataclass

Context information passed to callbacks.

Methods:

Name Description
__init__

Attributes:

Name Type Description
function_name str | None
invocation_type InvocationType
metadata dict[str, Any] | None
node_name str
Source code in agentflow/utils/callbacks.py
@dataclass
class CallbackContext:
    """Context information passed to callbacks."""

    invocation_type: InvocationType
    node_name: str
    function_name: str | None = None
    metadata: dict[str, Any] | None = None
Attributes
function_name class-attribute instance-attribute
function_name = None
invocation_type instance-attribute
invocation_type
metadata class-attribute instance-attribute
metadata = None
node_name instance-attribute
node_name
Functions
__init__
__init__(invocation_type, node_name, function_name=None, metadata=None)
CallbackManager

Manages registration and execution of callbacks for different invocation types.

Handles before_invoke, after_invoke, and on_error callbacks for AI, TOOL, and MCP invocations.

Methods:

Name Description
__init__

Initialize the CallbackManager with empty callback registries.

clear_callbacks

Clear callbacks for a specific invocation type or all types.

execute_after_invoke

Execute all after_invoke callbacks for the given context.

execute_before_invoke

Execute all before_invoke callbacks for the given context.

execute_on_error

Execute all on_error callbacks for the given context.

get_callback_counts

Get count of registered callbacks by type for debugging.

register_after_invoke

Register an after_invoke callback for a specific invocation type.

register_before_invoke

Register a before_invoke callback for a specific invocation type.

register_on_error

Register an on_error callback for a specific invocation type.

Source code in agentflow/utils/callbacks.py
class CallbackManager:
    """
    Manages registration and execution of callbacks for different invocation types.

    Handles before_invoke, after_invoke, and on_error callbacks for AI, TOOL, and MCP invocations.
    """

    def __init__(self):
        """
        Initialize the CallbackManager with empty callback registries.
        """
        self._before_callbacks: dict[InvocationType, list[BeforeInvokeCallbackType]] = {
            InvocationType.AI: [],
            InvocationType.TOOL: [],
            InvocationType.MCP: [],
        }
        self._after_callbacks: dict[InvocationType, list[AfterInvokeCallbackType]] = {
            InvocationType.AI: [],
            InvocationType.TOOL: [],
            InvocationType.MCP: [],
        }
        self._error_callbacks: dict[InvocationType, list[OnErrorCallbackType]] = {
            InvocationType.AI: [],
            InvocationType.TOOL: [],
            InvocationType.MCP: [],
        }

    def register_before_invoke(
        self, invocation_type: InvocationType, callback: BeforeInvokeCallbackType
    ) -> None:
        """
        Register a before_invoke callback for a specific invocation type.

        Args:
            invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
            callback (BeforeInvokeCallbackType): The callback to register.
        """
        self._before_callbacks[invocation_type].append(callback)

    def register_after_invoke(
        self, invocation_type: InvocationType, callback: AfterInvokeCallbackType
    ) -> None:
        """
        Register an after_invoke callback for a specific invocation type.

        Args:
            invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
            callback (AfterInvokeCallbackType): The callback to register.
        """
        self._after_callbacks[invocation_type].append(callback)

    def register_on_error(
        self, invocation_type: InvocationType, callback: OnErrorCallbackType
    ) -> None:
        """
        Register an on_error callback for a specific invocation type.

        Args:
            invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
            callback (OnErrorCallbackType): The callback to register.
        """
        self._error_callbacks[invocation_type].append(callback)

    async def execute_before_invoke(self, context: CallbackContext, input_data: Any) -> Any:
        """
        Execute all before_invoke callbacks for the given context.

        Args:
            context (CallbackContext): Context information about the invocation.
            input_data (Any): The input data to be validated or modified.

        Returns:
            Any: The modified input data after all callbacks.

        Raises:
            Exception: If any callback fails.
        """
        current_data = input_data

        for callback in self._before_callbacks[context.invocation_type]:
            try:
                if isinstance(callback, BeforeInvokeCallback):
                    current_data = await callback(context, current_data)
                elif callable(callback):
                    result = callback(context, current_data)
                    if hasattr(result, "__await__"):
                        current_data = await result
                    else:
                        current_data = result
            except Exception as e:
                await self.execute_on_error(context, input_data, e)
                raise

        return current_data

    async def execute_after_invoke(
        self, context: CallbackContext, input_data: Any, output_data: Any
    ) -> Any:
        """
        Execute all after_invoke callbacks for the given context.

        Args:
            context (CallbackContext): Context information about the invocation.
            input_data (Any): The original input data sent to the invocation.
            output_data (Any): The output data returned from the invocation.

        Returns:
            Any: The modified output data after all callbacks.

        Raises:
            Exception: If any callback fails.
        """
        current_output = output_data

        for callback in self._after_callbacks[context.invocation_type]:
            try:
                if isinstance(callback, AfterInvokeCallback):
                    current_output = await callback(context, input_data, current_output)
                elif callable(callback):
                    result = callback(context, input_data, current_output)
                    if hasattr(result, "__await__"):
                        current_output = await result
                    else:
                        current_output = result
            except Exception as e:
                await self.execute_on_error(context, input_data, e)
                raise

        return current_output

    async def execute_on_error(
        self, context: CallbackContext, input_data: Any, error: Exception
    ) -> Message | None:
        """
        Execute all on_error callbacks for the given context.

        Args:
            context (CallbackContext): Context information about the invocation.
            input_data (Any): The input data that caused the error.
            error (Exception): The exception that occurred.

        Returns:
            Message | None: Recovery value from callbacks, or None if not handled.
        """
        recovery_value = None

        for callback in self._error_callbacks[context.invocation_type]:
            try:
                result = None
                if isinstance(callback, OnErrorCallback):
                    result = await callback(context, input_data, error)
                elif callable(callback):
                    result = callback(context, input_data, error)
                    if hasattr(result, "__await__"):
                        result = await result  # type: ignore

                if isinstance(result, Message) or result is None:
                    recovery_value = result
            except Exception as exc:
                logger.exception("Error callback failed: %s", exc)
                continue

        return recovery_value

    def clear_callbacks(self, invocation_type: InvocationType | None = None) -> None:
        """
        Clear callbacks for a specific invocation type or all types.

        Args:
            invocation_type (InvocationType | None): The invocation type to clear, or None for all.
        """
        if invocation_type:
            self._before_callbacks[invocation_type].clear()
            self._after_callbacks[invocation_type].clear()
            self._error_callbacks[invocation_type].clear()
        else:
            for inv_type in InvocationType:
                self._before_callbacks[inv_type].clear()
                self._after_callbacks[inv_type].clear()
                self._error_callbacks[inv_type].clear()

    def get_callback_counts(self) -> dict[str, dict[str, int]]:
        """
        Get count of registered callbacks by type for debugging.

        Returns:
            dict[str, dict[str, int]]: Counts of callbacks for each invocation type.
        """
        return {
            inv_type.value: {
                "before_invoke": len(self._before_callbacks[inv_type]),
                "after_invoke": len(self._after_callbacks[inv_type]),
                "on_error": len(self._error_callbacks[inv_type]),
            }
            for inv_type in InvocationType
        }
Functions
__init__
__init__()

Initialize the CallbackManager with empty callback registries.

Source code in agentflow/utils/callbacks.py
def __init__(self):
    """
    Initialize the CallbackManager with empty callback registries.
    """
    self._before_callbacks: dict[InvocationType, list[BeforeInvokeCallbackType]] = {
        InvocationType.AI: [],
        InvocationType.TOOL: [],
        InvocationType.MCP: [],
    }
    self._after_callbacks: dict[InvocationType, list[AfterInvokeCallbackType]] = {
        InvocationType.AI: [],
        InvocationType.TOOL: [],
        InvocationType.MCP: [],
    }
    self._error_callbacks: dict[InvocationType, list[OnErrorCallbackType]] = {
        InvocationType.AI: [],
        InvocationType.TOOL: [],
        InvocationType.MCP: [],
    }
clear_callbacks
clear_callbacks(invocation_type=None)

Clear callbacks for a specific invocation type or all types.

Parameters:

Name Type Description Default
invocation_type InvocationType | None

The invocation type to clear, or None for all.

None
Source code in agentflow/utils/callbacks.py
def clear_callbacks(self, invocation_type: InvocationType | None = None) -> None:
    """
    Clear callbacks for a specific invocation type or all types.

    Args:
        invocation_type (InvocationType | None): The invocation type to clear, or None for all.
    """
    if invocation_type:
        self._before_callbacks[invocation_type].clear()
        self._after_callbacks[invocation_type].clear()
        self._error_callbacks[invocation_type].clear()
    else:
        for inv_type in InvocationType:
            self._before_callbacks[inv_type].clear()
            self._after_callbacks[inv_type].clear()
            self._error_callbacks[inv_type].clear()
execute_after_invoke async
execute_after_invoke(context, input_data, output_data)

Execute all after_invoke callbacks for the given context.

Parameters:

Name Type Description Default
context CallbackContext

Context information about the invocation.

required
input_data Any

The original input data sent to the invocation.

required
output_data Any

The output data returned from the invocation.

required

Returns:

Name Type Description
Any Any

The modified output data after all callbacks.

Raises:

Type Description
Exception

If any callback fails.

Source code in agentflow/utils/callbacks.py
async def execute_after_invoke(
    self, context: CallbackContext, input_data: Any, output_data: Any
) -> Any:
    """
    Execute all after_invoke callbacks for the given context.

    Args:
        context (CallbackContext): Context information about the invocation.
        input_data (Any): The original input data sent to the invocation.
        output_data (Any): The output data returned from the invocation.

    Returns:
        Any: The modified output data after all callbacks.

    Raises:
        Exception: If any callback fails.
    """
    current_output = output_data

    for callback in self._after_callbacks[context.invocation_type]:
        try:
            if isinstance(callback, AfterInvokeCallback):
                current_output = await callback(context, input_data, current_output)
            elif callable(callback):
                result = callback(context, input_data, current_output)
                if hasattr(result, "__await__"):
                    current_output = await result
                else:
                    current_output = result
        except Exception as e:
            await self.execute_on_error(context, input_data, e)
            raise

    return current_output
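The chaining in execute_after_invoke can be sketched in a few lines: every callback receives the original input together with the output produced so far, and a failure is reported to the error handlers before it is re-raised. The code below is a simplified illustration and not the library implementation. The function run_after_chain and the hooks truncate, tag, and reject are hypothetical, and a plain string replaces CallbackContext.

```python
import asyncio
from typing import Any, Awaitable, Callable, List, Union

AfterHook = Callable[[str, Any, Any], Union[Any, Awaitable[Any]]]


async def run_after_chain(
    hooks: List[AfterHook],
    on_error: Callable[[Exception], None],
    node: str,
    input_data: Any,
    output_data: Any,
) -> Any:
    current = output_data
    for hook in hooks:
        try:
            result = hook(node, input_data, current)
            # Async hooks return an awaitable, sync hooks return the value itself.
            current = await result if hasattr(result, "__await__") else result
        except Exception as exc:
            on_error(exc)  # notify the error handlers first
            raise          # then propagate to the caller
    return current


def truncate(node: str, input_data: Any, output: str) -> str:
    return output[:5]


async def tag(node: str, input_data: Any, output: str) -> str:
    return f"{output} (re: {input_data})"


def reject(node: str, input_data: Any, output: str) -> str:
    raise ValueError("blocked")


async def demo_after_chain() -> tuple[str, list[str]]:
    errors: list[Exception] = []
    ok = await run_after_chain([truncate, tag], errors.append, "agent", "q1", "hello world")
    try:
        await run_after_chain([reject, tag], errors.append, "agent", "q1", "x")
    except ValueError:
        pass  # the failure still reaches the caller
    return ok, [str(e) for e in errors]
```

The second run stops at the first failing hook, so tag is never called for it.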
execute_before_invoke async
execute_before_invoke(context, input_data)

Execute all before_invoke callbacks for the given context.

Parameters:

Name Type Description Default
context CallbackContext

Context information about the invocation.

required
input_data Any

The input data to be validated or modified.

required

Returns:

Name Type Description
Any Any

The modified input data after all callbacks.

Raises:

Type Description
Exception

If any callback fails.

Source code in agentflow/utils/callbacks.py
async def execute_before_invoke(self, context: CallbackContext, input_data: Any) -> Any:
    """
    Execute all before_invoke callbacks for the given context.

    Args:
        context (CallbackContext): Context information about the invocation.
        input_data (Any): The input data to be validated or modified.

    Returns:
        Any: The modified input data after all callbacks.

    Raises:
        Exception: If any callback fails.
    """
    current_data = input_data

    for callback in self._before_callbacks[context.invocation_type]:
        try:
            if isinstance(callback, BeforeInvokeCallback):
                current_data = await callback(context, current_data)
            elif callable(callback):
                result = callback(context, current_data)
                if hasattr(result, "__await__"):
                    current_data = await result
                else:
                    current_data = result
        except Exception as e:
            await self.execute_on_error(context, input_data, e)
            raise

    return current_data
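The loop above threads the payload through every registered callback in order and awaits a result only when it is awaitable. A minimal, self-contained sketch of that pattern follows. It does not use the real CallbackManager; `run_chain`, `strip_whitespace`, and `add_prefix` are hypothetical names, and the context is a plain dict rather than a CallbackContext.

```python
import asyncio
import inspect
from typing import Any, Callable


async def run_chain(callbacks: list[Callable[[dict, Any], Any]], context: dict, data: Any) -> Any:
    """Pass data through each callback in order, awaiting async results."""
    current = data
    for callback in callbacks:
        result = callback(context, current)
        # Same idea as the hasattr(result, "__await__") check in the source.
        if inspect.isawaitable(result):
            result = await result
        current = result
    return current


def strip_whitespace(context: dict, data: str) -> str:
    # Plain (sync) callback.
    return data.strip()


async def add_prefix(context: dict, data: str) -> str:
    # Coroutine callback; it sees the output of the previous callback.
    return f"[{context['invocation_type']}] {data}"


chained = asyncio.run(
    run_chain([strip_whitespace, add_prefix], {"invocation_type": "ai"}, "  hello  ")
)
print(chained)
```

Because each callback receives the previous callback's output, registration order matters: swapping the two callbacks here would leave the prefix in place but strip nothing from the original text.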
execute_on_error async
execute_on_error(context, input_data, error)

Execute all on_error callbacks for the given context.

Parameters:

Name Type Description Default
context CallbackContext

Context information about the invocation.

required
input_data Any

The input data that caused the error.

required
error Exception

The exception that occurred.

required

Returns:

Type Description
Message | None

Message | None: Recovery value from callbacks, or None if not handled.

Source code in agentflow/utils/callbacks.py
async def execute_on_error(
    self, context: CallbackContext, input_data: Any, error: Exception
) -> Message | None:
    """
    Execute all on_error callbacks for the given context.

    Args:
        context (CallbackContext): Context information about the invocation.
        input_data (Any): The input data that caused the error.
        error (Exception): The exception that occurred.

    Returns:
        Message | None: Recovery value from callbacks, or None if not handled.
    """
    recovery_value = None

    for callback in self._error_callbacks[context.invocation_type]:
        try:
            result = None
            if isinstance(callback, OnErrorCallback):
                result = await callback(context, input_data, error)
            elif callable(callback):
                result = callback(context, input_data, error)
                if hasattr(result, "__await__"):
                    result = await result  # type: ignore

            if isinstance(result, Message) or result is None:
                recovery_value = result
        except Exception as exc:
            logger.exception("Error callback failed: %s", exc)
            continue

    return recovery_value
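Two behaviors of the listing above are easy to miss: an error callback that itself raises is logged and skipped, and every result that is a Message or None replaces the previous recovery value, so a later callback returning None clears an earlier recovery. The sketch below reproduces only that logic for illustration. `Message` here is a hypothetical stand-in dataclass, and `run_error_callbacks`, `recover`, `broken`, and `log_only` are invented names.

```python
import asyncio
import inspect
import logging
from dataclasses import dataclass
from typing import Any

logger = logging.getLogger("on_error_sketch")


@dataclass
class Message:  # hypothetical stand-in for the library's Message
    content: str


async def run_error_callbacks(callbacks, context: Any, input_data: Any, error: Exception):
    recovery = None
    for callback in callbacks:
        try:
            result = callback(context, input_data, error)
            if inspect.isawaitable(result):
                result = await result
            # Mirrors the source: a Message or None replaces the previous value.
            if isinstance(result, Message) or result is None:
                recovery = result
        except Exception:
            # A failing error callback is logged and the loop moves on.
            logger.exception("Error callback failed")
            continue
    return recovery


def recover(context, input_data, error):
    return Message(content=f"recovered from {error}")


def broken(context, input_data, error):
    raise RuntimeError("callback bug")


def log_only(context, input_data, error):
    return None


err = ValueError("boom")
kept = asyncio.run(run_error_callbacks([broken, recover], None, "in", err))
cleared = asyncio.run(run_error_callbacks([recover, log_only], None, "in", err))
```

If a recovery value should survive later logging-only callbacks, one option under these semantics is to register the recovering callback last.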
get_callback_counts
get_callback_counts()

Get count of registered callbacks by type for debugging.

Returns:

Type Description
dict[str, dict[str, int]]

dict[str, dict[str, int]]: Counts of callbacks for each invocation type.

Source code in agentflow/utils/callbacks.py
def get_callback_counts(self) -> dict[str, dict[str, int]]:
    """
    Get count of registered callbacks by type for debugging.

    Returns:
        dict[str, dict[str, int]]: Counts of callbacks for each invocation type.
    """
    return {
        inv_type.value: {
            "before_invoke": len(self._before_callbacks[inv_type]),
            "after_invoke": len(self._after_callbacks[inv_type]),
            "on_error": len(self._error_callbacks[inv_type]),
        }
        for inv_type in InvocationType
    }
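To make the shape of the returned mapping concrete, here is a small sketch with a stand-in manager that keeps only registration and counting. `MiniCallbackManager` is hypothetical, and storing callbacks in `defaultdict(list)` is an assumption; the real manager's initialisation is not shown in this section.

```python
from collections import defaultdict
from enum import Enum


class InvocationType(Enum):
    AI = "ai"
    TOOL = "tool"
    MCP = "mcp"


class MiniCallbackManager:
    """Hypothetical stand-in that keeps only registration and counting."""

    def __init__(self) -> None:
        self._before = defaultdict(list)
        self._after = defaultdict(list)
        self._error = defaultdict(list)

    def register_before_invoke(self, invocation_type, callback) -> None:
        self._before[invocation_type].append(callback)

    def register_on_error(self, invocation_type, callback) -> None:
        self._error[invocation_type].append(callback)

    def get_callback_counts(self) -> dict:
        # Outer keys are enum values ("ai", "tool", "mcp"), not enum members.
        return {
            inv_type.value: {
                "before_invoke": len(self._before[inv_type]),
                "after_invoke": len(self._after[inv_type]),
                "on_error": len(self._error[inv_type]),
            }
            for inv_type in InvocationType
        }


manager = MiniCallbackManager()
manager.register_before_invoke(InvocationType.AI, lambda context, data: data)
manager.register_on_error(InvocationType.TOOL, lambda context, data, error: None)
counts = manager.get_callback_counts()
print(counts)
```

Every invocation type appears in the result even when nothing is registered for it, which makes the output convenient to log or compare in tests.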
register_after_invoke
register_after_invoke(invocation_type, callback)

Register an after_invoke callback for a specific invocation type.

Parameters:

Name Type Description Default
invocation_type InvocationType

The type of invocation (AI, TOOL, MCP).

required
callback AfterInvokeCallbackType

The callback to register.

required
Source code in agentflow/utils/callbacks.py
def register_after_invoke(
    self, invocation_type: InvocationType, callback: AfterInvokeCallbackType
) -> None:
    """
    Register an after_invoke callback for a specific invocation type.

    Args:
        invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
        callback (AfterInvokeCallbackType): The callback to register.
    """
    self._after_callbacks[invocation_type].append(callback)
register_before_invoke
register_before_invoke(invocation_type, callback)

Register a before_invoke callback for a specific invocation type.

Parameters:

Name Type Description Default
invocation_type InvocationType

The type of invocation (AI, TOOL, MCP).

required
callback BeforeInvokeCallbackType

The callback to register.

required
Source code in agentflow/utils/callbacks.py
def register_before_invoke(
    self, invocation_type: InvocationType, callback: BeforeInvokeCallbackType
) -> None:
    """
    Register a before_invoke callback for a specific invocation type.

    Args:
        invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
        callback (BeforeInvokeCallbackType): The callback to register.
    """
    self._before_callbacks[invocation_type].append(callback)
register_on_error
register_on_error(invocation_type, callback)

Register an on_error callback for a specific invocation type.

Parameters:

Name Type Description Default
invocation_type InvocationType

The type of invocation (AI, TOOL, MCP).

required
callback OnErrorCallbackType

The callback to register.

required
Source code in agentflow/utils/callbacks.py
def register_on_error(
    self, invocation_type: InvocationType, callback: OnErrorCallbackType
) -> None:
    """
    Register an on_error callback for a specific invocation type.

    Args:
        invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
        callback (OnErrorCallbackType): The callback to register.
    """
    self._error_callbacks[invocation_type].append(callback)
InvocationType

Bases: Enum

Types of invocations that can trigger callbacks.

Attributes:

Name Type Description
AI
MCP
TOOL
Source code in agentflow/utils/callbacks.py
class InvocationType(Enum):
    """Types of invocations that can trigger callbacks."""

    AI = "ai"
    TOOL = "tool"
    MCP = "mcp"
Attributes
AI class-attribute instance-attribute
AI = 'ai'
MCP class-attribute instance-attribute
MCP = 'mcp'
TOOL class-attribute instance-attribute
TOOL = 'tool'
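Since the members carry lowercase string values, a member can be looked up from a string with the standard Enum call syntax. The sketch below redefines the enum locally so it runs on its own; `parse_invocation_type` is a hypothetical helper, not part of the library.

```python
from enum import Enum


class InvocationType(Enum):
    AI = "ai"
    TOOL = "tool"
    MCP = "mcp"


def parse_invocation_type(raw: str) -> InvocationType:
    """Hypothetical helper: map a config string to a member, case-insensitively."""
    # Enum lookup by value raises ValueError for unknown strings.
    return InvocationType(raw.strip().lower())


parsed = parse_invocation_type(" Tool ")
member_values = [member.value for member in InvocationType]
```

Iteration follows definition order, so `member_values` lists the values as AI, TOOL, MCP.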
OnErrorCallback

Bases: ABC

Abstract base class for on_error callbacks.

Called when an error occurs during invocation. Allows for error handling and logging.

Methods:

Name Description
__call__

Execute the on_error callback.

Source code in agentflow/utils/callbacks.py
class OnErrorCallback(ABC):
    """Abstract base class for on_error callbacks.

    Called when an error occurs during invocation.
    Allows for error handling and logging.
    """

    @abstractmethod
    async def __call__(
        self, context: CallbackContext, input_data: Any, error: Exception
    ) -> Any | None:
        """Execute the on_error callback.

        Args:
            context: Context information about the invocation
            input_data: The input data that caused the error
            error: The exception that occurred

        Returns:
            Optional recovery value or None to re-raise the error

        Raises:
            Exception: If error handling fails or if the error should be re-raised
        """
        ...
Functions
__call__ abstractmethod async
__call__(context, input_data, error)

Execute the on_error callback.

Parameters:

Name Type Description Default
context CallbackContext

Context information about the invocation

required
input_data Any

The input data that caused the error

required
error Exception

The exception that occurred

required

Returns:

Type Description
Any | None

Optional recovery value or None to re-raise the error

Raises:

Type Description
Exception

If error handling fails or if the error should be re-raised

Source code in agentflow/utils/callbacks.py
@abstractmethod
async def __call__(
    self, context: CallbackContext, input_data: Any, error: Exception
) -> Any | None:
    """Execute the on_error callback.

    Args:
        context: Context information about the invocation
        input_data: The input data that caused the error
        error: The exception that occurred

    Returns:
        Optional recovery value or None to re-raise the error

    Raises:
        Exception: If error handling fails or if the error should be re-raised
    """
    ...

Functions

register_after_invoke
register_after_invoke(invocation_type, callback)

Register an after_invoke callback on the global callback manager.

Parameters:

Name Type Description Default
invocation_type
InvocationType

The type of invocation (AI, TOOL, MCP).

required
callback
AfterInvokeCallbackType

The callback to register.

required
Source code in agentflow/utils/callbacks.py
def register_after_invoke(
    invocation_type: InvocationType, callback: AfterInvokeCallbackType
) -> None:
    """
    Register an after_invoke callback on the global callback manager.

    Args:
        invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
        callback (AfterInvokeCallbackType): The callback to register.
    """
    default_callback_manager.register_after_invoke(invocation_type, callback)
register_before_invoke
register_before_invoke(invocation_type, callback)

Register a before_invoke callback on the global callback manager.

Parameters:

Name Type Description Default
invocation_type
InvocationType

The type of invocation (AI, TOOL, MCP).

required
callback
BeforeInvokeCallbackType

The callback to register.

required
Source code in agentflow/utils/callbacks.py
def register_before_invoke(
    invocation_type: InvocationType, callback: BeforeInvokeCallbackType
) -> None:
    """
    Register a before_invoke callback on the global callback manager.

    Args:
        invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
        callback (BeforeInvokeCallbackType): The callback to register.
    """
    default_callback_manager.register_before_invoke(invocation_type, callback)
register_on_error
register_on_error(invocation_type, callback)

Register an on_error callback on the global callback manager.

Parameters:

Name Type Description Default
invocation_type
InvocationType

The type of invocation (AI, TOOL, MCP).

required
callback
OnErrorCallbackType

The callback to register.

required
Source code in agentflow/utils/callbacks.py
def register_on_error(invocation_type: InvocationType, callback: OnErrorCallbackType) -> None:
    """
    Register an on_error callback on the global callback manager.

    Args:
        invocation_type (InvocationType): The type of invocation (AI, TOOL, MCP).
        callback (OnErrorCallbackType): The callback to register.
    """
    default_callback_manager.register_on_error(invocation_type, callback)

command

Command API for AgentGraph in TAF.

This module provides the Command class, which allows nodes to combine state updates with control flow, similar to LangGraph's Command API. Nodes can update agent state and direct graph execution to specific nodes or graphs.

Classes:

Name Description
Command

Command object that combines state updates with control flow.

Attributes:

Name Type Description
StateT

Attributes

StateT module-attribute
StateT = TypeVar('StateT', bound='AgentState')

Classes

Command

Command object that combines state updates with control flow.

Allows nodes to update agent state and direct graph execution to specific nodes or graphs. Similar to LangGraph's Command API.

Methods:

Name Description
__init__

Initialize a Command object.

__repr__

Return a string representation of the Command object.

Attributes:

Name Type Description
PARENT
goto
graph
state
update
Source code in agentflow/utils/command.py
class Command[StateT: AgentState]:
    """
    Command object that combines state updates with control flow.

    Allows nodes to update agent state and direct graph execution to specific nodes or graphs.
    Similar to LangGraph's Command API.
    """

    PARENT = "PARENT"

    def __init__(
        self,
        update: Union["StateT", None, Message, str, "BaseConverter"] = None,
        goto: str | None = None,
        graph: str | None = None,
        state: StateT | None = None,
    ):
        """
        Initialize a Command object.

        Args:
            update (StateT | None | Message | str | BaseConverter): State update to apply.
            goto (str | None): Next node to execute (node name or END).
            graph (str | None): Which graph to navigate to (None for current, PARENT for parent).
            state (StateT | None): Optional agent state to attach.
        """
        self.update = update
        self.goto = goto
        self.graph = graph
        self.state = state

    def __repr__(self) -> str:
        """
        Return a string representation of the Command object.

        Returns:
            str: String representation of the Command.
        """
        return (
            f"Command(update={self.update}, goto={self.goto}, \n"
            f" graph={self.graph}, state={self.state})"
        )
Attributes
PARENT class-attribute instance-attribute
PARENT = 'PARENT'
goto instance-attribute
goto = goto
graph instance-attribute
graph = graph
state instance-attribute
state = state
update instance-attribute
update = update
Functions
__init__
__init__(update=None, goto=None, graph=None, state=None)

Initialize a Command object.

Parameters:

Name Type Description Default
update StateT | None | Message | str | BaseConverter

State update to apply.

None
goto str | None

Next node to execute (node name or END).

None
graph str | None

Which graph to navigate to (None for current, PARENT for parent).

None
state StateT | None

Optional agent state to attach.

None
Source code in agentflow/utils/command.py
def __init__(
    self,
    update: Union["StateT", None, Message, str, "BaseConverter"] = None,
    goto: str | None = None,
    graph: str | None = None,
    state: StateT | None = None,
):
    """
    Initialize a Command object.

    Args:
        update (StateT | None | Message | str | BaseConverter): State update to apply.
        goto (str | None): Next node to execute (node name or END).
        graph (str | None): Which graph to navigate to (None for current, PARENT for parent).
        state (StateT | None): Optional agent state to attach.
    """
    self.update = update
    self.goto = goto
    self.graph = graph
    self.state = state
__repr__
__repr__()

Return a string representation of the Command object.

Returns:

Name Type Description
str str

String representation of the Command.

Source code in agentflow/utils/command.py
def __repr__(self) -> str:
    """
    Return a string representation of the Command object.

    Returns:
        str: String representation of the Command.
    """
    return (
        f"Command(update={self.update}, goto={self.goto}, \n"
        f" graph={self.graph}, state={self.state})"
    )
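As an illustration of how a node might pair a state update with routing, the sketch below uses a simplified stand-in for Command (the same four fields, without generics) and a toy driver loop. The loop, the node functions, and the dict-based state are all hypothetical; the framework's real graph executor is not shown in this section and may apply `update` differently.

```python
from typing import Any, Callable, Optional

END = "__end__"  # value documented in the constants module


class Command:
    """Simplified stand-in for the documented Command (same four fields, no generics)."""

    def __init__(
        self,
        update: Any = None,
        goto: Optional[str] = None,
        graph: Optional[str] = None,
        state: Any = None,
    ) -> None:
        self.update = update
        self.goto = goto
        self.graph = graph
        self.state = state


def classify(state: dict) -> Command:
    # Route refund requests to a second node; finish otherwise.
    wants_refund = "refund" in state["text"].lower()
    return Command(update="classified", goto="escalate" if wants_refund else END)


def escalate(state: dict) -> Command:
    return Command(update="ticket opened", goto=END)


NODES: dict[str, Callable[[dict], Command]] = {"classify": classify, "escalate": escalate}


def run(start: str, state: dict) -> list[str]:
    """Hypothetical driver loop that follows each Command's goto until END."""
    visited = []
    current = start
    while current != END:
        visited.append(current)
        command = NODES[current](state)
        state.setdefault("log", []).append(command.update)
        current = command.goto or END
    return visited


state = {"text": "I want a refund"}
path = run("classify", state)
```

The point of the pattern is that a node decides both what changed and where execution goes next in a single return value, instead of relying on a separate conditional edge.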

constants

Constants and enums for TAF agent graph execution and messaging.

This module defines special node names, message storage levels, execution states, and response granularity options for agent workflows.

Classes:

Name Description
ExecutionState

Graph execution states for agent workflows.

ResponseGranularity

Response granularity options for agent graph outputs.

StorageLevel

Message storage levels for agent state persistence.

Attributes:

Name Type Description
END Literal['__end__']
START Literal['__start__']

Attributes

END module-attribute
END = '__end__'
START module-attribute
START = '__start__'

Classes

ExecutionState

Bases: StrEnum

Graph execution states for agent workflows.

Values

RUNNING: Execution is in progress.
PAUSED: Execution is paused.
COMPLETED: Execution completed successfully.
ERROR: Execution encountered an error.
INTERRUPTED: Execution was interrupted.
ABORTED: Execution was aborted.
IDLE: Execution is idle.

Attributes:

Name Type Description
ABORTED
COMPLETED
ERROR
IDLE
INTERRUPTED
PAUSED
RUNNING
Source code in agentflow/utils/constants.py
class ExecutionState(StrEnum):
    """
    Graph execution states for agent workflows.

    Values:
        RUNNING: Execution is in progress.
        PAUSED: Execution is paused.
        COMPLETED: Execution completed successfully.
        ERROR: Execution encountered an error.
        INTERRUPTED: Execution was interrupted.
        ABORTED: Execution was aborted.
        IDLE: Execution is idle.
    """

    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    ERROR = "error"
    INTERRUPTED = "interrupted"
    ABORTED = "aborted"
    IDLE = "idle"
Attributes
ABORTED class-attribute instance-attribute
ABORTED = 'aborted'
COMPLETED class-attribute instance-attribute
COMPLETED = 'completed'
ERROR class-attribute instance-attribute
ERROR = 'error'
IDLE class-attribute instance-attribute
IDLE = 'idle'
INTERRUPTED class-attribute instance-attribute
INTERRUPTED = 'interrupted'
PAUSED class-attribute instance-attribute
PAUSED = 'paused'
RUNNING class-attribute instance-attribute
RUNNING = 'running'
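Because the members are strings, they compare equal to their raw values and can be rebuilt from stored text. The sketch below uses a `str`-mixin Enum as a stand-in so that it also runs on Python versions without `StrEnum`. The `FINISHED` grouping and `is_finished` helper are hypothetical and only illustrate one way a caller might classify states.

```python
from enum import Enum


class ExecutionState(str, Enum):
    """Stand-in with the documented values; the library itself derives from StrEnum."""

    RUNNING = "running"
    PAUSED = "paused"
    COMPLETED = "completed"
    ERROR = "error"
    INTERRUPTED = "interrupted"
    ABORTED = "aborted"
    IDLE = "idle"


# Hypothetical grouping for this example; not defined by the library.
FINISHED = {ExecutionState.COMPLETED, ExecutionState.ERROR, ExecutionState.ABORTED}


def is_finished(state) -> bool:
    # Accepts either a member or its raw string value.
    return ExecutionState(state) in FINISHED


from_storage = ExecutionState("paused")
```

Accepting raw strings in `is_finished` is convenient when the state was read back from a database or a JSON payload.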
ResponseGranularity

Bases: StrEnum

Response granularity options for agent graph outputs.

Values

FULL: State, latest messages.
PARTIAL: Context, summary, latest messages.
LOW: Only latest messages.

Attributes:

Name Type Description
FULL
LOW
PARTIAL
Source code in agentflow/utils/constants.py
class ResponseGranularity(StrEnum):
    """
    Response granularity options for agent graph outputs.

    Values:
        FULL: State, latest messages.
        PARTIAL: Context, summary, latest messages.
        LOW: Only latest messages.
    """

    FULL = "full"
    PARTIAL = "partial"
    LOW = "low"
Attributes
FULL class-attribute instance-attribute
FULL = 'full'
LOW class-attribute instance-attribute
LOW = 'low'
PARTIAL class-attribute instance-attribute
PARTIAL = 'partial'
StorageLevel

Message storage levels for agent state persistence.

Attributes:

Name Type Description
ALL

Save everything including tool calls.

MEDIUM

Only AI and human messages.

LOW

Only first human and last AI message.

Source code in agentflow/utils/constants.py
class StorageLevel:
    """
    Message storage levels for agent state persistence.

    Attributes:
        ALL: Save everything including tool calls.
        MEDIUM: Only AI and human messages.
        LOW: Only first human and last AI message.
    """

    ALL = "all"
    MEDIUM = "medium"
    LOW = "low"
Attributes
ALL class-attribute instance-attribute
ALL = 'all'
LOW class-attribute instance-attribute
LOW = 'low'
MEDIUM class-attribute instance-attribute
MEDIUM = 'medium'
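The three levels describe which messages are kept, but the filtering code is not part of this section. The sketch below is one possible reading of those descriptions, not the framework's implementation. `select_for_storage` is a hypothetical function, messages are plain dicts, and the role names "user", "assistant", and "tool" are assumptions.

```python
class StorageLevel:
    ALL = "all"
    MEDIUM = "medium"
    LOW = "low"


def select_for_storage(messages: list[dict], level: str) -> list[dict]:
    """Illustrative filter; role names are assumptions, not the library's schema."""
    if level == StorageLevel.ALL:
        return list(messages)
    # MEDIUM and LOW both ignore tool messages.
    chat = [m for m in messages if m["role"] in ("user", "assistant")]
    if level == StorageLevel.MEDIUM:
        return chat
    if level == StorageLevel.LOW:
        first_human = next((m for m in chat if m["role"] == "user"), None)
        last_ai = next((m for m in reversed(chat) if m["role"] == "assistant"), None)
        return [m for m in (first_human, last_ai) if m is not None]
    raise ValueError(f"unknown storage level: {level!r}")


history = [
    {"role": "user", "content": "What is 2 + 2?"},
    {"role": "assistant", "content": "Calling calculator"},
    {"role": "tool", "content": "4"},
    {"role": "assistant", "content": "2 + 2 = 4"},
    {"role": "user", "content": "Thanks"},
    {"role": "assistant", "content": "You're welcome"},
]
everything = select_for_storage(history, StorageLevel.ALL)
medium = select_for_storage(history, StorageLevel.MEDIUM)
low = select_for_storage(history, StorageLevel.LOW)
```

Note that StorageLevel is a plain class of string constants rather than an Enum, so the levels are compared as ordinary strings.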

converter

Message conversion utilities for TAF agent graphs.

This module provides helpers to convert Message objects and agent state into dicts suitable for LLM and tool invocation payloads.

Functions:

Name Description
convert_messages

Convert system prompts, agent state, and extra messages to a list of dicts for LLM/tool payloads.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute
logger = getLogger(__name__)

Functions

convert_messages
convert_messages(system_prompts, state=None, extra_messages=None)

Convert system prompts, agent state, and extra messages to a list of dicts for LLM/tool payloads.

Parameters:

Name Type Description Default
system_prompts
list[dict[str, Any]]

List of system prompt dicts.

required
state
AgentState | None

Optional agent state containing context and summary.

None
extra_messages
list[Message] | None

Optional extra messages to include.

None

Returns:

Type Description
list[dict[str, Any]]

list[dict[str, Any]]: List of message dicts for payloads.

Raises:

Type Description
ValueError

If system_prompts is None.

Source code in agentflow/utils/converter.py
def convert_messages(
    system_prompts: list[dict[str, Any]],
    state: Union["AgentState", None] = None,
    extra_messages: list[Message] | None = None,
) -> list[dict[str, Any]]:
    """
    Convert system prompts, agent state, and extra messages to a list of dicts for
    LLM/tool payloads.

    Args:
        system_prompts (list[dict[str, Any]]): List of system prompt dicts.
        state (AgentState | None): Optional agent state containing context and summary.
        extra_messages (list[Message] | None): Optional extra messages to include.

    Returns:
        list[dict[str, Any]]: List of message dicts for payloads.

    Raises:
        ValueError: If system_prompts is None.
    """
    if system_prompts is None:
        logger.error("System prompts are None")
        raise ValueError("System prompts cannot be None")

    res = []
    res += system_prompts

    if state and state.context_summary:
        summary = {
            "role": "assistant",
            "content": state.context_summary if state.context_summary else "",
        }
        res.append(summary)

    if state and state.context:
        for msg in state.context:
            formatted = _convert_dict(msg)
            if formatted:
                res.append(formatted)

    if extra_messages:
        for msg in extra_messages:
            formatted = _convert_dict(msg)
            if formatted:
                res.append(formatted)

    logger.debug("Number of Converted messages: %s", len(res))
    return res
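The listing fixes the order of the payload: system prompts first, then the context summary as an assistant message, then the stored context, then any extra messages. The sketch below reproduces that ordering with plain dicts. `SketchState` and `build_payload` are hypothetical stand-ins, and the private conversion helper used by the real function is replaced by passing dicts through unchanged.

```python
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class SketchState:
    """Hypothetical stand-in exposing the two fields the source reads."""

    context_summary: Optional[str] = None
    context: list = field(default_factory=list)


def build_payload(system_prompts, state=None, extra_messages=None) -> list:
    if system_prompts is None:
        raise ValueError("System prompts cannot be None")
    payload = list(system_prompts)
    if state and state.context_summary:
        payload.append({"role": "assistant", "content": state.context_summary})
    # The real function converts Message objects via a private helper;
    # plain dicts are passed through unchanged in this sketch.
    if state and state.context:
        payload.extend(state.context)
    if extra_messages:
        payload.extend(extra_messages)
    return payload


sketch_state = SketchState(
    context_summary="User asked about billing earlier.",
    context=[{"role": "user", "content": "Where is my invoice?"}],
)
payload = build_payload(
    [{"role": "system", "content": "You are a support agent."}],
    state=sketch_state,
    extra_messages=[{"role": "user", "content": "Please resend it."}],
)
roles = [message["role"] for message in payload]
```

Only `None` is rejected for `system_prompts`; an empty list is accepted and simply contributes nothing to the payload.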

id_generator

ID Generator Module

This module provides various strategies for generating unique identifiers. Each generator implements the BaseIDGenerator interface and specifies the type and size of IDs it produces.

Classes:

Name Description
AsyncIDGenerator

ID generator that produces UUID version 4 strings asynchronously.

BaseIDGenerator

Abstract base class for ID generation strategies.

BigIntIDGenerator

ID generator that produces big integer IDs based on current time in nanoseconds.

DefaultIDGenerator

Default ID generator that returns empty strings.

HexIDGenerator

ID generator that produces hexadecimal strings.

IDType

Enumeration of supported ID types.

IntIDGenerator

ID generator that produces 32-bit random integers.

ShortIDGenerator

ID generator that produces short alphanumeric strings.

TimestampIDGenerator

ID generator that produces integer IDs based on current time in microseconds.

UUIDGenerator

ID generator that produces UUID version 4 strings.

Classes

AsyncIDGenerator

Bases: BaseIDGenerator

ID generator that produces UUID version 4 strings asynchronously.

UUIDs are 128-bit identifiers that are virtually guaranteed to be unique across space and time. The generated strings are 36 characters long (32 hexadecimal digits + 4 hyphens in the format xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx). This generator provides an asynchronous interface for generating UUIDs.

Methods:

Name Description
generate

Asynchronously generate a new UUID4 string.

Attributes:

Name Type Description
id_type IDType

Return the type of ID generated by this generator.

Source code in agentflow/utils/id_generator.py
class AsyncIDGenerator(BaseIDGenerator):
    """
    ID generator that produces UUID version 4 strings asynchronously.

    UUIDs are 128-bit identifiers that are virtually guaranteed to be unique
    across space and time. The generated strings are 36 characters long
    (32 hexadecimal digits + 4 hyphens in the format xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx).
    This generator provides an asynchronous interface for generating UUIDs.
    """

    @property
    def id_type(self) -> IDType:
        """
        Return the type of ID generated by this generator.

        Returns:
            IDType: The type of ID (STRING).
        """
        return IDType.STRING

    async def generate(self) -> str:
        """
        Asynchronously generate a new UUID4 string.

        Returns:
            str: A 36-character UUID string (e.g., '550e8400-e29b-41d4-a716-446655440000').
        """
        # Simulate async operation (e.g., if fetching from an external service)
        return str(uuid.uuid4())
Attributes
id_type property
id_type

Return the type of ID generated by this generator.

Returns:

Name Type Description
IDType IDType

The type of ID (STRING).

Functions
generate async
generate()

Asynchronously generate a new UUID4 string.

Returns:

Name Type Description
str str

A 36-character UUID string (e.g., '550e8400-e29b-41d4-a716-446655440000').

Source code in agentflow/utils/id_generator.py
async def generate(self) -> str:
    """
    Asynchronously generate a new UUID4 string.

    Returns:
        str: A 36-character UUID string (e.g., '550e8400-e29b-41d4-a716-446655440000').
    """
    # Simulate async operation (e.g., if fetching from an external service)
    return str(uuid.uuid4())
BaseIDGenerator

Bases: ABC

Abstract base class for ID generation strategies.

All ID generators must implement the id_type property and generate method.

Methods:

Name Description
generate

Generate a new unique ID.

Attributes:

Name Type Description
id_type IDType

Return the type of ID generated by this generator.

Source code in agentflow/utils/id_generator.py
class BaseIDGenerator(ABC):
    """Abstract base class for ID generation strategies.

    All ID generators must implement the id_type property and generate method.
    """

    @property
    @abstractmethod
    def id_type(self) -> IDType:
        """Return the type of ID generated by this generator.

        Returns:
            IDType: The type of ID (STRING, INTEGER, or BIGINT).
        """
        raise NotImplementedError("id_type method must be implemented")

    @abstractmethod
    def generate(self) -> str | int | Awaitable[str | int]:
        """Generate a new unique ID.

        Returns:
            str | int: A new unique identifier of the appropriate type.
        """
        raise NotImplementedError("generate method must be implemented")
Attributes
id_type abstractmethod property
id_type

Return the type of ID generated by this generator.

Returns:

Name Type Description
IDType IDType

The type of ID (STRING, INTEGER, or BIGINT).

Functions
generate abstractmethod
generate()

Generate a new unique ID.

Returns:

Type Description
str | int | Awaitable[str | int]

str | int: A new unique identifier of the appropriate type.

Source code in agentflow/utils/id_generator.py
@abstractmethod
def generate(self) -> str | int | Awaitable[str | int]:
    """Generate a new unique ID.

    Returns:
        str | int: A new unique identifier of the appropriate type.
    """
    raise NotImplementedError("generate method must be implemented")
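The return annotation `str | int | Awaitable[str | int]` means a caller cannot know in advance whether `generate()` returns a value or something to await. One way to handle both cases uniformly is sketched below. `next_id`, `SyncHexGenerator`, and `AsyncUUIDGenerator` are hypothetical names that mimic the documented generators without importing the library.

```python
import asyncio
import inspect
import secrets
import uuid


class SyncHexGenerator:
    """Stand-in for a synchronous generator such as HexIDGenerator."""

    def generate(self) -> str:
        return secrets.token_hex(16)


class AsyncUUIDGenerator:
    """Stand-in for an asynchronous generator such as AsyncIDGenerator."""

    async def generate(self) -> str:
        return str(uuid.uuid4())


async def next_id(generator):
    """Call generate() and await the result only if it is awaitable."""
    value = generator.generate()
    if inspect.isawaitable(value):
        value = await value
    return value


async def main():
    return await next_id(SyncHexGenerator()), await next_id(AsyncUUIDGenerator())


hex_id, uuid_id = asyncio.run(main())
```

Calling an async `generate()` without awaiting it would yield a coroutine object instead of an ID, which is why the awaitable check sits in one shared helper.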
BigIntIDGenerator

Bases: BaseIDGenerator

ID generator that produces big integer IDs based on current time in nanoseconds.

Generates IDs by multiplying the current Unix timestamp (in seconds) by 1e9, producing large integers that sort by creation time. Present-day timestamps yield 19-digit values.

Methods:

Name Description
generate

Generate a new big integer ID based on current nanoseconds.

Attributes:

Name Type Description
id_type IDType
Source code in agentflow/utils/id_generator.py
class BigIntIDGenerator(BaseIDGenerator):
    """ID generator that produces big integer IDs based on current time in nanoseconds.

    Generates IDs by multiplying current Unix timestamp by 1e9, resulting in
    large integers that are sortable by creation time. Typical size is 19-20 digits.
    """

    @property
    def id_type(self) -> IDType:
        return IDType.BIGINT

    def generate(self) -> int:
        """Generate a new big integer ID based on current nanoseconds.

        Returns:
            int: A large integer (19-20 digits) representing nanoseconds since Unix epoch.
        """
        # Use current time in nanoseconds for higher uniqueness
        return int(time.time() * 1_000_000_000)
Attributes
id_type property
id_type
Functions
generate
generate()

Generate a new big integer ID based on current nanoseconds.

Returns:

Name Type Description
int int

A large integer (19-20 digits) representing nanoseconds since Unix epoch.

Source code in agentflow/utils/id_generator.py
def generate(self) -> int:
    """Generate a new big integer ID based on current nanoseconds.

    Returns:
        int: A large integer (19-20 digits) representing nanoseconds since Unix epoch.
    """
    # Use current time in nanoseconds for higher uniqueness
    return int(time.time() * 1_000_000_000)
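A short arithmetic check shows what this expression produces. `time.time()` returns a float, and a double carries only 53 significant bits, so a product near 1.7e18 cannot represent every nanosecond: for present-day timestamps the result always lands on a multiple of 256. The sketch below uses an arbitrary fixed timestamp (`SAMPLE_SECONDS`, an assumption chosen for repeatability) and also calls `time.time_ns()`, which is mentioned only as a standard-library comparison, not as what the library does.

```python
import time

# Arbitrary fixed timestamp so the demonstration is repeatable.
SAMPLE_SECONDS = 1_700_000_000.123456789

# Same expression as the source, applied to the fixed timestamp.
sample_id = int(SAMPLE_SECONDS * 1_000_000_000)
sample_digits = len(str(sample_id))

# Floats between 2**60 and 2**61 are spaced 256 apart, so the remainder is 0.
sample_remainder = sample_id % 256

# Live values: float-based (as in the source) and the integer nanosecond clock.
live_id = int(time.time() * 1_000_000_000)
live_ns = time.time_ns()

print(sample_digits, sample_remainder)
```

In practice this means two calls made in very quick succession may return the same value, so callers that need strict uniqueness may want an additional tiebreaker.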
DefaultIDGenerator

Bases: BaseIDGenerator

Default ID generator that returns empty strings.

This generator is a placeholder. It currently returns an empty string, which signals the framework to substitute an ID from its own default generator (typically UUID-based). If the framework has no UUID generation configured, it falls back to UUID4.

Methods:

Name Description
generate

Generate a default ID (currently empty string).

Attributes:

Name Type Description
id_type IDType
Source code in agentflow/utils/id_generator.py
class DefaultIDGenerator(BaseIDGenerator):
    """Default ID generator that returns empty strings.

    This generator is intended as a placeholder that can be configured
    to use framework defaults (typically UUID-based). Currently returns
    empty strings. If empty string is returned, the framework will use its default
    UUID-based generator. If the framework is not configured to use
    UUID generation, it will fall back to UUID4.
    """

    @property
    def id_type(self) -> IDType:
        return IDType.STRING

    def generate(self) -> str:
        """Generate a default ID (currently empty string).

        If empty string is returned, the framework will use its default
        UUID-based generator. If the framework is not configured to use
        UUID generation, it will fall back to UUID4.

        Returns:
            str: An empty string (framework will substitute with UUID).
        """
        # Returning "" defers to the framework default, which is UUID-based.
        # If the framework does not supply one, a UUID4 is used instead.
        return ""
Attributes
id_type property
id_type
Functions
generate
generate()

Generate a default ID (currently empty string).

Returning an empty string signals the framework to use its default UUID-based generator. If the framework is not configured for UUID generation, it falls back to UUID4.

Returns:

Name Type Description
str str

An empty string (framework will substitute with UUID).

Source code in agentflow/utils/id_generator.py
def generate(self) -> str:
    """Generate a default ID (currently empty string).

    If empty string is returned, the framework will use its default
    UUID-based generator. If the framework is not configured to use
    UUID generation, it will fall back to UUID4.

    Returns:
        str: An empty string (framework will substitute with UUID).
    """
    # Returning an empty string tells the framework to use its default
    # generator, which is UUID-based. If the framework is not configured
    # for UUID generation, UUID4 is used instead.
    return ""
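
The empty-string convention described above can be sketched as follows. This is a minimal illustration, not the framework's actual resolution logic: `resolve_id` is a hypothetical helper, and the `DefaultIDGenerator` here is a stand-in that only mirrors the listing above.

```python
import uuid


class DefaultIDGenerator:
    """Stand-in mirroring the listing above: always returns an empty string."""

    def generate(self) -> str:
        return ""


def resolve_id(generator) -> str:
    """Hypothetical helper: substitute a UUID4 when the generator yields ''."""
    candidate = generator.generate()
    if candidate == "":
        # Assumption: the caller falls back to UUID4, as the docstring describes.
        return str(uuid.uuid4())
    return candidate


new_id = resolve_id(DefaultIDGenerator())
print(new_id)  # a UUID4 string; the value differs on every run
```

A generator that returns a non-empty value would pass through `resolve_id` unchanged.
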
HexIDGenerator

Bases: BaseIDGenerator

ID generator that produces hexadecimal strings.

Generates cryptographically secure random hex strings of 32 characters (representing 16 random bytes). Each character is a hexadecimal digit (0-9, a-f).

Methods:

Name Description
generate

Generate a new 32-character hexadecimal string.

Attributes:

Name Type Description
id_type IDType
Source code in agentflow/utils/id_generator.py
class HexIDGenerator(BaseIDGenerator):
    """ID generator that produces hexadecimal strings.

    Generates cryptographically secure random hex strings of 32 characters
    (representing 16 random bytes). Each character is a hexadecimal digit (0-9, a-f).
    """

    @property
    def id_type(self) -> IDType:
        return IDType.STRING

    def generate(self) -> str:
        """Generate a new 32-character hexadecimal string.

        Returns:
            str: A 32-character hex string (e.g., '1a2b3c4d5e6f7890abcdef1234567890').
        """
        return secrets.token_hex(16)
Attributes
id_type property
id_type
Functions
generate
generate()

Generate a new 32-character hexadecimal string.

Returns:

Name Type Description
str str

A 32-character hex string (e.g., '1a2b3c4d5e6f7890abcdef1234567890').

Source code in agentflow/utils/id_generator.py
def generate(self) -> str:
    """Generate a new 32-character hexadecimal string.

    Returns:
        str: A 32-character hex string (e.g., '1a2b3c4d5e6f7890abcdef1234567890').
    """
    return secrets.token_hex(16)
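
As a quick check of the "16 random bytes, 32 hex characters" relationship, the following sketch calls `secrets.token_hex` directly (the same standard-library call used in the listing above) and decodes the result back to bytes.

```python
import secrets

hex_id = secrets.token_hex(16)  # 16 random bytes rendered as 32 hex characters

raw = bytes.fromhex(hex_id)  # decode back to the underlying bytes
is_lower_hex = all(ch in "0123456789abcdef" for ch in hex_id)

print(len(hex_id), len(raw), is_lower_hex)  # 32 16 True
```
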
IDType

Bases: StrEnum

Enumeration of supported ID types.

Attributes:

Name Type Description
BIGINT
INTEGER
STRING
Source code in agentflow/utils/id_generator.py
class IDType(enum.StrEnum):
    """Enumeration of supported ID types."""

    STRING = "string"  # String-based IDs
    INTEGER = "integer"  # Integer-based IDs
    BIGINT = "bigint"  # Big integer IDs
Attributes
BIGINT class-attribute instance-attribute
BIGINT = 'bigint'
INTEGER class-attribute instance-attribute
INTEGER = 'integer'
STRING class-attribute instance-attribute
STRING = 'string'
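
One way a caller might use `id_type` is to check that a value matches the kind of ID a generator declares. The sketch below is an assumption-laden illustration: `matches_id_type` and the type mapping are hypothetical, and the enum is a stand-in built on `str` and `Enum` so it also runs on Python versions before 3.11 (the source uses `enum.StrEnum`).

```python
from enum import Enum


class IDType(str, Enum):
    """Stand-in for the enum in the listing above."""

    STRING = "string"
    INTEGER = "integer"
    BIGINT = "bigint"


# Hypothetical mapping from ID type to the Python type a value should have.
_PYTHON_TYPES = {IDType.STRING: str, IDType.INTEGER: int, IDType.BIGINT: int}


def matches_id_type(value, id_type: IDType) -> bool:
    """Return True if value has the Python type implied by id_type."""
    if isinstance(value, bool):  # bool is a subclass of int; reject it explicitly
        return False
    return isinstance(value, _PYTHON_TYPES[id_type])


print(matches_id_type("a1b2", IDType.STRING))   # True
print(matches_id_type("a1b2", IDType.INTEGER))  # False
print(matches_id_type(7, IDType("bigint")))     # True
```
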
IntIDGenerator

Bases: BaseIDGenerator

ID generator that produces 32-bit random integers.

Generates cryptographically secure random integers using secrets.randbits(32). Values range from 0 to 2^32 - 1 (4,294,967,295).

Methods:

Name Description
generate

Generate a new 32-bit random integer.

Attributes:

Name Type Description
id_type IDType
Source code in agentflow/utils/id_generator.py
class IntIDGenerator(BaseIDGenerator):
    """ID generator that produces 32-bit random integers.

    Generates cryptographically secure random integers using secrets.randbits(32).
    Values range from 0 to 2^32 - 1 (4,294,967,295).
    """

    @property
    def id_type(self) -> IDType:
        return IDType.INTEGER

    def generate(self) -> int:
        """Generate a new 32-bit random integer.

        Returns:
            int: A random integer between 0 and 4,294,967,295 (inclusive).
        """
        return secrets.randbits(32)
Attributes
id_type property
id_type
Functions
generate
generate()

Generate a new 32-bit random integer.

Returns:

Name Type Description
int int

A random integer between 0 and 4,294,967,295 (inclusive).

Source code in agentflow/utils/id_generator.py
def generate(self) -> int:
    """Generate a new 32-bit random integer.

    Returns:
        int: A random integer between 0 and 4,294,967,295 (inclusive).
    """
    return secrets.randbits(32)
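
Because the ID space holds only 2^32 values, collisions become likely well before the space is exhausted. The sketch below estimates this with the standard birthday-bound approximation, assuming IDs are drawn independently and uniformly; `collision_probability` is a hypothetical helper, not part of the module.

```python
import math


def collision_probability(n_ids: int, space_size: int) -> float:
    """Birthday-bound approximation: 1 - exp(-n(n-1) / (2N))."""
    exponent = -n_ids * (n_ids - 1) / (2 * space_size)
    return -math.expm1(exponent)  # equals 1 - exp(exponent), stable for tiny values


space = 2**32  # number of distinct values from secrets.randbits(32)
for n in (1_000, 10_000, 100_000):
    print(f"{n:>7} IDs -> p(collision) ~ {collision_probability(n, space):.4f}")
```

Under this approximation the collision probability reaches roughly one half at about 77,000 IDs, so a uniqueness check on insert is advisable for larger datasets.
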
ShortIDGenerator

Bases: BaseIDGenerator

ID generator that produces short alphanumeric strings.

Generates 8-character strings using uppercase/lowercase letters and digits. Each character is randomly chosen from 62 possible characters (26 + 26 + 10). Total possible combinations: 62^8 ≈ 2.18 x 10^14.

Methods:

Name Description
generate

Generate a new 8-character alphanumeric string.

Attributes:

Name Type Description
id_type IDType
Source code in agentflow/utils/id_generator.py
class ShortIDGenerator(BaseIDGenerator):
    """ID generator that produces short alphanumeric strings.

    Generates 8-character strings using uppercase/lowercase letters and digits.
    Each character is randomly chosen from 62 possible characters (26 + 26 + 10).
    Total possible combinations: 62^8 ≈ 2.18 x 10^14.
    """

    @property
    def id_type(self) -> IDType:
        return IDType.STRING

    def generate(self) -> str:
        """Generate a new 8-character alphanumeric string.

        Returns:
            str: An 8-character string containing letters and digits
                 (e.g., 'Ab3XyZ9k').
        """
        alphabet = string.ascii_letters + string.digits
        return "".join(secrets.choice(alphabet) for _ in range(8))
Attributes
id_type property
id_type
Functions
generate
generate()

Generate a new 8-character alphanumeric string.

Returns:

Name Type Description
str str

An 8-character string containing letters and digits (e.g., 'Ab3XyZ9k').

Source code in agentflow/utils/id_generator.py
def generate(self) -> str:
    """Generate a new 8-character alphanumeric string.

    Returns:
        str: An 8-character string containing letters and digits
             (e.g., 'Ab3XyZ9k').
    """
    alphabet = string.ascii_letters + string.digits
    return "".join(secrets.choice(alphabet) for _ in range(8))
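
The size of the ID space quoted above can be reproduced directly from the alphabet and length used in the listing. This is a small arithmetic check rather than part of the module.

```python
import secrets
import string

alphabet = string.ascii_letters + string.digits  # 26 + 26 + 10 = 62 symbols
id_length = 8

total_combinations = len(alphabet) ** id_length
print(f"{total_combinations:,}")    # 218,340,105,584,896
print(f"{total_combinations:.2e}")  # 2.18e+14

# Same construction as the listing above.
short_id = "".join(secrets.choice(alphabet) for _ in range(id_length))
```
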
TimestampIDGenerator

Bases: BaseIDGenerator

ID generator that produces integer IDs based on current time in microseconds.

Generates IDs by multiplying current Unix timestamp by 1e6, resulting in integers that are sortable by creation time. Typical size is 16-17 digits.

Methods:

Name Description
generate

Generate a new integer ID based on current microseconds.

Attributes:

Name Type Description
id_type IDType
Source code in agentflow/utils/id_generator.py
class TimestampIDGenerator(BaseIDGenerator):
    """ID generator that produces integer IDs based on current time in microseconds.

    Generates IDs by multiplying current Unix timestamp by 1e6, resulting in
    integers that are sortable by creation time. Typical size is 16-17 digits.
    """

    @property
    def id_type(self) -> IDType:
        return IDType.INTEGER

    def generate(self) -> int:
        """Generate a new integer ID based on current microseconds.

        Returns:
            int: An integer (16-17 digits) representing microseconds since Unix epoch.
        """
        return int(time.time() * 1000000)
Attributes
id_type property
id_type
Functions
generate
generate()

Generate a new integer ID based on current microseconds.

Returns:

Name Type Description
int int

An integer (16-17 digits) representing microseconds since Unix epoch.

Source code in agentflow/utils/id_generator.py
def generate(self) -> int:
    """Generate a new integer ID based on current microseconds.

    Returns:
        int: An integer (16-17 digits) representing microseconds since Unix epoch.
    """
    return int(time.time() * 1000000)
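
Since the ID is just microseconds since the Unix epoch, it can be turned back into a timestamp. The sketch below shows one way to do that; `timestamp_id_to_datetime` is a hypothetical helper, and `generate_timestamp_id` simply repeats the expression from the listing above. Two IDs generated in quick succession may be equal, so this scheme alone does not guarantee uniqueness.

```python
import time
from datetime import datetime, timezone


def generate_timestamp_id() -> int:
    """Same expression as the listing above."""
    return int(time.time() * 1_000_000)


def timestamp_id_to_datetime(ts_id: int) -> datetime:
    """Hypothetical helper: read an ID as microseconds since the Unix epoch."""
    seconds, micros = divmod(ts_id, 1_000_000)
    moment = datetime.fromtimestamp(seconds, tz=timezone.utc)
    return moment.replace(microsecond=micros)


new_id = generate_timestamp_id()
print(new_id, timestamp_id_to_datetime(new_id).isoformat())
```
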
UUIDGenerator

Bases: BaseIDGenerator

ID generator that produces UUID version 4 strings.

UUIDs are 128-bit identifiers that are virtually guaranteed to be unique across space and time. The generated strings are 36 characters long (32 hexadecimal digits + 4 hyphens in the format xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx).

Methods:

Name Description
generate

Generate a new UUID4 string.

Attributes:

Name Type Description
id_type IDType
Source code in agentflow/utils/id_generator.py
class UUIDGenerator(BaseIDGenerator):
    """ID generator that produces UUID version 4 strings.

    UUIDs are 128-bit identifiers that are virtually guaranteed to be unique
    across space and time. The generated strings are 36 characters long
    (32 hexadecimal digits + 4 hyphens in the format xxxxxxxx-xxxx-xxxx-xxxx-xxxxxxxxxxxx).
    """

    @property
    def id_type(self) -> IDType:
        return IDType.STRING

    def generate(self) -> str:
        """Generate a new UUID4 string.

        Returns:
            str: A 36-character UUID string (e.g., '550e8400-e29b-41d4-a716-446655440000').
        """
        return str(uuid.uuid4())
Attributes
id_type property
id_type
Functions
generate
generate()

Generate a new UUID4 string.

Returns:

Name Type Description
str str

A 36-character UUID string (e.g., '550e8400-e29b-41d4-a716-446655440000').

Source code in agentflow/utils/id_generator.py
def generate(self) -> str:
    """Generate a new UUID4 string.

    Returns:
        str: A 36-character UUID string (e.g., '550e8400-e29b-41d4-a716-446655440000').
    """
    return str(uuid.uuid4())
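
The format described above (36 characters, 4 hyphens, version 4) can be verified with the standard-library `uuid` module. This sketch uses the same `uuid.uuid4()` call as the listing and parses the string back to confirm its version.

```python
import uuid

new_id = str(uuid.uuid4())
parsed = uuid.UUID(new_id)  # raises ValueError if the string is not a valid UUID

print(len(new_id))        # 36
print(new_id.count("-"))  # 4
print(parsed.version)     # 4
```
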

logging

Centralized logging configuration for TAF.

This module provides logging configuration that can be imported and used throughout the project. Each module should use:

import logging
logger = logging.getLogger(__name__)

This ensures proper hierarchical logging with module-specific loggers.

Typical usage example

from agentflow.utils.logging import configure_logging
configure_logging(level=logging.DEBUG)

Functions:

Name Description
configure_logging

Configures the top-level 'agentflow' logger for the TAF project.

Functions

configure_logging
configure_logging(level=logging.INFO, format_string=None, handler=None)

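Configures the top-level 'agentflow' logger for the TAF project.

This function sets up logging for all modules under the 'agentflow' namespace so that logs are formatted consistently and sent to one handler. The handler is attached only if the 'agentflow' logger has none yet, and propagation to Python's root logger is disabled to avoid duplicate output.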

Parameters:

Name Type Description Default
level
int

Logging level (e.g., logging.INFO, logging.DEBUG). Defaults to logging.INFO.

INFO
format_string
str

Custom format string for log messages. If None, uses a default format: "[%(asctime)s] %(levelname)-8s %(name)s: %(message)s".

None
handler
Handler

Custom logging handler. If None, uses StreamHandler to stdout.

None

Returns:

Type Description
None

None

Example

>>> configure_logging(level=logging.DEBUG)
>>> logger = logging.getLogger("agentflow.module")
>>> logger.info("This is an info message.")

Source code in agentflow/utils/logging.py
def configure_logging(
    level: int = logging.INFO,
    format_string: str | None = None,
    handler: logging.Handler | None = None,
) -> None:
    """
    Configures the root logger for the TAF project.

    This function sets up logging for all modules under the 'agentflow' namespace.
    It ensures that logs are formatted consistently and sent to the appropriate handler.

    Args:
        level (int, optional): Logging level (e.g., logging.INFO, logging.DEBUG).
            Defaults to logging.INFO.
        format_string (str, optional): Custom format string for log messages.
            If None, uses a default format: "[%(asctime)s] %(levelname)-8s %(name)s: %(message)s".
        handler (logging.Handler, optional): Custom logging handler. If None,
            uses StreamHandler to stdout.

    Returns:
        None

    Raises:
        None

    Example:
        >>> configure_logging(level=logging.DEBUG)
        >>> logger = logging.getLogger("agentflow.module")
        >>> logger.info("This is an info message.")
    """
    if format_string is None:
        format_string = "[%(asctime)s] %(levelname)-8s %(name)s: %(message)s"

    if handler is None:
        handler = logging.StreamHandler(sys.stdout)

    formatter = logging.Formatter(format_string)
    handler.setFormatter(formatter)

    # Configure root logger for agentflow
    root_logger = logging.getLogger("agentflow")
    root_logger.setLevel(level)

    # Only add handler if none exists to avoid duplicates
    if not root_logger.handlers:
        root_logger.addHandler(handler)

    # Prevent propagation to avoid duplicate logs
    root_logger.propagate = False
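
The following sketch exercises two behaviors visible in the listing above: child loggers under 'agentflow' share the configured handler, and a second call changes the level without attaching another handler. To stay runnable on its own it includes a condensed copy of `configure_logging`; the in-memory buffer and the log message are illustrative choices, and the sketch assumes the 'agentflow' logger has no handlers beforehand.

```python
import io
import logging
import sys


def configure_logging(level=logging.INFO, format_string=None, handler=None):
    """Condensed copy of the listing above, so the sketch runs on its own."""
    if format_string is None:
        format_string = "[%(asctime)s] %(levelname)-8s %(name)s: %(message)s"
    if handler is None:
        handler = logging.StreamHandler(sys.stdout)
    handler.setFormatter(logging.Formatter(format_string))
    root_logger = logging.getLogger("agentflow")
    root_logger.setLevel(level)
    if not root_logger.handlers:
        root_logger.addHandler(handler)
    root_logger.propagate = False


# Capture output in memory instead of stdout, with a timestamp-free format.
buffer = io.StringIO()
configure_logging(
    level=logging.DEBUG,
    format_string="%(levelname)s %(name)s: %(message)s",
    handler=logging.StreamHandler(buffer),
)

# A child logger inherits the level and reaches the handler of "agentflow".
logging.getLogger("agentflow.module").debug("graph compiled")

# A second call updates the level; the new handler is not attached.
configure_logging(level=logging.WARNING, handler=logging.StreamHandler(io.StringIO()))

agentflow_logger = logging.getLogger("agentflow")
print(buffer.getvalue().strip())
print(len(agentflow_logger.handlers), agentflow_logger.level == logging.WARNING)
```

One consequence worth noting: to swap handlers after the first call, a caller would have to remove the existing handler from the 'agentflow' logger first.
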

metrics

Lightweight metrics instrumentation utilities.

Design goals
  • Zero dependency by default.
  • Cheap no-op when disabled.
  • Pluggable exporter (e.g., Prometheus scrape formatting) later.
Usage

from agentflow.utils.metrics import counter, timer

counter('messages_written_total').inc()
with timer('db_write_latency_ms'):
    ...

Classes:

Name Description
Counter
TimerMetric

Functions:

Name Description
counter
enable_metrics
snapshot

Return a point-in-time snapshot of metrics (thread-safe copy).

timer

Classes

Counter dataclass

Methods:

Name Description
__init__
inc

Attributes:

Name Type Description
name str
value int
Source code in agentflow/utils/metrics.py
@dataclass
class Counter:
    name: str
    value: int = 0

    def inc(self, amount: int = 1) -> None:
        if not _ENABLED:
            return
        with _LOCK:
            self.value += amount
Attributes
name instance-attribute
name
value class-attribute instance-attribute
value = 0
Functions
__init__
__init__(name, value=0)
inc
inc(amount=1)
Source code in agentflow/utils/metrics.py
def inc(self, amount: int = 1) -> None:
    if not _ENABLED:
        return
    with _LOCK:
        self.value += amount
TimerMetric dataclass

Methods:

Name Description
__init__
observe

Attributes:

Name Type Description
avg_ms float
count int
max_ms float
name str
total_ms float
Source code in agentflow/utils/metrics.py
@dataclass
class TimerMetric:
    name: str
    count: int = 0
    total_ms: float = 0.0
    max_ms: float = 0.0

    def observe(self, duration_ms: float) -> None:
        if not _ENABLED:
            return
        with _LOCK:
            self.count += 1
            self.total_ms += duration_ms
            self.max_ms = max(self.max_ms, duration_ms)

    @property
    def avg_ms(self) -> float:
        if self.count == 0:
            return 0.0
        return self.total_ms / self.count
Attributes
avg_ms property
avg_ms
count class-attribute instance-attribute
count = 0
max_ms class-attribute instance-attribute
max_ms = 0.0
name instance-attribute
name
total_ms class-attribute instance-attribute
total_ms = 0.0
Functions
__init__
__init__(name, count=0, total_ms=0.0, max_ms=0.0)
observe
observe(duration_ms)
Source code in agentflow/utils/metrics.py
def observe(self, duration_ms: float) -> None:
    if not _ENABLED:
        return
    with _LOCK:
        self.count += 1
        self.total_ms += duration_ms
        self.max_ms = max(self.max_ms, duration_ms)

Functions

counter
counter(name)
Source code in agentflow/utils/metrics.py
def counter(name: str) -> Counter:
    with _LOCK:
        c = _COUNTERS.get(name)
        if c is None:
            c = Counter(name)
            _COUNTERS[name] = c
        return c
enable_metrics
enable_metrics(value)
Source code in agentflow/utils/metrics.py
def enable_metrics(value: bool) -> None:  # simple toggle; acceptable global
    # Intentionally a module-level switch, so call sites only pay for a cheap check.
    globals()["_ENABLED"] = value
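
The effect of the toggle on counters can be sketched with self-contained copies of `enable_metrics`, `Counter`, and `counter` from the listings in this section. The initial value of `_ENABLED` is an assumption, since the module's default is not shown here.

```python
import threading
from dataclasses import dataclass

_LOCK = threading.Lock()
_ENABLED = False  # assumption: the real module's default is not shown
_COUNTERS = {}


def enable_metrics(value: bool) -> None:
    globals()["_ENABLED"] = value


@dataclass
class Counter:
    name: str
    value: int = 0

    def inc(self, amount: int = 1) -> None:
        if not _ENABLED:  # cheap no-op while metrics are disabled
            return
        with _LOCK:
            self.value += amount


def counter(name: str) -> Counter:
    with _LOCK:
        c = _COUNTERS.get(name)
        if c is None:
            c = Counter(name)
            _COUNTERS[name] = c
        return c


counter("messages_written_total").inc()   # ignored: metrics are disabled
enable_metrics(True)
counter("messages_written_total").inc(3)  # counted
total = counter("messages_written_total").value
print(total)  # 3
```

Note that `counter` returns the same `Counter` object for a given name, so increments from different call sites accumulate in one place.
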
snapshot
snapshot()

Return a point-in-time snapshot of metrics (thread-safe copy).

Source code in agentflow/utils/metrics.py
def snapshot() -> dict:
    """Return a point-in-time snapshot of metrics (thread-safe copy)."""
    with _LOCK:
        return {
            "counters": {k: v.value for k, v in _COUNTERS.items()},
            "timers": {
                k: {
                    "count": t.count,
                    "avg_ms": t.avg_ms,
                    "max_ms": t.max_ms,
                }
                for k, t in _TIMERS.items()
            },
        }
timer
timer(name)
Source code in agentflow/utils/metrics.py
def timer(name: str) -> _TimerCtx:  # convenience factory
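    # Fast path: look up an existing metric without taking the lock.
    metric = _TIMERS.get(name)
    if metric is None:
        with _LOCK:
            # Re-check under the lock so concurrent callers create only one metric.
            metric = _TIMERS.get(name)
            if metric is None:
                metric = TimerMetric(name)
                _TIMERS[name] = metric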
    return _TimerCtx(metric)
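
The source of `_TimerCtx` is not shown in this section. The sketch below is one plausible shape for such a context manager, under the assumption that it measures wall-clock time with `time.perf_counter` and reports milliseconds to `TimerMetric.observe`; the class name `TimerCtx` is hypothetical, and `_ENABLED` is set to `True` for the demonstration.

```python
import threading
import time
from dataclasses import dataclass

_LOCK = threading.Lock()
_ENABLED = True  # assumption: metrics switched on for this demonstration


@dataclass
class TimerMetric:
    """Copy of the dataclass from the listing earlier in this section."""

    name: str
    count: int = 0
    total_ms: float = 0.0
    max_ms: float = 0.0

    def observe(self, duration_ms: float) -> None:
        if not _ENABLED:
            return
        with _LOCK:
            self.count += 1
            self.total_ms += duration_ms
            self.max_ms = max(self.max_ms, duration_ms)

    @property
    def avg_ms(self) -> float:
        return self.total_ms / self.count if self.count else 0.0


class TimerCtx:
    """Hypothetical stand-in for the private _TimerCtx context manager."""

    def __init__(self, metric: TimerMetric) -> None:
        self._metric = metric
        self._start = 0.0

    def __enter__(self) -> "TimerCtx":
        self._start = time.perf_counter()
        return self

    def __exit__(self, exc_type, exc, tb) -> bool:
        elapsed_ms = (time.perf_counter() - self._start) * 1000.0
        self._metric.observe(elapsed_ms)
        return False  # never swallow exceptions from the timed block


metric = TimerMetric("db_write_latency_ms")
with TimerCtx(metric):
    time.sleep(0.01)  # placeholder for the work being timed

print(metric.count, round(metric.avg_ms, 1), round(metric.max_ms, 1))
```
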

thread_info

Thread metadata and status tracking for agent graphs.

This module defines the ThreadInfo model, which tracks thread identity, user, metadata, status, and timestamps for agent graph execution and orchestration.

Classes:

Name Description
ThreadInfo

Metadata and status for a thread in agent execution.

Classes

ThreadInfo

Bases: BaseModel

Metadata and status for a thread in agent execution.

Attributes:

Name Type Description
thread_id int | str

Unique identifier for the thread.

thread_name str | None

Optional name for the thread.

user_id int | str | None

Optional user identifier associated with the thread.

metadata dict[str, Any] | None

Optional metadata for the thread.

updated_at datetime | None

Timestamp of last update.

stop_requested bool

Whether a stop has been requested for the thread. Note that the class body in the source listing for this class does not declare this field.

run_id str | None

Optional run identifier for the thread execution.

Example

ThreadInfo(thread_id=1, thread_name="main", user_id=42)

Source code in agentflow/utils/thread_info.py
class ThreadInfo(BaseModel):
    """
    Metadata and status for a thread in agent execution.

    Attributes:
        thread_id (int | str): Unique identifier for the thread.
        thread_name (str | None): Optional name for the thread.
        user_id (int | str | None): Optional user identifier associated with the thread.
        metadata (dict[str, Any] | None): Optional metadata for the thread.
        updated_at (datetime | None): Timestamp of last update.
        stop_requested (bool): Whether a stop has been requested for the thread.
        run_id (str | None): Optional run identifier for the thread execution.

    Example:
        >>> ThreadInfo(thread_id=1, thread_name="main", user_id=42)
    """

    thread_id: int | str
    thread_name: str | None = None
    user_id: int | str | None = None
    metadata: dict[str, Any] | None = None
    updated_at: datetime | None = None
    run_id: str | None = None
Attributes
metadata class-attribute instance-attribute
metadata = None
run_id class-attribute instance-attribute
run_id = None
thread_id instance-attribute
thread_id
thread_name class-attribute instance-attribute
thread_name = None
updated_at class-attribute instance-attribute
updated_at = None
user_id class-attribute instance-attribute
user_id = None

thread_name_generator

Thread name generation utilities for AI agent conversations.

This module provides the AIThreadNameGenerator class and helper function for generating meaningful, varied, and human-friendly thread names for AI chat sessions using different patterns and themes.

Classes:

Name Description
AIThreadNameGenerator

Generates thread names using adjective-noun, action-based, and compound descriptive patterns.

Functions:

Name Description
generate_dummy_thread_name

Convenience function for generating a thread name.

Classes

AIThreadNameGenerator

Generates meaningful, varied thread names for AI conversations using different patterns and themes. Patterns include adjective-noun, action-based, and compound descriptive names.

Example

>>> AIThreadNameGenerator().generate_name()
'thoughtful-dialogue'

Methods:

Name Description
generate_action_name

Generate an action-based thread name for a more dynamic feel.

generate_compound_name

Generate a compound descriptive thread name.

generate_name

Generate a meaningful thread name using random pattern selection.

generate_simple_name

Generate a simple adjective-noun combination for a thread name.

Attributes:

Name Type Description
ACTION_PATTERNS
ADJECTIVES
COMPOUND_PATTERNS
NOUNS
Source code in agentflow/utils/thread_name_generator.py
class AIThreadNameGenerator:
    """
    Generates meaningful, varied thread names for AI conversations using different
    patterns and themes. Patterns include adjective-noun, action-based, and compound
    descriptive names.

    Example:
        >>> AIThreadNameGenerator().generate_name()
        'thoughtful-dialogue'
    """

    # Enhanced adjectives grouped by semantic meaning
    ADJECTIVES = [
        # Intellectual
        "thoughtful",
        "insightful",
        "analytical",
        "logical",
        "strategic",
        "methodical",
        "systematic",
        "comprehensive",
        "detailed",
        "precise",
        # Creative
        "creative",
        "imaginative",
        "innovative",
        "artistic",
        "expressive",
        "original",
        "inventive",
        "inspired",
        "visionary",
        "whimsical",
        # Emotional/Social
        "engaging",
        "collaborative",
        "meaningful",
        "productive",
        "harmonious",
        "enlightening",
        "empathetic",
        "supportive",
        "encouraging",
        "uplifting",
        # Dynamic
        "dynamic",
        "energetic",
        "vibrant",
        "lively",
        "spirited",
        "active",
        "flowing",
        "adaptive",
        "responsive",
        "interactive",
        # Quality-focused
        "focused",
        "dedicated",
        "thorough",
        "meticulous",
        "careful",
        "patient",
        "persistent",
        "resilient",
        "determined",
        "ambitious",
    ]

    # Enhanced nouns with more conversational context
    NOUNS = [
        # Conversation-related
        "dialogue",
        "conversation",
        "discussion",
        "exchange",
        "chat",
        "consultation",
        "session",
        "meeting",
        "interaction",
        "communication",
        # Journey/Process
        "journey",
        "exploration",
        "adventure",
        "quest",
        "voyage",
        "expedition",
        "discovery",
        "investigation",
        "research",
        "study",
        # Conceptual
        "insight",
        "vision",
        "perspective",
        "understanding",
        "wisdom",
        "knowledge",
        "learning",
        "growth",
        "development",
        "progress",
        # Solution-oriented
        "solution",
        "approach",
        "strategy",
        "method",
        "framework",
        "plan",
        "blueprint",
        "pathway",
        "route",
        "direction",
        # Creative/Abstract
        "canvas",
        "story",
        "narrative",
        "symphony",
        "composition",
        "creation",
        "masterpiece",
        "design",
        "pattern",
        "concept",
        # Collaborative
        "partnership",
        "collaboration",
        "alliance",
        "connection",
        "bond",
        "synergy",
        "harmony",
        "unity",
        "cooperation",
        "teamwork",
    ]

    # Action-based patterns for more dynamic names
    ACTION_PATTERNS = {
        "exploring": ["ideas", "concepts", "possibilities", "mysteries", "frontiers", "depths"],
        "building": ["solutions", "understanding", "connections", "frameworks", "bridges"],
        "discovering": ["insights", "patterns", "answers", "truths", "secrets", "wisdom"],
        "crafting": ["responses", "solutions", "stories", "strategies", "experiences"],
        "navigating": ["challenges", "questions", "complexities", "territories", "paths"],
        "unlocking": ["potential", "mysteries", "possibilities", "creativity", "knowledge"],
        "weaving": ["ideas", "stories", "connections", "patterns", "narratives"],
        "illuminating": ["concepts", "mysteries", "paths", "truths", "possibilities"],
    }

    # Descriptive compound patterns
    COMPOUND_PATTERNS = [
        ("deep", ["dive", "thought", "reflection", "analysis", "exploration"]),
        ("bright", ["spark", "idea", "insight", "moment", "flash"]),
        ("fresh", ["perspective", "approach", "start", "take", "view"]),
        ("open", ["dialogue", "discussion", "conversation", "exchange", "forum"]),
        ("creative", ["flow", "spark", "burst", "stream", "wave"]),
        ("mindful", ["moment", "pause", "reflection", "consideration", "thought"]),
        ("collaborative", ["effort", "venture", "journey", "exploration", "creation"]),
    ]

    def generate_simple_name(self, separator: str = "-") -> str:
        """
        Generate a simple adjective-noun combination for a thread name.

        Args:
            separator (str): String to separate words (default: "-").

        Returns:
            str: Name like "thoughtful-dialogue" or "creative-exploration".

        Example:
            >>> AIThreadNameGenerator().generate_simple_name()
            'creative-exploration'
        """
        adj = secrets.choice(self.ADJECTIVES)
        noun = secrets.choice(self.NOUNS)
        return f"{adj}{separator}{noun}"

    def generate_action_name(self, separator: str = "-") -> str:
        """
        Generate an action-based thread name for a more dynamic feel.

        Args:
            separator (str): String to separate words (default: "-").

        Returns:
            str: Name like "exploring-ideas" or "building-understanding".

        Example:
            >>> AIThreadNameGenerator().generate_action_name()
            'building-connections'
        """
        action = secrets.choice(list(self.ACTION_PATTERNS.keys()))
        target = secrets.choice(self.ACTION_PATTERNS[action])
        return f"{action}{separator}{target}"

    def generate_compound_name(self, separator: str = "-") -> str:
        """
        Generate a compound descriptive thread name.

        Args:
            separator (str): String to separate words (default: "-").

        Returns:
            str: Name like "deep-dive" or "bright-spark".

        Example:
            >>> AIThreadNameGenerator().generate_compound_name()
            'deep-reflection'
        """
        base, options = secrets.choice(self.COMPOUND_PATTERNS)
        complement = secrets.choice(options)
        return f"{base}{separator}{complement}"

    def generate_name(self, separator: str = "-") -> str:
        """
        Generate a meaningful thread name using random pattern selection.

        Args:
            separator (str): String to separate words (default: "-").

        Returns:
            str: A meaningful thread name from various patterns.

        Example:
            >>> AIThreadNameGenerator().generate_name()
            'engaging-discussion'
        """
        # Randomly choose between different naming patterns
        pattern = secrets.choice(["simple", "action", "compound"])

        if pattern == "simple":
            return self.generate_simple_name(separator)
        if pattern == "action":
            return self.generate_action_name(separator)
        # compound
        return self.generate_compound_name(separator)
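
Names are drawn from fixed word lists, so two threads can receive the same name. If unique names are needed, a caller could wrap the generator along the lines of the sketch below. Everything here is illustrative: `unique_thread_name` is a hypothetical helper, and the tiny word lists stand in for the full lists in the class above.

```python
import secrets

# Tiny stand-in word lists; the real class uses the longer lists shown above.
ADJECTIVES = ["thoughtful", "creative", "focused"]
NOUNS = ["dialogue", "journey", "insight"]


def generate_simple_name(separator: str = "-") -> str:
    """Adjective-noun name, built the same way as generate_simple_name above."""
    return f"{secrets.choice(ADJECTIVES)}{separator}{secrets.choice(NOUNS)}"


def unique_thread_name(existing, separator: str = "-", max_tries: int = 5) -> str:
    """Hypothetical helper: retry a few times, then add a random hex suffix."""
    for _ in range(max_tries):
        name = generate_simple_name(separator)
        if name not in existing:
            return name
    while True:
        suffix = secrets.token_hex(2)  # 4 hex characters
        candidate = f"{generate_simple_name(separator)}{separator}{suffix}"
        if candidate not in existing:
            return candidate


# Every plain name is already taken, which forces the suffix path.
taken = {f"{adj}-{noun}" for adj in ADJECTIVES for noun in NOUNS}
name = unique_thread_name(taken)
print(name)  # varies per run; ends in a 4-character hex suffix here
```
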
Attributes
ACTION_PATTERNS class-attribute instance-attribute
ACTION_PATTERNS = {'exploring': ['ideas', 'concepts', 'possibilities', 'mysteries', 'frontiers', 'depths'], 'building': ['solutions', 'understanding', 'connections', 'frameworks', 'bridges'], 'discovering': ['insights', 'patterns', 'answers', 'truths', 'secrets', 'wisdom'], 'crafting': ['responses', 'solutions', 'stories', 'strategies', 'experiences'], 'navigating': ['challenges', 'questions', 'complexities', 'territories', 'paths'], 'unlocking': ['potential', 'mysteries', 'possibilities', 'creativity', 'knowledge'], 'weaving': ['ideas', 'stories', 'connections', 'patterns', 'narratives'], 'illuminating': ['concepts', 'mysteries', 'paths', 'truths', 'possibilities']}
ADJECTIVES class-attribute instance-attribute
ADJECTIVES = ['thoughtful', 'insightful', 'analytical', 'logical', 'strategic', 'methodical', 'systematic', 'comprehensive', 'detailed', 'precise', 'creative', 'imaginative', 'innovative', 'artistic', 'expressive', 'original', 'inventive', 'inspired', 'visionary', 'whimsical', 'engaging', 'collaborative', 'meaningful', 'productive', 'harmonious', 'enlightening', 'empathetic', 'supportive', 'encouraging', 'uplifting', 'dynamic', 'energetic', 'vibrant', 'lively', 'spirited', 'active', 'flowing', 'adaptive', 'responsive', 'interactive', 'focused', 'dedicated', 'thorough', 'meticulous', 'careful', 'patient', 'persistent', 'resilient', 'determined', 'ambitious']
COMPOUND_PATTERNS class-attribute instance-attribute
COMPOUND_PATTERNS = [('deep', ['dive', 'thought', 'reflection', 'analysis', 'exploration']), ('bright', ['spark', 'idea', 'insight', 'moment', 'flash']), ('fresh', ['perspective', 'approach', 'start', 'take', 'view']), ('open', ['dialogue', 'discussion', 'conversation', 'exchange', 'forum']), ('creative', ['flow', 'spark', 'burst', 'stream', 'wave']), ('mindful', ['moment', 'pause', 'reflection', 'consideration', 'thought']), ('collaborative', ['effort', 'venture', 'journey', 'exploration', 'creation'])]
NOUNS class-attribute instance-attribute
NOUNS = ['dialogue', 'conversation', 'discussion', 'exchange', 'chat', 'consultation', 'session', 'meeting', 'interaction', 'communication', 'journey', 'exploration', 'adventure', 'quest', 'voyage', 'expedition', 'discovery', 'investigation', 'research', 'study', 'insight', 'vision', 'perspective', 'understanding', 'wisdom', 'knowledge', 'learning', 'growth', 'development', 'progress', 'solution', 'approach', 'strategy', 'method', 'framework', 'plan', 'blueprint', 'pathway', 'route', 'direction', 'canvas', 'story', 'narrative', 'symphony', 'composition', 'creation', 'masterpiece', 'design', 'pattern', 'concept', 'partnership', 'collaboration', 'alliance', 'connection', 'bond', 'synergy', 'harmony', 'unity', 'cooperation', 'teamwork']
Functions
generate_action_name
generate_action_name(separator='-')

Generate an action-based thread name for a more dynamic feel.

Parameters:

Name Type Description Default
separator str

String to separate words (default: "-").

'-'

Returns:

Name Type Description
str str

Name like "exploring-ideas" or "building-understanding".

Example

>>> AIThreadNameGenerator().generate_action_name()
'building-connections'

Source code in agentflow/utils/thread_name_generator.py
def generate_action_name(self, separator: str = "-") -> str:
    """
    Generate an action-based thread name for a more dynamic feel.

    Args:
        separator (str): String to separate words (default: "-").

    Returns:
        str: Name like "exploring-ideas" or "building-understanding".

    Example:
        >>> AIThreadNameGenerator().generate_action_name()
        'building-connections'
    """
    action = secrets.choice(list(self.ACTION_PATTERNS.keys()))
    target = secrets.choice(self.ACTION_PATTERNS[action])
    return f"{action}{separator}{target}"
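
The method above is a two-step draw: first an action key, then one of that action's targets. A minimal standalone sketch of the same idea follows. The contents of `ACTION_PATTERNS` here are a hypothetical stand-in (the real mapping is not shown in this section; the words are borrowed from the docstring examples), and `action_name` is an illustrative free function, not the library's method.

```python
import secrets

# Hypothetical stand-in data: the real ACTION_PATTERNS is not shown here.
ACTION_PATTERNS = {
    "exploring": ["ideas"],
    "building": ["understanding", "connections"],
}


def action_name(separator: str = "-") -> str:
    """Pick an action, then a target that belongs to that action."""
    # secrets.choice needs a sequence, so the dict keys are copied to a list.
    action = secrets.choice(list(ACTION_PATTERNS))
    target = secrets.choice(ACTION_PATTERNS[action])
    return f"{action}{separator}{target}"


print(action_name())     # one of the action-target pairs, chosen at random
print(action_name("_"))  # same pattern with an underscore separator
```

`secrets.choice` indexes into a sequence, which is presumably why the source wraps the dictionary keys in `list(...)` before choosing.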
generate_compound_name
generate_compound_name(separator='-')

Generate a compound descriptive thread name.

Parameters:

Name       Type  Description                               Default
separator  str   String to separate words (default: "-").  '-'

Returns:

Name  Type  Description
str   str   Name like "deep-dive" or "bright-spark".

Example

>>> AIThreadNameGenerator().generate_compound_name()
'deep-reflection'

Source code in agentflow/utils/thread_name_generator.py
def generate_compound_name(self, separator: str = "-") -> str:
    """
    Generate a compound descriptive thread name.

    Args:
        separator (str): String to separate words (default: "-").

    Returns:
        str: Name like "deep-dive" or "bright-spark".

    Example:
        >>> AIThreadNameGenerator().generate_compound_name()
        'deep-reflection'
    """
    base, options = secrets.choice(self.COMPOUND_PATTERNS)
    complement = secrets.choice(options)
    return f"{base}{separator}{complement}"
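
Since `COMPOUND_PATTERNS` is a short list of `(base, options)` tuples, the full set of names this method can return is small enough to enumerate. The sketch below copies the attribute value listed on this page and expands it; `all_compound_names` is a hypothetical helper for illustration and is not part of the library.

```python
# Copied from the COMPOUND_PATTERNS class attribute listed on this page.
COMPOUND_PATTERNS = [
    ("deep", ["dive", "thought", "reflection", "analysis", "exploration"]),
    ("bright", ["spark", "idea", "insight", "moment", "flash"]),
    ("fresh", ["perspective", "approach", "start", "take", "view"]),
    ("open", ["dialogue", "discussion", "conversation", "exchange", "forum"]),
    ("creative", ["flow", "spark", "burst", "stream", "wave"]),
    ("mindful", ["moment", "pause", "reflection", "consideration", "thought"]),
    ("collaborative", ["effort", "venture", "journey", "exploration", "creation"]),
]


def all_compound_names(separator: str = "-") -> list[str]:
    """Hypothetical helper: every base/complement pair joined by separator."""
    return [
        f"{base}{separator}{complement}"
        for base, options in COMPOUND_PATTERNS
        for complement in options
    ]


names = all_compound_names()
print(len(names))                  # 35 (7 bases x 5 complements)
print("deep-reflection" in names)  # True
```

A list like this could be used, for example, to check whether a stored thread name came from the compound pattern.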
generate_name
generate_name(separator='-')

Generate a meaningful thread name using random pattern selection.

Parameters:

Name       Type  Description                               Default
separator  str   String to separate words (default: "-").  '-'

Returns:

Name  Type  Description
str   str   A meaningful thread name from various patterns.

Example

>>> AIThreadNameGenerator().generate_name()
'engaging-discussion'

Source code in agentflow/utils/thread_name_generator.py
def generate_name(self, separator: str = "-") -> str:
    """
    Generate a meaningful thread name using random pattern selection.

    Args:
        separator (str): String to separate words (default: "-").

    Returns:
        str: A meaningful thread name from various patterns.

    Example:
        >>> AIThreadNameGenerator().generate_name()
        'engaging-discussion'
    """
    # Randomly choose between different naming patterns
    pattern = secrets.choice(["simple", "action", "compound"])

    if pattern == "simple":
        return self.generate_simple_name(separator)
    if pattern == "action":
        return self.generate_action_name(separator)
    # compound
    return self.generate_compound_name(separator)
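
Because `generate_name` first picks one of three patterns and only then picks words, individual names are not equally likely. The sketch below works through the arithmetic for the simple and compound patterns, using the list sizes shown on this page (50 adjectives, 60 nouns, 7 compound bases with 5 complements each). It assumes each `secrets.choice` call draws uniformly, and it leaves out the action pattern because `ACTION_PATTERNS` is not shown in this section.

```python
from fractions import Fraction

PATTERN_COUNT = 3        # "simple", "action", "compound"
SIMPLE_PAIRS = 50 * 60   # len(ADJECTIVES) * len(NOUNS)
COMPOUND_PAIRS = 7 * 5   # seven bases, five complements each

# Chance of reaching one specific word pair through one specific pattern.
p_pattern = Fraction(1, PATTERN_COUNT)
p_simple_pair = p_pattern / SIMPLE_PAIRS
p_compound_pair = p_pattern / COMPOUND_PAIRS

print(p_simple_pair)                    # 1/9000
print(p_compound_pair)                  # 1/105
print(p_compound_pair / p_simple_pair)  # 600/7, roughly 86 times more likely
```

These are per-pattern figures. A string reachable through more than one pattern, such as "collaborative-journey" (both an adjective-noun pair and a compound pair), gets the sum of its per-pattern probabilities.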
generate_simple_name
generate_simple_name(separator='-')

Generate a simple adjective-noun combination for a thread name.

Parameters:

Name       Type  Description                               Default
separator  str   String to separate words (default: "-").  '-'

Returns:

Name  Type  Description
str   str   Name like "thoughtful-dialogue" or "creative-exploration".

Example

>>> AIThreadNameGenerator().generate_simple_name()
'creative-exploration'

Source code in agentflow/utils/thread_name_generator.py
def generate_simple_name(self, separator: str = "-") -> str:
    """
    Generate a simple adjective-noun combination for a thread name.

    Args:
        separator (str): String to separate words (default: "-").

    Returns:
        str: Name like "thoughtful-dialogue" or "creative-exploration".

    Example:
        >>> AIThreadNameGenerator().generate_simple_name()
        'creative-exploration'
    """
    adj = secrets.choice(self.ADJECTIVES)
    noun = secrets.choice(self.NOUNS)
    return f"{adj}{separator}{noun}"

Functions

generate_dummy_thread_name
generate_dummy_thread_name(separator='-')

Generate a meaningful English name for an AI chat thread.

Parameters:

Name       Type  Description                               Default
separator  str   String to separate words (default: "-").  '-'

Returns:

Name  Type  Description
str   str   A meaningful thread name like 'thoughtful-dialogue', 'exploring-ideas', or 'deep-dive'.

Example

>>> generate_dummy_thread_name()
'creative-exploration'

Source code in agentflow/utils/thread_name_generator.py
def generate_dummy_thread_name(separator: str = "-") -> str:
    """
    Generate a meaningful English name for an AI chat thread.

    Args:
        separator (str): String to separate words (default: "-").

    Returns:
        str: A meaningful thread name like 'thoughtful-dialogue', 'exploring-ideas', or 'deep-dive'.

    Example:
        >>> generate_dummy_thread_name()
        'creative-exploration'
    """
    generator = AIThreadNameGenerator()
    return generator.generate_name(separator)
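
A short usage sketch for the module-level function, showing a custom separator and one way a caller might turn the slug into a display title. The import path is inferred from the "Source code in" line above, the `except` branch is a fixed-output stub so the sketch runs without the package installed, and `to_display_title` is a hypothetical helper that is not part of the library.

```python
try:
    # Import path inferred from the source file location shown above.
    from agentflow.utils.thread_name_generator import generate_dummy_thread_name
except ImportError:
    # Stub with a fixed result, only so this sketch runs without the package.
    def generate_dummy_thread_name(separator: str = "-") -> str:
        return f"creative{separator}exploration"


def to_display_title(name: str, separator: str = "-") -> str:
    """Hypothetical helper: 'deep-dive' becomes 'Deep Dive'."""
    return " ".join(word.capitalize() for word in name.split(separator))


slug = generate_dummy_thread_name(separator="_")
print(slug)                                   # words joined by "_"
print(to_display_title(slug, separator="_"))  # same words, capitalized
```

Passing the same separator to both calls matters here: the helper splits on exactly the string that was used to join the words.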