diff --git a/kaievolve/config.py b/kaievolve/config.py index 5ca3bfc..fe0f479 100644 --- a/kaievolve/config.py +++ b/kaievolve/config.py @@ -265,6 +265,17 @@ class PromptConfig: # touching config or code. None => disabled (no change to prompts). steering_brief_path: Optional[str] = None + # Research director: an automated meta-agent that, once per migration + # interval, reads the population and writes a strategic directive into the + # steering channel for the next interval (see kaievolve/research_director.py + # and skills/research-director/SKILL.md). Complements the human steering + # brief; both are injected. None/False => disabled. + research_director_enabled: bool = False + # How often (in completed iterations) the director fires, decoupled from + # migration so it gets enough shots to matter. None => fall back to the + # database migration_interval. + research_director_interval: Optional[int] = None + # Strategy clustering + cluster bandit (Phase 3). Clusters programs by the # embedding of their HMRD summary into emergent strategies, then biases # parent selection toward promising/under-explored clusters and away from diff --git a/kaievolve/process_parallel.py b/kaievolve/process_parallel.py index 81c87bc..4475d50 100644 --- a/kaievolve/process_parallel.py +++ b/kaievolve/process_parallel.py @@ -16,6 +16,9 @@ from kaievolve.config import Config from kaievolve.database import Program, ProgramDatabase from kaievolve.llm.ensemble import LLMEnsemble +from kaievolve.research_director import ( + render_research_directive as _render_research_directive, +) logger = logging.getLogger(__name__) @@ -939,6 +942,28 @@ def __init__( if self.steering_brief_path: logger.info(f"Steering brief enabled → {self.steering_brief_path}") + # Research director: an automated meta-agent that writes a strategic + # directive each migration interval; its directive is folded into the + # steering channel for the next interval. Optional + fail-safe. + self.research_director = None + self.research_directive_path = None + self._last_director_iter = 0 + self._research_director_interval = getattr( + config.prompt, "research_director_interval", None + ) or getattr(config.database, "migration_interval", 20) + if getattr(config.prompt, "research_director_enabled", False): + try: + from kaievolve.research_director import ResearchDirector + + self.research_director = ResearchDirector(config, output_dir or ".") + self.research_directive_path = str(self.research_director.directive_path) + logger.info( + f"Research director enabled (every {self._research_director_interval} " + f"iters) → {self.research_directive_path}" + ) + except Exception as e: + logger.warning(f"Research director init failed (disabled): {e}") + # Strategy clustering + cluster bandit (Phase 3), created if enabled. self.strategy_clusters = None if getattr(config.prompt, "strategy_clustering_enabled", False): @@ -1173,8 +1198,12 @@ def _create_database_snapshot(self) -> Dict[str, Any]: "literature_review": ( self.literature_review.render_for_prompt() if self.literature_review else "" ), - # Human steering brief, re-read each snapshot so mid-run edits apply. - "steering_brief": _render_steering_brief(getattr(self, "steering_brief_path", None)), + # Steering channel = human brief + research-director directive (either + # may be empty); both re-read each snapshot so updates apply mid-run. + "steering_brief": ( + _render_steering_brief(getattr(self, "steering_brief_path", None)) + + _render_research_directive(getattr(self, "research_directive_path", None)) + ), } # Include artifacts for programs that might be selected @@ -1370,6 +1399,17 @@ async def run_evolution( self.database.migrate_programs() self.database.log_island_status() + # Research director: strategic redirection on its own cadence + # (decoupled from migration so it fires often enough to matter). + # Fail-safe; never raises. + if ( + self.research_director is not None + and completed_iteration - self._last_director_iter + >= self._research_director_interval + ): + await self.research_director.run(self.database, completed_iteration) + self._last_director_iter = completed_iteration + # Log progress model_info = ( f", model_idx={result.generated_by_model_idx}" diff --git a/kaievolve/research_director.py b/kaievolve/research_director.py new file mode 100644 index 0000000..7fb1b78 --- /dev/null +++ b/kaievolve/research_director.py @@ -0,0 +1,189 @@ +"""Research director: a meta-agent that sets the run's strategic direction at +each migration interval. + +See ``skills/research-director/SKILL.md`` for the agent's own instructions. In +short: between migrations the program-writing agents evolve on their own; once +per migration interval this director reads the population (current best, score +trajectory, the top programs and the notes their authors left), decides whether +the last direction is working (keep) or has stalled (pivot), and writes ONE +concise directive to ``research_directive.md``. The controller folds that file +into the steering brief, so the directive rides at the top of every +program-writing agent's prompt for the next interval. ``research_log.md`` is the +director's memory across migrations. + +The director is optional (``prompt.research_director_enabled``) and fully +fail-safe: any error is swallowed so it can never break the evolution loop. +""" + +from __future__ import annotations + +import logging +from pathlib import Path +from typing import Optional + +logger = logging.getLogger(__name__) + +# Lightweight fallback if the skill file can't be found at runtime. +_FALLBACK_SKILL = """# Research Director +You direct a KaiEvolve run. A population of program variants is evolved by coding +agents (LLMs that mutate the task code; a frozen evaluator scores combined_score, +higher is better) across islands with periodic migration. Once per migration you +read the population and issue ONE concise, concrete directive for the next +interval. It is injected at the top of every coding agent's prompt, so write +actionable guidance grounded in the programs/notes you see. Keep what works, +pivot when it stalls; say what to try and what to stop. Output only the directive. +""" + + +def _skill_text(skill_path: Optional[str]) -> str: + candidates = [] + if skill_path: + candidates.append(Path(skill_path)) + # repo-relative default: skills/research-director/SKILL.md + candidates.append( + Path(__file__).resolve().parents[1] / "skills" / "research-director" / "SKILL.md" + ) + for p in candidates: + try: + if p.is_file(): + return p.read_text() + except OSError: + continue + return _FALLBACK_SKILL + + +def render_research_directive(directive_path: Optional[str], max_chars: int = 4000) -> str: + """Read the director's directive file and wrap it for prompt injection. + Returns '' when absent/empty. Mirrors ``_render_steering_brief`` so the + controller can append it to the steering channel.""" + if not directive_path: + return "" + try: + body = Path(directive_path).read_text().strip() + except OSError: + return "" + if not body: + return "" + return ( + "# Research direction (set by the run's research director)\n" + "Strategic guidance for this interval, based on the population's progress " + "so far. Follow it when choosing your next change.\n\n" + f"{body[:max_chars]}\n\n" + ) + + +class ResearchDirector: + def __init__( + self, + config, + output_dir: str, + *, + skill_path: Optional[str] = None, + top_k: int = 4, + max_code_chars: int = 1400, + ): + self.config = config + self.output_dir = Path(output_dir) + self.directive_path = self.output_dir / "research_directive.md" + self.log_path = self.output_dir / "research_log.md" + self.top_k = top_k + self.max_code_chars = max_code_chars + self.skill = _skill_text(skill_path) + # The task description the human gave the coding agents = the director's + # task context (the problem, its metric, and known records). + self.task_context = (getattr(config.prompt, "system_message", "") or "").strip() + self._ensemble = None + self._last_directive = "" + self._best_at_last: Optional[float] = None + + def _llm(self): + if self._ensemble is None: + from kaievolve.llm.ensemble import LLMEnsemble + + self._ensemble = LLMEnsemble(self.config.llm.models) + return self._ensemble + + @staticmethod + def _score(p) -> Optional[float]: + if p is None or not getattr(p, "metrics", None): + return None + v = p.metrics.get("combined_score") + return float(v) if isinstance(v, (int, float)) else None + + def _experiment_log(self, top) -> str: + lines = [] + for p in top: + sc = self._score(p) + sc_s = f"{sc:.4f}" if sc is not None else "n/a" + changes = (getattr(p, "metadata", {}) or {}).get("changes", "") or "" + lines.append(f"- score {sc_s} (gen {getattr(p, 'generation', '?')}): {changes[:300]}") + return "\n".join(lines) if lines else "(no programs yet)" + + def _build_prompt(self, best, best_score, top, improved, generation) -> str: + best_code = (getattr(best, "code", "") or "")[: self.max_code_chars] + best_s = f"{best_score:.4f}" if best_score is not None else "n/a" + if self._best_at_last is None: + history = "This is your first directive for this run." + else: + delta = (best_score or 0) - self._best_at_last + verdict = "IMPROVED" if improved else "STALLED (little/no gain)" + history = ( + f"Your previous directive was:\n---\n{self._last_directive}\n---\n" + f"Since then, best score went {self._best_at_last:.4f} -> {best_s} " + f"({delta:+.4f}) = {verdict}. Decide keep vs pivot accordingly." + ) + return ( + f"{self.skill}\n\n" + "==================== THIS RUN ====================\n\n" + f"## Task\n{self.task_context or '(no task description provided)'}\n\n" + f"## Where the search is (after {generation} generations)\n" + f"Current best combined_score: {best_s}\n\n" + f"## Top programs and the notes their authors left\n{self._experiment_log(top)}\n\n" + f"## Current best program (truncated)\n```\n{best_code}\n```\n\n" + f"## Your last directive and whether it worked\n{history}\n\n" + "Write the directive for the next interval now." + ) + + def _append_log(self, generation, best_score, improved, directive): + best_s = f"{best_score:.4f}" if best_score is not None else "n/a" + tag = "improved" if improved else ("first" if self._best_at_last is None else "stalled") + entry = ( + f"\n## migration @ gen {generation} (best={best_s}, {tag})\n\n{directive.strip()}\n" + ) + try: + with open(self.log_path, "a") as f: + f.write(entry) + except OSError: + pass + + async def run(self, database, generation: int) -> Optional[str]: + """Produce and persist a directive for the next interval. Never raises.""" + try: + best = database.get_best_program() + best_score = self._score(best) + try: + top = database.get_top_programs(self.top_k) or [] + except Exception: + top = [] + improved = ( + self._best_at_last is not None + and best_score is not None + and best_score > self._best_at_last + 1e-9 + ) + prompt = self._build_prompt(best, best_score, top, improved, generation) + directive = await self._llm().generate(prompt) + directive = (directive or "").strip() + if not directive: + return None + self.directive_path.write_text(directive) + self._append_log(generation, best_score, improved, directive) + self._last_directive = directive + self._best_at_last = best_score + logger.info( + f"[research-director] gen {generation}: set directive " + f"({'improved' if improved else 'pivot/first'}), best={best_score}" + ) + return directive + except Exception as e: # never break the evolution loop + logger.warning(f"[research-director] skipped (non-fatal): {type(e).__name__}: {e}") + return None diff --git a/skills/research-director/SKILL.md b/skills/research-director/SKILL.md new file mode 100644 index 0000000..f5b3d34 --- /dev/null +++ b/skills/research-director/SKILL.md @@ -0,0 +1,67 @@ +# Research Director + +You are the **research director** for a run of KaiEvolve. You do not write the +candidate programs yourself; you sit above the search and, at regular intervals, +set the strategic direction that the program-writing agents will follow next. +Your job is to read where the search is, decide whether the current direction is +working, and issue ONE clear directive that moves it forward. + +## The machine you are steering (KaiEvolve) + +KaiEvolve optimizes a piece of code by evolution: + +- There is a **population of programs**, all variants of the same task code (the + region marked by `EVOLVE-BLOCK` markers). A program is scored by a fixed + **evaluator** that returns `combined_score` (higher is better) plus task + metrics. The evaluator is frozen and trusted; programs cannot change it. +- Each **iteration**, a coding agent (an LLM) is shown a parent program, a few + top performers, and some diverse "inspiration" programs, and writes a mutated + version (usually as a diff). The mutation is evaluated and admitted to the + population. +- The population is split into **islands** (isolated sub-populations) that evolve + in parallel for diversity, with periodic **migration** that copies strong + programs between islands. MAP-Elites keeps a spread of distinct solutions, not + just the single best. +- You are invoked **at a regular interval** during the run (every N iterations). + Between your invocations, the agents run many iterations on their own. + +## How your directive reaches the agents (steering) + +Your output is written to the run's **steering brief**, which is injected at the +top of every program-writing agent's prompt for the next interval. The agents +read it as standing guidance and choose their next mutation accordingly. So: + +- Write for the coding agents, not for a human. It must be **actionable when + writing the next program**: what approach to take, what to try, what to stop. +- Be **concrete and specific**, grounded in the programs and notes you were + shown - name the technique, the part of the code, the parameter, the + structural idea. Vague encouragement ("explore more", "be creative") is wasted. +- Issue **one clear direction**, not a menu. The agents do best with a focused + push. (Other islands are left free to explore, so you are not closing doors - + you are aiming the main thrust.) +- Keep it short - a few sentences to a short list. It rides in every prompt. + +## Your loop each interval (keep or pivot) + +You are given: the task and its metric/records, the current best program and +score, the score trajectory, the top programs with the notes their authors left +(Idea / Change / Outcome), and **your own previous directive plus whether the +best score improved since you issued it**. + +Decide deliberately, like an experimentalist who keeps what works and discards +what doesn't: + +1. **Did the last direction work?** If best-so-far improved meaningfully since + your last directive, the direction is paying off - **double down**: refine it, + push the next concrete step of the same idea. +2. **If it stalled** (little or no gain), the direction is exhausted - **pivot**: + propose a genuinely different approach, and say plainly what to stop doing. + Use the notes to avoid re-proposing something already tried and abandoned. +3. **Always ground in evidence.** Reference what the current best actually does + and where it's losing the score, not generic advice. + +## Output + +Return only the directive - a concise markdown note (no preamble, no meta-talk), +written as guidance the next agents will act on. Lead with the single most +important instruction. diff --git a/tests/test_research_director.py b/tests/test_research_director.py new file mode 100644 index 0000000..4a30f5c --- /dev/null +++ b/tests/test_research_director.py @@ -0,0 +1,117 @@ +"""Tests for the research director (kaievolve/research_director.py): the +migration-interval meta-agent that writes a strategic directive into the steering +channel. The LLM call is stubbed; we verify the directive/log are written, the +directive renders into a prompt-injection block, keep/pivot state advances, and +the whole thing is fail-safe.""" + +import asyncio +import unittest +from pathlib import Path +from tempfile import TemporaryDirectory + +from kaievolve.config import Config +from kaievolve.research_director import ResearchDirector, render_research_directive + + +class _Prog: + def __init__(self, score, changes, code="def solve(): return 0", gen=1): + self.metrics = {"combined_score": score} + self.metadata = {"changes": changes} + self.code = code + self.generation = gen + + +class _DB: + def __init__(self, best, top): + self._best = best + self._top = top + + def get_best_program(self): + return self._best + + def get_top_programs(self, n): + return self._top[:n] + + +class _StubLLM: + def __init__(self, text): + self.text = text + self.prompts = [] + + async def generate(self, prompt, **kw): + self.prompts.append(prompt) + return self.text + + +def _director(tmp, llm_text="Pivot: switch to a Fourier parameterization of f."): + cfg = Config() + cfg.prompt.system_message = "Minimize C(f); record ~1.5029." + d = ResearchDirector(cfg, tmp) + d._ensemble = _StubLLM(llm_text) # stub the LLM call + return d + + +class TestRenderDirective(unittest.TestCase): + def test_absent_is_empty(self): + self.assertEqual(render_research_directive(None), "") + with TemporaryDirectory() as d: + self.assertEqual(render_research_directive(str(Path(d) / "nope.md")), "") + + def test_present_is_wrapped(self): + with TemporaryDirectory() as d: + p = Path(d) / "research_directive.md" + p.write_text("Try simulated annealing.") + out = render_research_directive(str(p)) + self.assertIn("Research direction", out) + self.assertIn("Try simulated annealing.", out) + + +class TestResearchDirector(unittest.TestCase): + def test_writes_directive_and_renders_into_steering(self): + with TemporaryDirectory() as tmp: + d = _director(tmp) + db = _DB( + _Prog(0.45, "tried gradient descent"), + [_Prog(0.45, "gradient descent"), _Prog(0.40, "uniform init")], + ) + directive = asyncio.run(d.run(db, generation=20)) + self.assertIn("Fourier", directive) + # directive file written + folds into the steering channel + self.assertTrue(d.directive_path.is_file()) + self.assertIn("Fourier", render_research_directive(str(d.directive_path))) + # memory log written + self.assertTrue(d.log_path.is_file()) + self.assertIn("gen 20", d.log_path.read_text()) + # the director's prompt carried the skill + task + experiment log + p = d._ensemble.prompts[0] + self.assertIn("Research Director", p) # skill text + self.assertIn("1.5029", p) # task context + self.assertIn("gradient descent", p) # experiment log notes + self.assertIn("first directive", p.lower()) # keep/pivot history + + def test_keep_pivot_state_advances(self): + with TemporaryDirectory() as tmp: + d = _director(tmp) + db1 = _DB(_Prog(0.45, "a"), [_Prog(0.45, "a")]) + asyncio.run(d.run(db1, 20)) + # best improves on the next migration -> "improved" verdict in prompt+log + db2 = _DB(_Prog(0.49, "b"), [_Prog(0.49, "b")]) + asyncio.run(d.run(db2, 40)) + self.assertAlmostEqual(d._best_at_last, 0.49) + self.assertIn("IMPROVED", d._ensemble.prompts[1]) + self.assertIn("improved", d.log_path.read_text()) + + def test_failsafe_never_raises(self): + with TemporaryDirectory() as tmp: + d = _director(tmp) + + class Boom: + def get_best_program(self): + raise RuntimeError("db exploded") + + # must swallow the error and return None, not propagate + self.assertIsNone(asyncio.run(d.run(Boom(), 20))) + + +if __name__ == "__main__": + unittest.main()