Skip to content

Rabbitmq publisher

RabbitMQ publisher implementation (optional dependency).

Uses aio-pika to publish events to an exchange with a routing key.

Dependency: aio-pika. It is not installed by default; install the extra with: pip install pyagenity[rabbitmq].

Classes:

Name Description
RabbitMQPublisher

Publish events to RabbitMQ using aio-pika.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute

logger = getLogger(__name__)

Classes

RabbitMQPublisher

Bases: BasePublisher

Publish events to RabbitMQ using aio-pika.

Attributes:

Name Type Description
url str

RabbitMQ connection URL.

exchange str

Exchange name.

routing_key str

Routing key for messages.

exchange_type str

Type of exchange.

declare bool

Whether to declare the exchange.

durable bool

Whether the exchange is durable.

_conn

Connection instance.

_channel

Channel instance.

_exchange

Exchange instance.

Methods:

Name Description
__init__

Initialize the RabbitMQPublisher.

close

Close the RabbitMQ connection and channel.

publish

Publish an event to RabbitMQ.

sync_close

Synchronously close the RabbitMQ connection.

Source code in pyagenity/publisher/rabbitmq_publisher.py
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
class RabbitMQPublisher(BasePublisher):
    """Publish events to RabbitMQ using aio-pika.

    The aio-pika package is imported lazily, and the connection, channel and
    exchange are created on the first call to ``publish``.

    Attributes:
        url: RabbitMQ connection URL.
        exchange: Exchange name.
        routing_key: Routing key for messages.
        exchange_type: Type of exchange.
        declare: Whether to declare the exchange.
        durable: Whether the exchange is durable.
        _conn: Connection instance.
        _channel: Channel instance.
        _exchange: Exchange instance.
    """

    def __init__(self, config: dict[str, Any] | None = None):
        """Initialize the RabbitMQPublisher.

        Args:
            config: Configuration dictionary. Supported keys:
                - url: RabbitMQ URL (default: "amqp://guest:guest@localhost/").
                - exchange: Exchange name (default: "pyagenity.events").
                - routing_key: Routing key (default: "pyagenity.events").
                - exchange_type: Exchange type (default: "topic").
                - declare: Whether to declare exchange (default: True).
                - durable: Whether exchange is durable (default: True).
        """
        super().__init__(config or {})
        self.url: str = self.config.get("url", "amqp://guest:guest@localhost/")
        self.exchange: str = self.config.get("exchange", "pyagenity.events")
        self.routing_key: str = self.config.get("routing_key", "pyagenity.events")
        self.exchange_type: str = self.config.get("exchange_type", "topic")
        self.declare: bool = self.config.get("declare", True)
        self.durable: bool = self.config.get("durable", True)

        # Populated by _ensure() on first publish and reset by close().
        self._conn = None  # type: ignore[var-annotated]
        self._channel = None  # type: ignore[var-annotated]
        self._exchange = None  # type: ignore[var-annotated]

    async def _ensure(self):
        """Ensure the connection, channel, and exchange are initialized.

        Does nothing when an exchange is already cached. Otherwise connects,
        opens a channel and resolves the exchange. The instance attributes are
        only assigned once every step has succeeded; if a step after the
        connect fails, the new connection is closed before the error is
        re-raised so a later retry does not leave it open and unreferenced.

        Raises:
            RuntimeError: If the ``aio_pika`` module cannot be imported.
        """
        if self._exchange is not None:
            return

        try:
            aio_pika = importlib.import_module("aio_pika")
        except Exception as exc:
            raise RuntimeError(
                "RabbitMQPublisher requires the 'aio-pika' package. Install with "
                "'pip install pyagenity[rabbitmq]' or 'pip install aio-pika'."
            ) from exc

        # Connect and declare exchange if needed
        conn = await aio_pika.connect_robust(self.url)
        try:
            channel = await conn.channel()

            if self.declare:
                # Unknown exchange_type names silently fall back to TOPIC.
                ex_type = getattr(
                    aio_pika.ExchangeType,
                    self.exchange_type.upper(),
                    aio_pika.ExchangeType.TOPIC,
                )
                exchange = await channel.declare_exchange(
                    self.exchange, ex_type, durable=self.durable
                )
            else:
                # Fall back to default exchange
                # NOTE(review): the configured exchange name is not used on
                # this path - confirm that is intended.
                exchange = channel.default_exchange
        except BaseException:
            # Setup failed part-way (including cancellation): release the
            # connection we just opened, then let the original error propagate.
            try:
                await conn.close()
            except Exception:
                logger.debug("RabbitMQPublisher connection close error", exc_info=True)
            raise

        self._conn = conn
        self._channel = channel
        self._exchange = exchange

    async def publish(self, event: EventModel) -> Any:
        """Publish an event to RabbitMQ.

        The event is serialised with ``json.dumps(event.model_dump())``,
        encoded as UTF-8 and published with the configured routing key.

        Args:
            event: The event to publish.

        Returns:
            True on success.

        Raises:
            RuntimeError: If aio-pika is unavailable or the exchange is still
                unset after initialisation.
        """
        await self._ensure()
        payload = json.dumps(event.model_dump()).encode("utf-8")

        aio_pika = importlib.import_module("aio_pika")
        message = aio_pika.Message(body=payload)
        if self._exchange is None:
            raise RuntimeError("RabbitMQPublisher exchange not initialized")
        await self._exchange.publish(message, routing_key=self.routing_key)
        return True

    async def close(self):
        """Close the RabbitMQ connection and channel.

        Best-effort: errors raised while closing are logged at debug level and
        not re-raised. The cached channel, connection and exchange are reset
        to ``None`` so the next ``publish`` sets them up again.
        """
        try:
            if self._channel is not None:
                await self._channel.close()
        except Exception:
            logger.debug("RabbitMQPublisher channel close error", exc_info=True)
        finally:
            self._channel = None

        try:
            if self._conn is not None:
                await self._conn.close()
        except Exception:
            logger.debug("RabbitMQPublisher connection close error", exc_info=True)
        finally:
            self._conn = None
            self._exchange = None

    def sync_close(self):
        """Synchronously close the RabbitMQ connection.

        Runs ``close()`` via ``asyncio.run``. If that raises ``RuntimeError``
        (for example because an event loop is already running in this thread),
        the close is skipped and a warning is logged.
        """
        closer = self.close()
        try:
            asyncio.run(closer)
        except RuntimeError:
            # asyncio.run() rejected the coroutine without awaiting it; close
            # it explicitly so Python does not emit a "coroutine was never
            # awaited" RuntimeWarning.
            closer.close()
            logger.warning("sync_close called within an active event loop; skipping.")

Attributes

config instance-attribute
config = config
declare instance-attribute
declare = get('declare', True)
durable instance-attribute
durable = get('durable', True)
exchange instance-attribute
exchange = get('exchange', 'pyagenity.events')
exchange_type instance-attribute
exchange_type = get('exchange_type', 'topic')
routing_key instance-attribute
routing_key = get('routing_key', 'pyagenity.events')
url instance-attribute
url = get('url', 'amqp://guest:guest@localhost/')

Functions

__init__
__init__(config=None)

Initialize the RabbitMQPublisher.

Parameters:

Name Type Description Default
config
dict[str, Any] | None

Configuration dictionary. Supported keys: - url: RabbitMQ URL (default: "amqp://guest:guest@localhost/"). - exchange: Exchange name (default: "pyagenity.events"). - routing_key: Routing key (default: "pyagenity.events"). - exchange_type: Exchange type (default: "topic"). - declare: Whether to declare exchange (default: True). - durable: Whether exchange is durable (default: True).

None
Source code in pyagenity/publisher/rabbitmq_publisher.py
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def __init__(self, config: dict[str, Any] | None = None):
    """Set up the publisher from an optional settings mapping.

    Only attributes are assigned in this method; the connection handles start
    out as ``None``.

    Args:
        config: Settings mapping; ``None`` is treated as an empty mapping.
            Recognised keys and their fallbacks:
            - url: RabbitMQ URL ("amqp://guest:guest@localhost/").
            - exchange: Exchange name ("pyagenity.events").
            - routing_key: Routing key ("pyagenity.events").
            - exchange_type: Exchange type ("topic").
            - declare: Whether to declare the exchange (True).
            - durable: Whether the exchange is durable (True).
    """
    super().__init__(config or {})

    # Read every option from the stored config, falling back to defaults.
    settings = self.config
    self.url: str = settings.get("url", "amqp://guest:guest@localhost/")
    self.exchange: str = settings.get("exchange", "pyagenity.events")
    self.routing_key: str = settings.get("routing_key", "pyagenity.events")
    self.exchange_type: str = settings.get("exchange_type", "topic")
    self.declare: bool = settings.get("declare", True)
    self.durable: bool = settings.get("durable", True)

    # Connection, channel and exchange handles all start unset.
    self._conn = self._channel = self._exchange = None  # type: ignore[var-annotated]
close async
close()

Close the RabbitMQ connection and channel.

Source code in pyagenity/publisher/rabbitmq_publisher.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
async def close(self):
    """Shut down the channel, then the connection, ignoring close errors.

    Any ``Exception`` raised while closing either handle is logged at debug
    level and not re-raised. Afterwards the channel, connection and exchange
    attributes are all ``None``.
    """
    # Channel first.
    try:
        open_channel = self._channel
        if open_channel is not None:
            await open_channel.close()
    except Exception:
        logger.debug("RabbitMQPublisher channel close error", exc_info=True)
    finally:
        self._channel = None

    # Then the connection; the cached exchange is dropped alongside it.
    try:
        open_conn = self._conn
        if open_conn is not None:
            await open_conn.close()
    except Exception:
        logger.debug("RabbitMQPublisher connection close error", exc_info=True)
    finally:
        self._conn, self._exchange = None, None
publish async
publish(event)

Publish an event to RabbitMQ.

Parameters:

Name Type Description Default
event
EventModel

The event to publish.

required

Returns:

Type Description
Any

True on success.

Source code in pyagenity/publisher/rabbitmq_publisher.py
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
async def publish(self, event: EventModel) -> Any:
    """Publish an event to RabbitMQ.

    Args:
        event: The event to publish.

    Returns:
        True on success.
    """
    await self._ensure()
    payload = json.dumps(event.model_dump()).encode("utf-8")

    aio_pika = importlib.import_module("aio_pika")
    message = aio_pika.Message(body=payload)
    if self._exchange is None:
        raise RuntimeError("RabbitMQPublisher exchange not initialized")
    await self._exchange.publish(message, routing_key=self.routing_key)
    return True
sync_close
sync_close()

Synchronously close the RabbitMQ connection.

Source code in pyagenity/publisher/rabbitmq_publisher.py
131
132
133
134
135
136
def sync_close(self):
    """Synchronously close the RabbitMQ connection.

    Runs ``close()`` via ``asyncio.run``. If that raises ``RuntimeError``
    (for example because an event loop is already running in this thread),
    the close is skipped and a warning is logged.
    """
    closer = self.close()
    try:
        asyncio.run(closer)
    except RuntimeError:
        # asyncio.run() rejected the coroutine without awaiting it; close it
        # explicitly so Python does not emit a "coroutine was never awaited"
        # RuntimeWarning.
        closer.close()
        logger.warning("sync_close called within an active event loop; skipping.")