"""Module implementing a Metropolis-Hastings sampler.
Note that the implemenation here is independent of the one in cobaya.
"""
import sys
import numpy as np
from .base import MarkovChainSampler
class FastSlowProposer:
"""Proposer sampling fast and slow parameter spaces separately."""
def __init__(self, cov, fast=None, rng=None):
"""Initialize the proposal distribution.
Parameters
----------
cov : numpy.ndarray
Covariance matrix used to whiten parameter space.
fast : list or None, optional
List of dimensions that are considered fast. Default is ``None``.
rng : numpy.random.Generator, optional
Random number generator. Default is ``None``.
"""
self.rng = rng
self.n_dim = len(cov)
fast = [] if fast is None else fast
is_fast = np.isin(np.arange(self.n_dim), fast)
self.n_fast = np.sum(is_fast)
self.n_slow = self.n_dim - self.n_fast
self.sort = np.argsort(is_fast)
self.unsort = np.argsort(self.sort)
self.L = np.linalg.cholesky(cov[:, self.sort][self.sort, :])
def _propose(self, n_dim, k):
r"""Generate :math:`k \cdot n` random orthogonal vectors.
Parameters
----------
n_dim : int
Number of dimensions :math:`n`.
k : int
Number of samples.
Returns
-------
numpy.ndarray of shape (k, n_dim, n_dim)
:math:`k \cdot n` :math:`n`-dimensional orthogonal vectors drawn
from a unit normal. All vectors within each of the :math:`k` sets
are orthogonal to each other.
"""
m = self.rng.standard_normal((k, n_dim, n_dim))
d = np.sqrt(self.rng.chisquare(n_dim, size=(k, n_dim))) * (
self.rng.choice([-1, +1], (k, n_dim)))
Q = np.linalg.qr(m)[0]
return Q * d[:, :, np.newaxis]
def propose_fast(self, k):
r"""Generate random vectors along the fast parameter directions.
Parameters
----------
k : int
Number of samples.
Returns
-------
numpy.ndarray of shape (k, n_fast, n_dim)
:math:`k \cdot n_\mathrm{fast}` :math:`n`-dimensional vectors. All
vectors are 0 along slow dimensions.
"""
m_fast = np.zeros((k, self.n_fast, self.n_dim))
if self.n_fast == 0:
return np.zeros((k, 0, self.n_dim))
m_fast[:, :, self.n_slow:] = self._propose(self.n_fast, k)
return (m_fast @ self.L.T)[:, :, self.unsort]
def propose_slow(self, k):
r"""Generate random vectors along the slow parameter directions.
Parameters
----------
k : int
Number of samples.
Returns
-------
numpy.ndarray of shape (k, n_slow, n_dim)
:math:`k \cdot n_\mathrm{slow}` :math:`n`-dimensional vectors.
"""
m_slow = np.zeros((k, self.n_slow, self.n_dim))
if self.n_slow == 0:
return m_slow
m_slow[:, :, :self.n_slow] = self._propose(self.n_slow, k)
return (m_slow @ self.L.T)[:, :, self.unsort]
class StandAloneMetropolisHastingsSampler:
"""A Metropolis-Hastings sampler with fast-slow decomposition.
Note that this is a from-scratch reimplementation of this algorithm. Also,
this class works outside of ``desilike``.
.. rubric:: References
- https://arxiv.org/abs/1304.4473
"""
def __init__(self, posterior, fast=None, f_fast=1, f_drag=0, pool=None,
rng=np.random.default_rng()):
"""Initialize the sampler.
Parameters
----------
posterior : callable
Logarithm of the posterior.
fast : list or None, optional
List of dimensions that are considered fast. Default is ``None``.
f_fast : int, optional
Oversampling factor of fast parameters. The default is 1 which
implies not oversampling.
f_drag : int, optional
Factor for dragging of fast parameters. The default is 0, i.e., no
dragging.
pool : object or None, optional
Pool used for distributing the posterior computation. Default is
``None``.
rng : numpy.random.Generator, optional
NumPy random number generator used for seeding. Default is
``numpy.random.default_rng()``.
Raises
------
Valuerror
If `f_fast` is smaller than 1 or `f_drag` is smaller than 0.
"""
self.posterior = posterior
self.fast = [] if fast is None else fast
self.f_fast = int(f_fast)
if self.f_fast < 1:
raise ValueError("'f_fast' cannot be smaller than 1.")
self.f_drag = int(f_drag)
if self.f_drag < 0:
raise ValueError("'f_drag' cannot be smaller than 1.")
if pool is None:
self.map = map
else:
self.map = pool.map
self.rng = rng
def update(self, pos=None, log_p=None, blobs=None, cov=None):
"""Update the sampler's starting position and/or proposal.
Parameters
----------
pos : numpy.ndarray of shape (n_chains, n_dim) or None, optional
Starting position(s) of the chains.
log_p : numpy.ndarray of shape (n_chains) or None, optional
Logarith of the posterior of the starting position(s). If not
provided, these values are computed.
blobs : numpy.ndarray of shape (n_chains, ...) or None, optional
Blobs for the starting positions.
cov : numpy.ndarray or None, optional
Covariance matrix used to whiten parameter space.
"""
if pos is not None:
self.pos = np.array(pos, dtype=float)
self.n_chains = len(pos)
if log_p is None or blobs is None:
log_p, blobs = self._compute_posterior(self.pos)
self.log_p = np.array(log_p)
self.blobs = np.array(blobs)
self.counter = 0
self.proposal_fast = []
self.proposal_slow = []
if cov is not None:
self.proposer = FastSlowProposer(
cov * 2.38**2 / np.sqrt(len(cov)), fast=self.fast,
rng=self.rng)
def _compute_posterior(self, points):
"""Compute the natural logarithm of the posterior.
Parameters
----------
points : numpy.ndarray of shape (n_points, n_dim)
Points for which to compute the likelihood.
Returns
-------
log_p : np.ndarray of shape (n_points, )
Natural logarithm of the posterior.
blobs : np.ndarray of shape (n_points, ...)
Blobs associated with the posterior function.
"""
results = list(self.map(self.posterior, points))
if isinstance(results[0], tuple):
log_p = np.array([r[0] for r in results])
blobs = np.array([r[1] for r in results])
else:
log_p = np.array(results)
blobs = np.zeros((len(points), 0))
return log_p, blobs
def propose_fast(self):
"""Propose a fast-parameter step.
Returns
-------
step_fast : numpy.ndararay of shape (n_chains, n_dim)
Fast-parameter steps where slow parameters are unchanged.
"""
if len(self.proposal_fast) == 0:
self.proposal_fast = list(np.transpose(self.proposer.propose_fast(
self.n_chains), axes=[1, 0, 2]))
return self.proposal_fast.pop()
def propose_slow(self):
"""Propose a slow-parameter step.
Returns
-------
step_slow : numpy.ndararay of shape (n_chains, n_dim)
Slow-parameter steps.
"""
if len(self.proposal_slow) == 0:
self.proposal_slow = list(np.transpose(self.proposer.propose_slow(
self.n_chains), axes=[1, 0, 2]))
proposal_drag = []
for _ in range(self.f_drag):
proposal_drag += list(np.transpose(self.proposer.propose_fast(
self.n_chains), axes=[1, 0, 2]))
return self.proposal_slow.pop(), proposal_drag
def make_one_step(self):
"""Advance all chains by one step.
Returns
-------
pos : numpy.ndarray of shape (n_chains, n_dim)
New positions in parameter space.
blobs : np.ndarray of shape (n_chains, ...)
Blobs associated with the posterior function.
log_p : numpy.ndarray of shape (n_chains)
Logarithm of the posterior.
"""
n_cycle = self.proposer.n_fast * self.f_fast + self.proposer.n_slow
if self.counter % n_cycle < self.proposer.n_fast * self.f_fast:
step, steps_drag = self.propose_fast(), []
else:
step, steps_drag = self.propose_slow()
self.counter += 1
# First, assume we do a regular step.
pos_prop = self.pos + step
log_p_prop, blobs_prop = self._compute_posterior(pos_prop)
p_accept = np.exp(log_p_prop - self.log_p)
# If applicable, do a dragging step, instead.
if len(steps_drag) > 0:
# The following is described in section III of 1304.4473.
n = len(steps_drag) + 1
# We will use a slightly different notation than in the paper.
# In particular, x represents the change in the fast parameters,
# not the fast parameters themselves.
y_new = pos_prop
y_old = self.pos.copy()
x = [np.zeros(self.pos.shape)]
log_p_new = [log_p_prop]
log_p_old = [self.log_p]
# Run a mini MCMC chain on x, the fast parameter.
for i, step in enumerate(steps_drag, start=1):
log_p_new_prop, blobs_prop = self._compute_posterior(
y_new + x[-1] + step)
log_p_old_prop = self._compute_posterior(
y_old + x[-1] + step)[0]
p_accept = np.exp(
((n - i) * log_p_old_prop + i * log_p_new_prop -
(n - i) * log_p_old[-1] - i * log_p_new[-1]) / n)
accept = self.rng.random(size=self.n_chains) < p_accept
x.append(np.where(accept[:, None], x[-1] + step, x[-1]))
log_p_new.append(
np.where(accept, log_p_new_prop, log_p_new[-1]))
log_p_old.append(
np.where(accept, log_p_old_prop, log_p_old[-1]))
pos_prop = y_new + x[-1]
log_p_prop = log_p_new[-1]
p_accept = np.exp(np.mean(log_p_new, axis=0) -
np.mean(log_p_old, axis=0))
accept = self.rng.random(size=self.n_chains) < p_accept
self.pos = np.where(accept[:, None], pos_prop, self.pos)
self.log_p = np.where(accept, log_p_prop, self.log_p)
self.blobs = np.where(accept[:, None], blobs_prop, self.blobs)
return self.pos.copy(), self.blobs.copy(), self.log_p.copy()
def make_n_steps(self, n_steps):
"""Advance all chains by :math:`n` steps.
Parameters
----------
n_steps : int
Number of steps to take.
Returns
-------
chains : numpy.ndarray of shape (n_chains, n_steps, n_dim)
Positions in parameter space.
blobs : numpy.ndarray of shape (n_chains, n_steps, ...)
Blobs returned from the posterior.
log_p : numpy.ndarray of shape (n_chains, n_steps)
Logarithm of the posterior.
"""
results = [self.make_one_step() for _ in range(n_steps)]
chains = np.stack([r[0] for r in results], axis=1)
blobs = np.stack([r[1] for r in results], axis=1)
log_p = np.stack([r[2] for r in results], axis=1)
return chains, blobs, log_p
[docs]
class MetropolisHastingsSampler(MarkovChainSampler):
"""A Metropolis-Hastings sampler with fast-slow decomposition.
Note that this is a from-scratch reimplementation of this algorithm.
.. rubric:: References
- `algorithm paper <https://doi.org/10.1103/PhysRevD.87.103529>`_
"""
default_adaptation_steps = sys.maxsize
def __init__(self, likelihood, n_chains=1, cov=None, f_fast=1, f_drag=0,
fast=None, chains=None, rng=None, directory=None):
"""Initialize the Metropolis-Hastings sampler.
Parameters
----------
likelihood : BaseLikelihood
Likelihood to sample.
n_chains : int, optional
Number of **independent** chains. Default is 1.
cov : numpy.ndarray or None, optional
Covariance matrix estimate used to whiten parameter space. If None,
the sampler will use each parameter's proposal scale.
f_fast : int, optional
Oversampling factor of fast parameters. The default is 1 which
implies not oversampling.
f_drag : int, optional
Factor for dragging of fast parameters. The default is 0, i.e., no
dragging.
fast : list of str or None, optional
List of dimensions that are considered fast. Default is ``None``.
chains : list of desilike.samples.Chain, optional
If given, continue the chains. In that case, we will ignore what
was read from disk. Default is ``None``.
rng : numpy.random.Generator, int, or None, optional
Random number generator. Default is ``None``.
directory : str, Path, or None, optional
Save samples to this location. Default is ``None``.
"""
super().__init__(
likelihood, n_chains=n_chains, chains=chains, rng=rng,
directory=directory)
fast = [] if fast is None else fast
for i in range(len(fast)):
fast[i] = self.varied_params.index(fast[i])
self.sampler = StandAloneMetropolisHastingsSampler(
self._compute_posterior, fast=fast, f_fast=f_fast, f_drag=f_drag,
pool=self.pool, rng=self.rng)
if cov is None:
n_dim = len(self.varied_params)
cov = np.zeros((n_dim, n_dim))
for i, param in enumerate(self.likelihood.varied_params):
cov[i, i] = param.proposal**2
self.sampler.update(cov=cov)
def _run(self, n_steps):
"""Run the Metropolis-Hastings sampler.
Parameters
----------
n_steps: int
Number of steps to take.
"""
if not hasattr(self.sampler, 'pos'):
samples, derived, log_post = self._state
self.sampler.update(
pos=samples, log_p=log_post, blobs=derived)
self._extend(*self.sampler.make_n_steps(n_steps))
if len(self.chains[0]) < self.adaptation_steps:
cov = np.mean([chain.covariance(self.varied_params)
for chain in self.chains], axis=0)
try:
self.sampler.update(cov=cov)
except np.linalg.LinAlgError:
pass # matrix was not positive definite