Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath-runtime"
version = "0.8.3"
version = "0.8.4"
description = "Runtime abstractions and interfaces for building agents and automation scripts in the UiPath ecosystem"
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
Expand Down
25 changes: 20 additions & 5 deletions src/uipath/runtime/resumable/runtime.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
UiPathRuntimeResult,
UiPathRuntimeStatus,
)
from uipath.runtime.resumable import UiPathResumeTriggerType
from uipath.runtime.resumable.protocols import (
UiPathResumableStorageProtocol,
UiPathResumeTriggerProtocol,
Expand Down Expand Up @@ -82,8 +83,11 @@ async def execute(
suspension_result = await self._handle_suspension(result)

# check if any trigger may be resumed
# api triggers cannot be completed before suspending the job, skip them
if suspension_result.status != UiPathRuntimeStatus.SUSPENDED or not (
fired_triggers := await self._restore_resume_input(None)
fired_triggers := await self._restore_resume_input(
None, skip_trigger_types=[UiPathResumeTriggerType.API]
)
):
return suspension_result

Expand Down Expand Up @@ -115,6 +119,7 @@ async def stream(

final_result: UiPathRuntimeResult | None = None
execution_completed = False
fired_triggers = None

while not execution_completed:
async for event in self.delegate.stream(input, options=options):
Expand All @@ -127,8 +132,12 @@ async def stream(
if final_result:
suspension_result = await self._handle_suspension(final_result)

# check if any trigger may be resumed
# api triggers cannot be completed before suspending the job, skip them
if suspension_result.status != UiPathRuntimeStatus.SUSPENDED or not (
fired_triggers := await self._restore_resume_input(None)
fired_triggers := await self._restore_resume_input(
None, skip_trigger_types=[UiPathResumeTriggerType.API]
)
):
yield suspension_result
execution_completed = True
Expand All @@ -143,7 +152,9 @@ async def stream(
options.resume = True

async def _restore_resume_input(
self, input: dict[str, Any] | None
self,
input: dict[str, Any] | None,
skip_trigger_types: list[UiPathResumeTriggerType] | None = None,
) -> dict[str, Any] | None:
"""Restore resume input from storage if not provided.

Expand Down Expand Up @@ -180,14 +191,18 @@ async def _restore_resume_input(
if not triggers:
return None

return await self._build_resume_map(triggers)
return await self._build_resume_map(triggers, skip_trigger_types)

async def _build_resume_map(
self, triggers: list[UiPathResumeTrigger]
self,
triggers: list[UiPathResumeTrigger],
skip_trigger_types: list[UiPathResumeTriggerType] | None,
) -> dict[str, Any]:
# Build resume map: {interrupt_id: resume_data}
resume_map: dict[str, Any] = {}
for trigger in triggers:
if skip_trigger_types and trigger.trigger_type in skip_trigger_types:
continue
try:
data = await self.trigger_manager.read_trigger(trigger)
assert trigger.interrupt_id is not None, (
Expand Down
100 changes: 99 additions & 1 deletion tests/test_resumable.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ def make_trigger_manager_mock() -> UiPathResumeTriggerProtocol:
def create_trigger_impl(data: dict[str, Any]) -> UiPathResumeTrigger:
return UiPathResumeTrigger(
interrupt_id="", # Will be set by resumable runtime
trigger_type=UiPathResumeTriggerType.API,
trigger_type=UiPathResumeTriggerType.TASK,
payload=data,
)

Expand Down Expand Up @@ -453,3 +453,101 @@ async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]:

# Delegate should have been executed only once)
assert runtime_impl.execution_count == 1

@pytest.mark.asyncio
async def test_resumable_skips_api_triggers_on_auto_resume_check(self) -> None:
"""API triggers should be skipped when checking for auto-resume after suspension."""

runtime_impl = MultiTriggerMockRuntime()
storage = StatefulStorageMock()
trigger_manager = make_trigger_manager_mock()

# Create trigger manager that returns API trigger type
def create_api_trigger(data: dict[str, Any]) -> UiPathResumeTrigger:
return UiPathResumeTrigger(
interrupt_id="", # Will be set by resumable runtime
trigger_type=UiPathResumeTriggerType.API,
payload=data,
)

trigger_manager.create_trigger = AsyncMock(side_effect=create_api_trigger) # type: ignore

async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]:
return {"approved": True}

trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore

resumable = UiPathResumableRuntime(
delegate=runtime_impl,
storage=storage,
trigger_manager=trigger_manager,
runtime_id="runtime-1",
)

# Execute - should suspend and NOT auto-resume because they are API triggers
result = await resumable.execute({})

assert result.status == UiPathRuntimeStatus.SUSPENDED
assert result.triggers is not None
assert len(result.triggers) == 2
assert {t.interrupt_id for t in result.triggers} == {"int-1", "int-2"}

# Verify all triggers are API type
assert all(
t.trigger_type == UiPathResumeTriggerType.API for t in result.triggers
)

# Delegate should have been executed only once (no auto-resume)
assert runtime_impl.execution_count == 1

@pytest.mark.asyncio
async def test_resumable_auto_resumes_task_triggers_but_not_api_triggers(
self,
) -> None:
"""Mixed triggers: TASK triggers should trigger auto-resume, API triggers should not."""

runtime_impl = MultiTriggerMockRuntime()
storage = StatefulStorageMock()
trigger_manager = make_trigger_manager_mock()

# Create different trigger types: int-1 is TASK, int-2 is API
def create_typed_trigger(data: dict[str, Any]) -> UiPathResumeTrigger:
# Determine trigger type based on payload action
if "approve_branch_1" in str(data):
trigger_type = UiPathResumeTriggerType.TASK
else:
trigger_type = UiPathResumeTriggerType.API

return UiPathResumeTrigger(
interrupt_id="", # Will be set by resumable runtime
trigger_type=trigger_type,
payload=data,
)

trigger_manager.create_trigger = AsyncMock(side_effect=create_typed_trigger) # type: ignore

# only TASK should trigger auto-resume
async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]:
return {"approved": True}

trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore

resumable = UiPathResumableRuntime(
delegate=runtime_impl,
storage=storage,
trigger_manager=trigger_manager,
runtime_id="runtime-1",
)

# Execute - should auto-resume based on int-1 (TASK) but skip int-2 (API)
result = await resumable.execute({})

# Should have auto-resumed once (because of TASK trigger)
assert result.status == UiPathRuntimeStatus.SUSPENDED
assert result.triggers is not None

# After auto-resume with int-1, should be at second suspension with int-2 and int-3
assert {t.interrupt_id for t in result.triggers} == {"int-2", "int-3"}

# Delegate should have been executed twice (initial + auto-resume for TASK trigger)
assert runtime_impl.execution_count == 2
2 changes: 1 addition & 1 deletion uv.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.