buildgrid.server.cas.logstream.stream_storage.stream_storage_abc module
StreamStorageABC
The abstract base class for stream storage providers.
- class buildgrid.server.cas.logstream.stream_storage.stream_storage_abc.StreamHandle(name, write_resource_name)
Bases:
tuple
- name: str
Alias for field number 0
- write_resource_name: str
Alias for field number 1
- class buildgrid.server.cas.logstream.stream_storage.stream_storage_abc.StreamLength(length, finished)
Bases:
tuple
- length: int
Alias for field number 0
- finished: bool
Alias for field number 1
- class buildgrid.server.cas.logstream.stream_storage.stream_storage_abc.StreamChunk(data, chunk_length, finished)
Bases:
tuple
- data: bytes
Alias for field number 0
- chunk_length: int
Alias for field number 1
- finished: bool
Alias for field number 2
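The three tuple types above can be read by field name or by position. A minimal sketch, assuming stand-in definitions built with typing.NamedTuple that mirror the documented fields; nothing here is imported from BuildGrid, and the sample values are made up:

```python
from typing import NamedTuple


# Stand-in definitions mirroring the documented fields; not imported from BuildGrid.
class StreamHandle(NamedTuple):
    name: str
    write_resource_name: str


class StreamLength(NamedTuple):
    length: int
    finished: bool


class StreamChunk(NamedTuple):
    data: bytes
    chunk_length: int
    finished: bool


# Hypothetical sample values, for illustration only.
handle = StreamHandle(name="example/read-name", write_resource_name="example/write-name")
chunk = StreamChunk(data=b"hello", chunk_length=5, finished=False)

# Fields are reachable by name or by index ("Alias for field number N").
assert handle.name == handle[0]
assert handle.write_resource_name == handle[1]

# Tuples unpack positionally, in the documented field order.
data, chunk_length, finished = chunk
```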
- class buildgrid.server.cas.logstream.stream_storage.stream_storage_abc.StreamStorageABC
Bases:
ABC
- create_stream(parent: str, prefix: str) StreamHandle
Creates a stream for a specific parent with a specific prefix and a random UUID, and returns a StreamHandle namedtuple containing:
name (str): The name of the stream for read access
write_resource_name (str): The secret name used only for writing to the stream
- abstract stream_exists(stream_name: str) bool
Returns True when there is a stream with the given instance/stream_name.
- abstract stream_finished(stream_name: str) bool
Returns True/False depending on whether the stream has been marked
as completed.
- Raises:
NotFoundError when a stream with the given instance/stream_name
does not exist.
- abstract stream_length(stream_name: str) StreamLength
Returns a namedtuple of type StreamLength for the stream with the given instance/stream_name.
length (int): The length of the stream in bytes
finished (bool): A boolean indicating whether the stream has finished
- Raises:
NotFoundError when a stream with the given instance/stream_name does not exist.
- writeable_stream_length(write_resource_name: str) StreamLength
Returns a namedtuple of type StreamLength for the stream with the given instance/write_resource_name.
length (int): The length of the stream in bytes
finished (bool): A boolean indicating whether the stream has finished
- Raises:
NotFoundError when a stream with the given instance/stream_name does not exist.
- append_to_stream(write_resource_name: str, message: bytes | None = None, *, mark_finished: bool | None = False) None
Appends the message to the stream identified by write_resource_name, provided the stream has not yet been marked as finished. When mark_finished is set to True, the stream is also marked as finished.
- Raises:
NotFoundError when a stream with the given instance/write_resource_name does not exist.
StreamFinishedError when the stream was already marked as finished and no further writes are allowed.
StorageFullError when the backing storage is full.
WriteError when the write_resource_name is correct but the write failed for other reasons.
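The append contract described above (writes are accepted until the stream is marked as finished, then rejected) can be sketched with a toy in-memory stream. ToyStream and the local StreamFinishedError are hypothetical stand-ins written for this example; they are not the BuildGrid classes, and the sketch ignores NotFoundError, StorageFullError and WriteError:

```python
from typing import Optional


class StreamFinishedError(Exception):
    """Stand-in for the error named above; not the BuildGrid class."""


class ToyStream:
    """Hypothetical in-memory stream, used only to illustrate the contract."""

    def __init__(self) -> None:
        self.data = bytearray()
        self.finished = False

    def append(self, message: Optional[bytes] = None, *, mark_finished: bool = False) -> None:
        # Writes to a finished stream are rejected.
        if self.finished:
            raise StreamFinishedError("stream already marked as finished")
        # message is optional, so a call may only mark the stream as finished.
        if message:
            self.data.extend(message)
        if mark_finished:
            self.finished = True


stream = ToyStream()
stream.append(b"line 1\n")
stream.append(b"line 2\n", mark_finished=True)

try:
    stream.append(b"too late")
    rejected = False
except StreamFinishedError:
    rejected = True
```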
- abstract read_stream_chunk_blocking(stream_name: str, read_offset_bytes: int, read_limit_bytes: int | None = None) StreamChunk
Returns a committed chunk of the stream message, or waits until it is available, as follows:
chunk = message[read_offset_bytes, min(read_limit_bytes, finished_stream_length))
When the read_limit_bytes argument is not set, this method returns the whole message starting from the specified offset, blocking until the stream finishes.
If the stream has not finished and the requested offset and size have not been committed yet, this method blocks until the chunk is committed and/or the stream is finished, then returns the appropriate chunk.
- Raises:
NotFoundError when a stream with the given instance/stream_name does not exist.
OutOfRangeError when a finished stream does not contain a chunk of data at the specified offset.
- abstract read_stream_chunk(stream_name: str, read_offset_bytes: int, read_limit_bytes: int | None = None) StreamChunk
Returns a committed chunk of the stream message, or raises if it is not available, as follows:
chunk = message[read_offset_bytes, min(read_limit_bytes, finished_stream_length))
When the read_limit_bytes argument is not set, this method returns the whole message starting from the specified offset, raising if the stream has not finished.
- Raises:
NotFoundError when a stream with the given instance/stream_name does not exist.
StreamWritePendingError when the requested chunk is not fully committed.
OutOfRangeError when a finished stream does not contain a chunk of data at the specified offset.
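A rough sketch of the non-blocking read rules may help. It rests on several assumptions that the text above does not confirm: read_limit_bytes is treated as a byte count measured from the offset, an offset at or past the end of a finished stream is treated as out of range, and the finished field of the returned chunk is taken to mean "this chunk reaches the end of a finished stream". toy_read_chunk and the local exception and tuple classes are hypothetical stand-ins, not BuildGrid code:

```python
from typing import NamedTuple, Optional


class StreamChunk(NamedTuple):  # stand-in mirroring the documented fields
    data: bytes
    chunk_length: int
    finished: bool


class StreamWritePendingError(Exception):
    """Stand-in for the error named above."""


class OutOfRangeError(Exception):
    """Stand-in for the error named above."""


def toy_read_chunk(
    committed: bytes, stream_finished: bool, offset: int, limit: Optional[int] = None
) -> StreamChunk:
    # ASSUMPTION: limit counts bytes from offset, so the requested end is offset + limit.
    end = None if limit is None else offset + limit
    if stream_finished:
        # ASSUMPTION: an offset at or past the end of a finished stream is out of range.
        if offset >= len(committed):
            raise OutOfRangeError(f"no data at offset {offset}")
        data = committed[offset:end]
        # ASSUMPTION: finished is True only when the chunk reaches the end of the stream.
        return StreamChunk(data, len(data), offset + len(data) == len(committed))
    # Unfinished stream: the whole requested range must already be committed.
    if end is None or end > len(committed):
        raise StreamWritePendingError("requested chunk is not fully committed")
    data = committed[offset:end]
    return StreamChunk(data, len(data), False)


# A limit larger than the remaining data is clamped on a finished stream.
tail = toy_read_chunk(b"abcdef", True, 2, 100)

# On an unfinished stream, an unbounded read cannot be served yet.
try:
    toy_read_chunk(b"abcdef", False, 0)
    pending = False
except StreamWritePendingError:
    pending = True
```

Under the same assumptions, the blocking variant would wait where this sketch raises StreamWritePendingError.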
- read_stream_bytes_blocking_iterator(stream_name: str, max_chunk_size: int, offset: int = 0) Iterator[bytes]
An iterator returning committed chunks of up to chunk_size until the end of the stream. Blocks and waits until the stream finishes.
Can optionally start at a specific chunk_offset, which starts streaming data from
(stream_offset = chunk_offset * chunk_size)
- Raises:
NotFoundError when a stream with the given instance/stream_name does not exist.
OutOfRangeError when a finished stream does not contain a chunk of data at the specified offset.
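The chunking arithmetic described above can be illustrated on a message that is already fully committed and finished, so nothing blocks. toy_bytes_iterator is a hypothetical helper that uses the names from the description (chunk_size, chunk_offset) and the stated formula stream_offset = chunk_offset * chunk_size; the method signature names these parameters max_chunk_size and offset, and whether the real method applies the same arithmetic is an assumption:

```python
from typing import Iterator


def toy_bytes_iterator(message: bytes, chunk_size: int, chunk_offset: int = 0) -> Iterator[bytes]:
    # Starting position as stated above: stream_offset = chunk_offset * chunk_size.
    position = chunk_offset * chunk_size
    while position < len(message):
        chunk = message[position:position + chunk_size]
        # Advance by the number of bytes actually returned.
        position += len(chunk)
        yield chunk


chunks = list(toy_bytes_iterator(b"abcdefgh", chunk_size=3))
later_chunks = list(toy_bytes_iterator(b"abcdefgh", chunk_size=3, chunk_offset=1))
```

The last chunk may be shorter than chunk_size, which is why the description says "up to".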
- new_client_streaming(stream_name: str)
Informs the StreamStorage backend that a new client is streaming the stream stream_name, in case it cares about the number of clients streaming a specific stream. This can be useful for e.g. cleaning up old streams. No-op by default.
- Raises:
NotFoundError when a stream with the given instance/stream_name does not exist.
- streaming_client_left(stream_name: str)
Informs the StreamStorage backend that a client streaming the stream stream_name has left, in case it cares about the number of clients streaming a specific stream. This can be useful for e.g. cleaning up old streams. No-op by default.
- Raises:
NotFoundError when a stream with the given instance/stream_name does not exist or no longer exists.
- wait_for_streaming_clients(write_resource_name: str, timeout: int = 300) bool
Returns a boolean indicating whether at least one client is streaming this stream within the specified timeout.
This allows writers to start writing only when there are readers interested in the stream. Note: Accepts write_resource_name and calls _wait_for_streaming_clients(read_name) on the backend.
- Raises:
NotFoundError when a stream with the given instance/stream_name does not exist.
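One way a backend could implement this kind of "wait until a reader shows up" behaviour is a counter guarded by a condition variable. The sketch below is an illustration only: ToyClientTracker is hypothetical, tracks a single stream, and is not how any BuildGrid backend is known to work.

```python
import threading


class ToyClientTracker:
    """Hypothetical reader counter; not a BuildGrid class."""

    def __init__(self) -> None:
        self._clients = 0
        self._changed = threading.Condition()

    def new_client_streaming(self) -> None:
        with self._changed:
            self._clients += 1
            self._changed.notify_all()

    def streaming_client_left(self) -> None:
        with self._changed:
            self._clients -= 1
            self._changed.notify_all()

    def wait_for_streaming_clients(self, timeout: float) -> bool:
        # wait_for returns the predicate's final value: True if a reader
        # showed up before the timeout, False otherwise.
        with self._changed:
            return self._changed.wait_for(lambda: self._clients > 0, timeout=timeout)


tracker = ToyClientTracker()
no_readers = tracker.wait_for_streaming_clients(timeout=0.05)

# A reader arrives from another thread while the writer is waiting.
reader = threading.Timer(0.05, tracker.new_client_streaming)
reader.start()
has_readers = tracker.wait_for_streaming_clients(timeout=5)
reader.join()
```

A writer would typically check the returned boolean and skip or defer writing when no reader appeared within the timeout.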
- abstract delete_stream(write_resource_name)
- set_instance_name(instance_name: str) None