"""Base class for profilers."""
# TODO: expand functionality such as warm starts
import json
from functools import partial
from pathlib import Path
import numpy as np
from desilike import Samples
from desilike.pool import MPIPool
from desilike.utils import BaseClass
from .optimizers import scipy_dual_annealing
[docs]
class Profiler(BaseClass):
"""Profiler used to compute likelihood and posterior profiles."""
def __init__(self, likelihood, posterior=True, rng=None, directory=None):
"""Initialize the profiler.
Parameters
----------
likelihood : BaseLikelihood
Likelihood to profile.
posterior : bool, optional
If ``True``, profile the posterior. Otherwise, profile the
likelihood. Default is ``True``.
rng : numpy.random.Generator, int or None, optional
Random number generator. Default is ``None``.
directory : str, Path, or None, optional
Save samples to this folder. Default is ``None``.
"""
self.likelihood = likelihood
if posterior:
self.neg_cost_key = 'log_posterior'
else:
self.neg_cost_key = 'log_likelihood'
self.params = likelihood.varied_params.names()
self.limits = {param.name: (param.limits[0], param.limits[1]) for param
in likelihood.varied_params}
self.pool = MPIPool()
for name in ['_cost_function', '_run_optimizer']:
setattr(self, name, self.pool.cache_function(
getattr(self, name), name))
if directory is not None:
directory = Path(directory)
if directory.suffix:
raise ValueError("The directory cannot have a suffix.")
if self.pool.main:
directory.mkdir(parents=True, exist_ok=True)
self.directory = directory
if self.directory is not None:
try:
self._load()
except FileNotFoundError:
pass
if not hasattr(self, 'samples'):
self.samples = Samples()
if not hasattr(self, 'rng'):
if isinstance(rng, int) or rng is None:
rng = np.random.default_rng(seed=rng)
self.rng = rng
def _save(self):
"""Save all results to disk."""
if self.pool.main:
self.samples.save(self.directory / 'samples.npz')
with open(self.directory / 'rng.json', 'w') as fstream:
json.dump(self.rng.bit_generator.state, fstream)
def _load(self):
"""Load internal calculations from disk."""
if self.pool.main:
self.samples = Samples.load(self.directory / 'samples.npz')
with open(self.directory / 'rng.json', 'r') as fstream:
self.rng = np.random.default_rng()
self.rng.bit_generator.state = json.load(fstream)
def _add_samples(self, samples):
"""Add samples to profile."""
samples[self.neg_cost_key] = -np.inf
self.samples.append(samples)
# Remove duplicate parameter combinations. Use a complex number as a
# placeholder for optimized parameters since np.nan is not treated
# as equal if present in arrays.
x = np.column_stack([np.where(
self.samples.get_flag('optimize', param), 1j,
self.samples[param]) for param in self.params])
self.samples = self.samples[np.unique(x, axis=0, return_index=True)[1]]
# Get a list of dictionaries of fixed parameters.
self.fixed_params = []
for i in range(len(self.samples)):
self.fixed_params.append(dict())
for param in self.params:
if not self.samples.get_flag('optimize', param)[i]:
self.fixed_params[i][param] = self.samples[i][param]
[docs]
def add_single_sample(self, sample):
"""Add a parameter combination to optimize.
Parameters
----------
sample : dict
Single parameter combination to profile.
Raises
------
ValueError
If a parameter is not described in the likelihood.
"""
for param in sample.keys():
if param not in self.params:
msg = f"Unkown parameter '{param}'."
raise ValueError(msg)
samples = Samples(**{key: [value, ] for key, value in sample.items()})
for param in self.params:
if param not in sample.keys():
samples[param] = [np.nan, ]
samples.set_flag('optimize', param, True)
else:
samples.set_flag('optimize', param, False)
self._add_samples(samples)
[docs]
def add_optimize_all(self):
"""Add finding the global optimum."""
self.add_single_sample({})
[docs]
def add_manual_grid(self, grid):
"""Manually add parameter grid to optimize.
Parameters
----------
grid : dict
Parameter grid to profile, i.e., ``dict(a=[0, 1, 2])`` implies
that the maximum likelihood is found for :math:`a=0`, :math:`a=1`,
and :math:`a=2`. If multiple parameters are specified, all
combinations are profiled.
Raises
------
ValueError
If no parameter is specificed or a parameter is not described in
the likelihood.
"""
if not grid:
msg = "You must specify at least one parameter."
raise ValueError(msg)
for param in grid.keys():
if param not in self.params:
msg = f"Unkown parameter '{param}'."
raise ValueError(msg)
# Get all combinations.
samples = dict(zip(grid.keys(), np.meshgrid(*grid.values())))
samples = {key: value.flatten() for key, value in samples.items()}
samples = Samples(**samples)
for param in self.params:
if param not in samples.params:
samples[param] = np.nan
samples.set_flag('optimize', param, True)
else:
samples.set_flag('optimize', param, False)
self._add_samples(samples)
def _vector_to_params(self, vector, index):
"""Convert an array of varied parameters to a (complete) dictionary.
Parameters
----------
vector : numpy.ndarray
Array of varied parameters normalized to [0, 1].
index : int
Index of the fixed parameters.
Returns
-------
params : dict
Dictionary including (not normalized) varied and fixed parameters.
Raises
------
ValueError
If ``vector`` has the wrong length.
"""
if len(vector) != len(self.params) - len(self.fixed_params[index]):
msg = "Incorrect number of parameters."
raise ValueError(msg)
varied_params = [p for p in self.params if p not in
self.fixed_params[index].keys()]
vector = vector.copy()
for i, key in enumerate(varied_params):
vector[i] = (
vector[i] * (self.limits[key][1] - self.limits[key][0]) +
self.limits[key][0])
return dict(zip(varied_params, vector)) | self.fixed_params[index]
def _cost_function(self, params, index=0):
"""Cost function to optimize.
Parameters
----------
params : numpy.ndarray or dict
Array of varied parameters normalized to [0, 1]. Alternatively,
can be a dictionary listing all parameters.
index : int, optional
Index of the fixed parameters.
Returns
-------
float
Cost function value.
"""
if not isinstance(params, dict):
params = self._vector_to_params(params, index=index)
if self.neg_cost_key == 'log_likelihood':
return - (self.likelihood(params) -
self.likelihood.all_params.prior(**params))
return - self.likelihood(params)
def _get_start(self, n, max_init_attempts=100):
"""Generate cold-start samples.
This should only be called by the main process while the others are
waiting.
Parameters
----------
max_init_attempts: int, optional
Maximum number of attempts to initialize each sample. Default is
100.
Returns
-------
index : numpy.ndarray
Indices corresponding to the sample.
x_0 : list of numpy.ndarray
Starting positions.
Raises
------
ValueError
If a finite cost function value cannot be found for all samples
after ``max_init_attempts``.
"""
index = np.repeat(np.arange(len(self.samples)), n)
x_0 = [None] * len(index)
cost = np.repeat(np.inf, len(x_0))
for _ in range(max_init_attempts):
for i in range(len(x_0)):
if np.isfinite(cost[i]):
pass
n_free = len(self.params) - len(self.fixed_params[index[i]])
x_0[i] = self.rng.uniform(size=n_free)
args = [self._vector_to_params(x, i) for i, x, c in zip(
index, x_0, cost) if not np.isfinite(c)]
new_cost = self.pool.map(self._cost_function, args)
cost[~np.isfinite(cost)] = new_cost
if np.all(np.isfinite(cost)):
break
if not np.all(np.isfinite(cost)):
msg = ("Could not find finite likelihood/posterior after "
f"{max_init_attempts:d} attempts.")
raise ValueError(msg)
return index, x_0
def _run_optimizer(self, optimizer, args, **kwargs):
index, x_0, rng = args
cost_function = partial(self._cost_function, index=index)
if len(x_0) == 0:
return x_0, cost_function(x_0), True
return optimizer(cost_function, x_0, rng, **kwargs)
[docs]
def run(self, n_per_iter=10, max_iter=10, tol=1e-3, warm_start=False,
max_init_attempts=100, optimizer=scipy_dual_annealing,
optimizer_kwargs=None):
"""Run the profiler.
Parameters
----------
n_per_iter : int, optional
Independent optimizations per sample at each iteration. Default is
10.
max_iter : int, optional
Maximum number of iterations. Default is 10.
tol : float, optional
Optimization stops if maximum improvement accross all samples
drops below ``tol`` between optimizations. Default is 1e-2.
warm_start : bool, optional
If True, starting positions are derived from interpolating
previous points. This can only be done if the profiler was
run with ``warm_start=False`` before.
max_init_attempts: int, optional
Maximum number of attempts to initialize each sample. Default is
100.
optimizer : callable, optional
Optimizer function from ``desilike.profilers.optimizers``. Default
is ``desilike.profilers.scipy_dual_annealing``.
optimizer_kwargs : dict, optional
Optional keyword arguments passed to the optimizer. Default is
``None``.
Raises
------
ValueError
If trying to run the profiler without having added samples.
Returns
-------
samples : desilike.statistics.samples.Samples
Maxima found by the profiler.
"""
if len(self.samples) == 0:
raise ValueError("Cannot run profiler without samples.")
if optimizer_kwargs is None:
optimizer_kwargs = {}
run_optimizer = partial(self._run_optimizer, optimizer,
**optimizer_kwargs)
if self.pool.main:
for _ in range(max_iter):
index, x_0 = self._get_start(
n_per_iter, max_init_attempts=max_init_attempts)
result = self.pool.map(
run_optimizer, zip(index, x_0, self.rng.spawn(len(x_0))))
impr = np.zeros(len(self.samples))
for i, (x_min, f_min, success) in zip(index, result):
if f_min < -self.samples[self.neg_cost_key][i]:
impr[i] = -self.samples[self.neg_cost_key][i] - f_min
params = self.samples[i]
params.update(self._vector_to_params(x_min, i))
params[self.neg_cost_key] = -f_min
self.samples[i] = params
if self.directory is not None:
self._save()
if np.amax(impr) < tol:
break
self.pool.stop_wait()
else:
self.pool.wait()
return self.pool.bcast(self.samples)