Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 47 additions & 30 deletions yarn-project/end-to-end/src/test-wallet/wallet_worker_script.ts
Original file line number Diff line number Diff line change
@@ -1,43 +1,60 @@
import { createAztecNodeClient } from '@aztec/aztec.js/node';
import type { SendOptions } from '@aztec/aztec.js/wallet';
import { jsonStringify } from '@aztec/foundation/json-rpc';
import type { ApiSchema } from '@aztec/foundation/schemas';
import { createLogger } from '@aztec/foundation/log';
import type { ApiSchema, Fr } from '@aztec/foundation/schemas';
import { parseWithOptionals, schemaHasMethod } from '@aztec/foundation/schemas';
import { NodeListener, TransportServer } from '@aztec/foundation/transport';
import { ExecutionPayload, Tx } from '@aztec/stdlib/tx';

import { workerData } from 'worker_threads';

import { TestWallet } from './test_wallet.js';
import { WorkerWalletSchema } from './worker_wallet_schema.js';

const { nodeUrl, pxeConfig } = workerData as { nodeUrl: string; pxeConfig?: Record<string, unknown> };
const logger = createLogger('e2e:test-wallet:worker');

const node = createAztecNodeClient(nodeUrl);
const wallet = await TestWallet.create(node, pxeConfig);
try {
const { nodeUrl, pxeConfig } = workerData as { nodeUrl: string; pxeConfig?: Record<string, unknown> };

/** Handlers for methods that need custom implementation (not direct wallet passthrough). */
const handlers: Record<string, (...args: any[]) => Promise<any>> = {
proveTx: async (exec, opts) => {
const provenTx = await wallet.proveTx(exec, opts);
// ProvenTx has non-serializable fields (node proxy, etc.) — extract only Tx-compatible fields
const { data, chonkProof, contractClassLogFields, publicFunctionCalldata } = provenTx;
return { data, chonkProof, contractClassLogFields, publicFunctionCalldata };
},
registerAccount: async (secret, salt) => {
const manager = await wallet.createSchnorrAccount(secret, salt);
return manager.address;
},
};
logger.info('Initializing worker wallet', { nodeUrl });
const node = createAztecNodeClient(nodeUrl);
const wallet = await TestWallet.create(node, pxeConfig);
logger.info('Worker wallet initialized');

const schema = WorkerWalletSchema as ApiSchema;
const listener = new NodeListener();
const server = new TransportServer<{ fn: string; args: string }>(listener, async msg => {
if (!schemaHasMethod(schema, msg.fn)) {
throw new Error(`Unknown method: ${msg.fn}`);
}
const jsonParams = JSON.parse(msg.args) as unknown[];
const args = await parseWithOptionals(jsonParams, schema[msg.fn].parameters());
const handler = handlers[msg.fn];
const result = handler ? await handler(...args) : await (wallet as any)[msg.fn](...args);
return jsonStringify(result);
});
server.start();
const customMethods = {
proveTx: async (exec: ExecutionPayload, opts: Omit<SendOptions, 'wait'>) => {
const provenTx = await wallet.proveTx(exec, opts);
return new Tx(
provenTx.getTxHash(),
provenTx.data,
provenTx.chonkProof,
provenTx.contractClassLogFields,
provenTx.publicFunctionCalldata,
);
},
registerAccount: async (secret: Fr, salt: Fr) => {
const manager = await wallet.createSchnorrAccount(secret, salt);
return manager.address;
},
};

const schema = WorkerWalletSchema as ApiSchema;
const listener = new NodeListener();
const server = new TransportServer<{ fn: string; args: string }>(listener, async msg => {
if (!schemaHasMethod(schema, msg.fn)) {
throw new Error(`Unknown method: ${msg.fn}`);
}
const jsonParams = JSON.parse(msg.args) as unknown[];
const args: any[] = await parseWithOptionals(jsonParams, schema[msg.fn].parameters());
// we have to erase the fn type in order to be able to spread ...args
const handler: ((...args: any[]) => Promise<any>) | undefined =
msg.fn in customMethods ? customMethods[msg.fn as keyof typeof customMethods] : undefined;
const result = handler ? await handler(...args) : await (wallet as any)[msg.fn](...args);
return jsonStringify(result);
});
server.start();
} catch (err: unknown) {
logger.error('Worker wallet initialization failed', { error: err instanceof Error ? err.stack : String(err) });
process.exit(1);
}
54 changes: 51 additions & 3 deletions yarn-project/end-to-end/src/test-wallet/worker_wallet.ts
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,10 @@ import type {
import type { ChainInfo } from '@aztec/entrypoints/interfaces';
import type { Fr } from '@aztec/foundation/curves/bn254';
import { jsonStringify } from '@aztec/foundation/json-rpc';
import { createLogger } from '@aztec/foundation/log';
import { promiseWithResolvers } from '@aztec/foundation/promise';
import type { ApiSchema } from '@aztec/foundation/schemas';
import { sleep } from '@aztec/foundation/sleep';
import { NodeConnector, TransportClient } from '@aztec/foundation/transport';
import type { PXEConfig } from '@aztec/pxe/config';
import type { ContractArtifact, EventMetadataDefinition, FunctionCall } from '@aztec/stdlib/abi';
Expand All @@ -35,6 +38,10 @@ import { WorkerWalletSchema } from './worker_wallet_schema.js';

type WorkerMsg = { fn: string; args: string };

const log = createLogger('e2e:test-wallet:worker-wallet');

const WORKER_READY_TIMEOUT_MS = 120_000;

/**
* Wallet implementation that offloads all work to a worker thread.
* Implements the Wallet interface by proxying calls over a transport layer
Expand All @@ -53,17 +60,58 @@ export class WorkerWallet implements Wallet {
* @returns A WorkerWallet ready to use.
*/
static async create(nodeUrl: string, pxeConfig?: Partial<PXEConfig>): Promise<WorkerWallet> {
const worker = new Worker(new URL('./wallet_worker_script.js', import.meta.url), {
// replace stc/ with dest/ so the wallet works in Jest tests
const workerUrl = new URL('./wallet_worker_script.js', import.meta.url);
workerUrl.pathname = workerUrl.pathname.replace('/src/', '/dest/');
// remove JEST_WORKER_ID so the worker uses pino-pretty transport instead of Jest's raw output.
const { JEST_WORKER_ID: _, ...parentEnv } = process.env;
const worker = new Worker(workerUrl, {
workerData: { nodeUrl, pxeConfig },
env: {
...parentEnv,
...(process.stderr.isTTY || process.env.FORCE_COLOR ? { FORCE_COLOR: '1' } : {}),
LOG_LEVEL: process.env.WORKER_LOG_LEVEL ?? 'warning',
},
});

const connector = new NodeConnector(worker);
const client = new TransportClient<WorkerMsg>(connector);
await client.open();

const wallet = new WorkerWallet(worker, client);
// Warmup / readiness check — blocks until the worker has finished creating the TestWallet.
await wallet.getChainInfo();

const { promise: workerDied, reject: rejectWorkerDied } = promiseWithResolvers<void>();
// reject if the worker exits or errors before the warmup completes.
const onError = (err: Error): void => {
worker.off('exit', onExit!);
rejectWorkerDied(new Error(`Worker wallet thread error: ${err.message}`));
};

const onExit = (code: number): void => {
worker.off('error', onError!);
rejectWorkerDied(new Error(`Worker wallet thread exited with code ${code} before becoming ready`));
};

worker.once('error', onError);
worker.once('exit', onExit);

const timeout = sleep(WORKER_READY_TIMEOUT_MS).then(() => {
throw new Error(`Worker wallet creation timed out after ${WORKER_READY_TIMEOUT_MS / 1000}s`);
});

try {
// wait for worker wallet to start
await Promise.race([wallet.getChainInfo(), workerDied, timeout]);
} catch (err) {
log.error('Worker wallet creation failed, cleaning up', { error: String(err) });
client.close();
await worker.terminate();
throw err;
} finally {
worker.off('error', onError);
worker.off('exit', onExit);
}

return wallet;
}

Expand Down
4 changes: 2 additions & 2 deletions yarn-project/foundation/src/transport/transport_client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,7 @@ export class TransportClient<Payload> extends EventEmitter {
}
const msgId = this.msgId++;
const msg = { msgId, payload };
log.debug(format(`->`, msg));
log.trace(format(`->`, msg));
return new Promise<any>((resolve, reject) => {
this.pendingRequests.push({ resolve, reject, msgId });
this.socket!.send(msg, transfer).catch(reject);
Expand All @@ -111,7 +111,7 @@ export class TransportClient<Payload> extends EventEmitter {
this.close();
return;
}
log.debug(format(`<-`, msg));
log.trace(format(`<-`, msg));
if (isEventMessage(msg)) {
this.emit('event_msg', msg.payload);
return;
Expand Down
Loading