diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java index bfbd65234813e..9cfaa570ced7b 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/impl/NamespacesBase.java @@ -1667,70 +1667,51 @@ protected void internalClearNamespaceBundleBacklogForSubscription(String subscri subscription, namespaceName, bundleRange); } - protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse, String subscription, - boolean authoritative) { - validateNamespaceOperation(namespaceName, NamespaceOperation.UNSUBSCRIBE); + protected CompletableFuture internalUnsubscribeNamespaceAsync(String subscription, + boolean authoritative) { checkNotNull(subscription, "Subscription should not be null"); - final List> futures = new ArrayList<>(); - try { - NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory() - .getBundles(namespaceName); - for (NamespaceBundle nsBundle : bundles.getBundles()) { - // check if the bundle is owned by any broker, if not then there are no subscriptions - if (pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) { - futures.add(pulsar().getAdminClient().namespaces().unsubscribeNamespaceBundleAsync( - namespaceName.toString(), nsBundle.getBundleRange(), subscription)); - } - } - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); - return; - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - return; - } - - FutureUtil.waitForAll(futures).handle((result, exception) -> { - if (exception != null) { - log.warn("[{}] Failed to unsubscribe {} on the bundles for namespace {}: {}", clientAppId(), - subscription, namespaceName, exception.getCause().getMessage()); - if (exception.getCause() instanceof PulsarAdminException) { - asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause())); - return null; - } else { - asyncResponse.resume(new RestException(exception.getCause())); - return null; - } - } - log.info("[{}] Successfully unsubscribed {} on all the bundles for namespace {}", clientAppId(), - subscription, namespaceName); - asyncResponse.resume(Response.noContent().build()); - return null; - }); + return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.UNSUBSCRIBE) + .thenCompose(__ -> pulsar().getNamespaceService().getNamespaceBundleFactory() + .getBundlesAsync(namespaceName)) + .thenCompose(bundles -> { + final List> futures = new ArrayList<>(); + for (NamespaceBundle nsBundle : bundles.getBundles()) { + try { + futures.add(pulsar().getAdminClient().namespaces().unsubscribeNamespaceBundleAsync( + namespaceName.toString(), nsBundle.getBundleRange(), subscription)); + } catch (PulsarServerException e) { + return CompletableFuture.failedFuture(e); + } + } + return FutureUtil.waitForAll(futures); + }).thenRun(() -> log.info("[{}] Successfully unsubscribed {} on all the bundles for namespace {}", + clientAppId(), subscription, namespaceName)); } @SuppressWarnings("deprecation") - protected void internalUnsubscribeNamespaceBundle(String subscription, String bundleRange, boolean authoritative) { - validateNamespaceOperation(namespaceName, NamespaceOperation.UNSUBSCRIBE); + protected CompletableFuture internalUnsubscribeNamespaceBundleAsync(String subscription, String bundleRange, + boolean authoritative) { checkNotNull(subscription, "Subscription should not be null"); checkNotNull(bundleRange, "BundleRange should not be null"); - Policies policies = getNamespacePolicies(namespaceName); - - if (namespaceName.isGlobal()) { - // check cluster ownership for a given global namespace: redirect if peer-cluster owns it - validateGlobalNamespaceOwnership(namespaceName); - } else { - validateClusterOwnership(namespaceName.getCluster()); - validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster()); - } - - validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true); - - unsubscribe(namespaceName, bundleRange, subscription); - log.info("[{}] Successfully unsubscribed {} on namespace bundle {}/{}", clientAppId(), subscription, - namespaceName, bundleRange); + return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.UNSUBSCRIBE) + .thenCompose(__ -> { + if (namespaceName.isGlobal()) { + // check cluster ownership for a given global namespace: redirect if peer-cluster owns it + return validateGlobalNamespaceOwnershipAsync(namespaceName); + } + return validateClusterOwnershipAsync(namespaceName.getCluster()) + .thenCompose(unused -> validateClusterForTenantAsync(namespaceName.getTenant(), + namespaceName.getCluster())); + }) + .thenCompose(__ -> getNamespacePoliciesAsync(namespaceName)) + .thenCompose(policies -> + validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange, + authoritative, false)) + .thenCompose(bundle -> unsubscribeAsync(bundle, subscription)) + .thenRun(() -> log.info("[{}] Successfully unsubscribed {} on namespace bundle {}/{}", + clientAppId(), subscription, namespaceName, bundleRange)); } protected void internalSetSubscriptionAuthMode(SubscriptionAuthMode subscriptionAuthMode) { @@ -1918,32 +1899,42 @@ private void clearBacklog(NamespaceName nsName, String bundleRange, String subsc } } - private void unsubscribe(NamespaceName nsName, String bundleRange, String subscription) { - try { - List topicList = pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(), - nsName.toString() + "/" + bundleRange); - List> futures = new ArrayList<>(); - if (subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) { - throw new RestException(Status.PRECONDITION_FAILED, "Cannot unsubscribe a replication cursor"); - } else { - for (Topic topic : topicList) { - Subscription sub = topic.getSubscription(subscription); - if (sub != null) { - futures.add(sub.delete()); - } - } - } - FutureUtil.waitForAll(futures).get(); - } catch (RestException re) { - throw re; - } catch (Exception e) { - log.error("[{}] Failed to unsubscribe {} for namespace {}/{}", clientAppId(), subscription, - nsName.toString(), bundleRange, e); - if (e.getCause() instanceof SubscriptionBusyException) { - throw new RestException(Status.PRECONDITION_FAILED, "Subscription has active connected consumers"); - } - throw new RestException(e.getCause()); + private CompletableFuture unsubscribeAsync(NamespaceBundle bundle, String subscription) { + if (subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) { + return CompletableFuture.failedFuture( + new RestException(Status.PRECONDITION_FAILED, "Cannot unsubscribe a replication cursor")); } + + return pulsar().getNamespaceService().getOwnedTopicListForNamespaceBundle(bundle) + .thenCompose(topicsInBundle -> { + List> futures = new ArrayList<>(); + for (String topic : topicsInBundle) { + TopicName topicName = TopicName.get(topic); + if (pulsar().getBrokerService().isSystemTopic(topicName)) { + continue; + } + futures.add(pulsar().getBrokerService().getTopic(topicName.toString(), false) + .thenCompose(optTopic -> { + if (optTopic.isEmpty()) { + return CompletableFuture.completedFuture(null); + } + Topic loaded = optTopic.get(); + Subscription sub = loaded.getSubscription(subscription); + if (sub == null) { + return CompletableFuture.completedFuture(null); + } + return sub.delete(); + })); + } + return FutureUtil.waitForAll(futures); + }).exceptionally(ex -> { + Throwable cause = FutureUtil.unwrapCompletionException(ex); + if (cause instanceof SubscriptionBusyException) { + throw new RestException(Status.PRECONDITION_FAILED, + "Subscription has active connected consumers"); + } + throw new RestException(cause); + }); } protected BundlesData validateBundlesData(BundlesData initialBundles) { diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java index cad6899c8a290..1809456476c6c 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v1/Namespaces.java @@ -1446,14 +1446,15 @@ public void unsubscribeNamespace(@Suspended final AsyncResponse asyncResponse, @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("subscription") String subscription, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - try { - validateNamespaceName(property, cluster, namespace); - internalUnsubscribeNamespace(asyncResponse, subscription, authoritative); - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + validateNamespaceName(property, cluster, namespace); + internalUnsubscribeNamespaceAsync(subscription, authoritative) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to unsubscribe {} on namespace {}", clientAppId(), + subscription, namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -1462,12 +1463,20 @@ public void unsubscribeNamespace(@Suspended final AsyncResponse asyncResponse, @ApiResponses(value = { @ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"), @ApiResponse(code = 404, message = "Namespace does not exist") }) - public void unsubscribeNamespaceBundle(@PathParam("property") String property, @PathParam("cluster") String cluster, + public void unsubscribeNamespaceBundle(@Suspended final AsyncResponse asyncResponse, + @PathParam("property") String property, @PathParam("cluster") String cluster, @PathParam("namespace") String namespace, @PathParam("subscription") String subscription, @PathParam("bundle") String bundleRange, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateNamespaceName(property, cluster, namespace); - internalUnsubscribeNamespaceBundle(subscription, bundleRange, authoritative); + internalUnsubscribeNamespaceBundleAsync(subscription, bundleRange, authoritative) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to unsubscribe {} on namespace bundle {}/{}", clientAppId(), + subscription, namespaceName, bundleRange, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java index 90f4b087bfe85..53501d16f485f 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/admin/v2/Namespaces.java @@ -1588,14 +1588,15 @@ public void unsubscribeNamespace(@Suspended final AsyncResponse asyncResponse, @ @PathParam("namespace") String namespace, @PathParam("subscription") String subscription, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { - try { - validateNamespaceName(tenant, namespace); - internalUnsubscribeNamespace(asyncResponse, subscription, authoritative); - } catch (WebApplicationException wae) { - asyncResponse.resume(wae); - } catch (Exception e) { - asyncResponse.resume(new RestException(e)); - } + validateNamespaceName(tenant, namespace); + internalUnsubscribeNamespaceAsync(subscription, authoritative) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to unsubscribe {} on namespace {}", clientAppId(), + subscription, namespaceName, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST @@ -1605,12 +1606,20 @@ public void unsubscribeNamespace(@Suspended final AsyncResponse asyncResponse, @ @ApiResponse(code = 204, message = "Operation successful"), @ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"), @ApiResponse(code = 404, message = "Namespace does not exist") }) - public void unsubscribeNamespaceBundle(@PathParam("tenant") String tenant, + public void unsubscribeNamespaceBundle(@Suspended final AsyncResponse asyncResponse, + @PathParam("tenant") String tenant, @PathParam("namespace") String namespace, @PathParam("subscription") String subscription, @PathParam("bundle") String bundleRange, @QueryParam("authoritative") @DefaultValue("false") boolean authoritative) { validateNamespaceName(tenant, namespace); - internalUnsubscribeNamespaceBundle(subscription, bundleRange, authoritative); + internalUnsubscribeNamespaceBundleAsync(subscription, bundleRange, authoritative) + .thenAccept(__ -> asyncResponse.resume(Response.noContent().build())) + .exceptionally(ex -> { + log.error("[{}] Failed to unsubscribe {} on namespace bundle {}/{}", clientAppId(), + subscription, namespaceName, bundleRange, ex); + resumeAsyncResponseExceptionally(asyncResponse, ex); + return null; + }); } @POST diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java index 663a6f0a41900..8dabc75e52d8b 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/admin/AdminApiTest.java @@ -2339,6 +2339,221 @@ public void testUnsubscribeOnNamespace(Integer numBundles) throws Exception { new ArrayList<>()); } + @Test(dataProvider = "numBundles") + public void testUnsubscribeNamespaceBundleOnUnloadedBundle(Integer numBundles) throws Exception { + String namespace = "prop-xyz/ns-unsub-bundle"; + admin.namespaces().createNamespace(namespace, numBundles); + + String topic = "persistent://" + namespace + "/t1"; + String subscription = "sub1"; + String otherSubscription = "sub2"; + + Consumer consumer1 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(subscription) + .subscribe(); + + Consumer consumer2 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(otherSubscription) + .subscribe(); + + consumer1.close(); + consumer2.close(); + + NamespaceBundle bundle = + pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic)); + admin.namespaces().unloadNamespaceBundle(namespace, bundle.getBundleRange()); + + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(200, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle), + "Bundle should not be owned by current broker"); + assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle), + "Bundle should not be owned by other broker"); + }); + + admin.namespaces().unsubscribeNamespaceBundle(namespace, bundle.getBundleRange(), subscription); + + List subscriptions = + admin.topics().getSubscriptions(topic).stream().sorted().toList(); + assertEquals(subscriptions, List.of(otherSubscription)); + } + + @Test(dataProvider = "numBundles") + public void testUnsubscribeNamespaceOnUnloadedBundle(Integer numBundles) throws Exception { + String namespace = "prop-xyz/ns-unsub-namespace"; + admin.namespaces().createNamespace(namespace, numBundles); + + String topic = "persistent://" + namespace + "/t1"; + String subscription = "sub1"; + String otherSubscription = "sub2"; + + Consumer consumer1 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(subscription) + .subscribe(); + + Consumer consumer2 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(otherSubscription) + .subscribe(); + + consumer1.close(); + consumer2.close(); + + NamespaceBundle bundle = + pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(topic)); + admin.namespaces().unloadNamespaceBundle(namespace, bundle.getBundleRange()); + + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(200, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle), + "Bundle should not be owned by current broker"); + assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle), + "Bundle should not be owned by other broker"); + }); + + admin.namespaces().unsubscribeNamespace(namespace, subscription); + + List subscriptions = + admin.topics().getSubscriptions(topic).stream().sorted().toList(); + assertEquals(subscriptions, List.of(otherSubscription)); + } + + @Test(dataProvider = "numBundles") + public void testUnsubscribeNamespaceOnUnloadedBundleWithPartitionedTopic(Integer numBundles) throws Exception { + String namespace = "prop-xyz/ns-unsub-partitioned"; + admin.namespaces().createNamespace(namespace, numBundles); + + String topic = "persistent://" + namespace + "/pt"; + int partitions = 3; + admin.topics().createPartitionedTopic(topic, partitions); + + String subscription = "sub1"; + String otherSubscription = "sub2"; + + Consumer consumer1 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(subscription) + .subscribe(); + + Consumer consumer2 = pulsarClient.newConsumer() + .topic(topic) + .subscriptionName(otherSubscription) + .subscribe(); + + consumer1.close(); + consumer2.close(); + + Set bundles = new HashSet<>(); + for (int i = 0; i < partitions; i++) { + String partitionTopic = TopicName.get(topic).getPartition(i).toString(); + bundles.add(pulsar.getNamespaceService().getNamespaceBundleFactory() + .getBundle(TopicName.get(partitionTopic))); + } + for (NamespaceBundle bundle : bundles) { + admin.namespaces().unloadNamespaceBundle(namespace, bundle.getBundleRange()); + } + + for (NamespaceBundle bundle : bundles) { + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(200, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle), + "Bundle should not be owned by current broker"); + assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle), + "Bundle should not be owned by other broker"); + }); + } + + admin.namespaces().unsubscribeNamespace(namespace, subscription); + + for (int i = 0; i < partitions; i++) { + String partitionTopic = TopicName.get(topic).getPartition(i).toString(); + List subs = admin.topics().getSubscriptions(partitionTopic).stream().sorted().toList(); + assertEquals(subs, List.of(otherSubscription)); + } + } + + @Test + public void testUnsubscribeNamespaceOnUnloadedBundleWithMultiTopicCrossBundle() throws Exception { + String namespace = "prop-xyz/ns-unsub-cross-bundle"; + admin.namespaces().createNamespace(namespace, 4); + + String subscription = "sub1"; + String otherSubscription = "sub2"; + + String topic1 = null; + String topic2 = null; + NamespaceBundle bundle1 = null; + NamespaceBundle bundle2 = null; + for (int i = 0; i < 50; i++) { + String candidate = "persistent://" + namespace + "/t-" + i; + @Cleanup + Consumer c = pulsarClient.newConsumer() + .topic(candidate) + .subscriptionName(subscription) + .subscribe(); + c.close(); + NamespaceBundle bundle = + pulsar.getNamespaceService().getNamespaceBundleFactory().getBundle(TopicName.get(candidate)); + if (topic1 == null) { + topic1 = candidate; + bundle1 = bundle; + continue; + } + if (!bundle.equals(bundle1)) { + topic2 = candidate; + bundle2 = bundle; + break; + } + } + assertNotNull(topic1); + assertNotNull(topic2); + assertNotNull(bundle1); + assertNotNull(bundle2); + + Consumer consumer1 = pulsarClient.newConsumer() + .topic(topic1) + .subscriptionName(otherSubscription) + .subscribe(); + + Consumer consumer2 = pulsarClient.newConsumer() + .topic(topic2) + .subscriptionName(otherSubscription) + .subscribe(); + consumer1.close(); + consumer2.close(); + + admin.namespaces().unloadNamespaceBundle(namespace, bundle1.getBundleRange()); + admin.namespaces().unloadNamespaceBundle(namespace, bundle2.getBundleRange()); + + for (NamespaceBundle bundle : List.of(bundle1, bundle2)) { + Awaitility.await() + .atMost(30, TimeUnit.SECONDS) + .pollInterval(200, TimeUnit.MILLISECONDS) + .untilAsserted(() -> { + assertFalse(pulsar.getNamespaceService().isServiceUnitOwned(bundle), + "Bundle should not be owned by current broker"); + assertFalse(otherPulsar.getNamespaceService().isServiceUnitOwned(bundle), + "Bundle should not be owned by other broker"); + }); + } + + admin.namespaces().unsubscribeNamespace(namespace, subscription); + + List subs1 = admin.topics().getSubscriptions(topic1).stream().sorted().toList(); + List subs2 = admin.topics().getSubscriptions(topic2).stream().sorted().toList(); + assertEquals(subs1, List.of(otherSubscription)); + assertEquals(subs2, List.of(otherSubscription)); + } + private List publishMessagesOnPersistentTopic(String topicName, int messages) throws Exception { return publishMessagesOnPersistentTopic(topicName, messages, 0, false); }