Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions kaievolve/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,17 @@ class PromptConfig:
# touching config or code. None => disabled (no change to prompts).
steering_brief_path: Optional[str] = None

# Research director: an automated meta-agent that, once per migration
# interval, reads the population and writes a strategic directive into the
# steering channel for the next interval (see kaievolve/research_director.py
# and skills/research-director/SKILL.md). Complements the human steering
# brief; both are injected. None/False => disabled.
research_director_enabled: bool = False
# How often (in completed iterations) the director fires, decoupled from
# migration so it gets enough shots to matter. None => fall back to the
# database migration_interval.
research_director_interval: Optional[int] = None

# Strategy clustering + cluster bandit (Phase 3). Clusters programs by the
# embedding of their HMRD summary into emergent strategies, then biases
# parent selection toward promising/under-explored clusters and away from
Expand Down
44 changes: 42 additions & 2 deletions kaievolve/process_parallel.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,9 @@
from kaievolve.config import Config
from kaievolve.database import Program, ProgramDatabase
from kaievolve.llm.ensemble import LLMEnsemble
from kaievolve.research_director import (
render_research_directive as _render_research_directive,
)

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -939,6 +942,28 @@ def __init__(
if self.steering_brief_path:
logger.info(f"Steering brief enabled → {self.steering_brief_path}")

# Research director: an automated meta-agent that writes a strategic
# directive each migration interval; its directive is folded into the
# steering channel for the next interval. Optional + fail-safe.
self.research_director = None
self.research_directive_path = None
self._last_director_iter = 0
self._research_director_interval = getattr(
config.prompt, "research_director_interval", None
) or getattr(config.database, "migration_interval", 20)
if getattr(config.prompt, "research_director_enabled", False):
try:
from kaievolve.research_director import ResearchDirector

self.research_director = ResearchDirector(config, output_dir or ".")
self.research_directive_path = str(self.research_director.directive_path)
logger.info(
f"Research director enabled (every {self._research_director_interval} "
f"iters) → {self.research_directive_path}"
)
except Exception as e:
logger.warning(f"Research director init failed (disabled): {e}")

# Strategy clustering + cluster bandit (Phase 3), created if enabled.
self.strategy_clusters = None
if getattr(config.prompt, "strategy_clustering_enabled", False):
Expand Down Expand Up @@ -1173,8 +1198,12 @@ def _create_database_snapshot(self) -> Dict[str, Any]:
"literature_review": (
self.literature_review.render_for_prompt() if self.literature_review else ""
),
# Human steering brief, re-read each snapshot so mid-run edits apply.
"steering_brief": _render_steering_brief(getattr(self, "steering_brief_path", None)),
# Steering channel = human brief + research-director directive (either
# may be empty); both re-read each snapshot so updates apply mid-run.
"steering_brief": (
_render_steering_brief(getattr(self, "steering_brief_path", None))
+ _render_research_directive(getattr(self, "research_directive_path", None))
),
}

# Include artifacts for programs that might be selected
Expand Down Expand Up @@ -1370,6 +1399,17 @@ async def run_evolution(
self.database.migrate_programs()
self.database.log_island_status()

# Research director: strategic redirection on its own cadence
# (decoupled from migration so it fires often enough to matter).
# Fail-safe; never raises.
if (
self.research_director is not None
and completed_iteration - self._last_director_iter
>= self._research_director_interval
):
await self.research_director.run(self.database, completed_iteration)
self._last_director_iter = completed_iteration

# Log progress
model_info = (
f", model_idx={result.generated_by_model_idx}"
Expand Down
189 changes: 189 additions & 0 deletions kaievolve/research_director.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,189 @@
"""Research director: a meta-agent that sets the run's strategic direction at
each migration interval.

See ``skills/research-director/SKILL.md`` for the agent's own instructions. In
short: between migrations the program-writing agents evolve on their own; once
per migration interval this director reads the population (current best, score
trajectory, the top programs and the notes their authors left), decides whether
the last direction is working (keep) or has stalled (pivot), and writes ONE
concise directive to ``research_directive.md``. The controller folds that file
into the steering brief, so the directive rides at the top of every
program-writing agent's prompt for the next interval. ``research_log.md`` is the
director's memory across migrations.

The director is optional (``prompt.research_director_enabled``) and fully
fail-safe: any error is swallowed so it can never break the evolution loop.
"""

from __future__ import annotations

import logging
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)

# Lightweight fallback if the skill file can't be found at runtime.
_FALLBACK_SKILL = """# Research Director
You direct a KaiEvolve run. A population of program variants is evolved by coding
agents (LLMs that mutate the task code; a frozen evaluator scores combined_score,
higher is better) across islands with periodic migration. Once per migration you
read the population and issue ONE concise, concrete directive for the next
interval. It is injected at the top of every coding agent's prompt, so write
actionable guidance grounded in the programs/notes you see. Keep what works,
pivot when it stalls; say what to try and what to stop. Output only the directive.
"""


def _skill_text(skill_path: Optional[str]) -> str:
candidates = []
if skill_path:
candidates.append(Path(skill_path))
# repo-relative default: skills/research-director/SKILL.md
candidates.append(
Path(__file__).resolve().parents[1] / "skills" / "research-director" / "SKILL.md"
)
for p in candidates:
try:
if p.is_file():
return p.read_text()
except OSError:
continue
return _FALLBACK_SKILL


def render_research_directive(directive_path: Optional[str], max_chars: int = 4000) -> str:
"""Read the director's directive file and wrap it for prompt injection.
Returns '' when absent/empty. Mirrors ``_render_steering_brief`` so the
controller can append it to the steering channel."""
if not directive_path:
return ""
try:
body = Path(directive_path).read_text().strip()
except OSError:
return ""
if not body:
return ""
return (
"# Research direction (set by the run's research director)\n"
"Strategic guidance for this interval, based on the population's progress "
"so far. Follow it when choosing your next change.\n\n"
f"{body[:max_chars]}\n\n"
)


class ResearchDirector:
def __init__(
self,
config,
output_dir: str,
*,
skill_path: Optional[str] = None,
top_k: int = 4,
max_code_chars: int = 1400,
):
self.config = config
self.output_dir = Path(output_dir)
self.directive_path = self.output_dir / "research_directive.md"
self.log_path = self.output_dir / "research_log.md"
self.top_k = top_k
self.max_code_chars = max_code_chars
self.skill = _skill_text(skill_path)
# The task description the human gave the coding agents = the director's
# task context (the problem, its metric, and known records).
self.task_context = (getattr(config.prompt, "system_message", "") or "").strip()
self._ensemble = None
self._last_directive = ""
self._best_at_last: Optional[float] = None

def _llm(self):
if self._ensemble is None:
from kaievolve.llm.ensemble import LLMEnsemble

self._ensemble = LLMEnsemble(self.config.llm.models)
return self._ensemble

@staticmethod
def _score(p) -> Optional[float]:
if p is None or not getattr(p, "metrics", None):
return None
v = p.metrics.get("combined_score")
return float(v) if isinstance(v, (int, float)) else None

def _experiment_log(self, top) -> str:
lines = []
for p in top:
sc = self._score(p)
sc_s = f"{sc:.4f}" if sc is not None else "n/a"
changes = (getattr(p, "metadata", {}) or {}).get("changes", "") or ""
lines.append(f"- score {sc_s} (gen {getattr(p, 'generation', '?')}): {changes[:300]}")
return "\n".join(lines) if lines else "(no programs yet)"

def _build_prompt(self, best, best_score, top, improved, generation) -> str:
best_code = (getattr(best, "code", "") or "")[: self.max_code_chars]
best_s = f"{best_score:.4f}" if best_score is not None else "n/a"
if self._best_at_last is None:
history = "This is your first directive for this run."
else:
delta = (best_score or 0) - self._best_at_last
verdict = "IMPROVED" if improved else "STALLED (little/no gain)"
history = (
f"Your previous directive was:\n---\n{self._last_directive}\n---\n"
f"Since then, best score went {self._best_at_last:.4f} -> {best_s} "
f"({delta:+.4f}) = {verdict}. Decide keep vs pivot accordingly."
)
return (
f"{self.skill}\n\n"
"==================== THIS RUN ====================\n\n"
f"## Task\n{self.task_context or '(no task description provided)'}\n\n"
f"## Where the search is (after {generation} generations)\n"
f"Current best combined_score: {best_s}\n\n"
f"## Top programs and the notes their authors left\n{self._experiment_log(top)}\n\n"
f"## Current best program (truncated)\n```\n{best_code}\n```\n\n"
f"## Your last directive and whether it worked\n{history}\n\n"
"Write the directive for the next interval now."
)

def _append_log(self, generation, best_score, improved, directive):
best_s = f"{best_score:.4f}" if best_score is not None else "n/a"
tag = "improved" if improved else ("first" if self._best_at_last is None else "stalled")
entry = (
f"\n## migration @ gen {generation} (best={best_s}, {tag})\n\n{directive.strip()}\n"
)
try:
with open(self.log_path, "a") as f:
f.write(entry)
except OSError:
pass

async def run(self, database, generation: int) -> Optional[str]:
"""Produce and persist a directive for the next interval. Never raises."""
try:
best = database.get_best_program()
best_score = self._score(best)
try:
top = database.get_top_programs(self.top_k) or []
except Exception:
top = []
improved = (
self._best_at_last is not None
and best_score is not None
and best_score > self._best_at_last + 1e-9
)
prompt = self._build_prompt(best, best_score, top, improved, generation)
directive = await self._llm().generate(prompt)
directive = (directive or "").strip()
if not directive:
return None
self.directive_path.write_text(directive)
self._append_log(generation, best_score, improved, directive)
self._last_directive = directive
self._best_at_last = best_score
logger.info(
f"[research-director] gen {generation}: set directive "
f"({'improved' if improved else 'pivot/first'}), best={best_score}"
)
return directive
except Exception as e: # never break the evolution loop
logger.warning(f"[research-director] skipped (non-fatal): {type(e).__name__}: {e}")
return None
67 changes: 67 additions & 0 deletions skills/research-director/SKILL.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
# Research Director

You are the **research director** for a run of KaiEvolve. You do not write the
candidate programs yourself; you sit above the search and, at regular intervals,
set the strategic direction that the program-writing agents will follow next.
Your job is to read where the search is, decide whether the current direction is
working, and issue ONE clear directive that moves it forward.

## The machine you are steering (KaiEvolve)

KaiEvolve optimizes a piece of code by evolution:

- There is a **population of programs**, all variants of the same task code (the
region marked by `EVOLVE-BLOCK` markers). A program is scored by a fixed
**evaluator** that returns `combined_score` (higher is better) plus task
metrics. The evaluator is frozen and trusted; programs cannot change it.
- Each **iteration**, a coding agent (an LLM) is shown a parent program, a few
top performers, and some diverse "inspiration" programs, and writes a mutated
version (usually as a diff). The mutation is evaluated and admitted to the
population.
- The population is split into **islands** (isolated sub-populations) that evolve
in parallel for diversity, with periodic **migration** that copies strong
programs between islands. MAP-Elites keeps a spread of distinct solutions, not
just the single best.
- You are invoked **at a regular interval** during the run (every N iterations).
Between your invocations, the agents run many iterations on their own.

## How your directive reaches the agents (steering)

Your output is written to the run's **steering brief**, which is injected at the
top of every program-writing agent's prompt for the next interval. The agents
read it as standing guidance and choose their next mutation accordingly. So:

- Write for the coding agents, not for a human. It must be **actionable when
writing the next program**: what approach to take, what to try, what to stop.
- Be **concrete and specific**, grounded in the programs and notes you were
shown - name the technique, the part of the code, the parameter, the
structural idea. Vague encouragement ("explore more", "be creative") is wasted.
- Issue **one clear direction**, not a menu. The agents do best with a focused
push. (Other islands are left free to explore, so you are not closing doors -
you are aiming the main thrust.)
- Keep it short - a few sentences to a short list. It rides in every prompt.

## Your loop each interval (keep or pivot)

You are given: the task and its metric/records, the current best program and
score, the score trajectory, the top programs with the notes their authors left
(Idea / Change / Outcome), and **your own previous directive plus whether the
best score improved since you issued it**.

Decide deliberately, like an experimentalist who keeps what works and discards
what doesn't:

1. **Did the last direction work?** If best-so-far improved meaningfully since
your last directive, the direction is paying off - **double down**: refine it,
push the next concrete step of the same idea.
2. **If it stalled** (little or no gain), the direction is exhausted - **pivot**:
propose a genuinely different approach, and say plainly what to stop doing.
Use the notes to avoid re-proposing something already tried and abandoned.
3. **Always ground in evidence.** Reference what the current best actually does
and where it's losing the score, not generic advice.

## Output

Return only the directive - a concise markdown note (no preamble, no meta-talk),
written as guidance the next agents will act on. Lead with the single most
important instruction.
Loading
Loading