Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
100 changes: 100 additions & 0 deletions src/watchmen/ax_dispatch.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
"""Experimental: turn a cross-runtime route decision into a real delegation
via AX (google/ax) instead of only advising the user.

A ``switch-harness`` winner belongs to a different runtime, so watchmen can't
emit a native artifact the source harness can execute (see the advisory path in
``route_rewrite``). When AX is configured — a running ``ax serve`` with wrappers
for the target harness — we instead emit a dispatcher that shells out to
``ax exec --agent <target>``, so the source agent delegates the skill to the
winning harness for real. When AX is not configured, callers fall back to the
advisory line, so default behavior is unchanged.

Gating is env-only, opt-in:

WATCHMEN_AX_SERVER gRPC address of a running ``ax serve`` (e.g.
``localhost:8494``). Unset disables watchmen's AX dispatch
(the advisory fallback). NB: ``ax exec`` with an empty
``--server`` actually spins a *local* built-in server off
``ax.yaml`` — so "unset" is watchmen's gate, not AX's.
WATCHMEN_AX_BIN path to the ``ax`` binary (default: ``ax`` on PATH).

AX CLI state (verified against google/ax @ 2026-06-09):
- ``ax exec`` flags: ``--agent --server --input --conversation --resume
--last-seq --config``. The old ``--once`` is GONE (see ax_exec_command).
- ``ax serve`` runs the controller as a gRPC server (address from ax.yaml).
- ``ax fork`` (``--src-conversation/--src-seq/--dest-conversation``) forks an
event log from a checkpoint — the native primitive for the fork-and-race
delegation trigger (#96); watchmen wouldn't need to hand-roll it.
- Faithful cross-execution resume is still gated by google/ax#19
(``internal_only`` messages not replayed) — OPEN as of this date. So the
"headless now, AX-native later" call (#96) stands; this dispatch stays
experimental and unverified end-to-end until a local ``ax`` is wired up.
"""

from __future__ import annotations

import os

# watchmen harness slug -> AX agent id as registered in the running ``ax serve``.
# Only harnesses with an AX wrapper can be dispatch *targets*; anything not
# listed falls back to the advisory path.
HARNESS_TO_AX_AGENT = {
"claude_code": "claude-code",
"codex": "codex",
}


def ax_server() -> str | None:
"""gRPC address of the running ``ax serve``, or None when AX dispatch is off."""
return os.environ.get("WATCHMEN_AX_SERVER") or None


def ax_bin() -> str:
"""Path to the ``ax`` binary (``ax`` on PATH by default)."""
return os.environ.get("WATCHMEN_AX_BIN", "ax")


def ax_agent_for(harness: str | None) -> str | None:
"""The AX agent id that can run ``harness``'s runtime, or None when AX is
unconfigured or no wrapper exists for it. This is the availability gate the
rewriter checks before choosing AX dispatch over the advisory fallback."""
if not harness or ax_server() is None:
return None
return HARNESS_TO_AX_AGENT.get(harness)


def ax_exec_command(*, agent: str, model: str | None, workspace: str | None) -> str:
"""Build the ``ax exec`` invocation a dispatcher subagent should run.

``ax exec`` has no ``--workspace`` / ``--model`` flags, so both ride the
wrapper's ``[workspace]`` / ``[model]`` header convention inside ``--input``.
The concrete task isn't known until dispatch time, so the returned command
carries a literal ``<TASK>`` placeholder the subagent replaces with the
request it was handed.
"""
headers: list[str] = []
if model:
headers.append(f"[model] {model}")
if workspace:
headers.append(f"[workspace] {workspace}")
header_block = ("\n".join(headers) + "\n\n") if headers else ""
# Headless single-shot. `ax exec` is a REPL: it runs the turn seeded by
# `--input`, then loops and prompts for the next message (cmd/ax/exec.go
# execLoop → promptUser, verified against google/ax @ 2026-06-09). There is
# NO `--once` flag anymore — the spike used to pass it and current AX would
# reject it. Redirecting stdin from /dev/null makes that post-turn prompt
# hit EOF and stop instead of hanging a non-TTY dispatcher shell, so the
# call bounds to the single seeded turn (turn 1 never prompts — `--input`
# is non-empty). NOTE: the exact exit-code on EOF (vs a clean `q`) is
# unverified pending a local `ax` build; this is the experiment's known gap.
# AX has no `--workspace` / `--model` flags, so both ride the wrapper's
# `[workspace]` / `[model]` header convention inside `--input`.
#
# Single-quoted input preserves newlines and spares the subagent from
# escaping the task body. (A task containing a single quote would break the
# quoting — acceptable for the experiment; a wrapper script is the hardening
# path.)
return (
f"{ax_bin()} exec --server {ax_server()} --agent {agent} "
f"--input '{header_block}<TASK>' < /dev/null"
)
169 changes: 168 additions & 1 deletion src/watchmen/route_rewrite.py
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
from datetime import datetime, timezone
from pathlib import Path

from watchmen.ax_dispatch import ax_agent_for, ax_exec_command
from watchmen.route import (
RouteDecision,
RouteResult,
Expand Down Expand Up @@ -109,6 +110,7 @@ def apply_route_rewrites(
outcomes: list[RewriteOutcome] = []
dispatch_sentences: dict[str, str] = {}
advisory_harnesses: set[str] = set()
ax_routed_harnesses: set[str] = set()

for decision in actionable:
harness = decision.harness
Expand All @@ -131,6 +133,29 @@ def apply_route_rewrites(
or not provider_supports_model(decision.recommended_model, native)
)
if cross_runtime:
# Experimental: if AX is configured and the winning harness has an
# AX wrapper, dispatch the skill there for real instead of only
# advising. The *target* is reached uniformly via `ax exec`; only
# the *source* needs a per-harness dispatcher (how that runtime
# invokes the AX call), so support is keyed by source harness in
# _AX_SOURCE_EMITTERS. Any gap — AX off, no wrapper for the target,
# or an unsupported source — falls through to the advisory line
# below (unchanged default behavior).
ax_agent = ax_agent_for(decision.recommended_harness)
ax_emitter = _AX_SOURCE_EMITTERS.get(harness) if ax_agent else None
if ax_emitter is not None:
outcome, sentence = ax_emitter(
decision=decision,
bucket=result.config.bucket,
repo_root=repo,
ax_agent=ax_agent,
dry_run=dry_run,
)
outcomes.append(outcome)
dispatch_sentences[harness] = sentence
ax_routed_harnesses.add(harness)
continue

advisory_harnesses.add(harness)
dispatch_sentences[harness] = _advisory_sentence(
decision, bucket=result.config.bucket
Expand Down Expand Up @@ -177,7 +202,10 @@ def apply_route_rewrites(
skill_md_path=skill_md_path,
dispatch_sentences=dispatch_sentences,
decisions={d.harness: d for d in actionable},
advisory_harnesses=advisory_harnesses,
# Both advisory and AX-routed entries carry the model in their own
# sentence, so suppress the independent `Recommended model:` line
# (which would otherwise re-print the foreign model + label).
advisory_harnesses=advisory_harnesses | ax_routed_harnesses,
run_id=result.run_id,
dry_run=dry_run,
)
Expand Down Expand Up @@ -265,6 +293,145 @@ def _claude_agent_body(decision: RouteDecision, *, bucket: str) -> str:
)


# ─── AX cross-runtime dispatch (experimental) ────────────────────────

def _emit_ax_dispatch_claude_code(
*,
decision: RouteDecision,
bucket: str,
repo_root: str | None,
ax_agent: str,
dry_run: bool,
) -> tuple[RewriteOutcome, str]:
"""Emit a Claude Code subagent that delegates the skill to the winning
harness through AX, instead of a (non-runnable) native router pinned to a
foreign model. The subagent shells out to ``ax exec --agent <target>``; the
``[model]`` / ``[workspace]`` headers ride inside ``--input`` since the CLI
has no flags for them. Same artifact path as the native router so it
replaces, rather than stacks with, the would-be broken file.
"""
target = _harness_display_name(decision.recommended_harness)
name = f"{bucket}-router"
command = ax_exec_command(
agent=ax_agent,
model=decision.recommended_model,
workspace=repo_root,
)
body = _ax_dispatch_body(
decision, bucket=bucket, target=target, command=command
)
path, fell_back = _pick_router_path(
repo_local=Path(repo_root) / ".claude" / "agents" / f"{name}.md" if repo_root else None,
user_global=Path.home() / ".claude" / "agents" / f"watchmen-route-{bucket}.md",
)
action = _write_file(path, body, dry_run=dry_run)
dispatch_name = name if not fell_back else f"watchmen-route-{bucket}"
sentence = (
f"In Claude Code, dispatch via the Task tool with "
f"`subagent_type=\"{dispatch_name}\"`; it routes `{bucket}` to {target} "
f"via AX (`ax exec --agent {ax_agent}`)."
)
return (
RewriteOutcome(
harness="claude_code",
artifact_kind="ax-router",
path=str(path),
action=action,
reason=(
f"AX dispatch to {decision.recommended_harness}"
+ (" (user-global fallback)" if fell_back else "")
),
),
sentence,
)


def _ax_dispatch_body(
decision: RouteDecision, *, bucket: str, target: str, command: str
) -> str:
name = f"{bucket}-router"
# No `model:` line: the orchestrating subagent runs under Claude's default
# and only brokers the AX call — the real work happens on `target` under
# `recommended_model`, never on a Claude model.
return (
"---\n"
f"name: {name}\n"
f"description: Watchmen-routed (AX): delegate the `{bucket}` skill to "
f"{target}. {decision.note}\n"
"tools: '*'\n"
"---\n"
"\n"
f"# {name}\n"
"\n"
f"Watchmen determined the `{bucket}` skill runs better on {target} than "
"on Claude Code, so you delegate it there through AX (google/ax) rather "
"than running it yourself.\n"
"\n"
"When the main agent hands you a task for this skill:\n"
"\n"
f"1. Read `bundles/<project>/skills/{bucket}/SKILL.md` for the "
"operational guidance.\n"
"2. Run it on "
f"{target} via AX, replacing `<TASK>` with the concrete request "
"(include the SKILL.md guidance the work needs):\n"
"\n"
f" {command}\n"
"\n"
"3. Return AX's output to the main agent verbatim.\n"
"\n"
f"Do not attempt the work under a Claude model — the route picked "
f"{target} (`{decision.recommended_model}`) because: {decision.note}\n"
)


def _emit_ax_dispatch_codex(
*,
decision: RouteDecision,
bucket: str,
repo_root: str | None,
ax_agent: str,
dry_run: bool,
) -> tuple[RewriteOutcome, str]:
"""Codex has no subagent-file mechanism (its native artifact is a model
profile, which only overrides the model — it can't invoke another runtime).
So the cross-runtime dispatch lives entirely in the SKILL.md dispatch
sentence: the Codex session runs ``ax exec`` itself during the skill. No
profile file is written.
"""
target = _harness_display_name(decision.recommended_harness)
command = ax_exec_command(
agent=ax_agent,
model=decision.recommended_model,
workspace=repo_root,
)
sentence = (
f"In Codex, the `{bucket}` skill is routed to {target} via AX. Run the "
f"skill by executing `{command}` (replace `<TASK>` with the concrete "
"request, including the SKILL.md guidance the work needs) and use AX's "
"output as the result — do not run it under a Codex model."
)
return (
RewriteOutcome(
harness="codex",
artifact_kind="ax-dispatch",
path="", # inline in SKILL.md; no per-harness file
action="inline",
reason=f"AX dispatch to {decision.recommended_harness}",
),
sentence,
)


# Source harness -> AX dispatcher. The target is reached uniformly through
# `ax exec`; only the entry point differs per source runtime. claude_code
# writes a subagent file; codex inlines the call into SKILL.md. Sources absent
# here (opencode/pi) fall back to the advisory path.
_AX_SOURCE_EMITTERS = {
"claude_code": _emit_ax_dispatch_claude_code,
"codex": _emit_ax_dispatch_codex,
}


# ─── codex ───────────────────────────────────────────────────────────

def _emit_codex(
Expand Down
Loading
Loading