Source code for spateo.external.STAGATE_pyG.utils

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.sparse as sp
import seaborn as sns
import sklearn.neighbors
import torch


[docs]def Transfer_pytorch_Data(adata): from torch_geometric.data import Data G_df = adata.uns["Spatial_Net"].copy() cells = np.array(adata.obs_names) cells_id_tran = dict(zip(cells, range(cells.shape[0]))) G_df["Cell1"] = G_df["Cell1"].map(cells_id_tran) G_df["Cell2"] = G_df["Cell2"].map(cells_id_tran) G = sp.coo_matrix((np.ones(G_df.shape[0]), (G_df["Cell1"], G_df["Cell2"])), shape=(adata.n_obs, adata.n_obs)) G = G + sp.eye(G.shape[0]) edgeList = np.nonzero(G) if type(adata.X) == np.ndarray: data = Data( edge_index=torch.LongTensor(np.array([edgeList[0], edgeList[1]])), x=torch.FloatTensor(adata.X) ) # .todense() else: data = Data( edge_index=torch.LongTensor(np.array([edgeList[0], edgeList[1]])), x=torch.FloatTensor(adata.X.todense()) ) # .todense() return data
[docs]def Batch_Data(adata, num_batch_x, num_batch_y, spatial_key=["X", "Y"], plot_Stats=False): Sp_df = adata.obs.loc[:, spatial_key].copy() Sp_df = np.array(Sp_df) batch_x_coor = [np.percentile(Sp_df[:, 0], (1 / num_batch_x) * x * 100) for x in range(num_batch_x + 1)] batch_y_coor = [np.percentile(Sp_df[:, 1], (1 / num_batch_y) * x * 100) for x in range(num_batch_y + 1)] Batch_list = [] for it_x in range(num_batch_x): for it_y in range(num_batch_y): min_x = batch_x_coor[it_x] max_x = batch_x_coor[it_x + 1] min_y = batch_y_coor[it_y] max_y = batch_y_coor[it_y + 1] temp_adata = adata.copy() temp_adata = temp_adata[temp_adata.obs[spatial_key[0]].map(lambda x: min_x <= x <= max_x)] temp_adata = temp_adata[temp_adata.obs[spatial_key[1]].map(lambda y: min_y <= y <= max_y)] Batch_list.append(temp_adata) if plot_Stats: f, ax = plt.subplots(figsize=(1, 3)) plot_df = pd.DataFrame([x.shape[0] for x in Batch_list], columns=["#spot/batch"]) sns.boxplot(y="#spot/batch", data=plot_df, ax=ax) sns.stripplot(y="#spot/batch", data=plot_df, ax=ax, color="red", size=5) return Batch_list
[docs]def Cal_Spatial_Net(adata, rad_cutoff=None, k_cutoff=None, model="Radius", verbose=True): """\ Construct the spatial neighbor networks. Parameters ---------- adata AnnData object of scanpy package. rad_cutoff radius cutoff when model='Radius' k_cutoff The number of nearest neighbors when model='KNN' model The network construction model. When model=='Radius', the spot is connected to spots whose distance is less than rad_cutoff. When model=='KNN', the spot is connected to its first k_cutoff nearest neighbors. Returns ------- The spatial networks are saved in adata.uns['Spatial_Net'] """ assert model in ["Radius", "KNN"] if verbose: print("------Calculating spatial graph...") coor = pd.DataFrame(adata.obsm["spatial"]) coor.index = adata.obs.index coor.columns = ["imagerow", "imagecol"] if model == "Radius": nbrs = sklearn.neighbors.NearestNeighbors(radius=rad_cutoff).fit(coor) distances, indices = nbrs.radius_neighbors(coor, return_distance=True) KNN_list = [] for it in range(indices.shape[0]): KNN_list.append(pd.DataFrame(zip([it] * indices[it].shape[0], indices[it], distances[it]))) if model == "KNN": nbrs = sklearn.neighbors.NearestNeighbors(n_neighbors=k_cutoff + 1).fit(coor) distances, indices = nbrs.kneighbors(coor) KNN_list = [] for it in range(indices.shape[0]): KNN_list.append(pd.DataFrame(zip([it] * indices.shape[1], indices[it, :], distances[it, :]))) KNN_df = pd.concat(KNN_list) KNN_df.columns = ["Cell1", "Cell2", "Distance"] Spatial_Net = KNN_df.copy() Spatial_Net = Spatial_Net.loc[ Spatial_Net["Distance"] > 0, ] id_cell_trans = dict( zip( range(coor.shape[0]), np.array(coor.index), ) ) Spatial_Net["Cell1"] = Spatial_Net["Cell1"].map(id_cell_trans) Spatial_Net["Cell2"] = Spatial_Net["Cell2"].map(id_cell_trans) if verbose: print("The graph contains %d edges, %d cells." % (Spatial_Net.shape[0], adata.n_obs)) print("%.4f neighbors per cell on average." % (Spatial_Net.shape[0] / adata.n_obs)) adata.uns["Spatial_Net"] = Spatial_Net
[docs]def Cal_Spatial_Net_3D( adata, rad_cutoff_2D, rad_cutoff_Zaxis, key_section="Section_id", section_order=None, verbose=True ): """\ Construct the spatial neighbor networks. Parameters ---------- adata AnnData object of scanpy package. rad_cutoff_2D radius cutoff for 2D SNN construction. rad_cutoff_Zaxis radius cutoff for 2D SNN construction for consturcting SNNs between adjacent sections. key_section The columns names of section_ID in adata.obs. section_order The order of sections. The SNNs between adjacent sections are constructed according to this order. Returns ------- The 3D spatial networks are saved in adata.uns['Spatial_Net']. """ adata.uns["Spatial_Net_2D"] = pd.DataFrame() adata.uns["Spatial_Net_Zaxis"] = pd.DataFrame() num_section = np.unique(adata.obs[key_section]).shape[0] if verbose: print("Radius used for 2D SNN:", rad_cutoff_2D) print("Radius used for SNN between sections:", rad_cutoff_Zaxis) for temp_section in np.unique(adata.obs[key_section]): if verbose: print("------Calculating 2D SNN of section ", temp_section) temp_adata = adata[ adata.obs[key_section] == temp_section, ] Cal_Spatial_Net(temp_adata, rad_cutoff=rad_cutoff_2D, verbose=False) temp_adata.uns["Spatial_Net"]["SNN"] = temp_section if verbose: print( "This graph contains %d edges, %d cells." % (temp_adata.uns["Spatial_Net"].shape[0], temp_adata.n_obs) ) print("%.4f neighbors per cell on average." % (temp_adata.uns["Spatial_Net"].shape[0] / temp_adata.n_obs)) adata.uns["Spatial_Net_2D"] = pd.concat([adata.uns["Spatial_Net_2D"], temp_adata.uns["Spatial_Net"]]) for it in range(num_section - 1): section_1 = section_order[it] section_2 = section_order[it + 1] if verbose: print("------Calculating SNN between adjacent section %s and %s." % (section_1, section_2)) Z_Net_ID = section_1 + "-" + section_2 temp_adata = adata[ adata.obs[key_section].isin([section_1, section_2]), ] Cal_Spatial_Net(temp_adata, rad_cutoff=rad_cutoff_Zaxis, verbose=False) spot_section_trans = dict(zip(temp_adata.obs.index, temp_adata.obs[key_section])) temp_adata.uns["Spatial_Net"]["Section_id_1"] = temp_adata.uns["Spatial_Net"]["Cell1"].map(spot_section_trans) temp_adata.uns["Spatial_Net"]["Section_id_2"] = temp_adata.uns["Spatial_Net"]["Cell2"].map(spot_section_trans) used_edge = temp_adata.uns["Spatial_Net"].apply(lambda x: x["Section_id_1"] != x["Section_id_2"], axis=1) temp_adata.uns["Spatial_Net"] = temp_adata.uns["Spatial_Net"].loc[ used_edge, ] temp_adata.uns["Spatial_Net"] = temp_adata.uns["Spatial_Net"].loc[:, ["Cell1", "Cell2", "Distance"]] temp_adata.uns["Spatial_Net"]["SNN"] = Z_Net_ID if verbose: print( "This graph contains %d edges, %d cells." % (temp_adata.uns["Spatial_Net"].shape[0], temp_adata.n_obs) ) print("%.4f neighbors per cell on average." % (temp_adata.uns["Spatial_Net"].shape[0] / temp_adata.n_obs)) adata.uns["Spatial_Net_Zaxis"] = pd.concat([adata.uns["Spatial_Net_Zaxis"], temp_adata.uns["Spatial_Net"]]) adata.uns["Spatial_Net"] = pd.concat([adata.uns["Spatial_Net_2D"], adata.uns["Spatial_Net_Zaxis"]]) if verbose: print("3D SNN contains %d edges, %d cells." % (adata.uns["Spatial_Net"].shape[0], adata.n_obs)) print("%.4f neighbors per cell on average." % (adata.uns["Spatial_Net"].shape[0] / adata.n_obs))
[docs]def Stats_Spatial_Net(adata): import matplotlib.pyplot as plt Num_edge = adata.uns["Spatial_Net"]["Cell1"].shape[0] Mean_edge = Num_edge / adata.shape[0] plot_df = pd.value_counts(pd.value_counts(adata.uns["Spatial_Net"]["Cell1"])) plot_df = plot_df / adata.shape[0] fig, ax = plt.subplots(figsize=[3, 2]) plt.ylabel("Percentage") plt.xlabel("") plt.title("Number of Neighbors (Mean=%.2f)" % Mean_edge) ax.bar(plot_df.index, plot_df)
[docs]def mclust_R(adata, num_cluster, modelNames="EEE", used_obsm="STAGATE", random_seed=2020): """\ Clustering using the mclust algorithm. The parameters are the same as those in the R package mclust. """ np.random.seed(random_seed) import rpy2.robjects as robjects robjects.r.library("mclust") import rpy2.robjects.numpy2ri rpy2.robjects.numpy2ri.activate() r_random_seed = robjects.r["set.seed"] r_random_seed(random_seed) rmclust = robjects.r["Mclust"] res = rmclust(rpy2.robjects.numpy2ri.numpy2rpy(adata.obsm[used_obsm]), num_cluster, modelNames) mclust_res = np.array(res[-2]) adata.obs["mclust"] = mclust_res adata.obs["mclust"] = adata.obs["mclust"].astype("int") adata.obs["mclust"] = adata.obs["mclust"].astype("category") return adata