buildgrid.server.persistence.sql.impl module

class buildgrid.server.persistence.sql.impl.DataStoreMetrics(*args, **kwargs)

Bases: dict

leases: Dict[LeaseState, int]
jobs: Dict[OperationStage, int]
class buildgrid.server.persistence.sql.impl.AgedJobHandlerOptions(job_max_age, handling_period, max_handling_window)

Bases: tuple

job_max_age: timedelta

Alias for field number 0

handling_period: timedelta

Alias for field number 1

max_handling_window: int

Alias for field number 2

static from_config(job_max_age_cfg: Dict[str, float], handling_period_cfg: Dict[str, float] | None = None, max_handling_window_cfg: int | None = None) AgedJobHandlerOptions

Helper method for creating AgedJobHandlerOptions objects. If the optional input configs are None, defaults are assigned.
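The following is a minimal sketch of how such a factory could behave, not the library's implementation. It assumes each config dictionary holds keyword arguments for timedelta (for example {"days": 30}). The class name AgedJobHandlerOptionsSketch and both default values are hypothetical and chosen only for illustration.

```python
from datetime import timedelta
from typing import Dict, NamedTuple, Optional

# Hypothetical defaults; the real defaults are not stated in this reference.
DEFAULT_HANDLING_PERIOD = timedelta(minutes=5)
DEFAULT_MAX_HANDLING_WINDOW = 10000


class AgedJobHandlerOptionsSketch(NamedTuple):
    """Stand-in for AgedJobHandlerOptions with the same three fields."""

    job_max_age: timedelta
    handling_period: timedelta
    max_handling_window: int

    @staticmethod
    def from_config(
        job_max_age_cfg: Dict[str, float],
        handling_period_cfg: Optional[Dict[str, float]] = None,
        max_handling_window_cfg: Optional[int] = None,
    ) -> "AgedJobHandlerOptionsSketch":
        # Assumption: the dicts are timedelta keyword arguments.
        job_max_age = timedelta(**job_max_age_cfg)
        if handling_period_cfg is None:
            handling_period = DEFAULT_HANDLING_PERIOD
        else:
            handling_period = timedelta(**handling_period_cfg)
        if max_handling_window_cfg is None:
            max_handling_window = DEFAULT_MAX_HANDLING_WINDOW
        else:
            max_handling_window = max_handling_window_cfg
        return AgedJobHandlerOptionsSketch(job_max_age, handling_period, max_handling_window)


# Only the required config is given, so both optional fields fall back to defaults.
options = AgedJobHandlerOptionsSketch.from_config({"days": 30})
```

Because the class is a tuple, options[0] and options.job_max_age refer to the same value, which matches the "Alias for field number" entries above.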

class buildgrid.server.persistence.sql.impl.SQLDataStore(sql_provider: SqlProvider, storage: StorageABC, *, sql_ro_provider: SqlProvider | None = None, sql_notifier_provider: SqlProvider | None = None, property_keys: Set[str] | None = None, match_properties: Set[str] | None = None, action_cache: ActionCacheABC | None = None, action_browser_url: str | None = None, max_execution_timeout: int = 7200, metering_client: SyncMeteringServiceClient | None = None, bot_session_keepalive_timeout: int = 600, logstream_channel: Channel | None = None, logstream_instance: str | None = None, asset_client: AssetClient | None = None, queued_action_retention_hours: float | None = None, completed_action_retention_hours: float | None = None, action_result_retention_hours: float | None = None, enable_job_watcher: bool = False, poll_interval: float = 1, pruning_options: AgedJobHandlerOptions | None = None, queue_timeout_options: AgedJobHandlerOptions | None = None, max_job_attempts: int = 5, job_assignment_interval: float = 1.0, priority_assignment_percentage: int = 100)

Bases: object

start(*, start_job_watcher: bool = True) None
stop() None
queue_job_action(*, action: Action, action_digest: Digest, command: Command, platform_requirements: Dict[str, Set[str]], priority: int, skip_cache_lookup: bool, request_metadata: RequestMetadata | None = None, client_identity: ClientIdentityEntry | None = None) str

De-duplicates or inserts a newly created job into the execution queue. Returns an operation name associated with this job.
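As an illustration of the de-duplicate-or-insert behaviour, here is a small in-memory sketch. It is not the SQL-backed implementation: the class InMemoryQueueSketch, its attributes, and the use of a plain string as the action key are all hypothetical, and priority, cache lookup, and metadata handling are left out.

```python
import uuid
from typing import Dict, List


class InMemoryQueueSketch:
    """Toy queue: one job per distinct action, one operation per request."""

    def __init__(self) -> None:
        # Hypothetical storage: action hash -> job name, job name -> operation names.
        self.jobs_by_action: Dict[str, str] = {}
        self.operations_by_job: Dict[str, List[str]] = {}

    def queue_job_action(self, action_hash: str) -> str:
        job_name = self.jobs_by_action.get(action_hash)
        if job_name is None:
            # No existing job for this action: insert a new one.
            job_name = str(uuid.uuid4())
            self.jobs_by_action[action_hash] = job_name
            self.operations_by_job[job_name] = []
        # Either way, the caller gets a fresh operation attached to the job.
        operation_name = str(uuid.uuid4())
        self.operations_by_job[job_name].append(operation_name)
        return operation_name


queue = InMemoryQueueSketch()
first = queue.queue_job_action("abc123")
second = queue.queue_job_action("abc123")  # same action, so the job is reused
```

In this sketch two requests for the same action share one job but receive distinct operation names.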

create_operation_for_existing_job(*, action_digest: Digest, priority: int, request_metadata: RequestMetadata | None, client_identity: ClientIdentityEntry | None) str | None
create_operation_for_new_job(*, action: Action, action_digest: Digest, command: Command, execute_response: ExecuteResponse | None, platform_requirements: Dict[str, Set[str]], priority: int, request_metadata: RequestMetadata | None = None, client_identity: ClientIdentityEntry | None = None) str
create_operation(job_name: str, *, request_metadata: RequestMetadata | None = None, client_identity: ClientIdentityEntry | None = None) str
load_operation(operation_name: str) Operation
get_operation_job_name(operation_name: str) str | None
get_operation_request_metadata_by_name(operation_name: str) RequestMetadata | None
get_client_identity_by_operation(operation_name: str) ClientIdentity | None
execution_timer_loop(shutdown_requested: Event) None

Periodically time out aged executing jobs.

cancel_jobs_exceeding_execution_timeout(max_execution_timeout: int | None = None) None
cancel_operation(operation_name: str) None
list_operations(operation_filters: List[OperationFilter] | None = None, page_size: int | None = None, page_token: str | None = None) Tuple[List[Operation], str]
get_metrics() DataStoreMetrics | None
assign_n_leases_by_priority(*, capability_hash: str, bot_names: List[str]) List[str]
assign_n_leases_by_age(*, capability_hash: str, bot_names: List[str]) List[str]
queue_timer_loop(shutdown_requested: Event) None

Periodically time out aged queued jobs.

prune_timer_loop(shutdown_requested: Event) None

Running in a background thread, this method wakes up periodically and deletes older records from the jobs tables, using configurable parameters.
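The general shape of such a loop can be sketched as follows. This is an assumption-laden illustration rather than the actual method: prune_timer_loop_sketch, the records dictionary, and the parameter names max_age and period_seconds are hypothetical, and a dictionary stands in for the jobs tables.

```python
import threading
from datetime import datetime, timedelta
from typing import Dict


def prune_timer_loop_sketch(
    records: Dict[str, datetime],
    max_age: timedelta,
    period_seconds: float,
    shutdown_requested: threading.Event,
) -> None:
    """Delete records older than max_age, then sleep until the next period."""
    while True:
        cutoff = datetime.now() - max_age
        for name in [n for n, created in records.items() if created < cutoff]:
            del records[name]
        # wait() returns True as soon as shutdown is requested, so the loop
        # exits promptly instead of sleeping for the full period.
        if shutdown_requested.wait(timeout=period_seconds):
            break


now = datetime.now()
records = {"old-job": now - timedelta(days=90), "new-job": now}
shutdown = threading.Event()
worker = threading.Thread(
    target=prune_timer_loop_sketch,
    args=(records, timedelta(days=30), 60.0, shutdown),
)
worker.start()
shutdown.set()  # request shutdown; the first pruning pass still runs
worker.join()
```

Waiting on the Event rather than calling time.sleep keeps shutdown responsive even when the period is long.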

get_or_create_client_identity_in_store(session: Session, client_id: ClientIdentityEntry) ClientIdentityEntry

Get the ClientIdentity from the storage, or create one if it does not exist. This helper function makes sure the client_id is created within the current transaction.

Parameters:

  • session (Session) – sqlalchemy Session

  • client_id (ClientIdentityEntry) – identity of the client that creates an operation

Returns:

identity of the client that creates an operation

Return type:

ClientIdentityEntry

get_or_create_request_metadata_in_store(session: Session, request_metadata: RequestMetadata) RequestMetadataEntry
add_bot_entry(*, bot_session_id: str, bot_session_status: int) str
close_bot_sessions(bot_name: str) None
session_expiry_timer_loop(shutdown_requested: Event) None
reap_expired_sessions() bool

Find and close expired bot sessions. Returns True if sessions were closed. Only closes a few sessions to minimize time in transaction.
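A bounded reaping pass of this kind could look like the sketch below. The function name, the in-memory expiry_by_bot mapping, and the batch limit MAX_SESSIONS_PER_REAP are hypothetical; the actual limit and storage are not stated in this reference.

```python
from datetime import datetime, timedelta
from typing import Dict

MAX_SESSIONS_PER_REAP = 2  # hypothetical batch size


def reap_expired_sessions_sketch(
    expiry_by_bot: Dict[str, datetime],
    now: datetime,
    limit: int = MAX_SESSIONS_PER_REAP,
) -> bool:
    """Close at most `limit` expired sessions; report whether any were closed."""
    expired = sorted(
        (name for name, expiry in expiry_by_bot.items() if expiry <= now),
        key=lambda name: expiry_by_bot[name],
    )[:limit]
    for name in expired:
        del expiry_by_bot[name]
    return bool(expired)


now = datetime(2024, 1, 1, 12, 0, 0)
sessions = {
    "bot-a": now - timedelta(minutes=30),
    "bot-b": now - timedelta(minutes=20),
    "bot-c": now - timedelta(minutes=10),
    "bot-d": now + timedelta(minutes=10),  # still alive
}
passes = 0
# A caller can keep reaping in small batches until nothing is left to close.
while reap_expired_sessions_sketch(sessions, now):
    passes += 1
```

Returning a boolean lets the caller decide whether to run another short pass immediately or wait for the next timer tick.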

synchronize_bot_lease(bot_name: str, bot_id: str, bot_status: int, session_lease: Lease | None) Lease | None
count_bots_by_status() Dict[BotStatus, int]

Count the number of bots in each status.

refresh_bot_expiry_time(bot_name: str, bot_id: str) datetime

This update is done out-of-band from the main synchronize_bot_lease transaction, as there are cases where we skip the synchronization but still want the session to be updated so that it does not get reaped. This slightly duplicates the update happening in synchronize_bot_lease; however, that update is still required to not have the job reaped during its job assignment waiting period.

This method should be called at the end of the update and create bot session methods. The returned datetime should be assigned to the deadline within the returned session proto.

get_metadata_for_leases(leases: Iterable[Lease]) List[Tuple[str, bytes]]

Return a list of Job metadata for a given list of leases.

Parameters:

leases (list) – List of leases to get Job metadata for.

Returns:

List of tuples of the form ('executeoperationmetadata-bin', serialized_metadata).

publish_execution_stats(job_name: str, execution_metadata: ExecutedActionMetadata) None
class buildgrid.server.persistence.sql.impl.JobAssigner(data_store: SQLDataStore, job_assignment_interval: float = 1.0, priority_percentage: int = 100)

Bases: object

start() None
stop() None
listener_count(instance_name: str | None = None) int
assignment_context(bot_session: BotSession) Iterator[Event]
assign_jobs(shutdown_requested: Event, instance_name: str, oldest_first: bool = False) None

Assign jobs to the currently connected workers

This method iterates over the buckets of currently connected workers, and requests a number of job assignments from the data store to cover the number of workers in each bucket. Empty buckets are skipped.
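The bucket iteration described above can be sketched as follows. This is a simplified illustration, not the JobAssigner code: assign_jobs_sketch, the buckets mapping (capability hash to waiting bot names), and fake_assign are hypothetical. The sketch also assumes the data store call returns the names of the bots that received a job, which this reference does not confirm.

```python
from typing import Callable, Dict, List


def assign_jobs_sketch(
    buckets: Dict[str, List[str]],
    assign_n_leases: Callable[[str, List[str]], List[str]],
) -> Dict[str, List[str]]:
    """Request assignments for every non-empty bucket of waiting workers."""
    assigned: Dict[str, List[str]] = {}
    for capability_hash, bot_names in buckets.items():
        if not bot_names:
            continue  # empty buckets are skipped
        assigned[capability_hash] = assign_n_leases(capability_hash, bot_names)
    return assigned


# Hypothetical stand-in for the data store: queued job names per capability hash.
queued = {"hash-a": ["job-1", "job-2", "job-3"], "hash-b": ["job-4"]}


def fake_assign(capability_hash: str, bot_names: List[str]) -> List[str]:
    jobs = queued.get(capability_hash, [])
    count = min(len(jobs), len(bot_names))
    queued[capability_hash] = jobs[count:]
    return bot_names[:count]  # assumption: the bots that were given a job


result = assign_jobs_sketch(
    {"hash-a": ["bot-1", "bot-2"], "hash-b": [], "hash-c": ["bot-3"]},
    fake_assign,
)
```

Here the bucket for hash-b is never sent to the data store because no workers are waiting in it, and hash-c yields no assignments because nothing is queued for it.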

begin(shutdown_requested: Event) None
buildgrid.server.persistence.sql.impl.get_partial_capabilities(capabilities: Dict[str, List[str]]) Iterable[Dict[str, List[str]]]

Given a capabilities dictionary with all values as lists, yield all partial capabilities dictionaries.
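One reading of "all partial capabilities dictionaries" is every subset of the (key, value) pairs, regrouped by key. The sketch below follows that reading; it is an assumption about the semantics, and partial_capabilities_sketch is a hypothetical name rather than the module's function.

```python
from itertools import combinations
from typing import Dict, Iterable, List


def partial_capabilities_sketch(
    capabilities: Dict[str, List[str]],
) -> Iterable[Dict[str, List[str]]]:
    """Yield one dictionary per subset of the flattened (key, value) pairs."""
    pairs = [
        (key, value)
        for key in sorted(capabilities)
        for value in sorted(capabilities[key])
    ]
    for size in range(len(pairs) + 1):
        for subset in combinations(pairs, size):
            partial: Dict[str, List[str]] = {}
            for key, value in subset:
                partial.setdefault(key, []).append(value)
            yield partial


partials = list(
    partial_capabilities_sketch({"OSFamily": ["linux"], "ISA": ["x86-64", "arm"]})
)
```

With n pairs this produces 2**n dictionaries, so under this reading the number of partials grows quickly as workers advertise more capabilities.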

buildgrid.server.persistence.sql.impl.get_partial_capabilities_hashes(capabilities: Dict[str, Set[str]]) List[str]

Given a capabilities dictionary, obtain each partial capabilities dictionary, compute the hash of each one, and return the hashes as a list.
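A hedged sketch of the hashing step follows. The canonical form (sorted JSON) and the SHA-1 digest are assumptions made for illustration, since the hashing scheme is not described here, and both function names are hypothetical. Partial dictionaries are taken to be subsets of the (key, value) pairs.

```python
import hashlib
import json
from itertools import combinations
from typing import Dict, Iterable, List, Set


def hash_capabilities_sketch(capabilities: Dict[str, Iterable[str]]) -> str:
    """Hash a capabilities dictionary independently of key and value order."""
    canonical = json.dumps(
        {key: sorted(values) for key, values in capabilities.items()},
        sort_keys=True,
    )
    return hashlib.sha1(canonical.encode()).hexdigest()


def partial_capabilities_hashes_sketch(capabilities: Dict[str, Set[str]]) -> List[str]:
    """Return one hash per partial capabilities dictionary."""
    pairs = sorted((key, value) for key, values in capabilities.items() for value in values)
    hashes: List[str] = []
    for size in range(len(pairs) + 1):
        for subset in combinations(pairs, size):
            partial: Dict[str, List[str]] = {}
            for key, value in subset:
                partial.setdefault(key, []).append(value)
            hashes.append(hash_capabilities_sketch(partial))
    return hashes


hashes = partial_capabilities_hashes_sketch({"OSFamily": {"linux"}, "ISA": {"x86-64"}})
```

A plausible use for such a list, given the capability_hash parameter on the lease assignment methods, is matching queued jobs against a worker by simple hash equality; that motivation is inferred rather than stated in this reference.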

buildgrid.server.persistence.sql.impl.bot_capabilities(bot_session: BotSession) Dict[str, Set[str]]