Source code for spateo.tools.cluster.leiden

from typing import Literal, Optional, Union

import anndata
import igraph
import leidenalg
import numpy as np
import scipy
from sklearn.neighbors import kneighbors_graph

from ...configuration import SKM


[docs]def distance_knn_graph(dist: np.ndarray, num_neighbors: int): """Construct a k-nearest neighbor graph from a distance matrix. Args: dist: Pairwise distance matrix num_neighbors: Number of nearest neighbors Returns: G: K-nearest neighbor graph """ n = dist.shape[0] G = igraph.Graph() G.add_vertices(n) edges = [] weights = [] for i in range(n): sorted_ind = np.argsort(dist[i, :]) for j in range(1, 1 + num_neighbors): # if i < sorted_ind[j]: edges.append((i, sorted_ind[j])) weights.append(dist[i, sorted_ind[j]]) # weights.append(1.) G.add_edges(edges) G.es["weight"] = weights return G
[docs]def embedding_knn_graph(X: np.ndarray, num_neighbors: int): """Construct a k-nearest neighbor graph from an arbitrary array, of shape [n_samples, n_features] Args: X: Embedding matrix num_neighbors: Number of nearest neighbors Returns: G: K-nearest neighbor graph """ adj = kneighbors_graph(X, num_neighbors, include_self=False, mode="distance") G = igraph.Graph.Weighted_Adjacency(adj) return G
[docs]def adj_to_igraph(adj: np.ndarray): """Convert an adjacency matrix to an igraph graph.""" G = igraph.Graph.Adjacency(adj) return G
[docs]def calculate_leiden_partition( adj: Optional[Union[scipy.sparse.spmatrix, np.ndarray]] = None, input_mat: Optional[np.ndarray] = None, num_neighbors: int = 10, graph_type: Literal["distance", "embedding"] = "distance", resolution: float = 1.0, n_iterations: int = -1, ) -> np.ndarray: """Performs Leiden clustering on a given dataset. Args: adj: Optional precomputed adjacency matrix input_mat: Optional, will be used only if 'adj' is not given. The input data, will be interepreted as either a distance matrix (if :param `graph_type` is "distance" or an embedding matrix (if :param `graph_type` is "embedding") num_neighbors: Only used if 'adj' is not given- the number of nearest neighbors for constructing the graph graph_type: Only used if 'adj' is not given- specifies the input type, either 'distance' or 'embedding' resolution: The resolution parameter for the Leiden algorithm n_iterations: The number of iterations for the Leiden algorithm (-1 for unlimited iterations) Returns: clusters: Array containing cluster assignments """ from ...logging import logger_manager as lm logger = lm.get_main_logger() if adj is None and input_mat is None: raise ValueError("Either `adj` or `input_mat` must be specified") logger.info("using adj_matrix from arg for clustering...") if adj is not None: if scipy.sparse.issparse(adj): pass else: adj = scipy.sparse.csr_matrix(adj) sources, targets = adj.nonzero() weights = adj[sources, targets] if isinstance(weights, np.matrix): weights = weights.A1 G = igraph.Graph(directed=None) G.add_vertices(adj.shape[0]) # this adds adjacency.shape[0] vertices G.add_edges(list(zip(sources, targets))) try: G.es["weight"] = weights except KeyError: pass if G.vcount() != adj.shape[0]: print( f"The constructed graph has only {G.vcount()} nodes. " "Your adjacency matrix contained redundant nodes." ) else: if graph_type == "distance": G = distance_knn_graph(input_mat, num_neighbors) elif graph_type == "embedding": G = embedding_knn_graph(input_mat, num_neighbors) logger.info("Converting graph_sparse_matrix to igraph object", indent_level=2) partition_kwargs = {"resolution_parameter": resolution, "seed": 888, "n_iterations": n_iterations} partition = leidenalg.find_partition(G, leidenalg.RBConfigurationVertexPartition, **partition_kwargs) clusters = np.array(partition.membership, dtype=int) logger.finish_progress(progress_name="Community clustering with %s" % ("leiden")) return clusters
[docs]def calculate_louvain_partition( adj: Optional[Union[scipy.sparse.spmatrix, np.ndarray]] = None, input_mat: Optional[np.ndarray] = None, num_neighbors: int = 10, graph_type: Literal["distance", "embedding"] = "distance", resolution: float = 1.0, n_iterations: int = -1, ) -> np.ndarray: """Performs Louvain clustering on a given dataset. Args: adj: Optional precomputed adjacency matrix input_mat: Optional, will be used only if 'adj' is not given. The input data, will be interepreted as either a distance matrix (if :param `graph_type` is "distance" or an embedding matrix (if :param `graph_type` is "embedding") num_neighbors: Only used if 'adj' is not given- the number of nearest neighbors for constructing the graph graph_type: Only used if 'adj' is not given- specifies the input type, either 'distance' or 'embedding' resolution: The resolution parameter for the Louvain algorithm n_iterations: The number of iterations for the Louvain algorithm (-1 for unlimited iterations) Returns: clusters: Array containing cluster assignments """ import louvain from ...logging import logger_manager as lm logger = lm.get_main_logger() if adj is None and input_mat is None: raise ValueError("Either `adj` or `input_mat` must be specified") logger.info("using adj_matrix from arg for clustering...") if adj is not None: if scipy.sparse.issparse(adj): pass else: adj = scipy.sparse.csr_matrix(adj) sources, targets = adj.nonzero() weights = adj[sources, targets] if isinstance(weights, np.matrix): weights = weights.A1 G = igraph.Graph(directed=None) G.add_vertices(adj.shape[0]) # this adds adjacency.shape[0] vertices G.add_edges(list(zip(sources, targets))) try: G.es["weight"] = weights except KeyError: pass if G.vcount() != adj.shape[0]: print( f"The constructed graph has only {G.vcount()} nodes. " "Your adjacency matrix contained redundant nodes." ) else: if graph_type == "distance": G = distance_knn_graph(input_mat, num_neighbors) elif graph_type == "embedding": G = embedding_knn_graph(input_mat, num_neighbors) logger.info("Converting graph_sparse_matrix to igraph object", indent_level=2) partition_kwargs = {"resolution_parameter": resolution, "seed": 42, "weights": G.es["weight"]} partition = louvain.find_partition(G, louvain.RBConfigurationVertexPartition, **partition_kwargs) clusters = np.array(partition.membership, dtype=int) logger.finish_progress(progress_name="Community clustering with %s" % ("louvain")) return clusters