
Add support for rosidl::Buffer-aware per-endpoint pub/sub #930

Open

nvcyc wants to merge 12 commits into rolling from native_buffer

Conversation


@nvcyc nvcyc commented Mar 17, 2026

Description

This pull request adds full rosidl::Buffer support to rmw_zenoh_cpp, enabling per-endpoint Zenoh publishers and subscribers for zero-copy buffer transport between compatible backends. When a publisher and subscriber share compatible non-CPU buffer backends, data can be transferred via a lightweight descriptor; when backends are incompatible, the system falls back to standard CPU-based buffer serialization.

This pull request consists of the following key changes:

  • Backend lifecycle: Calls rosidl_buffer_backend_registry::initialize_buffer_backends() / shutdown_buffer_backends() during RMW init/shutdown to load and tear down buffer backend plugins.
  • Liveliness key-expression extension: Extended key-expressions to advertise each endpoint's supported backends, enabling dynamic discovery.
  • Graph cache discovery callbacks: Buffer-aware publishers and subscribers register discovery callbacks to detect each other and dynamically create per-endpoint Zenoh publishers/subscribers.
  • Buffer-aware publishers: Create per-subscriber Zenoh endpoints; endpoint info is passed to the typesupport serialization layer, which delegates compatibility to each backend's create_descriptor_with_endpoint() (nullptr: CPU fallback). Publisher creation explicitly adds "cpu" to backend_aux_info.
  • Fallback publish: publish() first sends endpoint-aware messages via publish_buffer_aware(), then conditionally falls through to the standard base-key publish path only when the total matched subscription count exceeds discovered buffer-aware subscribers -- avoiding unnecessary CPU conversion when all subscribers are buffer-aware.
  • Buffer-aware subscribers: Create per-publisher Zenoh subscriptions; the Message struct owns endpoint info via std::optional<EndpointInfoStorage>. Endpoint info is passed into deserialization for correct backend reconstruction.
  • acceptable_buffer_backends: Parses the subscription option -- NULL/empty/"cpu": CPU-only (advertises "cpu" in liveliness token); "any": all installed; specific names: filtered. In on_publisher_discovered(), CPU is always added to the publisher's backend list.
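The acceptable_buffer_backends semantics above can be sketched as follows. This is an illustrative stand-in, not the actual rmw_zenoh_cpp code: the function name `resolve_backends`, the comma-separated list format, and the `installed` parameter (standing in for the backends reported by the plugin registry) are all assumptions for the example.

```cpp
#include <algorithm>
#include <cassert>
#include <sstream>
#include <string>
#include <vector>

// Sketch of the option parsing described above (names are illustrative):
// NULL/empty/"cpu" -> CPU-only; "any" -> all installed; otherwise treat the
// option as specific backend names and keep only the installed ones.
std::vector<std::string> resolve_backends(
  const char * option, const std::vector<std::string> & installed)
{
  if (option == nullptr || std::string(option).empty() ||
    std::string(option) == "cpu")
  {
    return {"cpu"};  // CPU-only: advertise just "cpu" in the liveliness token
  }
  if (std::string(option) == "any") {
    return installed;  // accept every installed backend
  }
  // Specific names (assumed comma-separated here): filter against installed.
  std::vector<std::string> result;
  std::stringstream ss(option);
  std::string name;
  while (std::getline(ss, name, ',')) {
    if (std::find(installed.begin(), installed.end(), name) != installed.end()) {
      result.push_back(name);
    }
  }
  return result;
}
```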

Is this a user-facing behavior change?

This pull request does not change existing rmw_zenoh_cpp behavior for standard (non-Buffer) messages. For messages with uint8[] fields, the per-endpoint transport is transparent -- publishers and subscribers share backend info automatically, and CPU fallback ensures correctness when backends are incompatible.

Did you use Generative AI?

Yes. Claude (claude-4.6-opus) via Cursor was used to assist with creating an initial prototype version of the changes contained in this PR.

Additional Information

This PR is part of the broader ROS 2 native buffer feature introduced in this post.

@nvcyc nvcyc marked this pull request as draft March 17, 2026 04:15
@nvcyc nvcyc marked this pull request as ready for review March 17, 2026 04:16
```cpp
for (const auto & [gid_hash, callback] :
  publisher_discovery_callbacks_[graph_topic_data->info_.name_])
{
  callback(*entity);
```
Member
I think there is a risk of this callback trying to do something with the graph that is also under the discovery_mutex_ (or under a lock further down in DDS) and we could end up with a deadlock.

The better approach is probably to follow the BufferEndpointRegistry::register_subscriber_discovery_callback pattern where you push all of the callbacks into a temporary vector, release the lock, and then iterate them.
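The collect-then-invoke pattern suggested here can be sketched as below. This is a minimal illustrative registry, not the actual BufferEndpointRegistry code; the class and member names are assumptions for the example.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

// Hypothetical registry illustrating the suggested pattern: copy the
// callbacks out while holding the lock, release it, then invoke the copies,
// so a callback that re-enters the registry cannot deadlock on mutex_.
class CallbackRegistry {
public:
  using Callback = std::function<void(const std::string &)>;

  void add(const std::string & topic, Callback cb) {
    std::lock_guard<std::mutex> guard(mutex_);
    callbacks_[topic].push_back(std::move(cb));
  }

  void notify(const std::string & topic, const std::string & entity) {
    std::vector<Callback> to_invoke;
    {
      std::lock_guard<std::mutex> guard(mutex_);
      auto it = callbacks_.find(topic);
      if (it != callbacks_.end()) {
        to_invoke = it->second;  // copy under the lock
      }
    }  // lock released here
    for (auto & cb : to_invoke) {
      cb(entity);  // safe: a callback may call add() without deadlocking
    }
  }

private:
  std::mutex mutex_;
  std::unordered_map<std::string, std::vector<Callback>> callbacks_;
};
```

Iterating the map directly under the lock, as in the snippet above, deadlocks as soon as a callback re-enters the registry on the same thread.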


+1 to that. Same elsewhere.


@hidmic hidmic left a comment


First, partial pass. I need more brain to parse all of this.


```cpp
if (topic_info.backend_metadata_.has_value() && !topic_info.backend_metadata_.value().empty()) {
  std::vector<std::string> backend_names;
  backend_names.reserve(topic_info.backend_metadata_->size());
  for (const auto & pair : topic_info.backend_metadata_.value()) {
```

@nvcyc nit: would it make sense to get a reference to the backend_metadata value and reuse?
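The refactor suggested by this nit might look like the sketch below. The surrounding types are illustrative stand-ins, not the actual graph-cache structures.

```cpp
#include <cassert>
#include <map>
#include <optional>
#include <string>
#include <vector>

// Sketch of the suggestion: bind a reference to the optional's value once
// and reuse it, instead of calling .value() repeatedly. The metadata type
// (string -> string map) is an assumption for the example.
std::vector<std::string> backend_names_from(
  const std::optional<std::map<std::string, std::string>> & backend_metadata)
{
  std::vector<std::string> backend_names;
  if (backend_metadata.has_value() && !backend_metadata->empty()) {
    const auto & metadata = *backend_metadata;  // one dereference, reused below
    backend_names.reserve(metadata.size());
    for (const auto & pair : metadata) {
      backend_names.push_back(pair.first);
    }
  }
  return backend_names;
}
```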

```diff
 if (entity_type != EntityType::Node) {
-  if (parts.size() < KEYEXPR_INDEX_MAX + 1) {
+  // Minimum required: up to TopicQoS (backends is optional)
+  if (parts.size() < KeyexprIndex::TopicQoS + 1) {
```

@nvcyc meta: perhaps there should be a MANDATORY_KEYEXPR_INDEX_MAX macro to future proof this clause.

```cpp
base_endpoint->pub = std::optional<zenoh::ext::AdvancedPublisher>(std::move(pub_));
endpoints_[base_endpoint->key] = base_endpoint;
}
// For buffer-aware publishers, endpoints are created dynamically on subscriber discovery
```

@nvcyc meta: won't this interfere with system introspection? IIRC ros2 topic list need not subscribe to publishers to find them.

Author

In rmw_zenoh, publisher discovery (used by ros2 topic list, ros2 topic info, etc.) works entirely through liveliness tokens and the graph cache, so ros2 topic list is not affected by these changes to Zenoh data-plane publisher/subscriber creation.


```cpp
// Quadruple the buffer size for safety (endpoint-aware serialization needs more space)
// TODO(native-buffer): Fix the size estimation in buffer_serialization.hpp to be more accurate
max_data_length = max_data_length * 4 + 16384;
```

@nvcyc maybe something along the lines of having a maximum size for descriptors would work here?

```cpp
// We only fall through when the total matched subscription count exceeds
// the number of discovered buffer-aware subscribers, meaning at least one
// non-buffer-aware subscriber exists. This avoids an unnecessary CPU
// conversion (to_vector()) of vendor-backed buffer data on every publish.
```
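The fall-through condition in that comment reduces to a simple count comparison. A minimal sketch, with the function name and count parameters as stand-ins for the real matched-subscription bookkeeping:

```cpp
#include <cassert>
#include <cstddef>

// Illustrative: take the CPU publish path only if some matched subscriber
// was not discovered as buffer-aware. If every matched subscriber is
// buffer-aware, the CPU conversion (to_vector()) is skipped entirely.
bool needs_cpu_fallback(std::size_t total_matched_subs, std::size_t buffer_aware_subs)
{
  return total_matched_subs > buffer_aware_subs;
}
```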

@nvcyc meta: isn't this a different approach compared to the one chosen for rmw_fastrtps_cpp? IIUC the latter skips buffer aware publishing entirely if it discovers non-buffer aware subscribers.

Author

@nvcyc nvcyc Apr 2, 2026
Great question. You caught this diff after we had updated rmw_fastrtps_cpp and while we were in the process of updating rmw_zenoh_cpp. With the new commits pushed, this part of the logic is now the same.


```cpp
namespace
{
std::string gid_to_hex(const rmw_gid_t & gid)
```

@nvcyc hmm, aren't these functions duplicated in rmw_publisher_data.cpp?

@nvcyc nvcyc marked this pull request as draft March 31, 2026 17:33
@ahcorde ahcorde requested review from JEnoch and YuanYuYuan April 1, 2026 15:55
@ahcorde
Contributor

ahcorde commented Apr 1, 2026

@YuanYuYuan or @JEnoch, do you mind taking a look? The RMW freeze is next Monday (April 6).

@nvcyc nvcyc marked this pull request as ready for review April 1, 2026 23:52
Member

@wjwwood wjwwood left a comment


Partial review


```cpp
rosidl::BufferDescriptorOps ops;

auto backend_ptr = backend;
```
Member

I feel like this is violating the "almost always auto" ideal. I love auto as much as the next person, but here, this is hiding important information, which isn't impossible to deduce from the context, but I'm left frustrated wondering, is this a shared ptr, a unique ptr, something else? It's definitely a nitpick, but I think it would be better to avoid using auto unless it's very obvious from the right-hand side of the =.

In this particular case the type is important because we're capturing it in the next lambda.

```cpp
rosidl::BufferDescriptorOps ops;

auto backend_ptr = backend;
ops.create_descriptor_with_endpoint = [backend_ptr](
```
Member

Is it safe to capture this (what I assume is a shared_ptr) in this lambda? I get nervous any time I see a shared pointer being captured by value in a lambda, because it extends the life of the ptr to be the same as the lambda. So feel free to resolve this if it's not the case, but maybe this should be captured as a weak_ptr and locked in the callback each time.
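The weak_ptr capture the reviewer suggests can be sketched like this. `Backend` and `make_create_descriptor` are illustrative stand-ins, not the actual rmw_zenoh_cpp types.

```cpp
#include <cassert>
#include <functional>
#include <memory>

// Hypothetical backend type standing in for the real buffer backend.
struct Backend {
  int descriptors_created = 0;
};

// The lambda holds a weak_ptr, so it does not extend the backend's
// lifetime; it locks the pointer on each call and fails gracefully if the
// backend has already been destroyed.
std::function<bool()> make_create_descriptor(const std::shared_ptr<Backend> & backend)
{
  std::weak_ptr<Backend> weak_backend = backend;  // no ownership extension
  return [weak_backend]() {
      if (auto locked = weak_backend.lock()) {  // backend still alive?
        ++locked->descriptors_created;
        return true;
      }
      return false;  // backend already destroyed
    };
}
```

Capturing the shared_ptr by value instead would keep the backend alive for as long as the ops struct holds the lambda, which is exactly the lifetime concern raised above.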

@JEnoch
Contributor

JEnoch commented Apr 5, 2026

> @YuanYuYuan or @JEnoch do you mind to take a look ? mw freeze it's next monday (6th April)

Apologies for not having been able to attend the working group sessions or catch up on the discussions over the past few weeks. Unfortunately I still don't have bandwidth to do a thorough review of this PR at this point. I suspect the same is true for my colleague @YuanYuYuan.

I just have one suggestion: it would be great if docs/design.md could be updated as part of this PR to document the new buffer-aware pub/sub architecture and the changes to all key expression formats (liveliness tokens and topics).

If the other reviewers are satisfied, I'm happy to defer to their judgment and approve the merge.

@YuanYuYuan
Contributor

> @YuanYuYuan or @JEnoch do you mind to take a look ? mw freeze it's next monday (6th April)

Just returned from a long holiday 😄 I went through the deadlock issue and verified it last week. A patch has been proposed at #955

@asymingt
Member

Just a quick note that we are in an RMW freeze right now for the ROS 2 Lyrical release. Please reach out to @sloretz before you merge this one.

@nvcyc
Author

nvcyc commented Apr 17, 2026

Thanks for the reminder.
Yes, I'm aware of the current RMW freeze state for the ROS 2 Lyrical release.

Based on the discussion in the ROS 2 Lyrical release working group meeting on 2026/4/6, we'll target this PR for Lyrical patch 1 release to match the same level of rosidl::Buffer support rmw_fastrtps_cpp currently has, so I'm continuing the development here mainly for that goal.

I'll confirm in the working group with @sloretz before merging this PR when it's ready.

nvcyc added 5 commits April 17, 2026 04:51
Signed-off-by: CY Chen <cyc@nvidia.com>
Signed-off-by: CY Chen <cyc@nvidia.com>
Signed-off-by: CY Chen <cyc@nvidia.com>
Signed-off-by: CY Chen <cyc@nvidia.com>
Signed-off-by: CY Chen <cyc@nvidia.com>
nvcyc and others added 5 commits April 17, 2026 04:52
Signed-off-by: CY Chen <cyc@nvidia.com>
Signed-off-by: CY Chen <cyc@nvidia.com>
Signed-off-by: CY Chen <cyc@nvidia.com>
…backs

update_topic_map_for_put() collected discovery callbacks under
discovery_mutex_ but still invoked them while graph_mutex_ was held
(via the lock_guard in parse_put).  Any callback that re-enters
graph_cache — e.g. creating a per-endpoint subscription which calls
register_publisher_discovery_callback() — would attempt to re-acquire
graph_mutex_ on the same thread, deadlocking immediately.

Fix: change update_topic_map_for_put() and update_topic_maps_for_put()
to return the collected callbacks instead of invoking them.  parse_put()
switches from lock_guard to unique_lock so it can call lock.unlock()
before iterating over the returned callbacks.

This is a defensive complement to the lock-order fix in e91c15a.
While no current callback directly re-acquires graph_mutex_, invoking
external callbacks under an internal mutex is an API contract violation
that creates fragility for future changes.

Signed-off-by: YuanYu Yuan <yuanyu.yuan@zettascale.tech>
Signed-off-by: YuanYu Yuan <yuanyu.yuan@zettascale.tech>
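The fix described in that commit message can be sketched as below. The globals and function bodies are minimal stand-ins for the real graph-cache code, kept only to show the lock handoff.

```cpp
#include <cassert>
#include <functional>
#include <mutex>
#include <vector>

// Illustrative globals standing in for the graph cache's state.
std::mutex graph_mutex_;
std::vector<std::function<void()>> pending_callbacks_;

// As in the fix: called with graph_mutex_ held, this only collects the
// callbacks and returns them; it never invokes them under the lock.
std::vector<std::function<void()>> update_topic_map_for_put()
{
  return std::move(pending_callbacks_);
}

// parse_put() uses std::unique_lock (not lock_guard) so it can release
// graph_mutex_ before iterating over the returned callbacks.
void parse_put()
{
  std::unique_lock<std::mutex> lock(graph_mutex_);
  auto callbacks = update_topic_map_for_put();
  lock.unlock();  // release before invoking external code
  for (auto & cb : callbacks) {
    cb();  // a re-entrant callback can now acquire graph_mutex_ safely
  }
}
```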
nvcyc added 2 commits April 19, 2026 23:36
Signed-off-by: CY Chen <cyc@nvidia.com>
