Source code for breads.fit

from copy import copy

import matplotlib.pyplot as plt
import numpy as np
from scipy.optimize import lsq_linear
from scipy.special import loggamma

__all__ =  ('fitfm', 'log_prob', 'combined_log_prob', 'nlog_prob')

[docs] def fitfm(nonlin_paras, dataobj, fm_func, fm_paras, computeH0=True, bounds=None, scale_noise=True, marginalize_noise_scaling=False, debug=False): """ Fit a forward model to data returning probabilities and best fit linear parameters. Args: nonlin_paras: [p1,p2,...] List of non-linear parameters such as rv, y, x. The meaning and number of non-linear parameters depends on the forward model defined. dataobj: A data object of type breads.instruments.instrument.Instrument to be analyzed. fm_func: A forward model function. See breads.fm.template.template() for an example. fm_paras: Additional parameters for fm_func (other than non-linear parameters and dataobj) computeH0: If true (default), compute the probability of the model removing the first element of the linear model; See second ouput log_prob_H0. This can be used to compute the Bayes factor for a fixed set of non-linear parameters bounds: (Caution: the calculation of log prob is only theoretically accurate if no bounds are used.) Bounds on the linear parameters used in lsq_linear as a tuple of arrays (min_vals, maxvals). e.g. ([0,0,...], [np.inf,np.inf,...]) default no bounds. Each numpy array must have shape (N_linear_parameters,). Returns: log_prob: Probability of the model marginalized over linear parameters. log_prob_H0: Probability of the model without the planet marginalized over linear parameters. s2: noise scaling factor linparas: Best fit linear parameters linparas_err: Uncertainties of best fit linear parameters """ fm_out = fm_func(nonlin_paras, dataobj, **fm_paras) #Check the forward model matrices if len(fm_out) == 3: d_no_reg, M_no_reg, s_no_reg = fm_out elif len(fm_out) == 4: d_no_reg, M_no_reg, s_no_reg, extra_outputs = fm_out else: raise ValueError(f"Unrecognized number of matrices for forward model, the number of outputs for {fm_func.__name__} is expected to be 3 or 4 but {len(fm_out)} were given.") N_linpara = M_no_reg.shape[1] N_data = np.size(d_no_reg) if N_data == 0: #Nothing can be fitted return _invalid_outputs(N_linpara) if N_linpara == 1 and computeH0: #Only one parameter to fit so cannot test both H0 and H1 hypothesis. computeH0 = False raise Warning("Only one parameter to fit so cannot test H0 hypothesis.") #Initializing the boundaries for least-square fit if bounds is None: _bounds = ([-np.inf]*N_linpara, [np.inf]*N_linpara) else: _bounds = (copy(bounds[0]), copy(bounds[1])) if any(np.any(np.isfinite(arr)) for arr in _bounds): #check if there is finite boundaries raise Warning("The calculation of log prob is only theoretically accurate if no finite bounds are used...") # Reject the column(s) full of 0 of the model matrix M (without regularization) validpara = np.where(np.any(M_no_reg != 0, axis=0)) if 0 not in validpara[0]: #the first linear parameters is invalid which means that the companion cannot be fitted #Hence, we return nan for the best fit linear parameters and -inf for their probability print("Companion cannot be fitted, returning nan arrays") return _invalid_outputs(N_linpara) M_no_reg = M_no_reg[:, validpara[0]] # Filtering the column(s) full of 0 _bounds = (np.array(_bounds[0])[validpara[0]], np.array(_bounds[1])[validpara[0]]) #Selecting the bounds for the valid parameters d_no_reg = d_no_reg / s_no_reg #Normalizing the data by the data standard deviation M_no_reg = M_no_reg / s_no_reg [:, None] #Normalizing the M_ij by the data standard deviation s_i ##### Concatenating the regularization vectors with model matrix and data vector + computing the scale noise factor with a first lsq best fit if len(fm_out) == 4 and "regularization" in extra_outputs.keys(): if marginalize_noise_scaling: raise Exception("The maths for the marginalization of the noise scaling factor is not compatible with the regularization. Set marginalize_noise_scaling = False") M, d, s, M_reg, d_reg, s_reg = _concatenate_model_regularization(d_no_reg, M_no_reg, s_no_reg, extra_outputs, validpara) if scale_noise: _, _, _, _, rchi2, noise_scaling = _get_lsq_fit(M, d, _bounds, N_data=N_data) M = np.concatenate([M_no_reg/noise_scaling, M_reg], axis=0) d = np.concatenate([d_no_reg/noise_scaling, d_reg/s_reg]) s = np.concatenate([s_no_reg*noise_scaling, s_reg]) # noise scaling is done, set to unity for later as no more scaling is necessary noise_scaling = 1 else: # No regularization used in this case M = M_no_reg d = d_no_reg s = s_no_reg logdet_Sigma = np.sum(2 * np.log(s)) paras, _, residuals, chi2, _, _ = _get_lsq_fit(M, d, _bounds, N_data=None) # Section to compute error bars of linear parameters MTM = np.dot(M.T, M) try: iMTM = np.linalg.inv(MTM) if len(fm_out) == 4 and "regularization" in extra_outputs.keys(): if not scale_noise: rchi2 = np.nansum(residuals[:N_data] ** 2) / N_data #rchi2 computed only on the data residuals without the regularization. MTM_no_reg = np.dot(M_no_reg.T, M_no_reg) covphi = np.dot(iMTM, np.dot(MTM_no_reg,iMTM.T)) # The formula below assumes that we are using the determinant of the inverse covariance # That's why we are adding the minus sign logdet_icovphi0 = -np.sum(np.log(np.diag(covphi))) if debug: pass slogdet_covphi0 = np.linalg.slogdet(covphi) logdet_icovphi01 = -slogdet_covphi0[1] logdet_MTM = np.linalg.slogdet(MTM)[1] logdet_MTM_no_reg = np.linalg.slogdet(MTM_no_reg)[1] logdet_icovphi02 = -(-2*logdet_MTM+logdet_MTM_no_reg) print("logdet_icovphi02, logdet_icovphi0",logdet_icovphi01,logdet_icovphi02,-np.sum(np.log(np.diag(covphi)))) else: if scale_noise: rchi2 = np.nansum(residuals[:N_data] ** 2) / N_data noise_scaling = np.sqrt(rchi2) else: rchi2 = 1 noise_scaling = 1 covphi = noise_scaling * iMTM slogdet_icovphi0 = np.linalg.slogdet(MTM) logdet_icovphi0 = slogdet_icovphi0[1] if debug: plt.plot(np.diag(covphi)) plt.show() print("logdet_icovphi02,logdet_icovphi0",logdet_icovphi0,-np.sum(np.log(np.diag(covphi)))) exit() except Exception as e: print("Exiting covariance section in fitfm() with error:") print(e) return _invalid_outputs(N_linpara) diagcovphi = copy(np.diag(covphi)) diagcovphi[np.where(diagcovphi<0.0)] = np.nan #uncertainties cannot be negative so replacing by nan here paras_err = np.sqrt(diagcovphi) #get the uncertainties via the diagonal of the matrix # JB: the N_data is kinda wrong I think because is N_data the size of d or the number of pixels on the detector? # But this is just a constant in the log probability, so as long as we don't use the absolute likelihood values, # we are fine if marginalize_noise_scaling: log_prob = (M.shape[1] - N_data) / 2 * np.log(2 * np.pi) -0.5 * logdet_Sigma - 0.5 * logdet_icovphi0 \ - ((N_data-M.shape[1]+2-1)/2) * np.log(chi2) + loggamma((N_data - M.shape[1] + 2 - 1) / 2) else: # log(Eq 36) in Ruffio+2019: log_prob = ((M.shape[1]-N_data)/2)*np.log(2*np.pi) -0.5 * logdet_Sigma - 0.5 * logdet_icovphi0 \ -((N_data-M.shape[1])/2) * np.log(noise_scaling**2) -0.5*chi2/noise_scaling**2 # log_prob = -0.5*chi2/noise_scaling**2 if computeH0: log_prob_H0 = _compute_H0(M, d, N_data, _bounds, logdet_Sigma, marginalize_noise_scaling) else: log_prob_H0 = np.nan # Initialize the arrays of best-fit linear parameters and their uncertainties linparas = np.full(N_linpara, np.nan) linparas_err = np.full(N_linpara, np.nan) #Bookeeping the valid best fit linear parameters linparas[validpara] = paras linparas_err[validpara] = paras_err return log_prob, log_prob_H0, rchi2, linparas, linparas_err
[docs] def log_prob(nonlin_paras, dataobj, fm_func, fm_paras, nonlin_lnprior_func=None, bounds=None, scale_noise=True): """ Wrapper to fit_fm() but only returns the log probability marginalized over the linear parameters. Args: nonlin_paras: [p1,p2,...] List of non-linear parameters such as rv, y, x. The meaning and number of non-linear parameters depends on the forward model defined. dataobj: A data object of type breads.instruments.instrument.Instrument to be analyzed. fm_func: A forward model function. See breads.fm.template.template() for an example. fm_paras: Additional parameters for fm_func (other than non-linear parameters and dataobj) computeH0: If true (default), compute the probability of the model removing the first element of the linear model; See second ouput log_prob_H0. This can be used to compute the Bayes factor for a fixed set of non-linear parameters bounds: (Caution: the calculation of log prob is only theoretically accurate if no bounds are used.) Bounds on the linear parameters used in lsq_linear as a tuple of arrays (min_vals, maxvals). e.g. ([0,0,...], [np.inf,np.inf,...]). default no bounds. Each numpy array must have shape (N_linear_parameters,). Returns: log_prob: Probability of the model marginalized over linear parameters. """ if nonlin_lnprior_func is not None: prior = nonlin_lnprior_func(nonlin_paras) else: prior = 0 try: lnprob = fitfm(nonlin_paras, dataobj, fm_func, fm_paras,computeH0=False,bounds=bounds,scale_noise=scale_noise)[0]+prior except: lnprob = -np.inf return lnprob
[docs] def combined_log_prob(nonlin_paras, dataobjlist,fm_funclist, fm_paraslist, nonlin_lnprior_func=None, bounds=None): """ For use when you have multiple data objects and want to combine the log-likelihoods for MCMC sampling Parameters ---------- nonlin_paras : list of floats [p1,p2,...] List of non-linear parameters such as rv, y, x. The meaning and number of non-linear parameters depends on the forward model defined. dataobjlist: A list of data objects to combine fm_funclist: A list of fm_func for each data object fm_paraslist: A list of fm_paras to use as arguments for it's respective fm_func nonlin_lnprior_func: A function to compute priors, if None, defaults to zero priors Returns ------- """ combined_lnprob = 0 for i, dataobj in enumerate(dataobjlist): lnprob=log_prob(nonlin_paras = nonlin_paras, dataobj=dataobj, fm_func=fm_funclist[i], fm_paras= fm_paraslist[i], nonlin_lnprior_func=nonlin_lnprior_func, bounds=None) combined_lnprob += lnprob return combined_lnprob
[docs] def nlog_prob(nonlin_paras, dataobj, fm_func, fm_paras,nonlin_lnprior_func=None,bounds=None,scale_noise=True): """ Returns the negative of the log_prob() for minimization routines. Args: nonlin_paras: [p1,p2,...] List of non-linear parameters such as rv, y, x. The meaning and number of non-linear parameters depends on the forward model defined. dataobj: A data object of type breads.instruments.instrument.Instrument to be analyzed. fm_func: A forward model function. See breads.fm.template.template() for an example. fm_paras: Additional parameters for fm_func (other than non-linear parameters and dataobj) computeH0: If true (default), compute the probability of the model removing the first element of the linear model; See second ouput log_prob_H0. This can be used to compute the Bayes factor for a fixed set of non-linear parameters bounds: (Caution: the calculation of log prob is only theoretically accurate if no bounds are used.) Bounds on the linear parameters used in lsq_linear as a tuple of arrays (min_vals, maxvals). e.g. ([0,0,...], [np.inf,np.inf,...]) default no bounds. Each numpy array must have shape (N_linear_parameters,). Returns: log_prob: Probability of the model marginalized over linear parameters. """ nlogprob_val = - log_prob(nonlin_paras, dataobj, fm_func, fm_paras,nonlin_lnprior_func,bounds,scale_noise) # print( nlogprob_val, nonlin_paras) return nlogprob_val
def _invalid_outputs(N_linear_parameters): """Helper to return invalid outputs""" linparas = np.full(N_linear_parameters, np.nan) linparas_err = np.full(N_linear_parameters, np.nan) log_prob = -np.inf log_prob_H0 = -np.inf s2 = np.inf return log_prob, log_prob_H0, s2, linparas, linparas_err def _concatenate_model_regularization(d_no_reg, M_no_reg, s_no_reg, extra_outputs, validpara): """Helper function to concatenate regularization parameters with model matrix and data vectors""" #Retrieve the regularization priors d_reg, s_reg = extra_outputs["regularization"] #Filtering the bad columns s_reg = s_reg[validpara] d_reg = d_reg[validpara] #Filtering the finite values to match with the M_no_reg matrix where_finite_reg = np.where(np.isfinite(s_reg)) s_reg = s_reg[where_finite_reg] d_reg = d_reg[where_finite_reg] #Creating the matrix for regularization M_reg = np.zeros((np.size(where_finite_reg[0]), M_no_reg.shape[1])) #Setting the diagonal of the matrix M_reg[np.arange(np.size(where_finite_reg[0])), where_finite_reg[0]] = 1 / s_reg #Concatenate each matrix/vector M = np.concatenate([M_no_reg, M_reg], axis=0) d = np.concatenate([d_no_reg, d_reg / s_reg]) s = np.concatenate([s_no_reg, s_reg]) return M, d, s, M_reg, d_reg, s_reg def _get_lsq_fit(M_normalized, d_normalized, _bounds, N_data=None): """Helper to get the least squares fit of the model on the data. Model matrix and input data have to be normalized by the noise.""" paras = lsq_linear(M_normalized, d_normalized, bounds=_bounds).x d_estimated = np.dot(M_normalized, paras) residuals = d_normalized - d_estimated if N_data is not None: #Truncating to N_data because we don't care about the residuals on the regularization priors chi2 = np.nansum(residuals[:N_data] ** 2) rchi2 = chi2 / N_data else: N_data = np.size(residuals) chi2 = np.nansum(residuals ** 2) / N_data rchi2 = chi2 / N_data noise_scaling = np.sqrt(rchi2) return paras, d_estimated, residuals, chi2, rchi2, noise_scaling def _compute_H0(M_normalized, d_normalized, N_data, _bounds, logdet_Sigma, marginalize_noise_scaling=True, noise_scaling=1): """Helper to compute the H0 hypothesis (i.e. without off axis companion)""" M_H0 = M_normalized[:, 1:] #removing the first column i.e. the off axis companion model _crop_bounds = (np.array(_bounds[0])[1::], np.array(_bounds[1])[1::]) paras_H0, d_estimated, residuals_H0, chi2_H0, rchi2, _ = _get_lsq_fit(M_H0, d_normalized, _crop_bounds, N_data=None) slogdet_icovphi0_H0 = np.linalg.slogdet(np.dot(M_H0.T, M_H0)) # todo check the maths when N_linpara is different from M.shape[1]. E.g. at the edge of the FOV if marginalize_noise_scaling: log_prob_H0 = -0.5 * logdet_Sigma - 0.5 * slogdet_icovphi0_H0[1] - ( N_data + M_H0.shape[1] + 2 - 1) / 2 * np.log(chi2_H0) + \ loggamma((N_data - M_H0.shape[1] + 2 - 1) / 2) + (M_H0.shape[1] - N_data) / 2 * np.log( 2 * np.pi) else: # log(Eq 36) in Ruffio+2019: # TODO Seems more logical to me that it should be M_H0.shape[1] here, but I didn't redo the math. Need to check. log_prob_H0 = ((M_normalized.shape[1] - N_data) / 2) * np.log(2 * np.pi) - 0.5 * logdet_Sigma - 0.5 * slogdet_icovphi0_H0[ 1] \ - ((N_data - M_normalized.shape[1]) / 2) * np.log(noise_scaling ** 2) - 0.5 * chi2_H0 / noise_scaling ** 2 return log_prob_H0