Skip to content
94 changes: 78 additions & 16 deletions beltmap/phase.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,44 @@ class PhaseRegistrationConfig:
robust_normalization: bool = False

def candidate_offsets(self) -> FloatArray:
if self.search_radius_px < 0:
cfg = self.normalized()
radius = cfg.search_radius_px
if radius < 0:
raise ValueError("search_radius_px must be non-negative")
if self.search_step_px <= 0:
step = cfg.search_step_px
if step <= 0:
raise ValueError("search_step_px must be positive")
radius = float(self.search_radius_px)
if radius == 0.0:
return np.asarray([0.0], dtype=np.float64)
step = float(self.search_step_px)
positive = step * np.arange(int(np.floor(radius / step)) + 1, dtype=np.float64)
if not np.any(np.isclose(positive, radius)):
positive = np.append(positive, radius)
return np.r_[-positive[:0:-1], positive].astype(np.float64, copy=False)

def normalized(self) -> PhaseRegistrationConfig:
search_radius_px = _finite_float_value(
self.search_radius_px,
"search_radius_px",
)
search_step_px = _finite_float_value(
self.search_step_px,
"search_step_px",
)
trim_fraction = _finite_float_value(self.trim_fraction, "trim_fraction")
highpass_radius_px = _nonnegative_integer_value(
self.highpass_radius_px,
"highpass_radius_px",
)
if not 0 <= trim_fraction < 1:
raise ValueError("trim_fraction must be in [0, 1)")
return replace(
self,
search_radius_px=search_radius_px,
search_step_px=search_step_px,
trim_fraction=trim_fraction,
highpass_radius_px=highpass_radius_px,
)


@dataclass(frozen=True)
class PhaseTrajectorySmoothingConfig:
Expand All @@ -70,15 +95,22 @@ class PhaseTrajectorySmoothingConfig:
min_support: int = 3

def validate(self) -> None:
if self.window_radius_frames < 0:
raise ValueError("window_radius_frames must be non-negative")
if self.min_support < 1:
raise ValueError("min_support must be positive")
if self.robust_sigma <= 0:
_nonnegative_integer_value(
self.window_radius_frames,
"window_radius_frames",
)
_positive_integer_value(self.min_support, "min_support")
robust_sigma = _finite_float_value(self.robust_sigma, "robust_sigma")
if robust_sigma <= 0:
raise ValueError("robust_sigma must be positive")
if self.min_score is not None and self.min_score < 0:
min_score = _optional_finite_float_value(self.min_score, "min_score")
if min_score is not None and min_score < 0:
raise ValueError("min_score must be non-negative when set")
if self.max_abs_correction_px is not None and self.max_abs_correction_px < 0:
max_abs_correction_px = _optional_finite_float_value(
self.max_abs_correction_px,
"max_abs_correction_px",
)
if max_abs_correction_px is not None and max_abs_correction_px < 0:
raise ValueError("max_abs_correction_px must be non-negative when set")


Expand Down Expand Up @@ -169,11 +201,13 @@ def _accepts(self, estimate: PhaseEstimate) -> bool:


def wrap_phase(phase_px: float, period_px: float | None) -> float:
phase = _finite_float_value(phase_px, "phase_px")
if period_px is None:
return float(phase_px)
if period_px <= 0:
return phase
period = _finite_float_value(period_px, "period_px")
if period <= 0:
raise ValueError("period_px must be positive")
return float(phase_px % period_px)
return float(phase % period)


def render_belt_view(
Expand All @@ -193,13 +227,14 @@ def render_belt_view(
belt = _as_float_image(belt_map, name="belt_map")
if belt.ndim != 2:
raise ValueError("belt_map must be a 2-D array")
phase = _finite_float_value(phase_px, "phase_px")
if height <= 0:
raise ValueError("height must be positive")
if x_slice is not None:
belt = belt[:, x_slice]

map_height = belt.shape[0]
rows = np.arange(height, dtype=np.float64) + float(phase_px)
rows = np.arange(height, dtype=np.float64) + phase
if periodic:
rows = rows % map_height
row0 = np.floor(rows).astype(np.int64)
Expand Down Expand Up @@ -254,7 +289,7 @@ def refine_phase_by_registration(
config: PhaseRegistrationConfig | None = None,
mask: ArrayLike | None = None,
) -> PhaseEstimate:
cfg = config or PhaseRegistrationConfig()
cfg = (config or PhaseRegistrationConfig()).normalized()
observed = _as_float_image(frame, name="frame")
belt = _as_float_image(belt_map, name="belt_map")
if observed.ndim != 2 or belt.ndim != 2:
Expand Down Expand Up @@ -518,6 +553,33 @@ def _as_float_image(image: ArrayLike, *, name: str) -> FloatArray:
return arr


def _finite_float_value(value: float, name: str) -> float:
parsed = float(value)
if not np.isfinite(parsed):
raise ValueError(f"{name} must be finite")
return parsed


def _optional_finite_float_value(value: float | None, name: str) -> float | None:
if value is None:
return None
return _finite_float_value(value, name)


def _nonnegative_integer_value(value: int, name: str) -> int:
parsed = _finite_float_value(value, name)
if parsed < 0 or not parsed.is_integer():
raise ValueError(f"{name} must be a finite non-negative integer")
return int(parsed)


def _positive_integer_value(value: int, name: str) -> int:
parsed = _finite_float_value(value, name)
if parsed < 1 or not parsed.is_integer():
raise ValueError(f"{name} must be a finite positive integer")
return int(parsed)


def _prepare_mask(mask: ArrayLike | None, shape: tuple[int, int]) -> NDArray[np.bool_] | None:
if mask is None:
return None
Expand Down
34 changes: 30 additions & 4 deletions beltmap/rendering.py
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ def render_expected_clean_belt(
observed = _as_float_image(observed_frame, name="observed_frame")
if observed.ndim != 2:
raise ValueError("observed_frame must be a 2-D array")
if output_shape is not None and tuple(int(value) for value in output_shape) != observed.shape:
if output_shape is not None and _resolve_shape(output_shape) != observed.shape:
raise ValueError(
"output_shape must match observed_frame.shape when observed_frame "
"is supplied"
Expand Down Expand Up @@ -180,11 +180,21 @@ def _resolve_belt_region(
height, width = output_shape if output_shape is not None else fallback_shape
return BeltRegion(top=0, left=0, height=height, width=width)
if isinstance(belt_region, BeltRegion):
return belt_region
return BeltRegion(
top=_integer_config_value(belt_region.top, "belt_region top"),
left=_integer_config_value(belt_region.left, "belt_region left"),
height=_integer_config_value(belt_region.height, "belt_region height"),
width=_integer_config_value(belt_region.width, "belt_region width"),
)
if len(belt_region) != 4:
raise ValueError("belt_region tuple must be (top, left, height, width)")
top, left, height, width = belt_region
region = BeltRegion(int(top), int(left), int(height), int(width))
region = BeltRegion(
_integer_config_value(top, "belt_region top"),
_integer_config_value(left, "belt_region left"),
_integer_config_value(height, "belt_region height"),
_integer_config_value(width, "belt_region width"),
)
if output_shape is None and region.width != belt_width:
raise ValueError("belt_region width must match belt_map width")
return region
Expand All @@ -196,12 +206,28 @@ def _resolve_output_shape(
region: BeltRegion,
) -> tuple[int, int]:
if output_shape is not None:
return tuple(int(value) for value in output_shape)
return _resolve_shape(output_shape)
if observed is not None:
return observed.shape
return (region.top + region.height, region.left + region.width)


def _resolve_shape(output_shape: tuple[int, int]) -> tuple[int, int]:
if len(output_shape) != 2:
raise ValueError("output_shape must be (height, width)")
return (
_integer_config_value(output_shape[0], "output_shape height"),
_integer_config_value(output_shape[1], "output_shape width"),
)


def _integer_config_value(value: int, name: str) -> int:
parsed = float(value)
if not np.isfinite(parsed) or not parsed.is_integer():
raise ValueError(f"{name} must be a finite integer")
return int(parsed)


def _validate_region(region: BeltRegion, output_shape: tuple[int, int]) -> None:
if region.top < 0 or region.left < 0:
raise ValueError("belt_region top and left must be non-negative")
Expand Down
70 changes: 48 additions & 22 deletions beltmap/residual.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,16 +139,20 @@ def estimate_local_noise(
"""Estimate robust local noise scale for a residual image."""

cfg = config or ResidualConfig()
if cfg.noise_radius_px < 0:
raise ValueError("noise_radius_px must be non-negative")
if cfg.min_noise <= 0:
raise ValueError("min_noise must be positive")
if cfg.clip_sigma is not None and cfg.clip_sigma <= 0:
raise ValueError("clip_sigma must be positive when set")
if cfg.noise_exclusion_sigma is not None and cfg.noise_exclusion_sigma <= 0:
raise ValueError("noise_exclusion_sigma must be positive when set")
if cfg.noise_exclusion_radius_px < 0:
raise ValueError("noise_exclusion_radius_px must be non-negative")
noise_radius_px = _nonnegative_integer_config_value(
cfg.noise_radius_px,
"noise_radius_px",
)
min_noise = _positive_config_value(cfg.min_noise, "min_noise")
clip_sigma = _optional_positive_config_value(cfg.clip_sigma, "clip_sigma")
noise_exclusion_sigma = _optional_positive_config_value(
cfg.noise_exclusion_sigma,
"noise_exclusion_sigma",
)
noise_exclusion_radius_px = _nonnegative_integer_config_value(
cfg.noise_exclusion_radius_px,
"noise_exclusion_radius_px",
)
noise_exclusion_mode = _validate_noise_exclusion_mode(cfg.noise_exclusion_mode)

values = _as_float_image(residual, name="residual")
Expand All @@ -163,14 +167,15 @@ def estimate_local_noise(

sample = values[valid]
center = float(np.median(sample))
global_sigma = _robust_sigma(sample, center=center, min_noise=cfg.min_noise)
global_sigma = _robust_sigma(sample, center=center, min_noise=min_noise)
noise_valid = valid.copy()
particle_noise_mask = _particle_noise_exclusion_mask(
values,
valid=valid,
center=center,
global_sigma=global_sigma,
config=cfg,
noise_exclusion_sigma=noise_exclusion_sigma,
noise_exclusion_radius_px=noise_exclusion_radius_px,
mode=noise_exclusion_mode,
)
if particle_noise_mask.any():
Expand All @@ -179,24 +184,24 @@ def estimate_local_noise(
noise_valid = valid.copy()
centered = np.zeros(values.shape, dtype=np.float64)
centered[valid] = values[valid] - center
if cfg.clip_sigma is not None:
if clip_sigma is not None:
centered = np.clip(
centered,
-cfg.clip_sigma * global_sigma,
cfg.clip_sigma * global_sigma,
-clip_sigma * global_sigma,
clip_sigma * global_sigma,
)

local_var = _masked_box_mean(
np.square(centered),
noise_valid,
radius=cfg.noise_radius_px,
radius=noise_radius_px,
)
local_var = np.where(
np.isfinite(local_var),
local_var,
global_sigma * global_sigma,
)
local_noise = np.sqrt(np.maximum(local_var, cfg.min_noise * cfg.min_noise))
local_noise = np.sqrt(np.maximum(local_var, min_noise * min_noise))
local_noise[~valid] = cfg.fill_value
return local_noise

Expand Down Expand Up @@ -227,18 +232,39 @@ def _validate_noise_exclusion_mode(mode: str) -> str:
)


def _positive_config_value(value: float, name: str) -> float:
parsed = float(value)
if not np.isfinite(parsed) or parsed <= 0:
raise ValueError(f"{name} must be finite and positive")
return parsed


def _optional_positive_config_value(value: float | None, name: str) -> float | None:
if value is None:
return None
return _positive_config_value(value, name)


def _nonnegative_integer_config_value(value: int, name: str) -> int:
parsed = float(value)
if not np.isfinite(parsed) or parsed < 0 or not parsed.is_integer():
raise ValueError(f"{name} must be a finite non-negative integer")
return int(parsed)


def _particle_noise_exclusion_mask(
values: FloatArray,
*,
valid: NDArray[np.bool_],
center: float,
global_sigma: float,
config: ResidualConfig,
noise_exclusion_sigma: float | None,
noise_exclusion_radius_px: int,
mode: str,
) -> NDArray[np.bool_]:
if config.noise_exclusion_sigma is None:
if noise_exclusion_sigma is None:
return np.zeros(values.shape, dtype=bool)
threshold = config.noise_exclusion_sigma * global_sigma
threshold = noise_exclusion_sigma * global_sigma
centered = values - center
if mode == "positive":
particle_like = valid & (centered > threshold)
Expand All @@ -248,9 +274,9 @@ def _particle_noise_exclusion_mask(
particle_like = valid & (np.abs(centered) > threshold)
if not particle_like.any():
return particle_like
if config.noise_exclusion_radius_px > 0:
if noise_exclusion_radius_px > 0:
particle_like = (
_dilate_mask(particle_like, radius=config.noise_exclusion_radius_px)
_dilate_mask(particle_like, radius=noise_exclusion_radius_px)
& valid
)
return particle_like
Expand Down
Loading