Source code for spateo.tdr.models.models_backbone.backbone_methods

from typing import Optional, Tuple, Union

try:
    from typing import Literal
except ImportError:
    from typing_extensions import Literal

import numpy as np
import tensorflow as tf
from dynamo.tools.sampling import sample
from keras import optimizers
from keras.layers import Dense, Input
from keras.models import Model

from .backbone_utils import sort_nodes_of_curve

#####################################################################
# Principal curves algorithm                                        #
# ================================================================= #
# Original Code Repository Author: Matthew Artuso.                  #
# Adapted to Spateo by: spateo authors                              #
# Created Date: 6/11/2022                                           #
# Description: A principal curve is a smooth n-dimensional curve    #
# that passes through the middle of a dataset.                      #
# Reference: https://doi.org/10.1016/j.cam.2015.11.041              #
# ================================================================= #
#####################################################################


def orth_dist(y_true, y_pred):
    """
    Loss function for the NLPCA NN. Returns the sum of the squared orthogonal
    distances from the output tensor to the real tensor.
    """
    loss = tf.math.reduce_sum((y_true - y_pred) ** 2)
    return loss
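A minimal sketch of what this loss computes, on hypothetical toy tensors (illustrative only, not part of the spateo source):

import tensorflow as tf

y_true = tf.constant([[0.0, 0.0], [1.0, 1.0]])
y_pred = tf.constant([[0.0, 1.0], [1.0, 1.0]])

# Sum of squared element-wise differences:
# (0-0)^2 + (0-1)^2 + (1-1)^2 + (1-1)^2 = 1.0
print(orth_dist(y_true, y_pred).numpy())  # 1.0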
class NLPCA(object):
    """This is a global solver for principal curves that uses neural networks.

    Attributes:
        fit_points: The fitted points on the curve, set after calling project.
        model: The fitted Keras model, set after calling fit.
        intermediate_layer_model: The sub-model that outputs the bottleneck (projection) layer.
    """

    def __init__(self):
        self.fit_points = None
        self.model = None
        self.intermediate_layer_model = None
    def fit(self, data: np.ndarray, epochs: int = 500, nodes: int = 25, lr: float = 0.01, verbose: int = 0):
        """
        Creates a model and fits it to the given m x n dimensional data.

        Args:
            data: A numpy array of shape (m, n), where m is the number of points and n is the number of dimensions.
            epochs: Number of epochs to train the neural network. Defaults to 500.
            nodes: Number of nodes for the construction layers. Defaults to 25. The more complex the curve, the
                higher this number should be.
            lr: Learning rate for backpropagation. Defaults to 0.01.
            verbose: verbose=0 mutes the training text from Keras. Defaults to 0.
        """
        num_dim = data.shape[1]  # get number of dimensions for pts

        # Create models, base and intermediate.
        model = self.create_model(num_dim, nodes=nodes, lr=lr)
        bname = model.layers[2].name  # bottleneck layer name

        # The intermediate model returns the output of the bottleneck layer,
        # which acts as the projection layer.
        self.intermediate_layer_model = Model(inputs=model.input, outputs=model.get_layer(bname).output)

        # Fit the model and store it on the instance.
        model.fit(data, data, epochs=epochs, verbose=verbose)
        self.model = model
        return
    def project(self, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """
        Projects the points onto the curve generated by the fit function. Returns the projection index of the
        original data and a version of the original data sorted by that index.

        Args:
            data: An m x n array to project onto the curve.

        Returns:
            proj: A one-dimensional array that contains the projection index for each point in data.
            all_sorted: An m x (n+1) array that contains the data sorted by its projection index, along with the
                index itself.
        """
        num_dim = data.shape[1]  # get number of dimensions for pts
        pts = self.model.predict(data)
        proj = self.intermediate_layer_model.predict(data)
        self.fit_points = pts

        all_pts = np.concatenate([pts, proj], axis=1)
        all_sorted = all_pts[all_pts[:, num_dim].argsort()]
        return proj, all_sorted
    def create_model(self, num_dim: int, nodes: int, lr: float):
        """
        Creates a Keras model.

        Args:
            num_dim: The dimensionality of the input space.
            nodes: Number of nodes for the construction layers.
            lr: Learning rate for backpropagation.

        Returns:
            model (object): Keras Model.
        """
        # Create layers:
        # Function G
        inputs = Input(shape=(num_dim,))  # input layer
        mapping = Dense(nodes, activation="sigmoid")(inputs)  # mapping layer
        bottle = Dense(1, activation="sigmoid")(mapping)  # bottleneck layer

        # Function H
        demapping = Dense(nodes, activation="sigmoid")(bottle)  # demapping layer
        output = Dense(num_dim)(demapping)  # output layer

        # Connect and compile the model:
        model = Model(inputs=inputs, outputs=output)
        gradient_descent = optimizers.Adam(learning_rate=lr)
        model.compile(loss=orth_dist, optimizer=gradient_descent)
        return model
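A usage sketch for the NLPCA solver above, fit to a hypothetical noisy 3D helix (toy data, not part of the spateo source; in practice PrinCurve_method below rescales and shifts the data before fitting):

import numpy as np

np.random.seed(0)
t = np.linspace(0, 2 * np.pi, 200)
data = np.c_[np.cos(t), np.sin(t), t] + np.random.normal(0, 0.05, (200, 3))

solver = NLPCA()
solver.fit(data, epochs=200, nodes=25, lr=0.01, verbose=0)
proj, all_sorted = solver.project(data)

print(proj.shape)        # (200, 1): projection index for each point
print(all_sorted.shape)  # (200, 4): fitted points plus index, sorted by index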
###########
# Methods #
###########
def ElPiGraph_method(
    X: np.ndarray,
    NumNodes: int = 50,
    topology: Literal["tree", "circle", "curve"] = "curve",
    Lambda: float = 0.01,
    Mu: float = 0.1,
    alpha: float = 0.0,
    FinalEnergy: Literal["Base", "Penalized"] = "Penalized",
    **kwargs,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Generate a principal elastic tree.
    Reference: Albergante et al. (2020), Robust and Scalable Learning of Complex Intrinsic Dataset Geometry via
    ElPiGraph.

    Args:
        X: DxN data matrix.
        NumNodes: The number of nodes of the principal graph. Use a range of 10 to 100 for the ElPiGraph approach.
        topology: The topology used to fit a principal graph for each dataset.
        Lambda: The attractive strength of edges between nodes (constrains edge lengths).
        Mu: The repulsive strength of a node's neighboring nodes (constrains angles to be close to harmonic).
        alpha: Branching penalty (penalizes the number of branches for the principal tree).
        FinalEnergy: Indicates the final elastic energy associated with the configuration. Currently it can be
            "Base" or "Penalized".
        **kwargs: Other parameters used in elpigraph.computeElasticPrincipalTree. For details, please see:
            https://elpigraph-python.readthedocs.io/en/latest/basics.html

    Returns:
        nodes: The nodes in the principal tree.
        edges: The edges between nodes in the principal tree.
    """
    try:
        import elpigraph
    except ImportError:
        raise ImportError(
            "You need to install the package `elpigraph-python`."
            "\nInstall elpigraph-python via `pip install elpigraph-python`."
        )

    ElPiGraph_kwargs = {
        "NumNodes": NumNodes,
        "Lambda": Lambda,
        "Mu": Mu,
        "alpha": alpha,
        "FinalEnergy": FinalEnergy,
    }
    ElPiGraph_kwargs.update(kwargs)

    if str(topology).lower() == "tree":
        elpi_tree = elpigraph.computeElasticPrincipalTree(X=np.asarray(X), **ElPiGraph_kwargs)
    elif str(topology).lower() == "circle":
        elpi_tree = elpigraph.computeElasticPrincipalCircle(X=np.asarray(X), **ElPiGraph_kwargs)
    elif str(topology).lower() == "curve":
        elpi_tree = elpigraph.computeElasticPrincipalCurve(X=np.asarray(X), **ElPiGraph_kwargs)
    else:
        raise ValueError("`topology` value is wrong." "\nAvailable `topology` are: `'tree'`, `'circle'`, `'curve'`.")

    nodes = elpi_tree[0]["NodePositions"]
    edges = np.asarray(elpi_tree[0]["Edges"][0])

    if str(topology).lower() in ["curve", "circle"]:
        # Identify an endpoint (a node of degree 1) to anchor the sorting; fall back to the first node for circles.
        unique_values, occurrence_count = np.unique(edges.flatten(), return_counts=True)
        started_node_indices = [v for c, v in zip(occurrence_count, unique_values) if c == 1]
        started_node = nodes[started_node_indices[0]] if len(started_node_indices) != 0 else nodes[0]
        nodes = sort_nodes_of_curve(nodes, started_node)

        # Rebuild edges so that they connect consecutive nodes (closing the loop for circles).
        if str(topology).lower() == "curve":
            edges = np.c_[np.arange(0, len(nodes) - 1, 1).reshape(-1, 1), np.arange(1, len(nodes), 1).reshape(-1, 1)]
        else:
            edges = np.c_[
                np.arange(0, len(nodes), 1).reshape(-1, 1), np.asarray(list(range(1, len(nodes))) + [0]).reshape(-1, 1)
            ]

    return nodes, edges
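A usage sketch, assuming `elpigraph-python` is installed and using a hypothetical noisy 3D arc as toy data:

import numpy as np

np.random.seed(0)
t = np.linspace(0, np.pi, 500)
X = np.c_[np.cos(t), np.sin(t), t] + np.random.normal(0, 0.02, (500, 3))

nodes, edges = ElPiGraph_method(X, NumNodes=30, topology="curve")
print(nodes.shape)  # (30, 3)
print(edges.shape)  # (29, 2): consecutive nodes along the sorted curve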
def SimplePPT_method(
    X: np.ndarray,
    NumNodes: int = 50,
    sigma: Optional[Union[float, int]] = 0.1,
    lam: Optional[Union[float, int]] = 1,
    metric: str = "euclidean",
    nsteps: int = 50,
    err_cut: float = 5e-3,
    seed: Optional[int] = 1,
    **kwargs,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    Generate a simple principal tree.
    Reference: Mao et al. (2015), SimplePPT: A simple principal tree algorithm, SIAM International Conference on
    Data Mining.

    Args:
        X: DxN data matrix.
        NumNodes: The number of nodes of the principal graph. Use a range of 100 to 2000 for the PPT approach.
        sigma: Regularization parameter.
        lam: Penalty for the tree length.
        metric: The metric used to compute distances in high-dimensional space. For compatible metrics, check the
            documentation of sklearn.metrics.pairwise_distances.
        nsteps: Number of steps for the optimization process.
        err_cut: Stop the algorithm if the proximity of principal points between iterations is less than this value.
        seed: A numpy random seed.
        **kwargs: Other parameters used in simpleppt.ppt. For details, please see:
            https://github.com/LouisFaure/simpleppt/blob/main/simpleppt/ppt.py

    Returns:
        nodes: The nodes in the principal tree.
        edges: The edges between nodes in the principal tree.
    """
    try:
        import igraph
        import simpleppt
    except ImportError:
        raise ImportError(
            "You need to install the packages `simpleppt` and `igraph`."
            "\nInstall simpleppt via `pip install -U simpleppt`."
            "\nInstall igraph via `pip install -U igraph`."
        )

    SimplePPT_kwargs = {
        "seed": seed,
        "lam": lam,
        "sigma": sigma,
        "metric": metric,
        "nsteps": nsteps,
        "err_cut": err_cut,
    }
    SimplePPT_kwargs.update(kwargs)

    X = np.asarray(X)
    ppt_tree = simpleppt.ppt(X=X, Nodes=NumNodes, **SimplePPT_kwargs)

    # Node positions are the soft-assignment-weighted averages of the data points.
    R = ppt_tree.R
    nodes = (np.dot(X.T, R) / R.sum(axis=0)).T

    # Extract the edge list from the adjacency matrix of the fitted tree.
    B = ppt_tree.B
    edges = np.array(igraph.Graph.Adjacency((B > 0).tolist(), mode="undirected").get_edgelist())

    return nodes, edges
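A usage sketch, assuming `simpleppt` and `igraph` are installed, on a hypothetical Y-shaped toy point cloud (a tree on N nodes has N - 1 edges):

import numpy as np

np.random.seed(0)
stem = np.c_[np.linspace(0, 1, 150), np.zeros(150), np.zeros(150)]
arm1 = np.c_[np.linspace(1, 2, 150), np.linspace(0, 1, 150), np.zeros(150)]
arm2 = np.c_[np.linspace(1, 2, 150), np.linspace(0, -1, 150), np.zeros(150)]
X = np.vstack([stem, arm1, arm2]) + np.random.normal(0, 0.02, (450, 3))

nodes, edges = SimplePPT_method(X, NumNodes=100)
print(nodes.shape)  # (100, 3)
print(edges.shape)  # (99, 2)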
def PrinCurve_method(
    X: np.ndarray,
    NumNodes: int = 50,
    epochs: int = 500,
    lr: float = 0.01,
    scale_factor: Union[int, float] = 1,
    **kwargs,
) -> Tuple[np.ndarray, np.ndarray]:
    """
    This is the global module that contains the principal curve and nonlinear principal component analysis
    algorithms that optimize a line over an entire dataset.
    Reference: Chen et al. (2016), Constraint local principal curve: Concept, algorithms and applications.

    Args:
        X: DxN data matrix.
        NumNodes: Number of nodes for the construction layers. Defaults to 50. The more complex the curve, the
            higher this number should be.
        epochs: Number of epochs to train the neural network. Defaults to 500.
        lr: Learning rate for backpropagation. Defaults to 0.01.
        scale_factor: Factor by which the data is divided before fitting, to aid neural network training.
            Defaults to 1.
        **kwargs: Other parameters used in the global algorithms. For details, please see:
            https://github.com/artusoma/prinPy/blob/master/prinpy/glob.py

    Returns:
        nodes: The nodes in the principal tree.
        edges: The edges between nodes in the principal tree.
    """
    PrinCurve_kwargs = {
        "epochs": epochs,
        "lr": lr,
        "verbose": 0,
    }
    PrinCurve_kwargs.update(kwargs)

    raw_X = np.asarray(X)
    dims = raw_X.shape[1]

    # Transform the data for better training with the neural net: rescale, then shift each dimension to start at zero.
    new_X = raw_X.copy() / scale_factor
    trans = []
    for i in range(dims):
        sub_trans = new_X[:, i].min()
        new_X[:, i] = new_X[:, i] - sub_trans
        trans.append(sub_trans)

    # Create the solver and fit the data.
    pca_project = NLPCA()
    pca_project.fit(new_X, nodes=NumNodes, **PrinCurve_kwargs)

    # Project the data. This returns a projection index for each point and points to plot the curve.
    _, curve_pts = pca_project.project(new_X)
    curve_pts = np.unique(curve_pts, axis=0)
    curve_pts = curve_pts[curve_pts[:, -1].argsort(), :]

    # Undo the translation and scaling to return to the original coordinate system.
    for i in range(dims):
        curve_pts[:, i] = curve_pts[:, i] + trans[i]
    nodes = curve_pts[:, :dims] * scale_factor

    # Downsample the curve points to NumNodes nodes, preserving their order along the curve.
    sampling = sample(arr=np.asarray(range(nodes.shape[0])), n=NumNodes, method="trn", X=nodes)
    sampling.sort()
    nodes = nodes[sampling, :]

    # Connect consecutive nodes; the final edge is clamped to a self-loop on the last node.
    n_nodes = nodes.shape[0]
    edges = np.asarray([np.arange(0, n_nodes, 1), np.arange(1, n_nodes + 1, 1)]).T
    edges[-1, 1] = n_nodes - 1

    # Alternative (currently disabled): sort nodes along the curve before building edges.
    # unique_values, occurrence_count = np.unique(edges.flatten(), return_counts=True)
    # started_node_indices = [v for c, v in zip(occurrence_count, unique_values) if c == 1]
    # started_node = nodes[started_node_indices[0]] if len(started_node_indices) != 0 else nodes[0]
    # sorted_nodes = sort_nodes_of_curve(nodes, started_node)
    # sorted_edges = np.c_[
    #     np.arange(0, len(nodes) - 1, 1).reshape(-1, 1), np.arange(1, len(nodes), 1).reshape(-1, 1)
    # ]

    return nodes, edges
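A usage sketch on hypothetical toy data; `scale_factor` divides the coordinates before neural-network training, so pick it near the data's coordinate range:

import numpy as np

np.random.seed(0)
t = np.linspace(0, 1, 300)
X = np.c_[t, t ** 2, np.sin(t)] * 100 + np.random.normal(0, 1, (300, 3))

nodes, edges = PrinCurve_method(X, NumNodes=50, epochs=300, scale_factor=100)
print(nodes.shape)  # (50, 3)
print(edges.shape)  # (50, 2): consecutive nodes; the final row is a self-loop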