Source code for espm.estimators.base

import numpy as np
from sklearn.base import BaseEstimator, TransformerMixin
from sklearn.utils.validation import check_is_fitted
from espm.estimators.updates import initialize_algorithms
from espm.measures import KLdiv_loss, Frobenius_loss, find_min_angle, find_min_MSE
from espm.conf import log_shift
from espm.utils import rescaled_DH
import time
from abc import ABC, abstractmethod
from espm.utils import create_laplacian_matrix 
from scipy.sparse import lil_matrix
from espm.models.base import PhysicalModel


def normalization_factor(X, nc):
    m = np.mean(X)
    return nc / (m * X.shape[0])
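
# Illustrative sketch (not part of espm): with the definition above, rescaling X by
# normalization_factor(X, nc) makes the average column sum of the data equal to nc,
# so the strength of the regularization is comparable across datasets of different intensity.
#
#   >>> X = np.random.rand(64, 1000) * 37.0           # data with an arbitrary scale
#   >>> Xn = normalization_factor(X, nc=3) * X
#   >>> np.isclose(Xn.sum(axis=0).mean(), 3.0)
#   True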

class NMFEstimator(ABC, TransformerMixin, BaseEstimator):
    r"""Abstract class for NMF algorithms.

    This abstract class `espm.estimators.NMFEstimator` is used to implement the different NMF algorithms.
    It solves problems of the form:

    .. math::

        \dot{W}, \dot{H} = \arg \min_{W \geq \epsilon, H \geq \epsilon} \frac{1}{2} L(X, GWH) + R(W, H)

    where :math:`X` is the data matrix, :math:`G` is a matrix of known values, :math:`W` and :math:`H` are
    the matrices to be learned, :math:`R` is a regularization term and :math:`L` is a loss function that
    represents the generalized KL divergence. As a reminder, the generalized KL divergence is defined as:

    .. math::

        D_{GKL}(X || Y) = \sum_{i,j} X_{ij} \log \frac{X_{ij}}{Y_{ij}} - X_{ij} + Y_{ij}

    where :math:`Y = GWH`. Since the terms involving only :math:`X` do not depend on :math:`W` and
    :math:`H`, we can drop them and obtain the loss function:

    .. math::

        L(X, Y) = - \sum_{i,j} X_{ij} \log (GWH)_{ij} + (GWH)_{ij}

    The generalized KL divergence has the advantage of being zero when :math:`X = Y`, which is not the case
    for our loss. Therefore, we shift the loss function by a constant :math:`C` such that it equals the
    generalized KL divergence. This constant is stored in the attribute
    :attr:`espm.estimators.NMFEstimator.const_KL_`.

    The loss function can also be selected to be the Frobenius norm. In this case, the loss function is:

    .. math::

        L(X, Y) = \frac{1}{2} \sum_{i,j} (X_{ij} - Y_{ij})^2

    While the code will work, it is not recommended to use the Frobenius norm as a loss function.
    This code is optimized for the KL divergence.

    The size of:

    * :math:`X` is :math:`(n, p)`,
    * :math:`W` is :math:`(m, k)`,
    * :math:`H` is :math:`(k, p)`,
    * :math:`G` is :math:`(n, m)`.

    The columns of the matrices :math:`H` and :math:`X` are assumed to be images. This is used typically
    for the smoothness regularization. The parameter `shape_2d` defines the shape of the images,
    i.e. `shape_2d[0]*shape_2d[1] = p`.

    Parameters
    ----------
    n_components : int, default=2
        Number of components, i.e. dimensionality of the latent space.
    init : str
        Method used to initialize the procedure. Default is None.
        The method uses the initialization of :mod:`sklearn.decomposition`. It can be imported using:

        .. code-block:: python

            >>> from sklearn.decomposition._nmf import _initialize_nmf

    tol : float, default=1e-4
        Tolerance of the stopping condition.
    max_iter : int, default=200
        Maximum number of iterations before timing out.
    random_state : int, RandomState instance, default=None
    verbose : int, default=1
        The verbosity level.
    debug : bool, default=False
        If True, the algorithm will log more and perform more checks.
    l2 : bool, default=False
        If True, the algorithm will use the l2 norm instead of the KL divergence.
    G : np.array, function or None, default=None
        If np.array, it is the known matrix of the data.
        If function, it is a function that takes as input the data matrix and returns the known matrix (np.array).
        If None, it is assumed that G is the identity matrix.
    shape_2d : tuple or None, default=None
        If not None, it is the image shape of the columns of the matrices :math:`X` and :math:`H`.
    normalize : bool, default=False
        If True, the algorithm will normalize the data matrix :math:`X`.
    log_shift : float, default=1e-10
        Lower bound for W and H, i.e. :math:`\epsilon`.
    eval_print : int, default=10
        Number of iterations between each evaluation of the loss function.
    true_D : np.array or None, default=None
        Ground truth for the matrix :math:`GW`. Used for evaluation purposes.
    true_H : np.array or None, default=None
        Ground truth for the matrix :math:`H`. Used for evaluation purposes.
    fixed_H : np.array or None, default=None
        If not None, it fixes the non-zero values of the matrix :math:`H`.
        Note that convergence is not guaranteed with fixed_H enabled.
    fixed_W : np.array or None, default=None
        If not None, it fixes the non-zero values of the matrix :math:`W`.
        Note that convergence is not guaranteed with fixed_W enabled.
    no_stop_criterion : bool, default=False
        If True, the algorithm will not stop when the stopping criterion is reached and will continue
        until max_iter is reached.
    simplex_H : bool, default=False
        If True, a simplex constraint is enforced on the matrix :math:`H` during the updates.
    simplex_W : bool, default=True
        If True, a simplex constraint is enforced on the matrix :math:`W` during the updates.
    hspy_comp : bool, default=False
        If True, the algorithm will use the format compatible with hyperspy.
        Use this option if you run the algorithm with the method decomposition in hyperspy. For example:

        .. code-block:: python

            >>> est = SmoothNMF(n_components=3, hspy_comp=True)
            >>> out = spim.decomposition(algorithm=est, return_info=True)

    """

    loss_names_ = ["KL_div_loss"]
    const_KL_ = None

    def __init__(self, n_components=2, init=None, tol=1e-4, max_iter=200,
                 random_state=None, verbose=1, debug=False,
                 l2=False, G=None, shape_2d=None, normalize=False, log_shift=log_shift,
                 eval_print=10, true_D=None, true_H=None, fixed_H=None, fixed_W=None,
                 hspy_comp=False, no_stop_criterion=False, simplex_H=False, simplex_W=True):
        self.n_components = n_components
        self.init = init
        self.tol = tol
        self.max_iter = max_iter
        self.random_state = random_state
        self.verbose = verbose
        self.log_shift = log_shift
        self.debug = debug
        self.l2 = l2
        self.G = G
        self.shape_2d = shape_2d
        self.eval_print = eval_print
        self.true_D = true_D
        self.true_H = true_H
        self.fixed_H = fixed_H
        self.fixed_W = fixed_W
        self.hspy_comp = hspy_comp
        self.normalize = normalize
        self.no_stop_criterion = no_stop_criterion
        self.simplex_H = simplex_H
        self.simplex_W = simplex_W

    def _more_tags(self):
        return {'requires_positive_X': True}

    @abstractmethod
    def _iteration(self, W, H):
        pass

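    # Usage sketch (illustrative, assuming the concrete subclass espm.estimators.SmoothNMF):
    #
    #   >>> from espm.estimators import SmoothNMF
    #   >>> est = SmoothNMF(n_components=3, max_iter=200)
    #   >>> GW = est.fit_transform(X)          # X of shape (n, p); returns G @ W
    #   >>> W, H = est.W_, est.H_              # learned factors
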
    def loss(self, W, H, average=True, X=None):
        """Loss function.

        Compute the loss function for the given matrices W and H.

        Parameters
        ----------
        W : np.array
            Matrix of shape (m, k)
        H : np.array
            Matrix of shape (k, p)
        average : bool, default=True
            If True, the loss is averaged over the number of elements of the matrices.
        X : np.array or None, default=None
            If not None, it is the data matrix. If None, the data matrix stored in `self.X_` is used.

        Returns
        -------
        loss_ : float
            Value of the loss function.
        """
        GW = self.G_ @ W
        if X is None:
            X = self.X_

        assert X.shape == (self.G_.shape[0], H.shape[1])
        self.GWH_numel_ = self.G_.shape[0] * H.shape[1]

        if self.l2:
            loss_ = 0.5 * Frobenius_loss(X, GW, H, average=False)
        else:
            if self.const_KL_ is None:
                self.const_KL_ = np.sum(X * np.log(np.maximum(self.X_, self.log_shift))) - np.sum(X)

            loss_ = KLdiv_loss(X, GW, H, self.log_shift, average=False) + self.const_KL_
        if average:
            loss_ = loss_ / self.GWH_numel_
        self.detailed_loss_ = [loss_]
        return loss_

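    # Note on the constant shift (sketch based on the class docstring): with
    # const_KL_ = sum(X * log(max(X, log_shift))) - sum(X), the value returned by
    # loss(W, H) corresponds to the generalized KL divergence D_GKL(X || G @ W @ H),
    # divided by the number of elements of X when average=True, so it is zero at a
    # perfect reconstruction.
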
    def fit_transform(self, X, y=None, W=None, H=None):
        """Main function of the estimator object.
        Learn an NMF model for the data X and return the transformed data.
        This is more efficient than calling fit followed by transform.

        The size of:

        * :math:`X` is :math:`(n, p)`,
        * :math:`W` is :math:`(m, k)`,
        * :math:`H` is :math:`(k, p)`,
        * :math:`G` is :math:`(n, m)`.

        Parameters
        ----------
        X : {array-like, sparse matrix} of shape (n, p)
            Data matrix to be decomposed
        y : Ignored
            Not used, present here for API consistency by convention.
        W : array-like of shape (m, k)
            If specified, it is used as initial guess for the solution.
        H : array-like of shape (k, p)
            If specified, it is used as initial guess for the solution.

        Returns
        -------
        GW : ndarray
            Transformed data.
        """
        ############################
        # Initialize the algorithm #
        ############################

        if self.hspy_comp:
            self.X_ = self._validate_data(X.T, dtype=[np.float64, np.float32])
        else:
            self.X_ = self._validate_data(X, dtype=[np.float64, np.float32])

        if self.hspy_comp == False:
            try:
                import inspect
                curframe = inspect.currentframe()
                calframe = inspect.getouterframes(curframe, 2)
                if calframe[1][3] == "decomposition" and "hyperspy" in calframe[1][1]:
                    print("Are you calling the function decomposition from Hyperspy?\n"
                          + "If so, please set the compatibility argument 'hspy_comp' to True.\n\n"
                          + "If this argument is not set correctly, the function will not work properly!!!")
            except:
                pass

        # The algorithm does not work when full columns or rows of X are zero
        self.X_ = self.remove_zeros_lines(self.X_, self.log_shift)
        self.const_KL_ = None
        if self.normalize:
            # We normalize the data so that the strength of the regularization is somewhat the same for all datasets
            self.norm_factor_ = normalization_factor(self.X_, self.n_components)
            self.X_ = self.norm_factor_ * self.X_

        if isinstance(self.G, PhysicalModel):
            self.physics_model_ = self.G
            G = self.physics_model_.NMF_update()
        else:
            self.physics_model_ = None
            G = self.G

        self.G_, self.W_, self.H_ = initialize_algorithms(X=self.X_, G=G, W=W, H=H,
                                                          n_components=self.n_components,
                                                          init=self.init, random_state=self.random_state,
                                                          simplex_H=self.simplex_H, simplex_W=self.simplex_W,
                                                          physics_model=self.physics_model_)

        if not (self.shape_2d is None):
            self.L_ = create_laplacian_matrix(*self.shape_2d)
        else:
            self.L_ = lil_matrix((self.X_.shape[1], self.X_.shape[1]), dtype=np.float32)
            self.L_.setdiag([1] * self.X_.shape[1])

        algo_start = time.time()
        eval_before = np.inf
        eval_init = self.loss(self.W_, self.H_)
        self.n_iter_ = 0

        self.losses_ = []
        self.rel_ = []
        self.detailed_losses_ = []
        if not (self.true_D is None) and not (self.true_H is None):
            if (self.true_D.shape[1] == self.n_components) and (self.true_H.shape[0] == self.n_components):
                self.angles_ = []
                self.mse_ = []
                self.true_losses_ = []
                true_DH = self.true_D @ self.true_H
            else:
                print("The chosen number of components does not match the number of components of the "
                      "provided truth. The ground truth will be ignored.")

        #############
        # Main loop #
        #############

        try:
            while True:
                # Take one step in W, H
                old_W, old_H = self.W_.copy(), self.H_.copy()
                self.W_, self.H_ = self._iteration(self.W_, self.H_)
                eval_after = self.loss(self.W_, self.H_)
                self.n_iter_ += 1

                rel_W = np.max(np.abs(self.W_ - old_W) / (self.W_ + self.tol * np.mean(self.W_)))
                rel_H = np.max(np.abs(self.H_ - old_H) / (self.H_ + self.tol * np.mean(self.H_)))

                # Store some information to assess the convergence (for debugging purposes).
                # We need to store this value because
                # loss = self.loss(self.W_, H, X=true_DH)
                # might destroy it. Furthermore, saving the data before the if might cause
                # an error if the optimization is stopped with a keyboard interrupt.
                detailed_loss_ = self.detailed_loss_

                if not (self.true_D is None) and not (self.true_H is None):
                    if (self.true_D.shape[1] == self.n_components) and (self.true_H.shape[0] == self.n_components):
                        if self.simplex_H or self.simplex_W:
                            W, H = self.W_, self.H_
                        else:
                            W, H = rescaled_DH(self.W_, self.H_)
                        GW = self.G_ @ W
                        angles = find_min_angle(self.true_D.T, GW.T, unique=True)
                        mse = find_min_MSE(self.true_H, H, unique=True)
                        loss = self.loss(self.W_, H, X=true_DH)
                        self.angles_.append(angles)
                        self.mse_.append(mse)
                        self.true_losses_.append(loss)

                self.losses_.append(eval_after)
                self.detailed_losses_.append(detailed_loss_)
                self.rel_.append([rel_W, rel_H])

                # Check convergence criteria
                if self.n_iter_ >= self.max_iter:
                    print("exits because max_iteration was reached")
                    break

                if not self.no_stop_criterion:
                    # If there is no regularization the algorithm stops with this criterion
                    # Otherwise it goes to the data fitting step
                    if max(rel_H, rel_W) < self.tol:
                        print("exits because of relative change rel_H {} and rel_W {} < tol".format(rel_H, rel_W))
                        break
                    elif abs((eval_before - eval_after) / eval_init) < self.tol:
                        print("exits because of relative change < tol: {}".format((eval_before - eval_after) / eval_init))
                        break
                    elif np.isnan(eval_after):
                        print("exit because of the presence of NaN")
                        break
                    elif (eval_before - eval_after) < 0:
                        print("exit because of negative decrease {}: {}, {}".format(
                            (eval_before - eval_after), eval_before, eval_after))
                        break

                if self.verbose > 0 and np.mod(self.n_iter_, self.eval_print) == 0:
                    print(f"It {self.n_iter_} / {self.max_iter}: loss {eval_after:3e},"
                          f" {self.n_iter_/(time.time()-algo_start):0.3f} it/s")

                # Updating G might increase the loss, so we re-evaluate the loss to avoid an
                # artificial negative decrease. The update is done every 3 iterations, which is arbitrary.
                if self.physics_model_ is not None and self.n_iter_ % 3 == 0:
                    self.G_ = self.physics_model_.NMF_update(self.W_)
                    eval_before = self.loss(self.W_, self.H_)
                else:
                    eval_before = eval_after

        except KeyboardInterrupt:
            pass

        ###################
        # End of the loop #
        ###################

        if not self.simplex_H and not self.simplex_W:
            self.W_, self.H_ = rescaled_DH(self.W_, self.H_)

        algo_time = time.time() - algo_start
        print(f"Stopped after {self.n_iter_} iterations in {algo_time//60} minutes "
              f"and {np.round(algo_time) % 60} seconds.")
        self.reconstruction_err_ = self.loss(self.W_, self.H_)

        if self.normalize:
            self.W_ = self.W_ / self.norm_factor_

        GW = self.G_ @ self.W_
        self.n_components_ = self.H_.shape[0]

        if self.hspy_comp:
            self.components_ = GW.T
            return self.H_.T
        else:
            self.components_ = self.H_
            return GW

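    # After fit_transform, the learned quantities are available as attributes
    # (illustrative sketch; `est` is a fitted concrete estimator such as SmoothNMF):
    #
    #   >>> GW = est.fit_transform(X)
    #   >>> est.W_.shape, est.H_.shape         # (m, k) and (k, p), see the class docstring
    #   >>> est.reconstruction_err_            # final (averaged) value of the loss
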
    def fit(self, X, y=None, **params):
        """Learn an NMF model for the data X.

        Parameters
        ----------
        X : {array-like, sparse matrix} of shape (n_samples, n_features)
            Data matrix to be decomposed
        y : Ignored
        params : dict
            Parameters passed to the `fit_transform` method.

        Returns
        -------
        self
            The model.
        """
        self.fit_transform(X, **params)
        return self

    # def transform(self, X):
    #     """Transform the data X according to the fitted NMF model.
    #
    #     Parameters
    #     ----------
    #     X : {array-like, sparse matrix} of shape (n_samples, n_features)
    #         Data matrix to be transformed by the model.
    #
    #     Returns
    #     -------
    #     P : ndarray of shape (n_samples, n_components)
    #         Transformed data.
    #     """
    #     check_is_fitted(self)
    #     X = self._validate_data(X, accept_sparse=('csr', 'csc'),
    #                             dtype=[np.float64, np.float32],
    #                             reset=False)
    #     return self.P_

    def inverse_transform(self, W):
        """Transform data back to its original space.

        Parameters
        ----------
        W : {ndarray, sparse matrix} of shape (n_samples, n_components)
            Transformed data matrix.

        Returns
        -------
        X : {ndarray, sparse matrix} of shape (n_samples, n_features)
            Data matrix of original shape.
        """
        check_is_fitted(self)
        return self.G_ @ W @ self.H_

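    # Usage sketch (illustrative): reconstruct the data from the learned factors.
    #
    #   >>> X_hat = est.inverse_transform(est.W_)   # equivalent to est.G_ @ est.W_ @ est.H_
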
    def get_losses(self):
        """For debugging purposes: return the evolution of the losses."""

        if not (self.true_D is None) and not (self.true_H is None):
            mse_list = []
            angles_list = []
            for i in range(self.n_components):
                angles_list.append("ang_p{}".format(i))
                mse_list.append("mse_p{}".format(i))
            names = ["full_loss"] + self.loss_names_ + ["rel_W", "rel_H"] + angles_list + mse_list + ["true_KL_loss"]

            dt_list = []
            for elt in names:
                dt_list.append((elt, "float64"))
            dt = np.dtype(dt_list)

            tup_list = []
            for i in range(len(self.losses_)):
                tup_list.append((self.losses_[i],) + tuple(self.detailed_losses_[i]) + tuple(self.rel_[i])
                                + tuple(self.angles_[i]) + tuple(self.mse_[i]) + (self.true_losses_[i],))
            array = np.array(tup_list, dtype=dt)
            # TODO : Check this part for optimization
        else:
            names = ["full_loss"] + self.loss_names_ + ["rel_W", "rel_H"]
            dt_list = []
            for elt in names:
                dt_list.append((elt, "float64"))
            dt = np.dtype(dt_list)

            tup_list = []
            for i in range(len(self.losses_)):
                tup_list.append((self.losses_[i],) + tuple(self.detailed_losses_[i]) + tuple(self.rel_[i]))
            array = np.array(tup_list, dtype=dt)
        return array

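    # Usage sketch (illustrative): the structured array returned by get_losses can be
    # inspected field by field to monitor convergence.
    #
    #   >>> losses = est.get_losses()
    #   >>> losses["full_loss"][-1]                    # final value of the full loss
    #   >>> losses["rel_W"][-1], losses["rel_H"][-1]   # last relative changes of W and H
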
    def remove_zeros_lines(self, X, epsilon):
        # Replace all-zero rows and columns of X by epsilon so that the updates stay well defined.
        if np.all(X >= 0):
            new_X = X.copy()
            sum_cols = X.sum(axis=0)
            sum_rows = X.sum(axis=1)
            new_X[:, np.where(sum_cols == 0)] = epsilon
            new_X[np.where(sum_rows == 0), :] = epsilon
            return new_X
        else:
            raise ValueError("There are negative values in X")