# coding=utf-8
from __future__ import division
import numpy as np, sys
from astropy.table import Table
from .galaxy_parameters import GalaxyParameters
from .hyperspectral_cube import HyperspectralCube as HyperCube
from .math_utils import merge_where_nan, median_clip, safe_exp
[docs]class MCMC:
"""
An extension of the galpak class with the galpak MH algorithm (myMCMC class)
and method used for emcee and external algorithm
the following methods are for external samplers such as emcee:
- lnprob: returned by __call__ / the full log-probability function: Ln = Priors x L(p)
- loglike: the log L(p)
- logpriors: the log priors with uniform priors.
the following methods are for multinest:
- pyloglike: the log L(p)
- pycube: transforms unit cube to uniform priors.
the following methods are for dynesty:
- loglike: the log L(p)
- ptforms: transforms unit cube to uniform priors.
the following methods are for the internal galpak MH algorithm
- myMCMC
"""
CHISTAT_VALID = ['gaussian', 'Mighell', 'Cstat', 'Neyman', 'Pearson']
SAMPLING_VALID = ['Cauchy', 'Normal', 'AdaptiveCauchy',\
'DE', 'Snooker', 'walkers', 'walkersCauchy']
MCMC_VALID = ['galpak', 'emcee_MH', 'emcee_walkers', 'dynesty', 'multinest']
def __call__(self, x):
"""
call for instance ()
used for emcee
"""
return self.lnprob(x)
#For EMCEE
def lnprob(self, params):
"""
the full log-probability function: Ln = Priors x L(p)
"""
#convert ndarray to Parameter class {for emcee}
if not isinstance(params, GalaxyParameters):
if isinstance(params, np.ndarray):
params = self.model.Parameters().from_ndarray(params)
else:
self.logger.error("Param is not ModelParameter nor a ndarray")
lp = self.logpriors(params)
if not np.isfinite(lp):
return -np.inf
return lp+self.loglike(params)
def loglike(self, params):
"""
The log-likelihood function.
log L ~ log Pi_i [1/Vi^0.5 exp(-0.5 (xi-m)^/Vi]
log L ~ -0.5 * sum_i Vi - 0.5 sum_i (xi-m)^2/Vi
"""
#return -0.5*self.compute_chi(params)
#adding a constant
return -0.5*self.compute_chi(params)
def logpriors(self, params):
"""
Define uniform prior.
Make sure we're inside the preset boundaries
use gaussian priors if provided
:param params:
:return:
"""
if params is None:
self.logger.error("Param is None " )
in_boundaries = (params >= self.min_boundaries) * (params <= self.max_boundaries)
if (in_boundaries==False).as_vector().any():
return -np.inf
else:
self.model.sanitize_parameters(params)
if self.gprior_parameters is None :
return 0.0
else:
mu = self.gprior_parameters[0]
sig= self.gprior_parameters[1]
# Gprior = np.array(stats.norm.logpdf(params, loc=mu, scale=sig) * has_gaussian_prior)
Gprior = np.array(-0.5 * (mu-params)**2/sig**2)
return np.nansum(Gprior)
#for PyMultinest
def pyloglike(self, cube, ndim, nparams):
"""
wrap for pymultinest
"""
vect = [cube[i] for i in range(ndim)]
params = self.model.Parameters().from_ndarray(vect)
self.model.sanitize_parameters(params)
return self.loglike(params)
#return self.loglike(params)
def pycube(self, cube, ndim, nparams):
"""
Transforms samples `u` drawn from the unit cube to samples to those
from our uniform prior
for pymultinest
"""
if self.gprior_parameters is not None:
mu = self.gprior_parameters[0]
sig = self.gprior_parameters[1]
has_gaussian_prior = np.isfinite(mu)
else:
has_gaussian_prior = np.ones_like(self.initial_parameters)==0
for i in range(ndim):
if has_gaussian_prior[i]:
#cube[i] = stats.norm(loc=mu[i], scale=sig[i]).ppf(cube[i])
cube[i] = Phi_ppf(cube[i], loc=mu[i], scale=sig[i])
else:
cube[i] = (self.max_boundaries - self.min_boundaries)[i] * cube[i] + self.min_boundaries[i]
#for ultranest
#For Dynesty
def ptform(self, u):
"""Transforms samples `u` drawn from the unit cube to samples to those
from our uniform prior
for Dynesty
"""
return (self.max_boundaries - self.min_boundaries) * u + self.min_boundaries
def compute_chi(self, params):
# Sanitize the parameters
#
if not isinstance(params, GalaxyParameters):
if isinstance(params, np.ndarray):
params = self.model.Parameters().from_ndarray(params)
self.model.sanitize_parameters(params) # to deal with circularity
#dim_d = np.size(self.cube.data)
ones = self.cube**2/self.variance_cube #
dim_d = np.isfinite(ones.data).sum() #count only data with finite values
dim_p_free = dim_p = len(params)
#already taken care
#if self.model.flux_profile is not 'sersic':
# dim_p_free = dim_p -1
if self.known_parameters is not None:
fixed = np.where(np.isfinite(self.known_parameters),1,0).sum()
else:
_snr = self.galaxy.stdev / np.abs(self.galaxy)
fixed = np.where(_snr<1e-6,1,0).sum()
self.dim_p_free = dim_p_free - fixed
self.Ndegree = (dim_d - self.dim_p_free -1)
return self.chisq(self.chi_stat)(params)
def chisq(self, chi_stat):
# Chi2 satistics, default
if (chi_stat == 'gaussian'):
compute_chi_fct = lambda g: \
np.nansum(self._compute_error_gauss(g)
)
# https://heasarc.gsfc.nasa.gov/xanadu/xspec/manual/XSappendixStatistics.html
# Mighell http://adsabs.harvard.edu/abs/1999ApJ...518..380M
# Humphrey 2009, http://adsabs.harvard.edu/abs/2009ApJ...693..822H
# Mighell
elif (chi_stat == 'Mighell'):
compute_chi_fct = lambda g: \
np.nansum(self._compute_error_Mighell(g)
)
# C-statistique
elif (chi_stat == 'Cstat'):
compute_chi_fct = lambda g: \
np.nansum(self._compute_error_cstat(g)
)
# Neyman
elif (chi_stat == 'Neyman'):
compute_chi_fct = lambda g: \
np.nansum(self._compute_error_Neyman(g)
)
# Pearson
elif (chi_stat == 'Pearson'):
compute_chi_fct = lambda g: \
np.nansum(self._compute_error_Pearson(g)
)
else:
raise Exception("chi_stat is not Valid, should be %s" %(str(self.CHISTAT_VALID)))
#returns function
return compute_chi_fct
def myMCMC(self, max_iterations, random_amplitude, sampling_method='Cauchy', min_acceptance_rate=10 ):
"""
:param max_iterations:
:param random_amplitude:
:param sampling_method:
'Cauchy' [default] | 'Normal' | 'AdaptiveCauchy'
:param min_acceptance_rate: float [defailt = 5]
In %, the acceptance rate is the number of useful iterations divided
by the total number of iterations.
If it gets below the `failure_rate` treshold, iterations stop.
:return:
"""
self.logger.info("Random amplitude : %s", str(random_amplitude))
# Some useful vars
count_alpha_ok = 0
count_min = 0
total_iterations = 0
ki_min = sys.maxsize # that way, it can only go down
galaxy = self.initial_parameters
galaxy_old = galaxy.copy()
galaxy_new = galaxy_old.copy()
#chi_old = self.compute_chi(galaxy)
lnprob_old = self.lnprob(galaxy)
self.logger.critical("Running GalPak MHSampler with Sampling %s" % (sampling_method))
names_param = list(galaxy.names)
names_param.append('reduced_chi')
# names_param = [
# 'x', 'y', 'z', 'flux', 'radius', 'inclination', 'pa',
# 'turnover_radius', 'maximum_velocity', 'velocity_dispersion',
# 'reduced_chi',
# ]
# types_param = ['float32' for i in range(dim_p + 1)]
#dt = zip(names_param, types_param)
#chain = np.zeros(max_iterations, dtype=dt) # intensive operation !
# chain = Table(names=names_param,dtype=types_param)
self.chain_rows = []
rate = 100.
while (rate > min_acceptance_rate) and (count_alpha_ok < max_iterations):
# Update loop conditions
total_iterations += 1
# Cauchy jumping
galaxy_new = self._sampling_method(sampling_method, galaxy_old, random_amplitude)
#new way: from emceee
#newlnprob = self.lnprob(galaxy_new)
#diff = 0.5 * (newlnprob - lnprob)
#if diff < 0:
# diff = np.exp(diff) - np.random.rand()
#if diff > 0:
# lnprob = newlnprob
# galaxy = galaxy_new
# Make sure we're inside the preset boundaries
in_boundaries = (galaxy_new >= self.min_boundaries) * (galaxy_new <= self.max_boundaries)
if in_boundaries.all():
# Computing convolved model with new parameters
chi_new = self.compute_chi(galaxy_new)
if chi_new<1e-3:
#print chi_new,galaxy_new
self.logger.error('Chi2 is 0, quit')
exit()
lnprob_new = self.lnprob(galaxy_new)
# Compute ratio of likelihood a posteriori
# exp( old-new / 2 )
#likelihood = safe_exp(-0.5 * (chi_new - chi_old) )
likelihood = safe_exp(lnprob_new - lnprob_old)
# likelihood > 1 => new is better
# likelihood < 1 => accept or reject
alpha = np.random.rand()
# Conservation test
if alpha < likelihood:
count_alpha_ok += 1
rate = count_alpha_ok * 100. / total_iterations
# Save minimum
if ki_min > chi_new:
ki_min = chi_new
count_min = count_alpha_ok
galaxy_old = galaxy_new
#chi_old = chi_new
lnprob_old = lnprob_new
# vect = np.append(galaxy_new, chi_new / self.Ndegree)
#chain[count_alpha_ok - 1] = np.float32(vect)
# chain.add_row(vect)
self.chain_rows.append(list(galaxy_new) +
[chi_new / self.Ndegree])
if self.verbose:
# Check that parameters are not too close to the boundaries
too_close_to_max = np.abs((galaxy_new - self.max_boundaries) / self.max_boundaries) < self.eps
too_close_to_min = np.abs((galaxy_new - self.min_boundaries) / self.min_boundaries) < self.eps
too_close_to_boundaries = too_close_to_min + too_close_to_max
#
info = "{count:5d} MIN={count_min:5d} {rate:2.0f}% " \
"χ²={ki:3.6f}>{ki_min:3.6f} {params:s}"
info = info.format(
count=count_alpha_ok,
count_min=count_min,
ki=chi_new / self.Ndegree,
ki_min=ki_min / self.Ndegree,
rate=rate,
params=galaxy_new.colored_info(too_close_to_boundaries)
)
if self.model.redshift is not None:
mdyn = self.model.compute_Mdyn_at_Rhalf(galaxy_new)
info += "log_mdyn={mass:.2f} "
info = info.format(mass=np.log10(mdyn))
info += "lnlog={lnlog:6.4f} "
info = info.format(lnlog=self.loglike(galaxy_new))
print(info)
if len(self.chain_rows)>0:
chain = Table(names=names_param, rows=self.chain_rows)
else:
raise Exception("No useful iterations")
# Raise if no useful iteration was run
if rate == 0.:
self.logger.debug("Last Galaxy Parameters : %s", galaxy_new)
raise RuntimeError("No useful iteration was run. "
"Try with higher max_iterations?")
# Report
self.logger.info("Iterations report : %d Total, %d OK, %d%% Rate",
total_iterations, count_alpha_ok, rate)
self.logger.info("Storing results as parameters...")
# Acceptance Rate
self.acceptance_rate = rate
self.logger.info("self.acceptance_rate : useful iterations count / total iterations count : %s " % (self.acceptance_rate) )
return chain
#PRIVATE METHODS
def _sampling_method(self, sampling, params, scale):
"""
set proposal distribution with sampling
sampling: 'Cauchy' [default] | 'Normal'
Cauchy: Lorentz proposal
Normal: Gaussian proposal
"""
if sampling == 'Cauchy':
random_uniforms = np.random.uniform(-np.pi / 2., np.pi / 2., size=len(params))
galaxy_new = params + scale * np.tan(random_uniforms)
elif sampling == 'Normal':
galaxy_new = np.random.normal(params, scale, size=len(params))
elif sampling == 'AdaptiveCauchy':
random_uniforms = np.random.uniform(-np.pi / 2., np.pi / 2., size=len(params))
#@fixme should be covariance
if len(self.chain_rows) > 2500:
scale = np.sqrt(np.std(np.array(self.chain_rows)[-1500:, :-1], axis=0)) ** 2 / len(params)
elif len(self.chain_rows)>1500:
scale = np.sqrt(np.std(np.array(self.chain_rows)[-750:,:-1], axis=0) )**2 / len(params)
elif len(self.chain_rows) > 500:
scale = np.sqrt(np.std(np.array(self.chain_rows)[-250:, :-1], axis=0) * 0.8 ) ** 2 / len(params)
elif len(self.chain_rows) > 50:
scale = np.sqrt(np.std(np.array(self.chain_rows)[-50:, :-1], axis=0) * 0.5) ** 2 / len(params)
galaxy_new = params + scale * np.tan(random_uniforms)
else:
raise Exception("Not implemented. Should be %s" %(self.SAMPLING_VALID))
return galaxy_new
def _init_sampling_scale(self, random_scale, should_guess_flags):
dim_d = np.size(self.cube.data)
dim_p = len(self.galaxy)
# Tweak the random amplitude vector (Kp coeff, as pid)
# that we can document Model.setup_random_amplitude() adequately
random_amplitude = np.sqrt(
(self.min_boundaries - self.max_boundaries) ** 2 / 12.
) * dim_p / dim_d
# Let the model adjust the random amplitude of the parameter jump
self.model.setup_random_amplitude(random_amplitude)
# Scale MCMC if needed // allowing vectors
if random_scale is not None:
if np.size(random_scale) != 1:
merge_where_nan(random_scale, np.ones_like(random_amplitude))
random_amplitude = random_amplitude * random_scale
# Zero random amplitude where parameters are known
random_amplitude = random_amplitude * should_guess_flags
return random_amplitude
def _compute_error_gauss(self, galaxy):
"""
It computes the difference between the measured cube
and the computed cube from given galaxy parameters.
returns ( D - M ) / stdev
"""
# Compute convolved cube for given galaxy parameters
cube_convolved = self.create_convolved_cube(galaxy, self.cube.shape)
# Diff. between measured cube and convolved cube, scaled by the error
self.variance_chi = self.variance_cube
difference = (self.cube - cube_convolved)**2 / self.variance_chi
return np.ndarray.flatten(difference.data)
def _compute_error_Mighell(self, galaxy):
"""
Modified Statistic Mighell 1998
Sum ( D + min(1,D) - M)^2 / D + 1
"""
# Compute convolved cube for given galaxy parameters
cube_convolved = self.create_convolved_cube(galaxy, self.cube.shape)
tmp_cube = HyperCube(np.where(self.cube.data >= 1, 1., self.cube.data))
self.variance_chi = self.cube + 1.0
difference = (self.cube + tmp_cube - cube_convolved ) **2 \
/ self.variance_chi
return np.ndarray.flatten(difference.data)
def _compute_error_Neyman(self, galaxy):
"""
Modified Neyman statistic
Humphrey 2009, http://adsabs.harvard.edu/abs/2009ApJ...693..822H
Sum ( M - D )^2 / max(D,1)
"""
# Compute convolved cube for given galaxy parameters
cube_convolved = self.create_convolved_cube(galaxy, self.cube.shape)
self.variance_chi = HyperCube(np.where(self.cube.data <= 1, 1., self.cube.data))
difference = (cube_convolved - self.cube)**2 / self.variance_chi
return np.ndarray.flatten(difference.data)
def _compute_error_Pearson(self, galaxy):
"""
Pearson statistic
Humphrey 2009, http://adsabs.harvard.edu/abs/2009ApJ...693..822H
Sum ( M - D )^2 / M
"""
# Compute convolved cube for given galaxy parameters
cube_convolved = self.create_convolved_cube(galaxy, self.cube.shape)
difference = (cube_convolved - self.cube)**2 / (cube_convolved)
self.variance_chi = cube_convolved
return np.ndarray.flatten(difference.data)
def _compute_error_cstat(self, galaxy):
"""
Cash statistique for Poisson noise
Humphrey 2009, http://adsabs.harvard.edu/abs/2009ApJ...693..822H
Sum ( M - D + D * log(D/M) )
"""
# Compute convolved cube for given galaxy parameters
cube_convolved = self.create_convolved_cube(galaxy, self.cube.shape)
tmp_cube = np.log(self.cube.data/cube_convolved)
difference = (cube_convolved - self.cube) + self.cube * tmp_cube
self.variance_chi = np.ones_like(self.cube.data)
return np.ndarray.flatten(difference.data)
#########################
######EMCEE 3.0
########################
try:
from emcee.moves import MHMove
class CauchyMove(MHMove):
"""A Metropolis step with a Gaussian proposal function.
Args:
cov: The covariance of the proposal function. This can be a scalar,
vector, or matrix and the proposal will be assumed isotropic,
axis-aligned, or general respectively.
mode (Optional): Select the method used for updating parameters. This
can be one of ``"vector"``, ``"random"``, or ``"sequential"``. The
``"vector"`` mode updates all dimensions simultaneously,
``"random"`` randomly selects a dimension and only updates that
one, and ``"sequential"`` loops over dimensions and updates each
one in turn.
factor (Optional[float]): If provided the proposal will be made with a
standard deviation uniformly selected from the range
``exp(U(-log(factor), log(factor))) * cov``. This is invalid for
the ``"vector"`` mode.
Raises:
ValueError: If the proposal dimensions are invalid or if any of any of
the other arguments are inconsistent.
"""
def __init__(self, cov, mode="vector", factor=None):
# Parse the proposal type.
try:
float(cov)
except TypeError:
cov = np.atleast_1d(cov)
if len(cov.shape) == 1:
# A diagonal proposal was given.
ndim = len(cov)
proposal = _diagonal_proposal(np.sqrt(cov), factor, mode)
elif len(cov.shape) == 2 and cov.shape[0] == cov.shape[1]:
# The full, square covariance matrix was given.
ndim = cov.shape[0]
proposal = _proposal(cov, factor, mode)
else:
raise ValueError("Invalid proposal scale dimensions")
else:
# This was a scalar proposal.
ndim = None
proposal = _isotropic_proposal(np.sqrt(cov), factor, mode)
super(CauchyMove, self).__init__(proposal, ndim=ndim)
class _isotropic_proposal(object):
allowed_modes = ["vector", "random", "sequential"]
def __init__(self, scale, factor, mode):
self.index = 0
self.scale = scale
if factor is None:
self._log_factor = None
else:
if factor < 1.0:
raise ValueError("'factor' must be >= 1.0")
self._log_factor = np.log(factor)
if mode not in self.allowed_modes:
raise ValueError(("'{0}' is not a recognized mode. "
"Please select from: {1}")
.format(mode, self.allowed_modes))
self.mode = mode
def get_factor(self, rng):
if self._log_factor is None:
return 1.0
return np.exp(rng.uniform(-self._log_factor, self._log_factor))
def get_updated_vector(self, rng, x0):
return x0 + self.get_factor(rng) * self.scale * rng.randn(*(x0.shape))
def __call__(self, x0, rng):
nw, nd = x0.shape
xnew = self.get_updated_vector(rng, x0)
if self.mode == "random":
m = (range(nw), rng.randint(x0.shape[-1], size=nw))
elif self.mode == "sequential":
m = (range(nw), self.index % nd + np.zeros(nw, dtype=int))
self.index = (self.index + 1) % nd
else:
return xnew, np.zeros(nw)
x = np.array(x0)
x[m] = xnew[m]
return x, np.zeros(nw)
class _diagonal_proposal(_isotropic_proposal):
def get_updated_vector(self, rng, x0):
random_uniforms = np.random.uniform(-np.pi / 2., np.pi / 2., size=len(self.scale))
return x0 + self.get_factor(rng) * self.scale * np.tan(random_uniforms)
#return x0 + self.get_factor(rng) * self.scale * rng.randn(*(x0.shape))
class _proposal(_isotropic_proposal):
allowed_modes = ["vector"]
def get_updated_vector(self, rng, x0):
random_uniforms = np.random.uniform(-np.pi / 2., np.pi / 2., size=len(self.scale))
return x0 + self.get_factor(rng) * self.scale * np.tan(random_uniforms)
#return x0 + self.get_factor(rng) * rng.multivariate_normal(
# np.zeros(len(self.scale)), self.scale)
except:
pass