from typing import Optional, Tuple, Union
try:
from typing import Literal
except ImportError:
from typing_extensions import Literal
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
from dynamo.tools.sampling import sample
from .backbone_utils import sort_nodes_of_curve
#####################################################################
# Principal curves algorithm #
# ================================================================= #
# Original Code Repository Author: Matthew Artuso. #
# Adapted to Spateo by: spateo authors #
# Created Date: 6/11/2022 #
# Description: A principal curve is a smooth n-dimensional curve #
# that passes through the middle of a dataset. #
# Reference: https://doi.org/10.1016/j.cam.2015.11.041 #
# ================================================================= #
#####################################################################
def orth_dist(y_true, y_pred):
"""
    Loss function for the NLPCA neural network. Returns the sum of squared
    distances between the predicted tensor and the target tensor (the orthogonal reconstruction error).
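    Example:
        A quick sanity check (exact values shown for illustration):
            >>> orth_dist(torch.zeros(3), torch.ones(3))
            tensor(3.)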
"""
loss = torch.sum((y_true - y_pred) ** 2)
return loss
class NLPCA(object):
    """This is a global solver for principal curves that uses neural networks.
    Attributes:
        model: The PyTorch autoencoder created by `fit`.
        fit_points: The curve points computed by `project`.
    """
def __init__(self):
super(NLPCA, self).__init__()
    def fit(self, data: np.ndarray, epochs: int = 500, nodes: int = 25, lr: float = 0.01, verbose: int = 0):
"""
This method creates a model and will fit it to the given m x n dimensional data.
Args:
data: A numpy array of shape (m,n), where m is the number of points and n is the number of dimensions.
epochs: Number of epochs to train neural network, defaults to 500.
nodes: Number of nodes for the construction layers. Defaults to 25. The more complex the curve, the higher
this number should be.
            lr: Learning rate for backprop. Defaults to 0.01.
            verbose: If nonzero, print the loss every 100 epochs. Defaults to 0.
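        Example:
            A minimal usage sketch on synthetic 2-D data (shapes and values are illustrative):
                >>> solver = NLPCA()
                >>> X = np.random.rand(200, 2)  # 200 points in 2 dimensions
                >>> solver.fit(X, epochs=300, nodes=25, lr=0.01)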
"""
num_dim = data.shape[1] # get number of dimensions for pts
# create model
self.model = self.create_model(num_dim, nodes=nodes)
optimizer = optim.Adam(self.model.parameters(), lr=lr)
data_tensor = torch.tensor(data, dtype=torch.float32)
for epoch in range(epochs):
self.model.train()
optimizer.zero_grad()
output = self.model(data_tensor)
loss = orth_dist(data_tensor, output)
loss.backward()
optimizer.step()
if verbose and epoch % 100 == 0:
print(f"Epoch {epoch}, Loss: {loss.item()}")
return
    def project(self, data: np.ndarray) -> Tuple[np.ndarray, np.ndarray]:
        """
        Project the points onto the curve generated by the fit function. Returns the projection
        index of the original data and a version of the original data sorted by that index.
        Args:
            data: An m x n array of points to project onto the curve.
        Returns:
            proj: An (m, 1) array containing the projection index for each point in `data`.
            all_sorted: An m x (n+1) array containing `data` sorted by its projection index, with the index in the last column.
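        Example:
            A sketch continuing from a solver fitted with `fit` (illustrative only):
                >>> proj, all_sorted = solver.project(X)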
"""
num_dim = data.shape[1] # get number of dimensions for pts
data_tensor = torch.tensor(data, dtype=torch.float32)
self.model.eval()
with torch.no_grad():
pts = self.model(data_tensor).numpy()
            # The bottleneck activations were cached on the model during the forward pass.
            proj = self.model.intermediate_layer_model.numpy()
self.fit_points = pts
all_data = np.concatenate([pts, proj], axis=1)
all_sorted = all_data[all_data[:, num_dim].argsort()]
return proj, all_sorted
    def create_model(self, num_dim: int, nodes: int):
"""
Creates a PyTorch model.
Args:
num_dim: How many dimensions the input space is
nodes: How many nodes for the construction layers
Returns:
            model: A PyTorch autoencoder model with a one-dimensional bottleneck.
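        Example:
            A sketch of direct use (normally called internally by `fit`):
                >>> net = NLPCA().create_model(num_dim=3, nodes=25)
                >>> out = net(torch.rand(10, 3))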
"""
class Autoencoder(nn.Module):
def __init__(self, num_dim, nodes):
super(Autoencoder, self).__init__()
# Encoder
self.encoder = nn.Sequential(nn.Linear(num_dim, nodes), nn.Sigmoid(), nn.Linear(nodes, 1), nn.Sigmoid())
# Decoder
self.decoder = nn.Sequential(nn.Linear(1, nodes), nn.Sigmoid(), nn.Linear(nodes, num_dim))
            def forward(self, x):
                # The 1-D bottleneck encodes each point's position along the curve;
                # cache it on the module so `project` can read it after a forward pass.
                bottleneck = self.encoder(x)
                self.intermediate_layer_model = bottleneck
                output = self.decoder(bottleneck)
                return output
return Autoencoder(num_dim, nodes)
###########
# Methods #
###########
def ElPiGraph_method(
X: np.ndarray,
NumNodes: int = 50,
topology: Literal["tree", "circle", "curve"] = "curve",
Lambda: float = 0.01,
Mu: float = 0.1,
alpha: float = 0.0,
FinalEnergy: Literal["Base", "Penalized"] = "Penalized",
**kwargs,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Generate a principal elastic tree.
Reference: Albergante et al. (2020), Robust and Scalable Learning of Complex Intrinsic Dataset Geometry via ElPiGraph.
Args:
        X: An N x D data matrix, where N is the number of points and D is the number of dimensions.
        NumNodes: The number of nodes of the principal graph. Use a range of 10 to 100 for the ElPiGraph approach.
        topology: The topology used to fit a principal graph for each dataset.
        Lambda: The attractive strength of edges between nodes (constrains edge lengths).
        Mu: The repulsive strength of a node's neighboring nodes (constrains angles to be close to harmonic).
        alpha: Branching penalty (penalizes the number of branches for the principal tree).
        FinalEnergy: The final elastic energy associated with the configuration. Currently it can be "Base" or "Penalized".
**kwargs: Other parameters used in elpigraph.computeElasticPrincipalTree. For details, please see:
https://elpigraph-python.readthedocs.io/en/latest/basics.html
Returns:
nodes: The nodes in the principal tree.
edges: The edges between nodes in the principal tree.
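    Example:
        A minimal usage sketch (assumes `elpigraph-python` is installed; shapes and values are illustrative):
            >>> X = np.random.rand(500, 3)  # 500 points in 3-D space
            >>> nodes, edges = ElPiGraph_method(X, NumNodes=50, topology="curve")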
"""
try:
import elpigraph
except ImportError:
raise ImportError(
"You need to install the package `elpigraph-python`."
"\nInstall elpigraph-python via `pip install elpigraph-python`."
)
ElPiGraph_kwargs = {
"NumNodes": NumNodes,
"Lambda": Lambda,
"Mu": Mu,
"alpha": alpha,
"FinalEnergy": FinalEnergy,
}
ElPiGraph_kwargs.update(kwargs)
if str(topology).lower() == "tree":
elpi_tree = elpigraph.computeElasticPrincipalTree(X=np.asarray(X), **ElPiGraph_kwargs)
elif str(topology).lower() == "circle":
elpi_tree = elpigraph.computeElasticPrincipalCircle(X=np.asarray(X), **ElPiGraph_kwargs)
elif str(topology).lower() == "curve":
elpi_tree = elpigraph.computeElasticPrincipalCurve(X=np.asarray(X), **ElPiGraph_kwargs)
else:
        raise ValueError("`topology` value is wrong." "\nAvailable `topology` values are: `'tree'`, `'circle'`, `'curve'`.")
nodes = elpi_tree[0]["NodePositions"]
edges = np.asarray(elpi_tree[0]["Edges"][0])
if str(topology).lower() in ["curve", "circle"]:
unique_values, occurrence_count = np.unique(edges.flatten(), return_counts=True)
started_node_indices = [v for c, v in zip(occurrence_count, unique_values) if c == 1]
started_node = nodes[started_node_indices[0]] if len(started_node_indices) != 0 else nodes[0]
nodes = sort_nodes_of_curve(nodes, started_node)
if str(topology).lower() == "curve":
edges = np.c_[np.arange(0, len(nodes) - 1, 1).reshape(-1, 1), np.arange(1, len(nodes), 1).reshape(-1, 1)]
else:
edges = np.c_[
np.arange(0, len(nodes), 1).reshape(-1, 1), np.asarray(list(range(1, len(nodes))) + [0]).reshape(-1, 1)
]
return nodes, edges
def SimplePPT_method(
X: np.ndarray,
NumNodes: int = 50,
sigma: Optional[Union[float, int]] = 0.1,
lam: Optional[Union[float, int]] = 1,
metric: str = "euclidean",
nsteps: int = 50,
err_cut: float = 5e-3,
seed: Optional[int] = 1,
**kwargs,
) -> Tuple[np.ndarray, np.ndarray]:
"""
Generate a simple principal tree.
Reference: Mao et al. (2015), SimplePPT: A simple principal tree algorithm, SIAM International Conference on Data Mining.
Args:
        X: An N x D data matrix, where N is the number of points and D is the number of dimensions.
        NumNodes: The number of nodes of the principal graph. Use a range of 100 to 2000 for the PPT approach.
sigma: Regularization parameter.
lam: Penalty for the tree length.
metric: The metric to use to compute distances in high dimensional space. For compatible metrics, check the
documentation of sklearn.metrics.pairwise_distances.
nsteps: Number of steps for the optimisation process.
        err_cut: Stop the algorithm if the change in the principal points between iterations falls below this value.
seed: A numpy random seed.
**kwargs: Other parameters used in simpleppt.ppt. For details, please see:
https://github.com/LouisFaure/simpleppt/blob/main/simpleppt/ppt.py
Returns:
nodes: The nodes in the principal tree.
edges: The edges between nodes in the principal tree.
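    Example:
        A minimal usage sketch (assumes `simpleppt` and `igraph` are installed; shapes and values are illustrative):
            >>> X = np.random.rand(500, 3)
            >>> nodes, edges = SimplePPT_method(X, NumNodes=100)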
"""
try:
import igraph
import simpleppt
except ImportError:
raise ImportError(
"You need to install the package `simpleppt` and `igraph`."
"\nInstall simpleppt via `pip install -U simpleppt`."
"\nInstall igraph via `pip install -U igraph`"
)
SimplePPT_kwargs = {
"seed": seed,
"lam": lam,
"sigma": sigma,
"metric": metric,
"nsteps": nsteps,
"err_cut": err_cut,
}
SimplePPT_kwargs.update(kwargs)
X = np.asarray(X)
ppt_tree = simpleppt.ppt(X=X, Nodes=NumNodes, **SimplePPT_kwargs)
R = ppt_tree.R
nodes = (np.dot(X.T, R) / R.sum(axis=0)).T
B = ppt_tree.B
edges = np.array(igraph.Graph.Adjacency((B > 0).tolist(), mode="undirected").get_edgelist())
return nodes, edges
def PrinCurve_method(
X: np.ndarray,
NumNodes: int = 50,
epochs: int = 500,
lr: float = 0.01,
scale_factor: Union[int, float] = 1,
**kwargs,
) -> Tuple[np.ndarray, np.ndarray]:
"""
    Generate a principal curve using the nonlinear principal component analysis (NLPCA) algorithm, which optimizes a
    curve over the entire dataset.
Reference: Chen et al. (2016), Constraint local principal curve: Concept, algorithms and applications.
Args:
        X: An N x D data matrix, where N is the number of points and D is the number of dimensions.
        NumNodes: Number of nodes for the construction layers. Defaults to 50. The more complex the curve, the higher this number should be.
        epochs: Number of epochs to train the neural network. Defaults to 500.
        lr: Learning rate for backprop. Defaults to 0.01.
        scale_factor: Factor by which the data is scaled down before fitting and scaled back up afterwards. Defaults to 1.
**kwargs: Other parameters used in global algorithms. For details, please see:
https://github.com/artusoma/prinPy/blob/master/prinpy/glob.py
Returns:
nodes: The nodes in the principal tree.
edges: The edges between nodes in the principal tree.
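    Example:
        A minimal usage sketch (shapes and values are illustrative):
            >>> X = np.random.rand(500, 3)
            >>> nodes, edges = PrinCurve_method(X, NumNodes=50, epochs=500)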
"""
PrinCurve_kwargs = {
"epochs": epochs,
"lr": lr,
"verbose": 0,
}
PrinCurve_kwargs.update(kwargs)
raw_X = np.asarray(X)
dims = raw_X.shape[1]
new_X = raw_X.copy() / scale_factor
trans = []
for i in range(dims):
sub_trans = new_X[:, i].min()
new_X[:, i] = new_X[:, i] - sub_trans
trans.append(sub_trans)
# create solver
pca_project = NLPCA()
    # fit the model to the shifted, scaled data
pca_project.fit(new_X, nodes=NumNodes, **PrinCurve_kwargs)
# project the current data. This returns a projection index for each point and points to plot the curve.
_, curve_pts = pca_project.project(new_X)
    curve_pts = np.unique(curve_pts, axis=0)
    curve_pts = curve_pts[curve_pts[:, -1].argsort(), :]
    # undo the per-dimension shift, then restore the original scale
    for i in range(dims):
        curve_pts[:, i] = curve_pts[:, i] + trans[i]
    nodes = curve_pts[:, :dims] * scale_factor
sampling = sample(arr=np.asarray(range(nodes.shape[0])), n=NumNodes, method="trn", X=nodes)
sampling.sort()
nodes = nodes[sampling, :]
    n_nodes = nodes.shape[0]
    # connect consecutive nodes along the curve: n_nodes - 1 edges, no self-loops
    edges = np.c_[np.arange(0, n_nodes - 1, 1).reshape(-1, 1), np.arange(1, n_nodes, 1).reshape(-1, 1)]
"""
unique_values, occurrence_count = np.unique(edges.flatten(), return_counts=True)
started_node_indices = [v for c, v in zip(occurrence_count, unique_values) if c == 1]
started_node = nodes[started_node_indices[0]] if len(started_node_indices) != 0 else nodes[0]
sorted_nodes = sort_nodes_of_curve(nodes, started_node)
sorted_edges = np.c_[np.arange(0, len(nodes) - 1, 1).reshape(-1, 1), np.arange(1, len(nodes), 1).reshape(-1, 1)]
"""
return nodes, edges