Skip to content

Publish

Functions:

Name Description
publish_event

Publish an event asynchronously using the background task manager.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute

logger = getLogger(__name__)

Classes

Functions

publish_event

publish_event(event, publisher=Inject[BasePublisher], task_manager=Inject[BackgroundTaskManager])

Publish an event asynchronously using the background task manager.

Parameters:

Name Type Description Default

event

EventModel

The event to publish.

required

publisher

BasePublisher | None

The publisher instance (injected).

Inject[BasePublisher]

task_manager

BackgroundTaskManager

The background task manager (injected).

Inject[BackgroundTaskManager]
Source code in pyagenity/publisher/publish.py
31
32
33
34
35
36
37
38
39
40
41
42
43
44
def publish_event(
    event: EventModel,
    publisher: BasePublisher | None = Inject[BasePublisher],
    task_manager: BackgroundTaskManager = Inject[BackgroundTaskManager],
) -> None:
    """Publish an event asynchronously using the background task manager.

    Args:
        event: The event to publish.
        publisher: The publisher instance (injected).
        task_manager: The background task manager (injected).
    """
    # Store the task to prevent it from being garbage collected
    task_manager.create_task(_publish_event_task(event, publisher))