Source code for desilike.observables.galaxy_clustering.power_spectrum

import glob
import warnings

import numpy as np
import lsstypes as types
from lsstypes.external import from_pypower

from desilike import plotting, jax, utils
from desilike.base import BaseCalculator
from .window import WindowedPowerSpectrumMultipoles
from desilike.observables.types import ObservableArray, ObservableCovariance


def _is_array(data):
    return isinstance(data, (np.ndarray,) + jax.array_types)


def _is_from_pypower(data):
    return utils.is_path(data) or not _is_array(data)


[docs] class TracerPowerSpectrumMultipolesObservable(BaseCalculator): """ Tracer power spectrum multipoles observable: compare measurement to theory. Parameters ---------- data : array, str, Path, list, lsstypes.Mesh2SpectrumPoles, dict, default=None Data power spectrum measurement: flat array (of all multipoles), :class:`lsstypes.Mesh2SpectrumPoles` instance, or path to such instances, or list of such objects (in which case the average of them is taken). If dict, parameters to be passed to theory to generate mock measurement. If a (list of) flat array, additionally provide list of multipoles ``ells`` and wavenumbers ``k``, and optionally ``shotnoise`` (see ``kwargs``). covariance : array, list, default=None 2D array, list of :class:`lsstypes.Mesh2SpectrumPoles` instance` instances, or paths to such instances; these are used to compute the covariance matrix. klim : dict, default=None Wavenumber limits: a dictionary mapping multipoles to (min separation, max separation, (optionally) step (float)), e.g. ``{0: (0.01, 0.2, 0.01), 2: (0.01, 0.15, 0.01)}``. If ``None``, no selection is applied for the given multipole. wmatrix : str, Path, lsstypes.WindowMatrix, WindowedPowerSpectrumMultipoles, default=None Optionally, window matrix. transform : array, default=None Transform to gaussianize the likelihood of the power spectrum. For 'cubic', see eq. 16 of https://arxiv.org/pdf/2302.07484.pdf. If ``None``, no transform is applied. **kwargs : dict Optional arguments for :class:`WindowedPowerSpectrumMultipoles`, e.g.: - theory: defaults to :class:`KaiserTracerPowerSpectrumMultipoles`. - shotnoise: take shot noise from ``data``, or ``covariance`` (mocks) if provided. - fiber_collisions - systematic_templates - if one only provided simple arrays for ``data`` and ``covariance``, one can provide the list of multipoles ``ells`` and the corresponding (list of) :math:`k` wavenumbers as a (list of) array ``k``, and optionally ``shotnoise``. """ name = 'spectrum2poles' def initialize(self, data=None, covariance=None, klim=None, wmatrix=None, transform=None, kedges=None, shotnoise=None, **kwargs): self.k, self.kedges, self.ells, self.shotnoise = None, kedges, None, None self.flatdata, self.mocks, self.covariance = None, None, None if not isinstance(data, dict): self.flatdata = self.load_data(data=data, klim=klim)[0] if self.mpicomm.bcast(_is_array(covariance) or isinstance(covariance, (ObservableCovariance, types.CovarianceMatrix)), root=0): self.covariance = self.mpicomm.bcast(covariance, root=0) else: self.mocks = self.load_data(data=covariance, klim=klim)[-1] if self.mpicomm.bcast(self.mocks is not None, root=0): covariance = None if self.mpicomm.rank == 0: covariance = np.cov(self.mocks, rowvar=False, ddof=1) self.covariance = self.mpicomm.bcast(covariance, root=0) self.wmatrix = wmatrix if not isinstance(wmatrix, WindowedPowerSpectrumMultipoles): self.wmatrix = WindowedPowerSpectrumMultipoles() if wmatrix is not None: self.wmatrix.init.update(wmatrix=wmatrix) if self.ells is not None: # set by data self.wmatrix.init.update(ells=self.ells) if self.kedges is not None: # set by data self.wmatrix.init.update(kedges=self.kedges) if self.k is not None: # set by data self.wmatrix.init.update(k=self.k) elif klim is not None: # FIXME: we do not want limits to apply to k (but mid-k) self.wmatrix.init.update(klim=klim) self.wmatrix.init.update(kwargs) if shotnoise is not None: self.shotnoise = shotnoise self.wmatrix.init.setdefault('shotnoise', self.shotnoise) #if self.shotnoise is None: self.shotnoise = 0. if self.flatdata is None: self.wmatrix(**data) self.flatdata = self.wmatrix.flatpower.copy() else: self.wmatrix.runtime_info.initialize() input_kedges = self.kedges is not None for name in ['k', 'ells', 'kedges']: setattr(self, name, getattr(self.wmatrix, name)) kmasklim = self.wmatrix.kmasklim if kmasklim is not None: # cut has been applied to input k cumsize = np.insert(np.cumsum([len(kk) for kk in kmasklim.values()]), 0, 0) data = [self.flatdata[start:stop] for start, stop in zip(cumsize[:-1], cumsize[1:])] ells = list(kmasklim) self.flatdata = np.concatenate([data[ells.index(ell)][kmasklim[ell]] for ell in self.ells]) if isinstance(self.covariance, ObservableCovariance): warnings.warn('ObservableCovariance is now deprecated. Use lsstypes CovarianceMatrix') if input_kedges: x, method = [np.mean(edges, axis=-1) for edges in self.kedges], 'mid' else: x, method = list(self.k), 'mean' self.nobs = self.covariance.nobs self.covariance = self.covariance.xmatch(x=x, projs=list(self.ells), method=method).view(projs=list(self.ells)) elif isinstance(self.covariance, types.CovarianceMatrix): self.covariance = self.covariance.at.observable.match(self.to_lsstypes('data')).value() self.nobs = getattr(self.covariance, 'nobs', None) self.transform = transform allowed_transform = [None, 'cubic'] if self.transform not in allowed_transform: raise ValueError('transform must be one of {}'.format(allowed_transform)) def load_data(self, data=None, klim=None): def load_data(fn): fn = str(fn) if fn.endswith('.npy'): warnings.warn('Handling of *.npy files is deprecated. Switch to lsstypes format.') #with utils.LoggingContext(level='warning'): state = np.load(fn, allow_pickle=True)[()] if '_projs' in state: toret = ObservableArray.from_state(state) else: from pypower import MeshFFTPower, PowerSpectrumMultipoles toret = MeshFFTPower.from_state(state) if hasattr(toret, 'poles'): toret = toret.poles else: toret = PowerSpectrumMultipoles.from_state(state) toret = from_pypower(toret) else: toret = types.read(fn) return toret def lim_data(power, klim=klim): ells, list_k, list_kedges, list_data = [], [], [], [] if isinstance(power, ObservableArray): warnings.warn('Handling of ObservableArray is deprecated. Switch to lsstypes format.') shotnoise = power.attrs.get('shotnoise', None) if klim is None: klim = {ell: (0, np.inf) for ell in power.projs} for ell, lim in klim.items(): start, stop, *step = lim rebin = 1 if step and step[0] != 1: rebin = np.rint(step[0] / np.diff(power.edges(projs=ell)).mean()).astype(int) power_slice = power.copy().select(xlim=(start, stop), rebin=rebin, projs=ell) ells.append(ell) list_k.append(power_slice.x(projs=ell)) edges = power_slice.edges(projs=ell) edges = np.column_stack((edges[:-1], edges[1:])) list_kedges.append(edges) list_data.append(power_slice.view(projs=ell)) else: if not isinstance(power, types.ObservableTree): power = from_pypower(power) if klim is None: klim = {ell: (0, np.inf) for ell in power.ells} for ell, lim in klim.items(): start, stop, *step = lim rebin = 1 pole = power.get(ells=ell) if step and step[0] != 1: rebin = np.rint(step[0] / np.diff(pole.edges('k'), axis=-1).mean()).astype(int) pole = pole.select(k=slice(0, None, rebin)).select(k=tuple(lim[:2])) ells.append(ell) list_k.append(pole.coords('k')) list_kedges.append(pole.edges('k')) list_data.append(pole.value()) shotnoise = power.get(ells=0).values('shotnoise').mean() return list_k, list_kedges, tuple(ells), list_data, shotnoise def load_all(lmocks): list_mocks = [] for mocks in lmocks: if utils.is_path(mocks): gmocks = glob.glob(str(mocks)) # glob takes str if not gmocks: import warnings warnings.warn('file {} is not found'.format(mocks)) list_mocks += sorted(gmocks) else: list_mocks.append(mocks) fns = [mock for mock in list_mocks if utils.is_path(mock)] if len(fns): nfns = 5 if len(fns) < nfns: msg = 'Loading 1 file {}.'.format(fns) else: msg = 'Loading {:d} files [{}].'.format(len(fns), ', ..., '.join(fns[::len(fns) // nfns])) self.log_info(msg) list_y, list_shotnoise = [], [] for mock in list_mocks: if utils.is_path(mock): mock = load_data(mock) mock_k, mock_kedges, mock_ells, mock_y, mock_shotnoise = lim_data(mock) if self.k is None or self.kedges is None: self.k, self.kedges, self.ells = mock_k, mock_kedges, mock_ells if self.kedges is not None and mock_kedges is not None and not all(np.allclose(sk, mk, atol=0., rtol=1e-3) for sk, mk in zip(self.kedges, mock_kedges)): raise ValueError('{} does not have expected k-edges (based on previous data)'.format(mock)) if mock_ells != self.ells: raise ValueError('{} does not have expected poles (based on previous data)'.format(mock)) list_y.append(np.concatenate(mock_y)) if mock_shotnoise is not None: list_shotnoise.append(mock_shotnoise) return list_y, list_shotnoise flatdata, shotnoise, list_shotnoise, list_y = None, None, None, None if self.mpicomm.rank == 0 and data is not None: if not utils.is_sequence(data): data = [data] if any(_is_from_pypower(dd) or isinstance(dd, (ObservableArray, types.ObservableTree)) for dd in data): list_y, list_shotnoise = load_all(data) if not list_y: raise ValueError('no data/mocks could be obtained from {}'.format(data)) else: list_y = list(data) flatdata = np.mean(list_y, axis=0) if list_shotnoise: shotnoise = np.mean(list_shotnoise, axis=0) self.k, self.kedges, self.ells, flatdata, shotnoise = self.mpicomm.bcast((self.k, self.kedges, self.ells, flatdata, shotnoise) if self.mpicomm.rank == 0 else None, root=0) if self.shotnoise is None: self.shotnoise = shotnoise return flatdata, list_y
[docs] @plotting.plotter(interactive={'kw_theory': {'color': 'black', 'label': 'reference'}}) def plot(self, scaling='kpk', kpower=None, kw_theory=None, fig=None, figsize=None): """ Plot data and theory power spectrum multipoles. Parameters ---------- scaling : str, default='kpk' Either 'kpk' or 'loglog'. kpower : int or None, default=None If not None, will overwrite power of k suggested by `scaling` and will plot k**kpower * pk. kw_theory : list of dict, default=None Change the default line parametrization of the theory, one dictionary for each ell or duplicate it. fig : matplotlib.figure.Figure, default=None Optionally, a figure with at least ``1 + len(self.ells)`` axes. figsize : (width, height), default=None If not figure is passed, fix the size of the created figure. By default: fn : str, Path, default=None Optionally, path where to save figure. If not provided, figure is not saved. kw_save : dict, default=None Optionally, arguments for :meth:`matplotlib.figure.Figure.savefig`. show : bool, default=False If ``True``, show figure. interactive : bool, default=False If ``True``, use interactive interface provided by ipywidgets. Returns ------- fig : matplotlib.figure.Figure """ from matplotlib import pyplot as plt if kw_theory is None: kw_theory = {} if isinstance(kw_theory, dict): kw_theory = [kw_theory] if len(kw_theory) != len(self.ells): kw_theory = [{key: value for key, value in kw_theory[0].items() if (key != 'label') or (ill == 0)} for ill in range(len(self.ells))] kw_theory = [{'color': 'C{:d}'.format(ill), **kw} for ill, kw in enumerate(kw_theory)] if fig is None: height_ratios = [max(len(self.ells), 3)] + [1] * len(self.ells) figsize = (6, 1.5 * sum(height_ratios)) if figsize is None else figsize fig, lax = plt.subplots(len(height_ratios), sharex=True, sharey=False, gridspec_kw={'height_ratios': height_ratios}, figsize=figsize, squeeze=True) fig.subplots_adjust(hspace=0.1) show_legend = True else: lax = fig.axes show_legend = False data, theory, std = self.data, self.theory, self.std k_exp = 1 if scaling == 'kpk' else 0 if kpower is not None: k_exp = kpower for ill, ell in enumerate(self.ells): lax[0].errorbar(self.k[ill], self.k[ill]**k_exp * data[ill], yerr=self.k[ill]**k_exp * std[ill], color='C{:d}'.format(ill), linestyle='none', marker='o', label=r'$\ell = {:d}$'.format(ell)) lax[0].plot(self.k[ill], self.k[ill]**k_exp * theory[ill], **kw_theory[ill]) for ill, ell in enumerate(self.ells): lax[ill + 1].plot(self.k[ill], (data[ill] - theory[ill]) / std[ill], **kw_theory[ill]) lax[ill + 1].set_ylim(-4, 4) for offset in [-2., 2.]: lax[ill + 1].axhline(offset, color='k', linestyle='--') lax[ill + 1].set_ylabel(r'$\Delta P_{{{0:d}}} / \sigma_{{ P_{{{0:d}}} }}$'.format(ell)) for ax in lax: ax.grid(True) if show_legend: lax[0].legend() if scaling == 'kpk': lax[0].set_ylabel(r'$k P_{\ell}(k)$ [$(\mathrm{Mpc}/h)^{2}$]') if scaling == 'loglog': lax[0].set_ylabel(r'$P_{\ell}(k)$ [$(\mathrm{Mpc}/h)^{3}$]') lax[0].set_yscale('log') lax[0].set_xscale('log') lax[-1].set_xlabel(r'$k$ [$h/\mathrm{Mpc}$]') return fig
[docs] @plotting.plotter def plot_bao(self, fig=None): """ Plot data and theory BAO power spectrum wiggles. Parameters ---------- fig : matplotlib.figure.Figure, default=None Optionally, a figure with at least ``len(self.ells)`` axes. fn : str, Path, default=None Optionally, path where to save figure. If not provided, figure is not saved. kw_save : dict, default=None Optionally, arguments for :meth:`matplotlib.figure.Figure.savefig`. show : bool, default=False If ``True``, show figure. Returns ------- fig : matplotlib.figure.Figure """ from matplotlib import pyplot as plt if fig is None: height_ratios = [1] * len(self.ells) figsize = (6, 2 * sum(height_ratios)) fig, lax = plt.subplots(len(height_ratios), sharex=True, sharey=False, gridspec_kw={'height_ratios': height_ratios}, figsize=figsize, squeeze=False) lax = lax.ravel() # in case only one ell fig.subplots_adjust(hspace=0) else: lax = fig.axes data, theory, std = self.data, self.theory, self.std nobao = self.theory_nobao for ill, ell in enumerate(self.ells): lax[ill].errorbar(self.k[ill], self.k[ill] * (data[ill] - nobao[ill]), yerr=self.k[ill] * std[ill], color='C{:d}'.format(ill), linestyle='none', marker='o') lax[ill].plot(self.k[ill], self.k[ill] * (theory[ill] - nobao[ill]), color='C{:d}'.format(ill)) lax[ill].set_ylabel(r'$k \Delta P_{{{:d}}}(k)$ [$(\mathrm{{Mpc}}/h)^{{2}}$]'.format(ell)) for ax in lax: ax.grid(True) lax[-1].set_xlabel(r'$k$ [$h/\mathrm{Mpc}$]') return fig
def plot_wiggles(self, *args, **kwargs): import warnings warnings.warn('plot_wiggles is deprecated, use plot_bao instead') self.plot_bao(*args, **kwargs)
[docs] @plotting.plotter def plot_covariance_matrix(self, corrcoef=True, **kwargs): """ Plot covariance matrix. Parameters ---------- corrcoef : bool, default=True If ``True``, plot the correlation matrix; else the covariance. barlabel : str, default=None Optionally, label for the color bar. figsize : int, tuple, default=None Optionally, figure size. norm : matplotlib.colors.Normalize, default=None Scales the covariance / correlation to the canonical colormap range [0, 1] for mapping to colors. By default, the covariance / correlation range is mapped to the color bar range using linear scaling. labelsize : int, default=None Optionally, size for labels. fig : matplotlib.figure.Figure, default=None Optionally, a figure with at least ``len(self.ells) * len(self.ells)`` axes. Returns ------- fig : matplotlib.figure.Figure """ covariance = self.to_lsstypes('covariance') return covariance.plot(corrcoef=corrcoef, **kwargs)
def calculate(self): self.flattheory = self.wmatrix.flatpower if self.transform == 'cubic': # See eq. 16 of https://arxiv.org/pdf/2302.07484.pdf self.flattheory = (3. * (self.flattheory / self.flatdata)**(1. / 3.) - 2.) * self.flatdata @property def theory(self): return self.wmatrix.power @property def theory_nobao(self): template = self.wmatrix.theory.template only_now = template.only_now template.only_now = True def callback(calculator): all_requires.append(calculator) for require in calculator.runtime_info.requires: if require in all_requires: del all_requires[all_requires.index(require)] # we want first dependencies at the end callback(require) all_requires = [] callback(self) all_requires = all_requires[::-1] for calculator in all_requires: calculator.runtime_info.tocalculate = True calculator.runtime_info.calculate(calculator.runtime_info.input_values) nobao = self.theory template.only_now = only_now for calculator in all_requires: calculator.runtime_info.tocalculate = True calculator.runtime_info.calculate(calculator.runtime_info.input_values) return nobao @property def data(self): cumsize = np.insert(np.cumsum([len(k) for k in self.k]), 0, 0) return [self.flatdata[start:stop] for start, stop in zip(cumsize[:-1], cumsize[1:])] @property def std(self): cumsize = np.insert(np.cumsum([len(k) for k in self.k]), 0, 0) diag = np.diag(self.covariance)**0.5 return [diag[start:stop] for start, stop in zip(cumsize[:-1], cumsize[1:])] def __getstate__(self): state = {} for name in ['k', 'kedges', 'ells', 'flatdata', 'flattheory', 'shotnoise']: if hasattr(self, name): state[name] = getattr(self, name) return state @classmethod def install(cls, config): config.pip('git+https://github.com/adematti/lsstypes')
[docs] def to_lsstypes(self, kind): """Return data or covariance.""" shotnoise = self.shotnoise if self.shotnoise is not None else 0 data = [types.Mesh2SpectrumPole(k=self.k[ill], k_edges=self.kedges[ill], num_raw=self.data[ill] + shotnoise * (ell == 0), num_shotnoise=shotnoise * (ell == 0) * np.ones_like(self.data[ill]), ell=ell) for ill, ell in enumerate(self.ells)] data = types.Mesh2SpectrumPoles(data) if kind == 'data': return data if kind == 'covariance': return types.CovarianceMatrix(observable=data, value=self.covariance) raise NotImplementedError(f'kind {kind} not recognized')
def to_array(self): warnings.warn('to_array is deprecated. Please use to_lsstypes') from desilike.observables import ObservableArray kedges = [np.append(edges[:, 0], edges[-1, 1]) for edges in self.kedges] return ObservableArray(x=self.k, edges=kedges, value=self.data, projs=self.ells, attrs={'shotnoise': self.shotnoise}, name=self.__class__.__name__)