Add support for rosidl::Buffer-aware per-endpoint pub/sub#930
Conversation
```cpp
for (const auto & [gid_hash,
    callback] : publisher_discovery_callbacks_[graph_topic_data->info_.name_])
{
  callback(*entity);
```
I think there is a risk of this callback trying to do something with the graph that is also under the discovery_mutex_ (or under a lock further down in DDS) and we could end up with a deadlock.
The better approach is probably to follow the BufferEndpointRegistry::register_subscriber_discovery_callback pattern where you push all of the callbacks into a temporary vector, release the lock, and then iterate them.
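A minimal sketch of that pattern, for reference (the member names `discovery_mutex_` and `publisher_discovery_callbacks_` come from the diff above; `Entity`, `DiscoveryCallback`, and the class shape are illustrative stand-ins, not the PR's actual code):

```cpp
#include <functional>
#include <mutex>
#include <string>
#include <unordered_map>
#include <vector>

// Illustrative stand-ins for the real rmw_zenoh types.
struct Entity { std::string topic_name; };
using DiscoveryCallback = std::function<void(const Entity &)>;

class GraphCacheSketch
{
public:
  void notify_publisher_discovered(const Entity & entity)
  {
    // 1. Copy the matching callbacks into a temporary vector under the lock.
    std::vector<DiscoveryCallback> to_run;
    {
      std::lock_guard<std::mutex> lock(discovery_mutex_);
      for (const auto & [gid_hash, cb] :
        publisher_discovery_callbacks_[entity.topic_name])
      {
        to_run.push_back(cb);
      }
    }  // 2. discovery_mutex_ is released here.
    // 3. Invoke outside the lock so a callback that re-enters discovery
    //    (or takes a lock further down the stack) cannot deadlock.
    for (const auto & cb : to_run) {
      cb(entity);
    }
  }

private:
  std::mutex discovery_mutex_;
  std::unordered_map<std::string,
    std::unordered_map<std::string, DiscoveryCallback>> publisher_discovery_callbacks_;
};
```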
hidmic left a comment:
First pass. Partial pass. I need more brain to parse all of this.
```cpp
if (topic_info.backend_metadata_.has_value() && !topic_info.backend_metadata_.value().empty()) {
  std::vector<std::string> backend_names;
  backend_names.reserve(topic_info.backend_metadata_->size());
  for (const auto & pair : topic_info.backend_metadata_.value()) {
```
@nvcyc nit: would it make sense to get a reference to the backend_metadata value and reuse?
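Applied to the snippet above, the nit might look roughly like this (a compilable sketch; `TopicInfo` and the map's value type are assumptions, since only `backend_metadata_` is visible in the diff):

```cpp
#include <map>
#include <optional>
#include <string>
#include <vector>

// Illustrative stand-in: backend_metadata_ maps backend name -> aux info.
struct TopicInfo
{
  std::optional<std::map<std::string, std::string>> backend_metadata_;
};

std::vector<std::string> collect_backend_names(const TopicInfo & topic_info)
{
  std::vector<std::string> backend_names;
  if (topic_info.backend_metadata_.has_value() && !topic_info.backend_metadata_->empty()) {
    // Bind the optional's value once and reuse the reference below,
    // instead of repeated .value() calls.
    const auto & metadata = *topic_info.backend_metadata_;
    backend_names.reserve(metadata.size());
    for (const auto & entry : metadata) {
      backend_names.push_back(entry.first);
    }
  }
  return backend_names;
}
```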
```diff
 if (entity_type != EntityType::Node) {
-  if (parts.size() < KEYEXPR_INDEX_MAX + 1) {
+  // Minimum required: up to TopicQoS (backends is optional)
+  if (parts.size() < KeyexprIndex::TopicQoS + 1) {
```
@nvcyc meta: perhaps there should be a MANDATORY_KEYEXPR_INDEX_MAX macro to future proof this clause.
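Something like the following, perhaps (a sketch; the enum values are made up for illustration, and only `KeyexprIndex::TopicQoS` comes from the diff):

```cpp
#include <cstddef>
#include <string>
#include <vector>

// Minimal stand-in for the real KeyexprIndex; values are illustrative.
enum KeyexprIndex : std::size_t { TopicName = 5, TopicType = 6, TopicTypeHash = 7, TopicQoS = 8, Backends = 9 };

// The suggested alias: the last keyexpr field that must always be present.
// Adding new optional fields (like Backends) no longer requires touching the check.
constexpr std::size_t MANDATORY_KEYEXPR_INDEX_MAX = KeyexprIndex::TopicQoS;

bool has_mandatory_fields(const std::vector<std::string> & parts)
{
  return parts.size() >= MANDATORY_KEYEXPR_INDEX_MAX + 1;
}
```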
```cpp
  base_endpoint->pub = std::optional<zenoh::ext::AdvancedPublisher>(std::move(pub_));
  endpoints_[base_endpoint->key] = base_endpoint;
}
// For buffer-aware publishers, endpoints are created dynamically on subscriber discovery
```
@nvcyc meta: won't this interfere with system introspection? IIRC ros2 topic list need not subscribe to publishers to find them.
In rmw_zenoh, publisher discovery (used by `ros2 topic list`, `ros2 topic info`, etc.) works entirely through liveliness tokens and the graph cache, so `ros2 topic list` is not affected by these changes to Zenoh data-plane publisher/subscriber creation.
```cpp
// Quadruple the buffer size for safety (endpoint-aware serialization needs more space)
// TODO(native-buffer): Fix the size estimation in buffer_serialization.hpp to be more accurate
max_data_length = max_data_length * 4 + 16384;
```
@nvcyc maybe something along the lines of having a maximum size for descriptors would work here?
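One shape that suggestion could take (hypothetical; `MAX_DESCRIPTOR_SIZE` and the per-descriptor accounting are assumptions, not the PR's code):

```cpp
#include <cstddef>

// Hypothetical upper bound on a serialized endpoint descriptor.
constexpr std::size_t MAX_DESCRIPTOR_SIZE = 4096;

// Rather than the blanket "max_data_length * 4 + 16384", bound the extra
// space needed for endpoint-aware serialization per descriptor.
std::size_t estimate_buffer_size(std::size_t max_data_length, std::size_t num_descriptors)
{
  return max_data_length + num_descriptors * MAX_DESCRIPTOR_SIZE;
}
```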
```cpp
// We only fall through when the total matched subscription count exceeds
// the number of discovered buffer-aware subscribers, meaning at least one
// non-buffer-aware subscriber exists. This avoids an unnecessary CPU
// conversion (to_vector()) of vendor-backed buffer data on every publish.
```
@nvcyc meta: isn't this a different approach compared to the one chosen for rmw_fastrtps_cpp? IIUC the latter skips buffer aware publishing entirely if it discovers non-buffer aware subscribers.
Great question. You caught this diff after we had updated rmw_fastrtps_cpp but were still in the process of updating rmw_zenoh_cpp. This part of the logic is now the same with the new commits pushed.
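For reference, the now-aligned fall-through can be sketched like this (all helpers are illustrative stubs, not the PR's actual API):

```cpp
#include <cstddef>
#include <cstdint>
#include <vector>

// Illustrative stubs standing in for the real publisher internals.
struct Message { std::vector<uint8_t> to_vector() const { return {}; } };
std::size_t count_matched_subscriptions() { return 0; }
std::size_t buffer_aware_subscriber_count() { return 0; }
void publish_buffer_aware(const Message &) {}
void publish_base_key(const std::vector<uint8_t> &) {}

void publish(const Message & msg)
{
  // Always take the per-endpoint path first.
  publish_buffer_aware(msg);
  // Fall through to the base-key (CPU) path only when at least one matched
  // subscriber is not buffer-aware; otherwise skip the to_vector() conversion.
  if (count_matched_subscriptions() > buffer_aware_subscriber_count()) {
    publish_base_key(msg.to_vector());
  }
}
```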
```cpp
namespace
{
std::string gid_to_hex(const rmw_gid_t & gid)
```
@nvcyc hmm, aren't these functions duplicated in rmw_publisher_data.cpp?
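If they are duplicated, one option is hoisting the helper into a shared header, along these lines (a sketch; the real implementation may differ, but `rmw_gid_t` does expose its bytes as `data` with size `RMW_GID_STORAGE_SIZE`):

```cpp
#include <cstddef>
#include <cstdio>
#include <string>

#include "rmw/types.h"  // rmw_gid_t, RMW_GID_STORAGE_SIZE

// One shared definition instead of per-file anonymous-namespace copies.
inline std::string gid_to_hex(const rmw_gid_t & gid)
{
  std::string out;
  out.reserve(2 * RMW_GID_STORAGE_SIZE);
  char byte_hex[3];
  for (std::size_t i = 0; i < RMW_GID_STORAGE_SIZE; ++i) {
    std::snprintf(byte_hex, sizeof(byte_hex), "%02x", gid.data[i]);
    out += byte_hex;
  }
  return out;
}
```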
@YuanYuYuan or @JEnoch do you mind taking a look? The RMW freeze is next Monday (April 6th).
```cpp
rosidl::BufferDescriptorOps ops;

auto backend_ptr = backend;
```
I feel like this is violating the "almost always auto" ideal. I love auto as much as the next person, but here, this is hiding important information, which isn't impossible to deduce from the context, but I'm left frustrated wondering, is this a shared ptr, a unique ptr, something else? It's definitely a nitpick, but I think it would be better to avoid using auto unless it's very obvious from the right-hand side of the =.
In this particular case the type is important because we're capturing it in the next lambda.
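Concretely, the suggestion is just to spell the type out at the declaration (the `BufferBackend` name here is an assumption for illustration):

```cpp
#include <memory>

// Hypothetical backend type; the real one comes from the backend registry.
struct BufferBackend {};

void example(const std::shared_ptr<BufferBackend> & backend)
{
  // "auto backend_ptr = backend;" hides the ownership semantics; the
  // explicit type documents that the lambda below copies shared ownership.
  std::shared_ptr<BufferBackend> backend_ptr = backend;
  auto callback = [backend_ptr]() {
    // backend_ptr keeps the backend alive for the callback's lifetime.
  };
  callback();
}
```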
```cpp
rosidl::BufferDescriptorOps ops;

auto backend_ptr = backend;
ops.create_descriptor_with_endpoint = [backend_ptr](
```
Is it safe to capture this (what I assume is a shared_ptr) in this lambda? I get nervous any time I see a shared pointer being captured by value in a lambda, because it extends the life of the ptr to be the same as the lambda. So feel free to resolve this if it's not the case, but maybe this should be captured as a weak_ptr and locked in the callback each time.
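A sketch of the suggested alternative, assuming `backend` really is a `std::shared_ptr` (names are illustrative):

```cpp
#include <memory>

struct BufferBackend
{
  // Hypothetical stand-in for the real descriptor-creation entry point.
  void create_descriptor_with_endpoint() {}
};

void register_ops(const std::shared_ptr<BufferBackend> & backend)
{
  // Capture a weak_ptr so the lambda does not pin the backend's lifetime
  // to its own.
  std::weak_ptr<BufferBackend> weak_backend = backend;
  auto create_descriptor = [weak_backend]() {
    if (auto locked = weak_backend.lock()) {
      locked->create_descriptor_with_endpoint();
    }
    // If the backend has been torn down, fall through (e.g. to the CPU path).
  };
  create_descriptor();
}
```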
Apologies for not having been able to attend the working group sessions or catch up on the discussions over the past few weeks. Unfortunately I still don't have the bandwidth to do a thorough review of this PR at this point, and I suspect the same is true for my colleague @YuanYuYuan. I just have one suggestion: if the other reviewers are satisfied, I'm happy to defer to their judgment and approve the merge.
Just returned from a long holiday 😄 I went through the deadlock issue and verified it last week. A patch has been proposed at #955.
Just a quick note that we are in an RMW freeze right now for the ROS 2 Lyrical release. Please reach out to @sloretz before you merge this one.
Thanks for the reminder. Based on the discussion in the ROS 2 Lyrical release working group meeting on 2026/4/6, we'll target this PR for the Lyrical patch 1 release. I'll confirm in the working group with @sloretz before merging this PR when it's ready.
…backs

update_topic_map_for_put() collected discovery callbacks under discovery_mutex_ but still invoked them while graph_mutex_ was held (via the lock_guard in parse_put). Any callback that re-enters graph_cache (e.g. creating a per-endpoint subscription, which calls register_publisher_discovery_callback()) would attempt to re-acquire graph_mutex_ on the same thread, deadlocking immediately.

Fix: change update_topic_map_for_put() and update_topic_maps_for_put() to return the collected callbacks instead of invoking them. parse_put() switches from lock_guard to unique_lock so it can call lock.unlock() before iterating over the returned callbacks.

This is a defensive complement to the lock-order fix in e91c15a. While no current callback directly re-acquires graph_mutex_, invoking external callbacks under an internal mutex is an API contract violation that creates fragility for future changes.

Signed-off-by: YuanYu Yuan <yuanyu.yuan@zettascale.tech>
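The shape of that fix, roughly (a compilable sketch; everything except the function and mutex names quoted in the commit message is a stand-in):

```cpp
#include <functional>
#include <mutex>
#include <vector>

using Callback = std::function<void()>;

std::mutex graph_mutex_;

// Now returns the collected callbacks instead of invoking them.
std::vector<Callback> update_topic_map_for_put()
{
  std::vector<Callback> collected;
  // ... collect matching discovery callbacks (under discovery_mutex_) ...
  return collected;
}

void parse_put()
{
  std::unique_lock<std::mutex> lock(graph_mutex_);  // was std::lock_guard
  auto callbacks = update_topic_map_for_put();
  lock.unlock();  // release graph_mutex_ before running external code
  for (const auto & cb : callbacks) {
    cb();  // a re-entrant callback can now safely take graph_mutex_ itself
  }
}
```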
Description
This pull request adds full `rosidl::Buffer` support to `rmw_zenoh_cpp`, enabling per-endpoint Zenoh publishers and subscribers for zero-copy buffer transport between compatible backends. When a publisher and subscriber share compatible non-CPU buffer backends, data can be transferred via a lightweight descriptor; when backends are incompatible, the system falls back to standard CPU-based buffer serialization.

This pull request consists of the following key changes:

- Calls `rosidl_buffer_backend_registry::initialize_buffer_backends()`/`shutdown_buffer_backends()` during RMW init/shutdown to load and tear down buffer backend plugins.
- `create_descriptor_with_endpoint()` (nullptr: CPU fallback). Publisher creation explicitly adds `"cpu"` to `backend_aux_info`.
- `publish()` first sends endpoint-aware messages via `publish_buffer_aware()`, then conditionally falls through to the standard base-key publish path only when the total matched subscription count exceeds discovered buffer-aware subscribers, avoiding unnecessary CPU conversion when all subscribers are buffer-aware.
- The `Message` struct owns endpoint info via `std::optional<EndpointInfoStorage>`. Endpoint info is passed into deserialization for correct backend reconstruction.
- `acceptable_buffer_backends`: parses the subscription option. `NULL`/empty/`"cpu"`: CPU-only (advertises `"cpu"` in the liveliness token); `"any"`: all installed backends; specific names: filtered. In `on_publisher_discovered()`, CPU is always added to the publisher's backend list.

Is this a user-facing behavior change?
This pull request does not change existing `rmw_zenoh_cpp` behavior for standard (non-Buffer) messages. For messages with `uint8[]` fields, the per-endpoint transport is transparent: publishers and subscribers share backend info automatically, and CPU fallback ensures correctness when backends are incompatible.

Did you use Generative AI?
Yes. Claude (claude-4.6-opus) via Cursor was used to assist with creating an initial prototype version of the changes contained in this PR.
Additional Information
This PR is part of the broader ROS 2 native buffer feature introduced in this post.