Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1667,70 +1667,51 @@ protected void internalClearNamespaceBundleBacklogForSubscription(String subscri
subscription, namespaceName, bundleRange);
}

protected void internalUnsubscribeNamespace(AsyncResponse asyncResponse, String subscription,
boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.UNSUBSCRIBE);
protected CompletableFuture<Void> internalUnsubscribeNamespaceAsync(String subscription,
boolean authoritative) {
checkNotNull(subscription, "Subscription should not be null");

final List<CompletableFuture<Void>> futures = new ArrayList<>();
try {
NamespaceBundles bundles = pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundles(namespaceName);
for (NamespaceBundle nsBundle : bundles.getBundles()) {
// check if the bundle is owned by any broker, if not then there are no subscriptions
if (pulsar().getNamespaceService().checkOwnershipPresent(nsBundle)) {
futures.add(pulsar().getAdminClient().namespaces().unsubscribeNamespaceBundleAsync(
namespaceName.toString(), nsBundle.getBundleRange(), subscription));
}
}
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
return;
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
return;
}

FutureUtil.waitForAll(futures).handle((result, exception) -> {
if (exception != null) {
log.warn("[{}] Failed to unsubscribe {} on the bundles for namespace {}: {}", clientAppId(),
subscription, namespaceName, exception.getCause().getMessage());
if (exception.getCause() instanceof PulsarAdminException) {
asyncResponse.resume(new RestException((PulsarAdminException) exception.getCause()));
return null;
} else {
asyncResponse.resume(new RestException(exception.getCause()));
return null;
}
}
log.info("[{}] Successfully unsubscribed {} on all the bundles for namespace {}", clientAppId(),
subscription, namespaceName);
asyncResponse.resume(Response.noContent().build());
return null;
});
return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.UNSUBSCRIBE)
.thenCompose(__ -> pulsar().getNamespaceService().getNamespaceBundleFactory()
.getBundlesAsync(namespaceName))
.thenCompose(bundles -> {
final List<CompletableFuture<Void>> futures = new ArrayList<>();
for (NamespaceBundle nsBundle : bundles.getBundles()) {
try {
futures.add(pulsar().getAdminClient().namespaces().unsubscribeNamespaceBundleAsync(
namespaceName.toString(), nsBundle.getBundleRange(), subscription));
} catch (PulsarServerException e) {
return CompletableFuture.failedFuture(e);
}
}
return FutureUtil.waitForAll(futures);
}).thenRun(() -> log.info("[{}] Successfully unsubscribed {} on all the bundles for namespace {}",
clientAppId(), subscription, namespaceName));
}

@SuppressWarnings("deprecation")
protected void internalUnsubscribeNamespaceBundle(String subscription, String bundleRange, boolean authoritative) {
validateNamespaceOperation(namespaceName, NamespaceOperation.UNSUBSCRIBE);
protected CompletableFuture<Void> internalUnsubscribeNamespaceBundleAsync(String subscription, String bundleRange,
boolean authoritative) {
checkNotNull(subscription, "Subscription should not be null");
checkNotNull(bundleRange, "BundleRange should not be null");

Policies policies = getNamespacePolicies(namespaceName);

if (namespaceName.isGlobal()) {
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
validateGlobalNamespaceOwnership(namespaceName);
} else {
validateClusterOwnership(namespaceName.getCluster());
validateClusterForTenant(namespaceName.getTenant(), namespaceName.getCluster());
}

validateNamespaceBundleOwnership(namespaceName, policies.bundles, bundleRange, authoritative, true);

unsubscribe(namespaceName, bundleRange, subscription);
log.info("[{}] Successfully unsubscribed {} on namespace bundle {}/{}", clientAppId(), subscription,
namespaceName, bundleRange);
return validateNamespaceOperationAsync(namespaceName, NamespaceOperation.UNSUBSCRIBE)
.thenCompose(__ -> {
if (namespaceName.isGlobal()) {
// check cluster ownership for a given global namespace: redirect if peer-cluster owns it
return validateGlobalNamespaceOwnershipAsync(namespaceName);
}
return validateClusterOwnershipAsync(namespaceName.getCluster())
.thenCompose(unused -> validateClusterForTenantAsync(namespaceName.getTenant(),
namespaceName.getCluster()));
})
.thenCompose(__ -> getNamespacePoliciesAsync(namespaceName))
.thenCompose(policies ->
validateNamespaceBundleOwnershipAsync(namespaceName, policies.bundles, bundleRange,
authoritative, false))
.thenCompose(__ -> unsubscribeAsync(namespaceName, bundleRange, subscription))
.thenRun(() -> log.info("[{}] Successfully unsubscribed {} on namespace bundle {}/{}",
clientAppId(), subscription, namespaceName, bundleRange));
}

protected void internalSetSubscriptionAuthMode(SubscriptionAuthMode subscriptionAuthMode) {
Expand Down Expand Up @@ -1918,32 +1899,50 @@ private void clearBacklog(NamespaceName nsName, String bundleRange, String subsc
}
}

private void unsubscribe(NamespaceName nsName, String bundleRange, String subscription) {
try {
List<Topic> topicList = pulsar().getBrokerService().getAllTopicsFromNamespaceBundle(nsName.toString(),
nsName.toString() + "/" + bundleRange);
List<CompletableFuture<Void>> futures = new ArrayList<>();
if (subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
throw new RestException(Status.PRECONDITION_FAILED, "Cannot unsubscribe a replication cursor");
} else {
for (Topic topic : topicList) {
Subscription sub = topic.getSubscription(subscription);
if (sub != null) {
futures.add(sub.delete());
}
}
}
FutureUtil.waitForAll(futures).get();
} catch (RestException re) {
throw re;
} catch (Exception e) {
log.error("[{}] Failed to unsubscribe {} for namespace {}/{}", clientAppId(), subscription,
nsName.toString(), bundleRange, e);
if (e.getCause() instanceof SubscriptionBusyException) {
throw new RestException(Status.PRECONDITION_FAILED, "Subscription has active connected consumers");
}
throw new RestException(e.getCause());
private CompletableFuture<Void> unsubscribeAsync(NamespaceName nsName, String bundleRange, String subscription) {
if (subscription.startsWith(pulsar().getConfiguration().getReplicatorPrefix())) {
return CompletableFuture.failedFuture(
new RestException(Status.PRECONDITION_FAILED, "Cannot unsubscribe a replication cursor"));
}

return pulsar().getNamespaceService().getFullListOfTopics(nsName)
.thenCompose(topicsInNamespace -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can the logic of NamespaceService#getOwnedTopicListForNamespaceBundle be reused here (or switch to using targetBundle.includes)?

List<CompletableFuture<Void>> futures = new ArrayList<>();
NamespaceBundleFactory bundleFactory =
pulsar().getNamespaceService().getNamespaceBundleFactory();
NamespaceBundle targetBundle = bundleFactory.getBundle(nsName.toString(), bundleRange);

for (String topic : topicsInNamespace) {
TopicName topicName = TopicName.get(topic);
if (pulsar().getBrokerService().isSystemTopic(topicName)) {
continue;
}
NamespaceBundle bundle = bundleFactory.getBundle(topicName);
if (bundle == null || !bundle.equals(targetBundle)) {
continue;
}
futures.add(pulsar().getBrokerService().getTopic(topicName.toString(), false)
.thenCompose(optTopic -> {
if (optTopic.isEmpty()) {
return CompletableFuture.completedFuture(null);
}
Topic loaded = optTopic.get();
Subscription sub = loaded.getSubscription(subscription);
if (sub == null) {
return CompletableFuture.completedFuture(null);
}
return sub.delete();
}));
}
return FutureUtil.waitForAll(futures);
}).exceptionally(ex -> {
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof SubscriptionBusyException) {
throw new RestException(Status.PRECONDITION_FAILED,
"Subscription has active connected consumers");
}
throw new RestException(cause);
});
}

protected BundlesData validateBundlesData(BundlesData initialBundles) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1446,14 +1446,15 @@ public void unsubscribeNamespace(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("subscription") String subscription,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateNamespaceName(property, cluster, namespace);
internalUnsubscribeNamespace(asyncResponse, subscription, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
validateNamespaceName(property, cluster, namespace);
internalUnsubscribeNamespaceAsync(subscription, authoritative)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error("[{}] Failed to unsubscribe {} on namespace {}", clientAppId(),
subscription, namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
Expand All @@ -1462,12 +1463,20 @@ public void unsubscribeNamespace(@Suspended final AsyncResponse asyncResponse,
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public void unsubscribeNamespaceBundle(@PathParam("property") String property, @PathParam("cluster") String cluster,
public void unsubscribeNamespaceBundle(@Suspended final AsyncResponse asyncResponse,
@PathParam("property") String property, @PathParam("cluster") String cluster,
@PathParam("namespace") String namespace, @PathParam("subscription") String subscription,
@PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(property, cluster, namespace);
internalUnsubscribeNamespaceBundle(subscription, bundleRange, authoritative);
internalUnsubscribeNamespaceBundleAsync(subscription, bundleRange, authoritative)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error("[{}] Failed to unsubscribe {} on namespace bundle {}/{}", clientAppId(),
subscription, namespaceName, bundleRange, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1588,14 +1588,15 @@ public void unsubscribeNamespace(@Suspended final AsyncResponse asyncResponse, @
@PathParam("namespace") String namespace,
@PathParam("subscription") String subscription,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
try {
validateNamespaceName(tenant, namespace);
internalUnsubscribeNamespace(asyncResponse, subscription, authoritative);
} catch (WebApplicationException wae) {
asyncResponse.resume(wae);
} catch (Exception e) {
asyncResponse.resume(new RestException(e));
}
validateNamespaceName(tenant, namespace);
internalUnsubscribeNamespaceAsync(subscription, authoritative)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error("[{}] Failed to unsubscribe {} on namespace {}", clientAppId(),
subscription, namespaceName, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
Expand All @@ -1605,12 +1606,20 @@ public void unsubscribeNamespace(@Suspended final AsyncResponse asyncResponse, @
@ApiResponse(code = 204, message = "Operation successful"),
@ApiResponse(code = 403, message = "Don't have admin or operate permission on the namespace"),
@ApiResponse(code = 404, message = "Namespace does not exist") })
public void unsubscribeNamespaceBundle(@PathParam("tenant") String tenant,
public void unsubscribeNamespaceBundle(@Suspended final AsyncResponse asyncResponse,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace, @PathParam("subscription") String subscription,
@PathParam("bundle") String bundleRange,
@QueryParam("authoritative") @DefaultValue("false") boolean authoritative) {
validateNamespaceName(tenant, namespace);
internalUnsubscribeNamespaceBundle(subscription, bundleRange, authoritative);
internalUnsubscribeNamespaceBundleAsync(subscription, bundleRange, authoritative)
.thenAccept(__ -> asyncResponse.resume(Response.noContent().build()))
.exceptionally(ex -> {
log.error("[{}] Failed to unsubscribe {} on namespace bundle {}/{}", clientAppId(),
subscription, namespaceName, bundleRange, ex);
resumeAsyncResponseExceptionally(asyncResponse, ex);
return null;
});
}

@POST
Expand Down
Loading
Loading