buildgrid.server.cas.logstream.stream_storage.stream_storage_abc module

StreamStorageABC

The abstract base class for stream storage providers.

class buildgrid.server.cas.logstream.stream_storage.stream_storage_abc.StreamHandle(name, write_resource_name)

Bases: tuple

name: str

Alias for field number 0

write_resource_name: str

Alias for field number 1

class buildgrid.server.cas.logstream.stream_storage.stream_storage_abc.StreamLength(length, finished)

Bases: tuple

length: int

Alias for field number 0

finished: bool

Alias for field number 1

class buildgrid.server.cas.logstream.stream_storage.stream_storage_abc.StreamChunk(data, chunk_length, finished)

Bases: tuple

data: bytes

Alias for field number 0

chunk_length: int

Alias for field number 1

finished: bool

Alias for field number 2

class buildgrid.server.cas.logstream.stream_storage.stream_storage_abc.StreamStorageABC

Bases: ABC

create_stream(parent: str, prefix: str) StreamHandle

Creates a stream for a specific parent with a specific prefix and a random UUID, and returns a StreamHandle namedtuple containing:

name (str): The name of the string for read access write_resource_name (str): The secret name used only for writing

to the stream

abstract stream_exists(stream_name: str) bool

Returns True when there is a stream with the given instance/stream_name.

abstract stream_finished(stream_name: str) bool

Returns True/False depending on whether the stream has been marked

as completed.

Raises:

NotFoundError when a stream with the given instance/stream_name

does not exist.

abstract stream_length(stream_name: str) StreamLength

Returns a namedtuple of type StreamLength for the stream with the given instance/stream_name.

length (int): The length of the stream in bytes finished (bool): A boolean indicating whether the stream has finished

Raises:

NotFoundError when a stream with the given instance/stream_name – does not exist.

writeable_stream_length(write_resource_name: str) StreamLength

Returns a namedtuple of type StreamLength for the stream with the given instance/write_resource_name.

length (int): The length of the stream in bytes finished (bool): A boolean indicating whether the stream has finished

Raises:

NotFoundError when a stream with the given instance/stream_name – does not exist.

append_to_stream(write_resource_name: str, message: bytes | None = None, *, mark_finished: bool | None = False) None

Appends the message to the stream with the passed write_resource_name

as long as the stream has not been marked as finished yet and marks the stream as finished when the relevant arg is set to True.

Raises:

NotFoundError when a stream with the given instance/write_resource_name

does not exist.

StreamFinishedError when the stream was already marked as finished and

no further writes are allowed.

StorageFullError when the backing storage is full. WriteError when the write_resource_name is correct but

the write failed for other reasons.

abstract read_stream_chunk_blocking(stream_name: str, read_offset_bytes: int, read_limit_bytes: int | None = None) StreamChunk
Returns a commited chunk of the stream message or waits until it is available as follows:

chunk = message[read_offset_bytes, min(read_limit_bytes, finished_stream_length) )

When the read_limit_bytes argument is not set, this method will return the whole message starting from the specified offset, blocking until the stream finishes.

In cases in which the stream has not finished and the chunk offset and size requested was not commited yet, this method will block and wait until the chunk is commited and/or the stream is finished, returning the appropriate chunk.

Raises:

NotFoundError when a stream with the given instance/stream_name

does not exist.

OutOfRangeError when a finished stream does not contain a chunk of data

at the specified offset.

abstract read_stream_chunk(stream_name: str, read_offset_bytes: int, read_limit_bytes: int | None = None) StreamChunk
Returns a commited chunk of the stream message or raises if it is not available as follows:

chunk = message[read_offset_bytes, min(read_limit_bytes, finished_stream_length) )

When the read_limit_bytes argument is not set, this method will return the whole message starting from the specified offest, raising if the stream hasn’t finished.

Raises:

NotFoundError when a stream with the given instance/stream_name

does not exist.

StreamWritePendingError when the requested chunk is not fully commited. OutOfRangeError when a finished stream does not contain a chunk of data

at the specified offset.

read_stream_bytes_blocking_iterator(stream_name: str, max_chunk_size: int, offset: int = 0) Iterator[bytes]

An iterator returning commited chunks of up to chunk_size until the end of the stream. Blocks and waits until the stream finishes.

Can optionally start at a specific chunk_offset, which starts streaming data from

(stream_offset = chunk_offset * chunk_size)

Raises:

NotFoundError when a stream with the given instance/stream_name

does not exist.

OutOfRangeError when a finished stream does not contain a chunk of data

at the specified offset.

new_client_streaming(stream_name: str)

Inform the StreamStorage backend that a new client is streaming the stream stream_name in case it cares about the number of clients streaming a specific stream. This can be useful for e.g. cleaning up old streams. No-op by default. :raises NotFoundError when a stream with the given instance/stream_name:

does not exist.

streaming_client_left(stream_name: str)

Inform the StreamStorage backend that a client streaming the stream stream_name has left, in case it cares about the number of clients streaming a specific stream. This can be useful for e.g. cleaning up old streams. No-op by default. :raises NotFoundError when a stream with the given instance/stream_name:

does not/no longer exist(s).

wait_for_streaming_clients(write_resource_name: str, timeout: int = 300) bool

Returns a boolean specifying if at least one client is streaming this stream within the specified timeout.

This allows the writers to only start writing when there are readers interested in the stream. Note: Accepts write_resource_name and calls

_wait_for_streaming_clients(read_name) on the backend.

Raises:

NotFoundError when a stream with the given instance/stream_name – does not exist.

abstract delete_stream(write_resource_name)
set_instance_name(instance_name: str) None