Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ With Kit, you can define and manage complex workflows in a single `tasks.yaml` f
- **Auto-restart** - Automatically restart services on failure
- **File watching** - Re-run tasks when files change
- **Port forwarding** - Forward ports from services to host
- **Web UI** - Visualize your workflow and monitor task status with real-time metrics
- **Web UI** - Visualize your workflow and monitor task status

## Quick Start

Expand Down
22 changes: 1 addition & 21 deletions internal/index.html
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,6 @@
color: #c9d1d9;
}

#metrics {
color: #8b949e;
margin-left: 16px;
}

/* Task message in log header */
#message {
margin-left: 16px;
Expand Down Expand Up @@ -176,8 +171,7 @@
<div class="flex">
<div>
<b id="name">Click on a task to see logs</b>
<span id="metrics"></span>
<span id="message"></span>
<span id="message"></span>
</div>
<div id="follow" style="cursor: pointer">Auto-scrolling</div>
</div>
Expand Down Expand Up @@ -396,7 +390,6 @@
// Get references to DOM elements
const status = document.getElementById("status");
const name = document.getElementById("name");
const metrics = document.getElementById("metrics");
const message = document.getElementById("message");
const logs = document.getElementById("logs");
const logsContainer = document.getElementById("log-container");
Expand Down Expand Up @@ -503,12 +496,6 @@
return brightColors[code - 90];
};

const formatMetrics = (metrics) => {
if (!metrics) return "";

return `${(metrics.mem / 1024 / 1024).toFixed(0)}Mi`;
};

// Fetch the initial graph data from the server
fetch("/dag")
.then((response) => response.json())
Expand Down Expand Up @@ -560,7 +547,6 @@
id: node.name,
label: node.name,
phase: nodePhase,
metrics: node.metrics,
message: node.message || "",
ports: node.task?.ports || "",
icon: iconDataUrl,
Expand Down Expand Up @@ -632,9 +618,6 @@

// Update the log header with node information
name.textContent = nodeId;
metrics.textContent = nodeData.metrics
? formatMetrics(nodeData.metrics)
: "";
message.textContent = nodeData.message
? nodeData.message
: "";
Expand Down Expand Up @@ -709,15 +692,13 @@
if (
existingNode.data("phase") !== node.phase ||
existingNode.data("message") !== node.message ||
existingNode.data("metrics") !== node.metrics ||
existingNode.data("parent") !==
(node.task && node.task.group
? `group:${node.task.group}`
: undefined)
) {
// Update node data
existingNode.data("phase", node.phase);
existingNode.data("metrics", node.metrics);
existingNode.data("message", node.message);
existingNode.data(
"ports",
Expand Down Expand Up @@ -767,7 +748,6 @@
id: node.name,
label: node.name,
phase: node.phase,
metrics: node.metrics,
message: node.message || "",
ports: node.task?.ports || "",
icon: iconDataUrl,
Expand Down
35 changes: 0 additions & 35 deletions internal/metrics/procfs.go

This file was deleted.

40 changes: 0 additions & 40 deletions internal/metrics/ps.go

This file was deleted.

19 changes: 0 additions & 19 deletions internal/proc/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
"github.com/docker/docker/pkg/stdcopy"
"github.com/docker/docker/registry"
"github.com/docker/go-connections/nat"
"github.com/kitproj/kit/internal/metrics"
"github.com/kitproj/kit/internal/types"
v1 "github.com/opencontainers/image-spec/specs-go/v1"
"k8s.io/utils/strings/slices"
Expand Down Expand Up @@ -314,24 +313,6 @@ func ignoreNotExist(err error) error {

}

func (c *container) GetMetrics(ctx context.Context) (*types.Metrics, error) {
// Initialize Docker client if not already done

command := metrics.GetProcFSCommand(1) // PID 1

// Use Docker API for exec instead of command line
output, err := c.execInContainer(ctx, command)
if err != nil {
return nil, fmt.Errorf("docker exec failed for container %s: %w", c.name, err)
}

metrics, err := metrics.ParseProcFSOutput(string(output))
if err != nil {
return nil, fmt.Errorf("failed to parse process metrics for container %s: %w", c.name, err)
}
return metrics, nil
}

func (c *container) execInContainer(ctx context.Context, command []string) ([]byte, error) {
// Create exec configuration
execConfig := dockertypes.ExecConfig{
Expand Down
13 changes: 0 additions & 13 deletions internal/proc/host.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"syscall"
"time"

"github.com/kitproj/kit/internal/metrics"
"github.com/kitproj/kit/internal/types"
)

Expand Down Expand Up @@ -90,15 +89,3 @@ func ignoreProcessFinishedErr(err error) error {
}
return nil
}

func (h *host) GetMetrics(ctx context.Context) (*types.Metrics, error) {
command := metrics.GetPSCommand(h.pid)
cmd := exec.CommandContext(ctx, command[0], command[1:]...)

output, err := cmd.CombinedOutput()
if err != nil {
return nil, fmt.Errorf("failed to get process metrics for pid %d: %w, output: %s", h.pid, err, string(output))
}

return metrics.ParsePSOutput(string(output))
}
80 changes: 1 addition & 79 deletions internal/proc/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@ import (
"os"
"path/filepath"
"sort"
"strings"
"sync"
"time"

"github.com/kitproj/kit/internal/metrics"
"github.com/kitproj/kit/internal/types"
corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand Down Expand Up @@ -114,7 +112,7 @@ func (k *k8s) Run(ctx context.Context, stdout io.Writer, stderr io.Writer) error
return fmt.Errorf("failed to create clientset: %w", err)
}

// Store clientset and config for later use in metrics
// Store clientset and config for later use
k.clientset = clientset
k.restConfig = config

Expand Down Expand Up @@ -444,82 +442,6 @@ func sortUnstructureds(uns []*unstructured.Unstructured) {
})
}

func (k *k8s) GetMetrics(ctx context.Context) (*types.Metrics, error) {
sum := &types.Metrics{}
k.podsMu.Lock()
pods := append([]string(nil), k.pods...)
k.podsMu.Unlock()
for _, podKey := range pods {
parts := strings.SplitN(podKey, "/", 2)
namespace := parts[0]
podName := parts[1]
metrics, err := k.getMetrics(ctx, namespace, podName)
if err != nil {
return nil, fmt.Errorf("failed to get metrics for pod %s: %w", podKey, err)
}
sum.Mem += metrics.Mem
}
return sum, nil
}

func (k *k8s) getMetrics(ctx context.Context, namespace, podName string) (*types.Metrics, error) {
// First, get the list of containers in the pod
containers, err := k.getContainersInPod(ctx, namespace, podName)
if err != nil {
return nil, fmt.Errorf("failed to get containers for pod %s/%s: %w", namespace, podName, err)
}

totalMemory := uint64(0)

// Iterate through each container and sum their memory usage
for _, containerName := range containers {
containerMetrics, err := k.getContainerMetrics(ctx, namespace, podName, containerName)
if err != nil {
// Log the error but continue with other containers
k.log.Printf("failed to get metrics for container %s in pod %s/%s: %v", containerName, namespace, podName, err)
continue
}
totalMemory += containerMetrics.Mem
}

return &types.Metrics{Mem: totalMemory}, nil
}

func (k *k8s) getContainersInPod(ctx context.Context, namespace, podName string) ([]string, error) {
// Use Kubernetes API to get pod details and extract container names
pod, err := k.clientset.CoreV1().Pods(namespace).Get(ctx, podName, metav1.GetOptions{})
if err != nil {
return nil, fmt.Errorf("failed to get pod %s/%s: %w", namespace, podName, err)
}

var containerNames []string
for _, container := range pod.Spec.Containers {
containerNames = append(containerNames, container.Name)
}

if len(containerNames) == 0 {
return nil, fmt.Errorf("no containers found in pod %s/%s", namespace, podName)
}

return containerNames, nil
}

func (k *k8s) getContainerMetrics(ctx context.Context, namespace, podName, containerName string) (*types.Metrics, error) {
command := metrics.GetProcFSCommand(1) // PID 1

// Use Kubernetes API to execute command in container
output, err := k.execInContainer(ctx, namespace, podName, containerName, command)
if err != nil {
return nil, fmt.Errorf("failed to exec in container %s/%s/%s: %w", namespace, podName, containerName, err)
}

metrics, err := metrics.ParseProcFSOutput(string(output))
if err != nil {
return nil, fmt.Errorf("failed to parse process metrics for container %s: %w", containerName, err)
}
return metrics, nil
}

func (k *k8s) execInContainer(ctx context.Context, namespace, podName, containerName string, command []string) ([]byte, error) {
req := k.clientset.CoreV1().RESTClient().Post().
Resource("pods").
Expand Down
6 changes: 0 additions & 6 deletions internal/proc/noop.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,16 +3,10 @@ package proc
import (
"context"
"io"

"github.com/kitproj/kit/internal/types"
)

type noop struct{}

func (n noop) Run(ctx context.Context, stdout, stderr io.Writer) error {
return nil
}

func (n noop) GetMetrics(ctx context.Context) (*types.Metrics, error) {
return &types.Metrics{}, nil
}
2 changes: 0 additions & 2 deletions internal/proc/proc.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,6 @@ import (
type Interface interface {
// Run runs the process.
Run(ctx context.Context, stdout, stderr io.Writer) error
// GetMetrics returns current resource metrics for this process
GetMetrics(ctx context.Context) (*types.Metrics, error)
}

func New(name string, t types.Task, log *log.Logger, spec types.Spec) Interface {
Expand Down
23 changes: 0 additions & 23 deletions internal/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -504,29 +504,6 @@ func RunSubgraph(ctx context.Context, cancel context.CancelFunc, port int, openB
out = io.MultiWriter(out, buf)
}

go func() {
ticker := time.NewTicker(20 * time.Second)
defer ticker.Stop()

for {
select {
case <-ctx.Done():
return
case <-ticker.C:
if ph := node.getPhase(); ph != "running" && ph != "stalled" {
continue
}
metrics, err := p.GetMetrics(ctx)
if err != nil {
logger.Printf("failed to get metrics: %v", err)
continue
}
node.setMetrics(metrics)
statusEvents <- node
}
}
}()

err = p.Run(ctx, out, out)
// if the task was cancelled, we don't want to restart it, this is normal exit
if errors.Is(ctx.Err(), context.Canceled) {
Expand Down
Loading
Loading