Kafka publisher
Kafka publisher implementation (optional dependency).
Uses aiokafka to publish events to a Kafka topic.
Dependency: aiokafka
Not installed by default; install extra: pip install pyagenity[kafka]
.
Classes:
Name | Description |
---|---|
KafkaPublisher |
Publish events to a Kafka topic using aiokafka. |
Attributes:
Name | Type | Description |
---|---|---|
logger |
|
Attributes¶
Classes¶
KafkaPublisher
¶
Bases: BasePublisher
Publish events to a Kafka topic using aiokafka.
This class provides an asynchronous interface for publishing events to a Kafka topic. It uses the aiokafka library to handle the producer operations. The publisher is lazily initialized and can be reused for multiple publishes.
Attributes:
Name | Type | Description |
---|---|---|
bootstrap_servers |
str
|
Kafka bootstrap servers. |
topic |
str
|
Kafka topic to publish to. |
client_id |
str | None
|
Client ID for the producer. |
_producer |
Lazy-loaded Kafka producer instance. |
Methods:
Name | Description |
---|---|
__init__ |
Initialize the KafkaPublisher. |
close |
Close the Kafka producer. |
publish |
Publish an event to the Kafka topic. |
sync_close |
Synchronously close the Kafka producer. |
Source code in pyagenity/publisher/kafka_publisher.py
24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 |
|
Attributes¶
bootstrap_servers
instance-attribute
¶
bootstrap_servers = get('bootstrap_servers', 'localhost:9092')
Functions¶
__init__
¶
__init__(config=None)
Initialize the KafkaPublisher.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
|
dict[str, Any] | None
|
Configuration dictionary. Supported keys: - bootstrap_servers: Kafka bootstrap servers (default: "localhost:9092"). - topic: Kafka topic to publish to (default: "pyagenity.events"). - client_id: Client ID for the producer. |
None
|
Source code in pyagenity/publisher/kafka_publisher.py
38 39 40 41 42 43 44 45 46 47 48 49 50 51 |
|
close
async
¶
close()
Close the Kafka producer.
Stops the producer and cleans up resources. Errors during stopping are logged but do not raise exceptions.
Source code in pyagenity/publisher/kafka_publisher.py
97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 |
|
publish
async
¶
publish(event)
Publish an event to the Kafka topic.
Parameters:
Name | Type | Description | Default |
---|---|---|---|
|
EventModel
|
The event to publish. |
required |
Returns:
Type | Description |
---|---|
Any
|
The result of the send_and_wait operation. |
Source code in pyagenity/publisher/kafka_publisher.py
84 85 86 87 88 89 90 91 92 93 94 95 |
|
sync_close
¶
sync_close()
Synchronously close the Kafka producer.
This method runs the async close in a new event loop. If called within an active event loop, it logs a warning and skips the operation.
Source code in pyagenity/publisher/kafka_publisher.py
113 114 115 116 117 118 119 120 121 122 |
|