checkpointing.cache.base

 1from __future__ import annotations
 2from abc import ABC, abstractmethod
 3from typing import Generic, TypeVar, Union
 4from checkpointing._typing import ContextId, ReturnValue
 5from checkpointing.exceptions import CheckpointNotExist
 6
 7from threading import Lock as ThreadLock
 8from multiprocessing import Lock as ProcessLock
 9
10
11class CacheBase(ABC, Generic[ContextId, ReturnValue]):
12    """
13    The base class for Cache.
14
15    To implement a concrete Cache class, you need to implement the following abstract methods:
16    - `save(self, context_id: ContextId, result: ReturnValue) -> None`
17    - `retrieve(self, context: Context) -> ReturnValue`
18    """
19
20    @abstractmethod
21    def save(self, context_id: ContextId, result: ReturnValue) -> None:
22        """
23        Save the result with the given context id.
24
25        Args:
26            context_id: identifier of the function call context
27            result: return value of the function call
28        """
29        pass
30
31    @abstractmethod
32    def retrieve(self, context_id: ContextId) -> ReturnValue:
33        """
34        Retrieve the function return value with the given context id.
35        If there is no cached results for the context_id, throws a checkpointing.exceptions.CheckpointNotExist
36
37        Args:
38            context_id: identifier of the function call context
39
40        Returns:
41            The return value of the function that corresponds to this context id
42        """
43        pass
44
45    def synchronize_with(self, lock) -> SynchronizedCache:
46        """
47        Args:
48            lock: the Lock object that supports the similar interface as threading/multiprocessing.Lock()
49
50        Returns:
51            A process-safe version of this cache using the given lock.
52        """
53        return SynchronizedCache(self, lock)
54
55
56CacheSubclass = TypeVar("CacheSubclass", bound=CacheBase)
57
58
59class SynchronizedCache(CacheBase, Generic[CacheSubclass]):
60    """
61    Base class for synchronized cache. In the constructor of a concrete subclass,
62    a `self.lock` attribute should be assigned as a thread/process lock, 
63    or any analog of such synchronization locks.
64    """
65
66    def __init__(self, cache, lock) -> None:
67        self.__cache = cache
68        self.__lock = lock
69
70    def save(self, context_id: ContextId, result: ReturnValue) -> None:
71        with self.__lock:
72            self.__cache.save(context_id, result)
73
74    def retrieve(self, context_id: ContextId) -> ReturnValue:
75        with self.__lock:
76            return self.__cache.retrieve(context_id)
class CacheBase(abc.ABC, typing.Generic[~ContextId, ~ReturnValue]):
12class CacheBase(ABC, Generic[ContextId, ReturnValue]):
13    """
14    The base class for Cache.
15
16    To implement a concrete Cache class, you need to implement the following abstract methods:
17    - `save(self, context_id: ContextId, result: ReturnValue) -> None`
18    - `retrieve(self, context: Context) -> ReturnValue`
19    """
20
21    @abstractmethod
22    def save(self, context_id: ContextId, result: ReturnValue) -> None:
23        """
24        Save the result with the given context id.
25
26        Args:
27            context_id: identifier of the function call context
28            result: return value of the function call
29        """
30        pass
31
32    @abstractmethod
33    def retrieve(self, context_id: ContextId) -> ReturnValue:
34        """
35        Retrieve the function return value with the given context id.
36        If there is no cached results for the context_id, throws a checkpointing.exceptions.CheckpointNotExist
37
38        Args:
39            context_id: identifier of the function call context
40
41        Returns:
42            The return value of the function that corresponds to this context id
43        """
44        pass
45
46    def synchronize_with(self, lock) -> SynchronizedCache:
47        """
48        Args:
49            lock: the Lock object that supports the similar interface as threading/multiprocessing.Lock()
50
51        Returns:
52            A process-safe version of this cache using the given lock.
53        """
54        return SynchronizedCache(self, lock)

The base class for Cache.

To implement a concrete Cache class, you need to implement the following abstract methods:

  • save(self, context_id: ContextId, result: ReturnValue) -> None
  • retrieve(self, context: Context) -> ReturnValue
@abstractmethod
def save(self, context_id: ~ContextId, result: ~ReturnValue) -> None:
21    @abstractmethod
22    def save(self, context_id: ContextId, result: ReturnValue) -> None:
23        """
24        Save the result with the given context id.
25
26        Args:
27            context_id: identifier of the function call context
28            result: return value of the function call
29        """
30        pass

Save the result with the given context id.

Args
  • context_id: identifier of the function call context
  • result: return value of the function call
@abstractmethod
def retrieve(self, context_id: ~ContextId) -> ~ReturnValue:
32    @abstractmethod
33    def retrieve(self, context_id: ContextId) -> ReturnValue:
34        """
35        Retrieve the function return value with the given context id.
36        If there is no cached results for the context_id, throws a checkpointing.exceptions.CheckpointNotExist
37
38        Args:
39            context_id: identifier of the function call context
40
41        Returns:
42            The return value of the function that corresponds to this context id
43        """
44        pass

Retrieve the function return value with the given context id. If there is no cached results for the context_id, throws a checkpointing.exceptions.CheckpointNotExist

Args
  • context_id: identifier of the function call context
Returns

The return value of the function that corresponds to this context id

def synchronize_with(self, lock) -> checkpointing.cache.base.SynchronizedCache:
46    def synchronize_with(self, lock) -> SynchronizedCache:
47        """
48        Args:
49            lock: the Lock object that supports the similar interface as threading/multiprocessing.Lock()
50
51        Returns:
52            A process-safe version of this cache using the given lock.
53        """
54        return SynchronizedCache(self, lock)
Args
  • lock: the Lock object that supports the similar interface as threading/multiprocessing.Lock()
Returns

A process-safe version of this cache using the given lock.

class SynchronizedCache(CacheBase, typing.Generic[~CacheSubclass]):
60class SynchronizedCache(CacheBase, Generic[CacheSubclass]):
61    """
62    Base class for synchronized cache. In the constructor of a concrete subclass,
63    a `self.lock` attribute should be assigned as a thread/process lock, 
64    or any analog of such synchronization locks.
65    """
66
67    def __init__(self, cache, lock) -> None:
68        self.__cache = cache
69        self.__lock = lock
70
71    def save(self, context_id: ContextId, result: ReturnValue) -> None:
72        with self.__lock:
73            self.__cache.save(context_id, result)
74
75    def retrieve(self, context_id: ContextId) -> ReturnValue:
76        with self.__lock:
77            return self.__cache.retrieve(context_id)

Base class for synchronized cache. In the constructor of a concrete subclass, a self.lock attribute should be assigned as a thread/process lock, or any analog of such synchronization locks.

SynchronizedCache(cache, lock)
67    def __init__(self, cache, lock) -> None:
68        self.__cache = cache
69        self.__lock = lock
def save(self, context_id: ~ContextId, result: ~ReturnValue) -> None:
71    def save(self, context_id: ContextId, result: ReturnValue) -> None:
72        with self.__lock:
73            self.__cache.save(context_id, result)

Save the result with the given context id.

Args
  • context_id: identifier of the function call context
  • result: return value of the function call
def retrieve(self, context_id: ~ContextId) -> ~ReturnValue:
75    def retrieve(self, context_id: ContextId) -> ReturnValue:
76        with self.__lock:
77            return self.__cache.retrieve(context_id)

Retrieve the function return value with the given context id. If there is no cached results for the context_id, throws a checkpointing.exceptions.CheckpointNotExist

Args
  • context_id: identifier of the function call context
Returns

The return value of the function that corresponds to this context id

Inherited Members
CacheBase
synchronize_with