diff --git a/packages/kernel-language-model-service/src/ollama/fetch.test.ts b/packages/kernel-language-model-service/src/ollama/fetch.test.ts index 53cb0b0a9..3018f12bb 100644 --- a/packages/kernel-language-model-service/src/ollama/fetch.test.ts +++ b/packages/kernel-language-model-service/src/ollama/fetch.test.ts @@ -97,7 +97,6 @@ describe('makeHostRestrictedFetch', () => { }); it('should handle Request objects correctly', async () => { - // eslint-disable-next-line n/no-unsupported-features/node-builtins const request = new Request(mockUrl); await restrictedFetch(request); diff --git a/packages/kernel-language-model-service/src/ollama/fetch.ts b/packages/kernel-language-model-service/src/ollama/fetch.ts index cf0ae0ba0..2bae8156b 100644 --- a/packages/kernel-language-model-service/src/ollama/fetch.ts +++ b/packages/kernel-language-model-service/src/ollama/fetch.ts @@ -24,7 +24,6 @@ export const makeHostRestrictedFetch = ( const restrictedFetch = async ( ...[url, ...args]: Parameters ): ReturnType => { - // eslint-disable-next-line n/no-unsupported-features/node-builtins const { host } = new URL(url instanceof Request ? url.url : url); if (!allowedHosts.includes(host)) { throw new Error( diff --git a/packages/kernel-platforms/src/capabilities/fetch/nodejs.test.ts b/packages/kernel-platforms/src/capabilities/fetch/nodejs.test.ts index a6a22213a..eceab0871 100644 --- a/packages/kernel-platforms/src/capabilities/fetch/nodejs.test.ts +++ b/packages/kernel-platforms/src/capabilities/fetch/nodejs.test.ts @@ -31,7 +31,7 @@ describe('fetch nodejs capability', () => { { name: 'with Request objects', config: {}, - // eslint-disable-next-line n/no-unsupported-features/node-builtins + input: new Request('file:///path/to/file.txt'), }, { @@ -50,7 +50,7 @@ describe('fetch nodejs capability', () => { const result = await fetchCapability(input); expect(readFile).toHaveBeenCalledWith('/path/to/file.txt', 'utf8'); - // eslint-disable-next-line n/no-unsupported-features/node-builtins + expect(result).toBeInstanceOf(Response); expect(await result.text()).toBe(fileContents); expect(fetchMock).not.toHaveBeenCalled(); // Should not call global fetch for file:// URLs diff --git a/packages/kernel-platforms/src/capabilities/fetch/nodejs.ts b/packages/kernel-platforms/src/capabilities/fetch/nodejs.ts index 56f2991d1..49479fdc1 100644 --- a/packages/kernel-platforms/src/capabilities/fetch/nodejs.ts +++ b/packages/kernel-platforms/src/capabilities/fetch/nodejs.ts @@ -13,13 +13,12 @@ import { makeCapabilitySpecification } from '../../specification.ts'; */ const makeExtendedFetch = (fromFetch: FetchCapability): FetchCapability => { return async (...[input, ...args]: Parameters) => { - // eslint-disable-next-line n/no-unsupported-features/node-builtins const url = input instanceof Request ? input.url : input; const { protocol, pathname } = new URL(url); if (protocol === 'file:') { const contents = await readFile(pathname, 'utf8'); - // eslint-disable-next-line n/no-unsupported-features/node-builtins + return new Response(contents); } diff --git a/packages/kernel-platforms/src/capabilities/fetch/shared.test.ts b/packages/kernel-platforms/src/capabilities/fetch/shared.test.ts index cadf17174..39e799afc 100644 --- a/packages/kernel-platforms/src/capabilities/fetch/shared.test.ts +++ b/packages/kernel-platforms/src/capabilities/fetch/shared.test.ts @@ -12,7 +12,7 @@ describe('resolveUrl', () => { { name: 'string URL', input: 'https://example.test/path' }, { name: 'Request object URL', - // eslint-disable-next-line n/no-unsupported-features/node-builtins + input: new Request('https://example.test/path'), }, { name: 'URL object', input: new URL('https://example.test/path') }, @@ -44,7 +44,7 @@ describe('makeHostCaveat', () => { it.each([ { name: 'Request objects', - // eslint-disable-next-line n/no-unsupported-features/node-builtins + input: new Request('https://example.test/path'), }, { name: 'URL objects', input: new URL('https://example.test/path') }, @@ -91,7 +91,6 @@ describe('makeCaveatedFetch', () => { const mockResponse = { status: 200, text: async () => Promise.resolve('test'), - // eslint-disable-next-line n/no-unsupported-features/node-builtins } as Response; const mockFetch = vi.fn().mockResolvedValue(mockResponse); const caveat = vi.fn().mockResolvedValue(undefined); @@ -122,7 +121,6 @@ describe('makeCaveatedFetch', () => { const mockResponse = { status: 200, text: async () => Promise.resolve('test'), - // eslint-disable-next-line n/no-unsupported-features/node-builtins } as Response; const mockFetch = vi.fn().mockResolvedValue(mockResponse); const caveat = vi.fn().mockResolvedValue(undefined); @@ -142,7 +140,6 @@ describe('shared fetch capability behavior', () => { const mockResponse = { status: 200, text: async () => Promise.resolve('test'), - // eslint-disable-next-line n/no-unsupported-features/node-builtins } as Response; const mockFetch = vi.fn().mockResolvedValue(mockResponse); const caveat = makeFetchCaveat({ allowedHosts: ['example.test'] }); @@ -164,7 +161,6 @@ describe('shared fetch capability behavior', () => { const mockResponse = { status: 200, text: async () => Promise.resolve('test'), - // eslint-disable-next-line n/no-unsupported-features/node-builtins } as Response; const mockFetch = vi.fn().mockResolvedValue(mockResponse); const caveat = makeFetchCaveat({ allowedHosts: ['example.test'] }); @@ -179,7 +175,7 @@ describe('shared fetch capability behavior', () => { it.each([ { name: 'Request objects', - // eslint-disable-next-line n/no-unsupported-features/node-builtins + input: new Request('https://example.test/path'), }, { name: 'URL objects', input: new URL('https://example.test/path') }, @@ -187,7 +183,6 @@ describe('shared fetch capability behavior', () => { const mockResponse = { status: 200, text: async () => Promise.resolve('test'), - // eslint-disable-next-line n/no-unsupported-features/node-builtins } as Response; const mockFetch = vi.fn().mockResolvedValue(mockResponse); const caveat = makeFetchCaveat({ allowedHosts: ['example.test'] }); diff --git a/packages/kernel-platforms/src/capabilities/fetch/shared.ts b/packages/kernel-platforms/src/capabilities/fetch/shared.ts index 8c12733eb..9692f786a 100644 --- a/packages/kernel-platforms/src/capabilities/fetch/shared.ts +++ b/packages/kernel-platforms/src/capabilities/fetch/shared.ts @@ -7,7 +7,6 @@ import type { FetchCapability, FetchCaveat, FetchConfig } from './types.ts'; * @returns The resolved URL */ export const resolveUrl = (arg: Parameters[0]): URL => - // eslint-disable-next-line n/no-unsupported-features/node-builtins new URL(arg instanceof Request ? arg.url : arg); /** diff --git a/packages/kernel-platforms/test/utils.ts b/packages/kernel-platforms/test/utils.ts index 2833fd5da..6f98ec610 100644 --- a/packages/kernel-platforms/test/utils.ts +++ b/packages/kernel-platforms/test/utils.ts @@ -8,10 +8,9 @@ export const superstructValidationError = /At path: .* -- Expected/u; * * @returns A mock Response object */ -// eslint-disable-next-line n/no-unsupported-features/node-builtins + export const createMockResponse = (): Response => ({ status: 200, text: async () => Promise.resolve('test'), - // eslint-disable-next-line n/no-unsupported-features/node-builtins }) as Response; diff --git a/packages/kernel-test/src/io.test.ts b/packages/kernel-test/src/io.test.ts new file mode 100644 index 000000000..d9f9407ea --- /dev/null +++ b/packages/kernel-test/src/io.test.ts @@ -0,0 +1,263 @@ +import { makeSQLKernelDatabase } from '@metamask/kernel-store/sqlite/nodejs'; +import { waitUntilQuiescent } from '@metamask/kernel-utils'; +import { Kernel } from '@metamask/ocap-kernel'; +import type { IOChannel, IOConfig } from '@metamask/ocap-kernel'; +import * as net from 'node:net'; +import * as os from 'node:os'; +import * as path from 'node:path'; +import { describe, it, expect, afterEach } from 'vitest'; + +import { getBundleSpec, makeTestLogger } from './utils.ts'; + +function tempSocketPath(): string { + return path.join( + os.tmpdir(), + `io-int-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`, + ); +} + +async function connectToSocket(socketPath: string): Promise { + return new Promise((resolve, reject) => { + const client = net.createConnection(socketPath, () => { + client.removeListener('error', reject); + resolve(client); + }); + client.on('error', reject); + }); +} + +async function writeLine(socket: net.Socket, line: string): Promise { + return new Promise((resolve, reject) => { + socket.write(`${line}\n`, (error) => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + }); +} + +async function readLine(socket: net.Socket): Promise { + return new Promise((resolve) => { + let buffer = ''; + const onData = (data: Buffer): void => { + buffer += data.toString(); + const idx = buffer.indexOf('\n'); + if (idx !== -1) { + socket.removeListener('data', onData); + resolve(buffer.slice(0, idx)); + } + }; + socket.on('data', onData); + }); +} + +async function makeTestSocketChannel( + _name: string, + socketPath: string, +): Promise { + const fsPromises = await import('node:fs/promises'); + const lineQueue: string[] = []; + const readerQueue: { resolve: (value: string | null) => void }[] = []; + let currentSocket: net.Socket | null = null; + let lineBuffer = ''; + let closed = false; + + function deliverLine(line: string): void { + const reader = readerQueue.shift(); + if (reader) { + reader.resolve(line); + } else { + lineQueue.push(line); + } + } + + function deliverEOF(): void { + while (readerQueue.length > 0) { + readerQueue.shift()?.resolve(null); + } + } + + const server = net.createServer((socket) => { + if (currentSocket) { + socket.destroy(); + return; + } + currentSocket = socket; + lineBuffer = ''; + socket.on('data', (data: Buffer) => { + lineBuffer += data.toString(); + let idx = lineBuffer.indexOf('\n'); + while (idx !== -1) { + deliverLine(lineBuffer.slice(0, idx)); + lineBuffer = lineBuffer.slice(idx + 1); + idx = lineBuffer.indexOf('\n'); + } + }); + socket.on('end', () => { + if (lineBuffer.length > 0) { + deliverLine(lineBuffer); + lineBuffer = ''; + } + currentSocket = null; + deliverEOF(); + }); + socket.on('error', () => { + currentSocket = null; + deliverEOF(); + }); + }); + + try { + await fsPromises.unlink(socketPath); + } catch { + // ignore + } + + await new Promise((resolve, reject) => { + server.on('error', reject); + server.listen(socketPath, () => { + server.removeListener('error', reject); + resolve(); + }); + }); + + return { + async read() { + if (closed) { + return null; + } + const queued = lineQueue.shift(); + if (queued !== undefined) { + return queued; + } + if (!currentSocket) { + return null; + } + return new Promise((resolve) => { + readerQueue.push({ resolve }); + }); + }, + async write(data: string) { + if (!currentSocket) { + throw new Error('no connected client'); + } + const socket = currentSocket; + return new Promise((resolve, reject) => { + socket.write(`${data}\n`, (error) => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + }); + }, + async close() { + if (closed) { + return; + } + closed = true; + deliverEOF(); + currentSocket?.destroy(); + currentSocket = null; + await new Promise((resolve) => { + server.close(() => resolve()); + }); + try { + await fsPromises.unlink(socketPath); + } catch { + // ignore + } + }, + }; +} + +describe('IO kernel service', () => { + const clients: net.Socket[] = []; + + afterEach(async () => { + for (const client of clients) { + client.destroy(); + } + clients.length = 0; + }); + + it('reads and writes through an IO channel', async () => { + const socketPath = tempSocketPath(); + const kernelDatabase = await makeSQLKernelDatabase({ + dbFilename: ':memory:', + }); + const { logger } = makeTestLogger(); + + const { NodejsPlatformServices } = await import('@ocap/nodejs'); + const kernel = await Kernel.make( + new NodejsPlatformServices({ + logger: logger.subLogger({ tags: ['platform'] }), + }), + kernelDatabase, + { + resetStorage: true, + logger, + ioChannelFactory: async (name: string, config: IOConfig) => { + if (config.type !== 'socket') { + throw new Error(`unsupported: ${config.type}`); + } + return makeTestSocketChannel(name, config.path); + }, + }, + ); + + const config = { + bootstrap: 'io', + forceReset: true, + io: { + repl: { + type: 'socket' as const, + path: socketPath, + }, + }, + services: ['repl'], + vats: { + io: { + bundleSpec: getBundleSpec('io-vat'), + parameters: { name: 'io' }, + }, + }, + }; + + const { rootKref } = await kernel.launchSubcluster(config); + await waitUntilQuiescent(); + + // Connect to the socket + const client = await connectToSocket(socketPath); + clients.push(client); + + // Small delay for connection setup + await new Promise((resolve) => setTimeout(resolve, 20)); + + // Send a line from the test to the vat + await writeLine(client, 'hello from test'); + + // Trigger the vat to read and verify it received the data + await kernel.queueMessage(rootKref, 'doRead', []); + await waitUntilQuiescent(100); + + const bufferResult = await kernel.queueMessage( + rootKref, + 'getReadBuffer', + [], + ); + await waitUntilQuiescent(100); + expect(bufferResult.body).toContain('hello from test'); + + // Trigger the vat to write + const linePromise = readLine(client); + await kernel.queueMessage(rootKref, 'doWrite', ['hello from vat']); + await waitUntilQuiescent(100); + + const received = await linePromise; + expect(received).toBe('hello from vat'); + }); +}); diff --git a/packages/kernel-test/src/supervisor.test.ts b/packages/kernel-test/src/supervisor.test.ts index cd9a08cdb..0d77f1cf7 100644 --- a/packages/kernel-test/src/supervisor.test.ts +++ b/packages/kernel-test/src/supervisor.test.ts @@ -28,7 +28,7 @@ const makeVatSupervisor = async ({ logger: makeMockLogger(), vatPowers, makePlatform: vi.fn().mockResolvedValue({}), - // eslint-disable-next-line n/no-unsupported-features/node-builtins + fetchBlob: async (url: string): Promise => { if (!url.endsWith('.bundle')) { throw new Error(`Unexpected URL: ${url}`); @@ -39,7 +39,6 @@ const makeVatSupervisor = async ({ return { ok: true, text: async () => bundleContent, - // eslint-disable-next-line n/no-unsupported-features/node-builtins } as Response; }, }), diff --git a/packages/kernel-test/src/vats/io-vat.ts b/packages/kernel-test/src/vats/io-vat.ts new file mode 100644 index 000000000..04b582fde --- /dev/null +++ b/packages/kernel-test/src/vats/io-vat.ts @@ -0,0 +1,44 @@ +import { E } from '@endo/eventual-send'; +import { makeDefaultExo } from '@metamask/kernel-utils/exo'; + +import { unwrapTestLogger } from '../test-powers.ts'; +import type { TestPowers } from '../test-powers.ts'; + +/** + * Build function for testing IO kernel services. + * + * @param vatPowers - Special powers granted to this vat. + * @param parameters - Initialization parameters from the vat's config object. + * @param parameters.name - The name of the vat. + * @returns The root object for the new vat. + */ +// eslint-disable-next-line @typescript-eslint/explicit-function-return-type +export function buildRootObject( + vatPowers: TestPowers, + parameters: { name?: string } = {}, +) { + const name = parameters?.name ?? 'io-vat'; + const tlog = unwrapTestLogger(vatPowers, name); + let ioService: unknown; + const readBuffer: string[] = []; + + return makeDefaultExo('root', { + async bootstrap(_vats: unknown, services: { repl: unknown }) { + tlog('bootstrap'); + ioService = services.repl; + }, + async doRead() { + const line = await E(ioService).read(); + tlog(`read: ${line}`); + readBuffer.push(String(line)); + return line; + }, + async doWrite(data: string) { + await E(ioService).write(data); + tlog(`wrote: ${data}`); + }, + async getReadBuffer() { + return [...readBuffer]; + }, + }); +} diff --git a/packages/nodejs-test-workers/src/workers/mock-fetch.ts b/packages/nodejs-test-workers/src/workers/mock-fetch.ts index 58afd4844..9b624a0a6 100644 --- a/packages/nodejs-test-workers/src/workers/mock-fetch.ts +++ b/packages/nodejs-test-workers/src/workers/mock-fetch.ts @@ -7,8 +7,6 @@ const LOG_TAG = 'nodejs-test-vat-worker'; let logger = new Logger(LOG_TAG); -/* eslint-disable n/no-unsupported-features/node-builtins */ - main().catch((reason) => logger.error('main exited with error', reason)); /** diff --git a/packages/nodejs/src/index.ts b/packages/nodejs/src/index.ts index 6af1ec51b..49c133fdf 100644 --- a/packages/nodejs/src/index.ts +++ b/packages/nodejs/src/index.ts @@ -1,3 +1,4 @@ export { NodejsPlatformServices } from './kernel/PlatformServices.ts'; export { makeKernel } from './kernel/make-kernel.ts'; export { makeNodeJsVatSupervisor } from './vat/make-supervisor.ts'; +export { makeIOChannelFactory, makeSocketIOChannel } from './io/index.ts'; diff --git a/packages/nodejs/src/io/index.ts b/packages/nodejs/src/io/index.ts new file mode 100644 index 000000000..739aa7757 --- /dev/null +++ b/packages/nodejs/src/io/index.ts @@ -0,0 +1,24 @@ +import type { IOChannelFactory, IOConfig } from '@metamask/ocap-kernel'; + +import { makeSocketIOChannel } from './socket-channel.ts'; + +export { makeSocketIOChannel } from './socket-channel.ts'; + +/** + * Create an IOChannelFactory for the Node.js environment. + * Dispatches on `config.type` to the appropriate channel implementation. + * + * @returns An IOChannelFactory. + */ +export function makeIOChannelFactory(): IOChannelFactory { + return async (name: string, config: IOConfig) => { + switch (config.type) { + case 'socket': + return makeSocketIOChannel(name, config.path); + default: + throw new Error( + `Unsupported IO channel type "${config.type}" for channel "${name}"`, + ); + } + }; +} diff --git a/packages/nodejs/src/io/socket-channel.test.ts b/packages/nodejs/src/io/socket-channel.test.ts new file mode 100644 index 000000000..d6a773658 --- /dev/null +++ b/packages/nodejs/src/io/socket-channel.test.ts @@ -0,0 +1,293 @@ +import type { IOChannel } from '@metamask/ocap-kernel'; +import fs from 'node:fs/promises'; +import * as net from 'node:net'; +import * as os from 'node:os'; +import * as path from 'node:path'; +import { describe, it, expect, afterEach } from 'vitest'; + +import { makeSocketIOChannel } from './socket-channel.ts'; + +function tempSocketPath(): string { + return path.join( + os.tmpdir(), + `io-test-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`, + ); +} + +async function connectToSocket(socketPath: string): Promise { + return new Promise((resolve, reject) => { + const client = net.createConnection(socketPath, () => { + client.removeListener('error', reject); + resolve(client); + }); + client.on('error', reject); + }); +} + +async function writeLine(socket: net.Socket, line: string): Promise { + return new Promise((resolve, reject) => { + socket.write(`${line}\n`, (error) => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + }); +} + +async function readLine(socket: net.Socket): Promise { + return new Promise((resolve) => { + let buffer = ''; + const onData = (data: Buffer): void => { + buffer += data.toString(); + const idx = buffer.indexOf('\n'); + if (idx !== -1) { + socket.removeListener('data', onData); + resolve(buffer.slice(0, idx)); + } + }; + socket.on('data', onData); + }); +} + +async function fileExists(filePath: string): Promise { + try { + await fs.access(filePath); + return true; + } catch { + return false; + } +} + +describe('makeSocketIOChannel', () => { + const channels: IOChannel[] = []; + const clients: net.Socket[] = []; + + afterEach(async () => { + for (const client of clients) { + client.destroy(); + } + clients.length = 0; + for (const channel of channels) { + await channel.close(); + } + channels.length = 0; + }); + + it('creates a listening socket', async () => { + const socketPath = tempSocketPath(); + const channel = await makeSocketIOChannel('test', socketPath); + channels.push(channel); + + expect(await fileExists(socketPath)).toBe(true); + }); + + it('reads lines from a connected client', async () => { + const socketPath = tempSocketPath(); + const channel = await makeSocketIOChannel('test', socketPath); + channels.push(channel); + + const client = await connectToSocket(socketPath); + clients.push(client); + + await writeLine(client, 'hello'); + await writeLine(client, 'world'); + + const line1 = await channel.read(); + const line2 = await channel.read(); + + expect(line1).toBe('hello'); + expect(line2).toBe('world'); + }); + + it('writes lines to a connected client', async () => { + const socketPath = tempSocketPath(); + const channel = await makeSocketIOChannel('test', socketPath); + channels.push(channel); + + const client = await connectToSocket(socketPath); + clients.push(client); + + // Small delay for connection to be established + await new Promise((resolve) => setTimeout(resolve, 10)); + + const linePromise = readLine(client); + await channel.write('output'); + const received = await linePromise; + + expect(received).toBe('output'); + }); + + it('returns null on client disconnect', async () => { + const socketPath = tempSocketPath(); + const channel = await makeSocketIOChannel('test', socketPath); + channels.push(channel); + + const client = await connectToSocket(socketPath); + + // Start a read that will block + const readPromise = channel.read(); + client.destroy(); + + const result = await readPromise; + expect(result).toBeNull(); + }); + + it('returns null when no client is connected', async () => { + const socketPath = tempSocketPath(); + const channel = await makeSocketIOChannel('test', socketPath); + channels.push(channel); + + const result = await channel.read(); + expect(result).toBeNull(); + }); + + it('throws on write when no client is connected', async () => { + const socketPath = tempSocketPath(); + const channel = await makeSocketIOChannel('test', socketPath); + channels.push(channel); + + await expect(channel.write('data')).rejects.toThrow( + 'has no connected client', + ); + }); + + it('queues lines before read is called', async () => { + const socketPath = tempSocketPath(); + const channel = await makeSocketIOChannel('test', socketPath); + channels.push(channel); + + const client = await connectToSocket(socketPath); + clients.push(client); + + // Send lines before any reads + await writeLine(client, 'a'); + await writeLine(client, 'b'); + + // Small delay for data to arrive + await new Promise((resolve) => setTimeout(resolve, 20)); + + expect(await channel.read()).toBe('a'); + expect(await channel.read()).toBe('b'); + }); + + it('rejects second connection', async () => { + const socketPath = tempSocketPath(); + const channel = await makeSocketIOChannel('test', socketPath); + channels.push(channel); + + const client1 = await connectToSocket(socketPath); + clients.push(client1); + + const client2 = await connectToSocket(socketPath); + + // Second client should be destroyed + await new Promise((resolve) => { + client2.on('close', () => resolve()); + }); + expect(client2.destroyed).toBe(true); + }); + + it('cleans up socket file on close', async () => { + const socketPath = tempSocketPath(); + const channel = await makeSocketIOChannel('test', socketPath); + + expect(await fileExists(socketPath)).toBe(true); + await channel.close(); + expect(await fileExists(socketPath)).toBe(false); + }); + + it('returns null after close', async () => { + const socketPath = tempSocketPath(); + const channel = await makeSocketIOChannel('test', socketPath); + + await channel.close(); + + const result = await channel.read(); + expect(result).toBeNull(); + }); + + it('throws on write after close', async () => { + const socketPath = tempSocketPath(); + const channel = await makeSocketIOChannel('test', socketPath); + const client = await connectToSocket(socketPath); + clients.push(client); + + await channel.close(); + + await expect(channel.write('data')).rejects.toThrow('is closed'); + }); + + it('drains stale lineQueue when a new client connects', async () => { + const socketPath = tempSocketPath(); + const channel = await makeSocketIOChannel('test', socketPath); + channels.push(channel); + + // First client sends lines that are not read + const client1 = await connectToSocket(socketPath); + await writeLine(client1, 'stale-line'); + await new Promise((resolve) => setTimeout(resolve, 20)); + + // Disconnect first client + client1.destroy(); + await new Promise((resolve) => setTimeout(resolve, 20)); + + // Second client connects — stale lines should be gone + const client2 = await connectToSocket(socketPath); + clients.push(client2); + + await writeLine(client2, 'fresh-line'); + await new Promise((resolve) => setTimeout(resolve, 20)); + + expect(await channel.read()).toBe('fresh-line'); + }); + + it('handles multi-byte UTF-8 split across TCP chunks', async () => { + const socketPath = tempSocketPath(); + const channel = await makeSocketIOChannel('test', socketPath); + channels.push(channel); + + const client = await connectToSocket(socketPath); + clients.push(client); + + // U+1F600 (😀) is 4 bytes: f0 9f 98 80 + const emoji = '\u{1F600}'; + const fullMessage = `hello ${emoji} world\n`; + const encoded = Buffer.from(fullMessage, 'utf8'); + + // Split in the middle of the emoji (after first 2 bytes of the 4-byte sequence) + const splitPoint = Buffer.from('hello ', 'utf8').length + 2; + const chunk1 = encoded.subarray(0, splitPoint); + const chunk2 = encoded.subarray(splitPoint); + + // Send the two chunks separately + await new Promise((resolve, reject) => { + client.write(chunk1, (error) => (error ? reject(error) : resolve())); + }); + await new Promise((resolve) => setTimeout(resolve, 10)); + await new Promise((resolve, reject) => { + client.write(chunk2, (error) => (error ? reject(error) : resolve())); + }); + await new Promise((resolve) => setTimeout(resolve, 10)); + + expect(await channel.read()).toBe(`hello ${emoji} world`); + }); + + it('removes stale socket file on creation', async () => { + const socketPath = tempSocketPath(); + + // Create the first channel + const channel1 = await makeSocketIOChannel('test', socketPath); + await channel1.close(); + + // Recreate a stale file + await fs.writeFile(socketPath, ''); + + // Should succeed despite the stale file + const channel2 = await makeSocketIOChannel('test', socketPath); + channels.push(channel2); + + expect(await fileExists(socketPath)).toBe(true); + }); +}); diff --git a/packages/nodejs/src/io/socket-channel.ts b/packages/nodejs/src/io/socket-channel.ts new file mode 100644 index 000000000..c6cce477f --- /dev/null +++ b/packages/nodejs/src/io/socket-channel.ts @@ -0,0 +1,185 @@ +import type { IOChannel } from '@metamask/ocap-kernel'; +import fs from 'node:fs/promises'; +import * as net from 'node:net'; +import { StringDecoder } from 'node:string_decoder'; + +type PendingReader = { + resolve: (value: string | null) => void; +}; + +/** + * Create an IOChannel backed by a Unix domain socket. + * + * Creates a `net.Server` listening on the configured socket path. + * Accepts one connection at a time. Lines are `\n`-delimited. + * + * @param name - The channel name (for diagnostics). + * @param socketPath - The file path for the Unix domain socket. + * @returns A promise for the IOChannel, resolved once the server is listening. + */ +export async function makeSocketIOChannel( + name: string, + socketPath: string, +): Promise { + const lineQueue: string[] = []; + const readerQueue: PendingReader[] = []; + let currentSocket: net.Socket | null = null; + let decoder = new StringDecoder('utf8'); + let buffer = ''; + let closed = false; + + /** + * Deliver a line to a pending reader or enqueue it. + * + * @param line - The line to deliver. + */ + function deliverLine(line: string): void { + const reader = readerQueue.shift(); + if (reader) { + reader.resolve(line); + } else { + lineQueue.push(line); + } + } + + /** + * Handle the end of the input stream. + */ + function deliverEOF(): void { + while (readerQueue.length > 0) { + const reader = readerQueue.shift(); + reader?.resolve(null); + } + } + + /** + * Handle incoming data by splitting on newlines. + * + * @param data - The raw data buffer from the socket. + */ + function handleData(data: Buffer): void { + buffer += decoder.write(data); + let newlineIndex = buffer.indexOf('\n'); + while (newlineIndex !== -1) { + const line = buffer.slice(0, newlineIndex); + buffer = buffer.slice(newlineIndex + 1); + deliverLine(line); + newlineIndex = buffer.indexOf('\n'); + } + } + + /** + * Handle the channel disconnecting. + * + * @param socket - The socket that disconnected. + */ + function handleDisconnect(socket: net.Socket): void { + if (currentSocket !== socket) { + return; + } + // Flush any incomplete multi-byte sequence from the decoder + buffer += decoder.end(); + // Deliver any remaining buffered data as a final line + if (buffer.length > 0) { + deliverLine(buffer); + buffer = ''; + } + currentSocket = null; + deliverEOF(); + } + + const server = net.createServer((socket) => { + if (currentSocket) { + // Only one connection at a time + socket.destroy(); + return; + } + // Drain stale state from any previous connection + lineQueue.length = 0; + deliverEOF(); + + currentSocket = socket; + decoder = new StringDecoder('utf8'); + buffer = ''; + + socket.on('data', handleData); + socket.on('end', () => handleDisconnect(socket)); + socket.on('error', () => handleDisconnect(socket)); + socket.on('close', () => handleDisconnect(socket)); + }); + + // Remove stale socket file if it exists + try { + await fs.unlink(socketPath); + } catch { + // Ignore if it doesn't exist + } + + await new Promise((resolve, reject) => { + server.on('error', reject); + server.listen(socketPath, () => { + server.removeListener('error', reject); + resolve(); + }); + }); + + const channel: IOChannel = { + async read(): Promise { + if (closed) { + return null; + } + const queued = lineQueue.shift(); + if (queued !== undefined) { + return queued; + } + if (!currentSocket) { + return null; + } + return new Promise((resolve) => { + readerQueue.push({ resolve }); + }); + }, + + async write(data: string): Promise { + if (closed) { + throw new Error(`IO channel "${name}" is closed`); + } + if (!currentSocket) { + throw new Error(`IO channel "${name}" has no connected client`); + } + const socket = currentSocket; + return new Promise((resolve, reject) => { + socket.write(`${data}\n`, (error) => { + if (error) { + reject(error); + } else { + resolve(); + } + }); + }); + }, + + async close(): Promise { + if (closed) { + return; + } + closed = true; + deliverEOF(); + if (currentSocket) { + currentSocket.destroy(); + currentSocket = null; + } + await new Promise((resolve) => { + server.close(() => resolve()); + }); + // Clean up socket file + try { + await fs.unlink(socketPath); + } catch { + // Ignore + } + }, + }; + + return channel; +} diff --git a/packages/nodejs/src/kernel/make-kernel.ts b/packages/nodejs/src/kernel/make-kernel.ts index a359c35a9..00f6353b4 100644 --- a/packages/nodejs/src/kernel/make-kernel.ts +++ b/packages/nodejs/src/kernel/make-kernel.ts @@ -1,8 +1,10 @@ import { makeSQLKernelDatabase } from '@metamask/kernel-store/sqlite/nodejs'; import { Logger } from '@metamask/logger'; import { Kernel } from '@metamask/ocap-kernel'; +import type { IOChannelFactory } from '@metamask/ocap-kernel'; import { NodejsPlatformServices } from './PlatformServices.ts'; +import { makeIOChannelFactory } from '../io/index.ts'; /** * The main function for the kernel worker. @@ -13,6 +15,7 @@ import { NodejsPlatformServices } from './PlatformServices.ts'; * @param options.dbFilename - The filename of the SQLite database file. * @param options.logger - The logger to use for the kernel. * @param options.keySeed - Optional seed for libp2p key generation. + * @param options.ioChannelFactory - Optional factory for creating IO channels. * @returns The kernel, initialized. */ export async function makeKernel({ @@ -21,12 +24,14 @@ export async function makeKernel({ dbFilename, logger, keySeed, + ioChannelFactory, }: { workerFilePath?: string; resetStorage?: boolean; dbFilename?: string; logger?: Logger; keySeed?: string | undefined; + ioChannelFactory?: IOChannelFactory; }): Promise { const rootLogger = logger ?? new Logger('kernel-worker'); const platformServicesClient = new NodejsPlatformServices({ @@ -42,6 +47,7 @@ export async function makeKernel({ resetStorage, logger: rootLogger.subLogger({ tags: ['kernel'] }), keySeed, + ioChannelFactory: ioChannelFactory ?? makeIOChannelFactory(), }); return kernel; diff --git a/packages/nodejs/src/vat/fetch-blob.test.ts b/packages/nodejs/src/vat/fetch-blob.test.ts index 5215d3536..170df962d 100644 --- a/packages/nodejs/src/vat/fetch-blob.test.ts +++ b/packages/nodejs/src/vat/fetch-blob.test.ts @@ -2,8 +2,6 @@ import { fetchMock } from '@ocap/repo-tools/test-utils/fetch-mock'; import '@ocap/repo-tools/test-utils/mock-endoify'; import { describe, expect, it, vi } from 'vitest'; -/* eslint-disable n/no-unsupported-features/node-builtins */ - const mocks = vi.hoisted(() => ({ readFile: vi.fn(), fileURLToPath: vi.fn(), diff --git a/packages/nodejs/src/vat/fetch-blob.ts b/packages/nodejs/src/vat/fetch-blob.ts index 706fe6964..777667218 100644 --- a/packages/nodejs/src/vat/fetch-blob.ts +++ b/packages/nodejs/src/vat/fetch-blob.ts @@ -2,20 +2,17 @@ import fs from 'node:fs/promises'; import url from 'node:url'; /** - * Fetch a blob of bytes from a URL + * Fetch a blob of bytes from a URL. * * This works like `fetch`, but handles `file:` URLs directly, since Node's * `fetch` implementation chokes on those. * * @param blobURL - The URL of the blob to fetch. - * - * @returns a Response containing the requested blob. + * @returns A Response containing the requested blob. */ -// eslint-disable-next-line n/no-unsupported-features/node-builtins export async function fetchBlob(blobURL: string): Promise { const parsedURL = new URL(blobURL); if (parsedURL.protocol === 'file:') { - // eslint-disable-next-line n/no-unsupported-features/node-builtins return new Response(await fs.readFile(url.fileURLToPath(parsedURL))); } return fetch(blobURL); diff --git a/packages/ocap-kernel/src/Kernel.ts b/packages/ocap-kernel/src/Kernel.ts index e66e535a2..4a5296aa4 100644 --- a/packages/ocap-kernel/src/Kernel.ts +++ b/packages/ocap-kernel/src/Kernel.ts @@ -2,6 +2,8 @@ import type { CapData } from '@endo/marshal'; import type { KernelDatabase } from '@metamask/kernel-store'; import { Logger } from '@metamask/logger'; +import { IOManager } from './io/IOManager.ts'; +import type { IOChannelFactory } from './io/types.ts'; import { makeKernelFacet } from './kernel-facet.ts'; import type { KernelFacet } from './kernel-facet.ts'; import { KernelQueue } from './KernelQueue.ts'; @@ -82,6 +84,9 @@ export class Kernel { /** The kernel's router */ readonly #kernelRouter: KernelRouter; + /** Manages IO channel lifecycle (optional, requires factory injection) */ + readonly #ioManager: IOManager | undefined; + /** * Construct a new kernel instance. * @@ -92,6 +97,7 @@ export class Kernel { * @param options.logger - Optional logger for error and diagnostic output. * @param options.keySeed - Optional seed for libp2p key generation. * @param options.mnemonic - Optional BIP39 mnemonic for deriving the kernel identity. + * @param options.ioChannelFactory - Optional factory for creating IO channels. */ // eslint-disable-next-line no-restricted-syntax private constructor( @@ -102,6 +108,7 @@ export class Kernel { logger?: Logger; keySeed?: string | undefined; mnemonic?: string | undefined; + ioChannelFactory?: IOChannelFactory; } = {}, ) { this.#platformServices = platformServices; @@ -148,6 +155,21 @@ export class Kernel { logger: this.#logger.subLogger({ tags: ['KernelServiceManager'] }), }); + if (options.ioChannelFactory) { + this.#ioManager = new IOManager({ + factory: options.ioChannelFactory, + registerService: + this.#kernelServiceManager.registerKernelServiceObject.bind( + this.#kernelServiceManager, + ), + unregisterService: + this.#kernelServiceManager.unregisterKernelServiceObject.bind( + this.#kernelServiceManager, + ), + logger: this.#logger.subLogger({ tags: ['IOManager'] }), + }); + } + this.#subclusterManager = new SubclusterManager({ kernelStore: this.#kernelStore, kernelQueue: this.#kernelQueue, @@ -155,6 +177,7 @@ export class Kernel { getKernelService: (name) => this.#kernelServiceManager.getKernelService(name), queueMessage: this.queueMessage.bind(this), + ...(this.#ioManager ? { ioManager: this.#ioManager } : {}), logger: this.#logger.subLogger({ tags: ['SubclusterManager'] }), }); @@ -193,6 +216,7 @@ export class Kernel { * @param options.logger - Optional logger for error and diagnostic output. * @param options.keySeed - Optional seed for libp2p key generation. * @param options.mnemonic - Optional BIP39 mnemonic for deriving the kernel identity. + * @param options.ioChannelFactory - Optional factory for creating IO channels. * @param options.systemSubclusters - Optional array of system subcluster configurations. * @returns A promise for the new kernel instance. */ @@ -204,6 +228,7 @@ export class Kernel { logger?: Logger; keySeed?: string | undefined; mnemonic?: string | undefined; + ioChannelFactory?: IOChannelFactory; systemSubclusters?: SystemSubclusterConfig[]; } = {}, ): Promise { @@ -683,6 +708,9 @@ export class Kernel { async reset(): Promise { await this.#kernelQueue.waitForCrank(); try { + if (this.#ioManager) { + await this.#ioManager.destroyAllChannels(); + } await this.terminateAllVats(); this.#subclusterManager.clearSystemSubclusters(); this.#resetKernelState(); diff --git a/packages/ocap-kernel/src/KernelServiceManager.test.ts b/packages/ocap-kernel/src/KernelServiceManager.test.ts index cec435293..af375ba76 100644 --- a/packages/ocap-kernel/src/KernelServiceManager.test.ts +++ b/packages/ocap-kernel/src/KernelServiceManager.test.ts @@ -132,6 +132,71 @@ describe('KernelServiceManager', () => { }); }); + describe('unregisterKernelServiceObject', () => { + it('removes a registered service', () => { + const testService = { testMethod: () => 'test' }; + serviceManager.registerKernelServiceObject('myService', testService); + + serviceManager.unregisterKernelServiceObject('myService'); + + expect(serviceManager.getKernelService('myService')).toBeUndefined(); + }); + + it('removes from kref lookup', () => { + const testService = { testMethod: () => 'test' }; + const registered = serviceManager.registerKernelServiceObject( + 'myService', + testService, + ); + + serviceManager.unregisterKernelServiceObject('myService'); + + expect( + serviceManager.getKernelServiceByKref(registered.kref), + ).toBeUndefined(); + expect(serviceManager.isKernelService(registered.kref)).toBe(false); + }); + + it('unpins the object and deletes the KV key', () => { + const testService = { testMethod: () => 'test' }; + const registered = serviceManager.registerKernelServiceObject( + 'myService', + testService, + ); + + serviceManager.unregisterKernelServiceObject('myService'); + + expect(kernelStore.kv.get('kernelService.myService')).toBeUndefined(); + // Verify it was unpinned by trying to re-register with the same name + const reregistered = serviceManager.registerKernelServiceObject( + 'myService', + testService, + ); + expect(reregistered.kref).not.toBe(registered.kref); + }); + + it('is a no-op for non-existent service', () => { + expect(() => + serviceManager.unregisterKernelServiceObject('nonexistent'), + ).not.toThrow(); + }); + + it('allows re-registration after unregister', () => { + const service1 = { method: () => 'v1' }; + const service2 = { method: () => 'v2' }; + + serviceManager.registerKernelServiceObject('svc', service1); + serviceManager.unregisterKernelServiceObject('svc'); + const registered = serviceManager.registerKernelServiceObject( + 'svc', + service2, + ); + + expect(registered.service).toBe(service2); + expect(serviceManager.getKernelService('svc')?.service).toBe(service2); + }); + }); + describe('getKernelService', () => { it('retrieves registered service by name', () => { const testService = { diff --git a/packages/ocap-kernel/src/KernelServiceManager.ts b/packages/ocap-kernel/src/KernelServiceManager.ts index 3208984ab..f3c1dc001 100644 --- a/packages/ocap-kernel/src/KernelServiceManager.ts +++ b/packages/ocap-kernel/src/KernelServiceManager.ts @@ -87,6 +87,22 @@ export class KernelServiceManager { return kernelService; } + /** + * Unregister a kernel service object by name. + * + * @param name - The name of the service to unregister. + */ + unregisterKernelServiceObject(name: string): void { + const service = this.#kernelServicesByName.get(name); + if (!service) { + return; + } + this.#kernelServicesByName.delete(name); + this.#kernelServicesByObject.delete(service.kref); + this.#kernelStore.unpinObject(service.kref); + this.#kernelStore.kv.delete(`kernelService.${name}`); + } + /** * Get a kernel service by name. * diff --git a/packages/ocap-kernel/src/index.ts b/packages/ocap-kernel/src/index.ts index c9e1b804b..938019f5a 100644 --- a/packages/ocap-kernel/src/index.ts +++ b/packages/ocap-kernel/src/index.ts @@ -2,9 +2,12 @@ export { Kernel } from './Kernel.ts'; export { VatHandle } from './vats/VatHandle.ts'; export { VatSupervisor } from './vats/VatSupervisor.ts'; export { initTransport } from './remotes/platform/transport.ts'; +export type { IOChannel, IOChannelFactory } from './io/types.ts'; export type { Baggage, ClusterConfig, + IOConfig, + IOSpec, KRef, Message, VatId, diff --git a/packages/ocap-kernel/src/io/IOManager.test.ts b/packages/ocap-kernel/src/io/IOManager.test.ts new file mode 100644 index 000000000..31fe62712 --- /dev/null +++ b/packages/ocap-kernel/src/io/IOManager.test.ts @@ -0,0 +1,243 @@ +import { Logger } from '@metamask/logger'; +import { describe, it, expect, vi, beforeEach } from 'vitest'; + +import { IOManager } from './IOManager.ts'; +import type { IOChannel, IOChannelFactory } from './types.ts'; +import type { KernelService } from '../KernelServiceManager.ts'; +import type { IOConfig } from '../types.ts'; + +const makeChannel = (): IOChannel => ({ + read: vi.fn().mockResolvedValue('data'), + write: vi.fn().mockResolvedValue(undefined), + close: vi.fn().mockResolvedValue(undefined), +}); + +describe('IOManager', () => { + let factory: IOChannelFactory; + let registerService: ReturnType; + let unregisterService: ReturnType; + let logger: Logger; + let manager: IOManager; + let channels: IOChannel[]; + + beforeEach(() => { + channels = []; + factory = vi.fn(async () => { + const ch = makeChannel(); + channels.push(ch); + return ch; + }) as unknown as IOChannelFactory; + + registerService = vi.fn( + (name: string): KernelService => ({ + name, + kref: `ko${name}`, + service: {}, + systemOnly: false, + }), + ); + unregisterService = vi.fn(); + logger = new Logger('test'); + + manager = new IOManager({ + factory, + registerService, + unregisterService, + logger, + }); + }); + + describe('createChannels', () => { + it('creates channels and registers services', async () => { + const ioConfig: Record = { + repl: { type: 'socket', path: '/tmp/repl.sock' } as IOConfig, + }; + + await manager.createChannels('s1', ioConfig); + + expect(factory).toHaveBeenCalledWith('repl', ioConfig.repl); + expect(registerService).toHaveBeenCalledWith( + 'io:s1:repl', + expect.any(Object), + ); + }); + + it('creates multiple channels', async () => { + const ioConfig: Record = { + input: { type: 'socket', path: '/tmp/in.sock' } as IOConfig, + output: { type: 'socket', path: '/tmp/out.sock' } as IOConfig, + }; + + await manager.createChannels('s1', ioConfig); + + expect(factory).toHaveBeenCalledTimes(2); + expect(registerService).toHaveBeenCalledTimes(2); + }); + + it('cleans up on factory failure', async () => { + const successChannel = makeChannel(); + let callCount = 0; + const failingFactory = vi.fn(async () => { + callCount += 1; + if (callCount === 2) { + throw new Error('factory error'); + } + return successChannel; + }) as unknown as IOChannelFactory; + + const mgr = new IOManager({ + factory: failingFactory, + registerService, + unregisterService, + logger, + }); + + const ioConfig: Record = { + first: { type: 'socket', path: '/tmp/a.sock' } as IOConfig, + second: { type: 'socket', path: '/tmp/b.sock' } as IOConfig, + }; + + await expect(mgr.createChannels('s1', ioConfig)).rejects.toThrow( + 'factory error', + ); + + expect(successChannel.close).toHaveBeenCalledOnce(); + expect(unregisterService).toHaveBeenCalledWith('io:s1:first'); + }); + + it('does not mask factory error when unregister fails during rollback', async () => { + const successChannel = makeChannel(); + let callCount = 0; + const failingFactory = vi.fn(async () => { + callCount += 1; + if (callCount === 2) { + throw new Error('factory error'); + } + return successChannel; + }) as unknown as IOChannelFactory; + + const failingUnregister = vi.fn(() => { + throw new Error('unregister boom'); + }); + const errorSpy = vi.spyOn(logger, 'error'); + + const mgr = new IOManager({ + factory: failingFactory, + registerService, + unregisterService: failingUnregister, + logger, + }); + + const ioConfig: Record = { + first: { type: 'socket', path: '/tmp/a.sock' } as IOConfig, + second: { type: 'socket', path: '/tmp/b.sock' } as IOConfig, + }; + + // Original factory error propagates, not the unregister error + await expect(mgr.createChannels('s1', ioConfig)).rejects.toThrow( + 'factory error', + ); + + expect(errorSpy).toHaveBeenCalledWith( + 'Error unregistering IO service "io:s1:first":', + expect.any(Error), + ); + expect(successChannel.close).toHaveBeenCalledOnce(); + }); + }); + + describe('destroyChannels', () => { + it('closes channels and unregisters services', async () => { + const ioConfig: Record = { + repl: { type: 'socket', path: '/tmp/repl.sock' } as IOConfig, + }; + + await manager.createChannels('s1', ioConfig); + await manager.destroyChannels('s1'); + + expect(channels[0]?.close).toHaveBeenCalledOnce(); + expect(unregisterService).toHaveBeenCalledWith('io:s1:repl'); + }); + + it('is idempotent for unknown subcluster', async () => { + expect(await manager.destroyChannels('nonexistent')).toBeUndefined(); + }); + + it('handles unregister errors gracefully', async () => { + const failingUnregister = vi.fn(() => { + throw new Error('unregister failed'); + }); + const errorSpy = vi.spyOn(logger, 'error'); + + const mgr = new IOManager({ + factory, + registerService, + unregisterService: failingUnregister, + logger, + }); + + await mgr.createChannels('s1', { + ch: { type: 'socket', path: '/tmp/ch.sock' } as IOConfig, + }); + await mgr.destroyChannels('s1'); + + expect(errorSpy).toHaveBeenCalledWith( + 'Error unregistering IO service "io:s1:ch":', + expect.any(Error), + ); + // Channel should still be closed despite unregister failure + expect(channels[0]?.close).toHaveBeenCalledOnce(); + }); + + it('handles close errors gracefully', async () => { + const errorChannel = makeChannel(); + (errorChannel.close as ReturnType).mockRejectedValue( + new Error('close failed'), + ); + + const errorFactory = vi.fn( + async () => errorChannel, + ) as unknown as IOChannelFactory; + const errorSpy = vi.spyOn(logger, 'error'); + + const mgr = new IOManager({ + factory: errorFactory, + registerService, + unregisterService, + logger, + }); + + await mgr.createChannels('s1', { + ch: { type: 'socket', path: '/tmp/ch.sock' } as IOConfig, + }); + await mgr.destroyChannels('s1'); + + expect(errorSpy).toHaveBeenCalledWith( + 'Error closing IO channel "ch":', + expect.any(Error), + ); + }); + }); + + describe('destroyAllChannels', () => { + it('destroys channels for all subclusters', async () => { + await manager.createChannels('s1', { + a: { type: 'socket', path: '/tmp/a.sock' } as IOConfig, + }); + await manager.createChannels('s2', { + b: { type: 'socket', path: '/tmp/b.sock' } as IOConfig, + }); + + await manager.destroyAllChannels(); + + expect(channels[0]?.close).toHaveBeenCalledOnce(); + expect(channels[1]?.close).toHaveBeenCalledOnce(); + expect(unregisterService).toHaveBeenCalledWith('io:s1:a'); + expect(unregisterService).toHaveBeenCalledWith('io:s2:b'); + }); + + it('is safe to call when no channels exist', async () => { + expect(await manager.destroyAllChannels()).toBeUndefined(); + }); + }); +}); diff --git a/packages/ocap-kernel/src/io/IOManager.ts b/packages/ocap-kernel/src/io/IOManager.ts new file mode 100644 index 000000000..cd3cdfdc6 --- /dev/null +++ b/packages/ocap-kernel/src/io/IOManager.ts @@ -0,0 +1,161 @@ +import type { Logger } from '@metamask/logger'; + +import { makeIOService } from './io-service.ts'; +import type { IOChannel, IOChannelFactory } from './types.ts'; +import type { KernelService } from '../KernelServiceManager.ts'; +import type { IOConfig } from '../types.ts'; + +type RegisterService = ( + name: string, + service: object, + options?: { systemOnly?: boolean }, +) => KernelService; +type UnregisterService = (name: string) => void; + +type IOManagerOptions = { + factory: IOChannelFactory; + registerService: RegisterService; + unregisterService: UnregisterService; + logger?: Logger; +}; + +type SubclusterIOState = { + channels: Map; + serviceNames: string[]; +}; + +/** + * Manages IO channel lifecycle, creating channels at subcluster launch + * and destroying them at termination. + */ +export class IOManager { + readonly #factory: IOChannelFactory; + + readonly #registerService: RegisterService; + + readonly #unregisterService: UnregisterService; + + readonly #logger: Logger | undefined; + + /** IO state indexed by subcluster ID */ + readonly #subclusters: Map = new Map(); + + /** + * Creates a new IOManager instance. + * + * @param options - Constructor options. + * @param options.factory - Factory for creating IO channels. + * @param options.registerService - Function to register a kernel service. + * @param options.unregisterService - Function to unregister a kernel service. + * @param options.logger - Optional logger for diagnostics. + */ + constructor({ + factory, + registerService, + unregisterService, + logger, + }: IOManagerOptions) { + this.#factory = factory; + this.#registerService = registerService; + this.#unregisterService = unregisterService; + this.#logger = logger; + harden(this); + } + + /** + * Create IO channels for a subcluster and register them as kernel services. + * + * @param subclusterId - The ID of the subcluster. + * @param ioConfig - The IO configuration map from channel names to configs. + */ + async createChannels( + subclusterId: string, + ioConfig: Record, + ): Promise { + const channels = new Map(); + const serviceNames: string[] = []; + + for (const [name, config] of Object.entries(ioConfig)) { + const serviceName = `io:${subclusterId}:${name}`; + try { + const channel = await this.#factory(name, config); + channels.set(name, channel); + + const service = makeIOService(serviceName, channel, config); + this.#registerService(serviceName, service); + serviceNames.push(serviceName); + + this.#logger?.info( + `Created IO channel "${name}" for subcluster ${subclusterId}`, + ); + } catch (error) { + // Clean up any channels we already created before re-throwing + await this.#closeChannels(channels); + for (const registeredName of serviceNames) { + try { + this.#unregisterService(registeredName); + } catch (unregisterError) { + this.#logger?.error( + `Error unregistering IO service "${registeredName}":`, + unregisterError, + ); + } + } + throw error; + } + } + + this.#subclusters.set(subclusterId, { channels, serviceNames }); + } + + /** + * Destroy IO channels for a subcluster and unregister their services. + * + * @param subclusterId - The ID of the subcluster. + */ + async destroyChannels(subclusterId: string): Promise { + const state = this.#subclusters.get(subclusterId); + if (!state) { + return; + } + + for (const name of state.serviceNames) { + try { + this.#unregisterService(name); + } catch (error) { + this.#logger?.error(`Error unregistering IO service "${name}":`, error); + } + } + + await this.#closeChannels(state.channels); + this.#subclusters.delete(subclusterId); + + this.#logger?.info(`Destroyed IO channels for subcluster ${subclusterId}`); + } + + /** + * Destroy all IO channels across all subclusters. + * Used during kernel reset to ensure no channels are leaked. + */ + async destroyAllChannels(): Promise { + for (const subclusterId of [...this.#subclusters.keys()]) { + await this.destroyChannels(subclusterId); + } + } + + /** + * Close all channels in a map, logging errors. + * + * @param channels - The channels to close. + */ + async #closeChannels(channels: Map): Promise { + for (const [name, channel] of channels) { + try { + await channel.close(); + } catch (error) { + this.#logger?.error(`Error closing IO channel "${name}":`, error); + } + } + } +} +harden(IOManager); diff --git a/packages/ocap-kernel/src/io/index.ts b/packages/ocap-kernel/src/io/index.ts new file mode 100644 index 000000000..a132c8a31 --- /dev/null +++ b/packages/ocap-kernel/src/io/index.ts @@ -0,0 +1,2 @@ +export { IOManager } from './IOManager.ts'; +export type { IOChannel, IOChannelFactory } from './types.ts'; diff --git a/packages/ocap-kernel/src/io/io-service.test.ts b/packages/ocap-kernel/src/io/io-service.test.ts new file mode 100644 index 000000000..14c8c0a57 --- /dev/null +++ b/packages/ocap-kernel/src/io/io-service.test.ts @@ -0,0 +1,128 @@ +import { describe, it, expect, vi } from 'vitest'; + +import { makeIOService } from './io-service.ts'; +import type { IOChannel } from './types.ts'; +import type { IOConfig } from '../types.ts'; + +const makeChannel = (): IOChannel => ({ + read: vi.fn().mockResolvedValue('hello'), + write: vi.fn().mockResolvedValue(undefined), + close: vi.fn().mockResolvedValue(undefined), +}); + +const makeConfig = (overrides: Partial = {}): IOConfig => + ({ + type: 'socket', + path: '/tmp/test.sock', + ...overrides, + }) as IOConfig; + +describe('makeIOService', () => { + describe('read()', () => { + it('delegates to the channel', async () => { + const channel = makeChannel(); + const service = makeIOService( + 'io:subclusterFoo:test', + channel, + makeConfig(), + ) as { + read: () => Promise; + }; + + const result = await service.read(); + + expect(result).toBe('hello'); + expect(channel.read).toHaveBeenCalledOnce(); + }); + + it('throws on write-only channel', async () => { + const channel = makeChannel(); + const service = makeIOService( + 'io:subclusterFoo:test', + channel, + makeConfig({ direction: 'out' }), + ) as { read: () => Promise }; + + await expect(service.read()).rejects.toThrow( + 'IO channel "io:subclusterFoo:test" is write-only', + ); + expect(channel.read).not.toHaveBeenCalled(); + }); + + it.each(['in', 'inout'] as const)( + 'allows read on direction=%s', + async (direction) => { + const channel = makeChannel(); + const service = makeIOService( + 'io:subclusterFoo:test', + channel, + makeConfig({ direction }), + ) as { read: () => Promise }; + + expect(await service.read()).toBe('hello'); + }, + ); + }); + + describe('write()', () => { + it('delegates to the channel', async () => { + const channel = makeChannel(); + const service = makeIOService( + 'io:subclusterFoo:test', + channel, + makeConfig(), + ) as { + write: (data: string) => Promise; + }; + + await service.write('world'); + + expect(channel.write).toHaveBeenCalledWith('world'); + }); + + it('throws on read-only channel', async () => { + const channel = makeChannel(); + const service = makeIOService( + 'io:subclusterFoo:test', + channel, + makeConfig({ direction: 'in' }), + ) as { write: (data: string) => Promise }; + + await expect(service.write('data')).rejects.toThrow( + 'IO channel "io:subclusterFoo:test" is read-only', + ); + expect(channel.write).not.toHaveBeenCalled(); + }); + + it.each(['out', 'inout'] as const)( + 'allows write on direction=%s', + async (direction) => { + const channel = makeChannel(); + const service = makeIOService( + 'io:subclusterFoo:test', + channel, + makeConfig({ direction }), + ) as { write: (data: string) => Promise }; + + expect(await service.write('data')).toBeUndefined(); + }, + ); + }); + + describe('direction defaults', () => { + it('defaults to inout when direction is not specified', async () => { + const channel = makeChannel(); + const service = makeIOService( + 'io:subclusterFoo:test', + channel, + makeConfig(), + ) as { + read: () => Promise; + write: (data: string) => Promise; + }; + + expect(await service.read()).toBe('hello'); + expect(await service.write('data')).toBeUndefined(); + }); + }); +}); diff --git a/packages/ocap-kernel/src/io/io-service.ts b/packages/ocap-kernel/src/io/io-service.ts new file mode 100644 index 000000000..0c7d86468 --- /dev/null +++ b/packages/ocap-kernel/src/io/io-service.ts @@ -0,0 +1,37 @@ +import { makeDefaultExo } from '@metamask/kernel-utils/exo'; + +import type { IOChannel } from './types.ts'; +import type { IOConfig } from '../types.ts'; + +/** + * Create a kernel service exo that wraps an IOChannel. + * + * @param name - The scoped service name (e.g. `io:s1:repl`). + * @param channel - The underlying IOChannel to delegate to. + * @param config - The IO configuration for this channel. + * @returns A remotable service object with `read()` and `write()` methods. + */ +export function makeIOService( + name: string, + channel: IOChannel, + config: IOConfig, +): object { + const direction = config.direction ?? 'inout'; + + return makeDefaultExo(name, { + async read(): Promise { + if (direction === 'out') { + throw new Error(`IO channel "${name}" is write-only`); + } + return channel.read(); + }, + + async write(data: string): Promise { + if (direction === 'in') { + throw new Error(`IO channel "${name}" is read-only`); + } + return channel.write(data); + }, + }); +} +harden(makeIOService); diff --git a/packages/ocap-kernel/src/io/types.ts b/packages/ocap-kernel/src/io/types.ts new file mode 100644 index 000000000..f08f00dbd --- /dev/null +++ b/packages/ocap-kernel/src/io/types.ts @@ -0,0 +1,27 @@ +import type { IOConfig } from '../types.ts'; + +/** + * A platform-agnostic IO channel that supports reading and writing data. + * Implementations are platform-specific (e.g., Unix domain sockets in Node.js). + */ +export type IOChannel = { + /** Read the next unit of data, or `null` on EOF/disconnect. */ + read(): Promise; + /** Write a unit of data to the channel. */ + write(data: string): Promise; + /** Close the channel and release resources. */ + close(): Promise; +}; + +/** + * Factory function that creates an IOChannel for a given configuration. + * Injected from the host environment (e.g., Node.js) into the kernel. + * + * @param name - The name of the IO channel (from the cluster config key). + * @param config - The IO configuration describing the channel type and options. + * @returns A promise for the created IOChannel. + */ +export type IOChannelFactory = ( + name: string, + config: IOConfig, +) => Promise; diff --git a/packages/ocap-kernel/src/types.ts b/packages/ocap-kernel/src/types.ts index c77d6ff89..3fb68f0b6 100644 --- a/packages/ocap-kernel/src/types.ts +++ b/packages/ocap-kernel/src/types.ts @@ -11,6 +11,7 @@ import type { VatCheckpoint } from '@metamask/kernel-store'; import type { JsonRpcMessage } from '@metamask/kernel-utils'; import type { DuplexStream } from '@metamask/streams'; import { + assign, define, is, object, @@ -432,10 +433,59 @@ export const isVatConfig = (value: unknown): value is VatConfig => export type VatConfigTable = Record; +// IO configuration types + +const ConsoleIOSpecStruct = object({ type: literal('console') }); +const ListenIOSpecStruct = object({ + type: literal('listen'), + hostport: string(), +}); +const NetworkIOSpecStruct = object({ + type: literal('network'), + hostport: string(), +}); +const FileIOSpecStruct = object({ type: literal('file'), path: string() }); +const SocketIOSpecStruct = object({ type: literal('socket'), path: string() }); + +export type IOSpec = + | Infer + | Infer + | Infer + | Infer + | Infer; + +const IODirectionStruct = union([ + literal('in'), + literal('out'), + literal('inout'), +]); +const IOUnitStruct = union([ + literal('line'), + literal('string'), + literal('chars'), + literal('bytes'), +]); + +const IOExtraStruct = object({ + direction: exactOptional(IODirectionStruct), + unit: exactOptional(IOUnitStruct), +}); + +const IOConfigStruct = union([ + assign(ConsoleIOSpecStruct, IOExtraStruct), + assign(ListenIOSpecStruct, IOExtraStruct), + assign(NetworkIOSpecStruct, IOExtraStruct), + assign(FileIOSpecStruct, IOExtraStruct), + assign(SocketIOSpecStruct, IOExtraStruct), +]); + +export type IOConfig = Infer; + export const ClusterConfigStruct = object({ bootstrap: string(), forceReset: exactOptional(boolean()), services: exactOptional(array(string())), + io: exactOptional(record(string(), IOConfigStruct)), vats: record(string(), VatConfigStruct), bundles: exactOptional(record(string(), VatConfigStruct)), }); diff --git a/packages/ocap-kernel/src/vats/SubclusterManager.test.ts b/packages/ocap-kernel/src/vats/SubclusterManager.test.ts index 19e4729ac..06404ca12 100644 --- a/packages/ocap-kernel/src/vats/SubclusterManager.test.ts +++ b/packages/ocap-kernel/src/vats/SubclusterManager.test.ts @@ -253,6 +253,149 @@ describe('SubclusterManager', () => { ); }); + it('cleans up IO channels and subcluster on validation failure', async () => { + const mockIOManager = { + createChannels: vi.fn().mockResolvedValue(undefined), + destroyChannels: vi.fn().mockResolvedValue(undefined), + }; + const mgr = new SubclusterManager({ + kernelStore: mockKernelStore, + kernelQueue: mockKernelQueue, + vatManager: mockVatManager, + getKernelService: mockGetKernelService, + queueMessage: mockQueueMessage, + ioManager: mockIOManager as never, + }); + + const config: ClusterConfig = { + bootstrap: 'testVat', + vats: { testVat: { sourceSpec: 'test.js' } }, + services: ['unknownService'], + io: { + repl: { type: 'socket', path: '/tmp/repl.sock' }, + }, + }; + (mockGetKernelService as ReturnType).mockReturnValue( + undefined, + ); + + await expect(mgr.launchSubcluster(config)).rejects.toThrow( + "no registered kernel service 'unknownService'", + ); + + expect(mockIOManager.destroyChannels).toHaveBeenCalledWith('s1'); + expect(mockKernelStore.deleteSubcluster).toHaveBeenCalledWith('s1'); + }); + + it('cleans up IO channels and subcluster on vat launch failure', async () => { + const mockIOManager = { + createChannels: vi.fn().mockResolvedValue(undefined), + destroyChannels: vi.fn().mockResolvedValue(undefined), + }; + const mgr = new SubclusterManager({ + kernelStore: mockKernelStore, + kernelQueue: mockKernelQueue, + vatManager: mockVatManager, + getKernelService: mockGetKernelService, + queueMessage: mockQueueMessage, + ioManager: mockIOManager as never, + }); + + mockVatManager.launchVat.mockRejectedValue(new Error('vat boom')); + + const config: ClusterConfig = { + bootstrap: 'testVat', + vats: { testVat: { sourceSpec: 'test.js' } }, + io: { + repl: { type: 'socket', path: '/tmp/repl.sock' }, + }, + }; + + await expect(mgr.launchSubcluster(config)).rejects.toThrow('vat boom'); + + expect(mockIOManager.destroyChannels).toHaveBeenCalledWith('s1'); + expect(mockKernelStore.deleteSubcluster).toHaveBeenCalledWith('s1'); + }); + + it('cleans up subcluster when createChannels fails', async () => { + const mockIOManager = { + createChannels: vi + .fn() + .mockRejectedValue(new Error('channel creation failed')), + destroyChannels: vi.fn().mockResolvedValue(undefined), + }; + const mgr = new SubclusterManager({ + kernelStore: mockKernelStore, + kernelQueue: mockKernelQueue, + vatManager: mockVatManager, + getKernelService: mockGetKernelService, + queueMessage: mockQueueMessage, + ioManager: mockIOManager as never, + }); + + const config: ClusterConfig = { + bootstrap: 'testVat', + vats: { testVat: { sourceSpec: 'test.js' } }, + io: { + repl: { type: 'socket', path: '/tmp/repl.sock' }, + }, + }; + + await expect(mgr.launchSubcluster(config)).rejects.toThrow( + 'channel creation failed', + ); + + expect(mockIOManager.destroyChannels).toHaveBeenCalledWith('s1'); + expect(mockKernelStore.deleteSubcluster).toHaveBeenCalledWith('s1'); + }); + + it('does not mask original error when destroyChannels fails during rollback', async () => { + const mockIOManager = { + createChannels: vi.fn().mockResolvedValue(undefined), + destroyChannels: vi.fn().mockRejectedValue(new Error('cleanup boom')), + }; + const mgr = new SubclusterManager({ + kernelStore: mockKernelStore, + kernelQueue: mockKernelQueue, + vatManager: mockVatManager, + getKernelService: mockGetKernelService, + queueMessage: mockQueueMessage, + ioManager: mockIOManager as never, + }); + + mockVatManager.launchVat.mockRejectedValue(new Error('launch boom')); + + const config: ClusterConfig = { + bootstrap: 'testVat', + vats: { testVat: { sourceSpec: 'test.js' } }, + io: { + repl: { type: 'socket', path: '/tmp/repl.sock' }, + }, + }; + + // Original error propagates, not the cleanup error + await expect(mgr.launchSubcluster(config)).rejects.toThrow('launch boom'); + + // deleteSubcluster still called despite destroyChannels failure + expect(mockKernelStore.deleteSubcluster).toHaveBeenCalledWith('s1'); + }); + + it('throws when config declares IO but no IO manager is provided', async () => { + const config: ClusterConfig = { + bootstrap: 'testVat', + vats: { testVat: { sourceSpec: 'test.js' } }, + io: { + repl: { type: 'socket', path: '/tmp/repl.sock' }, + }, + }; + + await expect(subclusterManager.launchSubcluster(config)).rejects.toThrow( + 'no IO channel factory was provided', + ); + + expect(mockKernelStore.deleteSubcluster).toHaveBeenCalledWith('s1'); + }); + it('throws when launchVat returns undefined', async () => { const config: ClusterConfig = { bootstrap: 'testVat', diff --git a/packages/ocap-kernel/src/vats/SubclusterManager.ts b/packages/ocap-kernel/src/vats/SubclusterManager.ts index 8d00ba3b7..d56ffe34e 100644 --- a/packages/ocap-kernel/src/vats/SubclusterManager.ts +++ b/packages/ocap-kernel/src/vats/SubclusterManager.ts @@ -2,6 +2,7 @@ import type { CapData } from '@endo/marshal'; import { SubclusterNotFoundError } from '@metamask/kernel-errors'; import { Logger } from '@metamask/logger'; +import type { IOManager } from '../io/IOManager.ts'; import type { KernelQueue } from '../KernelQueue.ts'; import type { VatManager } from './VatManager.ts'; import { kslot, kunser } from '../liveslots/kernel-marshal.ts'; @@ -30,6 +31,7 @@ type SubclusterManagerOptions = { method: string, args: unknown[], ) => Promise>; + ioManager?: IOManager; logger?: Logger; }; @@ -61,6 +63,9 @@ export class SubclusterManager { /** Logger for diagnostic output */ readonly #logger: Logger; + /** Optional IO manager for creating/destroying IO channels */ + readonly #ioManager: IOManager | undefined; + /** Stores bootstrap root krefs of launched system subclusters */ readonly #systemSubclusterRoots: Map = new Map(); @@ -73,6 +78,7 @@ export class SubclusterManager { * @param options.vatManager - Manager for creating and managing vat instances. * @param options.getKernelService - Function to retrieve a kernel service by its kref. * @param options.queueMessage - Function to queue messages for delivery to targets. + * @param options.ioManager - Optional IO manager for IO channel lifecycle. * @param options.logger - Optional logger for diagnostic output. */ constructor({ @@ -81,6 +87,7 @@ export class SubclusterManager { vatManager, getKernelService, queueMessage, + ioManager, logger, }: SubclusterManagerOptions) { this.#kernelStore = kernelStore; @@ -88,6 +95,7 @@ export class SubclusterManager { this.#vatManager = vatManager; this.#getKernelService = getKernelService; this.#queueMessage = queueMessage; + this.#ioManager = ioManager; this.#logger = logger ?? new Logger('SubclusterManager'); harden(this); } @@ -111,13 +119,42 @@ export class SubclusterManager { if (!config.vats[config.bootstrap]) { Fail`invalid bootstrap vat name ${config.bootstrap}`; } - this.#validateServices(config, isSystem); const subclusterId = this.#kernelStore.addSubcluster(config); - const { rootKref, bootstrapResult } = await this.#launchVatsForSubcluster( - subclusterId, - config, - ); - return { subclusterId, rootKref, bootstrapResult }; + + try { + // Create IO channels before validating services so that IO service + // names are registered and discoverable by #validateServices. + if (config.io) { + if (!this.#ioManager) { + throw new Error( + 'Cluster config declares IO channels but no IO channel factory was provided to the kernel', + ); + } + await this.#ioManager.createChannels(subclusterId, config.io); + } + + this.#validateServices(config, isSystem); + const { rootKref, bootstrapResult } = await this.#launchVatsForSubcluster( + subclusterId, + config, + ); + return { subclusterId, rootKref, bootstrapResult }; + } catch (error) { + // Roll back IO channels and persisted subcluster on failure. + // Cleanup is best-effort — errors must not mask the original failure. + try { + if (this.#ioManager) { + await this.#ioManager.destroyChannels(subclusterId); + } + } catch (cleanupError) { + this.#logger.error( + 'Error during IO cleanup on failed launch:', + cleanupError, + ); + } + this.#kernelStore.deleteSubcluster(subclusterId); + throw error; + } } /** @@ -148,6 +185,16 @@ export class SubclusterManager { await this.#vatManager.terminateVat(vatId); this.#vatManager.collectGarbage(); } + + // Destroy IO channels after terminating vats so that any queued + // messages targeting IO service krefs are drained first. + try { + if (this.#ioManager) { + await this.#ioManager.destroyChannels(subclusterId); + } + } catch (error) { + this.#logger.error('Error during IO cleanup on termination:', error); + } this.#kernelStore.deleteSubcluster(subclusterId); } @@ -225,7 +272,14 @@ export class SubclusterManager { if (!config.services) { return; } + const ioNames = config.io + ? new Set(Object.keys(config.io)) + : new Set(); for (const name of config.services) { + // IO services are registered by IOManager with scoped names + if (ioNames.has(name)) { + continue; + } const service = this.#getKernelService(name); if (!service || (service.systemOnly && !isSystem)) { throw Error(`no registered kernel service '${name}'`); @@ -259,15 +313,24 @@ export class SubclusterManager { roots[vatName] = kslot(rootRef, 'vatRoot'); } const services: Record = {}; - if (config.services) { - for (const name of config.services) { - const possibleService = this.#getKernelService(name); - if (possibleService) { - const { kref } = possibleService; - services[name] = kslot(kref); - } else { - throw Error(`no registered kernel service '${name}'`); - } + const ioNames = config.io + ? new Set(Object.keys(config.io)) + : new Set(); + + // Collect all service names: explicit services plus IO channel names + const allServiceNames = new Set([...(config.services ?? []), ...ioNames]); + + for (const name of allServiceNames) { + // IO services are registered under scoped names to avoid collisions + const lookupName = ioNames.has(name) + ? `io:${subclusterId}:${name}` + : name; + const possibleService = this.#getKernelService(lookupName); + if (possibleService) { + const { kref } = possibleService; + services[name] = kslot(kref); + } else { + throw Error(`no registered kernel service '${lookupName}'`); } } const rootKref = rootIds[config.bootstrap];