
Base checkpointer

Classes:

Name Description
BaseCheckpointer

Abstract base class for checkpointing agent state, messages, and threads.

Attributes:

Name Type Description
StateT
logger

Attributes

StateT module-attribute

StateT = TypeVar('StateT', bound='AgentState')

logger module-attribute

logger = getLogger(__name__)

Classes

BaseCheckpointer

Bases: ABC

Abstract base class for checkpointing agent state, messages, and threads.

This class defines the contract for all checkpointer implementations, supporting both async and sync methods. Subclasses should implement async methods for optimal performance. Sync methods are provided for compatibility.

Usage
  • Async-first design: subclasses should implement async def methods.
  • Sync methods (put_state, get_state, etc.) are thin wrappers that run the matching a-prefixed coroutine through run_coroutine.
  • Async callers await the a-prefixed APIs (await cp.aput_state(...), etc.).

Class Type Parameters:

Name Bound or Constraints Description Default
StateT AgentState

Type of agent state (must inherit from AgentState).

required
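The async-first contract described under Usage can be illustrated with a minimal sketch. It does not import pyagenity: MiniCheckpointer, DictCheckpointer, the local run_coroutine (implemented here with asyncio.run, which may differ from the library helper) and the "thread_id" config key are all assumptions made for illustration.

```python
import asyncio
from abc import ABC, abstractmethod
from typing import Any


def run_coroutine(coro):
    """Hypothetical stand-in for the library helper of the same name.

    Runs the coroutine to completion; assumes no event loop is already
    running in the calling thread.
    """
    return asyncio.run(coro)


class MiniCheckpointer(ABC):
    """Hypothetical cut-down contract: one async method plus its sync wrapper."""

    @abstractmethod
    async def aput_state(self, config: dict[str, Any], state: Any) -> Any:
        raise NotImplementedError

    def put_state(self, config: dict[str, Any], state: Any) -> Any:
        # The sync method only delegates to the async implementation.
        return run_coroutine(self.aput_state(config, state))


class DictCheckpointer(MiniCheckpointer):
    """Implements only the async method; the sync wrapper is inherited."""

    def __init__(self) -> None:
        self._states: dict[str, Any] = {}

    async def aput_state(self, config: dict[str, Any], state: Any) -> Any:
        # "thread_id" is an assumed config key, not confirmed by the source.
        self._states[config["thread_id"]] = state
        return state


cp = DictCheckpointer()
sync_result = cp.put_state({"thread_id": "t1"}, {"step": 1})
async_result = asyncio.run(cp.aput_state({"thread_id": "t2"}, {"step": 2}))
```

In this sketch the subclass writes its storage logic once, in the coroutine, and the inherited sync wrapper reuses it.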

Methods:

Name Description
aclean_thread

Clean/delete thread asynchronously.

aclear_state

Clear agent state asynchronously.

adelete_message

Delete a specific message asynchronously.

aget_message

Retrieve a specific message asynchronously.

aget_state

Retrieve agent state asynchronously.

aget_state_cache

Retrieve agent state from cache asynchronously.

aget_thread

Retrieve thread info asynchronously.

alist_messages

List messages asynchronously with optional filtering.

alist_threads

List threads asynchronously with optional filtering.

aput_messages

Store messages asynchronously.

aput_state

Store agent state asynchronously.

aput_state_cache

Store agent state in cache asynchronously.

aput_thread

Store thread info asynchronously.

arelease

Release resources asynchronously.

asetup

Asynchronous setup method for checkpointer.

clean_thread

Clean/delete thread synchronously.

clear_state

Clear agent state synchronously.

delete_message

Delete a specific message synchronously.

get_message

Retrieve a specific message synchronously.

get_state

Retrieve agent state synchronously.

get_state_cache

Retrieve agent state from cache synchronously.

get_thread

Retrieve thread info synchronously.

list_messages

List messages synchronously with optional filtering.

list_threads

List threads synchronously with optional filtering.

put_messages

Store messages synchronously.

put_state

Store agent state synchronously.

put_state_cache

Store agent state in cache synchronously.

put_thread

Store thread info synchronously.

release

Release resources synchronously.

setup

Synchronous setup method for checkpointer.

Source code in pyagenity/checkpointer/base_checkpointer.py
class BaseCheckpointer[StateT: AgentState](ABC):
    """
    Abstract base class for checkpointing agent state, messages, and threads.

    This class defines the contract for all checkpointer implementations, supporting both
    async and sync methods.
    Subclasses should implement async methods for optimal performance.
    Sync methods are provided for compatibility.

    Usage:
        - Async-first design: subclasses should implement `async def` methods.
        - Sync methods (`put_state`, `get_state`, etc.) are thin wrappers that run the
            matching `a`-prefixed coroutine through `run_coroutine`.
        - Async callers await the `a`-prefixed APIs (`await cp.aput_state(...)`, etc.).

    Type Args:
        StateT: Type of agent state (must inherit from AgentState).
    """

    ###########################
    #### SETUP ################
    ###########################
    def setup(self) -> Any:
        """
        Synchronous setup method for checkpointer.

        Returns:
            Any: Implementation-defined setup result.
        """
        return run_coroutine(self.asetup())

    @abstractmethod
    async def asetup(self) -> Any:
        """
        Asynchronous setup method for checkpointer.

        Returns:
            Any: Implementation-defined setup result.
        """
        raise NotImplementedError

    # -------------------------
    # State methods Async
    # -------------------------
    @abstractmethod
    async def aput_state(self, config: dict[str, Any], state: StateT) -> StateT:
        """
        Store agent state asynchronously.

        Args:
            config (dict): Configuration dictionary.
            state (StateT): State object to store.

        Returns:
            StateT: The stored state object.
        """
        raise NotImplementedError

    @abstractmethod
    async def aget_state(self, config: dict[str, Any]) -> StateT | None:
        """
        Retrieve agent state asynchronously.

        Args:
            config (dict): Configuration dictionary.

        Returns:
            StateT | None: Retrieved state or None.
        """
        raise NotImplementedError

    @abstractmethod
    async def aclear_state(self, config: dict[str, Any]) -> Any:
        """
        Clear agent state asynchronously.

        Args:
            config (dict): Configuration dictionary.

        Returns:
            Any: Implementation-defined result.
        """
        raise NotImplementedError

    @abstractmethod
    async def aput_state_cache(self, config: dict[str, Any], state: StateT) -> Any | None:
        """
        Store agent state in cache asynchronously.

        Args:
            config (dict): Configuration dictionary.
            state (StateT): State object to cache.

        Returns:
            Any | None: Implementation-defined result.
        """
        raise NotImplementedError

    @abstractmethod
    async def aget_state_cache(self, config: dict[str, Any]) -> StateT | None:
        """
        Retrieve agent state from cache asynchronously.

        Args:
            config (dict): Configuration dictionary.

        Returns:
            StateT | None: Cached state or None.
        """
        raise NotImplementedError

    # -------------------------
    # State methods Sync
    # -------------------------
    def put_state(self, config: dict[str, Any], state: StateT) -> StateT:
        """
        Store agent state synchronously.

        Args:
            config (dict): Configuration dictionary.
            state (StateT): State object to store.

        Returns:
            StateT: The stored state object.
        """
        return run_coroutine(self.aput_state(config, state))

    def get_state(self, config: dict[str, Any]) -> StateT | None:
        """
        Retrieve agent state synchronously.

        Args:
            config (dict): Configuration dictionary.

        Returns:
            StateT | None: Retrieved state or None.
        """
        return run_coroutine(self.aget_state(config))

    def clear_state(self, config: dict[str, Any]) -> Any:
        """
        Clear agent state synchronously.

        Args:
            config (dict): Configuration dictionary.

        Returns:
            Any: Implementation-defined result.
        """
        return run_coroutine(self.aclear_state(config))

    def put_state_cache(self, config: dict[str, Any], state: StateT) -> Any | None:
        """
        Store agent state in cache synchronously.

        Args:
            config (dict): Configuration dictionary.
            state (StateT): State object to cache.

        Returns:
            Any | None: Implementation-defined result.
        """
        return run_coroutine(self.aput_state_cache(config, state))

    def get_state_cache(self, config: dict[str, Any]) -> StateT | None:
        """
        Retrieve agent state from cache synchronously.

        Args:
            config (dict): Configuration dictionary.

        Returns:
            StateT | None: Cached state or None.
        """
        return run_coroutine(self.aget_state_cache(config))

    # -------------------------
    # Message methods async
    # -------------------------
    @abstractmethod
    async def aput_messages(
        self,
        config: dict[str, Any],
        messages: list[Message],
        metadata: dict[str, Any] | None = None,
    ) -> Any:
        """
        Store messages asynchronously.

        Args:
            config (dict): Configuration dictionary.
            messages (list[Message]): List of messages to store.
            metadata (dict, optional): Additional metadata.

        Returns:
            Any: Implementation-defined result.
        """
        raise NotImplementedError

    @abstractmethod
    async def aget_message(self, config: dict[str, Any], message_id: str | int) -> Message:
        """
        Retrieve a specific message asynchronously.

        Args:
            config (dict): Configuration dictionary.
            message_id (str|int): Message identifier.

        Returns:
            Message: Retrieved message object.
        """
        raise NotImplementedError

    @abstractmethod
    async def alist_messages(
        self,
        config: dict[str, Any],
        search: str | None = None,
        offset: int | None = None,
        limit: int | None = None,
    ) -> list[Message]:
        """
        List messages asynchronously with optional filtering.

        Args:
            config (dict): Configuration dictionary.
            search (str, optional): Search string.
            offset (int, optional): Offset for pagination.
            limit (int, optional): Limit for pagination.

        Returns:
            list[Message]: List of message objects.
        """
        raise NotImplementedError

    @abstractmethod
    async def adelete_message(self, config: dict[str, Any], message_id: str | int) -> Any | None:
        """
        Delete a specific message asynchronously.

        Args:
            config (dict): Configuration dictionary.
            message_id (str|int): Message identifier.

        Returns:
            Any | None: Implementation-defined result.
        """
        raise NotImplementedError

    # -------------------------
    # Message methods sync
    # -------------------------
    def put_messages(
        self,
        config: dict[str, Any],
        messages: list[Message],
        metadata: dict[str, Any] | None = None,
    ) -> Any:
        """
        Store messages synchronously.

        Args:
            config (dict): Configuration dictionary.
            messages (list[Message]): List of messages to store.
            metadata (dict, optional): Additional metadata.

        Returns:
            Any: Implementation-defined result.
        """
        return run_coroutine(self.aput_messages(config, messages, metadata))

    def get_message(self, config: dict[str, Any], message_id: str | int) -> Message:
        """
        Retrieve a specific message synchronously.

        Args:
            config (dict): Configuration dictionary.
            message_id (str|int): Message identifier.

        Returns:
            Message: Retrieved message object.
        """
        return run_coroutine(self.aget_message(config, message_id))

    def list_messages(
        self,
        config: dict[str, Any],
        search: str | None = None,
        offset: int | None = None,
        limit: int | None = None,
    ) -> list[Message]:
        """
        List messages synchronously with optional filtering.

        Args:
            config (dict): Configuration dictionary.
            search (str, optional): Search string.
            offset (int, optional): Offset for pagination.
            limit (int, optional): Limit for pagination.

        Returns:
            list[Message]: List of message objects.
        """
        return run_coroutine(self.alist_messages(config, search, offset, limit))

    def delete_message(self, config: dict[str, Any], message_id: str | int) -> Any | None:
        """
        Delete a specific message synchronously.

        Args:
            config (dict): Configuration dictionary.
            message_id (str|int): Message identifier.

        Returns:
            Any | None: Implementation-defined result.
        """
        return run_coroutine(self.adelete_message(config, message_id))

    # -------------------------
    # Thread methods async
    # -------------------------
    @abstractmethod
    async def aput_thread(
        self,
        config: dict[str, Any],
        thread_info: ThreadInfo,
    ) -> Any | None:
        """
        Store thread info asynchronously.

        Args:
            config (dict): Configuration dictionary.
            thread_info (ThreadInfo): Thread information object.

        Returns:
            Any | None: Implementation-defined result.
        """
        raise NotImplementedError

    @abstractmethod
    async def aget_thread(
        self,
        config: dict[str, Any],
    ) -> ThreadInfo | None:
        """
        Retrieve thread info asynchronously.

        Args:
            config (dict): Configuration dictionary.

        Returns:
            ThreadInfo | None: Thread information object or None.
        """
        raise NotImplementedError

    @abstractmethod
    async def alist_threads(
        self,
        config: dict[str, Any],
        search: str | None = None,
        offset: int | None = None,
        limit: int | None = None,
    ) -> list[ThreadInfo]:
        """
        List threads asynchronously with optional filtering.

        Args:
            config (dict): Configuration dictionary.
            search (str, optional): Search string.
            offset (int, optional): Offset for pagination.
            limit (int, optional): Limit for pagination.

        Returns:
            list[ThreadInfo]: List of thread information objects.
        """
        raise NotImplementedError

    @abstractmethod
    async def aclean_thread(self, config: dict[str, Any]) -> Any | None:
        """
        Clean/delete thread asynchronously.

        Args:
            config (dict): Configuration dictionary.

        Returns:
            Any | None: Implementation-defined result.
        """
        raise NotImplementedError

    # -------------------------
    # Thread methods sync
    # -------------------------
    def put_thread(self, config: dict[str, Any], thread_info: ThreadInfo) -> Any | None:
        """
        Store thread info synchronously.

        Args:
            config (dict): Configuration dictionary.
            thread_info (ThreadInfo): Thread information object.

        Returns:
            Any | None: Implementation-defined result.
        """
        return run_coroutine(self.aput_thread(config, thread_info))

    def get_thread(self, config: dict[str, Any]) -> ThreadInfo | None:
        """
        Retrieve thread info synchronously.

        Args:
            config (dict): Configuration dictionary.

        Returns:
            ThreadInfo | None: Thread information object or None.
        """
        return run_coroutine(self.aget_thread(config))

    def list_threads(
        self,
        config: dict[str, Any],
        search: str | None = None,
        offset: int | None = None,
        limit: int | None = None,
    ) -> list[ThreadInfo]:
        """
        List threads synchronously with optional filtering.

        Args:
            config (dict): Configuration dictionary.
            search (str, optional): Search string.
            offset (int, optional): Offset for pagination.
            limit (int, optional): Limit for pagination.

        Returns:
            list[ThreadInfo]: List of thread information objects.
        """
        return run_coroutine(self.alist_threads(config, search, offset, limit))

    def clean_thread(self, config: dict[str, Any]) -> Any | None:
        """
        Clean/delete thread synchronously.

        Args:
            config (dict): Configuration dictionary.

        Returns:
            Any | None: Implementation-defined result.
        """
        return run_coroutine(self.aclean_thread(config))

    # -------------------------
    # Clean Resources
    # -------------------------
    def release(self) -> Any | None:
        """
        Release resources synchronously.

        Returns:
            Any | None: Implementation-defined result.
        """
        return run_coroutine(self.arelease())

    @abstractmethod
    async def arelease(self) -> Any | None:
        """
        Release resources asynchronously.

        Returns:
            Any | None: Implementation-defined result.
        """
        raise NotImplementedError
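Every a-prefixed method in the listing above is decorated with @abstractmethod (15 in total), so a subclass can be instantiated only once it overrides all of them. The sketch below shows that standard ABC behaviour on a two-method stand-in; LifecycleContract, PartialCheckpointer and CompleteCheckpointer are hypothetical names and not part of pyagenity.

```python
from abc import ABC, abstractmethod


class LifecycleContract(ABC):
    """Hypothetical two-method stand-in for the abstract surface."""

    @abstractmethod
    async def asetup(self): ...

    @abstractmethod
    async def arelease(self): ...


class PartialCheckpointer(LifecycleContract):
    # Only one of the two abstract methods is implemented here.
    async def asetup(self):
        return True


class CompleteCheckpointer(PartialCheckpointer):
    # Adding the remaining method makes the class concrete.
    async def arelease(self):
        return None


try:
    PartialCheckpointer()
    error = None
except TypeError as exc:
    # ABC refuses to instantiate a class that still has abstract methods.
    error = str(exc)

complete = CompleteCheckpointer()  # succeeds: nothing abstract is left
```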

Functions

aclean_thread abstractmethod async
aclean_thread(config)

Clean/delete thread asynchronously.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required

Returns:

Type Description
Any | None

Any | None: Implementation-defined result.

Source code in pyagenity/checkpointer/base_checkpointer.py
@abstractmethod
async def aclean_thread(self, config: dict[str, Any]) -> Any | None:
    """
    Clean/delete thread asynchronously.

    Args:
        config (dict): Configuration dictionary.

    Returns:
        Any | None: Implementation-defined result.
    """
    raise NotImplementedError
aclear_state abstractmethod async
aclear_state(config)

Clear agent state asynchronously.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required

Returns:

Name Type Description
Any Any

Implementation-defined result.

Source code in pyagenity/checkpointer/base_checkpointer.py
@abstractmethod
async def aclear_state(self, config: dict[str, Any]) -> Any:
    """
    Clear agent state asynchronously.

    Args:
        config (dict): Configuration dictionary.

    Returns:
        Any: Implementation-defined result.
    """
    raise NotImplementedError
adelete_message abstractmethod async
adelete_message(config, message_id)

Delete a specific message asynchronously.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required
message_id
str | int

Message identifier.

required

Returns:

Type Description
Any | None

Any | None: Implementation-defined result.

Source code in pyagenity/checkpointer/base_checkpointer.py
@abstractmethod
async def adelete_message(self, config: dict[str, Any], message_id: str | int) -> Any | None:
    """
    Delete a specific message asynchronously.

    Args:
        config (dict): Configuration dictionary.
        message_id (str|int): Message identifier.

    Returns:
        Any | None: Implementation-defined result.
    """
    raise NotImplementedError
aget_message abstractmethod async
aget_message(config, message_id)

Retrieve a specific message asynchronously.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required
message_id
str | int

Message identifier.

required

Returns:

Name Type Description
Message Message

Retrieved message object.

Source code in pyagenity/checkpointer/base_checkpointer.py
@abstractmethod
async def aget_message(self, config: dict[str, Any], message_id: str | int) -> Message:
    """
    Retrieve a specific message asynchronously.

    Args:
        config (dict): Configuration dictionary.
        message_id (str|int): Message identifier.

    Returns:
        Message: Retrieved message object.
    """
    raise NotImplementedError
aget_state abstractmethod async
aget_state(config)

Retrieve agent state asynchronously.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required

Returns:

Type Description
StateT | None

StateT | None: Retrieved state or None.

Source code in pyagenity/checkpointer/base_checkpointer.py
@abstractmethod
async def aget_state(self, config: dict[str, Any]) -> StateT | None:
    """
    Retrieve agent state asynchronously.

    Args:
        config (dict): Configuration dictionary.

    Returns:
        StateT | None: Retrieved state or None.
    """
    raise NotImplementedError
aget_state_cache abstractmethod async
aget_state_cache(config)

Retrieve agent state from cache asynchronously.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required

Returns:

Type Description
StateT | None

StateT | None: Cached state or None.

Source code in pyagenity/checkpointer/base_checkpointer.py
@abstractmethod
async def aget_state_cache(self, config: dict[str, Any]) -> StateT | None:
    """
    Retrieve agent state from cache asynchronously.

    Args:
        config (dict): Configuration dictionary.

    Returns:
        StateT | None: Cached state or None.
    """
    raise NotImplementedError
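The source does not say how the cache and the persistent store are meant to interact. One common arrangement, shown here purely as an assumption, is a read-through lookup: try aget_state_cache first, fall back to aget_state, and refill the cache on a miss. TwoTierCheckpointer, load_state and the "thread_id" config key are hypothetical.

```python
import asyncio
from typing import Any


class TwoTierCheckpointer:
    """Hypothetical in-memory stand-in exposing the four state accessors."""

    def __init__(self) -> None:
        self.cache: dict[str, Any] = {}
        self.store: dict[str, Any] = {}

    async def aput_state(self, config, state):
        self.store[config["thread_id"]] = state
        return state

    async def aget_state(self, config):
        return self.store.get(config["thread_id"])

    async def aput_state_cache(self, config, state):
        self.cache[config["thread_id"]] = state

    async def aget_state_cache(self, config):
        return self.cache.get(config["thread_id"])


async def load_state(cp, config):
    """Read-through lookup: cache first, then the store, then refill the cache."""
    state = await cp.aget_state_cache(config)
    if state is None:
        state = await cp.aget_state(config)
        if state is not None:
            await cp.aput_state_cache(config, state)
    return state


async def demo():
    cp = TwoTierCheckpointer()
    config = {"thread_id": "t1"}
    await cp.aput_state(config, {"step": 3})
    first = await load_state(cp, config)  # cache miss, served by the store
    cached = await cp.aget_state_cache(config)  # cache was refilled by load_state
    missing = await load_state(cp, {"thread_id": "unknown"})
    return first, cached, missing


first, cached, missing = asyncio.run(demo())
```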
aget_thread abstractmethod async
aget_thread(config)

Retrieve thread info asynchronously.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required

Returns:

Type Description
ThreadInfo | None

ThreadInfo | None: Thread information object or None.

Source code in pyagenity/checkpointer/base_checkpointer.py
@abstractmethod
async def aget_thread(
    self,
    config: dict[str, Any],
) -> ThreadInfo | None:
    """
    Retrieve thread info asynchronously.

    Args:
        config (dict): Configuration dictionary.

    Returns:
        ThreadInfo | None: Thread information object or None.
    """
    raise NotImplementedError
alist_messages abstractmethod async
alist_messages(config, search=None, offset=None, limit=None)

List messages asynchronously with optional filtering.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required
search
str

Search string.

None
offset
int

Offset for pagination.

None
limit
int

Limit for pagination.

None

Returns:

Type Description
list[Message]

list[Message]: List of message objects.

Source code in pyagenity/checkpointer/base_checkpointer.py
@abstractmethod
async def alist_messages(
    self,
    config: dict[str, Any],
    search: str | None = None,
    offset: int | None = None,
    limit: int | None = None,
) -> list[Message]:
    """
    List messages asynchronously with optional filtering.

    Args:
        config (dict): Configuration dictionary.
        search (str, optional): Search string.
        offset (int, optional): Offset for pagination.
        limit (int, optional): Limit for pagination.

    Returns:
        list[Message]: List of message objects.
    """
    raise NotImplementedError
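The signature leaves the meaning of search, offset and limit to each implementation. A minimal sketch of one plausible reading follows: a case-insensitive substring filter, then skipping offset items, then returning at most limit items. The Msg dataclass and the list_messages helper are hypothetical stand-ins, not the library's Message type or any real backend.

```python
import asyncio
from dataclasses import dataclass


@dataclass
class Msg:
    """Hypothetical stand-in for the library's Message type."""

    message_id: int
    content: str


async def list_messages(store, search=None, offset=None, limit=None):
    # Assumed semantics: filter first, then paginate the filtered list.
    items = [
        m for m in store
        if search is None or search.lower() in m.content.lower()
    ]
    start = offset or 0
    end = None if limit is None else start + limit
    return items[start:end]


store = [
    Msg(1, "hello"),
    Msg(2, "Hello again"),
    Msg(3, "bye"),
    Msg(4, "hello there"),
]

# Three messages match "hello"; skipping one and taking two leaves ids 2 and 4.
page = asyncio.run(list_messages(store, search="hello", offset=1, limit=2))
everything = asyncio.run(list_messages(store))
```

Treating None as "no filter" and "no limit" matches the None defaults in the signature, but a real backend may define these parameters differently.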
alist_threads abstractmethod async
alist_threads(config, search=None, offset=None, limit=None)

List threads asynchronously with optional filtering.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required
search
str

Search string.

None
offset
int

Offset for pagination.

None
limit
int

Limit for pagination.

None

Returns:

Type Description
list[ThreadInfo]

list[ThreadInfo]: List of thread information objects.

Source code in pyagenity/checkpointer/base_checkpointer.py
@abstractmethod
async def alist_threads(
    self,
    config: dict[str, Any],
    search: str | None = None,
    offset: int | None = None,
    limit: int | None = None,
) -> list[ThreadInfo]:
    """
    List threads asynchronously with optional filtering.

    Args:
        config (dict): Configuration dictionary.
        search (str, optional): Search string.
        offset (int, optional): Offset for pagination.
        limit (int, optional): Limit for pagination.

    Returns:
        list[ThreadInfo]: List of thread information objects.
    """
    raise NotImplementedError
aput_messages abstractmethod async
aput_messages(config, messages, metadata=None)

Store messages asynchronously.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required
messages
list[Message]

List of messages to store.

required
metadata
dict

Additional metadata.

None

Returns:

Name Type Description
Any Any

Implementation-defined result.

Source code in pyagenity/checkpointer/base_checkpointer.py
@abstractmethod
async def aput_messages(
    self,
    config: dict[str, Any],
    messages: list[Message],
    metadata: dict[str, Any] | None = None,
) -> Any:
    """
    Store messages asynchronously.

    Args:
        config (dict): Configuration dictionary.
        messages (list[Message]): List of messages to store.
        metadata (dict, optional): Additional metadata.

    Returns:
        Any: Implementation-defined result.
    """
    raise NotImplementedError
aput_state abstractmethod async
aput_state(config, state)

Store agent state asynchronously.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required
state
StateT

State object to store.

required

Returns:

Name Type Description
StateT StateT

The stored state object.

Source code in pyagenity/checkpointer/base_checkpointer.py
@abstractmethod
async def aput_state(self, config: dict[str, Any], state: StateT) -> StateT:
    """
    Store agent state asynchronously.

    Args:
        config (dict): Configuration dictionary.
        state (StateT): State object to store.

    Returns:
        StateT: The stored state object.
    """
    raise NotImplementedError
aput_state_cache abstractmethod async
aput_state_cache(config, state)

Store agent state in cache asynchronously.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required
state
StateT

State object to cache.

required

Returns:

Type Description
Any | None

Any | None: Implementation-defined result.

Source code in pyagenity/checkpointer/base_checkpointer.py
@abstractmethod
async def aput_state_cache(self, config: dict[str, Any], state: StateT) -> Any | None:
    """
    Store agent state in cache asynchronously.

    Args:
        config (dict): Configuration dictionary.
        state (StateT): State object to cache.

    Returns:
        Any | None: Implementation-defined result.
    """
    raise NotImplementedError
aput_thread abstractmethod async
aput_thread(config, thread_info)

Store thread info asynchronously.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required
thread_info
ThreadInfo

Thread information object.

required

Returns:

Type Description
Any | None

Any | None: Implementation-defined result.

Source code in pyagenity/checkpointer/base_checkpointer.py
@abstractmethod
async def aput_thread(
    self,
    config: dict[str, Any],
    thread_info: ThreadInfo,
) -> Any | None:
    """
    Store thread info asynchronously.

    Args:
        config (dict): Configuration dictionary.
        thread_info (ThreadInfo): Thread information object.

    Returns:
        Any | None: Implementation-defined result.
    """
    raise NotImplementedError
arelease abstractmethod async
arelease()

Release resources asynchronously.

Returns:

Type Description
Any | None

Any | None: Implementation-defined result.

Source code in pyagenity/checkpointer/base_checkpointer.py
@abstractmethod
async def arelease(self) -> Any | None:
    """
    Release resources asynchronously.

    Returns:
        Any | None: Implementation-defined result.
    """
    raise NotImplementedError
asetup abstractmethod async
asetup()

Asynchronous setup method for checkpointer.

Returns:

Name Type Description
Any Any

Implementation-defined setup result.

Source code in pyagenity/checkpointer/base_checkpointer.py
@abstractmethod
async def asetup(self) -> Any:
    """
    Asynchronous setup method for checkpointer.

    Returns:
        Any: Implementation-defined setup result.
    """
    raise NotImplementedError
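asetup and arelease form a natural acquire/release pair. One way to make sure arelease runs even when the work in between fails is to wrap the two calls in an async context manager. This is a sketch under assumptions: FakeCheckpointer and managed are hypothetical helpers, and the source does not state that the library ships such a wrapper.

```python
import asyncio
from contextlib import asynccontextmanager


class FakeCheckpointer:
    """Hypothetical stand-in that records lifecycle calls."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def asetup(self):
        self.events.append("setup")
        return True

    async def arelease(self):
        self.events.append("release")


@asynccontextmanager
async def managed(cp):
    # Set up on entry; always release on exit, even if the body raises.
    await cp.asetup()
    try:
        yield cp
    finally:
        await cp.arelease()


async def main():
    ok = FakeCheckpointer()
    async with managed(ok):
        ok.events.append("work")

    failing = FakeCheckpointer()
    try:
        async with managed(failing):
            raise RuntimeError("simulated failure")
    except RuntimeError:
        pass
    return ok.events, failing.events


ok_events, failing_events = asyncio.run(main())
```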
clean_thread
clean_thread(config)

Clean/delete thread synchronously.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required

Returns:

Type Description
Any | None

Any | None: Implementation-defined result.

Source code in pyagenity/checkpointer/base_checkpointer.py
def clean_thread(self, config: dict[str, Any]) -> Any | None:
    """
    Clean/delete thread synchronously.

    Args:
        config (dict): Configuration dictionary.

    Returns:
        Any | None: Implementation-defined result.
    """
    return run_coroutine(self.aclean_thread(config))
clear_state
clear_state(config)

Clear agent state synchronously.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required

Returns:

Name Type Description
Any Any

Implementation-defined result.

Source code in pyagenity/checkpointer/base_checkpointer.py
def clear_state(self, config: dict[str, Any]) -> Any:
    """
    Clear agent state synchronously.

    Args:
        config (dict): Configuration dictionary.

    Returns:
        Any: Implementation-defined result.
    """
    return run_coroutine(self.aclear_state(config))
delete_message
delete_message(config, message_id)

Delete a specific message synchronously.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required
message_id
str | int

Message identifier.

required

Returns:

Type Description
Any | None

Any | None: Implementation-defined result.

Source code in pyagenity/checkpointer/base_checkpointer.py
def delete_message(self, config: dict[str, Any], message_id: str | int) -> Any | None:
    """
    Delete a specific message synchronously.

    Args:
        config (dict): Configuration dictionary.
        message_id (str|int): Message identifier.

    Returns:
        Any | None: Implementation-defined result.
    """
    return run_coroutine(self.adelete_message(config, message_id))
get_message
get_message(config, message_id)

Retrieve a specific message synchronously.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required
message_id
str | int

Message identifier.

required

Returns:

Name Type Description
Message Message

Retrieved message object.

Source code in pyagenity/checkpointer/base_checkpointer.py
def get_message(self, config: dict[str, Any], message_id: str | int) -> Message:
    """
    Retrieve a specific message synchronously.

    Args:
        config (dict): Configuration dictionary.
        message_id (str|int): Message identifier.

    Returns:
        Message: Retrieved message object.
    """
    return run_coroutine(self.aget_message(config, message_id))
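As a rough illustration of looking a message up by `message_id`, the sketch below stores two messages and reads them back, one with an `int` id and one with a `str` id. `DemoMessage` and `DemoCheckpointer` are hypothetical stand-ins rather than the library's `Message` or a real checkpointer, and `asyncio.run` takes the place of `run_coroutine`. Returning the number of stored messages from `put_messages` is also an assumption, since the result is implementation-defined.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class DemoMessage:
    """Hypothetical stand-in for the library's Message type."""

    message_id: str | int
    content: str


class DemoCheckpointer:
    """Toy in-memory store keyed by message_id."""

    def __init__(self) -> None:
        self._messages: dict[str | int, DemoMessage] = {}

    async def aput_messages(
        self,
        config: dict[str, Any],
        messages: list[DemoMessage],
        metadata: dict[str, Any] | None = None,
    ) -> Any:
        for message in messages:
            self._messages[message.message_id] = message
        return len(messages)  # assumption: report how many were stored

    async def aget_message(self, config: dict[str, Any], message_id: str | int) -> DemoMessage:
        # Raises KeyError for an unknown id; real implementations may differ.
        return self._messages[message_id]

    def put_messages(self, config, messages, metadata=None):
        return asyncio.run(self.aput_messages(config, messages, metadata))

    def get_message(self, config, message_id):
        return asyncio.run(self.aget_message(config, message_id))


cp = DemoCheckpointer()
stored = cp.put_messages({}, [DemoMessage(1, "hello"), DemoMessage("m-2", "world")])
first = cp.get_message({}, 1)
second = cp.get_message({}, "m-2")
```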
get_state
get_state(config)

Retrieve agent state synchronously.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required

Returns:

Type Description
StateT | None

StateT | None: Retrieved state or None.

Source code in pyagenity/checkpointer/base_checkpointer.py
def get_state(self, config: dict[str, Any]) -> StateT | None:
    """
    Retrieve agent state synchronously.

    Args:
        config (dict): Configuration dictionary.

    Returns:
        StateT | None: Retrieved state or None.
    """
    return run_coroutine(self.aget_state(config))
get_state_cache
get_state_cache(config)

Retrieve agent state from cache synchronously.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required

Returns:

Type Description
StateT | None

StateT | None: Cached state or None.

Source code in pyagenity/checkpointer/base_checkpointer.py
def get_state_cache(self, config: dict[str, Any]) -> StateT | None:
    """
    Retrieve agent state from cache synchronously.

    Args:
        config (dict): Configuration dictionary.

    Returns:
        StateT | None: Cached state or None.
    """
    return run_coroutine(self.aget_state_cache(config))
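One plausible way to combine the cache and the persistent store is a cache-first read that falls back to `get_state` and then repopulates the cache. The page does not say whether the library does this, so treat the sketch as an assumption. `load_state`, `DemoCheckpointer`, and the `thread_id` key are hypothetical, and the toy methods are plain synchronous functions to keep the example short.

```python
from __future__ import annotations

from typing import Any


class DemoCheckpointer:
    """Toy store with a separate cache; counts reads against the slower store."""

    def __init__(self) -> None:
        self.store: dict[str, Any] = {}
        self.cache: dict[str, Any] = {}
        self.store_reads = 0

    def get_state(self, config: dict[str, Any]) -> Any | None:
        self.store_reads += 1
        return self.store.get(config["thread_id"])

    def get_state_cache(self, config: dict[str, Any]) -> Any | None:
        return self.cache.get(config["thread_id"])

    def put_state_cache(self, config: dict[str, Any], state: Any) -> None:
        self.cache[config["thread_id"]] = state


def load_state(cp: DemoCheckpointer, config: dict[str, Any]) -> Any | None:
    """Hypothetical helper: try the cache first, then the store."""
    state = cp.get_state_cache(config)
    if state is None:
        state = cp.get_state(config)
        if state is not None:
            cp.put_state_cache(config, state)  # warm the cache for the next read
    return state


cp = DemoCheckpointer()
cp.store["t1"] = {"step": 1}
first = load_state(cp, {"thread_id": "t1"})   # misses the cache, reads the store
second = load_state(cp, {"thread_id": "t1"})  # served from the cache
unknown = load_state(cp, {"thread_id": "nope"})
```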
get_thread
get_thread(config)

Retrieve thread info synchronously.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required

Returns:

Type Description
ThreadInfo | None

ThreadInfo | None: Thread information object or None.

Source code in pyagenity/checkpointer/base_checkpointer.py
def get_thread(self, config: dict[str, Any]) -> ThreadInfo | None:
    """
    Retrieve thread info synchronously.

    Args:
        config (dict): Configuration dictionary.

    Returns:
        ThreadInfo | None: Thread information object or None.
    """
    return run_coroutine(self.aget_thread(config))
list_messages
list_messages(config, search=None, offset=None, limit=None)

List messages synchronously with optional filtering.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required
search
str

Search string.

None
offset
int

Offset for pagination.

None
limit
int

Limit for pagination.

None

Returns:

Type Description
list[Message]

list[Message]: List of message objects.

Source code in pyagenity/checkpointer/base_checkpointer.py
def list_messages(
    self,
    config: dict[str, Any],
    search: str | None = None,
    offset: int | None = None,
    limit: int | None = None,
) -> list[Message]:
    """
    List messages synchronously with optional filtering.

    Args:
        config (dict): Configuration dictionary.
        search (str, optional): Search string.
        offset (int, optional): Offset for pagination.
        limit (int, optional): Limit for pagination.

    Returns:
        list[Message]: List of message objects.
    """
    return run_coroutine(self.alist_messages(config, search, offset, limit))
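The exact meaning of `search`, `offset`, and `limit` is left to each implementation. The sketch below shows one possible reading: `search` as a substring match on message content, and `offset`/`limit` as a plain slice over the filtered results. `DemoMessage` and `DemoCheckpointer` are hypothetical, and `asyncio.run` stands in for `run_coroutine`.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class DemoMessage:
    """Hypothetical stand-in for the library's Message type."""

    message_id: int
    content: str


class DemoCheckpointer:
    def __init__(self, messages) -> None:
        self._messages = list(messages)

    async def alist_messages(
        self,
        config: dict[str, Any],
        search: str | None = None,
        offset: int | None = None,
        limit: int | None = None,
    ) -> list[DemoMessage]:
        items = self._messages
        if search is not None:
            # Assumption: search is a case-sensitive substring match.
            items = [m for m in items if search in m.content]
        start = offset or 0
        end = None if limit is None else start + limit
        return items[start:end]

    def list_messages(self, config, search=None, offset=None, limit=None):
        return asyncio.run(self.alist_messages(config, search, offset, limit))


cp = DemoCheckpointer(DemoMessage(i, f"note {i}") for i in range(5))
page = cp.list_messages({}, offset=1, limit=2)   # second and third messages
hits = cp.list_messages({}, search="note 3")     # only the matching message
everything = cp.list_messages({})                # no filter, no pagination
```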
list_threads
list_threads(config, search=None, offset=None, limit=None)

List threads synchronously with optional filtering.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required
search
str

Search string.

None
offset
int

Offset for pagination.

None
limit
int

Limit for pagination.

None

Returns:

Type Description
list[ThreadInfo]

list[ThreadInfo]: List of thread information objects.

Source code in pyagenity/checkpointer/base_checkpointer.py
def list_threads(
    self,
    config: dict[str, Any],
    search: str | None = None,
    offset: int | None = None,
    limit: int | None = None,
) -> list[ThreadInfo]:
    """
    List threads synchronously with optional filtering.

    Args:
        config (dict): Configuration dictionary.
        search (str, optional): Search string.
        offset (int, optional): Offset for pagination.
        limit (int, optional): Limit for pagination.

    Returns:
        list[ThreadInfo]: List of thread information objects.
    """
    return run_coroutine(self.alist_threads(config, search, offset, limit))
put_messages
put_messages(config, messages, metadata=None)

Store messages synchronously.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required
messages
list[Message]

List of messages to store.

required
metadata
dict

Additional metadata.

None

Returns:

Name Type Description
Any Any

Implementation-defined result.

Source code in pyagenity/checkpointer/base_checkpointer.py
def put_messages(
    self,
    config: dict[str, Any],
    messages: list[Message],
    metadata: dict[str, Any] | None = None,
) -> Any:
    """
    Store messages synchronously.

    Args:
        config (dict): Configuration dictionary.
        messages (list[Message]): List of messages to store.
        metadata (dict, optional): Additional metadata.

    Returns:
        Any: Implementation-defined result.
    """
    return run_coroutine(self.aput_messages(config, messages, metadata))
put_state
put_state(config, state)

Store agent state synchronously.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required
state
StateT

State object to store.

required

Returns:

Name Type Description
StateT StateT

The stored state object.

Source code in pyagenity/checkpointer/base_checkpointer.py
def put_state(self, config: dict[str, Any], state: StateT) -> StateT:
    """
    Store agent state synchronously.

    Args:
        config (dict): Configuration dictionary.
        state (StateT): State object to store.

    Returns:
        StateT: The stored state object.
    """
    return run_coroutine(self.aput_state(config, state))
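A minimal round trip through `put_state` and `get_state` might look like the following. `DemoState` and `DemoCheckpointer` are hypothetical, keying the store on `config["thread_id"]` is an assumption, and `asyncio.run` stands in for `run_coroutine`. The sketch returns the stored object from `put_state` and `None` from `get_state` for an unknown thread, matching the return types documented above.

```python
from __future__ import annotations

import asyncio
from dataclasses import dataclass
from typing import Any


@dataclass
class DemoState:
    """Hypothetical stand-in for an AgentState subclass."""

    step: int = 0


class DemoCheckpointer:
    def __init__(self) -> None:
        self._states: dict[str, DemoState] = {}

    async def aput_state(self, config: dict[str, Any], state: DemoState) -> DemoState:
        self._states[config["thread_id"]] = state  # assumption: keyed by thread_id
        return state

    async def aget_state(self, config: dict[str, Any]) -> DemoState | None:
        return self._states.get(config["thread_id"])

    def put_state(self, config, state):
        return asyncio.run(self.aput_state(config, state))

    def get_state(self, config):
        return asyncio.run(self.aget_state(config))


cp = DemoCheckpointer()
config = {"thread_id": "t1"}
saved = cp.put_state(config, DemoState(step=3))
loaded = cp.get_state(config)
absent = cp.get_state({"thread_id": "other"})  # nothing stored for this thread
```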
put_state_cache
put_state_cache(config, state)

Store agent state in cache synchronously.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required
state
StateT

State object to cache.

required

Returns:

Type Description
Any | None

Any | None: Implementation-defined result.

Source code in pyagenity/checkpointer/base_checkpointer.py
def put_state_cache(self, config: dict[str, Any], state: StateT) -> Any | None:
    """
    Store agent state in cache synchronously.

    Args:
        config (dict): Configuration dictionary.
        state (StateT): State object to cache.

    Returns:
        Any | None: Implementation-defined result.
    """
    return run_coroutine(self.aput_state_cache(config, state))
put_thread
put_thread(config, thread_info)

Store thread info synchronously.

Parameters:

Name Type Description Default
config
dict

Configuration dictionary.

required
thread_info
ThreadInfo

Thread information object.

required

Returns:

Type Description
Any | None

Any | None: Implementation-defined result.

Source code in pyagenity/checkpointer/base_checkpointer.py
def put_thread(self, config: dict[str, Any], thread_info: ThreadInfo) -> Any | None:
    """
    Store thread info synchronously.

    Args:
        config (dict): Configuration dictionary.
        thread_info (ThreadInfo): Thread information object.

    Returns:
        Any | None: Implementation-defined result.
    """
    return run_coroutine(self.aput_thread(config, thread_info))
release
release()

Release resources synchronously.

Returns:

Type Description
Any | None

Any | None: Implementation-defined result.

Source code in pyagenity/checkpointer/base_checkpointer.py
def release(self) -> Any | None:
    """
    Release resources synchronously.

    Returns:
        Any | None: Implementation-defined result.
    """
    return run_coroutine(self.arelease())
setup
setup()

Synchronous setup method for checkpointer.

Returns:

Name Type Description
Any Any

Implementation-defined setup result.

Source code in pyagenity/checkpointer/base_checkpointer.py
def setup(self) -> Any:
    """
    Synchronous setup method for checkpointer.

    Returns:
        Any: Implementation-defined setup result.
    """
    return run_coroutine(self.asetup())
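Since `setup` and `release` bracket a checkpointer's lifetime, a caller would typically pair them so that resources are released even if the work in between fails. The sketch below shows that pairing with `try`/`finally`. `DemoCheckpointer` is a hypothetical class that only records the order of calls, and `asyncio.run` stands in for `run_coroutine`.

```python
from __future__ import annotations

import asyncio
from typing import Any


class DemoCheckpointer:
    """Toy class that records lifecycle events instead of opening real resources."""

    def __init__(self) -> None:
        self.events: list[str] = []

    async def asetup(self) -> Any:
        self.events.append("setup")
        return True  # assumption: the setup result is implementation-defined

    async def arelease(self) -> Any | None:
        self.events.append("release")
        return None

    def setup(self) -> Any:
        return asyncio.run(self.asetup())

    def release(self) -> Any | None:
        return asyncio.run(self.arelease())


cp = DemoCheckpointer()
cp.setup()
try:
    cp.events.append("work")  # placeholder for reads and writes
finally:
    cp.release()  # runs even if the work above raises
```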

Functions