"""IO functions for BGI stereo technology.
import gzip
import math
import warnings
from typing import Callable, Dict, List, NamedTuple, Optional, Tuple, Union

import cv2
import numpy as np
import pandas as pd

with warnings.catch_warnings():

from anndata import AnnData
from scipy.sparse import csr_matrix
from typing_extensions import Literal

from ..configuration import SKM
from ..logging import logger_manager as lm
from .utils import (

    import ngs_tools as ngs

[docs] VERSIONS = { "stereo": ngs.chemistry.get_chemistry("Stereo-seq").resolution, }
except ModuleNotFoundError: class SpatialResolution(NamedTuple): scale: float = 1.0 unit: Optional[Literal["nm", "um", "mm"]] = None VERSIONS = {"stereo": SpatialResolution(0.5, "um")}
[docs]def read_bgi_as_dataframe(path: str, label_column: Optional[str] = None) -> pd.DataFrame: """Read a BGI read file as a pandas DataFrame. Args: path: Path to read file. label_column: Column name containing positive cell labels. Returns: Pandas Dataframe with the following standardized column names. * `gene`: Gene name/ID (whatever was used in the original file) * `x`, `y`: X and Y coordinates * `total`, `spliced`, `unspliced`: Counts for each RNA species. The latter two is only present if they are in the original file. """ dtype = { "geneID": "category", # geneID "x": np.uint32, # x "y": np.uint32, # y # Multiple different names for total counts "MIDCounts": np.uint16, "MIDCount": np.uint16, "UMICount": np.uint16, "UMICounts": np.uint16, "EXONIC": np.uint16, # spliced "INTRONIC": np.uint16, # unspliced, } rename = { "MIDCounts": "total", "MIDCount": "total", "UMICount": "total", "UMICounts": "total", "EXONIC": "spliced", "INTRONIC": "unspliced", } # Use first 10 rows for validation. df = pd.read_csv(path, sep="\t", dtype=dtype, comment="#", nrows=10) if label_column: dtype.update({label_column: np.uint32}) rename.update({label_column: "label"}) if label_column not in df.columns: raise IOError(f"Column `{label_column}` is not present.") # If duplicate columns are provided, we don't know which to use! rename_inverse = {} for _from, _to in rename.items(): rename_inverse.setdefault(_to, []).append(_from) for _to, _cols in rename_inverse.items(): if sum(_from in df.columns for _from in _cols) > 1: raise IOError(f"Found multiple columns mapping to `{_to}`.") return pd.read_csv( path, sep="\t", dtype=dtype, comment="#", ).rename(columns=rename)
[docs]def dataframe_to_labels(df: pd.DataFrame, column: str, shape: Optional[Tuple[int, int]] = None) -> np.ndarray: """Convert a BGI dataframe that contains cell labels to a labels matrix. Args: df: Read dataframe, as returned by :func:`read_bgi_as_dataframe`. columns: Column that contains cell labels as positive integers. Any labels that are non-positive are ignored. Returns: Labels matrix """ shape = shape or (df["x"].max() + 1, df["y"].max() + 1) labels = np.zeros(shape, dtype=int) for label, _df in df.drop_duplicates(subset=[column, "x", "y"]).groupby(column): if label <= 0: continue labels[(_df["x"].values, _df["y"].values)] = label return labels
[docs]def dataframe_to_filled_labels(df: pd.DataFrame, column: str, shape: Optional[Tuple[int, int]] = None) -> np.ndarray: """Convert a BGI dataframe that contains cell labels to a (filled) labels matrix. Args: df: Read dataframe, as returned by :func:`read_bgi_as_dataframe`. columns: Column that contains cell labels as positive integers. Any labels that are non-positive are ignored. Returns: Labels matrix """ shape = shape or (df["x"].max() + 1, df["y"].max() + 1) labels = np.zeros(shape, dtype=int) for label, _df in df.drop_duplicates(subset=[column, "x", "y"]).groupby(column): if label <= 0: continue points = _df[["x", "y"]].values.astype(int) min_offset = points.min(axis=0) max_offset = points.max(axis=0) xmin, ymin = min_offset xmax, ymax = max_offset points -= min_offset hull = cv2.convexHull(points, returnPoints=True) mask = cv2.fillConvexPoly(np.zeros((max_offset - min_offset + 1)[::-1], dtype=np.uint8), hull, color=1).T labels[xmin : xmax + 1, ymin : ymax + 1][mask == 1] = label return labels
[docs]def read_bgi_agg( path: str, stain_path: Optional[str] = None, binsize: int = 1, gene_agg: Optional[Dict[str, Union[List[str], Callable[[str], bool]]]] = None, prealigned: bool = False, label_column: Optional[str] = None, version: Literal["stereo"] = "stereo", ) -> AnnData: """Read BGI read file to calculate total number of UMIs observed per coordinate. Args: path: Path to read file. stain_path: Path to nuclei staining image. Must have the same coordinate system as the read file. binsize: Size of pixel bins. gene_agg: Dictionary of layer keys to gene names to aggregate. For example, `{'mito': ['list', 'of', 'mitochondrial', 'genes']}` will yield an AnnData with a layer named "mito" with the aggregate total UMIs of the provided gene list. prealigned: Whether the stain image is already aligned with the minimum x and y RNA coordinates. label_column: Column that contains already-segmented cell labels. version: BGI technology version. Currently only used to set the scale and scale units of each unit coordinate. This may change in the future. Returns: An AnnData object containing the UMIs per coordinate and the nucleus staining image, if provided. The total UMIs are stored as a sparse matrix in `.X`, and spliced and unspliced counts (if present) are stored in `.layers['spliced']` and `.layers['unspliced']` respectively. The nuclei image is stored as a Numpy array in `.layers['nuclei']`. """ lm.main_debug(f"Reading data from {path}.") data = read_bgi_as_dataframe(path, label_column) x_min, y_min = data["x"].min(), data["y"].min() x, y = data["x"].values, data["y"].values x_max, y_max = x.max(), y.max() shape = (x_max + 1, y_max + 1) # Read image and update x,y max if appropriate layers = {} if stain_path: lm.main_debug(f"Reading stain image from {stain_path}.") image = if prealigned: lm.main_warning( ( "Assuming stain image was already aligned with the minimum x and y RNA coordinates. " "(prealinged=True)" ) ) image = np.pad(image, ((x_min, 0), (y_min, 0))) x_max = max(x_max, image.shape[0] - 1) y_max = max(y_max, image.shape[1] - 1) shape = (x_max + 1, y_max + 1) # Reshape image to match new x,y max if image.shape != shape: lm.main_warning(f"Padding stain image from {image.shape} to {shape} with zeros.") image = np.pad(image, ((0, shape[0] - image.shape[0]), (0, shape[1] - image.shape[1]))) layers[SKM.STAIN_LAYER_KEY] = image # Construct labels matrix if present labels = None if "label" in data.columns: lm.main_warning("Using the `label_column` option may result in disconnected labels.") labels = dataframe_to_labels(data, "label", shape) layers[SKM.LABELS_LAYER_KEY] = labels if binsize > 1: lm.main_info(f"Binning counts with binsize={binsize}.") shape = (math.ceil(shape[0] / binsize), math.ceil(shape[1] / binsize)) x = bin_indices(x, 0, binsize) y = bin_indices(y, 0, binsize) x_min, y_min = x.min(), y.min() # Resize image if necessary if stain_path: layers[SKM.STAIN_LAYER_KEY] = cv2.resize(image, shape[::-1]) if labels is not None: lm.main_warning("Cell labels were provided, but `binsize` > 1. There may be slight inconsistencies.") layers[SKM.LABELS_LAYER_KEY] = labels[::binsize, ::binsize] # See read_bgi_as_dataframe for standardized column names lm.main_info("Constructing count matrices.") X = csr_matrix((data["total"].values, (x, y)), shape=shape, dtype=np.uint16) if "spliced" in data.columns: layers[SKM.SPLICED_LAYER_KEY] = csr_matrix((data["spliced"].values, (x, y)), shape=shape, dtype=np.uint16) if "unspliced" in data.columns: layers[SKM.UNSPLICED_LAYER_KEY] = csr_matrix((data["unspliced"].values, (x, y)), shape=shape, dtype=np.uint16) # Aggregate gene lists if gene_agg: lm.main_info("Aggregating counts for genes provided by `gene_agg`.") for name, genes in gene_agg.items(): mask = data["geneID"].isin(genes) if isinstance(genes, list) else data["geneID"].map(genes) data_genes = data[mask] _x, _y = data_genes["x"].values, data_genes["y"].values layers[name] = csr_matrix( (data_genes["total"].values, (_x, _y)), shape=shape, dtype=np.uint16, ) adata = AnnData(X=X, layers=layers)[x_min:, y_min:].copy() scale, scale_unit = 1.0, None if version in VERSIONS: resolution = VERSIONS[version] scale, scale_unit = resolution.scale, resolution.unit # Set uns SKM.init_adata_type(adata, SKM.ADATA_AGG_TYPE) SKM.init_uns_pp_namespace(adata) SKM.init_uns_spatial_namespace(adata) SKM.set_uns_spatial_attribute(adata, SKM.UNS_SPATIAL_BINSIZE_KEY, binsize) SKM.set_uns_spatial_attribute(adata, SKM.UNS_SPATIAL_SCALE_KEY, scale) SKM.set_uns_spatial_attribute(adata, SKM.UNS_SPATIAL_SCALE_UNIT_KEY, scale_unit) return adata
@SKM.check_adata_is_type(SKM.ADATA_AGG_TYPE, "segmentation_adata", optional=True)
[docs]def read_bgi( path: str, binsize: Optional[int] = None, segmentation_adata: Optional[AnnData] = None, labels_layer: Optional[str] = None, labels: Optional[Union[np.ndarray, str]] = None, seg_binsize: int = 1, label_column: Optional[str] = None, add_props: bool = True, version: Literal["stereo"] = "stereo", ) -> AnnData: """Read BGI read file as AnnData. Args: path: Path to read file. binsize: Size of pixel bins. Should only be provided when labels (i.e. the `segmentation_adata` and `labels` arguments) are not used. segmentation_adata: AnnData containing segmentation results. labels_layer: Layer name in `segmentation_adata` containing labels. labels: Numpy array or path to numpy array saved with `` that contains labels. seg_binsize: the bin size used in cell segmentation, used in conjunction with `labels` and will be overwritten when `labels_layer` and `segmentation_adata` are not None. label_column: Column that contains already-segmented cell labels. If this column is present, this takes prescedence. add_props: Whether or not to compute label properties, such as area, bounding box, centroid, etc. version: BGI technology version. Currently only used to set the scale and scale units of each unit coordinate. This may change in the future. Returns: Bins x genes or labels x genes AnnData. """ # Check inputs if sum([binsize is not None, segmentation_adata is not None, labels is not None, label_column is not None]) != 1: raise IOError("Exactly one of `segmentation_adata`, `binsize`, `labels`, `label_column` must be provided.") if (segmentation_adata is None) ^ (labels_layer is None): raise IOError("Both `segmentation_adata` and `labels_layer` must be provided.") if segmentation_adata is not None: if SKM.get_adata_type(segmentation_adata) != SKM.ADATA_AGG_TYPE: raise IOError("Only `AGG` type AnnDatas are supported.") if binsize is not None and abs(int(binsize)) != binsize: raise IOError("Positive integer `binsize` must be provided when `segmentation_adata` is not provided.") if isinstance(labels, str): labels = np.load(labels) lm.main_debug(f"Reading data from {path}.") data = read_bgi_as_dataframe(path, label_column) n_columns = data.shape[1] # Obtain total genes from raw data, so that the columns always match # regardless of what method was used. uniq_gene = sorted(data["geneID"].unique()) props = None if label_column is not None: lm.main_info(f"Using cell labels from `{label_column}` column.") binsize = 1 data = data[data["label"] > 0] if add_props: lm.main_warning( "Using `label_column` as cell labels with `add_props=True` may result in incorrect contours." ) props = get_points_props(data[["x", "y", "label"]]) elif binsize is not None: lm.main_info(f"Using binsize={binsize}") if binsize < 2: lm.main_warning("Please consider using a larger bin size.") if binsize > 1: x_bin = bin_indices(data["x"].values, 0, binsize) y_bin = bin_indices(data["y"].values, 0, binsize) data["x"], data["y"] = x_bin, y_bin data["label"] = data["x"].astype(str) + "-" + data["y"].astype(str) if add_props: props = get_bin_props(data[["x", "y", "label"]].drop_duplicates(), binsize) # Use labels. else: binsize = 1 shape = (data["x"].max(), data["y"].max()) if labels is not None: lm.main_info(f"Using labels provided with `labels` argument.") if labels.shape != shape: lm.main_warning(f"Labels matrix {labels.shape} has different shape as data matrix {shape}") else: labels = SKM.select_layer_data(segmentation_adata, labels_layer) label_coords = get_coords_labels(labels) if labels_layer is not None: lm.main_info(f"Using labels provided with `segmentation_adata` and `labels_layer` arguments.") seg_binsize = SKM.get_uns_spatial_attribute(segmentation_adata, SKM.UNS_SPATIAL_BINSIZE_KEY) x_min, y_min = ( int(segmentation_adata.obs_names[0]) * seg_binsize, int(segmentation_adata.var_names[0]) * seg_binsize, ) label_coords["x"] += x_min label_coords["y"] += y_min # When binning was used for segmentation, need to expand indices to cover # every binned pixel. if seg_binsize > 1: lm.main_warning("Binning was used for segmentation.") coords_dfs = [] for i in range(seg_binsize): for j in range(seg_binsize): coords = label_coords.copy() coords["x"] += i coords["y"] += j coords_dfs.append(coords) label_coords = pd.concat(coords_dfs, ignore_index=True) data = pd.merge(data, label_coords, on=["x", "y"], how="inner") if add_props: props = get_label_props(labels) uniq_cell = sorted(data["label"].unique()) shape = (len(uniq_cell), len(uniq_gene)) cell_dict = dict(zip(uniq_cell, range(len(uniq_cell)))) gene_dict = dict(zip(uniq_gene, range(len(uniq_gene)))) x_ind = data["label"].map(cell_dict).astype(int).values y_ind = data["geneID"].map(gene_dict).astype(int).values # See read_bgi_as_dataframe for standardized column names lm.main_info("Constructing count matrices.") X = csr_matrix((data["total"].values, (x_ind, y_ind)), shape=shape) layers = {} if "spliced" in data.columns: layers[SKM.SPLICED_LAYER_KEY] = csr_matrix((data["spliced"].values, (x_ind, y_ind)), shape=shape) if "unspliced" in data.columns: layers[SKM.UNSPLICED_LAYER_KEY] = csr_matrix((data["unspliced"].values, (x_ind, y_ind)), shape=shape) obs = pd.DataFrame(index=uniq_cell) var = pd.DataFrame(index=uniq_gene) adata = AnnData(X=X, obs=obs, var=var, layers=layers) if props is not None: ordered_props = props.loc[adata.obs_names] adata.obs["area"] = ordered_props["area"].values adata.obsm["spatial"] = ordered_props.filter(regex="centroid-").values adata.obsm["contour"] = ordered_props["contour"].values adata.obsm["bbox"] = ordered_props.filter(regex="bbox-").values scale, scale_unit = 1.0, None if version in VERSIONS: resolution = VERSIONS[version] scale, scale_unit = resolution.scale, resolution.unit # Set uns SKM.init_adata_type(adata, SKM.ADATA_UMI_TYPE) SKM.init_uns_pp_namespace(adata) SKM.init_uns_spatial_namespace(adata) SKM.set_uns_spatial_attribute(adata, SKM.UNS_SPATIAL_BINSIZE_KEY, binsize) SKM.set_uns_spatial_attribute(adata, SKM.UNS_SPATIAL_SCALE_KEY, scale) SKM.set_uns_spatial_attribute(adata, SKM.UNS_SPATIAL_SCALE_UNIT_KEY, scale_unit) return adata