Console publisher

Console publisher implementation for debugging and testing.

This module provides a publisher that outputs events to the console for development and debugging purposes.

Classes:

Name              Description
ConsolePublisher  Publisher that prints events to the console for debugging and testing.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute

logger = getLogger(__name__)
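
The `close` and `sync_close` methods documented further down only emit `DEBUG` records through this module-level logger, so nothing appears under Python's default `WARNING` level. A minimal sketch of how those messages could be surfaced, assuming the logger name follows the module path shown on this page (`pyagenity.publisher.console_publisher` is an inference from that path, not something the page states):

```python
import logging

# Assumed logger name, derived from the module path pyagenity/publisher/console_publisher.py.
LOGGER_NAME = "pyagenity.publisher.console_publisher"

# Keep the rest of the application at WARNING, but let this one logger emit DEBUG records.
logging.basicConfig(level=logging.WARNING)
logging.getLogger(LOGGER_NAME).setLevel(logging.DEBUG)

# Stand-in for the record that close() would emit through the module logger.
logging.getLogger(LOGGER_NAME).debug("ConsolePublisher closed")
```

Setting the level on the named logger rather than on the root logger keeps other libraries quiet while still showing the publisher's lifecycle messages.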

Classes

ConsolePublisher

Bases: BasePublisher

Publisher that prints events to the console for debugging and testing.

This publisher is useful for development and debugging purposes, as it outputs event information to the standard output.

Attributes:

Name               Description
format             Output format ('json' by default).
include_timestamp  Whether to include timestamp (True by default).
indent             Indentation for output (2 by default).

Methods:

Name        Description
__init__    Initialize the ConsolePublisher with the given configuration.
close       Close the publisher and release any resources.
publish     Publish an event to the console.
sync_close  Synchronously close the publisher and release any resources.

Source code in pyagenity/publisher/console_publisher.py
class ConsolePublisher(BasePublisher):
    """Publisher that prints events to the console for debugging and testing.

    This publisher is useful for development and debugging purposes, as it outputs event information
    to the standard output.

    Attributes:
        format: Output format ('json' by default).
        include_timestamp: Whether to include timestamp (True by default).
        indent: Indentation for output (2 by default).
    """

    def __init__(self, config: dict[str, Any] | None = None):
        """Initialize the ConsolePublisher with the given configuration.

        Args:
            config: Configuration dictionary. Supported keys:
                - format: Output format (default: 'json').
                - include_timestamp: Whether to include timestamp (default: True).
                - indent: Indentation for output (default: 2).
        """
        super().__init__(config or {})
        self.format = config.get("format", "json") if config else "json"
        self.include_timestamp = config.get("include_timestamp", True) if config else True
        self.indent = config.get("indent", 2) if config else 2

    async def publish(self, event: EventModel) -> Any:
        """Publish an event to the console.

        Args:
            event: The event to publish.

        Returns:
            None
        """
        msg = f"{event.timestamp} -> Source: {event.node_name}.{event.event_type}:"
        msg += f"-> Payload: {event.data}"
        msg += f" -> {event.metadata}"
        print(msg)  # noqa: T201

    async def close(self):
        """Close the publisher and release any resources.

        ConsolePublisher does not require cleanup, but this method is provided for
        interface compatibility.
        """
        logger.debug("ConsolePublisher closed")

    def sync_close(self):
        """Synchronously close the publisher and release any resources.

        ConsolePublisher does not require cleanup, but this method is provided for
        interface compatibility.
        """
        logger.debug("ConsolePublisher sync closed")

Attributes

config instance-attribute
config = config
format instance-attribute
format = config.get('format', 'json') if config else 'json'
include_timestamp instance-attribute
include_timestamp = config.get('include_timestamp', True) if config else True
indent instance-attribute
indent = config.get('indent', 2) if config else 2

Functions

__init__
__init__(config=None)

Initialize the ConsolePublisher with the given configuration.

Parameters:

Name: config
Type: dict[str, Any] | None
Default: None
Description: Configuration dictionary. Supported keys:
    - format: Output format (default: 'json').
    - include_timestamp: Whether to include timestamp (default: True).
    - indent: Indentation for output (default: 2).

Source code in pyagenity/publisher/console_publisher.py
def __init__(self, config: dict[str, Any] | None = None):
    """Initialize the ConsolePublisher with the given configuration.

    Args:
        config: Configuration dictionary. Supported keys:
            - format: Output format (default: 'json').
            - include_timestamp: Whether to include timestamp (default: True).
            - indent: Indentation for output (default: 2).
    """
    super().__init__(config or {})
    self.format = config.get("format", "json") if config else "json"
    self.include_timestamp = config.get("include_timestamp", True) if config else True
    self.indent = config.get("indent", 2) if config else 2
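
The three `config.get(...) if config else ...` expressions above resolve each option independently, so a partial dictionary overrides only the keys it contains, and `None` and `{}` both fall back to the defaults. A minimal standalone sketch of that resolution follows; the helper name `resolve_console_options` is hypothetical and not part of pyagenity:

```python
from __future__ import annotations

from typing import Any


def resolve_console_options(config: dict[str, Any] | None = None) -> dict[str, Any]:
    """Hypothetical helper mirroring the default handling in ConsolePublisher.__init__."""
    config = config or {}  # None and {} are treated the same way
    return {
        "format": config.get("format", "json"),
        "include_timestamp": config.get("include_timestamp", True),
        "indent": config.get("indent", 2),
    }


# No config: every option takes its default.
print(resolve_console_options())

# Partial config: only "indent" changes, the other two keep their defaults.
print(resolve_console_options({"indent": 4}))
```
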
close async
close()

Close the publisher and release any resources.

ConsolePublisher does not require cleanup, but this method is provided for interface compatibility.

Source code in pyagenity/publisher/console_publisher.py
async def close(self):
    """Close the publisher and release any resources.

    ConsolePublisher does not require cleanup, but this method is provided for
    interface compatibility.
    """
    logger.debug("ConsolePublisher closed")
publish async
publish(event)

Publish an event to the console.

Parameters:

Name: event
Type: EventModel
Default: required
Description: The event to publish.

Returns:

Type  Description
Any   None

Source code in pyagenity/publisher/console_publisher.py
async def publish(self, event: EventModel) -> Any:
    """Publish an event to the console.

    Args:
        event: The event to publish.

    Returns:
        None
    """
    msg = f"{event.timestamp} -> Source: {event.node_name}.{event.event_type}:"
    msg += f"-> Payload: {event.data}"
    msg += f" -> {event.metadata}"
    print(msg)  # noqa: T201
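
As the source above shows, `publish` builds a single line from five event fields and prints it; in this listing it does not consult `format`, `include_timestamp`, or `indent`. The sketch below reproduces that string construction with a stand-in event object so the output shape can be inspected without installing the library. Both `format_console_line` and the `SimpleNamespace` event are illustrative assumptions, not pyagenity APIs, and the real `EventModel` may render its fields differently.

```python
from types import SimpleNamespace


def format_console_line(event) -> str:
    """Hypothetical helper: builds the same string that publish() prints."""
    msg = f"{event.timestamp} -> Source: {event.node_name}.{event.event_type}:"
    msg += f"-> Payload: {event.data}"
    msg += f" -> {event.metadata}"
    return msg


# Stand-in for EventModel carrying only the attributes publish() reads.
event = SimpleNamespace(
    timestamp=1700000000.0,
    node_name="planner",
    event_type="start",
    data={"step": 1},
    metadata={"run": "demo"},
)

print(format_console_line(event))
```

Note that the first two fragments are joined without a space, so the printed line contains `start:-> Payload:` rather than `start: -> Payload:`.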
sync_close
sync_close()

Synchronously close the publisher and release any resources.

ConsolePublisher does not require cleanup, but this method is provided for interface compatibility.

Source code in pyagenity/publisher/console_publisher.py
def sync_close(self):
    """Synchronously close the publisher and release any resources.

    ConsolePublisher does not require cleanup, but this method is provided for
    interface compatibility.
    """
    logger.debug("ConsolePublisher sync closed")
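
Because `publish` and `close` are coroutines while `sync_close` is a plain method, async callers await the first two and synchronous teardown paths call `sync_close` directly. The sketch below shows that calling pattern with a local stand-in class (`StandInConsolePublisher`, hypothetical) that copies the method shapes from the source above; with the library installed, the real `ConsolePublisher` would presumably be driven the same way.

```python
import asyncio
import logging
from types import SimpleNamespace

logger = logging.getLogger(__name__)


class StandInConsolePublisher:
    """Hypothetical stand-in with the same method shapes as ConsolePublisher."""

    async def publish(self, event) -> None:
        print(
            f"{event.timestamp} -> Source: {event.node_name}.{event.event_type}:"
            f"-> Payload: {event.data} -> {event.metadata}"
        )

    async def close(self) -> None:
        logger.debug("ConsolePublisher closed")

    def sync_close(self) -> None:
        logger.debug("ConsolePublisher sync closed")


async def run_once(publisher, event) -> None:
    """Publish one event and always close the publisher afterwards."""
    try:
        await publisher.publish(event)
    finally:
        await publisher.close()


# Stand-in event with only the attributes publish() reads.
event = SimpleNamespace(timestamp=0, node_name="n", event_type="t", data={}, metadata={})

# Async path: publish and close are awaited inside an event loop.
asyncio.run(run_once(StandInConsolePublisher(), event))

# Sync path: no event loop is needed for sync_close.
StandInConsolePublisher().sync_close()
```

The `try`/`finally` wrapper is a design choice of this sketch, not something the library requires; it matters little for a console publisher that holds no resources, but keeps the calling code unchanged if a different `BasePublisher` subclass is swapped in.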