buildgrid.server.scheduler.impl module
- class buildgrid.server.scheduler.impl.SchedulerMetrics
Bases:
TypedDict
- jobs: dict[tuple[str, str], int]
- class buildgrid.server.scheduler.impl.AgedJobHandlerOptions(job_max_age, handling_period, max_handling_window)
Bases:
NamedTuple
- job_max_age: timedelta
Alias for field number 0
- handling_period: timedelta
Alias for field number 1
- max_handling_window: int
Alias for field number 2
- static from_config(job_max_age_cfg: dict[str, float], handling_period_cfg: dict[str, float] | None = None, max_handling_window_cfg: int | None = None) AgedJobHandlerOptions
Helper method for creating
AgedJobHandlerOptions
objects If input configs are None, assign defaults
- class buildgrid.server.scheduler.impl.Scheduler(sql_provider: SqlProvider, storage: StorageABC, *, sql_ro_provider: SqlProvider | None = None, sql_notifier_provider: SqlProvider | None = None, property_set: PropertySet, action_cache: ActionCacheABC | None = None, action_browser_url: str | None = None, max_execution_timeout: int = 7200, metering_client: SyncMeteringServiceClient | None = None, metering_throttle_action: MeteringThrottleAction | None = None, bot_session_keepalive_timeout: int = 600, logstream_channel: Channel | None = None, logstream_instance: str | None = None, asset_client: AssetClient | None = None, queued_action_retention_hours: float | None = None, completed_action_retention_hours: float | None = None, action_result_retention_hours: float | None = None, enable_job_watcher: bool = False, poll_interval: float = 1, pruning_options: AgedJobHandlerOptions | None = None, queue_timeout_options: AgedJobHandlerOptions | None = None, max_job_attempts: int = 5, job_assignment_interval: float = 1.0, priority_assignment_percentage: int = 100, max_queue_size: int | None = None, execution_timer_interval: float = 60.0, session_expiry_timer_interval: float = 10.0)
Bases:
object
- RETRYABLE_STATUS_CODES = (13, 14)
- start() None
- stop() None
- queue_job_action(*, action: Action, action_digest: Digest, command: Command, platform_requirements: dict[str, list[str]], property_label: str, priority: int, skip_cache_lookup: bool, request_metadata: RequestMetadata | None = None, client_identity: ClientIdentityEntry | None = None) str
De-duplicates or inserts a newly created job into the execution queue. Returns an operation name associated with this job.
- create_operation_for_existing_job(*, action_digest: Digest, priority: int, request_metadata: RequestMetadata | None, client_identity: ClientIdentityEntry | None) str | None
- create_operation_for_new_job(*, action: Action, action_digest: Digest, command: Command, execute_response: ExecuteResponse | None, platform_requirements: dict[str, list[str]], property_label: str, priority: int, request_metadata: RequestMetadata | None = None, client_identity: ClientIdentityEntry | None = None) str
- create_operation(job_name: str, *, request_metadata: RequestMetadata | None = None, client_identity: ClientIdentityEntry | None = None) str
- load_operation(operation_name: str) Operation
- get_operation_job_name(operation_name: str) str | None
- get_operation_request_metadata_by_name(operation_name: str) RequestMetadata | None
- get_client_identity_by_operation(operation_name: str) ClientIdentity | None
- execution_timer_loop(shutdown_requested: Event) None
Periodically timeout aged executing jobs
- cancel_jobs_exceeding_execution_timeout(max_execution_timeout: int | None = None) None
- cancel_operation(operation_name: str) None
- list_operations(operation_filters: list[buildgrid.server.operations.filtering.filter.OperationFilter] | None = None, page_size: int | None = None, page_token: str | None = None) tuple[list[buildgrid._protos.google.longrunning.operations_pb2.Operation], str]
- list_workers(name_filter: str, page_number: int, page_size: int) tuple[list[buildgrid.server.sql.models.BotEntry], int]
- get_metrics() SchedulerMetrics | None
- assign_n_leases_by_priority(*, capability_hash: str, bot_names: list[str]) list[str]
- assign_n_leases_by_age(*, capability_hash: str, bot_names: list[str]) list[str]
- queue_timer_loop(shutdown_requested: Event) None
Periodically timeout aged queued jobs
- prune_timer_loop(shutdown_requested: Event) None
Running in a background thread, this method wakes up periodically and deletes older records from the jobs tables using configurable parameters
- get_or_create_client_identity_in_store(session: Session, client_id: ClientIdentityEntry) ClientIdentityEntry
Get the ClientIdentity in the storage or create one. This helper function essentially makes sure the client_id is created during the transaction
- Parameters:
session (Session) – sqlalchemy Session
client_id (ClientIdentityEntry) – identity of the client that creates an operation
- Returns:
identity of the client that creates an operation
- Return type:
- get_or_create_request_metadata_in_store(session: Session, request_metadata: RequestMetadata) RequestMetadataEntry
- add_bot_entry(*, bot_session_id: str, bot_session_status: int) str
- close_bot_sessions(bot_name: str) None
- session_expiry_timer_loop(shutdown_requested: Event) None
- reap_expired_sessions() bool
Find and close expired bot sessions. Returns True if sessions were closed. Only closes a few sessions to minimize time in transaction.
- synchronize_bot_lease(bot_name: str, bot_id: str, bot_status: int, session_lease: Lease | None, partial_execution_metadata: dict[str, buildgrid._protos.build.bazel.remote.execution.v2.remote_execution_pb2.ExecutedActionMetadata] | None = None) Lease | None
- count_bots_by_status() dict[buildgrid.server.enums.BotStatus, int]
Count the number of bots with a particular status
- refresh_bot_expiry_time(bot_name: str, bot_id: str) datetime
This update is done out-of-band from the main synchronize_bot_lease transaction, as there are cases where we will skip calling the synchronization, but still want the session to be updated such that it does not get reaped. This slightly duplicates the update happening in synchronize_bot_lease, however, that update is still required to not have the job reaped during its job assignment waiting period.
This method should be called at the end of the update and create bot session methods. The returned datetime should be assigned to the deadline within the returned session proto.
- get_metadata_for_leases(leases: Iterable[Lease]) list[tuple[str, bytes]]
Return a list of Job metadata for a given list of leases.
- Parameters:
leases (list) – List of leases to get Job metadata for.
- Returns:
List of tuples of the form
('executeoperationmetadata-bin': serialized_metadata)
.
- publish_execution_stats(job_name: str, execution_metadata: ExecutedActionMetadata, property_label: str = 'unknown') None