Skip to content

proxystore.store.lifetimes

Lifetime managers for objects in shared stores.

Learn more about managing object lifetimes in the Object Lifetimes guide.

Lifetime

Bases: Protocol

Lifetime protocol.

Attributes:

  • store (Store[Any]) –

    Store instance use to create the objects associated with this lifetime and that will be used to evict them when the lifetime has ended.

add_key()

add_key(*keys: ConnectorKeyT) -> None

Associate a new object with the lifetime.

Warning

All keys should have been created by the same Store that this lifetime was initialized with.

Parameters:

  • keys (ConnectorKeyT, default: () ) –

    One or more keys of objects to associate with this lifetime.

Source code in proxystore/store/lifetimes.py
def add_key(self, *keys: ConnectorKeyT) -> None:
    """Associate a new object with the lifetime.

    Warning:
        All keys should have been created by the same
        [`Store`][proxystore.store.base.Store] that this lifetime was
        initialized with.

    Args:
        keys: One or more keys of objects to associate with this lifetime.
    """
    ...

add_proxy()

add_proxy(*proxies: Proxy[Any]) -> None

Associate a new object with the lifetime.

Warning

All proxies should have been created by the same Store that this lifetime was initialized with.

Parameters:

  • proxies (Proxy[Any], default: () ) –

    One or more proxies of objects to associate with this lifetime.

Raises:

Source code in proxystore/store/lifetimes.py
def add_proxy(self, *proxies: Proxy[Any]) -> None:
    """Associate a new object with the lifetime.

    Warning:
        All proxies should have been created by the same
        [`Store`][proxystore.store.base.Store] that this lifetime was
        initialized with.

    Args:
        proxies: One or more proxies of objects to associate with this
            lifetime.

    Raises:
        ProxyStoreFactoryError: If the proxy's factory is not an instance
            of [`StoreFactory`][proxystore.store.base.StoreFactory].
    """
    ...

close()

close() -> None

End the lifetime and evict all associated objects.

Source code in proxystore/store/lifetimes.py
def close(self) -> None:
    """End the lifetime and evict all associated objects."""
    ...

done()

done() -> bool

Check if lifetime has ended.

Source code in proxystore/store/lifetimes.py
def done(self) -> bool:
    """Check if lifetime has ended."""
    ...

ContextLifetime

ContextLifetime(
    store: Store[Any], *, name: str | None = None
)

Basic lifetime manager.

Object lifetime manager with context manager support.

Example
from proxystore.store.base import Store
from proxystore.store.lifetimes import ContextLifetime

store = Store(...)

with ContextLifetime(store) as lifetime:
    # Objects in the store can be associated with this lifetime.
    key = store.put('value', lifetime=lifetime)
    proxy = store.proxy('value', lifetime=lifetime)

# Objects associated with the lifetime are evicted once the
# lifetime ends.
assert not store.exists(key)

store.close()

Parameters:

  • store (Store[Any]) –

    Store instance use to create the objects associated with this lifetime and that will be used to evict them when the lifetime has ended.

  • name (str | None, default: None ) –

    Specify a name for this lifetime used in logging. Otherwise, a unique ID will be generated.

Source code in proxystore/store/lifetimes.py
def __init__(
    self,
    store: Store[Any],
    *,
    name: str | None = None,
) -> None:
    self.store = store
    self.name = name if name is not None else str(uuid.uuid4())
    self._done = False
    self._keys: set[ConnectorKeyT] = set()

    logger.info(f'Initialized lifetime manager (name={self.name})')

add_key()

add_key(*keys: ConnectorKeyT) -> None

Associate a new object with the lifetime.

Warning

All keys should have been created by the same Store that this lifetime was initialized with.

Parameters:

  • keys (ConnectorKeyT, default: () ) –

    One or more keys of objects to associate with this lifetime.

Raises:

Source code in proxystore/store/lifetimes.py
@_error_if_done
def add_key(self, *keys: ConnectorKeyT) -> None:
    """Associate a new object with the lifetime.

    Warning:
        All keys should have been created by the same
        [`Store`][proxystore.store.base.Store] that this lifetime was
        initialized with.

    Args:
        keys: One or more keys of objects to associate with this lifetime.

    Raises:
        RuntimeError: If this lifetime has ended.
    """
    self._keys.update(keys)
    logger.debug(
        f'Added keys to lifetime manager (name={self.name}): '
        f'{", ".join(repr(key) for key in keys)}',
    )

add_proxy()

add_proxy(*proxies: Proxy[Any]) -> None

Associate a new object with the lifetime.

Warning

All proxies should have been created by the same Store that this lifetime was initialized with.

Parameters:

  • proxies (Proxy[Any], default: () ) –

    One or more proxies of objects to associate with this lifetime.

Raises:

Source code in proxystore/store/lifetimes.py
@_error_if_done
def add_proxy(self, *proxies: Proxy[Any]) -> None:
    """Associate a new object with the lifetime.

    Warning:
        All proxies should have been created by the same
        [`Store`][proxystore.store.base.Store] that this lifetime was
        initialized with.

    Args:
        proxies: One or more proxies of objects to associate with this
            lifetime.

    Raises:
        ProxyStoreFactoryError: If the proxy's factory is not an instance
            of [`StoreFactory`][proxystore.store.base.StoreFactory].
        RuntimeError: If this lifetime has ended.
    """
    keys: list[ConnectorKeyT] = []
    for proxy in proxies:
        factory = proxy.__factory__
        if isinstance(factory, StoreFactory):
            keys.append(factory.key)
        else:
            raise ProxyStoreFactoryError(
                'The proxy must contain a factory with type '
                f'{type(StoreFactory).__name__}. {type(factory).__name__} '
                'is not supported.',
            )
    self.add_key(*keys)

close()

close() -> None

End the lifetime and evict all associated objects.

Source code in proxystore/store/lifetimes.py
def close(self) -> None:
    """End the lifetime and evict all associated objects."""
    if self.done():
        return

    for key in self._keys:
        self.store.evict(key)
    self._done = True
    logger.info(
        f'Closed lifetime manager and evicted {len(self._keys)} '
        f'associated objects (name={self.name})',
    )
    self._keys.clear()

done()

done() -> bool

Check if lifetime has ended.

Source code in proxystore/store/lifetimes.py
def done(self) -> bool:
    """Check if lifetime has ended."""
    return self._done

LeaseLifetime

LeaseLifetime(
    store: Store[Any],
    expiry: datetime | timedelta | float,
    *,
    name: str | None = None
)

Bases: ContextLifetime

Time-based lease lifetime manager.

Example
from proxystore.store.base import Store
from proxystore.store.lifetimes import LeaseLifetime

with Store(...) as store:
    # Create a new lifetime with a current lease of ten seconds.
    lifetime = LeaseLifetime(store, expiry=10)

    # Objects in the store can be associated with this lifetime.
    key = store.put('value', lifetime=lifetime)
    proxy = store.proxy('value', lifetime=lifetime)

    # Extend the lease by another five seconds.
    lifetime.extend(5)

    time.sleep(15)

    # Lease has expired so the lifetime has ended.
    assert lifetime.done()
    assert not store.exists(key)

Parameters:

  • store (Store[Any]) –

    Store instance use to create the objects associated with this lifetime and that will be used to evict them when the lifetime has ended.

  • expiry (datetime | timedelta | float) –

    Initial expiry time of the lease. Can either be a datetime, timedelta, or float value specifying the number of seconds before expiring.

  • name (str | None, default: None ) –

    Specify a name for this lifetime used in logging. Otherwise, a unique ID will be generated.

Source code in proxystore/store/lifetimes.py
def __init__(
    self,
    store: Store[Any],
    expiry: datetime | timedelta | float,
    *,
    name: str | None = None,
) -> None:
    if isinstance(expiry, datetime):
        self._expiry = expiry.timestamp()
    elif isinstance(expiry, timedelta):
        self._expiry = time.time() + expiry.total_seconds()
    elif isinstance(expiry, (int, float)):
        self._expiry = time.time() + expiry
    else:
        raise AssertionError('Unreachable.')

    super().__init__(store, name=name)

    self._timer: threading.Timer | None = None
    self._start_timer()

add_key()

add_key(*keys: ConnectorKeyT) -> None

Associate a new object with the lifetime.

Warning

All keys should have been created by the same Store that this lifetime was initialized with.

Parameters:

  • keys (ConnectorKeyT, default: () ) –

    One or more keys of objects to associate with this lifetime.

Raises:

Source code in proxystore/store/lifetimes.py
@_error_if_done
def add_key(self, *keys: ConnectorKeyT) -> None:
    """Associate a new object with the lifetime.

    Warning:
        All keys should have been created by the same
        [`Store`][proxystore.store.base.Store] that this lifetime was
        initialized with.

    Args:
        keys: One or more keys of objects to associate with this lifetime.

    Raises:
        RuntimeError: If this lifetime has ended.
    """
    self._keys.update(keys)
    logger.debug(
        f'Added keys to lifetime manager (name={self.name}): '
        f'{", ".join(repr(key) for key in keys)}',
    )

add_proxy()

add_proxy(*proxies: Proxy[Any]) -> None

Associate a new object with the lifetime.

Warning

All proxies should have been created by the same Store that this lifetime was initialized with.

Parameters:

  • proxies (Proxy[Any], default: () ) –

    One or more proxies of objects to associate with this lifetime.

Raises:

Source code in proxystore/store/lifetimes.py
@_error_if_done
def add_proxy(self, *proxies: Proxy[Any]) -> None:
    """Associate a new object with the lifetime.

    Warning:
        All proxies should have been created by the same
        [`Store`][proxystore.store.base.Store] that this lifetime was
        initialized with.

    Args:
        proxies: One or more proxies of objects to associate with this
            lifetime.

    Raises:
        ProxyStoreFactoryError: If the proxy's factory is not an instance
            of [`StoreFactory`][proxystore.store.base.StoreFactory].
        RuntimeError: If this lifetime has ended.
    """
    keys: list[ConnectorKeyT] = []
    for proxy in proxies:
        factory = proxy.__factory__
        if isinstance(factory, StoreFactory):
            keys.append(factory.key)
        else:
            raise ProxyStoreFactoryError(
                'The proxy must contain a factory with type '
                f'{type(StoreFactory).__name__}. {type(factory).__name__} '
                'is not supported.',
            )
    self.add_key(*keys)

done()

done() -> bool

Check if lifetime has ended.

Source code in proxystore/store/lifetimes.py
def done(self) -> bool:
    """Check if lifetime has ended."""
    return self._done

close()

close() -> None

End the lifetime and evict all associated objects.

This can be called before the specified expiry time to end the lifetime early.

Source code in proxystore/store/lifetimes.py
def close(self) -> None:
    """End the lifetime and evict all associated objects.

    This can be called before the specified expiry time to end the
    lifetime early.
    """
    if self._timer is not None:
        self._timer.cancel()
        self._timer = None

    super().close()

extend()

extend(expiry: datetime | timedelta | float) -> None

Extend the expiry of the lifetime lease.

Parameters:

  • expiry (datetime | timedelta | float) –

    Extends the current expiry if the value is timedelta or float value specifying seconds. If a datetime, updates the expiry to the specified timestamp even if the new time is less than the current expiry.

Source code in proxystore/store/lifetimes.py
@_error_if_done
def extend(self, expiry: datetime | timedelta | float) -> None:
    """Extend the expiry of the lifetime lease.

    Args:
        expiry: Extends the current expiry if the value is
            [`timedelta`][datetime.timedelta] or float value specifying
            seconds. If a [`datetime`][datetime.datetime], updates the
            expiry to the specified timestamp even if the new time
            is less than the current expiry.
    """
    if isinstance(expiry, datetime):
        self._expiry = expiry.timestamp()
    elif isinstance(expiry, timedelta):
        self._expiry += expiry.total_seconds()
    elif isinstance(expiry, (int, float)):
        self._expiry += expiry
    else:
        raise AssertionError('Unreachable.')

register_lifetime_atexit()

register_lifetime_atexit(
    lifetime: Lifetime, close_store: bool = True
) -> Callable[[], None]

Register atexit callback to cleanup the lifetime.

Registers an atexit callback which will close the lifetime on normal program exit and optionally close the associated store as well.

Tip

Do not close the Store associated with the lifetime when registering an atexit callback. Using a Store after closing it is undefined behaviour. Rather, let the callback handle closing after it is safe to do so.

Warning

Callbacks are not guaranteed to be called in all cases. See the atexit docs for more details.

Parameters:

  • lifetime (Lifetime) –

    Lifetime to be closed at exit.

  • close_store (bool, default: True ) –

    Close the Store after the lifetime.

Returns:

Source code in proxystore/store/lifetimes.py
def register_lifetime_atexit(
    lifetime: Lifetime,
    close_store: bool = True,
) -> Callable[[], None]:
    """Register atexit callback to cleanup the lifetime.

    Registers an atexit callback which will close the lifetime on normal
    program exit and optionally close the associated store as well.

    Tip:
        Do not close the [`Store`][proxystore.store.base.Store] associated
        with the lifetime when registering an atexit callback. Using a
        [`Store`][proxystore.store.base.Store] after closing it is undefined
        behaviour. Rather, let the callback handle closing after it is
        safe to do so.

    Warning:
        Callbacks are not guaranteed to be called in all cases. See the
        [`atexit`][atexit] docs for more details.

    Args:
        lifetime: Lifetime to be closed at exit.
        close_store: Close the [`Store`][proxystore.store.base.Store] after
            the lifetime.

    Returns:
        The registered callback function which can be used with \
        [`atexit.unregister()`][atexit.unregister] if needed.
    """

    def _lifetime_atexit_callback() -> None:
        lifetime.close()
        if close_store:
            lifetime.store.close()

    atexit.register(_lifetime_atexit_callback)
    logger.debug(
        'Registered atexit callback for lifetime '
        f'(lifetime: {lifetime.name}, store: {lifetime.store.name})',
    )
    return _lifetime_atexit_callback