Skip to content

proxystore.store.globus

Globus Endpoint Implementation.

GlobusEndpoint

GlobusEndpoint(
    uuid: str,
    endpoint_path: str,
    local_path: str | None,
    host_regex: str | Pattern[str],
) -> None

Globus endpoint representation.

Parameters:

  • uuid (str) –

    UUID of Globus endpoint.

  • endpoint_path (str) –

    Path within endpoint to directory to use for storing objects.

  • local_path (str | None) –

    Local path (as seen by the host filesystem) that corresponds to the directory specified by endpoint_path.

  • host_regex (str | Pattern[str]) –

    String that matches the host where the Globus endpoint exists or regex pattern than can be used to match the host. The host pattern is needed so that proxies can figure out what the local endpoint is when they are resolved.

Source code in proxystore/store/globus.py
def __init__(
    self,
    uuid: str,
    endpoint_path: str,
    local_path: str | None,
    host_regex: str | Pattern[str],
) -> None:
    """Initialize a Globus endpoint representation.

    Args:
        uuid: UUID of the Globus endpoint.
        endpoint_path: Path within the endpoint to the directory used
            for storing objects.
        local_path: Local path (as seen by the host filesystem) that
            corresponds to the directory specified by `endpoint_path`.
        host_regex: String that matches the host where the Globus
            endpoint exists or regex pattern that can be used to match
            the host. Needed so proxies can figure out the local
            endpoint when they are resolved.

    Raises:
        TypeError: If an argument does not match its declared type.
    """
    if not isinstance(uuid, str):
        raise TypeError('uuid must be a str.')
    if not isinstance(endpoint_path, str):
        raise TypeError('endpoint_path must be a str.')
    # Bug fix: the annotation permits None but the old check raised
    # TypeError for it. None is now accepted, matching the signature.
    if local_path is not None and not isinstance(local_path, str):
        raise TypeError('local_path must be a str.')
    if not isinstance(host_regex, (str, Pattern)):
        raise TypeError('host_regex must be a str or re.Pattern.')

    self.uuid = uuid
    self.endpoint_path = endpoint_path
    self.local_path = local_path
    self.host_regex = host_regex

GlobusEndpoints

GlobusEndpoints(
    endpoints: Collection[GlobusEndpoint],
) -> None

A collection of Globus endpoints.

Parameters:

  • endpoints (Collection[GlobusEndpoint]) –

    Globus endpoints to manage in this collection.

Raises:

  • ValueError

    If endpoints has length 0 or if multiple endpoints with the same UUID are provided.

Source code in proxystore/store/globus.py
def __init__(self, endpoints: Collection[GlobusEndpoint]) -> None:
    """Store the given endpoints, keyed by their Globus endpoint UUID.

    Raises:
        ValueError: If `endpoints` is empty or if two endpoints share
            the same UUID.
    """
    if not endpoints:
        raise ValueError(
            'GlobusEndpoints must be passed at least one GlobusEndpoint '
            'object',
        )
    mapping: dict[str, GlobusEndpoint] = {}
    for ep in endpoints:
        if ep.uuid in mapping:
            raise ValueError(
                'Cannot pass multiple GlobusEndpoint objects with the '
                'same Globus endpoint UUID.',
            )
        mapping[ep.uuid] = ep
    self._endpoints = mapping

from_dict classmethod

from_dict(
    json_object: dict[str, dict[str, str]]
) -> GlobusEndpoints

Construct an endpoints collection from a dictionary.

Example
{
  "endpoint-uuid-1": {
    "host_regex": "host1-regex",
    "endpoint_path": "/path/to/endpoint/dir",
    "local_path": "/path/to/local/dir"
  },
  "endpoint-uuid-2": {
    "host_regex": "host2-regex",
    "endpoint_path": "/path/to/endpoint/dir",
    "local_path": "/path/to/local/dir"
  }
}
Source code in proxystore/store/globus.py
@classmethod
def from_dict(
    cls: type[GlobusEndpoints],
    json_object: dict[str, dict[str, str]],
) -> GlobusEndpoints:
    """Construct an endpoints collection from a dictionary.

    Each key of `json_object` is an endpoint UUID and each value maps
    the `endpoint_path`, `local_path`, and `host_regex` parameters of
    that endpoint.

    Example:

        ```python
        {
          "endpoint-uuid-1": {
            "host_regex": "host1-regex",
            "endpoint_path": "/path/to/endpoint/dir",
            "local_path": "/path/to/local/dir"
          },
          "endpoint-uuid-2": {
            "host_regex": "host2-regex",
            "endpoint_path": "/path/to/endpoint/dir",
            "local_path": "/path/to/local/dir"
          }
        }
        ```
    """  # noqa: D412
    endpoints = [
        GlobusEndpoint(
            uuid=uuid,
            endpoint_path=params['endpoint_path'],
            local_path=params['local_path'],
            host_regex=params['host_regex'],
        )
        for uuid, params in json_object.items()
    ]
    # Bug fix: construct via cls rather than the hard-coded class name
    # so subclasses of GlobusEndpoints get instances of themselves.
    return cls(endpoints)

from_json classmethod

from_json(json_file: str) -> GlobusEndpoints

Construct a GlobusEndpoints object from a json file.

The dict read from the JSON file will be passed to from_dict() and should match the format expected by from_dict().

Source code in proxystore/store/globus.py
@classmethod
def from_json(cls, json_file: str) -> GlobusEndpoints:
    """Construct a GlobusEndpoints object from a json file.

    The `dict` read from the JSON file will be passed to
    [`from_dict()`][proxystore.store.globus.GlobusEndpoints.from_dict] and
    should match the format expected by
    [`from_dict()`][proxystore.store.globus.GlobusEndpoints.from_dict].
    """
    with open(json_file) as f:
        parsed = json.load(f)
    return cls.from_dict(parsed)

dict

dict() -> dict[str, dict[str, str]]

Convert the GlobusEndpoints to a dict.

Note that the GlobusEndpoints object can be reconstructed by passing the dict to from_dict().

Source code in proxystore/store/globus.py
def dict(self) -> dict[str, dict[str, str]]:
    """Convert the GlobusEndpoints to a dict.

    Note that the
    [`GlobusEndpoints`][proxystore.store.globus.GlobusEndpoints]
    object can be reconstructed by passing the `dict` to
    [`from_dict()`][proxystore.store.globus.GlobusEndpoints.from_dict].
    """
    def _as_str(regex: str | Pattern[str]) -> str:
        # Compiled patterns are serialized via their pattern string.
        return regex.pattern if isinstance(regex, Pattern) else regex

    return {
        ep.uuid: {
            'endpoint_path': ep.endpoint_path,
            'local_path': ep.local_path,
            'host_regex': _as_str(ep.host_regex),
        }
        for ep in self
    }

get_by_host

get_by_host(host: str) -> GlobusEndpoint

Get endpoint by host.

Searches the endpoints for an endpoint whose host_regex matches host.

Parameters:

  • host (str) –

    Host to match.

Returns:

    Globus endpoint.

Raises:

  • ValueError

    If host does not match any of the endpoints.

Source code in proxystore/store/globus.py
def get_by_host(self, host: str) -> GlobusEndpoint:
    """Get endpoint by host.

    Searches the endpoints for an endpoint whose `host_regex` matches
    `host`.

    Args:
        host: Host to match.

    Returns:
        Globus endpoint.

    Raises:
        ValueError: If `host` does not match any of the endpoints.
    """
    # Lazily scan registered endpoints; the first full-regex match wins.
    candidates = (
        ep
        for ep in self._endpoints.values()
        if re.fullmatch(ep.host_regex, host) is not None
    )
    found = next(candidates, None)
    if found is None:
        raise ValueError(f'Cannot find endpoint matching host {host}')
    return found

GlobusStoreKey

Bases: NamedTuple

Key to object in a GlobusStore.

filename class-attribute

filename: str

Unique object filename.

task_id class-attribute

task_id: str

Globus transfer task ID for the file.

__eq__

__eq__(other: Any) -> bool

Match keys by filename only.

This is a hack around the fact that the task_id is not created until after the filename is so there can be a state where the task_id is empty.

Source code in proxystore/store/globus.py
def __eq__(self, other: Any) -> bool:
    """Match keys by filename only.

    This is a hack around the fact that the task_id is not created until
    after the filename is so there can be a state where the task_id
    is empty.
    """
    # Only tuple-like values (including other keys) can compare equal;
    # equality considers just the first field (the filename).
    return isinstance(other, tuple) and self[0] == other[0]

GlobusStore

GlobusStore(
    name: str,
    *,
    endpoints: GlobusEndpoints
    | list[GlobusEndpoint]
    | dict[str, dict[str, str]],
    polling_interval: int = 1,
    sync_level: int
    | Literal["exists", "size", "mtime", "checksum"] = "mtime",
    timeout: int = 60,
    cache_size: int = 16,
    stats: bool = False
) -> None

Bases: Store[GlobusStoreKey]

Globus backend class.

The GlobusStore is similar to a FileStore in that objects in the store are saved to disk but allows for the transfer of objects between two remote file systems. The two directories on the separate file systems are kept in sync via Globus transfers. The GlobusStore is useful when moving data between hosts that have a Globus endpoint but may have restrictions that prevent the use of other store backends (e.g., ports cannot be opened for using a RedisStore).

Note

To use Globus for data transfer, Globus authentication needs to be performed otherwise an error will be raised. Authentication can be performed on the command line with proxystore-globus-auth. Authentication only needs to be performed once per system.

Parameters:

  • name (str) –

    Name of the store instance.

  • endpoints (GlobusEndpoints | list[GlobusEndpoint] | dict[str, dict[str, str]]) –

    Globus endpoints to keep in sync. If passed as a dict, the dictionary must match the format expected by GlobusEndpoints.from_dict().

  • polling_interval (int) –

    Interval in seconds to check if Globus tasks have finished.

  • sync_level (int | Literal["exists", "size", "mtime", "checksum"]) –

    Globus transfer sync level.

  • timeout (int) –

    Timeout in seconds for waiting on Globus tasks.

  • cache_size (int) –

    Size of LRU cache (in # of objects). If 0, the cache is disabled. The cache is local to the Python process.

  • stats (bool) –

    Collect stats on store operations.

Raises:

  • GlobusAuthFileError – If the Globus authentication file cannot be found.

  • ValueError – If endpoints is of an incorrect type.

  • ValueError – If len(endpoints) != 2 because this implementation can currently only keep two endpoints in sync.

Source code in proxystore/store/globus.py
def __init__(
    self,
    name: str,
    *,
    endpoints: GlobusEndpoints
    | list[GlobusEndpoint]
    | dict[str, dict[str, str]],
    polling_interval: int = 1,
    sync_level: int
    | Literal['exists', 'size', 'mtime', 'checksum'] = 'mtime',
    timeout: int = 60,
    cache_size: int = 16,
    stats: bool = False,
) -> None:
    """Initialize the Globus store.

    Args:
        name: Name of the store instance.
        endpoints: Globus endpoints to keep in sync. If passed as a
            dict, the dictionary must match the format expected by
            GlobusEndpoints.from_dict().
        polling_interval: Interval in seconds to check if Globus tasks
            have finished.
        sync_level: Globus transfer sync level.
        timeout: Timeout in seconds for waiting on Globus tasks.
        cache_size: Size of LRU cache (in # of objects). If 0, the
            cache is disabled. The cache is local to the Python process.
        stats: Collect stats on store operations.

    Raises:
        GlobusAuthFileError: If the Globus authentication file cannot
            be found.
        ValueError: If `endpoints` is of an incorrect type, or if
            `len(endpoints) != 2` because this implementation can
            currently only keep two endpoints in sync.
    """
    if isinstance(endpoints, GlobusEndpoints):
        self.endpoints = endpoints
    elif isinstance(endpoints, list):
        self.endpoints = GlobusEndpoints(endpoints)
    elif isinstance(endpoints, dict):
        self.endpoints = GlobusEndpoints.from_dict(endpoints)
    else:
        # Bug fix: the previous message omitted dict, which the branch
        # above explicitly accepts.
        raise ValueError(
            'endpoints must be of type GlobusEndpoints, a list of '
            f'GlobusEndpoint, or a dict. Got {type(endpoints)}.',
        )
    if len(endpoints) != 2:
        raise ValueError(
            'ProxyStore only supports two endpoints at a time',
        )
    self.polling_interval = polling_interval
    self.sync_level = sync_level
    self.timeout = timeout

    try:
        authorizer = get_proxystore_authorizer()
    except GlobusAuthFileError as e:
        # Re-raise with actionable guidance for completing auth.
        raise GlobusAuthFileError(
            'Error loading Globus auth tokens. Complete the '
            'authentication process with the proxystore-globus-auth tool.',
        ) from e

    self._transfer_client = globus_sdk.TransferClient(
        authorizer=authorizer,
    )

    super().__init__(
        name,
        cache_size=cache_size,
        stats=stats,
        kwargs={
            # Pass endpoints as a dict to make kwargs JSON serializable
            'endpoints': self.endpoints.dict(),
            'polling_interval': self.polling_interval,
            'sync_level': self.sync_level,
            'timeout': self.timeout,
        },
    )

close

close() -> None

Cleanup directories used by ProxyStore in the Globus endpoints.

Warning

Will delete the directory at local_path on each endpoint.

Warning

This method should only be called at the end of the program when the store will no longer be used, for example once all proxies have been resolved.

Source code in proxystore/store/globus.py
def close(self) -> None:
    """Cleanup directories used by ProxyStore in the Globus endpoints.

    Warning:
        Will delete the directory at `local_path` on each endpoint.

    Warning:
        This method should only be called at the end of the program when
        the store will no longer be used, for example once all proxies
        have been resolved.
    """
    for endpoint in self.endpoints:
        task = globus_sdk.DeleteData(
            self._transfer_client,
            endpoint=endpoint.uuid,
            recursive=True,
        )
        # Suppress Globus email notifications for this cleanup task.
        for event in ('succeeded', 'failed', 'inactive'):
            task[f'notify_on_{event}'] = False
        task.add_item(endpoint.endpoint_path)
        result = _submit_transfer_action(self._transfer_client, task)
        self._wait_on_tasks(result['task_id'])