checkpointing.cache.base
1from __future__ import annotations 2from abc import ABC, abstractmethod 3from typing import Generic, TypeVar, Union 4from checkpointing._typing import ContextId, ReturnValue 5from checkpointing.exceptions import CheckpointNotExist 6 7from threading import Lock as ThreadLock 8from multiprocessing import Lock as ProcessLock 9 10 11class CacheBase(ABC, Generic[ContextId, ReturnValue]): 12 """ 13 The base class for Cache. 14 15 To implement a concrete Cache class, you need to implement the following abstract methods: 16 - `save(self, context_id: ContextId, result: ReturnValue) -> None` 17 - `retrieve(self, context: Context) -> ReturnValue` 18 """ 19 20 @abstractmethod 21 def save(self, context_id: ContextId, result: ReturnValue) -> None: 22 """ 23 Save the result with the given context id. 24 25 Args: 26 context_id: identifier of the function call context 27 result: return value of the function call 28 """ 29 pass 30 31 @abstractmethod 32 def retrieve(self, context_id: ContextId) -> ReturnValue: 33 """ 34 Retrieve the function return value with the given context id. 35 If there is no cached results for the context_id, throws a checkpointing.exceptions.CheckpointNotExist 36 37 Args: 38 context_id: identifier of the function call context 39 40 Returns: 41 The return value of the function that corresponds to this context id 42 """ 43 pass 44 45 def synchronize_with(self, lock) -> SynchronizedCache: 46 """ 47 Args: 48 lock: the Lock object that supports the similar interface as threading/multiprocessing.Lock() 49 50 Returns: 51 A process-safe version of this cache using the given lock. 52 """ 53 return SynchronizedCache(self, lock) 54 55 56CacheSubclass = TypeVar("CacheSubclass", bound=CacheBase) 57 58 59class SynchronizedCache(CacheBase, Generic[CacheSubclass]): 60 """ 61 Base class for synchronized cache. In the constructor of a concrete subclass, 62 a `self.lock` attribute should be assigned as a thread/process lock, 63 or any analog of such synchronization locks. 64 """ 65 66 def __init__(self, cache, lock) -> None: 67 self.__cache = cache 68 self.__lock = lock 69 70 def save(self, context_id: ContextId, result: ReturnValue) -> None: 71 with self.__lock: 72 self.__cache.save(context_id, result) 73 74 def retrieve(self, context_id: ContextId) -> ReturnValue: 75 with self.__lock: 76 return self.__cache.retrieve(context_id)
12class CacheBase(ABC, Generic[ContextId, ReturnValue]): 13 """ 14 The base class for Cache. 15 16 To implement a concrete Cache class, you need to implement the following abstract methods: 17 - `save(self, context_id: ContextId, result: ReturnValue) -> None` 18 - `retrieve(self, context: Context) -> ReturnValue` 19 """ 20 21 @abstractmethod 22 def save(self, context_id: ContextId, result: ReturnValue) -> None: 23 """ 24 Save the result with the given context id. 25 26 Args: 27 context_id: identifier of the function call context 28 result: return value of the function call 29 """ 30 pass 31 32 @abstractmethod 33 def retrieve(self, context_id: ContextId) -> ReturnValue: 34 """ 35 Retrieve the function return value with the given context id. 36 If there is no cached results for the context_id, throws a checkpointing.exceptions.CheckpointNotExist 37 38 Args: 39 context_id: identifier of the function call context 40 41 Returns: 42 The return value of the function that corresponds to this context id 43 """ 44 pass 45 46 def synchronize_with(self, lock) -> SynchronizedCache: 47 """ 48 Args: 49 lock: the Lock object that supports the similar interface as threading/multiprocessing.Lock() 50 51 Returns: 52 A process-safe version of this cache using the given lock. 53 """ 54 return SynchronizedCache(self, lock)
The base class for Cache.
To implement a concrete Cache class, you need to implement the following abstract methods:
save(self, context_id: ContextId, result: ReturnValue) -> Noneretrieve(self, context: Context) -> ReturnValue
21 @abstractmethod 22 def save(self, context_id: ContextId, result: ReturnValue) -> None: 23 """ 24 Save the result with the given context id. 25 26 Args: 27 context_id: identifier of the function call context 28 result: return value of the function call 29 """ 30 pass
Save the result with the given context id.
Args
- context_id: identifier of the function call context
- result: return value of the function call
32 @abstractmethod 33 def retrieve(self, context_id: ContextId) -> ReturnValue: 34 """ 35 Retrieve the function return value with the given context id. 36 If there is no cached results for the context_id, throws a checkpointing.exceptions.CheckpointNotExist 37 38 Args: 39 context_id: identifier of the function call context 40 41 Returns: 42 The return value of the function that corresponds to this context id 43 """ 44 pass
Retrieve the function return value with the given context id. If there is no cached results for the context_id, throws a checkpointing.exceptions.CheckpointNotExist
Args
- context_id: identifier of the function call context
Returns
The return value of the function that corresponds to this context id
46 def synchronize_with(self, lock) -> SynchronizedCache: 47 """ 48 Args: 49 lock: the Lock object that supports the similar interface as threading/multiprocessing.Lock() 50 51 Returns: 52 A process-safe version of this cache using the given lock. 53 """ 54 return SynchronizedCache(self, lock)
Args
- lock: the Lock object that supports the similar interface as threading/multiprocessing.Lock()
Returns
A process-safe version of this cache using the given lock.
60class SynchronizedCache(CacheBase, Generic[CacheSubclass]): 61 """ 62 Base class for synchronized cache. In the constructor of a concrete subclass, 63 a `self.lock` attribute should be assigned as a thread/process lock, 64 or any analog of such synchronization locks. 65 """ 66 67 def __init__(self, cache, lock) -> None: 68 self.__cache = cache 69 self.__lock = lock 70 71 def save(self, context_id: ContextId, result: ReturnValue) -> None: 72 with self.__lock: 73 self.__cache.save(context_id, result) 74 75 def retrieve(self, context_id: ContextId) -> ReturnValue: 76 with self.__lock: 77 return self.__cache.retrieve(context_id)
Base class for synchronized cache. In the constructor of a concrete subclass,
a self.lock attribute should be assigned as a thread/process lock,
or any analog of such synchronization locks.
71 def save(self, context_id: ContextId, result: ReturnValue) -> None: 72 with self.__lock: 73 self.__cache.save(context_id, result)
Save the result with the given context id.
Args
- context_id: identifier of the function call context
- result: return value of the function call
75 def retrieve(self, context_id: ContextId) -> ReturnValue: 76 with self.__lock: 77 return self.__cache.retrieve(context_id)
Retrieve the function return value with the given context id. If there is no cached results for the context_id, throws a checkpointing.exceptions.CheckpointNotExist
Args
- context_id: identifier of the function call context
Returns
The return value of the function that corresponds to this context id