Skip to content

Redis publisher

Redis publisher implementation (optional dependency).

This publisher uses the redis-py asyncio client to publish events via: - Pub/Sub channels (default), or - Redis Streams (XADD) when configured with mode="stream".

Dependency: redis>=4.2 (provides redis.asyncio). Not installed by default; install extra: pip install pyagenity[redis].

Classes:

Name Description
RedisPublisher

Publish events to Redis via Pub/Sub channel or Stream.

Attributes:

Name Type Description
logger

Attributes

logger module-attribute

logger = getLogger(__name__)

Classes

RedisPublisher

Bases: BasePublisher

Publish events to Redis via Pub/Sub channel or Stream.

Attributes:

Name Type Description
url str

Redis URL.

mode str

Publishing mode ('pubsub' or 'stream').

channel str

Pub/Sub channel name.

stream str

Stream name.

maxlen int | None

Max length for streams.

encoding str

Encoding for messages.

_redis

Redis client instance.

Methods:

Name Description
__init__

Initialize the RedisPublisher.

close

Close the Redis client.

publish

Publish an event to Redis.

sync_close

Synchronously close the Redis client.

Source code in pyagenity/publisher/redis_publisher.py
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
class RedisPublisher(BasePublisher):
    """Publish events to Redis via Pub/Sub channel or Stream.

    Attributes:
        url: Redis URL.
        mode: Publishing mode ('pubsub' or 'stream').
        channel: Pub/Sub channel name.
        stream: Stream name.
        maxlen: Max length for streams.
        encoding: Encoding for messages.
        _redis: Redis client instance.
    """

    def __init__(self, config: dict[str, Any] | None = None):
        """Initialize the RedisPublisher.

        Args:
            config: Configuration dictionary. Supported keys:
                - url: Redis URL (default: "redis://localhost:6379/0").
                - mode: Publishing mode ('pubsub' or 'stream', default: 'pubsub').
                - channel: Pub/Sub channel name (default: "pyagenity.events").
                - stream: Stream name (default: "pyagenity.events").
                - maxlen: Max length for streams.
                - encoding: Encoding (default: "utf-8").
        """
        super().__init__(config or {})
        self.url: str = self.config.get("url", "redis://localhost:6379/0")
        self.mode: str = self.config.get("mode", "pubsub")
        self.channel: str = self.config.get("channel", "pyagenity.events")
        self.stream: str = self.config.get("stream", "pyagenity.events")
        self.maxlen: int | None = self.config.get("maxlen")
        self.encoding: str = self.config.get("encoding", "utf-8")

        # Lazy import & connect on first use to avoid ImportError at import-time.
        self._redis = None  # type: ignore[var-annotated]

    async def _get_client(self):
        """Get or create the Redis client.

        Returns:
            The Redis client instance.

        Raises:
            RuntimeError: If connection fails.
        """
        if self._redis is not None:
            return self._redis

        try:
            redis_asyncio = importlib.import_module("redis.asyncio")
        except Exception as exc:  # ImportError and others
            raise RuntimeError(
                "RedisPublisher requires the 'redis' package. Install with "
                "'pip install pyagenity[redis]' or 'pip install redis'."
            ) from exc

        try:
            self._redis = redis_asyncio.from_url(
                self.url, encoding=self.encoding, decode_responses=False
            )
        except Exception as exc:
            raise RuntimeError(f"RedisPublisher failed to connect to Redis at {self.url}") from exc

        return self._redis

    async def publish(self, event: EventModel) -> Any:
        """Publish an event to Redis.

        Args:
            event: The event to publish.

        Returns:
            The result of the publish operation.
        """
        client = await self._get_client()
        payload = json.dumps(event.model_dump()).encode(self.encoding)

        if self.mode == "stream":
            # XADD to stream
            fields = {"data": payload}
            if self.maxlen is not None:
                return await client.xadd(self.stream, fields, maxlen=self.maxlen, approximate=True)
            return await client.xadd(self.stream, fields)

        # Default: Pub/Sub channel
        return await client.publish(self.channel, payload)

    async def close(self):
        """Close the Redis client."""
        if self._redis is not None:
            try:
                await self._redis.close()
                await self._redis.connection_pool.disconnect(inuse_connections=True)
            except Exception:  # best-effort close
                logger.debug("RedisPublisher close encountered an error", exc_info=True)
            finally:
                self._redis = None

    def sync_close(self):
        """Synchronously close the Redis client."""
        try:
            asyncio.run(self.close())
        except RuntimeError:
            # Already in an event loop; fall back to scheduling close
            logger.warning("sync_close called within an active event loop; skipping.")

Attributes

channel instance-attribute
channel = get('channel', 'pyagenity.events')
config instance-attribute
config = config
encoding instance-attribute
encoding = get('encoding', 'utf-8')
maxlen instance-attribute
maxlen = get('maxlen')
mode instance-attribute
mode = get('mode', 'pubsub')
stream instance-attribute
stream = get('stream', 'pyagenity.events')
url instance-attribute
url = get('url', 'redis://localhost:6379/0')

Functions

__init__
__init__(config=None)

Initialize the RedisPublisher.

Parameters:

Name Type Description Default
config
dict[str, Any] | None

Configuration dictionary. Supported keys: - url: Redis URL (default: "redis://localhost:6379/0"). - mode: Publishing mode ('pubsub' or 'stream', default: 'pubsub'). - channel: Pub/Sub channel name (default: "pyagenity.events"). - stream: Stream name (default: "pyagenity.events"). - maxlen: Max length for streams. - encoding: Encoding (default: "utf-8").

None
Source code in pyagenity/publisher/redis_publisher.py
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def __init__(self, config: dict[str, Any] | None = None):
    """Initialize the RedisPublisher.

    Args:
        config: Configuration dictionary. Supported keys:
            - url: Redis URL (default: "redis://localhost:6379/0").
            - mode: Publishing mode ('pubsub' or 'stream', default: 'pubsub').
            - channel: Pub/Sub channel name (default: "pyagenity.events").
            - stream: Stream name (default: "pyagenity.events").
            - maxlen: Max length for streams.
            - encoding: Encoding (default: "utf-8").
    """
    super().__init__(config or {})
    self.url: str = self.config.get("url", "redis://localhost:6379/0")
    self.mode: str = self.config.get("mode", "pubsub")
    self.channel: str = self.config.get("channel", "pyagenity.events")
    self.stream: str = self.config.get("stream", "pyagenity.events")
    self.maxlen: int | None = self.config.get("maxlen")
    self.encoding: str = self.config.get("encoding", "utf-8")

    # Lazy import & connect on first use to avoid ImportError at import-time.
    self._redis = None  # type: ignore[var-annotated]
close async
close()

Close the Redis client.

Source code in pyagenity/publisher/redis_publisher.py
114
115
116
117
118
119
120
121
122
123
async def close(self):
    """Close the Redis client."""
    if self._redis is not None:
        try:
            await self._redis.close()
            await self._redis.connection_pool.disconnect(inuse_connections=True)
        except Exception:  # best-effort close
            logger.debug("RedisPublisher close encountered an error", exc_info=True)
        finally:
            self._redis = None
publish async
publish(event)

Publish an event to Redis.

Parameters:

Name Type Description Default
event
EventModel

The event to publish.

required

Returns:

Type Description
Any

The result of the publish operation.

Source code in pyagenity/publisher/redis_publisher.py
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
async def publish(self, event: EventModel) -> Any:
    """Publish an event to Redis.

    Args:
        event: The event to publish.

    Returns:
        The result of the publish operation.
    """
    client = await self._get_client()
    payload = json.dumps(event.model_dump()).encode(self.encoding)

    if self.mode == "stream":
        # XADD to stream
        fields = {"data": payload}
        if self.maxlen is not None:
            return await client.xadd(self.stream, fields, maxlen=self.maxlen, approximate=True)
        return await client.xadd(self.stream, fields)

    # Default: Pub/Sub channel
    return await client.publish(self.channel, payload)
sync_close
sync_close()

Synchronously close the Redis client.

Source code in pyagenity/publisher/redis_publisher.py
125
126
127
128
129
130
131
def sync_close(self):
    """Synchronously close the Redis client."""
    try:
        asyncio.run(self.close())
    except RuntimeError:
        # Already in an event loop; fall back to scheduling close
        logger.warning("sync_close called within an active event loop; skipping.")