buildgrid.server.scheduler package
Submodules
- buildgrid.server.scheduler.assigner module
- buildgrid.server.scheduler.impl module
SchedulerMetrics
BotMetrics
AgedJobHandlerOptions
Scheduler
Scheduler.RETRYABLE_STATUS_CODES
Scheduler.start()
Scheduler.stop()
Scheduler.queue_job_action()
Scheduler.create_operation_for_existing_job()
Scheduler.create_operation_for_new_job()
Scheduler.create_operation()
Scheduler.load_operation()
Scheduler.get_operation_job_name()
Scheduler.get_operation_request_metadata_by_name()
Scheduler.get_client_identity_by_operation()
Scheduler.execution_timer_loop()
Scheduler.cancel_jobs_exceeding_execution_timeout()
Scheduler.cancel_operation()
Scheduler.list_operations()
Scheduler.list_workers()
Scheduler.get_metrics()
Scheduler.job_by_priority_statement()
Scheduler.assign_job_by_priority()
Scheduler.assign_job_by_age()
Scheduler.queue_timer_loop()
Scheduler.prune_timer_loop()
Scheduler.get_or_create_client_identity_in_store()
Scheduler.get_or_create_request_metadata_in_store()
Scheduler.add_bot_entry()
Scheduler.maybe_update_bot_platforms()
Scheduler.close_bot_sessions()
Scheduler.session_expiry_timer_loop()
Scheduler.reap_expired_sessions()
Scheduler.synchronize_bot_lease()
Scheduler.get_bot_status_metrics()
Scheduler.refresh_bot_expiry_time()
Scheduler.get_metadata_for_leases()
Scheduler.get_execute_action_metadata()
Scheduler.publish_execution_stats()
- buildgrid.server.scheduler.notifier module
- buildgrid.server.scheduler.properties module
Module contents
- class buildgrid.server.scheduler.AgedJobHandlerOptions(job_max_age, handling_period, max_handling_window)
Bases:
NamedTuple
- static from_config(job_max_age_cfg: dict[str, float], handling_period_cfg: dict[str, float] | None = None, max_handling_window_cfg: int | None = None) AgedJobHandlerOptions
Helper method for creating AgedJobHandlerOptions objects. If input configs are None, defaults are assigned.
- handling_period: timedelta
Alias for field number 1
- job_max_age: timedelta
Alias for field number 0
- max_handling_window: int
Alias for field number 2
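The field order and the from_config signature above can be illustrated with a small stand-in class. This is a minimal sketch, not BuildGrid's implementation: the class name AgedJobHandlerOptionsSketch, the assumption that the config dicts hold timedelta keyword arguments, and both fallback values are hypothetical.

```python
from __future__ import annotations

from datetime import timedelta
from typing import NamedTuple


class AgedJobHandlerOptionsSketch(NamedTuple):
    """Stand-in that mirrors the documented field order (0, 1, 2)."""

    job_max_age: timedelta
    handling_period: timedelta
    max_handling_window: int

    @staticmethod
    def from_config(
        job_max_age_cfg: dict[str, float],
        handling_period_cfg: dict[str, float] | None = None,
        max_handling_window_cfg: int | None = None,
    ) -> AgedJobHandlerOptionsSketch:
        # Assumption: each config dict holds timedelta keyword arguments.
        # Assumption: the fallbacks below are placeholders, not the real defaults.
        handling_period = timedelta(**(handling_period_cfg or {"minutes": 5}))
        window = 10000 if max_handling_window_cfg is None else max_handling_window_cfg
        return AgedJobHandlerOptionsSketch(
            timedelta(**job_max_age_cfg), handling_period, window
        )


options = AgedJobHandlerOptionsSketch.from_config({"days": 30})
```

Because the class is a NamedTuple, options[0] and options.job_max_age refer to the same value, which is what "Alias for field number 0" describes.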
- class buildgrid.server.scheduler.Scheduler(sql_provider: SqlProvider, storage: StorageABC, *, sql_ro_provider: SqlProvider | None = None, sql_notifier_provider: SqlProvider | None = None, property_set: PropertySet, action_cache: ActionCacheABC | None = None, action_browser_url: str | None = None, max_execution_timeout: int = 7200, metering_client: SyncMeteringServiceClient | None = None, metering_throttle_action: MeteringThrottleAction | None = None, bot_session_keepalive_timeout: int = 600, logstream_channel: Channel | None = None, logstream_instance: str | None = None, asset_client: AssetClient | None = None, queued_action_retention_hours: float | None = None, completed_action_retention_hours: float | None = None, action_result_retention_hours: float | None = None, enable_job_watcher: bool = False, poll_interval: float = 1, pruning_options: AgedJobHandlerOptions | None = None, queue_timeout_options: AgedJobHandlerOptions | None = None, max_job_attempts: int = 5, assigner_configs: Sequence[AssignerConfig] | None = None, max_queue_size: int | None = None, execution_timer_interval: float = 60.0, session_expiry_timer_interval: float = 10.0, instance_pools: list[list[str]] | None = None, bot_locality_hint_limit: int = 10, bot_poll_interval: float = 1.0)
Bases:
object
- RETRYABLE_STATUS_CODES = (13, 14)
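For readers unfamiliar with the numeric values: in the standard gRPC status code table, 13 is INTERNAL and 14 is UNAVAILABLE. The sketch below only decodes the tuple; the GrpcStatus enum and the is_retryable helper are hypothetical names, and how the scheduler actually consumes the tuple is not shown on this page.

```python
from enum import IntEnum


class GrpcStatus(IntEnum):
    """Subset of the standard gRPC status codes."""

    OK = 0
    INTERNAL = 13
    UNAVAILABLE = 14


# Value copied from the class attribute documented above.
RETRYABLE_STATUS_CODES = (13, 14)


def is_retryable(code: int) -> bool:
    # Hypothetical helper: a plain membership test against the tuple.
    return code in RETRYABLE_STATUS_CODES


retryable_names = [GrpcStatus(code).name for code in RETRYABLE_STATUS_CODES]
```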
- add_bot_entry(*, bot_name: str, bot_session_id: str, bot_session_status: int, bot_property_labels: list[str] | None = None, bot_capability_hashes: list[str] | None = None) str
- assign_job_by_age(failure_backoff: float = 5.0) int
Assigns a job by age, returning the number of jobs updated
- assign_job_by_priority(failure_backoff: float = 5.0) int
Assigns a job by priority, returning the number of jobs updated
- cancel_jobs_exceeding_execution_timeout(max_execution_timeout: int | None = None) None
- cancel_operation(operation_name: str) None
- close_bot_sessions(bot_name: str) None
- create_operation(job_name: str, *, request_metadata: RequestMetadata | None = None, client_identity: ClientIdentityEntry | None = None) str
- create_operation_for_existing_job(*, action_digest: Digest, priority: int, request_metadata: RequestMetadata | None, client_identity: ClientIdentityEntry | None) str | None
- create_operation_for_new_job(*, action: Action, action_digest: Digest, command: Command, execute_response: ExecuteResponse | None, platform_requirements: dict[str, list[str]], property_label: str, priority: int, request_metadata: RequestMetadata | None = None, client_identity: ClientIdentityEntry | None = None, scheduling_metadata: SchedulingMetadata | None = None) str
- execution_timer_loop(shutdown_requested: Event) None
Periodically times out aged executing jobs.
- get_bot_status_metrics() BotMetrics
Count the number of bots with a particular status and property_label
- get_client_identity_by_operation(operation_name: str) ClientIdentity | None
- get_metadata_for_leases(leases: Iterable[Lease]) list[tuple[str, bytes]]
Return a list of Job metadata for a given list of leases.
- Parameters:
leases (list) – List of leases to get Job metadata for.
- Returns:
List of tuples of the form ('executeoperationmetadata-bin', serialized_metadata).
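The return shape matches the way gRPC metadata is usually passed around: (key, value) pairs, where a key ending in -bin carries a binary value. A minimal sketch of that shape, assuming the serialized payloads are already available as bytes (the helper name metadata_for_payloads is hypothetical):

```python
from __future__ import annotations

from typing import Iterable

METADATA_KEY = "executeoperationmetadata-bin"


def metadata_for_payloads(payloads: Iterable[bytes]) -> list[tuple[str, bytes]]:
    # One (key, serialized bytes) pair per payload, all sharing the same key.
    return [(METADATA_KEY, payload) for payload in payloads]


pairs = metadata_for_payloads([b"\x0a\x01a", b"\x0a\x01b"])
```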
- get_metrics(instance_name: str) SchedulerMetrics | None
- get_operation_job_name(operation_name: str) str | None
- get_operation_request_metadata_by_name(operation_name: str) RequestMetadata | None
- get_or_create_client_identity_in_store(session: Session, client_id: ClientIdentityEntry) ClientIdentityEntry
Get the ClientIdentity from storage, or create one if it does not exist. This helper function makes sure the client_id is created during the transaction.
- Parameters:
session (Session) – sqlalchemy Session
client_id (ClientIdentityEntry) – identity of the client that creates an operation
- Returns:
identity of the client that creates an operation
- Return type:
ClientIdentityEntry
- get_or_create_request_metadata_in_store(session: Session, request_metadata: RequestMetadata) RequestMetadataEntry
- static job_by_priority_statement(schedule_after_now: bool = False) Select
Selects a job by priority, ordered by priority and queued timestamp
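The ordering described here can be pictured without SQL as a two-part sort key. This is a sketch under stated assumptions: the QueuedJob class is hypothetical, and it assumes that a lower priority value is served first and that ties go to the job queued earliest. The actual statement returned by this method may order differently.

```python
from __future__ import annotations

from dataclasses import dataclass
from typing import Iterable


@dataclass(frozen=True)
class QueuedJob:
    name: str
    priority: int
    queued_timestamp: float


def next_job(jobs: Iterable[QueuedJob]) -> QueuedJob | None:
    # Assumption: smallest priority value wins, then the oldest queued timestamp.
    return min(jobs, key=lambda job: (job.priority, job.queued_timestamp), default=None)


queue = [
    QueuedJob("late-urgent", priority=0, queued_timestamp=30.0),
    QueuedJob("early-normal", priority=5, queued_timestamp=10.0),
    QueuedJob("early-urgent", priority=0, queued_timestamp=20.0),
]
chosen = next_job(queue)
```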
- list_operations(operation_filters: list[OperationFilter] | None = None, page_size: int | None = None, page_token: str | None = None) tuple[list[Operation], str]
- load_operation(operation_name: str) Operation
- maybe_update_bot_platforms(bot_name: str, capability_hashes: list[str] | None = None) None
- prune_timer_loop(shutdown_requested: Event) None
Running in a background thread, this method wakes up periodically and deletes older records from the jobs tables using configurable parameters.
- publish_execution_stats(job_name: str, execution_metadata: ExecutedActionMetadata, property_label: str = 'unknown') None
- queue_job_action(*, action: Action, action_digest: Digest, command: Command, platform_requirements: dict[str, list[str]], property_label: str, priority: int, skip_cache_lookup: bool, request_metadata: RequestMetadata | None = None, client_identity: ClientIdentityEntry | None = None, scheduling_metadata: SchedulingMetadata | None = None) str
De-duplicates or inserts a newly created job into the execution queue. Returns an operation name associated with this job.
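The de-duplicate-or-insert behaviour can be sketched with an in-memory dictionary keyed by action digest. This is not the SQL-backed implementation: TinyQueue is a hypothetical class, digests are plain strings here, and the sketch assumes that every call creates a fresh operation attached to either the existing job or a new one.

```python
import uuid


class TinyQueue:
    """In-memory stand-in for the de-duplication step."""

    def __init__(self) -> None:
        self.job_by_digest: dict = {}  # action digest -> job name
        self.job_by_operation: dict = {}  # operation name -> job name

    def queue_job_action(self, action_digest: str) -> str:
        job_name = self.job_by_digest.get(action_digest)
        if job_name is None:
            # No job for this action yet: insert a new one.
            job_name = str(uuid.uuid4())
            self.job_by_digest[action_digest] = job_name
        # Assumption: each request gets its own operation name for the job.
        operation_name = str(uuid.uuid4())
        self.job_by_operation[operation_name] = job_name
        return operation_name


tiny = TinyQueue()
first = tiny.queue_job_action("digest-a")
second = tiny.queue_job_action("digest-a")
other = tiny.queue_job_action("digest-b")
```

In this sketch, first and second are different operation names that resolve to the same job, while other resolves to a separate job.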
- queue_timer_loop(shutdown_requested: Event) None
Periodically times out aged queued jobs.
- reap_expired_sessions() bool
Find and close expired bot sessions. Returns True if sessions were closed. Only closes a few sessions to minimize time in transaction.
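A boolean return value like this lets a caller keep reaping in small batches while there is still work, then sleep until the next interval. The loop below is a sketch of that pattern using threading.Event, not the body of session_expiry_timer_loop; the function names, the batch size, and the in-memory pending list are all hypothetical.

```python
import threading
from typing import Callable


def session_expiry_loop(
    reap: Callable[[], bool], shutdown_requested: threading.Event, interval: float
) -> int:
    """Call reap() until it reports no work, sleep, and repeat until shutdown."""
    successful_passes = 0
    while not shutdown_requested.is_set():
        while reap():
            successful_passes += 1
        # wait() returns early if shutdown is requested during the sleep.
        shutdown_requested.wait(timeout=interval)
    return successful_passes


pending = ["bot-a", "bot-b", "bot-c", "bot-d", "bot-e"]
shutdown = threading.Event()


def reap_in_batches(batch_size: int = 2) -> bool:
    # Close at most batch_size sessions per call to keep each pass short.
    batch = pending[:batch_size]
    del pending[:batch_size]
    if not pending:
        shutdown.set()  # demo only: stop the loop once everything is reaped
    return bool(batch)


passes = session_expiry_loop(reap_in_batches, shutdown, interval=0.01)
```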
- refresh_bot_expiry_time(bot_name: str, bot_id: str) datetime
This update is done out-of-band from the main synchronize_bot_lease transaction, because there are cases where the synchronization is skipped but the session must still be updated so that it does not get reaped. This slightly duplicates the update happening in synchronize_bot_lease; however, that update is still required so that the job is not reaped during its job assignment waiting period.
This method should be called at the end of the update and create bot session methods. The returned datetime should be assigned to the deadline within the returned session proto.
- session_expiry_timer_loop(shutdown_requested: Event) None
- start() None
- stop() None
- synchronize_bot_lease(bot_name: str, bot_id: str, bot_status: int, session_lease: Lease | None, partial_execution_metadata: dict[str, ExecutedActionMetadata] | None = None) Lease | None
- class buildgrid.server.scheduler.SchedulerMetrics
Bases:
TypedDict
- jobs: dict[tuple[str, str], int]
- class buildgrid.server.scheduler.PropertySet(*args, **kwargs)
Bases:
Protocol
- bot_property_labels(bot_session: BotSession) list[str]
Find all label_key values which can be used to identify bot types in logging and metrics.
- execution_properties(platform: Platform) tuple[str, dict[str, list[str]]]
Parses a platform value and returns the match properties used for scheduling. Returns a label which can be used for applying metrics.
- worker_properties(bot_session: BotSession) list[dict[str, list[str]]]
Find all the valid property combinations which can be used to assign work to a bot.
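Because PropertySet is a Protocol, any class with matching methods satisfies it structurally, with no inheritance required. The sketch below shows that mechanism for a single method only. It uses plain dicts in place of the Platform message, and the names LabelSource and OsLabelSet, the choice of the OSFamily key, and the "unknown" fallback label are all assumptions for illustration.

```python
from __future__ import annotations

from typing import Protocol, runtime_checkable


@runtime_checkable
class LabelSource(Protocol):
    """Cut-down, hypothetical analogue of PropertySet with one method."""

    def execution_properties(
        self, platform: dict[str, list[str]]
    ) -> tuple[str, dict[str, list[str]]]: ...


class OsLabelSet:
    """Matches on one property key and derives the metrics label from it."""

    def execution_properties(
        self, platform: dict[str, list[str]]
    ) -> tuple[str, dict[str, list[str]]]:
        matched = {k: sorted(v) for k, v in platform.items() if k == "OSFamily"}
        label = ",".join(matched.get("OSFamily", [])) or "unknown"
        return label, matched


label_set = OsLabelSet()
label, match_properties = label_set.execution_properties(
    {"OSFamily": ["linux"], "ISA": ["x86-64"]}
)
```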
- class buildgrid.server.scheduler.DynamicPropertySet(*, unique_property_keys: set[str], match_property_keys: set[str], wildcard_property_keys: set[str], label_key: str | None = None)
Bases:
object
- bot_property_labels(bot_session: BotSession) list[str]
- execution_properties(platform: Platform) tuple[str, dict[str, list[str]]]
- worker_properties(bot_session: BotSession) list[dict[str, list[str]]]
- class buildgrid.server.scheduler.StaticPropertySet(*, property_labels: list[PropertyLabel], wildcard_property_keys: set[str])
Bases:
object
- bot_property_labels(bot_session: BotSession) list[str]
- execution_properties(platform: Platform) tuple[str, dict[str, list[str]]]
- worker_properties(bot_session: BotSession) list[dict[str, list[str]]]
- class buildgrid.server.scheduler.PropertyLabel(label: str, properties: set[tuple[str, str]])
Bases:
object
- label: str
- properties: set[tuple[str, str]]
- class buildgrid.server.scheduler.AssignerConfig(*args, **kwargs)
Bases:
Protocol
- generate_assigners(scheduler: Scheduler) Generator[JobAssigner, None, None]
Generate the actual JobAssigner objects defined by this configuration.
- count: int
- interval: float
- class buildgrid.server.scheduler.PriorityAgeAssignerConfig(count: int, interval: float, priority_assignment_percentage: int = 100, failure_backoff: float = 5.0, jitter_factor: float = 1.0, busy_sleep_factor: float = 0.01)
Bases:
object
- busy_sleep_factor: float = 0.01
- failure_backoff: float = 5.0
- generate_assigners(scheduler: Scheduler) Generator[PriorityAgeJobAssigner, None, None]
- jitter_factor: float = 1.0
- priority_assignment_percentage: int = 100
- count: int
- interval: float
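The generate_assigners pattern, where one configuration yields count assigner objects, can be sketched as follows. This is an illustration only: ConfigSketch and AssignerSketch are hypothetical classes, and the sketch assumes that priority_assignment_percentage is the share of assignment attempts made by priority, with the remainder made by age. The page does not confirm that interpretation.

```python
from __future__ import annotations

import random
from dataclasses import dataclass
from typing import Callable, Generator


@dataclass
class AssignerSketch:
    assign_by_priority: Callable[[], int]
    assign_by_age: Callable[[], int]
    priority_percentage: int
    rng: random.Random

    def assign_once(self) -> int:
        # Assumption: pick the priority path priority_percentage% of the time.
        if self.rng.randrange(100) < self.priority_percentage:
            return self.assign_by_priority()
        return self.assign_by_age()


@dataclass
class ConfigSketch:
    count: int
    interval: float
    priority_assignment_percentage: int = 100

    def generate_assigners(
        self, by_priority: Callable[[], int], by_age: Callable[[], int]
    ) -> Generator[AssignerSketch, None, None]:
        # Yield one independent assigner per configured worker slot.
        for _ in range(self.count):
            yield AssignerSketch(
                by_priority, by_age, self.priority_assignment_percentage, random.Random()
            )


calls: list = []


def by_priority() -> int:
    calls.append("priority")
    return 1


def by_age() -> int:
    calls.append("age")
    return 1


assigners = list(ConfigSketch(count=3, interval=1.0).generate_assigners(by_priority, by_age))
updated = sum(assigner.assign_once() for assigner in assigners)
```

With the default percentage of 100, every attempt in this sketch takes the priority path; a value of 0 would route every attempt to the age path.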