buildgrid.server.persistence.sql.impl module
- class buildgrid.server.persistence.sql.impl.PruningOptions(pruner_job_max_age, pruner_period, pruner_max_delete_window)
Bases: tuple
- pruner_job_max_age: timedelta
Alias for field number 0
- pruner_period: timedelta
Alias for field number 1
- pruner_max_delete_window: int
Alias for field number 2
- static from_config(pruner_job_max_age_cfg: Dict[str, float], pruner_period_cfg: Dict[str, float] | None = None, pruner_max_delete_window_cfg: int | None = None) PruningOptions
Helper method for creating PruningOptions objects. If the optional input configs are None, defaults are assigned.
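The following is a minimal sketch of how a helper like from_config could turn config values into a PruningOptions-style tuple. It is not BuildGrid's implementation: the class name PruningOptionsSketch is hypothetical, the sketch assumes each config dictionary holds timedelta keyword arguments (for example {"days": 30}), and the fallback values are placeholders rather than the real defaults.

```python
from datetime import timedelta
from typing import Dict, NamedTuple, Optional


class PruningOptionsSketch(NamedTuple):
    """Hypothetical stand-in for PruningOptions, not the BuildGrid class."""

    pruner_job_max_age: timedelta
    pruner_period: timedelta
    pruner_max_delete_window: int

    @staticmethod
    def from_config(
        pruner_job_max_age_cfg: Dict[str, float],
        pruner_period_cfg: Optional[Dict[str, float]] = None,
        pruner_max_delete_window_cfg: Optional[int] = None,
    ) -> "PruningOptionsSketch":
        # Assumption: config dicts map timedelta keyword names to numbers.
        max_age = timedelta(**pruner_job_max_age_cfg)
        # Placeholder fallbacks; the real defaults are not stated in this page.
        period = (
            timedelta(**pruner_period_cfg)
            if pruner_period_cfg is not None
            else timedelta(minutes=5)
        )
        window = (
            pruner_max_delete_window_cfg
            if pruner_max_delete_window_cfg is not None
            else 10000
        )
        return PruningOptionsSketch(max_age, period, window)


options = PruningOptionsSketch.from_config({"days": 30})
```

Because the class is a named tuple, each field is also reachable by position, which is what "Alias for field number N" refers to.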
- class buildgrid.server.persistence.sql.impl.SQLDataStore(sql_provider: SqlProvider, storage: StorageABC, *, sql_ro_provider: SqlProvider | None = None, poll_interval: int = 1, pruning_options: PruningOptions | None = None)
Bases: DataStoreInterface
- activate_monitoring() None
Enable the monitoring features of the data store.
- deactivate_monitoring() None
Disable the monitoring features of the data store.
This method also performs any necessary cleanup of stored metrics.
- start(*, start_job_watcher: bool = True) None
- stop() None
- get_job_by_action(action_digest: Digest, *, max_execution_timeout: int | None = None) Job | None
Return the job corresponding to an Action digest.
This method looks for a job object corresponding to the given Action digest in the data store. If a job is found it is returned, otherwise None is returned.
- Parameters:
action_digest (Digest) – The digest of the Action to find the corresponding job for.
max_execution_timeout (int, Optional) – The max execution timeout.
- Returns:
The job with the given Action digest, if it exists. Otherwise None.
- Return type:
buildgrid.server.job.Job or None
- get_job_by_name(name: str, *, max_execution_timeout: int | None = None) Job | None
Return the job with the given name.
This method looks for a job with the specified name in the data store. If there is a matching Job it is returned, otherwise this returns None.
- Parameters:
name (str) – The name of the job to return.
max_execution_timeout (int, Optional) – The max execution timeout.
- Returns:
The job with the given name, if it exists. Otherwise None.
- Return type:
buildgrid.server.job.Job or None
- get_job_by_operation(operation_name: str, *, max_execution_timeout: int | None = None) Job | None
Return the Job for a given Operation.
This method takes an Operation name, and returns the Job which corresponds to that Operation. If the Operation isn’t found, or if the data store doesn’t contain a corresponding job, this returns None.
- Parameters:
operation_name (str) – Name of the Operation whose corresponding Job is to be returned.
max_execution_timeout (int, Optional) – The max execution timeout.
- Returns:
The job related to the given operation, if it exists. Otherwise None.
- Return type:
buildgrid.server.job.Job or None
- get_all_jobs() List[Job]
Return a list of all jobs in the data store.
Only incomplete jobs are included in the returned list.
- Returns:
List of all incomplete jobs in the data store.
- Return type:
list
- get_jobs_by_stage(operation_stage: OperationStage) List[Job]
Return a list of jobs in the given stage.
This method returns a list of all jobs in a specific operation stage.
- Parameters:
operation_stage (OperationStage) – The stage that the returned list of jobs should all be in.
- Returns:
List of all jobs in the specified operation stage.
- Return type:
list
- get_operation_request_metadata_by_name(operation_name: str) Dict[str, Any] | None
Return a dictionary containing the metadata that was sent by a client as part of a remote_execution_pb2.RequestMetadata message.
It contains the following keys: {'tool-name', 'tool-version', 'invocation-id', 'correlated-invocations-id'}.
- create_job(job: Job) None
Add a new job to the data store.
NOTE: This method just stores the job in the data store. In order to enqueue the job to make it available for scheduling execution, the queue_job method should also be called.
- Parameters:
job (buildgrid.server.job.Job) – The job to be stored.
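The two-step flow described in the note can be illustrated with a small in-memory stand-in. This is only a sketch: InMemoryStoreSketch is hypothetical, jobs are modeled as plain dictionaries, and only the create_job and queue_job method names are borrowed from this page.

```python
from collections import deque
from typing import Deque, Dict


class InMemoryStoreSketch:
    """Hypothetical stand-in mirroring only create_job and queue_job."""

    def __init__(self) -> None:
        self.jobs: Dict[str, dict] = {}
        self.queue: Deque[str] = deque()

    def create_job(self, job: dict) -> None:
        # Storing a job does not make it available for scheduling.
        self.jobs[job["name"]] = job

    def queue_job(self, job_name: str) -> None:
        # Enqueue at most once; a real store would also correct the position.
        if job_name not in self.queue:
            self.queue.append(job_name)


store = InMemoryStoreSketch()
store.create_job({"name": "job-1"})
queued_after_create = list(store.queue)  # nothing is queued yet
store.queue_job("job-1")
queued_after_queue = list(store.queue)
```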
- queue_job(job_name: str) None
Add an existing job to the queue of jobs waiting to be assigned.
This method adds a job with the given name to the queue of jobs. If the job is already in the queue, then this method ensures that the position of the job in the queue is correct.
- update_job(job_name: str, changes: Dict[str, Any], *, skip_notify: bool = False) None
Update a job in the data store.
This method takes a job name and a dictionary of changes to apply to the job in the data store, and updates the job with those changes. The dictionary should be keyed by the attribute names which need to be updated, with the values being the new values for the attributes.
- Parameters:
job_name (str) – The name of the job that is being updated.
changes (dict) – The dictionary of changes to be applied.
skip_notify (bool) – Whether notifying about job changes should be skipped.
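As an illustration of the shape of the changes argument, the sketch below applies a dictionary keyed by attribute name to a simple record. JobRecordSketch, its fields, and apply_changes are hypothetical and exist only for this example; how the data store actually persists the changes is not shown here.

```python
from dataclasses import dataclass
from typing import Any, Dict


@dataclass
class JobRecordSketch:
    """Hypothetical job record; the field names are illustrative only."""

    name: str
    stage: int = 0
    cancelled: bool = False


def apply_changes(record: JobRecordSketch, changes: Dict[str, Any]) -> None:
    # Keys are attribute names, values are the new attribute values.
    for attribute, new_value in changes.items():
        if not hasattr(record, attribute):
            raise AttributeError(f"unknown attribute: {attribute}")
        setattr(record, attribute, new_value)


record = JobRecordSketch(name="job-1")
apply_changes(record, {"stage": 2, "cancelled": True})
```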
- delete_job(job_name: str) None
Delete a job from the data store.
This method removes a job from the data store.
- Parameters:
job_name (str) – The name of the job to be removed.
- wait_for_job_updates() None
- store_response(job: Job, commit_changes: bool = True) Dict[str, Any] | None
Store the job’s ExecuteResponse in the data store.
This method stores the response message for the job in the data store, in order to allow it to be retrieved when getting jobs in the future.
This is separate from update_job, as implementations will likely need to always have a special case for handling persistence of the response message.
- Parameters:
job (buildgrid.server.job.Job) – The job to store the response message of.
- get_operations_by_stage(operation_stage: OperationStage) Set[str]
Return a set of Job names in a specific operation stage.
Find the operations in a given stage and return a set containing the names of the Jobs related to those operations.
- Parameters:
operation_stage (OperationStage) – The stage that the operations should be in.
- Returns:
Set of all job names with operations in the specified stage.
- Return type:
set
- list_operations(operation_filters: List[OperationFilter] | None = None, page_size: int | None = None, page_token: str | None = None, max_execution_timeout: int | None = None) Tuple[List[Operation], str]
Return all operations matching the filter.
- Returns:
A tuple of two items. The first is a page of matching operations in the data store. The second is a string token which, if nonempty, is to be submitted by the requester to get the next page of results.
- Return type:
Tuple[List[Operation], str]
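A caller would typically loop until the returned token is empty. The sketch below shows that loop against a hypothetical stand-in, list_operations_sketch, which pages over a fixed list of names and uses a stringified offset as its token. The real token format is not described on this page and should be treated as opaque.

```python
from typing import List, Optional, Tuple

# Illustrative data; real results would be Operation messages.
OPERATION_NAMES = [f"operation-{i}" for i in range(5)]


def list_operations_sketch(
    page_size: Optional[int] = None, page_token: Optional[str] = None
) -> Tuple[List[str], str]:
    """Hypothetical stand-in; the token is just a stringified offset."""
    start = int(page_token) if page_token else 0
    end = len(OPERATION_NAMES) if page_size is None else start + page_size
    page = OPERATION_NAMES[start:end]
    # An empty token signals that there are no further pages.
    next_token = str(end) if end < len(OPERATION_NAMES) else ""
    return page, next_token


collected: List[str] = []
token = ""
while True:
    page, token = list_operations_sketch(page_size=2, page_token=token)
    collected.extend(page)
    if not token:
        break
```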
- create_operation(operation_name: str, job_name: str, request_metadata: RequestMetadata | None = None, client_identity: ClientIdentityEntry | None = None) None
Add a new operation to the data store.
- Parameters:
operation_name (str) – The name of the Operation to create in the data store.
job_name (str) – The name of the Job representing the execution of this operation.
request_metadata (Optional[RequestMetadata]) – The request metadata of the operation.
client_identity (Optional[ClientIdentityEntry]) – The identity of the client that creates this operation.
- update_operation(operation_name: str, changes: Dict[str, Any]) None
Update an operation in the data store.
This method takes an operation name and a dictionary of changes to apply to the operation in the data store, and updates the operation with those changes. The dictionary should be keyed by the attribute names which need to be updated, with the values being the new values for the attributes.
- Parameters:
operation_name (str) – The name of the operation that is being updated.
changes (dict) – The dictionary of changes to be applied.
- delete_operation(operation_name: str) None
Delete an operation from the data store.
This method removes an operation from the data store.
- Parameters:
operation_name (str) – The name of the operation to be removed.
- get_leases_by_state(lease_state: LeaseState) Set[str]
Return the set of IDs of leases in a given state.
- Parameters:
lease_state (LeaseState) – The state that the leases should be in.
- Returns:
Set of strings containing IDs of leases in the given state.
- Return type:
set
- get_metrics() DataStoreMetrics | None
Return a dictionary of metrics for jobs, operations, and leases.
The returned dictionary is keyed by buildgrid._enums.MetricCategories values, and the values are dictionaries of counts per operation stage (or lease state, in the case of leases).
- create_lease(lease: Lease) None
Add a new lease to the data store.
- Parameters:
lease (Lease) – The Lease protobuf object representing the lease to be added to the data store.
- update_lease(job_name: str, changes: Dict[str, Any]) None
Update a lease in the data store.
This method takes a job name and a dictionary of changes to apply to the lease for that job in the data store, and updates the lease with those changes. The dictionary should be keyed by the attribute names which need to be updated, with the values being the new values for the attributes.
The job name is used as leases have no unique identifier; instead there should always be at most one active lease for the job. It is the responsibility of data store implementations to ensure this.
- Parameters:
job_name (str) – The name of the job whose lease is being updated.
changes (dict) – The dictionary of changes to be applied.
- assign_n_leases(*, capability_hash: str, lease_count: int, assignment_callback: Callable[[List[Job]], Dict[str, Job]]) None
Attempt to assign Leases for several Jobs.
This method selects lease_count Jobs from the data store with platform properties matching the worker capabilities given by capability_hash.
The given assignment_callback function is called with this list of Jobs, and is responsible for actually passing the Jobs to workers and arranging for execution.
The assignment_callback function must return a dictionary mapping Job names to Job objects. This dictionary must contain all the Jobs which were successfully given to workers, and is used by the data store to remove assigned Jobs from the queue.
- Parameters:
capability_hash (str) – The hash of the worker capabilities to use when selecting Jobs. This is matched to the hash of the Job’s platform properties, and so should be generated using buildgrid.utils.hash_from_dict() for consistency.
lease_count (int) – How many Leases we want to create. This specifies the maximum length of the list of Jobs passed to assignment_callback.
assignment_callback (callable) – Function which takes a list of Jobs to be assigned to workers, and returns a dictionary of Job.name -> Job containing the Jobs which were successfully assigned.
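The contract for assignment_callback can be sketched as follows. Everything here is hypothetical and for illustration only: JobSketch models just a job name, and make_assignment_callback pretends that a fixed number of worker slots is free. The point is the return value, a dictionary of Job.name -> Job covering only the jobs that were actually handed to a worker.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class JobSketch:
    """Hypothetical stand-in for a Job; only the name is modeled."""

    name: str


def make_assignment_callback(
    free_worker_slots: int,
) -> Callable[[List[JobSketch]], Dict[str, JobSketch]]:
    def assignment_callback(jobs: List[JobSketch]) -> Dict[str, JobSketch]:
        assigned: Dict[str, JobSketch] = {}
        for job in jobs[:free_worker_slots]:
            # A real callback would pass the job to a worker at this point.
            assigned[job.name] = job
        # Jobs missing from this dictionary are treated as still unassigned.
        return assigned

    return assignment_callback


candidates = [JobSketch("job-a"), JobSketch("job-b"), JobSketch("job-c")]
callback = make_assignment_callback(free_worker_slots=2)
assigned = callback(candidates)
still_queued = [job.name for job in candidates if job.name not in assigned]
```

In this sketch the caller works out which jobs remain queued. According to the description above, the data store does the equivalent with the returned dictionary.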
- get_operations_by_job(job_name: str) List[OperationEntry]
Return a list of Operations associated with a job.
- Parameters:
job_name (str) – name of the job
- Returns:
the list of Operations associated with the job
- Return type:
List[OperationEntry]