diff --git a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java index 0ac000359d..ed1fac412f 100644 --- a/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java +++ b/fluss-client/src/test/java/org/apache/fluss/client/security/acl/FlussAuthorizationITCase.java @@ -51,12 +51,16 @@ import org.apache.fluss.rpc.gateway.AdminReadOnlyGateway; import org.apache.fluss.rpc.gateway.CoordinatorGateway; import org.apache.fluss.rpc.gateway.TabletServerGateway; +import org.apache.fluss.rpc.messages.AdjustIsrRequest; import org.apache.fluss.rpc.messages.ControlledShutdownRequest; import org.apache.fluss.rpc.messages.GetKvSnapshotMetadataRequest; import org.apache.fluss.rpc.messages.InitWriterRequest; import org.apache.fluss.rpc.messages.InitWriterResponse; import org.apache.fluss.rpc.messages.MetadataRequest; +import org.apache.fluss.rpc.messages.NotifyLeaderAndIsrRequest; import org.apache.fluss.rpc.messages.ReleaseKvSnapshotLeaseRequest; +import org.apache.fluss.rpc.messages.StopReplicaRequest; +import org.apache.fluss.rpc.messages.UpdateMetadataRequest; import org.apache.fluss.rpc.metrics.TestingClientMetricGroup; import org.apache.fluss.security.acl.AccessControlEntry; import org.apache.fluss.security.acl.AccessControlEntryFilter; @@ -1394,6 +1398,171 @@ private static Configuration initConfig() { return conf; } + @Test + void testInternalReplicationControlAuthorization() throws Exception { + // These RPCs are internal-only, so we test via direct gateway access + try (RpcClient rpcClient = + RpcClient.create(guestConf, TestingClientMetricGroup.newInstance())) { + + TabletServerGateway guestTabletGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("CLIENT").get(0), + rpcClient, + TabletServerGateway.class); + + CoordinatorGateway guestCoordinatorGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"), + rpcClient, + CoordinatorGateway.class); + + // Test 1: notifyLeaderAndIsr without WRITE permission + NotifyLeaderAndIsrRequest notifyRequest = new NotifyLeaderAndIsrRequest(); + notifyRequest.setCoordinatorEpoch(1); + assertThatThrownBy(() -> guestTabletGateway.notifyLeaderAndIsr(notifyRequest).get()) + .rootCause() + .isInstanceOf(AuthorizationException.class) + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + + // Test 2: updateMetadata without WRITE permission + UpdateMetadataRequest updateRequest = new UpdateMetadataRequest(); + updateRequest.setCoordinatorEpoch(1); + assertThatThrownBy(() -> guestTabletGateway.updateMetadata(updateRequest).get()) + .rootCause() + .isInstanceOf(AuthorizationException.class) + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + + // Test 3: stopReplica without WRITE permission + StopReplicaRequest stopRequest = new StopReplicaRequest(); + stopRequest.setCoordinatorEpoch(1); + assertThatThrownBy(() -> guestTabletGateway.stopReplica(stopRequest).get()) + .rootCause() + .isInstanceOf(AuthorizationException.class) + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + + // Test 4: adjustIsr without WRITE permission + AdjustIsrRequest adjustRequest = new AdjustIsrRequest(); + adjustRequest.setServerId(0); + assertThatThrownBy(() -> guestCoordinatorGateway.adjustIsr(adjustRequest).get()) + .rootCause() + .isInstanceOf(AuthorizationException.class) + .hasMessageContaining( + String.format( + "Principal %s have no authorization to operate WRITE on resource Resource{type=CLUSTER, name='fluss-cluster'}", + guestPrincipal)); + } + + // Test 5: Grant CLUSTER/WRITE permission and verify operations succeed + List aclBindings = + Collections.singletonList( + new AclBinding( + Resource.cluster(), + new AccessControlEntry( + guestPrincipal, + "*", + OperationType.WRITE, + PermissionType.ALLOW))); + rootAdmin.createAcls(aclBindings).all().get(); + FLUSS_CLUSTER_EXTENSION.waitUntilAuthenticationSync(aclBindings, true); + + try (RpcClient authorizedRpcClient = + RpcClient.create(guestConf, TestingClientMetricGroup.newInstance())) { + + TabletServerGateway authorizedTabletGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("CLIENT").get(0), + authorizedRpcClient, + TabletServerGateway.class); + + CoordinatorGateway authorizedCoordinatorGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getCoordinatorServerNode("CLIENT"), + authorizedRpcClient, + CoordinatorGateway.class); + + // Now with WRITE permission, operations should NOT throw AuthorizationException + // (they may fail for other reasons like invalid data, but not authorization) + + NotifyLeaderAndIsrRequest authorizedNotifyRequest = new NotifyLeaderAndIsrRequest(); + authorizedNotifyRequest.setCoordinatorEpoch(1); + Throwable notifyThrown = + catchThrowable( + () -> + authorizedTabletGateway + .notifyLeaderAndIsr(authorizedNotifyRequest) + .get()); + if (notifyThrown != null) { + assertThat(notifyThrown).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + UpdateMetadataRequest authorizedUpdateRequest = new UpdateMetadataRequest(); + authorizedUpdateRequest.setCoordinatorEpoch(1); + Throwable updateThrown = + catchThrowable( + () -> + authorizedTabletGateway + .updateMetadata(authorizedUpdateRequest) + .get()); + if (updateThrown != null) { + assertThat(updateThrown).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + StopReplicaRequest authorizedStopRequest = new StopReplicaRequest(); + authorizedStopRequest.setCoordinatorEpoch(1); + Throwable stopThrown = + catchThrowable( + () -> authorizedTabletGateway.stopReplica(authorizedStopRequest).get()); + if (stopThrown != null) { + assertThat(stopThrown).rootCause().isNotInstanceOf(AuthorizationException.class); + } + + AdjustIsrRequest authorizedAdjustRequest = new AdjustIsrRequest(); + authorizedAdjustRequest.setServerId(0); + Throwable adjustThrown = + catchThrowable( + () -> + authorizedCoordinatorGateway + .adjustIsr(authorizedAdjustRequest) + .get()); + if (adjustThrown != null) { + assertThat(adjustThrown).rootCause().isNotInstanceOf(AuthorizationException.class); + } + } + + // Test 6: Verify internal sessions bypass authorization + TabletServerGateway internalTabletGateway = + GatewayClientProxy.createGatewayProxy( + () -> FLUSS_CLUSTER_EXTENSION.getTabletServerNodes("FLUSS").get(0), + FLUSS_CLUSTER_EXTENSION.getRpcClient(), + TabletServerGateway.class); + + // Internal connection should NOT throw AuthorizationException + // (may fail for other reasons like invalid data, but not authorization) + NotifyLeaderAndIsrRequest internalNotifyRequest = new NotifyLeaderAndIsrRequest(); + internalNotifyRequest.setCoordinatorEpoch(1); + + // The request will likely fail due to invalid data, but importantly + // it should NOT fail with AuthorizationException + Throwable thrown = + catchThrowable( + () -> + internalTabletGateway + .notifyLeaderAndIsr(internalNotifyRequest) + .get()); + if (thrown != null) { + assertThat(thrown).rootCause().isNotInstanceOf(AuthorizationException.class); + } + } + private void assertNoTableDescribeAuth(ThrowableAssert.ThrowingCallable callable) { assertThatThrownBy(callable) .cause() diff --git a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java index bdc97434ec..42031a4fd2 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/coordinator/CoordinatorService.java @@ -198,6 +198,7 @@ import static org.apache.fluss.config.FlussConfigUtils.isTableStorageConfig; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindingFilters; import static org.apache.fluss.rpc.util.CommonRpcMessageUtils.toAclBindings; +import static org.apache.fluss.security.acl.OperationType.WRITE; import static org.apache.fluss.server.coordinator.rebalance.goal.GoalUtils.getGoalByType; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.addTableOffsetsToResponse; import static org.apache.fluss.server.utils.ServerRpcMessageUtils.fromTablePath; @@ -760,6 +761,9 @@ public CompletableFuture metadata(MetadataRequest request) { } public CompletableFuture adjustIsr(AdjustIsrRequest request) { + if (authorizer != null) { + authorizer.authorize(currentSession(), WRITE, Resource.cluster()); + } CompletableFuture response = new CompletableFuture<>(); eventManagerSupplier .get() diff --git a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java index 88731daaba..050c083dd7 100644 --- a/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java +++ b/fluss-server/src/main/java/org/apache/fluss/server/tablet/TabletService.java @@ -348,6 +348,9 @@ public CompletableFuture getTableStats(GetTableStatsReque @Override public CompletableFuture notifyLeaderAndIsr( NotifyLeaderAndIsrRequest notifyLeaderAndIsrRequest) { + if (authorizer != null) { + authorizer.authorize(currentSession(), WRITE, Resource.cluster()); + } CompletableFuture response = new CompletableFuture<>(); List notifyLeaderAndIsrRequestData = getNotifyLeaderAndIsrRequestData(notifyLeaderAndIsrRequest); @@ -373,6 +376,9 @@ public CompletableFuture metadata(MetadataRequest request) { @Override public CompletableFuture updateMetadata(UpdateMetadataRequest request) { + if (authorizer != null) { + authorizer.authorize(currentSession(), WRITE, Resource.cluster()); + } int coordinatorEpoch = request.hasCoordinatorEpoch() ? request.getCoordinatorEpoch() @@ -385,6 +391,9 @@ public CompletableFuture updateMetadata(UpdateMetadataRe @Override public CompletableFuture stopReplica( StopReplicaRequest stopReplicaRequest) { + if (authorizer != null) { + authorizer.authorize(currentSession(), WRITE, Resource.cluster()); + } CompletableFuture response = new CompletableFuture<>(); replicaManager.stopReplicas( stopReplicaRequest.getCoordinatorEpoch(),