Source code for spateo.segmentation.label

"""Functions for use when labeling individual nuclei/cells, after obtaining a
mask.
"""
import math
from typing import Dict, Optional, Tuple

import cv2
import numpy as np
from anndata import AnnData
from joblib import Parallel, delayed
from numba import njit
from skimage import feature, filters, measure, segmentation
from tqdm import tqdm

from ..configuration import SKM, config
from ..errors import SegmentationError
from ..logging import logger_manager as lm
from . import utils


def _replace_labels(labels: np.ndarray, mapping: Dict[int, int]) -> np.ndarray:
    """Replace labels according to mapping.

    Args:
        labels: Numpy array containing integer labels.
        mapping: Dictionary mapping from labels to labels.

    Returns:
        Replaced labels
    """
    replacement = np.full(labels.max() + 1, -1, dtype=int)
    for from_label, to_label in mapping.items():
        replacement[from_label] = to_label

    new_labels = labels.copy()
    for i in range(labels.shape[0]):
        for j in range(labels.shape[1]):
            new_label = replacement[labels[i, j]]
            if new_label >= 0:
                new_labels[i, j] = new_label
    return new_labels
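
# Example sketch of `_replace_labels` on a toy label image. Labels absent from
# `mapping` are left untouched:
#
#     labels = np.array([[0, 1, 1], [0, 2, 2], [3, 3, 0]])
#     _replace_labels(labels, {2: 1, 3: 0})
#     # label 2 is merged into label 1 and label 3 is cleared to background;
#     # label 1 is unchanged because it is not a key in the mapping.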
@SKM.check_adata_is_type(SKM.ADATA_AGG_TYPE)
def replace_labels(adata: AnnData, layer: str, mapping: Dict[int, int], out_layer: Optional[str] = None):
    """Replace labels according to mapping.

    Args:
        adata: Input Anndata
        layer: Layer containing labels to replace
        mapping: Dictionary mapping that defines label replacement.
        out_layer: Layer to save results. By default, the input layer is
            overridden.
    """
    labels = SKM.select_layer_data(adata, layer)
    lm.main_info(f"Replacing labels with mapping {mapping}")
    new_labels = _replace_labels(labels, mapping)
    SKM.set_layer_data(adata, out_layer or layer, new_labels)
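
# Usage sketch (assumes `adata` is an AGG-type AnnData that already carries an
# integer label layer; the layer name "stain_labels" is illustrative):
#
#     replace_labels(adata, "stain_labels", {5: 2}, out_layer="stain_labels_merged")
#     # writes a copy of "stain_labels" with label 5 merged into label 2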
def _watershed(
    X: np.ndarray,
    mask: np.ndarray,
    markers: np.ndarray,
    k: int,
) -> np.ndarray:
    """Assign individual nuclei/cells using the Watershed algorithm.

    Args:
        X: Data array. This array will be Gaussian blurred and used as the
            input values to Watershed.
        mask: Nucleus/cell mask.
        markers: Numpy array indicating where the Watershed markers are. May
            either be a boolean or integer array. If this is a boolean array,
            the markers are identified by calling `cv2.connectedComponents`.
        k: Size of the kernel to use for Gaussian blur.

    Returns:
        Watershed labels.
    """
    blur = utils.conv2d(X, k, mode="gauss")
    if markers.dtype == np.dtype(bool):
        lm.main_debug("Finding connected components.")
        markers = cv2.connectedComponents(markers.astype(np.uint8))[1]
    lm.main_debug("Running Watershed algorithm.")
    watershed = segmentation.watershed(-blur, markers, mask=mask)
    return watershed
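
# Example sketch of `_watershed` on a toy image (two bright spots blurred into
# blobs; the boolean `markers` are converted to integer seeds internally via
# `cv2.connectedComponents`). The kernel sizes and threshold are illustrative:
#
#     X = np.zeros((20, 20), dtype=float)
#     X[5, 5] = X[14, 14] = 1.0                 # two seed intensities
#     X = utils.conv2d(X, 7, mode="gauss")      # smooth into two blobs
#     mask = X > X.mean()                       # rough foreground mask
#     markers = np.zeros((20, 20), dtype=bool)
#     markers[5, 5] = markers[14, 14] = True    # one marker per blob
#     labels = _watershed(X, mask, markers, k=3)
#     # `labels` assigns a distinct positive integer to each blob inside `mask`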
@SKM.check_adata_is_type(SKM.ADATA_AGG_TYPE)
def find_peaks_with_erosion(
    adata: AnnData,
    layer: str = SKM.STAIN_LAYER_KEY,
    k: int = 3,
    square: bool = False,
    min_area: int = 80,
    n_iter: int = -1,
    float_k: int = 5,
    float_threshold: Optional[float] = None,
    out_layer: Optional[str] = None,
):
    """Find peaks for use in Watershed via iterative erosion.

    Args:
        adata: Input Anndata
        layer: Layer that was used to create scores or masks. If
            `{layer}_scores` is present, that is used. Otherwise, if
            `{layer}_mask` is present, that is used. Otherwise, the layer is
            taken as a literal.
        k: Erosion kernel size
        square: Whether to use a square kernel
        min_area: Minimum area
        n_iter: Number of erosions to perform.
        float_k: Morphological close and open kernel size when `X` is a float
            array.
        float_threshold: Threshold to use to determine connected components
            when `X` is a float array. By default, a threshold is
            automatically determined by using the Otsu method.
        out_layer: Layer to save results. By default, this will be
            `{layer}_markers`.
    """
    _layer1 = SKM.gen_new_layer_key(layer, SKM.SCORES_SUFFIX)
    _layer2 = SKM.gen_new_layer_key(layer, SKM.MASK_SUFFIX)
    if _layer1 not in adata.layers and _layer2 not in adata.layers and layer not in adata.layers:
        raise SegmentationError(
            f'Neither "{_layer1}", "{_layer2}", nor "{layer}" are present in AnnData. '
            "Please run either `st.cs.mask_nuclei_from_stain` or `st.cs.score_and_mask_pixels` first."
        )
    _layer = layer
    if _layer1 in adata.layers:
        _layer = _layer1
    elif _layer2 in adata.layers:
        _layer = _layer2
    X = SKM.select_layer_data(adata, _layer, make_dense=True)
    if np.issubdtype(X.dtype, np.floating) and not float_threshold:
        lm.main_debug("Finding threshold with Otsu.")
        float_threshold = filters.threshold_otsu(X)
    lm.main_info("Finding Watershed markers with iterative erosion.")
    markers = utils.safe_erode(X, k, square, min_area, n_iter, float_k, float_threshold)
    out_layer = out_layer or SKM.gen_new_layer_key(layer, SKM.MARKERS_SUFFIX)
    SKM.set_layer_data(adata, out_layer, markers)
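
# Usage sketch (assumes a nuclei mask or score layer has already been produced,
# e.g. by `st.cs.mask_nuclei_from_stain` as the error message above suggests;
# parameter values are illustrative):
#
#     find_peaks_with_erosion(adata, k=3, min_area=80)
#     # erosion-derived Watershed markers are written to `{layer}_markers`
#     # by default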
@SKM.check_adata_is_type(SKM.ADATA_AGG_TYPE)
def watershed(
    adata: AnnData,
    layer: str = SKM.STAIN_LAYER_KEY,
    k: int = 3,
    mask_layer: Optional[str] = None,
    markers_layer: Optional[str] = None,
    out_layer: Optional[str] = None,
):
    """Assign individual nuclei/cells using the Watershed algorithm.

    Args:
        adata: Input AnnData
        layer: Original data layer from which the segmentation will be derived.
        k: Size of the kernel to use for Gaussian blur.
        mask_layer: Layer containing mask. This will default to `{layer}_mask`.
        markers_layer: Layer containing Watershed markers. This will default to
            `{layer}_markers`. May either be a boolean or integer array. If
            this is a boolean array, the markers are identified by calling
            `cv2.connectedComponents`.
        out_layer: Layer to save results. Defaults to `{layer}_labels`.
    """
    X = SKM.select_layer_data(adata, layer, make_dense=True)
    mask_layer = mask_layer or SKM.gen_new_layer_key(layer, SKM.MASK_SUFFIX)
    mask = SKM.select_layer_data(adata, mask_layer)
    markers_layer = markers_layer or SKM.gen_new_layer_key(layer, SKM.MARKERS_SUFFIX)
    markers = SKM.select_layer_data(adata, markers_layer)
    lm.main_info("Running Watershed.")
    # Markers should always be included in the mask.
    labels = _watershed(X, mask | (markers > 0), markers, k)

    areas = np.bincount(labels.flatten())
    if (areas[1:] > 10000).any():
        lm.main_warning(
            "Some labels have area greater than 10000. If you are segmenting based on RNA, consider "
            "using `st.cs.label_connected_components` instead."
        )

    out_layer = out_layer or SKM.gen_new_layer_key(layer, SKM.LABELS_SUFFIX)
    SKM.set_layer_data(adata, out_layer, labels)
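
# Usage sketch continuing from the markers produced above (assumes the default
# `{layer}_mask` and `{layer}_markers` layers exist):
#
#     watershed(adata, k=3)
#     # final per-nucleus/cell labels are stored in `{layer}_labels` by default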
def _expand_labels(
    labels: np.ndarray,
    distance: int,
    max_area: int,
    mask: Optional[np.ndarray] = None,
) -> np.ndarray:
    """Expand labels up to a certain distance, while ignoring labels that are
    above a certain size.

    Args:
        labels: Numpy array containing integer labels.
        distance: Distance to expand. Internally, this is used as the number of
            iterations of distance 1 dilations.
        max_area: Maximum area of each label.
        mask: Only expand within the provided mask.

    Returns:
        New label array with expanded labels.
    """
    masked_labels = labels[mask] if mask is not None else labels
    if (masked_labels > 0).all() or (masked_labels == 0).all():
        return labels

    @njit
    def _expand(X, areas, max_area, mask, start_i, end_i):
        expanded = X[start_i:end_i].copy()
        new_areas = np.zeros_like(areas)
        n_neighbors = 0
        neighbors = np.zeros(4, dtype=X.dtype)
        for i in range(start_i, end_i):
            for j in range(X.shape[1]):
                if X[i, j] > 0 or not mask[i, j]:
                    continue
                if i - 1 >= 0:
                    neighbors[n_neighbors] = X[i - 1, j]
                    n_neighbors += 1
                if i + 1 < X.shape[0]:
                    neighbors[n_neighbors] = X[i + 1, j]
                    n_neighbors += 1
                if j - 1 >= 0:
                    neighbors[n_neighbors] = X[i, j - 1]
                    n_neighbors += 1
                if j + 1 < X.shape[1]:
                    neighbors[n_neighbors] = X[i, j + 1]
                    n_neighbors += 1

                unique = np.unique(neighbors[:n_neighbors])
                unique_labels = unique[unique > 0]
                if len(unique_labels) == 1:
                    label = unique_labels[0]
                    if areas[label] < max_area:
                        expanded[i - start_i, j] = label
                        new_areas[label] += 1
                n_neighbors = 0
        return expanded, new_areas

    areas = np.bincount(labels.flatten())
    mask = np.ones(labels.shape, dtype=bool) if mask is None else mask
    step = math.ceil(labels.shape[0] / config.n_threads)
    expanded = labels.copy()
    with Parallel(n_jobs=config.n_threads) as parallel:
        for _ in tqdm(range(distance), desc="Expanding"):
            new_areas = np.zeros_like(areas)
            subis = range(0, labels.shape[0], step)
            sublabels = []
            submasks = []
            for i in subis:
                sl = slice(max(0, i - 1), min(labels.shape[0], i + step + 1))
                sublabels.append(expanded[sl])
                submasks.append(mask[sl])
            for i, (_expanded, _new_areas) in zip(
                subis,
                parallel(
                    delayed(_expand)(
                        sl, areas, max_area, sm, int(i - 1 >= 0), sl.shape[0] - int(i + step + 1 < labels.shape[0])
                    )
                    for i, sl, sm in zip(subis, sublabels, submasks)
                ),
            ):
                expanded[i : i + step] = _expanded
                new_areas += _new_areas
            areas += new_areas
    return expanded
@SKM.check_adata_is_type(SKM.ADATA_AGG_TYPE)
def expand_labels(
    adata: AnnData,
    layer: str,
    distance: int = 5,
    max_area: int = 400,
    mask_layer: Optional[str] = None,
    out_layer: Optional[str] = None,
):
    """Expand labels up to a certain distance.

    Args:
        adata: Input Anndata
        layer: Layer from which the labels were derived. `{layer}_labels` is
            used as the label layer if present; otherwise, this layer is taken
            as a literal.
        distance: Distance to expand. Internally, this is used as the number of
            iterations of distance 1 dilations.
        max_area: Maximum area of each label.
        mask_layer: Layer containing mask to restrict expansion to within.
        out_layer: Layer to save results. By default, uses `{layer}_labels_expanded`.
    """
    label_layer = SKM.gen_new_layer_key(layer, SKM.LABELS_SUFFIX)
    if label_layer not in adata.layers:
        label_layer = layer
    labels = SKM.select_layer_data(adata, label_layer)
    mask = SKM.select_layer_data(adata, mask_layer) if mask_layer else None
    lm.main_info("Expanding labels.")
    expanded = _expand_labels(labels, distance, max_area, mask=mask)
    out_layer = out_layer or SKM.gen_new_layer_key(label_layer, SKM.EXPANDED_SUFFIX)
    SKM.set_layer_data(adata, out_layer, expanded)
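
# Usage sketch (assumes `{layer}_labels` exists from a previous Watershed or
# connected-components step; the layer name "stain" is illustrative):
#
#     expand_labels(adata, "stain", distance=5, max_area=400)
#     # labels grow by up to 5 one-pixel dilations, skipping any label whose
#     # area already exceeds 400; results go to "stain_labels_expanded"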
def _label_connected_components(
    X: np.ndarray,
    area_threshold: int = 500,
    k: int = 3,
    min_area: int = 100,
    n_iter: int = -1,
    distance: int = 8,
    max_area: int = 400,
    seed_labels: Optional[np.ndarray] = None,
) -> np.ndarray:
    """Label connected components while splitting components that are too large.

    Args:
        X: Boolean mask to compute connected components from.
        area_threshold: Connected components with area greater than this value
            will be split into smaller portions by first eroding and then
            expanding.
        k: Kernel size for erosion.
        min_area: Don't erode labels smaller than this area.
        n_iter: Number of erosion operations. -1 means continue eroding until
            every label is less than `min_area`.
        distance: Distance to expand eroded labels.
        max_area: Maximum area when expanding labels.
        seed_labels: Seed labels.

    Returns:
        New label array
    """
    components = cv2.connectedComponentsWithStats(X.astype(np.uint8))
    areas = components[2][:, cv2.CC_STAT_AREA]
    to_erode = np.zeros(X.shape, dtype=bool)
    saved = np.zeros(X.shape, dtype=int)
    saved_i = (seed_labels.max() + 1) if seed_labels is not None else 1
    for label, area in enumerate(areas):
        if label > 0:
            stats = components[2][label]
            left, top, width, height = (
                stats[cv2.CC_STAT_LEFT],
                stats[cv2.CC_STAT_TOP],
                stats[cv2.CC_STAT_WIDTH],
                stats[cv2.CC_STAT_HEIGHT],
            )
            label_mask = components[1][top : top + height, left : left + width] == label
            if seed_labels is not None and (seed_labels[top : top + height, left : left + width][label_mask] > 0).any():
                continue
            if area <= area_threshold:
                saved[top : top + height, left : left + width] += label_mask * saved_i
                saved_i += 1
            else:
                to_erode[top : top + height, left : left + width] += label_mask
    erode = (to_erode > 0).any()
    if erode:
        eroded = utils.safe_erode(to_erode, k=k, min_area=min_area, n_iter=n_iter)
        labels = cv2.connectedComponents(eroded.astype(np.uint8))[1]
        labels[labels > 0] += saved_i - 1
    elif seed_labels is None:
        return saved
    else:
        labels = np.zeros_like(saved)
    if seed_labels is not None:
        labels += seed_labels
    expanded = _expand_labels(labels, distance=distance, max_area=max_area, mask=X > 0)
    return saved + expanded
@SKM.check_adata_is_type(SKM.ADATA_AGG_TYPE)
def label_connected_components(
    adata: AnnData,
    layer: str,
    seed_layer: Optional[str] = None,
    area_threshold: int = 500,
    k: int = 3,
    min_area: int = 100,
    n_iter: int = -1,
    distance: int = 8,
    max_area: int = 400,
    out_layer: Optional[str] = None,
):
    """Label connected components while splitting components that are too large.

    Args:
        adata: Input Anndata
        layer: Data layer that was used to generate the mask. First, will look
            for `{layer}_mask`. Otherwise, this will be used as a literal.
        seed_layer: Layer containing seed labels. These are labels that should
            be used whenever possible in labeling connected components.
        area_threshold: Connected components with area greater than this value
            will be split into smaller portions by first eroding and then
            expanding.
        k: Kernel size for erosion.
        min_area: Don't erode labels smaller than this area.
        n_iter: Number of erosion operations. -1 means continue eroding until
            every label is less than `min_area`.
        distance: Distance to expand eroded labels.
        max_area: Maximum area when expanding labels.
        out_layer: Layer to save results. Defaults to `{layer}_labels`.
    """
    mask_layer = SKM.gen_new_layer_key(layer, SKM.MASK_SUFFIX)
    if mask_layer not in adata.layers:
        mask_layer = layer
    mask = SKM.select_layer_data(adata, mask_layer)
    seed_labels = SKM.select_layer_data(adata, seed_layer) if seed_layer else None
    labels = _label_connected_components(mask, area_threshold, k, min_area, n_iter, distance, max_area, seed_labels)
    out_layer = out_layer or SKM.gen_new_layer_key(layer, SKM.LABELS_SUFFIX)
    SKM.set_layer_data(adata, out_layer, labels)
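
# Usage sketch for RNA-based segmentation (assumes a mask layer such as
# "unspliced_mask" has already been produced, e.g. by
# `st.cs.score_and_mask_pixels` as referenced in `find_peaks_with_erosion`;
# the layer name is illustrative):
#
#     label_connected_components(adata, "unspliced", area_threshold=500)
#     # components larger than `area_threshold` are eroded, relabeled, and
#     # re-expanded; results are written to "unspliced_labels" by default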
def _find_peaks(X: np.ndarray, **kwargs) -> np.ndarray:
    """Find peaks from an arbitrary image.

    This function is a wrapper around :func:`feature.peak_local_max`.

    Args:
        X: Array to find peaks from
        **kwargs: Keyword arguments to pass to :func:`feature.peak_local_max`.

    Returns:
        Numpy array of the same size as `X` where each peak is labeled with a
        unique positive integer.
    """
    _kwargs = dict(p_norm=2)
    _kwargs.update(kwargs)
    peak_idx = feature.peak_local_max(X, **_kwargs)
    peaks = np.zeros(X.shape, dtype=int)
    for label, (i, j) in enumerate(peak_idx):
        peaks[i, j] = label + 1
    return peaks
@SKM.check_adata_is_type(SKM.ADATA_AGG_TYPE)
def find_peaks(
    adata: AnnData,
    layer: str,
    k: int,
    min_distance: int,
    mask_layer: Optional[str] = None,
    out_layer: Optional[str] = None,
):
    """Find peaks from an array.

    Args:
        adata: Input AnnData
        layer: Layer to use as values to find peaks from.
        k: Apply a Gaussian blur with this kernel size prior to peak detection.
        min_distance: Minimum distance, in pixels, between peaks.
        mask_layer: Find peaks only in regions specified by the mask.
        out_layer: Layer to save identified peaks as markers. By default, uses
            `{layer}_markers`.
    """
    X = SKM.select_layer_data(adata, layer, make_dense=True)
    if X.dtype == np.dtype(bool):
        raise SegmentationError(
            f"Layer {layer} contains a boolean array. Please use `st.cs.find_peaks_from_mask` instead."
        )
    X = utils.conv2d(X, k, mode="gauss")
    peaks = _find_peaks(X, min_distance=min_distance)
    if mask_layer:
        peaks *= SKM.select_layer_data(adata, mask_layer)
    out_layer = out_layer or SKM.gen_new_layer_key(layer, SKM.MARKERS_SUFFIX)
    SKM.set_layer_data(adata, out_layer, peaks)
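
# Usage sketch (peak detection on a non-boolean layer; "nuclear_stain" is an
# illustrative layer name):
#
#     find_peaks(adata, "nuclear_stain", k=3, min_distance=7)
#     # a Gaussian blur with kernel size 3 is applied first, and each detected
#     # peak is saved as a positive integer in "nuclear_stain_markers"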
@SKM.check_adata_is_type(SKM.ADATA_AGG_TYPE)
def find_peaks_from_mask(
    adata: AnnData,
    layer: str,
    min_distance: int,
    distances_layer: Optional[str] = None,
    markers_layer: Optional[str] = None,
):
    """Find peaks from a boolean mask. Used to obtain Watershed markers.

    Args:
        adata: Input AnnData
        layer: Layer containing boolean mask. This will default to
            `{layer}_mask`. If not present in the provided AnnData, this
            argument is used as a literal.
        min_distance: Minimum distance, in pixels, between peaks.
        distances_layer: Layer to save distance from each pixel to the nearest
            zero (False) pixel (a.k.a. distance transform). By default, uses
            `{layer}_distances`.
        markers_layer: Layer to save identified peaks as markers. By default,
            uses `{layer}_markers`.
    """
    mask_layer = SKM.gen_new_layer_key(layer, SKM.MASK_SUFFIX)
    if mask_layer not in adata.layers:
        mask_layer = layer
    mask = SKM.select_layer_data(adata, mask_layer)
    if mask.dtype != np.dtype(bool):
        raise SegmentationError(f"Only boolean masks are supported for this function, but got {mask.dtype} instead.")

    lm.main_info(f"Finding peaks with minimum distance {min_distance}.")
    distances = cv2.distanceTransform(mask.astype(np.uint8), cv2.DIST_L2, 3)
    peaks = _find_peaks(distances, min_distance=min_distance)

    distances_layer = distances_layer or SKM.gen_new_layer_key(layer, SKM.DISTANCES_SUFFIX)
    SKM.set_layer_data(adata, distances_layer, distances)
    markers_layer = markers_layer or SKM.gen_new_layer_key(layer, SKM.MARKERS_SUFFIX)
    SKM.set_layer_data(adata, markers_layer, peaks)
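
# Usage sketch (marker generation from a boolean mask via the distance
# transform, as an alternative to `find_peaks_with_erosion`; assumes
# `{layer}_mask` already exists for the illustrative layer "stain"):
#
#     find_peaks_from_mask(adata, "stain", min_distance=10)
#     # writes "stain_distances" (distance transform) and "stain_markers"
#     # (peak seeds), which can then be fed to `watershed`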
def _augment_labels(source_labels: np.ndarray, target_labels: np.ndarray) -> np.ndarray:
    """Augment the labels in one label array using the labels in another.

    This function modifies the labels in `target_labels` in the following way.
    Note that this function operates on a copy of `target_labels`. It does NOT
    modify in-place.

    * Any labels in `source_labels` that have no overlap with any labels in
      `target_labels` are copied over to `target_labels`.
    * Any labels in `target_labels` that have no overlap with any labels in
      `source_labels` are removed.

    Args:
        source_labels: Numpy array containing labels to (possibly) transfer.
        target_labels: Numpy array containing labels to augment.

    Returns:
        New Numpy array containing augmented labels.
    """
    augmented = np.zeros_like(target_labels)
    label = 1
    lm.main_debug("Removing non-overlapping labels.")
    target_props = measure.regionprops(target_labels)  # lazy evaluation
    for props in target_props:
        _label = props.label
        min_row, min_col, max_row, max_col = props.bbox
        target_mask = target_labels[min_row:max_row, min_col:max_col] == _label
        if source_labels[min_row:max_row, min_col:max_col][target_mask].sum() > 0:
            augmented[min_row:max_row, min_col:max_col][target_mask] = label
            label += 1

    lm.main_debug("Copying over non-overlapping labels.")
    source_props = measure.regionprops(source_labels)  # lazy evaluation
    for props in source_props:
        _label = props.label
        min_row, min_col, max_row, max_col = props.bbox
        source_mask = source_labels[min_row:max_row, min_col:max_col] == _label
        if target_labels[min_row:max_row, min_col:max_col][source_mask].sum() == 0:
            augmented[min_row:max_row, min_col:max_col][source_mask] = label
            label += 1
    return augmented
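
# Example sketch of how `_augment_labels` treats overlapping vs.
# non-overlapping labels on a toy 4x4 pair of label arrays:
#
#     source = np.zeros((4, 4), dtype=int)
#     source[0, 0] = 1          # overlaps a target label -> target label kept
#     source[2:, 2:] = 2        # no target overlap -> copied into the result
#     target = np.zeros((4, 4), dtype=int)
#     target[0, 0] = 3          # overlaps source label 1 -> kept (relabeled)
#     target[3, 0] = 4          # no source overlap -> removed
#     augmented = _augment_labels(source, target)
#     # `augmented` keeps the pixel at (0, 0), drops (3, 0), and gains the 2x2
#     # block from `source`; labels are renumbered consecutively from 1.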
@SKM.check_adata_is_type(SKM.ADATA_AGG_TYPE)
def augment_labels(
    adata: AnnData,
    source_layer: str,
    target_layer: str,
    out_layer: Optional[str] = None,
):
    """Augment the labels in one label array using the labels in another.

    Args:
        adata: Input Anndata
        source_layer: Layer containing source labels to (possibly) take labels
            from.
        target_layer: Layer containing target labels to augment.
        out_layer: Layer to save results. Defaults to `{target_layer}_augmented`.
    """
    source_labels = SKM.select_layer_data(adata, source_layer)
    target_labels = SKM.select_layer_data(adata, target_layer)
    augmented = _augment_labels(source_labels, target_labels)
    out_layer = out_layer or SKM.gen_new_layer_key(target_layer, SKM.AUGMENTED_SUFFIX)
    SKM.set_layer_data(adata, out_layer, augmented)
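
# Usage sketch (assumes two label layers already exist, e.g. labels derived
# from a stain and labels derived from RNA; layer names are illustrative):
#
#     augment_labels(adata, "stain_labels", "unspliced_labels")
#     # the augmented result is saved to "unspliced_labels_augmented" by default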