buildgrid.server.scheduler package

Submodules

Module contents

class buildgrid.server.scheduler.AgedJobHandlerOptions(job_max_age, handling_period, max_handling_window)

Bases: NamedTuple

static from_config(job_max_age_cfg: dict[str, float], handling_period_cfg: dict[str, float] | None = None, max_handling_window_cfg: int | None = None) AgedJobHandlerOptions

Helper method for creating AgedJobHandlerOptions objects. If the optional input configs are None, defaults are assigned.

handling_period: timedelta

Alias for field number 1

job_max_age: timedelta

Alias for field number 0

max_handling_window: int

Alias for field number 2
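The from_config signature suggests that each *_cfg dict carries keyword arguments for datetime.timedelta (for example {"days": 30}). The sketch below illustrates that reading with a stand-in class. The class name AgedJobHandlerOptionsSketch, the interpretation of the dicts, and both fallback values are assumptions made for illustration, not BuildGrid's actual implementation or defaults.

```python
from datetime import timedelta
from typing import NamedTuple, Optional


class AgedJobHandlerOptionsSketch(NamedTuple):
    """Illustrative stand-in that mirrors the documented field order."""

    job_max_age: timedelta          # field number 0
    handling_period: timedelta      # field number 1
    max_handling_window: int        # field number 2

    @staticmethod
    def from_config(
        job_max_age_cfg: dict[str, float],
        handling_period_cfg: Optional[dict[str, float]] = None,
        max_handling_window_cfg: Optional[int] = None,
    ) -> "AgedJobHandlerOptionsSketch":
        # Assumption: each dict maps timedelta keyword names to numbers.
        job_max_age = timedelta(**job_max_age_cfg)
        # Assumption: placeholder fallbacks, not BuildGrid's real defaults.
        if handling_period_cfg is not None:
            handling_period = timedelta(**handling_period_cfg)
        else:
            handling_period = timedelta(minutes=5)
        if max_handling_window_cfg is not None:
            max_handling_window = max_handling_window_cfg
        else:
            max_handling_window = 1000
        return AgedJobHandlerOptionsSketch(
            job_max_age, handling_period, max_handling_window
        )


# Only the first argument is required; the others fall back to placeholders.
options = AgedJobHandlerOptionsSketch.from_config({"days": 30}, {"minutes": 10})
```

Because the class is a NamedTuple, options.handling_period and options[1] refer to the same value, which is what "Alias for field number 1" describes.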

class buildgrid.server.scheduler.Scheduler(sql_provider: SqlProvider, storage: StorageABC, *, sql_ro_provider: SqlProvider | None = None, sql_notifier_provider: SqlProvider | None = None, property_set: PropertySet, action_cache: ActionCacheABC | None = None, action_browser_url: str | None = None, max_execution_timeout: int = 7200, metering_client: SyncMeteringServiceClient | None = None, metering_throttle_action: MeteringThrottleAction | None = None, bot_session_keepalive_timeout: int = 600, logstream_channel: Channel | None = None, logstream_instance: str | None = None, asset_client: AssetClient | None = None, queued_action_retention_hours: float | None = None, completed_action_retention_hours: float | None = None, action_result_retention_hours: float | None = None, enable_job_watcher: bool = False, poll_interval: float = 1, pruning_options: AgedJobHandlerOptions | None = None, queue_timeout_options: AgedJobHandlerOptions | None = None, max_job_attempts: int = 5, assigner_configs: Sequence[AssignerConfig] | None = None, max_queue_size: int | None = None, execution_timer_interval: float = 60.0, session_expiry_timer_interval: float = 10.0, instance_pools: list[list[str]] | None = None, bot_locality_hint_limit: int = 10, bot_poll_interval: float = 1.0)

Bases: object

RETRYABLE_STATUS_CODES = (13, 14)
add_bot_entry(*, bot_name: str, bot_session_id: str, bot_session_status: int, bot_property_labels: list[str] | None = None, bot_capability_hashes: list[str] | None = None) str
assign_job_by_age(failure_backoff: float = 5.0) int

Assigns a job by age, returning the number of jobs updated

assign_job_by_priority(failure_backoff: float = 5.0) int

Assigns a job by priority, returning the number of jobs updated

cancel_jobs_exceeding_execution_timeout(max_execution_timeout: int | None = None) None
cancel_operation(operation_name: str) None
close_bot_sessions(bot_name: str) None
create_operation(job_name: str, *, request_metadata: RequestMetadata | None = None, client_identity: ClientIdentityEntry | None = None) str
create_operation_for_existing_job(*, action_digest: Digest, priority: int, request_metadata: RequestMetadata | None, client_identity: ClientIdentityEntry | None) str | None
create_operation_for_new_job(*, action: Action, action_digest: Digest, command: Command, execute_response: ExecuteResponse | None, platform_requirements: dict[str, list[str]], property_label: str, priority: int, request_metadata: RequestMetadata | None = None, client_identity: ClientIdentityEntry | None = None, scheduling_metadata: SchedulingMetadata | None = None) str
execution_timer_loop(shutdown_requested: Event) None

Periodically times out executing jobs that have exceeded their maximum age.

get_bot_status_metrics() BotMetrics

Count the number of bots with a particular status and property_label

get_client_identity_by_operation(operation_name: str) ClientIdentity | None
get_execute_action_metadata(job: JobEntry) ExecutedActionMetadata
get_metadata_for_leases(leases: Iterable[Lease]) list[tuple[str, bytes]]

Return a list of Job metadata for a given list of leases.

Parameters:

leases (list) – List of leases to get Job metadata for.

Returns:

List of tuples of the form ('executeoperationmetadata-bin', serialized_metadata).
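As a rough illustration of the return shape, the snippet below pairs a fixed key with each serialized payload. The helper name metadata_pairs and the sample byte strings are hypothetical; the real method derives the serialized metadata from the jobs behind each lease. The "-bin" suffix follows the gRPC convention for metadata keys that carry binary values.

```python
from typing import Iterable

# Key taken from the documented return value.
METADATA_KEY = "executeoperationmetadata-bin"


def metadata_pairs(serialized_items: Iterable[bytes]) -> list[tuple[str, bytes]]:
    """Hypothetical helper: build one (key, payload) tuple per payload."""
    return [(METADATA_KEY, payload) for payload in serialized_items]


# Placeholder payloads standing in for serialized metadata messages.
pairs = metadata_pairs([b"payload-one", b"payload-two"])
```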

get_metrics(instance_name: str) SchedulerMetrics | None
get_operation_job_name(operation_name: str) str | None
get_operation_request_metadata_by_name(operation_name: str) RequestMetadata | None
get_or_create_client_identity_in_store(session: Session, client_id: ClientIdentityEntry) ClientIdentityEntry

Get the ClientIdentity from storage, or create it if it does not exist. This helper function ensures that the client_id is created within the current transaction.

Parameters:
  • session (Session) – sqlalchemy Session

  • client_id (ClientIdentityEntry) – identity of the client that creates an operation

Returns:

identity of the client that creates an operation

Return type:

ClientIdentityEntry

get_or_create_request_metadata_in_store(session: Session, request_metadata: RequestMetadata) RequestMetadataEntry
static job_by_priority_statement(schedule_after_now: bool = False) Select

Selects a job by priority, ordered by priority and queued timestamp
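The method itself returns a SQLAlchemy Select, but the ordering it describes can be sketched in memory. The QueuedJob type and next_job_by_priority function below are hypothetical stand-ins, and the sketch assumes that a lower priority value is scheduled first, with the queued timestamp breaking ties.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Optional, Sequence


@dataclass(frozen=True)
class QueuedJob:
    """Hypothetical stand-in for a queued job row."""

    name: str
    priority: int
    queued_timestamp: datetime


def next_job_by_priority(jobs: Sequence[QueuedJob]) -> Optional[QueuedJob]:
    """Pick the job that sorts first by (priority, queued timestamp)."""
    if not jobs:
        return None
    # Assumption: lower priority values are more urgent.
    return min(jobs, key=lambda job: (job.priority, job.queued_timestamp))


t0 = datetime(2024, 1, 1, 12, 0, 0)
jobs = [
    QueuedJob("late-urgent", 0, t0 + timedelta(minutes=5)),
    QueuedJob("early-normal", 1, t0),
    QueuedJob("early-urgent", 0, t0 + timedelta(minutes=1)),
]
chosen = next_job_by_priority(jobs)
```

Here chosen is the "early-urgent" job: it shares the lowest priority value with "late-urgent" but was queued earlier.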

list_operations(operation_filters: list[OperationFilter] | None = None, page_size: int | None = None, page_token: str | None = None) tuple[list[Operation], str]
list_workers(name_filter: str, page_number: int, page_size: int) tuple[list[BotEntry], int]
load_operation(operation_name: str) Operation
maybe_update_bot_platforms(bot_name: str, capability_hashes: list[str] | None = None) None
prune_timer_loop(shutdown_requested: Event) None

Running in a background thread, this method wakes up periodically and deletes older records from the jobs tables, using configurable parameters.

publish_execution_stats(job_name: str, execution_metadata: ExecutedActionMetadata, property_label: str = 'unknown') None
queue_job_action(*, action: Action, action_digest: Digest, command: Command, platform_requirements: dict[str, list[str]], property_label: str, priority: int, skip_cache_lookup: bool, request_metadata: RequestMetadata | None = None, client_identity: ClientIdentityEntry | None = None, scheduling_metadata: SchedulingMetadata | None = None) str

De-duplicates or inserts a newly created job into the execution queue. Returns an operation name associated with this job.
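One way to picture the de-duplication step is an in-memory queue keyed by action digest. Everything in this sketch (the InMemoryQueue class, its attribute names, and the naming scheme for jobs and operations) is hypothetical; the real method works against the SQL store and takes the keyword arguments listed in the signature above.

```python
import itertools


class InMemoryQueue:
    """Hypothetical in-memory analogue of job de-duplication."""

    def __init__(self) -> None:
        self.jobs_by_digest: dict[str, str] = {}   # action digest -> job name
        self.operations: dict[str, str] = {}       # operation name -> job name
        self._ids = itertools.count(1)

    def queue_job_action(self, action_digest: str) -> str:
        job_name = self.jobs_by_digest.get(action_digest)
        if job_name is None:
            # No queued job for this action yet, so insert a new one.
            job_name = f"job-{next(self._ids)}"
            self.jobs_by_digest[action_digest] = job_name
        # Each request gets its own operation, even when the job is shared.
        operation_name = f"operation-{next(self._ids)}"
        self.operations[operation_name] = job_name
        return operation_name


queue = InMemoryQueue()
first = queue.queue_job_action("digest-a")
second = queue.queue_job_action("digest-a")   # de-duplicated onto the same job
third = queue.queue_job_action("digest-b")    # new job
```

In this sketch, two requests for the same action share one job but receive distinct operation names, which matches the documented return value of an operation name rather than a job name.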

queue_timer_loop(shutdown_requested: Event) None

Periodically times out queued jobs that have exceeded their maximum age.

reap_expired_sessions() bool

Find and close expired bot sessions. Returns True if sessions were closed. Only closes a few sessions to minimize time in transaction.
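Because each call closes only a few sessions and reports whether it closed any, a caller can keep invoking it until it returns False and only then sleep. The loop below is a minimal sketch of that pattern; session_expiry_loop, fake_reap, and the interval value are illustrative assumptions and do not describe how session_expiry_timer_loop is actually implemented.

```python
import threading
from typing import Callable


def session_expiry_loop(
    reap_expired_sessions: Callable[[], bool],
    shutdown_requested: threading.Event,
    interval: float,
) -> None:
    """Hypothetical loop: drain expired sessions in small batches, then wait."""
    while not shutdown_requested.is_set():
        if reap_expired_sessions():
            # A batch was closed, so more expired sessions may remain.
            continue
        # Nothing left to reap; sleep until the next tick or until shutdown.
        shutdown_requested.wait(timeout=interval)


shutdown = threading.Event()
calls: list[int] = []


def fake_reap() -> bool:
    """Stand-in reaper: closes sessions twice, then requests shutdown."""
    calls.append(len(calls))
    if len(calls) <= 2:
        return True
    shutdown.set()
    return False


session_expiry_loop(fake_reap, shutdown, interval=0.01)
```

Keeping each batch small keeps every transaction short, at the cost of more round trips when many sessions expire at once.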

refresh_bot_expiry_time(bot_name: str, bot_id: str) datetime

This update is done out-of-band from the main synchronize_bot_lease transaction, because there are cases where synchronization is skipped but the session must still be updated so that it does not get reaped. This slightly duplicates the update performed in synchronize_bot_lease. That update is still required, however, so that the job is not reaped during its job assignment waiting period.

This method should be called at the end of the methods that update and create bot sessions. The returned datetime should be assigned to the deadline within the returned session proto.

session_expiry_timer_loop(shutdown_requested: Event) None
start() None
stop() None
synchronize_bot_lease(bot_name: str, bot_id: str, bot_status: int, session_lease: Lease | None, partial_execution_metadata: dict[str, ExecutedActionMetadata] | None = None) Lease | None
class buildgrid.server.scheduler.SchedulerMetrics

Bases: TypedDict

jobs: dict[tuple[str, str], int]
class buildgrid.server.scheduler.PropertySet(*args, **kwargs)

Bases: Protocol

bot_property_labels(bot_session: BotSession) list[str]

Find all label_key values that can be used to identify bot types in logging and metrics.

execution_properties(platform: Platform) tuple[str, dict[str, list[str]]]

Parses a platform value and returns a label that can be used for applying metrics, together with the match properties used for scheduling.

worker_properties(bot_session: BotSession) list[dict[str, list[str]]]

Find all the valid property combinations which can be used to assign work to a bot.
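Since PropertySet is a Protocol, any class with matching method signatures satisfies it structurally, without inheriting from it. The sketch below shows the idea for execution_properties only. PlatformStub, ExecutionPropertySource, OsOnlyPropertySet, the "OSFamily" key choice, and the "unknown" fallback label are all hypothetical stand-ins, not BuildGrid types or behavior.

```python
from dataclasses import dataclass, field
from typing import Protocol


@dataclass
class PlatformStub:
    """Hypothetical stand-in for a Platform message: (name, value) pairs."""

    properties: list[tuple[str, str]] = field(default_factory=list)


class ExecutionPropertySource(Protocol):
    """Reduced protocol mirroring one documented method."""

    def execution_properties(
        self, platform: PlatformStub
    ) -> tuple[str, dict[str, list[str]]]: ...


class OsOnlyPropertySet:
    """Matches on a single key; satisfies the protocol structurally."""

    def execution_properties(
        self, platform: PlatformStub
    ) -> tuple[str, dict[str, list[str]]]:
        matched: dict[str, list[str]] = {}
        for key, value in platform.properties:
            if key == "OSFamily":  # assumption: the only key used for matching
                matched.setdefault(key, []).append(value)
        # Label first, then the match properties, as in the documented return type.
        label = ",".join(sorted(matched.get("OSFamily", []))) or "unknown"
        return label, matched


source: ExecutionPropertySource = OsOnlyPropertySet()
platform = PlatformStub(properties=[("OSFamily", "linux"), ("ISA", "x86-64")])
label, matched = source.execution_properties(platform)
```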

class buildgrid.server.scheduler.DynamicPropertySet(*, unique_property_keys: set[str], match_property_keys: set[str], wildcard_property_keys: set[str], label_key: str | None = None)

Bases: object

bot_property_labels(bot_session: BotSession) list[str]
execution_properties(platform: Platform) tuple[str, dict[str, list[str]]]
worker_properties(bot_session: BotSession) list[dict[str, list[str]]]
class buildgrid.server.scheduler.StaticPropertySet(*, property_labels: list[PropertyLabel], wildcard_property_keys: set[str])

Bases: object

bot_property_labels(bot_session: BotSession) list[str]
execution_properties(platform: Platform) tuple[str, dict[str, list[str]]]
worker_properties(bot_session: BotSession) list[dict[str, list[str]]]
class buildgrid.server.scheduler.PropertyLabel(label: str, properties: set[tuple[str, str]])

Bases: object

label: str
properties: set[tuple[str, str]]
class buildgrid.server.scheduler.AssignerConfig(*args, **kwargs)

Bases: Protocol

generate_assigners(scheduler: Scheduler) Generator[JobAssigner, None, None]

Generate the actual JobAssigner objects defined by this configuration.

count: int
interval: float
class buildgrid.server.scheduler.PriorityAgeAssignerConfig(count: int, interval: float, priority_assignment_percentage: int = 100, failure_backoff: float = 5.0, jitter_factor: float = 1.0, busy_sleep_factor: float = 0.01)

Bases: object

busy_sleep_factor: float = 0.01
failure_backoff: float = 5.0
generate_assigners(scheduler: Scheduler) Generator[PriorityAgeJobAssigner, None, None]
jitter_factor: float = 1.0
priority_assignment_percentage: int = 100
count: int
interval: float