Skip to content



Source code in sopa/segmentation/
class StainingSegmentation:
    def __init__(
        sdata: SpatialData,
        method: Callable,
        channels: list[str] | str,
        image_key: str | None = None,
        min_area: float = 0,
        clip_limit: float = 0.2,
        clahe_kernel_size: int | Iterable[int] | None = None,
        gaussian_sigma: float = 1,
        """Generalized staining-based segmentation

        !!! note "Sequential segmentation (slower)"
            from sopa.segmentation.stainings import StainingSegmentation

            method = ... # custom callable that runs segmentation on each patch

            segmentation = StainingSegmentation(sdata, method, "DAPI")

            cells = StainingSegmentation.read_patches_cells("./temp_dir")
            StainingSegmentation.add_shapes(sdata, cells, image_key, "method_name")

        !!! note "Parallel segmentation (faster)"
            from sopa.segmentation.stainings import StainingSegmentation

            method = ... # custom callable that runs segmentation on each patch

            segmentation = StainingSegmentation(sdata, method, "DAPI")

            # Run all this in a parallel manner, e.g. on different jobs
            for i in range(len(sdata['sopa_patches'])):
                segmentation.write_patch_cells("./temp_dir", i)

            cells = StainingSegmentation.read_patches_cells("./temp_dir")
            StainingSegmentation.add_shapes(sdata, cells, image_key, "method_name")

            sdata: A `SpatialData` object
            method: A segmentation `callable` whose input is an image of shape `(C, Y, X)` and output is a cell mask of shape `(Y, X)`. Each mask value `>0` represent a unique cell ID. Such callables can be found in `sopa.segmentation.methods`.
            channels: One or a list of channel names used for segmentation. If only one channel is provided, the image given to the `method` will be of shape `(1, Y, X)`.
            image_key: Optional key of `sdata` containing the image (no needed if there is only one image)
            min_area: Minimum area (in pixels^2) for a cell to be kept
            clip_limit: Parameter for skimage.exposure.equalize_adapthist (applied before running cellpose)
            clahe_kernel_size: Parameter for skimage.exposure.equalize_adapthist (applied before running cellpose)
            gaussian_sigma: Parameter for scipy gaussian_filter (applied before running cellpose)
        self.sdata = sdata
        self.method = method
        self.channels = [channels] if isinstance(channels, str) else channels

        self.min_area = min_area
        self.clip_limit = clip_limit
        self.clahe_kernel_size = clahe_kernel_size
        self.gaussian_sigma = gaussian_sigma

        self.image_key, self.image = get_spatial_image(sdata, key=image_key, return_key=True)

        image_channels = self.image.coords["c"].values
        assert np.isin(
            channels, image_channels
        ).all(), f"Channel names must be a subset of: {', '.join(image_channels)}"

    def _run_patch(self, patch: Polygon) -> list[Polygon]:
        """Run segmentation on one patch

            patch: Patch, represented as a `shapely` polygon

            A list of cells, represented as polygons
        bounds = [int(x) for x in patch.bounds]

        image = self.image.sel(
            x=slice(bounds[0], bounds[2]),
            y=slice(bounds[1], bounds[3]),

        assert np.issubdtype(
            image.dtype, np.integer
        ), f"Invalid image type {image.dtype}. Transform it to an integer dtype, e.g. `np.uint8`."

        if self.gaussian_sigma > 0:
            image = np.stack([gaussian_filter(c, sigma=self.gaussian_sigma) for c in image])
        if self.clip_limit > 0:
            image = np.stack(
                    for c in image

        if patch.area < box(*bounds).area:
            image = image * shapes.rasterize(patch, image.shape[1:], bounds)

        cells = shapes.geometrize(self.method(image))

        if self.min_area > 0:
            cells = [cell for cell in cells if cell.area >= self.min_area]

        return [affinity.translate(cell, *bounds[:2]) for cell in cells]

    def write_patch_cells(self, patch_dir: str, patch_index: int):
        """Run segmentation on one patch, and save the result in a dedicated directory

            patch_dir: Directory inside which segmentation results will be saved
            patch_index: Index of the patch on which to run segmentation. NB: the number of patches is `len(sdata['sopa_patches'])`
        patch = self.sdata[SopaKeys.PATCHES].geometry[patch_index]

        cells = self._run_patch(patch)
        gdf = gpd.GeoDataFrame(geometry=cells)

        patch_dir: Path = Path(patch_dir)
        patch_dir.mkdir(parents=True, exist_ok=True)
        patch_file = patch_dir / f"{patch_index}.parquet"


    def write_patches_cells(self, patch_dir: str):
            "Running segmentation in a sequential manner. This is not recommended on large images because it can be extremely slow (see for more details)"
        for patch_index in tqdm(range(len(self.sdata[SopaKeys.PATCHES])), desc="Run all patches"):
            self.write_patch_cells(patch_dir, patch_index)

    def read_patches_cells(cls, patch_dir: str | list[str]) -> list[Polygon]:
        """Read all patch segmentation results after running `write_patch_cells` on all patches

            patch_dir: Directory provided when running `write_patch_cells` containing the `.parquet` files. For multi-step segmentation, provide a list of directories (one per step).

            A list of cells represented as `shapely` polygons
        cells = []

        files = [f for f in Path(patch_dir).iterdir() if f.suffix == ".parquet"]
        for file in tqdm(files, desc="Reading patches"):
            cells += list(gpd.read_parquet(file).geometry)"Found {len(cells)} total cells")
        return cells

    def add_shapes(cls, sdata: SpatialData, cells: list[Polygon], image_key: str, shapes_key: str):
        """Adding `shapely` polygon to the `SpatialData` object

            sdata: A `SpatialData` object
            cells: List of polygons after segmentation
            image_key: Key of the image on which segmentation has been run
            shapes_key: Name to provide to the geodataframe to be created
        image = get_spatial_image(sdata, image_key)

        geo_df = gpd.GeoDataFrame({"geometry": cells})
        geo_df.index = image_key + geo_df.index.astype(str)

        geo_df = ShapesModel.parse(geo_df, transformations=get_transformation(image, get_all=True).copy())
        sdata.shapes[shapes_key] = geo_df

        if sdata.is_backed():
            sdata.write_element(shapes_key, overwrite=True)"Added {len(geo_df)} cell boundaries in sdata['{shapes_key}']")

__init__(sdata, method, channels, image_key=None, min_area=0, clip_limit=0.2, clahe_kernel_size=None, gaussian_sigma=1)

Generalized staining-based segmentation

Sequential segmentation (slower)

from sopa.segmentation.stainings import StainingSegmentation

method = ... # custom callable that runs segmentation on each patch

segmentation = StainingSegmentation(sdata, method, "DAPI")

cells = StainingSegmentation.read_patches_cells("./temp_dir")
StainingSegmentation.add_shapes(sdata, cells, image_key, "method_name")

Parallel segmentation (faster)

from sopa.segmentation.stainings import StainingSegmentation

method = ... # custom callable that runs segmentation on each patch

segmentation = StainingSegmentation(sdata, method, "DAPI")

# Run all this in a parallel manner, e.g. on different jobs
for i in range(len(sdata['sopa_patches'])):
    segmentation.write_patch_cells("./temp_dir", i)

cells = StainingSegmentation.read_patches_cells("./temp_dir")
StainingSegmentation.add_shapes(sdata, cells, image_key, "method_name")


Name Type Description Default
sdata SpatialData

A SpatialData object

method Callable

A segmentation callable whose input is an image of shape (C, Y, X) and output is a cell mask of shape (Y, X). Each mask value >0 represent a unique cell ID. Such callables can be found in sopa.segmentation.methods.

channels list[str] | str

One or a list of channel names used for segmentation. If only one channel is provided, the image given to the method will be of shape (1, Y, X).

image_key str | None

Optional key of sdata containing the image (no needed if there is only one image)

min_area float

Minimum area (in pixels^2) for a cell to be kept

clip_limit float

Parameter for skimage.exposure.equalize_adapthist (applied before running cellpose)

clahe_kernel_size int | Iterable[int] | None

Parameter for skimage.exposure.equalize_adapthist (applied before running cellpose)

gaussian_sigma float

Parameter for scipy gaussian_filter (applied before running cellpose)

Source code in sopa/segmentation/
def __init__(
    sdata: SpatialData,
    method: Callable,
    channels: list[str] | str,
    image_key: str | None = None,
    min_area: float = 0,
    clip_limit: float = 0.2,
    clahe_kernel_size: int | Iterable[int] | None = None,
    gaussian_sigma: float = 1,
    """Generalized staining-based segmentation

    !!! note "Sequential segmentation (slower)"
        from sopa.segmentation.stainings import StainingSegmentation

        method = ... # custom callable that runs segmentation on each patch

        segmentation = StainingSegmentation(sdata, method, "DAPI")

        cells = StainingSegmentation.read_patches_cells("./temp_dir")
        StainingSegmentation.add_shapes(sdata, cells, image_key, "method_name")

    !!! note "Parallel segmentation (faster)"
        from sopa.segmentation.stainings import StainingSegmentation

        method = ... # custom callable that runs segmentation on each patch

        segmentation = StainingSegmentation(sdata, method, "DAPI")

        # Run all this in a parallel manner, e.g. on different jobs
        for i in range(len(sdata['sopa_patches'])):
            segmentation.write_patch_cells("./temp_dir", i)

        cells = StainingSegmentation.read_patches_cells("./temp_dir")
        StainingSegmentation.add_shapes(sdata, cells, image_key, "method_name")

        sdata: A `SpatialData` object
        method: A segmentation `callable` whose input is an image of shape `(C, Y, X)` and output is a cell mask of shape `(Y, X)`. Each mask value `>0` represent a unique cell ID. Such callables can be found in `sopa.segmentation.methods`.
        channels: One or a list of channel names used for segmentation. If only one channel is provided, the image given to the `method` will be of shape `(1, Y, X)`.
        image_key: Optional key of `sdata` containing the image (no needed if there is only one image)
        min_area: Minimum area (in pixels^2) for a cell to be kept
        clip_limit: Parameter for skimage.exposure.equalize_adapthist (applied before running cellpose)
        clahe_kernel_size: Parameter for skimage.exposure.equalize_adapthist (applied before running cellpose)
        gaussian_sigma: Parameter for scipy gaussian_filter (applied before running cellpose)
    self.sdata = sdata
    self.method = method
    self.channels = [channels] if isinstance(channels, str) else channels

    self.min_area = min_area
    self.clip_limit = clip_limit
    self.clahe_kernel_size = clahe_kernel_size
    self.gaussian_sigma = gaussian_sigma

    self.image_key, self.image = get_spatial_image(sdata, key=image_key, return_key=True)

    image_channels = self.image.coords["c"].values
    assert np.isin(
        channels, image_channels
    ).all(), f"Channel names must be a subset of: {', '.join(image_channels)}"

add_shapes(sdata, cells, image_key, shapes_key) classmethod

Adding shapely polygon to the SpatialData object


Name Type Description Default
sdata SpatialData

A SpatialData object

cells list[Polygon]

List of polygons after segmentation

image_key str

Key of the image on which segmentation has been run

shapes_key str

Name to provide to the geodataframe to be created

Source code in sopa/segmentation/
def add_shapes(cls, sdata: SpatialData, cells: list[Polygon], image_key: str, shapes_key: str):
    """Adding `shapely` polygon to the `SpatialData` object

        sdata: A `SpatialData` object
        cells: List of polygons after segmentation
        image_key: Key of the image on which segmentation has been run
        shapes_key: Name to provide to the geodataframe to be created
    image = get_spatial_image(sdata, image_key)

    geo_df = gpd.GeoDataFrame({"geometry": cells})
    geo_df.index = image_key + geo_df.index.astype(str)

    geo_df = ShapesModel.parse(geo_df, transformations=get_transformation(image, get_all=True).copy())
    sdata.shapes[shapes_key] = geo_df

    if sdata.is_backed():
        sdata.write_element(shapes_key, overwrite=True)"Added {len(geo_df)} cell boundaries in sdata['{shapes_key}']")

read_patches_cells(patch_dir) classmethod

Read all patch segmentation results after running write_patch_cells on all patches


Name Type Description Default
patch_dir str | list[str]

Directory provided when running write_patch_cells containing the .parquet files. For multi-step segmentation, provide a list of directories (one per step).



Type Description

A list of cells represented as shapely polygons

Source code in sopa/segmentation/
def read_patches_cells(cls, patch_dir: str | list[str]) -> list[Polygon]:
    """Read all patch segmentation results after running `write_patch_cells` on all patches

        patch_dir: Directory provided when running `write_patch_cells` containing the `.parquet` files. For multi-step segmentation, provide a list of directories (one per step).

        A list of cells represented as `shapely` polygons
    cells = []

    files = [f for f in Path(patch_dir).iterdir() if f.suffix == ".parquet"]
    for file in tqdm(files, desc="Reading patches"):
        cells += list(gpd.read_parquet(file).geometry)"Found {len(cells)} total cells")
    return cells

write_patch_cells(patch_dir, patch_index)

Run segmentation on one patch, and save the result in a dedicated directory


Name Type Description Default
patch_dir str

Directory inside which segmentation results will be saved

patch_index int

Index of the patch on which to run segmentation. NB: the number of patches is len(sdata['sopa_patches'])

Source code in sopa/segmentation/
def write_patch_cells(self, patch_dir: str, patch_index: int):
    """Run segmentation on one patch, and save the result in a dedicated directory

        patch_dir: Directory inside which segmentation results will be saved
        patch_index: Index of the patch on which to run segmentation. NB: the number of patches is `len(sdata['sopa_patches'])`
    patch = self.sdata[SopaKeys.PATCHES].geometry[patch_index]

    cells = self._run_patch(patch)
    gdf = gpd.GeoDataFrame(geometry=cells)

    patch_dir: Path = Path(patch_dir)
    patch_dir.mkdir(parents=True, exist_ok=True)
    patch_file = patch_dir / f"{patch_index}.parquet"
