Skip to content

proxystore.endpoint.storage

Blob storage interface for endpoints.

Storage

Bases: Protocol

Endpoint storage protocol for blobs.

evict() async

evict(key: str) -> None

Evict a blob from storage.

Parameters:

  • key (str) –

    Key associated with blob to evict.

Source code in proxystore/endpoint/storage.py
async def evict(self, key: str) -> None:
    """Remove the blob stored under a key.

    Args:
        key: Key identifying the blob that should be evicted.
    """
    ...

exists() async

exists(key: str) -> bool

Check if a blob exists in the storage.

Parameters:

  • key (str) –

    Key associated with the blob to check.

Returns:

  • bool

    If a blob associated with the key exists.

Source code in proxystore/endpoint/storage.py
async def exists(self, key: str) -> bool:
    """Report whether the storage holds a blob for a key.

    Args:
        key: Key identifying the blob to look for.

    Returns:
        Whether a blob associated with the key exists.
    """
    ...

get() async

get(key: str, default: bytes | None = None) -> bytes | None

Get a blob from storage.

Parameters:

  • key (str) –

    Key associated with the blob to get.

  • default (bytes | None) –

    Default return value if the blob does not exist.

Returns:

  • bytes | None

    The blob associated with the key or the value of default.

Source code in proxystore/endpoint/storage.py
async def get(
    self,
    key: str,
    default: bytes | None = None,
) -> bytes | None:
    """Get a blob from storage.

    Args:
        key: Key associated with the blob to get.
        default: Default return value if the blob does not exist.

    Returns:
        The blob associated with the key or the value of `default`.
    """
    ...

set() async

set(key: str, blob: bytes) -> None

Store the blob associated with a key.

Parameters:

  • key (str) –

    Key that will be used to retrieve the blob.

  • blob (bytes) –

    Blob to store.

Source code in proxystore/endpoint/storage.py
async def set(self, key: str, blob: bytes) -> None:
    """Store a blob under a key.

    Args:
        key: Key under which the blob can later be retrieved.
        blob: Bytes to store.
    """
    ...

close() async

close() -> None

Close the storage.

Source code in proxystore/endpoint/storage.py
async def close(self) -> None:
    """Shut down the storage."""
    ...

DictStorage

DictStorage() -> None

Simple dictionary-based storage for blobs.

Source code in proxystore/endpoint/storage.py
def __init__(self) -> None:
    """Start with an empty in-memory mapping of keys to blobs."""
    self._data: dict[str, bytes] = dict()

evict() async

evict(key: str) -> None

Evict a blob from storage.

Parameters:

  • key (str) –

    Key associated with blob to evict.

Source code in proxystore/endpoint/storage.py
async def evict(self, key: str) -> None:
    """Remove the blob stored under a key.

    A key that is not present is silently ignored.

    Args:
        key: Key identifying the blob that should be evicted.
    """
    try:
        del self._data[key]
    except KeyError:
        # Nothing stored under this key; eviction is a no-op.
        pass

exists() async

exists(key: str) -> bool

Check if a blob exists in the storage.

Parameters:

  • key (str) –

    Key associated with the blob to check.

Returns:

  • bool

    If a blob associated with the key exists.

Source code in proxystore/endpoint/storage.py
async def exists(self, key: str) -> bool:
    """Report whether the dictionary holds a blob for a key.

    Args:
        key: Key identifying the blob to look for.

    Returns:
        `True` when the key is present in the backing dictionary,
        otherwise `False`.
    """
    is_present = key in self._data
    return is_present

get() async

get(key: str, default: bytes | None = None) -> bytes | None

Get a blob from storage.

Parameters:

  • key (str) –

    Key associated with the blob to get.

  • default (bytes | None) –

    Default return value if the blob does not exist.

Returns:

  • bytes | None

    The blob associated with the key or the value of default.

Source code in proxystore/endpoint/storage.py
async def get(
    self,
    key: str,
    default: bytes | None = None,
) -> bytes | None:
    """Get a blob from storage.

    Args:
        key: Key associated with the blob to get.
        default: Default return value if the blob does not exist.

    Returns:
        The blob associated with the key or the value of `default`.
    """
    return self._data.get(key, default)

set() async

set(key: str, blob: bytes) -> None

Store the blob associated with a key.

Parameters:

  • key (str) –

    Key that will be used to retrieve the blob.

  • blob (bytes) –

    Blob to store.

Source code in proxystore/endpoint/storage.py
async def set(self, key: str, blob: bytes) -> None:
    """Store a blob under a key, replacing any blob already there.

    Args:
        key: Key under which the blob can later be retrieved.
        blob: Bytes to store.
    """
    self._data.update({key: blob})

close() async

close() -> None

Clear all stored blobs.

Source code in proxystore/endpoint/storage.py
async def close(self) -> None:
    """Drop every blob held in the backing dictionary."""
    blobs = self._data
    # Empty the existing dictionary in place rather than rebinding it.
    blobs.clear()

SQLiteStorage

SQLiteStorage(
    database_path: str | pathlib.Path = ":memory:",
) -> None

SQLite-backed implementation of the storage protocol for blobs.

Source code in proxystore/endpoint/storage.py
def __init__(self, database_path: str | pathlib.Path = ':memory:') -> None:
    if database_path == ':memory:':
        self.database_path = database_path
    else:
        path = pathlib.Path(database_path).expanduser().resolve()
        self.database_path = str(path)

    self._db: aiosqlite.Connection | None = None

db() async

db() -> aiosqlite.Connection

Get the database connection object.

Source code in proxystore/endpoint/storage.py
async def db(self) -> aiosqlite.Connection:
    """Return the database connection, opening it on first use.

    The first call connects to `self.database_path` and creates the
    `blobs` table if it does not exist yet. Later calls return the
    cached connection.
    """
    if self._db is not None:
        return self._db

    connection = await aiosqlite.connect(self.database_path)
    self._db = connection
    await connection.execute(
        'CREATE TABLE IF NOT EXISTS blobs'
        '(key TEXT PRIMARY KEY, value BLOB NOT NULL)',
    )
    return self._db

evict() async

evict(key: str) -> None

Evict a blob from storage.

Parameters:

  • key (str) –

    Key associated with blob to evict.

Source code in proxystore/endpoint/storage.py
async def evict(self, key: str) -> None:
    """Delete the row for a key from the `blobs` table and commit.

    Args:
        key: Key identifying the blob that should be evicted.
    """
    conn = await self.db()
    params = (key,)
    await conn.execute('DELETE FROM blobs WHERE key=?', params)
    await conn.commit()

exists() async

exists(key: str) -> bool

Check if a blob exists in the storage.

Parameters:

  • key (str) –

    Key associated with the blob to check.

Returns:

  • bool

    If a blob associated with the key exists.

Source code in proxystore/endpoint/storage.py
async def exists(self, key: str) -> bool:
    """Report whether the `blobs` table has a row for a key.

    Args:
        key: Key identifying the blob to look for.

    Returns:
        `True` when the row count for the key is non-zero, otherwise
        `False`.
    """
    conn = await self.db()
    query = 'SELECT count(*) FROM blobs WHERE key=?'
    async with conn.execute(query, (key,)) as cur:
        row = await cur.fetchone()
        (matches,) = row
        return bool(matches)

get() async

get(key: str, default: bytes | None = None) -> bytes | None

Get a blob from storage.

Parameters:

  • key (str) –

    Key associated with the blob to get.

  • default (bytes | None) –

    Default return value if the blob does not exist.

Returns:

  • bytes | None

    The blob associated with the key or the value of default.

Source code in proxystore/endpoint/storage.py
async def get(
    self,
    key: str,
    default: bytes | None = None,
) -> bytes | None:
    """Get a blob from storage.

    Args:
        key: Key associated with the blob to get.
        default: Default return value if the blob does not exist.

    Returns:
        The blob associated with the key or the value of `default`.
    """
    db = await self.db()
    async with db.execute(
        'SELECT value FROM blobs WHERE key=?',
        (key,),
    ) as cursor:
        result = await cursor.fetchone()
        if result is None:
            return default
        else:
            return result[0]

set() async

set(key: str, blob: bytes) -> None

Store the blob associated with a key.

Parameters:

  • key (str) –

    Key that will be used to retrieve the blob.

  • blob (bytes) –

    Blob to store.

Source code in proxystore/endpoint/storage.py
async def set(self, key: str, blob: bytes) -> None:
    """Insert or replace the row for a key in the `blobs` table and commit.

    Args:
        key: Key under which the blob can later be retrieved.
        blob: Bytes to store.
    """
    conn = await self.db()
    statement = 'INSERT OR REPLACE INTO blobs (key, value) VALUES (?, ?)'
    row = (key, blob)
    await conn.execute(statement, row)
    await conn.commit()

close() async

close() -> None

Close the storage.

Source code in proxystore/endpoint/storage.py
async def close(self) -> None:
    """Close the storage.

    Closes the cached database connection, if one is held, and forgets
    it. Calling `close()` again, or calling it when no connection is
    cached, does nothing.
    """
    # Detach the handle before awaiting so a stale, closed connection is
    # never left cached on the instance and is never closed twice.
    connection, self._db = self._db, None
    if connection is not None:
        await connection.close()