diff --git a/KustoSchemaTools.Tests/Changes/ClusterChangesTest.cs b/KustoSchemaTools.Tests/Changes/ClusterChangesTest.cs new file mode 100644 index 0000000..329bfbc --- /dev/null +++ b/KustoSchemaTools.Tests/Changes/ClusterChangesTest.cs @@ -0,0 +1,122 @@ +using KustoSchemaTools.Changes; +using KustoSchemaTools.Model; +using Microsoft.Extensions.Logging; +using Moq; + +namespace KustoSchemaTools.Tests.Changes +{ + public class ClusterChangesTests + { + private readonly Mock _loggerMock; + + public ClusterChangesTests() + { + _loggerMock = new Mock(); + } + + [Fact] + public void GenerateChanges_WithIdenticalPolicies_ShouldDetectNoChanges() + { + // Arrange + var oldCluster = CreateClusterWithPolicy(0.2, 1, 2, 3); + var newCluster = CreateClusterWithPolicy(0.2, 1, 2, 3); + + // Act + var changeSet = ClusterChanges.GenerateChanges(oldCluster, newCluster, _loggerMock.Object); + + // Assert + Assert.NotNull(changeSet); + Assert.Empty(changeSet.Changes); + } + [Fact] + public void GenerateChanges_WithSingleChange_ShouldDetectChangeAndCreateScript() + { + // Arrange + var oldCluster = CreateClusterWithPolicy(0.2, 1, 2, 3); + var newCluster = CreateClusterWithPolicy(0.2, 1, 2, 5); + + // Act + var changeSet = ClusterChanges.GenerateChanges(oldCluster, newCluster, _loggerMock.Object); + + // Assert + Assert.NotNull(changeSet); + Assert.NotEmpty(changeSet.Changes); + Assert.NotEmpty(changeSet.Scripts); + + // Asserts that there is exactly one policy change in the change set + var policyChange = Assert.Single(changeSet.Changes) as PolicyChange; + Assert.NotNull(policyChange); + + // Assert that the correct script is generated + var expectedScript = newCluster.CapacityPolicy!.ToUpdateScript(); + var actualScriptContainer = Assert.Single(changeSet.Scripts); + Assert.Equal(expectedScript, actualScriptContainer.Script.Text); + } + + [Fact] + public void GenerateChanges_WithMultipleChanges_ShouldDetectAllChanges() + { + // Arrange + var oldCluster = CreateClusterWithPolicy(ingestionCapacityCoreUtilizationCoefficient: 0.75, materializedViewsCapacityClusterMaximumConcurrentOperations: 10); + var newCluster = CreateClusterWithPolicy(ingestionCapacityCoreUtilizationCoefficient: 0.95, materializedViewsCapacityClusterMaximumConcurrentOperations: 20); + + // Act + var changeSet = ClusterChanges.GenerateChanges(oldCluster, newCluster, _loggerMock.Object); + + // Assert + var policyChange = Assert.Single(changeSet.Changes) as PolicyChange; + Assert.NotNull(policyChange); + + // Assert that the correct script is generated + var expectedScript = newCluster.CapacityPolicy!.ToUpdateScript(); + var actualScriptContainer = Assert.Single(changeSet.Scripts); + Assert.Equal(expectedScript, actualScriptContainer.Script.Text); + } + + [Fact] + public void GenerateChanges_WithNullNewCapacityPolicy_ShouldNotGenerateChanges() + { + // Arrange + var oldCluster = CreateClusterWithPolicy(ingestionCapacityCoreUtilizationCoefficient: 0.75); + var newCluster = new Cluster { Name = oldCluster.Name, CapacityPolicy = null }; + + // Act + var changeSet = ClusterChanges.GenerateChanges(oldCluster, newCluster, _loggerMock.Object); + + // Assert + Assert.NotNull(changeSet); + Assert.Empty(changeSet.Changes); + Assert.Empty(changeSet.Scripts); + } + + #region Helper Methods + private Cluster CreateClusterWithPolicy( + double? ingestionCapacityCoreUtilizationCoefficient = null, + int? materializedViewsCapacityClusterMaximumConcurrentOperations = null, + int? extentsRebuildClusterMaximumConcurrentOperations = null, + int? 
extentsRebuildMaximumConcurrentOperationsPerNode = null + ) + { + return new Cluster + { + CapacityPolicy = new ClusterCapacityPolicy + { + MaterializedViewsCapacity = new MaterializedViewsCapacity + { + ClusterMaximumConcurrentOperations = materializedViewsCapacityClusterMaximumConcurrentOperations, + ExtentsRebuildCapacity = (extentsRebuildClusterMaximumConcurrentOperations != null || extentsRebuildMaximumConcurrentOperationsPerNode != null) ? new ExtentsRebuildCapacity + { + ClusterMaximumConcurrentOperations = extentsRebuildClusterMaximumConcurrentOperations, + MaximumConcurrentOperationsPerNode = extentsRebuildMaximumConcurrentOperationsPerNode + } : null + }, + IngestionCapacity = new IngestionCapacity + { + CoreUtilizationCoefficient = ingestionCapacityCoreUtilizationCoefficient + }, + } + }; + } + #endregion + } +} \ No newline at end of file diff --git a/KustoSchemaTools.Tests/DemoData/ClusterScopedChanges/multipleClusters.yml b/KustoSchemaTools.Tests/DemoData/ClusterScopedChanges/multipleClusters.yml new file mode 100644 index 0000000..37ab940 --- /dev/null +++ b/KustoSchemaTools.Tests/DemoData/ClusterScopedChanges/multipleClusters.yml @@ -0,0 +1,16 @@ +connections: +- name: test1 + url: test1.eastus + capacityPolicy: + ingestionCapacity: + clusterMaximumConcurrentOperations: 512 + coreUtilizationCoefficient: 0.75 + extentsMergeCapacity: + minimumConcurrentOperationsPerNode: 1 + maximumConcurrentOperationsPerNode: 3 +- name: test2 + url: test2.eastus + capacityPolicy: + ingestionCapacity: + clusterMaximumConcurrentOperations: 500 + coreUtilizationCoefficient: 0.8 diff --git a/KustoSchemaTools.Tests/KustoClusterOrchestratorTests.cs b/KustoSchemaTools.Tests/KustoClusterOrchestratorTests.cs new file mode 100644 index 0000000..6461d0f --- /dev/null +++ b/KustoSchemaTools.Tests/KustoClusterOrchestratorTests.cs @@ -0,0 +1,657 @@ +using KustoSchemaTools.Changes; +using KustoSchemaTools.Model; +using KustoSchemaTools.Parser; +using Microsoft.Extensions.Logging; +using Moq; +using System.Collections.Generic; +using System.IO; +using System.Threading.Tasks; +using Xunit; +using System.Data; +using System; +using System.Linq; +using Kusto.Data.Common; + +namespace KustoSchemaTools.Tests +{ + public class KustoClusterOrchestratorTests + { + private readonly Mock> loggerMock; + private readonly Mock kustoClusterHandlerFactoryMock; + private readonly Mock yamlClusterHandlerFactoryMock; + private readonly Mock kustoHandlerMock; + private readonly KustoClusterOrchestrator orchestrator; + + public KustoClusterOrchestratorTests() + { + loggerMock = new Mock>(); + kustoClusterHandlerFactoryMock = new Mock(); + yamlClusterHandlerFactoryMock = new Mock(); + + // Create mock for KustoClusterHandler + var kustoClientMock = new Mock("test.eastus"); + var kustoLoggerMock = new Mock>(); + kustoHandlerMock = new Mock(kustoClientMock.Object, kustoLoggerMock.Object, "test", "test.eastus"); + + orchestrator = new KustoClusterOrchestrator( + loggerMock.Object, + kustoClusterHandlerFactoryMock.Object, + yamlClusterHandlerFactoryMock.Object); + } + + private Clusters CreateClustersWithCapacityPolicy(ClusterCapacityPolicy? 
capacityPolicy = null) + { + return new Clusters + { + Connections = new List + { + new Cluster + { + Name = "test", + Url = "test.eastus", + CapacityPolicy = capacityPolicy + } + } + }; + } + + private void SetupMockHandler(Cluster kustoCluster) + { + // Configure the handler factory to return our mock handler + kustoClusterHandlerFactoryMock + .Setup(f => f.Create("test", "test.eastus")) + .Returns(kustoHandlerMock.Object); + + // Set up the mock handler to return our test cluster + kustoHandlerMock + .Setup(h => h.LoadAsync()) + .ReturnsAsync(kustoCluster); + } + + private Clusters CreateMultipleClusters() + { + return new Clusters + { + Connections = new List + { + new Cluster + { + Name = "cluster1", + Url = "cluster1.eastus", + CapacityPolicy = new ClusterCapacityPolicy + { + IngestionCapacity = new IngestionCapacity + { + ClusterMaximumConcurrentOperations = 500, + CoreUtilizationCoefficient = 0.75 + } + } + }, + new Cluster + { + Name = "cluster2", + Url = "cluster2.westus", + CapacityPolicy = new ClusterCapacityPolicy + { + IngestionCapacity = new IngestionCapacity + { + ClusterMaximumConcurrentOperations = 600, + CoreUtilizationCoefficient = 0.85 + } + } + } + } + }; + } + + private void SetupMultipleClusterMocks() + { + // Mock for cluster1 + var kustoHandler1Mock = new Mock(new Mock("cluster1.eastus").Object, new Mock>().Object, "cluster1", "cluster1.eastus"); + var kustoCluster1 = new Cluster + { + Name = "cluster1", + CapacityPolicy = new ClusterCapacityPolicy + { + IngestionCapacity = new IngestionCapacity + { + ClusterMaximumConcurrentOperations = 300, // Different from config + CoreUtilizationCoefficient = 0.5 + } + } + }; + + kustoClusterHandlerFactoryMock + .Setup(f => f.Create("cluster1", "cluster1.eastus")) + .Returns(kustoHandler1Mock.Object); + kustoHandler1Mock + .Setup(h => h.LoadAsync()) + .ReturnsAsync(kustoCluster1); + + // Mock for cluster2 - same as config, no changes + var kustoHandler2Mock = new Mock(new Mock("cluster2.westus").Object, new Mock>().Object, "cluster2", "cluster2.westus"); + var kustoCluster2 = new Cluster + { + Name = "cluster2", + CapacityPolicy = new ClusterCapacityPolicy + { + IngestionCapacity = new IngestionCapacity + { + ClusterMaximumConcurrentOperations = 600, // Same as config + CoreUtilizationCoefficient = 0.85 + } + } + }; + + kustoClusterHandlerFactoryMock + .Setup(f => f.Create("cluster2", "cluster2.westus")) + .Returns(kustoHandler2Mock.Object); + kustoHandler2Mock + .Setup(h => h.LoadAsync()) + .ReturnsAsync(kustoCluster2); + } + + [Fact] + public async Task GenerateChangesAsync_EmptyCapacityPolicy_ReturnsEmptyChangeSet() + { + // Arrange + var clusters = CreateClustersWithCapacityPolicy(); + var kustoCluster = new Cluster { Name = "test" }; + SetupMockHandler(kustoCluster); + + // Act + var changes = await orchestrator.GenerateChangesAsync(clusters); + + // Assert + Assert.Single(changes); + var changeSet = changes[0]; + Assert.Empty(changeSet.Changes); + } + + [Fact] + public async Task GenerateChangesAsync_WithCapacityPolicyChanges_ReturnsChangeset() + { + // Arrange + var newCapacityPolicy = new ClusterCapacityPolicy + { + IngestionCapacity = new IngestionCapacity + { + ClusterMaximumConcurrentOperations = 500, + CoreUtilizationCoefficient = 0.75 + } + }; + + var clusters = CreateClustersWithCapacityPolicy(newCapacityPolicy); + + // Create a cluster with different capacity policy settings to trigger changes + var kustoCluster = new Cluster + { + Name = "test", + CapacityPolicy = new ClusterCapacityPolicy + { + 
IngestionCapacity = new IngestionCapacity + { + ClusterMaximumConcurrentOperations = 300, + } + } + }; + + SetupMockHandler(kustoCluster); + + // Act + var changes = await orchestrator.GenerateChangesAsync(clusters); + + // Assert + Assert.Single(changes); + var changeSet = changes[0]; + Assert.NotEmpty(changeSet.Changes); + Assert.Single(changeSet.Changes); + var policyChange = Assert.IsType>(changeSet.Changes[0]); + Assert.NotEmpty(policyChange.Scripts); + } + + [Fact] + public async Task GenerateChangesAsync_SameCapacityPolicy_ReturnsEmptyChangeSet() + { + // Arrange + var capacityPolicy = new ClusterCapacityPolicy + { + IngestionCapacity = new IngestionCapacity + { + ClusterMaximumConcurrentOperations = 400, + CoreUtilizationCoefficient = 0.8 + } + }; + + var clusters = CreateClustersWithCapacityPolicy(capacityPolicy); + + // Create a cluster with the same capacity policy settings - no changes should be detected + var kustoCluster = new Cluster + { + Name = "test", + CapacityPolicy = new ClusterCapacityPolicy + { + IngestionCapacity = new IngestionCapacity + { + ClusterMaximumConcurrentOperations = 400, + CoreUtilizationCoefficient = 0.8 + } + } + }; + + SetupMockHandler(kustoCluster); + + // Act + var changes = await orchestrator.GenerateChangesAsync(clusters); + + // Assert + Assert.Single(changes); + var changeSet = changes[0]; + Assert.Empty(changeSet.Changes); + } + + [Fact] + public async Task GenerateChangesAsync_MultipleClusters_ReturnsCorrectChangesets() + { + // Arrange + var clusters = CreateMultipleClusters(); + SetupMultipleClusterMocks(); + + // Act + var changes = await orchestrator.GenerateChangesAsync(clusters); + + // Assert + Assert.Equal(2, changes.Count); + + // First cluster should have changes + var cluster1ChangeSet = changes.FirstOrDefault(c => c.Entity == "cluster1"); + Assert.NotNull(cluster1ChangeSet); + Assert.NotEmpty(cluster1ChangeSet.Changes); + Assert.Single(cluster1ChangeSet.Changes); + + // Second cluster should have no changes + var cluster2ChangeSet = changes.FirstOrDefault(c => c.Entity == "cluster2"); + Assert.NotNull(cluster2ChangeSet); + Assert.Empty(cluster2ChangeSet.Changes); + } + + [Fact] + public async Task GenerateChangesAsync_NullClusters_ThrowsNullReferenceException() + { + // Act & Assert + await Assert.ThrowsAsync(() => orchestrator.GenerateChangesAsync(null!)); + } + + [Fact] + public async Task GenerateChangesAsync_EmptyClustersList_ReturnsEmptyList() + { + // Arrange + var clusters = new Clusters { Connections = new List() }; + + // Act + var changes = await orchestrator.GenerateChangesAsync(clusters); + + // Assert + Assert.Empty(changes); + } + + [Fact] + public async Task GenerateChangesAsync_LoadAsyncThrowsException_PropagatesException() + { + // Arrange + var clusters = CreateClustersWithCapacityPolicy(); + + kustoClusterHandlerFactoryMock + .Setup(f => f.Create("test", "test.eastus")) + .Returns(kustoHandlerMock.Object); + + kustoHandlerMock + .Setup(h => h.LoadAsync()) + .ThrowsAsync(new Exception("Connection failed")); + + // Act & Assert + var exception = await Assert.ThrowsAsync(() => orchestrator.GenerateChangesAsync(clusters)); + Assert.Equal("Connection failed", exception.Message); + } + + [Fact] + public async Task GenerateChangesAsync_MismatchedClusterNames_ThrowsArgumentException() + { + // Arrange + var clusters = CreateClustersWithCapacityPolicy(); + var kustoCluster = new Cluster { Name = "different-name" }; // Different name + SetupMockHandler(kustoCluster); + + // Act & Assert + var exception = await 
Assert.ThrowsAsync(() => orchestrator.GenerateChangesAsync(clusters)); + Assert.Contains("Cluster names must match", exception.Message); + } + + [Fact] + public async Task GenerateChangesAsync_NewClusterHasNullPolicy_ReturnsEmptyChanges() + { + // Arrange - clusters with null capacity policy + var clusters = CreateClustersWithCapacityPolicy(null); + var kustoCluster = new Cluster + { + Name = "test", + CapacityPolicy = new ClusterCapacityPolicy + { + IngestionCapacity = new IngestionCapacity + { + ClusterMaximumConcurrentOperations = 300 + } + } + }; + SetupMockHandler(kustoCluster); + + // Act + var changes = await orchestrator.GenerateChangesAsync(clusters); + + // Assert + Assert.Single(changes); + var changeSet = changes[0]; + Assert.Empty(changeSet.Changes); // No changes because new policy is null + } + + [Fact] + public async Task GenerateChangesAsync_VerifyFactoryCalledWithCorrectUrl() + { + // Arrange + var clusters = CreateClustersWithCapacityPolicy(); + var kustoCluster = new Cluster { Name = "test" }; + SetupMockHandler(kustoCluster); + + // Act + await orchestrator.GenerateChangesAsync(clusters); + + // Assert + kustoClusterHandlerFactoryMock.Verify(f => f.Create("test", "test.eastus"), Times.Once); + kustoHandlerMock.Verify(h => h.LoadAsync(), Times.Once); + } + + [Fact] + public async Task GenerateChangesAsync_VerifyLoggingCalled() + { + // Arrange + var clusters = CreateClustersWithCapacityPolicy(); + var kustoCluster = new Cluster { Name = "test" }; + SetupMockHandler(kustoCluster); + + // Act + await orchestrator.GenerateChangesAsync(clusters); + + // Assert - Verify that logging was called with the expected message + loggerMock.Verify( + x => x.Log( + LogLevel.Information, + It.IsAny(), + It.Is((v, t) => v.ToString()!.Contains("Generating cluster diff for test")), + It.IsAny(), + It.IsAny>()), + Times.Once); + } + + [Fact] + public async Task GenerateChangesFromFileAsync_ValidYamlFile_ReturnsChanges() + { + // Arrange + var yamlFilePath = Path.Join("DemoData", "ClusterScopedChanges", "multipleClusters.yml"); + + // Setup the yaml handler factory to return a real YamlClusterHandler + yamlClusterHandlerFactoryMock + .Setup(f => f.Create(yamlFilePath)) + .Returns(new YamlClusterHandler(yamlFilePath)); + + // Set up mocks for the clusters defined in the YAML file + var kustoHandler1Mock = new Mock(new Mock("test1.eastus").Object, new Mock>().Object, "test1", "test1.eastus"); + var kustoCluster1 = new Cluster + { + Name = "test1", + CapacityPolicy = new ClusterCapacityPolicy + { + IngestionCapacity = new IngestionCapacity + { + ClusterMaximumConcurrentOperations = 300, // Different from YAML + CoreUtilizationCoefficient = 0.5 + } + } + }; + + kustoClusterHandlerFactoryMock + .Setup(f => f.Create("test1", "test1.eastus")) + .Returns(kustoHandler1Mock.Object); + kustoHandler1Mock + .Setup(h => h.LoadAsync()) + .ReturnsAsync(kustoCluster1); + + var kustoHandler2Mock = new Mock(new Mock("test2.eastus").Object, new Mock>().Object, "test2", "test2.eastus"); + var kustoCluster2 = new Cluster + { + Name = "test2", + CapacityPolicy = new ClusterCapacityPolicy + { + IngestionCapacity = new IngestionCapacity + { + ClusterMaximumConcurrentOperations = 500, // Same as YAML + CoreUtilizationCoefficient = 0.8 + } + } + }; + + kustoClusterHandlerFactoryMock + .Setup(f => f.Create("test2", "test2.eastus")) + .Returns(kustoHandler2Mock.Object); + kustoHandler2Mock + .Setup(h => h.LoadAsync()) + .ReturnsAsync(kustoCluster2); + + // Act + var changes = await 
orchestrator.GenerateChangesFromFileAsync(yamlFilePath); + + // Assert + Assert.Equal(2, changes.Count); + + // test1 should have changes + var test1ChangeSet = changes.FirstOrDefault(c => c.Entity == "test1"); + Assert.NotNull(test1ChangeSet); + Assert.NotEmpty(test1ChangeSet.Changes); + + // test2 should have no changes (same configuration) + var test2ChangeSet = changes.FirstOrDefault(c => c.Entity == "test2"); + Assert.NotNull(test2ChangeSet); + Assert.Empty(test2ChangeSet.Changes); + } + + [Fact] + public async Task GenerateChangesFromFileAsync_NonExistentFile_ThrowsFileNotFoundException() + { + // Arrange + var nonExistentFilePath = "non-existent-file.yml"; + + // Setup the yaml handler factory to return a real YamlClusterHandler + yamlClusterHandlerFactoryMock + .Setup(f => f.Create(nonExistentFilePath)) + .Returns(new YamlClusterHandler(nonExistentFilePath)); + + // Act & Assert + var exception = await Assert.ThrowsAsync(() => orchestrator.GenerateChangesFromFileAsync(nonExistentFilePath)); + Assert.Contains("Clusters file not found", exception.Message); + } + + [Fact] + public async Task GenerateChangesFromFileAsync_EmptyFile_ThrowsInvalidOperationException() + { + // Arrange + var tempFilePath = Path.GetTempFileName(); + try + { + // Create an empty file + await File.WriteAllTextAsync(tempFilePath, ""); + + // Setup the yaml handler factory to return a real YamlClusterHandler + yamlClusterHandlerFactoryMock + .Setup(f => f.Create(tempFilePath)) + .Returns(new YamlClusterHandler(tempFilePath)); + + // Act & Assert + var exception = await Assert.ThrowsAsync(() => orchestrator.GenerateChangesFromFileAsync(tempFilePath)); + Assert.Contains("Clusters file is empty", exception.Message); + } + finally + { + File.Delete(tempFilePath); + } + } + + [Fact] + public async Task GenerateChangesFromFileAsync_InvalidYaml_ThrowsInvalidOperationException() + { + // Arrange + var tempFilePath = Path.GetTempFileName(); + try + { + // Create a file with invalid YAML + await File.WriteAllTextAsync(tempFilePath, "invalid: yaml: content: ["); + + // Setup the yaml handler factory to return a real YamlClusterHandler + yamlClusterHandlerFactoryMock + .Setup(f => f.Create(tempFilePath)) + .Returns(new YamlClusterHandler(tempFilePath)); + + // Act & Assert + var exception = await Assert.ThrowsAsync(() => orchestrator.GenerateChangesFromFileAsync(tempFilePath)); + Assert.Contains("Failed to parse clusters file", exception.Message); + } + finally + { + File.Delete(tempFilePath); + } + } + + [Fact] + public async Task GenerateChangesFromFileAsync_ValidYamlWithEmptyConnections_ReturnsEmptyList() + { + // Arrange + var tempFilePath = Path.GetTempFileName(); + try + { + // Create a YAML file with empty connections + await File.WriteAllTextAsync(tempFilePath, "connections: []"); + + // Setup the yaml handler factory to return a real YamlClusterHandler + yamlClusterHandlerFactoryMock + .Setup(f => f.Create(tempFilePath)) + .Returns(new YamlClusterHandler(tempFilePath)); + + // Act + var changes = await orchestrator.GenerateChangesFromFileAsync(tempFilePath); + + // Assert + Assert.Empty(changes); + } + finally + { + File.Delete(tempFilePath); + } + } + + [Fact] + public async Task GenerateChangesFromFileAsync_VerifyLoggingCalled() + { + // Arrange + var yamlFilePath = Path.Join("DemoData", "ClusterScopedChanges", "multipleClusters.yml"); + + // Setup the yaml handler factory to return a real YamlClusterHandler + yamlClusterHandlerFactoryMock + .Setup(f => f.Create(yamlFilePath)) + .Returns(new 
YamlClusterHandler(yamlFilePath)); + + // Set up a simple mock for the clusters + var kustoHandler1Mock = new Mock(new Mock("test1.eastus").Object, new Mock>().Object, "test1", "test1.eastus"); + var kustoHandler2Mock = new Mock(new Mock("test2.eastus").Object, new Mock>().Object, "test2", "test2.eastus"); + + kustoClusterHandlerFactoryMock + .Setup(f => f.Create("test1", "test1.eastus")) + .Returns(kustoHandler1Mock.Object); + kustoClusterHandlerFactoryMock + .Setup(f => f.Create("test2", "test2.eastus")) + .Returns(kustoHandler2Mock.Object); + + kustoHandler1Mock + .Setup(h => h.LoadAsync()) + .ReturnsAsync(new Cluster { Name = "test1" }); + kustoHandler2Mock + .Setup(h => h.LoadAsync()) + .ReturnsAsync(new Cluster { Name = "test2" }); + + // Act + await orchestrator.GenerateChangesFromFileAsync(yamlFilePath); + + // Assert - Verify that loading logging was called + loggerMock.Verify( + x => x.Log( + LogLevel.Information, + It.IsAny(), + It.Is((v, t) => v.ToString()!.Contains("Loading cluster configurations from file")), + It.IsAny(), + It.IsAny>()), + Times.Once); + + // Verify that loaded count logging was called + loggerMock.Verify( + x => x.Log( + LogLevel.Information, + It.IsAny(), + It.Is((v, t) => v.ToString()!.Contains("Loaded 2 cluster configuration(s) from YAML file")), + It.IsAny(), + It.IsAny>()), + Times.Once); + + // Verify that individual cluster processing logging was called + loggerMock.Verify( + x => x.Log( + LogLevel.Information, + It.IsAny(), + It.Is((v, t) => v.ToString()!.Contains("Generating cluster diff for test1")), + It.IsAny(), + It.IsAny>()), + Times.Once); + + loggerMock.Verify( + x => x.Log( + LogLevel.Information, + It.IsAny(), + It.Is((v, t) => v.ToString()!.Contains("Generating cluster diff for test2")), + It.IsAny(), + It.IsAny>()), + Times.Once); + } + + [Fact] + public async Task GenerateChangesFromFileAsync_LoadAsyncThrowsException_PropagatesException() + { + // Arrange + var yamlFilePath = Path.Join("DemoData", "ClusterScopedChanges", "multipleClusters.yml"); + + // Setup the yaml handler factory to return a real YamlClusterHandler + yamlClusterHandlerFactoryMock + .Setup(f => f.Create(yamlFilePath)) + .Returns(new YamlClusterHandler(yamlFilePath)); + + var kustoHandler1Mock = new Mock(new Mock("test1.eastus").Object, new Mock>().Object, "test1", "test1.eastus"); + + kustoClusterHandlerFactoryMock + .Setup(f => f.Create("test1", "test1.eastus")) + .Returns(kustoHandler1Mock.Object); + + kustoHandler1Mock + .Setup(h => h.LoadAsync()) + .ThrowsAsync(new Exception("Kusto connection failed")); + + // Act & Assert + var exception = await Assert.ThrowsAsync(() => orchestrator.GenerateChangesFromFileAsync(yamlFilePath)); + Assert.Equal("Kusto connection failed", exception.Message); + } + } +} diff --git a/KustoSchemaTools.Tests/Parser/YamlClusterHandlerTests.cs b/KustoSchemaTools.Tests/Parser/YamlClusterHandlerTests.cs new file mode 100644 index 0000000..307cb41 --- /dev/null +++ b/KustoSchemaTools.Tests/Parser/YamlClusterHandlerTests.cs @@ -0,0 +1,120 @@ +using KustoSchemaTools.Model; +using KustoSchemaTools.Parser; +using System.Collections.Generic; +using System.IO; +using System.Threading.Tasks; +using Xunit; +using System; + +namespace KustoSchemaTools.Tests.Parser +{ + public class YamlClusterHandlerTests + { + [Fact] + public async Task LoadAsync_ValidYamlFile_ReturnsClustersList() + { + // Arrange + var testFilePath = Path.Join("DemoData", "ClusterScopedChanges", "multipleClusters.yml"); + var handler = new YamlClusterHandler(testFilePath); + + // 
Act + var result = await handler.LoadAsync(); + + // Assert + Assert.NotNull(result); + Assert.Equal(2, result.Count); + + // Verify first cluster + var cluster1 = result[0]; + Assert.Equal("test1", cluster1.Name); + Assert.Equal("test1.eastus", cluster1.Url); + Assert.NotNull(cluster1.CapacityPolicy); + Assert.NotNull(cluster1.CapacityPolicy.IngestionCapacity); + Assert.Equal(512, cluster1.CapacityPolicy.IngestionCapacity.ClusterMaximumConcurrentOperations); + + // Verify second cluster + var cluster2 = result[1]; + Assert.Equal("test2", cluster2.Name); + Assert.Equal("test2.eastus", cluster2.Url); + Assert.NotNull(cluster2.CapacityPolicy); + Assert.NotNull(cluster2.CapacityPolicy.IngestionCapacity); + Assert.Equal(500, cluster2.CapacityPolicy.IngestionCapacity.ClusterMaximumConcurrentOperations); + } + + [Fact] + public async Task LoadAsync_FileNotFound_ThrowsFileNotFoundException() + { + // Arrange + var nonExistentPath = "/path/that/does/not/exist/clusters.yml"; + var handler = new YamlClusterHandler(nonExistentPath); + + // Act & Assert + var exception = await Assert.ThrowsAsync(() => handler.LoadAsync()); + Assert.Contains("Clusters file not found at path", exception.Message); + } + + [Fact] + public async Task LoadAsync_EmptyFile_ThrowsInvalidOperationException() + { + // Arrange + var tempFilePath = Path.GetTempFileName(); + try + { + await File.WriteAllTextAsync(tempFilePath, ""); + var handler = new YamlClusterHandler(tempFilePath); + + // Act & Assert + var exception = await Assert.ThrowsAsync(() => handler.LoadAsync()); + Assert.Contains("Clusters file is empty", exception.Message); + } + finally + { + if (File.Exists(tempFilePath)) + File.Delete(tempFilePath); + } + } + + [Fact] + public async Task LoadAsync_InvalidYaml_ThrowsInvalidOperationException() + { + // Arrange + var tempFilePath = Path.GetTempFileName(); + try + { + await File.WriteAllTextAsync(tempFilePath, "invalid: yaml: content: ["); + var handler = new YamlClusterHandler(tempFilePath); + + // Act & Assert + var exception = await Assert.ThrowsAsync(() => handler.LoadAsync()); + Assert.Contains("Failed to parse clusters file", exception.Message); + } + finally + { + if (File.Exists(tempFilePath)) + File.Delete(tempFilePath); + } + } + + [Fact] + public async Task LoadAsync_InvalidClustersProperties_ThrowsInvalidOperationException() + { + // Arrange + var tempFilePath = Path.GetTempFileName(); + try + { + await File.WriteAllTextAsync(tempFilePath, "someOtherProperty: value"); + var handler = new YamlClusterHandler(tempFilePath); + + // Act & Assert + var exception = await Assert.ThrowsAsync(() => handler.LoadAsync()); + Assert.Contains("Failed to parse clusters file", exception.Message); + Assert.Contains("Property 'someOtherProperty' not found on type 'KustoSchemaTools.Model.Clusters'.", exception.Message); + } + finally + { + if (File.Exists(tempFilePath)) + File.Delete(tempFilePath); + } + } + } +} diff --git a/KustoSchemaTools.Tests/YamlDatabaseParserTests.cs b/KustoSchemaTools.Tests/Parser/YamlDatabaseHandlerTests.cs similarity index 100% rename from KustoSchemaTools.Tests/YamlDatabaseParserTests.cs rename to KustoSchemaTools.Tests/Parser/YamlDatabaseHandlerTests.cs diff --git a/KustoSchemaTools/Changes/ClusterChangeSet.cs b/KustoSchemaTools/Changes/ClusterChangeSet.cs new file mode 100644 index 0000000..3106ce9 --- /dev/null +++ b/KustoSchemaTools/Changes/ClusterChangeSet.cs @@ -0,0 +1,24 @@ +using KustoSchemaTools.Model; + +namespace KustoSchemaTools.Changes +{ + /// + /// Represents the complete set of 
changes for a single Kusto cluster, + /// including the old and new state, and a list of policy modifications. + /// + public class ClusterChangeSet : BaseChange + { + /// + /// A list of specific, granular changes detected for the cluster's policies. + /// Each item is typically a BaseChange for a specific policy type. + /// + public List Changes { get; } = new List(); + + public ClusterChangeSet(string clusterName, Cluster from, Cluster to) + : base("Cluster", clusterName, from, to) + { + // Consolidate all scripts from policy changes into this top-level object + Scripts.AddRange(Changes.SelectMany(c => c.Scripts)); + } + } +} \ No newline at end of file diff --git a/KustoSchemaTools/Changes/ClusterChanges.cs b/KustoSchemaTools/Changes/ClusterChanges.cs new file mode 100644 index 0000000..bc5aa64 --- /dev/null +++ b/KustoSchemaTools/Changes/ClusterChanges.cs @@ -0,0 +1,113 @@ +using KustoSchemaTools.Model; +using Microsoft.Extensions.Logging; +using Kusto.Language; + +namespace KustoSchemaTools.Changes +{ + public class ClusterChanges + { + /// + /// Compares two cluster configurations and generates a comprehensive change set + /// containing the differences and the scripts needed to apply those changes. + /// + /// The current/existing cluster configuration (typically from live cluster). + /// The desired cluster configuration (typically from YAML file). + /// Logger instance for recording the comparison process and results. + /// + /// A ClusterChangeSet containing: + /// - Detailed policy changes with before/after values + /// - Generated Kusto scripts to apply the changes + /// - Validation results for each script + /// + /// Thrown when cluster names don't match between old and new configurations. + public static ClusterChangeSet GenerateChanges(Cluster oldCluster, Cluster newCluster, ILogger log) + { + if (oldCluster.Name != newCluster.Name) + { + throw new ArgumentException($"Cluster names must match; {oldCluster.Name} != {newCluster.Name}"); + } + var clusterName = oldCluster.Name; + var changeSet = new ClusterChangeSet(clusterName, oldCluster, newCluster); + + log.LogInformation($"Analyzing capacity policy changes for cluster {clusterName}..."); + if (newCluster.CapacityPolicy == null) { + log.LogInformation("No capacity policy defined in the new cluster configuration."); + } else { + var capacityPolicyChange = ComparePolicy( + "Cluster Capacity Policy", + "default", + oldCluster.CapacityPolicy, + newCluster.CapacityPolicy, + policy => new List { + new DatabaseScriptContainer("AlterMergeClusterCapacityPolicy", 10, policy.ToUpdateScript()) + }); + + if (capacityPolicyChange != null) + { + changeSet.Changes.Add(capacityPolicyChange); + } + } + + changeSet.Scripts.AddRange(changeSet.Changes.SelectMany(c => c.Scripts)); + + // Run Kusto code diagnostics + foreach (var script in changeSet.Scripts) + { + var code = KustoCode.Parse(script.Text); + var diagnostics = code.GetDiagnostics(); + script.IsValid = !diagnostics.Any(); + } + return changeSet; + } + + /// + /// Compares two policy objects of the same type using reflection to detect property-level changes. + /// Only properties that are non-null in the new policy and differ from the old policy are considered changes. + /// This approach aligns with Kusto's `.alter-merge` command behavior, which only modifies specified properties. + /// + /// The type of policy object to compare (must be a reference type). + /// The display name of the entity type for documentation purposes (e.g., "Cluster Capacity Policy"). 
+ /// The name of the specific entity instance being compared (e.g., "default"). + /// The existing policy configuration, or null if no policy was previously set. + /// The desired policy configuration to compare against the old policy. + /// A function that generates the Kusto scripts needed to apply the new policy. + /// + /// An IChange object containing: + /// - Property-level change details in Markdown format + /// - Generated scripts to apply the changes + /// Returns null if no meaningful changes are detected. + /// + private static IChange? ComparePolicy(string entityType, string entityName, T? oldPolicy, T newPolicy, Func> scriptGenerator) where T : class + { + if (newPolicy == null) return null; + if (newPolicy.Equals(oldPolicy)) return null; + + var policyChange = new PolicyChange(entityType, entityName, oldPolicy!, newPolicy); + + var changedProperties = new List(); + var properties = typeof(T).GetProperties().Where(p => p.CanRead && p.CanWrite); + + foreach (var prop in properties) + { + var oldValue = oldPolicy != null ? prop.GetValue(oldPolicy) : null; + var newValue = prop.GetValue(newPolicy); + + if (newValue != null && !object.Equals(oldValue, newValue)) + { + var oldValueStr = oldValue?.ToString() ?? "Not Set"; + var newValueStr = newValue.ToString()!; + changedProperties.Add($"- **{prop.Name}**: `{oldValueStr}` → `{newValueStr}`"); + } + } + + if (changedProperties.Any()) + { + policyChange.Markdown = $"## {entityType} Changes\n\n{string.Join("\n", changedProperties)}"; + policyChange.Scripts.AddRange(scriptGenerator(newPolicy)); + return policyChange; + } + + return null; + } + } +} \ No newline at end of file diff --git a/KustoSchemaTools/Changes/PolicyChange.cs b/KustoSchemaTools/Changes/PolicyChange.cs new file mode 100644 index 0000000..57ced26 --- /dev/null +++ b/KustoSchemaTools/Changes/PolicyChange.cs @@ -0,0 +1,10 @@ +namespace KustoSchemaTools.Changes +{ + public class PolicyChange : BaseChange + { + public PolicyChange(string entityType, string entity, T from, T to) + : base(entityType, entity, from, to) + { + } + } +} \ No newline at end of file diff --git a/KustoSchemaTools/KustoClusterOrchestrator.cs b/KustoSchemaTools/KustoClusterOrchestrator.cs new file mode 100644 index 0000000..2121dcb --- /dev/null +++ b/KustoSchemaTools/KustoClusterOrchestrator.cs @@ -0,0 +1,69 @@ +using KustoSchemaTools.Changes; +using KustoSchemaTools.Model; +using KustoSchemaTools.Parser; +using Microsoft.Extensions.Logging; + +namespace KustoSchemaTools +{ + public class KustoClusterOrchestrator + { + public KustoClusterOrchestrator(ILogger logger, IKustoClusterHandlerFactory kustoClusterHandlerFactory, IYamlClusterHandlerFactory yamlClusterHandlerFactory) + { + Log = logger; + KustoClusterHandlerFactory = kustoClusterHandlerFactory; + YamlClusterHandlerFactory = yamlClusterHandlerFactory; + } + + public ILogger Log { get; } + public IKustoClusterHandlerFactory KustoClusterHandlerFactory { get; } + public IYamlClusterHandlerFactory YamlClusterHandlerFactory { get; } + + /// + /// Generates changes by comparing the provided cluster configurations with their + /// corresponding live Kusto clusters. + /// + /// The cluster configurations to compare against live clusters. + /// A list of ClusterChangeSet objects representing the detected changes. 
+ public async Task> GenerateChangesAsync(Clusters clusters) + { + var allChanges = new List(); + + foreach (var cluster in clusters.Connections) + { + Log.LogInformation($"Generating cluster diff for {cluster.Name}"); + + // Load the "old" schema from the live Kusto cluster + var kustoHandler = KustoClusterHandlerFactory.Create(cluster.Name, cluster.Url); + var kustoCluster = await kustoHandler.LoadAsync(); + + // Compare the live state with the proposed new configuration and generate a change object + var change = ClusterChanges.GenerateChanges(kustoCluster, cluster, Log); + allChanges.Add(change); + } + + return allChanges; + } + + /// + /// Loads cluster configurations from a YAML file and generates changes by comparing + /// them with the live Kusto clusters. + /// + /// The path to the YAML file containing cluster configurations. + /// A list of ClusterChangeSet objects representing the detected changes. + public async Task> GenerateChangesFromFileAsync(string clusterConfigFilePath) + { + Log.LogInformation($"Loading cluster configurations from file: {clusterConfigFilePath}"); + + var yamlHandler = YamlClusterHandlerFactory.Create(clusterConfigFilePath); + var clusterList = await yamlHandler.LoadAsync(); + var clusters = new Clusters + { + Connections = clusterList + }; + + Log.LogInformation($"Loaded {clusterList.Count} cluster configuration(s) from YAML file"); + + return await GenerateChangesAsync(clusters); + } + } +} \ No newline at end of file diff --git a/KustoSchemaTools/Model/Cluster.cs b/KustoSchemaTools/Model/Cluster.cs index 95e220a..832b648 100644 --- a/KustoSchemaTools/Model/Cluster.cs +++ b/KustoSchemaTools/Model/Cluster.cs @@ -5,6 +5,7 @@ public class Cluster public string Name { get; set; } public string Url { get; set; } public List Scripts { get; set; } = new List(); + public ClusterCapacityPolicy? CapacityPolicy { get; set; } } } diff --git a/KustoSchemaTools/Model/ClusterCapacityPolicy.cs b/KustoSchemaTools/Model/ClusterCapacityPolicy.cs new file mode 100644 index 0000000..65ec103 --- /dev/null +++ b/KustoSchemaTools/Model/ClusterCapacityPolicy.cs @@ -0,0 +1,368 @@ +using KustoSchemaTools.Changes; +using KustoSchemaTools.Helpers; +using Newtonsoft.Json; +using System; +using System.Collections.Generic; +using Kusto.Language; + +namespace KustoSchemaTools.Model +{ + public class ClusterCapacityPolicy : IEquatable + { + public IngestionCapacity? IngestionCapacity { get; set; } + public ExtentsMergeCapacity? ExtentsMergeCapacity { get; set; } + public ExtentsPurgeRebuildCapacity? ExtentsPurgeRebuildCapacity { get; set; } + public ExportCapacity? ExportCapacity { get; set; } + public ExtentsPartitionCapacity? ExtentsPartitionCapacity { get; set; } + public MaterializedViewsCapacity? MaterializedViewsCapacity { get; set; } + public StoredQueryResultsCapacity? StoredQueryResultsCapacity { get; set; } + public StreamingIngestionPostProcessingCapacity? StreamingIngestionPostProcessingCapacity { get; set; } + public PurgeStorageArtifactsCleanupCapacity? PurgeStorageArtifactsCleanupCapacity { get; set; } + public PeriodicStorageArtifactsCleanupCapacity? PeriodicStorageArtifactsCleanupCapacity { get; set; } + public QueryAccelerationCapacity? QueryAccelerationCapacity { get; set; } + public GraphSnapshotsCapacity? GraphSnapshotsCapacity { get; set; } + + public bool Equals(ClusterCapacityPolicy? 
other) + { + if (other is null) return false; + if (ReferenceEquals(this, other)) return true; + return + EqualityComparer.Default.Equals(IngestionCapacity, other.IngestionCapacity) && + EqualityComparer.Default.Equals(ExtentsMergeCapacity, other.ExtentsMergeCapacity) && + EqualityComparer.Default.Equals(ExtentsPurgeRebuildCapacity, other.ExtentsPurgeRebuildCapacity) && + EqualityComparer.Default.Equals(ExportCapacity, other.ExportCapacity) && + EqualityComparer.Default.Equals(ExtentsPartitionCapacity, other.ExtentsPartitionCapacity) && + EqualityComparer.Default.Equals(MaterializedViewsCapacity, other.MaterializedViewsCapacity) && + EqualityComparer.Default.Equals(StoredQueryResultsCapacity, other.StoredQueryResultsCapacity) && + EqualityComparer.Default.Equals(StreamingIngestionPostProcessingCapacity, other.StreamingIngestionPostProcessingCapacity) && + EqualityComparer.Default.Equals(PurgeStorageArtifactsCleanupCapacity, other.PurgeStorageArtifactsCleanupCapacity) && + EqualityComparer.Default.Equals(PeriodicStorageArtifactsCleanupCapacity, other.PeriodicStorageArtifactsCleanupCapacity) && + EqualityComparer.Default.Equals(QueryAccelerationCapacity, other.QueryAccelerationCapacity) && + EqualityComparer.Default.Equals(GraphSnapshotsCapacity, other.GraphSnapshotsCapacity); + } + + public override bool Equals(object? obj) => Equals(obj as ClusterCapacityPolicy); + public override int GetHashCode() + { + var hc = new HashCode(); + hc.Add(IngestionCapacity); + hc.Add(ExtentsMergeCapacity); + hc.Add(ExtentsPurgeRebuildCapacity); + hc.Add(ExportCapacity); + hc.Add(ExtentsPartitionCapacity); + hc.Add(MaterializedViewsCapacity); + hc.Add(StoredQueryResultsCapacity); + hc.Add(StreamingIngestionPostProcessingCapacity); + hc.Add(PurgeStorageArtifactsCleanupCapacity); + hc.Add(PeriodicStorageArtifactsCleanupCapacity); + hc.Add(QueryAccelerationCapacity); + hc.Add(GraphSnapshotsCapacity); + return hc.ToHashCode(); + } + + public string ToJson() + { + return JsonConvert.SerializeObject(this, new JsonSerializerSettings + { + NullValueHandling = NullValueHandling.Ignore, + Formatting = Formatting.Indented + }); + } + + public string ToUpdateScript() + { + var json = ToJson(); + var script = $".alter-merge cluster policy capacity ```{json}```"; + return script; + } + } + + public class IngestionCapacity : IEquatable + { + public int? ClusterMaximumConcurrentOperations { get; set; } + public double? CoreUtilizationCoefficient { get; set; } + + public bool Equals(IngestionCapacity? other) + { + if (other is null) return false; + return ClusterMaximumConcurrentOperations == other.ClusterMaximumConcurrentOperations && + CoreUtilizationCoefficient == other.CoreUtilizationCoefficient; + } + public override bool Equals(object? obj) => Equals(obj as IngestionCapacity); + public override int GetHashCode() => HashCode.Combine(ClusterMaximumConcurrentOperations, CoreUtilizationCoefficient); + public override string ToString() + { + return JsonConvert.SerializeObject(this, new JsonSerializerSettings + { + NullValueHandling = NullValueHandling.Ignore, + Formatting = Formatting.None + }); + } + } + + public class ExtentsMergeCapacity : IEquatable + { + public int? MinimumConcurrentOperationsPerNode { get; set; } + public int? MaximumConcurrentOperationsPerNode { get; set; } + + public bool Equals(ExtentsMergeCapacity? 
other) + { + if (other is null) return false; + return MinimumConcurrentOperationsPerNode == other.MinimumConcurrentOperationsPerNode && + MaximumConcurrentOperationsPerNode == other.MaximumConcurrentOperationsPerNode; + } + public override bool Equals(object? obj) => Equals(obj as ExtentsMergeCapacity); + public override int GetHashCode() => HashCode.Combine(MinimumConcurrentOperationsPerNode, MaximumConcurrentOperationsPerNode); + public override string ToString() + { + return JsonConvert.SerializeObject(this, new JsonSerializerSettings + { + NullValueHandling = NullValueHandling.Ignore, + Formatting = Formatting.None + }); + } + } + + public class ExtentsPurgeRebuildCapacity : IEquatable + { + public int? MaximumConcurrentOperationsPerNode { get; set; } + + public bool Equals(ExtentsPurgeRebuildCapacity? other) + { + if (other is null) return false; + return MaximumConcurrentOperationsPerNode == other.MaximumConcurrentOperationsPerNode; + } + public override bool Equals(object? obj) => Equals(obj as ExtentsPurgeRebuildCapacity); + public override int GetHashCode() => HashCode.Combine(MaximumConcurrentOperationsPerNode); + public override string ToString() + { + return JsonConvert.SerializeObject(this, new JsonSerializerSettings + { + NullValueHandling = NullValueHandling.Ignore, + Formatting = Formatting.None + }); + } + } + + public class ExportCapacity : IEquatable + { + public int? ClusterMaximumConcurrentOperations { get; set; } + public double? CoreUtilizationCoefficient { get; set; } + + public bool Equals(ExportCapacity? other) + { + if (other is null) return false; + return ClusterMaximumConcurrentOperations == other.ClusterMaximumConcurrentOperations && + CoreUtilizationCoefficient == other.CoreUtilizationCoefficient; + } + public override bool Equals(object? obj) => Equals(obj as ExportCapacity); + public override int GetHashCode() => HashCode.Combine(ClusterMaximumConcurrentOperations, CoreUtilizationCoefficient); + public override string ToString() + { + return JsonConvert.SerializeObject(this, new JsonSerializerSettings + { + NullValueHandling = NullValueHandling.Ignore, + Formatting = Formatting.None + }); + } + } + + public class ExtentsPartitionCapacity : IEquatable + { + public int? ClusterMinimumConcurrentOperations { get; set; } + public int? ClusterMaximumConcurrentOperations { get; set; } + + public bool Equals(ExtentsPartitionCapacity? other) + { + if (other is null) return false; + return ClusterMinimumConcurrentOperations == other.ClusterMinimumConcurrentOperations && + ClusterMaximumConcurrentOperations == other.ClusterMaximumConcurrentOperations; + } + public override bool Equals(object? obj) => Equals(obj as ExtentsPartitionCapacity); + public override int GetHashCode() => HashCode.Combine(ClusterMinimumConcurrentOperations, ClusterMaximumConcurrentOperations); + public override string ToString() + { + return JsonConvert.SerializeObject(this, new JsonSerializerSettings + { + NullValueHandling = NullValueHandling.Ignore, + Formatting = Formatting.None + }); + } + } + + public class MaterializedViewsCapacity : IEquatable + { + public int? ClusterMaximumConcurrentOperations { get; set; } + public ExtentsRebuildCapacity? ExtentsRebuildCapacity { get; set; } + + public bool Equals(MaterializedViewsCapacity? 
other) + { + if (other is null) return false; + return ClusterMaximumConcurrentOperations == other.ClusterMaximumConcurrentOperations && + EqualityComparer.Default.Equals(ExtentsRebuildCapacity, other.ExtentsRebuildCapacity); + } + public override bool Equals(object? obj) => Equals(obj as MaterializedViewsCapacity); + public override int GetHashCode() => HashCode.Combine(ClusterMaximumConcurrentOperations, ExtentsRebuildCapacity); + public override string ToString() + { + return JsonConvert.SerializeObject(this, new JsonSerializerSettings + { + NullValueHandling = NullValueHandling.Ignore, + Formatting = Formatting.None + }); + } + } + + public class ExtentsRebuildCapacity : IEquatable + { + public int? ClusterMaximumConcurrentOperations { get; set; } + public int? MaximumConcurrentOperationsPerNode { get; set; } + + public bool Equals(ExtentsRebuildCapacity? other) + { + if (other is null) return false; + return ClusterMaximumConcurrentOperations == other.ClusterMaximumConcurrentOperations && + MaximumConcurrentOperationsPerNode == other.MaximumConcurrentOperationsPerNode; + } + public override bool Equals(object? obj) => Equals(obj as ExtentsRebuildCapacity); + public override int GetHashCode() => HashCode.Combine(ClusterMaximumConcurrentOperations, MaximumConcurrentOperationsPerNode); + public override string ToString() + { + return JsonConvert.SerializeObject(this, new JsonSerializerSettings + { + NullValueHandling = NullValueHandling.Ignore, + Formatting = Formatting.None + }); + } + } + + public class StoredQueryResultsCapacity : IEquatable + { + public int? MaximumConcurrentOperationsPerDbAdmin { get; set; } + public double? CoreUtilizationCoefficient { get; set; } + + public bool Equals(StoredQueryResultsCapacity? other) + { + if (other is null) return false; + return MaximumConcurrentOperationsPerDbAdmin == other.MaximumConcurrentOperationsPerDbAdmin && + CoreUtilizationCoefficient == other.CoreUtilizationCoefficient; + } + public override bool Equals(object? obj) => Equals(obj as StoredQueryResultsCapacity); + public override int GetHashCode() => HashCode.Combine(MaximumConcurrentOperationsPerDbAdmin, CoreUtilizationCoefficient); + public override string ToString() + { + return JsonConvert.SerializeObject(this, new JsonSerializerSettings + { + NullValueHandling = NullValueHandling.Ignore, + Formatting = Formatting.None + }); + } + } + + public class StreamingIngestionPostProcessingCapacity : IEquatable + { + public int? MaximumConcurrentOperationsPerNode { get; set; } + + public bool Equals(StreamingIngestionPostProcessingCapacity? other) + { + if (other is null) return false; + return MaximumConcurrentOperationsPerNode == other.MaximumConcurrentOperationsPerNode; + } + public override bool Equals(object? obj) => Equals(obj as StreamingIngestionPostProcessingCapacity); + public override int GetHashCode() => HashCode.Combine(MaximumConcurrentOperationsPerNode); + public override string ToString() + { + return JsonConvert.SerializeObject(this, new JsonSerializerSettings + { + NullValueHandling = NullValueHandling.Ignore, + Formatting = Formatting.None + }); + } + } + + public class PurgeStorageArtifactsCleanupCapacity : IEquatable + { + public int? MaximumConcurrentOperationsPerCluster { get; set; } + + public bool Equals(PurgeStorageArtifactsCleanupCapacity? other) + { + if (other is null) return false; + return MaximumConcurrentOperationsPerCluster == other.MaximumConcurrentOperationsPerCluster; + } + public override bool Equals(object? 
obj) => Equals(obj as PurgeStorageArtifactsCleanupCapacity); + public override int GetHashCode() => HashCode.Combine(MaximumConcurrentOperationsPerCluster); + public override string ToString() + { + return JsonConvert.SerializeObject(this, new JsonSerializerSettings + { + NullValueHandling = NullValueHandling.Ignore, + Formatting = Formatting.None + }); + } + } + + public class PeriodicStorageArtifactsCleanupCapacity : IEquatable + { + public int? MaximumConcurrentOperationsPerCluster { get; set; } + + public bool Equals(PeriodicStorageArtifactsCleanupCapacity? other) + { + if (other is null) return false; + return MaximumConcurrentOperationsPerCluster == other.MaximumConcurrentOperationsPerCluster; + } + public override bool Equals(object? obj) => Equals(obj as PeriodicStorageArtifactsCleanupCapacity); + public override int GetHashCode() => HashCode.Combine(MaximumConcurrentOperationsPerCluster); + public override string ToString() + { + return JsonConvert.SerializeObject(this, new JsonSerializerSettings + { + NullValueHandling = NullValueHandling.Ignore, + Formatting = Formatting.None + }); + } + } + + public class QueryAccelerationCapacity : IEquatable + { + public int? ClusterMaximumConcurrentOperations { get; set; } + public double? CoreUtilizationCoefficient { get; set; } + + public bool Equals(QueryAccelerationCapacity? other) + { + if (other is null) return false; + return ClusterMaximumConcurrentOperations == other.ClusterMaximumConcurrentOperations && + CoreUtilizationCoefficient == other.CoreUtilizationCoefficient; + } + public override bool Equals(object? obj) => Equals(obj as QueryAccelerationCapacity); + public override int GetHashCode() => HashCode.Combine(ClusterMaximumConcurrentOperations, CoreUtilizationCoefficient); + public override string ToString() + { + return JsonConvert.SerializeObject(this, new JsonSerializerSettings + { + NullValueHandling = NullValueHandling.Ignore, + Formatting = Formatting.None + }); + } + } + + public class GraphSnapshotsCapacity : IEquatable + { + public int? ClusterMaximumConcurrentOperations { get; set; } + + public bool Equals(GraphSnapshotsCapacity? other) + { + if (other is null) return false; + return ClusterMaximumConcurrentOperations == other.ClusterMaximumConcurrentOperations; + } + public override bool Equals(object? 
obj) => Equals(obj as GraphSnapshotsCapacity); + public override int GetHashCode() => HashCode.Combine(ClusterMaximumConcurrentOperations); + public override string ToString() + { + return JsonConvert.SerializeObject(this, new JsonSerializerSettings + { + NullValueHandling = NullValueHandling.Ignore, + Formatting = Formatting.None + }); + } + } +} \ No newline at end of file diff --git a/KustoSchemaTools/Parser/IKustoClusterHandlerFactory.cs b/KustoSchemaTools/Parser/IKustoClusterHandlerFactory.cs new file mode 100644 index 0000000..752e889 --- /dev/null +++ b/KustoSchemaTools/Parser/IKustoClusterHandlerFactory.cs @@ -0,0 +1,7 @@ +namespace KustoSchemaTools.Parser +{ + public interface IKustoClusterHandlerFactory + { + KustoClusterHandler Create(string clusterName, string clusterUrl); + } +} diff --git a/KustoSchemaTools/Parser/IYamlClusterHandlerFactory.cs b/KustoSchemaTools/Parser/IYamlClusterHandlerFactory.cs new file mode 100644 index 0000000..fbebea6 --- /dev/null +++ b/KustoSchemaTools/Parser/IYamlClusterHandlerFactory.cs @@ -0,0 +1,7 @@ +namespace KustoSchemaTools.Parser +{ + public interface IYamlClusterHandlerFactory + { + YamlClusterHandler Create(string path); + } +} diff --git a/KustoSchemaTools/Parser/KustoClusterHandler.cs b/KustoSchemaTools/Parser/KustoClusterHandler.cs new file mode 100644 index 0000000..3391cf1 --- /dev/null +++ b/KustoSchemaTools/Parser/KustoClusterHandler.cs @@ -0,0 +1,46 @@ +using KustoSchemaTools.Model; +using Microsoft.Extensions.Logging; +using Kusto.Data.Common; +using Newtonsoft.Json; +using KustoSchemaTools.Parser; + +namespace KustoSchemaTools +{ + public class KustoClusterHandler + { + private readonly KustoClient _client; + private readonly ILogger _logger; + private readonly string _clusterName; + private readonly string _clusterUrl; + + public KustoClusterHandler(KustoClient client, ILogger logger, string clusterName, string clusterUrl) + { + _client = client; + _logger = logger; + _clusterName = clusterName; + _clusterUrl = clusterUrl; + } + + public virtual async Task LoadAsync() + { + var cluster = new Cluster { Name = _clusterName, Url = _clusterUrl }; + + _logger.LogInformation("Loading cluster capacity policy..."); + + using (var reader = await _client.AdminClient.ExecuteControlCommandAsync("", ".show cluster policy capacity", new ClientRequestProperties())) + { + if (reader.Read()) + { + var policyJson = reader["Policy"]?.ToString(); + if (!string.IsNullOrEmpty(policyJson)) + { + var policy = JsonConvert.DeserializeObject(policyJson); + cluster.CapacityPolicy = policy; + } + } + } + + return cluster; + } + } +} \ No newline at end of file diff --git a/KustoSchemaTools/Parser/KustoClusterHandlerFactory.cs b/KustoSchemaTools/Parser/KustoClusterHandlerFactory.cs new file mode 100644 index 0000000..60ec6af --- /dev/null +++ b/KustoSchemaTools/Parser/KustoClusterHandlerFactory.cs @@ -0,0 +1,22 @@ +using Microsoft.Extensions.Logging; +using KustoSchemaTools.Parser; + +namespace KustoSchemaTools +{ + public class KustoClusterHandlerFactory : IKustoClusterHandlerFactory + { + private readonly ILoggerFactory _loggerFactory; + + public KustoClusterHandlerFactory(ILoggerFactory loggerFactory) + { + _loggerFactory = loggerFactory; + } + + public virtual KustoClusterHandler Create(string clusterName, string clusterUrl) + { + var client = new KustoClient(clusterUrl); + var logger = _loggerFactory.CreateLogger(); + return new KustoClusterHandler(client, logger, clusterName, clusterUrl); + } + } +} \ No newline at end of file diff --git 
a/KustoSchemaTools/Parser/YamlClusterHandler.cs b/KustoSchemaTools/Parser/YamlClusterHandler.cs
new file mode 100644
index 0000000..a069249
--- /dev/null
+++ b/KustoSchemaTools/Parser/YamlClusterHandler.cs
@@ -0,0 +1,41 @@
+using KustoSchemaTools.Model;
+using KustoSchemaTools.Helpers;
+
+namespace KustoSchemaTools
+{
+    public class YamlClusterHandler
+    {
+        private readonly string _filePath;
+
+        public YamlClusterHandler(string filePath)
+        {
+            _filePath = filePath;
+        }
+
+        public async Task<List<Cluster>> LoadAsync()
+        {
+            try
+            {
+                if (!File.Exists(_filePath))
+                {
+                    throw new FileNotFoundException($"Clusters file not found at path: {_filePath}");
+                }
+
+                var clustersFileContent = await File.ReadAllTextAsync(_filePath);
+
+                if (string.IsNullOrWhiteSpace(clustersFileContent))
+                {
+                    throw new InvalidOperationException($"Clusters file is empty: {_filePath}");
+                }
+
+                var clusters = Serialization.YamlPascalCaseDeserializer.Deserialize<Clusters>(clustersFileContent);
+
+                return clusters.Connections.ToList();
+            }
+            catch (Exception ex) when (!(ex is FileNotFoundException || ex is InvalidOperationException))
+            {
+                throw new InvalidOperationException($"Failed to parse clusters file '{_filePath}': {ex.Message}", ex);
+            }
+        }
+    }
+}
\ No newline at end of file
diff --git a/KustoSchemaTools/Parser/YamlClusterHandlerFactory.cs b/KustoSchemaTools/Parser/YamlClusterHandlerFactory.cs
new file mode 100644
index 0000000..93c5ee0
--- /dev/null
+++ b/KustoSchemaTools/Parser/YamlClusterHandlerFactory.cs
@@ -0,0 +1,10 @@
+namespace KustoSchemaTools
+{
+    public class YamlClusterHandlerFactory : Parser.IYamlClusterHandlerFactory
+    {
+        public virtual YamlClusterHandler Create(string path)
+        {
+            return new YamlClusterHandler(path);
+        }
+    }
+}
\ No newline at end of file
diff --git a/README.md b/README.md
index 073dc82..b98f142 100644
--- a/README.md
+++ b/README.md
@@ -6,6 +6,8 @@ A second project "[KustoSchemaToolsAction](https://github.com/github/KustoSchema
 
 ## Getting started
 
+### Database management
+
 The `database` object holds all schema related information for a Kusto database. It can be loaded from, or written to a cluster using the `KustoDatabaseHandler` which can be created by the `KustoDatabaseHandlerFactory`. There are several steps involved for loading all relevant information from a kusto database into the `database` object. These are covered by different plugins, which can be configured for the `KustoDatabaseHandlerFactory`.
 
 ```csharp
@@ -34,6 +36,36 @@ Additional features can be added with custom plugins. A sample for `table groups`
 The `KustoSchemaHandler` is the central place for synching schemas between yaml and a database. It offers functions for generating changes formatted in markdown, writing a database to yaml files and applying changes from yaml files to a database.
 
+### Cluster configuration management
+
+Cluster configuration changes are handled by the `KustoClusterOrchestrator`. Currently, the only supported feature is managing the cluster [capacity policy](https://learn.microsoft.com/en-us/kusto/management/capacity-policy?view=azure-data-explorer). The orchestrator expects a file path to a configuration file. A key design principle is that you only need to specify the properties you wish to set or change; any property omitted from your policy file is ignored, preserving its current value on the cluster.
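+
+For example, the orchestrator can be wired up with the two handler factories and pointed at a configuration file such as the sample shown below. This is an illustrative sketch rather than part of the library: it assumes the `Microsoft.Extensions.Logging.Console` provider is referenced for `AddConsole`, and `clusters.yml` stands in for the real configuration path.
+
+```csharp
+using System;
+using KustoSchemaTools;
+using Microsoft.Extensions.Logging;
+
+using var loggerFactory = LoggerFactory.Create(builder => builder.AddConsole());
+
+var orchestrator = new KustoClusterOrchestrator(
+    loggerFactory.CreateLogger<KustoClusterOrchestrator>(),
+    new KustoClusterHandlerFactory(loggerFactory),
+    new YamlClusterHandlerFactory());
+
+// "clusters.yml" is a placeholder path; the orchestrator compares every connection
+// in the file against its live cluster and returns one ClusterChangeSet per connection.
+var changeSets = await orchestrator.GenerateChangesFromFileAsync("clusters.yml");
+
+foreach (var changeSet in changeSets)
+{
+    Console.WriteLine($"{changeSet.Entity}: {changeSet.Scripts.Count} script(s) to apply");
+}
+```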
+
+A sample file could look like this:
+
+```yaml
+connections:
+- name: test
+  url: test.eastus
+  capacityPolicy:
+    ingestionCapacity:
+      clusterMaximumConcurrentOperations: 512
+      coreUtilizationCoefficient: 0.75
+    extentsMergeCapacity:
+      minimumConcurrentOperationsPerNode: 1
+      maximumConcurrentOperationsPerNode: 3
+    extentsPurgeRebuildCapacity:
+      maximumConcurrentOperationsPerNode: 1
+```
+
+The `KustoClusterOrchestrator` coordinates between cluster handlers to manage cluster configuration changes:
+
+1. **Loading Configuration**: Uses `YamlClusterHandler` to parse the YAML configuration file and load the desired cluster state
+2. **Reading Current State**: Uses `KustoClusterHandler` to connect to each live cluster and retrieve the current capacity policy settings
+3. **Generating Changes**: Compares the desired state (from YAML) with the current state (from Kusto) to identify differences
+4. **Creating Scripts**: Generates the necessary Kusto control commands (like `.alter-merge cluster policy capacity`) to apply the changes
+5. **Applying Updates**: Executes the generated scripts against the live clusters to synchronize them with the desired configuration
+
+Currently no plugins are supported. The orchestrator expects all cluster configuration in a central file.
 
 ## Supported Features
 
 Currently following features are supported: