import glob
import warnings
import numpy as np
import lsstypes as types
from lsstypes.external import from_pycorr, from_pypower
from desilike import plotting, jax, utils
from desilike.base import BaseCalculator
from .window import WindowedCorrelationFunctionMultipoles
from desilike.observables.types import ObservableArray, ObservableCovariance
def _is_array(data):
return isinstance(data, (np.ndarray,) + jax.array_types)
def _is_from_pycorr(data):
return utils.is_path(data) or not _is_array(data)
[docs]
class TracerCorrelationFunctionMultipolesObservable(BaseCalculator):
"""
Tracer correlation function multipoles observable: compare measurement to theory.
Parameters
----------
data : str, Path, list, lsstypes.Count2Correlation, lsstypes.Count2CorrelationPoles, dict, default=None
Data correlation function measurement: flat array (of all multipoles), :class:`lsstypes.Count2Correlation` instance,
or path to such instances, or list of such objects (in which case the average of them is taken).
If dict, parameters to be passed to theory to generate mock measurement.
If a (list of) flat array, additionally provide list of multipoles ``ells`` and separations ``s`` (see ``kwargs``).
covariance : list, default=None
2D array, list of :class:`lsstypes.Count2Correlation` instances, or paths to such instances;
these are used to compute the covariance matrix.
slim : dict, default=None
Separation limits: a dictionary mapping multipoles to (min separation, max separation, (optionally) step (float)),
e.g. ``{0: (30., 160., 5.), 2: (30., 160., 5.)}``. If ``None``, no selection is applied for the given multipole.
**kwargs : dict
Optional arguments for :class:`WindowedCorrelationFunctionMultipoles`, e.g.:
- theory: defaults to :class:`KaiserTracerCorrelationFunctionMultipoles`.
- fiber_collisions
- systematic_templates
- if one only provided simple arrays for ``data`` and ``covariance``,
one can provide the list of multipoles ``ells`` and the corresponding (list of) :math:`s` separations as a (list of) array ``s``.
"""
name = 'correlation2poles'
def initialize(self, data=None, covariance=None, slim=None, wmatrix=None, ignore_nan=False, sedges=None, **kwargs):
self.s, self.sedges, self.RR, self.ells = None, sedges, None, None
self.flatdata, self.mocks, self.covariance = None, None, None
if not isinstance(data, dict):
self.flatdata = self.load_data(data=data, slim=slim, ignore_nan=ignore_nan)[0]
if self.mpicomm.bcast(_is_array(covariance) or isinstance(covariance, (ObservableCovariance, types.CovarianceMatrix)), root=0):
self.covariance = self.mpicomm.bcast(covariance, root=0)
else:
self.mocks = self.load_data(data=covariance, slim=slim, ignore_nan=ignore_nan)[-1]
if self.mpicomm.bcast(self.mocks is not None, root=0):
covariance = None
if self.mpicomm.rank == 0:
covariance = np.cov(self.mocks, rowvar=False, ddof=1)
self.covariance = self.mpicomm.bcast(covariance, root=0)
self.wmatrix = wmatrix
if not isinstance(wmatrix, WindowedCorrelationFunctionMultipoles):
self.wmatrix = WindowedCorrelationFunctionMultipoles()
#if wmatrix is None: wmatrix = {}
if self.RR and isinstance(wmatrix, dict):
wmatrix = {**self.RR, **wmatrix}
self.wmatrix.init.update(wmatrix=wmatrix)
if self.ells is not None: # set by data
self.wmatrix.init.update(ells=self.ells)
if self.sedges is not None: # set by data
self.wmatrix.init.update(sedges=self.sedges)
if self.s is not None: # set by data
self.wmatrix.init.update(s=self.s)
elif slim is not None: # FIXME: we do not want limits to apply to s (but mid-s)
self.wmatrix.init.update(slim=slim)
self.wmatrix.init.update(kwargs)
if self.flatdata is None:
self.wmatrix(**data)
self.flatdata = self.wmatrix.flatcorr.copy()
else:
self.wmatrix.runtime_info.initialize()
input_sedges = self.sedges is not None
for name in ['s', 'ells', 'sedges']:
setattr(self, name, getattr(self.wmatrix, name))
smasklim = self.wmatrix.smasklim
if smasklim is not None: # cut has been applied to input s
cumsize = np.insert(np.cumsum([len(ss) for ss in smasklim.values()]), 0, 0)
data = [self.flatdata[start:stop] for start, stop in zip(cumsize[:-1], cumsize[1:])]
ells = list(smasklim)
self.flatdata = np.concatenate([data[ells.index(ell)][smasklim[ell]] for ell in self.ells])
if isinstance(self.covariance, ObservableCovariance):
if input_sedges: x, method = [np.mean(edges, axis=-1) for edges in self.sedges], 'mid'
else: x, method = list(self.s),'mean'
self.covariance = self.covariance.xmatch(x=x, projs=list(self.ells), method=method).view(projs=list(self.ells))
elif isinstance(self.covariance, types.CovarianceMatrix):
if 'observables' in self.covariance.observable.labels(return_type='keys'):
self.covariance = self.covariance.at.observable.get(observables=self.name)
observable = self.to_lsstypes('data')
self.covariance = self.covariance.at.observable.match(observable).value()
self.nobs = getattr(self.covariance, 'nobs', None)
def load_data(self, data=None, slim=None, ignore_nan=False):
def load_data(fn):
fn = str(fn)
if fn.endswith('.npy'):
warnings.warn('Handling of *.npy files is deprecated. Switch to lsstypes format.')
state = np.load(fn, allow_pickle=True)[()]
if '_projs' in state:
toret = ObservableArray.from_state(state)
else:
try:
from pycorr import TwoPointCorrelationFunction
toret = TwoPointCorrelationFunction.from_state(state)
toret = from_pycorr(toret)
except:
from pypower import MeshFFTCorr, CorrelationFunctionMultipoles
toret = MeshFFTCorr.from_state(state)
if hasattr(toret, 'poles'):
toret = toret.poles
else:
toret = CorrelationFunctionMultipoles.from_state(state)
toret = from_pypower(toret)
else:
toret = types.read(fn)
return toret
def lim_data(corr, slim=slim):
ells, list_s, list_sedges, RR, list_data = [], [], [], None, []
if isinstance(corr, ObservableArray):
if slim is None:
slim = {ell: (0, np.inf) for ell in corr.projs}
RR = corr.attrs.get('R1R2', None)
for ell, lim in slim.items():
start, stop, *step = lim
rebin = 1
if step and step[0] != 1: rebin = np.rint(step[0] / np.diff(corr.edges(projs=ell)).mean()).astype(int)
corr_slice = corr.copy().select(xlim=(start, stop), rebin=rebin, projs=ell)
ells.append(ell)
list_s.append(corr_slice.x(projs=ell))
edges = corr_slice.edges(projs=ell)
edges = np.column_stack((edges[:-1], edges[1:]))
list_sedges.append(edges)
list_data.append(corr_slice.view(projs=ell))
else:
if not isinstance(corr, types.ObservableTree):
corr = from_pycorr(corr)
if slim is None:
slim = {ell: (0, np.inf) for ell in (0, 2, 4)}
try:
RR = corr.get('RR')
RR = {'sedges': RR.edges('s'), 'muedges': RR.edges('mu'), 'wcounts': RR.value()}
except ValueError:
RR = None
for ell, lim in slim.items():
start, stop, *step = lim
def rebin(corr):
rebin = 1
if step and step[0] != 1:
rebin = np.rint(step[0] / np.diff(corr.edges('s'), axis=-1).mean()).astype(int)
return corr.select(s=slice(0, None, rebin))
if hasattr(corr, 'project'): # input is ('s', 'mu')
pole = rebin(corr).project(ells=ell, ignore_nan=True)
else:
pole = rebin(corr.get(ells=ell))
pole = pole.select(s=tuple(lim[:2]))
ells.append(ell)
list_s.append(pole.coords('s'))
list_sedges.append(pole.edges('s'))
list_data.append(pole.value())
return list_s, list_sedges, RR, ells, list_data
def load_all(lmocks):
list_mocks = []
for mocks in lmocks:
if utils.is_path(mocks):
gmocks = glob.glob(str(mocks)) # glob takes str
if not gmocks:
import warnings
warnings.warn('file {} is not found'.format(mocks))
list_mocks += sorted(gmocks)
else:
list_mocks.append(mocks)
fns = [mock for mock in list_mocks if utils.is_path(mock)]
if len(fns):
nfns = 5
if len(fns) < nfns:
msg = 'Loading 1 file {}.'.format(fns)
else:
msg = 'Loading {:d} files [{}].'.format(len(fns), ', ..., '.join(fns[::len(fns) // nfns]))
self.log_info(msg)
list_y = []
for mock in list_mocks:
if utils.is_path(mock):
mock = load_data(mock)
mock_s, mock_sedges, mock_RR, mock_ells, mock_y = lim_data(mock)
if self.s is None:
self.s, self.sedges, self.RR, self.ells = mock_s, mock_sedges, mock_RR, mock_ells
if not all(np.allclose(ss, ms, atol=0., rtol=1e-3) for ss, ms in zip(self.sedges, mock_sedges)):
raise ValueError('{} does not have expected s-edges (based on previous data)'.format(mock))
if mock_ells != self.ells:
raise ValueError('{} does not have expected poles (based on previous data)'.format(mock))
list_y.append(np.concatenate(mock_y))
return list_y
flatdata, list_y = None, None
if self.mpicomm.rank == 0 and data is not None:
if not utils.is_sequence(data):
data = [data]
if any(_is_from_pycorr(dd) or isinstance(dd, (ObservableArray, types.ObservableTree)) for dd in data):
list_y = load_all(data)
if not list_y: raise ValueError('no data/mocks could be obtained from {}'.format(data))
else:
list_y = list(data)
flatdata = np.mean(list_y, axis=0)
self.s, self.sedges, self.RR, self.ells, flatdata = self.mpicomm.bcast((self.s, self.sedges, self.RR, self.ells, flatdata) if self.mpicomm.rank == 0 else None, root=0)
return flatdata, list_y
[docs]
@plotting.plotter(interactive={'kw_theory': {'color': 'black', 'label': 'reference'}})
def plot(self, kw_theory=None, fig=None):
"""
Plot data and theory correlation function multipoles.
Parameters
----------
kw_theory : list of dict, default=None
Change the default line parametrization of the theory, one dictionary for each ell or duplicate it.
fig : matplotlib.figure.Figure, default=None
Optionally, a figure with at least ``1 + len(self.ells)`` axes.
fn : str, Path, default=None
Optionally, path where to save figure.
If not provided, figure is not saved.
kw_save : dict, default=None
Optionally, arguments for :meth:`matplotlib.figure.Figure.savefig`.
show : bool, default=False
If ``True``, show figure.
interactive : bool, default=False
If ``True``, use interactive interface provided by ipywidgets.
Returns
-------
fig : matplotlib.figure.Figure
"""
from matplotlib import pyplot as plt
if kw_theory is None:
kw_theory = {}
if isinstance(kw_theory, dict):
kw_theory = [kw_theory]
if len(kw_theory) != len(self.ells):
kw_theory = [{key: value for key, value in kw_theory[0].items() if (key != 'label') or (ill == 0)} for ill in range(len(self.ells))]
kw_theory = [{'color': 'C{:d}'.format(ill), **kw} for ill, kw in enumerate(kw_theory)]
if fig is None:
height_ratios = [max(len(self.ells), 3)] + [1] * len(self.ells)
figsize = (6, 1.5 * sum(height_ratios))
fig, lax = plt.subplots(len(height_ratios), sharex=True, sharey=False, gridspec_kw={'height_ratios': height_ratios}, figsize=figsize, squeeze=True)
fig.subplots_adjust(hspace=0.1)
show_legend = True
else:
lax = fig.axes
show_legend = False
data, theory, std = self.data, self.theory, self.std
for ill, ell in enumerate(self.ells):
lax[0].errorbar(self.s[ill], self.s[ill]**2 * data[ill], yerr=self.s[ill]**2 * std[ill], color='C{:d}'.format(ill), linestyle='none', marker='o', label=r'$\ell = {:d}$'.format(ell))
lax[0].plot(self.s[ill], self.s[ill]**2 * theory[ill], **kw_theory[ill])
for ill, ell in enumerate(self.ells):
lax[ill + 1].plot(self.s[ill], (data[ill] - theory[ill]) / std[ill], **kw_theory[ill])
lax[ill + 1].set_ylim(-4, 4)
for offset in [-2., 2.]: lax[ill + 1].axhline(offset, color='k', linestyle='--')
lax[ill + 1].set_ylabel(r'$\Delta \xi_{{{0:d}}} / \sigma_{{ \xi_{{{0:d}}} }}$'.format(ell))
for ax in lax: ax.grid(True)
if show_legend: lax[0].legend()
lax[0].set_ylabel(r'$s^{2} \xi_{\ell}(s)$ [$(\mathrm{Mpc}/h)^{2}$]')
lax[-1].set_xlabel(r'$s$ [$\mathrm{Mpc}/h$]')
return fig
[docs]
@plotting.plotter
def plot_bao(self, fig=None):
"""
Plot data and theory BAO correlation function peak.
Parameters
----------
fig : matplotlib.figure.Figure, default=None
Optionally, a figure with at least ``len(self.ells)`` axes.
fn : str, Path, default=None
Optionally, path where to save figure.
If not provided, figure is not saved.
kw_save : dict, default=None
Optionally, arguments for :meth:`matplotlib.figure.Figure.savefig`.
show : bool, default=False
If ``True``, show figure.
Returns
-------
fig : matplotlib.figure.Figure
"""
from matplotlib import pyplot as plt
if fig is None:
height_ratios = [1] * len(self.ells)
figsize = (4, 3 * sum(height_ratios))
fig, lax = plt.subplots(len(height_ratios), sharex=True, sharey=False, gridspec_kw={'height_ratios': height_ratios}, figsize=figsize, squeeze=False)
lax = lax.ravel() # in case only one ell
fig.subplots_adjust(hspace=0)
else:
lax = fig.axes
data, theory, std = self.data, self.theory, self.std
nobao = self.theory_nobao
for ill, ell in enumerate(self.ells):
lax[ill].errorbar(self.s[ill], self.s[ill]**2 * (data[ill] - nobao[ill]), yerr=self.s[ill]**2 * std[ill], color='C{:d}'.format(ill), linestyle='none', marker='o')
lax[ill].plot(self.s[ill], self.s[ill]**2 * (theory[ill] - nobao[ill]), color='C{:d}'.format(ill))
lax[ill].set_ylabel(r'$s^{{2}} \Delta \xi_{{{:d}}}(s)$ [$(\mathrm{{Mpc}}/h)^{{2}}$]'.format(ell))
for ax in lax: ax.grid(True)
lax[-1].set_xlabel(r'$s$ [$\mathrm{Mpc}/h$]')
return fig
def plot_wiggles(self, *args, **kwargs):
import warnings
warnings.warn('plot_wiggles is deprecated, use plot_bao instead')
self.plot_bao(*args, **kwargs)
[docs]
@plotting.plotter
def plot_covariance_matrix(self, corrcoef=True, **kwargs):
"""
Plot covariance matrix.
Parameters
----------
corrcoef : bool, default=True
If ``True``, plot the correlation matrix; else the covariance.
barlabel : str, default=None
Optionally, label for the color bar.
figsize : int, tuple, default=None
Optionally, figure size.
norm : matplotlib.colors.Normalize, default=None
Scales the covariance / correlation to the canonical colormap range [0, 1] for mapping to colors.
By default, the covariance / correlation range is mapped to the color bar range using linear scaling.
labelsize : int, default=None
Optionally, size for labels.
fig : matplotlib.figure.Figure, default=None
Optionally, a figure with at least ``len(self.ells) * len(self.ells)`` axes.
Returns
-------
fig : matplotlib.figure.Figure
"""
covariance = self.to_lsstypes('covariance')
return covariance.plot(corrcoef=corrcoef, **kwargs)
def calculate(self):
self.flattheory = self.wmatrix.flatcorr
@property
def theory(self):
return self.wmatrix.corr
@property
def theory_nobao(self):
template = self.wmatrix.theory.template
only_now = template.only_now
template.only_now = True
def callback(calculator):
all_requires.append(calculator)
for require in calculator.runtime_info.requires:
if require in all_requires:
del all_requires[all_requires.index(require)] # we want first dependencies at the end
callback(require)
all_requires = []
callback(self)
all_requires = all_requires[::-1]
for calculator in all_requires:
calculator.runtime_info.tocalculate = True
calculator.runtime_info.calculate(calculator.runtime_info.input_values)
nobao = self.theory
template.only_now = only_now
for calculator in all_requires:
calculator.runtime_info.tocalculate = True
calculator.runtime_info.calculate(calculator.runtime_info.input_values)
return nobao
@property
def data(self):
cumsize = np.insert(np.cumsum([len(s) for s in self.s]), 0, 0)
return [self.flatdata[start:stop] for start, stop in zip(cumsize[:-1], cumsize[1:])]
@property
def std(self):
cumsize = np.insert(np.cumsum([len(s) for s in self.s]), 0, 0)
diag = np.diag(self.covariance)**0.5
return [diag[start:stop] for start, stop in zip(cumsize[:-1], cumsize[1:])]
def __getstate__(self):
state = {}
for name in ['s', 'sedges', 'RR', 'ells', 'flatdata', 'shotnoise', 'flattheory']:
if hasattr(self, name):
state[name] = getattr(self, name)
return state
@classmethod
def install(cls, config):
# TODO: remove this dependency
config.pip('git+https://github.com/adematti/lsstypes')
[docs]
def to_lsstypes(self, kind):
"""Return observable (data) and covariance."""
data = [types.Count2CorrelationPole(s=self.s[ill], s_edges=self.sedges[ill], value=self.data[ill], ell=ell) for ill, ell in enumerate(self.ells)]
data = types.Count2CorrelationPoles(data)
if kind == 'data':
return data
if kind == 'covariance':
return types.CovarianceMatrix(observable=data, value=self.covariance)
raise NotImplementedError(f'kind {kind} not recognized')
def to_array(self):
warnings.warn('to_array is deprecated. Please use to_lsstypes')
from desilike.observables import ObservableArray
sedges = [np.append(edges[:, 0], edges[-1, 1]) for edges in self.sedges]
return ObservableArray(x=self.s, edges=sedges, value=self.data, projs=self.ells, name=self.__class__.__name__)