import os
import re
import glob
import numpy as np
from desilike.parameter import ParameterCollection, Parameter, ParameterPrior, ParameterArray, Samples, ParameterCovariance, _reshape, is_parameter_sequence
from desilike import LikelihoodFisher
from . import utils
[docs]
def vectorize(func):
"""Vectorize input function ``func`` for input parameters."""
from functools import wraps
@wraps(func)
def wrapper(self, params=None, *args, **kwargs):
if params is None:
params = self.params()
def _reshape(result, pshape):
def __reshape(array):
try:
array.shape = array.shape[:array.ndim -
len(pshape)] + pshape
except AttributeError:
pass
return array
if isinstance(result, tuple): # for :meth:`Chain.interval`
for res in result:
__reshape(res)
else:
__reshape(result)
return result
if is_parameter_sequence(params):
params = [self[param].param for param in params]
return [_reshape(func(self, param, *args, **kwargs), param.shape) for param in params]
return _reshape(func(self, params, *args, **kwargs), self[params].param.shape)
return wrapper
def _get_solved_covariance(chain, params=None, return_hessian=False):
logposterior = chain[chain._loglikelihood] + chain[chain._logprior]
if params is None:
params = chain.params(solved=True)
params = [str(param) for param in params]
solved_params = []
for param in params:
if logposterior.isin((param, param)):
solved_params.append(param)
if set(solved_params) != set(params):
import warnings
warnings.warn('You need the covariance of analytically marginalized ("solved") parameters, but it has not been computed / saved for {}. Assuming zero covariance.'.format(
[param for param in params if param not in solved_params]))
all_solved_params = [str(param) for param in chain.params(
solved=True) if logposterior.isin((param, param))]
hessian = np.array([[logposterior[param1, param2].ravel()
for param2 in all_solved_params] for param1 in all_solved_params])
hessian = np.moveaxis(
hessian, -1, 0).reshape(chain.shape + (len(all_solved_params),) * 2)
covariance = np.linalg.inv(-hessian)
# symmetrizing helps for numerical errors
covariance = (np.moveaxis(covariance, chain.ndim +
1, chain.ndim) + covariance) / 2.
toret_covariance = np.zeros(chain.shape + (len(params),) * 2, dtype='f8')
toret_hessian = toret_covariance.copy()
for iparam1, param1 in enumerate(all_solved_params):
if param1 not in params:
continue
index1 = params.index(param1)
for iparam2, param2 in enumerate(all_solved_params):
if param2 not in params:
continue
index2 = params.index(param2)
toret_covariance[..., index1,
index2] = covariance[..., iparam1, iparam2]
toret_hessian[..., index1, index2] = hessian[..., iparam1, iparam2]
if return_hessian:
return toret_covariance, toret_hessian
return toret_covariance
[docs]
class Chain(Samples):
"""
Class that holds samples drawn from posterior (in practice, :class:`Samples` with a log-posterior and optional weights).
Parameter arrays can be accessed (and updated) as for a dictionary:
.. code-block:: python
chain = Chain([np.ones(100), np.zeros(100)], params=['a', 'b'])
chain['a'] += 1.
print(chain['a'].mean())
chain['c'] = chain['b'] + 1
chain['c'].param.update(latex='c')
"""
_type = ParameterArray
_attrs = Samples._attrs + \
['_logposterior', '_loglikelihood', '_logprior',
'_aweight', '_fweight', '_weight']
def __init__(self, data=None, params=None, logposterior=None, loglikelihood=None, logprior=None, aweight=None, fweight=None, weight=None, attrs=None):
"""
Initialize :class:`Chain`.
Parameters
----------
data : list, dict, Samples
Can be:
- list of :class:`ParameterArray`, or :class:`np.ndarray` if list of parameters
(or :class:`ParameterCollection`) is provided in ``params``
- dictionary mapping parameter to array
params : list, ParameterCollection
Optionally, list of parameters.
logposterior : str, default='logposterior'
Name of log-posterior in ``data``.
loglikelihood : str, default='loglikelihood'
Name of log-likelihood in ``data``.
logprior : str, default='logprior'
Name of log-prior in ``data``.
aweight : str, default='aweight'
Name of sample weights (which default to 1. if not provided in ``data``).
fweight : str, default='fweight'
Name of sample frequency weights (which default to 1 if not provided in ``data``).
weight : str, default='weight'
Name of sample total weight. It is defined as the product of :attr:`aweight` and :attr:`fweight`,
hence should not provided in ``data``.
attrs : dict, default=None
Optionally, other attributes, stored in :attr:`attrs`.
"""
super(Chain, self).__init__(data=data, params=params, attrs=attrs)
for _name in self._attrs[-6:]:
name = _name[1:]
value = locals()[name]
# set only if not previously set, or new value are provided
if getattr(self, _name, None) is None or value is not None:
setattr(self, _name, name if value is None else str(value))
value = getattr(self, _name)
if value in self:
self[value].param.update(derived=True)
def __setstate__(self, state):
# Backward-compatibility
for name in ['_logposterior', '_loglikelihood', '_logprior', '_aweight', '_fweight', '_weight']:
state.setdefault(name, name[1:])
super(Chain, self).__setstate__(state)
@property
def aweight(self):
"""Sample weights (floats)."""
if self._aweight not in self:
return np.ones(self.shape, dtype='f8')
else:
return self[self._aweight]
@property
def fweight(self):
"""Sample frequency weights (integers)."""
if self._fweight not in self:
return np.ones(self.shape, dtype='i8')
return self[self._fweight]
@property
def logposterior(self):
"""Log-posterior."""
if self._logposterior not in self:
self[Parameter(self._logposterior, derived=True, latex=utils.outputs_to_latex(
self._logposterior))] = np.zeros(self.shape, dtype='f8')
return self[self._logposterior]
@aweight.setter
def aweight(self, item):
"""Set weights (floats)."""
self[Parameter(self._aweight, derived=True,
latex=utils.outputs_to_latex(self._aweight))] = item
@fweight.setter
def fweight(self, item):
"""Set frequency weights (integers)."""
self[Parameter(self._fweight, derived=True,
latex=utils.outputs_to_latex(self._fweight))] = item
@logposterior.setter
def logposterior(self, item):
"""Set log-posterior."""
self[Parameter(self._logposterior, derived=True,
latex=utils.outputs_to_latex(self._logposterior))] = item
@property
def weight(self):
"""Return total weight, as the product of :attr:`aweight` and :attr:`fweight`."""
return ParameterArray(self.aweight * self.fweight, Parameter(self._weight, derived=True, latex=utils.outputs_to_latex(self._weight)))
[docs]
def set_derived(self, basename, array, **kwargs):
"""
Set derived parameter.
Parameters
----------
array : np.array
Numpy array.
kwargs : dict
Arguments for :class:`Parameter`.
"""
kwargs['basename'] = basename
kwargs.setdefault('derived', True)
self.set(ParameterArray(array, Parameter(kwargs)))
[docs]
def remove_burnin(self, burnin=0):
"""
Return new samples with burn-in removed.
Parameters
----------
burnin : float, int
If ``burnin`` between 0 and 1, remove that fraction of samples.
Else, remove ``burnin`` (integer) first points.
Returns
-------
samples : Chain
"""
if 0 < burnin < 1:
burnin = burnin * len(self)
burnin = int(burnin + 0.5)
return self[burnin:]
[docs]
def sample_solved(self, size=1, seed=42):
"""Sample parameters that have been analytic marginalized over (``solved``)."""
new = self.deepcopy()
all_solved_params = self.params(solved=True)
solved_params = []
for param in all_solved_params:
if self[self._loglikelihood].isin((param, param)) and self[self._logprior].isin((param, param)):
solved_params.append(param)
if set(solved_params) != set(all_solved_params):
import warnings
warnings.warn('sample over parameters {}, derivatives for {} are not saved'.format(
solved_params, [param for param in all_solved_params if param not in solved_params]))
if not solved_params:
return new
covariance, hessian = _get_solved_covariance(
self, params=solved_params, return_hessian=True)
L = np.moveaxis(np.linalg.cholesky(covariance), (-2, -1), (0, 1))
new.data = []
for array in self:
new.set(array.clone(value=np.repeat(
array, size, axis=self.ndim - 1)))
rng = np.random.RandomState(seed=seed)
noise = rng.standard_normal(
(len(solved_params),) + self.shape + (size,))
values = np.sum(noise[None, ...] * L[..., None], axis=1)
for param, value in zip(solved_params, values):
new[param] = new[param].clone(
value=new[param] + value.reshape(new.shape), param=param.clone(derived=False))
dlogposterior = 0.
for param in [self._loglikelihood, self._logprior]:
hess = np.array([[self[param][param1, param2]
for param2 in solved_params] for param1 in solved_params])
log = 1. / 2. * np.sum(values[None, ...] * hess[..., None]
* values[:, None, ...], axis=(0, 1)).reshape(new.shape)
new[param] = self[param].clone(
value=new[param][()] + log, derivs=None)
dlogposterior += log
marg_indices = np.array([iparam for iparam, param in enumerate(
solved_params) if 'auto' in param.derived or 'marg' in param.derived])
if marg_indices.size:
log = 1. / 2. * \
np.linalg.slogdet(- hessian[(Ellipsis,) +
np.ix_(marg_indices, marg_indices)])[1]
new[self._loglikelihood] += log
dlogposterior += log
new.logposterior[...] += dlogposterior
return new
[docs]
def select(self, **kwargs):
# Keep weight columns
toret = self._select(name=[self._aweight, self._fweight])
toret.update(self._select(**kwargs))
return toret
def __getitem__(self, name):
"""
Return item corresponding to parameter ``name``.
Parameters
----------
name : Parameter, str, int
Parameter name.
If :class:`Parameter` instance, search for parameter with same name.
"""
try:
return super().__getitem__(name)
except KeyError:
if name == self._weight:
return self.weight
else:
raise
[docs]
@classmethod
def from_getdist(cls, samples, concatenate=None):
"""
Turn getdist.MCSamples into a :class:`Chain` instance.
Note
----
GetDist package is required.
"""
params = ParameterCollection()
for param in samples.paramNames.names:
params.set(Parameter(param.name, latex=param.label,
derived=param.isDerived, fixed=False))
param_indices = samples._getParamIndices()
for param in params:
limits = [samples.ranges.lower.get(
param.name, -np.inf), samples.ranges.upper.get(param.name, np.inf)]
param.update(prior=ParameterPrior(limits=limits))
isscalar = True
try:
chains = samples.getSeparateChains()
isscalar = False
except:
chains = [samples]
toret = []
for chain in chains:
new = cls()
fweight, new.logposterior = chain.weights, -chain.loglikes
iweight = np.rint(fweight)
if np.allclose(fweight, iweight, atol=0., rtol=1e-9):
new.fweight = iweight.astype('i4')
else:
new.aweight = fweight
for param in params:
new.set(ParameterArray(
chain[param_indices[param.name]], param=param))
for param in new.params(basename='chi2_*'):
namespace = re.match('chi2_[_]*(.*)$', param.name).groups()[0]
if namespace == 'prior':
new_param = param.clone(
basename=new._logprior, derived=True)
else:
new_param = param.clone(
basename=new._loglikelihood, namespace=namespace, derived=True)
new[new_param] = -0.5 * new[param]
toret.append(new)
if isscalar:
if concatenate or concatenate is None:
toret = toret[0]
elif concatenate:
toret = cls.concatenate(toret)
return toret
@utils.hybridmethod
def to_getdist(cls, chain, params=None, label=None, **kwargs):
"""
Return GetDist hook to samples.
Note
----
GetDist package is required.
Parameters
----------
params : list, ParameterCollection, default=None
Parameters to save samples of (weight and log-posterior are added anyway). Defaults to all parameters.
label : str, default=None
Name for GetDist to use for these samples.
**kwargs : dict
Optional arguments for :class:`getdist.MCSamples`.
Returns
-------
samples : getdist.MCSamples
"""
from getdist import MCSamples
isscalar = not utils.is_sequence(chain)
if isscalar:
chain = [chain]
chains = list(chain)
toret = None
if params is None:
params = chains[0].params(varied=True)
else:
params = [chains[0][param].param for param in params]
if any(param.solved for param in params):
for ichain, chain in enumerate(chains):
chains[ichain] = chain.sample_solved()
chain = chains[0]
params = [param for param in params if param.name not in [
chain._weight, chain._logposterior]]
labels = [param.latex() for param in params]
names = [str(param) for param in params]
ranges = {str(param): tuple('N' if limit is None or not np.isfinite(
limit) else limit for limit in param.prior.limits) for param in params}
samples, weights, loglikes = [], [], []
for chain in chains:
samples.append(chain.to_array(
params=params, struct=False, derivs=()).reshape(-1, chain.size).T)
weights.append(chain.weight.ravel())
loglikes.append(-np.asarray(chain.logposterior.ravel()))
if isscalar:
samples, weights, loglikes = samples[0], weights[0], loglikes[0]
toret = MCSamples(samples=samples, weights=weights, loglikes=loglikes,
names=names, labels=labels, label=label, ranges=ranges, **kwargs)
return toret
[docs]
@to_getdist.instancemethod
def to_getdist(self, *args, **kwargs):
return self.__class__.to_getdist(self, *args, **kwargs)
[docs]
@classmethod
def read_getdist(cls, base_fn, ichains=None, concatenate=False):
"""
Load samples in *CosmoMC* format, i.e.:
- '_{ichain}.txt' files for sample values
- '.paramnames' files for parameter names / latex
- '.ranges' for parameter ranges
Note
----
GetDist package *is not* required.
Parameters
----------
base_fn : str, Path
Base *CosmoMC* file name. Will be appended by '_{ichain}.txt' for sample values,
'.paramnames' for parameter names and '.ranges' for parameter ranges.
ichains : int, tuple, list, default=None
Chain numbers to load. Defaults to all chains matching pattern '{base_fn}*.txt'.
If a single number is provided, return a unique chain.
If multiple numbers are provided, or is ``None``, return a list of chains (see ``concatenate``).
concatenate : bool, default=False
If ``True``, concatenate all chains in one.
Returns
-------
samples : list, Chain
Chain or list of chains.
"""
params_fn = '{}.paramnames'.format(base_fn)
cls.log_info('Loading params file: {}.'.format(params_fn))
params = ParameterCollection()
with open(params_fn) as file:
for line in file:
line = [item.strip() for item in line.split(maxsplit=1)]
if line:
name, latex = line
derived = name.endswith('*')
if derived:
name = name[:-1]
params.set(Parameter(basename=name, latex=latex.replace(
'\n', ''), fixed=False, derived=derived))
ranges_fn = '{}.ranges'.format(base_fn)
if os.path.exists(ranges_fn):
cls.log_info(
'Loading parameter ranges from {}.'.format(ranges_fn))
with open(ranges_fn) as file:
for line in file:
name, low, high = [item.strip()
for item in line.split()]
latex = latex.replace('\n', '')
limits = []
for lh, li in zip([low, high], [-np.inf, np.inf]):
if lh == 'N':
lh = li
else:
lh = float(lh)
limits.append(lh)
if name in params:
params[name].update(
prior=ParameterPrior(limits=limits))
else:
cls.log_info(
'Parameter ranges file {} does not exist.'.format(ranges_fn))
chain_fn = '{}_{{}}.txt'.format(base_fn)
isscalar = False
chain_fns = []
if ichains is not None:
isscalar = np.ndim(ichains) == 0
if isscalar:
ichains = [ichains]
for ichain in ichains:
chain_fns.append(chain_fn.format('{:d}'.format(ichain)))
else:
chain_fns = glob.glob(chain_fn.format('[0-9]*'))
toret = []
for chain_fn in chain_fns:
cls.log_info('Loading chain file: {}.'.format(chain_fn))
array = np.loadtxt(chain_fn, unpack=True)
new = cls()
fweight, new.logposterior = array[0], -array[1]
iweight = np.rint(fweight)
if np.allclose(fweight, iweight, atol=0., rtol=1e-9):
new.fweight = iweight.astype('i4')
else:
new.aweight = fweight
for param, values in zip(params, array[2:]):
new.set(ParameterArray(values, param))
toret.append(new)
for new in toret:
for param in new.params(basename='chi2_*'):
namespace = re.match('chi2_[_]*(.*)$', param.name).groups()[0]
if namespace == 'prior':
new_param = param.clone(
basename=new._logprior, derived=True)
else:
new_param = param.clone(
basename=new._loglikelihood, namespace=namespace, derived=True)
new[new_param] = -0.5 * new[param]
if isscalar:
return toret[0]
if concatenate:
return cls.concatenate(toret)
return toret
@utils.hybridmethod
def write_getdist(cls, chain, base_fn, params=None, ichain=None, fmt='%.18e', delimiter=' ', **kwargs):
"""
Save samples to disk in *CosmoMC* format.
Note
----
GetDist package *is not* required.
Parameters
----------
base_fn : str, Path
Base *CosmoMC* file name. Will be prepended by '_{ichain}.txt' for sample values,
'.paramnames' for parameter names and '.ranges' for parameter ranges.
params : list, ParameterCollection, default=None
Parameters to save samples of (weight and log-posterior are added anyway). Defaults to all parameters.
ichain : int, default=None
Chain number to append to file name, i.e. sample values will be saved as '{base_fn}_{ichain}.txt'.
If ``None``, does not append any number, sample values will be saved as '{base_fn}.txt'.
fmt : str, default='%.18e'
How to format floats.
delimiter : str, default=' '
String or character separating columns.
kwargs : dict
Optional arguments for :func:`numpy.savetxt`.
"""
isscalar = not utils.is_sequence(chain)
if isscalar:
chain = [chain]
chains = list(chain)
if params is None:
params = chains[0].params()
else:
params = [chains[0][param].param for param in params]
if any(param.solved for param in params):
for ichain, chain in enumerate(chains):
chains[ichain] = chain.sample_solved()
chain = chains[0]
columns = [str(param) for param in params]
outputs_columns = [chain._weight, chain._logposterior]
shape = chain.shape
outputs = [array.param.name for array in chain if array.shape != shape]
for column in outputs:
if column in columns:
del columns[columns.index(column)]
if ichain is None:
if isscalar:
ichain = [None] * len(chains)
else:
ichain = list(range(len(chains)))
if not utils.is_sequence(ichain):
ichain = [ichain]
assert len(ichain) == len(chains)
utils.mkdir(os.path.dirname(base_fn))
output = ''
params = chain.params(name=columns)
for param in params:
tmp = '{}* {}\n' if getattr(param, 'derived',
getattr(param, 'fixed')) else '{} {}\n'
output += tmp.format(param.name, param.latex())
params_fn = '{}.paramnames'.format(base_fn)
cls.log_info('Saving parameter names to {}.'.format(params_fn))
with open(params_fn, 'w') as file:
file.write(output)
output = ''
for param in params:
limits = param.prior.limits
limits = tuple('N' if limit is None or np.abs(limit)
== np.inf else limit for limit in limits)
output += '{} {} {}\n'.format(param.name, limits[0], limits[1])
ranges_fn = '{}.ranges'.format(base_fn)
cls.log_info('Saving parameter ranges to {}.'.format(ranges_fn))
with open(ranges_fn, 'w') as file:
file.write(output)
for chain, ichain in zip(chains, ichain):
data = chain.to_array(params=outputs_columns + columns,
struct=False, derivs=()).reshape(-1, chain.size)
data[1] *= -1
data = data.T
chain_fn = '{}.txt'.format(
base_fn) if ichain is None else '{}_{:d}.txt'.format(base_fn, ichain)
cls.log_info('Saving chain to {}.'.format(chain_fn))
np.savetxt(chain_fn, data, header='', fmt=fmt,
delimiter=delimiter, **kwargs)
[docs]
@write_getdist.instancemethod
def write_getdist(self, *args, **kwargs):
return self.__class__.write_getdist(self, *args, **kwargs)
[docs]
def to_anesthetic(self, params=None, label=None, **kwargs):
"""
Return anesthetic hook to samples.
Note
----
anesthetic package *is* required.
Parameters
----------
params : list, ParameterCollection, default=None
Parameters to save samples of (weight and log-posterior are added anyway). Defaults to all parameters.
label : str, default=None
Name for anesthetic to use for these samples.
**kwargs : dict
Optional arguments for :class:`anesthetic.MCMCSamples`.
Returns
-------
samples : anesthetic.MCMCSamples
"""
from anesthetic import MCMCSamples
toret = None
if params is None:
params = self.params(varied=True)
else:
params = [self[param].param for param in params]
if any(param.solved for param in params):
self = self.sample_solved()
labels = [param.latex() for param in params]
samples = self.to_array(
params=params, struct=False, derivs=()).reshape(-1, self.size)
names = [str(param) for param in params]
limits = {param.name: tuple('N' if limit is None or np.abs(
limit) == np.inf else limit for limit in param.prior.limits) for param in params}
toret = MCMCSamples(samples=samples.T, columns=names, weights=np.asarray(self.weight.ravel(
)), logL=-np.asarray(self.logposterior.ravel()), labels=labels, label=label, logzero=-np.inf, limits=limits, **kwargs)
return toret
[docs]
def choice(self, index='mean', params=None, return_type='dict', **kwargs):
"""
Return parameter mean(s) or best fit(s).
Parameters
----------
index : str, default='mean'
'argmax' to return "best fit" (as defined by the point with maximum log-posterior in the chain).
'mean' to return mean of parameters (weighted by :attr:`weight`).
params : list, ParameterCollection, default=None
Parameters to compute mean / best fit for. Defaults to all parameters.
return_type : default='dict'
'dict' to return a dictionary mapping parameter names to mean / best fit;
'nparray' to return an array of parameter mean / best fit;
``None`` to return a :class:`Chain` instance with a single value.
**kwargs : dict
Optional arguments passed to :meth:`params` to select params to return, e.g. ``varied=True, derived=False``.
Returns
-------
toret : dict, array, Chain
"""
if params is None:
params = self.params(**kwargs)
if isinstance(index, str) and index == 'mean':
di = {str(param): self.mean(param) for param in params}
index = (0,) # just for test below
else:
if isinstance(index, str) and index == 'argmax':
index = np.unravel_index(
self.logposterior.argmax(), self.shape)
if not isinstance(index, tuple):
index = (index,)
di = {str(param): self[param][index] for param in params}
if return_type == 'dict':
return di
if return_type == 'nparray':
return np.array(list(di.values()))
toret = self.copy()
isscalar = all(np.ndim(ii) == 0 for ii in index)
toret.data = []
for param, value in di.items():
value = np.asarray(value)
toret.data.append(self[param].clone(
value=value[None, ...] if isscalar else value))
return toret
[docs]
def covariance(self, params=None, return_type='nparray', ddof=1):
"""
Return parameter covariance computed from (weighted) samples.
Parameters
----------
params : list, ParameterCollection, default=None
Parameters to compute covariance for. Defaults to all parameters.
If a single parameter is provided, this parameter is a scalar, and ``return_type`` is 'nparray', return a scalar.
return_type : str, default='nparray'
'nparray' to return matrix array;
``None`` to return :class:`ParameterCovariance` instance.
ddof : int, default=1
Number of degrees of freedom.
Returns
-------
covariance : array, float, ParameterCovariance
"""
if params is None:
params = self.params()
if not is_parameter_sequence(params):
params = [params]
params = ParameterCollection(
[self[param].param for param in params]) # eliminates duplicates
values = [self[param][()].reshape(self.size, -1)
for param in params] # [()] to take order 0 derivatives
values = np.concatenate(values, axis=-1)
covariance = np.atleast_2d(np.cov(values, rowvar=False, fweights=self.fweight.ravel(
), aweights=self.aweight.ravel(), ddof=ddof))
solved_params = [param for param in params if param.solved]
if solved_params:
solved_indices = [params.index(param) for param in solved_params]
covariance[np.ix_(solved_indices, solved_indices)] += np.average(_get_solved_covariance(
self, params=solved_params).reshape(-1, len(solved_params), len(solved_params)), weights=self.weight.ravel(), axis=0)
return ParameterCovariance(covariance, params=params).view(return_type=return_type)
[docs]
def precision(self, params=None, return_type='nparray', ddof=1):
"""
Return inverse parameter covariance computed from (weighted) samples.
Parameters
----------
params : list, ParameterCollection, default=None
Parameters to compute covariance for. Defaults to all parameters.
If a single parameter is provided, this parameter is a scalar, and ``return_type`` is 'nparray', return a scalar.
return_type : str, default='nparray'
'nparray' to return matrix array.
``None`` to return a :class:`ParameterPrecision` instance.
ddof : int, default=1
Number of degrees of freedom.
Returns
-------
precision : array, float, ParameterPrecision
"""
return self.covariance(params=params, ddof=ddof, return_type=None).to_precision(return_type=return_type)
[docs]
def corrcoef(self, params=None):
"""Return correlation matrix array computed from (weighted) samples (optionally restricted to input parameters)."""
return self.covariance(params=params, return_type=None).corrcoef()
[docs]
def var(self, params=None, ddof=1):
"""
Return variance computed from (weighted) samples (optionally restricted to input parameters).
If a single parameter is given as input and this parameter is a scalar, return a scalar.
``ddof`` is the number of degrees of freedom.
"""
isscalar = not is_parameter_sequence(params)
cov = self.covariance(params, ddof=ddof, return_type='nparray')
if isscalar:
return cov.flat[0] # single param
return np.diag(cov)
[docs]
def std(self, params=None, ddof=1):
"""
Return standard deviation computed from (weighted) samples (optionally restricted to input parameters).
If a single parameter is given as input and this parameter is a scalar, return a scalar.
``ddof`` is the number of degrees of freedom.
"""
return self.var(params=params, ddof=ddof)**0.5
[docs]
@vectorize
def mean(self, params=None):
"""
Return mean computed from (weighted) samples (optionally restricted to input parameters).
If a single parameter is given as input and this parameter is a scalar, return a scalar.
"""
return np.average(_reshape(self[params], self.size), weights=self.weight.ravel(), axis=0)
[docs]
@vectorize
def argmax(self, params=None):
"""
Return parameter values for maximum of log-posterior (optionally restricted to input parameters).
If a single parameter is given as input and this parameter is a scalar, return a scalar.
"""
return _reshape(self[params], self.size)[np.argmax(self.logposterior.ravel())]
[docs]
@vectorize
def quantile(self, params=None, q=(0.1587, 0.8413), method='linear'):
"""
Compute the q-th quantile of the weighted parameter samples.
If a single parameter is given as input this parameter is a scalar, and a ``q`` is a scalar, return a scalar.
Note
----
Adapted from https://github.com/minaskar/cronus/blob/master/cronus/plot.py.
Parameters
----------
params : list, ParameterCollection, default=None
Parameters to compute quantiles for. Defaults to all parameters.
q : tuple, list, array
Quantile or sequence of quantiles to compute, which must be between
0 and 1 inclusive.
method : {'linear', 'lower', 'higher', 'midpoint', 'nearest'}, default='linear'
This optional parameter specifies the method method to
use when the desired quantile lies between two data points
``i < j``:
- linear: ``i + (j - i) * fraction``, where ``fraction``
is the fractional part of the index surrounded by ``i``
and ``j``.
- lower: ``i``.
- higher: ``j``.
- nearest: ``i`` or ``j``, whichever is nearest.
- midpoint: ``(i + j) / 2``.
Returns
-------
quantiles : list, scalar, array
"""
value = _reshape(self[params], self.size)
weight = self.weight.ravel()
weight /= np.sum(weight)
if value.param.solved:
from scipy import stats
locs = value
scales = _get_solved_covariance(
self, [params])[..., 0, 0].ravel()**0.5
if np.all(scales == 0.):
return utils.weighted_quantile(value, q=q, weights=weight, axis=0, method=method)
isscalar = np.ndim(q) == 0
q = np.atleast_1d(q)
quantiles = np.array(q)
for iq, qq in enumerate(q.flat):
from scipy import special
def cdf(x):
toret = np.empty_like(x)
nx = len(x)
nslabs = max(nx * len(locs) // int(1e8), 1)
for islab in range(nslabs):
start, stop = islab * \
nx // nslabs, (islab + 1) * nx // nslabs
toret[start:stop] = np.sum(
weight / 2. * (1. + special.erf((x[start:stop, None] - locs) / (2**0.5 * scales))), axis=-1)
return toret
nsigmas = 100
limits = np.min(locs - nsigmas *
scales), np.max(locs + nsigmas * scales)
if qq <= limits[0]:
res = limits[0]
elif qq >= limits[1]:
res = limits[1]
else:
x = np.linspace(*limits, num=10000)
cdf = cdf(x) - qq
idx = np.searchsorted(cdf, 0., side='right') - 1
res = (x[idx + 1] - x[idx]) / \
(cdf[idx + 1] - cdf[idx]) * cdf[idx + 1] + x[idx]
# print(cdf(x[idx]), cdf(x[idx + 1]))
# res = optimize.bisect(cdf, x[idx], x[idx + 1], xtol=1e-6 * np.mean(scales), disp=True)
quantiles.flat[iq] = res
if isscalar:
return quantiles[0]
return quantiles
return utils.weighted_quantile(value, q=q, weights=weight, axis=0, method=method)
[docs]
@vectorize
def interval(self, params=None, nsigmas=1.):
"""
Return n-sigma confidence interval(s).
Parameters
----------
params : list, ParameterCollection, default=None
Parameters to compute confidence interval for. Defaults to all parameters.
nsigmas : int
Return interval for this number of sigmas.
Returns
-------
interval : tuple, list
"""
value = self[params].ravel()
weight = self.weight.ravel()
weight /= np.sum(weight)
if value.param.solved:
from scipy import stats
locs = value
scales = _get_solved_covariance(
self, [params])[..., 0, 0].ravel()**0.5
if not np.all(scales == 0.):
from scipy import special
def cdf(x):
toret = np.empty_like(x)
nx = len(x)
nslabs = max(nx * len(locs) // int(1e8), 1)
for islab in range(nslabs):
start, stop = islab * \
nx // nslabs, (islab + 1) * nx // nslabs
toret[start:stop] = np.sum(
weight / 2. * (1. + special.erf((x[start:stop, None] - locs) / (2**0.5 * scales))), axis=-1)
return toret
limits = np.min(locs - 2 * nsigmas *
scales), np.max(locs + 2 * nsigmas * scales)
value = np.linspace(*limits, num=10000)
weight = cdf(value)
weight = np.concatenate(
[[weight[0]], np.diff(weight)[:-1], [1. - weight[-2]]])
return utils.interval(value, weights=weight, nsigmas=nsigmas)
[docs]
def to_fisher(self, params=None, ddof=1, **kwargs):
"""
Return Fisher from (weighted) samples.
Parameters
----------
params : list, ParameterCollection, default=None
Parameters to return Fisher for. Defaults to all parameters.
ddof : int, default=1
Number of degrees of freedom.
**kwargs : dict
Arguments for :meth:`choice`, giving the mean of the output Fisher likelihood.
Returns
-------
fisher : LikelihoodFisher
"""
precision = self.precision(params=params, ddof=ddof, return_type=None)
params = precision._params
mean = self.choice(params=params, return_type='nparray', **kwargs)
return LikelihoodFisher(center=mean, params=params, offset=self.logposterior.max(), hessian=-precision.view(return_type='nparray'), with_prior=True)
[docs]
def to_stats(self, params=None, quantities=None, sigfigs=2, tablefmt='latex_raw', fn=None):
"""
Export summary sampling quantities.
Parameters
----------
params : list, ParameterCollection, default=None
Parameters to export quantities for. Defaults to all parameters.
quantities : list, default=None
Quantities to export. Defaults to ``['argmax', 'mean', 'median', 'std', 'quantile:1sigma', 'interval:1sigma']``.
sigfigs : int, default=2
Number of significant digits.
See :func:`utils.round_measurement`.
tablefmt : str, default='latex_raw'
Format for summary table.
See :func:`tabulate.tabulate`.
If 'list', return table as list of list of strings, and headers.
If 'list_latex', return table as list of list of latex strings, and headers.
fn : str, default=None
If not ``None``, file name where to save summary table.
Returns
-------
tab : str
Summary table.
"""
import tabulate
if params is None:
params = self.params(varied=True)
else:
params = [self[param].param for param in params]
if quantities is None:
quantities = ['argmax', 'mean', 'median',
'std', 'quantile:1sigma', 'interval:1sigma']
is_latex = 'latex' in tablefmt
def round_errors(low, up):
low, up = utils.round_measurement(
0.0, low, up, sigfigs=sigfigs, positive_sign='u')[1:]
if is_latex:
return '${{}}_{{{}}}^{{{}}}$'.format(low, up)
return '{}/{}'.format(low, up)
data = []
for param in params:
row = []
row.append(param.latex(inline=True) if is_latex else str(param))
ref_center = self.mean(param)
ref_error = self.var(param)**0.5
for quantity in quantities:
if quantity in ['argmax', 'mean', 'median', 'std']:
value = getattr(self, quantity)(param)
value = utils.round_measurement(
value, ref_error, sigfigs=sigfigs)[0]
if is_latex:
value = '${}$'.format(value)
row.append(value)
elif quantity.startswith('quantile'):
nsigmas = int(
re.match('quantile:(.*)sigma', quantity).group(1))
low, up = self.quantile(
param, q=utils.nsigmas_to_quantiles_1d_sym(nsigmas))
row.append(round_errors(low - ref_center, up - ref_center))
elif quantity.startswith('interval'):
nsigmas = int(
re.match('interval:(.*)sigma', quantity).group(1))
low, up = self.interval(param, nsigmas=nsigmas)
row.append(round_errors(low - ref_center, up - ref_center))
else:
raise RuntimeError('Unknown quantity {}.'.format(quantity))
data.append(row)
if 'list' in tablefmt:
return data, quantities
tab = tabulate.tabulate(data, headers=quantities, tablefmt=tablefmt)
if fn is not None:
utils.mkdir(os.path.dirname(fn))
self.log_info('Saving to {}.'.format(fn))
with open(fn, 'w') as file:
file.write(tab)
return tab