From 1c0210376505364aa200caca1df2292454649d6a Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=B5=A9?= Date: Sat, 28 Feb 2026 15:59:08 +0800 Subject: [PATCH 1/4] Support namespace unsubscribe when bundles are unloaded --- .../broker/admin/impl/NamespacesBase.java | 163 +++++++++--------- .../pulsar/broker/admin/v1/Namespaces.java | 29 ++-- .../pulsar/broker/admin/v2/Namespaces.java | 29 ++-- .../pulsar/broker/admin/AdminApiTest.java | 86 +++++++++ 4 files changed, 206 insertions(+), 101 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index bfbd65234813e..d59f19caf266a 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1667,70 +1667,51 @@ protected void internalClearNamespaceBundleBacklogForSubscription(String subscri subscription, namespaceName, bundleRange); } - protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse, String subscription, - boolean authoritative) { - validateNamespaceOperation(namespaceName, NamespaceOperation.UNSUBSCRIBE); + protected CompletableFuture internalUnsubscribeNamespaceAsync(String subscription, + boolean authoritative) { checkNotNull(subscription, "Subscription should not be null"); - final List> futures = new ArrayList<>(); - try { - NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() - .getBundles(namespaceName); - for (NamespaceBundle nsBundle : bundles.getBundles()) { - // check if the bundle is owned by any broker, if not then there are no subscriptions - if (pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) { - futures.add(pulsar().getAdminClient().namespaces().unsubscribeNamespaceBundleAsync( - namespaceName.toString(), nsBundle.getBundleRange(), subscription)); - } - } - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); - return; - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - return; - } - - FutureUtil.waitForAll(futures).handle((result, exception) -> { - if (exception != null) { - log.warn("[{}] Failed to unsubscribe {} on the bundles for namespace {}: {}", clientAppId(), - subscription, namespaceName, exception.getCause().getMessage()); - if (exception.getCause() instanceof PulsarAdminException) { - asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause())); - return null; - } else { - asyncResponse.resume(new RestException(exception.getCause())); - return null; - } - } - log.info("[{}] Successfully unsubscribed {} on all the bundles for namespace {}", clientAppId(), - subscription, namespaceName); - asyncResponse.resume(Response.noContent().build()); - return null; - }); + return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.UNSUBSCRIBE) + .thenCompose(__ -> pulsar().getNamespaceService().getNamespaceBundleFactory() + .getBundlesAsync(namespaceName)) + .thenCompose(bundles -> { + final List> futures = new ArrayList<>(); + for (NamespaceBundle nsBundle : bundles.getBundles()) { + try { + futures.add(pulsar().getAdminClient().namespaces().unsubscribeNamespaceBundleAsync( + namespaceName.toString(), nsBundle.getBundleRange(), subscription)); + } catch (PulsarServerException e) { + return CompletableFuture.failedFuture(e); + } + } + return FutureUtil.waitForAll(futures); + }).thenRun(() -> log.info("[{}] Successfully unsubscribed {} on all the bundles for namespace {}", + clientAppId(), subscription, namespaceName)); } @SuppressWarnings("deprecation") - protected void internalUnsubscribeNamespaceBundle(String subscription, String bundleRange, boolean authoritative) { - validateNamespaceOperation(namespaceName, NamespaceOperation.UNSUBSCRIBE); + protected CompletableFuture internalUnsubscribeNamespaceBundleAsync(String subscription, String bundleRange, + boolean authoritative) { checkNotNull(subscription, "Subscription should not be null"); checkNotNull(bundleRange, "BundleRange should not be null"); - Policies policies = getNamespacePolicies(namespaceName); - - if (namespaceName.isGlobal()) { - // check cluster ownership for a given global namespace: redirect if peer-cluster owns it - validateGlobalNamespaceOwnership(namespaceName); - } else { - validateClusterOwnership(namespaceName.getCluster()); - validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster()); - } - - validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true); - - unsubscribe(namespaceName, bundleRange, subscription); - log.info("[{}] Successfully unsubscribed {} on namespace bundle {}/{}", clientAppId(), subscription, - namespaceName, bundleRange); + return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.UNSUBSCRIBE) + .thenCompose(__ -> { + if (namespaceName.isGlobal()) { + // check cluster ownership for a given global namespace: redirect if peer-cluster owns it + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } + return validateClusterOwnershipAsync(namespaceName.getCluster()) + .thenCompose(unused -> validateClusterForTenantAsync(namespaceName.getTenant(), + namespaceName.getCluster())); + }) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenCompose(policies -> + validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, + authoritative, false)) + .thenCompose(__ -> unsubscribeAsync(namespaceName, bundleRange, subscription)) + .thenRun(() -> log.info("[{}] Successfully unsubscribed {} on namespace bundle {}/{}", + clientAppId(), subscription, namespaceName, bundleRange)); } protected void internalSetSubscriptionAuthMode(SubscriptionAuthMode subscriptionAuthMode) { @@ -1918,32 +1899,52 @@ private void clearBacklog(NamespaceName nsName, String bundleRange, String subsc } } - private void unsubscribe(NamespaceName nsName, String bundleRange, String subscription) { - try { - List topicList = pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(), - nsName.toString() + "/" + bundleRange); - List> futures = new ArrayList<>(); - if (subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) { - throw new RestException(Status.PRECONDITION_FAILED, "Cannot unsubscribe a replication cursor"); - } else { - for (Topic topic : topicList) { - Subscription sub = topic.getSubscription(subscription); - if (sub != null) { - futures.add(sub.delete()); - } - } - } - FutureUtil.waitForAll(futures).get(); - } catch (RestException re) { - throw re; - } catch (Exception e) { - log.error("[{}] Failed to unsubscribe {} for namespace {}/{}", clientAppId(), subscription, - nsName.toString(), bundleRange, e); - if (e.getCause() instanceof SubscriptionBusyException) { - throw new RestException(Status.PRECONDITION_FAILED, "Subscription has active connected consumers"); - } - throw new RestException(e.getCause()); + private CompletableFuture unsubscribeAsync(NamespaceName nsName, String bundleRange, String subscription) { + if (subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) { + return CompletableFuture.failedFuture( + new RestException(Status.PRECONDITION_FAILED, "Cannot unsubscribe a replication cursor")); } + + return pulsar().getNamespaceService().getListOfPersistentTopics(nsName) + .thenCompose(topicsInNamespace -> { + List> futures = new ArrayList<>(); + NamespaceBundleFactory bundleFactory = + pulsar().getNamespaceService().getNamespaceBundleFactory(); + NamespaceBundle targetBundle = bundleFactory.getBundle(nsName.toString(), bundleRange); + + for (String topic : topicsInNamespace) { + TopicName topicName = TopicName.get(topic); + if (pulsar().getBrokerService().isSystemTopic(topicName)) { + continue; + } + NamespaceBundle bundle = bundleFactory.getBundle(topicName); + if (bundle == null || !bundle.equals(targetBundle)) { + continue; + } + futures.add(pulsar().getBrokerService().getTopic(topicName.toString(), false) + .thenCompose(optTopic -> { + if (optTopic.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + Topic loaded = optTopic.get(); + Subscription sub = loaded.getSubscription(subscription); + if (sub == null) { + return CompletableFuture.completedFuture(null); + } + return sub.delete(); + })); + } + return FutureUtil.waitForAll(futures); + }).exceptionally(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + log.error("[{}] Failed to unsubscribe {} for namespace {}/{}", clientAppId(), subscription, + nsName.toString(), bundleRange, cause); + if (cause instanceof SubscriptionBusyException) { + throw new RestException(Status.PRECONDITION_FAILED, + "Subscription has active connected consumers"); + } + throw new RestException(cause); + }); } protected BundlesData validateBundlesData(BundlesData initialBundles) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index cad6899c8a290..1809456476c6c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -1446,14 +1446,15 @@ public void unsubscribeNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("subscription") String subscription, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - try { - validateNamespaceName(property, cluster, namespace); - internalUnsubscribeNamespace(asyncResponse, subscription, authoritative); - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + validateNamespaceName(property, cluster, namespace); + internalUnsubscribeNamespaceAsync(subscription, authoritative) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to unsubscribe {} on namespace {}", clientAppId(), + subscription, namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -1462,12 +1463,20 @@ public void unsubscribeNamespace(@Suspended final AsyncResponse asyncResponse, @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"), @ApiResponse(code = 404, message = "Namespace does not exist") }) - public void unsubscribeNamespaceBundle(@PathParam("property") String property, @PathParam("cluster") String cluster, + public void unsubscribeNamespaceBundle(@Suspended final AsyncResponse asyncResponse, + @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("subscription") String subscription, @PathParam("bundle") String bundleRange, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateNamespaceName(property, cluster, namespace); - internalUnsubscribeNamespaceBundle(subscription, bundleRange, authoritative); + internalUnsubscribeNamespaceBundleAsync(subscription, bundleRange, authoritative) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to unsubscribe {} on namespace bundle {}/{}", clientAppId(), + subscription, namespaceName, bundleRange, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 90f4b087bfe85..53501d16f485f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -1588,14 +1588,15 @@ public void unsubscribeNamespace(@Suspended final AsyncResponse asyncResponse, @ @PathParam("namespace") String namespace, @PathParam("subscription") String subscription, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - try { - validateNamespaceName(tenant, namespace); - internalUnsubscribeNamespace(asyncResponse, subscription, authoritative); - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + validateNamespaceName(tenant, namespace); + internalUnsubscribeNamespaceAsync(subscription, authoritative) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to unsubscribe {} on namespace {}", clientAppId(), + subscription, namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -1605,12 +1606,20 @@ public void unsubscribeNamespace(@Suspended final AsyncResponse asyncResponse, @ @ApiResponse(code = 204, message = "Operation successful"), @ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"), @ApiResponse(code = 404, message = "Namespace does not exist") }) - public void unsubscribeNamespaceBundle(@PathParam("tenant") String tenant, + public void unsubscribeNamespaceBundle(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("subscription") String subscription, @PathParam("bundle") String bundleRange, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateNamespaceName(tenant, namespace); - internalUnsubscribeNamespaceBundle(subscription, bundleRange, authoritative); + internalUnsubscribeNamespaceBundleAsync(subscription, bundleRange, authoritative) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to unsubscribe {} on namespace bundle {}/{}", clientAppId(), + subscription, namespaceName, bundleRange, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 663a6f0a41900..680b1a7842b36 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -2339,6 +2339,92 @@ public void testUnsubscribeOnNamespace(Integer numBundles) throws Exception { new ArrayList<>()); } + @Test(dataProvider = "bundling") + public void testUnsubscribeNamespaceBundleOnUnloadedBundle(Integer numBundles) throws Exception { + String namespace = "prop-xyz/ns-unsub-bundle"; + admin.namespaces().createNamespace(namespace, numBundles); + + String topic = "persistent://" + namespace + "/t1"; + String subscription = "sub1"; + String otherSubscription = "sub2"; + + Consumer consumer1 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(subscription) + .subscribe(); + + Consumer consumer2 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(otherSubscription) + .subscribe(); + + consumer1.close(); + consumer2.close(); + + NamespaceBundle bundle = + pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic)); + admin.namespaces().unloadNamespaceBundle(namespace, bundle.getBundleRange()); + + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(200, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle), + "Bundle should not be owned by current broker"); + assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle), + "Bundle should not be owned by other broker"); + }); + + admin.namespaces().unsubscribeNamespaceBundle(namespace, bundle.getBundleRange(), subscription); + + List subscriptions = + admin.topics().getSubscriptions(topic).stream().sorted().toList(); + assertEquals(subscriptions, List.of(otherSubscription)); + } + + @Test(dataProvider = "bundling") + public void testUnsubscribeNamespaceOnUnloadedBundle(Integer numBundles) throws Exception { + String namespace = "prop-xyz/ns-unsub-namespace"; + admin.namespaces().createNamespace(namespace, numBundles); + + String topic = "persistent://" + namespace + "/t1"; + String subscription = "sub1"; + String otherSubscription = "sub2"; + + Consumer consumer1 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(subscription) + .subscribe(); + + Consumer consumer2 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(otherSubscription) + .subscribe(); + + consumer1.close(); + consumer2.close(); + + NamespaceBundle bundle = + pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic)); + admin.namespaces().unloadNamespaceBundle(namespace, bundle.getBundleRange()); + + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(200, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle), + "Bundle should not be owned by current broker"); + assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle), + "Bundle should not be owned by other broker"); + }); + + admin.namespaces().unsubscribeNamespace(namespace, subscription); + + List subscriptions = + admin.topics().getSubscriptions(topic).stream().sorted().toList(); + assertEquals(subscriptions, List.of(otherSubscription)); + } + private List publishMessagesOnPersistentTopic(String topicName, int messages) throws Exception { return publishMessagesOnPersistentTopic(topicName, messages, 0, false); } From 8246c0c42036b9826c87f31f9639bfb930b4cfdc Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=B5=A9?= Date: Sat, 28 Feb 2026 17:03:31 +0800 Subject: [PATCH 2/4] fix --- .../org/apache/pulsar/broker/admin/impl/NamespacesBase.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index d59f19caf266a..598c4038b053c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1905,7 +1905,7 @@ private CompletableFuture unsubscribeAsync(NamespaceName nsName, String bu new RestException(Status.PRECONDITION_FAILED, "Cannot unsubscribe a replication cursor")); } - return pulsar().getNamespaceService().getListOfPersistentTopics(nsName) + return pulsar().getNamespaceService().getFullListOfTopics(nsName) .thenCompose(topicsInNamespace -> { List> futures = new ArrayList<>(); NamespaceBundleFactory bundleFactory = From 26acbb61f73721665fc08a533cd0d04f3c5f6a81 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=B5=A9?= Date: Sat, 28 Feb 2026 17:32:15 +0800 Subject: [PATCH 3/4] fix --- .../java/org/apache/pulsar/broker/admin/AdminApiTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 680b1a7842b36..9a77487ac45c6 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -2339,7 +2339,7 @@ public void testUnsubscribeOnNamespace(Integer numBundles) throws Exception { new ArrayList<>()); } - @Test(dataProvider = "bundling") + @Test(dataProvider = "numBundles") public void testUnsubscribeNamespaceBundleOnUnloadedBundle(Integer numBundles) throws Exception { String namespace = "prop-xyz/ns-unsub-bundle"; admin.namespaces().createNamespace(namespace, numBundles); @@ -2382,7 +2382,7 @@ public void testUnsubscribeNamespaceBundleOnUnloadedBundle(Integer numBundles) t assertEquals(subscriptions, List.of(otherSubscription)); } - @Test(dataProvider = "bundling") + @Test(dataProvider = "numBundles") public void testUnsubscribeNamespaceOnUnloadedBundle(Integer numBundles) throws Exception { String namespace = "prop-xyz/ns-unsub-namespace"; admin.namespaces().createNamespace(namespace, numBundles); From db6fe0649fb885ce2c40cf6ea1a0c36d857a9e93 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=BC=A0=E6=B5=A9?= Date: Mon, 2 Mar 2026 11:33:43 +0800 Subject: [PATCH 4/4] delete duplicate log --- .../org/apache/pulsar/broker/admin/impl/NamespacesBase.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index 598c4038b053c..ef97c888c0f71 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1937,8 +1937,6 @@ private CompletableFuture unsubscribeAsync(NamespaceName nsName, String bu return FutureUtil.waitForAll(futures); }).exceptionally(ex -> { Throwable cause = FutureUtil.unwrapCompletionException(ex); - log.error("[{}] Failed to unsubscribe {} for namespace {}/{}", clientAppId(), subscription, - nsName.toString(), bundleRange, cause); if (cause instanceof SubscriptionBusyException) { throw new RestException(Status.PRECONDITION_FAILED, "Subscription has active connected consumers");