Model response converter

Classes:

- ModelResponseConverter: Wrap an LLM SDK call and normalize its output via a converter.

Classes

ModelResponseConverter

Wrap an LLM SDK call and normalize its output via a converter.

Supports sync/async invocation and streaming. Use invoke() for non-streaming calls and stream() for streaming calls.
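
For example, a non-streaming LiteLLM call can be wrapped and normalized like this. A minimal sketch, assuming litellm is installed; the model name and prompt are placeholders:

import asyncio

import litellm

from pyagenity.adapters.llm.model_response_converter import ModelResponseConverter


async def main() -> None:
    # Pass a zero-argument callable so the SDK call runs lazily inside
    # invoke(); a pre-computed response object is accepted as well.
    mrc = ModelResponseConverter(
        lambda: litellm.completion(
            model="gpt-4o-mini",
            messages=[{"role": "user", "content": "Hello!"}],
        ),
        converter="litellm",
    )
    message = await mrc.invoke()  # normalized Message
    print(message)


asyncio.run(main())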

Methods:

- __init__: Initialize ModelResponseConverter.
- invoke: Call the underlying function and convert a non-streaming response to Message.
- stream: Call the underlying function and yield normalized streaming events and the final Message.

Attributes:

- converter (BaseConverter): The converter used to normalize responses.
- response (Any | Callable[..., Any]): The LLM response, or a callable returning one.
Source code in pyagenity/adapters/llm/model_response_converter.py
class ModelResponseConverter:
    """Wrap an LLM SDK call and normalize its output via a converter.

    Supports sync/async invocation and streaming. Use `invoke()` for
    non-streaming calls and `stream()` for streaming calls.
    """

    def __init__(
        self,
        response: Any | Callable[..., Any],
        converter: BaseConverter | str,
    ) -> None:
        """
        Initialize ModelResponseConverter.

        Args:
            response (Any | Callable[..., Any]): The LLM response or a callable returning
                a response.
            converter (BaseConverter | str): Converter instance or string identifier
                (e.g., "litellm").

        Raises:
            ValueError: If the converter is not supported.
        """
        self.response = response

        if isinstance(converter, str) and converter == "litellm":
            from .litellm_converter import LiteLLMConverter

            self.converter = LiteLLMConverter()
        elif isinstance(converter, BaseConverter):
            self.converter = converter
        else:
            raise ValueError(f"Unsupported converter: {converter}")

    async def invoke(self) -> Message:
        """
        Call the underlying function and convert a non-streaming response to Message.

        Returns:
            Message: The normalized message from the LLM response.

        Raises:
            Exception: If the underlying function or converter fails.
        """
        if callable(self.response):
            if inspect.iscoroutinefunction(self.response):
                response = await self.response()
            else:
                response = self.response()
        else:
            response = self.response

        return await self.converter.convert_response(response)  # type: ignore

    async def stream(
        self,
        config: dict,
        node_name: str,
        meta: dict | None = None,
    ) -> AsyncGenerator[Message]:
        """
        Call the underlying function and yield normalized streaming events and final Message.

        Args:
            config (dict): Node configuration parameters for streaming.
            node_name (str): Name of the node processing the response.
            meta (dict | None): Optional metadata for conversion.

        Yields:
            Message: Normalized streaming message events from the LLM response.

        Raises:
            ValueError: If config is not provided.
            Exception: If the underlying function or converter fails.
        """
        if not config:
            raise ValueError("Config must be provided for streaming conversion")

        if callable(self.response):
            if inspect.iscoroutinefunction(self.response):
                response = await self.response()
            else:
                response = self.response()
        else:
            response = self.response

        async for item in self.converter.convert_streaming_response(  # type: ignore
            config,
            node_name=node_name,
            response=response,
            meta=meta,
        ):
            yield item

Attributes

converter (instance attribute): The converter used to normalize responses; LiteLLMConverter() when "litellm" is passed, otherwise the BaseConverter instance supplied.

response (instance attribute): The LLM response, or a callable returning one.

Functions

__init__
__init__(response, converter)

Initialize ModelResponseConverter.

Parameters:

- response (Any | Callable[..., Any], required): The LLM response or a callable returning a response.
- converter (BaseConverter | str, required): Converter instance or string identifier (e.g., "litellm").

Raises:

- ValueError: If the converter is not supported.

Source code in pyagenity/adapters/llm/model_response_converter.py
def __init__(
    self,
    response: Any | Callable[..., Any],
    converter: BaseConverter | str,
) -> None:
    """
    Initialize ModelResponseConverter.

    Args:
        response (Any | Callable[..., Any]): The LLM response or a callable returning
            a response.
        converter (BaseConverter | str): Converter instance or string identifier
            (e.g., "litellm").

    Raises:
        ValueError: If the converter is not supported.
    """
    self.response = response

    if isinstance(converter, str) and converter == "litellm":
        from .litellm_converter import LiteLLMConverter

        self.converter = LiteLLMConverter()
    elif isinstance(converter, BaseConverter):
        self.converter = converter
    else:
        raise ValueError(f"Unsupported converter: {converter}")
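
As a sketch, the two accepted converter arguments and the failure branch look like this. EchoConverter is hypothetical (it assumes BaseConverter needs no constructor arguments and declares only the two hooks used by invoke() and stream()), and the BaseConverter import path is assumed from the package layout:

from pyagenity.adapters.llm.base_converter import BaseConverter  # assumed path
from pyagenity.adapters.llm.model_response_converter import ModelResponseConverter

raw = {"choices": []}  # stand-in for an SDK response object

# 1. Built-in converter selected by name.
by_name = ModelResponseConverter(raw, converter="litellm")


# 2. Any BaseConverter instance.
class EchoConverter(BaseConverter):
    # Hypothetical converter used only for illustration.
    async def convert_response(self, response):
        return response

    async def convert_streaming_response(self, config, node_name, response, meta=None):
        yield response


by_instance = ModelResponseConverter(raw, converter=EchoConverter())

# 3. Anything else raises.
try:
    ModelResponseConverter(raw, converter="anthropic")
except ValueError as exc:
    print(exc)  # Unsupported converter: anthropic
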
invoke async
invoke()

Call the underlying function and convert a non-streaming response to Message.

Returns:

- Message: The normalized message from the LLM response.

Raises:

- Exception: If the underlying function or converter fails.

Source code in pyagenity/adapters/llm/model_response_converter.py
async def invoke(self) -> Message:
    """
    Call the underlying function and convert a non-streaming response to Message.

    Returns:
        Message: The normalized message from the LLM response.

    Raises:
        Exception: If the underlying function or converter fails.
    """
    if callable(self.response):
        if inspect.iscoroutinefunction(self.response):
            response = await self.response()
        else:
            response = self.response()
    else:
        response = self.response

    return await self.converter.convert_response(response)  # type: ignore
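
Note that invoke() only awaits the callable when inspect.iscoroutinefunction() reports a true coroutine function, so an async SDK call should be passed as an async def rather than a plain lambda returning a coroutine. A sketch, again using litellm as the example SDK:

import asyncio

import litellm

from pyagenity.adapters.llm.model_response_converter import ModelResponseConverter


async def call_model():
    # A true coroutine function, so invoke() awaits it before converting.
    return await litellm.acompletion(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Say hi."}],
    )


async def main() -> None:
    mrc = ModelResponseConverter(call_model, converter="litellm")
    message = await mrc.invoke()
    print(message)


asyncio.run(main())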
stream async
stream(config, node_name, meta=None)

Call the underlying function and yield normalized streaming events and the final Message.

Parameters:

- config (dict, required): Node configuration parameters for streaming.
- node_name (str, required): Name of the node processing the response.
- meta (dict | None, default None): Optional metadata for conversion.

Yields:

- Message (AsyncGenerator[Message]): Normalized streaming message events from the LLM response.

Raises:

- ValueError: If config is not provided.
- Exception: If the underlying function or converter fails.

Source code in pyagenity/adapters/llm/model_response_converter.py
async def stream(
    self,
    config: dict,
    node_name: str,
    meta: dict | None = None,
) -> AsyncGenerator[Message]:
    """
    Call the underlying function and yield normalized streaming events and final Message.

    Args:
        config (dict): Node configuration parameters for streaming.
        node_name (str): Name of the node processing the response.
        meta (dict | None): Optional metadata for conversion.

    Yields:
        Message: Normalized streaming message events from the LLM response.

    Raises:
        ValueError: If config is not provided.
        Exception: If the underlying function or converter fails.
    """
    if not config:
        raise ValueError("Config must be provided for streaming conversion")

    if callable(self.response):
        if inspect.iscoroutinefunction(self.response):
            response = await self.response()
        else:
            response = self.response()
    else:
        response = self.response

    async for item in self.converter.convert_streaming_response(  # type: ignore
        config,
        node_name=node_name,
        response=response,
        meta=meta,
    ):
        yield item
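
Consuming the normalized stream might look like the following sketch. The config key shown is illustrative (the wrapper only checks that config is non-empty), and node_name simply tags the emitted events:

import asyncio

import litellm

from pyagenity.adapters.llm.model_response_converter import ModelResponseConverter


async def call_model():
    # True coroutine function returning an async chunk stream.
    return await litellm.acompletion(
        model="gpt-4o-mini",
        messages=[{"role": "user", "content": "Write a haiku."}],
        stream=True,
    )


async def main() -> None:
    mrc = ModelResponseConverter(call_model, converter="litellm")
    async for message in mrc.stream(
        config={"thread_id": "demo"},  # illustrative key; must be non-empty
        node_name="haiku_node",
    ):
        print(message)


asyncio.run(main())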