Skip to content

Commit 255790f

Browse files
authored
fix: Add stream cleanup and retry logic to gRPC error handler (#182)
1 parent cc97f44 commit 255790f

3 files changed

Lines changed: 407 additions & 2 deletions

File tree

packages/durabletask-js/src/worker/task-hub-grpc-worker.ts

Lines changed: 21 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -367,7 +367,7 @@ export class TaskHubGrpcWorker {
367367
});
368368

369369
// Wait for the stream to end or error
370-
stream.on("end", async () => {
370+
stream.on("end", () => {
371371
if (this._stopWorker) {
372372
WorkerLogs.streamEnded(this._logger);
373373
stream.removeAllListeners();
@@ -376,9 +376,14 @@ export class TaskHubGrpcWorker {
376376
}
377377
// Stream ended unexpectedly - clean up and retry
378378
stream.removeAllListeners();
379+
stream.on("error", () => {}); // Prevent unhandled "error" after cleanup
379380
stream.destroy();
380381
WorkerLogs.streamRetry(this._logger, this._backoff.peekNextDelay());
381-
await this._createNewClientAndRetry();
382+
this._createNewClientAndRetry().catch((retryErr) => {
383+
if (!this._stopWorker) {
384+
WorkerLogs.workerError(this._logger, retryErr instanceof Error ? retryErr : new Error(String(retryErr)));
385+
}
386+
});
382387
});
383388

384389
stream.on("error", (err: Error) => {
@@ -387,6 +392,20 @@ export class TaskHubGrpcWorker {
387392
return;
388393
}
389394
WorkerLogs.streamErrorInfo(this._logger, err);
395+
396+
// Clean up the errored stream and retry the connection.
397+
// In Node.js, gRPC stream errors (e.g., UNAVAILABLE, transport failures)
398+
// may not always be followed by an "end" event. Without recovery here,
399+
// the worker would silently stop processing work items.
400+
stream.removeAllListeners();
401+
stream.on("error", () => {}); // Prevent unhandled "error" after cleanup
402+
stream.destroy();
403+
WorkerLogs.streamRetry(this._logger, this._backoff.peekNextDelay());
404+
this._createNewClientAndRetry().catch((retryErr) => {
405+
if (!this._stopWorker) {
406+
WorkerLogs.workerError(this._logger, retryErr instanceof Error ? retryErr : new Error(String(retryErr)));
407+
}
408+
});
390409
});
391410
} catch (err) {
392411
if (this._stopWorker) {
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
4+
/**
5+
* Tests that the TaskHubGrpcWorker correctly recovers when the gRPC work-item
6+
* stream emits an "error" event without a subsequent "end" event.
7+
*
8+
* This validates the fix for a bug where the stream "error" handler only logged
9+
* the error but did not clean up the stream or retry the connection — causing
10+
* the worker to silently stop processing work items after transport-level
11+
* failures (e.g., UNAVAILABLE, network disconnections).
12+
*/
13+
14+
import { EventEmitter } from "events";
15+
import { TaskHubGrpcWorker } from "../src/worker/task-hub-grpc-worker";
16+
import { NoOpLogger } from "../src/types/logger.type";
17+
import { GrpcClient } from "../src/client/client-grpc";
18+
19+
/**
20+
* Creates a mock GrpcClient whose `hello` call succeeds immediately
21+
* and whose `getWorkItems` returns a controllable EventEmitter stream.
22+
*/
23+
function createMockClient(): {
24+
client: GrpcClient;
25+
mockStream: EventEmitter & { destroy: jest.Mock; cancel: jest.Mock };
26+
} {
27+
const mockStream = new EventEmitter() as EventEmitter & {
28+
destroy: jest.Mock;
29+
cancel: jest.Mock;
30+
};
31+
mockStream.destroy = jest.fn();
32+
mockStream.cancel = jest.fn();
33+
34+
const stub = {
35+
hello: (_req: any, _metadata: any, callback: (err: any, res: any) => void) => {
36+
callback(null, {});
37+
return {} as any;
38+
},
39+
getWorkItems: jest.fn().mockReturnValue(mockStream),
40+
};
41+
42+
const client = { stub } as unknown as GrpcClient;
43+
return { client, mockStream };
44+
}
45+
46+
/** Flush the microtask / next-tick queue so async event handlers complete. */
47+
function flushAsync(): Promise<void> {
48+
return new Promise((resolve) => setImmediate(resolve));
49+
}
50+
51+
// Behavioral tests for the worker's stream-recovery paths: each case wires
// a mock gRPC client into the worker, fires stream lifecycle events
// ("error" / "end"), then asserts that cleanup and reconnection occurred.
describe("Worker Stream Recovery", () => {
  it("should retry connection after a stream error event", async () => {
    const worker = new TaskHubGrpcWorker({ logger: new NoOpLogger() });
    const { client, mockStream } = createMockClient();

    // Prevent actual reconnection — just record that it was attempted
    const retryMock = jest.fn().mockResolvedValue(undefined);
    (worker as any)._createNewClientAndRetry = retryMock;

    // Start the worker's internal run (sets up stream event handlers)
    await worker.internalRunWorker(client);

    // Simulate a transport-level error with no subsequent "end" event
    mockStream.emit("error", new Error("14 UNAVAILABLE: Connection lost"));
    await flushAsync();

    // The worker must clean up the stream and attempt to reconnect
    expect(mockStream.destroy).toHaveBeenCalled();
    expect(retryMock).toHaveBeenCalledTimes(1);
  });

  it("should not retry when the worker is being stopped", async () => {
    const worker = new TaskHubGrpcWorker({ logger: new NoOpLogger() });
    const { client, mockStream } = createMockClient();

    const retryMock = jest.fn().mockResolvedValue(undefined);
    (worker as any)._createNewClientAndRetry = retryMock;

    await worker.internalRunWorker(client);

    // Signal that the worker is shutting down
    (worker as any)._stopWorker = true;

    mockStream.emit("error", new Error("1 CANCELLED"));
    await flushAsync();

    // During shutdown, errors are silently ignored — no retry
    expect(retryMock).not.toHaveBeenCalled();
    expect(mockStream.destroy).not.toHaveBeenCalled();
  });

  it("should remove all stream listeners during error recovery", async () => {
    const worker = new TaskHubGrpcWorker({ logger: new NoOpLogger() });
    const { client, mockStream } = createMockClient();

    const retryMock = jest.fn().mockResolvedValue(undefined);
    (worker as any)._createNewClientAndRetry = retryMock;

    await worker.internalRunWorker(client);

    // Capture listener counts before error
    const dataListenersBefore = mockStream.listenerCount("data");
    expect(dataListenersBefore).toBeGreaterThan(0);

    mockStream.emit("error", new Error("14 UNAVAILABLE: Connection lost"));
    await flushAsync();

    // After recovery, all original listeners should be removed
    // (only a no-op error guard remains)
    expect(mockStream.listenerCount("data")).toBe(0);
    expect(mockStream.listenerCount("end")).toBe(0);
  });

  it("should not crash if a stale error event fires after recovery cleanup", async () => {
    const worker = new TaskHubGrpcWorker({ logger: new NoOpLogger() });
    const { client, mockStream } = createMockClient();

    const retryMock = jest.fn().mockResolvedValue(undefined);
    (worker as any)._createNewClientAndRetry = retryMock;

    await worker.internalRunWorker(client);

    // First error triggers recovery
    mockStream.emit("error", new Error("14 UNAVAILABLE: Connection lost"));
    await flushAsync();

    // A stale/duplicate error event must not throw (no-op handler remains)
    expect(() => {
      mockStream.emit("error", new Error("Stale error after cleanup"));
    }).not.toThrow();
  });

  it("should recover via the end handler when end fires without error", async () => {
    const worker = new TaskHubGrpcWorker({ logger: new NoOpLogger() });
    const { client, mockStream } = createMockClient();

    const retryMock = jest.fn().mockResolvedValue(undefined);
    (worker as any)._createNewClientAndRetry = retryMock;

    await worker.internalRunWorker(client);

    // Simulate a clean stream end (no error)
    mockStream.emit("end");
    await flushAsync();

    // The "end" handler should also trigger recovery
    expect(mockStream.destroy).toHaveBeenCalled();
    expect(retryMock).toHaveBeenCalledTimes(1);
  });

  it("should not crash if _createNewClientAndRetry rejects during error recovery", async () => {
    const worker = new TaskHubGrpcWorker({ logger: new NoOpLogger() });
    const { client, mockStream } = createMockClient();

    // Simulate a retry that throws — must not become an unhandled rejection
    const retryMock = jest.fn().mockRejectedValue(new Error("Retry failed"));
    (worker as any)._createNewClientAndRetry = retryMock;

    await worker.internalRunWorker(client);

    // Should not throw or cause unhandled promise rejection
    mockStream.emit("error", new Error("14 UNAVAILABLE: Connection lost"));
    await flushAsync();

    expect(retryMock).toHaveBeenCalledTimes(1);
    expect(mockStream.destroy).toHaveBeenCalled();
  });

  it("should not crash if _createNewClientAndRetry rejects during end recovery", async () => {
    const worker = new TaskHubGrpcWorker({ logger: new NoOpLogger() });
    const { client, mockStream } = createMockClient();

    // Simulate a retry that throws — must not become an unhandled rejection
    const retryMock = jest.fn().mockRejectedValue(new Error("Retry failed"));
    (worker as any)._createNewClientAndRetry = retryMock;

    await worker.internalRunWorker(client);

    // Should not throw or cause unhandled promise rejection
    mockStream.emit("end");
    await flushAsync();

    expect(retryMock).toHaveBeenCalledTimes(1);
    expect(mockStream.destroy).toHaveBeenCalled();
  });

  it("should also add no-op error guard in end handler to prevent crashes after cleanup", async () => {
    const worker = new TaskHubGrpcWorker({ logger: new NoOpLogger() });
    const { client, mockStream } = createMockClient();

    const retryMock = jest.fn().mockResolvedValue(undefined);
    (worker as any)._createNewClientAndRetry = retryMock;

    await worker.internalRunWorker(client);

    // End fires → cleanup removes all listeners
    mockStream.emit("end");
    await flushAsync();

    // A stale error after end cleanup must not crash
    expect(() => {
      mockStream.emit("error", new Error("Stale error after end cleanup"));
    }).not.toThrow();

    // The no-op guard should remain
    expect(mockStream.listenerCount("error")).toBe(1);
  });
});

0 commit comments

Comments
 (0)