diff --git a/AGENTS.md b/AGENTS.md index 6189260e59..d2272d26f0 100644 --- a/AGENTS.md +++ b/AGENTS.md @@ -9,6 +9,10 @@ This document contains key context, nuances, and troubleshooting tips specifical ```bash uv run python -c "from uas_standards.astm.f3548.v21.api import OperationalIntentReference; print(OperationalIntentReference.__annotations__)" ``` +- **uv run Troubleshooting**: If running commands via `uv run` fails due to multi-platform dependency resolution issues (e.g. missing upload dates/wheels in custom package registries), you can force it to use standard PyPI by specifying the `--index` option: + ```bash + PYTHONPATH=. uv run --index https://pypi.org/simple pytest monitoring/uss_qualifier/reports/obfuscation_test.py + ``` ## 2. Navigating Data Schemas - **Implicit Types**: Many schema objects inherit from `ImplicitDict`. This means that reading their raw Python class definitions may not reveal all their expected structure. Rely on their `__annotations__` or their OpenAPI documentation. diff --git a/monitoring/uss_qualifier/reports/README.md b/monitoring/uss_qualifier/reports/README.md index e1038b2742..fdfaa8ae11 100644 --- a/monitoring/uss_qualifier/reports/README.md +++ b/monitoring/uss_qualifier/reports/README.md @@ -25,3 +25,8 @@ The [sequence view artifact](./sequence_view) is a human-readable description/lo ### Globally-expanded report The [globally-expanded report artifact](./globally_expanded/README.md) assembles procedural information about the test run into a single, flat presentation, mimicking what might be seen as output had the automated test been performed manually. + +### Test artifacts obfuscation tool + +The [obfuscation tool](./obfuscate.md) can be used to redact and pseudo-anonymize participant IDs, server hostnames, and authorization tokens from a collection of test artifacts before sharing or publishing. 
+ diff --git a/monitoring/uss_qualifier/reports/obfuscate.md b/monitoring/uss_qualifier/reports/obfuscate.md new file mode 100644 index 0000000000..934c077f8e --- /dev/null +++ b/monitoring/uss_qualifier/reports/obfuscate.md @@ -0,0 +1,53 @@ +# Test Artifacts Obfuscator (Anonymizer) + +The `obfuscate.py` tool is a utility designed to replace potentially sensitive or uniquely-identifying information in `uss_qualifier` test artifacts with generic, pseudo-anonymized values. This can be useful if you would otherwise be hesitant to share `uss_qualifier` artifacts publicly when requesting help or reporting issues. + +## Capabilities + +The tool can obfuscate three main types of identifying information: +- **Participant IDs**: Replaces all detected participant IDs (both individual and aggregate ones) with generic values (e.g., `participant1`, `participant2`). +- **Server Hostnames**: Detects URLs within files and replaces their hostnames with generic values (e.g., `host1`, `host2`), keeping any port numbers intact. +- **Authorization Tokens**: Redacts any authorization bearer tokens present in request headers (e.g., replacing them with `Bearer REDACTED`). + +By default, all of these options are enabled. + +## Inputs and Outputs + +The tool accepts: +- A local directory containing test artifacts, or +- A `.zip` archive containing test artifacts. + +The output format is determined by the output path: a path ending in `.zip` produces a `.zip` archive, and any other path produces a local directory. + +## How to Get Detailed Option Help + +Detailed information on command-line options and toggles can be retrieved using the `--help` flag: + +```bash +PYTHONPATH=. uv run monitoring/uss_qualifier/reports/obfuscate.py --help +``` + +## Running Locally + +To run the obfuscation tool locally, provide the input path followed by the output path: + +```bash +PYTHONPATH=. 
uv run monitoring/uss_qualifier/reports/obfuscate.py /path/to/input_artifacts /path/to/obfuscated_artifacts.zip +``` + +## Running via Docker + +If you prefer to run the tool within a container, build the docker image using `make image` from the repo root, and execute: + +```bash +docker run --rm \ + -v "/path/to/local/input_artifacts:/input" \ + -v "/path/to/local/output_dir:/output" \ + interuss/monitoring \ + uv run uss_qualifier/reports/obfuscate.py /input /output/obfuscated_artifacts.zip +``` + +Ensure that you mount the correct local directories to access your input artifacts and retrieve your obfuscated output. + +> [!WARNING] +> This tool performs pseudo-anonymization based on heuristics and automated string scanning. It does not guarantee complete anonymization. Always review the obfuscated artifacts for any remaining sensitive information before publishing or distributing them. diff --git a/monitoring/uss_qualifier/reports/obfuscate.py b/monitoring/uss_qualifier/reports/obfuscate.py new file mode 100644 index 0000000000..ed56f72f52 --- /dev/null +++ b/monitoring/uss_qualifier/reports/obfuscate.py @@ -0,0 +1,58 @@ +import argparse +import sys + +from loguru import logger + +from monitoring.uss_qualifier.reports.obfuscation import ( + ObfuscatorConfig, + obfuscate_artifacts, +) + + +def main() -> int: + parser = argparse.ArgumentParser( + description="Obfuscate test artifacts by anonymizing participant IDs, hostnames, and tokens." + ) + parser.add_argument( + "input", help="Path to the input folder or .zip file containing test artifacts." 
def main() -> int:
    """Parse the command line and run the artifact obfuscator.

    Two positional arguments (input and output path) are required. Each of
    the three obfuscation features is enabled unless its ``--no-*`` flag is
    given.

    Returns:
        0 when obfuscation finished without raising, 1 otherwise (the
        failure is logged together with its traceback).
    """
    cli = argparse.ArgumentParser(
        description="Obfuscate test artifacts by anonymizing participant IDs, hostnames, and tokens."
    )
    cli.add_argument(
        "input", help="Path to the input folder or .zip file containing test artifacts."
    )
    cli.add_argument(
        "output",
        help="Path where the obfuscated folder or .zip file should be written.",
    )

    # Opt-out switches, registered in the order they appear in --help.
    opt_out_switches = (
        ("--no-participants", "Disable obfuscation of participant IDs."),
        ("--no-hostnames", "Disable obfuscation of server/hostnames."),
        ("--no-tokens", "Disable redaction of authorization bearer tokens."),
    )
    for switch, description in opt_out_switches:
        cli.add_argument(switch, action="store_true", help=description)

    options = cli.parse_args()

    config = ObfuscatorConfig(
        obfuscate_participants=not options.no_participants,
        obfuscate_hostnames=not options.no_hostnames,
        obfuscate_tokens=not options.no_tokens,
    )

    exit_code = 1
    try:
        logger.info(f"Starting obfuscation of {options.input} to {options.output}")
        obfuscate_artifacts(options.input, options.output, config)
        logger.info("Obfuscation completed successfully.")
        exit_code = 0
    except Exception as e:
        logger.exception(f"Obfuscation failed: {e}")
    return exit_code
r"https?://[a-zA-Z0-9.:/\-_~%#?=&@+;!*()\[\]]+" + raw_urls = re.findall(pattern, text) + cleaned_urls = [] + for url in raw_urls: + while url and url[-1] in (".", ",", ";", "?", "!", ")", "]", ">"): + url = url[:-1] + if url: + cleaned_urls.append(url) + return cleaned_urls + + +def get_hostname(url: str) -> str | None: + try: + parsed = urlparse(url) + return parsed.hostname + except Exception: + return None + + +def scan_json(obj, participant_ids: set[str], hostnames: set[str]) -> None: + if isinstance(obj, dict): + for k, v in obj.items(): + if k in ("participant_id", "participant") and isinstance(v, str): + participant_ids.add(v) + elif k in ("participants", "participant_ids") and isinstance(v, list): + for item in v: + if isinstance(item, str): + participant_ids.add(item) + elif k in ( + "participant_requirements", + "aggregate_participants", + "participant_verifications", + ) and isinstance(v, dict): + for p_id in v.keys(): + participant_ids.add(p_id) + if k == "aggregate_participants": + for sub_list in v.values(): + if isinstance(sub_list, list): + for p_id in sub_list: + if isinstance(p_id, str): + participant_ids.add(p_id) + elif k == "manager" and isinstance(v, str): + participant_ids.add(v) + scan_json(v, participant_ids, hostnames) + elif isinstance(obj, list): + for item in obj: + scan_json(item, participant_ids, hostnames) + elif isinstance(obj, str): + scan_text(obj, hostnames) + + +def scan_text(text: str, hostnames: set[str]): + for url in find_urls(text): + h = get_hostname(url) + if h: + hostnames.add(h) + + +def obfuscate_string( + s: str, + participant_map: dict[str, str], + hostname_map: dict[str, str], + config: ObfuscatorConfig, +) -> str: + if not s: + return s + + # 1. Obfuscate tokens + if config.obfuscate_tokens: + s = re.sub(r"(?i)\bBearer\s+\S+", "Bearer REDACTED", s, flags=re.IGNORECASE) + + # 2. 
# The token stops at whitespace and at characters that can never be part of
# a bearer token but commonly follow one in serialized data (quotes, angle
# brackets, backslash escapes, HTML entities). A plain \S+ would swallow
# closing quotes and tags and corrupt the surrounding document.
_BEARER_TOKEN_PATTERN = re.compile(r"\bBearer\s+[^\s\"'<>\\&]+", re.IGNORECASE)


def _replace_whole_words(
    text: str, mapping: dict[str, str], *, ignore_case: bool = False
) -> str:
    """Replace each whole-word occurrence of a mapping key with its value.

    Keys are applied in the mapping's iteration order. Replacements are
    inserted literally (no regex template expansion).
    """
    flags = re.IGNORECASE if ignore_case else 0
    for term, replacement in mapping.items():
        text = re.sub(
            rf"\b{re.escape(term)}\b",
            # A callable keeps backslashes in the replacement from being
            # interpreted as group references.
            lambda _match, _literal=replacement: _literal,
            text,
            flags=flags,
        )
    return text


def obfuscate_string(
    s: str,
    participant_map: dict[str, str],
    hostname_map: dict[str, str],
    config: ObfuscatorConfig,
) -> str:
    """Return *s* with tokens, hostnames and participant IDs obfuscated.

    Args:
        s: Text to process; empty strings are returned unchanged.
        participant_map: Participant ID -> placeholder (case-sensitive).
        hostname_map: Hostname -> placeholder (matched case-insensitively).
        config: Selects which of the three obfuscations are applied.

    Returns:
        The obfuscated text. Tokens are redacted first, then hostnames,
        then participant IDs.
    """
    if not s:
        return s

    if config.obfuscate_tokens:
        s = _BEARER_TOKEN_PATTERN.sub("Bearer REDACTED", s)

    if config.obfuscate_hostnames:
        s = _replace_whole_words(s, hostname_map, ignore_case=True)

    if config.obfuscate_participants:
        s = _replace_whole_words(s, participant_map)

    return s


def obfuscate_json_obj(
    obj: Any,
    participant_map: dict[str, str],
    hostname_map: dict[str, str],
    config: ObfuscatorConfig,
) -> Any:
    """Return an obfuscated deep copy of a parsed JSON value.

    Dictionary keys and all string values are passed through
    ``obfuscate_string``. When token redaction is enabled, a string stored
    under an ``authorization`` key (any casing) is replaced entirely:
    ``Bearer REDACTED`` for bearer credentials, ``REDACTED`` otherwise.
    Non-string scalars are returned unchanged.
    """
    if isinstance(obj, dict):
        result = {}
        for key, value in obj.items():
            new_key = obfuscate_string(key, participant_map, hostname_map, config)
            is_credential = (
                config.obfuscate_tokens
                and new_key.lower() == "authorization"
                and isinstance(value, str)
            )
            if is_credential:
                # Redact the whole header value, whatever the auth scheme.
                if value.lower().startswith("bearer "):
                    result[new_key] = "Bearer REDACTED"
                else:
                    result[new_key] = "REDACTED"
            else:
                result[new_key] = obfuscate_json_obj(
                    value, participant_map, hostname_map, config
                )
        return result
    if isinstance(obj, list):
        return [
            obfuscate_json_obj(item, participant_map, hostname_map, config)
            for item in obj
        ]
    if isinstance(obj, str):
        return obfuscate_string(obj, participant_map, hostname_map, config)
    return obj


def obfuscate_path_component(
    name: str, participant_map: dict[str, str], config: ObfuscatorConfig
) -> str:
    """Return a file or directory name with participant IDs replaced.

    Only participant IDs are rewritten in paths; the name is returned
    unchanged when participant obfuscation is disabled.
    """
    if config.obfuscate_participants:
        name = _replace_whole_words(name, participant_map)
    return name


def obfuscate_relative_path(
    rel_path: str, participant_map: dict[str, str], config: ObfuscatorConfig
) -> str:
    """Obfuscate every component of an ``os.sep``-separated relative path."""
    return os.sep.join(
        obfuscate_path_component(part, participant_map, config)
        for part in rel_path.split(os.sep)
    )
def obfuscate_directory(
    input_dir: str, output_dir: str, config: ObfuscatorConfig
) -> None:
    """Write an obfuscated copy of the tree under *input_dir* to *output_dir*.

    The work is done in two passes over the input tree:

    1. Scan ``.json`` files for participant IDs and hostnames, and the
       recognised text formats for hostnames.
    2. Re-walk the tree and write each file to its (participant-obfuscated)
       relative path: JSON and text files are rewritten, everything else is
       copied as-is.

    Per-file failures are logged and do not abort the run.
    """
    text_suffixes = (".html", ".kml", ".yaml", ".yml", ".md")

    def read_text(path: str) -> str:
        # Undecodable bytes are replaced rather than aborting the file.
        with open(path, encoding="utf-8", errors="replace") as handle:
            return handle.read()

    def numbered(values: set[str], prefix: str) -> dict[str, str]:
        # Longest first (ties alphabetical) so that a value containing a
        # shorter one is replaced before the shorter one.
        ranked = sorted(values)
        ranked.sort(key=len, reverse=True)
        return {value: f"{prefix}{n}" for n, value in enumerate(ranked, start=1)}

    # Pass 1: learn which identifiers occur anywhere in the artifacts.
    seen_participants: set[str] = set()
    seen_hostnames: set[str] = set()
    for folder, _, names in os.walk(input_dir):
        for name in names:
            source = os.path.join(folder, name)
            lowered = name.lower()
            if lowered.endswith(".json"):
                try:
                    scan_json(
                        json.loads(read_text(source)),
                        seen_participants,
                        seen_hostnames,
                    )
                except Exception as e:
                    logger.warning(f"Failed to scan JSON file {source}: {e}")
            elif lowered.endswith(text_suffixes):
                try:
                    scan_text(read_text(source), seen_hostnames)
                except Exception as e:
                    logger.warning(f"Failed to scan text file {source}: {e}")

    # Drop empty values and hostnames that are deliberately left readable.
    participant_map = numbered({p for p in seen_participants if p}, "participant")
    hostname_map = numbered(
        {h for h in seen_hostnames if h and h not in WHITELIST_HOSTNAMES}, "host"
    )

    logger.info(f"Detected participants to obfuscate: {list(participant_map)}")
    logger.info(f"Detected hostnames to obfuscate: {list(hostname_map)}")

    # Pass 2: emit the obfuscated tree.
    for folder, _, names in os.walk(input_dir):
        for name in names:
            source = os.path.join(folder, name)
            relative = os.path.relpath(source, input_dir)
            target = os.path.join(
                output_dir, obfuscate_relative_path(relative, participant_map, config)
            )
            os.makedirs(os.path.dirname(target), exist_ok=True)

            lowered = name.lower()
            if lowered.endswith(".json"):
                try:
                    raw = read_text(source)
                    # A newline within the first 100 characters means the
                    # input was pretty-printed; keep the output that way.
                    pretty = "\n" in raw[:100]
                    cleaned = obfuscate_json_obj(
                        json.loads(raw), participant_map, hostname_map, config
                    )
                    with open(target, "w", encoding="utf-8") as out:
                        json.dump(cleaned, out, indent=2 if pretty else None)
                except Exception as e:
                    logger.error(f"Failed to obfuscate JSON file {source}: {e}")
            elif lowered.endswith(text_suffixes):
                try:
                    cleaned_text = obfuscate_string(
                        read_text(source), participant_map, hostname_map, config
                    )
                    with open(target, "w", encoding="utf-8") as out:
                        out.write(cleaned_text)
                except Exception as e:
                    logger.error(f"Failed to obfuscate text file {source}: {e}")
            else:
                # Unrecognised formats are copied verbatim, not obfuscated.
                try:
                    shutil.copy2(source, target)
                except Exception as e:
                    logger.error(f"Failed to copy file {source}: {e}")
def obfuscate_artifacts(
    input_path: str, output_path: str, config: ObfuscatorConfig
) -> None:
    """Obfuscate a directory or ``.zip`` of artifacts into a directory or ``.zip``.

    Args:
        input_path: Existing directory, or a ``.zip`` archive, of artifacts.
        output_path: Destination. A path ending in ``.zip`` produces an
            archive; any other path is created as a directory.
        config: Selects which obfuscations are applied.

    Raises:
        FileNotFoundError: *input_path* does not exist.
        NotADirectoryError: *input_path* exists but is neither a directory
            nor a ``.zip`` archive.
    """
    # Validate up front: walking a missing or non-directory path yields
    # nothing, which would otherwise produce an empty "successful" output.
    if not os.path.exists(input_path):
        raise FileNotFoundError(f"Input path does not exist: {input_path}")
    if os.path.isdir(input_path):
        # A directory is processed as such even if its name ends in ".zip".
        input_is_zip = False
    elif zipfile.is_zipfile(input_path) or input_path.lower().endswith(".zip"):
        input_is_zip = True
    else:
        raise NotADirectoryError(
            f"Input path is neither a directory nor a .zip archive: {input_path}"
        )
    output_is_zip = output_path.lower().endswith(".zip")

    with (
        tempfile.TemporaryDirectory() as tmp_in_dir,
        tempfile.TemporaryDirectory() as tmp_out_dir,
    ):
        if input_is_zip:
            logger.info(f"Extracting input zip {input_path} to temporary directory")
            with zipfile.ZipFile(input_path, "r") as zip_ref:
                zip_ref.extractall(tmp_in_dir)
            actual_in_dir = tmp_in_dir
        else:
            actual_in_dir = input_path

        if output_is_zip:
            # Stage the output in a temporary directory, then archive it.
            actual_out_dir = tmp_out_dir
        else:
            actual_out_dir = output_path
            os.makedirs(actual_out_dir, exist_ok=True)

        obfuscate_directory(actual_in_dir, actual_out_dir, config)

        if output_is_zip:
            logger.info(f"Packaging output to zip {output_path}")
            parent_dir = os.path.dirname(os.path.abspath(output_path))
            if parent_dir:
                os.makedirs(parent_dir, exist_ok=True)

            with zipfile.ZipFile(output_path, "w", zipfile.ZIP_DEFLATED) as zip_write:
                for root, _, files in os.walk(actual_out_dir):
                    for file in files:
                        full_path = os.path.join(root, file)
                        # Archive member names are relative to the staged root.
                        rel_path = os.path.relpath(full_path, actual_out_dir)
                        zip_write.write(full_path, rel_path)
def test_find_urls():
    """find_urls extracts each URL from prose and drops trailing punctuation."""
    sentence = "".join(
        [
            "Check http://dss1.uss1.localutm/dss/v1, or go to https://github.com/interuss. ",
            "Also see (http://localhost:8082/status). And https://UPPERCASEHOST.com/foo. ",
            "With a user, we have http://user@localhost and a password too with http://user:password@localhost. ",
            "A query parameter like http://localhost?q=a2 shouldnt' hurt.",
        ]
    )
    found = find_urls(sentence)

    expected_urls = (
        "http://dss1.uss1.localutm/dss/v1",
        "https://github.com/interuss",
        "http://localhost:8082/status",
        "https://UPPERCASEHOST.com/foo",
        "http://user@localhost",
        "http://user:password@localhost",
        "http://localhost?q=a2",
    )
    for expected in expected_urls:
        assert expected in found
def test_scan_text():
    """scan_text records the (lower-cased) hostname of a URL in free text."""
    collected = set()
    scan_text("A plain url at https://UPPERCASEHOST.com/foo", collected)
    # urlparse converts the hostname to lowercase
    assert "uppercasehost.com" in collected


def test_scan_json():
    """scan_json gathers participant IDs and hostnames at every nesting level."""
    query = {
        "request": {
            "url": "http://dss1.uss1.localutm/dss/v1",
            "urls": ["http://dss1.uss2.localutm/dss/v1"],
            "headers": {"Authorization": "Bearer mytoken"},
        }
    }
    document = {
        "participant_id": "uss1_core",
        "participants": ["uss1_core", "uss2_core"],
        "url": "http://dss1.uss3.localutm/dss/v1",
        "urls": ["http://dss1.uss4.localutm/dss/v1"],
        "report": {"queries": [query]},
    }

    found_participants = set()
    found_hostnames = set()
    scan_json(document, found_participants, found_hostnames)

    assert found_participants == {"uss1_core", "uss2_core"}
    assert found_hostnames == {
        f"dss1.uss{n}.localutm" for n in (1, 2, 3, 4)
    }
def test_obfuscate_string():
    """Tokens, hostnames (any casing) and participant IDs are all replaced."""
    participants = {"uss1": "participant1", "mock_uss": "participant2"}
    hosts = {
        "scdsc.uss1.localutm": "host1",
        "dss1.uss1.localutm": "host2",
        "uppercasehost.com": "host3",
    }

    original = "Authorization: Bearer eyJhbGci.eyJzdWIiOiJ1c3MifQ.abc-def. Also call http://scdsc.uss1.localutm/mock/scd for mock_uss and uss1. And https://UPPERCASEHOST.com/foo"
    result = obfuscate_string(original, participants, hosts, ObfuscatorConfig())

    expected_fragments = (
        "Bearer REDACTED",
        "http://host1/mock/scd",
        "participant2",
        "participant1",
        "https://host3/foo",
    )
    for fragment in expected_fragments:
        assert fragment in result


def test_obfuscate_json_obj():
    """Nested dicts and lists are rewritten; unrelated values are kept."""
    source = {
        "participant_id": "uss1",
        "nested": {
            "url": "http://dss1.uss1.localutm/dss",
            "Authorization": "Bearer token123",
        },
        "list_field": ["uss1", "other"],
    }

    result = obfuscate_json_obj(
        source,
        {"uss1": "participant1"},
        {"dss1.uss1.localutm": "host1"},
        ObfuscatorConfig(),
    )

    nested = result["nested"]
    assert result["participant_id"] == "participant1"
    assert nested["url"] == "http://host1/dss"
    assert nested["Authorization"] == "Bearer REDACTED"
    assert result["list_field"] == ["participant1", "other"]
def test_obfuscate_directory():
    """End-to-end check of obfuscate_directory on a small artifact tree.

    Verifies that files are renamed, that placeholders appear in the output
    and that none of the original sensitive values survive in it.
    """
    config = ObfuscatorConfig()
    with (
        tempfile.TemporaryDirectory() as in_dir,
        tempfile.TemporaryDirectory() as out_dir,
    ):
        # Create a dummy structure
        report_data = {
            "participant_id": "uss1_core",
            "participants": ["uss1_core", "uss2_core"],
            "report": {
                "queries": [
                    {
                        "request": {
                            "url": "http://dss1.uss1.localutm/dss/v1",
                            "headers": {"Authorization": "Bearer mytoken"},
                        }
                    }
                ]
            },
        }

        # Write to in_dir; the tool reads artifacts as UTF-8, so write them
        # with the same explicit encoding instead of the platform default.
        with open(os.path.join(in_dir, "report.json"), "w", encoding="utf-8") as f:
            json.dump(report_data, f, indent=2)

        html_content = "Link to http://dss1.uss1.localutm/dss and uss1_core."
        os.makedirs(os.path.join(in_dir, "gate3"), exist_ok=True)
        with open(
            os.path.join(in_dir, "gate3", "uss1_core.html"), "w", encoding="utf-8"
        ) as f:
            f.write(html_content)

        obfuscate_directory(in_dir, out_dir, config)

        # Verify renaming
        obfuscated_html_path = os.path.join(out_dir, "gate3", "participant1.html")
        assert os.path.exists(obfuscated_html_path)
        assert not os.path.exists(os.path.join(out_dir, "gate3", "uss1_core.html"))

        # Verify content: placeholders present and originals gone
        with open(obfuscated_html_path, encoding="utf-8") as f:
            obf_html = f.read()
        assert "participant1" in obf_html
        assert "host1" in obf_html
        assert "uss1_core" not in obf_html
        assert "dss1.uss1.localutm" not in obf_html

        with open(os.path.join(out_dir, "report.json"), encoding="utf-8") as f:
            obf_json_text = f.read()
        for sensitive in ("uss1_core", "uss2_core", "dss1.uss1.localutm", "mytoken"):
            assert sensitive not in obf_json_text

        obf_json = json.loads(obf_json_text)
        assert obf_json["participant_id"] == "participant1"
        assert (
            obf_json["report"]["queries"][0]["request"]["url"] == "http://host1/dss/v1"
        )
        assert (
            obf_json["report"]["queries"][0]["request"]["headers"]["Authorization"]
            == "Bearer REDACTED"
        )