Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions source/isaaclab/changelog.d/jmart-framestack-perf.minor.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
Added
^^^^^

* Added :paramref:`~isaaclab.utils.buffers.CircularBuffer.stack_dim` constructor argument
and :attr:`~isaaclab.utils.buffers.CircularBuffer.stacked` property: when ``stack_dim`` is
set, the internal storage is rearranged so ``stacked`` returns the ``K`` frames merged
along the chosen dim as a free contiguous view.
* Added :mod:`isaaclab.utils.images` with :func:`~isaaclab.utils.images.normalize_camera_image`
and the ``is_rgb_like`` / ``is_depth_like`` / ``is_normals_like`` predicates, shared
between the DirectRLEnv and ManagerBasedEnv camera observation paths.
* Added :func:`isaaclab.utils.warp.ops.normalize_image_uint8`, a fused Warp-kernel
implementation of ``(uint8 / 255) - per-image-channel mean`` for RGB-like camera
observations. Supports both ``(B, H, W, C)`` and ``(B, C, H, W)`` inputs via a
``channel_dim`` argument (``-1`` / ``3`` for BHWC, ``-3`` / ``1`` for BCHW); the
argument is also forwarded by :func:`~isaaclab.utils.images.normalize_camera_image`.
* Added a ``clone`` kwarg to :func:`isaaclab.envs.mdp.observations.image`; callers that
immediately copy the result into their own storage (e.g. a frame-stack buffer) can pass
``clone=False`` to skip the redundant allocation.

Changed
^^^^^^^

* Changed :class:`~isaaclab.envs.mdp.observations.stacked_image` to use the new ``stack_dim``
``CircularBuffer`` layout and defer normalization past the frame-stack buffer for RGB-like
data types, eliminating a per-frame float32 upcast and large transpose.
61 changes: 38 additions & 23 deletions source/isaaclab/isaaclab/envs/mdp/observations.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
from isaaclab.managers.manager_base import ManagerTermBase
from isaaclab.managers.manager_term_cfg import ObservationTermCfg
from isaaclab.utils.buffers import CircularBuffer
from isaaclab.utils.images import is_rgb_like, normalize_camera_image

if TYPE_CHECKING:
from isaaclab.assets import Articulation, RigidObject
Expand Down Expand Up @@ -388,6 +389,7 @@ def image(
convert_perspective_to_orthogonal: bool = False,
normalize: bool = True,
permute: bool = False,
clone: bool = True,
) -> torch.Tensor:
"""Images of a specific datatype from the camera sensor.

Expand All @@ -406,6 +408,11 @@ def image(
normalize: Whether to normalize the images. This depends on the selected data type.
Defaults to True.
permute: Whether to permute the image to (num_envs, channel, height, width). Defaults to False.
clone: Whether to return a fresh clone of the result. Defaults to True (defensive: protects
against downstream in-place mutation of the camera buffer). Callers that immediately
copy the result into their own storage (e.g. a frame-stack buffer) can pass ``False``
to skip the redundant allocation.

Returns:
The images produced at the last time-step
"""
Expand All @@ -419,21 +426,13 @@ def image(
if (data_type == "distance_to_camera") and convert_perspective_to_orthogonal:
images = math_utils.orthogonalize_perspective_depth(images, sensor.data.intrinsic_matrices)

# rgb/depth/normals image normalization
if normalize:
if data_type == "rgb":
images = images.float() / 255.0
mean_tensor = torch.mean(images, dim=(1, 2), keepdim=True)
images -= mean_tensor
elif "distance_to" in data_type or "depth" in data_type:
images[images == float("inf")] = 0
elif "normals" in data_type:
images = (images + 1.0) * 0.5
images = normalize_camera_image(images, data_type)

if permute:
images = images.permute(0, 3, 1, 2)

return images.clone()
return images.clone() if clone else images


class image_features(ManagerTermBase):
Expand Down Expand Up @@ -697,10 +696,12 @@ def __init__(self, cfg: ObservationTermCfg, env: ManagerBasedEnv):
# K=1 is a documented passthrough; no buffer needed.
self._buffer: CircularBuffer | None = None
if frame_stack > 1:
# Channel-stack: K frames concatenated along C; .stacked is a free contiguous view.
self._buffer = CircularBuffer(
max_len=frame_stack,
batch_size=env.num_envs,
device=env.device,
stack_dim=-1,
)

def reset(self, env_ids: torch.Tensor | None = None):
Expand All @@ -716,26 +717,40 @@ def __call__(
convert_perspective_to_orthogonal: bool = False,
normalize: bool = True,
) -> torch.Tensor:
if self._buffer is None:
return image(
env=env,
sensor_cfg=sensor_cfg,
data_type=data_type,
convert_perspective_to_orthogonal=convert_perspective_to_orthogonal,
normalize=normalize,
)

# RGB-like camera output is uint8; defer normalize so the buffer can hold it raw.
# Depth / normals output is float32 — leave normalize per-frame in image().
defer_normalize = normalize and is_rgb_like(data_type)
single_frame = image(
env=env,
sensor_cfg=sensor_cfg,
data_type=data_type,
convert_perspective_to_orthogonal=convert_perspective_to_orthogonal,
normalize=normalize,
normalize=normalize and not defer_normalize,
clone=False,
)

# K=1 passthrough: no buffer allocated. ``image()`` already clones.
if self._buffer is None:
return single_frame

self._buffer.append(single_frame)

# CircularBuffer.buffer is (B, K, H, W, C) in oldest->newest order along dim 1.
# Channel-stack: move K next to C, then flatten so the last dim reads
# oldest_C, ..., newest_C.
stacked = self._buffer.buffer
b, k, h, w, c = stacked.shape
return stacked.permute(0, 2, 3, 1, 4).reshape(b, h, w, k * c).clone()
stacked = self._buffer.stacked

if defer_normalize:
# No ``out=`` -- a fresh float32 tensor is allocated per call. The caching
# allocator returns a different block than the previous step's (still
# referenced by the trainer), so the previous-iteration ``observations``
# is not overwritten before ``record_transition`` reads it. See
# :func:`isaaclab.utils.warp.ops.normalize_image_uint8` for the aliasing
# hazard documentation.
return normalize_camera_image(stacked, data_type)
# ``stacked`` is a view of the ring buffer storage which is overwritten on the next
# ``env.step``; clone so the returned tensor outlives the next step.
return stacked.clone()


"""
Expand Down
134 changes: 111 additions & 23 deletions source/isaaclab/isaaclab/utils/buffers/circular_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,34 +19,55 @@ class CircularBuffer:

The shape of the appended data is expected to be (batch_size, ...), where the first dimension is the
batch dimension. Correspondingly, the shape of the ring buffer is (max_len, batch_size, ...).

When ``stack_dim`` is set, the internal layout is rearranged so that :attr:`stacked`

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Re-review (4df4101): New commit is a minor docstring fix (:paramref: → inline code formatting in circular_buffer.py). No new issues introduced.

✅ Previous P1 concern (misleading RuntimeError) was already addressed.
ℹ️ Previous P2 suggestion (factoring _get_observations) remains open as a non-blocking style note.

LGTM — no further action needed from my side.

returns the K frames merged into the chosen dim as a free contiguous view; :meth:`__getitem__`
is disabled in this mode.
"""

def __init__(self, max_len: int, batch_size: int, device: str):
def __init__(self, max_len: int, batch_size: int, device: str, stack_dim: int | None = None):
"""Initialize the circular buffer.

Args:
max_len: The maximum length of the circular buffer. The minimum allowed value is 1.
batch_size: The batch dimension of the data.
device: The device used for processing.
stack_dim: If set, the buffer arranges its internal storage so :attr:`stacked` returns
the K stored frames merged into ``data.shape[stack_dim]`` of the appended data
as a free contiguous view. Any non-zero dim index in the appended data is valid
(positive or negative); ``0`` (the batch dim) is invalid. Range validation against
the actual data rank is deferred to the first :meth:`append`. For example,
``stack_dim=-1`` on a ``(B, H, W, C)`` input stacks K frames along the channel
dim, yielding :attr:`stacked` shape ``(B, H, W, K*C)``. Defaults to ``None``
(legacy layout).

Raises:
ValueError: If the buffer size is less than one.
ValueError: If the buffer size is less than one, or ``stack_dim == 0``.
Comment thread
jmart-nv marked this conversation as resolved.
"""
if max_len < 1:
raise ValueError(f"The buffer size should be greater than zero. However, it is set to {max_len}!")
# set the parameters
if stack_dim is not None and stack_dim == 0:
raise ValueError("stack_dim must not be 0 (cannot stack along the batch dimension).")

self._batch_size = batch_size
self._device = device
self._ALL_INDICES = torch.arange(batch_size, device=device)

# CPU mirror of max_len; avoids a GPU sync via ``.item()`` on every property access.
self._max_len_int: int = max_len
# max length tensor for comparisons
self._max_len = torch.full((batch_size,), max_len, dtype=torch.int, device=device)
# number of data pushes passed since the last call to :meth:`reset`
self._num_pushes = torch.zeros(batch_size, dtype=torch.long, device=device)
# the actual buffer for data storage
# note: this is initialized on the first call to :meth:`append`
# CPU gate; lets ``append`` skip a ``torch.any`` GPU sync on the steady-state path.
self._need_reset: bool = True
# Lazily allocated on the first ``append``.
self._buffer: torch.Tensor = None # type: ignore

self._stack_dim_arg: int | None = stack_dim
Comment thread
pbarejko marked this conversation as resolved.
# Normalized position of K in internal storage; set on first append. None == legacy mode.
self._stack_dim_internal: int | None = None

"""
Properties.
"""
Expand All @@ -64,7 +85,7 @@ def device(self) -> str:
@property
def max_length(self) -> int:
"""The maximum length of the ring buffer."""
return int(self._max_len[0].item())
return self._max_len_int

@property
def current_length(self) -> torch.Tensor:
Expand All @@ -83,7 +104,30 @@ def buffer(self) -> torch.Tensor:
Complete circular buffer with most recent entry at the end and oldest entry at the beginning of
dimension 1. The shape is [batch_size, max_length, data.shape[1:]].
"""
return torch.transpose(self._buffer, dim0=0, dim1=1)
if self._stack_dim_internal is None:
return torch.transpose(self._buffer, dim0=0, dim1=1)
return torch.movedim(self._buffer, source=self._stack_dim_internal, destination=1)

@property
def stacked(self) -> torch.Tensor:
"""Buffer contents with K frames merged along the configured ``stack_dim``.

Frames appear in oldest -> newest order along the merged dim. The result is a view of
the internal storage; callers must not mutate it.

Returns:
View of shape ``(batch_size, *frame_shape)`` with ``frame_shape[stack_dim]`` multiplied by ``max_length``.

Raises:
RuntimeError: If ``stack_dim`` was not set at construction.
"""
if self._stack_dim_internal is None:
if self._stack_dim_arg is None:
raise RuntimeError("stacked is only available when CircularBuffer was created with stack_dim set.")
raise RuntimeError("stacked is not yet available: call append() at least once to initialize the buffer.")
k_pos = self._stack_dim_internal
s = self._buffer.shape
return self._buffer.reshape(*s[:k_pos], s[k_pos] * s[k_pos + 1], *s[k_pos + 2 :])

"""
Operations.
Expand All @@ -100,12 +144,15 @@ def reset(self, batch_ids: Sequence[int] | None = None):
batch_ids_resolved = slice(None)
else:
batch_ids_resolved = batch_ids
# reset the number of pushes for the specified batch indices
self._num_pushes[batch_ids_resolved] = 0
self._need_reset = True
if self._buffer is not None:
# set buffer at batch_id reset indices to 0.0 so that the buffer() getter returns
# the cleared circular buffer after reset.
self._buffer[:, batch_ids_resolved] = 0.0
if self._stack_dim_internal is None:
self._buffer[:, batch_ids_resolved] = 0.0
else:
self._buffer[batch_ids_resolved] = 0.0

def append(self, data: torch.Tensor):
"""Append the data to the circular buffer.
Expand All @@ -116,25 +163,63 @@ def append(self, data: torch.Tensor):

Raises:
ValueError: If the input data has a different batch size than the buffer.
IndexError: On the first call, if the configured ``stack_dim`` is invalid for the
appended data's rank.
"""
# check the batch size
if data.shape[0] != self.batch_size:
raise ValueError(f"The input data has '{data.shape[0]}' batch size while expecting '{self.batch_size}'")

# move the data to the device
data = data.to(self._device)
is_first_push = self._num_pushes == 0

if self._buffer is None:
self._buffer = data.unsqueeze(0).expand(self.max_length, *data.shape).clone()
if torch.any(is_first_push):
self._buffer[:, is_first_push] = data[is_first_push]
# increment number of number of pushes for all batches
self._append(data)
self._allocate_buffer(data)

# Shift slots so the newest write lands at the last K slot. Iterating front-to-back
# keeps adjacent-slot copies non-overlapping. Cheap at the typical frame-stack K=2-4.
if self._stack_dim_internal is None:
for i in range(self._max_len_int - 1):
self._buffer[i].copy_(self._buffer[i + 1])
self._buffer[-1] = data
else:
k_pos = self._stack_dim_internal
k = self._max_len_int
for i in range(k - 1):
self._buffer.narrow(k_pos, i, 1).copy_(self._buffer.narrow(k_pos, i + 1, 1))
self._buffer.narrow(k_pos, k - 1, 1).copy_(data.unsqueeze(k_pos))

if self._need_reset:
is_first_push = self._num_pushes == 0
if torch.any(is_first_push):
if self._stack_dim_internal is None:
self._buffer[:, is_first_push] = data[is_first_push]
else:
self._buffer[is_first_push] = data[is_first_push].unsqueeze(self._stack_dim_internal)
self._need_reset = False

self._num_pushes += 1

def _append(self, data: torch.Tensor):
self._buffer = torch.roll(self._buffer, shifts=-1, dims=0)
self._buffer[-1] = data
def _allocate_buffer(self, data: torch.Tensor) -> None:
"""Allocate the internal buffer and finalize the storage layout on first append."""
if self._stack_dim_arg is None:
self._buffer = torch.empty((self._max_len_int, *data.shape), dtype=data.dtype, device=self._device)
return

ndim = data.ndim
k_pos = self._stack_dim_arg
if k_pos < 0:
k_pos += ndim
if k_pos < 1 or k_pos >= ndim:
raise IndexError(
f"stack_dim={self._stack_dim_arg} resolves to position {k_pos} for data with"
f" ndim={ndim}; must be in [1, {ndim - 1}] or [-{ndim - 1}, -1]."
)
self._stack_dim_internal = k_pos
self._buffer = torch.empty(
(*data.shape[:k_pos], self._max_len_int, *data.shape[k_pos:]),
dtype=data.dtype,
device=self._device,
)

def __getitem__(self, key: torch.Tensor) -> torch.Tensor:
"""Retrieve the data from the circular buffer in last-in-first-out (LIFO) fashion.
Expand All @@ -152,18 +237,21 @@ def __getitem__(self, key: torch.Tensor) -> torch.Tensor:
Raises:
ValueError: If the input key has a different batch size than the buffer.
RuntimeError: If the buffer is empty.
NotImplementedError: If the buffer was created with ``stack_dim`` set.
"""
if self._stack_dim_internal is not None:
raise NotImplementedError(
"Indexing via __getitem__ is not supported in stacked-output mode. Use .stacked or .buffer instead."
)
# check the batch size
if len(key) != self.batch_size:
raise ValueError(f"The argument 'key' has length {key.shape[0]}, while expecting {self.batch_size}")
if self._buffer is None:
raise RuntimeError("The buffer is empty. Please append data before retrieving.")

# admissible lag — clamp to [0, ..] so batches with _num_pushes == 0
# return the zeroed-out slot instead of indexing out of bounds.
# Clamp to [0, ..] so batches with _num_pushes == 0 return the zeroed slot.
valid_keys = torch.clamp(torch.minimum(key, self._num_pushes - 1), min=0)
# The buffer is stored oldest->newest along dimension 0, so the most
# recent item lives at the last index.
index_in_buffer = (self.max_length - 1 - valid_keys).to(dtype=torch.long)
# return output
index_in_buffer = (self._max_len_int - 1 - valid_keys).to(dtype=torch.long)
return self._buffer[index_in_buffer, self._ALL_INDICES]
3 changes: 1 addition & 2 deletions source/isaaclab/isaaclab/utils/buffers/delay_buffer.py
Original file line number Diff line number Diff line change
Expand Up @@ -174,5 +174,4 @@ def compute(self, data: torch.Tensor) -> torch.Tensor:
# add the new data to the last layer
self._circular_buffer.append(data)
# return output
delayed_data = self._circular_buffer[self._time_lags]
return delayed_data.clone()
return self._circular_buffer[self._time_lags]
Loading
Loading