RabbitMQ BuildGrid Architecture¶
This work is in the early stages of development and may be subject to change as it gets worked on. This document currently represents the planned architecture for the scheduler and bots -> execution service communication.
BuildGrid’s current scheduler implementation is not scalable to the level we need it to be. The dependence on accessing the central state database as part of every request makes the system very sensitive to performance bottlenecks in the database.
It also limits the maximum throughput to the maximum number of database connections available at any one time. Most of BuildGrid’s connections are short-lived so there’s quite a lot of headroom here, but not enough to support scaling up to the level we need. Having to wait for available connections will cause extreme performance degradation in the current scheduler implementation.
There are also issues with the database session management in the current scheduler code, which lead to overcomplication and increased potential for deadlocks. This mostly stems from the current approach of supporting multiple scheduler types (i.e. in-memory, SQLite, PostgreSQL). Refactoring the scheduler code to optimise for the recommended PostgreSQL use case would likely take almost the same amount of effort as writing a new RabbitMQ scheduler, and wouldn’t address any of the fundamental problems with the current scheduler.
The core of the new approach to the scheduler is removing all persistent state from the Execution service and the Bots service (sometimes also called BotsInterface in our docs).
Currently these services are responsible for maintaining the contents of the shared job database used to assign work to workers and track state changes. This adds overhead and leads to the issues described above, where database access is critical for performance. It also doesn’t really suit parts of the use case; it’d be much cleaner to have the Bots service communicate state changes directly to Execution services, for example, rather than having the Execution service monitor the database that gets updated by the Bots service.
By removing this responsibility, the critical path for handling an Execute request is streamlined to just the things which are actually needed to handle the request.
In order to remove this responsibility, we need to provide a way for these services to communicate directly with each other, rather than sharing a database (or in-memory state store in the in-memory scheduler case). A message broker like RabbitMQ is a good solution for this communication, since it provides various ways to route messages from one service to another without those services necessarily being aware of each other’s location or even existence.
The rest of this document assumes familiarity with RabbitMQ concepts (like exchanges and queues), and the available approaches to message routing it provides.
BuildGrid will need a few exchanges to replace all the functionality we currently rely on the database for. The following sections will describe them and their general usage flow.
Execution exchange¶
The Execution exchange replaces the current database-based approach to informing the Bots service of work and getting that work assigned to a capable worker.
It’s a direct exchange, so it does exact matching between the message and queue routing keys to direct messages into queues. These routing keys will be set to some value derived from the platform properties of the incoming Action. Execution services publish a message containing the Action on this exchange when they’re handling an incoming Execute request. That message will eventually be consumed by a Bots service, which will use it to assign the Action to a worker.
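For example, the routing key could be derived deterministically from the Action’s platform properties. The ordering rule below is an assumption, chosen so that the output reproduces the queue names used in the example configuration later in this document:

```python
# Hypothetical key precedence reproducing this document's examples;
# keys outside this list would sort alphabetically after them.
_KEY_PRECEDENCE = {"osfamily": 0, "isa": 1, "pool": 2}


def platform_routing_key(properties: dict) -> str:
    """Derive a deterministic direct-exchange routing key from a set of
    platform properties, so equal property sets always hit the same queue."""
    ordered = sorted(
        properties.items(),
        key=lambda kv: (_KEY_PRECEDENCE.get(kv[0], len(_KEY_PRECEDENCE)), kv[0]),
    )
    return ":".join(f"{key}={value}" for key, value in ordered)
```

The important property is determinism: however the key is derived, two Actions with the same platform requirements must produce the same routing key, or direct-exchange matching will split equivalent work across queues.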
In this design, the Bots service is configured with a mapping of worker platforms to queues. This tells the bots service which queues to consume from when specific types of worker are connected. For example,
```yaml
platform-queues:
  - platform: osfamily=linux:isa=x86_64:pool=large
    queues:
      - osfamily=linux:isa=x86_64:pool=large
      - osfamily=linux:isa=x86_64
      - osfamily=linux
  - platform: osfamily=linux:isa=x86_64:pool=medium
    queues:
      - osfamily=linux:isa=x86_64:pool=medium
      - osfamily=linux:isa=x86_64
      - osfamily=linux
  - platform: osfamily=linux:isa=x86_64:pool=small
    queues:
      - osfamily=linux:isa=x86_64:pool=small
      - osfamily=linux:isa=x86_64
      - osfamily=linux
```
This config will create 5 queues, with the workers getting assigned work from only the queues they specify.
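Read back as data, the mapping can be checked for the set of queues it implies. A minimal sketch (this is illustrative, not BuildGrid’s actual config parser):

```python
# The platform-queues mapping from the example config, as parsed data.
PLATFORM_QUEUES = [
    {"platform": "osfamily=linux:isa=x86_64:pool=large",
     "queues": ["osfamily=linux:isa=x86_64:pool=large",
                "osfamily=linux:isa=x86_64",
                "osfamily=linux"]},
    {"platform": "osfamily=linux:isa=x86_64:pool=medium",
     "queues": ["osfamily=linux:isa=x86_64:pool=medium",
                "osfamily=linux:isa=x86_64",
                "osfamily=linux"]},
    {"platform": "osfamily=linux:isa=x86_64:pool=small",
     "queues": ["osfamily=linux:isa=x86_64:pool=small",
                "osfamily=linux:isa=x86_64",
                "osfamily=linux"]},
]


def queues_to_declare(config):
    """Collect the distinct queue names implied by the platform-queues
    mapping; each must be declared and bound to the Execution exchange."""
    seen = []
    for entry in config:
        for queue in entry["queues"]:
            if queue not in seen:
                seen.append(queue)
    return seen
```

The three pool-specific queues plus the two shared fallback queues (`osfamily=linux:isa=x86_64` and `osfamily=linux`) give the five queues mentioned above.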
The exchange will reject messages published by the Execution service if a suitable queue doesn’t exist, meaning a queue for every expected type of Action must be declared in the Bots service configuration.
Bots services will only consume from the configured queues when workers are available to assign a potentially consumed Action to. This is to prevent a Bots service consuming a chunk of the queue and then waiting (potentially indefinitely) for a suitable worker to come along. Workers are expected to send UpdateBotSession requests with reasonably large timeouts, so that they remain available to have work assigned for a reasonable length of time; this avoids the situation where we’re rapidly polling the queues as a worker briefly connects and disappears.
The Bots services will also only consume from queues configured as usable for the worker platforms that are currently connected. This further avoids holding work without executing it whilst a different Bots service could potentially have spare capacity.
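The gating described above can be sketched as a pure function from the platform-queues config and the currently-connected worker platforms to the queues worth consuming from (structure and names are illustrative):

```python
def active_queues(platform_queues, connected_platforms):
    """Queues a Bots service should consume from right now, given the
    platform strings of workers currently waiting for work. Consuming
    from any other queue risks holding work this instance can't assign."""
    queues = []
    for entry in platform_queues:
        if entry["platform"] in connected_platforms:
            for queue in entry["queues"]:
                if queue not in queues:
                    queues.append(queue)
    return queues


# A reduced example config, in the shape of the platform-queues mapping.
config = [
    {"platform": "osfamily=linux:isa=x86_64:pool=large",
     "queues": ["osfamily=linux:isa=x86_64:pool=large", "osfamily=linux"]},
    {"platform": "osfamily=linux:isa=x86_64:pool=small",
     "queues": ["osfamily=linux:isa=x86_64:pool=small", "osfamily=linux"]},
]
```

With no connected workers, the result is empty and the Bots service consumes nothing, matching the behaviour described above.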
Operations exchange¶
The Operations exchange replaces the current database-based approach to informing execution clients of updates to the state of the Action they’re waiting for.
This is a topic exchange, since some consumers will only need to consume a subset of the messages.
A fanout exchange would also work here if we find that the performance of a topic exchange with wildcard routing patterns isn’t adequate, though that approach would need some extra consideration for multiple-instance setups.
Messages will be published onto this exchange whenever a Job changes state, by the service that performs the state transition. That is:
- the Execution service creating a new Operation and/or Job
- the Execution service moving the Job into the CACHE_CHECK stage
- the Execution service moving the Job into the QUEUED stage
- the Bots service moving the Job into the EXECUTING stage
- the Bots service moving the Job into the COMPLETED stage
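The document doesn’t pin down the exact topic routing keys; one plausible scheme, used here purely for illustration, is `job.<job-id>.<stage>`:

```python
def operation_routing_key(job_id: str, stage: str) -> str:
    """Hypothetical topic routing key for a Job state-change message.

    Publishing with keys of this shape lets consumers bind narrowly
    (e.g. only to completed Jobs) or broadly (the '#' wildcard).
    """
    return f"job.{job_id}.{stage.lower()}"
```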
Messages will be consumed from queues bound to this exchange by Execution services, Operations services, and ActionCache services.
Execution services will use these messages to relay Operation updates to connected clients about the state of their Execute request, and also to maintain a cache of Action digests which can be deduplicated.
Operations services will use these messages to maintain a persistent database of Job/Operation state, for use when handling GetOperation and related requests.
ActionCache services will specifically consume messages moving Jobs into the COMPLETED stage, and use them to cache the result without requiring the Bots service to make a separate UpdateActionResult request.
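In an AMQP topic binding, `*` matches exactly one word and `#` matches zero or more words; RabbitMQ performs this matching itself. To illustrate how an ActionCache could receive only COMPLETED messages, assume a hypothetical routing key scheme of `job.<job-id>.<stage>` and a binding pattern of `job.*.completed`. The sketch below just reimplements the matching semantics to show which messages such a binding would receive:

```python
def topic_matches(pattern: str, routing_key: str) -> bool:
    """Simplified AMQP topic-exchange matching, for illustration only."""
    def match(p, k):
        if not p:
            return not k
        if p[0] == "#":  # '#' matches zero or more words
            return any(match(p[1:], k[i:]) for i in range(len(k) + 1))
        if not k:
            return False
        if p[0] == "*" or p[0] == k[0]:  # '*' matches exactly one word
            return match(p[1:], k[1:])
        return False
    return match(pattern.split("."), routing_key.split("."))
```

A binding of `job.*.completed` receives completion messages for every Job, while an Execution service binding `#` receives everything.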
This diagram shows the flow of messages sent to this exchange by the Execution service. All the Operations services share a queue to persist updates in the database. Meanwhile, all the other Execution services have their own queues and so receive all the messages that get sent to the exchange. This ensures that clients don’t miss messages, and that the deduplication cache can be maintained as well as possible. The ActionCache service doesn’t appear in this diagram since the Execution service never sends messages in the COMPLETED state, so will never send messages which get consumed by the ActionCache.
The other relevant flow here is messages sent by the Bots services.
This diagram is similar to the previous one. The Operations and Execution services behave in the same way, with Operations sharing a queue and each Execution service having its own queue. The ActionCache service is included here though, since sometimes the Bots service will publish messages moving Jobs into the COMPLETED stage.
The ActionCache’s queue will only receive those messages, so the ActionCache will only receive messages relevant to its use case. The queue behaviour when horizontally scaled is the same as for the Operations service, with each ActionCache service handling a subset of the messages to share the cache maintenance load.
Cancellation exchange¶
The Cancellation exchange replaces the current cancellation implementation, where the Bots service must fetch the current Job state from the central database whenever it receives an UpdateBotSession request, to check for cancellation of all the related Operations.
This is a fanout exchange, since all the consumers need all the messages.
The Operations service publishes a message on this exchange when it has received a CancelOperation request for every Operation related to a specific Job. The message contains the Job ID to be cancelled. The Operations service keeps track of whether or not all related Operations are cancelled using the same database it uses to respond to GetOperation requests.
The messages sent to this exchange are consumed by both the Bots service and the Execution service. The Bots service keeps track of the Job IDs received in these messages, and checks the Lease in UpdateBotSession requests against this cache. The Lease ID is the same as the Job ID, so if it’s found in the cache the Lease gets cancelled in the response.
The Execution service uses the cancelled Job IDs to maintain its deduplication cache. Cancelled Job IDs are removed from the cache, to avoid deduplicating into a Job which will never be completed in the time between the Bots service receiving the cancellation message and the work actually being cancelled.
This cancellation behaviour is best-effort only since it works using in-memory caches of a limited size that aren’t populated at startup.
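A sketch of the kind of bounded cache this implies, using LRU-style eviction (the size limit and eviction policy are assumptions; the point is that entries can be lost, which is why cancellation is best-effort):

```python
from collections import OrderedDict


class CancellationCache:
    """Bounded, in-memory record of cancelled Job IDs.

    Oldest entries are evicted first once the size limit is reached, and
    the cache starts empty at service startup, so a lookup miss does not
    guarantee the Job was never cancelled.
    """

    def __init__(self, max_size=10000):
        self._max_size = max_size
        self._ids = OrderedDict()

    def mark_cancelled(self, job_id):
        self._ids[job_id] = None
        self._ids.move_to_end(job_id)
        while len(self._ids) > self._max_size:
            self._ids.popitem(last=False)  # evict the oldest entry

    def is_cancelled(self, job_id):
        return job_id in self._ids
```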
BotStatus exchange¶
The BotStatus exchange replaces the BotSessionReaper coroutine and its retry functionality. (In the old implementation, allowing workers to connect to an arbitrary Bots service in the grid, rather than forcing them to stick to a specific machine, required database interaction to keep track of the worker and avoid spurious retries from Bots services to which the worker might have been connected in the past.)
This is a direct exchange. The routing key is mostly irrelevant since this exchange will only have a single queue bound to it.
The Bots service will publish messages on this exchange when a worker connects with either a CreateBotSession or an UpdateBotSession request. The messages will contain the server-assigned worker name, the timestamp of the connection, and a list of the Jobs being worked on by the worker.
The messages sent to this exchange will be consumed by a new BotMonitor service, which tracks the last-seen time of workers across the whole grid, and periodically checks for timed-out workers. The work assigned to any timed-out workers is then retried or cancelled, depending on how many times it has previously been retried (which is also tracked by this service).
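The tracking the BotMonitor needs can be sketched as follows; the message fields match those described above, while the class and method names are illustrative:

```python
class BotMonitor:
    """Track last-seen timestamps per worker and report timed-out Jobs."""

    def __init__(self, timeout_seconds):
        self._timeout = timeout_seconds
        self._last_seen = {}  # worker name -> timestamp of last message
        self._jobs = {}       # worker name -> Job IDs assigned to it

    def record_message(self, worker_name, timestamp, job_ids):
        """Handle one BotStatus message consumed from the exchange."""
        self._last_seen[worker_name] = timestamp
        self._jobs[worker_name] = list(job_ids)

    def timed_out_jobs(self, now):
        """Jobs whose workers haven't been seen within the timeout;
        called periodically, and the stale workers are forgotten."""
        stale = [w for w, t in self._last_seen.items()
                 if now - t > self._timeout]
        jobs = []
        for worker in stale:
            jobs.extend(self._jobs.pop(worker, []))
            del self._last_seen[worker]
        return jobs
```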
Retries are handled by requeueing the Job onto the Execution exchange and publishing a message on the Operations exchange moving the Job back into the QUEUED stage. The Job may be requeued with a higher priority to avoid flaky Jobs having to repeatedly get through the queue.
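The retry-or-cancel decision might look like the following; the retry limit and the one-level priority bump are assumptions, not specified values:

```python
MAX_RETRIES = 3  # assumed limit; in practice this would be configuration


def handle_timed_out_job(retry_count, priority, max_priority=10):
    """Decide what to do with a Job whose worker timed out: requeue it
    (with a hypothetical one-level priority bump, capped at the queue's
    maximum) or cancel it once the retry limit is exhausted."""
    if retry_count >= MAX_RETRIES:
        return ("cancel", None)
    return ("requeue", min(priority + 1, max_priority))
```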
Tradeoffs¶
There are some tradeoffs made to enable this design against the existing BuildGrid implementation. Most are fairly minor, and should be very much outweighed by the code quality and scalability improvements they enable.
Deduplication¶
Deduplication is now only best-effort rather than guaranteed to occur when possible. This is to avoid having an interaction with a canonical state store in the critical path, instead relying on caches maintained by consuming messages from the Operations exchange to get an approximate view of the current state.
Cancellation¶
Cancellation is also now only best-effort. This is still within spec, but previously cancellation was guaranteed to cause work to stop, assuming the worker was still communicating. The upshot is that we might execute some unnecessary work, but this should still be at a manageable level.
WaitExecution response time¶
Handling a WaitExecution request now involves also making a GetOperation request to the Operations service to get the current state. This allows the Operations service to be the sole owner of the Job/Operations database tables, simplifying the code by allowing the database access to be completely contained in the Operations service. It also avoids the temptation to use the database as part of the more time-critical handling of Execute requests by having access to it completely unavailable to the Execution service.
This new GetOperation response will add some overhead to the time between sending a WaitExecution request and receiving the initial response.
Priority granularity¶
Priority is less granular than in the old scheduler, being dependent on the number of priority levels available in the RabbitMQ platform queues. This is normally configurable up to 10, which means a reasonable number of priority levels can still be supported.
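REAPI priorities are integers where a smaller value means more urgent (0 is the default, and values may be negative), while RabbitMQ queue priorities run from 0 up to the queue’s `x-max-priority`, with larger meaning more urgent. One possible mapping, offset-and-clamp, is sketched below; the exact mapping is an assumption:

```python
def amqp_priority(reapi_priority, max_priority=10):
    """Map a REAPI priority (smaller = more urgent, 0 = default) onto
    RabbitMQ's 0..max_priority scale (larger = more urgent).

    The default REAPI priority lands in the middle of the range, and
    anything outside the representable range is clamped, which is the
    granularity loss described above.
    """
    return max(0, min(max_priority, max_priority // 2 - reapi_priority))
```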
Platform property flexibility¶
Platform properties and their possible combinations now need to be defined in a configuration file at deployment time, rather than the old support for fully arbitrary partial matching.
In reality, most use cases don’t actually need full partial matching, and full matching is still possible with a verbose config. This allows the number of queues used for sending Jobs to workers to be kept at a sensible level rather than growing exponentially as new properties are introduced to the grid, whilst still allowing more flexibility than requiring client-set properties to exactly match worker-specified properties.