diff --git a/.circleci/config.yml b/.circleci/config.yml index fe81dd272c1..0a9c842f882 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -171,23 +171,24 @@ jobs: - run: name: "Check code formatting" command: | - sudo -u lightning mix format --check-formatted || touch /tmp/lint_failed + sudo -u lightning mix format --check-formatted || echo "format" >> /tmp/lint_failed - run: name: "Check code style with Credo" when: always command: | - sudo -u lightning mix credo --strict --all || touch /tmp/lint_failed + sudo -u lightning mix credo --strict --all || echo "credo" >> /tmp/lint_failed - run: name: "Check for security vulnerabilities" when: always command: | - sudo -u lightning mix sobelow --threshold medium || touch /tmp/lint_failed + sudo -u lightning mix sobelow --threshold medium || echo "sobelow" >> /tmp/lint_failed - run: name: "Verify all checks passed" when: always command: | if [ -f /tmp/lint_failed ]; then - echo "One or more lint checks failed" + echo "The following checks failed:" + cat /tmp/lint_failed exit 1 fi diff --git a/benchmarking/channels/README.md b/benchmarking/channels/README.md new file mode 100644 index 00000000000..08f17ca49ed --- /dev/null +++ b/benchmarking/channels/README.md @@ -0,0 +1,361 @@ +# Channel Proxy Benchmarking + +Pure Elixir tools for testing the channel proxy's performance and memory +behaviour. No external dependencies (k6, etc.) — just `elixir` and a running +Lightning instance. 
+
+## Prerequisites
+
+- Elixir installed (Mix.install handles all script dependencies)
+- A running Lightning instance started as a named Erlang node
+- The channel proxy feature enabled (routes at `/channels/:id/*path`)
+
+## Quick Start
+
+Three terminals:
+
+```bash
+# Terminal 1 — Mock sink (simulates the downstream HTTP service)
+elixir benchmarking/channels/mock_sink.exs
+
+# Terminal 2 — Lightning (as a named node)
+iex --sname lightning --cookie bench -S mix phx.server
+
+# Terminal 3 — Load test (single scenario)
+elixir --sname loadtest --cookie bench \
+  benchmarking/channels/load_test.exs \
+  --scenario happy_path --concurrency 20 --duration 30
+
+# Or run all scenarios in sequence:
+benchmarking/channels/run_all.sh --duration 30 --concurrency 20
+```
+
+The load test will automatically create a "load-test" project and channel
+pointing at the mock sink, then drive traffic through the proxy and report
+results.
+
+## File Structure
+
+```
+benchmarking/channels/
+├── load_test.exs            # Entry point (~20 lines): Mix.install, loads modules, calls main()
+├── mock_sink.exs            # Standalone mock HTTP sink server
+├── run_all.sh               # Runs all 7 scenarios in sequence
+├── lib/
+│   ├── load_test/
+│   │   ├── config.exs       # LoadTest.Config — CLI parsing and validation
+│   │   ├── metrics.exs      # LoadTest.Metrics — Agent-based latency/error collector
+│   │   ├── setup.exs        # LoadTest.Setup — BEAM connection, channel creation, telemetry deploy
+│   │   ├── runner.exs       # LoadTest.Runner — Scenario execution (steady, ramp-up, direct)
+│   │   ├── report.exs       # LoadTest.Report — Results formatting and CSV output
+│   │   └── main.exs         # LoadTest — Orchestrator (ties everything together)
+│   └── telemetry_collector.exs  # Bench.TelemetryCollector — Deployed to Lightning for server-side timing
+└── results/
+    └── .gitignore
+```
+
+The entry point (`load_test.exs`) installs deps via `Mix.install`, loads all
+modules via `Code.require_file` in dependency order, then calls
+`LoadTest.main(System.argv())`.
+
+## Mock Sink (`mock_sink.exs`)
+
+A standalone Bandit HTTP server that accepts all requests and responds according
+to the configured mode.
+
+```bash
+elixir benchmarking/channels/mock_sink.exs [options]
+```
+
+### Options
+
+| Option              | Default | Description                |
+| ------------------- | ------- | -------------------------- |
+| `--port PORT`       | 4001    | Listen port                |
+| `--mode MODE`       | fixed   | Response mode (see below)  |
+| `--status CODE`     | 200     | HTTP status for fixed mode |
+| `--body-size BYTES` | 256     | Response body size         |
+
+### Modes
+
+| Mode        | Behaviour                                                                       |
+| ----------- | ------------------------------------------------------------------------------- |
+| **fixed**   | Returns `--status` with `--body-size` body (respects `?delay=N`)                |
+| **timeout** | Accepts connection, never responds                                              |
+| **auth**    | 401 if no `Authorization` header, 403 if invalid, 200 for `Bearer test-api-key` |
+| **mixed**   | 80% fast 200, 10% slow 200 (2s delay), 10% 503                                  |
+
+### Query Parameters
+
+The mock sink supports per-request overrides via query parameters:
+
+| Parameter          | Description                                     |
+| ------------------ | ----------------------------------------------- |
+| `?response_size=N` | Override `--body-size` for this request (bytes) |
+| `?delay=N`         | Add a response delay for this request (ms)      |
+| `?status=N`        | Override `--status` for this request (e.g. 503) |
+
+This lets the load test control response sizes and delays without restarting the
+sink:
+
+```bash
+# Default body size
+curl http://localhost:4001/test
+
+# Override to 5000 bytes for this request
+curl "http://localhost:4001/test?response_size=5000"
+
+# 500ms delay
+curl "http://localhost:4001/test?delay=500"
+
+# Combine both
+curl "http://localhost:4001/test?delay=500&response_size=5000"
+```
+
+### Examples
+
+```bash
+# Default: fast 200 responses
+elixir benchmarking/channels/mock_sink.exs
+
+# Simulate flaky upstream
+elixir benchmarking/channels/mock_sink.exs --mode mixed
+
+# Large response bodies (5MB)
+elixir benchmarking/channels/mock_sink.exs --body-size 5000000
+
+# Require authentication
+elixir benchmarking/channels/mock_sink.exs --mode auth
+
+# Slow responses via query param (no restart needed)
+curl "http://localhost:4001/test?delay=2000"
+```
+
+## Load Test (`load_test.exs`)
+
+Drives HTTP traffic through the channel proxy, collects metrics, and reports
+latency percentiles, throughput, error rates, BEAM memory usage, and server-side
+telemetry timing breakdown.
+
+```bash
+elixir --sname loadtest --cookie COOKIE \
+  benchmarking/channels/load_test.exs [options]
+```
+
+**Important:** Must be run as a named Erlang node (`--sname`) so it can connect
+to the Lightning BEAM for channel setup, memory sampling, and telemetry.
+ +### Options + +| Option | Default | Description | +| ----------------------- | ----------------------- | ---------------------------------------------------- | +| `--target URL` | `http://localhost:4000` | Lightning base URL | +| `--sink URL` | `http://localhost:4001` | Mock sink URL (for channel creation) | +| `--node NODE` | `lightning@hostname` | Lightning node name | +| `--cookie COOKIE` | — | Erlang cookie (also settable via `elixir --cookie`) | +| `--channel NAME` | `load-test` | Channel name to find/create | +| `--scenario NAME` | `happy_path` | Test scenario (see below) | +| `--concurrency N` | 10 | Concurrent virtual users | +| `--duration SECS` | 30 | Test duration | +| `--payload-size BYTES` | 1024 | Request body size | +| `--response-size BYTES` | — | Response body size override (via `?response_size=N`) | +| `--delay MS` | — (slow_sink: 2000) | Sink response delay (via `?delay=N`) | +| `--csv PATH` | — | Optional CSV output file | + +### Scenarios + +| Scenario | Description | Mock sink mode | +| ------------------ | ----------------------------------------------- | --------------------- | +| **happy_path** | Sustained POST requests at constant concurrency | `fixed` (default) | +| **ramp_up** | Linearly ramp from 1 → N VUs over duration | `fixed` | +| **large_payload** | POST with large request bodies (default 1MB) | `fixed` | +| **large_response** | GET requests with large response bodies | `fixed --body-size N` | +| **mixed_methods** | Rotate through GET, POST, PUT, PATCH, DELETE | `fixed` | +| **slow_sink** | Measure latency with a slow upstream | `fixed` + `?delay=N` | +| **direct_sink** | Hit mock sink directly (baseline measurement) | `fixed` (default) | + +### Examples + +```bash +# Basic throughput test +elixir --sname lt --cookie bench \ + benchmarking/channels/load_test.exs \ + --concurrency 20 --duration 30 + +# Memory test with 1MB payloads +elixir --sname lt --cookie bench \ + benchmarking/channels/load_test.exs \ + --scenario 
large_payload --payload-size 1048576 --duration 30 + +# Ramp up to 50 VUs with CSV output +elixir --sname lt --cookie bench \ + benchmarking/channels/load_test.exs \ + --scenario ramp_up --concurrency 50 --duration 60 --csv results.csv + +# Slow upstream latency test (delay applied via query param, no sink restart) +elixir --sname lt --cookie bench \ + benchmarking/channels/load_test.exs \ + --scenario slow_sink --delay 2000 --concurrency 10 --duration 30 + +# Baseline: hit mock sink directly (no Lightning needed, no --sname required) +elixir benchmarking/channels/load_test.exs \ + --scenario direct_sink --concurrency 20 --duration 10 + +# Large response test with explicit response size +elixir --sname lt --cookie bench \ + benchmarking/channels/load_test.exs \ + --scenario large_response --response-size 1048576 --duration 30 +``` + +## Run All Scenarios (`run_all.sh`) + +Runs all 7 scenarios in sequence, logging output to a timestamped file and +appending CSV rows for each scenario. Assumes Lightning and mock sink are +already running. Bails on first failure. 
+ +```bash +benchmarking/channels/run_all.sh [options] +``` + +### Options + +| Option | Default | Description | +| ----------------- | ------- | --------------------- | +| `--sname NAME` | lt | Erlang short name | +| `--cookie COOKIE` | bench | Erlang cookie | +| `--duration SECS` | 30 | Per-scenario duration | +| `--concurrency N` | 20 | Virtual users | + +### Scenario Order + +| # | Scenario | Extra flags | +| --- | ---------------- | ------------------------- | +| 1 | `direct_sink` | (none — baseline) | +| 2 | `happy_path` | (none) | +| 3 | `ramp_up` | (none) | +| 4 | `large_payload` | `--payload-size 1048576` | +| 5 | `large_response` | `--response-size 1048576` | +| 6 | `mixed_methods` | (none) | +| 7 | `slow_sink` | `--delay 2000` | + +### Examples + +```bash +# Quick smoke test (10s per scenario, 5 VUs) +benchmarking/channels/run_all.sh --duration 10 --concurrency 5 + +# Full run with defaults (30s per scenario, 20 VUs) +benchmarking/channels/run_all.sh + +# Custom node/cookie +benchmarking/channels/run_all.sh --sname mynode --cookie mysecret +``` + +Results are written to `/tmp/channel-bench-results/`: + +- `YYYY.MM.DD-HH.MM.log` — full console output +- `YYYY.MM.DD-HH.MM.csv` — one row per scenario for analysis + +## Interpreting Results + +The load test prints a summary like: + +``` +═══════════════════════════════════════ + Channel Load Test Results +═══════════════════════════════════════ + Scenario: happy_path + Concurrency: 20 VUs + Duration: 30s +─────────────────────────────────────── + Requests: 15432 + Throughput: 514.4 req/s + Errors: 0 (0.0%) +─────────────────────────────────────── + Latency: + p50: 12.3ms + p95: 45.7ms + p99: 89.2ms +─────────────────────────────────────── + Memory (Lightning BEAM): + start: 128.5 MB + end: 131.2 MB + max: 135.0 MB + delta: +2.7 MB +═══════════════════════════════════════ +``` + +### Telemetry Timing Breakdown + +When running through Lightning (not `direct_sink`), the load test automatically +deploys a 
telemetry collector onto the Lightning BEAM node. After the test, it +prints a server-side timing breakdown: + +``` +─────────────────────────────────────── + Channel Proxy Timing (server-side): + Total request p50=12.3ms, p95=45.7ms, p99=89.2ms, n=15432 + DB lookup p50=0.2ms, p95=0.5ms, p99=1.1ms, n=15432 + Upstream proxy p50=11.8ms, p95=44.9ms, p99=87.5ms, n=15432 +``` + +This tells you exactly where time is spent inside the channel proxy: + +| Metric | What it measures | +| ------------------ | -------------------------------------------------------------------- | +| **Total request** | Entire `ChannelProxyPlug.call/2` — DB lookup + proxy + plug overhead | +| **DB lookup** | `Ecto.UUID.cast` + `Repo.get` to find the channel | +| **Upstream proxy** | `Weir.proxy` call — HTTP to the sink + response streaming back | +| **Plug overhead** | `Total request` - `DB lookup` - `Upstream proxy` = plug/header work | + +If `Total request` is much larger than `Upstream proxy`, the overhead is in the +Plug pipeline or DB lookup. If `Upstream proxy` dominates, the time is in the +network hop to the sink. + +The telemetry collector uses ETS with `:public` access and `write_concurrency` +for minimal overhead — handlers run in the connection processes, not through a +GenServer bottleneck. + +### What "good" looks like + +- **Memory delta** is the key metric for proxy correctness. If the proxy is + streaming properly, memory should stay roughly flat regardless of payload + size. A delta under **50 MB** for a 30-second test with 1MB payloads indicates + correct streaming behaviour. A growing delta suggests the proxy is buffering + entire request/response bodies in memory. + +- **Throughput** depends heavily on your machine and the mock sink + configuration. With a fast local sink and 20 VUs, expect 500+ req/s on modern + hardware. + +- **Latency p95** should be close to p50 for the `happy_path` scenario (no + artificial delays). 
A large gap indicates contention or resource exhaustion. + +- **Error rate** should be 0% for `happy_path` and `large_payload` scenarios. + Non-zero errors suggest proxy bugs or resource limits. + +### Measuring proxy overhead with `direct_sink` + +The `direct_sink` scenario hits the mock sink directly, bypassing Lightning +entirely. This gives a baseline for the test harness + mock sink latency: + +``` +proxy_overhead = happy_path_latency - direct_sink_latency +``` + +Run both and compare: + +```bash +# Baseline (no Lightning needed) +elixir benchmarking/channels/load_test.exs \ + --scenario direct_sink --concurrency 20 --duration 10 + +# Through proxy +elixir --sname lt --cookie bench \ + benchmarking/channels/load_test.exs \ + --scenario happy_path --concurrency 20 --duration 10 +``` + +The difference tells you exactly what the proxy pipeline (plugs, DB lookup, +Weir, second HTTP hop) costs per request. The telemetry breakdown further +decomposes that cost into DB lookup vs upstream proxy vs plug overhead. diff --git a/benchmarking/channels/lib/load_test/config.exs b/benchmarking/channels/lib/load_test/config.exs new file mode 100644 index 00000000000..a237b2874bf --- /dev/null +++ b/benchmarking/channels/lib/load_test/config.exs @@ -0,0 +1,191 @@ +# benchmarking/channels/lib/load_test/config.exs +# +# CLI argument parsing and validation for the channel load test. 
+ +defmodule LoadTest.Config do + @moduledoc false + + @scenarios ~w(happy_path ramp_up saturation large_payload large_response mixed_methods slow_sink direct_sink) + + @defaults %{ + target: "http://localhost:4000", + sink: "http://localhost:4001", + node: nil, + cookie: nil, + channel: "load-test", + scenario: "happy_path", + concurrency: 10, + duration: 30, + payload_size: 1024, + response_size: nil, + delay: nil, + csv: nil, + charts: false + } + + @help """ + Usage: elixir --sname loadtest --cookie COOKIE \\ + benchmarking/channels/load_test.exs [options] + + Options: + --target URL Lightning base URL (default: http://localhost:4000) + --sink URL Mock sink URL for channel creation (default: http://localhost:4001) + --node NODE Lightning node name (default: lightning@hostname) + --cookie COOKIE Erlang cookie (can also use --cookie flag on elixir command) + --channel NAME Channel name to find/create (default: load-test) + --scenario SCENARIO Test scenario (default: happy_path) + --concurrency N Concurrent virtual users (default: 10) + --duration SECS Test duration in seconds (default: 30) + --payload-size BYTES Request body size (default: 1024) + --response-size BYTES Response body size override via query param (default: none) + --delay MS Sink response delay via query param (default: none; slow_sink: 2000) + --csv PATH Optional CSV output file for results + --charts Generate gnuplot charts (PNG files in /tmp or next to --csv) + --help Show this help + + Scenarios: + happy_path Sustained POST requests at --concurrency VUs for --duration seconds + ramp_up Ramp from 1 to --concurrency VUs over --duration seconds + saturation Ramp through concurrency levels, report per-step (find throughput ceiling) + large_payload POST with --payload-size bodies (default 1MB), check memory stays flat + large_response GET requests; mock sink returns large bodies. 
Reports memory + mixed_methods Rotate through GET, POST, PUT, PATCH, DELETE + slow_sink Sink with --delay ms (default 2000); measures TTFB and latency + direct_sink Hit mock sink directly (no Lightning), baseline measurement + + Note: Most scenarios require a named node (--sname) to connect to Lightning. + The direct_sink scenario does not require --sname or a running Lightning instance. + The --cookie flag on the elixir command sets the Erlang cookie. The + script also accepts --cookie in its own args as a convenience. + """ + + def parse(args) do + case parse_args(args, @defaults) do + :help -> + IO.puts(@help) + System.halt(0) + + {:error, message} -> + IO.puts(:stderr, "error: #{message}\n") + IO.puts(:stderr, @help) + System.halt(1) + + config -> + config + |> apply_defaults() + |> validate!() + end + end + + defp parse_args([], acc), do: acc + defp parse_args(["--help" | _], _acc), do: :help + + defp parse_args(["--target", value | rest], acc), + do: parse_args(rest, %{acc | target: String.trim_trailing(value, "/")}) + + defp parse_args(["--sink", value | rest], acc), + do: parse_args(rest, %{acc | sink: String.trim_trailing(value, "/")}) + + defp parse_args(["--node", value | rest], acc), + do: parse_args(rest, %{acc | node: value}) + + defp parse_args(["--cookie", value | rest], acc), + do: parse_args(rest, %{acc | cookie: value}) + + defp parse_args(["--channel", value | rest], acc), + do: parse_args(rest, %{acc | channel: value}) + + defp parse_args(["--scenario", value | rest], acc) do + if value in @scenarios do + parse_args(rest, %{acc | scenario: value}) + else + {:error, + "unknown scenario: #{value}. 
Expected one of: #{Enum.join(@scenarios, ", ")}"} + end + end + + defp parse_args(["--concurrency", value | rest], acc) do + case Integer.parse(value) do + {n, ""} when n > 0 -> parse_args(rest, %{acc | concurrency: n}) + _ -> {:error, "invalid concurrency: #{value}"} + end + end + + defp parse_args(["--duration", value | rest], acc) do + case Integer.parse(value) do + {n, ""} when n > 0 -> parse_args(rest, %{acc | duration: n}) + _ -> {:error, "invalid duration: #{value}"} + end + end + + defp parse_args(["--payload-size", value | rest], acc) do + case Integer.parse(value) do + {n, ""} when n > 0 -> parse_args(rest, %{acc | payload_size: n}) + _ -> {:error, "invalid payload-size: #{value}"} + end + end + + defp parse_args(["--response-size", value | rest], acc) do + case Integer.parse(value) do + {n, ""} when n > 0 -> parse_args(rest, %{acc | response_size: n}) + _ -> {:error, "invalid response-size: #{value}"} + end + end + + defp parse_args(["--delay", value | rest], acc) do + case Integer.parse(value) do + {n, ""} when n >= 0 -> parse_args(rest, %{acc | delay: n}) + _ -> {:error, "invalid delay: #{value}"} + end + end + + defp parse_args(["--csv", value | rest], acc), + do: parse_args(rest, %{acc | csv: value}) + + defp parse_args(["--charts" | rest], acc), + do: parse_args(rest, %{acc | charts: true}) + + defp parse_args([unknown | _], _acc), + do: {:error, "unknown option: #{unknown}"} + + defp apply_defaults(%{node: nil} = config) do + {:ok, hostname} = :inet.gethostname() + %{config | node: "lightning@#{hostname}"} + end + + defp apply_defaults(config), do: config + + defp apply_defaults_for_scenario( + %{scenario: "large_payload", payload_size: 1024} = config + ) do + %{config | payload_size: 1_048_576} + end + + defp apply_defaults_for_scenario( + %{scenario: "large_response", response_size: nil} = config + ) do + %{config | response_size: 1_048_576} + end + + defp apply_defaults_for_scenario(%{scenario: "slow_sink", delay: nil} = config) do + %{config | 
delay: 2_000} + end + + defp apply_defaults_for_scenario(config), do: config + + defp validate!(config) do + config = apply_defaults_for_scenario(config) + + if config.scenario == "direct_sink" or Node.alive?() do + config + else + IO.puts(:stderr, """ + error: This script must be run as a named Erlang node. + + Use: elixir --sname loadtest --cookie COOKIE benchmarking/channels/load_test.exs + """) + + System.halt(1) + end + end +end diff --git a/benchmarking/channels/lib/load_test/main.exs b/benchmarking/channels/lib/load_test/main.exs new file mode 100644 index 00000000000..2656a90cdba --- /dev/null +++ b/benchmarking/channels/lib/load_test/main.exs @@ -0,0 +1,172 @@ +# benchmarking/channels/lib/load_test/main.exs +# +# Orchestrator — ties together Config, Metrics, Setup, Runner, and Report. + +defmodule LoadTest do + @moduledoc false + + def main(args) do + # Capture the raw argv before parsing so we can reproduce the invocation + command = reconstruct_command(args) + + opts = LoadTest.Config.parse(args) + + opts = + if opts[:charts] do + prefix = LoadTest.Report.generate_output_prefix(opts) + Map.put(opts, :output_prefix, prefix) + else + opts + end + + # Start the Finch HTTP client pool + {:ok, _} = + Finch.start_link( + name: LoadTest.Finch, + pools: %{ + default: [size: opts[:concurrency], count: 1] + } + ) + + # Start the metrics collector + {:ok, _} = LoadTest.Metrics.start_link() + + # Pre-flight: verify mock sink is reachable + LoadTest.Setup.preflight_sink!(opts) + + direct? = opts[:scenario] == "direct_sink" + + # Build the target URL + {channel_url, node} = + if direct? 
do + {"#{opts[:sink]}/test", nil} + else + # Connect to the Lightning BEAM + node = LoadTest.Setup.connect!(opts) + + # Ensure test channel exists + channel = + LoadTest.Setup.ensure_channel!(node, opts) + + {"#{opts[:target]}/channels/#{channel.id}/test", node} + end + + # Append query params (?response_size=N&delay=N) when configured + channel_url = append_query_params(channel_url, opts) + + # Deploy telemetry collector (skip for direct_sink) + telemetry_ok? = + if not direct? and node do + LoadTest.Setup.deploy_telemetry_collector!(node) == :ok + else + false + end + + # Print test banner + duration_label = + if opts[:scenario] == "saturation", + do: "#{opts[:duration]}s (per step)", + else: "#{opts[:duration]}s" + + IO.puts(""" + + Starting load test... + URL: #{channel_url} + Scenario: #{opts[:scenario]} + Concurrency: #{opts[:concurrency]} VUs + Duration: #{duration_label} + Payload: #{opts[:payload_size]} bytes#{format_response_size(opts[:response_size])}#{format_delay(opts[:delay])} + Telemetry: #{if telemetry_ok?, do: "enabled", else: "disabled"} + Command: #{command} + """) + + # Run the scenario + if direct? do + LoadTest.Runner.run_direct(opts[:scenario], channel_url, opts) + print_standard_results(opts, command, node, telemetry_ok?) + else + result = LoadTest.Runner.run(opts[:scenario], channel_url, opts) + + if opts[:scenario] == "saturation" do + LoadTest.Report.print_saturation(result, opts, command) + + # Charts need a CSV to read from; auto-create one if --charts without --csv + opts_with_csv = + if opts[:charts] and is_nil(opts[:csv]), + do: Map.put(opts, :csv, opts[:output_prefix] <> ".csv"), + else: opts + + LoadTest.Report.write_saturation_csv(result, opts_with_csv) + + if opts[:charts] do + csv_path = opts_with_csv[:csv] + LoadTest.Report.write_saturation_charts(csv_path) + end + + if telemetry_ok? do + LoadTest.Setup.teardown_telemetry_collector!(node) + end + else + print_standard_results(opts, command, node, telemetry_ok?) 
+ end + end + end + + defp print_standard_results(opts, command, node, telemetry_ok?) do + summary = LoadTest.Metrics.summary() + LoadTest.Report.print(summary, opts, command) + + if telemetry_ok? do + telemetry = LoadTest.Setup.get_telemetry_summary(node) + LoadTest.Report.print_telemetry(telemetry) + LoadTest.Setup.teardown_telemetry_collector!(node) + end + + LoadTest.Report.write_csv(summary, opts) + + if opts[:charts] do + timeseries = LoadTest.Metrics.latency_timeseries() + LoadTest.Report.write_standard_charts(timeseries, opts) + end + end + + defp reconstruct_command(args) do + script = "benchmarking/channels/load_test.exs" + argv = Enum.join(args, " ") + + node_part = + case Node.self() do + :nonode@nohost -> "" + node -> " --sname #{node}" + end + + cookie_part = + case Node.get_cookie() do + :nocookie -> "" + cookie -> " --cookie #{cookie}" + end + + "elixir#{node_part}#{cookie_part} #{script} #{argv}" + |> String.trim() + end + + defp append_query_params(url, opts) do + params = + [ + if(opts[:response_size], do: "response_size=#{opts[:response_size]}"), + if(opts[:delay], do: "delay=#{opts[:delay]}") + ] + |> Enum.reject(&is_nil/1) + + case params do + [] -> url + parts -> "#{url}?#{Enum.join(parts, "&")}" + end + end + + defp format_response_size(nil), do: "" + defp format_response_size(n), do: "\n Response: #{n} bytes" + + defp format_delay(nil), do: "" + defp format_delay(n), do: "\n Delay: #{n}ms" +end diff --git a/benchmarking/channels/lib/load_test/metrics.exs b/benchmarking/channels/lib/load_test/metrics.exs new file mode 100644 index 00000000000..ee4d76b64d0 --- /dev/null +++ b/benchmarking/channels/lib/load_test/metrics.exs @@ -0,0 +1,163 @@ +# benchmarking/channels/lib/load_test/metrics.exs +# +# Agent-based metrics collector for request latencies, status codes, +# errors, and BEAM memory samples. 
+ +defmodule LoadTest.Metrics do + @moduledoc false + + use Agent + + def start_link do + Agent.start_link( + fn -> + %{ + latencies: [], + status_codes: %{}, + errors: 0, + error_reasons: %{}, + memory_samples: [], + start_time: System.monotonic_time(:microsecond) + } + end, + name: __MODULE__ + ) + end + + def record_request(latency_us, status) do + Agent.update(__MODULE__, fn state -> + elapsed_us = System.monotonic_time(:microsecond) - state.start_time + + state + |> Map.update!(:latencies, &[{elapsed_us, latency_us} | &1]) + |> Map.update!(:status_codes, fn codes -> + Map.update(codes, status, 1, &(&1 + 1)) + end) + end) + end + + def record_error(reason) do + key = inspect(reason) + + Agent.update(__MODULE__, fn state -> + state + |> Map.update!(:errors, &(&1 + 1)) + |> Map.update!(:error_reasons, fn reasons -> + Map.update(reasons, key, 1, &(&1 + 1)) + end) + end) + end + + def reset do + Agent.update(__MODULE__, fn state -> + %{ + state + | latencies: [], + status_codes: %{}, + errors: 0, + error_reasons: %{}, + start_time: System.monotonic_time(:microsecond) + } + end) + end + + def record_memory(bytes) do + timestamp = System.monotonic_time(:microsecond) + + Agent.update(__MODULE__, fn state -> + Map.update!(state, :memory_samples, &[{timestamp, bytes} | &1]) + end) + end + + def summary do + Agent.get(__MODULE__, fn state -> + now = System.monotonic_time(:microsecond) + elapsed_us = now - state.start_time + elapsed_s = max(elapsed_us / 1_000_000, 0.001) + + latencies = state.latencies |> Enum.map(&elem(&1, 1)) |> Enum.sort() + total = length(latencies) + + memory_samples = + state.memory_samples + |> Enum.reverse() + + %{ + total_requests: total, + rps: if(total > 0, do: Float.round(total / elapsed_s, 1), else: 0.0), + error_count: state.errors, + error_rate: + if(total + state.errors > 0, + do: Float.round(state.errors / (total + state.errors) * 100, 1), + else: 0.0 + ), + error_reasons: state.error_reasons, + status_codes: state.status_codes, + p50: 
percentile(latencies, 50), + p95: percentile(latencies, 95), + p99: percentile(latencies, 99), + min: List.first(latencies, 0), + max: List.last(latencies, 0), + duration_s: Float.round(elapsed_s, 1), + memory_start: memory_start(memory_samples), + memory_end: memory_end(memory_samples), + memory_max: memory_max(memory_samples), + memory_delta: memory_delta(memory_samples) + } + end) + end + + def latency_timeseries(bucket_s \\ 1.0) do + Agent.get(__MODULE__, fn state -> + bucket_us = round(bucket_s * 1_000_000) + + state.latencies + |> Enum.group_by(fn {elapsed_us, _lat} -> div(elapsed_us, bucket_us) end) + |> Enum.sort_by(fn {bucket, _} -> bucket end) + |> Enum.map(fn {bucket, entries} -> + lats = entries |> Enum.map(&elem(&1, 1)) |> Enum.sort() + + %{ + t: Float.round((bucket + 1) * bucket_s, 1), + p50: percentile(lats, 50), + p95: percentile(lats, 95), + p99: percentile(lats, 99), + count: length(lats) + } + end) + end) + end + + defp percentile([], _p), do: 0 + + defp percentile(sorted, p) do + k = max(round(length(sorted) * p / 100) - 1, 0) + Enum.at(sorted, k, 0) + end + + defp memory_start([{_ts, bytes} | _]), do: bytes + defp memory_start([]), do: nil + + defp memory_end(samples) do + case List.last(samples) do + {_ts, bytes} -> bytes + nil -> nil + end + end + + defp memory_max(samples) do + case samples do + [] -> nil + _ -> samples |> Enum.map(&elem(&1, 1)) |> Enum.max() + end + end + + defp memory_delta(samples) do + with {_ts1, start_bytes} <- List.first(samples), + {_ts2, end_bytes} <- List.last(samples) do + end_bytes - start_bytes + else + _ -> nil + end + end +end diff --git a/benchmarking/channels/lib/load_test/report.exs b/benchmarking/channels/lib/load_test/report.exs new file mode 100644 index 00000000000..3215a79f423 --- /dev/null +++ b/benchmarking/channels/lib/load_test/report.exs @@ -0,0 +1,474 @@ +# benchmarking/channels/lib/load_test/report.exs +# +# Formats and prints metrics summary, including telemetry timing breakdown. 
defmodule LoadTest.Report do
  @moduledoc false

  # Renders load-test output: terminal result boxes, server-side telemetry
  # tables, CSV rows, and gnuplot chart scripts. Latency values are in
  # microseconds unless a formatting helper converts them for display.

  # Box-drawing rules used by the report layouts. Narrow rules frame the
  # standard per-scenario box; wide rules frame the saturation tables.
  @heavy_rule String.duplicate("\u2550", 47)
  @light_rule String.duplicate("\u2500", 47)
  @wide_heavy_rule String.duplicate("\u2550", 67)
  @wide_light_rule String.duplicate("\u2500", 67)

  @doc """
  Print the results box for a single-scenario run.

  `summary` is the aggregate produced by the metrics collector; `command`
  is the original CLI invocation echoed for reproducibility (may be `nil`).
  """
  def print(summary, opts, command \\ nil) do
    # direct_sink bypasses Lightning entirely, so memory stats don't apply
    # and the scenario is labelled as the baseline.
    direct? = opts[:scenario] == "direct_sink"

    scenario_label =
      if direct?, do: "#{opts[:scenario]} (baseline)", else: opts[:scenario]

    memory_section = format_memory_section(summary, direct?)

    IO.puts("""

    #{@heavy_rule}
     Channel Load Test Results
    #{@heavy_rule}
     Scenario: #{scenario_label}
     Concurrency: #{opts[:concurrency]} VUs
     Duration: #{summary.duration_s}s
     Command: #{command}
    #{@light_rule}
     Requests: #{summary.total_requests}
     Throughput: #{summary.rps} req/s
     Errors: #{summary.error_count} (#{summary.error_rate}%)
    #{format_status_codes(summary.status_codes)}\
    #{format_error_reasons(summary.error_reasons)}\
    #{@light_rule}
     Latency:
       p50: #{format_us(summary.p50)}
       p95: #{format_us(summary.p95)}
       p99: #{format_us(summary.p99)}
       min: #{format_us(summary.min)}
       max: #{format_us(summary.max)}
    #{@light_rule}
    #{memory_section}\
    #{@heavy_rule}
    """)
  end

  @doc """
  Print telemetry timing breakdown from the remote collector.
  Shows server-side timing for the channel proxy pipeline.
  """
  def print_telemetry(nil), do: :ok

  def print_telemetry(telemetry) when map_size(telemetry) == 0 do
    IO.puts("""
     Channel Proxy Timing (server-side):
       No telemetry data collected
    """)
  end

  def print_telemetry(telemetry) do
    # Display order: total request first, then its sub-spans (indented).
    rows = [
      {:request, "Total request", Map.get(telemetry, :request)},
      {:fetch_channel, "  DB lookup", Map.get(telemetry, :fetch_channel)},
      {:upstream, "  Upstream proxy", Map.get(telemetry, :upstream)}
    ]

    lines =
      rows
      |> Enum.filter(fn {_key, _label, stats} -> stats != nil end)
      |> Enum.map_join("\n", fn {_key, label, stats} ->
        "   #{String.pad_trailing(label, 18)} " <>
          "p50=#{format_us(stats.p50)}, " <>
          "p95=#{format_us(stats.p95)}, " <>
          "p99=#{format_us(stats.p99)}, " <>
          "n=#{stats.count}"
      end)

    IO.puts("""
    #{@light_rule}
     Channel Proxy Timing (server-side):
    #{lines}
    """)
  end

  # -- Saturation output --

  @doc """
  Print the per-step results table for the saturation scenario, plus a
  server-side timing table when any step collected telemetry.
  """
  def print_saturation(results, opts, command \\ nil) do
    header = """

    #{@wide_heavy_rule}
     Saturation Test Results
    #{@wide_heavy_rule}
     Max VUs: #{opts[:concurrency]}
     Per-step: #{opts[:duration]}s
     Steps: #{length(results)}
     Command: #{command}
    #{@wide_light_rule}
    """

    table_header =
      " #{pad_r("VUs", 6)} #{pad_r("RPS", 9)} #{pad_r("Reqs", 8)} " <>
        "#{pad_r("Errors", 8)} #{pad_r("p50", 10)} #{pad_r("p95", 10)} #{pad_r("p99", 10)}"

    separator =
      " #{String.duplicate("\u2500", 5)} " <>
        "#{String.duplicate("\u2500", 8)} " <>
        "#{String.duplicate("\u2500", 7)} " <>
        "#{String.duplicate("\u2500", 7)} " <>
        "#{String.duplicate("\u2500", 9)} " <>
        "#{String.duplicate("\u2500", 9)} " <>
        "#{String.duplicate("\u2500", 9)}"

    rows =
      Enum.map_join(results, "\n", fn %{concurrency: c, summary: s} ->
        " #{pad_r(to_string(c), 6)} " <>
          "#{pad_r(to_string(s.rps), 9)} " <>
          "#{pad_r(to_string(s.total_requests), 8)} " <>
          "#{pad_r(to_string(s.error_count), 8)} " <>
          "#{pad_r(format_us(s.p50), 10)} " <>
          "#{pad_r(format_us(s.p95), 10)} " <>
          "#{pad_r(format_us(s.p99), 10)}"
      end)

    IO.puts(header)
    IO.puts(table_header)
    IO.puts(separator)
    IO.puts(rows)
    IO.puts(" #{@wide_heavy_rule}")

    # Print server-side telemetry table only when at least one step has it.
    if Enum.any?(results, fn %{telemetry: t} -> t != nil and map_size(t) > 0 end) do
      print_saturation_telemetry(results)
    end
  end

  defp print_saturation_telemetry(results) do
    IO.puts("""

    #{@wide_light_rule}
     Server-side Timing (per step):
    """)

    for %{concurrency: c, telemetry: t} <- results, t != nil, map_size(t) > 0 do
      request = Map.get(t, :request)
      db = Map.get(t, :fetch_channel)
      upstream = Map.get(t, :upstream)

      parts =
        [
          if(request,
            do: "req p50=#{format_us(request.p50)}/p95=#{format_us(request.p95)}"
          ),
          if(db, do: "db p50=#{format_us(db.p50)}"),
          if(upstream, do: "up p50=#{format_us(upstream.p50)}")
        ]
        |> Enum.reject(&is_nil/1)
        |> Enum.join(", ")

      IO.puts("   #{pad_r("#{c} VUs:", 10)} #{parts}")
    end
  end

  @doc """
  Append one CSV row per saturation step to `opts[:csv]` (no-op when unset).
  A header row is written only when the file does not exist yet.
  """
  def write_saturation_csv(results, opts) do
    case opts[:csv] do
      nil ->
        :ok

      path ->
        IO.write("Writing saturation CSV to #{path}... ")

        header =
          "step,concurrency,duration_s,requests,rps,errors,error_rate," <>
            "p50_us,p95_us,p99_us,min_us,max_us,memory_end_bytes," <>
            "server_request_p50_us,server_request_p95_us," <>
            "server_db_p50_us,server_upstream_p50_us\n"

        rows =
          results
          |> Enum.with_index(1)
          |> Enum.map_join("\n", fn {%{concurrency: c, summary: s, telemetry: t}, step} ->
            tel_request = if t, do: Map.get(t, :request)
            tel_db = if t, do: Map.get(t, :fetch_channel)
            tel_upstream = if t, do: Map.get(t, :upstream)

            [
              step,
              c,
              s.duration_s,
              s.total_requests,
              s.rps,
              s.error_count,
              s.error_rate,
              s.p50,
              s.p95,
              s.p99,
              s.min,
              s.max,
              s.memory_end || "",
              if(tel_request, do: tel_request.p50, else: ""),
              if(tel_request, do: tel_request.p95, else: ""),
              if(tel_db, do: tel_db.p50, else: ""),
              if(tel_upstream, do: tel_upstream.p50, else: "")
            ]
            |> Enum.join(",")
          end)

        append_csv(path, header, rows <> "\n")
        IO.puts("ok")
    end
  end

  @doc """
  Write a gnuplot script for the saturation CSV and render throughput and
  latency charts from it (if gnuplot is installed).
  """
  def write_saturation_charts(csv_path) do
    basename = Path.rootname(csv_path)
    script_path = basename <> ".gnuplot"
    throughput_png = basename <> "_throughput.png"
    latency_png = basename <> "_latency.png"

    # Build the latency plot with gnuplot line continuations.
    # Can't use \\ at end-of-line in heredocs (Elixir treats \ as continuation),
    # so we concatenate the pieces explicitly.
    latency_plot =
      "plot '#{csv_path}' every ::1 using 2:($8/1000) with linespoints pt 7 lw 2 title 'p50', \\\n" <>
        "     '' every ::1 using 2:($9/1000) with linespoints pt 5 lw 2 title 'p95', \\\n" <>
        "     '' every ::1 using 2:($10/1000) with linespoints pt 9 lw 2 title 'p99'\n"

    script =
      """
      set datafile separator ','
      set terminal pngcairo size 1024,600 enhanced font 'sans,11'
      set grid
      set xlabel 'Concurrency (VUs)'

      # Chart 1: Throughput
      set output '#{throughput_png}'
      set title 'Saturation: Throughput vs Concurrency'
      set ylabel 'Throughput (req/s)'
      plot '#{csv_path}' every ::1 using 2:5 with linespoints pt 7 ps 1.2 lw 2 title 'RPS'

      # Chart 2: Latency
      set output '#{latency_png}'
      set title 'Saturation: Latency vs Concurrency'
      set ylabel 'Latency (ms)'
      set key top left
      """ <> latency_plot

    File.write!(script_path, script)

    run_gnuplot(script_path, [throughput_png, latency_png])
  end

  @doc """
  Write a time-series latency/throughput chart for a standard run.
  The data points are embedded in the gnuplot script as an inline datablock.
  """
  def write_standard_charts(timeseries, opts) do
    prefix = opts[:output_prefix] || Path.rootname(opts[:csv])

    script_path = prefix <> "_latency.gnuplot"
    latency_png = prefix <> "_latency.png"

    # Datablock columns: t p50_ms p95_ms p99_ms rps
    data_lines =
      Enum.map_join(timeseries, "\n", fn %{t: t, p50: p50, p95: p95, p99: p99, count: count} ->
        p50_ms = Float.round(p50 / 1_000, 2)
        p95_ms = Float.round(p95 / 1_000, 2)
        p99_ms = Float.round(p99 / 1_000, 2)
        "#{t} #{p50_ms} #{p95_ms} #{p99_ms} #{count}"
      end)

    title =
      "#{opts[:scenario]}: Throughput & Latency (#{opts[:concurrency]} VUs)"

    # Build the plot command with gnuplot line continuations — same heredoc
    # backslash caveat as the saturation charts, so concatenate explicitly.
    # RPS as filled area on y2 (background), latency lines on y1 (foreground).
    plot_cmd =
      "plot $DATA using 1:5 axes x1y2 with filledcurves x1 fs transparent solid 0.15 lc rgb '#999999' title 'RPS', \\\n" <>
        "     '' using 1:2 axes x1y1 with linespoints pt 7 lw 2 title 'p50', \\\n" <>
        "     '' using 1:3 axes x1y1 with linespoints pt 5 lw 2 title 'p95', \\\n" <>
        "     '' using 1:4 axes x1y1 with linespoints pt 9 lw 2 title 'p99'\n"

    script =
      """
      $DATA << EOD
      #{data_lines}
      EOD

      set terminal pngcairo size 1024,600 enhanced font 'sans,11'
      set output '#{latency_png}'
      set title '#{title}'
      set xlabel 'Time (s)'
      set ylabel 'Latency (ms)'
      set y2label 'Throughput (req/s)'
      set y2tics
      set ytics nomirror
      set grid
      set key top left
      """ <> plot_cmd

    File.write!(script_path, script)

    run_gnuplot(script_path, [latency_png])
  end

  @doc """
  Default output path prefix: /tmp/loadtest_<scenario>_<UTC timestamp>.
  """
  def generate_output_prefix(opts) do
    timestamp = Calendar.strftime(DateTime.utc_now(), "%Y%m%d_%H%M%S")
    "/tmp/loadtest_#{opts[:scenario]}_#{timestamp}"
  end

  # Run gnuplot on `script_path`, printing chart paths on success or
  # manual-run instructions when gnuplot is missing or fails.
  defp run_gnuplot(script_path, png_paths) do
    case System.cmd("gnuplot", [script_path], stderr_to_stdout: true) do
      {_, 0} ->
        [first | rest] = png_paths
        IO.puts("Charts: #{first}")
        Enum.each(rest, fn path -> IO.puts("        #{path}") end)

      {output, _} ->
        IO.puts("gnuplot script: #{script_path}")
        IO.puts("  Run manually: gnuplot #{script_path}")

        if output =~ "not found" or output =~ "No such file" do
          IO.puts("  Install: pacman -S gnuplot (or apt install gnuplot)")
        end
    end
  rescue
    # System.cmd raises ErlangError (:enoent) when the gnuplot binary is absent.
    ErlangError ->
      IO.puts("gnuplot script: #{script_path}")

      IO.puts(
        "  gnuplot not found — install: pacman -S gnuplot (or apt install gnuplot)"
      )
  end

  # -- Standard CSV --

  @doc """
  Append one summary row to `opts[:csv]` (no-op when unset). A header row
  is written only when the file does not exist yet.
  """
  def write_csv(summary, opts) do
    case opts[:csv] do
      nil ->
        :ok

      path ->
        IO.write("Writing CSV to #{path}... ")

        header =
          "scenario,concurrency,duration_s,total_requests,rps," <>
            "error_count,error_rate,p50_us,p95_us,p99_us,min_us,max_us," <>
            "memory_start_bytes,memory_end_bytes,memory_max_bytes,memory_delta_bytes\n"

        row =
          [
            opts[:scenario],
            opts[:concurrency],
            summary.duration_s,
            summary.total_requests,
            summary.rps,
            summary.error_count,
            summary.error_rate,
            summary.p50,
            summary.p95,
            summary.p99,
            summary.min,
            summary.max,
            summary.memory_start || "",
            summary.memory_end || "",
            summary.memory_max || "",
            summary.memory_delta || ""
          ]
          |> Enum.join(",")

        append_csv(path, header, row <> "\n")
        IO.puts("ok")
    end
  end

  # Shared CSV sink. Checks for the file exactly once (the previous code
  # checked File.exists?/1 twice — once for the content, once for the write
  # mode — a needless duplicate with a race window between the two checks),
  # writing the header only for a fresh file and appending otherwise.
  defp append_csv(path, header, rows) do
    if File.exists?(path) do
      File.write!(path, rows, [:append])
    else
      File.write!(path, header <> rows, [:write])
    end
  end

  # -- Formatting helpers --

  # Memory is meaningless for the direct-sink baseline (no Lightning BEAM).
  defp format_memory_section(_summary, true = _direct?) do
    " Memory: n/a (direct sink baseline)"
  end

  defp format_memory_section(summary, _direct?) do
    """
     Memory (Lightning BEAM):
       start: #{format_bytes(summary.memory_start)}
       end: #{format_bytes(summary.memory_end)}
       max: #{format_bytes(summary.memory_max)}
       delta: #{format_bytes_delta(summary.memory_delta)}\
    """
  end

  # Render a microsecond latency with an adaptive unit (0 means "no data").
  defp format_us(0), do: "n/a"

  defp format_us(us) when us < 1_000,
    do: "#{us}us"

  defp format_us(us) when us < 1_000_000,
    do: "#{Float.round(us / 1_000, 1)}ms"

  defp format_us(us),
    do: "#{Float.round(us / 1_000_000, 2)}s"

  defp format_bytes(nil), do: "n/a"

  defp format_bytes(bytes) when bytes < 1_024,
    do: "#{bytes} B"

  defp format_bytes(bytes) when bytes < 1_048_576,
    do: "#{Float.round(bytes / 1_024, 1)} KB"

  defp format_bytes(bytes),
    do: "#{Float.round(bytes / 1_048_576, 1)} MB"

  # Signed byte delta, e.g. "+1.5 MB" / "-3.2 KB".
  defp format_bytes_delta(nil), do: "n/a"
  defp format_bytes_delta(0), do: "0 B"

  defp format_bytes_delta(bytes) when bytes > 0,
    do: "+#{format_bytes(bytes)}"

  defp format_bytes_delta(bytes),
    do: "-#{format_bytes(abs(bytes))}"

  defp format_status_codes(codes) when map_size(codes) == 0, do: ""

  defp format_status_codes(codes) do
    lines =
      codes
      |> Enum.sort_by(fn {code, _} -> code end)
      |> Enum.map_join("\n", fn {code, count} -> "   #{code}: #{count}" end)

    " Status codes:\n#{lines}\n"
  end

  defp format_error_reasons(reasons) when map_size(reasons) == 0, do: ""

  # Show at most the five most frequent error reasons.
  defp format_error_reasons(reasons) do
    lines =
      reasons
      |> Enum.sort_by(fn {_, count} -> -count end)
      |> Enum.take(5)
      |> Enum.map_join("\n", fn {reason, count} -> "   #{reason}: #{count}" end)

    " Top errors:\n#{lines}\n"
  end

  defp pad_r(str, width), do: String.pad_trailing(str, width)
end

# benchmarking/channels/lib/load_test/runner.exs
#
# Executes test scenarios: steady-state, ramp-up, and direct sink.
defmodule LoadTest.Runner do
  @moduledoc false

  # Drives traffic for each scenario: steady-state (constant VUs), ramp-up
  # (stepwise-increasing VUs), saturation (stepped VUs with per-step metrics),
  # and direct-sink (baseline without Lightning in the path).

  @methods [:get, :post, :put, :patch, :delete]

  # Concurrency steps used by the saturation scenario (capped at --concurrency).
  @saturation_levels [1, 2, 5, 10, 20, 50, 100, 200, 500, 1000]

  @doc """
  Run `scenario` against `channel_url`. Saturation returns its per-step
  results list; the other scenarios report through `LoadTest.Metrics`.
  """
  def run(scenario, channel_url, opts) do
    duration_ms = opts[:duration] * 1_000
    node = String.to_atom(opts[:node])

    # Saturation manages its own memory sampling and returns per-step results.
    if scenario == "saturation" do
      run_saturation(channel_url, opts)
    else
      # Sample the Lightning BEAM's memory in the background for the whole run.
      memory_task =
        Task.async(fn -> sample_memory_loop(node, duration_ms) end)

      case scenario do
        "ramp_up" -> run_ramp_up(channel_url, opts, duration_ms)
        _ -> run_steady(scenario, channel_url, opts, duration_ms)
      end

      Task.await(memory_task, :infinity)
    end
  end

  @doc """
  Run `scenario` directly against the sink (baseline). No memory sampling —
  there is no Lightning BEAM to sample.
  """
  def run_direct(scenario, channel_url, opts) do
    duration_ms = opts[:duration] * 1_000
    run_steady(scenario, channel_url, opts, duration_ms)
  end

  # -- Steady-state scenarios (constant concurrency) --

  defp run_steady(scenario, channel_url, opts, duration_ms) do
    payload = generate_payload(opts[:payload_size])
    run_work_loop(scenario, channel_url, payload, opts, duration_ms, opts[:concurrency])
  end

  # -- Ramp-up scenario (increasing concurrency) --

  defp run_ramp_up(channel_url, opts, duration_ms) do
    max_concurrency = opts[:concurrency]
    payload = generate_payload(opts[:payload_size])

    # Divide duration into 10 steps, each at increasing concurrency.
    steps = 10
    step_duration_ms = div(duration_ms, steps)

    for step <- 1..steps do
      concurrency = max(1, div(max_concurrency * step, steps))

      IO.puts(
        "  [ramp] Step #{step}/#{steps}: #{concurrency} VUs for #{div(step_duration_ms, 1000)}s"
      )

      run_work_loop("happy_path", channel_url, payload, opts, step_duration_ms, concurrency)
    end
  end

  # -- Saturation scenario (increasing concurrency with per-step metrics) --

  defp run_saturation(channel_url, opts) do
    max_concurrency = opts[:concurrency]
    duration_ms = opts[:duration] * 1_000
    payload = generate_payload(opts[:payload_size])
    node = String.to_atom(opts[:node])

    # Step sequence: standard levels up to the requested max; always
    # include the max itself.
    steps =
      @saturation_levels
      |> Enum.filter(&(&1 <= max_concurrency))
      |> then(fn levels ->
        if max_concurrency in levels,
          do: levels,
          else: levels ++ [max_concurrency]
      end)

    IO.puts("  Steps: #{inspect(steps)}\n")

    # One memory sampler spans the full duration of all steps.
    total_duration_ms = duration_ms * length(steps)

    memory_task =
      Task.async(fn -> sample_memory_loop(node, total_duration_ms) end)

    results =
      steps
      |> Enum.with_index(1)
      |> Enum.map(fn {concurrency, step_num} ->
        # Fresh client metrics and server-side telemetry for each step.
        LoadTest.Metrics.reset()
        LoadTest.Setup.reset_telemetry_collector(node)

        IO.write(
          "  [saturation] Step #{step_num}/#{length(steps)}: " <>
            "#{concurrency} VUs for #{opts[:duration]}s... "
        )

        run_work_loop("happy_path", channel_url, payload, opts, duration_ms, concurrency)

        # Capture this step's results before the next reset.
        summary = LoadTest.Metrics.summary()
        telemetry = LoadTest.Setup.get_telemetry_summary(node)

        IO.puts(
          "#{summary.rps} rps, p50=#{format_us_inline(summary.p50)}, " <>
            "#{summary.error_count} errors"
        )

        %{concurrency: concurrency, summary: summary, telemetry: telemetry}
      end)

    Task.await(memory_task, :infinity)
    results
  end

  defp format_us_inline(0), do: "n/a"
  defp format_us_inline(us) when us < 1_000, do: "#{us}us"

  defp format_us_inline(us) when us < 1_000_000,
    do: "#{Float.round(us / 1_000, 1)}ms"

  defp format_us_inline(us), do: "#{Float.round(us / 1_000_000, 2)}s"

  # -- Work loop --

  # Shared request loop: execute requests at `concurrency` workers until
  # `duration_ms` elapses, recording each outcome. This pipeline was
  # previously duplicated verbatim across the steady, ramp-up, and
  # saturation scenarios.
  defp run_work_loop(scenario, channel_url, payload, opts, duration_ms, concurrency) do
    work_stream(duration_ms)
    |> Task.async_stream(
      fn _tick ->
        execute_request(scenario, channel_url, payload, opts)
      end,
      max_concurrency: concurrency,
      timeout: 60_000
    )
    |> Stream.each(fn
      {:ok, {latency_us, status}} ->
        LoadTest.Metrics.record_request(latency_us, status)

      {:exit, reason} ->
        LoadTest.Metrics.record_error(reason)
    end)
    |> Stream.run()
  end

  # Emits work ticks until `duration_ms` has elapsed (monotonic deadline).
  defp work_stream(duration_ms) do
    deadline = System.monotonic_time(:millisecond) + duration_ms

    Stream.repeatedly(fn -> :go end)
    |> Stream.take_while(fn _ ->
      System.monotonic_time(:millisecond) < deadline
    end)
  end

  # -- Request execution --

  defp execute_request(scenario, channel_url, payload, _opts) do
    method = pick_method(scenario)
    start = System.monotonic_time(:microsecond)

    case do_http_request(method, channel_url, payload) do
      {:ok, status} ->
        latency_us = System.monotonic_time(:microsecond) - start
        {latency_us, status}

      {:error, reason} ->
        LoadTest.Metrics.record_error(reason)
        # Return a synthetic latency so the caller does not crash.
        latency_us = System.monotonic_time(:microsecond) - start
        {latency_us, :error}
    end
  end

  defp pick_method("happy_path"), do: :post
  defp pick_method("ramp_up"), do: :post
  defp pick_method("saturation"), do: :post
  defp pick_method("large_payload"), do: :post
  defp pick_method("large_response"), do: :get
  defp pick_method("slow_sink"), do: :post
  defp pick_method("direct_sink"), do: :post

  defp pick_method("mixed_methods") do
    Enum.random(@methods)
  end

  defp do_http_request(method, url, payload) do
    # Only body-carrying methods send the payload.
    body = if method in [:post, :put, :patch], do: payload, else: nil
    headers = [{"content-type", "application/json"}]

    request = Finch.build(method, url, headers, body)

    case Finch.request(request, LoadTest.Finch, receive_timeout: 30_000) do
      {:ok, %Finch.Response{status: status}} -> {:ok, status}
      {:error, reason} -> {:error, reason}
    end
  end

  # -- Payload generation --

  # Build a JSON payload of (approximately) `size` bytes by padding a "data"
  # field; oversize results are truncated, which may yield invalid JSON —
  # acceptable for load generation.
  defp generate_payload(size) do
    base =
      Jason.encode!(%{
        timestamp: DateTime.to_iso8601(DateTime.utc_now()),
        source: "load_test"
      })

    base_size = byte_size(base)

    cond do
      base_size >= size ->
        binary_part(base, 0, size)

      true ->
        # Account for the JSON wrapper around the padding:
        # {"timestamp":"...","source":"...","data":"PADDING"}
        padding_needed = size - base_size - 11

        padding =
          if padding_needed > 0,
            do: String.duplicate("x", padding_needed),
            else: ""

        result =
          Jason.encode!(%{
            timestamp: DateTime.to_iso8601(DateTime.utc_now()),
            source: "load_test",
            data: padding
          })

        # Trim to exact size if JSON overhead caused overshoot.
        if byte_size(result) > size do
          binary_part(result, 0, size)
        else
          result
        end
    end
  end

  # -- Memory sampling --

  # Sample :erlang.memory(:total) on the remote node once per second until
  # the deadline, then take a final sample.
  defp sample_memory_loop(node, duration_ms) do
    deadline = System.monotonic_time(:millisecond) + duration_ms

    Stream.repeatedly(fn ->
      Process.sleep(1_000)
      :sample
    end)
    |> Stream.take_while(fn _ ->
      System.monotonic_time(:millisecond) < deadline
    end)
    |> Enum.each(fn _ -> record_remote_memory(node) end)

    # Final sample at the end of the window.
    record_remote_memory(node)
  end

  # RPC failures (:badrpc or any unexpected return) are skipped so a briefly
  # unreachable node cannot crash the sampler task — the previous case had
  # no catch-all clause.
  defp record_remote_memory(node) do
    case :rpc.call(node, :erlang, :memory, [:total]) do
      bytes when is_integer(bytes) -> LoadTest.Metrics.record_memory(bytes)
      _other -> :ok
    end
  end
end

# benchmarking/channels/lib/load_test/setup.exs
#
# Connect to Lightning BEAM, find/create channel, and manage telemetry
# collector deployment on the remote node.

defmodule LoadTest.Setup do
  @moduledoc false

  # Telemetry event prefixes whose :stop events the remote collector records.
  @telemetry_events [
    [:lightning, :channel_proxy, :request],
    [:lightning, :channel_proxy, :fetch_channel],
    [:lightning, :channel_proxy, :upstream]
  ]

  # -- Connection & channel setup --

  @doc """
  Connect to the Lightning node named in `opts[:node]`, setting the
  distribution cookie first when `opts[:cookie]` is given. Halts the VM
  with guidance on failure.
  """
  def connect!(opts) do
    node = String.to_atom(opts[:node])

    if opts[:cookie] do
      Node.set_cookie(String.to_atom(opts[:cookie]))
    end

    IO.write("Connecting to #{node}... ")

    case Node.connect(node) do
      true ->
        IO.puts("ok")
        node

      false ->
        IO.puts(:stderr, "\nerror: Could not connect to #{node}")

        IO.puts(:stderr, """

        Make sure:
          1. Lightning is running as a named node (e.g. --sname lightning)
          2. The cookie matches (e.g. --cookie SECRET)
          3. Both nodes are on the same network/machine
        """)

        System.halt(1)

      :ignored ->
        IO.puts(
          :stderr,
          "\nerror: Node.connect returned :ignored. Is this node alive?"
        )

        System.halt(1)
    end
  end

  @doc """
  Find the load-test channel by name, creating it (in the "load-test"
  project) or enabling it as needed. Returns the channel struct.
  """
  def ensure_channel!(node, opts) do
    channel_name = opts[:channel]
    sink_url = opts[:sink]

    IO.write("Looking up channel '#{channel_name}'... ")

    case rpc!(node, Lightning.Repo, :get_by, [
           Lightning.Channels.Channel,
           [name: channel_name]
         ]) do
      nil ->
        IO.puts("not found, creating")
        project = ensure_project!(node)
        create_channel!(node, channel_name, sink_url, project.id)

      %{enabled: false} = channel ->
        IO.puts("found (disabled), enabling")
        enable_channel!(node, channel)

      channel ->
        IO.puts("found (id: #{short_id(channel.id)})")
        channel
    end
  end

  @doc """
  Verify the mock sink answers HTTP before the run starts; halts with
  instructions when it is unreachable.
  """
  def preflight_sink!(opts) do
    sink_url = opts[:sink]
    IO.write("Checking mock sink at #{sink_url}... ")

    request = Finch.build(:get, sink_url)

    case Finch.request(request, LoadTest.Finch, receive_timeout: 5_000) do
      {:ok, %Finch.Response{status: status}} when status < 500 ->
        IO.puts("ok (status #{status})")

      {:ok, %Finch.Response{status: status}} ->
        IO.puts(:stderr, "\nwarning: Mock sink returned #{status}")

      {:error, reason} ->
        IO.puts(:stderr, "\nerror: Could not reach mock sink at #{sink_url}")
        IO.puts(:stderr, "  Reason: #{inspect(reason)}")

        IO.puts(:stderr, """

        Start the mock sink first:
          elixir benchmarking/channels/mock_sink.exs
        """)

        System.halt(1)
    end
  end

  # -- Telemetry collector deployment --

  @doc """
  Deploy the Bench.TelemetryCollector onto the remote Lightning node.
  Reads the collector source, evals it on the remote node, and starts it.
  """
  def deploy_telemetry_collector!(node) do
    IO.write("Deploying telemetry collector... ")

    # __ENV__.file is .../lib/load_test/setup.exs — go up 2 to .../lib/
    lib_dir = Path.dirname(Path.dirname(__ENV__.file))
    source = File.read!(Path.join(lib_dir, "telemetry_collector.exs"))

    case :rpc.call(node, Code, :eval_string, [source]) do
      {:badrpc, reason} ->
        IO.puts(:stderr, "\nwarning: Failed to deploy telemetry collector")
        IO.puts(:stderr, "  Reason: #{inspect(reason)}")
        :error

      _ ->
        case :rpc.call(node, Bench.TelemetryCollector, :start, [
               @telemetry_events
             ]) do
          {:ok, _pid} ->
            IO.puts("ok")
            :ok

          {:badrpc, reason} ->
            IO.puts(:stderr, "\nwarning: Failed to start telemetry collector")
            IO.puts(:stderr, "  Reason: #{inspect(reason)}")
            :error
        end
    end
  end

  @doc """
  Fetch the telemetry summary from the remote node.
  Returns a map of event_key => stats, or nil on failure.
  """
  def get_telemetry_summary(node) do
    case :rpc.call(node, Bench.TelemetryCollector, :summary, []) do
      {:badrpc, _reason} -> nil
      summary when is_map(summary) -> summary
    end
  end

  @doc """
  Reset the telemetry collector on the remote node (between saturation steps).
  """
  def reset_telemetry_collector(node) do
    case :rpc.call(node, Bench.TelemetryCollector, :reset, []) do
      {:badrpc, _} -> :error
      :ok -> :ok
    end
  end

  @doc """
  Stop and clean up the telemetry collector on the remote node.
  """
  def teardown_telemetry_collector!(node) do
    IO.write("Tearing down telemetry collector... ")

    case :rpc.call(node, Bench.TelemetryCollector, :stop, []) do
      {:badrpc, _} -> IO.puts("skipped (not running)")
      _ -> IO.puts("ok")
    end
  end

  # -- Private helpers --

  # Find or create the "load-test" project (owned by the load-test user).
  defp ensure_project!(node) do
    case rpc!(node, Lightning.Repo, :get_by, [
           Lightning.Projects.Project,
           [name: "load-test"]
         ]) do
      nil ->
        IO.write("  Creating 'load-test' project... ")
        user = ensure_user!(node)

        case rpc!(node, Lightning.Projects, :create_project, [
               %{
                 name: "load-test",
                 project_users: [%{user_id: user.id, role: :owner}]
               },
               false
             ]) do
          {:ok, project} ->
            IO.puts("ok (id: #{short_id(project.id)})")
            project

          {:error, changeset} ->
            IO.puts(:stderr, "\nerror: Failed to create project")
            IO.puts(:stderr, "  #{inspect(changeset.errors)}")
            System.halt(1)
        end

      project ->
        IO.puts(
          "  Using existing 'load-test' project (id: #{short_id(project.id)})"
        )

        project
    end
  end

  # Find or register the dedicated load-test user account.
  defp ensure_user!(node) do
    email = "load-test@openfn.org"

    case rpc!(node, Lightning.Repo, :get_by, [
           Lightning.Accounts.User,
           [email: email]
         ]) do
      nil ->
        IO.write("  Creating load-test user... ")

        {:ok, user} =
          rpc!(node, Lightning.Accounts, :register_user, [
            %{
              first_name: "Load",
              last_name: "Test",
              email: email,
              password: "load-test-password-12345"
            }
          ])

        IO.puts("ok")
        user

      user ->
        user
    end
  end

  defp create_channel!(node, name, sink_url, project_id) do
    IO.write("  Creating channel '#{name}'... ")

    case rpc!(node, Lightning.Channels, :create_channel, [
           %{name: name, sink_url: sink_url, project_id: project_id}
         ]) do
      {:ok, channel} ->
        IO.puts("ok (id: #{short_id(channel.id)})")
        channel

      {:error, changeset} ->
        IO.puts(:stderr, "\nerror: Failed to create channel")
        IO.puts(:stderr, "  #{inspect(changeset.errors)}")
        System.halt(1)
    end
  end

  defp enable_channel!(node, channel) do
    case rpc!(node, Lightning.Channels, :update_channel, [
           channel,
           %{enabled: true}
         ]) do
      {:ok, channel} ->
        IO.puts("  Enabled channel (id: #{short_id(channel.id)})")
        channel

      {:error, changeset} ->
        IO.puts(:stderr, "\nerror: Failed to enable channel")
        IO.puts(:stderr, "  #{inspect(changeset.errors)}")
        System.halt(1)
    end
  end

  # RPC wrapper that halts (printing the failing MFA) on :badrpc — setup
  # cannot proceed without the remote node.
  defp rpc!(node, mod, fun, args) do
    case :rpc.call(node, mod, fun, args) do
      {:badrpc, reason} ->
        IO.puts(
          :stderr,
          "\nerror: RPC call failed: #{mod}.#{fun}/#{length(args)}"
        )

        IO.puts(:stderr, "  Reason: #{inspect(reason)}")
        System.halt(1)

      result ->
        result
    end
  end

  # Abbreviate a UUID for log output.
  defp short_id(id) when is_binary(id), do: String.slice(id, 0, 8) <> "..."
  defp short_id(id), do: inspect(id)
end

# benchmarking/channels/lib/telemetry_collector.exs
#
# A lightweight telemetry collector designed to be deployed onto a remote
# Lightning BEAM node via :rpc.call(node, Code, :eval_string, [source]).
#
# Uses ETS with :public access and write_concurrency so telemetry handlers
# (running in connection processes) can write directly without going through
# the GenServer. The GenServer owns the table lifetime and handles cleanup.
# Usage (from load test):
#   source = File.read!("lib/telemetry_collector.exs")
#   :rpc.call(node, Code, :eval_string, [source])
#   :rpc.call(node, Bench.TelemetryCollector, :start, [events])
#   ...run test...
#   :rpc.call(node, Bench.TelemetryCollector, :summary, [])
#   :rpc.call(node, Bench.TelemetryCollector, :stop, [])

defmodule Bench.TelemetryCollector do
  use GenServer

  @table :bench_telemetry
  @handler_id "bench_telemetry_collector"

  # -- Public API --

  @doc """
  Start the collector and attach to the :stop events for the given event
  prefixes. Idempotent — if already running, resets and re-attaches.
  """
  def start(events) do
    case GenServer.start(__MODULE__, events, name: __MODULE__) do
      {:ok, pid} ->
        {:ok, pid}

      {:error, {:already_started, pid}} ->
        # A collector from a previous run is alive: clear its data AND
        # re-attach the handler so the event list passed to THIS call takes
        # effect. (Previously only the data was reset, silently keeping the
        # old event subscriptions — contradicting the documented contract.)
        reset()
        attach_handler(events)
        {:ok, pid}
    end
  end

  @doc """
  Stop the collector, detach handlers, and delete the ETS table.
  """
  def stop do
    GenServer.stop(__MODULE__, :normal)
  catch
    # Already stopped (or never started) — treat as success.
    :exit, _ -> :ok
  end

  @doc """
  Clear all collected data (between scenarios).
  """
  def reset do
    if :ets.info(@table) != :undefined do
      :ets.delete_all_objects(@table)
    end

    :ok
  end

  @doc """
  Aggregate collected data into a summary map. Each event gets:
  count, min, max, mean, p50, p95, p99 (all in microseconds).
  """
  def summary do
    if :ets.info(@table) == :undefined do
      %{}
    else
      @table
      |> :ets.tab2list()
      |> Enum.group_by(&elem(&1, 0), &elem(&1, 1))
      |> Enum.into(%{}, fn {event_key, durations} ->
        sorted = Enum.sort(durations)
        count = length(sorted)

        stats = %{
          count: count,
          min: List.first(sorted, 0),
          max: List.last(sorted, 0),
          mean:
            if(count > 0,
              do: Float.round(Enum.sum(sorted) / count, 1),
              else: 0.0
            ),
          p50: percentile(sorted, 50),
          p95: percentile(sorted, 95),
          p99: percentile(sorted, 99)
        }

        {event_key, stats}
      end)
    end
  end

  # -- GenServer callbacks --

  @impl true
  def init(events) do
    # Create ETS table: :duplicate_bag allows multiple entries per key
    # (including identical {key, value} pairs — :bag would deduplicate those),
    # :public lets handler processes write directly,
    # write_concurrency optimizes for concurrent inserts.
    table =
      :ets.new(@table, [
        :duplicate_bag,
        :named_table,
        :public,
        write_concurrency: true
      ])

    attach_handler(events)

    {:ok, %{table: table}}
  end

  @impl true
  def terminate(_reason, _state) do
    :telemetry.detach(@handler_id)

    if :ets.info(@table) != :undefined do
      :ets.delete(@table)
    end

    :ok
  catch
    # Cleanup is best-effort; never let teardown raise.
    _, _ -> :ok
  end

  # -- Telemetry handler (runs in the calling process, not GenServer) --

  def handle_event(event, %{duration: duration}, _metadata, _config) do
    # event is e.g. [:lightning, :channel_proxy, :request, :stop]
    # Convert to a key like :request, :fetch_channel, :upstream
    event_key = event |> Enum.at(-2)

    # duration is in native units, convert to microseconds
    duration_us = System.convert_time_unit(duration, :native, :microsecond)

    # The table may already be gone if the collector was stopped while
    # events were in flight — drop the sample in that case.
    if :ets.info(@table) != :undefined do
      :ets.insert(@table, {event_key, duration_us})
    end
  end

  def handle_event(_event, _measurements, _metadata, _config), do: :ok

  # -- Private helpers --

  # (Re-)attach the shared handler to the :stop event of each prefix.
  # Detaching first makes this safe to call when a handler is already
  # registered under @handler_id (detach of a missing id is harmless).
  defp attach_handler(events) do
    :telemetry.detach(@handler_id)

    stop_events = Enum.map(events, &(&1 ++ [:stop]))

    :telemetry.attach_many(
      @handler_id,
      stop_events,
      &__MODULE__.handle_event/4,
      nil
    )
  end

  # Nearest-rank percentile over an ascending list (0 for empty input).
  defp percentile([], _p), do: 0

  defp percentile(sorted, p) do
    k = max(round(length(sorted) * p / 100) - 1, 0)
    Enum.at(sorted, k, 0)
  end
end

# benchmarking/channels/load_test.exs
#
# Entry point for the channel proxy load test. Installs dependencies,
# loads all modules in dependency order, and runs the test.
#
# Usage:
#   elixir --sname loadtest --cookie SECRET \
#     benchmarking/channels/load_test.exs [options]
#
# Run with --help for full usage information.

Mix.install([:finch, :jason])

base = Path.dirname(__ENV__.file)

for file <- ~w(config metrics setup runner report main) do
  Code.require_file("lib/load_test/#{file}.exs", base)
end

Code.require_file("lib/telemetry_collector.exs", base)

LoadTest.main(System.argv())

# benchmarking/channels/mock_sink.exs
#
# A standalone HTTP sink server for testing the channel proxy.
# Accepts all requests on any path and responds according to the
# configured mode. Useful for integration tests, load tests, and
# manual exploration of the proxy pipeline.
+# +# Usage: +# elixir benchmarking/channels/mock_sink.exs [options] +# +# Examples: +# elixir benchmarking/channels/mock_sink.exs +# elixir benchmarking/channels/mock_sink.exs --port 9000 --status 201 +# elixir benchmarking/channels/mock_sink.exs --mode auth +# elixir benchmarking/channels/mock_sink.exs --mode mixed + +Mix.install([:bandit, :plug, :jason]) + +defmodule MockSink.Config do + @moduledoc """ + Parses CLI arguments into a configuration map and prints help text. + """ + + @defaults %{ + port: 4001, + mode: "fixed", + status: 200, + body_size: 256 + } + + @help """ + Usage: elixir benchmarking/channels/mock_sink.exs [options] + + A configurable HTTP sink server for testing the channel proxy. + + Options: + --port PORT Listen port (default: 4001) + --mode MODE Response mode (default: fixed) + Modes: fixed, timeout, auth, mixed + --status CODE Response status code for fixed mode (default: 200) + --body-size BYTES Response body size in bytes (default: 256) + --help Show this help + + Query parameters (per-request overrides): + ?response_size=N Override --body-size for this request (bytes) + ?delay=N Add a response delay for this request (ms) + ?status=N Override --status for this request (100-599) + + Modes: + fixed Returns --status with --body-size body. + timeout Accepts the connection but never responds. + auth Checks Authorization header. 401 if missing, 403 if invalid, + 200 if valid. Expected: "Bearer test-api-key". + mixed 80% fast 200, 10% slow 200 (2s delay), 10% 503. 
+ """ + + def parse(args) do + case parse_args(args, @defaults) do + :help -> + IO.puts(@help) + System.halt(0) + + {:error, message} -> + IO.puts(:stderr, "error: #{message}\n") + IO.puts(:stderr, @help) + System.halt(1) + + config -> + validate!(config) + end + end + + defp parse_args([], acc), do: acc + + defp parse_args(["--help" | _rest], _acc), do: :help + + defp parse_args(["--port", value | rest], acc) do + case Integer.parse(value) do + {port, ""} when port > 0 -> parse_args(rest, %{acc | port: port}) + _ -> {:error, "invalid port: #{value}"} + end + end + + defp parse_args(["--mode", value | rest], acc) do + if value in ~w(fixed timeout auth mixed) do + parse_args(rest, %{acc | mode: value}) + else + {:error, "unknown mode: #{value}. Expected: fixed, timeout, auth, mixed"} + end + end + + defp parse_args(["--status", value | rest], acc) do + case Integer.parse(value) do + {code, ""} when code >= 100 and code < 600 -> + parse_args(rest, %{acc | status: code}) + + _ -> + {:error, "invalid status code: #{value}"} + end + end + + defp parse_args(["--body-size", value | rest], acc) do + case Integer.parse(value) do + {bytes, ""} when bytes >= 0 -> parse_args(rest, %{acc | body_size: bytes}) + _ -> {:error, "invalid body-size: #{value}"} + end + end + + defp parse_args([unknown | _rest], _acc) do + {:error, "unknown option: #{unknown}"} + end + + defp validate!(config) when is_map(config), do: config +end + +defmodule MockSink.Logger do + @moduledoc """ + Simple request logging to stdout. + """ + + def log_request(method, path, status, elapsed_ms) do + timestamp = Calendar.strftime(DateTime.utc_now(), "%Y-%m-%d %H:%M:%S.%f") + + IO.puts( + "[#{timestamp}] #{String.upcase(method)} #{path} -> #{status} (#{elapsed_ms}ms)" + ) + end +end + +defmodule MockSink.Body do + @moduledoc """ + Generates response bodies of the configured size. 
+ Small payloads (<= 1024 bytes) get a JSON envelope; larger ones are + padded with a repeated character to hit the exact byte count. + """ + + def generate(body_size) when body_size <= 1024 do + json = + Jason.encode!(%{ + ok: true, + server: "mock_sink", + timestamp: DateTime.to_iso8601(DateTime.utc_now()), + padding: String.duplicate("x", max(body_size - 80, 0)) + }) + + # Trim or pad to reach the target size exactly. + byte_size = byte_size(json) + + cond do + byte_size == body_size -> json + byte_size > body_size -> binary_part(json, 0, body_size) + true -> json <> String.duplicate(" ", body_size - byte_size) + end + end + + def generate(body_size) do + String.duplicate("x", body_size) + end +end + +defmodule MockSink.Router do + @moduledoc """ + Plug router that handles all incoming requests according to the + configured mode. Config is injected via `conn.private[:mock_config]`. + """ + + use Plug.Router + + plug :put_config + plug :match + + plug Plug.Parsers, + parsers: [:urlencoded, :json], + json_decoder: Jason, + pass: ["*/*"] + + plug :dispatch + + # ------------------------------------------------------------------ + # Plug: inject config into conn.private so downstream handlers can + # read it without global state. 
+ # ------------------------------------------------------------------ + def put_config(conn, _opts) do + Plug.Conn.put_private(conn, :mock_config, conn.assigns[:mock_config]) + end + + @impl Plug.Router + def init(config) do + config + end + + @impl Plug.Router + def call(conn, config) do + conn + |> Plug.Conn.assign(:mock_config, config) + |> super(config) + end + + # ------------------------------------------------------------------ + # Catch-all route + # ------------------------------------------------------------------ + match _ do + config = conn.private[:mock_config] + config = apply_query_overrides(conn, config) + start = System.monotonic_time(:millisecond) + + {status, body, content_type} = handle_mode(conn, config) + + elapsed = System.monotonic_time(:millisecond) - start + + MockSink.Logger.log_request( + conn.method, + conn.request_path, + status, + elapsed + ) + + conn + |> Plug.Conn.put_resp_content_type(content_type) + |> Plug.Conn.send_resp(status, body) + end + + # ------------------------------------------------------------------ + # Query param overrides + # ------------------------------------------------------------------ + defp apply_query_overrides(conn, config) do + conn = Plug.Conn.fetch_query_params(conn) + + config + |> override_param(conn, "response_size", :body_size) + |> override_param(conn, "delay", :delay) + |> override_param(conn, "status", :status) + end + + defp override_param(config, conn, param, key) do + case conn.query_params[param] do + nil -> + config + + value -> + case Integer.parse(value) do + {n, ""} when n >= 0 -> Map.put(config, key, n) + _ -> config + end + end + end + + # ------------------------------------------------------------------ + # Mode handlers + # ------------------------------------------------------------------ + defp handle_mode(_conn, %{mode: "fixed"} = config) do + delay = Map.get(config, :delay, 0) + if delay > 0, do: Process.sleep(delay) + body = MockSink.Body.generate(config.body_size) + 
{config.status, body, content_type(config.body_size)} + end + + defp handle_mode(_conn, %{mode: "timeout"}) do + # Accept the connection but never respond. + # The client will eventually time out. + Process.sleep(:infinity) + # Unreachable, but keeps the typespec happy. + {200, "", "text/plain"} + end + + defp handle_mode(conn, %{mode: "auth"} = config) do + case Plug.Conn.get_req_header(conn, "authorization") do + [] -> + body = Jason.encode!(%{error: "missing authorization header"}) + {401, body, "application/json"} + + ["Bearer test-api-key"] -> + body = MockSink.Body.generate(config.body_size) + {200, body, content_type(config.body_size)} + + [_invalid] -> + body = Jason.encode!(%{error: "invalid credentials"}) + {403, body, "application/json"} + + _ -> + body = Jason.encode!(%{error: "invalid authorization header"}) + {400, body, "application/json"} + end + end + + defp handle_mode(_conn, %{mode: "mixed"} = config) do + roll = :rand.uniform() + + cond do + # 10% — 503 Service Unavailable + roll > 0.9 -> + body = Jason.encode!(%{error: "service unavailable"}) + {503, body, "application/json"} + + # 10% — slow 200 (2 second delay) + roll > 0.8 -> + Process.sleep(2000) + body = MockSink.Body.generate(config.body_size) + {200, body, content_type(config.body_size)} + + # 80% — fast 200 + true -> + body = MockSink.Body.generate(config.body_size) + {200, body, content_type(config.body_size)} + end + end + + defp content_type(body_size) when body_size <= 1024, do: "application/json" + defp content_type(_body_size), do: "application/octet-stream" +end + +defmodule MockSink do + @moduledoc """ + Entry point. Parses CLI args, prints a banner, and starts Bandit. 
+ """ + + def main(args) do + config = MockSink.Config.parse(args) + + IO.puts(""" + + =================================================== + MockSink starting on port #{config.port} + Mode: #{config.mode} + Status: #{config.status} + Body size: #{config.body_size} bytes + =================================================== + """) + + {:ok, _} = + Bandit.start_link( + plug: {MockSink.Router, config}, + port: config.port, + thousand_island_options: [ + num_acceptors: 10 + ] + ) + + # Block the script forever so the server stays up. + Process.sleep(:infinity) + end +end + +MockSink.main(System.argv()) diff --git a/benchmarking/channels/results/.gitignore b/benchmarking/channels/results/.gitignore new file mode 100644 index 00000000000..d6b7ef32c84 --- /dev/null +++ b/benchmarking/channels/results/.gitignore @@ -0,0 +1,2 @@ +* +!.gitignore diff --git a/benchmarking/channels/run_all.sh b/benchmarking/channels/run_all.sh new file mode 100755 index 00000000000..bc2bf0ac833 --- /dev/null +++ b/benchmarking/channels/run_all.sh @@ -0,0 +1,177 @@ +#!/usr/bin/env bash +# benchmarking/channels/run_all.sh +# +# Runs all channel proxy load test scenarios in sequence, logging output +# to a timestamped file. Assumes Lightning and mock sink are already running. +# Bails on first failure. 
+# +# Usage: +# benchmarking/channels/run_all.sh [options] +# +# Options: +# --sname NAME Erlang short name (default: lt) +# --cookie COOKIE Erlang cookie (default: bench) +# --duration SECS Per-scenario duration (default: 30) +# --concurrency N Virtual users (default: 20) +# +# Environment: +# RESULTS_DIR Output directory (default: /tmp/channel-bench-results) +# +# Examples: +# benchmarking/channels/run_all.sh +# benchmarking/channels/run_all.sh --duration 60 --concurrency 50 +# benchmarking/channels/run_all.sh --sname mynode --cookie mysecret + +set -euo pipefail + +# ── Defaults ────────────────────────────────────────────────────── +SNAME="lt" +COOKIE="bench" +DURATION=30 +CONCURRENCY=20 + +# ── Parse args ──────────────────────────────────────────────────── +while [[ $# -gt 0 ]]; do + case "$1" in + --sname) SNAME="$2"; shift 2 ;; + --cookie) COOKIE="$2"; shift 2 ;; + --duration) DURATION="$2"; shift 2 ;; + --concurrency) CONCURRENCY="$2"; shift 2 ;; + --help|-h) + sed -n '2,/^$/s/^# \?//p' "$0" + exit 0 + ;; + *) + echo "error: unknown option: $1" >&2 + echo "Run with --help for usage." >&2 + exit 1 + ;; + esac +done + +# ── Log setup ───────────────────────────────────────────────────── +RESULTS_DIR="${RESULTS_DIR:-/tmp/channel-bench-results}" +mkdir -p "$RESULTS_DIR" + +TIMESTAMP="$(date +%Y.%m.%d-%H.%M)" +LOG="$RESULTS_DIR/$TIMESTAMP.log" +CSV="$RESULTS_DIR/$TIMESTAMP.csv" + +# ── Preflight checks ───────────────────────────────────────────── +echo "=== Channel Proxy Load Test Suite ===" +echo "" +echo " sname: $SNAME" +echo " cookie: $COOKIE" +echo " duration: ${DURATION}s per scenario" +echo " concurrency: $CONCURRENCY VUs" +echo " log: $LOG" +echo " csv: $CSV" +echo "" + +echo -n "Checking mock sink at http://localhost:4001... 
" +if curl -sf http://localhost:4001/ > /dev/null 2>&1; then + echo "ok" +else + echo "FAILED" + echo "error: Mock sink is not reachable at http://localhost:4001" >&2 + echo "Start it first: elixir benchmarking/channels/mock_sink.exs" >&2 + exit 1 +fi + +echo -n "Checking Lightning at http://localhost:4000... " +if curl -sf http://localhost:4000/ > /dev/null 2>&1; then + echo "ok" +else + echo "FAILED" + echo "error: Lightning is not reachable at http://localhost:4000" >&2 + echo "Start it first: iex --sname lightning --cookie $COOKIE -S mix phx.server" >&2 + exit 1 +fi + +echo "" + +# ── Scenario runner ─────────────────────────────────────────────── +SCRIPT="benchmarking/channels/load_test.exs" +PASS=0 +FAIL=0 + +run_scenario() { + local name="$1" + shift + local extra_flags=("$@") + + echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" | tee -a "$LOG" + echo " Scenario: $name" | tee -a "$LOG" + echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" | tee -a "$LOG" + + local cmd=( + elixir --sname "${SNAME}-${name}" --cookie "$COOKIE" + "$SCRIPT" + --scenario "$name" + --concurrency "$CONCURRENCY" + --duration "$DURATION" + --csv "$CSV" + "${extra_flags[@]}" + ) + + echo " ${cmd[*]}" | tee -a "$LOG" + echo "" | tee -a "$LOG" + + if "${cmd[@]}" 2>&1 | tee -a "$LOG"; then + PASS=$((PASS + 1)) + echo "" | tee -a "$LOG" + else + FAIL=$((FAIL + 1)) + echo "" | tee -a "$LOG" + echo "FATAL: scenario '$name' failed (exit $?). Stopping." | tee -a "$LOG" + echo "" + echo "Results so far: $PASS passed, $FAIL failed" + echo "Log: $LOG" + echo "CSV: $CSV" + exit 1 + fi +} + +# ── Run scenarios ───────────────────────────────────────────────── + +# 1. 
Baseline — hit mock sink directly (no Lightning, no --sname) +echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" | tee -a "$LOG" +echo " Scenario: direct_sink" | tee -a "$LOG" +echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" | tee -a "$LOG" + +DIRECT_CMD=( + elixir "$SCRIPT" + --scenario direct_sink + --concurrency "$CONCURRENCY" + --duration "$DURATION" + --csv "$CSV" +) +echo " ${DIRECT_CMD[*]}" | tee -a "$LOG" +echo "" | tee -a "$LOG" + +if "${DIRECT_CMD[@]}" 2>&1 | tee -a "$LOG"; then + PASS=$((PASS + 1)) + echo "" | tee -a "$LOG" +else + FAIL=$((FAIL + 1)) + echo "" | tee -a "$LOG" + echo "FATAL: scenario 'direct_sink' failed. Stopping." | tee -a "$LOG" + echo "Log: $LOG" + exit 1 +fi + +# 2-7. Scenarios that go through Lightning +run_scenario happy_path +run_scenario ramp_up +run_scenario large_payload --payload-size 1048576 +run_scenario large_response --response-size 1048576 +run_scenario mixed_methods +run_scenario slow_sink --delay 2000 + +# ── Summary ─────────────────────────────────────────────────────── +echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" | tee -a "$LOG" +echo " All scenarios complete: $PASS passed, $FAIL failed" | tee -a "$LOG" +echo "━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━" | tee -a "$LOG" +echo "" +echo "Log: $LOG" +echo "CSV: $CSV" diff --git a/config/config.exs b/config/config.exs index bf2a1de80c3..21adce41c40 100644 --- a/config/config.exs +++ b/config/config.exs @@ -179,6 +179,8 @@ config :lightning, LightningWeb, allow_credential_transfer: false config :tesla, adapter: {Tesla.Adapter.Finch, name: Lightning.Finch} +config :weir, finch_name: Lightning.Finch + config :lightning, :is_resettable_demo, false config :lightning, :default_retention_period, nil config :lightning, :claim_work_mem, nil diff --git a/lib/lightning/application.ex b/lib/lightning/application.ex index b8eb925ab1b..2ab64a218a0 100644 --- a/lib/lightning/application.ex +++ b/lib/lightning/application.ex @@ -28,6 +28,17 @@ defmodule 
Lightning.Application do }) end + # :logger.add_handler(:file_log, :logger_std_h, %{ + # level: :warning, + # config: %{ + # file: ~c"log/lightning.log", + # max_no_bytes: 10_000_000, + # max_no_files: 5, + # compress_on_rotate: true + # }, + # formatter: Logger.Formatter.new() + # }) + adaptor_registry_childspec = {Lightning.AdaptorRegistry, Application.get_env(:lightning, Lightning.AdaptorRegistry, [])} diff --git a/lib/lightning/channels.ex b/lib/lightning/channels.ex new file mode 100644 index 00000000000..4b3cb4d1812 --- /dev/null +++ b/lib/lightning/channels.ex @@ -0,0 +1,325 @@ +defmodule Lightning.Channels do + @moduledoc """ + Context for managing Channels — HTTP proxy configurations that forward + requests from a source to a sink. + """ + + import Ecto.Query + + alias Ecto.Multi + alias Lightning.Accounts.User + alias Lightning.Channels.Audit + alias Lightning.Channels.Channel + alias Lightning.Channels.ChannelAuthMethod + alias Lightning.Channels.ChannelRequest + alias Lightning.Channels.ChannelSnapshot + alias Lightning.Repo + + @doc """ + Returns all channels for a project, ordered by name. + """ + def list_channels_for_project(project_id) do + from(c in Channel, + where: c.project_id == ^project_id, + order_by: [asc: :name] + ) + |> Repo.all() + end + + @doc """ + Returns channels for a project with aggregate stats from channel_requests. + + Each entry is a map with keys: + - all Channel fields (via struct) + - `:request_count` — total number of requests + - `:last_activity` — datetime of most recent request, or nil + """ + def list_channels_for_project_with_stats(project_id) do + from(c in Channel, + where: c.project_id == ^project_id, + left_join: cr in ChannelRequest, + on: cr.channel_id == c.id, + group_by: c.id, + order_by: [asc: c.name], + select: %{ + channel: c, + request_count: count(cr.id), + last_activity: max(cr.started_at) + } + ) + |> Repo.all() + end + + @doc """ + Returns aggregate stats for all channels in a project. 
+ + Returns a map with: + - `:total_channels` — number of channels in the project + - `:total_requests` — total channel requests across all channels + + Uses a single query with a LEFT JOIN so both counts are fetched in one + database round-trip. + """ + def get_channel_stats_for_project(project_id) do + from(c in Channel, + where: c.project_id == ^project_id, + left_join: cr in ChannelRequest, + on: cr.channel_id == c.id, + select: %{ + total_channels: count(c.id, :distinct), + total_requests: count(cr.id) + } + ) + |> Repo.one() + end + + @doc """ + Gets a single channel by ID. Returns nil if not found. + """ + def get_channel(id) do + Repo.get(Channel, id) + end + + @doc """ + Gets a channel by ID with all auth methods preloaded. + + Preloads source auth methods (with webhook_auth_method) and sink auth + methods (with project_credential → credential). Returns nil if not found. + + Used by ChannelProxyPlug for source authentication and sink credential resolution. + """ + def get_channel_with_auth(id) do + from(c in Channel, + where: c.id == ^id, + left_join: src in assoc(c, :source_auth_methods), + left_join: wam in assoc(src, :webhook_auth_method), + left_join: snk in assoc(c, :sink_auth_methods), + left_join: pc in assoc(snk, :project_credential), + left_join: cred in assoc(pc, :credential), + preload: [ + source_auth_methods: {src, webhook_auth_method: wam}, + sink_auth_methods: {snk, project_credential: {pc, credential: cred}} + ] + ) + |> Repo.one() + end + + @doc """ + Gets a single channel. Raises if not found. + """ + def get_channel!(id, opts \\ []) do + preloads = Keyword.get(opts, :include, []) + Repo.get!(Channel, id) |> Repo.preload(preloads) + end + + @doc """ + Gets a channel by ID scoped to a project. Returns `nil` if the channel + does not exist or belongs to a different project. + """ + def get_channel_for_project(project_id, channel_id) do + Repo.get_by(Channel, id: channel_id, project_id: project_id) + end + + @doc """ + Creates a channel. 
+ """ + @spec create_channel(map(), actor: User.t()) :: + {:ok, Channel.t()} | {:error, Ecto.Changeset.t()} + def create_channel(attrs, actor: %User{} = actor) do + changeset = Channel.changeset(%Channel{}, attrs) + + Multi.new() + |> Multi.insert(:channel, changeset) + |> Multi.insert(:audit, fn %{channel: channel} -> + Audit.event("created", channel.id, actor, changeset) + end) + |> maybe_audit_auth_method_changes(changeset, actor) + |> Repo.transaction() + |> case do + {:ok, %{channel: channel}} -> {:ok, channel} + {:error, :channel, changeset, _} -> {:error, changeset} + end + end + + @doc """ + Updates a channel's config fields, bumping lock_version. + """ + @spec update_channel(Channel.t(), map(), actor: User.t()) :: + {:ok, Channel.t()} | {:error, Ecto.Changeset.t()} + def update_channel(%Channel{} = channel, attrs, actor: %User{} = actor) do + changeset = Channel.changeset(channel, attrs) + + Multi.new() + |> Multi.update(:channel, changeset, stale_error_field: :lock_version) + |> Multi.insert(:audit, fn %{channel: updated} -> + Audit.event("updated", updated.id, actor, changeset) + end) + |> maybe_audit_auth_method_changes(changeset, actor) + |> Repo.transaction() + |> case do + {:ok, %{channel: channel}} -> {:ok, channel} + {:error, :channel, changeset, _} -> {:error, changeset} + end + end + + @doc """ + Deletes a channel. + + Returns `{:error, changeset}` if the channel has snapshots + (due to `:restrict` FK on `channel_snapshots`). 
+ """ + @spec delete_channel(Channel.t(), actor: User.t()) :: + {:ok, Channel.t()} | {:error, Ecto.Changeset.t()} + def delete_channel(%Channel{} = channel, actor: %User{} = actor) do + changeset = + channel + |> Ecto.Changeset.change() + |> Ecto.Changeset.foreign_key_constraint(:channel_snapshots, + name: "channel_snapshots_channel_id_fkey", + message: "has history that must be retained" + ) + + Multi.new() + |> Multi.insert(:audit, Audit.event("deleted", channel.id, actor, %{})) + |> Multi.delete(:channel, changeset) + |> Repo.transaction() + |> case do + {:ok, %{channel: channel}} -> {:ok, channel} + {:error, :channel, changeset, _} -> {:error, changeset} + end + end + + # Emits one "auth_method_added" or "auth_method_removed" audit step per + # association change. No-op when the changeset has no auth method changes + # (e.g. the toggle handler, which passes no "channel_auth_methods" key). + defp maybe_audit_auth_method_changes(multi, changeset, actor) do + auth_changes = + Ecto.Changeset.get_change(changeset, :channel_auth_methods, []) + + inserted = Enum.filter(auth_changes, &(&1.action == :insert)) + deleted = Enum.filter(auth_changes, &(&1.action == :delete)) + + multi + |> add_auth_method_added_audits(inserted, actor) + |> add_auth_method_removed_audits(deleted, actor) + end + + defp add_auth_method_added_audits(multi, inserted, actor) do + inserted + |> Enum.with_index() + |> Enum.reduce(multi, fn {cs, idx}, acc -> + role = Ecto.Changeset.get_field(cs, :role) + fields = auth_method_fields_for(cs, role) + + Multi.insert( + acc, + :"audit_auth_method_added_#{idx}", + fn %{channel: channel} -> + Audit.event("auth_method_added", channel.id, actor, %{ + before: nil, + after: fields + }) + end + ) + end) + end + + defp add_auth_method_removed_audits(multi, deleted, actor) do + deleted + |> Enum.with_index() + |> Enum.reduce(multi, fn {cs, idx}, acc -> + role = cs.data.role + fields = auth_method_fields_for(cs, role) + + Multi.insert( + acc, + 
:"audit_auth_method_removed_#{idx}", + fn %{channel: channel} -> + Audit.event("auth_method_removed", channel.id, actor, %{ + before: fields, + after: nil + }) + end + ) + end) + end + + defp auth_method_fields_for(cs, :source) do + %{ + role: "source", + webhook_auth_method_id: + Ecto.Changeset.get_field(cs, :webhook_auth_method_id) + } + end + + defp auth_method_fields_for(cs, :sink) do + %{ + role: "sink", + project_credential_id: Ecto.Changeset.get_field(cs, :project_credential_id) + } + end + + @doc """ + Returns all ChannelAuthMethod records for a channel, preloading + their associated webhook_auth_method and project_credential (with credential). + """ + def list_channel_auth_methods(%Channel{} = channel) do + from(cam in ChannelAuthMethod, + where: cam.channel_id == ^channel.id, + preload: [:webhook_auth_method, project_credential: :credential] + ) + |> Repo.all() + end + + @doc """ + Get or create a snapshot for the channel's current lock_version. + + Returns an existing snapshot if one matches, or creates a minimal one from + the current channel config. Handles concurrent creation race via + ON CONFLICT DO NOTHING + re-fetch. + + Full snapshot lifecycle management is in #4406. 
+ """ + def get_or_create_current_snapshot(%Channel{} = channel) do + case Repo.get_by(ChannelSnapshot, + channel_id: channel.id, + lock_version: channel.lock_version + ) do + %ChannelSnapshot{} = snapshot -> + {:ok, snapshot} + + nil -> + attrs = %{ + channel_id: channel.id, + lock_version: channel.lock_version, + name: channel.name, + sink_url: channel.sink_url, + enabled: channel.enabled + } + + %ChannelSnapshot{} + |> ChannelSnapshot.changeset(attrs) + |> Repo.insert( + on_conflict: :nothing, + conflict_target: [:channel_id, :lock_version] + ) + |> case do + {:ok, %ChannelSnapshot{id: nil}} -> + # ON CONFLICT DO NOTHING returns struct with nil id; re-fetch + snapshot = + Repo.get_by!(ChannelSnapshot, + channel_id: channel.id, + lock_version: channel.lock_version + ) + + {:ok, snapshot} + + {:ok, snapshot} -> + {:ok, snapshot} + + {:error, changeset} -> + {:error, changeset} + end + end + end +end diff --git a/lib/lightning/channels/audit.ex b/lib/lightning/channels/audit.ex new file mode 100644 index 00000000000..187463ea865 --- /dev/null +++ b/lib/lightning/channels/audit.ex @@ -0,0 +1,15 @@ +defmodule Lightning.Channels.Audit do + @moduledoc """ + Audit trail for channel CRUD operations. + """ + use Lightning.Auditing.Audit, + repo: Lightning.Repo, + item: "channel", + events: [ + "created", + "updated", + "deleted", + "auth_method_added", + "auth_method_removed" + ] +end diff --git a/lib/lightning/channels/channel.ex b/lib/lightning/channels/channel.ex new file mode 100644 index 00000000000..8ae47340eeb --- /dev/null +++ b/lib/lightning/channels/channel.ex @@ -0,0 +1,62 @@ +defmodule Lightning.Channels.Channel do + @moduledoc """ + Schema for a Channel — an HTTP proxy configuration that forwards + requests from a source endpoint to a sink URL. 
+ """ + use Lightning.Schema + + alias Lightning.Projects.Project + alias Lightning.Validators + + @type t :: %__MODULE__{ + id: Ecto.UUID.t(), + project_id: Ecto.UUID.t(), + name: String.t(), + sink_url: String.t(), + enabled: boolean(), + lock_version: integer(), + inserted_at: DateTime.t(), + updated_at: DateTime.t() + } + + schema "channels" do + field :name, :string + field :sink_url, :string + field :enabled, :boolean, default: true + field :lock_version, :integer, default: 0 + + belongs_to :project, Project + + has_many :channel_auth_methods, Lightning.Channels.ChannelAuthMethod + + has_many :source_auth_methods, Lightning.Channels.ChannelAuthMethod, + where: [role: :source] + + has_many :sink_auth_methods, Lightning.Channels.ChannelAuthMethod, + where: [role: :sink] + + has_many :channel_snapshots, Lightning.Channels.ChannelSnapshot + has_many :channel_requests, Lightning.Channels.ChannelRequest + + timestamps() + end + + def changeset(channel, attrs) do + channel + |> cast(attrs, [ + :name, + :sink_url, + :project_id, + :enabled + ]) + |> validate_required([:name, :sink_url, :project_id]) + |> Validators.validate_url(:sink_url) + |> assoc_constraint(:project) + |> unique_constraint([:project_id, :name], + error_key: :name, + message: "A channel with this name already exists in this project" + ) + |> optimistic_lock(:lock_version) + |> cast_assoc(:channel_auth_methods) + end +end diff --git a/lib/lightning/channels/channel_auth_method.ex b/lib/lightning/channels/channel_auth_method.ex new file mode 100644 index 00000000000..649805ecae9 --- /dev/null +++ b/lib/lightning/channels/channel_auth_method.ex @@ -0,0 +1,93 @@ +defmodule Lightning.Channels.ChannelAuthMethod do + @moduledoc """ + Join table connecting channels to auth method implementations. + + Each record has a `role` (:source or :sink) and points to exactly one + of `webhook_auth_method` (for source/inbound auth) or + `project_credential` (for sink/outbound auth). 
+ """ + use Lightning.Schema + + alias Lightning.Channels.Channel + alias Lightning.Projects.ProjectCredential + alias Lightning.Validators + alias Lightning.Workflows.WebhookAuthMethod + + @roles [:source, :sink] + + schema "channel_auth_methods" do + field :role, Ecto.Enum, values: @roles + field :delete, :boolean, virtual: true + + belongs_to :channel, Channel + belongs_to :webhook_auth_method, WebhookAuthMethod + belongs_to :project_credential, ProjectCredential + + timestamps() + end + + def changeset(struct, attrs) do + struct + |> cast(attrs, [ + :role, + :webhook_auth_method_id, + :project_credential_id, + :delete + ]) + |> validate_required([:role]) + |> Validators.validate_exclusive( + [:webhook_auth_method_id, :project_credential_id], + "webhook_auth_method_id and project_credential_id are mutually exclusive" + ) + |> Validators.validate_one_required( + [:webhook_auth_method_id, :project_credential_id], + "must reference either a webhook auth method or a project credential" + ) + |> validate_role_target_consistency() + |> assoc_constraint(:channel) + |> foreign_key_constraint(:webhook_auth_method_id) + |> foreign_key_constraint(:project_credential_id) + |> unique_constraint(:webhook_auth_method_id, + name: :channel_auth_methods_wam_unique + ) + |> unique_constraint(:project_credential_id, + name: :channel_auth_methods_pc_unique + ) + |> then(fn changeset -> + if get_change(changeset, :delete) do + %{changeset | action: :delete} + else + changeset + end + end) + end + + defp validate_role_target_consistency(changeset) do + case get_field(changeset, :role) do + :source -> + if get_field(changeset, :project_credential_id) do + add_error( + changeset, + :project_credential_id, + "source auth must use a webhook auth method, not a project credential" + ) + else + changeset + end + + :sink -> + if get_field(changeset, :webhook_auth_method_id) do + add_error( + changeset, + :webhook_auth_method_id, + "sink auth must use a project credential, not a webhook auth 
method" + ) + else + changeset + end + + _ -> + changeset + end + end +end diff --git a/lib/lightning/channels/channel_event.ex b/lib/lightning/channels/channel_event.ex new file mode 100644 index 00000000000..8944b3d200b --- /dev/null +++ b/lib/lightning/channels/channel_event.ex @@ -0,0 +1,74 @@ +defmodule Lightning.Channels.ChannelEvent do + @moduledoc """ + Schema for a ChannelEvent — a detailed log entry recording HTTP + request/response data for a channel request. + """ + use Lightning.Schema + + alias Lightning.Channels.ChannelRequest + + @type t :: %__MODULE__{ + id: Ecto.UUID.t(), + channel_request_id: Ecto.UUID.t(), + type: :source_received | :sink_request | :sink_response | :error, + request_method: String.t() | nil, + request_path: String.t() | nil, + request_headers: String.t() | nil, + request_body_preview: String.t() | nil, + request_body_hash: String.t() | nil, + response_status: integer() | nil, + response_headers: String.t() | nil, + response_body_preview: String.t() | nil, + response_body_hash: String.t() | nil, + latency_ms: integer() | nil, + ttfb_ms: integer() | nil, + error_message: String.t() | nil, + inserted_at: DateTime.t() + } + + schema "channel_events" do + field :type, Ecto.Enum, + values: [:source_received, :sink_request, :sink_response, :error] + + field :request_method, :string + field :request_path, :string + field :request_headers, :string + field :request_body_preview, :string + field :request_body_hash, :string + + field :response_status, :integer + field :response_headers, :string + field :response_body_preview, :string + field :response_body_hash, :string + + field :latency_ms, :integer + field :ttfb_ms, :integer + field :error_message, :string + + belongs_to :channel_request, ChannelRequest + + timestamps(updated_at: false) + end + + def changeset(event, attrs) do + event + |> cast(attrs, [ + :channel_request_id, + :type, + :request_method, + :request_path, + :request_headers, + :request_body_preview, + :request_body_hash, + 
:response_status, + :response_headers, + :response_body_preview, + :response_body_hash, + :latency_ms, + :ttfb_ms, + :error_message + ]) + |> validate_required([:channel_request_id, :type]) + |> assoc_constraint(:channel_request) + end +end diff --git a/lib/lightning/channels/channel_request.ex b/lib/lightning/channels/channel_request.ex new file mode 100644 index 00000000000..97cf9989715 --- /dev/null +++ b/lib/lightning/channels/channel_request.ex @@ -0,0 +1,61 @@ +defmodule Lightning.Channels.ChannelRequest do + @moduledoc """ + Schema for a ChannelRequest — tracks the lifecycle of a single proxied + HTTP request through a channel. + """ + use Lightning.Schema + + alias Lightning.Channels.Channel + alias Lightning.Channels.ChannelEvent + alias Lightning.Channels.ChannelSnapshot + + @type t :: %__MODULE__{ + id: Ecto.UUID.t(), + channel_id: Ecto.UUID.t(), + channel_snapshot_id: Ecto.UUID.t(), + request_id: String.t(), + client_identity: String.t() | nil, + state: :pending | :success | :failed | :timeout | :error, + started_at: DateTime.t(), + completed_at: DateTime.t() | nil + } + + schema "channel_requests" do + field :request_id, :string + field :client_identity, :string + + field :state, Ecto.Enum, + values: [:pending, :success, :failed, :timeout, :error] + + field :started_at, :utc_datetime_usec + field :completed_at, :utc_datetime_usec + + belongs_to :channel, Channel + belongs_to :channel_snapshot, ChannelSnapshot + + has_many :channel_events, ChannelEvent + end + + def changeset(request, attrs) do + request + |> cast(attrs, [ + :channel_id, + :channel_snapshot_id, + :request_id, + :client_identity, + :state, + :started_at, + :completed_at + ]) + |> validate_required([ + :channel_id, + :channel_snapshot_id, + :request_id, + :state, + :started_at + ]) + |> assoc_constraint(:channel) + |> assoc_constraint(:channel_snapshot) + |> unique_constraint(:request_id) + end +end diff --git a/lib/lightning/channels/channel_snapshot.ex 
defmodule Lightning.Channels.ChannelSnapshot do
  @moduledoc """
  Schema for a ChannelSnapshot — an immutable point-in-time copy of a
  channel's configuration, created when a request is proxied.

  Snapshots are versioned per channel via `:lock_version`; the pair
  `{channel_id, lock_version}` is unique.
  """
  use Lightning.Schema

  alias Lightning.Channels.Channel

  @type t :: %__MODULE__{
          id: Ecto.UUID.t(),
          channel_id: Ecto.UUID.t(),
          lock_version: integer(),
          name: String.t(),
          sink_url: String.t(),
          enabled: boolean(),
          inserted_at: DateTime.t()
        }

  # Every field is both castable and required — a snapshot is only
  # meaningful as a complete copy of the channel's configuration.
  @fields [:channel_id, :lock_version, :name, :sink_url, :enabled]

  schema "channel_snapshots" do
    field :lock_version, :integer
    field :name, :string
    field :sink_url, :string
    field :enabled, :boolean

    belongs_to :channel, Channel

    has_many :channel_requests, Lightning.Channels.ChannelRequest

    # Snapshots are immutable; no updated_at column.
    timestamps(updated_at: false)
  end

  @doc """
  Builds a changeset for a channel snapshot; all fields are required.
  """
  def changeset(snapshot, attrs) do
    snapshot
    |> cast(attrs, @fields)
    |> validate_required(@fields)
    |> assoc_constraint(:channel)
    |> unique_constraint([:channel_id, :lock_version])
  end
end

defmodule Lightning.Channels.Handler do
  @moduledoc """
  Weir handler that persists every proxied Channel request.

  ## Lifecycle

  Weir invokes three callbacks during the proxy lifecycle:

  1. `handle_request_started` — synchronously creates a `ChannelRequest`
     record. If the insert fails, the request is rejected with 503.
  2. `handle_response_started` — captures TTFB, status and response
     headers into handler state. **May not be called** — see below.
  3. `handle_response_finished` — writes a `ChannelEvent` and moves the
     `ChannelRequest` to its final state.

  ## Skipped `handle_response_started`

  `handle_response_started` fires when the first response bytes arrive
  from the upstream (TTFB). If the upstream never sends a response —
  DNS failure (`:nxdomain`), connection refused (`:econnrefused`), host
  or network unreachable (`:ehostunreach`, `:enetunreach`), a connect or
  response timeout (`:connect_timeout`, `:timeout`), or a failed TLS
  handshake — the callback is skipped entirely and
  `handle_response_finished` receives state from `handle_request_started`
  only, without `ttfb_us`, `response_status` or `response_headers`.

  All fields derived from `handle_response_started` are therefore read
  with `Map.get/2` and `nil` fallbacks. `classify_error/1` translates
  known Weir error shapes into stable string identifiers for
  persistence.
  """

  use Weir.Handler

  alias Lightning.Channels.ChannelEvent
  alias Lightning.Channels.ChannelRequest
  alias Lightning.Repo

  require Logger

  # Header names (lowercased) whose values must never be persisted.
  @redacted_headers ~w(authorization x-api-key)

  # Transport-level error reasons we map to stable string identifiers.
  @known_transport_errors ~w(
    nxdomain econnrefused ehostunreach enetunreach
    closed econnreset econnaborted epipe
  )a

  @impl true
  def handle_request_started(metadata, state) do
    %ChannelRequest{}
    |> ChannelRequest.changeset(%{
      channel_id: state.channel.id,
      channel_snapshot_id: state.snapshot.id,
      request_id: state.request_id,
      client_identity: state.client_identity,
      state: :pending,
      started_at: state.started_at
    })
    |> Repo.insert()
    |> case do
      {:ok, record} ->
        {:ok,
         Map.merge(state, %{
           channel_request: record,
           request_headers: redact_headers(metadata.headers),
           request_method: metadata.method
         })}

      {:error, _changeset} ->
        Logger.warning("Failed to create ChannelRequest for #{state.request_id}")

        # Without a persisted request record we refuse to proxy at all.
        {:reject, 503, "Service Unavailable", state}
    end
  end

  @impl true
  def handle_response_started(metadata, state) do
    {:ok,
     Map.merge(state, %{
       ttfb_us: metadata.time_to_first_byte_us,
       response_status: metadata.status,
       response_headers: redact_headers(metadata.headers)
     })}
  end

  @impl true
  def handle_response_finished(result, state) do
    persist_completion(result, state)
    {:ok, state}
  end

  # Writes the ChannelEvent then finalizes the ChannelRequest. On any
  # persistence failure, logs and force-marks the request as :error.
  defp persist_completion(result, state) do
    with {:ok, _event} <- record_event(result, state),
         {:ok, _request} <-
           finalize_request(state, derive_request_state(result)) do
      :ok
    else
      {:error, changeset} ->
        Logger.warning(
          "Failed to persist channel observation for request " <>
            "#{state.channel_request.request_id}: #{inspect(changeset.errors)}"
        )

        finalize_request(state, :error)
    end
  end

  # Inserts the ChannelEvent row describing this exchange.
  defp record_event(result, state) do
    %ChannelEvent{}
    |> ChannelEvent.changeset(build_event_attrs(result, state))
    |> Repo.insert()
  end

  # Moves the ChannelRequest into a terminal state with a completion time.
  defp finalize_request(state, request_state) do
    state.channel_request
    |> ChannelRequest.changeset(%{
      state: request_state,
      completed_at: DateTime.utc_now()
    })
    |> Repo.update()
  end

  # Response-derived values are fetched with Map.get/2 because
  # handle_response_started may never have run (see moduledoc).
  defp build_event_attrs(result, state) do
    %{
      channel_request_id: state.channel_request.id,
      type: derive_event_type(result),
      request_method: state.request_method,
      request_path: state.request_path,
      request_headers: encode_headers(state.request_headers),
      request_body_preview: get_in(result, [:request_observation, :preview]),
      request_body_hash: get_in(result, [:request_observation, :hash]),
      response_status: result.status,
      response_headers: encode_headers(Map.get(state, :response_headers)),
      response_body_preview: get_in(result, [:response_observation, :preview]),
      response_body_hash: get_in(result, [:response_observation, :hash]),
      latency_ms: us_to_ms(result.duration_us),
      ttfb_ms: state |> Map.get(:ttfb_us) |> us_to_ms(),
      error_message: if(result.error, do: classify_error(result.error))
    }
  end

  # Clause order matters: timeouts take precedence over generic errors,
  # and any unclassifiable outcome (e.g. a 3xx status) maps to :error.
  defp derive_request_state(%{error: {:timeout, _}}), do: :timeout
  defp derive_request_state(%{error: error}) when not is_nil(error), do: :error
  defp derive_request_state(%{status: status}) when status in 200..299, do: :success
  defp derive_request_state(%{status: status}) when status in 400..599, do: :failed
  defp derive_request_state(_result), do: :error

  defp derive_event_type(%{error: nil}), do: :sink_response
  defp derive_event_type(_result), do: :error

  # Replaces sensitive header values; comparison is case-insensitive
  # on the header name.
  defp redact_headers(headers) when is_list(headers) do
    for {key, value} <- headers do
      if String.downcase(key) in @redacted_headers do
        {key, "[REDACTED]"}
      else
        {key, value}
      end
    end
  end

  defp redact_headers(nil), do: nil

  defp encode_headers(nil), do: nil

  # Encoded as an array of [key, value] pairs rather than a map because
  # HTTP allows duplicate header keys (e.g. multiple Set-Cookie headers).
  defp encode_headers(headers) do
    headers
    |> Enum.map(&Tuple.to_list/1)
    |> Jason.encode!()
  end

  defp classify_error({:timeout, :connect_timeout}), do: "connect_timeout"
  defp classify_error({:timeout, :timeout}), do: "response_timeout"
  defp classify_error({:timeout, {:closed, :timeout}}), do: "timeout"

  defp classify_error(%{reason: reason})
       when reason in @known_transport_errors,
       do: Atom.to_string(reason)

  defp classify_error(error), do: inspect(error)

  # Microseconds → whole milliseconds; passes nil through untouched.
  defp us_to_ms(nil), do: nil
  defp us_to_ms(us), do: div(us, 1000)
end

defmodule Lightning.Channels.SinkAuth do
  @moduledoc """
  Maps a credential's schema type and body to an outbound HTTP
  Authorization header.

  ## Supported Schemas

  - `"http"` — Bearer token (`access_token`) or Basic auth
    (`username` + `password`)
  - `"dhis2"` — DHIS2 ApiToken (`pat`) or Basic auth
    (`username` + `password`)
  - `"oauth"` — Bearer token (`access_token`, with auto-refresh via
    `resolve_credential_body`)

  Schemas not in this list are rejected at config time. If an
  unsupported schema somehow reaches runtime, `build_auth_header/2`
  returns an error.
  """

  @supported_schemas ~w(http dhis2 oauth)

  @doc """
  Returns the list of credential schema types that can be used for sink
  auth. Used for config-time validation.
  """
  @spec supported_schemas() :: [String.t()]
  def supported_schemas, do: @supported_schemas

  @doc """
  Given a credential schema name and decrypted body map, returns the
  Authorization header value to set on the outbound request.

  Returns:
  - `{:ok, header_value}` — e.g., `"Bearer tok123"` or `"Basic dTpw"`
  - `{:error, :no_auth_fields}` — credential body missing required auth fields
  - `{:error, {:unsupported_schema, schema}}` — schema not in supported list
  """
  @spec build_auth_header(String.t(), map()) ::
          {:ok, String.t()}
          | {:error, :no_auth_fields | {:unsupported_schema, String.t()}}
  def build_auth_header("http", body) do
    cond do
      token = body["access_token"] ->
        {:ok, "Bearer #{token}"}

      body["username"] && body["password"] ->
        {:ok, basic_header(body)}

      true ->
        {:error, :no_auth_fields}
    end
  end

  def build_auth_header("dhis2", body) do
    cond do
      token = body["pat"] ->
        {:ok, "ApiToken #{token}"}

      body["username"] && body["password"] ->
        {:ok, basic_header(body)}

      true ->
        {:error, :no_auth_fields}
    end
  end

  def build_auth_header("oauth", body) do
    case body["access_token"] do
      nil -> {:error, :no_auth_fields}
      token -> {:ok, "Bearer #{token}"}
    end
  end

  def build_auth_header(schema, _body) do
    {:error, {:unsupported_schema, schema}}
  end

  # RFC 7617 Basic credentials: base64("user:pass").
  defp basic_header(body) do
    "Basic #{Base.encode64("#{body["username"]}:#{body["password"]}")}"
  end
end
:live_debugger, :external_url, live_debugger_external_url + end end # Load storage and webhook retry config early so endpoint can respect it. diff --git a/lib/lightning/policies/project_users.ex b/lib/lightning/policies/project_users.ex index 507cc201203..647092700db 100644 --- a/lib/lightning/policies/project_users.ex +++ b/lib/lightning/policies/project_users.ex @@ -28,6 +28,9 @@ defmodule Lightning.Policies.ProjectUsers do | :initiate_github_sync | :create_collection | :publish_template + | :create_channel + | :delete_channel + | :update_channel @doc """ authorize/3 takes an action, a user, and a project. It checks the user's role @@ -110,7 +113,10 @@ defmodule Lightning.Policies.ProjectUsers do :delete_workflow, :run_workflow, :create_project_credential, - :initiate_github_sync + :initiate_github_sync, + :create_channel, + :delete_channel, + :update_channel ] def authorize( diff --git a/lib/lightning/projects/project.ex b/lib/lightning/projects/project.ex index 8691a120f5b..5f1b0e1364d 100644 --- a/lib/lightning/projects/project.ex +++ b/lib/lightning/projects/project.ex @@ -50,6 +50,7 @@ defmodule Lightning.Projects.Project do has_many :credentials, through: [:project_credentials, :credential] has_many :collections, Lightning.Collections.Collection + has_many :channels, Lightning.Channels.Channel timestamps() end diff --git a/lib/lightning/setup_utils.ex b/lib/lightning/setup_utils.ex index 86d57d498a8..dea4ba789a2 100644 --- a/lib/lightning/setup_utils.ex +++ b/lib/lightning/setup_utils.ex @@ -813,6 +813,9 @@ defmodule Lightning.SetupUtils do Lightning.Projects.File, Lightning.Projects.ProjectOauthClient, Lightning.Credentials.OauthClient, + Lightning.Channels.ChannelRequest, + Lightning.Channels.ChannelSnapshot, + Lightning.Channels.Channel, Lightning.Projects.Project, Lightning.Collaboration.DocumentState ]) diff --git a/lib/lightning_web/auth.ex b/lib/lightning_web/auth.ex new file mode 100644 index 00000000000..b55e7822e3b --- /dev/null +++ 
defmodule LightningWeb.Auth do
  @moduledoc """
  Shared HTTP request authentication functions.

  Validates inbound requests against `WebhookAuthMethod` records using
  API key (`x-api-key` header) or Basic Auth (`authorization` header).
  Used by both `WebhookAuth` plug (triggers) and `ChannelProxyPlug`
  (channels).

  All comparisons use `Plug.Crypto.secure_compare/2` to prevent
  timing attacks.
  """
  import Plug.Conn, only: [get_req_header: 2]

  alias Lightning.Workflows.WebhookAuthMethod

  @doc """
  Returns true if the request's `x-api-key` header matches any
  `:api`-type auth method in the list.
  """
  def valid_key?(conn, methods) do
    Enum.any?(methods, &key_matches?(conn, &1))
  end

  @doc """
  Returns true if the request's Basic Auth credentials match any
  `:basic`-type auth method in the list.
  """
  def valid_user?(conn, methods) do
    Enum.any?(methods, &user_matches?(conn, &1))
  end

  @doc """
  Returns true if the request contains an `x-api-key` or
  `authorization` header (regardless of whether the value is correct).
  """
  def has_credentials?(conn) do
    Enum.any?(["x-api-key", "authorization"], fn name ->
      get_req_header(conn, name) != []
    end)
  end

  # A request may carry several x-api-key headers; any exact match wins.
  defp key_matches?(conn, %WebhookAuthMethod{auth_type: :api, api_key: key}) do
    conn
    |> get_req_header("x-api-key")
    |> Enum.any?(&Plug.Crypto.secure_compare(&1, key))
  end

  defp key_matches?(_conn, _method), do: false

  defp user_matches?(conn, %WebhookAuthMethod{
         auth_type: :basic,
         username: expected_user,
         password: expected_pass
       }) do
    conn
    |> get_req_header("authorization")
    |> Enum.any?(&basic_match?(&1, expected_user, expected_pass))
  end

  defp user_matches?(_conn, _method), do: false

  # Parses one "Basic <base64>" authorization header value and compares
  # the decoded user:pass pair in constant time. Any malformed header
  # (wrong scheme, bad base64, missing colon) yields false.
  defp basic_match?(header, expected_user, expected_pass) do
    with [scheme, encoded] <- String.split(header, " ", parts: 2),
         "basic" <- String.downcase(scheme),
         {:ok, decoded} <- Base.decode64(encoded),
         [user, pass] <- String.split(decoded, ":", parts: 2),
         true <- Plug.Crypto.secure_compare(user, expected_user) do
      Plug.Crypto.secure_compare(pass, expected_pass)
    else
      _ -> false
    end
  end
end
channel, project: project, on_close: _} = assigns, + socket + ) do + changeset = Channel.changeset(channel, %{}) + + wams = WebhookAuthMethods.list_for_project(project) + pcs = Projects.list_project_credentials(project) + + current_source_ids = + channel.channel_auth_methods + |> Enum.filter(&(&1.role == :source)) + |> Enum.map(& &1.webhook_auth_method_id) + + current_sink_ids = + channel.channel_auth_methods + |> Enum.filter(&(&1.role == :sink)) + |> Enum.map(& &1.project_credential_id) + + source_selections = + Map.new(wams, fn wam -> {wam.id, wam.id in current_source_ids} end) + + sink_selections = + Map.new(pcs, fn pc -> {pc.id, pc.id in current_sink_ids} end) + + {:ok, + socket + |> assign(assigns) + |> assign( + changeset: changeset, + webhook_auth_methods: wams, + project_credentials: pcs, + source_selections: source_selections, + sink_selections: sink_selections + )} + end + + @impl true + def handle_event("validate", %{"channel" => params}, socket) do + changeset = + socket.assigns.channel + |> Channel.changeset(params) + |> Map.put(:action, :validate) + + source_selections = + merge_selections( + socket.assigns.source_selections, + Map.get(params, "source_auth_methods", %{}) + ) + + sink_selections = + merge_selections( + socket.assigns.sink_selections, + Map.get(params, "sink_auth_methods", %{}) + ) + + {:noreply, + assign(socket, + changeset: changeset, + source_selections: source_selections, + sink_selections: sink_selections + )} + end + + def handle_event("save", %{"channel" => params}, socket) do + save_channel(socket, socket.assigns.action, params) + end + + @impl true + def render(assigns) do + assigns = + assign_new(assigns, :title, fn -> + case assigns.action do + :new -> "New Channel" + :edit -> "Edit Channel" + end + end) + + ~H""" +
+ Source Authentication Methods +
+ <.link + navigate={~p"/projects/#{@project}/settings#webhook_security"} + class="text-xs link" + > + Create a new one in project settings. + ++ Sink Credentials +
+ <.link + navigate={~p"/projects/#{@project}/settings#credentials"} + class="text-xs link" + > + Create a new one in project settings. + +No channels found.
+