|
5 | 5 | import os |
6 | 6 | import sys |
7 | 7 | from dataclasses import dataclass |
| 8 | +from datetime import datetime, timedelta, timezone |
| 9 | +from typing import Optional, Tuple |
8 | 10 |
|
9 | 11 | import yaml |
10 | 12 |
|
@@ -43,6 +45,132 @@ def load_env_map() -> dict[str, EnvConfig]: |
43 | 45 | return env_map |
44 | 46 |
|
45 | 47 |
|
| 48 | +# ------------------------------ |
| 49 | +# Time utilities |
| 50 | +# ------------------------------ |
| 51 | + |
| 52 | + |
| 53 | +def fmt_utc(dt: datetime) -> str: |
| 54 | + return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ") |
| 55 | + |
| 56 | + |
| 57 | +def get_24_hours_window() -> Tuple[datetime, datetime]: |
| 58 | + now = datetime.now(timezone.utc) |
| 59 | + start = now - timedelta(hours=24) |
| 60 | + return start, now |
| 61 | + |
| 62 | + |
| 63 | +# ------------------------------ |
| 64 | +# Filter builders |
| 65 | +# ------------------------------ |
| 66 | + |
| 67 | + |
| 68 | +def common_prefix(ns_re: str) -> str: |
| 69 | + return ( |
| 70 | + 'resource.type="k8s_container" ' |
| 71 | + f'AND resource.labels.namespace_name=~"{ns_re}" ' |
| 72 | + 'AND (logName:"/logs/stdout" OR logName:"/logs/stderr")' |
| 73 | + ) |
| 74 | + |
| 75 | + |
| 76 | +def consensus_height_filter(common: str, height: int) -> str: |
| 77 | + return f'{common} AND jsonPayload.message:"Running consensus for height {height}"' |
| 78 | + |
| 79 | + |
| 80 | +def wide_search_filter(common: str, height: int) -> str: |
| 81 | + return f"""{common} AND ( |
| 82 | + ( |
| 83 | + -- Consensus logs (keyed by height) |
| 84 | + ( |
| 85 | + jsonPayload.spans.height="{height}" |
| 86 | + OR jsonPayload.message:"{height}" |
| 87 | + OR textPayload:"{height}" |
| 88 | + ) |
| 89 | + AND |
| 90 | + resource.labels.container_name="sequencer-core" |
| 91 | + AND |
| 92 | + ( |
| 93 | + jsonPayload.message=~"^START_ROUND_(PROPOSER|VALIDATOR):" |
| 94 | + OR jsonPayload.message:"DECISION_REACHED" |
| 95 | + OR jsonPayload.message:"PROPOSAL_FAILED" |
| 96 | + OR jsonPayload.message=~"(?i)prevote|precommit|propose" |
| 97 | +
|
| 98 | + ) |
| 99 | + ) |
| 100 | + OR |
| 101 | + ( |
| 102 | + -- Batcher logs (keyed by propose_block_input) |
| 103 | + jsonPayload.filename:"apollo_batcher" AND "BlockNumber({height})" |
| 104 | + AND |
| 105 | + ( |
| 106 | + jsonPayload.message:"finishing block building" |
| 107 | + OR jsonPayload.message:"Received final number of transactions in block proposal:" |
| 108 | + OR jsonPayload.message:"Finished building block as proposer" |
| 109 | + ) |
| 110 | + ) |
| 111 | + OR |
| 112 | + ( |
| 113 | + -- Blockifier logs |
| 114 | + jsonPayload.filename:"blockifier" |
| 115 | + AND |
| 116 | + jsonPayload.message:"Block {height} final weights" |
| 117 | + ) |
| 118 | + )""" |
| 119 | + |
| 120 | + |
| 121 | +def add_time_bounds(flt: str, start: datetime, end: datetime) -> str: |
| 122 | + return f'{flt} AND timestamp>="{fmt_utc(start)}" AND timestamp<"{fmt_utc(end)}"' |
| 123 | + |
| 124 | + |
| 125 | +# ------------------------------ |
| 126 | +# Timestamp discovery + windowing |
| 127 | +# ------------------------------ |
| 128 | + |
| 129 | + |
| 130 | +def compute_window( |
| 131 | + args: argparse.Namespace, |
| 132 | + environment: Optional[EnvConfig] = None, |
| 133 | + common_filter_prefix: Optional[str] = None, |
| 134 | +) -> Tuple[datetime, datetime]: |
| 135 | + """Compute the time window based on provided arguments. |
| 136 | +
|
| 137 | + Priority/validation: |
| 138 | + - --auto, --near, --start/--end, --last-24-hours are mutually exclusive |
| 139 | + - --start/--end must be provided together |
| 140 | + - --last-24-hours (or no args) uses (current_time - 24 hours) to current_time window |
| 141 | + - --auto requires environment and common_filter_prefix parameters |
| 142 | + """ |
| 143 | + |
| 144 | + # TODO(lev): Implement all the logic for the different time options |
| 145 | + |
| 146 | + # Default: last 24 hours window |
| 147 | + return get_24_hours_window() |
| 148 | + |
| 149 | + |
| 150 | +def prepare_filter(args, environment) -> Tuple[str, str, str]: |
| 151 | + """Prepare log filter and time bounds. Returns (log_filter, start_time, end_time).""" |
| 152 | + common_filter_prefix = common_prefix(environment.namespace_re) |
| 153 | + wide_filter = wide_search_filter(common_filter_prefix, args.height) |
| 154 | + start_time, end_time = compute_window(args, environment, common_filter_prefix) |
| 155 | + log_filter = add_time_bounds(wide_filter, start_time, end_time) |
| 156 | + |
| 157 | + if args.print_filters: |
| 158 | + print( |
| 159 | + "START_MARKER_FILTER:\n" |
| 160 | + + consensus_height_filter(common_filter_prefix, args.height) |
| 161 | + + "\n" |
| 162 | + ) |
| 163 | + print( |
| 164 | + "END_MARKER_FILTER:\n" |
| 165 | + + consensus_height_filter(common_filter_prefix, args.height + 1) |
| 166 | + + "\n" |
| 167 | + ) |
| 168 | + print("FINAL_FILTER:\n" + log_filter + "\n") |
| 169 | + raise SystemExit(0) |
| 170 | + |
| 171 | + return log_filter, start_time, end_time |
| 172 | + |
| 173 | + |
46 | 174 | # ------------------------------ |
47 | 175 | # Main |
48 | 176 | # ------------------------------ |
@@ -94,9 +222,14 @@ def main() -> int: |
94 | 222 | env_map = load_env_map() |
95 | 223 | args = get_args(env_map) |
96 | 224 |
|
97 | | - env_map[args.env] |
| 225 | + environment = env_map[args.env] |
| 226 | + |
| 227 | + try: |
| 228 | + log_filter, start_time, end_time = prepare_filter(args, environment) |
| 229 | + except Exception as e: |
| 230 | + print(f"Error: {e}", file=sys.stderr) |
| 231 | + return 2 |
98 | 232 |
|
99 | | - # TODO(lev): Add filter preparation |
100 | 233 | # TODO(lev): Add log downloading |
101 | 234 | # TODO(lev): Add report generation |
102 | 235 |
|
|
0 commit comments