diff --git a/barretenberg/cpp/src/barretenberg/bb/main.cpp b/barretenberg/cpp/src/barretenberg/bb/main.cpp index 54d1a8d64d8e..73f416c28681 100644 --- a/barretenberg/cpp/src/barretenberg/bb/main.cpp +++ b/barretenberg/cpp/src/barretenberg/bb/main.cpp @@ -1,3 +1,4 @@ +// CI phase timing: cache invalidation trigger #include "barretenberg/bb/cli.hpp" int main(int argc, char* argv[]) diff --git a/bootstrap.sh b/bootstrap.sh index 125fd48dff09..b35176038537 100755 --- a/bootstrap.sh +++ b/bootstrap.sh @@ -390,7 +390,7 @@ function build { # Build the project if we should be building if [[ -z "${BOOTSTRAP_AFTER:-}" || "$start_building" = true ]]; then - $project/bootstrap.sh ${1:-} + ci_phase "$project" $project/bootstrap.sh ${1:-} fi # Stop the build if we've reached BOOTSTRAP_TO @@ -400,7 +400,7 @@ function build { fi done - parallel --line-buffer --tag --halt now,fail=1 "denoise '{}'" ::: ${parallel_cmds[@]} + ci_phase "parallel-builds" parallel --line-buffer --tag --halt now,fail=1 "denoise '{}'" ::: ${parallel_cmds[@]} } function bench_cmds { @@ -569,31 +569,31 @@ case "$cmd" in export CI=1 export USE_TEST_CACHE=1 export CI_FULL=0 - build - test + ci_phase "build" build + ci_phase "test" test ;; "ci-full") export CI=1 export USE_TEST_CACHE=1 export CI_FULL=1 - build - test - bench + ci_phase "build" build + ci_phase "test" test + ci_phase "bench" bench ;; "ci-full-no-test-cache") export CI=1 export USE_TEST_CACHE=0 export CI_FULL=1 - build - test - bench + ci_phase "build" build + ci_phase "test" test + ci_phase "bench" bench ;; "ci-full-no-test-cache-makefile") export CI=1 export USE_TEST_CACHE=0 export CI_FULL=1 - build_and_test - bench + ci_phase "build-and-test" build_and_test + ci_phase "bench" bench ;; "ci-grind-test") export CI=1 @@ -724,8 +724,8 @@ case "$cmd" in if ! 
semver check $REF_NAME; then exit 1 fi - build - release + ci_phase "build" build + ci_phase "release" release ;; ########################## diff --git a/ci3/ci-metrics/app.py b/ci3/ci-metrics/app.py index c62875e7d19a..c9eb1d9fb486 100644 --- a/ci3/ci-metrics/app.py +++ b/ci3/ci-metrics/app.py @@ -6,6 +6,7 @@ import os import re import redis +import time import threading from pathlib import Path @@ -37,14 +38,29 @@ def verify_password(username, password): def _init(): - """Initialize SQLite and start background threads.""" + """Initialize SQLite, warm caches, and start background threads.""" try: db.get_db() metrics.start_test_listener(r) + metrics.start_phase_listener(r) metrics.start_ci_run_sync(r) + github_data.start_merge_queue_poller() print("[ci-metrics] Background threads started") except Exception as e: print(f"[ci-metrics] Warning: startup failed: {e}") + # Warm billing caches so first request isn't slow + try: + from billing.gcp import _ensure_cached as _warm_gcp + _warm_gcp() + print("[ci-metrics] GCP billing cache warmed") + except Exception as e: + print(f"[ci-metrics] GCP billing warmup failed: {e}") + try: + from billing.aws import _ensure_cached as _warm_aws + _warm_aws() + print("[ci-metrics] AWS costs cache warmed") + except Exception as e: + print(f"[ci-metrics] AWS costs warmup failed: {e}") threading.Thread(target=_init, daemon=True, name='metrics-init').start() @@ -101,6 +117,61 @@ def _json(data): return Response(json.dumps(data), mimetype='application/json') +# ---- Author mapping: git display name → GitHub username ---- + +_author_map = {} +_author_map_ts = 0 + + +def _get_author_map() -> dict: + """Build git display name → GitHub username mapping from ci_runs + pr_authors.""" + global _author_map, _author_map_ts + now = time.time() + if now - _author_map_ts < 3600 and _author_map: + return _author_map + rows = db.query(''' + SELECT cr.author as git_name, pa.author as github_user, COUNT(*) as c + FROM ci_runs cr + JOIN pr_authors pa ON 
cr.pr_number = pa.pr_number + WHERE cr.author IS NOT NULL AND cr.author != '' + AND pa.author IS NOT NULL AND pa.author != '' + GROUP BY cr.author, pa.author + ''') + name_to_gh = {} + for row in rows: + gn = row['git_name'] + gh = row['github_user'] + if gn not in name_to_gh: + name_to_gh[gn] = {} + name_to_gh[gn][gh] = name_to_gh[gn].get(gh, 0) + row['c'] + result = {} + for gn, gh_counts in name_to_gh.items(): + best = max(gh_counts, key=gh_counts.get) + result[gn] = best + result[best] = best # identity mapping for usernames used as commit_author + _author_map = result + _author_map_ts = now + return result + + +def _normalize_authors(authors_str: str) -> str: + """Normalize comma-separated git names to deduplicated GitHub usernames.""" + if not authors_str: + return '' + amap = _get_author_map() + seen = set() + result = [] + for name in authors_str.split(','): + name = name.strip() + if not name: + continue + gh = amap.get(name, name) + if gh not in seen: + seen.add(gh) + result.append(gh) + return ','.join(result) + + # ---- Namespace billing ---- @app.route('/namespace-billing') @@ -166,7 +237,7 @@ def api_ci_runs(): ts_from = int(datetime.strptime(date_from, '%Y-%m-%d').timestamp() * 1000) if date_from else None ts_to = int((datetime.strptime(date_to, '%Y-%m-%d') + timedelta(days=1)).timestamp() * 1000) if date_to else None - runs = metrics.get_ci_runs(r, ts_from, ts_to) + runs = metrics.get_ci_runs(ts_from, ts_to) if status_filter: runs = [run for run in runs if run.get('status') == status_filter] @@ -185,7 +256,7 @@ def api_ci_runs(): @auth.login_required def api_ci_stats(): ts_from = int((datetime.now() - timedelta(days=7)).timestamp() * 1000) - runs = metrics.get_ci_runs(r, ts_from) + runs = metrics.get_ci_runs(ts_from) total = len(runs) passed = sum(1 for run in runs if run.get('status') == 'PASSED') @@ -233,6 +304,7 @@ def api_costs_overview(): buckets[key]['aws_total'] += entry.get('aws_total', 0) buckets[key]['gcp_total'] += entry.get('gcp_total', 
0) result['by_date'] = sorted(buckets.values(), key=lambda x: x['date']) + result['period'] = {'from': date_from, 'to': date_to} return _json(result) @@ -287,7 +359,7 @@ def api_costs_attribution(): ts_from = int(datetime.strptime(date_from, '%Y-%m-%d').timestamp() * 1000) ts_to = int((datetime.strptime(date_to, '%Y-%m-%d') + timedelta(days=1)).timestamp() * 1000) - runs = metrics.get_ci_runs(r, ts_from, ts_to) + runs = metrics.get_ci_runs(ts_from, ts_to) runs_with_cost = [run for run in runs if run.get('cost_usd') is not None] # Enrich merge queue runs with PR author from GitHub @@ -311,6 +383,9 @@ def api_costs_attribution(): prn = info['pr_number'] if prn and int(prn) in pr_authors: author = pr_authors[int(prn)]['author'] + # Attribute nightly / release runs to a special 'release' actor + if info['type'] in ('nightly', 'releases'): + author = 'release' inst_type = run.get('instance_type', 'unknown') vcpus = run.get('instance_vcpus') @@ -383,14 +458,17 @@ def api_costs_attribution(): instances.sort(key=lambda x: -(x['cost_usd'] or 0)) all_types = sorted(by_type.keys()) + # Pre-compute runs-per-date to avoid O(dates × instances) + runs_per_date = {} + for inst in instances: + runs_per_date[inst['date']] = runs_per_date.get(inst['date'], 0) + 1 by_date_list = [] for date in sorted(by_date_type): - entry = {'date': date, 'total': 0, 'runs': 0} + entry = {'date': date, 'total': 0, 'runs': runs_per_date.get(date, 0)} for rt in all_types: entry[rt] = round(by_date_type[date].get(rt, 0), 2) entry['total'] += by_date_type[date].get(rt, 0) entry['total'] = round(entry['total'], 2) - entry['runs'] = sum(1 for inst in instances if inst['date'] == date) by_date_list.append(entry) by_date_list = _aggregate_dates(by_date_list, granularity, @@ -405,6 +483,7 @@ def api_costs_attribution(): 'by_date': by_date_list, 'run_types': all_types, 'instances': instances[:500], + 'period': {'from': date_from, 'to': date_to}, 'totals': {'aws': round(total_aws, 2), 'gcp': round(gcp_total, 
2), 'gcp_unattributed': round(gcp_total, 2), 'combined': round(total_aws + gcp_total, 2)}, @@ -421,7 +500,7 @@ def api_costs_runners(): ts_from = int(datetime.strptime(date_from, '%Y-%m-%d').timestamp() * 1000) ts_to = int((datetime.strptime(date_to, '%Y-%m-%d') + timedelta(days=1)).timestamp() * 1000) - runs = metrics.get_ci_runs(r, ts_from, ts_to) + runs = metrics.get_ci_runs(ts_from, ts_to) runs_with_cost = [run for run in runs if run.get('cost_usd') is not None] if dashboard: runs_with_cost = [run for run in runs_with_cost if run.get('dashboard') == dashboard] @@ -475,6 +554,7 @@ def api_costs_runners(): 'by_date': by_date, 'by_instance_type': by_instance, 'by_dashboard': by_dashboard, + 'period': {'from': date_from, 'to': date_to}, 'summary': { 'total_cost': round(total_cost, 2), 'spot_pct': round(100.0 * spot_cost / max(total_cost, 0.01), 1), @@ -496,7 +576,7 @@ def api_ci_performance(): ts_from = int(datetime.strptime(date_from, '%Y-%m-%d').timestamp() * 1000) ts_to = int((datetime.strptime(date_to, '%Y-%m-%d') + timedelta(days=1)).timestamp() * 1000) - runs = metrics.get_ci_runs(r, ts_from, ts_to) + runs = metrics.get_ci_runs(ts_from, ts_to) runs = [run for run in runs if run.get('status') in ('PASSED', 'FAILED')] if dashboard: runs = [run for run in runs if run.get('dashboard') == dashboard] @@ -519,6 +599,7 @@ def api_ci_performance(): by_date = [] for date in sorted(by_date_map): d = by_date_map[date] + durs = sorted(d['durations']) by_date.append({ 'date': date, 'total': d['total'], @@ -526,75 +607,105 @@ def api_ci_performance(): 'failed': d['failed'], 'pass_rate': round(100.0 * d['passed'] / max(d['total'], 1), 1), 'failure_rate': round(100.0 * d['failed'] / max(d['total'], 1), 1), - 'avg_duration_mins': round(sum(d['durations']) / len(d['durations']), 1) if d['durations'] else None, + 'avg_duration_mins': round(sum(durs) / len(durs), 1) if durs else None, + 'p50_duration_mins': round(durs[len(durs) // 2], 1) if durs else None, + 'p95_duration_mins': 
round(durs[int(len(durs) * 0.95)], 1) if durs else None, + 'max_duration_mins': round(max(durs), 1) if durs else None, }) + # Merge test outcome counts from test_daily_stats before aggregation + ds_conditions = ['date >= ?', 'date <= ?'] + ds_params = [date_from, date_to] + if dashboard: + ds_conditions.append('dashboard = ?') + ds_params.append(dashboard) + ds_where = 'WHERE ' + ' AND '.join(ds_conditions) + + daily_test_counts = db.query(f''' + SELECT date, SUM(passed) as passed, SUM(failed) as failed, SUM(flaked) as flaked + FROM test_daily_stats {ds_where} + GROUP BY date + ''', ds_params) + daily_test_map = {r['date']: r for r in daily_test_counts} + for d in by_date: + tc = daily_test_map.get(d['date'], {}) + d['flake_count'] = tc.get('flaked', 0) or 0 + d['test_failure_count'] = tc.get('failed', 0) or 0 + d['test_success_count'] = tc.get('passed', 0) or 0 + by_date = _aggregate_dates(by_date, granularity, - sum_fields=['total', 'passed', 'failed'], - avg_fields=['avg_duration_mins']) + sum_fields=['total', 'passed', 'failed', + 'flake_count', 'test_failure_count', 'test_success_count'], + avg_fields=['avg_duration_mins', 'p50_duration_mins', + 'p95_duration_mins', 'max_duration_mins']) for d in by_date: d['pass_rate'] = round(100.0 * d['passed'] / max(d['total'], 1), 1) d['failure_rate'] = round(100.0 * d['failed'] / max(d['total'], 1), 1) - # Daily flake/failure counts from test_events - if dashboard: - flake_daily = db.query(''' - SELECT substr(timestamp, 1, 10) as date, COUNT(*) as count - FROM test_events WHERE status = 'flaked' AND dashboard = ? - AND timestamp >= ? AND timestamp < ? - GROUP BY substr(timestamp, 1, 10) - ''', (dashboard, date_from, date_to + 'T23:59:59')) - fail_test_daily = db.query(''' - SELECT substr(timestamp, 1, 10) as date, COUNT(*) as count - FROM test_events WHERE status = 'failed' AND dashboard = ? - AND timestamp >= ? AND timestamp < ? 
- GROUP BY substr(timestamp, 1, 10) - ''', (dashboard, date_from, date_to + 'T23:59:59')) - else: - flake_daily = db.query(''' - SELECT substr(timestamp, 1, 10) as date, COUNT(*) as count - FROM test_events WHERE status = 'flaked' - AND timestamp >= ? AND timestamp < ? - GROUP BY substr(timestamp, 1, 10) - ''', (date_from, date_to + 'T23:59:59')) - fail_test_daily = db.query(''' - SELECT substr(timestamp, 1, 10) as date, COUNT(*) as count - FROM test_events WHERE status = 'failed' - AND timestamp >= ? AND timestamp < ? - GROUP BY substr(timestamp, 1, 10) - ''', (date_from, date_to + 'T23:59:59')) - flake_daily_map = {r['date']: r['count'] for r in flake_daily} - fail_test_daily_map = {r['date']: r['count'] for r in fail_test_daily} - for d in by_date: - d['flake_count'] = flake_daily_map.get(d['date'], 0) - d['test_failure_count'] = fail_test_daily_map.get(d['date'], 0) + # Duration by dashboard (pipeline) — from pre-aggregated ci_run_daily_stats + dbd_rows = db.query(''' + SELECT date, dashboard, run_count, passed, failed, + sum_duration, min_duration, max_duration, p50_duration, p95_duration + FROM ci_run_daily_stats + WHERE date >= ? AND date <= ? 
+ ORDER BY date + ''', (date_from, date_to)) + + dbd_map = {} # {dashboard: [{date, avg_duration_mins, ...}]} + for r in dbd_rows: + dbd_map.setdefault(r['dashboard'], []).append({ + 'date': r['date'], + 'avg_duration_mins': round(r['sum_duration'] / max(r['run_count'], 1), 1), + 'total_duration_mins': round(r['sum_duration'], 1), + 'p50_duration_mins': r['p50_duration'], + 'p95_duration_mins': r['p95_duration'], + 'count': r['run_count'], + }) - # Top flakes/failures + duration_by_dashboard = {} + for db_name, entries in dbd_map.items(): + duration_by_dashboard[db_name] = _aggregate_dates( + entries, granularity, + sum_fields=['count', 'total_duration_mins'], + avg_fields=['avg_duration_mins', 'p50_duration_mins', 'p95_duration_mins']) + + # Top flakes/failures (with affected authors — filter out empty/NULL) + _author_concat = "GROUP_CONCAT(DISTINCT CASE WHEN commit_author IS NOT NULL AND commit_author != '' THEN commit_author END)" if dashboard: - top_flakes = db.query(''' - SELECT test_cmd, COUNT(*) as count, ref_name + top_flakes = db.query(f''' + SELECT test_cmd, COUNT(*) as count, dashboard, + {_author_concat} as authors FROM test_events WHERE status='flaked' AND dashboard = ? AND timestamp >= ? AND timestamp <= ? - GROUP BY test_cmd ORDER BY count DESC LIMIT 15 + GROUP BY test_cmd ORDER BY count DESC LIMIT 20 ''', (dashboard, date_from, date_to + 'T23:59:59')) - top_failures = db.query(''' - SELECT test_cmd, COUNT(*) as count + top_failures = db.query(f''' + SELECT test_cmd, COUNT(*) as count, dashboard, + {_author_concat} as authors FROM test_events WHERE status='failed' AND dashboard = ? AND timestamp >= ? AND timestamp <= ? 
- GROUP BY test_cmd ORDER BY count DESC LIMIT 15 + GROUP BY test_cmd ORDER BY count DESC LIMIT 20 ''', (dashboard, date_from, date_to + 'T23:59:59')) else: - top_flakes = db.query(''' - SELECT test_cmd, COUNT(*) as count, ref_name + top_flakes = db.query(f''' + SELECT test_cmd, COUNT(*) as count, dashboard, + {_author_concat} as authors FROM test_events WHERE status='flaked' AND timestamp >= ? AND timestamp <= ? - GROUP BY test_cmd ORDER BY count DESC LIMIT 15 + GROUP BY test_cmd ORDER BY count DESC LIMIT 20 ''', (date_from, date_to + 'T23:59:59')) - top_failures = db.query(''' - SELECT test_cmd, COUNT(*) as count + top_failures = db.query(f''' + SELECT test_cmd, COUNT(*) as count, dashboard, + {_author_concat} as authors FROM test_events WHERE status='failed' AND timestamp >= ? AND timestamp <= ? - GROUP BY test_cmd ORDER BY count DESC LIMIT 15 + GROUP BY test_cmd ORDER BY count DESC LIMIT 20 ''', (date_from, date_to + 'T23:59:59')) + # Normalize git display names → GitHub usernames + for row in top_flakes: + row['authors'] = _normalize_authors(row.get('authors', '')) + for row in top_failures: + row['authors'] = _normalize_authors(row.get('authors', '')) + # Summary total = len(runs) passed = sum(1 for run in runs if run.get('status') == 'PASSED') @@ -606,38 +717,23 @@ def api_ci_performance(): if complete and ts: durations.append((complete - ts) / 60000.0) - if dashboard: - flake_count = db.query(''' - SELECT COUNT(*) as c FROM test_events WHERE status='flaked' AND dashboard = ? - AND timestamp >= ? AND timestamp <= ? - ''', (dashboard, date_from, date_to + 'T23:59:59')) - total_tests = db.query(''' - SELECT COUNT(*) as c FROM test_events WHERE status IN ('failed','flaked') AND dashboard = ? - AND timestamp >= ? AND timestamp <= ? - ''', (dashboard, date_from, date_to + 'T23:59:59')) - total_failures_count = db.query(''' - SELECT COUNT(*) as c FROM test_events WHERE status='failed' AND dashboard = ? - AND timestamp >= ? AND timestamp <= ? 
- ''', (dashboard, date_from, date_to + 'T23:59:59')) - else: - flake_count = db.query(''' - SELECT COUNT(*) as c FROM test_events WHERE status='flaked' AND timestamp >= ? AND timestamp <= ? - ''', (date_from, date_to + 'T23:59:59')) - total_tests = db.query(''' - SELECT COUNT(*) as c FROM test_events WHERE status IN ('failed','flaked') AND timestamp >= ? AND timestamp <= ? - ''', (date_from, date_to + 'T23:59:59')) - total_failures_count = db.query(''' - SELECT COUNT(*) as c FROM test_events WHERE status='failed' AND timestamp >= ? AND timestamp <= ? - ''', (date_from, date_to + 'T23:59:59')) - - fc = flake_count[0]['c'] if flake_count else 0 - tc = total_tests[0]['c'] if total_tests else 0 - tfc = total_failures_count[0]['c'] if total_failures_count else 0 + # Test outcome summary from test_daily_stats + ds_summary = db.query(f''' + SELECT SUM(passed) as passed, SUM(failed) as failed, SUM(flaked) as flaked + FROM test_daily_stats {ds_where} + ''', ds_params) + ds_s = ds_summary[0] if ds_summary else {} + fc = ds_s.get('flaked', 0) or 0 + tfc = ds_s.get('failed', 0) or 0 + tpc = ds_s.get('passed', 0) or 0 + tc = fc + tfc + tpc return _json({ 'by_date': by_date, + 'duration_by_dashboard': duration_by_dashboard, 'top_flakes': top_flakes, 'top_failures': top_failures, + 'period': {'from': date_from, 'to': date_to}, 'summary': { 'total_runs': total, 'pass_rate': round(100.0 * passed / max(total, 1), 1), @@ -646,6 +742,7 @@ def api_ci_performance(): 'flake_rate': round(100.0 * fc / max(tc, 1), 1) if tc else 0, 'total_flakes': fc, 'total_test_failures': tfc, + 'total_test_successes': tpc, }, }) @@ -684,7 +781,7 @@ def api_pr_metrics(): author = request.args.get('author', '') ts_from = int(datetime.strptime(date_from, '%Y-%m-%d').timestamp() * 1000) ts_to = int((datetime.strptime(date_to, '%Y-%m-%d') + timedelta(days=1)).timestamp() * 1000) - ci_runs = metrics.get_ci_runs(r, ts_from, ts_to) + ci_runs = metrics.get_ci_runs(ts_from, ts_to) return 
_json(github_data.get_pr_metrics(date_from, date_to, author, ci_runs)) @@ -696,6 +793,16 @@ def api_merge_queue_stats(): return _json(github_data.get_merge_queue_stats(date_from, date_to)) +@app.route('/api/test-history/') +@auth.login_required +def api_test_history(test_hash): + """Test event history by hash — SQLite backing for Redis history_ lists.""" + branch = request.args.get('branch', '') + limit = min(int(request.args.get('limit', 1000)), 5000) + rows = metrics.get_test_history(test_hash, branch, limit) + return _json(rows) + + @app.route('/api/ci/flakes-by-command') @auth.login_required def api_flakes_by_command(): @@ -706,6 +813,19 @@ def api_flakes_by_command(): return _json(metrics.get_flakes_by_command(date_from, date_to, dashboard)) +# ---- CI Phase timing ---- + +@app.route('/api/ci/phases') +@auth.login_required +def api_ci_phases(): + """CI phase timing breakdown: avg time per phase, by date, and per run.""" + date_from = request.args.get('from', (datetime.now() - timedelta(days=30)).strftime('%Y-%m-%d')) + date_to = request.args.get('to', datetime.now().strftime('%Y-%m-%d')) + dashboard = request.args.get('dashboard', '') + run_id = request.args.get('run_id', '') + return _json(metrics.get_phases(date_from, date_to, dashboard, run_id)) + + # ---- Test timings ---- @app.route('/api/tests/timings') @@ -734,55 +854,125 @@ def api_test_timings(): where = 'WHERE ' + ' AND '.join(conditions) - # Per-test stats + # Per-test timing from test_events (all statuses including passed) by_test = db.query(f''' SELECT test_cmd, - COUNT(*) as count, + COUNT(*) as event_count, ROUND(AVG(duration_secs), 1) as avg_secs, ROUND(MIN(duration_secs), 1) as min_secs, ROUND(MAX(duration_secs), 1) as max_secs, - SUM(CASE WHEN status = 'passed' THEN 1 ELSE 0 END) as passed, - SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed, - SUM(CASE WHEN status = 'flaked' THEN 1 ELSE 0 END) as flaked, dashboard FROM test_events {where} GROUP BY test_cmd - ORDER BY count DESC + 
ORDER BY event_count DESC LIMIT 200 ''', params) - # Add pass rate + # Per-test counts from daily stats (includes passed) + ds_conditions = ['date >= ?', 'date <= ?'] + ds_params = [date_from, date_to] + if dashboard: + ds_conditions.append('dashboard = ?') + ds_params.append(dashboard) + if test_cmd: + ds_conditions.append('test_cmd = ?') + ds_params.append(test_cmd) + ds_where = 'WHERE ' + ' AND '.join(ds_conditions) + + daily_stats_by_test = {r['test_cmd']: r for r in db.query(f''' + SELECT test_cmd, + SUM(passed) as passed, SUM(failed) as failed, SUM(flaked) as flaked + FROM test_daily_stats {ds_where} + GROUP BY test_cmd + ORDER BY SUM(passed) + SUM(failed) + SUM(flaked) DESC + LIMIT 500 + ''', ds_params)} + + # Merge counts into timing data for row in by_test: - total = row['passed'] + row['failed'] + row['flaked'] - row['pass_rate'] = round(100.0 * row['passed'] / max(total, 1), 1) - row['total_time_secs'] = round(row['avg_secs'] * row['count'], 0) + ds = daily_stats_by_test.get(row['test_cmd'], {}) + row['passed'] = ds.get('passed', 0) or 0 + row['failed'] = ds.get('failed', 0) or row['event_count'] + row['flaked'] = ds.get('flaked', 0) or 0 + row['count'] = row['passed'] + row['failed'] + row['flaked'] + total = max(row['count'], 1) + row['pass_rate'] = round(100.0 * row['passed'] / total, 1) + row['total_time_secs'] = round(row['avg_secs'] * row['event_count'], 0) + del row['event_count'] + + # Also add tests that only have daily stats (all passed, no individual events) + existing_cmds = {r['test_cmd'] for r in by_test} + for cmd, ds in daily_stats_by_test.items(): + if cmd not in existing_cmds and not status: + passed = ds.get('passed', 0) or 0 + failed = ds.get('failed', 0) or 0 + flaked = ds.get('flaked', 0) or 0 + total = passed + failed + flaked + if total > 0: + by_test.append({ + 'test_cmd': cmd, 'count': total, + 'avg_secs': None, 'min_secs': None, 'max_secs': None, + 'passed': passed, 'failed': failed, 'flaked': flaked, + 'pass_rate': round(100.0 
* passed / total, 1), + 'total_time_secs': 0, 'dashboard': '', + }) + by_test.sort(key=lambda r: r['count'], reverse=True) + by_test = by_test[:500] + + # Daily time series: merge daily_stats (has passed counts) with test_events + # (has timing + failed/flaked for dates where daily_stats may be missing) + ds_by_date = {r['date']: r for r in db.query(f''' + SELECT date, + SUM(passed) as passed, SUM(failed) as failed, SUM(flaked) as flaked, + SUM(passed) + SUM(failed) + SUM(flaked) as count + FROM test_daily_stats {ds_where} + GROUP BY date + ORDER BY date + ''', ds_params)} - # Daily time series (aggregate across all tests or filtered test) - by_date = db.query(f''' + # Timing + counts from test_events (all statuses) + te_by_date = {r['date']: r for r in db.query(f''' SELECT substr(timestamp, 1, 10) as date, - COUNT(*) as count, ROUND(AVG(duration_secs), 1) as avg_secs, ROUND(MAX(duration_secs), 1) as max_secs, - SUM(CASE WHEN status = 'passed' THEN 1 ELSE 0 END) as passed, - SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed, - SUM(CASE WHEN status = 'flaked' THEN 1 ELSE 0 END) as flaked + SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as te_failed, + SUM(CASE WHEN status = 'flaked' THEN 1 ELSE 0 END) as te_flaked FROM test_events {where} GROUP BY substr(timestamp, 1, 10) - ORDER BY date - ''', params) + ''', params)} - # Summary - summary_rows = db.query(f''' - SELECT COUNT(*) as count, - ROUND(AVG(duration_secs), 1) as avg_secs, + # Merge: prefer daily_stats counts but fill gaps from test_events + all_dates = sorted(set(ds_by_date.keys()) | set(te_by_date.keys())) + by_date = [] + for date in all_dates: + ds = ds_by_date.get(date, {}) + te = te_by_date.get(date, {}) + passed = ds.get('passed', 0) or 0 + failed = ds.get('failed') or te.get('te_failed', 0) or 0 + flaked = ds.get('flaked') or te.get('te_flaked', 0) or 0 + by_date.append({ + 'date': date, + 'passed': passed, + 'failed': failed, + 'flaked': flaked, + 'count': passed + failed + flaked, + 
'avg_secs': te.get('avg_secs'), + 'max_secs': te.get('max_secs'), + }) + + # Summary: aggregate from merged by_date (which already combines both sources) + passed = sum(d['passed'] for d in by_date) + failed = sum(d['failed'] for d in by_date) + flaked = sum(d['flaked'] for d in by_date) + + # Timing summary from test_events + timing_summary = db.query(f''' + SELECT ROUND(AVG(duration_secs), 1) as avg_secs, ROUND(MAX(duration_secs), 1) as max_secs, - SUM(duration_secs) as total_secs, - SUM(CASE WHEN status = 'passed' THEN 1 ELSE 0 END) as passed, - SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END) as failed, - SUM(CASE WHEN status = 'flaked' THEN 1 ELSE 0 END) as flaked + SUM(duration_secs) as total_secs FROM test_events {where} ''', params) - s = summary_rows[0] if summary_rows else {} + ts = timing_summary[0] if timing_summary else {} # Slowest individual test runs slowest = db.query(f''' @@ -797,14 +987,15 @@ def api_test_timings(): 'by_test': by_test, 'by_date': by_date, 'slowest': slowest, + 'period': {'from': date_from, 'to': date_to}, 'summary': { - 'total_runs': s.get('count', 0), - 'avg_duration_secs': s.get('avg_secs'), - 'max_duration_secs': s.get('max_secs'), - 'total_compute_secs': round(s.get('total_secs', 0) or 0, 0), - 'passed': s.get('passed', 0), - 'failed': s.get('failed', 0), - 'flaked': s.get('flaked', 0), + 'total_runs': passed + failed + flaked, + 'avg_duration_secs': ts.get('avg_secs'), + 'max_duration_secs': ts.get('max_secs'), + 'total_compute_secs': round(ts.get('total_secs', 0) or 0, 0), + 'passed': passed, + 'failed': failed, + 'flaked': flaked, }, }) diff --git a/ci3/ci-metrics/billing/aws.py b/ci3/ci-metrics/billing/aws.py index 481393d74ec3..d8296f91e3f9 100644 --- a/ci3/ci-metrics/billing/aws.py +++ b/ci3/ci-metrics/billing/aws.py @@ -54,6 +54,8 @@ # Messaging 'Amazon Simple Notification Service': 'sns', 'Amazon Simple Queue Service': 'sqs', + # Savings Plans / Reserved Instances + 'Savings Plans for AWS Compute usage': 
'savings_plans', # Other 'Tax': 'tax', 'AWS Support (Business)': 'support', @@ -152,7 +154,10 @@ def _fetch_aws_costs(date_from: str, date_to: str) -> list[dict]: TimePeriod={'Start': date_from, 'End': date_to}, Granularity='DAILY', Metrics=['UnblendedCost'], - GroupBy=[{'Type': 'DIMENSION', 'Key': 'SERVICE'}], + GroupBy=[ + {'Type': 'DIMENSION', 'Key': 'SERVICE'}, + {'Type': 'DIMENSION', 'Key': 'USAGE_TYPE'}, + ], ) if next_token: kwargs['NextPageToken'] = next_token @@ -163,12 +168,26 @@ def _fetch_aws_costs(date_from: str, date_to: str) -> list[dict]: date = result['TimePeriod']['Start'] for group in result['Groups']: service = group['Keys'][0] + usage_type = group['Keys'][1] if len(group['Keys']) > 1 else '' amount = float(group['Metrics']['UnblendedCost']['Amount']) if amount == 0: continue category = SERVICE_CATEGORY_MAP.get(service, 'other') + # Savings plans: ComputeSP:1yrAllUpfront, ComputeSP:3yrNoUpfront, etc. + if category == 'savings_plans': + m = re.match(r'ComputeSP:(\d+yr)(\w+)', usage_type) + if m: + term = m.group(1) + payment = m.group(2) + if payment == 'NoUpfront': + category = f'savings_plan_{term}_monthly' + elif 'Upfront' in payment: + category = f'savings_plan_{term}_annual' + # EC2 reserved instances: HeavyUsage: billed monthly on 1st + elif category == 'ec2' and 'HeavyUsage:' in usage_type: + category = 'reserved_instance_monthly' if category == 'other': - print(f"[rk_aws_costs] unmapped service: {service!r} (${amount:.2f})") + print(f"[rk_aws_costs] unmapped service: {service!r} / {usage_type!r} (${amount:.2f})") rows.append({ 'date': date, 'service': service, diff --git a/ci3/ci-metrics/db.py b/ci3/ci-metrics/db.py index 93e970fe3a56..2f4a2cbd0b2c 100644 --- a/ci3/ci-metrics/db.py +++ b/ci3/ci-metrics/db.py @@ -7,7 +7,8 @@ import sqlite3 import threading -_DB_PATH = os.path.join(os.getenv('LOGS_DISK_PATH', '/logs-disk'), 'metrics.db') +_DB_PATH = os.getenv('METRICS_DB_PATH', + os.path.join(os.getenv('LOGS_DISK_PATH', '/logs-disk'), 
'metrics.db')) _local = threading.local() SCHEMA = """ @@ -34,6 +35,7 @@ CREATE INDEX IF NOT EXISTS idx_test_events_ts ON test_events(timestamp); CREATE INDEX IF NOT EXISTS idx_test_events_cmd ON test_events(test_cmd); CREATE INDEX IF NOT EXISTS idx_test_events_dashboard ON test_events(dashboard); +CREATE INDEX IF NOT EXISTS idx_test_events_status_ts ON test_events(status, timestamp); CREATE TABLE IF NOT EXISTS merge_queue_daily ( date TEXT PRIMARY KEY, @@ -64,6 +66,67 @@ CREATE INDEX IF NOT EXISTS idx_ci_runs_ts ON ci_runs(timestamp_ms); CREATE INDEX IF NOT EXISTS idx_ci_runs_name ON ci_runs(name); CREATE INDEX IF NOT EXISTS idx_ci_runs_dashboard ON ci_runs(dashboard); + +CREATE TABLE IF NOT EXISTS test_daily_stats ( + date TEXT NOT NULL, + test_cmd TEXT NOT NULL, + dashboard TEXT NOT NULL DEFAULT '', + passed INTEGER NOT NULL DEFAULT 0, + failed INTEGER NOT NULL DEFAULT 0, + flaked INTEGER NOT NULL DEFAULT 0, + PRIMARY KEY (date, test_cmd, dashboard) +); +CREATE INDEX IF NOT EXISTS idx_tds_date ON test_daily_stats(date); +CREATE INDEX IF NOT EXISTS idx_tds_dashboard ON test_daily_stats(dashboard); + +CREATE TABLE IF NOT EXISTS merge_queue_snapshots ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + timestamp TEXT NOT NULL, + depth INTEGER NOT NULL, + entries_json TEXT +); +CREATE INDEX IF NOT EXISTS idx_mqs_ts ON merge_queue_snapshots(timestamp); + +CREATE TABLE IF NOT EXISTS ci_run_daily_stats ( + date TEXT NOT NULL, + dashboard TEXT NOT NULL, + run_count INTEGER NOT NULL DEFAULT 0, + passed INTEGER NOT NULL DEFAULT 0, + failed INTEGER NOT NULL DEFAULT 0, + sum_duration REAL NOT NULL DEFAULT 0, + min_duration REAL, + max_duration REAL, + p50_duration REAL, + p95_duration REAL, + PRIMARY KEY (date, dashboard) +); +CREATE INDEX IF NOT EXISTS idx_crds_date ON ci_run_daily_stats(date); + +CREATE TABLE IF NOT EXISTS ci_phases ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + phase TEXT NOT NULL, + duration_secs REAL NOT NULL, + exit_code INTEGER, + run_id TEXT, + job_id TEXT, + 
dashboard TEXT NOT NULL DEFAULT '', + ref_name TEXT, + commit_hash TEXT, + timestamp TEXT NOT NULL +); +CREATE INDEX IF NOT EXISTS idx_ci_phases_run ON ci_phases(run_id); +CREATE INDEX IF NOT EXISTS idx_ci_phases_ts ON ci_phases(timestamp); +CREATE INDEX IF NOT EXISTS idx_ci_phases_phase ON ci_phases(phase); + +CREATE TABLE IF NOT EXISTS pr_authors ( + pr_number INTEGER PRIMARY KEY, + author TEXT NOT NULL, + title TEXT NOT NULL DEFAULT '', + branch TEXT NOT NULL DEFAULT '', + additions INTEGER DEFAULT 0, + deletions INTEGER DEFAULT 0, + fetched_at TEXT NOT NULL +); """ @@ -73,6 +136,11 @@ "ALTER TABLE ci_runs ADD COLUMN job_id TEXT DEFAULT ''", "ALTER TABLE ci_runs ADD COLUMN arch TEXT DEFAULT ''", "CREATE INDEX IF NOT EXISTS idx_ci_runs_dashboard ON ci_runs(dashboard)", + "ALTER TABLE test_events ADD COLUMN test_hash TEXT", + "CREATE INDEX IF NOT EXISTS idx_test_events_hash ON test_events(test_hash)", + "ALTER TABLE merge_queue_daily ADD COLUMN avg_depth REAL", + "ALTER TABLE merge_queue_daily ADD COLUMN peak_depth INTEGER", + "CREATE INDEX IF NOT EXISTS idx_test_events_duration_ts ON test_events(timestamp) WHERE duration_secs IS NOT NULL AND duration_secs > 0", ] diff --git a/ci3/ci-metrics/ec2_pricing.py b/ci3/ci-metrics/ec2_pricing.py index ace55ea4f40a..96e0561d0d70 100644 --- a/ci3/ci-metrics/ec2_pricing.py +++ b/ci3/ci-metrics/ec2_pricing.py @@ -16,12 +16,20 @@ # ---- Hardcoded fallback rates (us-east-2, USD/hr) ---- _HARDCODED_RATES = { - ('m6a.48xlarge', True): 8.31, # spot - ('m6a.48xlarge', False): 16.56, # on-demand - ('m6a.32xlarge', True): 5.54, - ('m6a.32xlarge', False): 11.04, + ('m6a.xlarge', True): 0.07, # spot + ('m6a.xlarge', False): 0.1728, # on-demand + ('m6a.4xlarge', True): 0.28, + ('m6a.4xlarge', False): 0.6912, + ('m6a.8xlarge', True): 0.55, + ('m6a.8xlarge', False): 1.3824, ('m6a.16xlarge', True): 2.77, ('m6a.16xlarge', False): 5.52, + ('m6a.24xlarge', True): 1.66, + ('m6a.24xlarge', False): 4.1472, + ('m6a.32xlarge', True): 5.54, + 
('m6a.32xlarge', False): 11.04, + ('m6a.48xlarge', True): 8.31, + ('m6a.48xlarge', False): 16.56, ('m7a.48xlarge', True): 8.31, ('m7a.48xlarge', False): 16.56, ('m7a.16xlarge', True): 2.77, @@ -145,8 +153,19 @@ def _fetch_all_spot(instance_types: list[str]) -> dict[str, float]: # ---- Cache refresh ---- def _get_known_instance_types() -> list[str]: - """Return the set of instance types we need pricing for.""" - return sorted({itype for itype, _ in _HARDCODED_RATES}) + """Return the set of instance types we need pricing for (hardcoded + from DB).""" + types = {itype for itype, _ in _HARDCODED_RATES} + try: + import db + conn = db.get_db() + rows = conn.execute( + "SELECT DISTINCT instance_type FROM ci_runs " + "WHERE instance_type IS NOT NULL AND instance_type != '' AND instance_type != 'unknown'" + ).fetchall() + types.update(r['instance_type'] for r in rows) + except Exception: + pass + return sorted(types) def _refresh_cache(): diff --git a/ci3/ci-metrics/github_data.py b/ci3/ci-metrics/github_data.py index 8824d187cb81..fd9a7a905f0d 100644 --- a/ci3/ci-metrics/github_data.py +++ b/ci3/ci-metrics/github_data.py @@ -1,15 +1,20 @@ """GitHub API polling with in-memory cache. -Fetches PR lifecycle, deployment runs, branch lag, and merge queue stats via `gh` CLI. +Fetches PR lifecycle, deployment runs, branch lag, and merge queue stats via +the GitHub REST API (using requests + GH_TOKEN env var). Most data cached in memory with TTL. Merge queue stats persisted to SQLite daily. 
""" import json -import subprocess +import os +import requests import threading import time from datetime import datetime, timedelta, timezone +import db as _db + REPO = 'AztecProtocol/aztec-packages' +_GH_API = 'https://api.github.com' BRANCH_PAIRS = [ ('next', 'staging-public'), @@ -27,39 +32,122 @@ _pr_cache = {'data': [], 'ts': 0} _deploy_cache = {'data': [], 'ts': 0} _lag_cache = {'data': [], 'ts': 0} -_pr_author_cache = {} # {pr_number: {'author': str, 'title': str, 'branch': str}} _pr_lock = threading.Lock() _deploy_lock = threading.Lock() _lag_lock = threading.Lock() -def _gh(args: list[str]) -> str | None: +def _gh_headers() -> dict: + token = os.environ.get('GH_TOKEN') or os.environ.get('GITHUB_TOKEN', '') + h = {'Accept': 'application/vnd.github+json', 'X-GitHub-Api-Version': '2022-11-28'} + if token: + h['Authorization'] = f'Bearer {token}' + return h + + +def _github_get(path: str, paginate: bool = False) -> list | dict | None: + """GET from GitHub REST API. Returns parsed JSON (list or dict). 
+ If paginate=True, follows Link: next headers and merges array results.""" + url = f'{_GH_API}/{path}' if not path.startswith('http') else path + headers = _gh_headers() try: - result = subprocess.run( - ['gh'] + args, - capture_output=True, text=True, timeout=30 - ) - if result.returncode == 0: - return result.stdout.strip() - except (FileNotFoundError, subprocess.TimeoutExpired) as e: - print(f"[rk_github] gh error: {e}") - return None + if not paginate: + resp = requests.get(url, headers=headers, timeout=30) + if resp.status_code != 200: + print(f"[rk_github] API {resp.status_code}: {url}") + return None + return resp.json() + # Paginated: collect all pages + all_items = [] + while url: + resp = requests.get(url, headers=headers, timeout=30) + if resp.status_code != 200: + print(f"[rk_github] API {resp.status_code}: {url}") + break + data = resp.json() + if isinstance(data, list): + all_items.extend(data) + elif isinstance(data, dict): + # For endpoints like /actions/workflows/.../runs that wrap in an object + all_items.append(data) + # Follow Link: ; rel="next" + link = resp.headers.get('Link', '') + url = None + for part in link.split(','): + if 'rel="next"' in part: + url = part.split('<')[1].split('>')[0] + return all_items + except Exception as e: + print(f"[rk_github] API error: {e}") + return None + + +def _github_graphql(query: str, variables: dict = None) -> dict | None: + """Execute a GitHub GraphQL query.""" + headers = _gh_headers() + try: + resp = requests.post(f'{_GH_API}/graphql', headers=headers, + json={'query': query, 'variables': variables or {}}, + timeout=30) + if resp.status_code != 200: + print(f"[rk_github] GraphQL {resp.status_code}") + return None + data = resp.json() + if 'errors' in data: + print(f"[rk_github] GraphQL errors: {data['errors']}") + return data.get('data') + except Exception as e: + print(f"[rk_github] GraphQL error: {e}") + return None # ---- PR lifecycle ---- +_PR_GQL = ''' +query($owner: String!, $repo: String!, 
$cursor: String) { + repository(owner: $owner, name: $repo) { + pullRequests(states: MERGED, first: 100, after: $cursor, orderBy: {field: UPDATED_AT, direction: DESC}) { + pageInfo { hasNextPage endCursor } + nodes { + number + author { login } + title + createdAt + mergedAt + closedAt + baseRefName + headRefName + additions + deletions + changedFiles + isDraft + reviewDecision + labels(first: 20) { nodes { name } } + } + } + } +}''' + + def _fetch_and_process_prs() -> list[dict]: - out = _gh([ - 'pr', 'list', '--repo', REPO, '--state', 'merged', - '--limit', '500', - '--json', 'number,author,title,createdAt,mergedAt,closedAt,baseRefName,' - 'headRefName,additions,deletions,changedFiles,isDraft,reviewDecision,labels' - ]) - if not out: - return [] - try: - prs = json.loads(out) - except json.JSONDecodeError: + owner, repo = REPO.split('/') + prs = [] + cursor = None + for _ in range(5): # max 5 pages = 500 PRs + data = _github_graphql(_PR_GQL, {'owner': owner, 'repo': repo, 'cursor': cursor}) + if not data: + break + pr_data = data.get('repository', {}).get('pullRequests', {}) + nodes = pr_data.get('nodes', []) + for node in nodes: + node['author'] = (node.get('author') or {}).get('login', 'unknown') + node['labels'] = [l['name'] for l in (node.get('labels') or {}).get('nodes', [])] + prs.extend(nodes) + page_info = pr_data.get('pageInfo', {}) + if not page_info.get('hasNextPage'): + break + cursor = page_info.get('endCursor') + if not prs: return [] for pr in prs: @@ -106,20 +194,14 @@ def _ensure_prs(): def _fetch_all_deploys() -> list[dict]: all_runs = [] for workflow in DEPLOY_WORKFLOWS: - out = _gh([ - 'run', 'list', '--repo', REPO, - '--workflow', workflow, '--limit', '50', - '--json', 'databaseId,status,conclusion,createdAt,updatedAt,headBranch,name' - ]) - if not out: - continue - try: - runs = json.loads(out) - except json.JSONDecodeError: + data = _github_get( + f'repos/{REPO}/actions/workflows/{workflow}/runs?per_page=50&status=completed') + if not data: 
continue + runs = data.get('workflow_runs', []) for run in runs: - started = run.get('createdAt', '') - completed = run.get('updatedAt') + started = run.get('created_at', '') + completed = run.get('updated_at') duration = None if started and completed: try: @@ -129,9 +211,9 @@ def _fetch_all_deploys() -> list[dict]: except (ValueError, TypeError): pass all_runs.append({ - 'run_id': str(run.get('databaseId', '')), + 'run_id': str(run.get('id', '')), 'workflow_name': workflow.replace('.yml', ''), - 'ref_name': run.get('headBranch', ''), + 'ref_name': run.get('head_branch', ''), 'status': run.get('conclusion', run.get('status', 'unknown')), 'started_at': started, 'completed_at': completed, @@ -162,26 +244,22 @@ def _fetch_branch_lag() -> list[dict]: results = [] today = datetime.now(timezone.utc).date().isoformat() for source, target in BRANCH_PAIRS: - out = _gh([ - 'api', f'repos/{REPO}/compare/{target}...{source}', - '--jq', '.ahead_by' - ]) - if not out: + data = _github_get(f'repos/{REPO}/compare/{target}...{source}') + if not data: continue try: - commits_behind = int(out) + commits_behind = int(data.get('ahead_by', 0)) except (ValueError, TypeError): continue days_behind = None - out2 = _gh([ - 'api', f'repos/{REPO}/compare/{target}...{source}', - '--jq', '.commits[0].commit.committer.date' - ]) - if out2: + commits = data.get('commits', []) + if commits: try: - oldest = datetime.fromisoformat(out2.replace('Z', '+00:00')) - days_behind = round((datetime.now(timezone.utc) - oldest).total_seconds() / 86400, 1) + oldest_date = commits[0].get('commit', {}).get('committer', {}).get('date', '') + if oldest_date: + oldest = datetime.fromisoformat(oldest_date.replace('Z', '+00:00')) + days_behind = round((datetime.now(timezone.utc) - oldest).total_seconds() / 86400, 1) except (ValueError, TypeError): pass @@ -291,71 +369,106 @@ def get_branch_lag(date_from: str, date_to: str) -> dict: return {'pairs': pairs} +def _cache_pr_author(pr_number: int, info: dict): + """Write 
PR author info to SQLite cache.""" + _db.execute(''' + INSERT OR REPLACE INTO pr_authors (pr_number, author, title, branch, additions, deletions, fetched_at) + VALUES (?, ?, ?, ?, ?, ?, ?) + ''', (pr_number, info['author'], info.get('title', ''), info.get('branch', ''), + info.get('additions', 0), info.get('deletions', 0), + datetime.now(timezone.utc).isoformat())) + + +def _get_cached_pr_author(pr_number: int) -> dict | None: + """Read PR author info from SQLite cache.""" + rows = _db.query('SELECT * FROM pr_authors WHERE pr_number = ?', (pr_number,)) + if rows: + r = rows[0] + return {'author': r['author'], 'title': r['title'], 'branch': r['branch'], + 'additions': r['additions'], 'deletions': r['deletions']} + return None + + def get_pr_author(pr_number) -> dict | None: - """Look up PR author/title by number. Results are cached permanently (PR data doesn't change).""" + """Look up PR author/title by number. Results cached in SQLite.""" pr_number = int(pr_number) if pr_number else None if not pr_number: return None - if pr_number in _pr_author_cache: - return _pr_author_cache[pr_number] - # Check merged PR cache first (already fetched) + # Check SQLite cache + cached = _get_cached_pr_author(pr_number) + if cached: + return cached + + # Check merged PR cache (already fetched in-memory) for pr in _pr_cache.get('data', []): if pr.get('number') == pr_number: info = {'author': pr.get('author', 'unknown'), 'title': pr.get('title', ''), 'branch': pr.get('headRefName', ''), 'additions': pr.get('additions', 0), 'deletions': pr.get('deletions', 0)} - _pr_author_cache[pr_number] = info + _cache_pr_author(pr_number, info) return info - # Fetch from GitHub API - out = _gh(['pr', 'view', str(pr_number), '--repo', REPO, - '--json', 'author,title,headRefName,additions,deletions']) - if out: + # Fetch from GitHub REST API + data = _github_get(f'repos/{REPO}/pulls/{pr_number}') + if data: try: - data = json.loads(out) - author = data.get('author', {}) - if isinstance(author, 
dict): - author = author.get('login', 'unknown') + author = (data.get('user') or {}).get('login', 'unknown') info = {'author': author, 'title': data.get('title', ''), - 'branch': data.get('headRefName', ''), + 'branch': (data.get('head') or {}).get('ref', ''), 'additions': data.get('additions', 0), 'deletions': data.get('deletions', 0)} - _pr_author_cache[pr_number] = info + _cache_pr_author(pr_number, info) return info - except (json.JSONDecodeError, KeyError): + except (KeyError, TypeError): pass return None def batch_get_pr_authors(pr_numbers: set) -> dict: - """Fetch authors for multiple PR numbers, using cache. Returns {pr_number: info}.""" + """Fetch authors for multiple PR numbers, using SQLite cache. Returns {pr_number: info}.""" result = {} - to_fetch = [] - for prn in pr_numbers: - if not prn: - continue - prn = int(prn) - if prn in _pr_author_cache: - result[prn] = _pr_author_cache[prn] - else: - to_fetch.append(prn) - - # Check merged PR cache first - for pr in _pr_cache.get('data', []): - num = pr.get('number') - if num in to_fetch: - info = {'author': pr.get('author', 'unknown'), 'title': pr.get('title', ''), - 'branch': pr.get('headRefName', ''), - 'additions': pr.get('additions', 0), 'deletions': pr.get('deletions', 0)} - _pr_author_cache[num] = info - result[num] = info - to_fetch.remove(num) - - # Fetch remaining individually (with a cap to avoid API abuse) - for prn in to_fetch[:50]: - info = get_pr_author(prn) - if info: - result[prn] = info + # Batch fetch from SQLite cache in a single query + clean = [int(prn) for prn in pr_numbers if prn] + if not clean: + return result + placeholders = ','.join('?' 
* len(clean)) + cached_rows = _db.query( + f'SELECT * FROM pr_authors WHERE pr_number IN ({placeholders})', clean) + cached_set = set() + for r in cached_rows: + prn = r['pr_number'] + result[prn] = {'author': r['author'], 'title': r['title'], 'branch': r['branch'], + 'additions': r['additions'], 'deletions': r['deletions']} + cached_set.add(prn) + to_fetch = [prn for prn in clean if prn not in cached_set] + + # Check merged PR cache (in-memory) + if to_fetch: + to_fetch_set = set(to_fetch) + for pr in _pr_cache.get('data', []): + num = pr.get('number') + if num in to_fetch_set: + info = {'author': pr.get('author', 'unknown'), 'title': pr.get('title', ''), + 'branch': pr.get('headRefName', ''), + 'additions': pr.get('additions', 0), 'deletions': pr.get('deletions', 0)} + _cache_pr_author(num, info) + result[num] = info + to_fetch_set.discard(num) + to_fetch = list(to_fetch_set) + + # Fetch remaining concurrently (with a cap to avoid API abuse) + if to_fetch: + from concurrent.futures import ThreadPoolExecutor, as_completed + with ThreadPoolExecutor(max_workers=10) as pool: + futures = {pool.submit(get_pr_author, prn): prn for prn in to_fetch[:50]} + for fut in as_completed(futures): + prn = futures[fut] + try: + info = fut.result() + if info: + result[prn] = info + except Exception: + pass return result @@ -495,33 +608,29 @@ def _median(vals): def _fetch_merge_queue_runs(date_str: str) -> dict: """Fetch merge_group workflow runs for a single date. 
Returns daily summary.""" - out = _gh([ - 'api', '--paginate', + pages = _github_get( f'repos/{REPO}/actions/workflows/{CI3_WORKFLOW}/runs' f'?event=merge_group&created={date_str}&per_page=100', - '--jq', '.workflow_runs[] | [.conclusion, .status] | @tsv', - ]) + paginate=True) summary = {'date': date_str, 'total': 0, 'success': 0, 'failure': 0, 'cancelled': 0, 'in_progress': 0} - if not out: + if not pages: return summary - for line in out.strip().split('\n'): - if not line.strip(): - continue - parts = line.split('\t') - conclusion = parts[0] if parts[0] else '' - status = parts[1] if len(parts) > 1 else '' - summary['total'] += 1 - if conclusion == 'success': - summary['success'] += 1 - elif conclusion == 'failure': - summary['failure'] += 1 - elif conclusion == 'cancelled': - summary['cancelled'] += 1 - elif status in ('in_progress', 'queued', 'waiting'): - summary['in_progress'] += 1 - else: - summary['failure'] += 1 # treat unknown conclusions as failures + for page in pages: + for run in (page.get('workflow_runs') or []) if isinstance(page, dict) else []: + conclusion = run.get('conclusion') or '' + status = run.get('status') or '' + summary['total'] += 1 + if conclusion == 'success': + summary['success'] += 1 + elif conclusion == 'failure': + summary['failure'] += 1 + elif conclusion == 'cancelled': + summary['cancelled'] += 1 + elif status in ('in_progress', 'queued', 'waiting'): + summary['in_progress'] += 1 + else: + summary['failure'] += 1 # treat unknown conclusions as failures return summary @@ -597,13 +706,14 @@ def _backfill_merge_queue(): def refresh_merge_queue_today(): - """Refresh today's (and yesterday's) merge queue stats. Called periodically.""" + """Refresh recent merge queue stats. 
Re-fetches the last 7 days to fix any + zero rows written during transient API failures.""" import db conn = db.get_db() - today = datetime.now(timezone.utc).date().isoformat() - yesterday = (datetime.now(timezone.utc) - timedelta(days=1)).date().isoformat() + today = datetime.now(timezone.utc).date() - for ds in [yesterday, today]: + for i in range(7): + ds = (today - timedelta(days=i)).isoformat() summary = _fetch_merge_queue_runs(ds) conn.execute( 'INSERT OR REPLACE INTO merge_queue_daily (date, total, success, failure, cancelled, in_progress) ' @@ -613,6 +723,80 @@ def refresh_merge_queue_today(): conn.commit() +_MQ_DEPTH_GQL = ''' +query($owner: String!, $repo: String!, $branch: String!) { + repository(owner: $owner, name: $repo) { + mergeQueue(branch: $branch) { + entries(first: 100) { + totalCount + nodes { position state enqueuedAt pullRequest { number title author { login } } } + } + } + } +}''' + +_MQ_BRANCH = 'next' + + +def poll_merge_queue_depth(): + """Snapshot the current merge queue depth into SQLite.""" + import db + owner, repo = REPO.split('/') + data = _github_graphql(_MQ_DEPTH_GQL, + {'owner': owner, 'repo': repo, 'branch': _MQ_BRANCH}) + if not data: + return + mq = (data.get('repository') or {}).get('mergeQueue') + if mq is None: + return + entries = mq.get('entries', {}) + depth = entries.get('totalCount', 0) + nodes = entries.get('nodes', []) + entries_json = json.dumps([{ + 'position': n.get('position'), + 'state': n.get('state'), + 'pr': (n.get('pullRequest') or {}).get('number'), + 'author': ((n.get('pullRequest') or {}).get('author') or {}).get('login'), + } for n in nodes]) if nodes else None + + now = datetime.now(timezone.utc).isoformat() + db.execute('INSERT INTO merge_queue_snapshots (timestamp, depth, entries_json) VALUES (?, ?, ?)', + (now, depth, entries_json)) + + +def _aggregate_depth_stats(): + """Aggregate merge_queue_snapshots into avg/peak depth on merge_queue_daily.""" + import db + conn = db.get_db() + rows = 
conn.execute(''' + SELECT substr(timestamp, 1, 10) as date, + ROUND(AVG(depth), 1) as avg_depth, + MAX(depth) as peak_depth + FROM merge_queue_snapshots + GROUP BY substr(timestamp, 1, 10) + ''').fetchall() + for row in rows: + conn.execute(''' + UPDATE merge_queue_daily SET avg_depth = ?, peak_depth = ? + WHERE date = ? + ''', (row['avg_depth'], row['peak_depth'], row['date'])) + conn.commit() + + +def start_merge_queue_poller(): + """Start background thread that polls merge queue depth every 5 minutes.""" + def loop(): + while True: + try: + poll_merge_queue_depth() + except Exception as e: + print(f"[rk_github] queue depth poll error: {e}") + time.sleep(300) # 5 minutes + t = threading.Thread(target=loop, daemon=True, name='mq-depth-poller') + t.start() + return t + + _mq_backfill_lock = threading.Lock() _mq_last_refresh = 0 _MQ_REFRESH_TTL = 3600 # refresh today's data every hour @@ -629,6 +813,7 @@ def ensure_merge_queue_data(): try: _backfill_merge_queue() refresh_merge_queue_today() + _aggregate_depth_stats() _mq_last_refresh = now finally: _mq_backfill_lock.release() @@ -646,7 +831,7 @@ def get_merge_queue_stats(date_from: str, date_to: str) -> dict: threading.Thread(target=ensure_merge_queue_data, daemon=True).start() rows = db.query( - 'SELECT date, total, success, failure, cancelled, in_progress ' + 'SELECT date, total, success, failure, cancelled, in_progress, avg_depth, peak_depth ' 'FROM merge_queue_daily WHERE date >= ? AND date <= ? ORDER BY date', (date_from, date_to)) diff --git a/ci3/ci-metrics/metrics.py b/ci3/ci-metrics/metrics.py index 5c0d1610e06b..7990f2cf5e12 100644 --- a/ci3/ci-metrics/metrics.py +++ b/ci3/ci-metrics/metrics.py @@ -1,9 +1,11 @@ -"""CI metrics: direct Redis reads + test event listener. +"""CI metrics: SQLite source of truth + Redis ingestion + test event listener. -Reads CI run data directly from Redis sorted sets on each request. 
+    git hash-object computes SHA-1 of "blob <byte-len>\0<content>", where the
+    content includes echo's trailing newline; length is the UTF-8 byte count.
+    """
+    content = (s + "\n").encode()
+    blob = b"blob " + str(len(content)).encode() + b"\0" + content
+    col = status if status in ('passed', 'failed', 'flaked') else None  # whitelist: guards the f-string SQL below
datetime.now(timezone.utc).isoformat()) + test_hash = hash_str_orig(test_cmd) if test_cmd else None + + # Always update daily stats (lightweight aggregate) + _upsert_daily_stats(status, test_cmd, dashboard, timestamp) + db.execute(''' INSERT INTO test_events (status, test_cmd, log_url, ref_name, commit_hash, commit_author, commit_msg, exit_code, duration_secs, is_scenario, owners, - flake_group_id, dashboard, timestamp) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + flake_group_id, dashboard, timestamp, test_hash) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( status, test_cmd, @@ -176,14 +194,15 @@ def _handle_test_event(channel: str, data: dict): 1 if data.get('is_scenario_test') else 0, json.dumps(data['owners']) if data.get('owners') else None, data.get('flake_group_id'), - data.get('dashboard', ''), - data.get('timestamp', datetime.now(timezone.utc).isoformat()), + dashboard, + timestamp, + test_hash, )) def start_test_listener(redis_conn): """Subscribe to test event channels only. Reconnects on failure.""" - channels = [b'ci:test:started', b'ci:test:passed', b'ci:test:failed', b'ci:test:flaked'] + channels = [b'ci:test:passed', b'ci:test:failed', b'ci:test:flaked'] def listener(): backoff = 1 @@ -215,6 +234,156 @@ def listener(): return t +# ---- CI Phase timing listener ---- + +def _handle_phase_event(data: dict): + """Insert a CI phase timing event into SQLite.""" + db.execute(''' + INSERT INTO ci_phases + (phase, duration_secs, exit_code, run_id, job_id, dashboard, + ref_name, commit_hash, timestamp) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?) 
+        datetime.now(timezone.utc).isoformat(),  # arrival time; any event-payload timestamp is ignored — confirm publisher format before switching
substr(timestamp, 1, 10) as date, phase, + ROUND(AVG(duration_secs), 1) as avg_secs, + COUNT(*) as count + FROM ci_phases {where} + GROUP BY date, phase + ORDER BY date + ''', params) + by_date: dict[str, dict] = {} + for row in date_rows: + d = row['date'] + if d not in by_date: + by_date[d] = {'date': d, 'phases': {}} + by_date[d]['phases'][row['phase']] = row['avg_secs'] + + # Recent individual runs with their phases + recent_runs = db.query(f''' + SELECT run_id, job_id, dashboard, ref_name, commit_hash, + phase, duration_secs, exit_code, timestamp + FROM ci_phases {where} + ORDER BY timestamp DESC + LIMIT 500 + ''', params) + runs_map: dict[str, dict] = {} + for row in recent_runs: + rid = row['run_id'] or row['timestamp'] + if rid not in runs_map: + runs_map[rid] = { + 'run_id': row['run_id'], 'job_id': row['job_id'], + 'dashboard': row['dashboard'], 'ref_name': row['ref_name'], + 'commit_hash': row['commit_hash'], 'phases': [], + } + runs_map[rid]['phases'].append({ + 'phase': row['phase'], + 'duration_secs': row['duration_secs'], + 'exit_code': row['exit_code'], + }) + + # Aggregate by dashboard: total duration per phase per pipeline + # Exclude cache-download/cache-upload — those are S3 transfer noise; + # the project-level ci_phase wrappers capture the meaningful build time. 
+ dash_rows = db.query(f''' + SELECT dashboard, phase, + ROUND(AVG(duration_secs), 1) as avg_secs, + COUNT(*) as count, + ROUND(SUM(duration_secs), 0) as total_secs + FROM ci_phases {where} + AND dashboard != '' + AND phase NOT LIKE 'cache-download:%' + AND phase NOT LIKE 'cache-upload:%' + GROUP BY dashboard, phase + ORDER BY dashboard, total_secs DESC + ''', params) + by_dashboard: dict[str, dict] = {} + for row in dash_rows: + d = row['dashboard'] + if d not in by_dashboard: + by_dashboard[d] = {'dashboard': d, 'phases': {}, 'total_secs': 0, 'count': 0} + by_dashboard[d]['phases'][row['phase']] = row['total_secs'] + by_dashboard[d]['total_secs'] += row['total_secs'] + by_dashboard[d]['count'] = max(by_dashboard[d]['count'], row['count']) + for d in by_dashboard.values(): + d['total_secs'] = round(d['total_secs'], 1) + + return { + 'by_phase': by_phase, + 'by_date': list(by_date.values()), + 'by_dashboard': list(by_dashboard.values()), + 'recent_runs': list(runs_map.values())[:50], + } + + # ---- Sync failed_tests_{section} lists from Redis into SQLite ---- _ANSI_STRIP = re.compile(r'\x1b\[[^m]*m|\x1b\]8;;[^\x07]*\x07') @@ -326,18 +495,18 @@ def sync_failed_tests_to_sqlite(redis_conn): _failed_tests_sync_ts = now conn = db.get_db() - # Track existing entries to avoid duplicates: log_url for entries that have one, - # (test_cmd, timestamp, dashboard) composite key for entries without log_url + # Track existing failed/flaked entries to avoid duplicates (this sync only + # processes failed/flaked from Redis lists, so no need to scan passed rows). 
existing_urls = {row['log_url'] for row in conn.execute( - "SELECT DISTINCT log_url FROM test_events WHERE log_url IS NOT NULL" + "SELECT DISTINCT log_url FROM test_events WHERE log_url IS NOT NULL AND status IN ('failed', 'flaked')" ).fetchall()} existing_keys = {(row['test_cmd'], row['timestamp'], row['dashboard']) for row in conn.execute( - "SELECT test_cmd, timestamp, dashboard FROM test_events WHERE log_url IS NULL" + "SELECT test_cmd, timestamp, dashboard FROM test_events WHERE log_url IS NULL AND status IN ('failed', 'flaked')" ).fetchall()} total = 0 - for section in SECTIONS: - key = f'failed_tests_{section}' + for section in SECTIONS + ['']: + key = f'failed_tests_{section}' if section else 'failed_tests' try: entries = redis_conn.lrange(key, 0, -1) except Exception as e: @@ -363,15 +532,19 @@ def sync_failed_tests_to_sqlite(redis_conn): INSERT INTO test_events (status, test_cmd, log_url, ref_name, commit_author, commit_msg, duration_secs, flake_group_id, dashboard, - timestamp) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + timestamp, test_hash) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) 
''', ( parsed['status'], parsed['test_cmd'], parsed['log_url'], parsed['ref_name'], parsed['commit_author'], parsed['commit_msg'], parsed['duration_secs'], parsed['flake_group_id'], parsed['dashboard'], parsed['timestamp'], + hash_str_orig(parsed['test_cmd']) if parsed['test_cmd'] else None, )) + _upsert_daily_stats( + parsed['status'], parsed['test_cmd'], + parsed['dashboard'], parsed['timestamp']) total += 1 except Exception as e: print(f"[rk_metrics] Error inserting test event: {e}") @@ -437,15 +610,16 @@ def _load_seed_data(): events = data['test_events'] for ev in events: try: + te_cmd = ev.get('test_cmd', '') conn.execute(''' INSERT OR IGNORE INTO test_events (status, test_cmd, log_url, ref_name, commit_hash, commit_author, commit_msg, exit_code, duration_secs, is_scenario, owners, - flake_group_id, dashboard, timestamp) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + flake_group_id, dashboard, timestamp, test_hash) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) ''', ( ev.get('status', ''), - ev.get('test_cmd', ''), + te_cmd, ev.get('log_url'), ev.get('ref_name', ''), ev.get('commit_hash'), @@ -458,6 +632,7 @@ def _load_seed_data(): ev.get('flake_group_id'), ev.get('dashboard', ''), ev.get('timestamp', ''), + hash_str_orig(te_cmd) if te_cmd else None, )) except Exception: continue @@ -472,14 +647,19 @@ def _load_seed_data(): def sync_ci_runs_to_sqlite(redis_conn): - """Sync all CI runs from Redis into SQLite for persistence.""" + """Ingest CI runs from Redis into SQLite. + + Redis is the ingestion pipe (log_ci_run writes there from CI instances). + SQLite is the source of truth. Fields enriched post-ingestion (instance_type, + cost_usd from CloudTrail resolution) are preserved — only overwritten if + Redis has a non-empty value. 
+ """ global _ci_sync_ts now = time.time() if now - _ci_sync_ts < _CI_SYNC_TTL: return _ci_sync_ts = now - # Sync everything Redis has (not just 30 days) runs = _get_ci_runs_from_redis(redis_conn) now_iso = datetime.now(timezone.utc).isoformat() @@ -488,11 +668,32 @@ def sync_ci_runs_to_sqlite(redis_conn): for run in runs: try: conn.execute(''' - INSERT OR REPLACE INTO ci_runs + INSERT INTO ci_runs (dashboard, name, timestamp_ms, complete_ms, status, author, pr_number, instance_type, instance_vcpus, spot, cost_usd, job_id, arch, synced_at) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ON CONFLICT(dashboard, timestamp_ms, name) DO UPDATE SET + complete_ms = excluded.complete_ms, + status = excluded.status, + author = excluded.author, + pr_number = excluded.pr_number, + instance_vcpus = excluded.instance_vcpus, + spot = excluded.spot, + job_id = excluded.job_id, + arch = excluded.arch, + synced_at = excluded.synced_at, + -- Preserve enriched fields: only overwrite if Redis has real data + instance_type = CASE + WHEN excluded.instance_type IS NOT NULL AND excluded.instance_type != '' + THEN excluded.instance_type + ELSE ci_runs.instance_type + END, + cost_usd = CASE + WHEN excluded.instance_type IS NOT NULL AND excluded.instance_type != '' + THEN excluded.cost_usd + ELSE ci_runs.cost_usd + END ''', ( run.get('dashboard', ''), run.get('name', ''), @@ -516,15 +717,368 @@ def sync_ci_runs_to_sqlite(redis_conn): print(f"[rk_metrics] Synced {count} CI runs to SQLite") +def _backfill_daily_stats(): + """Populate test_daily_stats from existing test_events rows. + + Uses INSERT OR IGNORE to fill gaps without overwriting data from the + real-time listener. Safe to call repeatedly — skips dates/tests that + already have rows. 
+ """ + conn = db.get_db() + cur = conn.execute(''' + INSERT OR IGNORE INTO test_daily_stats (date, test_cmd, dashboard, passed, failed, flaked) + SELECT substr(timestamp, 1, 10) as date, test_cmd, dashboard, + SUM(CASE WHEN status = 'passed' THEN 1 ELSE 0 END), + SUM(CASE WHEN status = 'failed' THEN 1 ELSE 0 END), + SUM(CASE WHEN status = 'flaked' THEN 1 ELSE 0 END) + FROM test_events + GROUP BY substr(timestamp, 1, 10), test_cmd, dashboard + ''') + conn.commit() + if cur.rowcount and cur.rowcount > 0: + print(f"[rk_metrics] Backfilled {cur.rowcount} daily stat rows from test_events") + + +def _materialize_ci_run_daily_stats(): + """Recompute ci_run_daily_stats from ci_runs. + + Replaces all rows — safe to call repeatedly. Stores pre-aggregated + duration percentiles so the API doesn't need to scan raw rows. + """ + conn = db.get_db() + # Fetch raw daily durations grouped by date + dashboard + rows = conn.execute(''' + SELECT + strftime('%Y-%m-%d', timestamp_ms / 1000, 'unixepoch') AS date, + dashboard, + (complete_ms - timestamp_ms) / 60000.0 AS dur_mins + FROM ci_runs + WHERE status IN ('PASSED', 'FAILED') + AND complete_ms IS NOT NULL AND complete_ms > timestamp_ms + ''').fetchall() + + # Group durations: {(date, dashboard): [dur_mins, ...]} + groups = {} + for r in rows: + key = (r['date'], r['dashboard']) + groups.setdefault(key, {'passed': 0, 'failed': 0, 'durs': []}) + groups[key]['durs'].append(r['dur_mins']) + + # Also count pass/fail per group + status_rows = conn.execute(''' + SELECT + strftime('%Y-%m-%d', timestamp_ms / 1000, 'unixepoch') AS date, + dashboard, status, COUNT(*) as cnt + FROM ci_runs + WHERE status IN ('PASSED', 'FAILED') + GROUP BY date, dashboard, status + ''').fetchall() + for r in status_rows: + key = (r['date'], r['dashboard']) + if key not in groups: + groups[key] = {'passed': 0, 'failed': 0, 'durs': []} + if r['status'] == 'PASSED': + groups[key]['passed'] = r['cnt'] + else: + groups[key]['failed'] = r['cnt'] + + 
conn.execute('DELETE FROM ci_run_daily_stats') + inserted = 0 + for (date, dashboard), g in groups.items(): + durs = sorted(g['durs']) + n = len(durs) + conn.execute(''' + INSERT INTO ci_run_daily_stats + (date, dashboard, run_count, passed, failed, + sum_duration, min_duration, max_duration, p50_duration, p95_duration) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?) + ''', ( + date, dashboard, g['passed'] + g['failed'], + g['passed'], g['failed'], + round(sum(durs), 2) if durs else 0, + round(min(durs), 1) if durs else None, + round(max(durs), 1) if durs else None, + round(durs[n // 2], 1) if durs else None, + round(durs[int(n * 0.95)], 1) if durs else None, + )) + inserted += 1 + conn.commit() + print(f"[rk_metrics] Materialized {inserted} ci_run_daily_stats rows") + + +def _backfill_test_hashes(): + """Populate test_hash for existing test_events rows that are missing it.""" + conn = db.get_db() + rows = conn.execute( + "SELECT DISTINCT test_cmd FROM test_events WHERE test_hash IS NULL AND test_cmd != ''" + ).fetchall() + if not rows: + return + for row in rows: + cmd = row['test_cmd'] + h = hash_str_orig(cmd) + conn.execute( + "UPDATE test_events SET test_hash = ? WHERE test_cmd = ? 
AND test_hash IS NULL", + (h, cmd)) + conn.commit() + print(f"[rk_metrics] Backfilled test_hash for {len(rows)} distinct test commands") + + +# ---- CloudTrail instance type resolution ---- + +_ct_resolve_ts = 0 +_CT_RESOLVE_TTL = 6 * 3600 # 6 hours + + +def _fetch_cloudtrail_daily(ct, event_name, start_time, end_time, max_per_day=10000): + """Fetch CloudTrail events in daily chunks to avoid the 5000-event global limit.""" + events = [] + day = start_time.replace(hour=0, minute=0, second=0, microsecond=0) + while day < end_time: + day_end = min(day + timedelta(days=1), end_time) + kwargs = { + 'LookupAttributes': [ + {'AttributeKey': 'EventName', 'AttributeValue': event_name}, + ], + 'StartTime': day, + 'EndTime': day_end, + 'MaxResults': 50, + } + while True: + resp = ct.lookup_events(**kwargs) + events.extend(resp.get('Events', [])) + token = resp.get('NextToken') + if not token or len(events) >= max_per_day: + break + kwargs['NextToken'] = token + day += timedelta(days=1) + return events + + +# Name tag format: _[_] +_NAME_TAG_RE = re.compile(r'^(.+)_(amd64|arm64)(?:_.*)?$') + + +def _normalize_branch_name(name): + """Normalize a branch name the same way bootstrap_ec2 does for the EC2 Name tag.""" + m = re.match(r'^gh-readonly-queue/[^/]+/pr-(\d+)', name) + if m: + return f'pr-{m.group(1)}' + name = re.sub(r'\s*\(queue\)$', '', name) + return re.sub(r'[^a-zA-Z0-9-]', '_', name[:50]) + + +def resolve_unknown_instance_types(): + """Query CloudTrail for RunInstances + CreateTags events to resolve unknown instance types. + + Strategy: + 1. Fetch RunInstances events (daily chunks) → instance_id → instance_type + launch_time + 2. Fetch CreateTags events (daily chunks) → instance_id → {Name, Group, Dashboard, ...} + Tags are accumulated across multiple events then filtered to Group=build-instance. + 3. Join by instance_id, then match to ci_runs by normalized branch name + arch + time window. 
+ """ + global _ct_resolve_ts + now = time.time() + if now - _ct_resolve_ts < _CT_RESOLVE_TTL: + return + _ct_resolve_ts = now + + conn = db.get_db() + unknown_runs = conn.execute(''' + SELECT dashboard, name, timestamp_ms, complete_ms, instance_vcpus, spot, + cost_usd, arch, pr_number + FROM ci_runs + WHERE (instance_type IS NULL OR instance_type = '' OR instance_type = 'unknown') + AND timestamp_ms > ? + ''', (int((time.time() - 90 * 86400) * 1000),)).fetchall() + + if not unknown_runs: + return + + try: + import boto3 + except ImportError: + return + + try: + ct = boto3.client('cloudtrail', region_name='us-east-2') + start_time = datetime.fromtimestamp( + min(r['timestamp_ms'] for r in unknown_runs) / 1000 - 300, tz=timezone.utc) + end_time = datetime.now(timezone.utc) + + # Step 1: Fetch RunInstances events in daily chunks → instance_id → type + launch time + run_events = _fetch_cloudtrail_daily(ct, 'RunInstances', start_time, end_time) + instance_types = {} + instance_launch_times = {} + for event in run_events: + try: + detail = json.loads(event.get('CloudTrailEvent', '{}')) + itype = detail.get('requestParameters', {}).get('instanceType', '') + items = (detail.get('responseElements') or {}).get('instancesSet', {}).get('items', []) + for item in items: + iid = item.get('instanceId', '') + item_type = item.get('instanceType', '') or itype + if iid and item_type: + instance_types[iid] = item_type + instance_launch_times[iid] = int(event['EventTime'].timestamp() * 1000) + except Exception: + continue + + if not instance_types: + print("[rk_metrics] CloudTrail: no RunInstances events found") + return + + # Step 2: Fetch CreateTags events in daily chunks. + # Accumulate ALL tags per instance first, then filter to build instances. + # This handles the case where Name, Group, and Dashboard are set in separate + # create-tags API calls (aws_request_instance_type lines 97, 126, 127). 
+ tag_events = _fetch_cloudtrail_daily(ct, 'CreateTags', start_time, end_time) + all_instance_tags = {} + for event in tag_events: + try: + detail = json.loads(event.get('CloudTrailEvent', '{}')) + req = detail.get('requestParameters', {}) + resources = req.get('resourcesSet', {}).get('items', []) + tags = req.get('tagSet', {}).get('items', []) + tag_dict = {t.get('key', ''): t.get('value', '') for t in tags} + for res in resources: + rid = res.get('resourceId', '') + if rid.startswith('i-'): + if rid not in all_instance_tags: + all_instance_tags[rid] = {} + all_instance_tags[rid].update(tag_dict) + except Exception: + continue + + # Filter to build instances + instance_tags = { + iid: tags for iid, tags in all_instance_tags.items() + if tags.get('Group') == 'build-instance' + } + + # Step 3: Join RunInstances + CreateTags by instance_id + instances = [] + for iid, itype in instance_types.items(): + tags = instance_tags.get(iid, {}) + if not tags.get('Name'): + continue + instances.append({ + 'instance_type': itype, + 'launch_ms': instance_launch_times.get(iid, 0), + 'dashboard': tags.get('Dashboard', ''), + 'name_tag': tags.get('Name', ''), + }) + + # Build index: normalized branch name → [instances] + tag_index = {} + for inst in instances: + m = _NAME_TAG_RE.match(inst['name_tag']) + if m: + tag_index.setdefault(m.group(1), []).append(inst) + else: + tag_index.setdefault(inst['name_tag'], []).append(inst) + + # Step 4: Match unknown runs to instances + updated = 0 + for run in unknown_runs: + run_name = run['name'] + run_arch = run['arch'] or '' + run_ts = run['timestamp_ms'] + run_dashboard = run['dashboard'] + + expected_name = _normalize_branch_name(run_name) + candidates = tag_index.get(expected_name, []) + + best = None + for inst in candidates: + # Verify arch matches + if run_arch: + m = _NAME_TAG_RE.match(inst['name_tag']) + if m and m.group(2) != run_arch: + continue + + # Verify dashboard matches (if tag present) + if inst['dashboard'] and 
inst['dashboard'] != run_dashboard: + continue + + # CI run starts after instance launch; allow up to 90 min (instance lifetime) + delta = run_ts - inst['launch_ms'] + if delta < -60_000 or delta > 5400_000: + continue + + # Prefer most recently launched instance before the run + if delta >= 0 and (best is None or inst['launch_ms'] > best['launch_ms']): + best = inst + elif best is None and abs(delta) < 60_000: + best = inst + + if best: + itype = best['instance_type'] + is_spot = bool(run['spot']) + rate = ec2_pricing.get_instance_rate(itype, is_spot) + new_cost = run['cost_usd'] + if rate and run['complete_ms'] and run['timestamp_ms']: + hours = (run['complete_ms'] - run['timestamp_ms']) / 3_600_000 + new_cost = round(hours * rate, 4) + conn.execute(''' + UPDATE ci_runs SET instance_type = ?, cost_usd = ? + WHERE dashboard = ? AND timestamp_ms = ? AND name = ? + ''', (itype, new_cost, run['dashboard'], run['timestamp_ms'], run['name'])) + updated += 1 + + conn.commit() + if updated: + print(f"[rk_metrics] CloudTrail: resolved {updated}/{len(unknown_runs)} unknown instance types") + else: + print(f"[rk_metrics] CloudTrail: {len(instances)} instances, " + f"0/{len(unknown_runs)} matched") + except Exception as e: + print(f"[rk_metrics] CloudTrail resolution failed: {e}") + + +def recalculate_all_costs(): + """Recalculate cost_usd for all ci_runs based on current instance_type and pricing.""" + conn = db.get_db() + runs = conn.execute(''' + SELECT dashboard, name, timestamp_ms, complete_ms, instance_type, + instance_vcpus, spot, cost_usd + FROM ci_runs + WHERE complete_ms IS NOT NULL AND complete_ms > 0 + ''').fetchall() + updated = 0 + for run in runs: + cost = compute_run_cost({ + 'complete': run['complete_ms'], + 'timestamp': run['timestamp_ms'], + 'instance_type': run['instance_type'] or 'unknown', + 'spot': run['spot'], + 'instance_vcpus': run['instance_vcpus'], + }) + if cost is not None and cost != run['cost_usd']: + conn.execute(''' + UPDATE ci_runs SET 
cost_usd = ? + WHERE dashboard = ? AND timestamp_ms = ? AND name = ? + ''', (cost, run['dashboard'], run['timestamp_ms'], run['name'])) + updated += 1 + conn.commit() + print(f"[rk_metrics] Recalculated costs: {updated}/{len(runs)} changed") + return updated + + def start_ci_run_sync(redis_conn): """Start periodic CI run + test event sync thread.""" _load_seed_data() + _backfill_daily_stats() + _backfill_test_hashes() + _materialize_ci_run_daily_stats() def loop(): while True: try: sync_ci_runs_to_sqlite(redis_conn) sync_failed_tests_to_sqlite(redis_conn) + resolve_unknown_instance_types() + _materialize_ci_run_daily_stats() except Exception as e: print(f"[rk_metrics] sync error: {e}") time.sleep(600) # check every 10 min (TTL gates actual work) @@ -600,3 +1154,21 @@ def get_flakes_by_command(date_from, date_to, dashboard=''): 'total_failures': sum(failures_by_command.values()), }, } + + +def get_test_history(test_hash: str, branch: str = '', limit: int = 1000) -> list[dict]: + """Get test event history by test_hash, matching Redis history_{hash}[_{branch}] lists.""" + conditions = ['test_hash = ?'] + params: list = [test_hash] + if branch: + conditions.append('ref_name = ?') + params.append(branch) + where = 'WHERE ' + ' AND '.join(conditions) + params.append(limit) + return db.query(f''' + SELECT status, test_cmd, log_url, ref_name, commit_author, + commit_msg, duration_secs, dashboard, timestamp + FROM test_events {where} + ORDER BY timestamp DESC + LIMIT ? 
+ ''', params) diff --git a/ci3/ci-metrics/views/ci-insights.html b/ci3/ci-metrics/views/ci-insights.html index 533b6bfb62cd..69b1d9573d18 100644 --- a/ci3/ci-metrics/views/ci-insights.html +++ b/ci3/ci-metrics/views/ci-insights.html @@ -22,6 +22,15 @@ .msg { color:#888; padding:8px 0; } .msg.err { color:#f85149; } + /* Tabs */ + .tabs { display:flex; gap:0; margin:12px 0 0 0; } + .tab { background:#111; border:1px solid #333; border-bottom:none; color:#888; + font-family:monospace; font-size:13px; padding:6px 16px; cursor:pointer; } + .tab:hover { color:#ccc; } + .tab.active { background:#0a0a0a; color:#fff; border-color:#58a6ff; border-bottom:1px solid #0a0a0a; position:relative; z-index:1; } + .tab-content { display:none; border:1px solid #333; border-top:1px solid #333; padding:12px; background:#0a0a0a; margin-top:-1px; } + .tab-content.active { display:block; } + /* KPI strip */ .kpi-strip { display:flex; gap:12px; margin:16px 0; flex-wrap:wrap; } .kpi { background:#0a0a0a; border:1px solid #222; padding:12px 16px; flex:1; min-width:180px; max-height:120px; overflow:hidden; } @@ -53,7 +62,19 @@ .amt { font-variant-numeric:tabular-nums; text-align:right; } th.amt { text-align:right; } .detail-scroll { max-height:500px; overflow:auto; } + .detail-table { width:100%; border-collapse:collapse; font-size:12px; } + .detail-table th { text-align:left; padding:4px 6px; border-bottom:1px solid #333; color:#888; white-space:nowrap; position:sticky; top:0; background:#0a0a0a; } + .detail-table td { padding:4px 6px; border-bottom:1px solid #111; white-space:nowrap; } + .detail-table .amt { text-align:right; font-variant-numeric:tabular-nums; } + .detail-table th.amt { text-align:right; } + .stats { margin:12px 0; color:#888; } + .stats span { color:#ccc; } + /* Test details */ + .cmd { max-width:500px; overflow:hidden; text-overflow:ellipsis; white-space:nowrap; } + .pass { color:#3fb950; } + .fail { color:#f85149; } + .flake { color:#d29922; } @@ -64,12 +85,16 @@ ci 
insights -

ci insights

+

ci insights

- + + - + + + + | @@ -82,71 +107,205 @@

ci insights

- - -
-
daily ci spend
--
-
cost / merge
--
-
mq success rate
--
-
flakes / day
--
-
prs merged / day
--
+
+
Overview
+
Attribution
- -
-
-

daily ci cost + 7-day rolling cost per merge

-
+ +
+ -
-

merge queue: daily outcomes + success rate

-
+ +
+
mq success rate
--
+
flakes / day
--
+
prs merged / day
--
+
avg mq duration
--
-
-

flakes + test failures per day

-
+ +
+
+

merge queue: daily outcomes + success rate

+
+
+
+

test outcomes per day

+
+
+
+

ci run duration by pipeline (avg mins)

+
+
+
+

total ci time by pipeline (hours)

+
+
+
+

ci time by pipeline & phase

+
+
+
+ +
+
+
top flaky tests
+
+ + + +
testflakesaffected
+
+
+
+
top failing tests
+
+ + + +
testfailuresaffected
+
+
-
- -
flakes by pipeline
-
- - - -
+
flakes by pipeline
+
+ + + +
+
+ + +
+
+ test details + | + + + + +
+
+
+
+
+
+

avg duration by day

+
+
+
+

test run count by day

+
+
+
+

tests by duration

+
+ + + + + + + + + + + + + + + + +
test commandrunsavg (s)min (s)max (s)total (h)pass %passedfailedflaked
+
+

slowest individual runs

+
+ + + + + + + + + + + + + +
test commandduration (s)statusdateauthorpipelinelog
+
- -
author ci profile
-
- - - -
+ +
+
+ +
+
+
+

ci cost by run type (time series)

+
+
+
+

cost by user (AWS + GCP)

+
+
+
+

cost by run type

+
+
+
+ +
author ci profile
+
+ + + +
+
+ +
instances
+
+ + + +
+
- + diff --git a/ci3/ci-metrics/views/cost-overview.html b/ci3/ci-metrics/views/cost-overview.html index 53424a2d2d70..533e7d732c98 100644 --- a/ci3/ci-metrics/views/cost-overview.html +++ b/ci3/ci-metrics/views/cost-overview.html @@ -76,12 +76,16 @@ ci insights
-

cost overview

+

cost overview

- - + + + + + + | @@ -98,7 +102,6 @@

cost overview

Overview
Resource Details
-
CI Attribution
@@ -136,35 +139,6 @@

aws vs gcp split

-
-
- -
-
-
-

ci cost by run type (time series)

-
-
-
-

cost by user (AWS + GCP)

-
-
-
-

cost by run type

-
-
-
-

instances

-
- - - -
-
-
- + diff --git a/ci3/dashboard/Dockerfile b/ci3/dashboard/Dockerfile index 2da7805ffa83..cd2e5b1f9b1d 100644 --- a/ci3/dashboard/Dockerfile +++ b/ci3/dashboard/Dockerfile @@ -24,4 +24,4 @@ RUN pip install --no-cache-dir -r ci-metrics/requirements.txt RUN git config --global --add safe.directory /aztec-packages COPY . . EXPOSE 8080 8081 -CMD ["gunicorn", "-w", "100", "-b", "0.0.0.0:8080", "rk:app"] +CMD ["gunicorn", "-w", "50", "-b", "0.0.0.0:8080", "rk:app"] diff --git a/ci3/dashboard/rk.py b/ci3/dashboard/rk.py index aedf35a824e2..d4ce27e4c527 100644 --- a/ci3/dashboard/rk.py +++ b/ci3/dashboard/rk.py @@ -27,30 +27,43 @@ Compress(app) auth = HTTPBasicAuth() -# Start the ci-metrics server as a subprocess -# Check sibling dir (repo layout) then subdirectory (Docker layout) +# Start the ci-metrics server as a subprocess (once across all workers). +# Uses a file lock so only the first gunicorn worker to import this module +# actually spawns the process; the rest skip silently. +import fcntl +import signal +import time as _time + _ci_metrics_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), '..', 'ci-metrics') if not os.path.isdir(_ci_metrics_dir): _ci_metrics_dir = os.path.join(os.path.dirname(os.path.abspath(__file__)), 'ci-metrics') if os.path.isdir(_ci_metrics_dir): - # Kill any stale process on the port (e.g. 
leftover from previous reload) - import signal + _lock_path = f'/tmp/ci-metrics-{CI_METRICS_PORT}.lock' try: - out = subprocess.check_output( - ['lsof', '-ti', f':{CI_METRICS_PORT}'], stderr=subprocess.DEVNULL, text=True) - for pid in out.strip().split('\n'): - if pid: - os.kill(int(pid), signal.SIGTERM) - import time; time.sleep(0.5) - except (subprocess.CalledProcessError, OSError): + _lock_fd = open(_lock_path, 'w') + fcntl.flock(_lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) + # We hold the lock — kill stale process and spawn fresh one + try: + out = subprocess.check_output( + ['lsof', '-ti', f':{CI_METRICS_PORT}'], stderr=subprocess.DEVNULL, text=True) + for pid in out.strip().split('\n'): + if pid: + os.kill(int(pid), signal.SIGTERM) + _time.sleep(0.5) + except (subprocess.CalledProcessError, OSError): + pass + _ci_metrics_env = {**os.environ, 'CI_METRICS_PORT': str(CI_METRICS_PORT)} + subprocess.Popen( + ['gunicorn', '-w', '1', '-b', f'0.0.0.0:{CI_METRICS_PORT}', + '--timeout', '120', 'app:app'], + cwd=_ci_metrics_dir, + env=_ci_metrics_env, + ) + print(f"[rk.py] ci-metrics server started on port {CI_METRICS_PORT}") + # Hold the lock until this process exits so other workers skip + except OSError: + # Another worker already holds the lock — nothing to do pass - _ci_metrics_env = {**os.environ, 'CI_METRICS_PORT': str(CI_METRICS_PORT)} - subprocess.Popen( - ['gunicorn', '-w', '4', '-b', f'0.0.0.0:{CI_METRICS_PORT}', '--timeout', '120', 'app:app'], - cwd=_ci_metrics_dir, - env=_ci_metrics_env, - ) - print(f"[rk.py] ci-metrics server started on port {CI_METRICS_PORT}") def read_from_disk(key): """Read log from disk as fallback when Redis key not found.""" @@ -178,7 +191,6 @@ def root() -> str: f"{hyperlink('/cost-overview', 'cost overview (AWS + GCP)')}\n" f"{hyperlink('/namespace-billing', 'namespace billing')}\n" f"{hyperlink('/ci-insights', 'ci insights')}\n" - f"{hyperlink('/test-timings', 'test timings')}\n" f"{RESET}" ) @@ -363,10 +375,53 @@ def 
show_section(section):
         follow='top'
     )
 
+def _history_from_api(key):
+    """Fallback: fetch test history from ci-metrics API when Redis list is empty."""
+    # Key format: history_<16-char-hash>[_<branch>]
+    rest = key[len('history_'):]
+    test_hash = rest[:16]
+    branch = rest[17:] if len(rest) > 16 and rest[16] == '_' else ''
+    try:
+        params = {'branch': branch} if branch else {}
+        resp = _proxy_session.get(
+            f'{CI_METRICS_URL}/api/test-history/{test_hash}',
+            params=params,
+            auth=(request.authorization.username, request.authorization.password),
+            timeout=10,
+        )
+        if resp.status_code != 200:
+            return ''
+        rows = resp.json()
+        if not rows:
+            return ''
+        lines = []
+        for row in rows:
+            ts = row.get('timestamp', '')[:19].replace('T', ' ')
+            if len(ts) > 5:
+                ts = ts[5:]  # Strip year → "MM-DD HH:MM:SS"
+            status = (row.get('status') or '').upper()
+            color = GREEN if status == 'PASSED' else (RED if status == 'FAILED' else YELLOW)
+            cmd = row.get('test_cmd', '')
+            dur = row.get('duration_secs')
+            dur_str = f' ({int(dur)}s)' if dur else ''
+            author = row.get('commit_author', '')
+            msg = row.get('commit_msg', '')
+            author_str = f' ({author}: {msg})' if author else ''
+            log_url = row.get('log_url', '')
+            log_link = f' {hyperlink(log_url, log_url.split("/")[-1])}' if log_url else ''
+            lines.append(f'{ts}: {color}{BOLD}{status}{RESET}{log_link} {cmd}{dur_str}{author_str}')
+        return '\n'.join(lines)
+    except Exception as e:
+        print(f"[rk.py] history fallback error: {e}")
+        return ''
+
+
 @app.route('/list/<key>')
 @auth.login_required
 def get_list(key):
     value = get_list_as_string(key)
+    if not value.strip() and key.startswith('history_'):
+        value = _history_from_api(key)
     follow = request.args.get('follow', 'top')
     return render_template_string(TEMPLATE, value=ansi_to_html(value), follow=follow,
                                   filter_str='', filter_prop='')
@@ -528,32 +583,32 @@ def make_options(param_name, options, current_value, suffix=''):
 _proxy_session = requests.Session()
 _HOP_BY_HOP = frozenset([
     'connection', 
'keep-alive', 'proxy-authenticate', 'proxy-authorization', - 'te', 'trailers', 'transfer-encoding', 'upgrade', 'content-length', - # `requests` auto-decompresses gzip responses, so Content-Encoding is - # stale — strip it so the browser doesn't try to decompress plain content. - # Flask-Compress on rkapp handles browser compression. - 'content-encoding', + 'te', 'trailers', 'transfer-encoding', 'upgrade', ]) -# Don't forward Accept-Encoding — let `requests` negotiate with ci-metrics -# (it adds its own and auto-decompresses). -_STRIP_REQUEST_HEADERS = frozenset(['host', 'accept-encoding']) +_STRIP_REQUEST_HEADERS = frozenset(['host']) def _proxy(path): - """Forward request to ci-metrics, streaming the response back.""" + """Forward request to ci-metrics, streaming the response back. + + Passes the browser's Accept-Encoding through to ci-metrics so it + compresses directly for the browser. We stream the raw (still + compressed) bytes back without decompression. + """ url = f'{CI_METRICS_URL}/{path.lstrip("/")}' try: + fwd_headers = {k: v for k, v in request.headers if k.lower() not in _STRIP_REQUEST_HEADERS} resp = _proxy_session.request( method=request.method, url=url, params=request.args, data=request.get_data(), - headers={k: v for k, v in request.headers if k.lower() not in _STRIP_REQUEST_HEADERS}, + headers=fwd_headers, stream=True, - timeout=60, + timeout=180, ) - # Strip hop-by-hop headers + # Stream raw bytes (skip requests auto-decompression) headers = {k: v for k, v in resp.headers.items() if k.lower() not in _HOP_BY_HOP} - return Response(resp.iter_content(chunk_size=8192), + return Response(resp.raw.stream(8192), status=resp.status_code, headers=headers) except Exception as e: return Response(json.dumps({'error': f'ci-metrics unavailable: {e}'}), diff --git a/ci3/log_ci_run b/ci3/log_ci_run index b52b93256edc..2c39ee96de9b 100755 --- a/ci3/log_ci_run +++ b/ci3/log_ci_run @@ -35,7 +35,8 @@ if [ -z "$key" ]; then author="$(git log -1 
--pretty=format:"%an")" name=$REF_NAME [ "$(aws_get_meta_data instance-life-cycle)" == "spot" ] && spot=true || spot=false - instance_type=$(aws_get_meta_data instance-type 2>/dev/null || echo "unknown") + instance_type=${EC2_INSTANCE_TYPE:-$(aws_get_meta_data instance-type 2>/dev/null)} + instance_type=${instance_type:-unknown} instance_vcpus=$(nproc 2>/dev/null || echo 0) # Extract PR number from branch name or merge queue ref diff --git a/ci3/source_bootstrap b/ci3/source_bootstrap index 1e9fd282b49d..bb7af1801063 100644 --- a/ci3/source_bootstrap +++ b/ci3/source_bootstrap @@ -3,6 +3,7 @@ source $(git rev-parse --show-toplevel)/ci3/source source $ci3/source_refname source $ci3/source_redis +source $ci3/source_phases cmd=${1:-} [ -n "$cmd" ] && shift diff --git a/ci3/source_phases b/ci3/source_phases new file mode 100644 index 000000000000..7b938738bf40 --- /dev/null +++ b/ci3/source_phases @@ -0,0 +1,65 @@ +# Sourced by source_bootstrap. Provides CI phase timing instrumentation. +# Wraps cache_download/cache_upload to auto-publish timing, and provides +# ci_phase for wrapping arbitrary commands. +# +# All timing events are published to Redis channel ci:phase:complete as JSON, +# consumed by the ci-metrics server and stored in the ci_phases table. 
+
+function _publish_phase {
+  local phase=$1 duration_secs=$2 exit_code=$3
+  [ "${CI_REDIS_AVAILABLE:-0}" -eq 1 ] || return 0
+  # Skip sub-0.1s phases (no real work done — fully cached or negligible)
+  (( $(echo "$duration_secs < 0.1" | bc -l) )) && return 0
+  local commit_hash=${COMMIT_HASH:-$(git rev-parse HEAD 2>/dev/null || true)}
+  redis_publish ci:phase:complete "$(jq -c -n \
+    --arg phase "$phase" \
+    --argjson duration "$duration_secs" \
+    --argjson exit_code "$exit_code" \
+    --arg run_id "${RUN_ID:-}" \
+    --arg job_id "${JOB_ID:-}" \
+    --arg dashboard "${CI_DASHBOARD:-}" \
+    --arg ref_name "${REF_NAME:-}" \
+    --arg commit_hash "$commit_hash" \
+    '{phase:$phase, duration_secs:$duration, exit_code:$exit_code,
+      run_id:$run_id, job_id:$job_id, dashboard:$dashboard,
+      ref_name:$ref_name, commit_hash:$commit_hash}')" &>/dev/null &
+}
+
+# High-resolution elapsed seconds since $1 (captured via $EPOCHREALTIME).
+function _elapsed {
+  echo "$EPOCHREALTIME - $1" | bc -l | xargs printf '%.3f'
+}
+
+# Shadow the cache_download script on $PATH.
+# Phase name = "cache-download:<artifact>".
+function cache_download {
+  local _t=$EPOCHREALTIME
+  command cache_download "$@"
+  local code=$?
+  _publish_phase "cache-download:${1:-unknown}" "$(_elapsed $_t)" "$code"
+  return $code
+}
+
+# Shadow the cache_upload script on $PATH.
+# Phase name = "cache-upload:<artifact>".
+function cache_upload {
+  local _t=$EPOCHREALTIME
+  command cache_upload "$@"
+  local code=$?
+  _publish_phase "cache-upload:${1:-unknown}" "$(_elapsed $_t)" "$code"
+  return $code
+}
+
+# General phase wrapper.
+# Usage: ci_phase <phase_name> <command...>
+# Example: ci_phase "barretenberg" ./barretenberg/cpp/bootstrap.sh
+function ci_phase {
+  local phase_name=$1; shift
+  local _t=$EPOCHREALTIME
+  "$@"
+  local code=$?
+ _publish_phase "$phase_name" "$(_elapsed $_t)" "$code" + return $code +} + +export -f _publish_phase _elapsed cache_download cache_upload ci_phase diff --git a/noir-projects/noir-contracts/bootstrap.sh b/noir-projects/noir-contracts/bootstrap.sh index e38c87d5ac44..f19a029f4d27 100755 --- a/noir-projects/noir-contracts/bootstrap.sh +++ b/noir-projects/noir-contracts/bootstrap.sh @@ -217,7 +217,7 @@ function build { local contracts="$@" fi set +e - parallel $PARALLEL_FLAGS --joblog joblog.txt -v --line-buffer --tag compile {} $folder_name ::: ${contracts[@]} + parallel $PARALLEL_FLAGS --joblog joblog.txt -v --line-buffer --tag ci_phase "contract:{}" compile {} $folder_name ::: ${contracts[@]} code=$? cat joblog.txt return $code diff --git a/noir-projects/noir-protocol-circuits/bootstrap.sh b/noir-projects/noir-protocol-circuits/bootstrap.sh index 8de829b93502..92aae4fac1c3 100755 --- a/noir-projects/noir-protocol-circuits/bootstrap.sh +++ b/noir-projects/noir-protocol-circuits/bootstrap.sh @@ -174,7 +174,7 @@ function build { fi done | \ parallel -v --line-buffer --tag --halt now,fail=1 --memsuspend $(memsuspend_limit) \ - --joblog joblog.txt compile {} + --joblog joblog.txt ci_phase "circuit:{}" compile {} code=$? cat joblog.txt return $code