proxystore.connectors.globus

Globus transfer connector implementation.

GlobusEndpoint

GlobusEndpoint(
    uuid: str,
    endpoint_path: str,
    local_path: str | None,
    host_regex: str | Pattern[str],
)

Globus Collection endpoint configuration.

Defines the directory within the Globus Collection to be used for storage and transfer of files.

Tip

A Globus Collection may have a different mount point than what you would use when logged in to a system. The endpoint_path and local_path parameters are used as the mapping between the two. For example, if I created a directory bar/ within the foo project allocation on ALCF's Grand filesystem, the endpoint_path would be /foo/bar but the local_path would be /projects/foo/bar. Be sure to check that the two paths point to the same physical directory when instantiating this type.
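
For example, a minimal sketch of that mapping (the UUID below is a placeholder):

```python
from proxystore.connectors.globus import GlobusEndpoint

# Hypothetical values: the UUID is a placeholder, and endpoint_path and
# local_path must refer to the same physical directory on the system.
endpoint = GlobusEndpoint(
    uuid='aaaaaaaa-bbbb-cccc-dddd-eeeeeeeeeeee',
    endpoint_path='/foo/bar',
    local_path='/projects/foo/bar',
    host_regex=r'.*\.alcf\.anl\.gov',
)
```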

Warning

The path should refer to a unique directory that ProxyStore can exclusively use. For example, do not use your $HOME directory and instead prefer a directory suitable for bulk data storage, such as a subdirectory of a project allocation (e.g., /projects/FOO/proxystore-globus-cache).

Parameters:

  • uuid (str) –

    UUID of the Globus Collection. This can be found by searching for the collection on app.globus.org.

  • endpoint_path (str) –

    Directory path within the Globus Collection to use for storing objects and transferring files. This path can be found via the File Manager on app.globus.org.

  • local_path (str | None) –

    The local path equivalent of endpoint_path. This may or may not be equal to endpoint_path, depending on the configuration of the Globus Collection. This is equivalent to the path that you would ls when logged on to the system.

  • host_regex (str | Pattern[str]) –

    String or regular expression that matches the hostname where the Globus Collection exists. The host pattern is used by the GlobusConnector to determine what the "local" endpoint is when reading, writing, and transferring files.

Source code in proxystore/connectors/globus.py
def __init__(
    self,
    uuid: str,
    endpoint_path: str,
    local_path: str | None,
    host_regex: str | Pattern[str],
) -> None:
    if not isinstance(uuid, str):
        raise TypeError('uuid must be a str.')
    if not isinstance(endpoint_path, str):
        raise TypeError('endpoint_path must be a str.')
    if local_path is not None and not isinstance(local_path, str):
        raise TypeError('local_path must be a str or None.')
    if not isinstance(host_regex, (str, Pattern)):
        raise TypeError('host_regex must be a str or re.Pattern.')

    self.uuid = uuid
    self.endpoint_path = endpoint_path
    self.local_path = local_path
    self.host_regex = host_regex

GlobusEndpoints

GlobusEndpoints(endpoints: Collection[GlobusEndpoint])

A collection of Globus endpoints.

Parameters:

  • endpoints (Collection[GlobusEndpoint]) –

    An iterable of GlobusEndpoint objects to include in the collection.

Raises:

  • ValueError

    If endpoints has length 0 or if multiple endpoints with the same UUID are provided.

Source code in proxystore/connectors/globus.py
def __init__(self, endpoints: Collection[GlobusEndpoint]) -> None:
    if len(endpoints) == 0:
        raise ValueError(
            'GlobusEndpoints must be passed at least one GlobusEndpoint '
            'object',
        )
    self._endpoints: dict[str, GlobusEndpoint] = {}
    for endpoint in endpoints:
        if endpoint.uuid in self._endpoints:
            raise ValueError(
                'Cannot pass multiple GlobusEndpoint objects with the '
                'same Globus endpoint UUID.',
            )
        self._endpoints[endpoint.uuid] = endpoint

from_dict classmethod

from_dict(
    json_object: dict[str, dict[str, str]]
) -> GlobusEndpoints

Construct an endpoints collection from a dictionary.

Example:

```python
{
  "endpoint-uuid-1": {
    "host_regex": "host1-regex",
    "endpoint_path": "/path/to/endpoint/dir",
    "local_path": "/path/to/local/dir"
  },
  "endpoint-uuid-2": {
    "host_regex": "host2-regex",
    "endpoint_path": "/path/to/endpoint/dir",
    "local_path": "/path/to/local/dir"
  }
}
```
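
A sketch of loading such a mapping (the UUID and paths are placeholders):

```python
from proxystore.connectors.globus import GlobusEndpoints

endpoints = GlobusEndpoints.from_dict({
    'endpoint-uuid-1': {
        'host_regex': 'host1-regex',
        'endpoint_path': '/path/to/endpoint/dir',
        'local_path': '/path/to/local/dir',
    },
})
```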
Source code in proxystore/connectors/globus.py
@classmethod
def from_dict(
    cls: type[GlobusEndpoints],
    json_object: dict[str, dict[str, str]],
) -> GlobusEndpoints:
    """Construct an endpoints collection from a dictionary.

    Example:

        ```python
        {
          "endpoint-uuid-1": {
            "host_regex": "host1-regex",
            "endpoint_path": "/path/to/endpoint/dir",
            "local_path": "/path/to/local/dir"
          },
          "endpoint-uuid-2": {
            "host_regex": "host2-regex",
            "endpoint_path": "/path/to/endpoint/dir",
            "local_path": "/path/to/local/dir"
          }
        }
        ```
    """  # noqa: D412
    endpoints = []
    for ep_uuid, params in json_object.items():
        endpoints.append(
            GlobusEndpoint(
                uuid=ep_uuid,
                endpoint_path=params['endpoint_path'],
                local_path=params['local_path'],
                host_regex=params['host_regex'],
            ),
        )
    return GlobusEndpoints(endpoints)

from_json classmethod

from_json(json_file: str) -> GlobusEndpoints

Construct a GlobusEndpoints object from a JSON file.

The dict read from the JSON file will be passed to from_dict() and should match the format expected by from_dict().
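
A minimal sketch, assuming a hypothetical file endpoints.json containing a mapping in the format shown for from_dict():

```python
from proxystore.connectors.globus import GlobusEndpoints

# endpoints.json must contain a JSON object matching the format
# expected by from_dict().
endpoints = GlobusEndpoints.from_json('endpoints.json')
```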

Source code in proxystore/connectors/globus.py
@classmethod
def from_json(cls, json_file: str) -> GlobusEndpoints:
    """Construct a GlobusEndpoints object from a json file.

    The `dict` read from the JSON file will be passed to
    [`from_dict()`][proxystore.connectors.globus.GlobusEndpoints.from_dict]
    and should match the format expected by
    [`from_dict()`][proxystore.connectors.globus.GlobusEndpoints.from_dict].
    """
    with open(json_file) as f:
        data = f.read()
    return cls.from_dict(json.loads(data))

dict

dict() -> dict[str, dict[str, str]]

Convert the GlobusEndpoints to a dict.

Note that the GlobusEndpoints object can be reconstructed by passing the dict to from_dict().
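
A round-trip sketch, assuming endpoints is an existing GlobusEndpoints instance:

```python
# Serialize the collection and reconstruct an equivalent one.
data = endpoints.dict()
restored = GlobusEndpoints.from_dict(data)
```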

Source code in proxystore/connectors/globus.py
def dict(self) -> dict[str, dict[str, str]]:
    """Convert the GlobusEndpoints to a dict.

    Note that the
    [`GlobusEndpoints`][proxystore.connectors.globus.GlobusEndpoints]
    object can be reconstructed by passing the `dict` to
    [`from_dict()`][proxystore.connectors.globus.GlobusEndpoints.from_dict].
    """
    data = {}
    for endpoint in self:
        data[endpoint.uuid] = {
            'endpoint_path': endpoint.endpoint_path,
            'local_path': endpoint.local_path,
            'host_regex': endpoint.host_regex.pattern
            if isinstance(endpoint.host_regex, Pattern)
            else endpoint.host_regex,
        }
    return data

get_by_host

get_by_host(host: str) -> GlobusEndpoint

Get endpoint by host.

Searches the endpoints for an endpoint whose host_regex matches host.

Parameters:

  • host (str) –

    Host to match.

Returns:

  • GlobusEndpoint

    The matching Globus endpoint.

Raises:

  • ValueError

    If host does not match any of the endpoints.
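
A matching sketch (the UUID, paths, and hostname are placeholders):

```python
from proxystore.connectors.globus import GlobusEndpoint, GlobusEndpoints

endpoints = GlobusEndpoints([
    GlobusEndpoint(
        uuid='aaaaaaaa-0000-0000-0000-000000000000',
        endpoint_path='/project/store',
        local_path='/project/store',
        host_regex=r'login\d+\.cluster\.example\.org',
    ),
])

# host_regex is matched against the hostname with re.fullmatch().
endpoint = endpoints.get_by_host('login01.cluster.example.org')
```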

Source code in proxystore/connectors/globus.py
def get_by_host(self, host: str) -> GlobusEndpoint:
    """Get endpoint by host.

    Searches the endpoints for an endpoint whose `host_regex` matches
    `host`.

    Args:
        host: Host to match.

    Returns:
        Globus endpoint.

    Raises:
        ValueError: If `host` does not match any of the endpoints.
    """
    for endpoint in self._endpoints.values():
        if re.fullmatch(endpoint.host_regex, host) is not None:
            return endpoint
    raise ValueError(f'Cannot find endpoint matching host {host}')

GlobusKey

Bases: NamedTuple

Key to object transferred with Globus.

Attributes:

  • filename (str) –

    Unique object filename.

  • task_id (str | tuple[str, ...]) –

    Globus transfer task IDs for the file.

__eq__

__eq__(other: Any) -> bool

Match keys by filename only.

This is a workaround for the fact that the task_id is not created until after the filename, so there can be a state where the task_id is empty.
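
A small sketch of that behavior (the filenames and task IDs are placeholders):

```python
from proxystore.connectors.globus import GlobusKey

# Keys with the same filename compare equal even if their transfer
# task IDs differ.
a = GlobusKey(filename='abc', task_id='task-1')
b = GlobusKey(filename='abc', task_id='task-2')
assert a == b
```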

Source code in proxystore/connectors/globus.py
def __eq__(self, other: Any) -> bool:
    """Match keys by filename only.

    This is a workaround for the fact that the task_id is not created
    until after the filename, so there can be a state where the
    task_id is empty.
    """
    if isinstance(other, tuple):
        return self[0] == other[0]
    return False

GlobusConnector

GlobusConnector(
    endpoints: (
        GlobusEndpoints
        | list[GlobusEndpoint]
        | dict[str, dict[str, str]]
    ),
    polling_interval: int = 1,
    sync_level: (
        int | Literal["exists", "size", "mtime", "checksum"]
    ) = "mtime",
    timeout: int = 60,
    clear: bool = True,
)

Globus transfer connector.

The GlobusConnector is similar to a FileConnector in that objects are saved to disk but allows for the transfer of objects between remote file systems. Directories on separate file systems are kept in sync via Globus transfers. The GlobusConnector is useful when moving data between hosts that have a Globus Transfer endpoint but may have restrictions that prevent the use of other connectors (e.g., ports cannot be opened for using a RedisConnector).
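
A minimal usage sketch, assuming Globus authentication has already been performed and using placeholder endpoint UUIDs, hostnames, and paths:

```python
from proxystore.connectors.globus import GlobusConnector

connector = GlobusConnector(
    endpoints={
        'endpoint-uuid-1': {  # Placeholder UUID.
            'host_regex': 'host1.example.org',
            'endpoint_path': '/project/store',
            'local_path': '/project/store',
        },
        'endpoint-uuid-2': {  # Placeholder UUID.
            'host_regex': 'host2.example.org',
            'endpoint_path': '/project/store',
            'local_path': '/project/store',
        },
    },
)

key = connector.put(b'hello')
assert connector.get(key) == b'hello'
connector.evict(key)
connector.close()
```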

Note

To use Globus for data transfer, Globus authentication needs to be performed with the proxystore-globus-auth CLI. If authentication is not performed before initializing a GlobusConnector, the program will prompt the user to perform authentication. This can result in unexpected program hangs while the constructor waits on the user to authenticate. Authentication only needs to be performed once per system.

Warning

The close() method will, by default, delete all of the directories being kept in sync. Ensure that the provided directories are unique and used only by ProxyStore.

Parameters:

  • endpoints (GlobusEndpoints | list[GlobusEndpoint] | dict[str, dict[str, str]]) –

    Collection of directories across Globus Collection endpoints to keep in sync. If passed as a dict, the dictionary must match the format expected by GlobusEndpoints.from_dict(). Stored objects are transferred to every endpoint, so given n endpoints each operation performs n-1 Globus transfers; we suggest limiting the number of endpoints used at once. If this behavior is not desired, use multiple connector instances, each with a different set of endpoints (see the construction sketch after the Raises list below).

  • polling_interval (int, default: 1 ) –

    Interval in seconds to check if Globus Transfer tasks have finished.

  • sync_level (int | Literal['exists', 'size', 'mtime', 'checksum'], default: 'mtime' ) –

    Globus Transfer sync level.

  • timeout (int, default: 60 ) –

    Timeout in seconds for waiting on Globus Transfer tasks.

  • clear (bool, default: True ) –

    Delete all directories specified in endpoints when close() is called to cleanup files.

Raises:

  • GlobusAuthFileError

    If the Globus authentication file cannot be found.

  • ValueError

    If endpoints is of an incorrect type.

  • ValueError

    If fewer than two endpoints are provided.
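
As referenced above, an equivalent construction sketch passing GlobusEndpoint objects directly (UUIDs, hostnames, and paths are placeholders; at least two endpoints are required):

```python
from proxystore.connectors.globus import GlobusConnector, GlobusEndpoint

connector = GlobusConnector(
    endpoints=[
        GlobusEndpoint(
            uuid='endpoint-uuid-1',
            endpoint_path='/project/store',
            local_path='/project/store',
            host_regex='host1.example.org',
        ),
        GlobusEndpoint(
            uuid='endpoint-uuid-2',
            endpoint_path='/project/store',
            local_path='/project/store',
            host_regex='host2.example.org',
        ),
    ],
)
```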

Source code in proxystore/connectors/globus.py
def __init__(
    self,
    endpoints: GlobusEndpoints
    | list[GlobusEndpoint]
    | dict[str, dict[str, str]],
    polling_interval: int = 1,
    sync_level: int
    | Literal['exists', 'size', 'mtime', 'checksum'] = 'mtime',
    timeout: int = 60,
    clear: bool = True,
) -> None:
    if isinstance(endpoints, GlobusEndpoints):
        self.endpoints = endpoints
    elif isinstance(endpoints, list):
        self.endpoints = GlobusEndpoints(endpoints)
    elif isinstance(endpoints, dict):
        self.endpoints = GlobusEndpoints.from_dict(endpoints)
    else:
        raise ValueError(
            'endpoints must be of type GlobusEndpoints, a list of '
            f'GlobusEndpoint, or a dict. Got {type(endpoints)}.',
        )
    if len(endpoints) < 2:
        raise ValueError('At least two Globus endpoints are required.')
    self.polling_interval = polling_interval
    self.sync_level = sync_level
    self.timeout = timeout
    self.clear = clear

    self._transfer_client = get_transfer_client(
        collections=[ep.uuid for ep in self.endpoints],
    )

close

close(clear: bool | None = None) -> None

Close the connector and clean up.

Warning

This will delete the directory at local_path on each endpoint by default.

Warning

This method should only be called at the end of the program when the store will no longer be used, for example once all proxies have been resolved. Calling close() multiple times can raise file not found errors.

Parameters:

  • clear (bool | None, default: None ) –

    Delete the user-provided directories on each endpoint. Overrides the default value of clear provided when the GlobusConnector was instantiated.
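
For example, a sketch that keeps the synced directories on disk, assuming connector is an existing GlobusConnector:

```python
# Override the instance-level default and skip directory deletion.
connector.close(clear=False)
```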

Source code in proxystore/connectors/globus.py
def close(self, clear: bool | None = None) -> None:
    """Close the connector and clean up.

    Warning:
        This will delete the directory at `local_path` on each endpoint
        by default.

    Warning:
        This method should only be called at the end of the program when
        the store will no longer be used, for example once all proxies
        have been resolved. Calling `close()` multiple times
        can raise file not found errors.

    Args:
        clear: Delete the user-provided directories on each endpoint.
            Overrides the default value of `clear` provided when the
            [`GlobusConnector`][proxystore.connectors.globus.GlobusConnector]
            was instantiated.
    """
    clear = self.clear if clear is None else clear
    if clear:
        for endpoint in self.endpoints:
            delete_task = globus_sdk.DeleteData(
                self._transfer_client,
                endpoint=endpoint.uuid,
                recursive=True,
            )
            delete_task['notify_on_succeeded'] = False
            delete_task['notify_on_failed'] = False
            delete_task['notify_on_inactive'] = False
            delete_task.add_item(endpoint.endpoint_path)
            tdata = _submit_transfer_action(
                self._transfer_client,
                delete_task,
            )
            self._wait_on_tasks(tdata['task_id'])

config

config() -> dict[str, Any]

Get the connector configuration.

The configuration contains all the information needed to reconstruct the connector object.
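
A round-trip sketch, assuming connector is an existing GlobusConnector:

```python
# Reconstruct an equivalent connector from its configuration.
config = connector.config()
clone = GlobusConnector.from_config(config)
```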

Source code in proxystore/connectors/globus.py
def config(self) -> dict[str, Any]:
    """Get the connector configuration.

    The configuration contains all the information needed to reconstruct
    the connector object.
    """
    return {
        'endpoints': self.endpoints.dict(),
        'polling_interval': self.polling_interval,
        'sync_level': self.sync_level,
        'timeout': self.timeout,
        'clear': self.clear,
    }

from_config classmethod

from_config(config: dict[str, Any]) -> GlobusConnector

Create a new connector instance from a configuration.

Parameters:

  • config (dict[str, Any]) –

    Configuration returned by .config().

Source code in proxystore/connectors/globus.py
@classmethod
def from_config(cls, config: dict[str, Any]) -> GlobusConnector:
    """Create a new connector instance from a configuration.

    Args:
        config: Configuration returned by `#!python .config()`.
    """
    return cls(**config)

evict

evict(key: GlobusKey) -> None

Evict the object associated with the key.

Parameters:

  • key (GlobusKey) –

    Key associated with object to evict.

Source code in proxystore/connectors/globus.py
def evict(self, key: GlobusKey) -> None:
    """Evict the object associated with the key.

    Args:
        key: Key associated with object to evict.
    """
    if not self.exists(key):
        return

    path = self._get_filepath(key.filename)
    os.remove(path)
    self._transfer_files(key.filename, delete=True)

exists

exists(key: GlobusKey) -> bool

Check if an object associated with the key exists.

Note

If the corresponding Globus Transfer is still in progress, this method will wait to make sure the transfer is successful.

Parameters:

  • key (GlobusKey) –

    Key potentially associated with stored object.

Returns:

  • bool

    If an object associated with the key exists.

Source code in proxystore/connectors/globus.py
def exists(self, key: GlobusKey) -> bool:
    """Check if an object associated with the key exists.

    Note:
        If the corresponding Globus Transfer is still in progress, this
        method will wait to make sure the transfer is successful.

    Args:
        key: Key potentially associated with stored object.

    Returns:
        If an object associated with the key exists.
    """
    if not self._validate_task_id(key.task_id):
        return False
    self._wait_on_tasks(key.task_id)
    return os.path.exists(self._get_filepath(key.filename))

get

get(key: GlobusKey) -> bytes | None

Get the serialized object associated with the key.

Parameters:

  • key (GlobusKey) –

    Key associated with the object to retrieve.

Returns:

  • bytes | None

    Serialized object or None if the object does not exist.

Source code in proxystore/connectors/globus.py
def get(self, key: GlobusKey) -> bytes | None:
    """Get the serialized object associated with the key.

    Args:
        key: Key associated with the object to retrieve.

    Returns:
        Serialized object or `None` if the object does not exist.
    """
    if not self.exists(key):
        return None

    path = self._get_filepath(key.filename)
    with open(path, 'rb') as f:
        return f.read()

get_batch

get_batch(keys: Sequence[GlobusKey]) -> list[bytes | None]

Get a batch of serialized objects associated with the keys.

Parameters:

  • keys (Sequence[GlobusKey]) –

    Sequence of keys associated with objects to retrieve.

Returns:

  • list[bytes | None]

    List in the same order as keys containing the serialized objects, or None if the corresponding key does not have an associated object.

Source code in proxystore/connectors/globus.py
def get_batch(self, keys: Sequence[GlobusKey]) -> list[bytes | None]:
    """Get a batch of serialized objects associated with the keys.

    Args:
        keys: Sequence of keys associated with objects to retrieve.

    Returns:
        List with same order as `keys` with the serialized objects or \
        `None` if the corresponding key does not have an associated object.
    """
    return [self.get(key) for key in keys]

put

put(obj: bytes) -> GlobusKey

Put a serialized object in the store.

Parameters:

  • obj (bytes) –

    Serialized object to put in the store.

Returns:

  • GlobusKey

    Key which can be used to retrieve the object.

Source code in proxystore/connectors/globus.py
def put(self, obj: bytes) -> GlobusKey:
    """Put a serialized object in the store.

    Args:
        obj: Serialized object to put in the store.

    Returns:
        Key which can be used to retrieve the object.
    """
    filename = str(uuid.uuid4())

    path = self._get_filepath(filename)
    os.makedirs(os.path.dirname(path), exist_ok=True)

    with open(path, 'wb', buffering=0) as f:
        f.write(obj)

    tids = self._transfer_files(filename)

    return GlobusKey(filename=filename, task_id=tids)

put_batch

put_batch(objs: Sequence[bytes]) -> list[GlobusKey]

Put a batch of serialized objects in the store.

Parameters:

  • objs (Sequence[bytes]) –

    Sequence of serialized objects to put in the store.

Returns:

  • list[GlobusKey]

    List of keys, in the same order as objs, which can be used to retrieve the objects.
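
A batch round-trip sketch, assuming connector is an existing GlobusConnector:

```python
keys = connector.put_batch([b'a', b'b', b'c'])
values = connector.get_batch(keys)
assert values == [b'a', b'b', b'c']
```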

Source code in proxystore/connectors/globus.py
def put_batch(self, objs: Sequence[bytes]) -> list[GlobusKey]:
    """Put a batch of serialized objects in the store.

    Args:
        objs: Sequence of serialized objects to put in the store.

    Returns:
        List of keys with the same order as `objs` which can be used to \
        retrieve the objects.
    """
    filenames = [str(uuid.uuid4()) for _ in objs]

    for filename, obj in zip(filenames, objs):
        path = self._get_filepath(filename)
        os.makedirs(os.path.dirname(path), exist_ok=True)

        with open(path, 'wb', buffering=0) as f:
            f.write(obj)

    tids = self._transfer_files(filenames)

    return [
        GlobusKey(filename=filename, task_id=tids)
        for filename in filenames
    ]