diff --git a/packages/durabletask-js/src/worker/activity-executor.ts b/packages/durabletask-js/src/worker/activity-executor.ts index 10ba9a9..f4ad151 100644 --- a/packages/durabletask-js/src/worker/activity-executor.ts +++ b/packages/durabletask-js/src/worker/activity-executor.ts @@ -32,10 +32,14 @@ export class ActivityExecutor { // Log activity start (EventId 603) WorkerLogs.activityStarted(this._logger, orchestrationId, name); - const activityInput = encodedInput ? JSON.parse(encodedInput) : undefined; const ctx = new ActivityContext(orchestrationId, taskId); try { + // Deserialize the input inside the try-catch so that malformed JSON + // is reported through the same activityFailed log path (EventId 605) + // as any other activity execution error. + const activityInput = encodedInput ? JSON.parse(encodedInput) : undefined; + // Execute the activity function let activityOutput = fn(ctx, activityInput); diff --git a/packages/durabletask-js/test/activity_executor.spec.ts b/packages/durabletask-js/test/activity_executor.spec.ts index cbf8236..bafe8ee 100644 --- a/packages/durabletask-js/test/activity_executor.spec.ts +++ b/packages/durabletask-js/test/activity_executor.spec.ts @@ -53,12 +53,62 @@ describe("Activity Executor", () => { expect(caughtException).not.toBeNull(); expect(caughtException?.message).toMatch(/Bogus/); }); + + it("should throw and log activityFailed when input is malformed JSON", async () => { + const testActivity = (_: ActivityContext, input: any) => { + return input; + }; + + const loggerSpy = createSpyLogger(); + const [executor, name] = getActivityExecutor(testActivity, loggerSpy); + + const malformedJson = "{not valid json"; + + await expect(executor.execute(TEST_INSTANCE_ID, name, TEST_TASK_ID, malformedJson)) + .rejects.toThrow(SyntaxError); + + // Verify the activityFailed log (EventId 605) was emitted + expect(loggerSpy.error).toHaveBeenCalled(); + const errorCall = loggerSpy.error.mock.calls[0][0]; + expect(errorCall).toContain(name); + expect(errorCall).toContain(TEST_INSTANCE_ID); + expect(errorCall).toContain("failed"); + }); + + it("should handle undefined input without error", async () => { + const testActivity = (_: ActivityContext, input: any) => { + return input; + }; + + const [executor, name] = getActivityExecutor(testActivity); + const result = await executor.execute(TEST_INSTANCE_ID, name, TEST_TASK_ID, undefined); + expect(result).toBeUndefined(); + }); + + it("should handle empty string input without error", async () => { + const testActivity = (_: ActivityContext, input: any) => { + return input; + }; + + const [executor, name] = getActivityExecutor(testActivity); + const result = await executor.execute(TEST_INSTANCE_ID, name, TEST_TASK_ID, ""); + expect(result).toBeUndefined(); + }); }); // Activity = Callable[[ActivityContext, TInput], TOutput] -function getActivityExecutor(fn: TActivity): [ActivityExecutor, string] { +function getActivityExecutor(fn: TActivity, logger?: any): [ActivityExecutor, string] { const registry = new Registry(); const name = registry.addActivity(fn); - const executor = new ActivityExecutor(registry, testLogger); + const executor = new ActivityExecutor(registry, logger ?? testLogger); return [executor, name]; } + +function createSpyLogger() { + return { + error: jest.fn(), + warn: jest.fn(), + info: jest.fn(), + debug: jest.fn(), + }; +}