"""
numpy/cupy switch with a single `xp` and a safe `ArrayLike` type.
- If cupy is importable, `xp` is cupy; otherwise it's numpy.
- `ArrayLike` works with Pylance/mypy without requiring cupy at runtime.
"""
from __future__ import annotations
import warnings
from typing import TYPE_CHECKING, Any, TypeAlias
import functools
import numpy as np
import numpy.typing as npt
import dask.array as da
# Typing: consider only CPU libraries and minimal typing in this module.
# This is to avoid excessive pylance slowdowns.
if TYPE_CHECKING:
    # Static-analysis view: pretend we are always on CPU so Pylance/mypy only
    # ever see numpy/scipy/skimage types (keeps type checking fast — no cupy
    # stubs are ever analyzed).
    cp = Any
    xp = np
    GPU_AVAILABLE = False
    import skimage as ski
    import scipy.ndimage as ndi

    ArrayLike: TypeAlias = npt.NDArray[Any]
else:
    ArrayLike: TypeAlias = npt.NDArray[Any]

    # Defaults: CPU backend until a working cupy stack is detected.
    xp = np
    cp = None
    GPU_AVAILABLE = False
    try:
        import cupy as _cp
        import cupyx.scipy.ndimage as _ndi
        import cucim.skimage as _ski

        cp = _cp
        xp = _cp
        GPU_AVAILABLE = True
    except Exception:
        # Broad except on purpose: a misconfigured CUDA install can raise
        # driver/runtime errors (not just ImportError) during `import cupy`.
        import scipy.ndimage as _ndi
        import skimage as _ski

        warnings.warn(
            "cupy is not installed/available. Falling back to numpy (CPU).",
            UserWarning,
            stacklevel=2,
        )
    # Re-export whichever backend was selected under stable names.
    ndi = _ndi
    ski = _ski
def to_host(arr: ArrayLike) -> npt.NDArray[Any]:
    """Move an array to host (CPU) memory.

    Parameters
    ----------
    arr : ArrayLike
        Input array on any device.

    Returns
    -------
    numpy.ndarray
        Array on host memory. If *arr* is already a NumPy array it is
        returned via ``np.asarray`` without copying. If CuPy is available
        and *arr* is a CuPy array, it is copied to host memory.
    """
    if GPU_AVAILABLE and cp is not None and isinstance(arr, cp.ndarray):
        # Device -> host copy; cp.asnumpy always returns a numpy.ndarray.
        return cp.asnumpy(arr)  # type: ignore[arg-type]
    return np.asarray(arr)
def to_device(arr: ArrayLike) -> ArrayLike:
    """Move an array to the active compute device.

    Parameters
    ----------
    arr : ArrayLike
        Input array, either a NumPy or CuPy ndarray.

    Returns
    -------
    ArrayLike
        CuPy array if GPU is available, NumPy array otherwise. If *arr* is
        already on the target device it is returned unchanged.
    """
    if GPU_AVAILABLE and cp is not None:
        if isinstance(arr, cp.ndarray):
            # Already on device; avoid a redundant host round-trip.
            return arr
        return cp.asarray(arr)  # type: ignore[no-any-return]
    return np.asarray(arr)  # type: ignore[no-any-return]
def _map_arrays(obj, fn):
"""Apply *fn* to every ndarray inside a nested container.
Recursively traverses lists, tuples, and dicts. Non-array, non-container
values are returned unchanged.
Parameters
----------
obj : Any
Object to traverse. May be an ndarray, list, tuple, dict, or any
other value.
fn : callable
Function applied to each ndarray encountered.
Returns
-------
Any
A new container of the same type as *obj* with *fn* applied to all
arrays; scalars and non-array leaves are returned unchanged.
"""
if isinstance(obj, (np.ndarray,)) or (
cp is not None and isinstance(obj, cp.ndarray)
): # type: ignore[name-defined]
return fn(obj)
if isinstance(obj, list):
return [_map_arrays(x, fn) for x in obj]
if isinstance(obj, tuple):
return tuple(_map_arrays(x, fn) for x in obj)
if isinstance(obj, dict):
return {k: _map_arrays(v, fn) for k, v in obj.items()}
return obj
def gpu_dispatch(
    *,
    return_to_host: bool = False,
    host_kwarg: str = "_to_host",
):
    """Decorator that dispatches function inputs to GPU, if available, before the call.

    Kernels should use the global ``xp`` and ``ndi`` imported by this module
    instead of numpy or scipy.ndimage to ensure that calculations work as
    expected.

    Parameters
    ----------
    return_to_host : bool, optional
        Default behavior for the wrapped function:

        - False (default): return arrays on the active device (CuPy if GPU,
          NumPy otherwise)
        - True: always convert result back to NumPy if GPU is used
    host_kwarg : str, optional
        Name of a special keyword argument that can override
        ``return_to_host`` at *call time*. The kwarg is popped from kwargs
        and never forwarded to the wrapped function.
    """

    def decorate(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            use_gpu = GPU_AVAILABLE and (cp is not None)
            # Call-time override, if provided. Popped unconditionally so the
            # control kwarg is never forwarded to the wrapped function.
            host_flag = kwargs.pop(host_kwarg, return_to_host)
            if use_gpu:
                # Move every ndarray found in (nested) args/kwargs to device.
                args = tuple(_map_arrays(a, to_device) for a in args)
                kwargs = {k: _map_arrays(v, to_device) for k, v in kwargs.items()}
            out = func(*args, **kwargs)
            if use_gpu and host_flag:
                result = _map_arrays(out, to_host)
                # Free the CuPy memory pool when returning to host so device
                # temporaries created during the computation are released.
                if cp is not None:
                    mempool = cp.get_default_memory_pool()
                    mempool.free_all_blocks()
                return result
            return out

        return wrapper

    return decorate
def da_to_device(x: da.Array) -> da.Array:
    """Return a Dask array whose blocks are on the active compute device.

    Parameters
    ----------
    x : dask.array.Array
        Input Dask array with NumPy-backed blocks.

    Returns
    -------
    dask.array.Array
        Dask array with CuPy-backed blocks if GPU is available, otherwise
        NumPy-backed blocks.
    """
    # A zero-size `meta` array tells dask the block type/dtype of the result
    # without computing anything.
    if GPU_AVAILABLE and cp is not None:
        meta = cp.empty((0,) * x.ndim, dtype=x.dtype)
    else:
        meta = np.empty((0,) * x.ndim, dtype=x.dtype)
    return da.map_blocks(to_device, x, meta=meta)
def da_to_host(x: da.Array) -> da.Array:
    """Return a Dask array whose blocks are NumPy ndarrays on host memory.

    Parameters
    ----------
    x : dask.array.Array
        Input Dask array, potentially with CuPy-backed blocks.

    Returns
    -------
    dask.array.Array
        Dask array with NumPy-backed blocks. If no GPU is available and the
        array is already NumPy-backed, it is returned unchanged.
    """
    # Zero-size host-side meta describing the result block type for dask.
    host_meta = np.empty((0,) * x.ndim, dtype=x.dtype)

    # If there's no GPU / CuPy, just ensure NumPy arrays.
    if not (GPU_AVAILABLE and cp is not None):
        if isinstance(x._meta, np.ndarray):
            return x
        return x.map_blocks(np.asarray, meta=host_meta)

    # GPU path: convert each block cp.ndarray -> np.ndarray.
    def _to_host_block(block):
        if isinstance(block, cp.ndarray):
            return cp.asnumpy(block)
        return np.asarray(block)

    return x.map_blocks(_to_host_block, meta=host_meta)