Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions helm/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ serviceAccount:
- apiGroups: [""]
resources: [nodes, pods]
verbs: [get, list, watch]
- apiGroups: [discovery.k8s.io]
resources: [endpointslices]
verbs: [get, list, watch]

podAnnotations: {}
podLabels: {}
Expand Down
73 changes: 70 additions & 3 deletions src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ use error::{RobotLBError, RobotLBResult};
use futures::StreamExt;
use hcloud::apis::configuration::Configuration as HCloudConfig;
use k8s_openapi::{
api::core::v1::{Node, Pod, Service},
api::{
core::v1::{Node, Pod, Service},
discovery::v1::EndpointSlice,
},
serde_json::json,
};
use kube::{
Expand Down Expand Up @@ -181,8 +184,16 @@ async fn get_nodes_dynamically(
.unwrap_or_else(|| context.client.default_namespace()),
);

let Some(pod_selector) = svc.spec.as_ref().and_then(|spec| spec.selector.clone()) else {
return Err(RobotLBError::ServiceWithoutSelector);
let Some(pod_selector) = svc
.spec
.as_ref()
.and_then(|spec| spec.selector.clone())
.filter(|s| !s.is_empty())
else {
tracing::info!(
"Service has no selector, falling back to EndpointSlice-based node discovery"
);
return get_nodes_from_endpointslices(svc, context).await;
};

let label_selector = pod_selector
Expand Down Expand Up @@ -215,6 +226,62 @@ async fn get_nodes_dynamically(
Ok(nodes)
}

/// Get nodes from `EndpointSlice` resources associated with a Service.
/// This method is used as a fallback when the Service has no selector,
/// such as when `EndpointSlice` resources are managed by an external controller
/// (e.g. kubevirt cloud-controller-manager).
/// It discovers target nodes by reading the `nodeName` field from each endpoint.
async fn get_nodes_from_endpointslices(
svc: &Arc<Service>,
context: &Arc<CurrentContext>,
) -> RobotLBResult<Vec<Node>> {
let namespace = svc
.namespace()
.unwrap_or_else(|| context.client.default_namespace().to_string());
let eps_api = kube::Api::<EndpointSlice>::namespaced(context.client.clone(), &namespace);
let eps_list = eps_api
.list(&ListParams {
label_selector: Some(format!(
"kubernetes.io/service-name={}",
svc.name_any()
)),
..Default::default()
})
.await?;

let target_nodes = eps_list
.into_iter()
.flat_map(|eps| eps.endpoints)
.filter(|ep| {
ep.conditions
.as_ref()
.and_then(|c| c.ready)
.unwrap_or(true)
})
.filter_map(|ep| ep.node_name)
.collect::<HashSet<_>>();

if target_nodes.is_empty() {
tracing::warn!("No ready endpoints found in EndpointSlices for service");
return Ok(vec![]);
}

tracing::info!(
"Discovered {} target node(s) from EndpointSlices",
target_nodes.len()
);

let nodes_api = kube::Api::<Node>::all(context.client.clone());
let nodes = nodes_api
.list(&ListParams::default())
.await?
.into_iter()
.filter(|node| target_nodes.contains(&node.name_any()))
.collect::<Vec<_>>();

Ok(nodes)
}

/// Get nodes based on the node selector.
/// This method will find the nodes based on the node selector
/// from the service annotations.
Expand Down