Skip to content

proxystore.p2p.chunks

Message chunking utilities.

ChunkDType

Bases: Enum

Data type contained in a Chunk.

BYTES class-attribute instance-attribute

BYTES = 1

Data is bytes.

STRING class-attribute instance-attribute

STRING = 2

Data is a string.

Chunk

Chunk(
    stream_id: int,
    seq_id: int,
    seq_len: int,
    data: bytes | str,
    dtype: ChunkDType | None = None,
)

Representation of a chunk of a message.

Parameters:

  • stream_id (int) –

    Unique ID for the stream of chunks.

  • seq_id (int) –

    Sequence number for this chunk in the stream.

  • seq_len (int) –

    Total number of chunks in the stream.

  • data (bytes | str) –

    Data for this chunk.

  • dtype (ChunkDType | None, default: None ) –

    Optional data type of the chunk. If omitted, it is inferred from data.

Raises:

  • ValueError

    If the sequence ID is greater than or equal to the sequence length.

Source code in proxystore/p2p/chunks.py
def __init__(
    self,
    stream_id: int,
    seq_id: int,
    seq_len: int,
    data: bytes | str,
    dtype: ChunkDType | None = None,
) -> None:
    """Initialize a chunk of a message.

    Args:
        stream_id: Unique ID for the stream of chunks.
        seq_id: Sequence number for this chunk in the stream.
        seq_len: Length of the stream.
        data: Data for this chunk.
        dtype: Optionally specify data type otherwise inferred from data.

    Raises:
        ValueError: if the sequence ID is not less than the sequence
            length.
    """
    # Sequence IDs are zero-based, so a valid ID is strictly below seq_len.
    if seq_id >= seq_len:
        raise ValueError(
            f'seq_id ({seq_id}) must be less than seq_len ({seq_len}).',
        )

    # Without an explicit dtype, anything that is not bytes is treated as
    # a string.
    if dtype is None:
        if isinstance(data, bytes):
            dtype = ChunkDType.BYTES
        else:
            dtype = ChunkDType.STRING

    self.stream_id = stream_id
    self.seq_id = seq_id
    self.seq_len = seq_len
    self.data = data
    self.dtype = dtype

__bytes__()

__bytes__() -> bytes

Pack the chunk into bytes.

Source code in proxystore/p2p/chunks.py
def __bytes__(self) -> bytes:
    """Pack the chunk into bytes.

    The result is the packed header (data type value, total length,
    stream ID, sequence ID, sequence length) followed by the payload.
    String payloads are UTF-8 encoded before packing.

    Returns:
        The header and payload joined into a single bytes object.
    """
    # Encode before computing the length so the header counts encoded
    # bytes rather than characters. The two differ for non-ASCII strings,
    # and from_bytes() slices the payload using this length field.
    payload = (
        self.data.encode('utf8')
        if isinstance(self.data, str)
        else self.data
    )
    header = pack(
        CHUNK_HEADER_FORMAT,
        self.dtype.value,
        CHUNK_HEADER_LENGTH + len(payload),
        self.stream_id,
        self.seq_id,
        self.seq_len,
    )
    # NOTE: an earlier version appended NUL padding to a local variable
    # after the result had been assembled, so padding was never emitted.
    # That no-op is dropped; the output stays unpadded as before.
    return header + payload

from_bytes() classmethod

from_bytes(chunk: bytes) -> Chunk

Decode bytes into a Chunk.

Source code in proxystore/p2p/chunks.py
@classmethod
def from_bytes(cls, chunk: bytes) -> Chunk:
    """Decode bytes into a Chunk.

    Args:
        chunk: Packed chunk consisting of the header followed by the
            payload.

    Returns:
        Chunk built from the unpacked header fields and the payload.
    """
    header = unpack_from(CHUNK_HEADER_FORMAT, chunk)
    kind_value, total_length, stream_id, seq_id, seq_len = header
    kind = ChunkDType(kind_value)

    # The length field is measured from the start of the chunk, so any
    # bytes beyond it are ignored.
    payload = chunk[CHUNK_HEADER_LENGTH:total_length]
    data: bytes | str = (
        payload.decode('utf8') if kind is ChunkDType.STRING else payload
    )

    return cls(
        stream_id=stream_id,
        seq_id=seq_id,
        seq_len=seq_len,
        data=data,
        dtype=kind,
    )

chunkify()

chunkify(
    data: bytes | str, size: int, stream_id: int
) -> Generator[Chunk, None, None]

Generate chunks from data.

Parameters:

  • data (bytes | str) –

    Data to chunk.

  • size (int) –

    Size of each chunk.

  • stream_id (int) –

    Unique ID for the stream of chunks.

Yields:

  • Chunk

    Chunks of data.

Source code in proxystore/p2p/chunks.py
def chunkify(
    data: bytes | str,
    size: int,
    stream_id: int,
) -> Generator[Chunk, None, None]:
    """Generate chunks from data.

    Args:
        data: Data to chunk.
        size: Maximum number of items of `data` (bytes or characters) in
            each chunk. Must be positive.
        stream_id: Unique ID for the stream of chunks.

    Yields:
        Chunks of data in sequence order. Nothing is yielded when `data`
        is empty.

    Raises:
        ValueError: if `size` is not positive. Because this is a
            generator, the error is raised when iteration starts.
    """
    # A zero size would divide by zero below, and a negative size would
    # produce an empty range and silently drop all of the data.
    if size <= 0:
        raise ValueError(f'size ({size}) must be a positive integer.')

    total = len(data)
    seq_len = math.ceil(total / size)

    for seq_id, start in enumerate(range(0, total, size)):
        # Slicing clamps at the end of data, so the last chunk may be short.
        yield Chunk(
            stream_id=stream_id,
            seq_id=seq_id,
            seq_len=seq_len,
            data=data[start : start + size],
        )

reconstruct()

reconstruct(chunks: list[Chunk]) -> bytes | str

Reconstruct data from a list of chunks.

Parameters:

  • chunks (list[Chunk]) –

    List of chunks to order and join.

Returns:

  • bytes | str

    Reconstructed bytes or string.

Source code in proxystore/p2p/chunks.py
def reconstruct(chunks: list[Chunk]) -> bytes | str:
    """Join the data of a list of chunks back into one message.

    The chunks are ordered by sequence ID before their data is joined, so
    they may be passed in any order.

    Args:
        chunks: List of chunks to order and join.

    Returns:
        Reconstructed bytes or string.

    Raises:
        ValueError: if the list is empty or its length does not match the
            sequence length recorded on the first chunk.
    """
    if not chunks:
        raise ValueError('Chunks list cannot be empty.')

    expected = chunks[0].seq_len
    if len(chunks) != expected:
        raise ValueError(f'Got {len(chunks)} but expected {expected}.')

    ordered = sorted(chunks, key=lambda c: c.seq_id)
    pieces = [c.data for c in ordered]

    # The first chunk in sequence order decides whether the result is
    # bytes or a string.
    first = pieces[0]
    if isinstance(first, bytes):
        return b''.join(pieces)  # type: ignore
    if isinstance(first, str):
        return ''.join(pieces)  # type: ignore
    raise AssertionError('Unreachable.')