proxystore.store.lifetimes

Lifetime managers for objects in shared stores.

Learn more about managing object lifetimes in the Object Lifetimes guide.

Lifetime

Bases: Protocol

Lifetime protocol.

add_key

add_key(
    *keys: ConnectorKeyT, store: Store[Any] | None = None
) -> None

Associate a new object with the lifetime.

Warning

All keys should have been created by the same Store that this lifetime was initialized with.

Parameters:

  • keys (ConnectorKeyT, default: () ) –

    One or more keys of objects to associate with this lifetime.

  • store (Store[Any] | None, default: None ) –

    Optional Store that the keys belong to.

Source code in proxystore/store/lifetimes.py
def add_key(
    self,
    *keys: ConnectorKeyT,
    store: Store[Any] | None = None,
) -> None:
    """Associate a new object with the lifetime.

    Warning:
        All keys should have been created by the same
        [`Store`][proxystore.store.base.Store] that this lifetime was
        initialized with.

    Args:
        keys: One or more keys of objects to associate with this lifetime.
        store: Optional [`Store`][proxystore.store.base.Store] that `keys`
            belongs to.
    """
    ...

add_proxy

add_proxy(*proxies: Proxy[Any]) -> None

Associate a new object with the lifetime.

Warning

All proxies should have been created by the same Store that this lifetime was initialized with.

Parameters:

  • proxies (Proxy[Any], default: () ) –

    One or more proxies of objects to associate with this lifetime.

Raises:

  • ProxyStoreFactoryError –

    If the proxy's factory is not an instance of StoreFactory.

Source code in proxystore/store/lifetimes.py
def add_proxy(self, *proxies: Proxy[Any]) -> None:
    """Associate a new object with the lifetime.

    Warning:
        All proxies should have been created by the same
        [`Store`][proxystore.store.base.Store] that this lifetime was
        initialized with.

    Args:
        proxies: One or more proxies of objects to associate with this
            lifetime.

    Raises:
        ProxyStoreFactoryError: If the proxy's factory is not an instance
            of [`StoreFactory`][proxystore.store.base.StoreFactory].
    """
    ...

close

close(*, close_stores: bool = False) -> None

End the lifetime and evict all associated objects.

Parameters:

  • close_stores (bool, default: False ) –

    Close any Store instances associated with the lifetime.

Source code in proxystore/store/lifetimes.py
def close(self, *, close_stores: bool = False) -> None:
    """End the lifetime and evict all associated objects.

    Args:
        close_stores: Close any [`Store`][proxystore.store.base.Store]
            instances associated with the lifetime.
    """
    ...

done

done() -> bool

Check if lifetime has ended.

Source code in proxystore/store/lifetimes.py
def done(self) -> bool:
    """Check if lifetime has ended."""
    ...
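
Because Lifetime is a Protocol, a class does not need to inherit from it to be usable as a lifetime; providing the four methods is enough. The following is a minimal sketch of that idea using only the standard library. `LifetimeLike` and `RecordingLifetime` are hypothetical stand-ins written for illustration, and marking the stand-in protocol as `runtime_checkable` is an assumption made so that the `isinstance` check works; it is not a statement about how the real Lifetime is declared.

```python
from typing import Any, Protocol, Set, runtime_checkable


@runtime_checkable
class LifetimeLike(Protocol):
    """Hypothetical stand-in mirroring the four methods listed above."""

    def add_key(self, *keys: Any, store: Any = None) -> None: ...

    def add_proxy(self, *proxies: Any) -> None: ...

    def close(self, *, close_stores: bool = False) -> None: ...

    def done(self) -> bool: ...


class RecordingLifetime:
    """Toy lifetime that only records keys; it never touches a real store."""

    def __init__(self) -> None:
        self.keys: Set[Any] = set()
        self._done = False

    def add_key(self, *keys: Any, store: Any = None) -> None:
        self.keys.update(keys)

    def add_proxy(self, *proxies: Any) -> None:
        raise NotImplementedError('Proxies are out of scope for this toy.')

    def close(self, *, close_stores: bool = False) -> None:
        self.keys.clear()
        self._done = True

    def done(self) -> bool:
        return self._done


lifetime = RecordingLifetime()
lifetime.add_key('a', 'b')
# No inheritance is involved: matching method names is sufficient.
assert isinstance(lifetime, LifetimeLike)
lifetime.close()
assert lifetime.done()
```

A runtime `isinstance` check against a protocol only verifies that the methods exist, not that their signatures match, so a static type checker remains the more reliable way to validate a custom lifetime.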

ContextLifetime

ContextLifetime(
    store: Store[Any], *, name: str | None = None
)

Basic lifetime manager.

Object lifetime manager with context manager support.

Example
from proxystore.store.base import Store
from proxystore.store.lifetimes import ContextLifetime

store = Store(...)

with ContextLifetime(store) as lifetime:
    # Objects in the store can be associated with this lifetime.
    key = store.put('value', lifetime=lifetime)
    proxy = store.proxy('value', lifetime=lifetime)

# Objects associated with the lifetime are evicted once the
# lifetime ends.
assert not store.exists(key)

store.close()

Parameters:

  • store (Store[Any]) –

    Store instance used to create the objects associated with this lifetime and that will be used to evict them when the lifetime has ended.

  • name (str | None, default: None ) –

    Specify a name for this lifetime used in logging. Otherwise, a unique ID will be generated.

Source code in proxystore/store/lifetimes.py
def __init__(
    self,
    store: Store[Any],
    *,
    name: str | None = None,
) -> None:
    self.store = store
    self.name = name if name is not None else str(uuid.uuid4())
    self._done = False
    self._keys: set[ConnectorKeyT] = set()

    logger.info(f'Initialized lifetime manager (name={self.name})')

add_key

add_key(
    *keys: ConnectorKeyT, store: Store[Any] | None = None
) -> None

Associate a new object with the lifetime.

Warning

All keys should have been created by the same Store that this lifetime was initialized with.

Parameters:

  • keys (ConnectorKeyT, default: () ) –

    One or more keys of objects to associate with this lifetime.

  • store (Store[Any] | None, default: None ) –

    Optional Store that the keys belong to. Ignored by this implementation.

Raises:

  • RuntimeError –

    If this lifetime has ended.

Source code in proxystore/store/lifetimes.py
@_error_if_done
def add_key(
    self,
    *keys: ConnectorKeyT,
    store: Store[Any] | None = None,
) -> None:
    """Associate a new object with the lifetime.

    Warning:
        All keys should have been created by the same
        [`Store`][proxystore.store.base.Store] that this lifetime was
        initialized with.

    Args:
        keys: One or more keys of objects to associate with this lifetime.
        store: Optional [`Store`][proxystore.store.base.Store] that `keys`
            belongs to. Ignored by this implementation.

    Raises:
        RuntimeError: If this lifetime has ended.
    """
    self._keys.update(keys)
    logger.debug(
        f'Added keys to lifetime manager (name={self.name}): '
        f'{", ".join(repr(key) for key in keys)}',
    )

add_proxy

add_proxy(*proxies: Proxy[Any]) -> None

Associate a new object with the lifetime.

Warning

All proxies should have been created by the same Store that this lifetime was initialized with.

Parameters:

  • proxies (Proxy[Any], default: () ) –

    One or more proxies of objects to associate with this lifetime.

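Raises:

  • ProxyStoreFactoryError –

    If the proxy's factory is not an instance of StoreFactory.

  • RuntimeError –

    If this lifetime has ended.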

Source code in proxystore/store/lifetimes.py
@_error_if_done
def add_proxy(self, *proxies: Proxy[Any]) -> None:
    """Associate a new object with the lifetime.

    Warning:
        All proxies should have been created by the same
        [`Store`][proxystore.store.base.Store] that this lifetime was
        initialized with.

    Args:
        proxies: One or more proxies of objects to associate with this
            lifetime.

    Raises:
        ProxyStoreFactoryError: If the proxy's factory is not an instance
            of [`StoreFactory`][proxystore.store.base.StoreFactory].
        RuntimeError: If this lifetime has ended.
    """
    keys: list[ConnectorKeyT] = []
    for proxy in proxies:
        factory = get_factory(proxy)
        if isinstance(factory, StoreFactory):
            keys.append(factory.key)
        else:
            raise ProxyStoreFactoryError(
                'The proxy must contain a factory with type '
                f'{StoreFactory.__name__}. {type(factory).__name__} '
                'is not supported.',
            )
    self.add_key(*keys)

close

close(*, close_stores: bool = False) -> None

End the lifetime and evict all associated objects.

Parameters:

  • close_stores (bool, default: False ) –

    Close any Store instances associated with the lifetime.

Source code in proxystore/store/lifetimes.py
def close(self, *, close_stores: bool = False) -> None:
    """End the lifetime and evict all associated objects.

    Args:
        close_stores: Close any [`Store`][proxystore.store.base.Store]
            instances associated with the lifetime.
    """
    if self.done():
        return

    for key in self._keys:
        self.store.evict(key)
    self._done = True
    logger.info(
        f'Closed lifetime manager and evicted {len(self._keys)} '
        f'associated objects (name={self.name})',
    )
    self._keys.clear()

    if close_stores:
        self.store.close()

done

done() -> bool

Check if lifetime has ended.

Source code in proxystore/store/lifetimes.py
def done(self) -> bool:
    """Check if lifetime has ended."""
    return self._done
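
The context manager methods of ContextLifetime are not listed on this page. The sketch below shows, under stated assumptions, how a `with` block can be tied to the `close()` logic shown above: `__enter__` returns the lifetime and `__exit__` closes it. `DictStore` and `ScopedLifetime` are hypothetical standard-library stand-ins, not the real Store or ContextLifetime.

```python
from typing import Any, Dict, Set


class DictStore:
    """Hypothetical in-memory stand-in for a store; not the real Store."""

    def __init__(self) -> None:
        self.data: Dict[str, Any] = {}

    def put(self, key: str, value: Any) -> str:
        self.data[key] = value
        return key

    def exists(self, key: str) -> bool:
        return key in self.data

    def evict(self, key: str) -> None:
        self.data.pop(key, None)


class ScopedLifetime:
    """Toy lifetime that closes itself when the `with` block exits."""

    def __init__(self, store: DictStore) -> None:
        self.store = store
        self._keys: Set[str] = set()
        self._done = False

    def __enter__(self) -> 'ScopedLifetime':
        return self

    def __exit__(self, *exc_info: Any) -> None:
        # Returning None lets any exception from the block propagate.
        self.close()

    def add_key(self, *keys: str) -> None:
        self._keys.update(keys)

    def close(self) -> None:
        if self._done:  # Closing twice is a no-op, as in the source above.
            return
        for key in self._keys:
            self.store.evict(key)
        self._keys.clear()
        self._done = True

    def done(self) -> bool:
        return self._done


store = DictStore()
with ScopedLifetime(store) as lifetime:
    lifetime.add_key(store.put('k', 'value'))
    assert store.exists('k')

assert lifetime.done()
assert not store.exists('k')
```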

LeaseLifetime

LeaseLifetime(
    store: Store[Any],
    expiry: datetime | timedelta | float,
    *,
    name: str | None = None
)

Bases: ContextLifetime

Time-based lease lifetime manager.

Example
import time

from proxystore.store.base import Store
from proxystore.store.lifetimes import LeaseLifetime

with Store(...) as store:
    # Create a new lifetime with a current lease of ten seconds.
    lifetime = LeaseLifetime(store, expiry=10)

    # Objects in the store can be associated with this lifetime.
    key = store.put('value', lifetime=lifetime)
    proxy = store.proxy('value', lifetime=lifetime)

    # Extend the lease by another five seconds.
    lifetime.extend(5)

    time.sleep(15)

    # Lease has expired so the lifetime has ended.
    assert lifetime.done()
    assert not store.exists(key)

Parameters:

  • store (Store[Any]) –

    Store instance used to create the objects associated with this lifetime and that will be used to evict them when the lifetime has ended.

  • expiry (datetime | timedelta | float) –

    Initial expiry time of the lease. Can either be a datetime, timedelta, or float value specifying the number of seconds before expiring.

  • name (str | None, default: None ) –

    Specify a name for this lifetime used in logging. Otherwise, a unique ID will be generated.

Source code in proxystore/store/lifetimes.py
def __init__(
    self,
    store: Store[Any],
    expiry: datetime | timedelta | float,
    *,
    name: str | None = None,
) -> None:
    if isinstance(expiry, datetime):
        self._expiry = expiry.timestamp()
    elif isinstance(expiry, timedelta):
        self._expiry = time.time() + expiry.total_seconds()
    elif isinstance(expiry, (int, float)):
        self._expiry = time.time() + expiry
    else:
        raise AssertionError('Unreachable.')

    super().__init__(store, name=name)

    self._timer: threading.Timer | None = None
    self._start_timer()
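
The constructor above normalizes all three accepted expiry types to a single absolute POSIX timestamp. The same rule can be sketched as a standalone function. `to_timestamp` is a hypothetical helper written for illustration; it takes `now` as a parameter so that the result is deterministic, and it raises `TypeError` for unsupported types, which is a choice made for this sketch only.

```python
import time
from datetime import datetime, timedelta, timezone
from typing import Union

Expiry = Union[datetime, timedelta, float]


def to_timestamp(expiry: Expiry, now: float) -> float:
    """Convert an expiry value to an absolute POSIX timestamp."""
    if isinstance(expiry, datetime):
        # Absolute point in time; `now` is irrelevant.
        return expiry.timestamp()
    if isinstance(expiry, timedelta):
        # Relative offset from `now`.
        return now + expiry.total_seconds()
    if isinstance(expiry, (int, float)):
        # Relative offset from `now`, in seconds.
        return now + expiry
    raise TypeError(f'Unsupported expiry type: {type(expiry).__name__}')


now = time.time()
deadline = datetime.fromtimestamp(now + 5, tz=timezone.utc)

print(to_timestamp(10, now) - now)
print(to_timestamp(timedelta(minutes=1), now) - now)
print(to_timestamp(deadline, now) - now)
```

One detail worth keeping in mind when passing a datetime: `datetime.timestamp()` interprets a naive datetime as local time, so a timezone-aware value, as used in the sketch, avoids ambiguity.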

add_key

add_key(
    *keys: ConnectorKeyT, store: Store[Any] | None = None
) -> None

Associate a new object with the lifetime.

Warning

All keys should have been created by the same Store that this lifetime was initialized with.

Parameters:

  • keys (ConnectorKeyT, default: () ) –

    One or more keys of objects to associate with this lifetime.

  • store (Store[Any] | None, default: None ) –

    Optional Store that the keys belong to. Ignored by this implementation.

Raises:

  • RuntimeError –

    If this lifetime has ended.

Source code in proxystore/store/lifetimes.py
@_error_if_done
def add_key(
    self,
    *keys: ConnectorKeyT,
    store: Store[Any] | None = None,
) -> None:
    """Associate a new object with the lifetime.

    Warning:
        All keys should have been created by the same
        [`Store`][proxystore.store.base.Store] that this lifetime was
        initialized with.

    Args:
        keys: One or more keys of objects to associate with this lifetime.
        store: Optional [`Store`][proxystore.store.base.Store] that `keys`
            belongs to. Ignored by this implementation.

    Raises:
        RuntimeError: If this lifetime has ended.
    """
    self._keys.update(keys)
    logger.debug(
        f'Added keys to lifetime manager (name={self.name}): '
        f'{", ".join(repr(key) for key in keys)}',
    )

add_proxy

add_proxy(*proxies: Proxy[Any]) -> None

Associate a new object with the lifetime.

Warning

All proxies should have been created by the same Store that this lifetime was initialized with.

Parameters:

  • proxies (Proxy[Any], default: () ) –

    One or more proxies of objects to associate with this lifetime.

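Raises:

  • ProxyStoreFactoryError –

    If the proxy's factory is not an instance of StoreFactory.

  • RuntimeError –

    If this lifetime has ended.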

Source code in proxystore/store/lifetimes.py
@_error_if_done
def add_proxy(self, *proxies: Proxy[Any]) -> None:
    """Associate a new object with the lifetime.

    Warning:
        All proxies should have been created by the same
        [`Store`][proxystore.store.base.Store] that this lifetime was
        initialized with.

    Args:
        proxies: One or more proxies of objects to associate with this
            lifetime.

    Raises:
        ProxyStoreFactoryError: If the proxy's factory is not an instance
            of [`StoreFactory`][proxystore.store.base.StoreFactory].
        RuntimeError: If this lifetime has ended.
    """
    keys: list[ConnectorKeyT] = []
    for proxy in proxies:
        factory = get_factory(proxy)
        if isinstance(factory, StoreFactory):
            keys.append(factory.key)
        else:
            raise ProxyStoreFactoryError(
                'The proxy must contain a factory with type '
                f'{StoreFactory.__name__}. {type(factory).__name__} '
                'is not supported.',
            )
    self.add_key(*keys)

done

done() -> bool

Check if lifetime has ended.

Source code in proxystore/store/lifetimes.py
def done(self) -> bool:
    """Check if lifetime has ended."""
    return self._done

close

close(*, close_stores: bool = False) -> None

End the lifetime and evict all associated objects.

This can be called before the specified expiry time to end the lifetime early.

Parameters:

  • close_stores (bool, default: False ) –

    Close any Store instances associated with the lifetime.

Source code in proxystore/store/lifetimes.py
def close(self, *, close_stores: bool = False) -> None:
    """End the lifetime and evict all associated objects.

    This can be called before the specified expiry time to end the
    lifetime early.

    Args:
        close_stores: Close any [`Store`][proxystore.store.base.Store]
            instances associated with the lifetime.
    """
    if self._timer is not None:
        self._timer.cancel()
        self._timer = None

    super().close(close_stores=close_stores)

extend

extend(expiry: datetime | timedelta | float) -> None

Extend the expiry of the lifetime lease.

Parameters:

  • expiry (datetime | timedelta | float) –

    If a timedelta or a float (interpreted as seconds), extends the current expiry by that amount. If a datetime, sets the expiry to the specified timestamp, even if the new time is earlier than the current expiry.

Source code in proxystore/store/lifetimes.py
@_error_if_done
def extend(self, expiry: datetime | timedelta | float) -> None:
    """Extend the expiry of the lifetime lease.

    Args:
        expiry: Extends the current expiry if the value is
            [`timedelta`][datetime.timedelta] or float value specifying
            seconds. If a [`datetime`][datetime.datetime], updates the
            expiry to the specified timestamp even if the new time
            is less than the current expiry.
    """
    if isinstance(expiry, datetime):
        self._expiry = expiry.timestamp()
    elif isinstance(expiry, timedelta):
        self._expiry += expiry.total_seconds()
    elif isinstance(expiry, (int, float)):
        self._expiry += expiry
    else:
        raise AssertionError('Unreachable.')
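
Note that `extend()` above only updates the stored expiry, and the body of `_start_timer()` is not shown on this page. One way a timer can honor an extended lease without being restarted is to re-check the expiry whenever it fires and re-arm itself for the remaining time. The sketch below illustrates that pattern with the standard library only. `ToyLease` is a hypothetical class and should not be read as the library's actual timer logic.

```python
import threading
import time
from typing import Optional


class ToyLease:
    """Hypothetical lease whose timer re-checks the expiry when it fires."""

    def __init__(self, seconds: float) -> None:
        self._expiry = time.time() + seconds
        self._done = False
        self._lock = threading.Lock()
        self._timer: Optional[threading.Timer] = None
        self._arm()

    def _arm(self) -> None:
        with self._lock:
            if self._done:
                return
            remaining = self._expiry - time.time()
            if remaining > 0:
                # Not expired (possibly extended): wait out the remainder.
                self._timer = threading.Timer(remaining, self._arm)
                self._timer.daemon = True
                self._timer.start()
                return
        # The lease has expired; the lock is released before closing.
        self.close()

    def extend(self, seconds: float) -> None:
        with self._lock:
            if self._done:
                raise RuntimeError('Lease has ended.')
            self._expiry += seconds

    def close(self) -> None:
        with self._lock:
            if self._timer is not None:
                self._timer.cancel()
                self._timer = None
            self._done = True

    def done(self) -> bool:
        return self._done


lease = ToyLease(0.1)
lease.extend(0.1)  # The lease now expires about 0.2 seconds after creation.
time.sleep(0.5)
assert lease.done()
```

With this design an extension never needs to cancel a running timer: the timer fires at the old expiry, sees that time remains, and schedules itself again.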

StaticLifetime

StaticLifetime()

Static lifetime manager.

Keeps associated objects alive for the remainder of the program.

Note

This is a singleton class.

Warning

This class registers an atexit handler which will close the lifetime at the end of the program, evicting all objects associated with the lifetime. Therefore, Store instances used to create objects associated with the static lifetime should not be closed prior to program exit. The handler will close all of these stores. It is possible to call StaticLifetime().close() manually, after which it is safe to also close the stores.

Example
Static Lifetime
from proxystore.connectors.local import LocalConnector
from proxystore.store import Store
from proxystore.store.lifetimes import StaticLifetime

store = Store('default', LocalConnector(), register=True)  # (1)!

key = store.put('value', lifetime=StaticLifetime())  # (2)!
proxy = store.proxy('value', lifetime=StaticLifetime())  # (3)!

  1. The atexit handler will call store.close() at the end of the program. Setting register=True is recommended to prevent another instance being created internally when a proxy is resolved.
  2. The object associated with key will be evicted at the end of the program.
  3. The object associated with proxy will be evicted at the end of the program.

Source code in proxystore/store/lifetimes.py
def __init__(self) -> None:
    if not self._initialized:
        self._done = False
        self._keys: dict[Store[Any], set[ConnectorKeyT]] = defaultdict(set)
        self._callback = register_lifetime_atexit(self, close_stores=True)
        self._initialized = True
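
The `_initialized` guard in `__init__` above implies that the same instance is returned on every call to the constructor, but the code that produces that instance is not shown on this page. A common way to get this behavior, sketched here as an assumption, is to reuse a single instance in `__new__` and skip the setup in `__init__` after the first call. `ToySingleton` is a hypothetical class for illustration.

```python
from typing import Optional, Set


class ToySingleton:
    """Hypothetical singleton with a guarded one-time initialization."""

    _instance: Optional['ToySingleton'] = None
    _initialized = False

    def __new__(cls) -> 'ToySingleton':
        # Create the instance once and hand back the same object afterwards.
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self) -> None:
        # __init__ runs on every ToySingleton() call, so guard the setup.
        if not self._initialized:
            self.keys: Set[str] = set()
            self._initialized = True


first = ToySingleton()
first.keys.add('k')
second = ToySingleton()

assert first is second
assert second.keys == {'k'}  # State was not reset by the second call.
```

Without the guard, the second `ToySingleton()` call would replace `keys` with an empty set and silently drop everything tracked so far.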

add_key

add_key(
    *keys: ConnectorKeyT, store: Store[Any] | None = None
) -> None

Associate a new object with the lifetime.

Parameters:

  • keys (ConnectorKeyT, default: () ) –

    One or more keys of objects to associate with this lifetime.

  • store (Store[Any] | None, default: None ) –

    Store that the keys belong to. Required by this implementation.

Raises:

  • RuntimeError –

    If this lifetime has ended.

  • ValueError –

    If store is None.

Source code in proxystore/store/lifetimes.py
@_error_if_done
def add_key(
    self,
    *keys: ConnectorKeyT,
    store: Store[Any] | None = None,
) -> None:
    """Associate a new object with the lifetime.

    Args:
        keys: One or more keys of objects to associate with this lifetime.
        store: [`Store`][proxystore.store.base.Store] that `keys`
            belongs to. Required by this implementation.

    Raises:
        RuntimeError: If this lifetime has ended.
        ValueError: If `store` is `None`.
    """
    if store is None:
        raise ValueError(
            f'The {self.__class__.__name__} requires the store parameter.',
        )
    self._keys[store].update(keys)
    logger.debug(
        f'Added keys to lifetime manager (name={self.name}): '
        f'{", ".join(repr(key) for key in keys)}',
    )

add_proxy

add_proxy(*proxies: Proxy[Any]) -> None

Associate a new object with the lifetime.

Warning

This method will initialize new Store instances if the stores which were used to create the input proxies have not been registered by setting the register flag or by calling register_store().

Parameters:

  • proxies (Proxy[Any], default: () ) –

    One or more proxies of objects to associate with this lifetime.

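Raises:

  • ProxyStoreFactoryError –

    If the proxy's factory is not an instance of StoreFactory.

  • RuntimeError –

    If this lifetime has ended.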

Source code in proxystore/store/lifetimes.py
@_error_if_done
def add_proxy(self, *proxies: Proxy[Any]) -> None:
    """Associate a new object with the lifetime.

    Warning:
        This method will initialize new
        [`Store`][proxystore.store.base.Store] instances if the stores
        which were used to create the input proxies have not been
        registered by setting the `register` flag or by calling
        [`register_store()`][proxystore.store.register_store].

    Args:
        proxies: One or more proxies of objects to associate with this
            lifetime.

    Raises:
        ProxyStoreFactoryError: If the proxy's factory is not an instance
            of [`StoreFactory`][proxystore.store.base.StoreFactory].
        RuntimeError: If this lifetime has ended.
    """
    for proxy in proxies:
        factory = get_factory(proxy)
        if isinstance(factory, StoreFactory):
            self.add_key(factory.key, store=factory.get_store())
        else:
            raise ProxyStoreFactoryError(
                'The proxy must contain a factory with type '
                f'{StoreFactory.__name__}. {type(factory).__name__} '
                'is not supported.',
            )

close

close(*, close_stores: bool = False) -> None

End the lifetime and evict all associated objects.

Warning

Because this class is a singleton this operation can only be performed once.

Parameters:

  • close_stores (bool, default: False ) –

    Close any Store instances associated with the lifetime.

Source code in proxystore/store/lifetimes.py
def close(self, *, close_stores: bool = False) -> None:
    """End the lifetime and evict all associated objects.

    Warning:
        Because this class is a singleton this operation can only
        be performed once.

    Args:
        close_stores: Close any [`Store`][proxystore.store.base.Store]
            instances associated with the lifetime.
    """
    if self.done():
        return

    count = 0
    for store, keys in self._keys.items():
        for key in keys:
            store.evict(key)
            count += 1

        if close_stores:
            store.close()

    atexit.unregister(self._callback)

    self._done = True
    logger.info(
        f'Closed lifetime manager and evicted {count} '
        f'associated objects (name={self.name})',
    )
    self._keys.clear()
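
Unlike the other lifetimes, the static lifetime tracks keys per store, which is why `close()` iterates over a mapping from store to key set. The sketch below reproduces that bookkeeping with hypothetical stand-ins (`ToyStore`, `close_all`) so the eviction and the optional store shutdown can be observed without a real connector.

```python
from collections import defaultdict
from typing import DefaultDict, Set


class ToyStore:
    """Hypothetical hashable stand-in exposing evict() and close()."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.objects: Set[str] = set()
        self.closed = False

    def evict(self, key: str) -> None:
        self.objects.discard(key)

    def close(self) -> None:
        self.closed = True


def close_all(
    keys_by_store: DefaultDict[ToyStore, Set[str]],
    close_stores: bool,
) -> int:
    """Evict every tracked key from its own store and return the count."""
    count = 0
    for store, keys in keys_by_store.items():
        for key in keys:
            store.evict(key)
            count += 1
        if close_stores:
            store.close()
    keys_by_store.clear()
    return count


a, b = ToyStore('a'), ToyStore('b')
a.objects.update({'x', 'y'})
b.objects.add('z')

# Group keys by the store that owns them, as the defaultdict(set) above does.
tracked: DefaultDict[ToyStore, Set[str]] = defaultdict(set)
tracked[a].update({'x', 'y'})
tracked[b].add('z')

evicted = close_all(tracked, close_stores=True)
assert evicted == 3
assert not a.objects and not b.objects
assert a.closed and b.closed
```

Using the store object itself as the dictionary key requires stores to be hashable; the toy class relies on Python's default identity-based hashing.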

done

done() -> bool

Check if lifetime has ended.

Source code in proxystore/store/lifetimes.py
def done(self) -> bool:
    """Check if lifetime has ended."""
    return self._done

register_lifetime_atexit

register_lifetime_atexit(
    lifetime: Lifetime, close_stores: bool = True
) -> Callable[[], None]

Register atexit callback to clean up the lifetime.

Registers an atexit callback which will close the lifetime on normal program exit and optionally close the associated store as well.

Tip

Do not close the Store associated with the lifetime when registering an atexit callback. Using a Store after closing it is undefined behaviour. Rather, let the callback handle closing after it is safe to do so.

Warning

Callbacks are not guaranteed to be called in all cases. See the atexit docs for more details.

Parameters:

  • lifetime (Lifetime) –

    Lifetime to be closed at exit.

  • close_stores (bool, default: True ) –

    Close any Store instances associated with the lifetime.

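Returns:

  • Callable[[], None] –

    The registered callback function which can be used with atexit.unregister() if needed.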

Source code in proxystore/store/lifetimes.py
def register_lifetime_atexit(
    lifetime: Lifetime,
    close_stores: bool = True,
) -> Callable[[], None]:
    """Register atexit callback to cleanup the lifetime.

    Registers an atexit callback which will close the lifetime on normal
    program exit and optionally close the associated store as well.

    Tip:
        Do not close the [`Store`][proxystore.store.base.Store] associated
        with the lifetime when registering an atexit callback. Using a
        [`Store`][proxystore.store.base.Store] after closing it is undefined
        behaviour. Rather, let the callback handle closing after it is
        safe to do so.

    Warning:
        Callbacks are not guaranteed to be called in all cases. See the
        [`atexit`][atexit] docs for more details.

    Args:
        lifetime: Lifetime to be closed at exit.
        close_stores: Close any [`Store`][proxystore.store.base.Store]
            instances associated with the lifetime.

    Returns:
        The registered callback function which can be used with \
        [`atexit.unregister()`][atexit.unregister] if needed.
    """

    def _lifetime_atexit_callback() -> None:
        lifetime.close(close_stores=close_stores)

    atexit.register(_lifetime_atexit_callback)
    logger.debug(
        f'Registered atexit callback for {lifetime!r}',
    )
    return _lifetime_atexit_callback
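
The returned callback matters when a lifetime is closed manually before the program exits: passing it to `atexit.unregister()` prevents the handler from running a second time at exit. The following standard-library sketch mirrors the register-then-return pattern above. `ManualLifetime` and `register_at_exit` are hypothetical stand-ins for illustration.

```python
import atexit
from typing import Callable, List


class ManualLifetime:
    """Hypothetical lifetime that records how close() was called."""

    def __init__(self) -> None:
        self.close_calls: List[bool] = []

    def close(self, *, close_stores: bool = False) -> None:
        self.close_calls.append(close_stores)


def register_at_exit(
    lifetime: ManualLifetime,
    close_stores: bool = True,
) -> Callable[[], None]:
    """Register a closing callback and return it for later unregistering."""

    def _callback() -> None:
        lifetime.close(close_stores=close_stores)

    atexit.register(_callback)
    return _callback


lifetime = ManualLifetime()
callback = register_at_exit(lifetime)

# Ending the lifetime early: unregister first so the handler does not
# run again at interpreter exit, then close manually.
atexit.unregister(callback)
lifetime.close(close_stores=True)
assert lifetime.close_calls == [True]
```

`atexit.unregister()` matches handlers by equality with the registered function, so the exact object returned at registration time has to be kept around.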