diff --git a/src/watchmen/ax_dispatch.py b/src/watchmen/ax_dispatch.py new file mode 100644 index 0000000..c47c8e3 --- /dev/null +++ b/src/watchmen/ax_dispatch.py @@ -0,0 +1,100 @@ +"""Experimental: turn a cross-runtime route decision into a real delegation +via AX (google/ax) instead of only advising the user. + +A ``switch-harness`` winner belongs to a different runtime, so watchmen can't +emit a native artifact the source harness can execute (see the advisory path in +``route_rewrite``). When AX is configured — a running ``ax serve`` with wrappers +for the target harness — we instead emit a dispatcher that shells out to +``ax exec --agent <agent>``, so the source agent delegates the skill to the +winning harness for real. When AX is not configured, callers fall back to the +advisory line, so default behavior is unchanged. + +Gating is env-only, opt-in: + + WATCHMEN_AX_SERVER gRPC address of a running ``ax serve`` (e.g. + ``localhost:8494``). Unset disables watchmen's AX dispatch + (the advisory fallback). NB: ``ax exec`` with an empty + ``--server`` actually spins a *local* built-in server off + ``ax.yaml`` — so "unset" is watchmen's gate, not AX's. + WATCHMEN_AX_BIN path to the ``ax`` binary (default: ``ax`` on PATH). + +AX CLI state (verified against google/ax @ 2026-06-09): + - ``ax exec`` flags: ``--agent --server --input --conversation --resume + --last-seq --config``. The old ``--once`` is GONE (see ax_exec_command). + - ``ax serve`` runs the controller as a gRPC server (address from ax.yaml). + - ``ax fork`` (``--src-conversation/--src-seq/--dest-conversation``) forks an + event log from a checkpoint — the native primitive for the fork-and-race + delegation trigger (#96); watchmen wouldn't need to hand-roll it. + - Faithful cross-execution resume is still gated by google/ax#19 + (``internal_only`` messages not replayed) — OPEN as of this date. 
So the + "headless now, AX-native later" call (#96) stands; this dispatch stays + experimental and unverified end-to-end until a local ``ax`` is wired up. +""" + +from __future__ import annotations + +import os + +# watchmen harness slug -> AX agent id as registered in the running ``ax serve``. +# Only harnesses with an AX wrapper can be dispatch *targets*; anything not +# listed falls back to the advisory path. +HARNESS_TO_AX_AGENT = { + "claude_code": "claude-code", + "codex": "codex", +} + + +def ax_server() -> str | None: + """gRPC address of the running ``ax serve``, or None when AX dispatch is off.""" + return os.environ.get("WATCHMEN_AX_SERVER") or None + + +def ax_bin() -> str: + """Path to the ``ax`` binary (``ax`` on PATH when unset or set to empty).""" + return os.environ.get("WATCHMEN_AX_BIN") or "ax" + + +def ax_agent_for(harness: str | None) -> str | None: + """The AX agent id that can run ``harness``'s runtime, or None when AX is + unconfigured or no wrapper exists for it. This is the availability gate the + rewriter checks before choosing AX dispatch over the advisory fallback.""" + if not harness or ax_server() is None: + return None + return HARNESS_TO_AX_AGENT.get(harness) + + +def ax_exec_command(*, agent: str, model: str | None, workspace: str | None) -> str: + """Build the ``ax exec`` invocation a dispatcher subagent should run. + + ``ax exec`` has no ``--workspace`` / ``--model`` flags, so both ride the + wrapper's ``[workspace]`` / ``[model]`` header convention inside ``--input``. + The concrete task isn't known until dispatch time, so the returned command + carries a literal ``<task>`` placeholder the subagent replaces with the + request it was handed. + """ + headers: list[str] = [] + if model: + headers.append(f"[model] {model}") + if workspace: + headers.append(f"[workspace] {workspace}") + header_block = ("\n".join(headers) + "\n\n") if headers else "" + # Headless single-shot. 
`ax exec` is a REPL: it runs the turn seeded by + # `--input`, then loops and prompts for the next message (cmd/ax/exec.go + # execLoop → promptUser, verified against google/ax @ 2026-06-09). There is + # NO `--once` flag anymore — the spike used to pass it and current AX would + # reject it. Redirecting stdin from /dev/null makes that post-turn prompt + # hit EOF and stop instead of hanging a non-TTY dispatcher shell, so the + # call bounds to the single seeded turn (turn 1 never prompts — `--input` + # is non-empty). NOTE: the exact exit-code on EOF (vs a clean `q`) is + # unverified pending a local `ax` build; this is the experiment's known gap. + # AX has no `--workspace` / `--model` flags, so both ride the wrapper's + # `[workspace]` / `[model]` header convention inside `--input`. + # + # Single-quoted input preserves newlines and spares the subagent from + # escaping the task body. (A task containing a single quote would break the + # quoting — acceptable for the experiment; a wrapper script is the hardening + # path.) 
+ return ( + f"{ax_bin()} exec --server {ax_server()} --agent {agent} " + f"--input '{header_block}<task>' < /dev/null" + ) diff --git a/src/watchmen/route_rewrite.py b/src/watchmen/route_rewrite.py index 0e5b396..560ac7a 100644 --- a/src/watchmen/route_rewrite.py +++ b/src/watchmen/route_rewrite.py @@ -41,6 +41,7 @@ from datetime import datetime, timezone from pathlib import Path +from watchmen.ax_dispatch import ax_agent_for, ax_exec_command from watchmen.route import ( RouteDecision, RouteResult, @@ -109,6 +110,7 @@ def apply_route_rewrites( outcomes: list[RewriteOutcome] = [] dispatch_sentences: dict[str, str] = {} advisory_harnesses: set[str] = set() + ax_routed_harnesses: set[str] = set() for decision in actionable: harness = decision.harness @@ -131,6 +133,29 @@ def apply_route_rewrites( or not provider_supports_model(decision.recommended_model, native) ) if cross_runtime: + # Experimental: if AX is configured and the winning harness has an + # AX wrapper, dispatch the skill there for real instead of only + # advising. The *target* is reached uniformly via `ax exec`; only + # the *source* needs a per-harness dispatcher (how that runtime + # invokes the AX call), so support is keyed by source harness in + # _AX_SOURCE_EMITTERS. Any gap — AX off, no wrapper for the target, + # or an unsupported source — falls through to the advisory line + # below (unchanged default behavior). 
+ ax_agent = ax_agent_for(decision.recommended_harness) + ax_emitter = _AX_SOURCE_EMITTERS.get(harness) if ax_agent else None + if ax_emitter is not None: + outcome, sentence = ax_emitter( + decision=decision, + bucket=result.config.bucket, + repo_root=repo, + ax_agent=ax_agent, + dry_run=dry_run, + ) + outcomes.append(outcome) + dispatch_sentences[harness] = sentence + ax_routed_harnesses.add(harness) + continue + advisory_harnesses.add(harness) dispatch_sentences[harness] = _advisory_sentence( decision, bucket=result.config.bucket @@ -177,7 +202,10 @@ def apply_route_rewrites( skill_md_path=skill_md_path, dispatch_sentences=dispatch_sentences, decisions={d.harness: d for d in actionable}, - advisory_harnesses=advisory_harnesses, + # Both advisory and AX-routed entries carry the model in their own + # sentence, so suppress the independent `Recommended model:` line + # (which would otherwise re-print the foreign model + label). + advisory_harnesses=advisory_harnesses | ax_routed_harnesses, run_id=result.run_id, dry_run=dry_run, ) @@ -265,6 +293,145 @@ def _claude_agent_body(decision: RouteDecision, *, bucket: str) -> str: ) +# ─── AX cross-runtime dispatch (experimental) ──────────────────────── + +def _emit_ax_dispatch_claude_code( + *, + decision: RouteDecision, + bucket: str, + repo_root: str | None, + ax_agent: str, + dry_run: bool, +) -> tuple[RewriteOutcome, str]: + """Emit a Claude Code subagent that delegates the skill to the winning + harness through AX, instead of a (non-runnable) native router pinned to a + foreign model. The subagent shells out to ``ax exec --agent <agent>``; the + ``[model]`` / ``[workspace]`` headers ride inside ``--input`` since the CLI + has no flags for them. Same artifact path as the native router so it + replaces, rather than stacks with, the would-be broken file. 
+ """ + target = _harness_display_name(decision.recommended_harness) + name = f"{bucket}-router" + command = ax_exec_command( + agent=ax_agent, + model=decision.recommended_model, + workspace=repo_root, + ) + body = _ax_dispatch_body( + decision, bucket=bucket, target=target, command=command + ) + path, fell_back = _pick_router_path( + repo_local=Path(repo_root) / ".claude" / "agents" / f"{name}.md" if repo_root else None, + user_global=Path.home() / ".claude" / "agents" / f"watchmen-route-{bucket}.md", + ) + action = _write_file(path, body, dry_run=dry_run) + dispatch_name = name if not fell_back else f"watchmen-route-{bucket}" + sentence = ( + f"In Claude Code, dispatch via the Task tool with " + f"`subagent_type=\"{dispatch_name}\"`; it routes `{bucket}` to {target} " + f"via AX (`ax exec --agent {ax_agent}`)." + ) + return ( + RewriteOutcome( + harness="claude_code", + artifact_kind="ax-router", + path=str(path), + action=action, + reason=( + f"AX dispatch to {decision.recommended_harness}" + + (" (user-global fallback)" if fell_back else "") + ), + ), + sentence, + ) + + +def _ax_dispatch_body( + decision: RouteDecision, *, bucket: str, target: str, command: str +) -> str: + name = f"{bucket}-router" + # No `model:` line: the orchestrating subagent runs under Claude's default + # and only brokers the AX call — the real work happens on `target` under + # `recommended_model`, never on a Claude model. + return ( + "---\n" + f"name: {name}\n" + f"description: Watchmen-routed (AX): delegate the `{bucket}` skill to " + f"{target}. {decision.note}\n" + "tools: '*'\n" + "---\n" + "\n" + f"# {name}\n" + "\n" + f"Watchmen determined the `{bucket}` skill runs better on {target} than " + "on Claude Code, so you delegate it there through AX (google/ax) rather " + "than running it yourself.\n" + "\n" + "When the main agent hands you a task for this skill:\n" + "\n" + f"1. Read `bundles/<project>/skills/{bucket}/SKILL.md` for the " + "operational guidance.\n" + "2. 
Run it on " + f"{target} via AX, replacing `<task>` with the concrete request " + "(include the SKILL.md guidance the work needs):\n" + "\n" + f" {command}\n" + "\n" + "3. Return AX's output to the main agent verbatim.\n" + "\n" + f"Do not attempt the work under a Claude model — the route picked " + f"{target} (`{decision.recommended_model}`) because: {decision.note}\n" + ) + + +def _emit_ax_dispatch_codex( + *, + decision: RouteDecision, + bucket: str, + repo_root: str | None, + ax_agent: str, + dry_run: bool, +) -> tuple[RewriteOutcome, str]: + """Codex has no subagent-file mechanism (its native artifact is a model + profile, which only overrides the model — it can't invoke another runtime). + So the cross-runtime dispatch lives entirely in the SKILL.md dispatch + sentence: the Codex session runs ``ax exec`` itself during the skill. No + profile file is written. + """ + target = _harness_display_name(decision.recommended_harness) + command = ax_exec_command( + agent=ax_agent, + model=decision.recommended_model, + workspace=repo_root, + ) + sentence = ( + f"In Codex, the `{bucket}` skill is routed to {target} via AX. Run the " + f"skill by executing `{command}` (replace `<task>` with the concrete " + "request, including the SKILL.md guidance the work needs) and use AX's " + "output as the result — do not run it under a Codex model." + ) + return ( + RewriteOutcome( + harness="codex", + artifact_kind="ax-dispatch", + path="", # inline in SKILL.md; no per-harness file + action="inline", + reason=f"AX dispatch to {decision.recommended_harness}", + ), + sentence, + ) + + +# Source harness -> AX dispatcher. The target is reached uniformly through +# `ax exec`; only the entry point differs per source runtime. claude_code +# writes a subagent file; codex inlines the call into SKILL.md. Sources absent +# here (opencode/pi) fall back to the advisory path. 
+_AX_SOURCE_EMITTERS = { + "claude_code": _emit_ax_dispatch_claude_code, + "codex": _emit_ax_dispatch_codex, +} + + # ─── codex ─────────────────────────────────────────────────────────── def _emit_codex( diff --git a/tests/test_route.py b/tests/test_route.py index ba16835..058d32f 100644 --- a/tests/test_route.py +++ b/tests/test_route.py @@ -697,6 +697,133 @@ def test_switch_harness_advises_instead_of_writing_unrunnable_file(tmp_path, mon assert "Recommended model:" not in skill_md +def test_switch_harness_dispatches_via_ax_when_configured(tmp_path, monkeypatch): + """With WATCHMEN_AX_SERVER set and an AX wrapper for the winning harness, a + switch-harness decision emits a dispatcher subagent that calls `ax exec` + instead of the advisory line — turning the recommendation into a real + cross-runtime delegation. The frontmatter must NOT pin the foreign model.""" + from watchmen.route_rewrite import apply_route_rewrites + + monkeypatch.setenv("WATCHMEN_AX_SERVER", "localhost:8494") + monkeypatch.setenv("WATCHMEN_AX_BIN", "ax") + + repo = tmp_path / "src-repo" + repo.mkdir() + _setup_project(tmp_path, monkeypatch, "p", str(repo)) + + skill_dir = tmp_path / "bundles" / "p" / "skills" / "demo-skill" + skill_dir.mkdir(parents=True) + (skill_dir / "SKILL.md").write_text( + "---\nname: demo-skill\n---\nbody\n", encoding="utf-8", + ) + result = _route_result(tmp_path, "demo-skill", [ + _decision("claude_code", "anthropic/claude-opus-4-7", + "openai/gpt-5-codex", label="switch-harness", + recommended_harness="codex"), + ]) + outcomes = apply_route_rewrites(result, repo_root=str(repo)) + + # A dispatcher IS written this time (unlike the advisory path). + router = repo / ".claude" / "agents" / "demo-skill-router.md" + assert router.exists() + body = router.read_text(encoding="utf-8") + # It delegates through AX to the winning harness's AX agent... 
+ assert "ax exec --server localhost:8494 --agent codex" in body + assert "--once" not in body # removed: gone from current ax exec + assert "< /dev/null" in body # headless: EOF the post-turn REPL prompt + assert "[model] openai/gpt-5-codex" in body + assert str(repo) in body # [workspace] header + assert "<task>" in body # task placeholder + # ...and never pins the foreign model in the Claude subagent frontmatter. + assert "model: openai/gpt-5-codex" not in body + + by_kind = {(o.harness, o.artifact_kind): o for o in outcomes} + assert ("claude_code", "ax-router") in by_kind + assert ("claude_code", "advisory") not in by_kind + + skill_md = (skill_dir / "SKILL.md").read_text() + assert "via AX" in skill_md + assert "subagent_type=\"demo-skill-router\"" in skill_md + # The independent foreign-model line is still suppressed. + assert "Recommended model:" not in skill_md + + +def test_switch_harness_falls_back_to_advisory_when_ax_unset(tmp_path, monkeypatch): + """No WATCHMEN_AX_SERVER => default behavior: advisory, no dispatcher.""" + from watchmen.route_rewrite import apply_route_rewrites + + monkeypatch.delenv("WATCHMEN_AX_SERVER", raising=False) + + repo = tmp_path / "src-repo" + repo.mkdir() + _setup_project(tmp_path, monkeypatch, "p", str(repo)) + + skill_dir = tmp_path / "bundles" / "p" / "skills" / "demo-skill" + skill_dir.mkdir(parents=True) + (skill_dir / "SKILL.md").write_text( + "---\nname: demo-skill\n---\nbody\n", encoding="utf-8", + ) + result = _route_result(tmp_path, "demo-skill", [ + _decision("claude_code", "anthropic/claude-opus-4-7", + "openai/gpt-5-codex", label="switch-harness", + recommended_harness="codex"), + ]) + outcomes = apply_route_rewrites(result, repo_root=str(repo)) + + assert not (repo / ".claude" / "agents" / "demo-skill-router.md").exists() + by_kind = {(o.harness, o.artifact_kind): o for o in outcomes} + assert ("claude_code", "advisory") in by_kind + assert ("claude_code", "ax-router") not in by_kind + + +def 
test_codex_source_dispatches_via_ax_inline(tmp_path, monkeypatch): + """Codex has no subagent file — its native artifact is a model profile that + can't invoke another runtime. So a Codex-source switch-harness route inlines + the `ax exec` call into the SKILL.md dispatch block; no profile file is + written, and the delegation targets the winning harness's AX agent.""" + from watchmen import route_rewrite + from watchmen.route_rewrite import apply_route_rewrites + + monkeypatch.setenv("WATCHMEN_AX_SERVER", "localhost:8494") + fake_home = tmp_path / "home" + (fake_home / ".codex").mkdir(parents=True) + monkeypatch.setattr(route_rewrite.Path, "home", staticmethod(lambda: fake_home)) + + repo = tmp_path / "src-repo" + repo.mkdir() + _setup_project(tmp_path, monkeypatch, "p", str(repo)) + + skill_dir = tmp_path / "bundles" / "p" / "skills" / "demo-skill" + skill_dir.mkdir(parents=True) + (skill_dir / "SKILL.md").write_text( + "---\nname: demo-skill\n---\nbody\n", encoding="utf-8", + ) + # Codex's work wins on Claude Code for this skill -> delegate Codex -> CC. + result = _route_result(tmp_path, "demo-skill", [ + _decision("codex", "openai/gpt-5.5", + "anthropic/claude-opus-4-7", label="switch-harness", + recommended_harness="claude_code"), + ]) + outcomes = apply_route_rewrites(result, repo_root=str(repo)) + + # No codex profile file is written — dispatch lives in SKILL.md. 
+ assert not (fake_home / ".codex" / "route-demo-skill.config.toml").exists() + + by_kind = {(o.harness, o.artifact_kind): o for o in outcomes} + assert ("codex", "ax-dispatch") in by_kind + assert by_kind[("codex", "ax-dispatch")].action == "inline" + assert by_kind[("codex", "ax-dispatch")].path == "" + + skill_md = (skill_dir / "SKILL.md").read_text() + assert "ax exec --server localhost:8494 --agent claude-code" in skill_md + assert "--once" not in skill_md # removed: gone from current ax exec + assert "< /dev/null" in skill_md # headless: EOF the post-turn REPL prompt + assert "[model] anthropic/claude-opus-4-7" in skill_md + assert "<task>" in skill_md + assert "via AX" in skill_md + assert "Recommended model:" not in skill_md # suppressed for ax-routed + + def test_foreign_candidate_winner_is_advised_not_emitted(tmp_path, monkeypatch): """A user-injected --candidate from another provider can win as a downshift (it's never labeled switch-harness because no current harness