Skip to content

proxystore.endpoint.serve

Endpoint serving.

create_app

create_app(
    endpoint: Endpoint,
    max_content_length: int | None = None,
    body_timeout: int = 300,
) -> Quart

Create a Quart app for the endpoint and register its routes.

Parameters:

  • endpoint (Endpoint) –

    Initialized endpoint to forward quart routes to.

  • max_content_length (int | None, default: None ) –

    Max request body size in bytes.

  • body_timeout (int, default: 300 ) –

    Number of seconds to wait for the body to be completely received.

Returns:

  • Quart

    Quart app.

Source code in proxystore/endpoint/serve.py
def create_app(
    endpoint: Endpoint,
    max_content_length: int | None = None,
    body_timeout: int = 300,
) -> quart.Quart:
    """Build the Quart application that fronts an endpoint.

    The endpoint is stored in the app config under the `endpoint` key and
    the module's route blueprint is registered with an empty URL prefix.

    Args:
        endpoint: Initialized endpoint to forward quart routes to.
        max_content_length: Max request body size in bytes.
        body_timeout: Number of seconds to wait for the body to be
            completely received.

    Returns:
        Quart app.
    """
    quart_app = quart.Quart(__name__)
    quart_app.config['endpoint'] = endpoint
    quart_app.register_blueprint(routes_blueprint, url_prefix='')

    # Request body limits are plain config entries on the app.
    quart_app.config.update(
        MAX_CONTENT_LENGTH=max_content_length,
        BODY_TIMEOUT=body_timeout,
    )
    return quart_app

serve

serve(
    config: EndpointConfig,
    *,
    log_level: int | str = logging.INFO,
    log_file: str | None = None,
    use_uvloop: bool = True
) -> None

Initialize endpoint and serve Quart app.

Warning

This function does not return until the Quart app is terminated.

Parameters:

  • config (EndpointConfig) –

    Configuration object.

  • log_level (int | str, default: INFO ) –

    Logging level of endpoint.

  • log_file (str | None, default: None ) –

    Optional file path to append log to.

  • use_uvloop (bool, default: True ) –

    Install uvloop as the default event loop implementation.

Source code in proxystore/endpoint/serve.py
def serve(
    config: EndpointConfig,
    *,
    log_level: int | str = logging.INFO,
    log_file: str | None = None,
    use_uvloop: bool = True,
) -> None:
    """Initialize endpoint and serve Quart app.

    Warning:
        This function does not return until the Quart app is terminated.

    Note:
        This function reconfigures the root logger (formatter and level)
        and replaces the process-wide SIGTERM handler.

    Args:
        config: Configuration object.
        log_level: Logging level of endpoint.
        log_file: Optional file path to append log to. Missing parent
            directories are created. A path with no directory component
            is used as given.
        use_uvloop: Install uvloop as the default event loop implementation.
    """
    root_logger = logging.getLogger()

    if log_file is not None:
        parent_dir = os.path.dirname(log_file)
        # dirname() is '' for a bare filename, and os.makedirs('') raises
        # FileNotFoundError, so only create the directory when one is named.
        if parent_dir:
            os.makedirs(parent_dir, exist_ok=True)
        root_logger.addHandler(logging.FileHandler(log_file))

    # Apply a uniform format to every root handler, including any that were
    # installed before this function was called.
    formatter = logging.Formatter(
        '[%(asctime)s.%(msecs)03d] %(levelname)-5s (%(name)s) :: '
        '%(message)s',
        datefmt='%Y-%m-%d %H:%M:%S',
    )
    for handler in root_logger.handlers:
        handler.setFormatter(formatter)
    root_logger.setLevel(log_level)

    if use_uvloop:  # pragma: no cover
        logger.info('Installing uvloop as default event loop')
        uvloop.install()
    else:
        logger.warning(
            'Not installing uvloop. Uvicorn may override and install anyways',
        )

    # Convert SIGTERM to SIGINT which will be handled by Uvicorn first,
    # then passed on by this function.
    signal.signal(
        signal.SIGTERM,
        lambda *_args: signal.raise_signal(signal.SIGINT),
    )

    # The remaining set up and serving code is deferred to within the
    # _serve_async helper function which will be executed within an event loop.
    try:
        asyncio.run(_serve_async(config))
    except Exception as e:
        # Intercept exception so we can log it in the case that the endpoint
        # is running as a daemon process. Otherwise the user will never see
        # the exception.
        logger.exception(f'Caught unhandled exception: {e!r}')
        raise
    except KeyboardInterrupt:  # pragma: no cover
        # Uvicorn<0.29.0 captures SIGINT and does not propagate it.
        # Uvicorn>=0.29.1 changes this behavior to propagate the SIGINT after
        # Uvicorn has done its cleanup, so we need to catch the exception
        # here and pass on it since we let Uvicorn handle our clean up in
        # the "after_app_serving" shutdown callback. This is excluded from
        # coverage because it depends on the Uvicorn version.
        # Relevant PR: https://github.com/encode/uvicorn/pull/1600
        pass
    finally:
        logger.info(f'Finished serving endpoint: {config.name}')

endpoint_handler async

endpoint_handler() -> Response

Route handler for GET /endpoint.

Responses:

  • Status Code 200: JSON containing the key uuid with the value as the string UUID of this endpoint.
Source code in proxystore/endpoint/serve.py
@routes_blueprint.route('/endpoint', methods=['GET'])
async def endpoint_handler() -> Response:
    """Handle `GET /endpoint` by reporting this endpoint's UUID.

    Responses:

    * `Status Code 200`: JSON containing the key `uuid` with the value as
      the string UUID of this endpoint.
    """
    # The endpoint object is looked up in the current app's config.
    this_endpoint = quart.current_app.config['endpoint']
    body = json.dumps({'uuid': str(this_endpoint.uuid)})
    return Response(body, 200, content_type='application/json')

evict_handler async

evict_handler() -> Response

Route handler for POST /evict.

Responses:

  • Status Code 200: If the operation succeeds. The response message will be empty.
  • Status Code 400: If the key argument is missing or the endpoint UUID argument is present but not a valid UUID.
  • Status Code 500: If there was a peer request error. The response will contain the string representation of the internal error.
Source code in proxystore/endpoint/serve.py
@routes_blueprint.route('/evict', methods=['POST'])
async def evict_handler() -> Response:
    """Handle `POST /evict` by evicting the object stored under a key.

    The `key` query argument is required. The optional `endpoint` query
    argument is parsed as a UUID and forwarded to the endpoint.

    Responses:

    * `Status Code 200`: If the operation succeeds. The response message will
      be empty.
    * `Status Code 400`: If the key argument is missing or the endpoint UUID
      argument is present but not a valid UUID.
    * `Status Code 500`: If there was a peer request error. The response
      will contain the string representation of the internal error.
    """
    object_key = request.args.get('key', None)
    if object_key is None:
        return Response('request missing key', 400)

    raw_uuid = request.args.get('endpoint', None)
    local_endpoint = quart.current_app.config['endpoint']

    # Keep the raw query value for the error message; only a string value
    # is converted to a UUID.
    target: str | uuid.UUID | None = raw_uuid
    if isinstance(raw_uuid, str):
        try:
            target = uuid.UUID(raw_uuid, version=4)
        except ValueError:
            return Response(f'{raw_uuid} is not a valid UUID4', 400)

    try:
        await local_endpoint.evict(key=object_key, endpoint=target)
    except PeerRequestError as err:
        return Response(str(err), 500)
    else:
        return Response('', 200)

exists_handler async

exists_handler() -> Response

Route handler for GET /exists.

Responses:

  • Status Code 200: If the operation succeeds. The response will be JSON containing the key exists with the result of the existence check as its value.
  • Status Code 400: If the key argument is missing or the endpoint UUID argument is present but not a valid UUID.
  • Status Code 500: If there was a peer request error. The response will contain the string representation of the internal error.
Source code in proxystore/endpoint/serve.py
@routes_blueprint.route('/exists', methods=['GET'])
async def exists_handler() -> Response:
    """Route handler for `GET /exists`.

    The `key` query argument is required. The optional `endpoint` query
    argument is parsed as a UUID and forwarded to the endpoint.

    Responses:

    * `Status Code 200`: If the operation succeeds. The response will be
      JSON containing the key `exists` with the result of the endpoint's
      existence check as its value.
    * `Status Code 400`: If the key argument is missing or the endpoint UUID
      argument is present but not a valid UUID.
    * `Status Code 500`: If there was a peer request error. The response
      will contain the string representation of the internal error.
    """
    key = request.args.get('key', None)
    if key is None:
        return Response('request missing key', 400)

    endpoint_uuid: str | uuid.UUID | None = request.args.get(
        'endpoint',
        None,
    )
    endpoint = quart.current_app.config['endpoint']
    if isinstance(endpoint_uuid, str):
        try:
            endpoint_uuid = uuid.UUID(endpoint_uuid, version=4)
        except ValueError:
            return Response(f'{endpoint_uuid} is not a valid UUID4', 400)

    # Only the endpoint call is guarded; the success response is built in
    # the else branch.
    try:
        exists = await endpoint.exists(key=key, endpoint=endpoint_uuid)
    except PeerRequestError as e:
        return Response(str(e), 500)
    else:
        return Response(
            json.dumps({'exists': exists}),
            200,
            content_type='application/json',
        )

get_handler async

get_handler() -> Response

Route handler for GET /get.

Responses:

  • Status Code 200: If the operation succeeds. The response message will contain the octet-stream of the requested data.
  • Status Code 400: If the key argument is missing or the endpoint UUID argument is present but not a valid UUID.
  • Status Code 404: If there is no data associated with the provided key.
  • Status Code 500: If there was a peer request error. The response will contain the string representation of the internal error.
Source code in proxystore/endpoint/serve.py
@routes_blueprint.route('/get', methods=['GET'])
async def get_handler() -> Response:
    """Handle `GET /get` by returning the data stored under a key.

    The `key` query argument is required. The optional `endpoint` query
    argument is parsed as a UUID and forwarded to the endpoint.

    Responses:

    * `Status Code 200`: If the operation succeeds. The response message will
      contain the octet-stream of the requested data.
    * `Status Code 400`: If the key argument is missing or the endpoint UUID
      argument is present but not a valid UUID.
    * `Status Code 404`: If there is no data associated with the provided key.
    * `Status Code 500`: If there was a peer request error. The response
      will contain the string representation of the internal error.
    """
    object_key = request.args.get('key', None)
    if object_key is None:
        return Response('request missing key', 400)

    raw_uuid = request.args.get('endpoint', None)
    local_endpoint = quart.current_app.config['endpoint']

    # Keep the raw query value for the error message; only a string value
    # is converted to a UUID.
    target: str | uuid.UUID | None = raw_uuid
    if isinstance(raw_uuid, str):
        try:
            target = uuid.UUID(raw_uuid, version=4)
        except ValueError:
            return Response(f'{raw_uuid} is not a valid UUID4', 400)

    try:
        payload = await local_endpoint.get(key=object_key, endpoint=target)
    except PeerRequestError as err:
        return Response(str(err), 500)

    # A None result is reported as a missing key.
    if payload is None:
        return Response('no data associated with request key', 404)

    # The body is passed through chunk_bytes with MAX_CHUNK_LENGTH.
    return Response(
        response=chunk_bytes(payload, MAX_CHUNK_LENGTH),
        content_type='application/octet-stream',
    )

set_handler async

set_handler() -> Response

Route handler for POST /set.

Responses:

  • Status Code 200: If the operation succeeds. The response message will be empty.
  • Status Code 400: If the key argument is missing, the endpoint UUID argument is present but not a valid UUID, or the request is missing the data payload.
  • Status Code 500: If there was a peer request error. The response will contain the string representation of the internal error.
Source code in proxystore/endpoint/serve.py
@routes_blueprint.route('/set', methods=['POST'])
async def set_handler() -> Response:
    """Handle `POST /set` by storing the request body under a key.

    The `key` query argument is required. The optional `endpoint` query
    argument is parsed as a UUID and forwarded to the endpoint. The request
    body is read chunk by chunk and must not be empty.

    Responses:

    * `Status Code 200`: If the operation succeeds. The response message will
      be empty.
    * `Status Code 400`: If the key argument is missing, the endpoint UUID
      argument is present but not a valid UUID, or the request is missing
      the data payload.
    * `Status Code 500`: If there was a peer request error. The response
      will contain the string representation of the internal error.
    """
    object_key = request.args.get('key', None)
    if object_key is None:
        return Response('request missing key', 400)

    raw_uuid = request.args.get('endpoint', None)
    local_endpoint = quart.current_app.config['endpoint']

    # Keep the raw query value for the error message; only a string value
    # is converted to a UUID.
    target: str | uuid.UUID | None = raw_uuid
    if isinstance(raw_uuid, str):
        try:
            target = uuid.UUID(raw_uuid, version=4)
        except ValueError:
            return Response(f'{raw_uuid} is not a valid UUID4', 400)

    payload = bytearray()
    # Note: tests/endpoint/serve_test.py::test_empty_chunked_data handles
    # the branching case for where the code in the for loop is not executed
    # but coverage is not detecting that hence the pragma here
    async for piece in request.body:  # pragma: no branch
        payload.extend(piece)

    if not payload:
        return Response('received empty payload', 400)

    try:
        await local_endpoint.set(
            key=object_key,
            data=bytes(payload),
            endpoint=target,
        )
    except PeerRequestError as err:
        return Response(str(err), 500)

    return Response('', 200)