EndpointStorage(
max_size: int | None = None,
max_object_size: int | None = None,
dump_dir: str | None = None,
) -> None
Bases: MutableMapping[str, bytes]
Endpoint in-memory blob storage with filesystem fallback.
Provides a dict-like storage of key-bytes pairs. Optionally, a maximum
in-memory size for the data structure can be specified and least-recently
used key-bytes pairs will be dumped to a file in a specified directory.
Parameters:
-
max_size
(
int | None
)
– Optional maximum size in bytes for in-memory
storage of blobs. If the memory limit is exceeded, least
recently used blobs will be dumped to disk (if configured).
-
max_object_size
(
int | None
)
– Optional maximum size in bytes for any single blob.
-
dump_dir
(
str | None
)
– Optional directory to dump least recently used blobs
to when the max_size memory limit is exceeded.
Source code in proxystore/endpoint/storage.py
def __init__(
    self,
    max_size: int | None = None,
    max_object_size: int | None = None,
    dump_dir: str | None = None,
) -> None:
    """Initialize the in-memory blob storage.

    Args:
        max_size: Optional cap, in bytes, on total in-memory blob data.
            When exceeded, least recently used blobs are dumped to disk.
        max_object_size: Optional cap, in bytes, on any single blob.
        dump_dir: Optional directory where evicted blobs are written.

    Raises:
        ValueError: If exactly one of `max_size` and `dump_dir` is given.
    """
    # A memory cap and a dump directory only make sense together: the
    # cap needs somewhere to evict to, and the directory is only ever
    # written to when the cap triggers an eviction.
    if (max_size is None) != (dump_dir is None):
        raise ValueError(
            'Either both of max_size and dump_dir should be specified '
            'or neither.',
        )
    self.max_size = max_size
    self.max_object_size = max_object_size
    self.dump_dir = dump_dir
    if self.dump_dir is not None:
        os.makedirs(self.dump_dir, exist_ok=True)
    # Running total of bytes held by blobs currently resident in memory.
    self._in_memory_size = 0
    self._blobs: dict[str, Blob] = {}
    # Holds keys of in-memory blobs only. Most recently used keys sit
    # at the right end; LRU eviction pops from the left.
    self._lru_queue: Deque[str] = collections.deque()
__getitem__
__getitem__(key: str) -> bytes
Get bytes associated with key.
Source code in proxystore/endpoint/storage.py
def __getitem__(self, key: str) -> bytes:
    """Return the bytes stored under ``key``.

    Raises:
        KeyError: If ``key`` is not present.
    """
    try:
        blob = self._blobs[key]
    except KeyError:
        raise KeyError(key) from None
    if blob.location != BlobLocation.MEMORY:
        # The blob was dumped to disk: free enough memory for it,
        # account for its size, then load it back.
        self._make_space(blob.size)
        self._in_memory_size += blob.size
        blob.load()
    else:
        # Already resident: drop the old queue position so the append
        # below marks this key as most recently used.
        self._lru_queue.remove(key)
    self._lru_queue.append(key)
    return blob.value
__setitem__
__setitem__(key: str, value: bytes) -> None
Set key to value.
Raises:
-
ObjectSizeExceededError
– If value is larger than max_object_size or max_size.
Source code in proxystore/endpoint/storage.py
def __setitem__(self, key: str, value: bytes) -> None:
    """Set key to value.

    Raises:
        ObjectSizeExceededError: If `value` is larger than
            `max_object_size` or `max_size`.
    """
    if (
        self.max_object_size is not None
        and len(value) > self.max_object_size
    ):
        raise ObjectSizeExceededError(
            f'Bytes value has size {bytes_to_readable(len(value))} which '
            f'exceeds the {bytes_to_readable(self.max_object_size)} '
            'object limit.',
        )
    # A blob larger than the full memory budget could never be held,
    # so reject it up front rather than evicting everything for nothing.
    if self.max_size is not None and len(value) > self.max_size:
        raise ObjectSizeExceededError(
            f'Bytes value has size {bytes_to_readable(len(value))} which '
            f'exceeds the {bytes_to_readable(self.max_size)} '
            'memory limit.',
        )
    # Bug fix: replacing an existing key must first remove the old
    # blob via __delitem__; otherwise the old blob's size stays in
    # _in_memory_size and the key appears twice in the LRU queue.
    if key in self._blobs:
        del self[key]
    filepath = (
        None if self.dump_dir is None else os.path.join(self.dump_dir, key)
    )
    blob = Blob(key, value, filepath)
    # Evict LRU blobs to disk (if configured) until the new blob fits.
    self._make_space(blob.size)
    self._blobs[key] = blob
    self._in_memory_size += blob.size
    self._lru_queue.append(key)
__delitem__
__delitem__(key: str) -> None
Remove a key from the storage.
Source code in proxystore/endpoint/storage.py
def __delitem__(self, key: str) -> None:
    """Remove a key from the storage.

    Raises:
        KeyError: If ``key`` is not present.
    """
    try:
        blob = self._blobs.pop(key)
    except KeyError:
        raise KeyError(key) from None
    if blob.location == BlobLocation.MEMORY:
        # Only in-memory blobs are tracked by the LRU queue and the
        # memory-size counter, so only then do we update them.
        self._lru_queue.remove(key)
        self._in_memory_size -= blob.size
    # Remove the on-disk copy, if one was ever written.
    blob.delete_file()
__iter__
__iter__() -> Iterator[str]
Iterate over keys in the storage.
Source code in proxystore/endpoint/storage.py
def __iter__(self) -> Iterator[str]:
    """Yield each key currently in the storage."""
    for key in self._blobs:
        yield key
__len__
__len__() -> int
Return number of keys in the storage.
Source code in proxystore/endpoint/storage.py
def __len__(self) -> int:
    """Return the count of keys held in the storage."""
    return len(self._blobs)
__contains__
__contains__(key: object) -> bool
Check if storage contains a key.
Source code in proxystore/endpoint/storage.py
def __contains__(self, key: object) -> bool:
    """Return whether ``key`` is present in the storage."""
    present = key in self._blobs
    return present
clear
Clear all keys in the storage.
Source code in proxystore/endpoint/storage.py
def clear(self) -> None:
    """Clear all keys in the storage.

    Each key is removed through `__delitem__` so that the in-memory
    size counter is decremented and any blobs previously dumped to
    disk have their files deleted rather than being orphaned. (The
    original implementation deleted from the dict directly, leaving
    `_in_memory_size` stale and dump files on disk.)
    """
    for key in list(self._blobs):
        del self[key]
    # Defensive reset: __delitem__ maintains these already, but a
    # clean slate guards against any prior accounting drift.
    self._in_memory_size = 0
    self._lru_queue.clear()
cleanup
Clear all keys in the storage and remove the data dump.
Source code in proxystore/endpoint/storage.py
def cleanup(self) -> None:
    """Clear all keys in the storage and remove the data dump.

    Removes the configured dump directory (if any) along with all
    blob state, leaving the storage empty and reusable.
    """
    if self.dump_dir is not None:
        shutil.rmtree(self.dump_dir)
    self._blobs.clear()
    # Bug fix: also reset the LRU queue and size counter; leaving
    # stale queue entries and a nonzero in-memory size would corrupt
    # accounting if the storage is used again after cleanup.
    self._lru_queue.clear()
    self._in_memory_size = 0