diff --git a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java index e1bacb34d4..d5bc82b6b5 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/DeciderService.java @@ -36,6 +36,7 @@ import com.netflix.conductor.core.execution.mapper.TaskMapper; import com.netflix.conductor.core.execution.mapper.TaskMapperContext; import com.netflix.conductor.core.execution.tasks.SystemTaskRegistry; +import com.netflix.conductor.core.listener.TaskStatusListener; import com.netflix.conductor.core.utils.ExternalPayloadStorageUtils; import com.netflix.conductor.core.utils.IDGenerator; import com.netflix.conductor.core.utils.ParametersUtils; @@ -65,7 +66,7 @@ public class DeciderService { private final MetadataDAO metadataDAO; private final SystemTaskRegistry systemTaskRegistry; private final long taskPendingTimeThresholdMins; - + private final TaskStatusListener taskStatusListener; private final Map taskMappers; public DeciderService( @@ -76,7 +77,8 @@ public DeciderService( SystemTaskRegistry systemTaskRegistry, @Qualifier("taskMappersByTaskType") Map taskMappers, @Value("${conductor.app.taskPendingTimeThreshold:60m}") - Duration taskPendingTimeThreshold) { + Duration taskPendingTimeThreshold, + TaskStatusListener taskStatusListener) { this.idGenerator = idGenerator; this.metadataDAO = metadataDAO; this.parametersUtils = parametersUtils; @@ -84,6 +86,7 @@ public DeciderService( this.externalPayloadStorageUtils = externalPayloadStorageUtils; this.taskPendingTimeThresholdMins = taskPendingTimeThreshold.toMinutes(); this.systemTaskRegistry = systemTaskRegistry; + this.taskStatusListener = taskStatusListener; } public DeciderOutcome decide(WorkflowModel workflow) throws TerminateWorkflowException { @@ -773,10 +776,12 @@ void timeoutTaskWithTimeoutPolicy(String reason, TaskDef taskDef, TaskModel task case RETRY: task.setStatus(TIMED_OUT); task.setReasonForIncompletion(reason); + taskStatusListener.onTaskTimedOut(task); return; case TIME_OUT_WF: task.setStatus(TIMED_OUT); task.setReasonForIncompletion(reason); + taskStatusListener.onTaskTimedOut(task); throw new TerminateWorkflowException(reason, WorkflowModel.Status.TIMED_OUT, task); } } @@ -851,6 +856,7 @@ private void timeoutTask(TaskDef taskDef, TaskModel task) { LOGGER.debug(reason); task.setStatus(TIMED_OUT); task.setReasonForIncompletion(reason); + taskStatusListener.onTaskTimedOut(task); } public List getTasksToBeScheduled(