Skip to content

Commit 66587a7

Browse files
author
Brendan DeBeasi
committed
fix: prevent SSE stream memory leaks on client disconnect
- Add cleanup() guard with done flag to prevent double-cleanup - await stream.writeSSE() calls and catch errors to trigger cleanup - Unsubscribe Bus and GlobalBus listeners on abort or write failure - Clear heartbeat interval in all exit paths - Add Bus.debug() subscription count introspection - Add GET /debug/memory endpoint for runtime memory diagnostics
1 parent 74ebb41 commit 66587a7

11 files changed

Lines changed: 410 additions & 56 deletions

File tree

packages/opencode/src/bus/index.ts

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -35,6 +35,7 @@ export namespace Bus {
3535
for (const sub of [...wildcard]) {
3636
sub(event)
3737
}
38+
entry.subscriptions.clear()
3839
},
3940
)
4041

@@ -102,4 +103,12 @@ export namespace Bus {
102103
match.splice(index, 1)
103104
}
104105
}
106+
107+
export function debug() {
108+
const counts: Record<string, number> = {}
109+
for (const [type, subs] of state().subscriptions) {
110+
counts[type] = subs.length
111+
}
112+
return { subscriptions: counts }
113+
}
105114
}

packages/opencode/src/cli/cmd/acp.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -62,8 +62,8 @@ export const AcpCommand = cmd({
6262
log.info("setup connection")
6363
process.stdin.resume()
6464
await new Promise((resolve, reject) => {
65-
process.stdin.on("end", resolve)
66-
process.stdin.on("error", reject)
65+
process.stdin.once("end", resolve)
66+
process.stdin.once("error", reject)
6767
})
6868
})
6969
},

packages/opencode/src/cli/cmd/github.ts

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@ import { Bus } from "../../bus"
2828
import { MessageV2 } from "../../session/message-v2"
2929
import { SessionPrompt } from "@/session/prompt"
3030
import { $ } from "bun"
31+
import { setTimeout as sleep } from "node:timers/promises"
3132

3233
type GitHubAuthor = {
3334
login: string
@@ -353,7 +354,7 @@ export const GithubInstallCommand = cmd({
353354
}
354355

355356
retries++
356-
await Bun.sleep(1000)
357+
await sleep(1000)
357358
} while (true)
358359

359360
s.stop("Installed GitHub app")
@@ -493,6 +494,7 @@ export const GithubRunCommand = cmd({
493494
: "issue"
494495
: undefined
495496

497+
let unsubSessionEvents: (() => void) | undefined
496498
try {
497499
if (useGithubToken) {
498500
const githubToken = process.env["GITHUB_TOKEN"]
@@ -532,7 +534,7 @@ export const GithubRunCommand = cmd({
532534
},
533535
],
534536
})
535-
subscribeSessionEvents()
537+
unsubSessionEvents = subscribeSessionEvents()
536538
shareId = await (async () => {
537539
if (share === false) return
538540
if (!share && repoData.data.private) return
@@ -670,6 +672,7 @@ export const GithubRunCommand = cmd({
670672
// Also output the clean error message for the action to capture
671673
//core.setOutput("prepare_error", e.message);
672674
} finally {
675+
unsubSessionEvents?.()
673676
if (!useGithubToken) {
674677
await restoreGitConfig()
675678
await revokeAppToken()
@@ -867,7 +870,7 @@ export const GithubRunCommand = cmd({
867870
}
868871

869872
let text = ""
870-
Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
873+
const unsub = Bus.subscribe(MessageV2.Event.PartUpdated, async (evt) => {
871874
if (evt.properties.part.sessionID !== session.id) return
872875
//if (evt.properties.part.messageID === messageID) return
873876
const part = evt.properties.part
@@ -894,6 +897,7 @@ export const GithubRunCommand = cmd({
894897
}
895898
}
896899
})
900+
return unsub
897901
}
898902

899903
async function summarize(response: string) {
@@ -1372,7 +1376,7 @@ Co-authored-by: ${actor} <${actor}@users.noreply.github.com>"`
13721376
} catch (e) {
13731377
if (retries > 0) {
13741378
console.log(`Retrying after ${delayMs}ms...`)
1375-
await Bun.sleep(delayMs)
1379+
await sleep(delayMs)
13761380
return withRetry(fn, retries - 1, delayMs)
13771381
}
13781382
throw e

packages/opencode/src/control-plane/workspace-server/routes.ts

Lines changed: 36 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,48 @@ export function WorkspaceServerRoutes() {
1212
data: JSON.stringify(event),
1313
})
1414
}
15+
16+
let done = false
17+
let resolveWait: () => void = () => {}
18+
let heartbeat: ReturnType<typeof setInterval> | undefined
19+
20+
const cleanup = () => {
21+
if (done) return
22+
done = true
23+
clearInterval(heartbeat)
24+
GlobalBus.off("event", handler)
25+
resolveWait()
26+
}
27+
1528
const handler = async (event: { directory?: string; payload: unknown }) => {
16-
await send(event.payload)
29+
if (done) return
30+
try {
31+
await send(event.payload)
32+
} catch {
33+
cleanup()
34+
}
1735
}
36+
1837
GlobalBus.on("event", handler)
19-
await send({ type: "server.connected", properties: {} })
20-
const heartbeat = setInterval(() => {
21-
void send({ type: "server.heartbeat", properties: {} })
38+
39+
try {
40+
await send({ type: "server.connected", properties: {} })
41+
} catch {
42+
cleanup()
43+
}
44+
45+
heartbeat = setInterval(async () => {
46+
if (done) return
47+
try {
48+
await send({ type: "server.heartbeat", properties: {} })
49+
} catch {
50+
cleanup()
51+
}
2252
}, 10_000)
2353

2454
await new Promise<void>((resolve) => {
25-
stream.onAbort(() => {
26-
clearInterval(heartbeat)
27-
GlobalBus.off("event", handler)
28-
resolve()
29-
})
55+
resolveWait = resolve
56+
stream.onAbort(cleanup)
3057
})
3158
})
3259
})

packages/opencode/src/index.ts

Lines changed: 11 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,7 +3,8 @@ import { hideBin } from "yargs/helpers"
33
import { RunCommand } from "./cli/cmd/run"
44
import { GenerateCommand } from "./cli/cmd/generate"
55
import { Log } from "./util/log"
6-
import { AuthCommand } from "./cli/cmd/auth"
6+
import { LoginCommand, LogoutCommand, SwitchCommand, WorkspacesCommand } from "./cli/cmd/account"
7+
import { ProvidersCommand } from "./cli/cmd/providers"
78
import { AgentCommand } from "./cli/cmd/agent"
89
import { UpgradeCommand } from "./cli/cmd/upgrade"
910
import { UninstallCommand } from "./cli/cmd/uninstall"
@@ -27,6 +28,7 @@ import { AcpCommand } from "./cli/cmd/acp"
2728
import { EOL } from "os"
2829
import { WebCommand } from "./cli/cmd/web"
2930
import { PrCommand } from "./cli/cmd/pr"
31+
import { Instance } from "./project/instance"
3032
import { SessionCommand } from "./cli/cmd/session"
3133
import { DbCommand } from "./cli/cmd/db"
3234
import path from "path"
@@ -129,7 +131,11 @@ let cli = yargs(hideBin(process.argv))
129131
.command(RunCommand)
130132
.command(GenerateCommand)
131133
.command(DebugCommand)
132-
.command(AuthCommand)
134+
.command(LoginCommand)
135+
.command(LogoutCommand)
136+
.command(SwitchCommand)
137+
.command(WorkspacesCommand)
138+
.command(ProvidersCommand)
133139
.command(AgentCommand)
134140
.command(UpgradeCommand)
135141
.command(UninstallCommand)
@@ -203,6 +209,9 @@ try {
203209
}
204210
process.exitCode = 1
205211
} finally {
212+
// Clean up instances (PTY, MCP, LSP) before exiting.
213+
// Timeout prevents hanging on unresponsive subprocesses.
214+
await Promise.race([Instance.disposeAll(), new Promise((resolve) => setTimeout(resolve, 5000))])
206215
// Some subprocesses don't react properly to SIGTERM and similar signals.
207216
// Most notably, some docker-container-based MCP servers don't handle such signals unless
208217
// run using `docker run --init`.

packages/opencode/src/lsp/client.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -237,6 +237,8 @@ export namespace LSPClient {
237237
},
238238
async shutdown() {
239239
l.info("shutting down")
240+
diagnostics.clear()
241+
for (const key of Object.keys(files)) delete files[key]
240242
connection.end()
241243
connection.dispose()
242244
input.server.process.kill()

packages/opencode/src/project/instance.ts

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,13 @@ export const Instance = {
6666
state<S>(init: () => S, dispose?: (state: Awaited<S>) => Promise<void>): () => S {
6767
return State.create(() => Instance.directory, init, dispose)
6868
},
69+
debug() {
70+
return {
71+
cacheSize: cache.size,
72+
cacheKeys: [...cache.keys()],
73+
state: State.debug(),
74+
}
75+
},
6976
async dispose() {
7077
Log.Default.info("disposing instance", { directory: Instance.directory })
7178
await State.dispose(Instance.directory)

packages/opencode/src/project/state.ts

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,12 @@ export namespace State {
6767
disposalFinished = true
6868
log.info("state disposal completed", { key })
6969
}
70+
71+
export function debug() {
72+
const result: Record<string, number> = {}
73+
for (const [key, entries] of recordsByKey) {
74+
result[key] = entries.size
75+
}
76+
return { keys: recordsByKey.size, entries: result }
77+
}
7078
}

packages/opencode/src/server/routes/global.ts

Lines changed: 38 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -69,40 +69,59 @@ export const GlobalRoutes = lazy(() =>
6969
c.header("X-Accel-Buffering", "no")
7070
c.header("X-Content-Type-Options", "nosniff")
7171
return streamSSE(c, async (stream) => {
72-
stream.writeSSE({
72+
await stream.writeSSE({
7373
data: JSON.stringify({
7474
payload: {
7575
type: "server.connected",
7676
properties: {},
7777
},
7878
}),
7979
})
80+
81+
let done = false
82+
let resolveWait: () => void = () => {}
83+
let heartbeat: ReturnType<typeof setInterval> | undefined
84+
85+
const cleanup = () => {
86+
if (done) return
87+
done = true
88+
clearInterval(heartbeat)
89+
GlobalBus.off("event", handler)
90+
resolveWait()
91+
log.info("global event disconnected")
92+
}
93+
8094
async function handler(event: any) {
81-
await stream.writeSSE({
82-
data: JSON.stringify(event),
83-
})
95+
if (done) return
96+
try {
97+
await stream.writeSSE({ data: JSON.stringify(event) })
98+
} catch {
99+
cleanup()
100+
}
84101
}
102+
85103
GlobalBus.on("event", handler)
86104

87105
// Send heartbeat every 10s to prevent stalled proxy streams.
88-
const heartbeat = setInterval(() => {
89-
stream.writeSSE({
90-
data: JSON.stringify({
91-
payload: {
92-
type: "server.heartbeat",
93-
properties: {},
94-
},
95-
}),
96-
})
106+
heartbeat = setInterval(async () => {
107+
if (done) return
108+
try {
109+
await stream.writeSSE({
110+
data: JSON.stringify({
111+
payload: {
112+
type: "server.heartbeat",
113+
properties: {},
114+
},
115+
}),
116+
})
117+
} catch {
118+
cleanup()
119+
}
97120
}, 10_000)
98121

99122
await new Promise<void>((resolve) => {
100-
stream.onAbort(() => {
101-
clearInterval(heartbeat)
102-
GlobalBus.off("event", handler)
103-
resolve()
104-
log.info("global event disconnected")
105-
})
123+
resolveWait = resolve
124+
stream.onAbort(cleanup)
106125
})
107126
})
108127
},

0 commit comments

Comments
 (0)