Skip to content

Base publisher

Classes:

Name Description
BasePublisher

Abstract base class for event publishers.

Classes

BasePublisher

Bases: ABC

Abstract base class for event publishers.

This class defines the interface for publishing events. Subclasses should implement the publish, close, and sync_close methods to provide specific publishing logic.

Attributes:

Name Type Description
config

Configuration dictionary for the publisher.

Methods:

Name Description
__init__

Initialize the publisher with the given configuration.

close

Close the publisher and release any resources.

publish

Publish an event.

sync_close

Close the publisher and release any resources (synchronous version).

Source code in pyagenity/publisher/base_publisher.py
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
class BasePublisher(ABC):
    """Abstract base class for event publishers.

    This class defines the interface for publishing events. Subclasses should implement
    the publish, close, and sync_close methods to provide specific publishing logic.

    Attributes:
        config: Configuration dictionary for the publisher.
    """

    def __init__(self, config: dict[str, Any]):
        """Initialize the publisher with the given configuration.

        Args:
            config: Configuration dictionary for the publisher.
        """
        self.config = config

    @abstractmethod
    async def publish(self, event: EventModel) -> Any:
        """Publish an event.

        Args:
            event: The event to publish.

        Returns:
            The result of the publish operation.
        """
        raise NotImplementedError

    @abstractmethod
    async def close(self):
        """Close the publisher and release any resources.

        This method should be overridden by subclasses to provide specific cleanup logic.
        It will be called externally.
        """
        raise NotImplementedError

    @abstractmethod
    def sync_close(self):
        """Close the publisher and release any resources (synchronous version).

        This method should be overridden by subclasses to provide specific cleanup logic.
        It will be called externally.
        """
        raise NotImplementedError

Attributes

config instance-attribute
config = config

Functions

__init__
__init__(config)

Initialize the publisher with the given configuration.

Parameters:

Name Type Description Default
config
dict[str, Any]

Configuration dictionary for the publisher.

required
Source code in pyagenity/publisher/base_publisher.py
17
18
19
20
21
22
23
def __init__(self, config: dict[str, Any]):
    """Initialize the publisher with the given configuration.

    Args:
        config: Configuration dictionary for the publisher.
    """
    self.config = config
close abstractmethod async
close()

Close the publisher and release any resources.

This method should be overridden by subclasses to provide specific cleanup logic. It will be called externally.

Source code in pyagenity/publisher/base_publisher.py
37
38
39
40
41
42
43
44
@abstractmethod
async def close(self):
    """Close the publisher and release any resources.

    This method should be overridden by subclasses to provide specific cleanup logic.
    It will be called externally.
    """
    raise NotImplementedError
publish abstractmethod async
publish(event)

Publish an event.

Parameters:

Name Type Description Default
event
EventModel

The event to publish.

required

Returns:

Type Description
Any

The result of the publish operation.

Source code in pyagenity/publisher/base_publisher.py
25
26
27
28
29
30
31
32
33
34
35
@abstractmethod
async def publish(self, event: EventModel) -> Any:
    """Publish an event.

    Args:
        event: The event to publish.

    Returns:
        The result of the publish operation.
    """
    raise NotImplementedError
sync_close abstractmethod
sync_close()

Close the publisher and release any resources (synchronous version).

This method should be overridden by subclasses to provide specific cleanup logic. It will be called externally.

Source code in pyagenity/publisher/base_publisher.py
46
47
48
49
50
51
52
53
@abstractmethod
def sync_close(self):
    """Close the publisher and release any resources (synchronous version).

    This method should be overridden by subclasses to provide specific cleanup logic.
    It will be called externally.
    """
    raise NotImplementedError