Source code for desilike.samplers.mhmcmc

"""Module implementing a Metropolis-Hastings sampler.

Note that the implemenation here is independent of the one in cobaya.
"""

import sys

import numpy as np

from .base import MarkovChainSampler


class FastSlowProposer:
    """Proposer sampling fast and slow parameter spaces separately."""

    def __init__(self, cov, fast=None, rng=None):
        """Initialize the proposal distribution.

        Parameters
        ----------
        cov : numpy.ndarray
            Covariance matrix used to whiten parameter space.
        fast : list or None, optional
            List of dimensions that are considered fast. Default is ``None``.
        rng : numpy.random.Generator, optional
            Random number generator. Default is ``None``.

        """
        self.rng = rng

        self.n_dim = len(cov)
        fast = [] if fast is None else fast
        is_fast = np.isin(np.arange(self.n_dim), fast)
        self.n_fast = np.sum(is_fast)
        self.n_slow = self.n_dim - self.n_fast

        self.sort = np.argsort(is_fast)
        self.unsort = np.argsort(self.sort)
        self.L = np.linalg.cholesky(cov[:, self.sort][self.sort, :])

    def _propose(self, n_dim, k):
        r"""Generate :math:`k \cdot n` random orthogonal vectors.

        Parameters
        ----------
        n_dim : int
            Number of dimensions :math:`n`.
        k : int
            Number of samples.

        Returns
        -------
        numpy.ndarray of shape (k, n_dim, n_dim)
            :math:`k \cdot n` :math:`n`-dimensional orthogonal vectors drawn
            from a unit normal. All vectors within each of the :math:`k` sets
            are orthogonal to each other.

        """
        m = self.rng.standard_normal((k, n_dim, n_dim))
        d = np.sqrt(self.rng.chisquare(n_dim, size=(k, n_dim))) * (
            self.rng.choice([-1, +1], (k, n_dim)))
        Q = np.linalg.qr(m)[0]
        return Q * d[:, :, np.newaxis]

    def propose_fast(self, k):
        r"""Generate random vectors along the fast parameter directions.

        Parameters
        ----------
        k : int
            Number of samples.

        Returns
        -------
        numpy.ndarray of shape (k, n_fast, n_dim)
            :math:`k \cdot n_\mathrm{fast}` :math:`n`-dimensional vectors. All
            vectors are 0 along slow dimensions.

        """
        m_fast = np.zeros((k, self.n_fast, self.n_dim))
        if self.n_fast == 0:
            return np.zeros((k, 0, self.n_dim))
        m_fast[:, :, self.n_slow:] = self._propose(self.n_fast, k)
        return (m_fast @ self.L.T)[:, :, self.unsort]

    def propose_slow(self, k):
        r"""Generate random vectors along the slow parameter directions.

        Parameters
        ----------
        k : int
            Number of samples.

        Returns
        -------
        numpy.ndarray of shape (k, n_slow, n_dim)
            :math:`k \cdot n_\mathrm{slow}` :math:`n`-dimensional vectors.

        """
        m_slow = np.zeros((k, self.n_slow, self.n_dim))
        if self.n_slow == 0:
            return m_slow
        m_slow[:, :, :self.n_slow] = self._propose(self.n_slow, k)
        return (m_slow @ self.L.T)[:, :, self.unsort]


class StandAloneMetropolisHastingsSampler:
    """A Metropolis-Hastings sampler with fast-slow decomposition.

    Note that this is a from-scratch reimplementation of this algorithm. Also,
    this class works outside of ``desilike``.

    .. rubric:: References
    - https://arxiv.org/abs/1304.4473

    """

    def __init__(self, posterior, fast=None, f_fast=1, f_drag=0, pool=None,
                 rng=np.random.default_rng()):
        """Initialize the sampler.

        Parameters
        ----------
        posterior : callable
            Logarithm of the posterior.
        fast : list or None, optional
            List of dimensions that are considered fast. Default is ``None``.
        f_fast : int, optional
            Oversampling factor of fast parameters. The default is 1 which
            implies not oversampling.
        f_drag : int, optional
            Factor for dragging of fast parameters. The default is 0, i.e., no
            dragging.
        pool : object or None, optional
            Pool used for distributing the posterior computation. Default is
            ``None``.
        rng : numpy.random.Generator, optional
            NumPy random number generator used for seeding. Default is
            ``numpy.random.default_rng()``.

        Raises
        ------
        Valuerror
            If `f_fast` is smaller than 1 or `f_drag` is smaller than 0.

        """
        self.posterior = posterior
        self.fast = [] if fast is None else fast
        self.f_fast = int(f_fast)
        if self.f_fast < 1:
            raise ValueError("'f_fast' cannot be smaller than 1.")
        self.f_drag = int(f_drag)
        if self.f_drag < 0:
            raise ValueError("'f_drag' cannot be smaller than 1.")
        if pool is None:
            self.map = map
        else:
            self.map = pool.map
        self.rng = rng

    def update(self, pos=None, log_p=None, blobs=None, cov=None):
        """Update the sampler's starting position and/or proposal.

        Parameters
        ----------
        pos : numpy.ndarray of shape (n_chains, n_dim) or None, optional
            Starting position(s) of the chains.
        log_p : numpy.ndarray of shape (n_chains) or None, optional
            Logarith of the posterior of the starting position(s). If not
            provided, these values are computed.
        blobs : numpy.ndarray of shape (n_chains, ...) or None, optional
            Blobs for the starting positions.
        cov : numpy.ndarray or None, optional
            Covariance matrix used to whiten parameter space.

        """
        if pos is not None:
            self.pos = np.array(pos, dtype=float)
            self.n_chains = len(pos)

            if log_p is None or blobs is None:
                log_p, blobs = self._compute_posterior(self.pos)

            self.log_p = np.array(log_p)
            self.blobs = np.array(blobs)

            self.counter = 0
            self.proposal_fast = []
            self.proposal_slow = []

        if cov is not None:
            self.proposer = FastSlowProposer(
                cov * 2.38**2 / np.sqrt(len(cov)), fast=self.fast,
                rng=self.rng)

    def _compute_posterior(self, points):
        """Compute the natural logarithm of the posterior.

        Parameters
        ----------
        points : numpy.ndarray of shape (n_points, n_dim)
            Points for which to compute the likelihood.

        Returns
        -------
        log_p : np.ndarray of shape (n_points, )
            Natural logarithm of the posterior.
        blobs : np.ndarray of shape (n_points, ...)
            Blobs associated with the posterior function.

        """
        results = list(self.map(self.posterior, points))
        if isinstance(results[0], tuple):
            log_p = np.array([r[0] for r in results])
            blobs = np.array([r[1] for r in results])
        else:
            log_p = np.array(results)
            blobs = np.zeros((len(points), 0))
        return log_p, blobs

    def propose_fast(self):
        """Propose a fast-parameter step.

        Returns
        -------
        step_fast : numpy.ndararay of shape (n_chains, n_dim)
            Fast-parameter steps where slow parameters are unchanged.

        """
        if len(self.proposal_fast) == 0:
            self.proposal_fast = list(np.transpose(self.proposer.propose_fast(
                self.n_chains), axes=[1, 0, 2]))
        return self.proposal_fast.pop()

    def propose_slow(self):
        """Propose a slow-parameter step.

        Returns
        -------
        step_slow : numpy.ndararay of shape (n_chains, n_dim)
            Slow-parameter steps.

        """
        if len(self.proposal_slow) == 0:
            self.proposal_slow = list(np.transpose(self.proposer.propose_slow(
                self.n_chains), axes=[1, 0, 2]))
        proposal_drag = []
        for _ in range(self.f_drag):
            proposal_drag += list(np.transpose(self.proposer.propose_fast(
                self.n_chains), axes=[1, 0, 2]))
        return self.proposal_slow.pop(), proposal_drag

    def make_one_step(self):
        """Advance all chains by one step.

        Returns
        -------
        pos : numpy.ndarray of shape (n_chains, n_dim)
            New positions in parameter space.
        blobs : np.ndarray of shape (n_chains, ...)
            Blobs associated with the posterior function.
        log_p : numpy.ndarray of shape (n_chains)
            Logarithm of the posterior.

        """
        n_cycle = self.proposer.n_fast * self.f_fast + self.proposer.n_slow
        if self.counter % n_cycle < self.proposer.n_fast * self.f_fast:
            step, steps_drag = self.propose_fast(), []
        else:
            step, steps_drag = self.propose_slow()
        self.counter += 1

        # First, assume we do a regular step.
        pos_prop = self.pos + step
        log_p_prop, blobs_prop = self._compute_posterior(pos_prop)
        p_accept = np.exp(log_p_prop - self.log_p)

        # If applicable, do a dragging step, instead.
        if len(steps_drag) > 0:
            # The following is described in section III of 1304.4473.
            n = len(steps_drag) + 1

            # We will use a slightly different notation than in the paper.
            # In particular, x represents the change in the fast parameters,
            # not the fast parameters themselves.
            y_new = pos_prop
            y_old = self.pos.copy()
            x = [np.zeros(self.pos.shape)]
            log_p_new = [log_p_prop]
            log_p_old = [self.log_p]

            # Run a mini MCMC chain on x, the fast parameter.
            for i, step in enumerate(steps_drag, start=1):
                log_p_new_prop, blobs_prop = self._compute_posterior(
                    y_new + x[-1] + step)
                log_p_old_prop = self._compute_posterior(
                    y_old + x[-1] + step)[0]
                p_accept = np.exp(
                    ((n - i) * log_p_old_prop + i * log_p_new_prop -
                     (n - i) * log_p_old[-1] - i * log_p_new[-1]) / n)
                accept = self.rng.random(size=self.n_chains) < p_accept
                x.append(np.where(accept[:, None], x[-1] + step, x[-1]))
                log_p_new.append(
                    np.where(accept, log_p_new_prop, log_p_new[-1]))
                log_p_old.append(
                    np.where(accept, log_p_old_prop, log_p_old[-1]))

            pos_prop = y_new + x[-1]
            log_p_prop = log_p_new[-1]
            p_accept = np.exp(np.mean(log_p_new, axis=0) -
                              np.mean(log_p_old, axis=0))

        accept = self.rng.random(size=self.n_chains) < p_accept
        self.pos = np.where(accept[:, None], pos_prop, self.pos)
        self.log_p = np.where(accept, log_p_prop, self.log_p)
        self.blobs = np.where(accept[:, None], blobs_prop, self.blobs)

        return self.pos.copy(), self.blobs.copy(), self.log_p.copy()

    def make_n_steps(self, n_steps):
        """Advance all chains by :math:`n` steps.

        Parameters
        ----------
        n_steps : int
            Number of steps to take.

        Returns
        -------
        chains : numpy.ndarray of shape (n_chains, n_steps, n_dim)
            Positions in parameter space.
        blobs : numpy.ndarray of shape (n_chains, n_steps, ...)
            Blobs returned from the posterior.
        log_p : numpy.ndarray of shape (n_chains, n_steps)
            Logarithm of the posterior.

        """
        results = [self.make_one_step() for _ in range(n_steps)]
        chains = np.stack([r[0] for r in results], axis=1)
        blobs = np.stack([r[1] for r in results], axis=1)
        log_p = np.stack([r[2] for r in results], axis=1)
        return chains, blobs, log_p


[docs] class MetropolisHastingsSampler(MarkovChainSampler): """A Metropolis-Hastings sampler with fast-slow decomposition. Note that this is a from-scratch reimplementation of this algorithm. .. rubric:: References - `algorithm paper <https://doi.org/10.1103/PhysRevD.87.103529>`_ """ default_adaptation_steps = sys.maxsize def __init__(self, likelihood, n_chains=1, cov=None, f_fast=1, f_drag=0, fast=None, chains=None, rng=None, directory=None): """Initialize the Metropolis-Hastings sampler. Parameters ---------- likelihood : BaseLikelihood Likelihood to sample. n_chains : int, optional Number of **independent** chains. Default is 1. cov : numpy.ndarray or None, optional Covariance matrix estimate used to whiten parameter space. If None, the sampler will use each parameter's proposal scale. f_fast : int, optional Oversampling factor of fast parameters. The default is 1 which implies not oversampling. f_drag : int, optional Factor for dragging of fast parameters. The default is 0, i.e., no dragging. fast : list of str or None, optional List of dimensions that are considered fast. Default is ``None``. chains : list of desilike.samples.Chain, optional If given, continue the chains. In that case, we will ignore what was read from disk. Default is ``None``. rng : numpy.random.Generator, int, or None, optional Random number generator. Default is ``None``. directory : str, Path, or None, optional Save samples to this location. Default is ``None``. """ super().__init__( likelihood, n_chains=n_chains, chains=chains, rng=rng, directory=directory) fast = [] if fast is None else fast for i in range(len(fast)): fast[i] = self.varied_params.index(fast[i]) self.sampler = StandAloneMetropolisHastingsSampler( self._compute_posterior, fast=fast, f_fast=f_fast, f_drag=f_drag, pool=self.pool, rng=self.rng) if cov is None: n_dim = len(self.varied_params) cov = np.zeros((n_dim, n_dim)) for i, param in enumerate(self.likelihood.varied_params): cov[i, i] = param.proposal**2 self.sampler.update(cov=cov) def _run(self, n_steps): """Run the Metropolis-Hastings sampler. Parameters ---------- n_steps: int Number of steps to take. """ if not hasattr(self.sampler, 'pos'): samples, derived, log_post = self._state self.sampler.update( pos=samples, log_p=log_post, blobs=derived) self._extend(*self.sampler.make_n_steps(n_steps)) if len(self.chains[0]) < self.adaptation_steps: cov = np.mean([chain.covariance(self.varied_params) for chain in self.chains], axis=0) try: self.sampler.update(cov=cov) except np.linalg.LinAlgError: pass # matrix was not positive definite