Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,6 @@ describe('makeHostRestrictedFetch', () => {
});

it('should handle Request objects correctly', async () => {
// eslint-disable-next-line n/no-unsupported-features/node-builtins
const request = new Request(mockUrl);

await restrictedFetch(request);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ export const makeHostRestrictedFetch = (
const restrictedFetch = async (
...[url, ...args]: Parameters<typeof fetch>
): ReturnType<typeof fetch> => {
// eslint-disable-next-line n/no-unsupported-features/node-builtins
const { host } = new URL(url instanceof Request ? url.url : url);
if (!allowedHosts.includes(host)) {
throw new Error(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ describe('fetch nodejs capability', () => {
{
name: 'with Request objects',
config: {},
// eslint-disable-next-line n/no-unsupported-features/node-builtins

input: new Request('file:///path/to/file.txt'),
},
{
Expand All @@ -50,7 +50,7 @@ describe('fetch nodejs capability', () => {
const result = await fetchCapability(input);

expect(readFile).toHaveBeenCalledWith('/path/to/file.txt', 'utf8');
// eslint-disable-next-line n/no-unsupported-features/node-builtins

expect(result).toBeInstanceOf(Response);
expect(await result.text()).toBe(fileContents);
expect(fetchMock).not.toHaveBeenCalled(); // Should not call global fetch for file:// URLs
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,13 +13,12 @@ import { makeCapabilitySpecification } from '../../specification.ts';
*/
const makeExtendedFetch = (fromFetch: FetchCapability): FetchCapability => {
return async (...[input, ...args]: Parameters<FetchCapability>) => {
// eslint-disable-next-line n/no-unsupported-features/node-builtins
const url = input instanceof Request ? input.url : input;
const { protocol, pathname } = new URL(url);

if (protocol === 'file:') {
const contents = await readFile(pathname, 'utf8');
// eslint-disable-next-line n/no-unsupported-features/node-builtins

return new Response(contents);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ describe('resolveUrl', () => {
{ name: 'string URL', input: 'https://example.test/path' },
{
name: 'Request object URL',
// eslint-disable-next-line n/no-unsupported-features/node-builtins

input: new Request('https://example.test/path'),
},
{ name: 'URL object', input: new URL('https://example.test/path') },
Expand Down Expand Up @@ -44,7 +44,7 @@ describe('makeHostCaveat', () => {
it.each([
{
name: 'Request objects',
// eslint-disable-next-line n/no-unsupported-features/node-builtins

input: new Request('https://example.test/path'),
},
{ name: 'URL objects', input: new URL('https://example.test/path') },
Expand Down Expand Up @@ -91,7 +91,6 @@ describe('makeCaveatedFetch', () => {
const mockResponse = {
status: 200,
text: async () => Promise.resolve('test'),
// eslint-disable-next-line n/no-unsupported-features/node-builtins
} as Response;
const mockFetch = vi.fn().mockResolvedValue(mockResponse);
const caveat = vi.fn().mockResolvedValue(undefined);
Expand Down Expand Up @@ -122,7 +121,6 @@ describe('makeCaveatedFetch', () => {
const mockResponse = {
status: 200,
text: async () => Promise.resolve('test'),
// eslint-disable-next-line n/no-unsupported-features/node-builtins
} as Response;
const mockFetch = vi.fn().mockResolvedValue(mockResponse);
const caveat = vi.fn().mockResolvedValue(undefined);
Expand All @@ -142,7 +140,6 @@ describe('shared fetch capability behavior', () => {
const mockResponse = {
status: 200,
text: async () => Promise.resolve('test'),
// eslint-disable-next-line n/no-unsupported-features/node-builtins
} as Response;
const mockFetch = vi.fn().mockResolvedValue(mockResponse);
const caveat = makeFetchCaveat({ allowedHosts: ['example.test'] });
Expand All @@ -164,7 +161,6 @@ describe('shared fetch capability behavior', () => {
const mockResponse = {
status: 200,
text: async () => Promise.resolve('test'),
// eslint-disable-next-line n/no-unsupported-features/node-builtins
} as Response;
const mockFetch = vi.fn().mockResolvedValue(mockResponse);
const caveat = makeFetchCaveat({ allowedHosts: ['example.test'] });
Expand All @@ -179,15 +175,14 @@ describe('shared fetch capability behavior', () => {
it.each([
{
name: 'Request objects',
// eslint-disable-next-line n/no-unsupported-features/node-builtins

input: new Request('https://example.test/path'),
},
{ name: 'URL objects', input: new URL('https://example.test/path') },
])('handles $name', async ({ input }) => {
const mockResponse = {
status: 200,
text: async () => Promise.resolve('test'),
// eslint-disable-next-line n/no-unsupported-features/node-builtins
} as Response;
const mockFetch = vi.fn().mockResolvedValue(mockResponse);
const caveat = makeFetchCaveat({ allowedHosts: ['example.test'] });
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import type { FetchCapability, FetchCaveat, FetchConfig } from './types.ts';
* @returns The resolved URL
*/
export const resolveUrl = (arg: Parameters<typeof fetch>[0]): URL =>
// eslint-disable-next-line n/no-unsupported-features/node-builtins
new URL(arg instanceof Request ? arg.url : arg);

/**
Expand Down
3 changes: 1 addition & 2 deletions packages/kernel-platforms/test/utils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,9 @@ export const superstructValidationError = /At path: .* -- Expected/u;
*
* @returns A mock Response object
*/
// eslint-disable-next-line n/no-unsupported-features/node-builtins

export const createMockResponse = (): Response =>
({
status: 200,
text: async () => Promise.resolve('test'),
// eslint-disable-next-line n/no-unsupported-features/node-builtins
}) as Response;
263 changes: 263 additions & 0 deletions packages/kernel-test/src/io.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,263 @@
import { makeSQLKernelDatabase } from '@metamask/kernel-store/sqlite/nodejs';
import { waitUntilQuiescent } from '@metamask/kernel-utils';
import { Kernel } from '@metamask/ocap-kernel';
import type { IOChannel, IOConfig } from '@metamask/ocap-kernel';
import * as net from 'node:net';
import * as os from 'node:os';
import * as path from 'node:path';
import { describe, it, expect, afterEach } from 'vitest';

import { getBundleSpec, makeTestLogger } from './utils.ts';

function tempSocketPath(): string {
return path.join(
os.tmpdir(),
`io-int-${Date.now()}-${Math.random().toString(36).slice(2)}.sock`,
);
}

async function connectToSocket(socketPath: string): Promise<net.Socket> {
return new Promise((resolve, reject) => {
const client = net.createConnection(socketPath, () => {
client.removeListener('error', reject);
resolve(client);
});
client.on('error', reject);
});
}

async function writeLine(socket: net.Socket, line: string): Promise<void> {
return new Promise((resolve, reject) => {
socket.write(`${line}\n`, (error) => {
if (error) {
reject(error);
} else {
resolve();
}
});
});
}

async function readLine(socket: net.Socket): Promise<string> {
return new Promise((resolve) => {
let buffer = '';
const onData = (data: Buffer): void => {
buffer += data.toString();
const idx = buffer.indexOf('\n');
if (idx !== -1) {
socket.removeListener('data', onData);
resolve(buffer.slice(0, idx));
}
};
socket.on('data', onData);
});
}

async function makeTestSocketChannel(
_name: string,
socketPath: string,
): Promise<IOChannel> {
const fsPromises = await import('node:fs/promises');
const lineQueue: string[] = [];
const readerQueue: { resolve: (value: string | null) => void }[] = [];
let currentSocket: net.Socket | null = null;
let lineBuffer = '';
let closed = false;

function deliverLine(line: string): void {
const reader = readerQueue.shift();
if (reader) {
reader.resolve(line);
} else {
lineQueue.push(line);
}
}

function deliverEOF(): void {
while (readerQueue.length > 0) {
readerQueue.shift()?.resolve(null);
}
}

const server = net.createServer((socket) => {
if (currentSocket) {
socket.destroy();
return;
}
currentSocket = socket;
lineBuffer = '';
socket.on('data', (data: Buffer) => {
lineBuffer += data.toString();
let idx = lineBuffer.indexOf('\n');
while (idx !== -1) {
deliverLine(lineBuffer.slice(0, idx));
lineBuffer = lineBuffer.slice(idx + 1);
idx = lineBuffer.indexOf('\n');
}
});
socket.on('end', () => {
if (lineBuffer.length > 0) {
deliverLine(lineBuffer);
lineBuffer = '';
}
currentSocket = null;
deliverEOF();
});
socket.on('error', () => {
currentSocket = null;
deliverEOF();
});
});

try {
await fsPromises.unlink(socketPath);
} catch {
// ignore
}

await new Promise<void>((resolve, reject) => {
server.on('error', reject);
server.listen(socketPath, () => {
server.removeListener('error', reject);
resolve();
});
});

return {
async read() {
if (closed) {
return null;
}
const queued = lineQueue.shift();
if (queued !== undefined) {
return queued;
}
if (!currentSocket) {
return null;
}
return new Promise<string | null>((resolve) => {
readerQueue.push({ resolve });
});
},
async write(data: string) {
if (!currentSocket) {
throw new Error('no connected client');
}
const socket = currentSocket;
return new Promise<void>((resolve, reject) => {
socket.write(`${data}\n`, (error) => {
if (error) {
reject(error);
} else {
resolve();
}
});
});
},
async close() {
if (closed) {
return;
}
closed = true;
deliverEOF();
currentSocket?.destroy();
currentSocket = null;
await new Promise<void>((resolve) => {
server.close(() => resolve());
});
try {
await fsPromises.unlink(socketPath);
} catch {
// ignore
}
},
};
}

describe('IO kernel service', () => {
const clients: net.Socket[] = [];

afterEach(async () => {
for (const client of clients) {
client.destroy();
}
clients.length = 0;
});

it('reads and writes through an IO channel', async () => {
const socketPath = tempSocketPath();
const kernelDatabase = await makeSQLKernelDatabase({
dbFilename: ':memory:',
});
const { logger } = makeTestLogger();

const { NodejsPlatformServices } = await import('@ocap/nodejs');
const kernel = await Kernel.make(
new NodejsPlatformServices({
logger: logger.subLogger({ tags: ['platform'] }),
}),
kernelDatabase,
{
resetStorage: true,
logger,
ioChannelFactory: async (name: string, config: IOConfig) => {
if (config.type !== 'socket') {
throw new Error(`unsupported: ${config.type}`);
}
return makeTestSocketChannel(name, config.path);
},
},
);

const config = {
bootstrap: 'io',
forceReset: true,
io: {
repl: {
type: 'socket' as const,
path: socketPath,
},
},
services: ['repl'],
vats: {
io: {
bundleSpec: getBundleSpec('io-vat'),
parameters: { name: 'io' },
},
},
};

const { rootKref } = await kernel.launchSubcluster(config);
await waitUntilQuiescent();

// Connect to the socket
const client = await connectToSocket(socketPath);
clients.push(client);

// Small delay for connection setup
await new Promise((resolve) => setTimeout(resolve, 20));

// Send a line from the test to the vat
await writeLine(client, 'hello from test');

// Trigger the vat to read and verify it received the data
await kernel.queueMessage(rootKref, 'doRead', []);
await waitUntilQuiescent(100);

const bufferResult = await kernel.queueMessage(
rootKref,
'getReadBuffer',
[],
);
await waitUntilQuiescent(100);
expect(bufferResult.body).toContain('hello from test');

// Trigger the vat to write
const linePromise = readLine(client);
await kernel.queueMessage(rootKref, 'doWrite', ['hello from vat']);
await waitUntilQuiescent(100);

const received = await linePromise;
expect(received).toBe('hello from vat');
});
});
Loading
Loading