Source code for espm.models.edxs

r"""
EDXS model
----------

The :mod:`espm.models.edxs` module implements the creation of the G matrix that contains a modelisation of the characteristic and continuous X-rays.

**This module is a key component of the EDXS data simulation and the EDXS data analysis.**

"""

import numpy as np
import re
from espm.models import PhysicalModel
from espm.models.EDXS_function import G_bremsstrahlung, continuum_xrays, gaussian, read_lines_db, read_compact_db, elts_dict_from_dict_list
from espm.conf import DEFAULT_EDXS_PARAMS
from espm.utils import arg_helper, symbol_to_number_dict, symbol_to_number_list
from espm.models.absorption_edxs import absorption_correction, det_efficiency, det_efficiency_from_curve, absorption_mass_thickness
# Class to model the EDXS spectra. This is a temporary version since there are some design issues.


[docs] class EDXS(PhysicalModel): def __init__( self, *args, width_slope=0.01, width_intercept=0.065, custom_init = False, **kwargs ): r""" :database_path: file path to a database of x-ray data (energy and intensity ratios of the peaks) :abs_db_path: file path to a database of attenuation coefficients (Useful for artificial data only for the moment) :brstlg_pars: dictionary of parameters for the continuum X-rays. Only a part of the parameters can be used without issues. :e_offset: energy offset of the energy axis (float) :e_size: number of energy channels (int) :e_scale: ev/channel calibration of the energy axis (float) :width_slope: The FWHM of the detector increases with energy which is modeled with an affine function. This is the slope of this affine function (float). :width_intercept: The FWHM of the detector increases with energy which is modeled with an affine function. This is the intercept of this affine function (float). """ super().__init__(*args,**kwargs) self.width_slope = width_slope self.width_intercept = width_intercept self.lines = self.db_mdata["lines"] self.norm = 1.0 self.model_elts = [] self.custom_init = custom_init # Tranfer the ranges from eds_espm to the physical model self.ranges = None def __add_elts_G(self, reference_elt = {}, *, elements=[]): for elt in elements: if self.lines : energies, cs = read_lines_db(elt,self.db_dict) else : energies, cs = read_compact_db(elt,self.db_dict) if elt in reference_elt : peaks_low = np.zeros((self.x.shape[0], 1)) peaks_high = np.zeros((self.x.shape[0], 1)) for i, energy in enumerate(energies): if (energy > np.min(self.x)) and (energy < np.max(self.x)): if type(self.params_dict["Det"]) == str : D = det_efficiency_from_curve(energy,self.params_dict["Det"]) else : D = det_efficiency(energy,self.params_dict["Det"]) A = absorption_correction(energy,**self.params_dict["Abs"],elements_dict = {elt : 1.0}) width = self.width_slope * energy + self.width_intercept if energy < reference_elt[elt] : peaks_low += ( cs[i] * gaussian(self.x, energy, width / 2.3548)[ np.newaxis ].T )*D*A else : peaks_high += ( cs[i] * gaussian(self.x, energy, width / 2.3548)[ np.newaxis ].T )*D*A peaks = np.hstack((peaks_low,peaks_high)) else : peaks = np.zeros((self.x.shape[0], 1)) for i, energy in enumerate(energies): # The actual detected width is calculated at each energy if (energy > np.min(self.x)) and (energy < np.max(self.x)): if type(self.params_dict["Det"]) == str : D = det_efficiency_from_curve(energy,self.params_dict["Det"]) else : D = det_efficiency(energy,self.params_dict["Det"]) A = absorption_correction(energy,**self.params_dict["Abs"],elements_dict = {elt : 1.0}) width = self.width_slope * energy + self.width_intercept peaks += ( cs[i] * gaussian(self.x, energy, width / 2.3548)[ np.newaxis ].T )*D*A # print(np.max(peaks, axis = 0)) # print(str(elt)) if np.all((np.max(peaks, axis = 0)) > 0.0): self.G = np.concatenate((self.G, peaks), axis=1) if elt in reference_elt : self.model_elts.append(str(elt)+'_lo') self.model_elts.append(str(elt)+'_hi') else : self.model_elts.append(str(elt)) else : print("The energy split of the element : {} leads to empty G columns. Please remove split or change its energy.".format(elt)) raise ValueError("Empty G column") def _add_ignored_elts(self, elements = []) : for elt in elements: if self.lines : energies, cs = read_lines_db(elt,self.db_dict) else : energies, cs = read_compact_db(elt,self.db_dict) peaks_list = [] for i, energy in enumerate(energies): # The actual detected width is calculated at each energy if (energy > np.min(self.x)) and (energy < np.max(self.x)): if type(self.params_dict["Det"]) == str : D = det_efficiency_from_curve(energy,self.params_dict["Det"]) else : D = det_efficiency(energy,self.params_dict["Det"]) A = absorption_correction(energy,**self.params_dict["Abs"],elements_dict = {elt : 1.0}) width = self.width_slope * energy + self.width_intercept peaks_list.append( ( cs[i] * gaussian(self.x, energy, width / 2.3548) )*D*A ) peaks = np.array(peaks_list).T if np.all((np.max(peaks, axis = 0)) > 0.0): self.G = np.concatenate((self.G, peaks), axis=1) for i in range(peaks.shape[1]) : self.model_elts.append(str(elt) + '_ign' + str(i)) else : print("The energy split of the element : {} leads to empty G columns. Please remove split or change its energy.".format(elt)) raise ValueError("Empty G column")
[docs] @symbol_to_number_list @symbol_to_number_dict def generate_g_matr(self, g_type="bremsstrahlung", ignored_elements = ['Cu'],*,elements=[],elements_dict = {},**kwargs): r""" Generate the G matrix. With a complete model the matrix is (e_size,n+2). The first n columns correspond to the sum of X-ray characteristic peaks associated to each shell of the elements. The last 2 columns correspond to a bremsstrahlung model. The first n columns of G (noted :math:`\kappa`), corresponding to the characteristic X-rays, can be calculated using the following formula: .. math:: \kappa_{k,Z} = \sum_{ij} x_{ij}(Z) \frac{1}{\Delta(\varepsilon_k) \sqrt{2\pi} } e{^{-\frac{1}{2} {\left( \frac{\varepsilon_k - \varepsilon^{ij}(Z)}{\Delta(\varepsilon_k)} \right)}^2}} where :math:`\varepsilon_k` is the energy of the kth energy channel, :math:`\varepsilon^{ij}(Z)` is the energy of the ijth line of the Zth element, :math:`\Delta(\varepsilon_k)` is the FWHM of the detector at the energy :math:`\varepsilon_k` and :math:`x_{ij}(Z)` is the intensity ratio of the ijth line of the Zth element. The last term of the equation is a gaussian function centered at :math:`\varepsilon^{ij}(Z)` and with a FWHM of :math:`\Delta(\varepsilon_k)`. The last two columns of G (noted :math:`\beta`), corresponding to the bremsstrahlung model, can be calculated using the following formula: .. math:: \beta_{k,n+1} = \frac{\varepsilon_0 - \varepsilon_k}{\varepsilon_0 \varepsilon_k} \times \frac{1 - (\varepsilon_0 - \varepsilon_k)}{\varepsilon_0} \beta_{k,n+2} = \frac{(\varepsilon_0 - \varepsilon_k)^2}{\varepsilon^2_0 \varepsilon_k} Note that there is no parameter for the bremsstrahlung since it is supposed to be learned with the W matrix (see espm.estimators). Parameters ---------- g_type : :string: Options of the edxs model to include in the G matrix. The three possibilities are "identity", "no_brstlg" and "bremsstrahlung". G is going to be the identity matrix, only characteristic X-ray peaks or characteristic X-rays plus bremsstrahlung model, respectively. elements_dict : :dict: The keys are chemical elements (atomic number) and the values are cut-off energies. This argument is used to split some of the columns of G into 2 columns. The first column corresponds to characteristic X-rays before the cut-off and second one corresponds to characteristic X-rays before the cut-off. This feature is implemented to enable more accurate absorption correction. elements : :list: List of modeled chemical elements. The list can be populated either with atomic numbers or chemical symbols, e.g. "Fe" or 26. Returns ------- g matrix : :np.array 2D: matrix of the edx model. Notes ----- See our paper about the espm package :cite:p:`teurtrie2023espm` for more information about the equations of this model. """ # Reset the internally stored elements list self.model_elts = [] @symbol_to_number_list def convert_elts(elements = ignored_elements) : return elements conv_ignored_elts = convert_elts(elements=ignored_elements) valid_elts = self.__check_elts_in_G(elements) valid_ignored = self.__check_elts_in_G(conv_ignored_elts) if g_type == "bremsstrahlung" : self.bkgd_in_G = True else : self.bkgd_in_G = False # None is a default value for the G matrix and thus G will be considered to be the identity matrix in most of espm functions. if len(valid_elts) == 0: self.G = None elif g_type == "identity" : self.G = None # model based on elements_list elif (g_type == "bremsstrahlung") or (g_type == "no_brstlg"): # The number of shells depend on the element, it is then not straightforward to pre-determine the size of g_matr self.G = np.zeros((self.x.shape[0], 0)) # For each element we unpack all shells and then unpack all lines of each shell. self.__add_elts_G( reference_elt = elements_dict, elements = valid_elts) self._add_ignored_elts(elements=valid_ignored) # Appends a pure continuum spectrum is needed if self.bkgd_in_G: approx_elts = {key : 1.0/len(valid_elts) for key in valid_elts} brstlg_spectrum = G_bremsstrahlung(self.x,self.E0,self.params_dict,elements_dict=approx_elts) if np.max(brstlg_spectrum) > 0.0 : self.G = np.concatenate((self.G, brstlg_spectrum), axis=1) else : print("Bremsstrahlung parameters were not provided, bkgd not added in G") self.bkgd_in_G = False norms = np.sum(self.G, axis=0, keepdims=True) if g_type == "bremsstrahlung" : norms[0][:-2] = np.mean(norms[0][:-2]) else : norms[0] = np.mean(norms[0]) self.norm = norms self.G /= self.norm else : print("g_type has to be one of those : \"bremsstrahlung\", \"no_brstlg\" or \"identity\". G will be None, corresponding to \"identity\". ")
def __check_elts_in_G(self, elements): """ Check if the elements of the metadata are in the range of the energy axis. """ valid_elts = [] for elt in elements: if self.lines : energies, cs = read_lines_db(elt,self.db_dict) else : energies, cs = read_compact_db(elt,self.db_dict) energy_range = [np.min(self.x), np.max(self.x)] found = 0 for energy in energies: if (energy > energy_range[0]) and (energy < energy_range[1]): valid_elts.append(elt) found = 1 break if not found: print("No peak is present in the energy range for element : {}".format(elt)) return valid_elts
[docs] def generate_phases(self, phases_parameters) : r""" Generate a series of spectra from list of phase parameters. Parameters ---------- phases_parameters : :list: List of dicts containing the phase parameters. Returns ------- phase_array : :np.array 2D: Array which lines correspond to the modelled phases in the input list. Notes ----- The absorption correction is done using the average composition of the phases. The same correction is used for each phase. """ self.phases = [] unique_elts = dict(elts_dict_from_dict_list([x["elements_dict"] for x in phases_parameters])) for p in phases_parameters: # p.update({"elements_dict" : unique_elts}) self.phases.append(self.generate_spectrum(**p,abs_elts_dict = unique_elts)) self.phases = np.array(self.phases) self.phases /= self.phases.sum(axis = 1)[:,np.newaxis]
[docs] @symbol_to_number_dict def generate_spectrum(self, b0=0, b1 = 0, scale = 1.0,abs_elts_dict = {},*,elements_dict = {}): r""" Generate a spectrum from bremsstrahlung parameters and a chemical composition. The modelling is done using the same formula as for the generate_g_matr function. Parameters ---------- b0 : :float: First bremsstrahlung parameter. b1 : :float: Second bremsstrahlung parameter. scale : :float: Scale factor to apply to the bremsstrahlung. Feature to be removed, scaling should be done with b0 and b1. abs_elts_dict : :dict: Dictionnary of elements and associated concentrations. This dictionnary is used to calculate the contribution of the absorption in the spectrum. elements_dict : :dict: Dictionnary of elements and associated concentrations describing the chemical composition of the modeled sample. Returns ------- spectrum : :np.array 1D: Output edx spectrum corresponding to the input chemical composition. Examples -------- .. plot:: :context: close-figs >>> import matplotlib.pyplot as plt >>> from espm.models.edxs import EDXS >>> from espm.conf import DEFAULT_EDXS_PARAMS >>> b0, b1 = 5.5367e-5, 0.00192181 >>> elts_dict = {"Si" : 1.0,"Ca" : 1.0,"O" : 3.0,"C" : 0.3} >>> model = EDXS(**DEFAULT_EDXS_PARAMS) >>> spectrum = model.generate_spectrum(b0,b1, elements_dict = elts_dict) >>> plt.plot(spectrum) Notes ----- Check EDXS_function for details about the bremsstrahlung model. """ temp = np.zeros_like(self.x) for elt in elements_dict.keys(): if self.lines : energies, cs = read_lines_db(elt,self.db_dict) else : energies, cs = read_compact_db(elt,self.db_dict) for i, energy in enumerate(energies): if (energy > np.min(self.x)) and (energy < np.max(self.x)): if type(self.params_dict["Det"]) == str : D = det_efficiency_from_curve(energy,self.params_dict["Det"]) else : D = det_efficiency(energy,self.params_dict["Det"]) A = absorption_correction(energy,**self.params_dict["Abs"],elements_dict = {elt : 1.0}) width = self.width_slope * energy + self.width_intercept temp += ( elements_dict[elt] * cs[i] * gaussian(self.x, energy, width / 2.3548) )*A*D temp /= temp.sum() if abs_elts_dict == {} : temp += continuum_xrays(self.x,self.params_dict,b0,b1,self.E0,elements_dict=elements_dict) * scale else : temp += continuum_xrays(self.x,self.params_dict,b0,b1,self.E0,elements_dict=abs_elts_dict) * scale return temp
[docs] @symbol_to_number_dict def bremsstrahlung_only_tools(self,*,mass_thickness = 1.0, ranges = None,elements_dict = {}): self.params_dict['Abs']['density'] = mass_thickness self.params_dict['Abs']['thickness'] = 1.0 brstlg_lines = G_bremsstrahlung(self.x,self.E0,self.params_dict,elements_dict=elements_dict) norms = np.sum(brstlg_lines, axis=0, keepdims=True) normed_brstlg = brstlg_lines/norms mask = self.carac_X_span(ranges) filtered_brstlg = normed_brstlg[mask,:] return filtered_brstlg, mask
[docs] def get_elements(self, include_ignored = True) : ign = include_ignored # If include_ignored is True, it means that this generator will pass once to capture ignored elements # To automatically calculate bremsstrahlung windows it is important to include ignore elements # To estimate mass-thickness (see eds_spim.py), it is important to remove ignored elements for elt in self.model_elts: if re.match(r'[0-9]*(_lo)',elt) : pass elif re.match(r'[0-9]*(_ign)[0-9]*', elt) and ign : m = re.match(r'([0-9]*)(_ign)([0-9]*)', elt) yield m.group(1) ign = False elif re.match(r'[0-9]*(_ign)[0-9]*', elt) and not(ign) : pass elif re.match(r'[0-9]*(_hi)',elt) : m = re.match(r'([0-9]*)(_hi)',elt) yield m.group(1) else : yield elt
[docs] def carac_X_span(self, ranges = None, include_ignored = True) : all_indices = [] if ranges is not None : for span in ranges : indices = np.where((self.x > span[0]) & (self.x < span[1]))[0] all_indices.append(indices) else : for elt in self.get_elements(include_ignored=include_ignored): if self.lines : energies, cs = read_lines_db(elt,self.db_dict) else : energies, cs = read_compact_db(elt,self.db_dict) for energy in energies: width = self.width_slope * energy + self.width_intercept span = [energy - 2*width, energy + 2*width] indices = np.where((self.x > span[0]) & (self.x < span[1]))[0] all_indices.append(indices) idx = np.unique(np.concatenate(all_indices)) # If we use manually selected windows, the mask is True in selected windows (i.e. bremsstrahlung) # If we automatically set windows, the mask is False in spectral areas where there are caracteristic X-rays if self.ranges is None : mask = np.ones(self.G.shape[0], bool) mask[idx] = 0 else : mask = np.zeros(self.G.shape[0], bool) mask[idx] = 1 return mask
[docs] def NMF_initialize_W(self, D) : if self.G is None : raise ValueError('The G matrix is identity, the W matrix cannot be initialized. Please use a np.array for G in the ESpM-NMF instead of the model object') if self.bkgd_in_G and self.custom_init: mask = self.carac_X_span(self.ranges) anti_mask = np.logical_not(mask) Wbrem = (np.linalg.lstsq(self.G[mask,-2:],D[mask,:],rcond = None)[0]).clip(min = 0) Wcarac = (np.linalg.lstsq(self.G[anti_mask,:-2],D[anti_mask,:] ,rcond = None)[0]).clip(min = 0) # filter = np.where(np.mean(self.G[:,:-2],axis=1)<(np.max(np.mean(self.G[:,:-2],axis=1))*0.001))[0] W = np.vstack((Wcarac,Wbrem)) else : W = (np.linalg.lstsq(self.G,D,rcond = None)[0]).clip(min = 0) return W
[docs] def NMF_simplex(self): """ Produce the indices of the rows of W on which to perform the simplex constraint. """ # We don't check here whether G was correctyl initialized since it is tested in the init. ind_list = [] # We skip the low energy lines for i, elt in enumerate(self.model_elts): if re.match(r'[0-9]*(_lo)',elt) : pass elif re.match(r'[0-9]*(_ign)[0-9]*', elt) : pass else : ind_list.append(i) # We skip the bremsstrahlung return ind_list
[docs] def NMF_update(self, W=None): """ Update the G matrix with the new absorption correction. """ # We don't need to check whether G was correctyl initialized it should work anyway. if W is None: return self.G if not(self.bkgd_in_G) : return self.G else : new_brstlg = self.update_bremsstrahlung(W) new_G = self.G.copy() new_G[:,-2:] = new_brstlg/self.norm[0][-2:] self.G = new_G return self.G
[docs] def update_bremsstrahlung(self, W) : """ Update the bremsstrahlung part of the G matrix. This function is used for the NMF decomposition so that the absorption correction is updated in between each step. """ if not(self.bkgd_in_G) : raise AttributeError("The bremsstrahlung is not comprised in the model.") indices = self.NMF_simplex() mean_compo = np.mean(W[indices,:],axis=1) normed_compo = mean_compo/np.sum(mean_compo) elements_dict = {key : normed_compo[i] for i,key in enumerate(self.get_elements(False))} bremsstrahlung = G_bremsstrahlung(self.x,self.E0,self.params_dict,elements_dict=elements_dict) return bremsstrahlung