Source code for spateo.tools.cell_communication

from typing import List, Optional, Tuple

import numpy as np
import pandas as pd
from anndata import AnnData
from scipy import sparse
from scipy.sparse import issparse
from scipy.stats import gmean, pearsonr

try:
    from typing import Literal
except ImportError:
    from typing_extensions import Literal

from ..configuration import SKM


# Niches
@SKM.check_adata_is_type(SKM.ADATA_UMI_TYPE)
[docs]def niches( adata: AnnData, path: str, layer: Tuple[None, str] = None, weighted: bool = False, spatial_neighbors: str = "spatial_neighbors", spatial_distances: str = "spatial_distances", species: Literal["human", "mouse", "drosophila", "zebrafish", "axolotl"] = "human", system: Literal["niches_c2c", "niches_n2c", "niches_c2n", "niches_n2n"] = "niches_n2n", method: Literal["gmean", "mean", "sum"] = "sum", ) -> AnnData: """Performing cell-cell transformation on an anndata object, while also limiting the nearest neighbor per cell to k. This function returns another anndata object, in which the columns of the matrix are bucket -bucket pairs, while the rows ligand-receptor mechanisms. This resultant anndated object allows flexible downstream manipulations such as the dimensional reduction of the row or column of this object. Our method is adapted from: Micha Sam Brickman Raredon, Junchen Yang, Neeharika Kothapalli, Naftali Kaminski, Laura E. Niklason, Yuval Kluger. Comprehensive visualization of cell-cell interactions in single-cell and spatial transcriptomics with NICHES. doi: https://doi.org/10.1101/2022.01.23.477401 Args: adata: An Annodata object. path: Path to ligand_receptor network of NicheNet (prior lr_network). layer: the key to the layer. If it is None, adata.X will be used by default. weighted: 'False' (defult) whether to supply the edge weights according to the actual spatial distance(just as weighted kNN). Defult is 'False', means all neighbor edge weights equal to 1, others is 0. spatial_neighbors : neighbor_key {spatial_neighbors} in adata.uns.keys(), spatial_distances : neighbor_key {spatial_distances} in adata.obsp.keys(). system: 'niches_n2n'(defult) cell-cell signaling ('niches_c2c'), defined as the signals passed between cells, determined by the product of the ligand expression of the sending cell and the receptor expression of the receiving cell, and system-cell signaling ('niches_n2c'), defined as the signaling input to a cell, determined by taking the geometric mean of the ligand profiles of the surrounding cells and the receptor profile of the receiving cell.similarly, 'niches_c2n','niches_n2n'. Returns: An anndata of Niches, which rows are mechanisms and columns are all possible cell x cell interactions. """ # prior lr_network if species == "human": lr_network = pd.read_csv(path + "lr_db_human.csv", index_col=0) if system == "niches_n2c": lr_network[["from", "to"]] = lr_network[["to", "from"]] elif species == "mouse": lr_network = pd.read_csv(path + "lr_db_mouse.csv", index_col=0) if system == "niches_n2c": lr_network[["from", "to"]] = lr_network[["to", "from"]] elif species == "drosophila": lr_network = pd.read_csv(path + "lr_network_drosophila.csv", index_col=0) if system == "niches_n2c": lr_network[["from", "to"]] = lr_network[["to", "from"]] elif species == "zebrafish": lr_network = pd.read_csv(path + "lr_network_zebrafish.csv", index_col=0) if system == "niches_n2c": lr_network[["from", "to"]] = lr_network[["to", "from"]] elif species == "axolotl": lr_network = pd.read_csv(path + "lr_network_axolotl.csv", index_col=0) if system == "niches_n2c": lr_network[["from", "to"]] = lr_network[["to", "from"]] if layer is None: adata.X = adata.X else: adata.X = adata.layers[layer] x_sparse = issparse(adata.X) # expressed lr_network ligand = lr_network["from"].unique() expressed_ligand = list(set(ligand) & set(adata.var_names)) if len(expressed_ligand) == 0: raise ValueError(f"No intersected ligand between your adata object" f" and lr_network dataset.") lr_network = lr_network[lr_network["from"].isin(expressed_ligand)] receptor = lr_network["to"].unique() expressed_receptor = list(set(receptor) & set(adata.var_names)) if len(expressed_receptor) == 0: raise ValueError(f"No intersected receptor between your adata object" f" and lr_network dataset.") lr_network = lr_network[lr_network["to"].isin(expressed_receptor)] ligand_matrix = adata[:, lr_network["from"]].X.A.T if x_sparse else adata[:, lr_network["from"]].X.T # spatial neighbors if spatial_neighbors not in adata.uns.keys(): raise ValueError( f"No spatial_key {spatial_neighbors} exists in adata," f"using 'dyn.tl.neighbors' to calulate the spatial neighbors first." ) if spatial_distances not in adata.obsp.keys(): raise ValueError( f"No spatial_key {spatial_distances} exists in adata," f"using 'dyn.tl.neighbors' to calulate the spatial diatances first." ) nw = {"neighbors": adata.uns["spatial_neighbors"]["indices"], "weights": adata.obsp["spatial_distances"]} k = adata.uns["spatial_neighbors"]["params"]["n_neighbors"] # construct c2c matrix if system == "niches_c2c": X = np.zeros(shape=(ligand_matrix.shape[0], k * adata.n_obs)) if weighted: # weighted matrix (weighted distance) row, col = np.diag_indices_from(nw["weights"]) nw["weights"][row, col] = 1 weight = np.zeros(shape=(adata.n_obs, k)) for i, row in enumerate(nw["weights"].A): weight[i, :] = 1 / row[nw["neighbors"][i]] for i in range(ligand_matrix.shape[1]): receptor_matrix = adata[nw["neighbors"][i], lr_network["to"]].X.A.T * weight[i, :] X[:, i * k : (i + 1) * k] = receptor_matrix * ligand_matrix[:, i].reshape(-1, 1) else: for i in range(ligand_matrix.shape[1]): receptor_matrix = adata[nw["neighbors"][i], lr_network["to"]].X.A.T X[:, i * k : (i + 1) * k] = receptor_matrix * ligand_matrix[:, i].reshape(-1, 1) # bucket-bucket pair cell_pair = [] for i, cell_id in enumerate(nw["neighbors"]): cell_pair.append(adata.obs.index[i] + "-" + adata.obs.index[cell_id]) cell_pair = [i for j in cell_pair for i in j] cell_pair = pd.DataFrame({"cell_pair_name": cell_pair}) cell_pair.set_index("cell_pair_name", inplace=True) # construct n2c matrix or construct c2n matrix if system == "niches_n2c" or system == "niches_c2n": X = np.zeros(shape=(ligand_matrix.shape[0], adata.n_obs)) if weighted: # weighted matrix (weighted distance) row, col = np.diag_indices_from(nw["weights"]) nw["weights"][row, col] = 1 weight = np.zeros(shape=(adata.n_obs, k)) for i, row in enumerate(nw["weights"].A): weight[i, :] = 1 / row[nw["neighbors"][i]] for i in range(ligand_matrix.shape[1]): if method == "gmean": receptor_matrix = ( gmean((adata[nw["neighbors"][i], lr_network["to"]].X.A.T + 1) * weight[i, :], axis=1) if x_sparse else gmean((adata[nw["neighbors"][i], lr_network["to"]].X.T + 1) * weight[i, :], axis=1) ) elif method == "mean": receptor_matrix = ( np.mean(adata[nw["neighbors"][i], lr_network["to"]].X.A.T * weight[i, :], axis=1) if x_sparse else np.mean(adata[nw["neighbors"][i], lr_network["to"]].X.T * weight[i, :], axis=1) ) else: receptor_matrix = ( np.sum(adata[nw["neighbors"][i], lr_network["to"]].X.A.T * weight[i, :], axis=1) if x_sparse else np.sum(adata[nw["neighbors"][i], lr_network["to"]].X.T * weight[i, :], axis=1) ) X[:, i] = receptor_matrix * ligand_matrix[:, i] else: for i in range(ligand_matrix.shape[1]): if method == "gmean": receptor_matrix = ( gmean((adata[nw["neighbors"][i], lr_network["to"]].X.A.T + 1), axis=1) if x_sparse else gmean((adata[nw["neighbors"][i], lr_network["to"]].X.T + 1), axis=1) ) elif method == "mean": receptor_matrix = ( np.mean(adata[nw["neighbors"][i], lr_network["to"]].X.A.T, axis=1) if x_sparse else np.mean(adata[nw["neighbors"][i], lr_network["to"]].X.T, axis=1) ) else: receptor_matrix = ( np.sum(adata[nw["neighbors"][i], lr_network["to"]].X.A.T, axis=1) if x_sparse else np.sum(adata[nw["neighbors"][i], lr_network["to"]].X.T, axis=1) ) X[:, i] = receptor_matrix * ligand_matrix[:, i] # bucket-bucket pair cell_pair = [] for i, cell_id in enumerate(nw["neighbors"]): cell_pair.append(adata.obs.index[i] + "-" + adata.obs.index[cell_id]) cell_pair = pd.DataFrame({"cell_pair_name": cell_pair}) # construct n2n matrix if system == "niches_n2n": X = np.zeros(shape=(ligand_matrix.shape[0], adata.n_obs)) if weighted: # weighted matrix (weighted distance) row, col = np.diag_indices_from(nw["weights"]) nw["weights"][row, col] = 1 weight = np.zeros(shape=(adata.n_obs, k)) for i, row in enumerate(nw["weights"].A): weight[i, :] = 1 / row[nw["neighbors"][i]] for i in range(ligand_matrix.shape[1]): if method == "gmean": receptor_matrix = ( gmean((adata[nw["neighbors"][i], lr_network["to"]].X.A.T + 1) * weight[i, :], axis=1) if x_sparse else gmean((adata[nw["neighbors"][i], lr_network["to"]].X.T + 1) * weight[i, :], axis=1) ) ligand_matrix = ( gmean((adata[nw["neighbors"][i], lr_network["from"]].X.A.T + 1) * weight[i, :], axis=1) if x_sparse else gmean((adata[nw["neighbors"][i], lr_network["from"]].X.T + 1) * weight[i, :], axis=1) ) elif method == "mean": receptor_matrix = ( np.mean(adata[nw["neighbors"][i], lr_network["to"]].X.A.T * weight[i, :], axis=1) if x_sparse else np.mean(adata[nw["neighbors"][i], lr_network["to"]].X.T * weight[i, :], axis=1) ) ligand_matrix = ( np.mean(adata[nw["neighbors"][i], lr_network["from"]].X.A.T * weight[i, :], axis=1) if x_sparse else np.mean(adata[nw["neighbors"][i], lr_network["from"]].X.T * weight[i, :], axis=1) ) else: receptor_matrix = ( np.sum(adata[nw["neighbors"][i], lr_network["to"]].X.A.T * weight[i, :], axis=1) if x_sparse else np.sum(adata[nw["neighbors"][i], lr_network["to"]].X.T * weight[i, :], axis=1) ) ligand_matrix = ( np.sum(adata[nw["neighbors"][i], lr_network["from"]].X.A.T * weight[i, :], axis=1) if x_sparse else np.sum(adata[nw["neighbors"][i], lr_network["from"]].X.T * weight[i, :], axis=1) ) X[:, i] = np.array(receptor_matrix).reshape(receptor_matrix.shape[0]) * np.array(ligand_matrix).reshape( ligand_matrix.shape[0] ) else: for i in range(ligand_matrix.shape[1]): if method == "gmean": receptor_matrix = ( gmean((adata[nw["neighbors"][i], lr_network["to"]].X.A.T + 1), axis=1) if x_sparse else gmean((adata[nw["neighbors"][i], lr_network["to"]].X.T + 1), axis=1) ) ligand_matrix = ( gmean((adata[nw["neighbors"][i], lr_network["from"]].X.A.T + 1), axis=1) if x_sparse else gmean((adata[nw["neighbors"][i], lr_network["from"]].X.T + 1), axis=1) ) elif method == "mean": receptor_matrix = ( np.mean(adata[nw["neighbors"][i], lr_network["to"]].X.A.T, axis=1) if x_sparse else np.mean(adata[nw["neighbors"][i], lr_network["to"]].X.T, axis=1) ) ligand_matrix = ( np.mean(adata[nw["neighbors"][i], lr_network["from"]].X.A.T, axis=1) if x_sparse else np.mean(adata[nw["neighbors"][i], lr_network["from"]].X.T, axis=1) ) else: receptor_matrix = ( np.sum(adata[nw["neighbors"][i], lr_network["to"]].X.A.T, axis=1) if x_sparse else np.sum(adata[nw["neighbors"][i], lr_network["to"]].X.T, axis=1) ) ligand_matrix = ( np.sum(adata[nw["neighbors"][i], lr_network["from"]].X.A.T, axis=1) if x_sparse else np.sum(adata[nw["neighbors"][i], lr_network["from"]].X.T, axis=1) ) X[:, i] = np.array(receptor_matrix).reshape(receptor_matrix.shape[0]) * np.array(ligand_matrix).reshape( ligand_matrix.shape[0] ) # bucket-bucket pair cell_pair = [] for i, cell_id in enumerate(nw["neighbors"]): cell_pair.append(adata.obs.index[i] + "-" + adata.obs.index[cell_id]) cell_pair = pd.DataFrame({"cell_pair_name": cell_pair}) # lr_pair lr_pair = lr_network["from"] + "-" + lr_network["to"] lr_pair = pd.DataFrame({"lr_pair_name": lr_pair}) lr_pair.set_index("lr_pair_name", inplace=True) # csr_matrix X = sparse.csr_matrix(X.T) # adata_nichec2c adata_niche = AnnData(X=X, obs=cell_pair, var=lr_pair) return adata_niche
# NicheNet @SKM.check_adata_is_type(SKM.ADATA_UMI_TYPE)
[docs]def predict_ligand_activities( adata: AnnData, path: str, sender_cells: Optional[List[str]] = None, receiver_cells: Optional[List[str]] = None, geneset: Optional[List[str]] = None, ratio_expr_thresh: float = 0.01, species: Literal["human", "mouse"] = "human", ) -> pd.DataFrame: """Function to predict the ligand activity. Our method is adapted from: Robin Browaeys, Wouter Saelens & Yvan Saeys. NicheNet: modeling intercellular communication by linking ligands to target genes. Nature Methods volume 17, pages159–162 (2020). Args: path: Path to ligand_target_matrix, lr_network (human and mouse). adata: An Annodata object. sender_cells: Ligand cells. receiver_cells: Receptor cells. geneset: The genes set of interest. This may be the differentially expressed genes in receiver cells (comparing cells in case and control group). Ligands activity prediction is based on this gene set. By default, all genes expressed in receiver cells is used. ratio_expr_thresh: The minimum percentage of buckets expressing the ligand (target) in sender(receiver) cells. Returns: A pandas DataFrame of the predicted activity ligands. """ # load ligand_target_matrix if species == "human": ligand_target_matrix = pd.read_csv(path + "ligand_target_matrix_human_nichenet.csv", index_col=0) lr_network = pd.read_csv(path + "lr_network_human.csv", index_col=0) else: ligand_target_matrix = pd.read_csv(path + "ligand_target_matrix_mouse_nichenet.csv", index_col=0) lr_network = pd.read_csv(path + "lr_network_mouse.csv", index_col=0) ligand_target_matrix = ligand_target_matrix.T # row:ligand,col:target gene lr_network = lr_network.loc[lr_network["from"].isin(ligand_target_matrix.index)] # Define expressed genes in sender and receiver cell populations(pct>0.1) expressed_genes_sender = np.array(adata.var_names)[ np.count_nonzero(adata[sender_cells, :].X.A, axis=0) / len(sender_cells) > 0.01 ].tolist() expressed_genes_receiver = np.array(adata.var_names)[ np.count_nonzero(adata[receiver_cells, :].X.A, axis=0) / len(receiver_cells) > 0.01 ].tolist() # Define a set of potential ligands ligands = lr_network["from"].unique() expressed_ligand = list(set(ligands) & set(expressed_genes_sender)) if len(expressed_ligand) == 0: raise ValueError(f"No intersected ligand between your adata object and lr_network dataset.") receptor = lr_network["to"].unique() expressed_receptor = list(set(receptor) & set(expressed_genes_receiver)) if len(expressed_receptor) == 0: raise ValueError(f"No intersected receptor between your adata object and lr_network dataset.") lr_network_expressed = lr_network[ lr_network["from"].isin(expressed_ligand) & lr_network["to"].isin(expressed_receptor) ] potential_ligands = lr_network_expressed["from"].unique() if geneset is None: # Calculate the pearson coeffcient between potential score and actual the average expression of genes response_expressed_genes = list(set(expressed_genes_receiver) & set(ligand_target_matrix.index)) response_expressed_genes_df = pd.DataFrame(response_expressed_genes) response_expressed_genes_df = response_expressed_genes_df.rename(columns={0: "gene"}) response_expressed_genes_df["avg_expr"] = np.mean(adata[receiver_cells, response_expressed_genes].X.A, axis=0) lt_matrix = ligand_target_matrix[potential_ligands.tolist()].loc[response_expressed_genes_df["gene"].tolist()] de = [] for ligand in lt_matrix: pear_coef = pearsonr(lt_matrix[ligand], response_expressed_genes_df["avg_expr"])[0] pear_pvalue = pearsonr(lt_matrix[ligand], response_expressed_genes_df["avg_expr"])[1] de.append( ( ligand, pear_coef, pear_pvalue, ) ) else: # Define the gene set of interest and a background of genes gene_io = list(set(geneset) & set(ligand_target_matrix.index)) background_expressed_genes = list(set(expressed_genes_receiver) & set(ligand_target_matrix.index)) # Perform NicheNet’s ligand activity analysis on the gene set of interest gene_io = pd.DataFrame(gene_io) gene_io["logical"] = 1 gene_io = gene_io.rename(columns={0: "gene"}) background_expressed_genes = pd.DataFrame(background_expressed_genes) background = background_expressed_genes[~(background_expressed_genes[0].isin(gene_io))] background = background.rename(columns={0: "gene"}) background["logical"] = 0 # response gene vector response = pd.concat([gene_io, background], axis=0, join="outer") # lt_matrix potential score lt_matrix = ligand_target_matrix[potential_ligands.tolist()].loc[response["gene"].tolist()] # predict ligand activity by pearson coefficient. de = [] for ligand in lt_matrix: pear_coef = pearsonr(lt_matrix[ligand], response["logical"])[0] pear_pvalue = pearsonr(lt_matrix[ligand], response["logical"])[1] de.append( ( ligand, pear_coef, pear_pvalue, ) ) res = pd.DataFrame( de, columns=[ "ligand", "pearson_coef", "pearson_pvalue", ], ) return res
@SKM.check_adata_is_type(SKM.ADATA_UMI_TYPE, optional=True)
[docs]def predict_target_genes( adata: AnnData, path: str, sender_cells: Optional[List[str]] = None, receiver_cells: Optional[List[str]] = None, geneset: Optional[List[str]] = None, species: Literal["human", "mouse"] = "human", top_ligand: int = 20, top_target: int = 300, ) -> pd.DataFrame: """Function to predict the target genes. Args: lt_matrix_path: Path to ligand_target_matrix of NicheNet. adata: An Annodata object. sender_cells: Ligand cells. receiver_cells: Receptor cells. geneset: The genes set of interest. This may be the differentially expressed genes in receiver cells (comparing cells in case and control group). Ligands activity prediction is based on this gene set. By default, all genes expressed in receiver cells is used. top_ligand: `int` (default=20) select 20 top-ranked ligands for further biological interpretation. top_target: `int` (default=300) Infer target genes of top-ranked ligands, and choose the top targets according to the general prior model. Returns: A pandas DataFrame of the predict target genes. """ if species == "human": ligand_target_matrix = pd.read_csv(path + "ligand_target_matrix.csv", index_col=0) else: ligand_target_matrix = pd.read_csv(path + "ligand_target_matrix_mouse.csv", index_col=0) predict_ligand = predict_ligand_activities( adata=adata, path=path, sender_cells=sender_cells, receiver_cells=receiver_cells, geneset=geneset, species=species, ) predict_ligand.sort_values(by="pearson_coef", axis=0, ascending=False, inplace=True) predict_ligand_top = predict_ligand[:top_ligand]["ligand"] res = pd.DataFrame(columns=("ligand", "targets", "weights")) for ligand in ligand_target_matrix[predict_ligand_top]: top_n_score = ligand_target_matrix[ligand].sort_values(ascending=False)[:top_target] if geneset is None: expressed_genes_receiver = np.array(adata.var_names)[ np.count_nonzero(adata[receiver_cells, :].X.A, axis=0) / len(receiver_cells) > 0.01 ].tolist() response_expressed_genes = list(set(expressed_genes_receiver) & set(ligand_target_matrix.index)) targets = list(set(top_n_score.index) & set(response_expressed_genes)) else: gene_io = list(set(geneset) & set(ligand_target_matrix.index)) targets = list(set(top_n_score.index) & set(gene_io)) res = pd.concat( [ res, pd.DataFrame( {"ligand": ligand, "targets": targets, "weights": ligand_target_matrix.loc[targets, ligand]} ).reset_index(drop=True), ], axis=0, join="outer", ) return res