Source code for spateo.tools.glm

import re
from typing import Optional

import numpy as np
import pandas as pd
import patsy
import statsmodels.api as sm
from anndata import AnnData
from patsy import dmatrix
from scipy import stats
from scipy.sparse import issparse
from statsmodels.sandbox.stats.multicomp import multipletests

from ..logging import logger_manager as lm


[docs]def glm_degs( adata: AnnData, X_data: Optional[np.ndarray] = None, genes: Optional[list] = None, layer: Optional[str] = None, key_added: str = "glm_degs", fullModelFormulaStr: str = "~cr(time, df=3)", reducedModelFormulaStr: str = "~1", qval_threshold: Optional[float] = 0.05, llf_threshold: Optional[float] = -2000, ci_alpha: float = 0.05, inplace: bool = True, ) -> Optional[AnnData]: """Differential genes expression tests using generalized linear regressions. Here only size factor normalized gene expression matrix can be used, and SCT/pearson residuals transformed gene expression can not be used. Tests each gene for differential expression as a function of integral time (the time estimated via the reconstructed vector field function) or pseudo-time using generalized additive models with natural spline basis. This function can also use other co-variates as specified in the full (i.e `~clusters`) and reduced model formula to identify differentially expression genes across different categories, group, etc. glm_degs relies on statsmodels package and is adapted from the `differentialGeneTest` function in Monocle. Note that glm_degs supports performing deg analysis for any layer or normalized data in your adata object. That is you can either use the total, new, unspliced or velocity, etc. for the differential expression analysis. Args: adata: An Anndata object. The anndata object must contain a size factor normalized gene expression matrix. X_data: The user supplied data that will be used for differential expression analysis directly. genes: The list of genes that will be used to subset the data for differential expression analysis. If ``genes = None``, all genes will be used. layer: The layer that will be used to retrieve data for dimension reduction and clustering. If ``layer = None``, ``.X`` is used. key_added: The key that will be used for the glm_degs key in ``.uns``. fullModelFormulaStr: A formula string specifying the full model in differential expression tests (i.e. likelihood ratio tests) for each gene/feature. reducedModelFormulaStr: A formula string specifying the reduced model in differential expression tests (i.e. likelihood ratio tests) for each gene/feature. qval_threshold: Only keep the glm test results whose qval is less than the ``qval_threshold``. llf_threshold: Only keep the glm test results whose log-likelihood is less than the ``llf_threshold``. ci_alpha: The significance level for the confidence interval. The default ``ci_alpha = .05`` returns a 95% confidence interval. inplace: Whether to copy adata or modify it inplace. Returns: An ``AnnData`` object is updated/copied with the ``key_added`` dictionary in the ``.uns`` attribute, storing the differential expression test results after the GLM test. """ adata = adata if inplace else adata.copy() if X_data is None: from dynamo.tools.utils import fetch_X_data genes, X_data = fetch_X_data(adata, genes, layer) else: assert ( genes is not None ), "When providing X_data, a list of genes name that corresponds to the columns of X_data must be provided." assert ( len(genes) == X_data.shape[1] ), "When providing X_data, the number of genes must be equal the columns of X_data." lm.main_warning( "Gene expression matrix must be normalized by the size factor, please check if the input gene expression matrix is correct." "If you don't have the size factor normalized gene expression matrix, please run `dynamo.pp.normalize_cell_expr_by_size_factors(skip_log = True)`." ) md = patsy.ModelDesc.from_formula(fullModelFormulaStr) termlist = md.rhs_termlist + md.lhs_termlist factors = [] for term in termlist: for factor in term.factors: factor_name = re.findall(r"cr\((.+?), df=\d\)", factor.name()) factors.append(factor.name() if factor_name == [] else factor_name[0]) assert set(factors).issubset( set(adata.obs.columns) ), f"adata object doesn't include the factors from the model formula {fullModelFormulaStr} you provided." df_factors = adata.obs[factors] sparse = issparse(X_data) deg_df = pd.DataFrame(index=genes, columns=["status", "family", "log-likelihood", "pval"]) deg_dict = {} for i in lm.progress_logger( range(len(genes)), progress_name="Detecting genes via Generalized Additive Models (GAMs)" ): gene = genes[i] expression = X_data[:, i].A if sparse else X_data[:, i] df_factors["expression"] = expression try: nb2_full, nb2_null = glm_test(df_factors, fullModelFormulaStr, reducedModelFormulaStr) pval = lrt(nb2_full, nb2_null) deg_df.iloc[i, :] = ("ok", "NB2", nb2_full.llf, pval) df_factors_gene = df_factors.copy() df_factors_gene["mu"] = nb2_full.mu # df_factors_gene["fitted_expression"] = nb2_full.predict() this is equal to nb2_full.mu df_factors_gene["resid_deviance"] = nb2_full.resid_deviance df_factors_gene["resid_pearson"] = nb2_full.resid_pearson df_factors_gene[["ci_lower", "ci_upper"]] = nb2_full.get_prediction().conf_int(alpha=ci_alpha) deg_dict[gene] = df_factors_gene except: deg_df.iloc[i, :] = ("fail", "NB2", "None", 1) deg_df["qval"] = multipletests(deg_df["pval"], method="fdr_bh")[1] deg_df = deg_df[deg_df["log-likelihood"] != "None"] deg_df.dropna(axis=0, how="any", inplace=True) deg_df[["log-likelihood", "pval", "qval"]] = deg_df[["log-likelihood", "pval", "qval"]].astype(np.float32) deg_df = deg_df.sort_values(by=["qval", "pval", "log-likelihood"], ascending=[True, True, True]) if not (qval_threshold is None and llf_threshold is None): cut_deg_df = deg_df[deg_df["qval"] <= qval_threshold] if not (qval_threshold is None) else deg_df cut_deg_df = ( cut_deg_df[cut_deg_df["log-likelihood"] <= llf_threshold] if not (llf_threshold is None) else cut_deg_df ) cut_deg_dict = {gene: deg_dict[gene] for gene in cut_deg_df.index} adata.uns[key_added] = {"glm_result": cut_deg_df, "correlation": cut_deg_dict} else: adata.uns[key_added] = {"glm_result": deg_df, "correlation": deg_dict} return None if inplace else adata
[docs]def glm_test( data, fullModelFormulaStr="~cr(time, df=3)", reducedModelFormulaStr="~1", ): transformed_x = dmatrix(fullModelFormulaStr, data, return_type="dataframe") transformed_x_null = dmatrix(reducedModelFormulaStr, data, return_type="dataframe") nb2_family = sm.families.NegativeBinomial() # (alpha=aux_olsr_results.params[0]) nb2_full = sm.GLM(data["expression"], transformed_x, family=nb2_family).fit() nb2_null = sm.GLM(data["expression"], transformed_x_null, family=nb2_family).fit() return nb2_full, nb2_null
[docs]def lrt(full, restr): llf_full = full.llf llf_restr = restr.llf df_full = full.df_resid df_restr = restr.df_resid lrdf = df_restr - df_full lrstat = -2 * (llf_restr - llf_full) lr_pvalue = stats.chi2.sf(lrstat, df=lrdf) return lr_pvalue