# Source code for sphero_vem.utils.accelerator

"""
numpy/cupy switch with a single `xp` and a safe `ArrayLike` type.
- If cupy is importable, `xp` is cupy; otherwise it's numpy.
- `ArrayLike` works with Pylance/mypy without requiring cupy at runtime.
"""

from __future__ import annotations

import warnings
from typing import TYPE_CHECKING, Any, TypeAlias
import functools

import numpy as np
import numpy.typing as npt
import dask.array as da


# Typing: consider only CPU libraries and minimal typing in this module.
# This is to avoid excessive pylance slowdowns.
if TYPE_CHECKING:
    # Static-analysis view: pretend we are CPU-only so type checkers never
    # have to resolve cupy/cucim stubs (which are slow or absent).
    cp = Any
    xp = np
    GPU_AVAILABLE = False
    import skimage as ski
    import scipy.ndimage as ndi

    ArrayLike: TypeAlias = npt.NDArray[Any]
else:
    # Runtime view: `ArrayLike` is always a NumPy ndarray type alias; the
    # actual arrays may still be CuPy ndarrays when the GPU path is active.
    ArrayLike: TypeAlias = npt.NDArray[Any]
    # Defaults
    xp = np
    cp = None
    GPU_AVAILABLE = False

    try:
        # GPU stack: cupy (arrays), cupyx (ndimage), cucim (skimage).
        import cupy as _cp
        import cupyx.scipy.ndimage as _ndi
        import cucim.skimage as _ski

        cp = _cp
        xp = _cp
        GPU_AVAILABLE = True
    except Exception:
        # Broad except is deliberate: a broken CUDA install can raise more
        # than ImportError; any failure falls back to the CPU stack.
        import scipy.ndimage as _ndi
        import skimage as _ski

        warnings.warn(
            "cupy is not installed/available. Falling back to numpy (CPU).",
            UserWarning,
            stacklevel=2,
        )

    # Updates ndimage
    # Both branches above bound _ndi/_ski; publish them under the names the
    # rest of the package imports (`ndi`, `ski`).
    ndi = _ndi
    ski = _ski


def to_host(arr: ArrayLike) -> npt.NDArray[Any]:
    """Move an array to host (CPU) memory.

    Parameters
    ----------
    arr : ArrayLike
        Input array on any device.

    Returns
    -------
    numpy.ndarray
        Host-resident array. NumPy inputs pass through ``np.asarray``
        without copying; CuPy inputs are copied back from the device.
    """
    on_device = (
        GPU_AVAILABLE and cp is not None and isinstance(arr, cp.ndarray)
    )
    if on_device:
        return cp.asnumpy(arr)  # type: ignore[arg-type]
    return np.asarray(arr)
def to_device(arr: ArrayLike) -> ArrayLike:
    """Move an array to the active compute device.

    Parameters
    ----------
    arr : ArrayLike
        Input array, either a NumPy or CuPy ndarray.

    Returns
    -------
    ArrayLike
        CuPy array if GPU is available, NumPy array otherwise. Arrays
        already on the target device are returned unchanged.
    """
    if not (GPU_AVAILABLE and cp is not None):
        # CPU fallback: coerce to NumPy (no copy for ndarray input).
        return np.asarray(arr)  # type: ignore[no-any-return]
    if isinstance(arr, cp.ndarray):
        return arr
    return cp.asarray(arr)  # type: ignore[no-any-return]
def _map_arrays(obj, fn): """Apply *fn* to every ndarray inside a nested container. Recursively traverses lists, tuples, and dicts. Non-array, non-container values are returned unchanged. Parameters ---------- obj : Any Object to traverse. May be an ndarray, list, tuple, dict, or any other value. fn : callable Function applied to each ndarray encountered. Returns ------- Any A new container of the same type as *obj* with *fn* applied to all arrays; scalars and non-array leaves are returned unchanged. """ if isinstance(obj, (np.ndarray,)) or ( cp is not None and isinstance(obj, cp.ndarray) ): # type: ignore[name-defined] return fn(obj) if isinstance(obj, list): return [_map_arrays(x, fn) for x in obj] if isinstance(obj, tuple): return tuple(_map_arrays(x, fn) for x in obj) if isinstance(obj, dict): return {k: _map_arrays(v, fn) for k, v in obj.items()} return obj
def gpu_dispatch(
    *,
    return_to_host: bool = False,
    host_kwarg: str = "_to_host",
):
    """
    Decorator that dispatches function inputs to GPU, if available, before
    the call.

    Kernels should use the global `xp` and `ndi` imported by this module
    instead of numpy or scipy.ndimage to ensure that calculations work as
    expected.

    Parameters
    ----------
    return_to_host : bool, optional
        Default behavior for the wrapped function:
        - False (default): return arrays on the active device
          (CuPy if GPU, NumPy otherwise)
        - True: always convert result back to NumPy if GPU is used
    host_kwarg : str, optional
        Name of a special keyword argument that can override
        `return_to_host` at *call time*. The kwarg is popped from kwargs
        and never forwarded to the wrapped function.
    """

    def decorate(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            gpu_active = GPU_AVAILABLE and (cp is not None)

            # Call-time override; stripped so it never reaches `func`.
            host_flag = kwargs.pop(host_kwarg, return_to_host)

            if gpu_active:
                args = tuple(_map_arrays(a, to_device) for a in args)
                kwargs = {
                    k: _map_arrays(v, to_device) for k, v in kwargs.items()
                }

            out = func(*args, **kwargs)

            if not (gpu_active and host_flag):
                return out

            host_out = _map_arrays(out, to_host)
            # Free GPU memory pool when returning to host so device
            # arrays created during the computation are released.
            if cp is not None:
                cp.get_default_memory_pool().free_all_blocks()
            return host_out

        return wrapper

    return decorate
def da_to_device(x: da.Array) -> da.Array:
    """Return a Dask array whose blocks are on the active compute device.

    Parameters
    ----------
    x : dask.array.Array
        Input Dask array with NumPy-backed blocks.

    Returns
    -------
    dask.array.Array
        Dask array with CuPy-backed blocks if GPU is available,
        otherwise NumPy-backed blocks.
    """
    # The `meta` array tells Dask which backend the output blocks use.
    backend = cp if (GPU_AVAILABLE and cp is not None) else np
    meta = backend.empty((0,) * x.ndim, dtype=x.dtype)
    return da.map_blocks(to_device, x, meta=meta)
def da_to_host(x: da.Array) -> da.Array:
    """Return a Dask array whose blocks are NumPy ndarrays on host memory.

    Parameters
    ----------
    x : dask.array.Array
        Input Dask array, potentially with CuPy-backed blocks.

    Returns
    -------
    dask.array.Array
        Dask array with NumPy-backed blocks. If no GPU is available and
        the array is already NumPy-backed, it is returned unchanged.
    """
    host_meta = np.empty((0,) * x.ndim, dtype=x.dtype)

    # No GPU / CuPy: just ensure NumPy-backed blocks.
    if not (GPU_AVAILABLE and cp is not None):
        if isinstance(x._meta, np.ndarray):
            return x
        return x.map_blocks(np.asarray, meta=host_meta)

    # GPU path: convert each block cp.ndarray -> np.ndarray.
    def _block_to_host(block):
        if isinstance(block, cp.ndarray):
            return cp.asnumpy(block)
        return np.asarray(block)

    return x.map_blocks(_block_to_host, meta=host_meta)