Distributed GPs

[ ]:
# © Crown Copyright GCHQ
#
# Licensed under the GNU General Public License, version 3 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# https://www.gnu.org/licenses/gpl-3.0.en.html
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
[ ]:
# This notebook is not compiled into the documentation due to the time taken to run it to get
# a representative analysis. Please run this notebook locally if you wish to see the outputs.

In this notebook we introduce Gaussian process distribution, allowing the training of Gaussian processes quickly on large-scale data. In the future, we hope to offer (cloud) support for using distributed GPs in an embarrassingly parallel way.

[ ]:
random_seed = 1_989
num_iters = 100
[ ]:
import time
from contextlib import contextmanager

import matplotlib.pyplot as plt
import numpy as np
from gpytorch.kernels import MaternKernel, ScaleKernel

from vanguard.datasets.bike import BikeDataset
from vanguard.distribute import Distributed, aggregators, partitioners
from vanguard.vanilla import GaussianGPController
from vanguard.warps import SetWarp, warpfunctions

Introduction

Recall that the complexity of training a Gaussian process and then making a prediction is \(\mathcal{O}(n^3)\) and \(\mathcal{O}(n^2)\) respectively when using exact inference. One way to overcome this is to use variational inference <sparse_variational_gps.ipynb>_, but this uses non-exact inference which may not be the best fit. An alternative is to distribute the computations across multiple sources of compute and run exact inference in parallel. This is done by partitioning the data into \(m\) disjoint sets, and then running inference on all of them. The computational complexity of training under exact inference is therefore reduced to \(\mathcal{O}\left(\frac{n^3}{m^2}\right)\), and further reduced to \(\mathcal{O}\left(\frac{n^3}{m^3}\right)\) when this occurs in parallel. Predictions can then be made across these disjoint models, and aggregated into a final prediction presented to the user.

Data

We will use 5,000 data points from the BikeDataset. The data has 13 features. The target variable \(y\) is always non-negative, so warping could be useful. This dataset is taken from [FanaeeT2013] and was accessed and copied to Github LFS within this repo on 1st July 2024.

[ ]:
DATASET = BikeDataset(
    num_samples=5000, training_proportion=0.9, noise_scale=0.01, rng=np.random.default_rng(random_seed)
)
[ ]:
plt.figure(figsize=(10, 5))
DATASET.plot_y()
plt.show()

Baseline

We’ll start with an Exact GP baseline, this is the most accurate method but can be too slow/memory intensive for large data sets. We will begin with a scaled MaternKernel with a standard GaussianGPController.

[ ]:
class ScaledMaternKernel(ScaleKernel):
    """A scaled Matern kernel."""

    def __init__(self):
        super().__init__(MaternKernel(nu=1.5, ard_num_dims=DATASET.train_x.shape[1]))
[ ]:
gp = GaussianGPController(
    train_x=DATASET.train_x,
    train_y=DATASET.train_y,
    kernel_class=ScaledMaternKernel,
    y_std=DATASET.train_y_std,
    likelihood_kwargs={"learn_additional_noise": True},
    rng=np.random.default_rng(random_seed),
)

We will use a timer to showcase how fast the training process is under each model:

[ ]:
@contextmanager
def timer():
    start = time.time()
    yield
    end = time.time()
    print(f"Time taken: {end - start:.3f}s")
[ ]:
with timer():
    loss = gp.fit(n_sgd_iters=num_iters)
[ ]:
posterior = gp.posterior_over_point(DATASET.test_x)

plt.figure(figsize=(10, 5))
DATASET.plot_prediction(*posterior.confidence_interval())
plt.show()

We can improve our baseline by adding compositional warping to the Gaussian GP controller. We’ll use an affine-log warp to reflect the non-negativity of the data: \(\phi(y) = a + b\log(y)\).

The code to create this model in Vanguard is simple, we just add the SetWarp decorator to our existing controller.

[ ]:
warp = warpfunctions.AffineWarpFunction(a=3, b=-1) @ warpfunctions.BoxCoxWarpFunction(0.2)


@SetWarp(warp, ignore_all=True)
class WarpedGPController(GaussianGPController):
    pass
[ ]:
gp = WarpedGPController(
    train_x=DATASET.train_x,
    train_y=DATASET.train_y,
    kernel_class=ScaledMaternKernel,
    y_std=DATASET.train_y_std,
    likelihood_kwargs={"learn_additional_noise": True},
    rng=np.random.default_rng(random_seed),
)
[ ]:
with timer():
    loss = gp.fit(n_sgd_iters=num_iters)
[ ]:
posterior = gp.posterior_over_point(DATASET.test_x)

plt.figure(figsize=(10, 5))
DATASET.plot_prediction(*posterior.confidence_interval())
plt.show()

Distributed GPs

Distributed GPs can be created with the Distributed decorator applied over a controller. The decorator has three main arguments:

  • n_experts - The number of experts (separate Gaussian processes) to use. Fewer experts make the model slower but tend to give more accurate predictions.

  • partitioner_class - This class controls how experts are assigned data points.

  • aggregator_class - This class controls how expert predictions are aggregated.

Partitioning

There are a number of ways to partition data, with varied results on the fit of the overall model. Since each pot of data will be used to train a Gaussian process, it makes sense to partition data into coherent sections to make this easier. The most basic partition strategy is to group the data randomly, which sets a baseline for the process. This is equivalent to training a Gaussian process on a down-sample of the original data, and so one would expect a decrease in quality of fit across all models.

A much better choice is the KMeansPartitioner, which will cluster the training data to divide it into more appropriate sections for each expert. This way, each model has a better chance of fitting, as each cluster will hopefully represent a cohesive area of the data which can be more easily modelled.

[ ]:
partitioner = partitioners.KMeansPartitioner(DATASET.train_x, n_experts=5)
partition = partitioner.create_partition()

We can use a T-SNE plot to view how the training data has been partitioned:

[ ]:
plt.figure(figsize=(8, 8))
partitioner.plot_partition(partition)
plt.show()

KMeansPartitioner is almost always best unless you are using a “non-Euclidean” kernel, in which case KMedoidsPartitioner can give better results. The latter uses \(\mathcal{O}(n^2)\) memory so can only be used for small data sets (\(\sim10^4\) data points).

Aggregating

Once the training data has been partitioned, it is distributed to each of the expert controllers, who can then run their own inference in parallel to tune the hyperparameters. When it comes to making a prediction, each expert called and their results are pooled together into an overall posterior. Similarly to partitioning, there are a number of different methods available for doing this, but the most straightforward is the “product of experts” method from :cite`Deisenroth15`, implemented with the POEAggregator. Given the posteriors of the experts \(p_{i}(y|x) = N(\mu_{i}(x), \sigma_{i}^{2}(x))\) for \(i=1, 2, ..., m\), we define the joint posterior as a Gaussian with moments:

\[\begin{split}\mu &= \sigma^{2} \sum_{i} \sigma_{i}^{-2}(x) \mu_{i}(x) \\ \sigma^{-2} &= \sum_{i} \sigma_{i}^{-2}(x).\end{split}\]

However, the XGRBCMAggregator is theoretically the best choice (in the limit of n_experts), but the RBCMAggregator works well in practice.

Distribution in Vanguard

In Vanguard, things work a little differently. Instead of training experts separately, hyperparameters are tuned on a random subset of the training data. The trained mean and kernel (and any other trained components from other decorators) are then passed to each expert for use in inference, which saves overall computation. This makes the fitting process very efficient.

Warning

The Distributed decorator currently only accepts numerical values for the y_std parameter.

[ ]:
N_EXPERTS = 4
[ ]:
@Distributed(
    n_experts=N_EXPERTS,
    subset_fraction=1 / N_EXPERTS,
    aggregator_class=aggregators.XGRBCMAggregator,
    partitioner_class=partitioners.KMeansPartitioner,
    ignore_methods=("__init__",),
)
class DistributedGPController(GaussianGPController):
    pass
[ ]:
gp = DistributedGPController(
    train_x=DATASET.train_x,
    train_y=DATASET.train_y,
    kernel_class=ScaledMaternKernel,
    y_std=DATASET.train_y_std[0],
    likelihood_kwargs={"learn_additional_noise": True},
    rng=np.random.default_rng(random_seed),
)
[ ]:
with timer():
    loss = gp.fit(n_sgd_iters=num_iters)
[ ]:
posterior = gp.posterior_over_point(DATASET.test_x)

plt.figure(figsize=(10, 5))
DATASET.plot_prediction(*posterior.confidence_interval())
plt.show()

Again we can add compositional warping to improve the model.

[ ]:
warp = warpfunctions.AffineWarpFunction(a=3, b=-1) @ warpfunctions.BoxCoxWarpFunction(0.2)


@Distributed(
    n_experts=N_EXPERTS,
    subset_fraction=1 / N_EXPERTS,
    aggregator_class=aggregators.XGRBCMAggregator,
    partitioner_class=partitioners.KMeansPartitioner,
    ignore_all=True,
)
@SetWarp(warp, ignore_all=True)
class WarpedDistributedGPController(GaussianGPController):
    pass
[ ]:
gp = WarpedDistributedGPController(
    train_x=DATASET.train_x,
    train_y=DATASET.train_y,
    kernel_class=ScaledMaternKernel,
    y_std=DATASET.train_y_std[0],
    likelihood_kwargs={"learn_additional_noise": True},
    rng=np.random.default_rng(random_seed),
)
[ ]:
with timer():
    loss = gp.fit(n_sgd_iters=num_iters)
[ ]:
posterior = gp.posterior_over_point(DATASET.test_x)

plt.figure(figsize=(10, 5))
DATASET.plot_prediction(*posterior.confidence_interval())
plt.show()

Conclusions

We have demonstrated that while distributed GPs are not as accurate as the exact GP, they are much more scalable. For large data sets, where runtime and memory are key considerations, the distributed GP may be the preferred model.