diff --git a/apps/relay-docker/.env.example b/apps/relay-docker/.env.example new file mode 100644 index 0000000..22ff6df --- /dev/null +++ b/apps/relay-docker/.env.example @@ -0,0 +1,48 @@ +# Clawket Self-Hosted Relay — Environment Configuration +# Copy this file to .env and adjust the values for your deployment. + +# ---------- Ports ---------- +# Port for the Registry HTTP API +REGISTRY_PORT=3001 +# Port for the Relay WebSocket server +RELAY_PORT=3002 + +# ---------- URLs ---------- +# The public WebSocket URL clients will use to connect to the relay. +# Must be wss:// in production (behind nginx TLS termination). +RELAY_URL=wss://relay.example.com/ws + +# The public URL of the registry API. +# Used by the relay to verify tokens against the registry. +REGISTRY_URL=https://registry.example.com + +# ---------- Region Map (optional) ---------- +# Optional JSON map of region → public relay WebSocket URL for this same deployment. +# relay-docker is a single-process / single-instance deployment: do not point these URLs +# at different relay nodes unless you also add your own cross-node state coordination. +# Leave empty to use RELAY_URL for every region. +# Example: {"cn":"wss://relay-cn.example.com/ws","sg":"wss://relay-sg.example.com/ws"} +RELAY_REGION_MAP= + +# ---------- Persistence ---------- +# Path to SQLite database file for KV persistence. +# Leave empty for memory-only mode (data lost on restart). 
+# For Docker, mount a volume and set: /data/kv.sqlite +KV_PERSIST_PATH=/data/kv.sqlite + +# ---------- Pairing ---------- +# Access code TTL in seconds (default: 600 = 10 minutes) +PAIR_ACCESS_CODE_TTL_SEC=600 + +# Maximum client tokens per gateway (default: 8) +PAIR_CLIENT_TOKEN_MAX=8 + +# ---------- Rate Limiting ---------- +MAX_MESSAGES_PER_10S=120 +MAX_CLIENT_MESSAGES_PER_10S=300 + +# ---------- Timeouts ---------- +HEARTBEAT_INTERVAL_MS=30000 +AWAITING_CHALLENGE_TTL_MS=25000 +CLIENT_IDLE_TIMEOUT_MS=600000 +GATEWAY_OWNER_LEASE_MS=20000 diff --git a/apps/relay-docker/Dockerfile b/apps/relay-docker/Dockerfile new file mode 100644 index 0000000..07add11 --- /dev/null +++ b/apps/relay-docker/Dockerfile @@ -0,0 +1,57 @@ +# ---------- Build Stage ---------- +FROM node:22-alpine AS builder + +ARG NPM_REGISTRY=https://registry.npmjs.org/ + +WORKDIR /build + +# Copy root workspace files +COPY package.json package-lock.json ./ +COPY tsconfig.relay-base.json ./ + +# Copy shared package (source — used as workspace dependency) +COPY packages/relay-shared/ ./packages/relay-shared/ + +# Copy relay-docker app +COPY apps/relay-docker/ ./apps/relay-docker/ + +# Install ALL dependencies (including devDependencies for build) +# Use npm install so this image can still build when the monorepo lockfile changes outside relay-docker. +# --ignore-scripts: skip root postinstall (mobile-only setup not needed here) +RUN npm install --workspace=@clawket/relay-docker --workspace=@clawket/shared --include-workspace-root --ignore-scripts --registry=${NPM_REGISTRY} + +# Build relay-docker (tsc) +RUN npm run --workspace=@clawket/relay-docker build + +# Build the shared package runtime bundle for production consumption. 
+RUN npm run --workspace=@clawket/shared build + +# ---------- Production Stage ---------- +FROM node:22-alpine + +ARG NPM_REGISTRY=https://registry.npmjs.org/ + +WORKDIR /app + +# Copy compiled relay-docker output +COPY --from=builder /build/apps/relay-docker/dist/ ./dist/ +COPY --from=builder /build/apps/relay-docker/package.json ./ + +# Copy the shared package runtime artifacts prepared by its dedicated build script +COPY --from=builder /build/packages/relay-shared/dist/ ./node_modules/@clawket/shared/ + +# Install production dependencies only (ws, better-sqlite3) +RUN npm install --omit=dev --ignore-scripts --registry=${NPM_REGISTRY} \ + && npm rebuild better-sqlite3 + +# Create data directory for SQLite persistence +RUN mkdir -p /data + +EXPOSE 3001 3002 + +ENV NODE_ENV=production +ENV KV_PERSIST_PATH=/data/kv.sqlite + +VOLUME /data + +CMD ["node", "dist/server.js"] diff --git a/apps/relay-docker/README.md b/apps/relay-docker/README.md new file mode 100644 index 0000000..65e6a76 --- /dev/null +++ b/apps/relay-docker/README.md @@ -0,0 +1,116 @@ +# Clawket Self-Hosted Relay (Docker) + +`relay-docker` is the self-hosted Docker deployment of Clawket's relay stack. It runs the registry HTTP API and the relay WebSocket server inside one Node.js process, with one shared in-process KV layer and optional SQLite persistence. + +## What This Deployment Supports + +- One logical deployment unit: one registry + one relay process. +- Optional multiple public relay URLs via `RELAY_REGION_MAP`, but every URL must still route to this same deployment. +- Persistent pairing records when `KV_PERSIST_PATH` points to a writable SQLite file. + +It does not provide cross-node coordination, distributed room ownership, or multi-instance token propagation. + +## Quick Start + +### 1. Configure Environment + +```bash +cp .env.example .env +# Edit .env with your domains and persistence path +``` + +### 2. 
Build and Run + +```bash +docker compose up -d --build + +docker compose logs -f + +curl http://localhost:3001/v1/health +curl http://localhost:3002/v1/health +``` + +The checked-in `docker-compose.yml` builds from the local repository by default so the container matches the code in this workspace. + +### 3. Configure nginx + +Copy `nginx.conf.example` and replace `registry.example.com` / `relay.example.com` with your real domains. The relay domain only exposes: + +- `GET /v1/health` +- `WS /ws` + +## Architecture + +``` +┌─────────────────────────────────────────────┐ +│ Single Node.js Process │ +│ │ +│ Registry HTTP API (:3001) │ +│ ├── POST /v1/pair/register │ +│ ├── POST /v1/pair/access-code │ +│ ├── POST /v1/pair/claim │ +│ ├── GET /v1/verify/:gatewayId │ +│ └── GET /v1/health │ +│ │ +│ Relay WebSocket Server (:3002) │ +│ ├── WS /ws │ +│ └── GET /v1/health │ +│ │ +│ Shared MemoryKV (+ optional SQLite) │ +│ Room Manager (one room per gatewayId) │ +└─────────────────────────────────────────────┘ +``` + +## Security Notes + +- The relay no longer exposes an HTTP token-sync endpoint. Registry and relay share the same in-process KV state directly. +- Registry JSON request bodies are hard-limited because the API only accepts small pairing payloads. +- If `KV_PERSIST_PATH` is set but SQLite cannot open or write the database, startup or writes fail immediately instead of silently falling back to memory-only behavior. + +## Environment Variables + +See [`.env.example`](.env.example) for the full list. 
+ +Key variables: + +| Variable | Default | Description | +|---|---|---| +| `REGISTRY_PORT` | `3001` | Registry API port | +| `RELAY_PORT` | `3002` | Relay WebSocket port | +| `RELAY_URL` | `ws://localhost:3002/ws` | Public relay WebSocket URL | +| `RELAY_REGION_MAP` | _(empty)_ | Optional region → public WebSocket URL map for this same deployment | +| `KV_PERSIST_PATH` | _(empty)_ | Writable SQLite path for persistent pairing records | + +## Differences from Cloudflare Deployment + +| Feature | Cloudflare | Docker | +|---|---|---| +| KV Storage | Cloudflare KV | In-process memory + optional SQLite | +| Room State | Durable Objects | In-memory per process | +| WebSocket | Hibernatable WS | `ws` library | +| Scaling | Edge / multi-region platform primitives | Single deployment unit | +| Region Detection | `request.cf.country` | `X-Real-Country` header | +| Heartbeat | DO alarms | `setTimeout` alarms | + +## Persistence + +With `KV_PERSIST_PATH` set: + +- Pairing records survive process restarts. +- Active WebSocket connections are lost on restart and clients must reconnect. +- Startup fails if the SQLite file cannot be opened. + +Without `KV_PERSIST_PATH`: + +- The service runs in memory-only mode. +- All pairing state is lost on restart. 
+ +## Development + +```bash +npm install + +npm run --workspace=@clawket/relay-docker dev +npm run --workspace=@clawket/relay-docker typecheck +npm run --workspace=@clawket/relay-docker build +``` diff --git a/apps/relay-docker/README.zh-CN.md b/apps/relay-docker/README.zh-CN.md new file mode 100644 index 0000000..bee3d8c --- /dev/null +++ b/apps/relay-docker/README.zh-CN.md @@ -0,0 +1,116 @@ +# Clawket 自托管 Relay(Docker) + +`relay-docker` 是 Clawket relay 栈的 Docker 自托管部署方式。它把 registry HTTP API 和 relay WebSocket 服务放进同一个 Node.js 进程里运行,共享一套进程内 KV,并可选用 SQLite 做持久化。 + +## 这套部署实际支持什么 + +- 一个逻辑部署单元:一个 registry + 一个 relay 进程。 +- 可以通过 `RELAY_REGION_MAP` 暴露多个公网 relay URL,但这些 URL 最终都必须回到同一套部署。 +- 当 `KV_PERSIST_PATH` 指向可写的 SQLite 文件时,配对记录可持久化。 + +它不提供跨节点协调、分布式房间所有权,也不提供多实例之间的 token 传播。 + +## 快速开始 + +### 1. 配置环境变量 + +```bash +cp .env.example .env +# 按你的域名和持久化路径修改 .env +``` + +### 2. 构建并运行 + +```bash +docker compose up -d --build + +docker compose logs -f + +curl http://localhost:3001/v1/health +curl http://localhost:3002/v1/health +``` + +仓库里的 `docker-compose.yml` 默认会直接从当前代码构建镜像,保证容器运行的就是你正在审阅和修改的这份代码。 + +### 3. 
配置 nginx + +复制 `nginx.conf.example`,把 `registry.example.com` 和 `relay.example.com` 替换成你的真实域名。relay 域名只暴露: + +- `GET /v1/health` +- `WS /ws` + +## 架构 + +``` +┌─────────────────────────────────────────────┐ +│ Single Node.js Process │ +│ │ +│ Registry HTTP API (:3001) │ +│ ├── POST /v1/pair/register │ +│ ├── POST /v1/pair/access-code │ +│ ├── POST /v1/pair/claim │ +│ ├── GET /v1/verify/:gatewayId │ +│ └── GET /v1/health │ +│ │ +│ Relay WebSocket Server (:3002) │ +│ ├── WS /ws │ +│ └── GET /v1/health │ +│ │ +│ Shared MemoryKV (+ optional SQLite) │ +│ Room Manager (one room per gatewayId) │ +└─────────────────────────────────────────────┘ +``` + +## 安全说明 + +- relay 不再暴露 HTTP token 同步接口。registry 和 relay 直接共享同一份进程内 KV 状态。 +- registry 的 JSON 请求体做了硬限制,因为它只接受很小的配对请求。 +- 如果设置了 `KV_PERSIST_PATH`,但 SQLite 不能打开或写入,服务会立即启动失败或请求失败,而不是悄悄退化成纯内存模式。 + +## 环境变量 + +完整说明见 [`.env.example`](.env.example)。 + +关键变量: + +| 变量 | 默认值 | 说明 | +|---|---|---| +| `REGISTRY_PORT` | `3001` | Registry API 端口 | +| `RELAY_PORT` | `3002` | Relay WebSocket 端口 | +| `RELAY_URL` | `ws://localhost:3002/ws` | 对外提供的 relay WebSocket URL | +| `RELAY_REGION_MAP` | _(空)_ | 当前这套部署的可选 region → 公网 WebSocket URL 映射 | +| `KV_PERSIST_PATH` | _(空)_ | 持久化配对记录的可写 SQLite 路径 | + +## 与 Cloudflare 部署的差异 + +| 功能 | Cloudflare | Docker | +|---|---|---| +| KV 存储 | Cloudflare KV | 进程内内存 + 可选 SQLite | +| 房间状态 | Durable Objects | 单进程内存 | +| WebSocket | Hibernatable WS | `ws` 库 | +| 扩展方式 | 边缘平台 / 多区域原语 | 单部署单元 | +| Region 判定 | `request.cf.country` | `X-Real-Country` 请求头 | +| 心跳 | DO alarms | `setTimeout` alarm | + +## 持久化 + +设置 `KV_PERSIST_PATH` 时: + +- 配对记录可以跨进程重启保留。 +- 活跃 WebSocket 连接会在重启时断开,客户端需要重连。 +- 如果 SQLite 文件无法打开,启动会直接失败。 + +不设置 `KV_PERSIST_PATH` 时: + +- 服务以纯内存模式运行。 +- 进程重启后所有配对状态都会丢失。 + +## 开发 + +```bash +npm install + +npm run --workspace=@clawket/relay-docker dev +npm run --workspace=@clawket/relay-docker typecheck +npm run --workspace=@clawket/relay-docker build +``` diff --git a/apps/relay-docker/docker-compose.yml 
b/apps/relay-docker/docker-compose.yml new file mode 100644 index 0000000..bceb941 --- /dev/null +++ b/apps/relay-docker/docker-compose.yml @@ -0,0 +1,31 @@ +services: + clawket-relay: + build: + context: ../../ + dockerfile: apps/relay-docker/Dockerfile + # To use a published image instead of building locally, replace the build section with: + # image: weirdoyh/clawket-relay:latest + container_name: clawket-relay + restart: unless-stopped + ports: + - "127.0.0.1:3001:3001" + - "127.0.0.1:3002:3002" + volumes: + - clawket-data:/data + env_file: + - .env + healthcheck: + test: ["CMD", "wget", "--no-verbose", "--tries=1", "--spider", "http://localhost:3001/v1/health"] + interval: 30s + timeout: 5s + retries: 3 + start_period: 10s + logging: + driver: json-file + options: + max-size: "10m" + max-file: "3" + +volumes: + clawket-data: + driver: local diff --git a/apps/relay-docker/nginx.conf.example b/apps/relay-docker/nginx.conf.example new file mode 100644 index 0000000..4cba9fc --- /dev/null +++ b/apps/relay-docker/nginx.conf.example @@ -0,0 +1,70 @@ +# Clawket Self-Hosted Relay — nginx Reverse Proxy Example +# +# This configuration assumes: +# - Registry runs on localhost:3001 +# - Relay WebSocket server runs on localhost:3002 +# - You have SSL certificates configured +# - nginx handles TLS termination +# +# Place this in /etc/nginx/sites-available/ and symlink to sites-enabled/ + +# Registry API +server { + listen 443 ssl http2; + server_name registry.example.com; + + ssl_certificate /etc/ssl/certs/registry.example.com.pem; + ssl_certificate_key /etc/ssl/private/registry.example.com.key; + client_max_body_size 16k; + + location / { + proxy_pass http://127.0.0.1:3001; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + + # Optional: pass GeoIP country for region-based relay selection + # Requires nginx geoip2 module + # proxy_set_header X-Real-Country $geoip2_data_country_code; + } +} + +# Relay WebSocket 
+server { + listen 443 ssl http2; + server_name relay.example.com; + + ssl_certificate /etc/ssl/certs/relay.example.com.pem; + ssl_certificate_key /etc/ssl/private/relay.example.com.key; + + # WebSocket endpoint + location /ws { + proxy_pass http://127.0.0.1:3002; + proxy_http_version 1.1; + proxy_set_header Upgrade $http_upgrade; + proxy_set_header Connection "upgrade"; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + + # Keep WebSocket connections alive for long sessions + proxy_read_timeout 3600s; + proxy_send_timeout 3600s; + } + + # Relay health check + location = /v1/health { + proxy_pass http://127.0.0.1:3002/v1/health; + proxy_set_header Host $host; + proxy_set_header X-Real-IP $remote_addr; + proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for; + proxy_set_header X-Forwarded-Proto $scheme; + } + + # Everything else on the relay domain stays closed. + location / { + return 404; + } +} diff --git a/apps/relay-docker/package.json b/apps/relay-docker/package.json new file mode 100644 index 0000000..80860c9 --- /dev/null +++ b/apps/relay-docker/package.json @@ -0,0 +1,27 @@ +{ + "name": "@clawket/relay-docker", + "version": "0.1.0", + "private": true, + "type": "module", + "license": "AGPL-3.0-only", + "dependencies": { + "@clawket/shared": "0.1.0", + "better-sqlite3": "^11.9.1", + "ws": "^8.18.1" + }, + "devDependencies": { + "@types/better-sqlite3": "^7.6.13", + "@types/ws": "^8.18.1", + "tsx": "^4.19.4", + "typescript": "^5.8.2" + }, + "scripts": { + "dev": "tsx watch src/server.ts", + "build": "tsc -p tsconfig.build.json", + "start": "node dist/server.js", + "typecheck": "tsc -p tsconfig.json --noEmit", + "test": "vitest run", + "docker:build": "cd ../.. 
&& docker build -f apps/relay-docker/Dockerfile -t clawket-relay .", + "docker:run": "docker run -p 3001:3001 -p 3002:3002 -v clawket-data:/data clawket-relay" + } +} diff --git a/apps/relay-docker/src/cf-shim.ts b/apps/relay-docker/src/cf-shim.ts new file mode 100644 index 0000000..fc9a99b --- /dev/null +++ b/apps/relay-docker/src/cf-shim.ts @@ -0,0 +1,118 @@ +/** + * cf-shim.ts — Shim layer providing Cloudflare-compatible interfaces + * backed by Node.js primitives. This allows relay modules to run with + * minimal source changes. + */ + +import type { WebSocket as WsWebSocket } from 'ws'; + +// ---------- WebSocket Attachment Shim ---------- +// CF Durable Objects support ws.serializeAttachment / ws.deserializeAttachment. +// We shim this with a WeakMap. + +const wsAttachments = new WeakMap(); + +export function serializeAttachment(ws: WsWebSocket, data: unknown): void { + wsAttachments.set(ws, structuredClone(data)); +} + +export function deserializeAttachment(ws: WsWebSocket): unknown { + return wsAttachments.get(ws) ?? null; +} + +// ---------- Room Storage Shim ---------- +// Replaces DurableObjectState.storage for per-room persistent storage. 
/**
 * In-memory replacement for the per-room storage object.
 *
 * Values live in a plain `Map` for the lifetime of the instance; nothing is
 * written to disk. A single alarm is emulated with one `setTimeout` handle.
 */
export class RoomStorage {
  private readonly data = new Map<string, unknown>();
  private alarmCallback: (() => Promise<void>) | null = null;
  private alarmTimer: ReturnType<typeof setTimeout> | null = null;

  /**
   * Returns the value stored under `key`, or `undefined` when absent.
   * The stored object itself is returned (it is cloned on `put`, not on read).
   */
  async get<T>(key: string): Promise<T | undefined> {
    return this.data.get(key) as T | undefined;
  }

  /** Stores a structured clone of `value`, so later caller mutations do not leak in. */
  async put(key: string, value: unknown): Promise<void> {
    this.data.set(key, structuredClone(value));
  }

  /** Removes `key`; resolves to `true` when an entry existed. */
  async delete(key: string): Promise<boolean> {
    return this.data.delete(key);
  }

  /** Registers the function invoked when the alarm fires. Replaces any previous handler. */
  setAlarmHandler(handler: () => Promise<void>): void {
    this.alarmCallback = handler;
  }

  /**
   * Schedules the alarm for `scheduledTime` (epoch milliseconds), replacing any
   * pending alarm. Times in the past are clamped to a zero delay.
   * Errors thrown by the handler are logged and not propagated.
   */
  async setAlarm(scheduledTime: number): Promise<void> {
    if (this.alarmTimer) {
      clearTimeout(this.alarmTimer);
    }
    const delay = Math.max(0, scheduledTime - Date.now());
    this.alarmTimer = setTimeout(async () => {
      // Clear the handle first so the handler may schedule the next alarm.
      this.alarmTimer = null;
      if (this.alarmCallback) {
        try {
          await this.alarmCallback();
        } catch (err) {
          console.error('[room-storage] Alarm callback failed:', err);
        }
      }
    }, delay);
  }

  /** Cancels the pending alarm, if any. */
  async deleteAlarm(): Promise<void> {
    if (this.alarmTimer) {
      clearTimeout(this.alarmTimer);
      this.alarmTimer = null;
    }
  }

  /** Cancels the pending alarm and drops every stored value. */
  destroy(): void {
    if (this.alarmTimer) {
      clearTimeout(this.alarmTimer);
      this.alarmTimer = null;
    }
    this.data.clear();
  }
}

// ---------- Room State Shim ----------
// Replaces DurableObjectState for per-room WebSocket tracking + storage.

/**
 * Per-room state holder: one `RoomStorage` plus the set of sockets that were
 * accepted into the room.
 */
export class RoomState {
  readonly storage: RoomStorage;
  private readonly sockets = new Set<WsWebSocket>();
  private readonly _id: string;

  constructor(id: string) {
    this._id = id;
    this.storage = new RoomStorage();
  }

  /** Identifier wrapper exposing `toString()`, which yields the id passed to the constructor. */
  get id(): { toString(): string } {
    return { toString: () => this._id };
  }

  /** Starts tracking `ws` as a member of this room. */
  acceptWebSocket(ws: WsWebSocket): void {
    this.sockets.add(ws);
  }

  /** Stops tracking `ws`; a socket that was never accepted is ignored. */
  removeWebSocket(ws: WsWebSocket): void {
    this.sockets.delete(ws);
  }

  /** Returns a snapshot array of the currently tracked sockets. */
  getWebSockets(): WsWebSocket[] {
    return Array.from(this.sockets);
  }

  /**
   * Awaits `fn` and resolves to its result. No locking is performed:
   * in single-threaded Node.js the callback is simply awaited.
   */
  async blockConcurrencyWhile<T>(fn: () => Promise<T>): Promise<T> {
    return await fn();
  }

  /** Tears down the storage (including its alarm) and forgets all sockets. */
  destroy(): void {
    this.storage.destroy();
    this.sockets.clear();
  }
}
diff --git a/apps/relay-docker/src/globals.d.ts b/apps/relay-docker/src/globals.d.ts new file mode 100644 index 0000000..ae8fc42 --- /dev/null +++ b/apps/relay-docker/src/globals.d.ts @@ -0,0 +1,38 @@ +/** + * globals.d.ts — Type declarations for Node.js 22 globals + * that @types/node may not expose depending on version/resolution. + */ + +/* global fetch — available in Node.js 18+ */ +declare function fetch(input: string | URL, init?: { + method?: string; + headers?: Record | Headers; + body?: string | Buffer | ReadableStream | null; + signal?: AbortSignal; +}): Promise; + +interface Response { + readonly ok: boolean; + readonly status: number; + readonly statusText: string; + readonly headers: Headers; + json(): Promise; + text(): Promise; + arrayBuffer(): Promise; +} + +interface Headers { + get(name: string): string | null; + has(name: string): boolean; + set(name: string, value: string): void; + append(name: string, value: string): void; + delete(name: string): void; + forEach(callback: (value: string, key: string) => void): void; +} + +/* Request — used only as a cast target for resolveRelayAuthToken shim */ +interface Request { + readonly headers: Headers; + readonly method: string; + readonly url: string; +} diff --git a/apps/relay-docker/src/kv-store.ts b/apps/relay-docker/src/kv-store.ts new file mode 100644 index 0000000..39661cc --- /dev/null +++ b/apps/relay-docker/src/kv-store.ts @@ -0,0 +1,128 @@ +/** + * kv-store.ts — In-memory KV store with optional SQLite persistence. + * + * Replaces Cloudflare KV Namespace for self-hosted Docker deployment. + * Supports TTL expiration and periodic garbage collection. 
+ */

import type Database from 'better-sqlite3';
import { createRequire } from 'module';

/** One stored value together with its optional absolute expiry. */
export interface KVEntry {
  value: string;
  expiresAt: number | null; // epoch ms, null = no expiry
}

/**
 * String key/value store held in memory, optionally mirrored to a SQLite file.
 *
 * When a non-blank `sqlitePath` is given, existing rows are loaded at
 * construction time and every `put`/`delete` is written through to SQLite.
 * Without a path the store is memory-only. Expired entries are dropped lazily
 * on read and by a sweep that runs every 60 seconds until `close()` is called.
 */
export class MemoryKV {
  private readonly store = new Map<string, KVEntry>();
  private readonly gcTimer: ReturnType<typeof setInterval>;
  private db: Database.Database | null = null;

  /**
   * @param sqlitePath Optional SQLite file path; blank or missing means memory-only.
   * @throws Error when a path is given but the database cannot be initialised.
   */
  constructor(sqlitePath?: string) {
    const normalizedPath = sqlitePath?.trim() || null;
    if (normalizedPath) {
      this.initSqlite(normalizedPath);
    }
    // GC every 60 seconds. Started only after SQLite init succeeded, so a
    // failed construction leaves no timer behind.
    this.gcTimer = setInterval(() => this.gc(), 60_000);
  }

  /**
   * Opens the database, ensures the `kv` table exists, purges expired rows and
   * loads the remaining rows into memory.
   *
   * @throws Error wrapping the underlying failure message; the handle opened
   *   during the failed attempt is closed before the error is thrown.
   */
  private initSqlite(path: string): void {
    let db: Database.Database | null = null;
    try {
      // Dynamic import workaround — better-sqlite3 is CommonJS
      // eslint-disable-next-line @typescript-eslint/no-require-imports
      const require = createRequire(import.meta.url);
      const BetterSqlite3 = require('better-sqlite3') as new (filename: string) => Database.Database;
      db = new BetterSqlite3(path);
      db.pragma('journal_mode = WAL');
      db.exec(`
        CREATE TABLE IF NOT EXISTS kv (
          key TEXT PRIMARY KEY,
          value TEXT NOT NULL,
          expires_at INTEGER
        )
      `);
      db.prepare('DELETE FROM kv WHERE expires_at IS NOT NULL AND expires_at <= ?').run(Date.now());
      // Load existing entries into memory
      const rows = db.prepare('SELECT key, value, expires_at FROM kv').all() as Array<{
        key: string;
        value: string;
        expires_at: number | null;
      }>;
      const now = Date.now();
      for (const row of rows) {
        // Rows may have expired between the purge above and this read.
        if (row.expires_at && row.expires_at <= now) continue;
        this.store.set(row.key, {
          value: row.value,
          expiresAt: row.expires_at,
        });
      }
      this.db = db;
      console.log(`[kv-store] Loaded ${this.store.size} entries from ${path}`);
    } catch (err) {
      this.db = null;
      // Release the file handle if the open succeeded but a later step failed.
      if (db) {
        try { db.close(); } catch { /* best effort */ }
      }
      const message = err instanceof Error ? err.message : String(err);
      throw new Error(`[kv-store] SQLite init failed for ${path}: ${message}`);
    }
  }

  /**
   * Returns the value for `key`, or `null` when it is missing or expired.
   * An expired entry is removed on the way out; a SQLite failure during that
   * cleanup is only logged.
   */
  async get(key: string): Promise<string | null> {
    const entry = this.store.get(key);
    if (!entry) return null;
    if (entry.expiresAt && entry.expiresAt <= Date.now()) {
      this.store.delete(key);
      this.deleteSqlite(key, false);
      return null;
    }
    return entry.value;
  }

  /**
   * Stores `value` under `key`. `options.expirationTtl` is a lifetime in
   * seconds; when it is missing or falsy the entry never expires.
   * SQLite is written first, so a persistence failure throws before the
   * in-memory map is touched.
   */
  async put(key: string, value: string, options?: { expirationTtl?: number }): Promise<void> {
    const expiresAt = options?.expirationTtl
      ? Date.now() + options.expirationTtl * 1000
      : null;
    this.putSqlite(key, value, expiresAt);
    this.store.set(key, { value, expiresAt });
  }

  /** Deletes `key` from SQLite (failures throw) and then from memory. */
  async delete(key: string): Promise<void> {
    this.deleteSqlite(key, true);
    this.store.delete(key);
  }

  /** Upserts one row; a no-op in memory-only mode. Statement errors propagate. */
  private putSqlite(key: string, value: string, expiresAt: number | null): void {
    if (!this.db) return;
    this.db.prepare(
      'INSERT OR REPLACE INTO kv (key, value, expires_at) VALUES (?, ?, ?)',
    ).run(key, value, expiresAt);
  }

  /**
   * Deletes one row; a no-op in memory-only mode.
   *
   * @param strict When `true` a failure is rethrown as an Error; when `false`
   *   it is logged with `console.warn` and swallowed.
   */
  private deleteSqlite(key: string, strict: boolean): void {
    if (!this.db) return;
    try {
      this.db.prepare('DELETE FROM kv WHERE key = ?').run(key);
    } catch (err) {
      if (strict) {
        const message = err instanceof Error ? err.message : String(err);
        throw new Error(`[kv-store] SQLite delete failed for key ${key}: ${message}`);
      }
      console.warn(`[kv-store] SQLite cleanup failed for key ${key}:`, err);
    }
  }

  /** Sweeps every expired entry out of memory and, best effort, out of SQLite. */
  private gc(): void {
    const now = Date.now();
    for (const [key, entry] of this.store.entries()) {
      if (entry.expiresAt && entry.expiresAt <= now) {
        this.store.delete(key);
        this.deleteSqlite(key, false);
      }
    }
  }

  /** Stops the periodic sweep and closes the database handle if one is open. */
  close(): void {
    clearInterval(this.gcTimer);
    if (this.db) {
      try { this.db.close(); } catch { /* best effort */ }
    }
  }
}
diff --git a/apps/relay-docker/src/registry.ts b/apps/relay-docker/src/registry.ts new file mode 100644 index 0000000..c9e9017 --- /dev/null +++ b/apps/relay-docker/src/registry.ts @@ -0,0 +1,589 @@ +/** + * registry.ts — Registry HTTP API server for Docker deployment. + * + * Ported from apps/relay-registry/src/index.ts. + * Replaces Cloudflare Worker with Node.js http server. + */ + +import { createServer, type IncomingMessage, type ServerResponse } from 'node:http'; +import { + normalizeRegion, + parsePositiveInt, + sha256Hex, + type PairAccessCodeRequest, + type PairAccessCodeResponse, + type PairClaimRequest, + type PairClaimResponse, + type PairRegisterRequest, + type PairRegisterResponse, +} from '@clawket/shared'; +import type { MemoryKV } from './kv-store.js'; + +interface RegistryConfig { + routesKv: MemoryKV; + relayRegionMap: string; + pairAccessCodeTtlSec?: string; + pairClientTokenMax?: string; + relayUrl: string; // The public WebSocket relay URL for this deployment +} + +type PairClientTokenRecord = { + hash: string; + label: string | null; + createdAt: string; + lastUsedAt: string | null; +}; + +type PairGatewayRecord = { + gatewayId: string; + relayUrl: string; + region: string; + displayName: string | null; + relaySecretHash: string; + accessCodeHash: string | null; + accessCodeExpiresAt: string | null; + clientTokens: PairClientTokenRecord[]; + createdAt: string; + updatedAt: string; +}; + +type 
PairGatewayLookupResult =
  | { ok: true; record: PairGatewayRecord | null }
  | { ok: false; gatewayId: string };

// Fallback access-code lifetime (seconds) used when the configured TTL is not a positive integer.
const ACCESS_CODE_TTL_FALLBACK_SEC = 10 * 60;
const PAIR_CLIENT_TOKEN_MAX_FALLBACK = 8;
const ACCESS_CODE_ALPHABET = 'ABCDEFGHJKMNPQRSTVWXYZ23456789';
const ACCESS_CODE_LENGTH = 6;
// Largest multiple of the alphabet length that is <= 256.
// NOTE(review): presumably the rejection-sampling bound for random bytes — confirm against generateAccessCode.
const ACCESS_CODE_RANDOM_LIMIT = Math.floor(256 / ACCESS_CODE_ALPHABET.length) * ACCESS_CODE_ALPHABET.length;
// Upper bound on accepted JSON request bodies (16 KiB).
const MAX_JSON_BODY_BYTES = 16 * 1024;

/**
 * Builds the registry HTTP server without starting it.
 *
 * Routes: `GET /v1/health`, `POST /v1/pair/register`,
 * `POST /v1/pair/access-code`, `POST /v1/pair/claim`,
 * `GET /v1/verify/:gatewayId`. `OPTIONS` on any path answers 204 with CORS
 * headers; anything else answers 404. Unexpected errors answer 500.
 *
 * @param config Registry configuration, including the shared KV store.
 * @param port   TCP port to listen on; also used as the base when parsing request URLs.
 * @returns `start()` begins listening; `close()` stops the server and resolves
 *   once it is closed (a server that was never started also resolves).
 */
export function createRegistryServer(
  config: RegistryConfig,
  port: number,
): { start: () => void; close: () => Promise<void> } {
  // Parses the request target; returns null instead of throwing on malformed input.
  const parseRequestUrl = (rawUrl: string | undefined): URL | null => {
    try {
      return new URL(rawUrl ?? '/', `http://localhost:${port}`);
    } catch {
      return null;
    }
  };

  // Percent-decodes one path segment; returns null on invalid escape sequences.
  const decodePathSegment = (segment: string): string | null => {
    try {
      return decodeURIComponent(segment);
    } catch {
      return null;
    }
  };

  const server = createServer(async (req, res) => {
    const startedAt = Date.now();

    // The listener is async, so anything thrown outside the try below would
    // surface as an unhandled rejection. Reject unparseable targets up front.
    const url = parseRequestUrl(req.url);
    if (!url) {
      sendJsonCors(res, 400, { error: { code: 'INVALID_URL', message: 'Malformed request URL' } });
      return;
    }

    // CORS preflight
    if (req.method === 'OPTIONS') {
      sendCors(res, 204);
      return;
    }

    // Sends the JSON response, then records telemetry for it.
    const respond = (status: number, body: unknown): void => {
      sendJsonCors(res, status, body);
      logTelemetry('http_request', req, url, status, Date.now() - startedAt);
    };

    try {
      if (req.method === 'GET' && url.pathname === '/v1/health') {
        const relayMap = readRelayMap(config);
        respond(200, { ok: true, regions: Object.keys(relayMap) });
        return;
      }

      // NOTE(review): the visible source shows no type argument on
      // readJsonBodyLimited; confirm whether the calls below should carry one.
      if (req.method === 'POST' && url.pathname === '/v1/pair/register') {
        const bodyResult = await readJsonBodyLimited(req, MAX_JSON_BODY_BYTES);
        if (!bodyResult.ok) {
          respond(bodyResult.status, bodyResult.body);
          return;
        }
        const result = await handlePairRegister(bodyResult.data, config, req);
        respond(result.status, result.body);
        return;
      }

      if (req.method === 'POST' && url.pathname === '/v1/pair/access-code') {
        const bodyResult = await readJsonBodyLimited(req, MAX_JSON_BODY_BYTES);
        if (!bodyResult.ok) {
          respond(bodyResult.status, bodyResult.body);
          return;
        }
        const result = await handlePairAccessCode(bodyResult.data, config);
        respond(result.status, result.body);
        return;
      }

      if (req.method === 'POST' && url.pathname === '/v1/pair/claim') {
        const bodyResult = await readJsonBodyLimited(req, MAX_JSON_BODY_BYTES);
        if (!bodyResult.ok) {
          respond(bodyResult.status, bodyResult.body);
          return;
        }
        const result = await handlePairClaim(bodyResult.data, config);
        respond(result.status, result.body);
        return;
      }

      if (req.method === 'GET' && url.pathname.startsWith('/v1/verify/')) {
        const gatewayId = decodePathSegment(url.pathname.slice('/v1/verify/'.length));
        if (gatewayId === null) {
          // Invalid percent-encoding is a client error, not a server fault.
          respond(400, { error: { code: 'INVALID_GATEWAY_ID', message: 'gatewayId is not valid percent-encoding' } });
          return;
        }
        const result = await handleVerify(req, config, gatewayId);
        respond(result.status, result.body);
        return;
      }

      respond(404, { error: { code: 'NOT_FOUND', message: 'Route not found' } });
    } catch (err) {
      console.error('[registry] Unhandled error:', err);
      // If the failure happened after a response was started, a second
      // response cannot be written; the error log above is all we can do.
      if (!res.headersSent) {
        respond(500, { error: { code: 'INTERNAL_ERROR', message: 'Internal server error' } });
      }
    }
  });

  return {
    start: () => {
      server.listen(port, () => {
        console.log(`[registry] Registry API listening on port ${port}`);
      });
    },
    close: () => new Promise<void>((resolve, reject) => {
      server.close((error) => {
        // Closing a server that is not listening is treated as success.
        if (error && (error as NodeJS.ErrnoException).code !== 'ERR_SERVER_NOT_RUNNING') {
          reject(error);
          return;
        }
        resolve();
      });
    }),
  };
}

// ---------- Handlers ----------

async function 
handlePairRegister( + body: PairRegisterRequest | null, + config: RegistryConfig, + req: IncomingMessage, +): Promise<{ status: number; body: unknown }> { + const relayMap = readRelayMap(config); + const region = resolveRegion(req, body?.preferredRegion ?? undefined); + const relayUrl = resolveRelayUrl(relayMap, region); + if (!relayUrl) { + return { status: 500, body: { error: { code: 'RELAY_REGION_UNAVAILABLE', message: `No relay URL configured for region ${region}` } } }; + } + + const now = new Date().toISOString(); + const gatewayId = `gw_${crypto.randomUUID().replace(/-/g, '')}`; + const relaySecret = generateRelaySecret(); + const accessCode = generateAccessCode(); + const accessCodeExpiresAt = new Date( + Date.now() + parsePositiveInt(config.pairAccessCodeTtlSec, ACCESS_CODE_TTL_FALLBACK_SEC) * 1000, + ).toISOString(); + + const record: PairGatewayRecord = { + gatewayId, + relayUrl, + region, + displayName: body?.displayName?.trim() || null, + relaySecretHash: await sha256Hex(relaySecret), + accessCodeHash: await sha256Hex(accessCode), + accessCodeExpiresAt, + clientTokens: [], + createdAt: now, + updatedAt: now, + }; + + await putPairGateway(config.routesKv, record); + + const response: PairRegisterResponse = { + gatewayId, + relaySecret, + relayUrl, + accessCode, + accessCodeExpiresAt, + displayName: record.displayName, + region, + }; + return { status: 200, body: response }; +} + +async function handlePairAccessCode( + body: PairAccessCodeRequest | null, + config: RegistryConfig, +): Promise<{ status: number; body: unknown }> { + if (!body?.gatewayId?.trim()) return { status: 400, body: { error: { code: 'INVALID_GATEWAY_ID', message: 'gatewayId is required' } } }; + if (!body?.relaySecret?.trim()) return { status: 400, body: { error: { code: 'INVALID_RELAY_SECRET', message: 'relaySecret is required' } } }; + + const gatewayLookup = await getPairGateway(config.routesKv, body.gatewayId.trim()); + if (!gatewayLookup.ok) return 
pairingRecordCorruptResponse(gatewayLookup.gatewayId); + const record = gatewayLookup.record; + if (!record) return { status: 404, body: { error: { code: 'GATEWAY_NOT_FOUND', message: 'Gateway not found' } } }; + if (await sha256Hex(body.relaySecret.trim()) !== record.relaySecretHash) { + return { status: 401, body: { error: { code: 'UNAUTHORIZED', message: 'Invalid relay secret' } } }; + } + + const accessCode = generateAccessCode(); + const nextDisplayName = body.displayName === undefined + ? record.displayName + : body.displayName?.trim() || null; + const next: PairGatewayRecord = { + ...record, + displayName: nextDisplayName, + accessCodeHash: await sha256Hex(accessCode), + accessCodeExpiresAt: new Date( + Date.now() + parsePositiveInt(config.pairAccessCodeTtlSec, ACCESS_CODE_TTL_FALLBACK_SEC) * 1000, + ).toISOString(), + updatedAt: new Date().toISOString(), + }; + await putPairGateway(config.routesKv, next); + + const response: PairAccessCodeResponse = { + gatewayId: next.gatewayId, + relayUrl: next.relayUrl, + accessCode, + accessCodeExpiresAt: next.accessCodeExpiresAt as string, + displayName: next.displayName, + region: next.region, + }; + return { status: 200, body: response }; +} + +async function handlePairClaim( + body: PairClaimRequest | null, + config: RegistryConfig, +): Promise<{ status: number; body: unknown }> { + if (!body?.gatewayId?.trim()) return { status: 400, body: { error: { code: 'INVALID_GATEWAY_ID', message: 'gatewayId is required' } } }; + const normalizedAccessCode = normalizeAccessCode(body?.accessCode); + if (!normalizedAccessCode) return { status: 400, body: { error: { code: 'INVALID_ACCESS_CODE', message: 'accessCode is required' } } }; + + const gatewayLookup = await getPairGateway(config.routesKv, body.gatewayId.trim()); + if (!gatewayLookup.ok) return pairingRecordCorruptResponse(gatewayLookup.gatewayId); + const record = gatewayLookup.record; + if (!record) return { status: 404, body: { error: { code: 'GATEWAY_NOT_FOUND', 
message: 'Gateway not found' } } }; + + const codeHash = await sha256Hex(normalizedAccessCode); + if (!record.accessCodeHash || !record.accessCodeExpiresAt) { + return { status: 409, body: { error: { code: 'ACCESS_CODE_REQUIRED', message: 'Gateway does not have an active access code' } } }; + } + if (Date.parse(record.accessCodeExpiresAt) <= Date.now()) { + return { status: 410, body: { error: { code: 'ACCESS_CODE_EXPIRED', message: 'Access code expired' } } }; + } + if (codeHash !== record.accessCodeHash) { + return { status: 401, body: { error: { code: 'UNAUTHORIZED', message: 'Invalid access code' } } }; + } + + const now = new Date().toISOString(); + const issued = await mintClientToken(record, config, body.clientLabel, now); + const next: PairGatewayRecord = { + ...issued.record, + accessCodeHash: null, + accessCodeExpiresAt: null, + }; + await putPairGateway(config.routesKv, next); + return { status: 200, body: buildPairClaimResponse(next, issued.clientToken) }; +} + +async function handleVerify( + req: IncomingMessage, + config: RegistryConfig, + gatewayId: string, +): Promise<{ status: number; body: unknown }> { + if (!gatewayId.trim()) return { status: 400, body: { error: { code: 'INVALID_GATEWAY_ID', message: 'gatewayId is required' } } }; + const token = readBearerTokenFromNode(req); + if (!token) return { status: 401, body: { error: { code: 'UNAUTHORIZED', message: 'Missing token for verify' } } }; + + const gatewayLookup = await getPairGateway(config.routesKv, gatewayId.trim()); + if (!gatewayLookup.ok) return pairingRecordCorruptResponse(gatewayLookup.gatewayId); + const gateway = gatewayLookup.record; + if (!gateway) return { status: 404, body: { error: { code: 'GATEWAY_NOT_FOUND', message: 'Gateway not found' } } }; + + const tokenHash = await sha256Hex(token); + if (tokenHash === gateway.relaySecretHash) { + return { status: 200, body: { ok: true, role: 'gateway' } }; + } + if (gateway.clientTokens.some((item) => item.hash === tokenHash)) { + 
return { status: 200, body: { ok: true, role: 'client' } }; + } + return { status: 401, body: { error: { code: 'UNAUTHORIZED', message: 'Invalid pairing token' } } }; +} + +// ---------- KV Operations ---------- + +async function getPairGateway(routesKv: MemoryKV, gatewayId: string): Promise { + const raw = await routesKv.get(pairGatewayKey(gatewayId)); + if (!raw) return { ok: true, record: null }; + try { + const parsed = JSON.parse(raw) as Record | null; + if (!parsed || typeof parsed.gatewayId !== 'string' || typeof parsed.relaySecretHash !== 'string') { + return { ok: false, gatewayId }; + } + const clientTokens = Array.isArray(parsed.clientTokens) ? parsed.clientTokens : []; + return { + ok: true, + record: { + gatewayId: parsed.gatewayId, + relayUrl: typeof parsed.relayUrl === 'string' ? parsed.relayUrl : '', + region: typeof parsed.region === 'string' ? parsed.region : 'us', + displayName: typeof parsed.displayName === 'string' ? parsed.displayName : null, + relaySecretHash: parsed.relaySecretHash, + accessCodeHash: typeof parsed.accessCodeHash === 'string' ? parsed.accessCodeHash : null, + accessCodeExpiresAt: typeof parsed.accessCodeExpiresAt === 'string' ? parsed.accessCodeExpiresAt : null, + clientTokens: clientTokens + .filter((item): item is Record => typeof item === 'object' && item !== null && typeof item.hash === 'string') + .map((item) => ({ + hash: item.hash as string, + label: typeof item.label === 'string' ? item.label : null, + createdAt: typeof item.createdAt === 'string' ? item.createdAt : new Date().toISOString(), + lastUsedAt: typeof item.lastUsedAt === 'string' ? item.lastUsedAt : null, + })), + createdAt: typeof parsed.createdAt === 'string' ? parsed.createdAt : new Date().toISOString(), + updatedAt: typeof parsed.updatedAt === 'string' ? 
parsed.updatedAt : new Date().toISOString(),
+      },
+    };
+  } catch {
+    return { ok: false, gatewayId };
+  }
+}
+
+async function putPairGateway(routesKv: MemoryKV, record: PairGatewayRecord): Promise<void> {
+  await routesKv.put(pairGatewayKey(record.gatewayId), JSON.stringify(record), {
+    expirationTtl: 365 * 24 * 3600,
+  });
+}
+
+// ---------- Helpers ----------
+
+function resolveRegion(req: IncomingMessage, preferred?: string): string {
+  if (preferred?.trim()) return normalizeRegion(preferred);
+  // In Docker, use X-Real-Country header or default to 'us'
+  const country = (req.headers['x-real-country'] ?? 'US').toString().toUpperCase();
+  if (country === 'CN') return 'cn';
+  if (['SG', 'MY', 'TH', 'VN', 'ID', 'PH', 'JP', 'KR', 'TW', 'HK', 'MO', 'IN', 'AU', 'NZ'].includes(country)) return 'sg';
+  if (['GB', 'DE', 'FR', 'IT', 'ES', 'NL', 'SE', 'NO', 'DK', 'FI', 'PL', 'CZ', 'AT', 'CH', 'BE', 'IE', 'PT'].includes(country)) return 'eu';
+  return 'us';
+}
+
+// Pick the relay URL for `region`, falling back to the 'us' entry, then to null.
+function resolveRelayUrl(map: Record<string, string>, region: string): string | null {
+  // Own-property lookup only: `region` can originate from the request body
+  // (preferredRegion), and a plain object would otherwise resolve inherited keys such
+  // as "constructor". NOTE(review): normalizeRegion may already restrict the value;
+  // it is not visible from here.
+  const regional = Object.prototype.hasOwnProperty.call(map, region) ? map[region] : undefined;
+  return regional ?? map.us ?? null;
+}
+
+// Build the region -> relay WebSocket URL map served by this registry.
+// 'us' always maps to config.relayUrl unless the configured JSON overrides it.
+// Entries from config.relayRegionMap are kept only when the value is a ws:// or
+// wss:// URL string; keys are trimmed and lower-cased.
+function readRelayMap(config: RegistryConfig): Record<string, string> {
+  const regionMap: Record<string, string> = { us: config.relayUrl };
+  const raw = config.relayRegionMap;
+  // Default: single self-hosted relay
+  if (!raw?.trim()) return regionMap;
+
+  // The setting is operator-supplied JSON. Anything that is not a plain object
+  // (invalid JSON, `null`, an array, a scalar) is ignored instead of being passed to
+  // Object.entries, which throws on null.
+  const parsed = safeParseJson<unknown>(raw, null);
+  if (typeof parsed !== 'object' || parsed === null || Array.isArray(parsed)) return regionMap;
+
+  for (const [key, value] of Object.entries(parsed)) {
+    if (typeof value !== 'string') continue;
+    const relayUrl = value.trim();
+    // Require an actual WebSocket scheme, not merely a "ws" prefix.
+    if (!/^wss?:\/\//i.test(relayUrl)) continue;
+    regionMap[key.trim().toLowerCase()] = relayUrl;
+  }
+  return regionMap;
+}
+
+function readBearerTokenFromNode(req: IncomingMessage): string | null {
+  const auth = req.headers['authorization'] ??
''; + if (!auth.toString().toLowerCase().startsWith('bearer ')) return null; + const token = auth.toString().slice(7).trim(); + return token || null; +} + +async function mintClientToken( + record: PairGatewayRecord, + config: RegistryConfig, + clientLabel: string | null | undefined, + now: string, +): Promise<{ record: PairGatewayRecord; clientToken: string }> { + const clientToken = generateClientToken(); + return { + clientToken, + record: { + ...record, + clientTokens: [ + { + hash: await sha256Hex(clientToken), + label: clientLabel?.trim() || null, + createdAt: now, + lastUsedAt: null, + }, + ...record.clientTokens, + ].slice(0, parsePositiveInt(config.pairClientTokenMax, PAIR_CLIENT_TOKEN_MAX_FALLBACK)), + updatedAt: now, + }, + }; +} + +function buildPairClaimResponse(record: PairGatewayRecord, clientToken: string): PairClaimResponse { + return { + gatewayId: record.gatewayId, + relayUrl: record.relayUrl, + clientToken, + displayName: record.displayName, + region: record.region, + }; +} + +function pairGatewayKey(gatewayId: string): string { + return `pair-gateway:${gatewayId}`; +} + +function pairingRecordCorruptResponse(gatewayId: string): { status: number; body: unknown } { + return { + status: 500, + body: { + error: { + code: 'PAIRING_RECORD_CORRUPT', + message: `Stored pairing record for ${gatewayId} is invalid. 
Reset the bridge pairing and pair again.`, + }, + }, + }; +} + +function generateAccessCode(): string { + let code = ''; + while (code.length < ACCESS_CODE_LENGTH) { + const randomBytes = crypto.getRandomValues(new Uint8Array(ACCESS_CODE_LENGTH)); + for (const byte of randomBytes) { + if (byte >= ACCESS_CODE_RANDOM_LIMIT) continue; + code += ACCESS_CODE_ALPHABET[byte % ACCESS_CODE_ALPHABET.length]; + if (code.length === ACCESS_CODE_LENGTH) break; + } + } + return code; +} + +function normalizeAccessCode(value: unknown): string { + if (typeof value !== 'string') return ''; + return value.trim().toUpperCase(); +} + +function generateRelaySecret(): string { + return `grs_${crypto.randomUUID().replace(/-/g, '')}${crypto.randomUUID().replace(/-/g, '')}`; +} + +function generateClientToken(): string { + return `gct_${crypto.randomUUID().replace(/-/g, '')}${crypto.randomUUID().replace(/-/g, '')}`; +} + +function safeParseJson(raw: string | null, fallback: T): T { + if (!raw) return fallback; + try { + return JSON.parse(raw) as T; + } catch { + return fallback; + } +} + +function logTelemetry( + event: string, + req: IncomingMessage, + url: URL, + status: number, + elapsedMs: number, +): void { + const pathname = url.pathname.startsWith('/v1/verify/') ? 
'/v1/verify/:gatewayId' : url.pathname; + console.log(JSON.stringify({ + scope: 'registry_worker', + event, + ts: new Date().toISOString(), + method: req.method, + path: pathname, + status, + elapsedMs, + })); +} + +// ---------- HTTP Helpers ---------- + +function corsHeaders(): Record { + return { + 'access-control-allow-origin': '*', + 'access-control-allow-methods': 'GET,POST,OPTIONS', + 'access-control-allow-headers': 'authorization,content-type,x-relay-trace-id,x-clawket-admin-secret', + }; +} + +function sendCors(res: ServerResponse, status: number): void { + res.writeHead(status, corsHeaders()); + res.end(); +} + +function sendJsonCors(res: ServerResponse, status: number, data: unknown): void { + const body = JSON.stringify(data); + res.writeHead(status, { + 'content-type': 'application/json; charset=utf-8', + 'cache-control': 'no-store', + ...corsHeaders(), + }); + res.end(body); +} + +type ReadJsonBodyResult = + | { ok: true; data: T | null } + | { ok: false; status: number; body: unknown }; + +async function readJsonBodyLimited(req: IncomingMessage, maxBytes: number): Promise> { + const declaredLength = parseContentLength(req); + if (declaredLength !== null && declaredLength > maxBytes) { + return { + ok: false, + status: 413, + body: { error: { code: 'PAYLOAD_TOO_LARGE', message: `Request body exceeds ${maxBytes} bytes` } }, + }; + } + + return new Promise((resolve) => { + const chunks: Buffer[] = []; + let totalBytes = 0; + let settled = false; + + const finish = (result: ReadJsonBodyResult): void => { + if (settled) return; + settled = true; + req.off('data', onData); + req.off('end', onEnd); + req.off('error', onError); + resolve(result); + }; + + const onData = (chunk: Buffer): void => { + totalBytes += chunk.length; + if (totalBytes > maxBytes) { + req.resume(); + finish({ + ok: false, + status: 413, + body: { error: { code: 'PAYLOAD_TOO_LARGE', message: `Request body exceeds ${maxBytes} bytes` } }, + }); + return; + } + chunks.push(chunk); + }; + 
+    // End of stream: an empty or non-JSON body is reported as `data: null`;
+    // each handler decides whether a body was required.
+    const onEnd = (): void => {
+      try {
+        finish({ ok: true, data: JSON.parse(Buffer.concat(chunks).toString()) as T });
+      } catch {
+        finish({ ok: true, data: null });
+      }
+    };
+
+    // Stream failure (for example the client aborted mid-body). Report it as a failed
+    // read so the route answers with an error instead of running its handler with a
+    // null body; /v1/pair/register would otherwise persist a new gateway record for a
+    // request that never completed.
+    const onError = (): void => {
+      finish({
+        ok: false,
+        status: 400,
+        body: { error: { code: 'BAD_REQUEST', message: 'Request body could not be read' } },
+      });
+    };
+
+    req.on('data', onData);
+    req.on('end', onEnd);
+    req.on('error', onError);
+  });
+}
+
+function parseContentLength(req: IncomingMessage): number | null {
+  const raw = req.headers['content-length'];
+  const value = Array.isArray(raw) ? raw[0] : raw;
+  if (!value) return null;
+  const parsed = Number.parseInt(value, 10);
+  return Number.isFinite(parsed) && parsed >= 0 ? parsed : null;
+}
diff --git a/apps/relay-docker/src/relay-room.ts b/apps/relay-docker/src/relay-room.ts
new file mode 100644
index 0000000..44a29aa
--- /dev/null
+++ b/apps/relay-docker/src/relay-room.ts
@@ -0,0 +1,376 @@
+/**
+ * relay-room.ts — Docker port of the RelayRoom Durable Object.
+ *
+ * Each instance represents a single gateway room with its own state,
+ * WebSocket connections, and heartbeat lifecycle.
+ */ + +import type { WebSocket as WsWebSocket } from 'ws'; +import { parseRelayAuthQuery, resolveRelayAuthToken } from '@clawket/shared'; +import { RoomState, serializeAttachment, deserializeAttachment } from './cf-shim.js'; +import { RelayRuntime, touchClientActivity, touchGatewayActivity } from './relay/runtime.js'; +import { isRelayTokenAuthorized, resolveClientLabelFromToken, sha256Hex } from './relay/auth.js'; +import { + isConnectChallengeFrame, + isConnectStartReqFrame, + isPendingChallengeExpired, + normalizeMessage, + resolveAwaitingChallengeClientId, + isAwaitingChallengeExpired, + isClientIdleExpired, + isClientStaleForHandshake, + shouldEmitClientControlAfterSocketEvent, +} from './relay/frames.js'; +import { dropClientState, ensureHeartbeat, pruneExpiredAwaitingChallenges, prunePendingConnectStarts, pruneStaleHandshakeClients } from './relay/heartbeat.js'; +import { + allowMessage, + bufferClientConnectStart, + flushPendingChallenge, + forwardClientControlToGateway, + forwardClientMessageToGateway, + handleClientConnected, + handleGatewayConnected, + handleGatewayMessage, + prepareClientMessage, +} from './relay/routing.js'; +import { replaceGateway, sendControlToGateway } from './relay/control.js'; +import { + canAcceptGatewayOwner, + loadGatewayOwner, + loadMirroredClientTokenHashes, + loadRoomMeta, + rehydrateSockets, + reconcileSockets, + storeRoomMeta, + touchGatewayOwner, +} from './relay/storage.js'; +import { logRelayTelemetry } from './relay/telemetry.js'; +import { parsePositiveInt } from './relay/utils.js'; +import { + CONTROL_PREFIX, + SOCKET_CLOSE_CODES, + type Env, + type SocketAttachment, +} from './relay/types.js'; + +const WS_OPEN = 1; + +export class DockerRelayRoom { + private readonly runtime: RelayRuntime; + private initialized = false; + private initializationPromise: Promise | null = null; + private socketEventQueue: Promise = Promise.resolve(); + + constructor( + private readonly roomId: string, + env: Env, + ) { + const state 
= new RoomState(roomId); + this.runtime = new RelayRuntime(state, env); + // Set up alarm handler + state.storage.setAlarmHandler(() => this.alarm()); + } + + async initialize(): Promise { + if (this.initialized) return; + if (!this.initializationPromise) { + this.initializationPromise = (async () => { + await loadRoomMeta(this.runtime); + await loadMirroredClientTokenHashes(this.runtime); + await loadGatewayOwner(this.runtime); + rehydrateSockets(this.runtime); + await ensureHeartbeat(this.runtime); + this.initialized = true; + })() + .catch((error) => { + this.initialized = false; + throw error; + }) + .finally(() => { + this.initializationPromise = null; + }); + } + await this.initializationPromise; + } + + /** + * Handle a new WebSocket connection (after upgrade). + */ + async handleWebSocket( + ws: WsWebSocket, + urlString: string, + headers: Record, + ): Promise<{ accepted: boolean; status?: number; error?: string }> { + await this.initialize(); + const url = new URL(urlString, 'http://localhost'); + const query = parseRelayAuthQuery(url); + const traceId = (url.searchParams.get('traceId') ?? '').trim() || undefined; + if (!query.gatewayId) { + return { accepted: false, status: 400, error: 'gatewayId is required' }; + } + + await storeRoomMeta(this.runtime, query.gatewayId); + const { token, authSource } = resolveRelayAuthToken(query.token, { + headers: { get: (name: string) => headers[name.toLowerCase()] ?? 
null }, + } as unknown as Request); + if (!token) { + logRelayTelemetry('relay_worker', 'ws_auth_rejected', { + role: query.role, + authSource, + reason: 'missing_token', + }); + return { accepted: false, status: 401, error: 'Missing token for relay connection' }; + } + + const authorized = await isRelayTokenAuthorized({ + routesKv: this.runtime.env.ROUTES_KV, + registryVerifyUrl: this.runtime.env.REGISTRY_VERIFY_URL, + gatewayId: query.gatewayId, + role: query.role, + token, + mirroredClientTokenHashes: this.runtime.mirroredClientTokenHashes, + }); + if (!authorized) { + logRelayTelemetry('relay_worker', 'ws_auth_rejected', { + role: query.role, + authSource, + reason: 'invalid_token', + }); + return { accepted: false, status: 401, error: 'Invalid token for relay connection' }; + } + + const gatewayId = query.role === 'gateway' + ? (query.clientId || `legacy-${(await sha256Hex(token)).slice(0, 16)}`) + : ''; + if (query.role === 'gateway') { + const leaseMs = parsePositiveInt(this.runtime.env.GATEWAY_OWNER_LEASE_MS, 20_000); + if (!canAcceptGatewayOwner(this.runtime, gatewayId, Date.now(), leaseMs)) { + logRelayTelemetry('relay_worker', 'gateway_owner_locked', { + role: query.role, + hasGateway: Boolean(this.runtime.gatewaySocket?.readyState === WS_OPEN), + }); + return { accepted: false, status: 409, error: 'Gateway owner is locked by another active gateway runtime' }; + } + } + + const clientId = query.role === 'gateway' ? gatewayId : (query.clientId || crypto.randomUUID()); + const clientLabel = query.role === 'client' + ? 
await resolveClientLabelFromToken(this.runtime.env.ROUTES_KV, query.gatewayId, token) + : null; + const attachment: SocketAttachment = { + role: query.role, + clientId, + connectedAt: Date.now(), + traceId, + clientLabel, + }; + + // Track the socket in the room state + this.runtime.state.acceptWebSocket(ws); + serializeAttachment(ws, attachment); + reconcileSockets(this.runtime, { preferredSocket: ws }); + + if (query.role === 'gateway') { + replaceGateway(this.runtime, ws); + touchGatewayActivity(this.runtime, attachment.connectedAt); + await touchGatewayOwner(this.runtime, clientId, true); + handleGatewayConnected(this.runtime); + } else { + const previousClient = this.runtime.clients.get(clientId); + if (previousClient && previousClient !== ws && previousClient.readyState === WS_OPEN) { + previousClient.close(SOCKET_CLOSE_CODES.REPLACED_BY_NEW_CLIENT_SOCKET, 'replaced_by_new_client_socket'); + logRelayTelemetry('relay_worker', 'client_socket_replaced', { + role: query.role, + clientCount: this.runtime.clients.size, + }); + } + handleClientConnected(this.runtime, clientId, ws); + } + + logRelayTelemetry('relay_worker', 'ws_connected', { + role: query.role, + authSource, + clientCount: this.runtime.clients.size, + hasGateway: Boolean(this.runtime.gatewaySocket?.readyState === WS_OPEN), + }); + + void ensureHeartbeat(this.runtime).catch((error) => { + logRelayTelemetry('relay_worker', 'heartbeat_schedule_failed', { + errorMessage: error instanceof Error ? 
error.message : String(error), + }); + }); + + // Wire up ws event handlers + ws.on('message', (message) => { + void this.enqueueSocketEvent('message', () => this.onMessage(ws, message)); + }); + ws.on('close', () => { + void this.enqueueSocketEvent('close', () => this.onClose(ws)); + }); + ws.on('error', () => { + void this.enqueueSocketEvent('error', () => this.onError(ws)); + }); + + return { accepted: true }; + } + + private enqueueSocketEvent( + eventType: 'message' | 'close' | 'error', + handler: () => Promise, + ): Promise { + const run = this.socketEventQueue.then(handler); + this.socketEventQueue = run.catch((error) => { + logRelayTelemetry('relay_worker', 'ws_event_handler_error', { + eventType, + errorMessage: error instanceof Error ? error.message : String(error), + }); + }); + return this.socketEventQueue; + } + + private async onMessage(ws: WsWebSocket, message: unknown): Promise { + const attachment = deserializeAttachment(ws) as SocketAttachment | null; + if (!attachment) return; + + const text = normalizeMessage(message as string | ArrayBuffer | Buffer); + if (text == null) return; + if (attachment.role === 'client') { + touchClientActivity(this.runtime, attachment.clientId); + } + if (!allowMessage(this.runtime, ws, attachment, text)) { + ws.close(SOCKET_CLOSE_CODES.RATE_LIMITED, 'rate_limited'); + return; + } + + if (attachment.role === 'gateway') { + await handleGatewayMessage(this.runtime, attachment, text, (gatewayClientId) => + touchGatewayOwner(this.runtime, gatewayClientId), + ); + return; + } + + if (text.startsWith(CONTROL_PREFIX)) { + forwardClientControlToGateway(this.runtime, attachment, text); + return; + } + + const isConnectStart = prepareClientMessage(this.runtime, attachment, text); + if (isConnectStart == null) return; + + if (this.runtime.gatewaySocket?.readyState === WS_OPEN) { + forwardClientMessageToGateway(this.runtime, attachment, text, isConnectStart); + return; + } + + if (isConnectStart) { + 
bufferClientConnectStart(this.runtime, attachment, text); + } + } + + private async onClose(ws: WsWebSocket): Promise { + await this.removeSocket(ws, 'close'); + } + + private async onError(ws: WsWebSocket): Promise { + await this.removeSocket(ws, 'error'); + } + + private async alarm(): Promise { + const now = Date.now(); + pruneStaleHandshakeClients(this.runtime, now); + pruneExpiredAwaitingChallenges(this.runtime, now); + prunePendingConnectStarts(this.runtime, now); + const flushedChallenge = flushPendingChallenge(this.runtime, now); + const payload = JSON.stringify({ type: 'tick', ts: now }); + const deadClients: Array<{ clientId: string; socket: WsWebSocket }> = []; + + for (const [clientId, client] of this.runtime.clients.entries()) { + if (client.readyState !== WS_OPEN) { + deadClients.push({ clientId, socket: client }); + continue; + } + try { + client.send(payload); + } catch { + deadClients.push({ clientId, socket: client }); + } + } + + let removedDeadClients = 0; + for (const { clientId, socket } of deadClients) { + removedDeadClients += dropClientState(this.runtime, clientId, 'dead_socket_on_tick') ? 
1 : 0; + try { + socket.close(SOCKET_CLOSE_CODES.DEAD_SOCKET, 'dead_socket'); + } catch { + // Best effort cleanup + } + } + + if (removedDeadClients > 0) { + if (this.runtime.clients.size === 0) { + this.runtime.pendingChallenge = null; + sendControlToGateway(this.runtime, 'client_disconnected', { count: 0 }); + } else { + sendControlToGateway(this.runtime, 'client_count', { count: this.runtime.clients.size }); + } + } + + logRelayTelemetry('relay_worker', 'alarm_tick', { + clientCount: this.runtime.clients.size, + hasGateway: Boolean(this.runtime.gatewaySocket?.readyState === WS_OPEN), + hasPendingChallenge: Boolean(this.runtime.pendingChallenge), + awaitingChallengeCount: this.runtime.awaitingChallenge.size, + flushedChallenge, + deadClientsRemoved: removedDeadClients, + }); + await ensureHeartbeat(this.runtime); + } + + private async removeSocket(ws: WsWebSocket, reason: 'close' | 'error'): Promise { + // Remove from state tracking + this.runtime.state.removeWebSocket(ws); + + const attachment = deserializeAttachment(ws) as SocketAttachment | null; + if (!attachment) return; + + if (attachment.role === 'gateway') { + if (this.runtime.gatewaySocket === ws) { + this.runtime.gatewaySocket = null; + this.runtime.pendingChallenge = null; + await touchGatewayOwner(this.runtime, attachment.clientId, true); + } + } else { + const wasCurrentClientMapping = this.runtime.clients.get(attachment.clientId) === ws; + if (wasCurrentClientMapping) { + dropClientState(this.runtime, attachment.clientId, `socket_${reason}`); + } + if (shouldEmitClientControlAfterSocketEvent(wasCurrentClientMapping)) { + if (this.runtime.clients.size === 0) { + this.runtime.pendingChallenge = null; + sendControlToGateway(this.runtime, 'client_disconnected', { count: 0 }); + } else { + sendControlToGateway(this.runtime, 'client_count', { count: this.runtime.clients.size }); + } + } + } + + logRelayTelemetry('relay_worker', 'ws_disconnected', { + role: attachment.role, + reason, + clientCount: 
this.runtime.clients.size,
+      hasGateway: Boolean(this.runtime.gatewaySocket?.readyState === WS_OPEN),
+    });
+
+    await ensureHeartbeat(this.runtime);
+  }
+
+  /** Check if room has any active connections */
+  get hasConnections(): boolean {
+    return this.runtime.clients.size > 0 ||
+      (this.runtime.gatewaySocket?.readyState === WS_OPEN) === true;
+  }
+
+  destroy(): void {
+    this.runtime.state.destroy();
+  }
+}
diff --git a/apps/relay-docker/src/relay-server.ts b/apps/relay-docker/src/relay-server.ts
new file mode 100644
index 0000000..6f3219e
--- /dev/null
+++ b/apps/relay-docker/src/relay-server.ts
@@ -0,0 +1,136 @@
+/**
+ * relay-server.ts — WebSocket relay server for Docker deployment.
+ *
+ * Replaces the relay-worker Cloudflare Worker entry point.
+ * Uses the `ws` library for WebSocket upgrades.
+ */
+
+import { createServer, type IncomingMessage, type ServerResponse } from 'node:http';
+import { WebSocketServer } from 'ws';
+import { parseRelayAuthQuery } from '@clawket/shared';
+import type { RoomManager } from './room-manager.js';
+
+/**
+ * Build the relay server: a small HTTP surface plus WebSocket upgrades on /ws.
+ *
+ * HTTP: GET /v1/health answers 200; /ws without an upgrade answers 426; any other
+ * path answers 404. Upgrades on /ws need a gatewayId in the query; the socket is then
+ * handed to that gateway's room, which accepts or rejects the connection.
+ *
+ * @param roomManager Supplies the room for a gatewayId.
+ * @param port        TCP port passed to `httpServer.listen`.
+ * @returns `start` begins listening; `close` shuts down the WebSocket server, then
+ *          the HTTP server.
+ */
+export function createRelayServer(
+  roomManager: RoomManager,
+  port: number,
+): { start: () => void; close: () => Promise<void> } {
+  const httpServer = createServer(async (req, res) => {
+    // A request target such as "//" is not a parseable URL; answer 400 instead of
+    // letting the constructor throw out of this async callback.
+    const url = parseRequestUrl(req, port);
+    if (!url) {
+      sendJson(res, 400, { error: { code: 'BAD_REQUEST', message: 'Invalid request URL' } });
+      return;
+    }
+
+    if (req.method === 'GET' && url.pathname === '/v1/health') {
+      sendJson(res, 200, { ok: true, runtime: 'docker' });
+      return;
+    }
+
+    // Non-websocket, non-API routes
+    if (url.pathname !== '/ws') {
+      sendJson(res, 404, { error: { code: 'NOT_FOUND', message: 'Route not found' } });
+      return;
+    }
+
+    // For ws path without upgrade header, return error
+    sendJson(res, 426, { error: { code: 'UPGRADE_REQUIRED', message: 'Expected websocket upgrade' } });
+  });
+
+  const wss = new WebSocketServer({ noServer: true });
+
+  httpServer.on('upgrade', (req: IncomingMessage, socket, head) => {
+    // This listener is synchronous, so a throw from URL parsing would escape as an
+    // uncaught exception. Reject the handshake the same way as a missing gatewayId.
+    const url = parseRequestUrl(req, port);
+    if (!url) {
+      socket.write('HTTP/1.1 400 Bad Request\r\n\r\n');
+      socket.destroy();
+      return;
+    }
+    if (url.pathname !== '/ws') {
+      socket.destroy();
+      return;
+    }
+
+    const query = parseRelayAuthQuery(url);
+    if (!query.gatewayId) {
+      socket.write('HTTP/1.1 400 Bad Request\r\n\r\n');
+      socket.destroy();
+      return;
+    }
+
+    const room = roomManager.getRoom(query.gatewayId);
+
+    wss.handleUpgrade(req, socket, head, (ws) => {
+      void (async () => {
+        try {
+          // Build headers map for the room
+          const headers: Record<string, string> = {};
+          for (const [key, value] of Object.entries(req.headers)) {
+            if (typeof value === 'string') {
+              headers[key.toLowerCase()] = value;
+            }
+          }
+
+          const result = await room.handleWebSocket(
+            ws,
+            url.toString(),
+            headers,
+          );
+
+          if (!result.accepted) {
+            // Map the room's HTTP-style status onto an application close code.
+            ws.close(
+              result.status === 401 ? 4401 : result.status === 409 ? 4409 : 4400,
+              result.error ?? 'rejected',
+            );
+          }
+        } catch {
+          try {
+            // readyState below 2 means CONNECTING or OPEN, so a close frame can still be sent.
+            if (ws.readyState < 2) {
+              ws.close(1011, 'internal error');
+            } else {
+              ws.terminate();
+            }
+          } catch {
+            ws.terminate();
+          }
+        }
+      })();
+    });
+  });
+
+  return {
+    start: () => {
+      httpServer.listen(port, () => {
+        console.log(`[relay-server] WebSocket relay listening on port ${port}`);
+      });
+    },
+    close: async () => {
+      await closeWebSocketServer(wss);
+      await closeHttpServer(httpServer);
+    },
+  };
+}
+
+/**
+ * Parse the request target against a placeholder origin.
+ * Returns null instead of throwing when the target is not a valid URL (e.g. "//").
+ */
+function parseRequestUrl(req: IncomingMessage, port: number): URL | null {
+  try {
+    return new URL(req.url ?? '/', `http://localhost:${port}`);
+  } catch {
+    return null;
+  }
+}
+
+function closeWebSocketServer(server: WebSocketServer): Promise<void> {
+  return new Promise<void>((resolve, reject) => {
+    server.close((error?: Error) => {
+      if (error) {
+        reject(error);
+        return;
+      }
+      resolve();
+    });
+  });
+}
+
+function closeHttpServer(server: ReturnType<typeof createServer>): Promise<void> {
+  return new Promise<void>((resolve, reject) => {
+    server.close((error) => {
+      // Closing a server that never started is not a failure for callers.
+      if (error && (error as NodeJS.ErrnoException).code !== 'ERR_SERVER_NOT_RUNNING') {
+        reject(error);
+        return;
+      }
+      resolve();
+    });
+  });
+}
+
+function sendJson(res: ServerResponse, status: number, data: unknown): void {
+  const body = JSON.stringify(data);
+  res.writeHead(status, {
+    'content-type': 'application/json; charset=utf-8',
+    'cache-control': 'no-store',
+
'access-control-allow-origin': '*', + }); + res.end(body); +} diff --git a/apps/relay-docker/src/relay/auth.ts b/apps/relay-docker/src/relay/auth.ts new file mode 100644 index 0000000..e9f44a1 --- /dev/null +++ b/apps/relay-docker/src/relay/auth.ts @@ -0,0 +1,92 @@ +/** + * auth.ts — Relay token authentication. + * Ported from apps/relay-worker/src/relay/auth.ts. + * Adapted: uses MemoryKV instead of KVNamespace. + */ + +import { sha256Hex } from '@clawket/shared'; +import type { PairGatewayRecord } from './types.js'; +import type { MemoryKV } from '../kv-store.js'; + +function pairGatewayKey(gatewayId: string): string { + return `pair-gateway:${gatewayId}`; +} + +export type RelayAuthInput = { + routesKv: MemoryKV; + registryVerifyUrl?: string; + gatewayId: string; + role: 'gateway' | 'client'; + token: string; + mirroredClientTokenHashes?: ReadonlySet; +}; + +export async function isRelayTokenAuthorized(input: RelayAuthInput): Promise { + const { routesKv, registryVerifyUrl, gatewayId, role, token, mirroredClientTokenHashes } = input; + const tokenHash = await sha256Hex(token); + if (role === 'client' && mirroredClientTokenHashes?.has(tokenHash)) { + return true; + } + const pairGateway = await getPairGateway(routesKv, gatewayId); + if (pairGateway) { + if (role === 'gateway') { + if (tokenHash === pairGateway.relaySecretHash) return true; + return verifyViaRegistry(registryVerifyUrl, gatewayId, token); + } + if (Array.isArray(pairGateway.clientTokens) + && pairGateway.clientTokens.some((item) => item?.hash === tokenHash)) { + return true; + } + return verifyViaRegistry(registryVerifyUrl, gatewayId, token); + } + return verifyViaRegistry(registryVerifyUrl, gatewayId, token); +} + +export async function getPairGateway(routesKv: MemoryKV, gatewayId: string): Promise { + const raw = await routesKv.get(pairGatewayKey(gatewayId)); + if (!raw) return null; + try { + const parsed = JSON.parse(raw) as PairGatewayRecord; + return parsed && typeof parsed.gatewayId === 
'string' && typeof parsed.relaySecretHash === 'string' + ? parsed + : null; + } catch { + return null; + } +} + +export async function resolveClientLabelFromToken( + routesKv: MemoryKV, + gatewayId: string, + token: string, +): Promise { + const pairGateway = await getPairGateway(routesKv, gatewayId); + if (!pairGateway || !Array.isArray(pairGateway.clientTokens)) return null; + const tokenHash = await sha256Hex(token); + const matched = pairGateway.clientTokens.find((item) => item?.hash === tokenHash); + return matched?.label?.trim() || null; +} + +export { sha256Hex } from '@clawket/shared'; + +async function verifyViaRegistry( + registryVerifyUrl: string | undefined, + gatewayId: string, + token: string, +): Promise { + const base = registryVerifyUrl?.trim(); + if (!base) return false; + + try { + const endpoint = `${base.replace(/\/+$/, '')}/v1/verify/${encodeURIComponent(gatewayId)}`; + const response = await fetch(endpoint, { + method: 'GET', + headers: { + authorization: `Bearer ${token}`, + }, + }); + return response.status === 200; + } catch { + return false; + } +} diff --git a/apps/relay-docker/src/relay/control.ts b/apps/relay-docker/src/relay/control.ts new file mode 100644 index 0000000..17ffe7a --- /dev/null +++ b/apps/relay-docker/src/relay/control.ts @@ -0,0 +1,84 @@ +/** + * control.ts — Relay control envelope handling. + * Ported from apps/relay-worker/src/relay/control.ts. + * Adapted: uses ws.WebSocket + attachment shim instead of CF WebSocket. 
+ */ + +import type { WebSocket as WsWebSocket } from 'ws'; +import { + CONTROL_PREFIX, + SOCKET_CLOSE_CODES, + type RelayControlEnvelope, + type SocketAttachment, +} from './types.js'; +import { logRelayTelemetry } from './telemetry.js'; +import type { RelayRuntime } from './runtime.js'; +import { deserializeAttachment } from '../cf-shim.js'; + +export function replaceGateway(runtime: RelayRuntime, nextGateway: WsWebSocket): void { + if (runtime.gatewaySocket + && runtime.gatewaySocket !== nextGateway + && runtime.gatewaySocket.readyState === 1 /* WebSocket.OPEN */) { + runtime.pendingChallenge = null; + runtime.gatewaySocket.close(SOCKET_CLOSE_CODES.REPLACED_BY_NEW_GATEWAY, 'replaced_by_new_gateway'); + } + runtime.gatewaySocket = nextGateway; +} + +export function parseControlEnvelope(text: string): RelayControlEnvelope | null { + if (!text.startsWith(CONTROL_PREFIX)) return null; + try { + const parsed = JSON.parse(text.slice(CONTROL_PREFIX.length)); + if (!parsed || typeof parsed !== 'object' || Array.isArray(parsed)) { + return null; + } + return parsed as RelayControlEnvelope; + } catch { + return null; + } +} + +export function serializeControlEnvelope(envelope: RelayControlEnvelope): string { + return `${CONTROL_PREFIX}${JSON.stringify(envelope)}`; +} + +function normalizeControlEvent(envelope: RelayControlEnvelope): string | null { + return typeof envelope.event === 'string' && envelope.event.trim() + ? envelope.event.trim() + : null; +} + +export function sendControlToGateway( + runtime: RelayRuntime, + event: string, + payload?: Record, +): void { + if (!runtime.gatewaySocket || runtime.gatewaySocket.readyState !== 1 /* OPEN */) return; + runtime.gatewaySocket.send(serializeControlEnvelope({ + type: 'control', + event, + ...(payload ?? 
{}), + })); + logRelayTelemetry('relay_worker', 'control_sent', { + controlEvent: event, + count: payload?.count, + clientCount: runtime.clients.size, + }); +} + +export function logControlRoutingTelemetry( + runtime: RelayRuntime, + event: string, + attachment: SocketAttachment, + envelope: RelayControlEnvelope, + extra: Record = {}, +): void { + logRelayTelemetry('relay_worker', event, { + role: attachment.role, + controlEvent: normalizeControlEvent(envelope), + hasSourceClient: typeof envelope.sourceClientId === 'string' && envelope.sourceClientId.trim().length > 0, + hasTargetClient: typeof envelope.targetClientId === 'string' && envelope.targetClientId.trim().length > 0, + clientCount: runtime.clients.size, + ...extra, + }); +} diff --git a/apps/relay-docker/src/relay/frames.ts b/apps/relay-docker/src/relay/frames.ts new file mode 100644 index 0000000..c05d932 --- /dev/null +++ b/apps/relay-docker/src/relay/frames.ts @@ -0,0 +1,115 @@ +/** + * frames.ts — Relay frame parsing and inspection. + * Ported verbatim from apps/relay-worker/src/relay/frames.ts. 
+ */ + +import { CONTROL_PREFIX, PENDING_CHALLENGE_TTL_MS } from './types.js'; + +export function normalizeMessage(message: string | ArrayBuffer | Buffer): string | null { + if (typeof message === 'string') return message; + if (message instanceof ArrayBuffer) { + return new TextDecoder().decode(message); + } + if (Buffer.isBuffer(message)) { + return message.toString('utf-8'); + } + return null; +} + +export function isPendingChallengeExpired(queuedAt: number, now: number): boolean { + return now - queuedAt > PENDING_CHALLENGE_TTL_MS; +} + +export function isAwaitingChallengeExpired(queuedAt: number, now: number, ttlMs: number): boolean { + return now - queuedAt > ttlMs; +} + +export function isClientStaleForHandshake( + lastActivityAt: number, + awaitingQueuedAt: number, + now: number, + ttlMs: number, +): boolean { + if (!isAwaitingChallengeExpired(awaitingQueuedAt, now, ttlMs)) return false; + return now - lastActivityAt > ttlMs; +} + +export function isClientIdleExpired(lastActivityAt: number, now: number, timeoutMs: number): boolean { + return now - lastActivityAt > timeoutMs; +} + +export function shouldEmitClientControlAfterSocketEvent(wasCurrentClientMapping: boolean): boolean { + return wasCurrentClientMapping; +} + +export function resolveAwaitingChallengeClientId(input: { + awaitingChallenge: Array<{ clientId: string; queuedAt: number }>; + openClientIds: string[]; + preferredClientId?: string | null; + activeClientId?: string | null; + now?: number; +}): string | null { + void input.now; + const openClientIds = new Set(input.openClientIds); + let candidateClientId: string | null = null; + let candidateQueuedAt = Number.POSITIVE_INFINITY; + + for (const entry of input.awaitingChallenge) { + if (!openClientIds.has(entry.clientId)) continue; + if (entry.queuedAt < candidateQueuedAt) { + candidateClientId = entry.clientId; + candidateQueuedAt = entry.queuedAt; + } + } + if (candidateClientId) return candidateClientId; + + const preferredClientId = 
input.preferredClientId ?? input.activeClientId ?? null; + if (preferredClientId && openClientIds.has(preferredClientId)) { + return preferredClientId; + } + + for (const clientId of input.openClientIds) { + return clientId; + } + + return null; +} + +export function isConnectChallengeFrame(data: string): boolean { + try { + const parsed = JSON.parse(data) as { type?: unknown; event?: unknown }; + return parsed?.type === 'event' && parsed?.event === 'connect.challenge'; + } catch { + return false; + } +} + +export function isConnectStartReqFrame(data: string): boolean { + try { + const parsed = JSON.parse(data) as { type?: unknown; method?: unknown }; + return parsed?.type === 'req' && (parsed?.method === 'connect.start' || parsed?.method === 'connect'); + } catch { + return false; + } +} + +export function parseConnectReqId(data: string): string | null { + try { + const parsed = JSON.parse(data) as { type?: unknown; method?: unknown; id?: unknown }; + if (parsed?.type !== 'req') return null; + if (parsed?.method !== 'connect' && parsed?.method !== 'connect.start') return null; + return typeof parsed.id === 'string' && parsed.id.trim() ? parsed.id : null; + } catch { + return null; + } +} + +export function parseResponseId(data: string): string | null { + try { + const parsed = JSON.parse(data) as { type?: unknown; id?: unknown }; + if (parsed?.type !== 'res') return null; + return typeof parsed.id === 'string' && parsed.id.trim() ? parsed.id : null; + } catch { + return null; + } +} diff --git a/apps/relay-docker/src/relay/heartbeat.ts b/apps/relay-docker/src/relay/heartbeat.ts new file mode 100644 index 0000000..74cba70 --- /dev/null +++ b/apps/relay-docker/src/relay/heartbeat.ts @@ -0,0 +1,151 @@ +/** + * heartbeat.ts — Relay heartbeat and client pruning. + * Ported from apps/relay-worker/src/relay/heartbeat.ts. + * Adapted: uses ws.WebSocket readyState constant. 
+ */ + +import type { WebSocket as WsWebSocket } from 'ws'; +import { + CONNECT_START_BUFFER_TTL_MS, + SOCKET_CLOSE_CODES, +} from './types.js'; +import { + isAwaitingChallengeExpired, + isClientIdleExpired, + isClientStaleForHandshake, + resolveAwaitingChallengeClientId, +} from './frames.js'; +import { logRelayTelemetry } from './telemetry.js'; +import type { RelayRuntime } from './runtime.js'; +import { parsePositiveInt } from './utils.js'; +import { sendControlToGateway } from './control.js'; + +const WS_OPEN = 1; + +export async function ensureHeartbeat(runtime: RelayRuntime): Promise { + const interval = parsePositiveInt(runtime.env.HEARTBEAT_INTERVAL_MS, 30_000); + if (!hasOpenClients(runtime)) { + await runtime.state.storage.deleteAlarm(); + return; + } + await runtime.state.storage.setAlarm(Date.now() + interval); +} + +export function hasOpenClients(runtime: RelayRuntime): boolean { + for (const ws of runtime.clients.values()) { + if (ws.readyState === WS_OPEN) return true; + } + return false; +} + +export function prunePendingConnectStarts(runtime: RelayRuntime, now: number): void { + for (const [clientId, pending] of runtime.pendingConnectStarts.entries()) { + if (now - pending.queuedAt <= CONNECT_START_BUFFER_TTL_MS) continue; + runtime.pendingConnectStarts.delete(clientId); + logRelayTelemetry('relay_worker', 'connect_start_buffer_expired', { + role: 'client', + queuedMs: Math.max(0, now - pending.queuedAt), + }); + } +} + +export function pruneExpiredAwaitingChallenges(runtime: RelayRuntime, now: number): void { + const ttlMs = runtime.awaitingChallengeTtlMs(); + for (const [clientId, entry] of runtime.awaitingChallenge.entries()) { + if (!isAwaitingChallengeExpired(entry.queuedAt, now, ttlMs)) continue; + runtime.awaitingChallenge.delete(clientId); + runtime.connectStartAtByClientId.delete(clientId); + for (const [reqId, reqClientId] of runtime.connectReqClientByReqId.entries()) { + if (reqClientId === clientId) { + 
runtime.connectReqClientByReqId.delete(reqId); + } + } + if (runtime.challengeClientId === clientId) { + runtime.challengeClientId = null; + } + logRelayTelemetry('relay_worker', 'awaiting_challenge_expired', { + queuedMs: Math.max(0, now - entry.queuedAt), + ttlMs, + clientCount: runtime.clients.size, + }); + } +} + +export function pruneStaleHandshakeClients(runtime: RelayRuntime, now: number): void { + let changed = false; + const ttlMs = runtime.awaitingChallengeTtlMs(); + const idleTimeoutMs = runtime.clientIdleTimeoutMs(); + for (const [clientId, client] of runtime.clients.entries()) { + if (client.readyState !== WS_OPEN) { + changed = dropClientState(runtime, clientId, 'non_open_ready_state') || changed; + continue; + } + const lastActivityAt = runtime.clientLastActivityAtById.get(clientId) ?? 0; + if (lastActivityAt > 0 && isClientIdleExpired(lastActivityAt, now, idleTimeoutMs)) { + try { + client.close(SOCKET_CLOSE_CODES.IDLE_OR_STALE_TIMEOUT, 'idle_timeout'); + } catch { + // Best effort cleanup; stale sockets may already be detached remotely. + } + changed = dropClientState(runtime, clientId, 'idle_timeout') || changed; + continue; + } + const awaiting = runtime.awaitingChallenge.get(clientId); + if (!awaiting) continue; + const handshakeActivityAt = lastActivityAt > 0 ? lastActivityAt : awaiting.queuedAt; + if (!isClientStaleForHandshake(handshakeActivityAt, awaiting.queuedAt, now, ttlMs)) { + continue; + } + try { + client.close(SOCKET_CLOSE_CODES.IDLE_OR_STALE_TIMEOUT, 'stale_handshake_timeout'); + } catch { + // Best effort cleanup; stale sockets may already be detached remotely. 
+ } + changed = dropClientState(runtime, clientId, 'stale_handshake_timeout') || changed; + } + if (!changed) return; + if (runtime.clients.size === 0) { + runtime.pendingChallenge = null; + sendControlToGateway(runtime, 'client_disconnected', { count: 0 }); + return; + } + sendControlToGateway(runtime, 'client_count', { count: runtime.clients.size }); +} + +export function dropClientState(runtime: RelayRuntime, clientId: string, reason: string): boolean { + if (!runtime.clients.has(clientId)) return false; + runtime.clients.delete(clientId); + runtime.clientLastActivityAtById.delete(clientId); + runtime.connectStartAtByClientId.delete(clientId); + runtime.pendingConnectStarts.delete(clientId); + runtime.awaitingChallenge.delete(clientId); + for (const [reqId, reqClientId] of runtime.connectReqClientByReqId.entries()) { + if (reqClientId === clientId) { + runtime.connectReqClientByReqId.delete(reqId); + } + } + if (runtime.activeClientId === clientId) { + runtime.activeClientId = null; + for (const [nextClientId, nextClient] of runtime.clients.entries()) { + if (nextClient.readyState === WS_OPEN) { + runtime.activeClientId = nextClientId; + break; + } + } + } + if (runtime.challengeClientId === clientId) { + runtime.challengeClientId = resolveAwaitingChallengeClientId({ + awaitingChallenge: Array.from(runtime.awaitingChallenge.values()), + openClientIds: Array.from(runtime.clients.entries()) + .filter(([, client]) => client.readyState === WS_OPEN) + .map(([nextClientId]) => nextClientId), + preferredClientId: null, + activeClientId: runtime.activeClientId, + }); + } + logRelayTelemetry('relay_worker', 'client_pruned', { + reason, + clientCount: runtime.clients.size, + }); + return true; +} diff --git a/apps/relay-docker/src/relay/routing.ts b/apps/relay-docker/src/relay/routing.ts new file mode 100644 index 0000000..7e739c1 --- /dev/null +++ b/apps/relay-docker/src/relay/routing.ts @@ -0,0 +1,478 @@ +/** + * routing.ts — Relay message routing. 
+ * Ported from apps/relay-worker/src/relay/routing.ts. + * Adapted: uses ws.WebSocket + attachment shim. + */ + +import type { WebSocket as WsWebSocket } from 'ws'; +import { CONTROL_PREFIX, type RelayControlEnvelope, type SocketAttachment } from './types.js'; +import { + isConnectChallengeFrame, + isConnectStartReqFrame, + isPendingChallengeExpired, + parseConnectReqId, + parseResponseId, + resolveAwaitingChallengeClientId, +} from './frames.js'; +import { logRelayTelemetry } from './telemetry.js'; +import type { RelayRuntime } from './runtime.js'; +import { touchClientActivity, touchGatewayActivity } from './runtime.js'; +import { prunePendingConnectStarts } from './heartbeat.js'; +import { + logControlRoutingTelemetry, + parseControlEnvelope, + sendControlToGateway, + serializeControlEnvelope, +} from './control.js'; +import { parsePositiveInt } from './utils.js'; +import { deserializeAttachment } from '../cf-shim.js'; + +const WS_OPEN = 1; + +export function allowMessage( + runtime: RelayRuntime, + ws: WsWebSocket, + attachment: SocketAttachment, + text: string, +): boolean { + if (attachment.role === 'gateway') { + return true; + } + + if (isConnectStartReqFrame(text)) { + return true; + } + + const fallback = parsePositiveInt(runtime.env.MAX_MESSAGES_PER_10S, 120); + const max = parsePositiveInt(runtime.env.MAX_CLIENT_MESSAGES_PER_10S, Math.max(300, fallback)); + const now = Date.now(); + const existing = runtime.rate.get(ws); + if (!existing) { + runtime.rate.set(ws, { windowStart: now, count: 1 }); + return true; + } + + if (now - existing.windowStart >= 10_000) { + existing.windowStart = now; + existing.count = 1; + return true; + } + + existing.count += 1; + return existing.count <= max; +} + +export function markAwaitingChallenge(runtime: RelayRuntime, clientId: string, queuedAt: number): void { + const existing = runtime.awaitingChallenge.get(clientId); + if (existing && existing.queuedAt <= queuedAt) { + return; + } + 
runtime.awaitingChallenge.set(clientId, { clientId, queuedAt }); +} + +export function resolveChallengeClientId(runtime: RelayRuntime, now: number): string | null { + const awaitingClientId = resolveAwaitingChallengeClientId({ + awaitingChallenge: Array.from(runtime.awaitingChallenge.values()), + openClientIds: Array.from(runtime.clients.entries()) + .filter(([, client]) => client.readyState === WS_OPEN) + .map(([clientId]) => clientId), + preferredClientId: runtime.challengeClientId, + activeClientId: runtime.activeClientId, + now, + }); + if (awaitingClientId) { + runtime.challengeClientId = awaitingClientId; + return awaitingClientId; + } + + const preferredClientId = runtime.challengeClientId ?? runtime.activeClientId; + if (preferredClientId) { + const preferredClient = runtime.clients.get(preferredClientId); + if (preferredClient?.readyState === WS_OPEN) { + if (runtime.challengeClientId !== preferredClientId) { + runtime.challengeClientId = preferredClientId; + } + return preferredClientId; + } + } + + for (const [clientId, client] of runtime.clients.entries()) { + if (client.readyState !== WS_OPEN) continue; + runtime.challengeClientId = clientId; + return clientId; + } + + runtime.challengeClientId = null; + return null; +} + +export function tryDeliverChallenge( + runtime: RelayRuntime, + data: string, + gatewayAttachment: SocketAttachment, + now: number, + buffered: boolean, +): boolean { + const challengeClientId = resolveChallengeClientId(runtime, now); + const challengeClient = challengeClientId ? 
runtime.clients.get(challengeClientId) : null; + if (!challengeClientId || challengeClient?.readyState !== WS_OPEN) { + return false; + } + + challengeClient.send(data); + touchClientActivity(runtime, challengeClientId); + runtime.awaitingChallenge.delete(challengeClientId); + const connectStartAt = runtime.connectStartAtByClientId.get(challengeClientId); + const payload: Record = { + role: 'gateway', + buffered, + awaitingChallengeCount: runtime.awaitingChallenge.size, + clientCount: runtime.clients.size, + }; + if (typeof connectStartAt === 'number') { + payload.relayLegMs = Math.max(0, now - connectStartAt); + runtime.connectStartAtByClientId.delete(challengeClientId); + } + logRelayTelemetry('relay_worker', 'challenge_delivered', payload); + return true; +} + +export function flushPendingChallenge(runtime: RelayRuntime, now: number): boolean { + const pending = runtime.pendingChallenge; + if (!pending) return false; + if (runtime.awaitingChallenge.size === 0) { + logRelayTelemetry('relay_worker', 'challenge_buffer_dropped_without_awaiting_client', { + role: 'gateway', + queuedMs: Math.max(0, now - pending.queuedAt), + clientCount: runtime.clients.size, + }); + runtime.pendingChallenge = null; + return false; + } + const currentGatewayAttachment = runtime.gatewaySocket + ? 
deserializeAttachment(runtime.gatewaySocket) as SocketAttachment | null + : null; + if (!currentGatewayAttachment + || currentGatewayAttachment.role !== 'gateway' + || currentGatewayAttachment.clientId !== pending.gatewayClientId) { + logRelayTelemetry('relay_worker', 'challenge_buffer_dropped_stale_gateway', { + role: 'gateway', + gatewayReplaced: true, + queuedMs: Math.max(0, now - pending.queuedAt), + }); + runtime.pendingChallenge = null; + return false; + } + if (isPendingChallengeExpired(pending.queuedAt, now)) { + logRelayTelemetry('relay_worker', 'challenge_buffer_expired', { + role: 'gateway', + queuedMs: Math.max(0, now - pending.queuedAt), + awaitingChallengeCount: runtime.awaitingChallenge.size, + clientCount: runtime.clients.size, + }); + runtime.pendingChallenge = null; + return false; + } + if (tryDeliverChallenge(runtime, pending.data, { + role: 'gateway', + clientId: pending.gatewayClientId, + connectedAt: pending.queuedAt, + traceId: pending.traceId, + }, now, true)) { + runtime.pendingChallenge = null; + return true; + } + return false; +} + +export function forwardGatewayChallengeFastPath( + runtime: RelayRuntime, + text: string, + gatewayAttachment: SocketAttachment, +): void { + const now = Date.now(); + if (tryDeliverChallenge(runtime, text, gatewayAttachment, now, false)) { + runtime.pendingChallenge = null; + return; + } + runtime.pendingChallenge = { + data: text, + queuedAt: now, + gatewayClientId: gatewayAttachment.clientId, + traceId: gatewayAttachment.traceId, + }; + logRelayTelemetry('relay_worker', 'challenge_buffered_no_client', { + role: 'gateway', + hasActiveClient: Boolean(runtime.activeClientId), + hasChallengeClient: Boolean(runtime.challengeClientId), + awaitingChallengeCount: runtime.awaitingChallenge.size, + clientCount: runtime.clients.size, + }); +} + +export function flushPendingConnectStarts(runtime: RelayRuntime): void { + if (!runtime.gatewaySocket || runtime.gatewaySocket.readyState !== WS_OPEN) return; + const now = 
Date.now(); + prunePendingConnectStarts(runtime, now); + + let flushed = 0; + for (const [clientId, pending] of runtime.pendingConnectStarts.entries()) { + const client = runtime.clients.get(clientId); + if (!client || client.readyState !== WS_OPEN) { + runtime.pendingConnectStarts.delete(clientId); + continue; + } + runtime.gatewaySocket.send(pending.data); + runtime.connectStartAtByClientId.set(clientId, pending.queuedAt); + markAwaitingChallenge(runtime, clientId, pending.queuedAt); + runtime.pendingConnectStarts.delete(clientId); + flushed += 1; + logRelayTelemetry('relay_worker', 'connect_start_flushed', { + role: 'client', + queuedMs: Math.max(0, now - pending.queuedAt), + }); + } + + if (flushed > 0) { + logRelayTelemetry('relay_worker', 'connect_start_flush_done', { + flushed, + remaining: runtime.pendingConnectStarts.size, + clientCount: runtime.clients.size, + }); + } +} + +export async function handleGatewayMessage( + runtime: RelayRuntime, + attachment: SocketAttachment, + text: string, + touchGatewayOwnerFn: (gatewayId: string) => Promise, +): Promise { + await touchGatewayOwnerFn(attachment.clientId); + touchGatewayActivity(runtime); + if (text.startsWith(CONTROL_PREFIX)) { + const gatewayControl = parseControlEnvelope(text); + if (gatewayControl) { + routeGatewayControl(runtime, attachment, gatewayControl); + } else { + logRelayTelemetry('relay_worker', 'gateway_control_invalid', { + role: 'gateway', + clientCount: runtime.clients.size, + }); + } + return; + } + if (isConnectChallengeFrame(text)) { + logRelayTelemetry('relay_worker', 'challenge_forward', { + role: 'gateway', + clientCount: runtime.clients.size, + }); + forwardGatewayChallengeFastPath(runtime, text, attachment); + return; + } + const connectResId = parseResponseId(text); + if (connectResId) { + const targetClientId = runtime.connectReqClientByReqId.get(connectResId); + if (targetClientId) { + const targetClient = runtime.clients.get(targetClientId); + if (targetClient?.readyState === 
WS_OPEN) { + targetClient.send(text); + touchClientActivity(runtime, targetClientId); + logRelayTelemetry('relay_worker', 'connect_response_delivered', { + role: 'gateway', + matchedRequest: true, + }); + runtime.connectReqClientByReqId.delete(connectResId); + return; + } + logRelayTelemetry('relay_worker', 'connect_response_target_missing', { + role: 'gateway', + matchedRequest: true, + }); + runtime.connectReqClientByReqId.delete(connectResId); + } + } + let delivered = 0; + const activeClient = runtime.activeClientId ? runtime.clients.get(runtime.activeClientId) : null; + if (activeClient?.readyState === WS_OPEN) { + activeClient.send(text); + touchClientActivity(runtime, runtime.activeClientId!); + delivered = 1; + } + if (delivered === 0) { + logRelayTelemetry('relay_worker', 'gateway_message_dropped_without_active_client', { + role: 'gateway', + hasGateway: true, + clientCount: runtime.clients.size, + }); + } +} + +function routeGatewayControl( + runtime: RelayRuntime, + attachment: SocketAttachment, + envelope: RelayControlEnvelope, +): void { + const targetClientId = typeof envelope.targetClientId === 'string' && envelope.targetClientId.trim() + ? envelope.targetClientId.trim() + : null; + + if (targetClientId) { + const targetClient = runtime.clients.get(targetClientId); + if (targetClient?.readyState === WS_OPEN) { + targetClient.send(serializeControlEnvelope(envelope)); + touchClientActivity(runtime, targetClientId); + logControlRoutingTelemetry(runtime, 'gateway_control_target_delivered', attachment, envelope); + return; + } + logControlRoutingTelemetry(runtime, 'gateway_control_target_missing', attachment, envelope); + return; + } + + const activeClientId = runtime.activeClientId; + const activeClient = activeClientId ? 
runtime.clients.get(activeClientId) : null; + if (activeClientId && activeClient?.readyState === WS_OPEN) { + activeClient.send(serializeControlEnvelope(envelope)); + touchClientActivity(runtime, activeClientId); + logControlRoutingTelemetry(runtime, 'gateway_control_delivered', attachment, envelope); + return; + } + + logControlRoutingTelemetry(runtime, 'gateway_control_no_active_client', attachment, envelope); +} + +export function handleClientConnected(runtime: RelayRuntime, clientId: string, server: WsWebSocket): void { + const wasEmpty = runtime.clients.size === 0; + runtime.clients.set(clientId, server); + touchClientActivity(runtime, clientId); + if (!runtime.activeClientId) { + runtime.activeClientId = clientId; + } + if (!runtime.challengeClientId) { + runtime.challengeClientId = clientId; + } + if (wasEmpty) { + sendControlToGateway(runtime, 'client_connected', { count: runtime.clients.size }); + } else { + sendControlToGateway(runtime, 'client_count', { count: runtime.clients.size }); + } +} + +export function handleGatewayConnected(runtime: RelayRuntime): void { + sendControlToGateway(runtime, 'gateway_connected'); + sendControlToGateway(runtime, 'client_count', { count: runtime.clients.size }); + flushPendingConnectStarts(runtime); +} + +export function handleInactiveClientMessage(runtime: RelayRuntime, attachment: SocketAttachment): boolean { + if (runtime.activeClientId !== attachment.clientId) { + logRelayTelemetry('relay_worker', 'inactive_client_message_dropped', { + role: 'client', + reason: 'non_connect_before_active', + }); + return true; + } + return false; +} + +export function prepareClientMessage(runtime: RelayRuntime, attachment: SocketAttachment, text: string): boolean | null { + const isConnectStart = isConnectStartReqFrame(text); + if (isConnectStart) { + if (runtime.activeClientId !== attachment.clientId) { + runtime.activeClientId = attachment.clientId; + logRelayTelemetry('relay_worker', 'active_client_switched', { + role: 'client', 
+ reason: 'connect_start', + }); + } + if (runtime.challengeClientId === attachment.clientId) { + runtime.challengeClientId = null; + } + } else if (handleInactiveClientMessage(runtime, attachment)) { + return null; + } + return isConnectStart; +} + +export function forwardClientMessageToGateway( + runtime: RelayRuntime, + attachment: SocketAttachment, + text: string, + isConnectStart: boolean, +): void { + if (!runtime.gatewaySocket || runtime.gatewaySocket.readyState !== WS_OPEN) return; + if (isConnectStart) { + const queuedAt = Date.now(); + runtime.connectStartAtByClientId.set(attachment.clientId, queuedAt); + markAwaitingChallenge(runtime, attachment.clientId, queuedAt); + const connectReqId = parseConnectReqId(text); + if (connectReqId) { + runtime.connectReqClientByReqId.set(connectReqId, attachment.clientId); + } + logRelayTelemetry('relay_worker', 'connect_start_forward', { + role: 'client', + hasRequestId: Boolean(connectReqId), + clientCount: runtime.clients.size, + }); + } + runtime.gatewaySocket.send(text); +} + +export function forwardClientControlToGateway( + runtime: RelayRuntime, + attachment: SocketAttachment, + text: string, +): void { + const envelope = parseControlEnvelope(text); + if (!envelope) { + logRelayTelemetry('relay_worker', 'client_control_invalid', { + role: 'client', + clientCount: runtime.clients.size, + }); + return; + } + + if (!runtime.gatewaySocket || runtime.gatewaySocket.readyState !== WS_OPEN) { + logControlRoutingTelemetry(runtime, 'client_control_no_gateway', attachment, envelope); + return; + } + + const gatewayAttachment = runtime.gatewaySocket + ? deserializeAttachment(runtime.gatewaySocket) as SocketAttachment | null + : null; + const forwardedEnvelope: RelayControlEnvelope = { + ...envelope, + type: typeof envelope.type === 'string' && envelope.type.trim() ? 
envelope.type : 'control', + sourceClientId: attachment.clientId, + }; + runtime.gatewaySocket.send(serializeControlEnvelope(forwardedEnvelope)); + logControlRoutingTelemetry(runtime, 'client_control_forwarded', attachment, forwardedEnvelope, { + gatewayClientId: gatewayAttachment?.clientId ?? null, + }); +} + +export function bufferClientConnectStart( + runtime: RelayRuntime, + attachment: SocketAttachment, + text: string, +): void { + const queuedAt = Date.now(); + markAwaitingChallenge(runtime, attachment.clientId, queuedAt); + const connectReqId = parseConnectReqId(text); + if (connectReqId) { + runtime.connectReqClientByReqId.set(connectReqId, attachment.clientId); + } + runtime.pendingConnectStarts.set(attachment.clientId, { + clientId: attachment.clientId, + data: text, + queuedAt, + traceId: attachment.traceId, + }); + prunePendingConnectStarts(runtime, queuedAt); + logRelayTelemetry('relay_worker', 'connect_start_no_gateway', { + role: 'client', + hasRequestId: Boolean(connectReqId), + clientCount: runtime.clients.size, + pendingCount: runtime.pendingConnectStarts.size, + }); +} diff --git a/apps/relay-docker/src/relay/runtime.ts b/apps/relay-docker/src/relay/runtime.ts new file mode 100644 index 0000000..4428f3f --- /dev/null +++ b/apps/relay-docker/src/relay/runtime.ts @@ -0,0 +1,75 @@ +/** + * runtime.ts — Relay runtime state container. + * Ported from apps/relay-worker/src/relay/runtime.ts. + * Adapted: uses RoomState shim instead of DurableObjectState. 
+ */ + +import type { WebSocket as WsWebSocket } from 'ws'; +import { + AWAITING_CHALLENGE_TTL_DEFAULT_MS, + CLIENT_IDLE_TIMEOUT_DEFAULT_MS, + type Env, + type GatewayOwnerRecord, + type PendingChallenge, + type PendingConnectStart, + type RateState, + type AwaitingChallengeEntry, +} from './types.js'; +import { parsePositiveInt } from './utils.js'; +import type { RoomState } from '../cf-shim.js'; +import { deserializeAttachment } from '../cf-shim.js'; + +export class RelayRuntime { + gatewaySocket: WsWebSocket | null = null; + gatewayLastActivityAt = 0; + readonly clients = new Map(); + readonly rate = new WeakMap(); + gatewayOwner: GatewayOwnerRecord | null = null; + gatewayOwnerTouchedAt = 0; + roomGatewayId: string | null = null; + readonly connectStartAtByClientId = new Map(); + readonly pendingConnectStarts = new Map(); + readonly connectReqClientByReqId = new Map(); + readonly awaitingChallenge = new Map(); + readonly clientLastActivityAtById = new Map(); + mirroredClientTokenHashes = new Set(); + mirroredClientTokenHashesUpdatedAt = 0; + activeClientId: string | null = null; + challengeClientId: string | null = null; + pendingChallenge: PendingChallenge | null = null; + + constructor( + readonly state: RoomState, + readonly env: Env, + ) {} + + awaitingChallengeTtlMs(): number { + return parsePositiveInt(this.env.AWAITING_CHALLENGE_TTL_MS, AWAITING_CHALLENGE_TTL_DEFAULT_MS); + } + + clientIdleTimeoutMs(): number { + return parsePositiveInt(this.env.CLIENT_IDLE_TIMEOUT_MS, CLIENT_IDLE_TIMEOUT_DEFAULT_MS); + } + + objectId(): string | null { + try { + return this.state.id.toString(); + } catch { + return null; + } + } +} + +export function touchClientActivity(runtime: RelayRuntime, clientId: string, at = Date.now()): void { + runtime.clientLastActivityAtById.set(clientId, at); +} + +export function touchClientSocketActivity(runtime: RelayRuntime, ws: WsWebSocket, at = Date.now()): void { + const attachment = deserializeAttachment(ws) as { role?: string; 
clientId?: string } | null; + if (!attachment || attachment.role !== 'client' || typeof attachment.clientId !== 'string') return; + touchClientActivity(runtime, attachment.clientId, at); +} + +export function touchGatewayActivity(runtime: RelayRuntime, at = Date.now()): void { + runtime.gatewayLastActivityAt = Math.max(runtime.gatewayLastActivityAt, at); +} diff --git a/apps/relay-docker/src/relay/storage.ts b/apps/relay-docker/src/relay/storage.ts new file mode 100644 index 0000000..4ad7a9a --- /dev/null +++ b/apps/relay-docker/src/relay/storage.ts @@ -0,0 +1,271 @@ +/** + * storage.ts — Relay room persistent storage operations. + * Ported from apps/relay-worker/src/relay/storage.ts. + * Adapted: uses RoomState/RoomStorage shim + ws.WebSocket + attachment shim. + */ + +import type { WebSocket as WsWebSocket } from 'ws'; +import { + GATEWAY_OWNER_KEY, + GATEWAY_OWNER_TOUCH_INTERVAL_MS, + MIRRORED_CLIENT_TOKEN_HASHES_KEY, + ROOM_META_KEY, + SOCKET_CLOSE_CODES, + type GatewayOwnerRecord, + type MirroredClientTokenHashesRecord, + type PairGatewayRecord, + type RoomMetaRecord, + type SocketAttachment, +} from './types.js'; +import type { RelayRuntime } from './runtime.js'; +import { logRelayTelemetry } from './telemetry.js'; +import { deserializeAttachment } from '../cf-shim.js'; +import type { MemoryKV } from '../kv-store.js'; + +// ws library: WebSocket.OPEN = 1 +const WS_OPEN = 1; + +export type RehydrateSummary = { + totalSocketCount: number; + openClientCount: number; + orphanSocketsClosed: number; + nonOpenSocketsClosed: number; + duplicateSocketsClosed: number; + hasGateway: boolean; +}; + +type ReconcileSocketsOptions = { + preferredSocket?: WsWebSocket | null; +}; + +export async function loadRoomMeta(runtime: RelayRuntime): Promise { + const raw = await runtime.state.storage.get(ROOM_META_KEY); + if (!raw || typeof raw.gatewayId !== 'string' || !raw.gatewayId.trim()) { + runtime.roomGatewayId = null; + return; + } + runtime.roomGatewayId = raw.gatewayId; +} + 
+export async function storeRoomMeta(runtime: RelayRuntime, gatewayId: string): Promise { + if (runtime.roomGatewayId === gatewayId) return; + runtime.roomGatewayId = gatewayId; + await runtime.state.storage.put(ROOM_META_KEY, { gatewayId }); +} + +export async function loadPairGatewayRecord( + routesKv: MemoryKV, + gatewayId: string | null | undefined, +): Promise { + const normalized = gatewayId?.trim() ?? ''; + if (!normalized) return null; + const raw = await routesKv.get(`pair-gateway:${normalized}`); + if (!raw) return null; + try { + const parsed = JSON.parse(raw) as PairGatewayRecord; + if (!parsed || typeof parsed.gatewayId !== 'string' || typeof parsed.relaySecretHash !== 'string') { + return null; + } + return parsed; + } catch { + return null; + } +} + +export async function loadMirroredClientTokenHashes(runtime: RelayRuntime): Promise { + const raw = await runtime.state.storage.get(MIRRORED_CLIENT_TOKEN_HASHES_KEY); + if (!raw || !Array.isArray(raw.hashes)) { + runtime.mirroredClientTokenHashes = new Set(); + runtime.mirroredClientTokenHashesUpdatedAt = 0; + return; + } + runtime.mirroredClientTokenHashes = new Set( + raw.hashes + .filter((item): item is string => typeof item === 'string' && item.trim().length > 0), + ); + runtime.mirroredClientTokenHashesUpdatedAt = typeof raw.updatedAt === 'number' ? 
raw.updatedAt : 0; +} + +export async function storeMirroredClientTokenHashes( + runtime: RelayRuntime, + hashes: string[], + updatedAt = Date.now(), +): Promise { + const normalized = Array.from(new Set( + hashes + .filter((item): item is string => typeof item === 'string' && item.trim().length > 0), + )); + runtime.mirroredClientTokenHashes = new Set(normalized); + runtime.mirroredClientTokenHashesUpdatedAt = updatedAt; + await runtime.state.storage.put(MIRRORED_CLIENT_TOKEN_HASHES_KEY, { + hashes: normalized, + updatedAt, + } satisfies MirroredClientTokenHashesRecord); +} + +export async function loadGatewayOwner(runtime: RelayRuntime): Promise { + const raw = await runtime.state.storage.get(GATEWAY_OWNER_KEY); + if (!raw || typeof raw.gatewayId !== 'string' || typeof raw.seenAt !== 'number') { + runtime.gatewayOwner = null; + return; + } + runtime.gatewayOwner = raw; + runtime.gatewayOwnerTouchedAt = raw.seenAt; +} + +export async function touchGatewayOwner(runtime: RelayRuntime, gatewayId: string, force = false): Promise { + const now = Date.now(); + if (!force + && runtime.gatewayOwner?.gatewayId === gatewayId + && now - runtime.gatewayOwnerTouchedAt < GATEWAY_OWNER_TOUCH_INTERVAL_MS) { + return; + } + runtime.gatewayOwner = { + gatewayId, + seenAt: now, + }; + runtime.gatewayOwnerTouchedAt = now; + await runtime.state.storage.put(GATEWAY_OWNER_KEY, runtime.gatewayOwner); +} + +export function canAcceptGatewayOwner(runtime: RelayRuntime, gatewayId: string, now: number, leaseMs: number): boolean { + if (!runtime.gatewayOwner) return true; + if (runtime.gatewayOwner.gatewayId === gatewayId) return true; + return now - runtime.gatewayOwner.seenAt > leaseMs; +} + +function closeSocketBestEffort(ws: WsWebSocket, reason: 'orphan_socket' | 'dead_socket' | 'duplicate_socket'): void { + try { + ws.close(SOCKET_CLOSE_CODES.DEAD_SOCKET, reason); + } catch { + // Best effort cleanup; sockets may already be detached. 
+ } +} + +function shouldPreferSocketCandidate(input: { + currentConnectedAt: number; + nextConnectedAt: number; + currentSocket: WsWebSocket; + nextSocket: WsWebSocket; + preferredSocket?: WsWebSocket | null; +}): boolean { + if (input.preferredSocket && input.nextSocket === input.preferredSocket && input.currentSocket !== input.preferredSocket) { + return true; + } + if (input.preferredSocket && input.currentSocket === input.preferredSocket && input.nextSocket !== input.preferredSocket) { + return false; + } + return input.nextConnectedAt >= input.currentConnectedAt; +} + +export function reconcileSockets(runtime: RelayRuntime, options: ReconcileSocketsOptions = {}): RehydrateSummary { + const previousClientLastActivityAtById = new Map(runtime.clientLastActivityAtById); + const previousGatewayLastActivityAt = runtime.gatewayLastActivityAt; + runtime.gatewaySocket = null; + runtime.gatewayLastActivityAt = 0; + runtime.clients.clear(); + runtime.clientLastActivityAtById.clear(); + + const sockets = runtime.state.getWebSockets(); + let orphanSocketsClosed = 0; + let nonOpenSocketsClosed = 0; + let duplicateSocketsClosed = 0; + let gatewayCandidate: { socket: WsWebSocket; connectedAt: number } | null = null; + const clientCandidates = new Map(); + for (const ws of sockets) { + const attachment = deserializeAttachment(ws) as SocketAttachment | null; + if (!attachment) { + orphanSocketsClosed += 1; + closeSocketBestEffort(ws, 'orphan_socket'); + continue; + } + if (ws.readyState !== WS_OPEN) { + nonOpenSocketsClosed += 1; + closeSocketBestEffort(ws, 'dead_socket'); + continue; + } + if (attachment.role === 'gateway') { + if (!gatewayCandidate) { + gatewayCandidate = { socket: ws, connectedAt: attachment.connectedAt }; + continue; + } + const nextWins = shouldPreferSocketCandidate({ + currentConnectedAt: gatewayCandidate.connectedAt, + nextConnectedAt: attachment.connectedAt, + currentSocket: gatewayCandidate.socket, + nextSocket: ws, + preferredSocket: 
options.preferredSocket, + }); + if (nextWins) { + duplicateSocketsClosed += 1; + closeSocketBestEffort(gatewayCandidate.socket, 'duplicate_socket'); + gatewayCandidate = { socket: ws, connectedAt: attachment.connectedAt }; + } else { + duplicateSocketsClosed += 1; + closeSocketBestEffort(ws, 'duplicate_socket'); + } + continue; + } + + const existing = clientCandidates.get(attachment.clientId); + if (!existing) { + clientCandidates.set(attachment.clientId, { socket: ws, connectedAt: attachment.connectedAt }); + continue; + } + const nextWins = shouldPreferSocketCandidate({ + currentConnectedAt: existing.connectedAt, + nextConnectedAt: attachment.connectedAt, + currentSocket: existing.socket, + nextSocket: ws, + preferredSocket: options.preferredSocket, + }); + if (nextWins) { + duplicateSocketsClosed += 1; + closeSocketBestEffort(existing.socket, 'duplicate_socket'); + clientCandidates.set(attachment.clientId, { socket: ws, connectedAt: attachment.connectedAt }); + } else { + duplicateSocketsClosed += 1; + closeSocketBestEffort(ws, 'duplicate_socket'); + } + } + + runtime.gatewaySocket = gatewayCandidate?.socket ?? null; + runtime.gatewayLastActivityAt = gatewayCandidate + ? Math.max(previousGatewayLastActivityAt, gatewayCandidate.connectedAt) + : 0; + for (const [clientId, candidate] of clientCandidates.entries()) { + runtime.clients.set(clientId, candidate.socket); + const previousActivityAt = previousClientLastActivityAtById.get(clientId); + runtime.clientLastActivityAtById.set( + clientId, + typeof previousActivityAt === 'number' + ? 
Math.max(previousActivityAt, candidate.connectedAt) + : candidate.connectedAt, + ); + } + + const summary: RehydrateSummary = { + totalSocketCount: sockets.length, + openClientCount: runtime.clients.size, + orphanSocketsClosed, + nonOpenSocketsClosed, + duplicateSocketsClosed, + hasGateway: Boolean(runtime.gatewaySocket?.readyState === WS_OPEN), + }; + logRelayTelemetry('relay_worker', 'rehydrate_summary', { + totalSocketCount: summary.totalSocketCount, + openClientCount: summary.openClientCount, + orphanSocketsClosed: summary.orphanSocketsClosed, + nonOpenSocketsClosed: summary.nonOpenSocketsClosed, + duplicateSocketsClosed: summary.duplicateSocketsClosed, + hasGateway: summary.hasGateway, + pendingConnectStarts: runtime.pendingConnectStarts.size, + awaitingChallengeCount: runtime.awaitingChallenge.size, + hasPendingChallenge: Boolean(runtime.pendingChallenge), + }); + return summary; +} + +export function rehydrateSockets(runtime: RelayRuntime): RehydrateSummary { + return reconcileSockets(runtime); +} diff --git a/apps/relay-docker/src/relay/telemetry.ts b/apps/relay-docker/src/relay/telemetry.ts new file mode 100644 index 0000000..3812f6f --- /dev/null +++ b/apps/relay-docker/src/relay/telemetry.ts @@ -0,0 +1,42 @@ +/** + * telemetry.ts — Relay structured telemetry logging. + * Ported verbatim from apps/relay-worker/src/relay/telemetry.ts. 
+ */ + +const REDACTED_FIELD_KEYS = new Set([ + 'accessCode', + 'authorization', + 'cfRay', + 'clientId', + 'clientLabel', + 'currentGatewayClientId', + 'gatewayClientId', + 'gatewayId', + 'objectId', + 'reqId', + 'secret', + 'sourceClientId', + 'targetClientId', + 'token', + 'traceId', +]); + +function sanitizeTelemetryFields(fields: Record): Record { + return Object.fromEntries( + Object.entries(fields).filter(([key, value]) => !REDACTED_FIELD_KEYS.has(key) && value !== undefined), + ); +} + +export function logRelayTelemetry( + scope: 'relay_worker' | 'registry_worker', + event: string, + fields: Record, +): void { + const sanitizedFields = sanitizeTelemetryFields(fields); + console.log(JSON.stringify({ + scope, + event, + ts: new Date().toISOString(), + ...sanitizedFields, + })); +} diff --git a/apps/relay-docker/src/relay/types.ts b/apps/relay-docker/src/relay/types.ts new file mode 100644 index 0000000..904a5ac --- /dev/null +++ b/apps/relay-docker/src/relay/types.ts @@ -0,0 +1,103 @@ +/** + * types.ts — Relay type definitions for Docker self-hosted deployment. + * + * Ported from apps/relay-worker/src/relay/types.ts. + * CF-specific types (DurableObjectNamespace, KVNamespace) replaced + * with Docker equivalents. 
+ */ + +import type { MemoryKV } from '../kv-store.js'; + +export interface Env { + ROUTES_KV: MemoryKV; + REGISTRY_VERIFY_URL?: string; + MAX_MESSAGES_PER_10S: string; + MAX_CLIENT_MESSAGES_PER_10S?: string; + HEARTBEAT_INTERVAL_MS: string; + GATEWAY_OWNER_LEASE_MS?: string; + AWAITING_CHALLENGE_TTL_MS?: string; + CLIENT_IDLE_TIMEOUT_MS?: string; +} + +export type SocketAttachment = { + role: 'gateway' | 'client'; + clientId: string; + connectedAt: number; + traceId?: string; + clientLabel?: string | null; +}; + +export type PendingConnectStart = { + clientId: string; + data: string; + queuedAt: number; + traceId?: string; +}; + +export type RateState = { + windowStart: number; + count: number; +}; + +export type PairGatewayRecord = { + gatewayId: string; + displayName?: string | null; + relaySecretHash: string; + clientTokens?: Array<{ + hash: string; + label?: string | null; + createdAt?: string; + lastUsedAt?: string | null; + }>; +}; + +export type GatewayOwnerRecord = { + gatewayId: string; + seenAt: number; +}; + +export type PendingChallenge = { + data: string; + queuedAt: number; + gatewayClientId: string; + traceId?: string; +}; + +export type AwaitingChallengeEntry = { + clientId: string; + queuedAt: number; +}; + +export type RoomMetaRecord = { + gatewayId: string; +}; + +export type MirroredClientTokenHashesRecord = { + hashes: string[]; + updatedAt: number; +}; + +export type RelayControlEnvelope = Record & { + type?: string; + event?: string; + sourceClientId?: string; + targetClientId?: string; +}; + +export const ROOM_META_KEY = 'room-meta'; +export const MIRRORED_CLIENT_TOKEN_HASHES_KEY = 'mirrored-client-token-hashes'; +export const CONTROL_PREFIX = '__clawket_relay_control__:'; +export const GATEWAY_OWNER_KEY = 'gateway-owner'; +export const GATEWAY_OWNER_TOUCH_INTERVAL_MS = 5_000; +export const CONNECT_START_BUFFER_TTL_MS = 12_000; +export const PENDING_CHALLENGE_TTL_MS = 5_000; +export const AWAITING_CHALLENGE_TTL_DEFAULT_MS = 25_000; +export 
const CLIENT_IDLE_TIMEOUT_DEFAULT_MS = 10 * 60_000; + +export const SOCKET_CLOSE_CODES = { + REPLACED_BY_NEW_GATEWAY: 4001, + REPLACED_BY_NEW_CLIENT_SOCKET: 4002, + RATE_LIMITED: 4008, + IDLE_OR_STALE_TIMEOUT: 4009, + DEAD_SOCKET: 4010, +} as const; diff --git a/apps/relay-docker/src/relay/utils.ts b/apps/relay-docker/src/relay/utils.ts new file mode 100644 index 0000000..2c9d3f7 --- /dev/null +++ b/apps/relay-docker/src/relay/utils.ts @@ -0,0 +1,11 @@ +/** + * utils.ts — Utility functions for relay. + * Ported verbatim from apps/relay-worker/src/relay/utils.ts. + */ + +export function parsePositiveInt(raw: string | undefined, fallback: number): number { + if (!raw) return fallback; + const value = parseInt(raw, 10); + if (!Number.isFinite(value) || value <= 0) return fallback; + return value; +} diff --git a/apps/relay-docker/src/room-manager.ts b/apps/relay-docker/src/room-manager.ts new file mode 100644 index 0000000..09671e4 --- /dev/null +++ b/apps/relay-docker/src/room-manager.ts @@ -0,0 +1,48 @@ +/** + * room-manager.ts — Manages DockerRelayRoom instances. + * + * Replaces Cloudflare DurableObjectNamespace routing. + * Each gatewayId maps to exactly one DockerRelayRoom in memory. 
+ */ + +import { DockerRelayRoom } from './relay-room.js'; +import type { Env } from './relay/types.js'; + +export class RoomManager { + private readonly rooms = new Map(); + private readonly gcTimer: ReturnType; + + constructor(private readonly env: Env) { + // GC idle rooms every 5 minutes + this.gcTimer = setInterval(() => this.gc(), 5 * 60_000); + } + + getRoom(gatewayId: string): DockerRelayRoom { + const existing = this.rooms.get(gatewayId); + if (existing) return existing; + + const room = new DockerRelayRoom(gatewayId, this.env); + this.rooms.set(gatewayId, room); + return room; + } + + private gc(): void { + for (const [gatewayId, room] of this.rooms.entries()) { + if (!room.hasConnections) { + room.destroy(); + this.rooms.delete(gatewayId); + } + } + if (this.rooms.size > 0) { + console.log(`[room-manager] Active rooms: ${this.rooms.size}`); + } + } + + close(): void { + clearInterval(this.gcTimer); + for (const [, room] of this.rooms.entries()) { + room.destroy(); + } + this.rooms.clear(); + } +} diff --git a/apps/relay-docker/src/runtime.test.ts b/apps/relay-docker/src/runtime.test.ts new file mode 100644 index 0000000..d602d7b --- /dev/null +++ b/apps/relay-docker/src/runtime.test.ts @@ -0,0 +1,196 @@ +import { createServer as createNetServer } from 'node:net'; +import { describe, expect, it } from 'vitest'; +import WebSocket from 'ws'; +import { MemoryKV } from './kv-store.js'; +import { createRegistryServer } from './registry.js'; +import { createRelayServer } from './relay-server.js'; +import type { RoomManager } from './room-manager.js'; + +async function allocatePort(): Promise { + return new Promise((resolve, reject) => { + const server = createNetServer(); + server.on('error', reject); + server.listen(0, '127.0.0.1', () => { + const address = server.address(); + if (!address || typeof address === 'string') { + server.close(); + reject(new Error('Failed to allocate port')); + return; + } + const { port } = address; + server.close((error) => { + 
if (error) { + reject(error); + return; + } + resolve(port); + }); + }); + }); +} + +async function waitForHttp(url: string, timeoutMs = 5_000): Promise { + const startedAt = Date.now(); + let lastError: unknown = null; + + while (Date.now() - startedAt < timeoutMs) { + try { + const response = await fetch(url); + await response.arrayBuffer(); + return; + } catch (error) { + lastError = error; + await new Promise((resolve) => setTimeout(resolve, 50)); + } + } + + throw new Error(`Timed out waiting for HTTP server: ${String(lastError)}`); +} + +async function waitForSocketClose( + ws: WebSocket, + timeoutMs = 5_000, +): Promise<{ code: number; reason: string }> { + return new Promise((resolve, reject) => { + const timer = setTimeout(() => { + try { + ws.terminate(); + } catch { + // ignore timeout cleanup error + } + reject(new Error('Timed out waiting for WebSocket close')); + }, timeoutMs); + + ws.once('close', (code, reason) => { + clearTimeout(timer); + resolve({ code, reason: reason.toString('utf8') }); + }); + + ws.once('error', () => { + // The server may close immediately after reporting an error. + // The close event is the assertion target for these tests. 
+ }); + }); +} + +describe('relay-docker runtime smoke tests', () => { + it('serves registry health and pairing flow', async () => { + const registryPort = await allocatePort(); + const kv = new MemoryKV(); + const registry = createRegistryServer({ + routesKv: kv, + relayRegionMap: '', + pairAccessCodeTtlSec: '600', + pairClientTokenMax: '8', + relayUrl: 'ws://relay.local/ws', + }, registryPort); + + registry.start(); + try { + const baseUrl = `http://127.0.0.1:${registryPort}`; + await waitForHttp(`${baseUrl}/v1/health`); + + const healthRes = await fetch(`${baseUrl}/v1/health`); + expect(healthRes.status).toBe(200); + await expect(healthRes.json()).resolves.toEqual(expect.objectContaining({ ok: true })); + + const registerRes = await fetch(`${baseUrl}/v1/pair/register`, { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ displayName: 'Relay Host', preferredRegion: 'us' }), + }); + expect(registerRes.status).toBe(200); + const registerBody = await registerRes.json() as { + gatewayId: string; + relaySecret: string; + accessCode: string; + }; + + const claimRes = await fetch(`${baseUrl}/v1/pair/claim`, { + method: 'POST', + headers: { 'content-type': 'application/json' }, + body: JSON.stringify({ + gatewayId: registerBody.gatewayId, + accessCode: registerBody.accessCode, + clientLabel: 'iPhone', + }), + }); + expect(claimRes.status).toBe(200); + const claimBody = await claimRes.json() as { clientToken: string }; + expect(claimBody.clientToken.startsWith('gct_')).toBe(true); + + const verifyBadTokenRes = await fetch(`${baseUrl}/v1/verify/${encodeURIComponent(registerBody.gatewayId)}`, { + headers: { authorization: 'Bearer invalid-token' }, + }); + expect(verifyBadTokenRes.status).toBe(401); + } finally { + await registry.close(); + kv.close(); + } + }); + + it('serves relay health endpoint', async () => { + const relayPort = await allocatePort(); + const relay = createRelayServer({ + getRoom: () => ({ + handleWebSocket: 
async () => ({ accepted: true }), + }), + } as unknown as RoomManager, relayPort); + + relay.start(); + try { + const baseUrl = `http://127.0.0.1:${relayPort}`; + await waitForHttp(`${baseUrl}/v1/health`); + + const healthRes = await fetch(`${baseUrl}/v1/health`); + expect(healthRes.status).toBe(200); + await expect(healthRes.json()).resolves.toEqual({ ok: true, runtime: 'docker' }); + } finally { + await relay.close(); + } + }); + + it('uses non-conflicting close codes for rejected websocket auth', async () => { + const relayPort = await allocatePort(); + const relay = createRelayServer({ + getRoom: () => ({ + handleWebSocket: async () => ({ + accepted: false, + status: 401, + error: 'invalid token', + }), + }), + } as unknown as RoomManager, relayPort); + + relay.start(); + try { + await waitForHttp(`http://127.0.0.1:${relayPort}/v1/health`); + const ws = new WebSocket(`ws://127.0.0.1:${relayPort}/ws?gatewayId=gw_test&role=client`); + const closed = await waitForSocketClose(ws); + expect(closed.code).toBe(4401); + } finally { + await relay.close(); + } + }); + + it('closes websocket with 1011 when upgrade callback throws', async () => { + const relayPort = await allocatePort(); + const relay = createRelayServer({ + getRoom: () => ({ + handleWebSocket: async () => { + throw new Error('unexpected failure'); + }, + }), + } as unknown as RoomManager, relayPort); + + relay.start(); + try { + await waitForHttp(`http://127.0.0.1:${relayPort}/v1/health`); + const ws = new WebSocket(`ws://127.0.0.1:${relayPort}/ws?gatewayId=gw_test&role=client`); + const closed = await waitForSocketClose(ws); + expect(closed.code).toBe(1011); + } finally { + await relay.close(); + } + }); +}); diff --git a/apps/relay-docker/src/server.ts b/apps/relay-docker/src/server.ts new file mode 100644 index 0000000..7a7f870 --- /dev/null +++ b/apps/relay-docker/src/server.ts @@ -0,0 +1,138 @@ +/** + * server.ts — Main entry point for the Docker self-hosted relay. 
+ * + * Starts both the Registry HTTP server and the Relay WebSocket server + * in a single Node.js process, sharing the same in-memory KV store. + */ + +import { MemoryKV } from './kv-store.js'; +import { RoomManager } from './room-manager.js'; +import { createRegistryServer } from './registry.js'; +import { createRelayServer } from './relay-server.js'; +import type { Env } from './relay/types.js'; + +// ---------- Configuration from environment ---------- + +const REGISTRY_PORT = parseInt(process.env.REGISTRY_PORT ?? '3001', 10); +const RELAY_PORT = parseInt(process.env.RELAY_PORT ?? '3002', 10); +const KV_PERSIST_PATH = process.env.KV_PERSIST_PATH ?? ''; +const RELAY_URL = process.env.RELAY_URL ?? `ws://localhost:${RELAY_PORT}/ws`; +const REGISTRY_URL = process.env.REGISTRY_URL ?? `http://localhost:${REGISTRY_PORT}`; +const RELAY_REGION_MAP = process.env.RELAY_REGION_MAP ?? ''; +const PAIR_ACCESS_CODE_TTL_SEC = process.env.PAIR_ACCESS_CODE_TTL_SEC ?? '600'; +const PAIR_CLIENT_TOKEN_MAX = process.env.PAIR_CLIENT_TOKEN_MAX ?? '8'; +const MAX_MESSAGES_PER_10S = process.env.MAX_MESSAGES_PER_10S ?? '120'; +const MAX_CLIENT_MESSAGES_PER_10S = process.env.MAX_CLIENT_MESSAGES_PER_10S ?? '300'; +const HEARTBEAT_INTERVAL_MS = process.env.HEARTBEAT_INTERVAL_MS ?? '30000'; +const AWAITING_CHALLENGE_TTL_MS = process.env.AWAITING_CHALLENGE_TTL_MS ?? '25000'; +const CLIENT_IDLE_TIMEOUT_MS = process.env.CLIENT_IDLE_TIMEOUT_MS ?? '600000'; +const GATEWAY_OWNER_LEASE_MS = process.env.GATEWAY_OWNER_LEASE_MS ?? 
'20000'; + +// ---------- Bootstrap ---------- + +console.log('╔══════════════════════════════════════════════╗'); +console.log('║ Clawket Self-Hosted Relay ║'); +console.log('╠══════════════════════════════════════════════╣'); +console.log(`║ Registry port : ${REGISTRY_PORT.toString().padEnd(28)}║`); +console.log(`║ Relay port : ${RELAY_PORT.toString().padEnd(28)}║`); +console.log(`║ KV persist : ${(KV_PERSIST_PATH || '(memory only)').padEnd(28)}║`); +console.log(`║ Relay URL : ${RELAY_URL.substring(0, 28).padEnd(28)}║`); +console.log('╚══════════════════════════════════════════════╝'); + +if (RELAY_REGION_MAP.trim()) { + console.warn('[server] RELAY_REGION_MAP is supported only for multiple public URLs that route to this same relay process.'); +} + +if ((process.env.PAIRING_SYNC_SECRET ?? '').trim()) { + console.warn('[server] PAIRING_SYNC_SECRET is ignored in relay-docker because registry and relay share the same process-local KV.'); +} + +// Shared KV store +const kv = new MemoryKV(KV_PERSIST_PATH || undefined); + +// Relay environment +const env: Env = { + ROUTES_KV: kv, + REGISTRY_VERIFY_URL: REGISTRY_URL, + MAX_MESSAGES_PER_10S, + MAX_CLIENT_MESSAGES_PER_10S, + HEARTBEAT_INTERVAL_MS, + GATEWAY_OWNER_LEASE_MS, + AWAITING_CHALLENGE_TTL_MS, + CLIENT_IDLE_TIMEOUT_MS, +}; + +// Room Manager +const roomManager = new RoomManager(env); + +// Registry HTTP server +const registry = createRegistryServer( + { + routesKv: kv, + relayRegionMap: RELAY_REGION_MAP, + pairAccessCodeTtlSec: PAIR_ACCESS_CODE_TTL_SEC, + pairClientTokenMax: PAIR_CLIENT_TOKEN_MAX, + relayUrl: RELAY_URL, + }, + REGISTRY_PORT, +); + +// Relay WebSocket server +const relay = createRelayServer( + roomManager, + RELAY_PORT, +); + +// Start both servers +registry.start(); +relay.start(); + +// Graceful shutdown +let isShuttingDown = false; + +async function shutdown(signal: NodeJS.Signals): Promise { + if (isShuttingDown) { + console.log(`[server] Shutdown already in progress (${signal})...`); + return; 
+ } + + isShuttingDown = true; + console.log(`\n[server] Shutting down (${signal})...`); + + const forceExitTimeout = setTimeout(() => { + console.error('[server] Forced shutdown after timeout'); + process.exit(1); + }, 10_000); + forceExitTimeout.unref(); + + const settleSync = (fn: () => T): Promise => new Promise((resolve) => { + try { + fn(); + } finally { + resolve(); + } + }); + + try { + await Promise.allSettled([ + relay.close(), + registry.close(), + settleSync(() => roomManager.close()), + settleSync(() => kv.close()), + ]); + + clearTimeout(forceExitTimeout); + process.exitCode = 0; + } catch (error) { + clearTimeout(forceExitTimeout); + console.error('[server] Error during shutdown', error); + process.exit(1); + } +} + +process.on('SIGINT', () => { + void shutdown('SIGINT'); +}); +process.on('SIGTERM', () => { + void shutdown('SIGTERM'); +}); diff --git a/apps/relay-docker/tsconfig.build.json b/apps/relay-docker/tsconfig.build.json new file mode 100644 index 0000000..0a11719 --- /dev/null +++ b/apps/relay-docker/tsconfig.build.json @@ -0,0 +1,4 @@ +{ + "extends": "./tsconfig.json", + "exclude": ["src/**/*.test.ts"] +} diff --git a/apps/relay-docker/tsconfig.json b/apps/relay-docker/tsconfig.json new file mode 100644 index 0000000..70045c4 --- /dev/null +++ b/apps/relay-docker/tsconfig.json @@ -0,0 +1,12 @@ +{ + "extends": "../../tsconfig.relay-base.json", + "compilerOptions": { + "lib": ["ES2022"], + "types": ["node"], + "outDir": "./dist", + "declaration": true, + "sourceMap": true, + "noEmit": false + }, + "include": ["src/**/*.ts"] +} diff --git a/package.json b/package.json index 7fe343b..2a23154 100644 --- a/package.json +++ b/package.json @@ -41,8 +41,8 @@ "mobile:typecheck": "npm run --workspace clawket typecheck", "mobile:test": "npm run --workspace clawket test", "mobile:office:build": "npm --prefix apps/mobile/office-game install && npm --prefix apps/mobile/office-game run build", - "relay:typecheck": "npm run --workspace @clawket/shared 
typecheck && npm run --workspace @clawket/registry-worker typecheck && npm run --workspace @clawket/relay-worker typecheck", - "relay:test": "npm run --workspace @clawket/shared test && npm run --workspace @clawket/registry-worker test && npm run --workspace @clawket/relay-worker test", + "relay:typecheck": "npm run --workspace @clawket/shared typecheck && npm run --workspace @clawket/registry-worker typecheck && npm run --workspace @clawket/relay-worker typecheck && npm run --workspace @clawket/relay-docker typecheck", + "relay:test": "npm run --workspace @clawket/shared test && npm run --workspace @clawket/registry-worker test && npm run --workspace @clawket/relay-worker test && npm run --workspace @clawket/relay-docker test", "relay:test:integration": "vitest run --dir tests/integration", "relay:dev:registry": "node scripts/relay/run-wrangler.mjs dev relay-registry", "relay:dev:worker": "node scripts/relay/run-wrangler.mjs dev relay-worker", diff --git a/packages/relay-shared/package.json b/packages/relay-shared/package.json index 6439710..3db9cc0 100644 --- a/packages/relay-shared/package.json +++ b/packages/relay-shared/package.json @@ -7,6 +7,7 @@ "main": "src/index.ts", "types": "src/index.ts", "scripts": { + "build": "tsc -p tsconfig.build.json --outDir dist --declaration && node scripts/write-dist-package-json.mjs", "typecheck": "tsc -p tsconfig.json --noEmit", "test": "vitest run", "lint": "echo \"No lint for @clawket/shared\"" diff --git a/packages/relay-shared/scripts/write-dist-package-json.mjs b/packages/relay-shared/scripts/write-dist-package-json.mjs new file mode 100644 index 0000000..165ff2b --- /dev/null +++ b/packages/relay-shared/scripts/write-dist-package-json.mjs @@ -0,0 +1,32 @@ +import { mkdir, readFile, writeFile } from 'node:fs/promises'; +import path from 'node:path'; +import { fileURLToPath } from 'node:url'; + +const scriptDir = path.dirname(fileURLToPath(import.meta.url)); +const packageDir = path.resolve(scriptDir, '..'); +const 
sourcePackageJsonPath = path.join(packageDir, 'package.json'); +const distDir = path.join(packageDir, 'dist'); +const distPackageJsonPath = path.join(distDir, 'package.json'); + +const sourcePackageRaw = await readFile(sourcePackageJsonPath, 'utf8'); +const sourcePackage = JSON.parse(sourcePackageRaw); + +const distPackage = { + name: sourcePackage.name, + version: sourcePackage.version, + private: sourcePackage.private, + type: 'module', + license: sourcePackage.license, + main: './index.js', + types: './index.d.ts', + exports: { + '.': { + types: './index.d.ts', + import: './index.js', + default: './index.js', + }, + }, +}; + +await mkdir(distDir, { recursive: true }); +await writeFile(distPackageJsonPath, `${JSON.stringify(distPackage, null, 2)}\n`, 'utf8'); diff --git a/packages/relay-shared/src/index.ts b/packages/relay-shared/src/index.ts index 995ea19..684fc2c 100644 --- a/packages/relay-shared/src/index.ts +++ b/packages/relay-shared/src/index.ts @@ -1 +1 @@ -export * from './protocol'; +export * from './protocol.js'; diff --git a/packages/relay-shared/tsconfig.build.json b/packages/relay-shared/tsconfig.build.json new file mode 100644 index 0000000..0a11719 --- /dev/null +++ b/packages/relay-shared/tsconfig.build.json @@ -0,0 +1,4 @@ +{ + "extends": "./tsconfig.json", + "exclude": ["src/**/*.test.ts"] +}