import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import scipy.sparse as sp
import seaborn as sns
import sklearn.neighbors
import torch
[docs]def Transfer_pytorch_Data(adata):
from torch_geometric.data import Data
G_df = adata.uns["Spatial_Net"].copy()
cells = np.array(adata.obs_names)
cells_id_tran = dict(zip(cells, range(cells.shape[0])))
G_df["Cell1"] = G_df["Cell1"].map(cells_id_tran)
G_df["Cell2"] = G_df["Cell2"].map(cells_id_tran)
G = sp.coo_matrix((np.ones(G_df.shape[0]), (G_df["Cell1"], G_df["Cell2"])), shape=(adata.n_obs, adata.n_obs))
G = G + sp.eye(G.shape[0])
edgeList = np.nonzero(G)
if type(adata.X) == np.ndarray:
data = Data(
edge_index=torch.LongTensor(np.array([edgeList[0], edgeList[1]])), x=torch.FloatTensor(adata.X)
) # .todense()
else:
data = Data(
edge_index=torch.LongTensor(np.array([edgeList[0], edgeList[1]])), x=torch.FloatTensor(adata.X.todense())
) # .todense()
return data
[docs]def Batch_Data(adata, num_batch_x, num_batch_y, spatial_key=["X", "Y"], plot_Stats=False):
Sp_df = adata.obs.loc[:, spatial_key].copy()
Sp_df = np.array(Sp_df)
batch_x_coor = [np.percentile(Sp_df[:, 0], (1 / num_batch_x) * x * 100) for x in range(num_batch_x + 1)]
batch_y_coor = [np.percentile(Sp_df[:, 1], (1 / num_batch_y) * x * 100) for x in range(num_batch_y + 1)]
Batch_list = []
for it_x in range(num_batch_x):
for it_y in range(num_batch_y):
min_x = batch_x_coor[it_x]
max_x = batch_x_coor[it_x + 1]
min_y = batch_y_coor[it_y]
max_y = batch_y_coor[it_y + 1]
temp_adata = adata.copy()
temp_adata = temp_adata[temp_adata.obs[spatial_key[0]].map(lambda x: min_x <= x <= max_x)]
temp_adata = temp_adata[temp_adata.obs[spatial_key[1]].map(lambda y: min_y <= y <= max_y)]
Batch_list.append(temp_adata)
if plot_Stats:
f, ax = plt.subplots(figsize=(1, 3))
plot_df = pd.DataFrame([x.shape[0] for x in Batch_list], columns=["#spot/batch"])
sns.boxplot(y="#spot/batch", data=plot_df, ax=ax)
sns.stripplot(y="#spot/batch", data=plot_df, ax=ax, color="red", size=5)
return Batch_list
[docs]def Cal_Spatial_Net(adata, rad_cutoff=None, k_cutoff=None, model="Radius", verbose=True):
"""\
Construct the spatial neighbor networks.
Parameters
----------
adata
AnnData object of scanpy package.
rad_cutoff
radius cutoff when model='Radius'
k_cutoff
The number of nearest neighbors when model='KNN'
model
The network construction model. When model=='Radius', the spot is connected to spots whose distance is less than rad_cutoff. When model=='KNN', the spot is connected to its first k_cutoff nearest neighbors.
Returns
-------
The spatial networks are saved in adata.uns['Spatial_Net']
"""
assert model in ["Radius", "KNN"]
if verbose:
print("------Calculating spatial graph...")
coor = pd.DataFrame(adata.obsm["spatial"])
coor.index = adata.obs.index
coor.columns = ["imagerow", "imagecol"]
if model == "Radius":
nbrs = sklearn.neighbors.NearestNeighbors(radius=rad_cutoff).fit(coor)
distances, indices = nbrs.radius_neighbors(coor, return_distance=True)
KNN_list = []
for it in range(indices.shape[0]):
KNN_list.append(pd.DataFrame(zip([it] * indices[it].shape[0], indices[it], distances[it])))
if model == "KNN":
nbrs = sklearn.neighbors.NearestNeighbors(n_neighbors=k_cutoff + 1).fit(coor)
distances, indices = nbrs.kneighbors(coor)
KNN_list = []
for it in range(indices.shape[0]):
KNN_list.append(pd.DataFrame(zip([it] * indices.shape[1], indices[it, :], distances[it, :])))
KNN_df = pd.concat(KNN_list)
KNN_df.columns = ["Cell1", "Cell2", "Distance"]
Spatial_Net = KNN_df.copy()
Spatial_Net = Spatial_Net.loc[
Spatial_Net["Distance"] > 0,
]
id_cell_trans = dict(
zip(
range(coor.shape[0]),
np.array(coor.index),
)
)
Spatial_Net["Cell1"] = Spatial_Net["Cell1"].map(id_cell_trans)
Spatial_Net["Cell2"] = Spatial_Net["Cell2"].map(id_cell_trans)
if verbose:
print("The graph contains %d edges, %d cells." % (Spatial_Net.shape[0], adata.n_obs))
print("%.4f neighbors per cell on average." % (Spatial_Net.shape[0] / adata.n_obs))
adata.uns["Spatial_Net"] = Spatial_Net
[docs]def Cal_Spatial_Net_3D(
adata, rad_cutoff_2D, rad_cutoff_Zaxis, key_section="Section_id", section_order=None, verbose=True
):
"""\
Construct the spatial neighbor networks.
Parameters
----------
adata
AnnData object of scanpy package.
rad_cutoff_2D
radius cutoff for 2D SNN construction.
rad_cutoff_Zaxis
radius cutoff for 2D SNN construction for consturcting SNNs between adjacent sections.
key_section
The columns names of section_ID in adata.obs.
section_order
The order of sections. The SNNs between adjacent sections are constructed according to this order.
Returns
-------
The 3D spatial networks are saved in adata.uns['Spatial_Net'].
"""
adata.uns["Spatial_Net_2D"] = pd.DataFrame()
adata.uns["Spatial_Net_Zaxis"] = pd.DataFrame()
num_section = np.unique(adata.obs[key_section]).shape[0]
if verbose:
print("Radius used for 2D SNN:", rad_cutoff_2D)
print("Radius used for SNN between sections:", rad_cutoff_Zaxis)
for temp_section in np.unique(adata.obs[key_section]):
if verbose:
print("------Calculating 2D SNN of section ", temp_section)
temp_adata = adata[
adata.obs[key_section] == temp_section,
]
Cal_Spatial_Net(temp_adata, rad_cutoff=rad_cutoff_2D, verbose=False)
temp_adata.uns["Spatial_Net"]["SNN"] = temp_section
if verbose:
print(
"This graph contains %d edges, %d cells." % (temp_adata.uns["Spatial_Net"].shape[0], temp_adata.n_obs)
)
print("%.4f neighbors per cell on average." % (temp_adata.uns["Spatial_Net"].shape[0] / temp_adata.n_obs))
adata.uns["Spatial_Net_2D"] = pd.concat([adata.uns["Spatial_Net_2D"], temp_adata.uns["Spatial_Net"]])
for it in range(num_section - 1):
section_1 = section_order[it]
section_2 = section_order[it + 1]
if verbose:
print("------Calculating SNN between adjacent section %s and %s." % (section_1, section_2))
Z_Net_ID = section_1 + "-" + section_2
temp_adata = adata[
adata.obs[key_section].isin([section_1, section_2]),
]
Cal_Spatial_Net(temp_adata, rad_cutoff=rad_cutoff_Zaxis, verbose=False)
spot_section_trans = dict(zip(temp_adata.obs.index, temp_adata.obs[key_section]))
temp_adata.uns["Spatial_Net"]["Section_id_1"] = temp_adata.uns["Spatial_Net"]["Cell1"].map(spot_section_trans)
temp_adata.uns["Spatial_Net"]["Section_id_2"] = temp_adata.uns["Spatial_Net"]["Cell2"].map(spot_section_trans)
used_edge = temp_adata.uns["Spatial_Net"].apply(lambda x: x["Section_id_1"] != x["Section_id_2"], axis=1)
temp_adata.uns["Spatial_Net"] = temp_adata.uns["Spatial_Net"].loc[
used_edge,
]
temp_adata.uns["Spatial_Net"] = temp_adata.uns["Spatial_Net"].loc[:, ["Cell1", "Cell2", "Distance"]]
temp_adata.uns["Spatial_Net"]["SNN"] = Z_Net_ID
if verbose:
print(
"This graph contains %d edges, %d cells." % (temp_adata.uns["Spatial_Net"].shape[0], temp_adata.n_obs)
)
print("%.4f neighbors per cell on average." % (temp_adata.uns["Spatial_Net"].shape[0] / temp_adata.n_obs))
adata.uns["Spatial_Net_Zaxis"] = pd.concat([adata.uns["Spatial_Net_Zaxis"], temp_adata.uns["Spatial_Net"]])
adata.uns["Spatial_Net"] = pd.concat([adata.uns["Spatial_Net_2D"], adata.uns["Spatial_Net_Zaxis"]])
if verbose:
print("3D SNN contains %d edges, %d cells." % (adata.uns["Spatial_Net"].shape[0], adata.n_obs))
print("%.4f neighbors per cell on average." % (adata.uns["Spatial_Net"].shape[0] / adata.n_obs))
[docs]def Stats_Spatial_Net(adata):
import matplotlib.pyplot as plt
Num_edge = adata.uns["Spatial_Net"]["Cell1"].shape[0]
Mean_edge = Num_edge / adata.shape[0]
plot_df = pd.value_counts(pd.value_counts(adata.uns["Spatial_Net"]["Cell1"]))
plot_df = plot_df / adata.shape[0]
fig, ax = plt.subplots(figsize=[3, 2])
plt.ylabel("Percentage")
plt.xlabel("")
plt.title("Number of Neighbors (Mean=%.2f)" % Mean_edge)
ax.bar(plot_df.index, plot_df)
[docs]def mclust_R(adata, num_cluster, modelNames="EEE", used_obsm="STAGATE", random_seed=2020):
"""\
Clustering using the mclust algorithm.
The parameters are the same as those in the R package mclust.
"""
np.random.seed(random_seed)
import rpy2.robjects as robjects
robjects.r.library("mclust")
import rpy2.robjects.numpy2ri
rpy2.robjects.numpy2ri.activate()
r_random_seed = robjects.r["set.seed"]
r_random_seed(random_seed)
rmclust = robjects.r["Mclust"]
res = rmclust(rpy2.robjects.numpy2ri.numpy2rpy(adata.obsm[used_obsm]), num_cluster, modelNames)
mclust_res = np.array(res[-2])
adata.obs["mclust"] = mclust_res
adata.obs["mclust"] = adata.obs["mclust"].astype("int")
adata.obs["mclust"] = adata.obs["mclust"].astype("category")
return adata