"""IO functions for NanoString CosMx technology.
"""
import glob
import os
import re
import warnings
from typing import List, NamedTuple, Optional, Union
import numpy as np
import pandas as pd
with warnings.catch_warnings():
warnings.simplefilter("ignore")
import skimage.io
from anndata import AnnData
from scipy.sparse import csr_matrix
from typing_extensions import Literal
from ..configuration import SKM
from ..logging import logger_manager as lm
from .utils import bin_indices, get_bin_props, get_points_props
try:
import ngs_tools as ngs
[docs] VERSIONS = {
"cosmx": ngs.chemistry.get_chemistry("CosMx").resolution,
}
except ModuleNotFoundError:
class SpatialResolution(NamedTuple):
scale: float = 1.0
unit: Optional[Literal["nm", "um", "mm"]] = None
VERSIONS = {"cosmx": SpatialResolution(0.18, "um")}
[docs]FOV_PARSER = re.compile("^.+_F(?P<fov>[0-9]+)\..+$")
[docs]def read_nanostring_as_dataframe(path: str, label_columns: Optional[List[str]] = None) -> pd.DataFrame:
"""Read a NanoString CSV tx or metadata file as pandas DataFrame.
Args:
path: Path to file.
label_columns: Column names, the combination of which indicates unique cells.
Returns:
Pandas Dataframe with the following standardized column names.
* `gene`: Gene name/ID (whatever was used in the original file)
* `x`, `y`: X and Y coordinates
"""
dtype = {
"target": "category",
"CellComp": "category",
"fov": np.uint8,
"cell_ID": np.uint16,
"Area": np.uint32,
"Width": np.uint16,
"Height": np.uint16,
}
# These are actually stored as floats in the CSV, but we cast them to
# integers because they are in terms of the TIFF coordinates,
# which are integers.
convert = {
"x_global_px": np.uint32,
"y_global_px": np.uint32,
"x_local_px": np.uint16,
"y_local_px": np.uint16,
"CenterX_local_px": np.uint16,
"CenterY_local_px": np.uint16,
"CenterX_global_px": np.uint32,
"CenterY_global_px": np.uint32,
}
rename = {
"target": "gene",
"x_global_px": "x",
"y_global_px": "y",
}
# Use first 10 rows for validation.
df = pd.read_csv(path, dtype=dtype, nrows=10)
if label_columns:
for column in label_columns:
if column not in df.columns:
raise IOError(f"Column `{column}` is not present.")
df = pd.read_csv(path, dtype=dtype)
for column, t in convert.items():
if column in df.columns:
df[column] = df[column].astype(t)
if label_columns:
labels = df[label_columns[0]].astype(str)
for label in label_columns[1:]:
labels += "-" + df[label].astype(str)
df["label"] = labels
return df.rename(columns=rename)
[docs]def stitch_images(stain_dir: str, positions_path: str, labels: bool = False) -> np.ndarray:
"""Stitch multiple FOVs into a single image using position information.
Args:
stain_dir: Directory containing JPEG or TIFF files with filenames
ending in '_FXXX' where XXX indicates the FOV index.
positions_path: Path to CSV file containing FOV positions.
labels: Whether these are labels (and therefore should be made unique).
Returns:
A numpy array containing the stitched image. May contain multiple channels,
which is the last dimension of the array.
"""
# Load all images in stain_dir, indexed by FOV
stain_fov_paths = {}
for filename in os.listdir(stain_dir):
path = os.path.join(stain_dir, filename)
match = FOV_PARSER.match(filename)
if match:
fov = int(match["fov"])
if fov in stain_fov_paths:
raise IOError(f"Multiple images for FOV {fov} were found: {stain_fov_paths[fov]}, {path}.")
stain_fov_paths[fov] = path
lm.main_debug(f"Found {len(stain_fov_paths)} FOV images.")
# Read FOV positions and make sure they match exactly with the files.
fov_df = pd.read_csv(positions_path, dtype={"fov": int}, index_col="fov")
if set(fov_df.index) != set(stain_fov_paths.keys()):
raise IOError(f"FOVs defined in {positions_path} do not match exactly with those found in {stain_dir}.")
fov_x = dict(fov_df["x_global_px"].astype(np.uint32))
fov_y = dict(fov_df["y_global_px"].astype(np.uint32))
# Detect the size of the entire image.
# Also, check that all the images have the same non-XY dimensions.
xmin, ymin = min(fov_x.values()), min(fov_y.values())
xmax, ymax = 0, 0
extra_dims = None
dtype = None
stain_fovs = {}
for fov, path in stain_fov_paths.items():
x, y = fov_x[fov], fov_y[fov]
img = skimage.io.imread(path)
xmax = max(xmax, x + img.shape[1] - 1)
ymax = max(ymax, y + img.shape[0] - 1)
stain_fovs[fov] = img
if extra_dims is None:
extra_dims = img.shape[2:]
elif extra_dims != img.shape[2:]:
raise IOError(f"FOV {path} has inconsistent non-XY dimensions.")
if dtype is None:
dtype = img.dtype
elif dtype != img.dtype:
raise IOError(f"FOV {path} has inconsistent dtype.")
if labels:
dtype = np.uint
last_label = 0
img = np.zeros((xmax - xmin + 1, ymax - ymin + 1) + extra_dims, dtype=dtype)
for fov, _img in stain_fovs.items():
x, y = fov_x[fov] - xmin, fov_y[fov] - ymin
if labels:
_img[_img > 0] += last_label
last_label = _img.max()
img[x : x + _img.shape[1], y : y + _img.shape[0]] = np.fliplr(np.swapaxes(_img, 0, 1))
return img
# def read_nanostring_agg(
# path: str,
# stain_path: Optional[str] = None,
# binsize: int = 1,
# gene_agg: Optional[Dict[str, Union[List[str], Callable[[str], bool]]]] = None,
# prealigned: bool = False,
# label_columns: Optional[Union[str, List[str]]] = None,
# version: Literal["cosmx"] = "cosmx",
# ) -> AnnData:
# lm.main_debug(f"Reading data from {path}.")
# data = read_nanostring_as_dataframe(path, label_columns)
# x_min, y_min = data["x"].min(), data["y"].min()
# x, y = data["x"].values, data["y"].values
# x_max, y_max = x.max(), y.max()
# shape = (x_max + 1, y_max + 1)
#
# # Read image and update x,y max if appropriate
# layers = {}
# if stain_path:
# lm.main_debug(f"Reading stain image from {stain_path}.")
# image = skimage.io.imread(stain_path)
# if prealigned:
# lm.main_warning(
# (
# "Assuming stain image was already aligned with the minimum x and y RNA coordinates. "
# "(prealinged=True)"
# )
# )
# image = np.pad(image, ((x_min, 0), (y_min, 0)))
# x_max = max(x_max, image.shape[0] - 1)
# y_max = max(y_max, image.shape[1] - 1)
# shape = (x_max + 1, y_max + 1)
# # Reshape image to match new x,y max
# if image.shape != shape:
# lm.main_warning(f"Padding stain image from {image.shape} to {shape} with zeros.")
# image = np.pad(image, ((0, shape[0] - image.shape[0]), (0, shape[1] - image.shape[1])))
# layers[SKM.STAIN_LAYER_KEY] = image
[docs]def read_nanostring(
path: str,
meta_path: Optional[str] = None,
binsize: Optional[int] = None,
label_columns: Optional[Union[str, List[str]]] = None,
add_props: bool = True,
version: Literal["cosmx"] = "cosmx",
) -> AnnData:
"""Read NanoString CosMx data as AnnData.
Args:
path: Path to transcript detection CSV file.
meta_path: Path to cell metadata CSV file.
scale: Physical length per coordinate. For visualization only.
scale_unit: Scale unit.
binsize: Size of pixel bins
label_columns: Columns that contain already-segmented cell labels. Each
unique combination is considered a unique cell.
add_props: Whether or not to compute label properties, such as area,
bounding box, centroid, etc.
version: NanoString technology version. Currently only used to set the scale and
scale units of each unit coordinate. This may change in the future.
Returns:
Bins x genes or labels x genes AnnData.
"""
if sum([binsize is not None, label_columns is not None]) != 1:
raise IOError("Exactly one of `binsize`, `label_columns` must be provided.")
if binsize is not None and abs(int(binsize)) != binsize:
raise IOError("Positive integer `binsize` must be provided when `segmentation_adata` is not provided.")
label_columns = [label_columns] if isinstance(label_columns, str) else label_columns
data = read_nanostring_as_dataframe(path, label_columns)
metadata = None
uniq_gene = sorted(data["gene"].unique())
props = None
if label_columns:
if meta_path:
metadata = read_nanostring_as_dataframe(meta_path, label_columns)
lm.main_info(f"Using cell labels from `{label_columns}` columns.")
binsize = 1
# cell_ID == 0 indicates not assigned
data = data[data["cell_ID"] > 0]
if add_props:
props = get_points_props(data[["x", "y", "label"]])
elif binsize is not None:
lm.main_info(f"Using binsize={binsize}")
if binsize < 2:
lm.main_warning("Please consider using a larger bin size.")
if binsize > 1:
x_bin = bin_indices(data["x"].values, 0, binsize)
y_bin = bin_indices(data["y"].values, 0, binsize)
data["x"], data["y"] = x_bin, y_bin
data["label"] = data["x"].astype(str) + "-" + data["y"].astype(str)
if add_props:
props = get_bin_props(data[["x", "y", "label"]].drop_duplicates(), binsize)
else:
raise NotImplementedError()
uniq_cell = sorted(data["label"].unique())
shape = (len(uniq_cell), len(uniq_gene))
cell_dict = dict(zip(uniq_cell, range(len(uniq_cell))))
gene_dict = dict(zip(uniq_gene, range(len(uniq_gene))))
label_gene_counts = data.groupby(["label", "gene"], observed=True, sort=False).size()
label_gene_counts.name = "count"
label_gene_counts = label_gene_counts.reset_index()
x_ind = label_gene_counts["label"].map(cell_dict).astype(int).values
y_ind = label_gene_counts["gene"].map(gene_dict).astype(int).values
lm.main_info("Constructing count matrix.")
X = csr_matrix((label_gene_counts["count"].values, (x_ind, y_ind)), shape=shape)
obs = pd.DataFrame(index=uniq_cell)
var = pd.DataFrame(index=uniq_gene)
adata = AnnData(X=X, obs=obs, var=var)
if metadata is not None:
adata.obs = metadata.set_index("label").loc[adata.obs_names]
if props is not None:
ordered_props = props.loc[adata.obs_names]
adata.obs["area"] = ordered_props["area"].values
adata.obsm["spatial"] = ordered_props.filter(regex="centroid-").values
adata.obsm["contour"] = ordered_props["contour"].values
adata.obsm["bbox"] = ordered_props.filter(regex="bbox-").values
scale, scale_unit = 1.0, None
if version in VERSIONS:
resolution = VERSIONS[version]
scale, scale_unit = resolution.scale, resolution.unit
# Set uns
SKM.init_adata_type(adata, SKM.ADATA_UMI_TYPE)
SKM.init_uns_pp_namespace(adata)
SKM.init_uns_spatial_namespace(adata)
SKM.set_uns_spatial_attribute(adata, SKM.UNS_SPATIAL_BINSIZE_KEY, binsize)
SKM.set_uns_spatial_attribute(adata, SKM.UNS_SPATIAL_SCALE_KEY, scale)
SKM.set_uns_spatial_attribute(adata, SKM.UNS_SPATIAL_SCALE_UNIT_KEY, scale_unit)
return adata