buildgrid.server.persistence.interface module¶
-
class
buildgrid.server.persistence.interface.
DataStoreInterface
¶ Bases:
abc.ABC
Abstract class defining an interface to a data store for the scheduler.
The
DataStoreInterface
defines the interface used by the scheduler to manage storage of its internal state. It also provides some of the infrastructure for streaming messages triggered by changes in job and operation state, which can be used (via the buildgrid.server.scheduler.Scheduler
itself) to stream progress updates back to clients.
Implementations of the interface are required to implement all the abstract methods in this class. However, there are no requirements on the internal details of those implementations; it may be beneficial for certain implementations to make some of these methods a no-op, for example.
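The contract described above can be illustrated with a small sketch. It does not use BuildGrid itself: TinyDataStore is a hypothetical, cut-down stand-in for the interface with only two abstract methods, and InMemoryStore is an invented implementation that makes the monitoring hook a no-op.

```python
from abc import ABC, abstractmethod
from typing import Dict


class TinyDataStore(ABC):
    """Hypothetical, cut-down stand-in for DataStoreInterface."""

    @abstractmethod
    def activate_monitoring(self) -> None:
        ...

    @abstractmethod
    def create_job(self, job_name: str) -> None:
        ...


class InMemoryStore(TinyDataStore):
    def __init__(self) -> None:
        self.jobs: Dict[str, dict] = {}

    def activate_monitoring(self) -> None:
        # Deliberate no-op: this toy store keeps no metrics.
        pass

    def create_job(self, job_name: str) -> None:
        self.jobs[job_name] = {"name": job_name}


store = InMemoryStore()
store.activate_monitoring()
store.create_job("job-1")
```

Because every abstract method is overridden, InMemoryStore can be instantiated, whereas instantiating TinyDataStore directly raises TypeError.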
Implementations must also provide some way to set the events that are used in
stream_operation_updates
, which live in the watched_jobs
dictionary.
-
set_instance_name
(instance_name)¶
-
set_action_browser_url
(url)¶
-
abstract
activate_monitoring
() → None¶ Enable the monitoring features of the data store.
-
abstract
deactivate_monitoring
() → None¶ Disable the monitoring features of the data store.
This method also performs any necessary cleanup of stored metrics.
-
abstract
get_metrics
() → Dict[str, Dict[int, int]]¶ Return a dictionary of metrics for jobs, operations, and leases.
The returned dictionary is keyed by
buildgrid._enums.MetricCategories
values, and the values are dictionaries of counts per operation stage (or lease state, in the case of leases).
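As an illustration of the shape described above, the sketch below builds a metrics dictionary by hand. The MetricCategories class here is a local stand-in with assumed string values, and the integer stage and state keys and their counts are arbitrary placeholders.

```python
from enum import Enum
from typing import Dict


class MetricCategories(Enum):
    # Assumed values for illustration; check buildgrid._enums for the real ones.
    JOBS = "jobs"
    OPERATIONS = "operations"
    LEASES = "leases"


# Inner keys are integer stage (or lease state) values; inner values are counts.
metrics: Dict[str, Dict[int, int]] = {
    MetricCategories.JOBS.value: {2: 5, 3: 1},
    MetricCategories.OPERATIONS.value: {2: 7, 3: 1},
    MetricCategories.LEASES.value: {1: 1},
}

# Count of jobs in the stage with value 2, defaulting to zero if absent.
jobs_in_stage_2 = metrics[MetricCategories.JOBS.value].get(2, 0)
# Total number of operations across all stages.
total_operations = sum(metrics[MetricCategories.OPERATIONS.value].values())
```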
-
abstract
create_job
(job: buildgrid.server.job.Job) → None¶ Add a new job to the data store.
NOTE: This method just stores the job in the data store. In order to enqueue the job to make it available for scheduling execution, the
queue_job
method should also be called.
- Parameters
job (buildgrid.server.job.Job) – The job to be stored.
-
abstract
queue_job
(job_name: str) → None¶ Add an existing job to the queue of jobs waiting to be assigned.
This method adds a job with the given name to the queue of jobs. If the job is already in the queue, then this method ensures that the position of the job in the queue is correct.
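The split between storing and queueing a job can be sketched with a toy in-memory store. ToyStore, the dict-based jobs, and the rule that a lower priority number runs sooner are all assumptions made for this example, not BuildGrid's implementation.

```python
from typing import Dict, List


class ToyStore:
    """Hypothetical in-memory store; jobs are plain dicts with a priority."""

    def __init__(self) -> None:
        self.jobs: Dict[str, dict] = {}
        self.queue: List[str] = []

    def create_job(self, name: str, priority: int) -> None:
        # Stores the job only; it is not yet available for scheduling.
        self.jobs[name] = {"name": name, "priority": priority}

    def queue_job(self, job_name: str) -> None:
        # Add the job if missing, then re-sort so its position is correct.
        if job_name not in self.queue:
            self.queue.append(job_name)
        self.queue.sort(key=lambda n: self.jobs[n]["priority"])


store = ToyStore()
store.create_job("a", priority=5)
store.create_job("b", priority=1)
store.queue_job("a")
store.queue_job("b")
store.jobs["a"]["priority"] = 0  # priority changed after queueing
store.queue_job("a")             # queueing again corrects the position
```

Calling queue_job a second time for "a" does not duplicate the entry; it only moves the job to the front of the queue.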
-
abstract
store_response
(job: buildgrid.server.job.Job, commit_changes: bool) → None¶ Store the job’s ExecuteResponse in the data store.
This method stores the response message for the job in the data store, in order to allow it to be retrieved when getting jobs in the future.
This is separate from
update_job
as implementations will likely always need a special case for handling persistence of the response message.
- Parameters
job (buildgrid.server.job.Job) – The job to store the response message of.
-
abstract
get_job_by_action
(action_digest: build.bazel.remote.execution.v2.remote_execution_pb2.Digest, *, max_execution_timeout: Optional[int] = None) → Optional[buildgrid.server.job.Job]¶ Return the job corresponding to an Action digest.
This method looks for a job object corresponding to the given Action digest in the data store. If a job is found it is returned, otherwise None is returned.
- Parameters
action_digest (Digest) – The digest of the Action to find the corresponding job for.
max_execution_timeout (int, Optional) – The max execution timeout.
- Returns
The job with the given Action digest, if it exists. Otherwise None.
- Return type
buildgrid.server.job.Job or None
-
abstract
get_job_by_name
(name: str, *, max_execution_timeout: Optional[int] = None) → Optional[buildgrid.server.job.Job]¶ Return the job with the given name.
This method looks for a job with the specified name in the data store. If there is a matching Job it is returned, otherwise this returns None.
- Parameters
name (str) – The name of the job to return.
max_execution_timeout (int, Optional) – The max execution timeout.
- Returns
The job with the given name, if it exists. Otherwise None.
- Return type
buildgrid.server.job.Job or None
-
abstract
get_job_by_operation
(operation: str, *, max_execution_timeout: Optional[int] = None) → Optional[buildgrid.server.job.Job]¶ Return the Job for a given Operation.
This method takes an Operation name, and returns the Job which corresponds to that Operation. If the Operation isn’t found, or if the data store doesn’t contain a corresponding job, this returns None.
- Parameters
operation (str) – Name of the Operation whose corresponding Job is to be returned.
max_execution_timeout (int, Optional) – The max execution timeout.
- Returns
The job related to the given operation, if it exists. Otherwise None.
- Return type
buildgrid.server.job.Job or None
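The two ways a lookup by operation can come back empty (unknown operation, or an operation whose job is missing) can be sketched as follows. ToyStore and its dict-based records are hypothetical stand-ins, and the max_execution_timeout argument is left out.

```python
from typing import Dict, Optional


class ToyStore:
    """Hypothetical store mapping operation names to job names."""

    def __init__(self) -> None:
        self.jobs: Dict[str, dict] = {"job-1": {"name": "job-1"}}
        self.operations: Dict[str, str] = {
            "op-1": "job-1",
            "op-orphan": "job-gone",
        }

    def get_job_by_operation(self, operation: str) -> Optional[dict]:
        job_name = self.operations.get(operation)
        if job_name is None:
            return None  # the operation is unknown
        return self.jobs.get(job_name)  # None if the job is missing


store = ToyStore()
found = store.get_job_by_operation("op-1")
missing_operation = store.get_job_by_operation("op-2")
missing_job = store.get_job_by_operation("op-orphan")
```

Callers therefore need a single None check to cover both cases.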
-
abstract
get_all_jobs
() → List[buildgrid.server.job.Job]¶ Return a list of all incomplete jobs in the data store.
This method returns a list of all incomplete jobs in the data store.
- Returns
List of all incomplete jobs in the data store.
- Return type
list
-
abstract
get_jobs_by_stage
(operation_stage: buildgrid._enums.OperationStage) → List[buildgrid.server.job.Job]¶ Return a list of jobs in the given stage.
This method returns a list of all jobs in a specific operation stage.
- Parameters
operation_stage (OperationStage) – The stage that the returned list of jobs should all be in.
- Returns
List of all jobs in the specified operation stage.
- Return type
list
-
abstract
update_job
(job_name: str, changes: Mapping[str, Any], skip_notify: bool) → None¶ Update a job in the data store.
This method takes a job name and a dictionary of changes to apply to the job in the data store, and updates the job with those changes. The dictionary should be keyed by the attribute names which need to be updated, with the values being the new values for the attributes.
- Parameters
job_name (str) – The name of the job that is being updated.
changes (dict) – The dictionary of changes to be applied.
skip_notify (bool) – Whether notifying about job changes should be skipped.
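A minimal sketch of applying such a changes mapping, assuming jobs are simple Python objects held in a dictionary. ToyJob, the module-level jobs dictionary, and the notifications list (a stand-in for waking up watchers) are all hypothetical.

```python
from typing import Any, Dict, List, Mapping


class ToyJob:
    """Hypothetical job object with a couple of mutable attributes."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.stage = 2
        self.cancelled = False


jobs: Dict[str, ToyJob] = {"job-1": ToyJob("job-1")}
notifications: List[str] = []


def update_job(job_name: str, changes: Mapping[str, Any],
               skip_notify: bool = False) -> None:
    job = jobs[job_name]
    # Keys are attribute names, values are the new attribute values.
    for attribute, value in changes.items():
        setattr(job, attribute, value)
    if not skip_notify:
        notifications.append(job_name)  # stand-in for notifying watchers


update_job("job-1", {"stage": 3})
update_job("job-1", {"cancelled": True}, skip_notify=True)
```

Only the first call records a notification; the second applies its change silently.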
-
abstract
delete_job
(job_name: str) → None¶ Delete a job from the data store.
This method removes a job from the data store.
- Parameters
job_name (str) – The name of the job to be removed.
-
watch_job
(job: buildgrid.server.job.Job, operation_name: str, peer: str) → None¶ Start watching a job and operation for changes.
If the given job is already being watched, then this method finds (or adds) the operation in the job’s entry in
watched_jobs
, and adds the peer to the list of peers for that operation.
Otherwise, it creates a whole new entry in
watched_jobs
for the given job, operation, and peer.
This method runs in a thread spawned by gRPC handling a connected peer.
- Parameters
job (buildgrid.server.job.Job) – The job to watch.
operation_name (str) – The name of the specific operation to watch.
peer (str) – The peer that is requesting to watch the job.
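The bookkeeping described for watch_job can be sketched as a nested mapping from job to operation to peers. The WatchSpec class, the use of a set rather than a list for peers, and the lock are assumptions made for this example; jobs are identified by name instead of by a Job object.

```python
import threading
from typing import Dict, Set


class WatchSpec:
    """Hypothetical per-job entry: an event plus peers grouped by operation."""

    def __init__(self) -> None:
        self.event = threading.Event()
        self.operations: Dict[str, Set[str]] = {}


watched_jobs: Dict[str, WatchSpec] = {}
watched_jobs_lock = threading.Lock()


def watch_job(job_name: str, operation_name: str, peer: str) -> None:
    # The lock guards against concurrent gRPC handler threads.
    with watched_jobs_lock:
        # Reuse the job's entry if it is already watched, else create one.
        spec = watched_jobs.setdefault(job_name, WatchSpec())
        # Find (or add) the operation, then register the peer.
        spec.operations.setdefault(operation_name, set()).add(peer)


watch_job("job-1", "op-1", "peer-a")
watch_job("job-1", "op-1", "peer-b")
watch_job("job-1", "op-2", "peer-a")
```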
-
stream_operation_updates
(operation_name: str, context: grpc.RpcContext) → Generator[Tuple[Optional[Exception], str], None, None]¶ Stream update messages for a given operation.
This is a generator which yields tuples of the form
(error, operation)
where
error
is None unless the job is cancelled, in which case error
is a buildgrid._exceptions.CancelledError
.
This method runs in a thread spawned by gRPC handling a connected peer, and should spend most of its time blocked waiting on an event which is set by either the thread which watches the data store for job updates or the main thread handling the gRPC termination callback.
Iteration finishes either when the provided gRPC context becomes inactive, or when the job owning the operation being watched is deleted from the data store.
- Parameters
operation_name (str) – The name of the operation to stream updates for.
context (grpc.ServicerContext) – The RPC context for the peer that is requesting a stream of events.
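The event-driven loop described above might look roughly like the following sketch. It is not BuildGrid's implementation: FakeContext stands in for the gRPC context (only is_active() is used), CancelledError is a local stand-in for buildgrid._exceptions.CancelledError, and a plain stage string stands in for the operation that would be yielded.

```python
import threading
from typing import Generator, Optional, Tuple


class FakeContext:
    """Hypothetical stand-in for the gRPC context."""

    def __init__(self) -> None:
        self.active = True

    def is_active(self) -> bool:
        return self.active


class CancelledError(Exception):
    """Local stand-in for buildgrid._exceptions.CancelledError."""


state = {"stage": "QUEUED", "cancelled": False}
event = threading.Event()


def stream_updates(
    context: FakeContext,
) -> Generator[Tuple[Optional[Exception], str], None, None]:
    while context.is_active():
        # Block until something sets the event; the timeout bounds the wait.
        if not event.wait(timeout=1.0):
            continue
        event.clear()
        if not context.is_active():
            break  # woken only so the inactive context could be noticed
        error = CancelledError("cancelled") if state["cancelled"] else None
        yield error, state["stage"]


context = FakeContext()
stream = stream_updates(context)

state["stage"] = "EXECUTING"
event.set()  # simulates the thread watching the data store
first = next(stream)

state["cancelled"] = True
event.set()
second = next(stream)

context.active = False
event.set()  # simulates the termination callback waking the waiter
remaining = list(stream)
```

Setting the event after the context goes inactive is what lets the blocked generator notice the disconnect and finish.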
-
stop_watching_operation
(job: buildgrid.server.job.Job, operation_name: str, peer: str) → None¶ Remove the given peer from the list of peers watching the given job.
If the given job is being watched, this method triggers a
JobEventType.STOP
for it to cause the waiting threads to check whether their context is still active. It then removes the given peer from the list of peers watching the given operation name. If this leaves no peers then the entire entry for the operation in the tracked job is removed.
If this process leaves the job with no operations being watched, the job itself is removed from the watched_jobs dictionary, and it will no longer be checked for updates.
This runs in the main thread as part of the RPC termination callback for
Execute
and WaitExecution
requests.
- Parameters
job (buildgrid.server.job.Job) – The job to stop watching.
operation_name (str) – The name of the specific operation to stop watching.
peer (str) – The peer that is requesting to stop watching the job.
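The cascading cleanup described above (peer, then operation, then job) can be sketched with a plain nested dictionary. The layout of watched_jobs here is a simplification chosen for this example, and the JobEventType.STOP signalling step is omitted.

```python
from typing import Dict, Set

# job name -> operation name -> peers (hypothetical layout)
watched_jobs: Dict[str, Dict[str, Set[str]]] = {
    "job-1": {"op-1": {"peer-a", "peer-b"}, "op-2": {"peer-a"}},
}


def stop_watching_operation(job_name: str, operation_name: str,
                            peer: str) -> None:
    operations = watched_jobs.get(job_name)
    if operations is None:
        return  # the job is not being watched
    peers = operations.get(operation_name, set())
    peers.discard(peer)
    if not peers:
        # No peers left: drop the whole entry for this operation.
        operations.pop(operation_name, None)
    if not operations:
        # No operations left: stop tracking the job altogether.
        del watched_jobs[job_name]


stop_watching_operation("job-1", "op-1", "peer-a")
after_first = {op: set(p) for op, p in watched_jobs["job-1"].items()}
stop_watching_operation("job-1", "op-1", "peer-b")
after_second = sorted(watched_jobs["job-1"])
stop_watching_operation("job-1", "op-2", "peer-a")
```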
-
abstract
create_operation
(operation_name: str, job_name: str) → None¶ Add a new operation to the data store.
- Parameters
operation_name (str) – The name of the Operation to create in the data store.
job_name (str) – The name of the Job representing the execution of this operation.
-
abstract
get_operations_by_stage
(operation_stage: buildgrid._enums.OperationStage) → Set[str]¶ Return a set of Job names in a specific operation stage.
Find the operations in a given stage and return a set containing the names of the Jobs related to those operations.
- Parameters
operation_stage (OperationStage) – The stage that the operations should be in.
- Returns
Set of all job names with operations in the specified stage.
- Return type
set
-
abstract
list_operations
(operation_filters: List[buildgrid.server.operations.filtering.filter.OperationFilter], page_size: Optional[int] = None, page_token: Optional[str] = None, max_execution_timeout: Optional[int] = None) → Tuple[List[google.longrunning.operations_pb2.Operation], str]¶ Return all operations matching the filter.
- Returns
A page of matching operations in the data store, and a token string. If the token is nonempty, it should be submitted by the requester to retrieve the next page of results.
- Return type
tuple of (list, str)
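A caller would typically loop until the returned token is empty. The sketch below shows that pattern against a toy pager; the list_operations function here is a hypothetical stand-in that ignores filters, and its token format (the index of the next item) is an assumption made for this example.

```python
from typing import List, Optional, Tuple

ALL_OPERATIONS = [f"op-{i}" for i in range(5)]


def list_operations(page_size: Optional[int] = None,
                    page_token: Optional[str] = None) -> Tuple[List[str], str]:
    """Toy pager: the token is simply the index of the next item."""
    start = int(page_token) if page_token else 0
    end = len(ALL_OPERATIONS) if page_size is None else start + page_size
    page = ALL_OPERATIONS[start:end]
    # An empty token signals that there are no further pages.
    next_token = str(end) if end < len(ALL_OPERATIONS) else ""
    return page, next_token


collected: List[str] = []
token = ""
while True:
    page, token = list_operations(page_size=2, page_token=token)
    collected.extend(page)
    if not token:
        break
```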
-
abstract
update_operation
(operation_name: str, changes: Mapping[str, Any]) → None¶ Update an operation in the data store.
This method takes an operation name and a dictionary of changes to apply to the operation in the data store, and updates the operation with those changes. The dictionary should be keyed by the attribute names which need to be updated, with the values being the new values for the attributes.
- Parameters
operation_name (str) – The name of the operation that is being updated.
changes (dict) – The dictionary of changes to be applied.
-
abstract
delete_operation
(operation_name: str) → None¶ Delete an operation from the data store.
This method removes an operation from the data store.
- Parameters
operation_name (str) – The name of the operation to be removed.
-
abstract
create_lease
(lease: google.devtools.remoteworkers.v1test2.bots_pb2.Lease) → None¶ Add a new lease to the data store.
- Parameters
lease (Lease) – The Lease protobuf object representing the lease to be added to the data store.
-
abstract
get_leases_by_state
(lease_state) → Set[str]¶ Return the set of IDs of leases in a given state.
- Parameters
lease_state (LeaseState) – The state that the leases should be in.
- Returns
Set of strings containing IDs of leases in the given state.
- Return type
set
-
abstract
update_lease
(job_name: str, changes: Mapping[str, Any]) → None¶ Update a lease in the data store.
This method takes a job name and a dictionary of changes to apply to the lease for that job in the data store, and updates the lease with those changes. The dictionary should be keyed by the attribute names which need to be updated, with the values being the new values for the attributes.
The job name is used as leases have no unique identifier; instead there should always be at most one active lease for the job. It is the responsibility of data store implementations to ensure this.
- Parameters
job_name (str) – The name of the job whose lease is being updated.
changes (dict) – The dictionary of changes to be applied.
-
abstract
assign_lease_for_next_job
(capabilities: Mapping[str, Set[str]], callback: Callable[buildgrid.server.job.Job, List[google.devtools.remoteworkers.v1test2.bots_pb2.Lease]], timeout: Optional[int] = None) → List[google.devtools.remoteworkers.v1test2.bots_pb2.Lease]¶ Return a list of leases for a worker to run.
NOTE: Currently the list only ever has one or zero leases.
Inspect the list of jobs in the data store and return a list containing the lease for the highest priority job whose requirements match the given worker capabilities. If there are no matching jobs, return the empty list.
The list containing the lease must be created by the callback passed to this method. The callback is passed the job selected as suitable for execution by the specific worker.
Setting a timeout will cause this method to block up to
timeout
seconds, returning the empty list if no matching jobs become available before the timeout is reached.
- Parameters
capabilities (dict) – Dictionary of worker capabilities to compare with job requirements when finding a job.
callback (callable) – Function to run on the next runnable job, should return a list of leases.
timeout (int) – Time in seconds to wait for new jobs. Values longer than MAX_JOB_BLOCK_TIME are capped to it.
- Returns
List of leases for the worker to run.
- Return type
list
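The callback-driven selection described above can be sketched as follows. This is a simplified illustration, not BuildGrid's matching logic: jobs and leases are plain dicts, the queue is assumed to be already ordered by priority, the subset-based matching rule is an assumption, and the timeout (blocking) behaviour is omitted.

```python
from typing import Callable, List, Mapping, Set

# Queue ordered by priority; each toy job lists the capabilities it requires.
queue: List[dict] = [
    {"name": "job-gpu", "requirements": {"device": {"gpu"}}},
    {"name": "job-linux", "requirements": {"OSFamily": {"linux"}}},
]


def satisfies(capabilities: Mapping[str, Set[str]],
              requirements: Mapping[str, Set[str]]) -> bool:
    # Every required value must be offered by the worker under the same key.
    return all(values <= capabilities.get(key, set())
               for key, values in requirements.items())


def assign_lease_for_next_job(capabilities: Mapping[str, Set[str]],
                              callback: Callable[[dict], List[dict]]) -> List[dict]:
    for job in queue:
        if satisfies(capabilities, job["requirements"]):
            # The callback, not the data store, builds the list of leases.
            leases = callback(job)
            if leases:
                queue.remove(job)
            return leases
    return []  # no job matches this worker


def make_lease(job: dict) -> List[dict]:
    return [{"id": job["name"], "state": "PENDING"}]


leases = assign_lease_for_next_job({"OSFamily": {"linux"}}, make_lease)
no_match = assign_lease_for_next_job({"OSFamily": {"windows"}}, make_lease)
```

The first call skips the higher-priority GPU job because the worker cannot run it, and the second call returns an empty list because nothing in the queue matches.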
-