
Commit c18578f

fix: send continueMessage after compaction completes
1 parent db3966d commit c18578f

File tree: 3 files changed (+180, -59 lines)

src/node/services/agentSession.ts

Lines changed: 10 additions & 0 deletions

@@ -1099,6 +1099,16 @@ export class AgentSession {
       return;
     }

+    // If a compaction operation is active, do NOT auto-send queued messages.
+    //
+    // Why: if an earlier stream (the one we interrupted to start compaction) emits a late
+    // stream-end/tool-call-end event, it can trigger sendQueuedMessages() while compaction is
+    // still streaming. That can cause the queued continue message to be sent too early and
+    // then wiped by the compaction history replacement.
+    if (this.activeCompactionOperation) {
+      return;
+    }
+
     // Clear the queued message flag (even if queue is empty, to handle race conditions)
     this.backgroundProcessManager.setMessageQueued(this.workspaceId, false);
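To make the race concrete, here is a minimal TypeScript sketch of the guard pattern this hunk adds. Only the activeCompactionOperation check and the sendQueuedMessages() name come from the diff; the queue field and the streamMessage() stub are illustrative assumptions, not the repository's actual implementation.

// Illustrative sketch only, not the real AgentSession.
class AgentSessionSketch {
  private activeCompactionOperation: { id: string } | null = null;
  private queuedMessages: string[] = [];

  // Invoked from stream-end / tool-call-end handlers, including late events
  // emitted by a stream that was interrupted to start compaction.
  sendQueuedMessages(): void {
    if (this.activeCompactionOperation) {
      // While compaction is streaming, do not flush: a continue message sent
      // too early would be wiped when compaction replaces the history.
      return;
    }
    for (const text of this.queuedMessages.splice(0)) {
      this.streamMessage(text);
    }
  }

  private streamMessage(text: string): void {
    // Stand-in for starting a real model stream.
    console.log("streaming:", text);
  }
}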

tests/ipc/compactHistory.test.ts

Lines changed: 140 additions & 59 deletions

@@ -1,22 +1,24 @@
 /**
  * compactHistory integration tests.
  *
- * Ensures compaction is a control-plane operation (not a slash-command string), and that
- * history is replaced only on successful compaction completion.
+ * Ensures compaction is a control-plane operation (not a slash-command string), and that:
+ * - History is replaced only on successful compaction completion
+ * - continueMessage is auto-sent after compaction completes
  *
  * Requirements:
- * - Uses the Haiku model for both normal messages and compaction
- * - Builds history by sending messages (replicates user behavior)
+ * - Uses the Haiku model for both compaction and the follow-up continue message
+ * - Seeds history via HistoryService (test-only) to avoid extra API calls
  */

+import type { WorkspaceChatMessage } from "@/common/orpc/types";
 import { shouldRunIntegrationTests, validateApiKeys } from "./setup";
 import {
   createSharedRepo,
   cleanupSharedRepo,
   withSharedWorkspace,
   configureTestRetries,
 } from "./sendMessageTestHelpers";
-import { assertStreamSuccess, modelString, sendMessageWithModel } from "./helpers";
+import { modelString, seedHistoryMessages } from "./helpers";
 import { KNOWN_MODELS } from "../../src/common/constants/knownModels";

 // Skip all tests if TEST_INTEGRATION is not set
@@ -29,47 +31,78 @@ if (shouldRunIntegrationTests()) {
   beforeAll(createSharedRepo);
   afterAll(cleanupSharedRepo);

+  function getTextFromMessageParts(message: {
+    parts?: Array<{ type: string; text?: string }>;
+  }): string {
+    return (
+      message.parts
+        ?.filter((part) => part.type === "text")
+        .map((part) => part.text ?? "")
+        .join("") ?? ""
+    );
+  }
+
+  async function waitForMatchingEvent(
+    collector: { getEvents: () => WorkspaceChatMessage[] },
+    predicate: (event: WorkspaceChatMessage) => boolean,
+    timeoutMs: number
+  ): Promise<WorkspaceChatMessage | null> {
+    const start = Date.now();
+    while (Date.now() - start < timeoutMs) {
+      const match = collector.getEvents().find(predicate);
+      if (match) {
+        return match;
+      }
+      await new Promise((resolve) => setTimeout(resolve, 25));
+    }
+    return null;
+  }
+
   describeIntegration("compactHistory integration tests", () => {
     configureTestRetries(3);

     test.concurrent(
-      "should compact history using Haiku for both messages + compaction",
+      "should compact history and then auto-send continueMessage",
       async () => {
         await withSharedWorkspace("anthropic", async ({ env, workspaceId, collector }) => {
           const haiku = modelString("anthropic", KNOWN_MODELS.HAIKU.providerModelId);

-          // Build history via normal user interactions.
-          collector.clear();
-
-          const message1 =
-            "You are helping me plan a small refactor. Explain, in a few sentences, what the risks are when refactoring code without tests.";
-          const result1 = await sendMessageWithModel(env, workspaceId, message1, haiku);
-          expect(result1.success).toBe(true);
-          const streamEnd1 = await collector.waitForEvent("stream-end", 20000);
-          expect(streamEnd1).not.toBeNull();
-          expect((streamEnd1 as { metadata: { model?: string } }).metadata.model).toBe(haiku);
-          assertStreamSuccess(collector);
+          // Seed history quickly (test-only) to avoid extra API calls.
+          const seededIds = await seedHistoryMessages(workspaceId, env.config, [
+            {
+              id: "seed-user-0",
+              role: "user",
+              content: "Context: we are discussing a small code refactor.",
+            },
+            {
+              id: "seed-assistant-0",
+              role: "assistant",
+              content: "Acknowledged. I will help.",
+            },
+            {
+              id: "seed-user-1",
+              role: "user",
+              content: "Please keep responses short and practical.",
+            },
+            {
+              id: "seed-assistant-1",
+              role: "assistant",
+              content: "Understood.",
+            },
+          ]);

           collector.clear();

-          const message2 =
-            "Now list three concrete steps I should take to refactor safely. Include enough detail that it would be useful in a code review.";
-          const result2 = await sendMessageWithModel(env, workspaceId, message2, haiku);
-          expect(result2.success).toBe(true);
-          const streamEnd2 = await collector.waitForEvent("stream-end", 20000);
-          expect(streamEnd2).not.toBeNull();
-          expect((streamEnd2 as { metadata: { model?: string } }).metadata.model).toBe(haiku);
-          assertStreamSuccess(collector);
-
-          collector.clear();
+          const continueText = "Continue: reply with exactly 'OK'.";

-          // Trigger compaction explicitly via the control-plane API.
+          // Trigger compaction via the control-plane API.
           const compactResult = await env.orpc.workspace.compactHistory({
             workspaceId,
             model: haiku,
             maxOutputTokens: 800,
             source: "user",
             interrupt: "none",
+            continueMessage: { text: continueText },
             sendMessageOptions: {
               model: haiku,
               thinkingLevel: "off",
@@ -81,41 +114,89 @@ describeIntegration("compactHistory integration tests", () => {
             throw new Error(String(compactResult.error));
           }

-          // Ensure this stream is actually the compaction stream.
-          const streamStart = await collector.waitForEvent("stream-start", 20000);
-          expect(streamStart).not.toBeNull();
-          const compactionMessageId = (streamStart as { messageId: string }).messageId;
-
-          const streamEnd = await collector.waitForEvent("stream-end", 30000);
-          expect(streamEnd).not.toBeNull();
-          expect((streamEnd as { messageId: string }).messageId).toBe(compactionMessageId);
-          expect((streamEnd as { metadata: { model?: string } }).metadata.model).toBe(haiku);
-          assertStreamSuccess(collector);
-
-          // The compaction handler emits a single summary message + delete event.
-          const deleteEvent = collector.getEvents().find((e) => e.type === "delete");
-          expect(deleteEvent).toBeDefined();
-
-          const summaryMessage = collector
-            .getEvents()
-            .find((e) => e.type === "message" && e.role === "assistant" && e.metadata?.compacted);
-          expect(summaryMessage).toBeDefined();
-          expect((summaryMessage as { metadata?: { model?: string } }).metadata?.model).toBe(haiku);
-
-          // Verify persisted history was replaced (user behavior: reload workspace).
+          // Wait for compaction stream to start + end.
+          const compactionStreamStart = await collector.waitForEvent("stream-start", 20000);
+          expect(compactionStreamStart).not.toBeNull();
+          const compactionMessageId = (compactionStreamStart as { messageId: string }).messageId;
+
+          const compactionStreamEnd = await waitForMatchingEvent(
+            collector,
+            (e) =>
+              e.type === "stream-end" &&
+              (e as { messageId?: string }).messageId === compactionMessageId,
+            45000
+          );
+          expect(compactionStreamEnd).not.toBeNull();
+          expect((compactionStreamEnd as { metadata: { model?: string } }).metadata.model).toBe(
+            haiku
+          );
+
+          // Compaction should emit delete + summary message.
+          const deleteEvent = await waitForMatchingEvent(
+            collector,
+            (e) => e.type === "delete",
+            10000
+          );
+          expect(deleteEvent).not.toBeNull();
+
+          const summaryMessage = await waitForMatchingEvent(
+            collector,
+            (e) => e.type === "message" && e.role === "assistant" && Boolean(e.metadata?.compacted),
+            10000
+          );
+          expect(summaryMessage).not.toBeNull();
+
+          // Continue message should be persisted as a user message and then streamed.
+          const continueUserMessage = await waitForMatchingEvent(
+            collector,
+            (e) =>
+              e.type === "message" &&
+              e.role === "user" &&
+              getTextFromMessageParts(e) === continueText,
+            20000
+          );
+          expect(continueUserMessage).not.toBeNull();
+
+          const continueStreamStart = await waitForMatchingEvent(
+            collector,
+            (e) =>
+              e.type === "stream-start" &&
+              (e as { messageId?: string }).messageId !== compactionMessageId &&
+              (e as { metadata?: { model?: string } }).metadata?.model === haiku,
+            20000
+          );
+          expect(continueStreamStart).not.toBeNull();
+
+          const continueMessageId = (continueStreamStart as { messageId: string }).messageId;
+          const continueStreamEnd = await waitForMatchingEvent(
+            collector,
+            (e) =>
+              e.type === "stream-end" &&
+              (e as { messageId?: string }).messageId === continueMessageId,
+            45000
+          );
+          expect(continueStreamEnd).not.toBeNull();
+
+          // Verify persisted history:
+          // - seeded messages were removed
+          // - summary exists
+          // - continue message exists
           const replay = await env.orpc.workspace.getFullReplay({ workspaceId });
           const replayMessages = replay.filter((m) => m.type === "message");

-          // After compaction we should only have a single assistant summary message.
-          expect(replayMessages).toHaveLength(1);
-          expect(replayMessages[0].role).toBe("assistant");
-          expect(replayMessages[0].metadata?.compacted).toBeDefined();
-          expect(replayMessages[0].metadata?.model).toBe(haiku);
+          for (const id of seededIds) {
+            expect(replayMessages.some((m) => m.id === id)).toBe(false);
+          }
+
+          const summaryIndex = replayMessages.findIndex(
+            (m) => m.role === "assistant" && Boolean(m.metadata?.compacted)
+          );
+          expect(summaryIndex).toBe(0);

-          // Sanity check: original user prompt text should not be present after replacement.
-          const replayText = JSON.stringify(replayMessages[0]);
-          expect(replayText).not.toContain("refactoring code without tests");
-          expect(replayText).not.toContain("three concrete steps");
+          const continueIndex = replayMessages.findIndex(
+            (m) => m.role === "user" && getTextFromMessageParts(m) === continueText
+          );
+          expect(continueIndex).toBeGreaterThan(summaryIndex);
         });
       },
       90000
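For quick reference, the control-plane call this test exercises reduces to the shape below. The field names, values, and the new continueMessage option are taken from the diff above; the success check and the closing of the options object fall between the two hunks and are assumed from the surrounding test pattern.

// Minimal call shape, as used in the test. `env.orpc`, `workspaceId`, and
// `haiku` come from the test harness.
const compactResult = await env.orpc.workspace.compactHistory({
  workspaceId,
  model: haiku,
  maxOutputTokens: 800,
  source: "user",
  interrupt: "none",
  // Queued by the backend and auto-sent once compaction completes.
  continueMessage: { text: "Continue: reply with exactly 'OK'." },
  sendMessageOptions: {
    model: haiku,
    thinkingLevel: "off",
  },
});
if (!compactResult.success) {
  // Assumed error shape, mirroring the test's throw of String(compactResult.error).
  throw new Error(String(compactResult.error));
}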

tests/ipc/helpers.ts

Lines changed: 30 additions & 0 deletions

@@ -590,6 +590,36 @@ export async function cleanupTempGitRepo(repoPath: string): Promise<void> {
     console.warn(`Failed to cleanup temp git repo after ${maxRetries} attempts:`, lastError);
 }

+/**
+ * Seed a workspace history with explicit messages.
+ *
+ * Test-only: uses HistoryService directly to populate chat.jsonl without making API calls.
+ * Real application code should NEVER bypass IPC like this.
+ */
+export async function seedHistoryMessages(
+  workspaceId: string,
+  config: { getSessionDir: (id: string) => string },
+  messages: Array<{ id?: string; role: "user" | "assistant"; content: string }>
+): Promise<string[]> {
+  // HistoryService only needs getSessionDir, so we can cast the partial config.
+  const historyService = new HistoryService(config as any);
+
+  const ids: string[] = [];
+  for (let i = 0; i < messages.length; i++) {
+    const entry = messages[i];
+    const id = entry.id ?? `seed-msg-${i}`;
+    ids.push(id);
+
+    const message = createMuxMessage(id, entry.role, entry.content, {});
+    const result = await historyService.appendToHistory(workspaceId, message);
+    if (!result.success) {
+      throw new Error(`Failed to append history message ${i} (${id}): ${result.error}`);
+    }
+  }
+
+  return ids;
+}
+
 /**
  * Build large conversation history to test context limits
  *
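A minimal usage sketch of the new helper. In the test above it is called with env.config from the harness; the hand-rolled config object and workspace id here are hypothetical stand-ins that only satisfy the getSessionDir shape the signature requires.

// Seeds two turns into the workspace's chat.jsonl without any model calls.
const seededIds = await seedHistoryMessages(
  "ws-example", // hypothetical workspace id
  { getSessionDir: (id) => `/tmp/mux-sessions/${id}` }, // assumed layout, not the real config
  [
    { role: "user", content: "Hello" },
    { role: "assistant", content: "Hi there." },
  ]
);
// No explicit ids were passed, so the helper returns ["seed-msg-0", "seed-msg-1"].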
