RabbitMQ BuildGrid Architecture

Warning

This work is in the early stages of development and is likely to change as it progresses. This document currently represents the planned architecture for the scheduler and the bots -> execution service communication.

Motivation

BuildGrid’s current scheduler implementation is not scalable to the level we need it to be. The dependence on accessing the central state database as part of every request makes the system very sensitive to performance bottlenecks in the database.

It also limits the maximum throughput to the maximum number of database connections available at any one time. Most of BuildGrid’s connections are short-lived so there’s quite a lot of headroom here, but not enough to support scaling up to the level we need. Having to wait for available connections will cause extreme performance degradation in the current scheduler implementation.

There are also issues with the database session management in the current scheduler code, which lead to overcomplication and increased potential for deadlocks. This mostly stems from the current approach of supporting multiple scheduler types (i.e. in-memory, SQLite, PostgreSQL). Refactoring the scheduler code to optimise for the recommended PostgreSQL use case would likely take almost as much effort as writing a new RabbitMQ scheduler, and wouldn’t address any of the fundamental problems with the current scheduler.

Overview

The core of the new approach to the scheduler is removing all persistent state from the Execution service and the Bots service (sometimes also called the BotsInterface in our docs).

Currently these services are responsible for maintaining the contents of the shared job database used to assign work to workers and track state changes. This adds overhead and leads to the issues described above, where database access is critical for performance. It also doesn’t suit parts of the use case well; for example, it would be much cleaner to have the Bots service communicate state changes directly to Execution services, rather than having the Execution service monitor the database that the Bots service updates.

By removing this responsibility, the critical path for handling an Execute request is streamlined to just the things which are actually needed to handle the request.

In order to remove this responsibility, we need to provide a way for these services to communicate directly with each other, rather than sharing a database (or in-memory state store in the in-memory scheduler case). A message broker like RabbitMQ is a good solution for this communication, since it provides various ways to route messages from one service to another without those services necessarily being aware of each other’s location or even existence.

Note

The rest of this document assumes familiarity with RabbitMQ concepts (like exchanges and queues), and the available approaches to message routing it provides.

BuildGrid will need a few exchanges to replace all the functionality we currently rely on the database for. The following sections will describe them and their general usage flow.

Execution exchange

The Execution exchange replaces the current database-based approach to informing the Bots service of work and getting that work assigned to a capable worker.

Note

This is a direct exchange, so messages are routed into queues by exact matching between the message’s routing key and the queues’ binding keys. These routing keys will be set to some value derived from the platform properties of the Action.

Execution services publish a message containing the Action on this exchange when they’re handling an incoming Execute request. That message will eventually be consumed by a Bots service which will use it to assign the Action to a worker.
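
As a rough illustration of the note above, a routing key could be derived from the Action’s platform properties along these lines. The exact key format (property ordering and separators) is an assumption here, not a settled part of the design.

# Hypothetical sketch of deriving a routing key from an Action's platform
# properties. The ordering and separator choices are illustrative only.
def platform_routing_key(platform):
    """Build a deterministic key like 'isa=x86_64;osfamily=linux'."""
    parts = sorted(f"{prop.name}={prop.value}" for prop in platform.properties)
    return ";".join(parts)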

digraph execution_exchange {
    graph [fontsize=16 fontname="Verdana" compound=true]
    node [
        shape=box,
        width=2,
        height=1,
        fillcolor="#666666",
        fontcolor="#eeeeee",
        fontname="Verdana",
        fontsize=16,
        style=filled
    ];
    edge [
        fontname="Verdana",
        fontsize=16
    ];
    ranksep = 2

    edge[arrowtail="vee"];
    edge[arrowhead="vee"];

    execution_1 [
        label="Execution service"
    ]
    execution_2 [
        label="Execution service"
    ]
    subgraph cluster_rabbitmq {
        label="RabbitMQ";
        labeljust="l";
        style="dashed";
        node [
            shape=ellipse,
            width=2,
            height=1,
            fillcolor="#d0e0e3",
            fontcolor="#134f5c",
            color="#134f5c",
            style=filled
        ];

        exchange [
            label="Execution Exchange",
            shape=box
        ]
        queue_1 [
            label="Platform-specific\nQueue"
        ]
        queue_2 [
            label="Platform-specific\nQueue"
        ]
        queue_3 [
            label="Platform-specific\nQueue"
        ]
        queue_4 [
            label="Platform-specific\nQueue"
        ]

        exchange -> queue_2;
        exchange -> queue_1;
        exchange -> queue_3;
        exchange -> queue_4;
    }
    bots_1 [
        label="Bots service"
    ]
    bots_2 [
        label="Bots service"
    ]
    execution_1 -> exchange [
        label="Enqueuing work\n\n",
        labeldistance=2
    ];
    execution_2 -> exchange;
    queue_1 -> bots_1;
    queue_1 -> bots_2;
    queue_2 -> bots_1;
    queue_2 -> bots_2;
    queue_3 -> bots_2;
    queue_4 -> bots_1 [
        label="Fetching work for an\navailable platform",
        labeldistance=2
    ];
}

In this design, the Bots service is configured with a mapping of worker platforms to queues. This tells the Bots service which queues to consume from when specific types of worker are connected. For example,

platform-queues:
  - platform: osfamily=linux;isa=x86_64;pool=large
    queues:
      - osfamily=linux;isa=x86_64;pool=large
      - osfamily=linux;isa=x86_64
      - osfamily=linux
  - platform: osfamily=linux;isa=x86_64;pool=medium
    queues:
      - osfamily=linux;isa=x86_64;pool=medium
      - osfamily=linux;isa=x86_64
      - osfamily=linux
  - platform: osfamily=linux;isa=x86_64;pool=small
    queues:
      - osfamily=linux;isa=x86_64;pool=small
      - osfamily=linux;isa=x86_64
      - osfamily=linux

This config will create 5 queues, with workers only being assigned work from the queues configured for their platform.
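
As a sketch of what that configuration implies on the RabbitMQ side, a Bots service could declare and bind the queues roughly as follows. The exchange name, connection details, and the use of pika are assumptions for illustration, not the actual BuildGrid implementation.

import pika

# Hypothetical sketch: declare the queues from the `platform-queues` config
# above and bind each one to the Execution exchange by its own name.
PLATFORM_QUEUES = {
    "osfamily=linux;isa=x86_64;pool=large": [
        "osfamily=linux;isa=x86_64;pool=large",
        "osfamily=linux;isa=x86_64",
        "osfamily=linux",
    ],
    # ... remaining platforms as in the YAML config above
}

connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.exchange_declare(exchange="execution", exchange_type="direct", durable=True)

for queue_names in PLATFORM_QUEUES.values():
    for queue_name in queue_names:
        # Re-declaring an existing queue with the same arguments is a no-op,
        # so the overlap between platforms is harmless.
        channel.queue_declare(queue=queue_name, durable=True)
        # Direct exchange: the binding key must exactly match the routing key.
        channel.queue_bind(queue=queue_name, exchange="execution",
                           routing_key=queue_name)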

The exchange will reject messages published by the Execution service if a suitable queue doesn’t exist, meaning a queue for every expected type of incoming Action must be declared in the Bots service configuration.
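
This “reject if no suitable queue exists” behaviour maps naturally onto the AMQP mandatory flag. A minimal sketch (again assuming pika and an exchange named "execution") might look like this.

import pika
from pika.exceptions import UnroutableError

# Sketch: publish an Action with mandatory=True so that an unroutable message
# (no queue bound for the routing key) surfaces as an error to the publisher.
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.confirm_delivery()  # needed for unroutable returns to raise in pika

action_payload = b"<serialized Action message>"  # placeholder payload

try:
    channel.basic_publish(
        exchange="execution",
        routing_key="osfamily=linux;isa=x86_64;pool=large",
        body=action_payload,
        mandatory=True,  # return the message if no queue matches the key
        properties=pika.BasicProperties(delivery_mode=2),  # persistent
    )
except UnroutableError:
    # No queue is declared for this platform; fail the Execute request early.
    raise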

Bots services will only consume from the configured queues when workers are available to assign a consumed Action to. This prevents a Bots service from consuming a chunk of the queue and then waiting (potentially indefinitely) for a suitable worker to come along. Workers are expected to send CreateBotSession and UpdateBotSession requests with reasonably large timeouts, so that they remain available for work assignment for a reasonable length of time and we avoid rapidly polling the queues as workers briefly connect and disappear.

The Bots services will also only consume from queues configured as usable for the worker platforms that are currently connected. This further avoids holding work without executing it whilst a different Bots service could potentially have spare capacity.
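
One way to implement this pull-based behaviour is to fetch a single message at a time, and only when a connected worker with a matching platform actually has capacity. The helper below is a hypothetical sketch of that check, not settled API.

# Hypothetical sketch: only pull work from a queue when it can be handed to a
# connected worker immediately. `worker_has_capacity` is an assumed callable.
def try_fetch_work(channel, queue_name, worker_has_capacity):
    if not worker_has_capacity():
        return None  # don't hold messages we can't assign right now
    method, _properties, body = channel.basic_get(queue=queue_name, auto_ack=False)
    if method is None:
        return None  # queue is currently empty
    # Ack (or nack to requeue) once the Action has actually been assigned.
    return method.delivery_tag, body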

Operations Exchange

The Operations exchange replaces the current database-based approach to informing execution clients of updates to the state of the Action they’re waiting for.

Note

This is a topic exchange, since some consumers will only need to consume a subset of the messages.

A fanout exchange will work here too if we find the performance of a topic exchange with wildcard routing patterns isn’t adequate, though that approach will need some extra consideration for multiple-instance setups.

The routing key is of the form '{stage}.{instance_name}' or '{stage}' in the case of the empty instance, where stage is a value in {'UNKNOWN', 'CACHE_CHECK', 'QUEUED', 'EXECUTING', 'COMPLETED'} (build.bazel.remote.execution.v2.ExecutionStage values).

Messages will consist of a google.protobuf.Any containing a buildgrid.v2.messaging.CreateOperation or buildgrid.v2.messaging.UpdateOperations message.
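
A rough sketch of publishing such a message, packing the update into a google.protobuf.Any and building the topic routing key, could look like the following. The exchange name and the exact shape of the update message are assumptions.

import pika
from google.protobuf import any_pb2

# Sketch: publish a state-change message onto the Operations exchange. `update`
# stands in for a buildgrid.v2.messaging.CreateOperation/UpdateOperations message.
def publish_operation_update(channel, update, stage, instance_name=""):
    payload = any_pb2.Any()
    payload.Pack(update)
    # Routing key is "{stage}.{instance_name}", or just "{stage}" for the
    # empty instance name.
    routing_key = f"{stage}.{instance_name}" if instance_name else stage
    channel.basic_publish(
        exchange="operations",  # assumed exchange name
        routing_key=routing_key,
        body=payload.SerializeToString(),
        properties=pika.BasicProperties(delivery_mode=2),  # persistent
    )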

Messages will be published onto this exchange whenever a Job changes state by the service that performs the state transition. That is,

  • Execution service creating a new Operation and/or Job

  • Execution service moving the Job into CACHE_CHECK

  • Execution service moving the Job into QUEUED

  • Bots service moving the Job into EXECUTING

  • Bots service moving the Job into COMPLETED

Messages will be consumed from queues bound to this exchange by Execution services, Operations services, and ActionCache services.

Execution services will use these messages to relay Operation updates to connected clients about the state of their Execute request, and also to maintain a cache of Action digests which can be deduplicated.

Operations services will use these messages to maintain a persistent database of Job/Operation state, for use when handling ListOperations, GetOperation, and CancelOperation requests.

ActionCache services will specifically consume messages moving Jobs into the COMPLETED stage, and use them to cache the result without requiring the Bots service to make a separate UpdateActionResult request.
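
Since this is a topic exchange, the ActionCache can restrict itself to COMPLETED messages purely through its binding key. A minimal sketch, assuming pika and the queue/exchange names shown:

import pika

# Sketch: bind an ActionCache queue so it only receives COMPLETED-stage messages.
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.exchange_declare(exchange="operations", exchange_type="topic", durable=True)
channel.queue_declare(queue="actioncache-updates", durable=True)
# "#" matches zero or more words, so this covers both "COMPLETED" (empty
# instance name) and "COMPLETED.<instance_name>".
channel.queue_bind(queue="actioncache-updates", exchange="operations",
                   routing_key="COMPLETED.#")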

digraph operations_exchange_exec {
    graph [
        fontsize=16
        fontname="Verdana"
        compound=true
    ]
    node [
        shape=box,
        width=2,
        height=1,
        fillcolor="#666666",
        fontcolor="#eeeeee",
        fontname="Verdana",
        fontsize=16,
        style=filled
    ];
    edge [
        fontname="Verdana",
        fontsize=16
    ];
    ranksep = 2

    execution_1 [
        label="Execution service A"
    ]
    subgraph cluster_rabbitmq {
        label="RabbitMQ";
        labeljust="l";
        style="dashed";
        node [
            shape=ellipse,
            width=2,
            height=1,
            fillcolor="#d0e0e3",
            fontcolor="#134f5c",
            color="#134f5c",
            style=filled
        ];

        exchange [
            label="Operations Exchange",
            shape=box
        ]
        operations_queue [
            label="Operations service\nQueue"
        ]
        execution_queue_1 [
            label="Execution service\nQueue"
        ]
        execution_queue_2 [
            label="Execution service\nQueue"
        ]

        exchange -> operations_queue;
        exchange -> execution_queue_1;
        exchange -> execution_queue_2;
    }
    execution_2 [
        label="Execution service B"
    ]
    execution_3 [
        label="Execution service C"
    ]
    operations_1 [
        label="Operations service A"
    ]
    operations_2 [
        label="Operations service B"
    ]

    execution_1 -> exchange;
    execution_queue_1 -> execution_2;
    execution_queue_2 -> execution_3;
    operations_queue -> operations_1;
    operations_queue -> operations_2;
}

This diagram shows the flow of messages sent to this exchange by the Execution service. All the Operations services share a queue to persist updates in the database. Meanwhile, each of the other Execution services has its own queue and so receives every message sent to the exchange. This ensures that clients don’t miss messages, and that the deduplication cache is kept as up to date as possible. The ActionCache service doesn’t appear in this diagram because the Execution service never publishes messages for the COMPLETED stage, so none of its messages would be consumed by the ActionCache.

The other relevant flow here is messages sent by the Bots services.

digraph operations_exchange_bots {
    graph [
        fontsize=16
        fontname="Verdana"
        compound=true
    ]
    node [
        shape=box,
        width=2,
        height=1,
        fillcolor="#666666",
        fontcolor="#eeeeee",
        fontname="Verdana",
        fontsize=16,
        style=filled
    ];
    edge [
        fontname="Verdana",
        fontsize=16
    ];
    ranksep = 2

    bots_1 [
        label="Bots service A"
    ]
    bots_2 [
        label="Bots service B"
    ]
    subgraph cluster_rabbitmq {
        label="RabbitMQ";
        labeljust="l";
        style="dashed";
        node [
            shape=ellipse,
            width=2,
            height=1,
            fillcolor="#d0e0e3",
            fontcolor="#134f5c",
            color="#134f5c",
            style=filled
        ];

        exchange [
            label="Operations Exchange",
            shape=box
        ]
        operations_queue [
            label="Operations service\nQueue"
        ]
        execution_queue_1 [
            label="Execution service\nQueue"
        ]
        execution_queue_2 [
            label="Execution service\nQueue"
        ]
        execution_queue_3 [
            label="Execution service\nQueue"
        ]
        actioncache_queue [
            label="ActionCache service\nQueue"
        ]

        exchange -> operations_queue;
        exchange -> execution_queue_1;
        exchange -> execution_queue_2;
        exchange -> execution_queue_3;
        exchange -> actioncache_queue;
    }
    execution_1 [
        label="Execution service A"
    ]
    execution_2 [
        label="Execution service B"
    ]
    execution_3 [
        label="Execution service C"
    ]
    operations_1 [
        label="Operations service A"
    ]
    operations_2 [
        label="Operations service B"
    ]
    actioncache [
        label="ActionCache service"
    ]

    bots_1 -> exchange;
    bots_2 -> exchange;
    execution_queue_1 -> execution_1;
    execution_queue_2 -> execution_2;
    execution_queue_3 -> execution_3;
    operations_queue -> operations_1;
    operations_queue -> operations_2;
    actioncache_queue -> actioncache;
}

This diagram is similar to the previous one. The Operations and Execution services behave in the same way, with Operations sharing a queue and each Execution service having its own queue. The ActionCache service is included here though, since sometimes the Bots service will publish messages moving Jobs into the COMPLETED stage.

The ActionCache’s queue will only receive those messages, so the ActionCache will only receive messages relevant to its use case. The queue behaviour when horizontally scaled is the same as for the Operations service, with each ActionCache service handling a subset of the messages to share the cache maintenance load.

Cancellation Exchange

The Cancellation exchange replaces the current cancellation implementation, where the Bots service must fetch the current Job state from the central database whenever it receives an UpdateBotSession request to check for cancellation of all the related Operations.

Note

This is a fanout exchange, since all the consumers need all the messages.

The Operations service publishes a message on this exchange when it has received a CancelOperation request for every Operation related to a specific Job. The message contains the Job ID to be cancelled. The Operations service keeps track of whether or not all related Operations are cancelled using the same database it uses to respond to ListOperations and GetOperation requests.

The messages sent to this exchange are consumed by both the Bots service and the Execution service. The Bots service keeps track of the Job IDs received in these messages, and checks the Lease in UpdateBotSession requests against this cache. The Lease ID is the same as the Job ID, so if it’s found in the cache the Lease gets cancelled in the response.

The Execution service uses the cancelled Job IDs to maintain its deduplication cache. Cancelled Job IDs are removed from the cache, to avoid deduplicating into a Job which will never be completed in the time between the Bots service receiving the cancellation message and the work actually being cancelled.

This cancellation behaviour is best-effort only since it works using in-memory caches of a limited size that aren’t populated at startup.
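
A minimal sketch of such a bounded cache on the Bots service side might look like this; the class name, eviction policy, and size limit are illustrative assumptions.

from collections import OrderedDict

# Hypothetical sketch of a bounded, best-effort cache of cancelled Job IDs.
class CancelledJobCache:
    def __init__(self, max_size=10000):
        self._job_ids = OrderedDict()
        self._max_size = max_size

    def add(self, job_id):
        self._job_ids[job_id] = True
        self._job_ids.move_to_end(job_id)
        while len(self._job_ids) > self._max_size:
            self._job_ids.popitem(last=False)  # evict the oldest entry

    def is_cancelled(self, lease_id):
        # Lease IDs are the same as Job IDs, so a plain lookup is enough.
        return lease_id in self._job_ids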

digraph cancellation_exchange {
    graph [
        fontsize=16
        fontname="Verdana"
        compound=true
    ]
    node [
        shape=box,
        width=2,
        height=1,
        fillcolor="#666666",
        fontcolor="#eeeeee",
        fontname="Verdana",
        fontsize=16,
        style=filled
    ];
    edge [
        fontname="Verdana",
        fontsize=16
    ];
    ranksep = 2

    operations [
        label="Operations service"
    ]
    subgraph cluster_rabbitmq {
        label="RabbitMQ";
        labeljust="l";
        style="dashed";
        node [
            shape=ellipse,
            width=2,
            height=1,
            fillcolor="#d0e0e3",
            fontcolor="#134f5c",
            color="#134f5c",
            style=filled
        ];

        exchange [
            label="Cancellation Exchange",
            shape=box
        ]
        bots_queue_1 [
            label="Bots service\nQueue"
        ]
        bots_queue_2 [
            label="Bots service\nQueue"
        ]
        execution_queue_1 [
            label="Execution service\nQueue"
        ]
        execution_queue_2 [
            label="Execution service\nQueue"
        ]

        exchange -> bots_queue_1;
        exchange -> bots_queue_2;
        exchange -> execution_queue_1;
        exchange -> execution_queue_2;
    }
    execution_1 [
        label="Execution service A"
    ]
    execution_2 [
        label="Execution service B"
    ]
    bots_1 [
        label="Bots service A"
    ]
    bots_2 [
        label="Bots service B"
    ]

    operations -> exchange;
    execution_queue_1 -> execution_1;
    execution_queue_2 -> execution_2;
    bots_queue_1 -> bots_1;
    bots_queue_2 -> bots_2;
}

BotStatus Exchange

The BotStatus exchange replaces the BotSessionReaper coroutine and its retry functionality. With the old implementation, allowing workers to connect to an arbitrary Bots service in the grid (rather than forcing them to stick to a specific machine) requires database interaction to keep track of the worker and avoid spurious retries from Bots services the worker might have been connected to in the past.

Note

This is a direct exchange. The routing key is mostly irrelevant since this exchange will only have a single queue bound to it. The message payload will contain a buildgrid.v2.messaging.BotStatus protobuf.

The Bots service will publish messages on this exchange when a worker connects with either a CreateBotSession or UpdateBotSession request. The messages will contain the server-assigned worker name, the timestamp of the connection, and a list of the Jobs being worked on by the worker.

The messages sent to this exchange will be consumed by a new BotMonitor service, which tracks the last-seen time of workers across the whole grid, and periodically checks for timed-out workers. The work assigned to any timed-out workers is then retried or cancelled, depending on how many times it has previously been retried (which is also tracked by this service).

Retries are handled by requeueing the Job onto the Execution exchange and publishing a message on the Operations exchange moving the Job back into the QUEUED state. The Job may be requeued with a higher priority to avoid flaky Jobs having to repeatedly get through the queue.
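
The retry-or-cancel decision for a timed-out worker’s Jobs could be sketched roughly as follows; the retry limit and the requeue/cancel callables are placeholders, not settled design.

# Hypothetical sketch of the BotMonitor's handling of a Job assigned to a
# timed-out worker. `requeue_job` and `cancel_job` stand in for publishing to
# the Execution/Operations exchanges as described above.
def handle_timed_out_job(job_id, retry_counts, requeue_job, cancel_job,
                         max_retries=3):
    attempts = retry_counts.get(job_id, 0)
    if attempts < max_retries:
        retry_counts[job_id] = attempts + 1
        # Re-publish onto the Execution exchange (optionally at a higher
        # priority) and publish a QUEUED update onto the Operations exchange.
        requeue_job(job_id, priority_boost=attempts + 1)
        return "requeued"
    cancel_job(job_id)
    return "cancelled"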

digraph botstatus_exchange {
    graph [
        fontsize=16
        fontname="Verdana"
        compound=true
    ]
    node [
        shape=box,
        width=2,
        height=1,
        fillcolor="#666666",
        fontcolor="#eeeeee",
        fontname="Verdana",
        fontsize=16,
        style=filled
    ];
    edge [
        fontname="Verdana",
        fontsize=16
    ];
    ranksep = 2

    bots_1 [
        label="Bots service A"
    ]
    bots_2 [
        label="Bots service B"
    ]
    subgraph cluster_rabbitmq_1 {
        label="RabbitMQ";
        labeljust="l";
        style="dashed";
        node [
            shape=ellipse,
            width=2,
            height=1,
            fillcolor="#d0e0e3",
            fontcolor="#134f5c",
            color="#134f5c",
            style=filled
        ];

        exchange [
            label="BotStatus Exchange",
            shape=box
        ]
        monitor_queue [
            label="Queue"
        ]

        exchange -> monitor_queue;
    }
    monitor_1 [
        label="BotMonitor service A"
    ]
    monitor_2 [
        label="BotMonitor service B"
    ]
    subgraph cluster_rabbitmq_2 {
        label="RabbitMQ";
        labeljust="l";
        style="dashed";
        node [
            shape=ellipse,
            width=2,
            height=1,
            fillcolor="#d0e0e3",
            fontcolor="#134f5c",
            color="#134f5c",
            style=filled
        ];

        execution_exchange [
            label="Execution Exchange",
            shape=box
        ]
        operations_exchange [
            label="Operations Exchange",
            shape=box
        ]
    }

    bots_1 -> exchange;
    bots_2 -> exchange;
    monitor_queue -> monitor_1;
    monitor_queue -> monitor_2;
    monitor_1 -> execution_exchange;
    monitor_1 -> operations_exchange;
    monitor_2 -> execution_exchange;
    monitor_2 -> operations_exchange;

}

Tradeoffs

There are some tradeoffs made to enable this design against the existing BuildGrid implementation. Most are fairly minor, and should be very much outweighed by the code quality and scalability improvements they enable.

Deduplication

Deduplication is now only best-effort rather than guaranteed to occur when possible. This is to avoid having an interaction with some canonical state store in the critical path, instead relying on caches maintained by consuming messages from the Operations exchange to get an approximate view of the current state.

Cancellation

Cancellation is also now only best-effort. This is still within spec, but previously cancellation was guaranteed to cause work to stop assuming the worker was still communicating. The upshot of this is that we might execute some unnecessary work, but this should still be at a manageable level.

WaitExecution response time

Handling a WaitExecution request now also involves making a GetOperation request to the Operations service to get the current state. This allows the Operations service to be the sole owner of the Job/Operations database tables, simplifying the code by containing all database access within the Operations service. It also avoids the temptation to use the database as part of the more time-critical handling of Execute requests, by making database access completely unavailable to the Execution service.

This extra GetOperation request will add some overhead to the time between sending a WaitExecution request and receiving the initial response.

Priority granularity

Priority is less granular than in the old scheduler, being dependent on the number of priority levels configured on the RabbitMQ platform queues. RabbitMQ supports up to 255 priority levels per queue but recommends using no more than 10, which still allows a reasonable number of priority levels.
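
For illustration, priorities are enabled per queue via the x-max-priority argument at declaration time, and publishers then set a per-message priority. The queue and exchange names here are placeholders.

import pika

# Sketch: enable message priorities on a platform queue and publish an Action
# at a specific priority.
connection = pika.BlockingConnection(pika.ConnectionParameters("localhost"))
channel = connection.channel()
channel.exchange_declare(exchange="execution", exchange_type="direct", durable=True)
channel.queue_declare(
    queue="osfamily=linux;isa=x86_64",
    durable=True,
    arguments={"x-max-priority": 10},  # higher-priority messages jump the queue
)
channel.queue_bind(queue="osfamily=linux;isa=x86_64", exchange="execution",
                   routing_key="osfamily=linux;isa=x86_64")
channel.basic_publish(
    exchange="execution",
    routing_key="osfamily=linux;isa=x86_64",
    body=b"<serialized Action message>",  # placeholder payload
    properties=pika.BasicProperties(priority=5, delivery_mode=2),
)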

Platform property flexibility

Platform properties and their possible combinations now need to be defined in a configuration file at deployment time, rather than the old support for fully arbitrary partial matching.

In reality, most use cases don’t actually need full partial matching, and full matching is still possible with a sufficiently verbose config. This keeps the number of queues used for sending Jobs to workers at a sensible level rather than letting it grow exponentially as new properties are introduced to the grid, whilst still allowing more flexibility than requiring client-set properties to exactly match worker-specified properties.