"""
Functions for finding nearest neighbors, the distances between them and the spatial weighting between points in
spatial transcriptomics data.
"""
import os
import sys
from functools import partial
from typing import Optional, Tuple, Union
import anndata
import numpy as np
import pandas as pd
import scipy
from anndata import AnnData
from joblib import Parallel, delayed
from scipy.spatial.distance import pdist, squareform
from sklearn.decomposition import PCA
from sklearn.metrics import pairwise_distances
from sklearn.neighbors import NearestNeighbors
from ..configuration import SKM
from ..logging import logger_manager as lm
# ---------------------------------------------------------------------------------------------------
# Utility functions
# ---------------------------------------------------------------------------------------------------
[docs]def calculate_distance(position: np.ndarray, dist_metric: str = "euclidean") -> np.ndarray:
"""Given array of x- and y-coordinates, compute pairwise distances between all samples using Euclidean distance."""
distance_matrix = squareform(pdist(position, metric=dist_metric))
return distance_matrix
[docs]def local_dist(coords_i: np.ndarray, coords: np.ndarray):
"""For single sample, compute distance between that sample and each other sample in the data.
Args:
coords_i: Array of shape (n, ), where n is the dimensionality of the data; the coordinates of a single point
coords: Array of shape (m, n), where n is the dimensionality of the data and m is an arbitrary number of
samples; pairwise distances from `coords_i`.
Returns:
distances: array-like, shape (m, ), where m is an arbitrary number of samples. The pairwise distances
between `coords_i` and each point in `coords`.
"""
distances = np.sqrt(np.sum((coords_i - coords) ** 2, axis=1))
return distances
[docs]def jaccard_index(row_i: np.ndarray, array: np.ndarray):
"""Compute the Jaccard index between a row of a binary array and all other rows.
Args:
row_i: 1D binary array representing the row for which to compute the Jaccard index.
array: 2D binary array containing the rows to compare against.
Returns:
jaccard_indices: 1D array of Jaccard indices between `row_i` and each row in `array`.
"""
intersect = np.logical_and(row_i, array)
union = np.logical_or(row_i, array)
jaccard_scores = np.sum(intersect, axis=1) / np.sum(union, axis=1)
return jaccard_scores
[docs]def normalize_adj(adj: np.ndarray, exclude_self: bool = True) -> np.ndarray:
"""Symmetrically normalize adjacency matrix, set diagonal to 1 and return processed adjacency array.
Args:
adj: Pairwise distance matrix of shape [n_samples, n_samples].
exclude_self: Set True to set diagonal of adjacency matrix to 1.
Returns:
adj_proc: The normalized adjacency matrix.
"""
adj = scipy.sparse.coo_matrix(adj)
rowsum = np.array(adj.sum(1))
d_inv_sqrt = np.power(rowsum, -0.5).flatten()
d_inv_sqrt[np.isinf(d_inv_sqrt)] = 0.0
d_mat_inv_sqrt = scipy.sparse.diags(d_inv_sqrt)
adj = adj.dot(d_mat_inv_sqrt).transpose().dot(d_mat_inv_sqrt)
adj_proc = adj.toarray() if exclude_self else adj.toarray() + np.eye(adj.shape[0])
return adj_proc
[docs]def adj_to_knn(adj: np.ndarray, n_neighbors: int = 15) -> Tuple[np.ndarray, np.ndarray]:
"""Given an adjacency matrix, convert to KNN graph.
Args:
adj: Adjacency matrix of shape (n_samples, n_samples)
n_neighbors: Number of nearest neighbors to include in the KNN graph
Returns:
indices: Array (n_samples x n_neighbors) storing the indices for each node's nearest neighbors in the
knn graph.
weights: Array (n_samples x n_neighbors) storing the edge weights for each node's nearest neighbors in
the knn graph.
"""
n_obs = adj.shape[0]
indices = np.zeros((n_obs, n_neighbors), dtype=int)
weights = np.zeros((n_obs, n_neighbors), dtype=float)
for i in range(n_obs):
current_neighbors = adj[i, :].nonzero()
# Set self as nearest neighbor
indices[i, :] = i
weights[i, :] = 0.0
# there could be more or less than n_neighbors because of an approximate search
current_n_neighbors = len(current_neighbors[1])
if current_n_neighbors > n_neighbors - 1:
sorted_indices = np.argsort(adj[i][:, current_neighbors[1]].A)[0][: (n_neighbors - 1)]
indices[i, 1:] = current_neighbors[1][sorted_indices]
weights[i, 1:] = adj[i][0, current_neighbors[1][sorted_indices]].A
else:
idx_ = np.arange(1, (current_n_neighbors + 1))
indices[i, idx_] = current_neighbors[1]
weights[i, idx_] = adj[i][:, current_neighbors[1]].A
return indices, weights
[docs]def knn_to_adj(knn_indices: np.ndarray, knn_weights: np.ndarray) -> scipy.sparse.csr_matrix:
"""Given the indices and weights of a KNN graph, convert to adjacency matrix.
Args:
knn_indices: Array (n_samples x n_neighbors) storing the indices for each node's nearest neighbors in the
knn graph.
knn_weights: Array (n_samples x n_neighbors) storing the edge weights for each node's nearest neighbors in
the knn graph.
Returns:
adj: The adjacency matrix corresponding to the KNN graph
"""
adj = scipy.sparse.csr_matrix(
(
knn_weights.flatten(),
(
np.repeat(knn_indices[:, 0], knn_indices.shape[1]),
knn_indices.flatten(),
),
)
)
adj.eliminate_zeros()
return adj
[docs]def compute_distances_and_connectivities(
knn_indices: np.ndarray, distances: np.ndarray
) -> Tuple[scipy.sparse.csr_matrix, scipy.sparse.csr_matrix]:
"""Computes connectivity and sparse distance matrices
Args:
knn_indices: Array of shape (n_samples, n_samples) containing the indices of the nearest neighbors for each
sample.
distances: The distances to the n_neighbors the closest points in knn graph
Returns:
distances: Sparse distance matrix
connectivities: Sparse connectivity matrix
"""
n_obs, n_neighbors = knn_indices.shape
distances = scipy.sparse.csr_matrix(
(
distances.flatten(),
(np.repeat(np.arange(n_obs), n_neighbors), knn_indices.flatten()),
),
shape=(n_obs, n_obs),
)
connectivities = distances.copy()
connectivities.data[connectivities.data > 0] = 1
distances.eliminate_zeros()
connectivities.eliminate_zeros()
return distances, connectivities
[docs]def calculate_distances_chunk(
coords_chunk: np.ndarray,
chunk_start_idx: int,
coords: np.ndarray,
n_nonzeros: Optional[dict] = None,
metric: str = "euclidean",
) -> np.ndarray:
"""Pairwise distance computation, coupled with :func `find_bw`.
Args:
coords_chunk: Array of shape (n_samples_chunk, n_features) containing coordinates of the chunk of interest.
chunk_start_idx: Index of the first sample in the chunk. Required if `n_nonzeros` is not None.
coords: Array of shape (n_samples, n_features) containing the coordinates of all points.
n_nonzeros: Optional dictionary containing the number of non-zero columns for each row in the distance matrix.
metric: Distance metric to use for pairwise distance computation, can be any of the metrics supported by
:func `sklearn.metrics.pairwise_distances`.
"""
distances_chunk = pairwise_distances(coords_chunk, coords, metric=metric)
# If n_nonzeros is not None, find the number of columns that are nonzero across both rows:
if n_nonzeros is not None:
# Normalization factors:
paired_nonzeros = np.zeros_like(distances_chunk)
for i in range(distances_chunk.shape[0]):
for j in range(distances_chunk.shape[1]):
paired_nonzeros[i, j] = len(n_nonzeros[chunk_start_idx + i] & n_nonzeros[j])
normalized_chunk = distances_chunk / paired_nonzeros
distances_chunk = normalized_chunk
return distances_chunk
@SKM.check_adata_is_type(SKM.ADATA_UMI_TYPE)
[docs]def find_bw_for_n_neighbors(
adata: anndata.AnnData,
coords_key: str = "spatial",
n_anchors: Optional[int] = None,
target_n_neighbors: int = 6,
initial_bw: Optional[float] = None,
chunk_size: int = 1000,
exclude_self: bool = False,
normalize_distances: bool = False,
verbose: bool = True,
max_iterations: int = 100,
alpha: float = 0.5,
) -> float:
"""Finds the bandwidth such that on average, cells in the sample have n neighbors.
Args:
adata: AnnData object containing coordinates for all cells
coords_key: Key in adata.obsm where the spatial coordinates are stored
target_n_neighbors: Target average number of neighbors per cell
initial_bw: Can optionally be used to set the starting distance for the bandwidth search
chunk_size: Number of cells to compute pairwise distance for at once
exclude_self: Whether to exclude self from the list of neighbors
normalize_distances: Whether to normalize the distances by the number of nonzero columns (should be used only
if the entry in .obs[coords_key] contains something other than x-, y-, z-coordinates).
verbose: Whether to print the bandwidth at each iteration. If False, will only print the final bandwidth.
max_iterations: Will stop the process and return the bandwidth that results in the closest number of neighbors
to the specified target if it takes more than this number of iterations.
alpha: Factor used in determining the new bandwidth- ratio of found neighbors to target neighbors will be
raised to this power.
Returns:
bandwidth: Bandwidth in distance units
"""
coords = adata.obsm[coords_key]
# Select n_anchors random indices if applicable:
if n_anchors is not None:
np.random.seed(0) # Seed for reproducibility
anchor_indices = np.random.choice(coords.shape[0], size=n_anchors, replace=False)
anchor_coords = coords[anchor_indices]
chunk_size = min(chunk_size, anchor_coords.shape[0])
else:
anchor_indices = np.arange(coords.shape[0])
anchor_coords = coords
metric = "jaccard" if "jaccard" in coords_key else "euclidean"
# If normalize_distances is True, get the indices of nonzero columns for each row in the distance matrix- only
# used if metric is Euclidean distance:
if normalize_distances and metric == "euclidean":
n_nonzeros = {}
for i in range(coords.shape[0]):
n_nonzeros[i] = set(np.nonzero(coords[i, :])[0])
else:
n_nonzeros = None
# Compute distances in chunks, include start and end indices:
chunks_with_indices = [
(anchor_coords[i : i + chunk_size], anchor_indices[i]) for i in range(0, anchor_coords.shape[0], chunk_size)
]
# Calculate pairwise distances for each chunk in parallel
if metric == "jaccard":
partial_func = partial(calculate_distances_chunk, coords=coords, metric=metric)
else:
partial_func = partial(calculate_distances_chunk, coords=coords, n_nonzeros=n_nonzeros, metric=metric)
distances = Parallel(n_jobs=-1)(delayed(partial_func)(chunk, start_idx) for chunk, start_idx in chunks_with_indices)
# Concatenate the results to get the full pairwise distance matrix
distances = np.concatenate(distances, axis=0)
# Initialize bandwidth and iteration counter:
bandwidth = 88 if initial_bw is None else initial_bw
iteration = 0
if verbose:
print(f"Initial bandwidth: {bandwidth}")
closest_bw = bandwidth
lower_bound = 0.9 * target_n_neighbors
upper_bound = 1.1 * target_n_neighbors
closest_avg_neighbors = float("inf")
while iteration < max_iterations:
iteration += 1
bw_dist = distances / bandwidth
if exclude_self:
neighbor_counts = np.sum(bw_dist <= 1, axis=1) - 1
else:
neighbor_counts = np.sum(bw_dist <= 1, axis=1)
# Check if the average number of neighbors is close to the target
avg_neighbors = np.mean(neighbor_counts)
if verbose:
print(f"For bandwidth {bandwidth}, found {avg_neighbors} neighbors on average.")
# Store the closest bandwidth and closest average neighbors:
if abs(target_n_neighbors - closest_avg_neighbors) > abs(target_n_neighbors - avg_neighbors):
closest_bw = bandwidth
closest_avg_neighbors = avg_neighbors
# Check for exit condition
if lower_bound <= avg_neighbors <= upper_bound:
if verbose:
print(f"Final bandwidth: {bandwidth}")
return bandwidth
# Dynamic adjustment of bandwidth
adjustment_factor = target_n_neighbors / avg_neighbors
bandwidth *= adjustment_factor**alpha
if verbose:
print(f"Iteration {iteration}, new bandwidth: {bandwidth}")
if verbose:
print(
f"Max iterations reached. Returning closest bandwidth: {closest_bw}, with average neighbors: "
f"{closest_avg_neighbors}"
)
return closest_bw
@SKM.check_adata_is_type(SKM.ADATA_UMI_TYPE)
[docs]def find_threshold_distance(
adata: anndata.AnnData,
coords_key: str = "X_pca",
n_neighbors: int = 10,
chunk_size: int = 1000,
normalize_distances: bool = False,
) -> float:
"""Finds threshold distance beyond which there is a dramatic increase in the average distance to remaining
nearest neighbors.
Args:
adata: AnnData object containing coordinates for all cells
coords_key: Key in adata.obsm where the spatial coordinates are stored
n_neighbors: Will first compute the number of nearest neighbors as a comparison for querying additional
distance values.
chunk_size: Number of cells to compute pairwise distance for at once
normalize_distances: Whether to normalize the distances by the number of nonzero columns (should be used only
if the entry in .obs[coords_key] contains something other than x-, y-, z-coordinates).
Returns:
bandwidth: Bandwidth in distance units
"""
coords = adata.obsm[coords_key]
# If normalize_distances is True, get the indices of nonzero columns for each row in the distance matrix:
if normalize_distances:
n_nonzeros = {}
for i in range(coords.shape[0]):
n_nonzeros[i] = set(np.nonzero(coords[i, :])[0])
else:
n_nonzeros = None
# Compute distances in chunks, include start and end indices:
chunks_with_indices = [(coords[i : i + chunk_size], i) for i in range(0, coords.shape[0], chunk_size)]
# Calculate pairwise distances for each chunk in parallel
partial_func = partial(calculate_distances_chunk, coords=coords, n_nonzeros=n_nonzeros)
distances = Parallel(n_jobs=-1)(delayed(partial_func)(chunk, start_idx) for chunk, start_idx in chunks_with_indices)
# Concatenate the results to get the full pairwise distance matrix
distances = np.concatenate(distances, axis=0)
# Find the k nearest neighbors for each sample
k_nearest_distances = np.sort(distances)[:, :n_neighbors]
# Compute the mean and standard deviation of the k nearest distances
mean_k_distances = np.mean(k_nearest_distances, axis=1)
std_k_distances = np.std(k_nearest_distances, axis=1)
# Find the distance where there is a dramatic increase compared to the k nearest neighbors
threshold_distance = np.max(mean_k_distances + 3 * std_k_distances)
return threshold_distance
# ---------------------------------------------------------------------------------------------------
# Kernel functions
# ---------------------------------------------------------------------------------------------------
[docs]class Kernel(object):
"""
Spatial weights for regression models are learned using kernel functions.
Args:
i: Index of the point for which to estimate the density
data: Array of shape (n_samples, n_features) representing the data. If aiming to derive weights from spatial
distance, this should be the array of spatial positions.
bw: Bandwidth parameter for the kernel density estimation
cov: Optional array of shape (n_samples, ). Can be used to adjust the distance calculation to look only at
samples of interest vs. samples not of interest, which is determined from nonzero values in this vector.
This can be used to modify the modeling process based on factors thought to reflect biological differences,
for example, to condition on histological classification, passing a distance threshold, etc. If 'ct' is
also given, will look for samples of interest that are also of the same cell type.
ct: Optional array of shape (n_samples, ), containing vector where cell types are encoded as integers. Can be
used to condition nearest neighbor finding on cell type or other category.
expr_mat: Can be used together with 'cov' (so will only be used if 'cov' is not None)- if the spatial neighbors
are not consistent with the sample in question (determined by assessing similarity by "cov"),
there may be different mechanisms at play. In this case, will instead search for nearest neighbors in
the gene expression space if given.
fixed: If True, `bw` is treated as a fixed bandwidth parameter. Otherwise, it is treated as the number
of nearest neighbors to include in the bandwidth estimation.
exclude_self: If True, ignore each sample itself when computing the kernel density estimation
function: The name of the kernel function to use. Valid options are as follows (note that in equations,
any "1" can be substituted for any other value(s)):
- 'triangular': linearly decaying kernel,
:math K(u) =
\begin{cases}
1-|u| & \text{if} |u| \leq 1 \ 0 & \text{otherwise}
\end{cases},
- 'quadratic': quadratically decaying kernel,
:math K(u) =
\begin{cases}
\dfrac{3}{4}(1-u^2)
\end{cases},
- 'gaussian': decays following normal distribution, :math K(u) = \dfrac{1}{\sqrt{2\pi}} e^{-\frac{1}{2}u^2},
- 'uniform': AKA the tophat kernel, sets weight of all observations within the bandwidth to the same value,
:math K(u) =
\begin{cases}
1 & \text{if} |u| \leq 1 \\ 0 & \text{otherwise}
\end{cases},
- 'exponential': exponentially decaying kernel, :math K(u) = e^{-|u|},
- 'bisquare': assigns a weight of zero to observations outside of the bandwidth, and decays within the
bandwidth following equation
:math K(u) =
\begin{cases}
\dfrac{15}{16}(1-u^2)^2 & \text{if} |u| \leq 1 \\ 0 & \text{otherwise}
\end{cases}.
threshold: Threshold for the kernel density estimation. If the density is below this threshold, the density
will be set to zero.
eps: Error-correcting factor to avoid division by zero
sparse_array: If True, the kernel will be converted to sparse array. Recommended for large datasets.
normalize_weights: If True, the weights will be normalized to sum to 1.
use_expression_neighbors: If True, will only use the expression matrix to find nearest neighbors.
"""
def __init__(
self,
i: int,
data: Union[np.ndarray, scipy.sparse.spmatrix],
bw: Union[int, float],
cov: Optional[np.ndarray] = None,
ct: Optional[np.ndarray] = None,
expr_mat: Optional[np.ndarray] = None,
fixed: bool = True,
exclude_self: bool = False,
function: str = "triangular",
threshold: float = 1e-5,
eps: float = 1.0000001,
sparse_array: bool = False,
normalize_weights: bool = False,
use_expression_neighbors: bool = False,
):
if use_expression_neighbors:
self.dist_vector = local_dist(expr_mat[i], expr_mat).reshape(-1)
self.function = "uniform"
else:
self.dist_vector = local_dist(data[i], data).reshape(-1)
self.function = function.lower()
if fixed:
self.bandwidth = float(bw)
else:
if exclude_self:
self.bandwidth = np.partition(self.dist_vector, int(bw) + 1)[int(bw) + 1] * eps
else:
self.bandwidth = np.partition(self.dist_vector, int(bw))[int(bw)] * eps
max_dist = np.max(self.dist_vector)
if cov is not None and ct is not None:
# If condition is met, compare to samples of the same cell type:
if cov[i] == 1:
self.dist_vector[ct != ct[i]] = max_dist
elif cov is not None and ct is None:
# Ignore samples that do not meet the condition:
self.dist_vector[cov == 0] = max_dist
elif ct is not None:
# Compare to samples of the same cell type:
self.dist_vector[ct != ct[i]] = max_dist
bw_dist = self.dist_vector / self.bandwidth
# Exclude self as a neighbor:
if exclude_self:
bw_dist = np.where(bw_dist == 0.0, np.max(bw_dist), bw_dist)
self.kernel = self._kernel_functions(bw_dist)
# Bisquare and uniform need to be truncated if the sample is outside of the provided bandwidth:
self.kernel[bw_dist > 1] = 0
# Set density to zero if below threshold:
self.kernel[self.kernel < threshold] = 0
n_neighbors = np.count_nonzero(self.kernel)
# Normalize the kernel by the number of non-zero neighbors, if applicable:
if normalize_weights:
self.kernel = self.kernel / n_neighbors
if sparse_array:
self.kernel = scipy.sparse.csr_matrix(self.kernel)
[docs] def _kernel_functions(self, x):
if self.function == "triangular":
return 1 - x
elif self.function == "uniform":
return np.ones(x.shape) * 0.5
elif self.function == "quadratic":
return (3.0 / 4) * (1 - x**2)
# elif self.function == "bisquare":
# return (15.0 / 16) * (1 - x**2) ** 2
elif self.function == "bisquare":
return (1 - (x) ** 2) ** 2
elif self.function == "gaussian":
return np.exp(-0.5 * (x) ** 2)
elif self.function == "exponential":
return np.exp(-x)
else:
raise ValueError(
f'Unsupported kernel function. Valid options: "triangular", "uniform", "quadratic", '
f'"bisquare", "gaussian" or "exponential". Got {self.function}.'
)
[docs]def get_wi(
i: int,
n_samples: int,
coords: np.ndarray,
cov: Optional[np.ndarray] = None,
ct: Optional[np.ndarray] = None,
expr_mat: Optional[np.ndarray] = None,
fixed_bw: bool = True,
exclude_self: bool = False,
kernel: str = "gaussian",
bw: Union[float, int] = 100,
threshold: float = 1e-5,
sparse_array: bool = False,
normalize_weights: bool = False,
use_expression_neighbors: bool = False,
) -> scipy.sparse.csr_matrix:
"""Get spatial weights for an individual sample, given the coordinates of all samples in space.
Args:
i: Index of sample for which weights are to be calculated to all other samples in the dataset
n_samples: Total number of samples in the dataset
coords: Array of shape (n_samples, 2) or (n_samples, 3) representing the spatial coordinates of each sample
cov: Optional array of shape (n_samples, ). Can be used to adjust the distance calculation to look only at
samples of interest vs. samples not of interest, which is determined from nonzero values in this vector.
This can be used to modify the modeling process based on factors thought to reflect biological differences,
for example, to condition on histological classification, passing a distance threshold, etc. If 'ct' is
also given, will look for samples of interest that are also of the same cell type.
ct: Optional array of shape (n_samples, ), containing vector where cell types are encoded as integers. Can be
used to condition nearest neighbor finding on cell type or other category.
expr_mat: Can be used together with 'cov'- if the spatial neighbors are not consistent with the sample in
question (determined by assessing similarity by "cov"), there may be different mechanisms at play. In this
case, will instead search for nearest neighbors in the gene expression space if given.
fixed_bw: If True, `bw` is treated as a spatial distance for computing spatial weights. Otherwise,
it is treated as the number of neighbors.
exclude_self: If True, ignore each sample itself when computing the kernel density estimation
kernel: The name of the kernel function to use. Valid options: "triangular", "uniform", "quadratic",
"bisquare", "gaussian" or "exponential"
bw: Bandwidth for the spatial kernel
threshold: Threshold for the kernel density estimation. If the density is below this threshold, the density
will be set to zero.
sparse_array: If True, the kernel will be converted to sparse array. Recommended for large datasets.
normalize_weights: If True, the weights will be normalized to sum to 1.
use_expression_neighbors: If True, will only use expression neighbors to determine the bandwidth.
Returns:
wi: Array of weights for sample of interest
"""
if bw == np.inf:
wi = np.ones(n_samples)
return wi
wi = Kernel(
i,
coords,
bw,
cov=cov,
ct=ct,
expr_mat=expr_mat,
fixed=fixed_bw,
exclude_self=exclude_self,
function=kernel,
threshold=threshold,
sparse_array=sparse_array,
normalize_weights=normalize_weights,
use_expression_neighbors=use_expression_neighbors,
).kernel
return wi
# ---------------------------------------------------------------------------------------------------
# Construct nearest neighbor graphs
# ---------------------------------------------------------------------------------------------------
@SKM.check_adata_is_type(SKM.ADATA_UMI_TYPE, "adata")
[docs]def construct_nn_graph(
adata: AnnData,
spatial_key: str = "spatial",
dist_metric: str = "euclidean",
n_neighbors: int = 8,
exclude_self: bool = True,
make_symmetrical: bool = False,
save_id: Union[None, str] = None,
) -> None:
"""Constructing bucket-to-bucket nearest neighbors graph.
Args:
adata: An anndata object.
spatial_key: Key in .obsm in which x- and y-coordinates are stored.
dist_metric: Distance metric to use. Options: ‘braycurtis’, ‘canberra’, ‘chebyshev’, ‘cityblock’, ‘correlation’,
‘cosine’, ‘dice’, ‘euclidean’, ‘hamming’, ‘jaccard’, ‘jensenshannon’, ‘kulczynski1’, ‘mahalanobis’,
‘matching’, ‘minkowski’, ‘rogerstanimoto’, ‘russellrao’, ‘seuclidean’, ‘sokalmichener’, ‘sokalsneath’,
‘sqeuclidean’, ‘yule’.
n_neighbors: Number of nearest neighbors to compute for each bucket.
exclude_self: Set True to set elements along the diagonal to zero.
make_symmetrical: Set True to make sure adjacency matrix is symmetrical (i.e. ensure that if A is a neighbor
of B, B is also included among the neighbors of A)
save_id: Optional string; if not None, will save distance matrix and neighbors matrix to path:
'./neighbors/{save_id}_distance.csv' and path: './neighbors/{save_id}_neighbors.csv', respectively.
"""
position = adata.obsm[spatial_key]
# calculate distance matrix
distance_matrix = calculate_distance(position, dist_metric)
n_bucket = distance_matrix.shape[0]
adata.obsp["distance_matrix"] = distance_matrix
# find k-nearest neighbors
interaction = np.zeros([n_bucket, n_bucket])
for i in range(n_bucket):
vec = distance_matrix[i, :]
distance = vec.argsort()
for t in range(1, n_neighbors + 1):
y = distance[t]
interaction[i, y] = 1
if save_id is not None:
if not os.path.exists(os.path.join(os.getcwd(), "neighbors")):
os.makedirs(os.path.join(os.getcwd(), "neighbors"))
dist_df = pd.DataFrame(distance_matrix, index=list(adata.obs_names), columns=list(adata.obs_names))
dist_df.to_csv(os.path.join(os.getcwd(), f"neighbors/{save_id}_distance.csv"))
neighbors_df = pd.DataFrame(interaction, index=list(adata.obs_names), columns=list(adata.obs_names))
neighbors_df.to_csv(os.path.join(os.getcwd(), f"./neighbors/{save_id}_neighbors.csv"))
# transform adj to symmetrical adj
adj = interaction
if make_symmetrical:
adj = adj + adj.T
adj = np.where(adj > 1, 1, adj)
if exclude_self:
np.fill_diagonal(adj, 0)
adata.obsp["adj"] = scipy.sparse.csr_matrix(adj)
@SKM.check_adata_is_type(SKM.ADATA_UMI_TYPE, "adata")
[docs]def neighbors(
adata: AnnData,
nbr_object: NearestNeighbors = None,
basis: str = "pca",
spatial_key: str = "spatial",
n_neighbors_method: str = "ball_tree",
n_pca_components: int = 30,
n_neighbors: int = 10,
) -> Tuple[NearestNeighbors, AnnData]:
"""Given an AnnData object, compute pairwise connectivity matrix in transcriptomic or physical space
Args:
adata : an anndata object.
nbr_object: An optional sklearn.neighbors.NearestNeighbors object. Can optionally create a nearest neighbor
object with custom functionality.
basis: str, default 'pca'
The space that will be used for nearest neighbor search. Valid names includes, for example, `pca`, `umap`,
or `X` for gene expression neighbors, 'spatial' for neighbors in the physical space.
spatial_key: Optional, can be used to specify .obsm entry in adata that contains spatial coordinates. Only
used if basis is 'spatial'.
n_neighbors_method: str, default 'ball_tree'
Specifies algorithm to use in computing neighbors using sklearn's implementation. Options:
"ball_tree" and "kd_tree".
n_pca_components: Only used if 'basis' is 'pca'. Sets number of principal components to compute (if PCA has
not already been computed for this dataset).
n_neighbors: Number of neighbors for kneighbors queries.
Returns:
nbrs : Object of class `sklearn.neighbors.NearestNeighbors`
adata : Modified AnnData object
"""
logger = lm.get_main_logger()
if basis == "pca" and "X_pca" not in adata.obsm_keys():
logger.info(
"PCA to be used as basis for :func `transcriptomic_connectivity`, X_pca not found, " "computing PCA...",
indent_level=2,
)
pca = PCA(
n_components=min(n_pca_components, adata.X.shape[1] - 1),
svd_solver="arpack",
random_state=0,
)
fit = pca.fit(adata.X.toarray()) if scipy.sparse.issparse(adata.X) else pca.fit(adata.X)
X_pca = fit.transform(adata.X.toarray()) if scipy.sparse.issparse(adata.X) else fit.transform(adata.X)
adata.obsm["X_pca"] = X_pca
if basis == "X":
X_data = adata.X
elif basis == "spatial":
X_data = adata.obsm[spatial_key]
elif "X_" + basis in adata.obsm_keys():
# Assume basis can be found in .obs under "X_{basis}":
X_data = adata.obsm["X_" + basis]
else:
raise ValueError("Invalid option given to 'basis'. Options: 'pca', 'umap', 'spatial' or 'X'.")
if nbr_object is None:
# set up neighbour object
nbrs = NearestNeighbors(algorithm=n_neighbors_method, n_neighbors=n_neighbors, metric="euclidean").fit(X_data)
else: # use provided sklearn NN object
nbrs = nbr_object
# Update AnnData to add spatial distances, spatial connectivities and spatial neighbors from the sklearn
# NearestNeighbors run:
distances, knn = nbrs.kneighbors(X_data)
distances, connectivities = compute_distances_and_connectivities(knn, distances)
if basis != "spatial":
logger.info_insert_adata("expression_connectivities", adata_attr="obsp")
logger.info_insert_adata("expression_distances", adata_attr="obsp")
logger.info_insert_adata("expression_neighbors", adata_attr="uns")
logger.info_insert_adata("expression_neighbors.indices", adata_attr="uns")
logger.info_insert_adata("expression_neighbors.params", adata_attr="uns")
adata.obsp["expression_distances"] = distances
adata.obsp["expression_connectivities"] = connectivities
adata.uns["expression_neighbors"] = {}
adata.uns["expression_neighbors"]["indices"] = knn
adata.uns["expression_neighbors"]["params"] = {"n_neighbors": n_neighbors, "metric": "euclidean"}
else:
logger.info_insert_adata("spatial_distances", adata_attr="obsp")
logger.info_insert_adata("spatial_connectivities", adata_attr="obsp")
logger.info_insert_adata("spatial_neighbors", adata_attr="uns")
logger.info_insert_adata("spatial_neighbors.indices", adata_attr="uns")
logger.info_insert_adata("spatial_neighbors.params", adata_attr="uns")
adata.obsp["spatial_distances"] = distances
adata.obsp["spatial_connectivities"] = connectivities
adata.uns["spatial_neighbors"] = {}
adata.uns["spatial_neighbors"]["indices"] = knn
adata.uns["spatial_neighbors"]["params"] = {"n_neighbors": n_neighbors, "metric": "euclidean"}
return nbrs, adata
# ---------------------------------------------------------------------------------------------------
# Compute affinity matrix
# ---------------------------------------------------------------------------------------------------
[docs]def calculate_affinity(position: np.ndarray, dist_metric: str = "euclidean", n_neighbors: int = 10) -> np.ndarray:
"""Given array of x- and y-coordinates, compute affinity matrix between all samples using Euclidean distance.
Math from: Zelnik-Manor, L., & Perona, P. (2004). Self-tuning spectral clustering.
Advances in neural information processing systems, 17.
https://proceedings.neurips.cc/paper/2004/file/40173ea48d9567f1f393b20c855bb40b-Paper.pdf
"""
# Calculate euclidian distance matrix
dists = squareform(pdist(position, metric=dist_metric))
# For each row, sort the distances in ascending order and take the index of the n-th position (nearest neighbors)
knn_distances = np.sort(dists, axis=0)[n_neighbors]
knn_distances = knn_distances[np.newaxis].T
# Calculate sigma_i * sigma_j
local_scale = knn_distances.dot(knn_distances.T)
affinity_matrix = dists * dists
affinity_matrix = -affinity_matrix / local_scale
# Divide square distance matrix by local scale
affinity_matrix[np.where(np.isnan(affinity_matrix))] = 0.0
# Apply exponential
affinity_matrix = np.exp(affinity_matrix)
np.fill_diagonal(affinity_matrix, 0)
return affinity_matrix