"""Spatial markers.
"""
import warnings
from random import sample
from typing import Any, List, Optional, Tuple

import anndata

with warnings.catch_warnings():
    warnings.simplefilter("ignore")
    import geopandas

import numpy as np
import pandas as pd
from joblib import Parallel, delayed

from ..configuration import SKM


@SKM.check_adata_is_type(SKM.ADATA_UMI_TYPE)
def lisa_geo_df(
    adata: anndata.AnnData,
    gene: str,
    spatial_key: str = "spatial",
    n_neighbors: int = 8,
    layer: Optional[str] = None,
) -> Tuple[Any, geopandas.GeoDataFrame]:
"""Perform Local Indicators of Spatial Association (LISA) analyses on specific genes and prepare a geopandas
dataframe for downstream lisa plots to reveal the quantile plots and the hotspot, coldspot, doughnut and
diamond regions.
Args:
adata: An adata object that has spatial information (via `spatial_key` key in adata.obsm).
gene: The gene that will be used for lisa analyses, must be included in the data.
spatial_key: The spatial key of the spatial coordinate of each bucket.
n_neighbors: The number of nearest neighbors of each bucket that will be used in calculating the spatial lag.
layer: the key to the layer. If it is None, adata.X will be used by default.

    Returns:
        A tuple of the fitted `Moran_Local` object and a geopandas dataframe that includes the coordinates (`x` and
        `y` columns), expression (`exp` column), lagged expression (`w_exp` column), z-scores (`exp_zscore` and
        `w_exp_zscore` columns) and the local Moran's I score (`Is` column).
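
    Examples:
        A minimal usage sketch (assuming `adata` carries spatial coordinates under the default `spatial` key in
        `adata.obsm`, that it contains a gene named "gene_x", and that this function is exposed under `st.tl` like
        the other functions in this module; all three names are placeholder assumptions):

        >>> import spateo as st
        >>> lisa, df = st.tl.lisa_geo_df(adata, gene="gene_x")
        >>> df.query("group != '0 ns'").head()  # significant hotspot/coldspot/doughnut/diamond buckets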
"""
    from pysal import explore
    from pysal.lib import weights

    coords = adata.obsm[spatial_key]

    # Generate W from the GeoDataFrame
    w = weights.distance.KNN.from_array(coords, k=n_neighbors)
    # Row-standardization
    w.transform = "R"

    df = pd.DataFrame(coords, columns=["x", "y"])

    if layer is None:
        df["exp"] = adata[:, gene].X.toarray().flatten()
    else:
        df["exp"] = np.log1p(adata[:, gene].layers[layer].toarray().flatten())

    df["w_exp"] = weights.spatial_lag.lag_spatial(w, df["exp"])
    df["exp_zscore"] = (df["exp"] - df["exp"].mean()) / df["exp"].std()
    df["w_exp_zscore"] = (df["w_exp"] - df["w_exp"].mean()) / df["w_exp"].std()

    df["exp"] = df["exp"].astype(np.float64)
    lisa = explore.esda.moran.Moran_Local(df["exp"], w)

    df = geopandas.GeoDataFrame(df, geometry=geopandas.points_from_xy(df.x, df.y))
    df = df.assign(Is=lisa.Is)

    q_labels = ["Q1", "Q2", "Q3", "Q4"]
    labels = [q_labels[i - 1] for i in lisa.q]
    df = df.assign(labels=labels)

    sig = 1 * (lisa.p_sim < 0.05)
    df = df.assign(sig=sig)
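
    # `lisa.q` gives each bucket's Moran scatterplot quadrant (1: HH, 2: LH, 3: LL, 4: HL; the first letter is the
    # bucket itself, the second its neighbors). Significant HH buckets are hotspots, LL buckets coldspots, LH
    # buckets doughnuts (a low bucket ringed by high neighbors) and HL buckets diamonds (a high bucket amid low
    # neighbors).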
    hotspot = 1 * (sig * lisa.q == 1)
    coldspot = 3 * (sig * lisa.q == 3)
    doughnut = 2 * (sig * lisa.q == 2)
    diamond = 4 * (sig * lisa.q == 4)
    spots = hotspot + coldspot + doughnut + diamond

    spot_labels = ["0 ns", "1 hot spot", "2 doughnut", "3 cold spot", "4 diamond"]
    group = [spot_labels[i] for i in spots]
    df = df.assign(group=group)

    return (lisa, df)


@SKM.check_adata_is_type(SKM.ADATA_UMI_TYPE)
def local_moran_i(
    adata: anndata.AnnData,
    group: str,
    spatial_key: str = "spatial",
    genes: Optional[List[str]] = None,
    layer: Optional[str] = None,
    n_neighbors: int = 5,
    copy: bool = False,
    n_jobs: int = 30,
):
"""Identify cell type specific genes with local Moran's I test.
Args:
adata: An adata object that has spatial information (via `spatial_key` key in adata.obsm).
group: The key to the cell group in the adata.obs.
spatial_key: The spatial key of the spatial coordinate of each bucket.
genes: The gene that will be used for lisa analyses, must be included in the data.
layer: the key to the layer. If it is None, adata.X will be used by default.
n_neighbors: The number of nearest neighbors of each bucket that will be used in calculating the spatial lag.
copy: Whether to copy the adata object.
Returns:
Depend on the `copy` argument, return a deep copied adata object (when `copy = True`) or inplace updated adata
object. The resultant adata will include the following new columns in `adata.var`:
{*}_num_val: The maximum number of categories (`{"hotspot", "coldspot", "doughnut", "diamond"}) across all
cell groups
{*}_frac_val: The maximum fraction of categories across all cell groups
{*}_spec_val: The maximum specificity of categories across all cell groups
{*}_num_group: The corresponding cell group with the largest number of each category (this can be affect by
the cell group size).
{*}_frac_group: The corresponding cell group with the highest fraction of each category.
{*}_spec_group: The corresponding cell group with the highest specificity of each category.
{*} can be one of `{"hotspot", "coldspot", "doughnut", "diamond"}`.

    Examples:
        >>> import spateo as st
        >>> markers_df = (
        ...     pd.DataFrame(adata.var)
        ...     .query("hotspot_frac_val > 0.05 & mean > 0.05")
        ...     .groupby(["hotspot_spec_group"])["hotspot_spec_val"]
        ...     .nlargest(5)
        ... )
        >>> markers = markers_df.index.get_level_values(1)
        >>>
        >>> for i in adata.obs[group].unique():
        ...     if i in markers_df.index.get_level_values(0):
        ...         print(markers_df[i])
        ...         st.pl.space(adata, color=group, highlights=[i], pointsize=0.1, alpha=1, figsize=(12, 8))
        ...         st.pl.space(adata, color=markers_df[i].index, pointsize=0.1, alpha=1, figsize=(12, 8))
    """
    from pysal import explore
    from pysal.lib import weights

    group_num = adata.obs[group].value_counts()
    group_name = adata.obs[group]
    uniq_g, group_name = group_name.unique(), group_name.to_list()

    # Generate W from the GeoDataFrame
    w = weights.distance.KNN.from_array(adata.obsm[spatial_key], k=n_neighbors)
    # Row-standardization
    w.transform = "R"

    if genes is None:
        genes = adata.var.index[adata.var.use_for_pca]
    else:
        genes = adata.var.index.intersection(genes)

    db = pd.DataFrame(adata.obsm[spatial_key], columns=["x", "y"])

    suffix = ["_num", "_frac", "_spec"]

    # hotspot: HH; coldspot: LL; doughnut: LH; diamond: HL. The first letter refers to the query bucket and the
    # second to its neighbors; `lisa.q` encodes these quadrants as 1 (HH), 2 (LH), 3 (LL) and 4 (HL).
    def _assign_columns(category):
        cat_group_list = [category + i + "_group" for i in suffix]
        cat_val_list = [category + i + "_val" for i in suffix]
        adata.var[cat_group_list[0]], adata.var[cat_group_list[1]], adata.var[cat_group_list[2]] = None, None, None
        adata.var[cat_val_list[0]], adata.var[cat_val_list[1]], adata.var[cat_val_list[2]] = None, None, None

    for i in ["hotspot", "coldspot", "doughnut", "diamond"]:
        _assign_columns(i)
    # Allocate a fresh set of accumulators per category: unpacking one shared tuple of arrays for all four
    # categories would make them alias (and silently overwrite) the same buffers.
    def _new_stats():
        return (
            np.zeros(len(uniq_g)),
            np.zeros(len(uniq_g)),
            np.zeros(len(uniq_g)),
        )

    hotspot_num, hotspot_frac, hotspot_spec = _new_stats()
    coldspot_num, coldspot_frac, coldspot_spec = _new_stats()
    doughnut_num, doughnut_frac, doughnut_spec = _new_stats()
    diamond_num, diamond_frac, diamond_spec = _new_stats()

    valid_inds_list = [np.array(group_name) == g for g in uniq_g]

    def _single(
        cur_g,
        db,
        w,
        adata,
        uniq_g,
        valid_inds_list,
        hotspot_num,
        hotspot_frac,
        hotspot_spec,
        coldspot_num,
        coldspot_frac,
        coldspot_spec,
        doughnut_num,
        doughnut_frac,
        doughnut_spec,
        diamond_num,
        diamond_frac,
        diamond_spec,
    ):
        if layer is None:
            db["exp"] = adata[:, cur_g].X.toarray().flatten()
        else:
            db["exp"] = np.log1p(adata[:, cur_g].layers[layer].toarray().flatten())

        db["w_exp"] = weights.spatial_lag.lag_spatial(w, db["exp"])
        db["exp"] = db["exp"].astype(np.float64)
        lisa = explore.esda.moran.Moran_Local(db["exp"], w, permutations=199)

        # find significant cells at the 5% pseudo p-value level
        sig = 1 * (lisa.p_sim < 0.05)
        # classify significant cells by quadrant (z-score of each cell against that of its smoothed, i.e.
        # spatially lagged, expression)
        hotspot = 1 * (sig * lisa.q == 1)
        coldspot = 3 * (sig * lisa.q == 3)
        doughnut = 2 * (sig * lisa.q == 2)
        diamond = 4 * (sig * lisa.q == 4)

        # calculate the per-group statistics
        def _get_nums(spots, valid_inds, g):
            return (
                sum(spots[valid_inds] > 0),  # number of {*} (hotspot, coldspot, etc.) cells in this cell group
                sum(spots[valid_inds] > 0) / group_num[g],  # fraction of this cell group that are {*} cells
                sum(spots[valid_inds] > 0) / sum(spots > 0),  # specificity: share of all {*} cells in this group
            )
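
        # Worked example (illustrative numbers only): if a gene has 40 hotspot cells in total and 30 of them fall
        # in a group of 100 cells, that group scores num = 30, frac = 30 / 100 = 0.3 and spec = 30 / 40 = 0.75.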

        # calculate the maximum value across all cell groups
        def _get_group_max(num, frac, spec):
            return (np.max(num), np.max(frac), np.max(spec))

        # get the name of the group that attains each maximum
        def _get_max_group_name(num, frac, spec):
            return (
                uniq_g[np.argsort(num)[-1]],
                uniq_g[np.argsort(frac)[-1]],
                uniq_g[np.argsort(spec)[-1]],
            )

        for ind, g in enumerate(uniq_g):
            valid_inds = valid_inds_list[ind]
            hotspot_num[ind], hotspot_frac[ind], hotspot_spec[ind] = _get_nums(hotspot, valid_inds, g)
            coldspot_num[ind], coldspot_frac[ind], coldspot_spec[ind] = _get_nums(coldspot, valid_inds, g)
            doughnut_num[ind], doughnut_frac[ind], doughnut_spec[ind] = _get_nums(doughnut, valid_inds, g)
            diamond_num[ind], diamond_frac[ind], diamond_spec[ind] = _get_nums(diamond, valid_inds, g)

        (
            adata.var.loc[cur_g, "hotspot_num_val"],
            adata.var.loc[cur_g, "hotspot_frac_val"],
            adata.var.loc[cur_g, "hotspot_spec_val"],
        ) = _get_group_max(hotspot_num, hotspot_frac, hotspot_spec)
        (
            adata.var.loc[cur_g, "coldspot_num_val"],
            adata.var.loc[cur_g, "coldspot_frac_val"],
            adata.var.loc[cur_g, "coldspot_spec_val"],
        ) = _get_group_max(coldspot_num, coldspot_frac, coldspot_spec)
        (
            adata.var.loc[cur_g, "doughnut_num_val"],
            adata.var.loc[cur_g, "doughnut_frac_val"],
            adata.var.loc[cur_g, "doughnut_spec_val"],
        ) = _get_group_max(doughnut_num, doughnut_frac, doughnut_spec)
        (
            adata.var.loc[cur_g, "diamond_num_val"],
            adata.var.loc[cur_g, "diamond_frac_val"],
            adata.var.loc[cur_g, "diamond_spec_val"],
        ) = _get_group_max(diamond_num, diamond_frac, diamond_spec)
        (
            adata.var.loc[cur_g, "hotspot_num_group"],
            adata.var.loc[cur_g, "hotspot_frac_group"],
            adata.var.loc[cur_g, "hotspot_spec_group"],
        ) = _get_max_group_name(hotspot_num, hotspot_frac, hotspot_spec)
        (
            adata.var.loc[cur_g, "coldspot_num_group"],
            adata.var.loc[cur_g, "coldspot_frac_group"],
            adata.var.loc[cur_g, "coldspot_spec_group"],
        ) = _get_max_group_name(coldspot_num, coldspot_frac, coldspot_spec)
        (
            adata.var.loc[cur_g, "doughnut_num_group"],
            adata.var.loc[cur_g, "doughnut_frac_group"],
            adata.var.loc[cur_g, "doughnut_spec_group"],
        ) = _get_max_group_name(doughnut_num, doughnut_frac, doughnut_spec)
        (
            adata.var.loc[cur_g, "diamond_num_group"],
            adata.var.loc[cur_g, "diamond_frac_group"],
            adata.var.loc[cur_g, "diamond_spec_group"],
        ) = _get_max_group_name(diamond_num, diamond_frac, diamond_spec)

        return adata.var.loc[cur_g, :].values

    # parallel computing
    res = Parallel(n_jobs)(
        delayed(_single)(
            cur_g,
            db,
            w,
            adata,
            uniq_g,
            valid_inds_list,
            hotspot_num,
            hotspot_frac,
            hotspot_spec,
            coldspot_num,
            coldspot_frac,
            coldspot_spec,
            doughnut_num,
            doughnut_frac,
            doughnut_spec,
            diamond_num,
            diamond_frac,
            diamond_spec,
        )
        for cur_g in genes
    )

    res = pd.DataFrame(res, index=genes)
    res = res.drop(columns=0)
    # note: this assumes the first column of `adata.var` is "mt"; dropping it keeps the remaining column labels
    # aligned with the values returned by `_single`
    res.columns = adata.var.loc[genes, :].columns.drop("mt")

    return res


@SKM.check_adata_is_type(SKM.ADATA_UMI_TYPE)
def GM_lag_model(
    adata: anndata.AnnData,
    group: str,
    spatial_key: str = "spatial",
    genes: Optional[List[str]] = None,
    drop_dummy: Optional[str] = None,
    n_neighbors: int = 5,
    layer: Optional[str] = None,
    copy: bool = False,
    n_jobs: int = 30,
):
"""Spatial lag model with spatial two stage least squares (S2SLS) with results and diagnostics; Anselin (1988).
:math: `\log{P_i} = \alpha + \rho \log{P_{lag-i}} + \sum_k \beta_k X_{ki} + \epsilon_i`
Reference:
https://geographicdata.science/book/notebooks/11_regression.html
http://darribas.org/gds_scipy16/ipynb_md/08_spatial_regression.html
Args:
adata: An adata object that has spatial information (via `spatial_key` key in adata.obsm).
group: The key to the cell group in the adata object.
spatial_key: The spatial key of the spatial coordinate of each bucket.
genes: The gene that will be used for S2SLS analyses, must be included in the data.
drop_dummy: The name of the dummy group.
n_neighbors: The number of nearest neighbors of each bucket that will be used in calculating the spatial lag.
layer: The key to the layer. If it is None, adata.X will be used by default.
copy: Whether to copy the adata object.
Returns:
Depend on the `copy` argument, return a deep copied adata object (when `copy = True`) or inplace updated adata
object. The result adata will include the following new columns in `adata.var`:
{*}_GM_lag_coeff: coefficient of GM test for each cell group (denoted by {*})
{*}_GM_lag_zstat: z-score of GM test for each cell group (denoted by {*})
{*}_GM_lag_pval: p-value of GM test for each cell group (denoted by {*})

    Examples:
        >>> import spateo as st
        >>> st.tl.GM_lag_model(adata, group='simpleanno')
        >>> coef_cols = adata.var.columns[adata.var.columns.str.endswith('_GM_lag_coeff')]
        >>> adata.var.loc[["Hbb-bt", "Hbb-bh1", "Hbb-y", "Hbb-bs"], :].T
        >>> for i in coef_cols[1:-1]:
        ...     print(i)
        ...     top_markers = adata.var.sort_values(i, ascending=False).index[:5]
        ...     st.pl.space(adata, basis='spatial', color=top_markers, ncols=5, pointsize=0.1, alpha=1)
        ...     st.pl.space(adata.copy(), basis='spatial', color=['simpleanno'],
        ...                 highlights=[i.split('_GM_lag_coeff')[0]], pointsize=0.1, alpha=1, show_legend='on data')
    """
    from pysal.lib import weights
    from pysal.model import spreg

    group_num = adata.obs[group].value_counts()
    max_group, min_group, min_group_ncells = (
        group_num.index[0],
        group_num.index[-1],
        group_num.values[-1],
    )

    group_name = adata.obs[group]
    db = pd.DataFrame({"group": group_name})
    categories = np.array(adata.obs[group].unique().tolist() + ["others"])
    db["group"] = pd.Categorical(db["group"], categories=categories)

    if drop_dummy is None:
        # no dummy group is given: relabel a random subset of buckets (as many as the smallest group) as "others"
        # and drop that column as the reference category
        db.iloc[sample(np.arange(adata.n_obs).tolist(), min_group_ncells), :] = "others"
        drop_columns = ["group_others"]
    elif drop_dummy in group_name.values:
        group_inds = np.where(db["group"] == drop_dummy)[0]
        db.iloc[group_inds, :] = "others"
        # drop both the relabeled "others" column and the (now empty) original dummy column
        drop_columns = ["group_others", "group_" + str(drop_dummy)]
    else:
        raise ValueError(f"The drop_dummy value you provided, {drop_dummy}, is not in adata.obs[{group}].")

    X = pd.get_dummies(data=db, drop_first=False)
    variable_names = X.columns.difference(drop_columns).to_list()

    uniq_g, group_name = (
        set(group_name).difference([drop_dummy]),
        group_name.to_list(),
    )
    uniq_g = list(np.sort(list(uniq_g)))  # sort and convert to list
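    # Note on ordering: `X.columns.difference(...)` returns a lexicographically sorted index, so sorting `uniq_g`
    # keeps the group names aligned with `variable_names` and hence with the coefficient order used below.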

    # Generate W from the GeoDataFrame
    knn = weights.distance.KNN.from_array(adata.obsm[spatial_key], k=n_neighbors)
    # Row-standardization
    knn.transform = "R"

    if genes is None:
        genes = adata.var.index[adata.var.use_for_pca]
    else:
        genes = adata.var.index.intersection(genes)

    for i in ["const"] + uniq_g + ["W_log_exp"]:
        adata.var[str(i) + "_GM_lag_coeff"] = None
        adata.var[str(i) + "_GM_lag_zstat"] = None
        adata.var[str(i) + "_GM_lag_pval"] = None

    def _single(
        cur_g,
        genes,
        X,
        adata,
        knn,
    ):
        if layer is None:
            X["log_exp"] = adata[:, cur_g].X.toarray().flatten()
        else:
            X["log_exp"] = np.log1p(adata[:, cur_g].layers[layer].toarray().flatten())

        try:
            model = spreg.GM_Lag(
                X[["log_exp"]].values,
                X[variable_names].values,
                w=knn,
                name_y="log_exp",
                name_x=variable_names,
            )
            a = pd.DataFrame(model.betas, model.name_x + ["W_log_exp"], columns=["coef"])
            # model.z_stat is a list of (z-score, p-value) pairs, one per coefficient
            b = pd.DataFrame(
                model.z_stat,
                model.name_x + ["W_log_exp"],
                columns=["z_stat", "p_val"],
            )

            df = a.merge(b, left_index=True, right_index=True)

            for ind, g in enumerate(["const"] + uniq_g + ["W_log_exp"]):
                adata.var.loc[cur_g, str(g) + "_GM_lag_coeff"] = df.iloc[ind, 0]
                adata.var.loc[cur_g, str(g) + "_GM_lag_zstat"] = df.iloc[ind, 1]
                adata.var.loc[cur_g, str(g) + "_GM_lag_pval"] = df.iloc[ind, 2]
        except Exception:
            # the model can fail to fit (e.g. for all-zero genes); record NaNs so downstream filtering still works
            for ind, g in enumerate(["const"] + uniq_g + ["W_log_exp"]):
                adata.var.loc[cur_g, str(g) + "_GM_lag_coeff"] = np.nan
                adata.var.loc[cur_g, str(g) + "_GM_lag_zstat"] = np.nan
                adata.var.loc[cur_g, str(g) + "_GM_lag_pval"] = np.nan

        return adata.var.loc[cur_g, :].values

    # parallel computing
    res = Parallel(n_jobs)(
        delayed(_single)(
            cur_g,
            genes,
            X,
            adata,
            knn,
        )
        for cur_g in genes
    )

    res = pd.DataFrame(res, index=genes)
    res.columns = adata.var.loc[genes, :].columns

    return res