
Kafka publisher

Kafka publisher implementation (optional dependency).

Uses aiokafka to publish events to a Kafka topic.

Dependency: aiokafka. It is not installed by default; install it with the extra: pip install pyagenity[kafka].
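Because aiokafka is optional, calling code may want to check for it before constructing the publisher. The following is a minimal sketch using only the standard library; the helper name `has_module` is hypothetical and not part of pyagenity.

```python
import importlib.util


def has_module(name: str) -> bool:
    """Return True if a top-level module named `name` can be located."""
    # find_spec returns None when a top-level module cannot be found.
    return importlib.util.find_spec(name) is not None


if __name__ == "__main__":
    if has_module("aiokafka"):
        print("aiokafka is available")
    else:
        print("aiokafka is missing; install with: pip install pyagenity[kafka]")
```

Checking up front lets an application fall back to another publisher instead of hitting the RuntimeError on the first publish.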

Classes:

KafkaPublisher: Publish events to a Kafka topic using aiokafka.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute

logger = getLogger(__name__)

Classes

KafkaPublisher

Bases: BasePublisher

Publish events to a Kafka topic using aiokafka.

This class provides an asynchronous interface for publishing events to a Kafka topic. It uses the aiokafka library to handle the producer operations. The publisher is lazily initialized and can be reused for multiple publishes.
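The lazy initialization described here can be sketched without a Kafka broker. In the example below, `FakeProducer` and `LazyPublisher` are hypothetical stand-ins written for illustration; they are not pyagenity or aiokafka classes, and only mirror the create-on-first-use pattern.

```python
from __future__ import annotations

import asyncio


class FakeProducer:
    """Hypothetical stand-in for a real producer; records lifecycle calls."""

    def __init__(self) -> None:
        self.started = 0
        self.sent: list[tuple[str, bytes]] = []

    async def start(self) -> None:
        self.started += 1

    async def send_and_wait(self, topic: str, payload: bytes) -> int:
        self.sent.append((topic, payload))
        return len(self.sent)


class LazyPublisher:
    """Creates its producer on the first publish and reuses it afterwards."""

    def __init__(self, topic: str) -> None:
        self.topic = topic
        self._producer: FakeProducer | None = None

    async def _get_producer(self) -> FakeProducer:
        if self._producer is None:
            self._producer = FakeProducer()
            await self._producer.start()
        return self._producer

    async def publish(self, payload: bytes) -> int:
        producer = await self._get_producer()
        return await producer.send_and_wait(self.topic, payload)


async def demo_lazy() -> tuple[int, int]:
    pub = LazyPublisher("pyagenity.events")
    await pub.publish(b"first")
    await pub.publish(b"second")
    # Return (number of start calls, number of sends).
    return pub._producer.started, len(pub._producer.sent)


if __name__ == "__main__":
    print(asyncio.run(demo_lazy()))
```

The producer is started once and reused for both sends, which is why constructing the publisher itself needs no network access.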

Attributes:

bootstrap_servers (str): Kafka bootstrap servers.
topic (str): Kafka topic to publish to.
client_id (str | None): Client ID for the producer.
_producer: Lazy-loaded Kafka producer instance.

Methods:

__init__: Initialize the KafkaPublisher.
close: Close the Kafka producer.
publish: Publish an event to the Kafka topic.
sync_close: Synchronously close the Kafka producer.

Source code in pyagenity/publisher/kafka_publisher.py
class KafkaPublisher(BasePublisher):
    """Publish events to a Kafka topic using aiokafka.

    This class provides an asynchronous interface for publishing events to a Kafka topic.
    It uses the aiokafka library to handle the producer operations. The publisher is
    lazily initialized and can be reused for multiple publishes.

    Attributes:
        bootstrap_servers: Kafka bootstrap servers.
        topic: Kafka topic to publish to.
        client_id: Client ID for the producer.
        _producer: Lazy-loaded Kafka producer instance.
    """

    def __init__(self, config: dict[str, Any] | None = None):
        """Initialize the KafkaPublisher.

        Args:
            config: Configuration dictionary. Supported keys:
                - bootstrap_servers: Kafka bootstrap servers (default: "localhost:9092").
                - topic: Kafka topic to publish to (default: "pyagenity.events").
                - client_id: Client ID for the producer.
        """
        super().__init__(config or {})
        self.bootstrap_servers: str = self.config.get("bootstrap_servers", "localhost:9092")
        self.topic: str = self.config.get("topic", "pyagenity.events")
        self.client_id: str | None = self.config.get("client_id")
        self._producer = None  # type: ignore[var-annotated]

    async def _get_producer(self):
        """Get or create the Kafka producer instance.

        This method lazily initializes the producer if it hasn't been created yet.
        It imports aiokafka and starts the producer.

        Returns:
            The initialized producer instance.

        Raises:
            RuntimeError: If the 'aiokafka' package is not installed.
        """
        if self._producer is not None:
            return self._producer

        try:
            aiokafka = importlib.import_module("aiokafka")
        except Exception as exc:
            raise RuntimeError(
                "KafkaPublisher requires the 'aiokafka' package. Install with "
                "'pip install pyagenity[kafka]' or 'pip install aiokafka'."
            ) from exc

        producer_cls = aiokafka.AIOKafkaProducer
        self._producer = producer_cls(
            bootstrap_servers=self.bootstrap_servers,
            client_id=self.client_id,
        )
        await self._producer.start()
        return self._producer

    async def publish(self, event: EventModel) -> Any:
        """Publish an event to the Kafka topic.

        Args:
            event: The event to publish.

        Returns:
            The result of the send_and_wait operation.
        """
        producer = await self._get_producer()
        payload = json.dumps(event.model_dump()).encode("utf-8")
        return await producer.send_and_wait(self.topic, payload)

    async def close(self):
        """Close the Kafka producer.

        Stops the producer and cleans up resources. Errors during stopping are logged
        but do not raise exceptions.
        """
        if self._producer is None:
            return

        try:
            await self._producer.stop()
        except Exception:
            logger.debug("KafkaPublisher close encountered an error", exc_info=True)
        finally:
            self._producer = None

    def sync_close(self):
        """Synchronously close the Kafka producer.

        This method runs the async close in a new event loop. If called within an
        active event loop, it logs a warning and skips the operation.
        """
        try:
            asyncio.run(self.close())
        except RuntimeError:
            logger.warning("sync_close called within an active event loop; skipping.")

Attributes

bootstrap_servers instance-attribute
bootstrap_servers = self.config.get('bootstrap_servers', 'localhost:9092')
client_id instance-attribute
client_id = self.config.get('client_id')
config instance-attribute
config = config
topic instance-attribute
topic = self.config.get('topic', 'pyagenity.events')

Functions

__init__
__init__(config=None)

Initialize the KafkaPublisher.

Parameters:

config (dict[str, Any] | None, default: None): Configuration dictionary. Supported keys:
    - bootstrap_servers: Kafka bootstrap servers (default: "localhost:9092").
    - topic: Kafka topic to publish to (default: "pyagenity.events").
    - client_id: Client ID for the producer.

Source code in pyagenity/publisher/kafka_publisher.py
def __init__(self, config: dict[str, Any] | None = None):
    """Initialize the KafkaPublisher.

    Args:
        config: Configuration dictionary. Supported keys:
            - bootstrap_servers: Kafka bootstrap servers (default: "localhost:9092").
            - topic: Kafka topic to publish to (default: "pyagenity.events").
            - client_id: Client ID for the producer.
    """
    super().__init__(config or {})
    self.bootstrap_servers: str = self.config.get("bootstrap_servers", "localhost:9092")
    self.topic: str = self.config.get("topic", "pyagenity.events")
    self.client_id: str | None = self.config.get("client_id")
    self._producer = None  # type: ignore[var-annotated]
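
How the configuration keys fall back to their defaults can be illustrated with a plain dictionary. This is a sketch only: `resolve_settings` is a hypothetical helper that mirrors the lookups in `__init__`, not a function provided by pyagenity.

```python
from __future__ import annotations

from typing import Any


def resolve_settings(config: dict[str, Any] | None = None) -> dict[str, Any]:
    """Mirror the default handling shown in __init__ using a plain dict."""
    cfg = config or {}
    return {
        "bootstrap_servers": cfg.get("bootstrap_servers", "localhost:9092"),
        "topic": cfg.get("topic", "pyagenity.events"),
        # client_id has no default; it stays None when not supplied.
        "client_id": cfg.get("client_id"),
    }


if __name__ == "__main__":
    print(resolve_settings())
    print(resolve_settings({"topic": "orders", "client_id": "svc-1"}))
```

Passing None or an empty dictionary yields the same settings, since `config or {}` treats both alike.
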
close async
close()

Close the Kafka producer.

Stops the producer and cleans up resources. Errors during stopping are logged but do not raise exceptions.

Source code in pyagenity/publisher/kafka_publisher.py
async def close(self):
    """Close the Kafka producer.

    Stops the producer and cleans up resources. Errors during stopping are logged
    but do not raise exceptions.
    """
    if self._producer is None:
        return

    try:
        await self._producer.stop()
    except Exception:
        logger.debug("KafkaPublisher close encountered an error", exc_info=True)
    finally:
        self._producer = None
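
The close behavior (errors logged rather than raised, and repeated calls being harmless) can be exercised with a stub. In this sketch, `FailingProducer` and `Closer` are hypothetical names used only for illustration; the `close` body follows the same try/except/finally shape as the listing above.

```python
import asyncio
import logging

logger = logging.getLogger(__name__)


class FailingProducer:
    """Hypothetical producer whose stop() always fails."""

    async def stop(self) -> None:
        raise ConnectionError("broker unreachable")


class Closer:
    def __init__(self, producer) -> None:
        self._producer = producer

    async def close(self) -> None:
        if self._producer is None:
            return
        try:
            await self._producer.stop()
        except Exception:
            logger.debug("close encountered an error", exc_info=True)
        finally:
            # Drop the reference whether or not stop() succeeded.
            self._producer = None


async def demo_close() -> bool:
    closer = Closer(FailingProducer())
    await closer.close()  # the error is logged, not raised
    await closer.close()  # second call returns immediately
    return closer._producer is None


if __name__ == "__main__":
    print(asyncio.run(demo_close()))
```

Because the failure is logged at debug level, it will not be visible unless debug logging is enabled for the module's logger.
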
publish async
publish(event)

Publish an event to the Kafka topic.

Parameters:

event (EventModel, required): The event to publish.

Returns:

Any: The result of the send_and_wait operation.

Source code in pyagenity/publisher/kafka_publisher.py
async def publish(self, event: EventModel) -> Any:
    """Publish an event to the Kafka topic.

    Args:
        event: The event to publish.

    Returns:
        The result of the send_and_wait operation.
    """
    producer = await self._get_producer()
    payload = json.dumps(event.model_dump()).encode("utf-8")
    return await producer.send_and_wait(self.topic, payload)
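
The payload sent to Kafka is the event dictionary serialized as JSON and encoded as UTF-8 bytes. The sketch below shows that encoding and how a consumer might reverse it; `encode_event` and `decode_event` are hypothetical helpers, and a plain dict stands in for the result of `event.model_dump()`.

```python
import json


def encode_event(data: dict) -> bytes:
    """Serialize a dict the same way publish() builds its payload."""
    return json.dumps(data).encode("utf-8")


def decode_event(payload: bytes) -> dict:
    """Reverse the encoding on the consumer side."""
    return json.loads(payload.decode("utf-8"))


if __name__ == "__main__":
    raw = encode_event({"event": "started", "node": "planner"})
    print(raw)
    print(decode_event(raw))
```

Note that `json.dumps` raises TypeError for values it cannot serialize, such as datetime objects, so event fields need to be JSON-compatible for this encoding to succeed.
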
sync_close
sync_close()

Synchronously close the Kafka producer.

This method runs the async close in a new event loop. If called within an active event loop, it logs a warning and skips the operation.

Source code in pyagenity/publisher/kafka_publisher.py
def sync_close(self):
    """Synchronously close the Kafka producer.

    This method runs the async close in a new event loop. If called within an
    active event loop, it logs a warning and skips the operation.
    """
    try:
        asyncio.run(self.close())
    except RuntimeError:
        logger.warning("sync_close called within an active event loop; skipping.")
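
The skip-inside-a-running-loop behavior follows from how `asyncio.run` works: it raises RuntimeError when an event loop is already running in the current thread. The sketch below illustrates that standard-library behavior with hypothetical helper names; it is not pyagenity code. It also closes the unused coroutine, which avoids a "coroutine was never awaited" warning.

```python
import asyncio


async def noop() -> None:
    return None


def try_sync_run() -> str:
    """Run a coroutine synchronously, or skip if a loop is already running."""
    coro = noop()
    try:
        asyncio.run(coro)
        return "ran"
    except RuntimeError:
        # asyncio.run refuses to start inside a running loop; release the coroutine.
        coro.close()
        return "skipped"


async def call_from_loop() -> str:
    # Called while an event loop is running, so the sync path is skipped.
    return try_sync_run()


if __name__ == "__main__":
    print(try_sync_run())
    print(asyncio.run(call_from_loop()))
```

Inside async code, awaiting `close()` directly is the appropriate way to shut the producer down, since `sync_close` will only log a warning there.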