proxystore.connectors.globus

Globus transfer connector implementation.

GlobusEndpoint

GlobusEndpoint(
    uuid: str,
    endpoint_path: str,
    local_path: str | None,
    host_regex: str | Pattern[str],
)

Globus endpoint representation.

Parameters:

  • uuid (str) –

    UUID of Globus endpoint.

  • endpoint_path (str) –

    Path within endpoint to directory to use for storing objects.

  • local_path (str | None) –

    Local path (as seen by the host filesystem) that corresponds to the directory specified by endpoint_path.

  • host_regex (str | Pattern[str]) –

    String that matches the host where the Globus endpoint exists or regex pattern that can be used to match the host. The host pattern is needed so that proxies can determine which endpoint is local when they are resolved.

Source code in proxystore/connectors/globus.py
def __init__(
    self,
    uuid: str,
    endpoint_path: str,
    local_path: str | None,
    host_regex: str | Pattern[str],
) -> None:
    if not isinstance(uuid, str):
        raise TypeError('uuid must be a str.')
    if not isinstance(endpoint_path, str):
        raise TypeError('endpoint_path must be a str.')
    if local_path is not None and not isinstance(local_path, str):
        raise TypeError('local_path must be a str or None.')
    if not isinstance(host_regex, (str, Pattern)):
        raise TypeError('host_regex must be a str or re.Pattern.')

    self.uuid = uuid
    self.endpoint_path = endpoint_path
    self.local_path = local_path
    self.host_regex = host_regex
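
A minimal usage sketch (the UUID, paths, and host pattern below are hypothetical placeholders, not real values):

```python
from proxystore.connectors.globus import GlobusEndpoint

# Hypothetical endpoint UUID, paths, and host pattern.
endpoint = GlobusEndpoint(
    uuid='aaaa1111-2222-3333-4444-555566667777',
    endpoint_path='/~/proxystore-cache',
    local_path='/home/user/proxystore-cache',
    host_regex=r'login\d+\.cluster\.example\.edu',
)
```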

GlobusEndpoints

GlobusEndpoints(endpoints: Collection[GlobusEndpoint])

A collection of Globus endpoints.

Parameters:

  • endpoints (Collection[GlobusEndpoint]) –

    Collection of Globus endpoints.

Raises:

  • ValueError

    If endpoints has length 0 or if multiple endpoints with the same UUID are provided.

Source code in proxystore/connectors/globus.py
def __init__(self, endpoints: Collection[GlobusEndpoint]) -> None:
    if len(endpoints) == 0:
        raise ValueError(
            'GlobusEndpoints must be passed at least one GlobusEndpoint '
            'object',
        )
    self._endpoints: dict[str, GlobusEndpoint] = {}
    for endpoint in endpoints:
        if endpoint.uuid in self._endpoints:
            raise ValueError(
                'Cannot pass multiple GlobusEndpoint objects with the '
                'same Globus endpoint UUID.',
            )
        self._endpoints[endpoint.uuid] = endpoint

from_dict() classmethod

from_dict(
    json_object: dict[str, dict[str, str]]
) -> GlobusEndpoints

Construct an endpoints collection from a dictionary.

Example:

```python
{
  "endpoint-uuid-1": {
    "host_regex": "host1-regex",
    "endpoint_path": "/path/to/endpoint/dir",
    "local_path": "/path/to/local/dir"
  },
  "endpoint-uuid-2": {
    "host_regex": "host2-regex",
    "endpoint_path": "/path/to/endpoint/dir",
    "local_path": "/path/to/local/dir"
  }
}
```
Source code in proxystore/connectors/globus.py
@classmethod
def from_dict(
    cls: type[GlobusEndpoints],
    json_object: dict[str, dict[str, str]],
) -> GlobusEndpoints:
    """Construct an endpoints collection from a dictionary.

    Example:

        ```python
        {
          "endpoint-uuid-1": {
            "host_regex": "host1-regex",
            "endpoint_path": "/path/to/endpoint/dir",
            "local_path": "/path/to/local/dir"
          },
          "endpoint-uuid-2": {
            "host_regex": "host2-regex",
            "endpoint_path": "/path/to/endpoint/dir",
            "local_path": "/path/to/local/dir"
          }
        }
        ```
    """  # noqa: D412
    endpoints = []
    for ep_uuid, params in json_object.items():
        endpoints.append(
            GlobusEndpoint(
                uuid=ep_uuid,
                endpoint_path=params['endpoint_path'],
                local_path=params['local_path'],
                host_regex=params['host_regex'],
            ),
        )
    return GlobusEndpoints(endpoints)
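
For instance, a sketch of building a collection from a dictionary in this format (the UUID, paths, and host pattern are placeholders):

```python
from proxystore.connectors.globus import GlobusEndpoints

# Placeholder UUID, paths, and host pattern.
endpoints = GlobusEndpoints.from_dict({
    'endpoint-uuid-1': {
        'host_regex': 'host1.example.com',
        'endpoint_path': '/path/to/endpoint/dir',
        'local_path': '/path/to/local/dir',
    },
})
```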

from_json() classmethod

from_json(json_file: str) -> GlobusEndpoints

Construct a GlobusEndpoints object from a JSON file.

The dict read from the JSON file will be passed to from_dict() and should match the format expected by from_dict().

Source code in proxystore/connectors/globus.py
@classmethod
def from_json(cls, json_file: str) -> GlobusEndpoints:
    """Construct a GlobusEndpoints object from a json file.

    The `dict` read from the JSON file will be passed to
    [`from_dict()`][proxystore.connectors.globus.GlobusEndpoints.from_dict]
    and should match the format expected by
    [`from_dict()`][proxystore.connectors.globus.GlobusEndpoints.from_dict].
    """
    with open(json_file) as f:
        data = f.read()
    return cls.from_dict(json.loads(data))
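
A short sketch, assuming a file endpoints.json that holds a dictionary in the from_dict() format (all values are placeholders):

```python
import json

from proxystore.connectors.globus import GlobusEndpoints

# Write a placeholder configuration, then load it back.
config = {
    'endpoint-uuid-1': {
        'host_regex': 'host1.example.com',
        'endpoint_path': '/path/to/endpoint/dir',
        'local_path': '/path/to/local/dir',
    },
}
with open('endpoints.json', 'w') as f:
    json.dump(config, f)

endpoints = GlobusEndpoints.from_json('endpoints.json')
```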

dict()

dict() -> dict[str, dict[str, str]]

Convert the GlobusEndpoints to a dict.

Note that the GlobusEndpoints object can be reconstructed by passing the dict to from_dict().

Source code in proxystore/connectors/globus.py
def dict(self) -> dict[str, dict[str, str]]:
    """Convert the GlobusEndpoints to a dict.

    Note that the
    [`GlobusEndpoints`][proxystore.connectors.globus.GlobusEndpoints]
    object can be reconstructed by passing the `dict` to
    [`from_dict()`][proxystore.connectors.globus.GlobusEndpoints.from_dict].
    """
    data = {}
    for endpoint in self:
        data[endpoint.uuid] = {
            'endpoint_path': endpoint.endpoint_path,
            'local_path': endpoint.local_path,
            'host_regex': endpoint.host_regex.pattern
            if isinstance(endpoint.host_regex, Pattern)
            else endpoint.host_regex,
        }
    return data
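
A roundtrip sketch with a placeholder endpoint:

```python
from proxystore.connectors.globus import GlobusEndpoint, GlobusEndpoints

endpoints = GlobusEndpoints([
    GlobusEndpoint(
        uuid='endpoint-uuid-1',
        endpoint_path='/path/to/endpoint/dir',
        local_path='/path/to/local/dir',
        host_regex='host1.example.com',
    ),
])

# The dict produced here can be passed back to from_dict() to
# reconstruct an equivalent collection.
data = endpoints.dict()
restored = GlobusEndpoints.from_dict(data)
assert restored.dict() == data
```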

get_by_host()

get_by_host(host: str) -> GlobusEndpoint

Get endpoint by host.

Searches the endpoints for an endpoint whose host_regex matches host.

Parameters:

  • host (str) –

    Host to match.

Returns:

  • GlobusEndpoint

    Globus endpoint matching host.

Raises:

  • ValueError

    If host does not match any of the endpoints.

Source code in proxystore/connectors/globus.py
def get_by_host(self, host: str) -> GlobusEndpoint:
    """Get endpoint by host.

    Searches the endpoints for an endpoint whose `host_regex` matches
    `host`.

    Args:
        host: Host to match.

    Returns:
        Globus endpoint.

    Raises:
        ValueError: If `host` does not match any of the endpoints.
    """
    for endpoint in self._endpoints.values():
        if re.fullmatch(endpoint.host_regex, host) is not None:
            return endpoint
    raise ValueError(f'Cannot find endpoint matching host {host}')
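
A matching sketch with a placeholder endpoint and host:

```python
from proxystore.connectors.globus import GlobusEndpoint, GlobusEndpoints

endpoints = GlobusEndpoints([
    GlobusEndpoint(
        uuid='endpoint-uuid-1',
        endpoint_path='/path/to/endpoint/dir',
        local_path='/path/to/local/dir',
        host_regex=r'login\d+\.cluster\.example\.edu',
    ),
])

# The full regex match succeeds, so this endpoint is returned.
endpoint = endpoints.get_by_host('login01.cluster.example.edu')
assert endpoint.uuid == 'endpoint-uuid-1'
```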

GlobusKey

Bases: NamedTuple

Key to object transferred with Globus.

Attributes:

  • filename (str) –

    Unique object filename.

  • task_id (str | tuple[str, ...]) –

    Globus transfer task IDs for the file.

__eq__()

__eq__(other: Any) -> bool

Match keys by filename only.

This is a hack around the fact that the task_id is not created until after the filename is, so there can be a state where the task_id is empty.

Source code in proxystore/connectors/globus.py
def __eq__(self, other: Any) -> bool:
    """Match keys by filename only.

    This is a hack around the fact that the task_id is not created until
    after the filename is, so there can be a state where the task_id
    is empty.
    """
    if isinstance(other, tuple):
        return self[0] == other[0]
    return False
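
Concretely, two keys with the same filename compare equal even when one does not have its task_id yet:

```python
from proxystore.connectors.globus import GlobusKey

# Only the filename (the first tuple field) is compared.
key_pending = GlobusKey(filename='abc', task_id='')
key_done = GlobusKey(filename='abc', task_id='task-uuid')
assert key_pending == key_done
```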

GlobusConnector

GlobusConnector(
    endpoints: (
        GlobusEndpoints
        | list[GlobusEndpoint]
        | dict[str, dict[str, str]]
    ),
    polling_interval: int = 1,
    sync_level: (
        int | Literal["exists", "size", "mtime", "checksum"]
    ) = "mtime",
    timeout: int = 60,
    clear: bool = True,
)

Globus transfer connector.

The GlobusConnector is similar to a FileConnector in that objects are saved to disk, but it allows for the transfer of objects between two remote file systems. The two directories on the separate file systems are kept in sync via Globus transfers. The GlobusConnector is useful when moving data between hosts that have a Globus endpoint but may have restrictions that prevent the use of other store backends (e.g., ports cannot be opened for using a RedisConnector).

Note

To use Globus for data transfer, Globus authentication needs to be performed with the proxystore-globus-auth CLI. If authentication is not performed before initializing a GlobusConnector, the program will prompt the user to perform authentication. This can result in unexpected program hangs while the constructor waits on the user to authenticate. Authentication only needs to be performed once per system.

Parameters:

  • endpoints (GlobusEndpoints | list[GlobusEndpoint] | dict[str, dict[str, str]]) –

    Globus endpoints to keep in sync. If passed as a dict, the dictionary must match the format expected by GlobusEndpoints.from_dict(). Note that given n endpoints there will be n-1 Globus transfers per operation, so we suggest not using too many endpoints at the same time.

  • polling_interval (int, default: 1 ) –

    Interval in seconds to check if Globus tasks have finished.

  • sync_level (int | Literal['exists', 'size', 'mtime', 'checksum'], default: 'mtime' ) –

    Globus transfer sync level.

  • timeout (int, default: 60 ) –

    Timeout in seconds for waiting on Globus tasks.

  • clear (bool, default: True ) –

    Clear all objects on close() by deleting the local_path of each endpoint.

Raises:

  • GlobusAuthFileError

    If the Globus authentication file cannot be found.

  • ValueError

    If endpoints is of an incorrect type.

  • ValueError

    If fewer than two endpoints are provided.

Source code in proxystore/connectors/globus.py
def __init__(
    self,
    endpoints: GlobusEndpoints
    | list[GlobusEndpoint]
    | dict[str, dict[str, str]],
    polling_interval: int = 1,
    sync_level: int
    | Literal['exists', 'size', 'mtime', 'checksum'] = 'mtime',
    timeout: int = 60,
    clear: bool = True,
) -> None:
    if isinstance(endpoints, GlobusEndpoints):
        self.endpoints = endpoints
    elif isinstance(endpoints, list):
        self.endpoints = GlobusEndpoints(endpoints)
    elif isinstance(endpoints, dict):
        self.endpoints = GlobusEndpoints.from_dict(endpoints)
    else:
        raise ValueError(
            'endpoints must be of type GlobusEndpoints, a list of '
            f'GlobusEndpoint, or a dict. Got {type(endpoints)}.',
        )
    if len(endpoints) < 2:
        raise ValueError('At least two Globus endpoints are required.')
    self.polling_interval = polling_interval
    self.sync_level = sync_level
    self.timeout = timeout
    self.clear = clear

    self._transfer_client = get_transfer_client_flow(
        check_collections=[ep.uuid for ep in self.endpoints],
    )
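
A construction sketch, assuming proxystore-globus-auth has already been run; the endpoint UUIDs, paths, and host patterns are placeholders:

```python
from proxystore.connectors.globus import GlobusConnector

# At least two endpoints are required; a dict in the
# GlobusEndpoints.from_dict() format is accepted directly.
connector = GlobusConnector(
    endpoints={
        'endpoint-uuid-1': {
            'host_regex': 'host1.example.com',
            'endpoint_path': '/path/to/endpoint/dir',
            'local_path': '/path/to/local/dir',
        },
        'endpoint-uuid-2': {
            'host_regex': 'host2.example.com',
            'endpoint_path': '/path/to/endpoint/dir',
            'local_path': '/path/to/local/dir',
        },
    },
    sync_level='mtime',
    timeout=60,
)
```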

close()

close(clear: bool | None = None) -> None

Close the connector and clean up.

Warning

This will delete the directory at local_path on each endpoint by default.

Warning

This method should only be called at the end of the program when the store will no longer be used, for example once all proxies have been resolved.

Parameters:

  • clear (bool | None, default: None ) –

    Remove the store directory. Overrides the default value of clear provided when the GlobusConnector was instantiated.

Source code in proxystore/connectors/globus.py
def close(self, clear: bool | None = None) -> None:
    """Close the connector and clean up.

    Warning:
        This will delete the directory at `local_path` on each endpoint
        by default.

    Warning:
        This method should only be called at the end of the program when
        the store will no longer be used, for example once all proxies
        have been resolved.

    Args:
        clear: Remove the store directory. Overrides the default
            value of `clear` provided when the
            [`GlobusConnector`][proxystore.connectors.globus.GlobusConnector]
            was instantiated.
    """
    clear = self.clear if clear is None else clear
    if clear:
        for endpoint in self.endpoints:
            delete_task = globus_sdk.DeleteData(
                self._transfer_client,
                endpoint=endpoint.uuid,
                recursive=True,
            )
            delete_task['notify_on_succeeded'] = False
            delete_task['notify_on_failed'] = False
            delete_task['notify_on_inactive'] = False
            delete_task.add_item(endpoint.endpoint_path)
            tdata = _submit_transfer_action(
                self._transfer_client,
                delete_task,
            )
            self._wait_on_tasks(tdata['task_id'])
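
Continuing with a connector like the one sketched above, the constructor's clear setting can be overridden at close time:

```python
# Keep the store directories on the endpoints even if clear=True
# was passed to the constructor.
connector.close(clear=False)
```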

config()

config() -> dict[str, Any]

Get the connector configuration.

The configuration contains all the information needed to reconstruct the connector object.

Source code in proxystore/connectors/globus.py
def config(self) -> dict[str, Any]:
    """Get the connector configuration.

    The configuration contains all the information needed to reconstruct
    the connector object.
    """
    return {
        'endpoints': self.endpoints.dict(),
        'polling_interval': self.polling_interval,
        'sync_level': self.sync_level,
        'timeout': self.timeout,
        'clear': self.clear,
    }

from_config() classmethod

from_config(config: dict[str, Any]) -> GlobusConnector

Create a new connector instance from a configuration.

Parameters:

  • config (dict[str, Any]) –

    Configuration returned by .config().

Source code in proxystore/connectors/globus.py
@classmethod
def from_config(cls, config: dict[str, Any]) -> GlobusConnector:
    """Create a new connector instance from a configuration.

    Args:
        config: Configuration returned by `#!python .config()`.
    """
    return cls(**config)
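
A roundtrip sketch, assuming connector is an existing GlobusConnector such as the one constructed earlier:

```python
from proxystore.connectors.globus import GlobusConnector

# The config is a plain dict, so it can be shipped to another
# process and used there to build an equivalent connector.
config = connector.config()
clone = GlobusConnector.from_config(config)
```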

evict()

evict(key: GlobusKey) -> None

Evict the object associated with the key.

Parameters:

  • key (GlobusKey) –

    Key associated with object to evict.

Source code in proxystore/connectors/globus.py
def evict(self, key: GlobusKey) -> None:
    """Evict the object associated with the key.

    Args:
        key: Key associated with object to evict.
    """
    if not self.exists(key):
        return

    path = self._get_filepath(key.filename)
    os.remove(path)
    self._transfer_files(key.filename, delete=True)

exists()

exists(key: GlobusKey) -> bool

Check if an object associated with the key exists.

Note

If the corresponding Globus transfer is still in progress, this method will wait to make sure the transfer is successful.

Parameters:

  • key (GlobusKey) –

    Key potentially associated with stored object.

Returns:

  • bool

    If an object associated with the key exists.

Source code in proxystore/connectors/globus.py
def exists(self, key: GlobusKey) -> bool:
    """Check if an object associated with the key exists.

    Note:
        If the corresponding Globus transfer is still in progress, this
        method will wait to make sure the transfer is successful.

    Args:
        key: Key potentially associated with stored object.

    Returns:
        If an object associated with the key exists.
    """
    if not self._validate_task_id(key.task_id):
        return False
    self._wait_on_tasks(key.task_id)
    return os.path.exists(self._get_filepath(key.filename))

get()

get(key: GlobusKey) -> bytes | None

Get the serialized object associated with the key.

Parameters:

  • key (GlobusKey) –

    Key associated with the object to retrieve.

Returns:

  • bytes | None

    Serialized object or None if the object does not exist.

Source code in proxystore/connectors/globus.py
def get(self, key: GlobusKey) -> bytes | None:
    """Get the serialized object associated with the key.

    Args:
        key: Key associated with the object to retrieve.

    Returns:
        Serialized object or `None` if the object does not exist.
    """
    if not self.exists(key):
        return None

    path = self._get_filepath(key.filename)
    with open(path, 'rb') as f:
        return f.read()

get_batch()

get_batch(keys: Sequence[GlobusKey]) -> list[bytes | None]

Get a batch of serialized objects associated with the keys.

Parameters:

  • keys (Sequence[GlobusKey]) –

    Sequence of keys associated with objects to retrieve.

Returns:

  • list[bytes | None]

    List with same order as keys with the serialized objects or None if the corresponding key does not have an associated object.

Source code in proxystore/connectors/globus.py
def get_batch(self, keys: Sequence[GlobusKey]) -> list[bytes | None]:
    """Get a batch of serialized objects associated with the keys.

    Args:
        keys: Sequence of keys associated with objects to retrieve.

    Returns:
        List with same order as `keys` with the serialized objects or \
        `None` if the corresponding key does not have an associated object.
    """
    return [self.get(key) for key in keys]

put()

put(obj: bytes) -> GlobusKey

Put a serialized object in the store.

Parameters:

  • obj (bytes) –

    Serialized object to put in the store.

Returns:

  • GlobusKey

    Key which can be used to retrieve the object.

Source code in proxystore/connectors/globus.py
def put(self, obj: bytes) -> GlobusKey:
    """Put a serialized object in the store.

    Args:
        obj: Serialized object to put in the store.

    Returns:
        Key which can be used to retrieve the object.
    """
    filename = str(uuid.uuid4())

    path = self._get_filepath(filename)
    os.makedirs(os.path.dirname(path), exist_ok=True)

    with open(path, 'wb', buffering=0) as f:
        f.write(obj)

    tids = self._transfer_files(filename)

    return GlobusKey(filename=filename, task_id=tids)
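
Putting the storage operations together, a roundtrip sketch assuming connector is a configured GlobusConnector as above:

```python
# put() writes the object locally and starts the Globus transfers.
key = connector.put(b'hello, globus')

# exists() waits on the transfer tasks before checking the file.
assert connector.exists(key)
assert connector.get(key) == b'hello, globus'

# evict() removes the local file and syncs the deletion.
connector.evict(key)
assert connector.get(key) is None
```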

put_batch()

put_batch(objs: Sequence[bytes]) -> list[GlobusKey]

Put a batch of serialized objects in the store.

Parameters:

  • objs (Sequence[bytes]) –

    Sequence of serialized objects to put in the store.

Returns:

  • list[GlobusKey]

    List of keys with the same order as objs which can be used to retrieve the objects.

Source code in proxystore/connectors/globus.py
def put_batch(self, objs: Sequence[bytes]) -> list[GlobusKey]:
    """Put a batch of serialized objects in the store.

    Args:
        objs: Sequence of serialized objects to put in the store.

    Returns:
        List of keys with the same order as `objs` which can be used to \
        retrieve the objects.
    """
    filenames = [str(uuid.uuid4()) for _ in objs]

    for filename, obj in zip(filenames, objs):
        path = self._get_filepath(filename)
        os.makedirs(os.path.dirname(path), exist_ok=True)

        with open(path, 'wb', buffering=0) as f:
            f.write(obj)

    tids = self._transfer_files(filenames)

    return [
        GlobusKey(filename=filename, task_id=tids)
        for filename in filenames
    ]
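
The batch variants submit a single set of transfer tasks for all objects, so a batched roundtrip (again assuming a configured connector) looks like:

```python
# Assumes `connector` is a configured GlobusConnector (see above).
objs = [b'spam', b'ham', b'eggs']
keys = connector.put_batch(objs)
assert connector.get_batch(keys) == objs
```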