diff --git a/pyproject.toml b/pyproject.toml index da71ce4..9d841d9 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "uipath-runtime" -version = "0.8.3" +version = "0.8.4" description = "Runtime abstractions and interfaces for building agents and automation scripts in the UiPath ecosystem" readme = { file = "README.md", content-type = "text/markdown" } requires-python = ">=3.11" diff --git a/src/uipath/runtime/resumable/runtime.py b/src/uipath/runtime/resumable/runtime.py index 2cf5fda..fd6ea35 100644 --- a/src/uipath/runtime/resumable/runtime.py +++ b/src/uipath/runtime/resumable/runtime.py @@ -16,6 +16,7 @@ UiPathRuntimeResult, UiPathRuntimeStatus, ) +from uipath.runtime.resumable import UiPathResumeTriggerType from uipath.runtime.resumable.protocols import ( UiPathResumableStorageProtocol, UiPathResumeTriggerProtocol, @@ -82,8 +83,11 @@ async def execute( suspension_result = await self._handle_suspension(result) # check if any trigger may be resumed + # api triggers cannot be completed before suspending the job, skip them if suspension_result.status != UiPathRuntimeStatus.SUSPENDED or not ( - fired_triggers := await self._restore_resume_input(None) + fired_triggers := await self._restore_resume_input( + None, skip_trigger_types=[UiPathResumeTriggerType.API] + ) ): return suspension_result @@ -115,6 +119,7 @@ async def stream( final_result: UiPathRuntimeResult | None = None execution_completed = False + fired_triggers = None while not execution_completed: async for event in self.delegate.stream(input, options=options): @@ -127,8 +132,12 @@ async def stream( if final_result: suspension_result = await self._handle_suspension(final_result) + # check if any trigger may be resumed + # api triggers cannot be completed before suspending the job, skip them if suspension_result.status != UiPathRuntimeStatus.SUSPENDED or not ( - fired_triggers := await self._restore_resume_input(None) + fired_triggers := await self._restore_resume_input( + None, skip_trigger_types=[UiPathResumeTriggerType.API] + ) ): yield suspension_result execution_completed = True @@ -143,7 +152,9 @@ async def stream( options.resume = True async def _restore_resume_input( - self, input: dict[str, Any] | None + self, + input: dict[str, Any] | None, + skip_trigger_types: list[UiPathResumeTriggerType] | None = None, ) -> dict[str, Any] | None: """Restore resume input from storage if not provided. @@ -180,14 +191,18 @@ async def _restore_resume_input( if not triggers: return None - return await self._build_resume_map(triggers) + return await self._build_resume_map(triggers, skip_trigger_types) async def _build_resume_map( - self, triggers: list[UiPathResumeTrigger] + self, + triggers: list[UiPathResumeTrigger], + skip_trigger_types: list[UiPathResumeTriggerType] | None, ) -> dict[str, Any]: # Build resume map: {interrupt_id: resume_data} resume_map: dict[str, Any] = {} for trigger in triggers: + if skip_trigger_types and trigger.trigger_type in skip_trigger_types: + continue try: data = await self.trigger_manager.read_trigger(trigger) assert trigger.interrupt_id is not None, ( diff --git a/tests/test_resumable.py b/tests/test_resumable.py index cd5712c..a46b7b9 100644 --- a/tests/test_resumable.py +++ b/tests/test_resumable.py @@ -127,7 +127,7 @@ def make_trigger_manager_mock() -> UiPathResumeTriggerProtocol: def create_trigger_impl(data: dict[str, Any]) -> UiPathResumeTrigger: return UiPathResumeTrigger( interrupt_id="", # Will be set by resumable runtime - trigger_type=UiPathResumeTriggerType.API, + trigger_type=UiPathResumeTriggerType.TASK, payload=data, ) @@ -453,3 +453,101 @@ async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: # Delegate should have been executed only once) assert runtime_impl.execution_count == 1 + + @pytest.mark.asyncio + async def test_resumable_skips_api_triggers_on_auto_resume_check(self) -> None: + """API triggers should be skipped when checking for auto-resume after suspension.""" + + runtime_impl = MultiTriggerMockRuntime() + storage = StatefulStorageMock() + trigger_manager = make_trigger_manager_mock() + + # Create trigger manager that returns API trigger type + def create_api_trigger(data: dict[str, Any]) -> UiPathResumeTrigger: + return UiPathResumeTrigger( + interrupt_id="", # Will be set by resumable runtime + trigger_type=UiPathResumeTriggerType.API, + payload=data, + ) + + trigger_manager.create_trigger = AsyncMock(side_effect=create_api_trigger) # type: ignore + + async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: + return {"approved": True} + + trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore + + resumable = UiPathResumableRuntime( + delegate=runtime_impl, + storage=storage, + trigger_manager=trigger_manager, + runtime_id="runtime-1", + ) + + # Execute - should suspend and NOT auto-resume because they are API triggers + result = await resumable.execute({}) + + assert result.status == UiPathRuntimeStatus.SUSPENDED + assert result.triggers is not None + assert len(result.triggers) == 2 + assert {t.interrupt_id for t in result.triggers} == {"int-1", "int-2"} + + # Verify all triggers are API type + assert all( + t.trigger_type == UiPathResumeTriggerType.API for t in result.triggers + ) + + # Delegate should have been executed only once (no auto-resume) + assert runtime_impl.execution_count == 1 + + @pytest.mark.asyncio + async def test_resumable_auto_resumes_task_triggers_but_not_api_triggers( + self, + ) -> None: + """Mixed triggers: TASK triggers should trigger auto-resume, API triggers should not.""" + + runtime_impl = MultiTriggerMockRuntime() + storage = StatefulStorageMock() + trigger_manager = make_trigger_manager_mock() + + # Create different trigger types: int-1 is TASK, int-2 is API + def create_typed_trigger(data: dict[str, Any]) -> UiPathResumeTrigger: + # Determine trigger type based on payload action + if "approve_branch_1" in str(data): + trigger_type = UiPathResumeTriggerType.TASK + else: + trigger_type = UiPathResumeTriggerType.API + + return UiPathResumeTrigger( + interrupt_id="", # Will be set by resumable runtime + trigger_type=trigger_type, + payload=data, + ) + + trigger_manager.create_trigger = AsyncMock(side_effect=create_typed_trigger) # type: ignore + + # only TASK should trigger auto-resume + async def read_trigger_impl(trigger: UiPathResumeTrigger) -> dict[str, Any]: + return {"approved": True} + + trigger_manager.read_trigger = AsyncMock(side_effect=read_trigger_impl) # type: ignore + + resumable = UiPathResumableRuntime( + delegate=runtime_impl, + storage=storage, + trigger_manager=trigger_manager, + runtime_id="runtime-1", + ) + + # Execute - should auto-resume based on int-1 (TASK) but skip int-2 (API) + result = await resumable.execute({}) + + # Should have auto-resumed once (because of TASK trigger) + assert result.status == UiPathRuntimeStatus.SUSPENDED + assert result.triggers is not None + + # After auto-resume with int-1, should be at second suspension with int-2 and int-3 + assert {t.interrupt_id for t in result.triggers} == {"int-2", "int-3"} + + # Delegate should have been executed twice (initial + auto-resume for TASK trigger) + assert runtime_impl.execution_count == 2 diff --git a/uv.lock b/uv.lock index 51c68dd..9baafab 100644 --- a/uv.lock +++ b/uv.lock @@ -1005,7 +1005,7 @@ wheels = [ [[package]] name = "uipath-runtime" -version = "0.8.3" +version = "0.8.4" source = { editable = "." } dependencies = [ { name = "uipath-core" },