Add SequencingSubprocessRunner for scenario replay with step-name dispatch #673

@Trecek

Summary

Add a SequencingSubprocessRunner that replays pre-recorded scenario fixtures through FakeClaudeCLI instances, dispatched by step name. This is the AutoSkillit-side adapter for the multi-session replay infrastructure added in TalonT-Org/api-simulator#83 and TalonT-Org/api-simulator#84.

Background: api-simulator ScenarioPlayer and ScenarioBuilder

The api-simulator package exposes these types from api_simulator.claude:

from api_simulator.claude import ScenarioPlayer, ScenarioBuilder, InjectionDirective

# --- Loading from recorded fixtures ---
player = ScenarioPlayer.load("tests/fixtures/scenarios/smoke-happy/")

# Inject faults at specific steps
player.inject("implement", InjectionDirective.TruncateAfter(n=1))
player.inject("assess", InjectionDirective.HangAfterResult())

# Add fallback sessions for steps not in the original recording
# (e.g., retry paths triggered by fault injection)
from api_simulator.claude import make_fake_claude
fallback = make_fake_claude(str(tmp_path), str(binary_path))
fallback.add_message({"type": "result", "subtype": "success", ...})
player.add_fallback("assess", fallback)

# Cross-scenario session mixing
player.override_session("assess", from_scenario="smoke-test-fail-once/", from_step="assess")

# Get the dispatch map: step_name → deque of (FakeClaudeCLI, StepMetadata) pairs
session_map = player.build_session_map()
# session_map["investigate"] → deque([(cli, meta), ...])
# session_map["implement"]   → deque([(cli, meta), ...])

# --- Programmatic construction (no fixture directory needed) ---
scenario = ScenarioBuilder("smoke-test") \
    .add_session_step("investigate", cassette_from="tests/fixtures/cassettes/skill-success/") \
    .add_synthetic_step("implement", exit_code=0, stdout_lines=[
        {"type": "assistant", "message": {"content": "Done.", "model": "claude-sonnet-4-6", "usage": {...}}},
        {"type": "result", "subtype": "success", "result": "Implemented.", "session_id": "abc-123"},
    ]) \
    .add_non_session_step("test", tool="test_check", result_summary={"exit_code": 0}) \
    .inject("implement", InjectionDirective.CorruptMessage(message_index=0)) \
    .build()
# Returns a ScenarioPlayer — same type as ScenarioPlayer.load()

Key Design: Step-Name-Keyed Dispatch

ScenarioPlayer.build_session_map() returns a dict[str, deque[tuple[FakeClaudeCLI, StepMetadata]]] keyed by step name. Because dispatch is by step name rather than by position in a flat ordered queue, replay survives fault injection that alters execution order.

When the orchestrator injects a test failure (forcing assess → classify → implement retry loops), steps not in the original happy-path recording need one of:

  • A fallback session registered via player.add_fallback(step_name, cli)
  • A cross-scenario override loaded from a different recording variant via player.override_session()
  • A synthetic step via ScenarioBuilder

For steps that appear multiple times in a recording (retry scenarios like smoke-test-fail-once/), the per-step deque preserves invocation order within that step.
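
The dispatch behaviour described above can be sketched without api-simulator at all. This is a minimal illustration, not the real implementation: plain strings stand in for the (FakeClaudeCLI, StepMetadata) pairs, and the dispatch helper and the step order are hypothetical. It shows that a run which visits steps in an unrecorded order still gets the right session, and that repeated steps are served in recording order.

```python
from collections import deque

# Stand-ins for (FakeClaudeCLI, StepMetadata) pairs: plain strings keep the
# sketch self-contained. The real map comes from player.build_session_map().
session_map = {
    "investigate": deque(["investigate#1"]),
    "implement": deque(["implement#1", "implement#2"]),  # step recorded twice
    "assess": deque(["assess#fallback"]),  # e.g. a registered fallback
}


def dispatch(step_name: str) -> str:
    """Pop the next recorded session for step_name (FIFO within that step)."""
    queue = session_map.get(step_name)
    if not queue:
        raise LookupError(f"no session left for step {step_name!r}")
    return queue.popleft()


# A fault-injected run may visit steps in an order the recording never saw.
order = ["investigate", "implement", "assess", "implement"]
replayed = [dispatch(name) for name in order]
```

A flat queue would have handed the second implement recording to the assess call; keying by name avoids that.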

Scope

1. SequencingSubprocessRunner class

Location: src/autoskillit/execution/recording.py (same file as #672's RecordingSubprocessRunner)

Consumes the session map from ScenarioPlayer.build_session_map() and non-session step results from the scenario manifest. On each run_skill call, reads SCENARIO_STEP_NAME from the env (injected by run_headless_core per #672) and dispatches to the matching step queue.

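from collections import deque
from pathlib import Path

from autoskillit.core.types import SubprocessRunner, SubprocessResult, TerminationReason
# ScenarioReplayError lives in core/errors.py or inline in this module (see section 2)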

class SequencingSubprocessRunner:
    """Replays pre-recorded sessions by step name."""

    def __init__(
        self,
        session_map: dict[str, deque],  # from ScenarioPlayer.build_session_map()
        non_session_results: dict[str, dict],  # step_name → result_summary from scenario.json
    ):
        self._sessions = session_map
        self._non_session = non_session_results
        self.call_log: list[tuple[str, list[str]]] = []  # (step_name, cmd) for assertions

    async def __call__(
        self,
        cmd: list[str],
        *,
        cwd: Path,
        timeout: float,
        env: dict[str, str] | None = None,
        **kwargs: object,
    ) -> SubprocessResult:
        step_name = (env or {}).get("SCENARIO_STEP_NAME", "")
        self.call_log.append((step_name, cmd))

        if not step_name:
            raise ValueError(
                "SCENARIO_STEP_NAME not in env — cannot dispatch. "
                "Ensure run_headless_core injects it (#672)."
            )

        # Session step (run_skill)
        if step_name in self._sessions and self._sessions[step_name]:
            cli, meta = self._sessions[step_name].popleft()
            result = cli.run()
            return SubprocessResult(
                returncode=meta.exit_code,
                stdout=result.stdout(),
                stderr="",
                termination=TerminationReason.NATURAL_EXIT,
                pid=0,
                elapsed_seconds=meta.duration_ms / 1000.0,
            )

        # Non-session step (run_cmd, test_check, classify_fix)
        if step_name in self._non_session:
            summary = self._non_session[step_name]
            return SubprocessResult(
                returncode=summary.get("exit_code", 0),
                stdout=summary.get("stdout_head", ""),
                stderr=summary.get("stderr", ""),
                termination=TerminationReason.NATURAL_EXIT,
                pid=0,
            )

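        # List non-session steps too, so the message names every dispatchable step
        available = sorted(set(self._sessions) | set(self._non_session))
        raise ScenarioReplayError(
            f"No session or result for step '{step_name}'. "
            f"Available: {available}. "
            f"Register a fallback via player.add_fallback('{step_name}', cli)."
        )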

2. ScenarioReplayError exception

Location: src/autoskillit/core/errors.py or inline in recording.py

Clear error with guidance on how to fix (register fallback, add override, record additional variant).
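
One possible shape for the exception, as a sketch only. The LookupError base class and the (step_name, available) constructor are assumptions; the runner code above passes a single pre-formatted message instead, which would work equally well with a plain Exception subclass.

```python
class ScenarioReplayError(LookupError):
    """Raised when replay has no session or stubbed result for a step."""

    def __init__(self, step_name: str, available: list[str]) -> None:
        # Keep the structured fields so tests can assert on them directly.
        self.step_name = step_name
        self.available = sorted(available)
        super().__init__(
            f"No session or result for step '{step_name}'. "
            f"Available: {self.available}. "
            "Fix: register a fallback via player.add_fallback(), "
            "add a player.override_session() from another scenario, "
            "or record an additional variant."
        )
```

Carrying step_name and available as attributes lets a test check the failing step without parsing the message text.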

3. Replay test infrastructure

Location: tests/recipe/test_smoke_pipeline_replay.py (new file)

Uses the existing SmokeExecutor from test_smoke_pipeline.py but wires in SequencingSubprocessRunner instead of MockSubprocessRunner:

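from pathlib import Path

import pytest
from api_simulator.claude import InjectionDirective, ScenarioPlayer

from autoskillit.execution.recording import SequencingSubprocessRunner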

SCENARIO_DIR = Path("tests/fixtures/scenarios/smoke-happy")

@pytest.fixture
def replay_runner(tmp_path):
    """Load the recorded scenario and build a replay runner."""
    player = ScenarioPlayer.load(str(SCENARIO_DIR))
    session_map = player.build_session_map()
    # Extract non-session results from scenario manifest
    scenario = player.scenario  # access the loaded Scenario
    non_session = {
        step.step_name: step.result_summary
        for step in scenario.step_sequence
        if step.session_dir is None and step.result_summary
    }
    return SequencingSubprocessRunner(session_map, non_session)

class TestSmokeReplayHappyPath:
    """Replay the recorded happy-path smoke test — no API calls."""

    def test_replays_to_completion(self, replay_runner, tool_ctx):
        tool_ctx.runner = replay_runner
        # ... run SmokeExecutor, assert terminal == "done"

    def test_all_sessions_consumed(self, replay_runner, tool_ctx):
        tool_ctx.runner = replay_runner
        # ... run, then assert all queues are empty

class TestSmokeReplayFaultInjection:
    """Inject faults into specific steps and verify orchestrator recovery."""

    def test_implement_truncation_triggers_retry(self, tmp_path):
        player = ScenarioPlayer.load(str(SCENARIO_DIR))
        player.inject("implement", InjectionDirective.TruncateAfter(n=1))
        # Add fallback for assess (not in happy-path recording)
        player.add_fallback("assess", _build_assess_fallback(tmp_path))
        player.add_fallback("classify", _build_classify_fallback(tmp_path))
        # ... run, assert retry loop fires

    def test_test_failure_forces_assess_classify_loop(self, tmp_path):
        # Use cross-scenario override from a failure variant
        player = ScenarioPlayer.load(str(SCENARIO_DIR))
        player.override_session("assess", from_scenario="smoke-test-fail-once/", from_step="assess")
        player.override_session("classify", from_scenario="smoke-test-fail-once/", from_step="classify")
        # Inject test failure
        # ... run, assert assess and classify steps are called
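
The elided assertions in the tests above ("assert all queues are empty", "assert assess and classify steps are called") could be written with two small helpers. Both helper names are hypothetical; they assume only the deque-valued session map and the (step_name, cmd) call_log layout shown in the runner.

```python
from collections import deque


def unconsumed_steps(session_map: dict[str, deque]) -> dict[str, int]:
    """Return {step_name: remaining_count} for queues that still hold sessions."""
    return {name: len(queue) for name, queue in session_map.items() if queue}


def dispatched_steps(call_log: list[tuple[str, list[str]]]) -> list[str]:
    """Project the runner's call_log down to the ordered list of step names."""
    return [step_name for step_name, _cmd in call_log]
```

In a test this might read `assert unconsumed_steps(session_map) == {}` after the run, and `assert "assess" in dispatched_steps(replay_runner.call_log)` for the fault-injection cases. Returning the leftover counts rather than a bare boolean makes a failing assertion show which steps were never replayed.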

Dependencies

Acceptance Criteria

  • SequencingSubprocessRunner implements SubprocessRunner protocol
  • Dispatches by SCENARIO_STEP_NAME env var, not queue position
  • Handles both session steps (via FakeClaudeCLI) and non-session steps (via result stubs)
  • Clear error messages when step not found (includes available steps and fix guidance)
  • call_log records all dispatches for test assertions
  • Replay tests pass against the smoke-happy/ fixture
  • At least one fault injection test demonstrates cross-scenario session override

Labels: recipe:implementation, staged
