Skip to content

proxystore.endpoint.storage

Blob storage interface for endpoints.

Storage

Bases: Protocol

Endpoint storage protocol for blobs.

evict() async

evict(key: str) -> None

Evict a blob from storage.

Parameters:

  • key (str) –

    Key associated with blob to evict.

Source code in proxystore/endpoint/storage.py
async def evict(self, key: str) -> None:
    """Remove the blob stored under a key.

    Args:
        key: Key identifying the blob to remove.
    """
    ...

exists() async

exists(key: str) -> bool

Check if a blob exists in the storage.

Parameters:

  • key (str) –

    Key associated with the blob to check.

Returns:

  • bool

    If a blob associated with the key exists.

Source code in proxystore/endpoint/storage.py
async def exists(self, key: str) -> bool:
    """Report whether the storage holds a blob for a key.

    Args:
        key: Key identifying the blob to look for.

    Returns:
        Whether a blob is stored under the key.
    """
    ...

get() async

get(key: str, default: bytes | None = None) -> bytes | None

Get a blob from storage.

Parameters:

  • key (str) –

    Key associated with the blob to get.

  • default (bytes | None, default: None ) –

    Default return value if the blob does not exist.

Returns:

  • bytes | None

    The blob associated with the key or the value of default.

Source code in proxystore/endpoint/storage.py
async def get(
    self,
    key: str,
    default: bytes | None = None,
) -> bytes | None:
    """Get a blob from storage.

    Args:
        key: Key associated with the blob to get.
        default: Default return value if the blob does not exist.

    Returns:
        The blob associated with the key or the value of `default`.
    """
    ...

set() async

set(key: str, blob: bytes) -> None

Store the blob associated with a key.

Parameters:

  • key (str) –

    Key that will be used to retrieve the blob.

  • blob (bytes) –

    Blob to store.

Raises:

  • ObjectSizeExceededError –

    If the max object size is configured and the data exceeds that size.

Source code in proxystore/endpoint/storage.py
async def set(self, key: str, blob: bytes) -> None:
    """Store a blob under a key.

    Args:
        key: Key under which the blob can later be retrieved.
        blob: Bytes to store.

    Raises:
        ObjectSizeExceededError: If a max object size is configured and
            the blob is larger than that size.
    """
    ...

close() async

close() -> None

Close the storage.

Source code in proxystore/endpoint/storage.py
async def close(self) -> None:
    """Shut down the storage."""
    ...

DictStorage

DictStorage(
    *, max_object_size: int | None = MAX_OBJECT_SIZE_DEFAULT
)

Simple dictionary-based storage for blobs.

Parameters:

  • max_object_size (int | None, default: MAX_OBJECT_SIZE_DEFAULT ) –

    Optional max size in bytes for any single object stored by the endpoint. If exceeded, an error is raised.

Source code in proxystore/endpoint/storage.py
def __init__(
    self,
    *,
    max_object_size: int | None = MAX_OBJECT_SIZE_DEFAULT,
) -> None:
    # Per-object size cap in bytes (may be None).
    self._max_object_size = max_object_size
    # In-memory mapping of key -> blob; starts out empty.
    self._data: dict[str, bytes] = {}

evict() async

evict(key: str) -> None

Evict a blob from storage.

Parameters:

  • key (str) –

    Key associated with blob to evict.

Source code in proxystore/endpoint/storage.py
async def evict(self, key: str) -> None:
    """Remove a blob from the in-memory store.

    Evicting a key that is not present is a no-op.

    Args:
        key: Key identifying the blob to remove.
    """
    if key in self._data:
        del self._data[key]

exists() async

exists(key: str) -> bool

Check if a blob exists in the storage.

Parameters:

  • key (str) –

    Key associated with the blob to check.

Returns:

  • bool

    If a blob associated with the key exists.

Source code in proxystore/endpoint/storage.py
async def exists(self, key: str) -> bool:
    """Report whether a blob is stored under a key.

    Args:
        key: Key identifying the blob to look for.

    Returns:
        `True` if the key is present in the in-memory mapping,
        `False` otherwise.
    """
    found = key in self._data
    return found

get() async

get(key: str, default: bytes | None = None) -> bytes | None

Get a blob from storage.

Parameters:

  • key (str) –

    Key associated with the blob to get.

  • default (bytes | None, default: None ) –

    Default return value if the blob does not exist.

Returns:

  • bytes | None

    The blob associated with the key or the value of default.

Source code in proxystore/endpoint/storage.py
async def get(
    self,
    key: str,
    default: bytes | None = None,
) -> bytes | None:
    """Get a blob from storage.

    Args:
        key: Key associated with the blob to get.
        default: Default return value if the blob does not exist.

    Returns:
        The blob associated with the key or the value of `default`.
    """
    return self._data.get(key, default)

set() async

set(key: str, blob: bytes) -> None

Store the blob associated with a key.

Parameters:

  • key (str) –

    Key that will be used to retrieve the blob.

  • blob (bytes) –

    Blob to store.

Raises:

  • ObjectSizeExceededError –

    If the max object size is configured and the data exceeds that size.

Source code in proxystore/endpoint/storage.py
async def set(self, key: str, blob: bytes) -> None:
    """Store a blob under a key, replacing any previous value.

    Args:
        key: Key under which the blob can later be retrieved.
        blob: Bytes to store.

    Raises:
        ObjectSizeExceededError: If a max object size is configured and
            the blob is larger than that size.
    """
    limit = self._max_object_size
    # No configured limit means every blob is accepted; a blob exactly at
    # the limit is accepted too.
    within_limit = limit is None or len(blob) <= limit
    if not within_limit:
        raise ObjectSizeExceededError(
            f'Bytes value has size {bytes_to_readable(len(blob))} which '
            f'exceeds the {bytes_to_readable(self._max_object_size)} '
            'object limit.',
        )
    self._data[key] = blob

close() async

close() -> None

Clear all stored blobs.

Source code in proxystore/endpoint/storage.py
async def close(self) -> None:
    """Drop every blob held in the in-memory mapping."""
    blobs = self._data
    blobs.clear()

SQLiteStorage

SQLiteStorage(
    database_path: str | Path = ":memory:",
    *,
    max_object_size: int | None = MAX_OBJECT_SIZE_DEFAULT
)

SQLite-backed storage for blobs.

Parameters:

  • database_path (str | Path, default: ':memory:' ) –

    Path to database file.

  • max_object_size (int | None, default: MAX_OBJECT_SIZE_DEFAULT ) –

    Optional max size in bytes for any single object stored by the endpoint. If exceeded, an error is raised.

Source code in proxystore/endpoint/storage.py
def __init__(
    self,
    database_path: str | pathlib.Path = ':memory:',
    *,
    max_object_size: int | None = MAX_OBJECT_SIZE_DEFAULT,
) -> None:
    # The special ':memory:' name is stored untouched; anything else is
    # treated as a filesystem path and normalized to an absolute string.
    if database_path != ':memory:':
        resolved = pathlib.Path(database_path).expanduser().resolve()
        database_path = str(resolved)
    self.database_path = database_path

    # The connection starts out unset; `_db` is only initialized here.
    self._db: aiosqlite.Connection | None = None
    # Per-object size cap in bytes (may be None).
    self._max_object_size = max_object_size

db() async

db() -> Connection

Get the database connection object.

Source code in proxystore/endpoint/storage.py
async def db(self) -> aiosqlite.Connection:
    """Return the database connection, opening it on first use.

    The first call connects to `self.database_path` and creates the
    `blobs` table if it does not already exist; later calls return the
    cached connection.
    """
    if self._db is not None:
        return self._db

    self._db = await aiosqlite.connect(self.database_path)
    await self._db.execute(
        'CREATE TABLE IF NOT EXISTS blobs'
        '(key TEXT PRIMARY KEY, value BLOB NOT NULL)',
    )
    return self._db

evict() async

evict(key: str) -> None

Evict a blob from storage.

Parameters:

  • key (str) –

    Key associated with blob to evict.

Source code in proxystore/endpoint/storage.py
async def evict(self, key: str) -> None:
    """Delete the row for a key from the `blobs` table and commit.

    Args:
        key: Key identifying the blob to remove.
    """
    conn = await self.db()
    params = (key,)
    await conn.execute('DELETE FROM blobs WHERE key=?', params)
    await conn.commit()

exists() async

exists(key: str) -> bool

Check if a blob exists in the storage.

Parameters:

  • key (str) –

    Key associated with the blob to check.

Returns:

  • bool

    If a blob associated with the key exists.

Source code in proxystore/endpoint/storage.py
async def exists(self, key: str) -> bool:
    """Report whether a row for a key is present in the `blobs` table.

    Args:
        key: Key identifying the blob to look for.

    Returns:
        `True` if the count of rows matching the key is non-zero,
        `False` otherwise.
    """
    conn = await self.db()
    query = 'SELECT count(*) FROM blobs WHERE key=?'
    async with conn.execute(query, (key,)) as cur:
        row = await cur.fetchone()
        # An aggregate count() always yields one row; the assert only
        # narrows the Optional type for mypy.
        assert row is not None
        return bool(row[0])

get() async

get(key: str, default: bytes | None = None) -> bytes | None

Get a blob from storage.

Parameters:

  • key (str) –

    Key associated with the blob to get.

  • default (bytes | None, default: None ) –

    Default return value if the blob does not exist.

Returns:

  • bytes | None

    The blob associated with the key or the value of default.

Source code in proxystore/endpoint/storage.py
async def get(
    self,
    key: str,
    default: bytes | None = None,
) -> bytes | None:
    """Get a blob from storage.

    Args:
        key: Key associated with the blob to get.
        default: Default return value if the blob does not exist.

    Returns:
        The blob associated with the key or the value of `default`.
    """
    db = await self.db()
    async with db.execute(
        'SELECT value FROM blobs WHERE key=?',
        (key,),
    ) as cursor:
        result = await cursor.fetchone()
        if result is None:
            return default
        else:
            return result[0]

set() async

set(key: str, blob: bytes) -> None

Store the blob associated with a key.

Parameters:

  • key (str) –

    Key that will be used to retrieve the blob.

  • blob (bytes) –

    Blob to store.

Raises:

  • ObjectSizeExceededError –

    If the max object size is configured and the data exceeds that size.

Source code in proxystore/endpoint/storage.py
async def set(self, key: str, blob: bytes) -> None:
    """Insert or replace the blob stored under a key, then commit.

    The size check runs before the database is touched, so an oversized
    blob never triggers a connection or a write.

    Args:
        key: Key under which the blob can later be retrieved.
        blob: Bytes to store.

    Raises:
        ObjectSizeExceededError: If a max object size is configured and
            the blob is larger than that size.
    """
    limit = self._max_object_size
    too_large = limit is not None and len(blob) > limit
    if too_large:
        raise ObjectSizeExceededError(
            f'Bytes value has size {bytes_to_readable(len(blob))} which '
            f'exceeds the {bytes_to_readable(self._max_object_size)} '
            'object limit.',
        )

    conn = await self.db()
    statement = 'INSERT OR REPLACE INTO blobs (key, value) VALUES (?, ?)'
    await conn.execute(statement, (key, blob))
    await conn.commit()

close() async

close() -> None

Close the storage.

Source code in proxystore/endpoint/storage.py
async def close(self) -> None:
    """Close the storage.

    Closes the database connection if one was opened and forgets it, so
    calling `close()` again is a no-op and a later call to `db()` opens a
    new connection instead of handing back the closed one.
    """
    # Detach the handle before awaiting so nothing else can pick up a
    # connection that is in the middle of closing.
    conn, self._db = self._db, None
    if conn is not None:
        await conn.close()