diff --git a/desktop/acp-bridge/dist/index.js b/desktop/acp-bridge/dist/index.js new file mode 100644 index 0000000000..b3751f3336 --- /dev/null +++ b/desktop/acp-bridge/dist/index.js @@ -0,0 +1,1067 @@ +/** + * ACP Bridge — translates between OMI's JSON-lines protocol and the + * Agent Client Protocol (ACP) used by claude-code-acp. + * + * THIS IS THE DESKTOP APP FLOW. It is unrelated to the VM/agent-cloud flow + * (agent-cloud/agent.mjs), which runs Claude Code SDK on a remote VM for + * the Omi Agent feature. This bridge runs locally on the user's Mac. + * + * Session lifecycle: + * 1. warmup → session/new (system prompt applied here, once) + * 2. query → session reused; systemPrompt field in the message is ignored + * unless the session was invalidated (cwd change → new session/new) + * 3. The ACP SDK owns conversation history after session/new — do not inject + * it into the system prompt. + * + * Token counts: + * session/prompt drives one or more internal Anthropic API calls (initial + * response + one per tool-use round). The usage returned in the result is + * the AGGREGATE across all those rounds. There are no separate sub-agents. + * + * Implementation flow: + * 1. Create Unix socket server for omi-tools relay + * 2. Spawn claude-code-acp as subprocess (JSON-RPC over stdio) + * 3. Initialize ACP connection + * 4. Handle auth if required (forward to Swift, wait for user action) + * 5. On query: reuse or create session, send prompt, translate notifications → JSON-lines + * 6. On interrupt: cancel the session + */ +import { spawn } from "child_process"; +import { createInterface } from "readline"; +import { dirname, join } from "path"; +import { resolveSession, needsModelUpdate, filterSessionsToWarm, getRetryDeleteKey } from "./session-manager.js"; +import { fileURLToPath } from "url"; +import { createServer as createNetServer } from "net"; +import { tmpdir } from "os"; +import { unlinkSync, appendFileSync, statSync } from "fs"; +import { startOAuthFlow } from "./oauth-flow.js"; +const __dirname = dirname(fileURLToPath(import.meta.url)); +// Resolve paths to bundled tools +const playwrightCli = join(__dirname, "..", "node_modules", "@playwright", "mcp", "cli.js"); +const omiToolsStdioScript = join(__dirname, "omi-tools-stdio.js"); +// --- Helpers --- +function send(msg) { + try { + process.stdout.write(JSON.stringify(msg) + "\n"); + } + catch (err) { + logErr(`Failed to write to stdout: ${err}`); + } +} +function logErr(msg) { + process.stderr.write(`[acp-bridge] ${msg}\n`); +} +// --- OMI tools relay via Unix socket --- +let omiToolsPipePath = ""; +let omiToolsClients = []; +// Pending tool call promises — resolved when Swift sends back results +const pendingToolCalls = new Map(); +let currentMode = "act"; +/** Resolve a pending tool call with a result from Swift */ +function resolveToolCall(msg) { + const pending = pendingToolCalls.get(msg.callId); + if (pending) { + pending.resolve(msg.result); + pendingToolCalls.delete(msg.callId); + } + else { + logErr(`Warning: no pending tool call for callId=${msg.callId}`); + } +} +/** Start Unix socket server for omi-tools stdio processes to connect to */ +function startOmiToolsRelay() { + const pipePath = join(tmpdir(), `omi-tools-${process.pid}.sock`); + // Clean up any stale socket + try { + unlinkSync(pipePath); + } + catch { + // ignore + } + return new Promise((resolve, reject) => { + const server = createNetServer((client) => { + omiToolsClients.push(client); + let buffer = ""; + client.on("data", (data) => { + buffer += data.toString(); + let newlineIdx; + while ((newlineIdx = buffer.indexOf("\n")) >= 0) { + const line = buffer.slice(0, newlineIdx); + buffer = buffer.slice(newlineIdx + 1); + if (!line.trim()) + continue; + try { + const msg = JSON.parse(line); + if (msg.type === "tool_use") { + // Forward tool call to Swift via stdout + send({ + type: "tool_use", + callId: msg.callId, + name: msg.name, + input: msg.input, + }); + // Create a promise that will be resolved when Swift responds + const callId = msg.callId; + pendingToolCalls.set(callId, { + resolve: (result) => { + // Send result back to the omi-tools stdio process + try { + client.write(JSON.stringify({ + type: "tool_result", + callId, + result, + }) + "\n"); + } + catch (err) { + logErr(`Failed to send tool result to omi-tools: ${err}`); + } + }, + }); + } + } + catch { + logErr(`Failed to parse omi-tools message: ${line.slice(0, 200)}`); + } + } + }); + client.on("close", () => { + omiToolsClients = omiToolsClients.filter((c) => c !== client); + }); + client.on("error", (err) => { + logErr(`omi-tools client error: ${err.message}`); + }); + }); + server.listen(pipePath, () => { + logErr(`omi-tools relay socket: ${pipePath}`); + resolve(pipePath); + }); + server.on("error", reject); + // Clean up on exit + process.on("exit", () => { + server.close(); + try { + unlinkSync(pipePath); + } + catch { + // ignore + } + }); + }); +} +// --- ACP subprocess management --- +let acpProcess = null; +let acpStdinWriter = null; +let acpResponseHandlers = new Map(); +let acpNotificationHandler = null; +let nextRpcId = 1; +/** Send a JSON-RPC request to the ACP subprocess and wait for the response */ +async function acpRequest(method, params = {}) { + const id = nextRpcId++; + const msg = JSON.stringify({ jsonrpc: "2.0", id, method, params }); + return new Promise((resolve, reject) => { + acpResponseHandlers.set(id, { resolve, reject }); + if (acpStdinWriter) { + acpStdinWriter(msg); + } + else { + reject(new Error("ACP process stdin not available")); + } + }); +} +/** Send a JSON-RPC notification (no response expected) to ACP */ +function acpNotify(method, params = {}) { + const msg = JSON.stringify({ jsonrpc: "2.0", method, params }); + if (acpStdinWriter) { + acpStdinWriter(msg); + } +} +/** Start the ACP subprocess */ +function startAcpProcess() { + // Build environment for ACP subprocess + // If ANTHROPIC_API_KEY is present (Mode A), keep it so ACP uses OMI's key. + // If absent (Mode B), ACP will use user's own OAuth. + const env = { ...process.env }; + delete env.CLAUDE_CODE_USE_VERTEX; + // Remove CLAUDECODE so the ACP subprocess (and the Claude Code it spawns) don't + // inherit the nested-session guard. Without this, `--resume` silently fails when + // Claude Code detects it's being launched from inside another Claude Code session. + delete env.CLAUDECODE; + env.NODE_NO_WARNINGS = "1"; + // Use our patched ACP entry point (adds model selection support) + // Located in dist/ (same as __dirname) so it's included in the app bundle + const acpEntry = join(__dirname, "patched-acp-entry.mjs"); + const nodeBin = process.execPath; + const mode = env.ANTHROPIC_API_KEY ? "Mode A (Omi API key)" : "Mode B (Your Claude Account / OAuth)"; + logErr(`Starting ACP subprocess [${mode}]: ${nodeBin} ${acpEntry}`); + acpProcess = spawn(nodeBin, [acpEntry], { + env, + stdio: ["pipe", "pipe", "pipe"], + }); + if (!acpProcess.stdin || !acpProcess.stdout || !acpProcess.stderr) { + throw new Error("Failed to create ACP subprocess pipes"); + } + // Write to ACP stdin + acpStdinWriter = (line) => { + try { + acpProcess?.stdin?.write(line + "\n"); + } + catch (err) { + logErr(`Failed to write to ACP stdin: ${err}`); + } + }; + // Read ACP stdout (JSON-RPC responses and notifications) + const rl = createInterface({ + input: acpProcess.stdout, + terminal: false, + }); + rl.on("line", (line) => { + if (!line.trim()) + return; + try { + const msg = JSON.parse(line); + if ("method" in msg && "id" in msg && msg.id !== null && msg.id !== undefined) { + // Server-initiated JSON-RPC request (has both method and id, expects a response) + const id = msg.id; + const method = msg.method; + if (method === "session/request_permission") { + // Auto-approve all tool permissions (matches agent-bridge's bypassPermissions behavior) + const params = msg.params; + const options = params?.options ?? []; + const allowAlways = options.find((o) => o.kind === "allow_always"); + const allowOnce = options.find((o) => o.kind === "allow_once"); + const optionId = allowAlways?.optionId ?? allowOnce?.optionId ?? "allow"; + logErr(`Auto-approving permission for tool (id=${id})`); + acpStdinWriter?.(JSON.stringify({ + jsonrpc: "2.0", + id, + result: { outcome: { outcome: "selected", optionId } }, + })); + } + else if (method === "session/update") { + // session/update can also arrive as a request (with id) — handle and ack + if (acpNotificationHandler) { + acpNotificationHandler(method, msg.params); + } + acpStdinWriter?.(JSON.stringify({ jsonrpc: "2.0", id, result: null })); + } + else { + logErr(`Unhandled ACP request: ${method} (id=${id})`); + acpStdinWriter?.(JSON.stringify({ + jsonrpc: "2.0", + id, + error: { code: -32601, message: `Method not handled: ${method}` }, + })); + } + } + else if ("id" in msg && msg.id !== null && msg.id !== undefined) { + // JSON-RPC response (has id but no method) + const id = msg.id; + const handler = acpResponseHandlers.get(id); + if (handler) { + acpResponseHandlers.delete(id); + if ("error" in msg) { + const err = msg.error; + const error = new AcpError(err.message, err.code, err.data); + handler.reject(error); + } + else { + handler.resolve(msg.result); + } + } + } + else if ("method" in msg) { + // JSON-RPC notification (has method but no id) + if (acpNotificationHandler) { + acpNotificationHandler(msg.method, msg.params); + } + } + } + catch (err) { + logErr(`Failed to parse ACP message: ${line.slice(0, 200)}`); + } + }); + // Read ACP stderr for logging + acpProcess.stderr.on("data", (data) => { + const text = data.toString().trim(); + if (text) { + logErr(`ACP stderr: ${text}`); + } + }); + acpProcess.on("exit", (code) => { + logErr(`ACP process exited with code ${code}`); + acpProcess = null; + acpStdinWriter = null; + // All sessions are lost when ACP process dies + sessions.clear(); + activeSessionId = ""; + isInitialized = false; + for (const [, handler] of acpResponseHandlers) { + handler.reject(new Error(`ACP process exited (code ${code})`)); + } + acpResponseHandlers.clear(); + }); +} +class AcpError extends Error { + code; + data; + constructor(message, code, data) { + super(message); + this.code = code; + this.data = data; + } +} +// --- State --- +/** Pre-warmed sessions keyed by sessionKey (e.g. "main", "floating", or model name for backward compat) */ +const sessions = new Map(); +/** The session currently being used by an active query (for interrupt) */ +let activeSessionId = ""; +let activeAbort = null; +let interruptRequested = false; +let isInitialized = false; +let authMethods = []; +let authResolve = null; +let preWarmPromise = null; +let authRetryCount = 0; +const MAX_AUTH_RETRIES = 2; +let activeAuthPromise = null; +let activeOAuthFlow = null; +// --- Auth flow (OAuth) --- +/** Restart the ACP subprocess so it picks up freshly-stored credentials */ +async function restartAcpProcess() { + logErr("Restarting ACP subprocess to pick up new credentials..."); + if (acpProcess) { + const exitPromise = new Promise((resolve) => { + acpProcess.once("exit", () => resolve()); + }); + acpProcess.kill(); + await exitPromise; + } + // State is cleaned up by the exit handler (sessions, handlers, etc.) + startAcpProcess(); +} +/** + * Start the OAuth flow: spin up a local callback server, send the auth URL + * to Swift (so it can open the browser), wait for the user to complete auth, + * store credentials in Keychain, and restart the ACP subprocess. + * + * Idempotent: if a flow is already running, returns the same promise. + */ +async function startAuthFlow() { + if (activeAuthPromise) { + logErr("Auth flow already in progress, waiting for it..."); + return activeAuthPromise; + } + activeAuthPromise = (async () => { + try { + logErr("Starting OAuth flow..."); + const flow = await startOAuthFlow(logErr); + activeOAuthFlow = flow; + // Send auth URL to Swift so it can open the browser + send({ type: "auth_required", methods: authMethods, authUrl: flow.authUrl }); + // Wait for OAuth callback + token exchange + credential storage + await flow.complete; + logErr("OAuth flow completed successfully"); + // Restart ACP subprocess so it picks up new credentials from Keychain + await restartAcpProcess(); + // Notify Swift + send({ type: "auth_success" }); + } + catch (err) { + logErr(`OAuth flow failed: ${err}`); + throw err; + } + finally { + activeOAuthFlow = null; + activeAuthPromise = null; + } + })(); + return activeAuthPromise; +} +// --- ACP initialization --- +async function initializeAcp() { + if (isInitialized) + return; + try { + const result = (await acpRequest("initialize", { + protocolVersion: 1, + })); + logErr(`ACP initialized: protocol=${result.protocolVersion}, capabilities=${JSON.stringify(result.agentCapabilities)}`); + // Store auth methods for potential later use + if (result.authMethods && result.authMethods.length > 0) { + authMethods = result.authMethods.map((m) => ({ + id: m.id, + type: (m.type ?? "agent_auth"), + displayName: m.name || m.description || m.id, + args: m.args, + env: m.env, + })); + logErr(`Auth methods: ${authMethods.map((m) => `${m.id}(${m.displayName})`).join(", ")}`); + } + isInitialized = true; + } + catch (err) { + if (err instanceof AcpError && err.code === -32000) { + // AUTH_REQUIRED + const data = err.data; + if (data?.authMethods) { + authMethods = data.authMethods.map((m) => ({ + id: m.id, + type: (m.type ?? "agent_auth"), + displayName: m.name || m.description || m.id, + })); + } + logErr(`ACP requires authentication: ${JSON.stringify(authMethods)}`); + await startAuthFlow(); + // Retry initialization after auth (ACP subprocess already restarted) + await initializeAcp(); + return; + } + throw err; + } +} +function buildMcpServers(mode, cwd, sessionKey) { + const servers = []; + // omi-tools (stdio, connects back via Unix socket) + const omiToolsEnv = [ + { name: "OMI_BRIDGE_PIPE", value: omiToolsPipePath }, + { name: "OMI_QUERY_MODE", value: mode }, + ]; + if (cwd) { + omiToolsEnv.push({ name: "OMI_WORKSPACE", value: cwd }); + } + if (sessionKey === "onboarding") { + omiToolsEnv.push({ name: "OMI_ONBOARDING", value: "true" }); + } + servers.push({ + name: "omi-tools", + command: process.execPath, + args: [omiToolsStdioScript], + env: omiToolsEnv, + }); + // Playwright MCP server + const playwrightArgs = [playwrightCli]; + if (process.env.PLAYWRIGHT_USE_EXTENSION === "true") { + playwrightArgs.push("--extension"); + } + const playwrightEnv = []; + if (process.env.PLAYWRIGHT_MCP_EXTENSION_TOKEN) { + playwrightEnv.push({ + name: "PLAYWRIGHT_MCP_EXTENSION_TOKEN", + value: process.env.PLAYWRIGHT_MCP_EXTENSION_TOKEN, + }); + } + servers.push({ + name: "playwright", + command: process.execPath, + args: playwrightArgs, + env: playwrightEnv, + }); + return servers; +} +// --- Session pre-warming --- +const DEFAULT_MODEL = "claude-sonnet-4-6"; +/** + * Session keys for which we proactively write Anthropic's prompt cache at + * startup. Floating bar is latency-sensitive and typically has the largest + * cached system prompt, so warming it pays for itself within the first real + * voice query. Main chat is not included to keep per-launch Anthropic cost + * low; it warms on first real usage like before. + */ +const CACHE_WARM_KEYS = new Set(["floating"]); +async function preWarmSession(cwd, sessionConfigs, models) { + const warmCwd = cwd || process.env.HOME || "/"; + // Build the list of sessions to warm: new format (sessionConfigs) takes priority over legacy (models array) + const allConfigs = sessionConfigs && sessionConfigs.length > 0 + ? sessionConfigs + : (models && models.length > 0 ? models : [DEFAULT_MODEL]) + .map((m) => ({ key: m, model: m })); + const toWarm = filterSessionsToWarm(sessions, allConfigs); + if (toWarm.length === 0) { + logErr("All requested sessions already pre-warmed"); + return; + } + try { + await initializeAcp(); + await Promise.all(toWarm.map(async (cfg) => { + try { + const sessionParams = { + cwd: warmCwd, + mcpServers: buildMcpServers("act", warmCwd, cfg.key), + ...(cfg.systemPrompt ? { _meta: { systemPrompt: cfg.systemPrompt } } : {}), + }; + // Retry once after a short delay if session/new fails + let result; + try { + result = (await acpRequest("session/new", sessionParams)); + } + catch (firstErr) { + logErr(`Pre-warm session/new failed for ${cfg.key}, retrying in 2s: ${firstErr}`); + await new Promise((r) => setTimeout(r, 2000)); + result = (await acpRequest("session/new", sessionParams)); + } + await acpRequest("session/set_model", { sessionId: result.sessionId, modelId: cfg.model }); + // Only cache after set_model succeeds — if it fails, the session stays on the default + // model and the reuse logic should detect the mismatch and re-set it. + sessions.set(cfg.key, { sessionId: result.sessionId, cwd: warmCwd, model: cfg.model }); + logErr(`Pre-warmed session: ${result.sessionId} (key=${cfg.key}, model=${cfg.model}, hasSystemPrompt=${!!cfg.systemPrompt})`); + } + catch (err) { + if (err instanceof AcpError && err.code === -32000) { + logErr(`Pre-warm failed with auth error (code=${err.code}), starting OAuth flow`); + await startAuthFlow(); + return; + } + logErr(`Pre-warm failed for ${cfg.key}: ${err}`); + } + })); + } + catch (err) { + logErr(`Pre-warm failed (will create on first query): ${err}`); + } + // Fire-and-forget: write Anthropic's prompt cache for latency-sensitive + // sessions so the user's first real query is a cache-read, not a + // cache-write. Uses a throwaway session with the same system prompt so + // the reusable session keeps clean conversation history. + void warmAnthropicCache(warmCwd, toWarm); +} +async function warmAnthropicCache(warmCwd, configs) { + for (const cfg of configs) { + if (!cfg.systemPrompt) + continue; + if (!CACHE_WARM_KEYS.has(cfg.key)) + continue; + try { + const warmupSessionKey = `${cfg.key}-cache-warm`; + const sessionParams = { + cwd: warmCwd, + mcpServers: buildMcpServers("act", warmCwd, warmupSessionKey), + _meta: { systemPrompt: cfg.systemPrompt }, + }; + const created = (await acpRequest("session/new", sessionParams)); + await acpRequest("session/set_model", { sessionId: created.sessionId, modelId: cfg.model }); + await acpRequest("session/prompt", { + sessionId: created.sessionId, + prompt: [{ type: "text", text: "ready" }], + }); + logErr(`Anthropic cache primed: key=${cfg.key} model=${cfg.model}`); + } + catch (err) { + logErr(`Anthropic cache warmup failed for ${cfg.key}: ${err}`); + } + } +} +// --- Handle query from Swift --- +async function handleQuery(msg) { + if (activeAbort) { + activeAbort.abort(); + activeAbort = null; + } + const abortController = new AbortController(); + activeAbort = abortController; + interruptRequested = false; + authRetryCount = 0; + let fullText = ""; + let fullPrompt = ""; + let isNewSession = false; + const pendingTools = []; + try { + const mode = msg.mode ?? "act"; + currentMode = mode; + logErr(`Query mode: ${mode}`); + // Wait for pre-warm to finish if in progress + if (preWarmPromise) { + logErr("Waiting for pre-warm to complete..."); + await preWarmPromise; + preWarmPromise = null; + } + // Ensure ACP is initialized + await initializeAcp(); + // Look up a pre-warmed session by sessionKey (falls back to model name for backward compat) + const requestedModel = msg.model || DEFAULT_MODEL; + const sessionKey = msg.sessionKey ?? requestedModel; + const requestedCwd = msg.cwd || process.env.HOME || "/"; + let sessionId = ""; + const resolved = resolveSession(sessions, sessionKey, requestedCwd); + if (resolved) { + sessionId = resolved.sessionId; + } + else if (sessions.get(sessionKey)) { + // resolveSession deleted it due to cwd change + logErr(`Cwd changed for ${sessionKey}, creating new session`); + } + // Reuse existing session if alive, resume a persisted one, or create a new one + if (msg.resume && !sessionId) { + // Resume a persisted session by ID (survives process restarts via ~/.claude/projects/) + // Fall back to session/new if the session file is gone or resume fails + try { + await acpRequest("session/resume", { + sessionId: msg.resume, + cwd: requestedCwd, + mcpServers: buildMcpServers(mode, requestedCwd, sessionKey), + }); + sessionId = msg.resume; + if (requestedModel) { + await acpRequest("session/set_model", { sessionId, modelId: requestedModel }); + } + sessions.set(sessionKey, { sessionId, cwd: requestedCwd, model: requestedModel }); + isNewSession = false; + logErr(`ACP session resumed: ${sessionId} (key=${sessionKey})`); + } + catch (resumeErr) { + logErr(`ACP session resume failed (will create new session): ${resumeErr}`); + // Fall through to session/new below + } + } + if (!sessionId) { + const sessionParams = { + cwd: requestedCwd, + mcpServers: buildMcpServers(mode, requestedCwd, sessionKey), + ...(msg.systemPrompt ? { _meta: { systemPrompt: msg.systemPrompt } } : {}), + }; + const sessionResult = (await acpRequest("session/new", sessionParams)); + sessionId = sessionResult.sessionId; + isNewSession = true; + if (requestedModel) { + await acpRequest("session/set_model", { sessionId, modelId: requestedModel }); + } + sessions.set(sessionKey, { sessionId, cwd: requestedCwd, model: requestedModel }); + logErr(`ACP session created: ${sessionId} (key=${sessionKey}, model=${requestedModel || "default"}, cwd=${requestedCwd})`); + } + else { + isNewSession = false; + // Update model on reuse if the requested model differs from the session's stored model. + // Wrap in try-catch: if the session is stale, delete it and fall through to session/new. + if (needsModelUpdate(resolved?.existing, requestedModel)) { + try { + await acpRequest("session/set_model", { sessionId, modelId: requestedModel }); + sessions.set(sessionKey, { sessionId, cwd: requestedCwd, model: requestedModel }); + logErr(`Updated model on reuse: ${sessionId} (key=${sessionKey}, ${resolved?.existing?.model} -> ${requestedModel})`); + } + catch (setModelErr) { + logErr(`set_model failed on reuse (stale session?), recreating: ${setModelErr}`); + sessions.delete(getRetryDeleteKey(sessionKey)); + activeSessionId = ""; + return handleQuery(msg); + } + } + logErr(`Reusing existing ACP session: ${sessionId} (key=${sessionKey})`); + } + activeSessionId = sessionId; + fullPrompt = msg.prompt; + // Set up notification handler for this query + acpNotificationHandler = (method, params) => { + if (abortController.signal.aborted) + return; + if (method === "session/update") { + const p = params; + handleSessionUpdate(p, pendingTools, (text) => { + fullText += text; + }); + } + }; + // Send the prompt — retry with fresh session if stale + const sendPrompt = async () => { + const promptBlocks = []; + if (msg.imageBase64) { + promptBlocks.push({ type: "image", data: msg.imageBase64, mimeType: "image/jpeg" }); + } + promptBlocks.push({ type: "text", text: fullPrompt }); + const sessionPromptPayload = { + sessionId, + prompt: promptBlocks, + }; + const promptResult = (await acpRequest("session/prompt", sessionPromptPayload)); + logErr(`Prompt completed: stopReason=${promptResult.stopReason}`); + // Mark any remaining pending tools as completed + for (const name of pendingTools) { + send({ type: "tool_activity", name, status: "completed" }); + } + pendingTools.length = 0; + const inputTokens = promptResult.usage?.inputTokens ?? Math.ceil(fullPrompt.length / 4); + const outputTokens = promptResult.usage?.outputTokens ?? Math.ceil(fullText.length / 4); + const cacheReadTokens = promptResult.usage?.cachedReadTokens ?? 0; + const cacheWriteTokens = promptResult.usage?.cachedWriteTokens ?? 0; + const costUsd = promptResult._meta?.costUsd ?? 0; + send({ type: "result", text: fullText, sessionId, costUsd, inputTokens, outputTokens, cacheReadTokens, cacheWriteTokens }); + }; + try { + await sendPrompt(); + } + catch (err) { + if (abortController.signal.aborted) { + if (interruptRequested) { + for (const name of pendingTools) { + send({ type: "tool_activity", name, status: "completed" }); + } + pendingTools.length = 0; + logErr(`Query interrupted by user, sending partial result (${fullText.length} chars)`); + const inputTokens = Math.ceil(fullPrompt.length / 4); + const outputTokens = Math.ceil(fullText.length / 4); + send({ type: "result", text: fullText, sessionId, costUsd: 0, inputTokens, outputTokens, cacheReadTokens: 0, cacheWriteTokens: 0 }); + } + else { + logErr("Query aborted (superseded by new query)"); + } + return; + } + // Only -32000 is AUTH_REQUIRED in the new ACP protocol. + // -32603 is a generic internal error (API error, rate limit, etc.) — do NOT start OAuth for it. + if (err instanceof AcpError && err.code === -32000) { + if (authRetryCount >= MAX_AUTH_RETRIES) { + logErr(`session/prompt auth error but max retries (${MAX_AUTH_RETRIES}) reached, giving up`); + send({ type: "error", message: "Authentication required. Please disconnect and reconnect your Claude account in Settings." }); + return; + } + authRetryCount++; + logErr(`session/prompt failed with auth error (code=${err.code}), starting OAuth flow (attempt ${authRetryCount})`); + sessions.delete(getRetryDeleteKey(sessionKey)); + activeSessionId = ""; + await startAuthFlow(); + return handleQuery(msg); + } + // If session/prompt failed while reusing an existing session, retry once with a fresh one. + // Do NOT retry if we already started fresh (isNewSession) — that would infinite-loop. + if (!isNewSession && sessionId) { + logErr(`session/prompt failed with existing session, retrying with fresh session: ${err}`); + sessions.delete(getRetryDeleteKey(sessionKey)); + activeSessionId = ""; + return handleQuery(msg); + } + throw err; + } + } + catch (err) { + if (abortController.signal.aborted) { + if (interruptRequested) { + for (const name of pendingTools) { + send({ type: "tool_activity", name, status: "completed" }); + } + pendingTools.length = 0; + const inputTokens = Math.ceil(fullPrompt.length / 4); + const outputTokens = Math.ceil(fullText.length / 4); + send({ type: "result", text: fullText, sessionId: activeSessionId, costUsd: 0, inputTokens, outputTokens }); + } + return; + } + // Only -32000 is AUTH_REQUIRED in the new ACP protocol. + // -32603 is a generic internal error — surface it as a real error, not auth. + if (err instanceof AcpError && err.code === -32000) { + if (authRetryCount >= MAX_AUTH_RETRIES) { + logErr(`Query auth error but max retries (${MAX_AUTH_RETRIES}) reached, giving up`); + send({ type: "error", message: "Authentication required. Please disconnect and reconnect your Claude account in Settings." }); + return; + } + authRetryCount++; + logErr(`Query failed with auth error (code=${err.code}), starting OAuth flow (attempt ${authRetryCount})`); + await startAuthFlow(); + return handleQuery(msg); + } + const errMsg = err instanceof Error ? err.message : String(err); + logErr(`Query error: ${errMsg}`); + send({ type: "error", message: errMsg }); + } + finally { + if (activeAbort === abortController) { + activeAbort = null; + } + acpNotificationHandler = null; + } +} +/** Translate ACP session/update notifications into our JSON-lines protocol. + * + * ACP uses `params.update.sessionUpdate` as the discriminator field: + * - "agent_message_chunk" → text delta (content.text) + * - "agent_thought_chunk" → thinking delta (content.text) + * - "tool_call" → tool started (title, toolCallId, kind, status) + * - "tool_call_update" → tool completed (toolCallId, status, content) + * - "plan" → plan entries (entries[].content) + */ +function handleSessionUpdate(params, pendingTools, onText) { + const update = params.update; + if (!update) { + logErr(`session/update missing 'update' field: ${JSON.stringify(params).slice(0, 200)}`); + return; + } + const sessionUpdate = update.sessionUpdate; + switch (sessionUpdate) { + case "agent_message_chunk": { + const content = update.content; + const text = content?.text ?? ""; + if (text) { + // If tools were pending, they're now complete + if (pendingTools.length > 0) { + for (const name of pendingTools) { + send({ type: "tool_activity", name, status: "completed" }); + } + pendingTools.length = 0; + } + onText(text); + send({ type: "text_delta", text }); + } + break; + } + case "agent_thought_chunk": { + const content = update.content; + const text = content?.text ?? ""; + if (text) { + send({ type: "thinking_delta", text }); + } + break; + } + case "tool_call": { + const toolCallId = update.toolCallId ?? ""; + let title = update.title ?? "unknown"; + const kind = update.kind ?? ""; + const status = update.status ?? "pending"; + // Recover real tool name for server-side tools (e.g. WebSearch, WebFetch) + // where title may arrive as undefined/unknown + if (title === "unknown" || title.includes("undefined")) { + const meta = update._meta; + const toolName = meta?.claudeCode?.toolName; + const rawInput = update.rawInput; + if (toolName === "WebSearch" && rawInput?.query) { + title = `WebSearch: "${rawInput.query}"`; + } + else if (toolName === "WebFetch" && rawInput?.url) { + title = `WebFetch: ${rawInput.url}`; + } + else if (toolName) { + title = toolName; + } + } + if (status === "pending" || status === "in_progress") { + pendingTools.push(title); + send({ + type: "tool_activity", + name: title, + status: "started", + toolUseId: toolCallId, + }); + // Extract input from rawInput if available + const rawInput = update.rawInput; + if (rawInput && Object.keys(rawInput).length > 0) { + send({ + type: "tool_activity", + name: title, + status: "started", + toolUseId: toolCallId, + input: rawInput, + }); + } + logErr(`Tool started: ${title} (id=${toolCallId}, kind=${kind})`); + } + break; + } + case "tool_call_update": { + const toolCallId = update.toolCallId ?? ""; + const status = update.status ?? ""; + let title = update.title ?? "unknown"; + // Recover real tool name (same logic as tool_call) + if (title === "unknown" || title.includes("undefined")) { + const meta = update._meta; + const toolName = meta?.claudeCode?.toolName; + if (toolName) { + title = toolName; + } + } + if (status === "completed" || status === "failed" || status === "cancelled") { + // Remove from pending + const idx = pendingTools.indexOf(title); + if (idx >= 0) + pendingTools.splice(idx, 1); + send({ + type: "tool_activity", + name: title, + status: "completed", + toolUseId: toolCallId, + }); + // Extract output from content array or rawOutput + let output = ""; + const contentArr = update.content; + if (contentArr && Array.isArray(contentArr)) { + output = contentArr + .filter((c) => c.type === "text" && c.text) + .map((c) => c.text) + .join("\n"); + } + if (!output) { + const rawOutput = update.rawOutput; + if (rawOutput) { + output = JSON.stringify(rawOutput); + } + } + if (output) { + const truncated = output.length > 2000 + ? output.slice(0, 2000) + "\n... (truncated)" + : output; + send({ + type: "tool_result_display", + toolUseId: toolCallId, + name: title, + output: truncated, + }); + } + logErr(`Tool completed: ${title} (id=${toolCallId}) output=${output ? output.length + " chars" : "none"}`); + } + break; + } + case "plan": { + const entries = update.entries; + if (entries && Array.isArray(entries)) { + for (const entry of entries) { + if (entry.content) { + send({ type: "thinking_delta", text: entry.content + "\n" }); + } + } + } + break; + } + default: + logErr(`Unknown session update type: ${sessionUpdate} — ${JSON.stringify(update).slice(0, 200)}`); + } +} +// --- Error handling --- +/** Write to /tmp/acp-bridge-crash.log as fallback when stderr might be lost */ +const CRASH_LOG_PATH = "/tmp/acp-bridge-crash.log"; +const CRASH_LOG_MAX_BYTES = 10 * 1024 * 1024; // 10 MB cap +function logCrash(msg) { + try { + // Skip if log already exceeded size cap to prevent disk fill + try { + const stat = statSync(CRASH_LOG_PATH); + if (stat.size >= CRASH_LOG_MAX_BYTES) return; + } catch { /* file doesn't exist yet, ok */ } + const ts = new Date().toISOString(); + appendFileSync(CRASH_LOG_PATH, `[${ts}] ${msg}\n`); + } + catch { + // ignore + } +} +process.on("unhandledRejection", (reason) => { + logErr(`Unhandled rejection: ${reason}`); + logCrash(`Unhandled rejection: ${reason}`); +}); +process.on("uncaughtException", (err) => { + const code = err.code; + if (code === "EPIPE" || code === "ERR_STREAM_DESTROYED") { + logErr(`Caught ${code} in uncaughtException (subprocess pipe closed) — exiting`); + logCrash(`Caught ${code} (pipe closed) — exiting`); + process.exit(0); + } + logErr(`Uncaught exception: ${err.message}\n${err.stack ?? ""}`); + logCrash(`Uncaught exception: ${err.message}\n${err.stack ?? ""}`); + send({ type: "error", message: `Uncaught: ${err.message}` }); + process.exit(1); +}); +process.stdout.on("error", (err) => { + if (err.code === "EPIPE") { + logErr("stdout pipe closed (parent process disconnected)"); + logCrash("stdout EPIPE — parent disconnected"); + process.exit(0); + } + logErr(`stdout error: ${err.message}`); + logCrash(`stdout error: ${err.message}`); +}); +// --- Main --- +async function main() { + logErr(`Bridge main() starting (pid=${process.pid}, node=${process.version}, execPath=${process.execPath})`); + // 1. Start Unix socket for omi-tools relay + omiToolsPipePath = await startOmiToolsRelay(); + logErr("omi-tools relay started"); + // 2. Start the ACP subprocess + startAcpProcess(); + logErr("ACP subprocess spawned"); + // 3. Signal readiness + send({ type: "init", sessionId: "" }); + logErr("ACP Bridge started, waiting for queries..."); + // 4. Read JSON lines from Swift + const rl = createInterface({ input: process.stdin, terminal: false }); + rl.on("line", (line) => { + if (!line.trim()) + return; + let msg; + try { + msg = JSON.parse(line); + } + catch { + logErr(`Invalid JSON: ${line}`); + return; + } + switch (msg.type) { + case "query": + handleQuery(msg).catch((err) => { + logErr(`Unhandled query error: ${err}`); + send({ type: "error", message: String(err) }); + }); + break; + case "warmup": { + const wm = msg; + if (wm.sessions && wm.sessions.length > 0) { + logErr(`Warmup requested (cwd=${wm.cwd || "default"}, sessions=${wm.sessions.map(s => s.key).join(", ")})`); + preWarmPromise = preWarmSession(wm.cwd, wm.sessions); + } + else { + // Backward compat: models array or single model + const models = wm.models ?? (wm.model ? [wm.model] : undefined); + logErr(`Warmup requested (cwd=${wm.cwd || "default"}, models=${JSON.stringify(models) || "default"})`); + preWarmPromise = preWarmSession(wm.cwd, undefined, models); + } + break; + } + case "tool_result": + resolveToolCall(msg); + break; + case "interrupt": + logErr("Interrupt requested by user"); + interruptRequested = true; + if (activeAbort) + activeAbort.abort(); + if (activeSessionId) { + acpNotify("session/cancel", { sessionId: activeSessionId }); + } + break; + case "invalidate_session": + sessions.delete(msg.sessionKey); + logErr(`Invalidated cached ACP session for key=${msg.sessionKey}`); + break; + case "authenticate": { + // Legacy fallback: OAuth flow now handles auth internally. + // This handler is kept for backward compatibility. + logErr(`Authentication message received from Swift (legacy fallback)`); + send({ type: "auth_success" }); + if (authResolve) { + authResolve(); + authResolve = null; + } + break; + } + case "stop": + logErr("Received stop signal, exiting"); + if (activeAbort) + activeAbort.abort(); + if (acpProcess) { + acpProcess.kill(); + } + process.exit(0); + break; + default: + logErr(`Unknown message type: ${msg.type}`); + } + }); + rl.on("close", () => { + logErr("stdin closed, exiting"); + logCrash("stdin closed, exiting"); + if (activeAbort) + activeAbort.abort(); + if (acpProcess) + acpProcess.kill(); + process.exit(0); + }); +} +main().catch((err) => { + logErr(`Fatal error: ${err}`); + logCrash(`Fatal error: ${err}`); + send({ type: "error", message: `Fatal: ${err}` }); + process.exit(1); +});