Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "uipath"
version = "2.8.28"
version = "2.8.29"
description = "Python SDK and CLI for UiPath Platform, enabling programmatic interaction with automation services, process management, and deployment tools."
readme = { file = "README.md", content-type = "text/markdown" }
requires-python = ">=3.11"
Expand Down
201 changes: 175 additions & 26 deletions src/uipath/functions/debug.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
from __future__ import annotations

import asyncio
import contextvars
import json
import logging
import os
Expand All @@ -35,6 +36,7 @@
UiPathRuntimeResult,
UiPathStreamOptions,
)
from uipath.runtime.events import UiPathRuntimeStateEvent
from uipath.runtime.schema import UiPathRuntimeSchema

logger = logging.getLogger(__name__)
Expand All @@ -53,7 +55,9 @@ def _capture_frame_locals(frame: FrameType) -> dict[str, Any]:
if isinstance(value, (bool, int, float, str, type(None))):
snapshot[name] = value
elif isinstance(value, (dict, list, tuple)):
json.dumps(value, default=str) # serialisability probe
# Strict probe — no default=str, so nested non-serialisable
# objects (code, frame, etc.) correctly fail here.
json.dumps(value)
snapshot[name] = value
else:
snapshot[name] = repr(value)
Expand Down Expand Up @@ -96,8 +100,18 @@ def __init__(
project_dir: str,
breakpoints: list[str] | Literal["*"],
entrypoint_path: str | None = None,
state_tracked_functions: dict[str, set[str]] | None = None,
) -> None:
"""Initialize the controller with project directory, breakpoints, and optional entrypoint path."""
"""Initialize the controller.

Parameters
----------
state_tracked_functions:
If provided, state events are emitted *only* for function calls
whose ``(abs_file_path, func_name)`` appears in this mapping
(``{abs_path: {func_name, …}, …}``). When ``None``, no state
events are emitted.
"""
self._project_dir = project_dir
self._entrypoint_path = (
os.path.abspath(entrypoint_path) if entrypoint_path else None
Expand All @@ -107,11 +121,17 @@ def __init__(
if isinstance(breakpoints, list):
self._parse_breakpoints(breakpoints)

self._state_tracked: dict[str, set[str]] | None = state_tracked_functions
self._events: queue.Queue[tuple[str, Any]] = queue.Queue()
self._resume_event = threading.Event()
self._stopped = False
self._thread: threading.Thread | None = None
self._abspath_cache: dict[str, str] = {}
# Track which breakpoints already fired per frame so that
# multiline expressions (where the bytecodes bounce back to the
# call line after evaluating arguments on deeper lines) don't
# trigger the same breakpoint twice within one function call.
self._hit_lines: dict[int, set[int]] = {} # frame-id → {lines}

# Breakpoint management

Expand Down Expand Up @@ -160,36 +180,88 @@ def _abspath(self, path: str) -> str:
return result

def _is_project_file(self, abspath: str) -> bool:
"""Return *True* for files under the project directory that are not vendored."""
return abspath.startswith(self._project_dir) and "site-packages" not in abspath
"""Return *True* for real .py files under the project directory."""
return (
abspath.endswith(".py")
and abspath.startswith(self._project_dir)
and "site-packages" not in abspath
)

def _is_tracked_function(self, abspath: str, func_name: str) -> bool:
"""Return *True* if this function should produce a state event."""
if self._state_tracked is None:
return False
funcs = self._state_tracked.get(abspath)
return funcs is not None and func_name in funcs

def _trace_callback(self, frame: FrameType, event: str, arg: Any) -> Any:
"""sys.settrace callback — dispatched for every frame event."""
if self._stopped:
return None

try:
filepath = self._abspath(frame.f_code.co_filename)
co_filename = frame.f_code.co_filename
# Fast reject: frozen/built-in modules never have a dot-py path
if co_filename.startswith("<"):
return None

filepath = self._abspath(co_filename)

if event == "call":
is_project = self._is_project_file(filepath)

# Emit state event only for tracked graph-node functions
if is_project and self._is_tracked_function(
filepath, frame.f_code.co_name
):
self._events.put(
(
"state",
{
"file": filepath,
"line": frame.f_lineno,
"function": frame.f_code.co_name,
"locals": _capture_frame_locals(frame),
},
)
)

# Reset per-frame hit tracking so each call starts fresh.
self._hit_lines[id(frame)] = set()

# Decide whether to trace *into* this function's frame.
if self._step_mode:
return (
self._trace_callback
if self._is_project_file(filepath)
else None
)
return (
self._trace_callback if filepath in self._file_breakpoints else None
)
return self._trace_callback if is_project else None
if filepath in self._file_breakpoints:
return self._trace_callback
# Also trace project files that contain tracked functions
if is_project and filepath in (self._state_tracked or {}):
return self._trace_callback
return None

if event == "line":
# Skip module-level lines (imports, class/function defs).
# These fire during module loading, not user code execution.
if frame.f_code.co_name == "<module>":
return self._trace_callback

lineno = frame.f_lineno
should_break = (
self._step_mode and self._is_project_file(filepath)
) or (lineno in self._file_breakpoints.get(filepath, ()))

if should_break:
# Deduplicate: multiline expressions (e.g.
# ``return Foo(arg=bar(...))``) cause the bytecode to
# bounce back to the call-site line after evaluating
# arguments on deeper lines. Without this guard the
# same breakpoint would fire twice per call.
frame_hits = self._hit_lines.get(id(frame))
if frame_hits is not None and lineno in frame_hits:
return self._trace_callback
if frame_hits is not None:
frame_hits.add(lineno)

self._events.put(
(
"breakpoint",
Expand All @@ -208,6 +280,10 @@ def _trace_callback(self, frame: FrameType, event: str, arg: Any) -> Any:
if self._stopped:
return None

elif event == "return":
# Clean up per-frame tracking when the frame exits.
self._hit_lines.pop(id(frame), None)

return self._trace_callback

except Exception:
Expand All @@ -222,10 +298,16 @@ def start(
input: dict[str, Any] | None,
options: UiPathExecuteOptions | None,
) -> None:
"""Launch delegate.execute() in a traced daemon thread."""
"""Launch delegate.execute() in a traced daemon thread.

Copies the caller's contextvars (including OTEL span context) so
that ``@traced`` decorators in user code produce spans that are
properly linked to the parent trace.
"""
ctx = contextvars.copy_context()
self._thread = threading.Thread(
target=self._run,
args=(delegate, input, options),
target=ctx.run,
args=(self._run, delegate, input, options),
daemon=True,
)
self._thread.start()
Expand Down Expand Up @@ -274,12 +356,15 @@ class UiPathDebugFunctionsRuntime:
Follows the same composition pattern as UiPathDebugRuntime: wraps a UiPathRuntimeProtocol delegate and
intercepts stream() to inject breakpoint behaviour.

When no breakpoints are active every call delegates transparently.
When breakpoints **are** present the delegate's execute() runs in
a background thread with sys.settrace enabled. The trace callback
pauses the thread at matching lines and this runtime yields
UiPathBreakpointResult events with captured local variables.

Additionally emits ``UiPathRuntimeStateEvent`` for every function call
that appears in the entrypoint's call graph, so the debug bridge can
visualise execution flow through the graph nodes.

Works for both sync and async user functions — async functions run in
a dedicated asyncio event loop on the background thread.

Expand All @@ -290,16 +375,21 @@ class UiPathDebugFunctionsRuntime:
entrypoint_path:
Absolute or relative path to the user's entrypoint file. Used to
resolve bare line-number breakpoints (e.g. "42").
function_name:
Name of the entrypoint function. Used together with
*entrypoint_path* to build the call graph for state events.
"""

def __init__(
self,
delegate: UiPathRuntimeProtocol,
entrypoint_path: str | None = None,
function_name: str | None = None,
) -> None:
"""Initialize the debug wrapper with a delegate runtime and optional entrypoint path."""
"""Initialize the debug wrapper."""
self.delegate = delegate
self._entrypoint_path = entrypoint_path
self._function_name = function_name
self._controller: BreakpointController | None = None

async def execute(
Expand All @@ -317,6 +407,9 @@ async def stream(
) -> AsyncGenerator[UiPathRuntimeEvent, None]:
"""Stream execution events with line-level breakpoint support.

Emits ``UiPathRuntimeStateEvent`` for every call-graph function
entry so the debug bridge can visualise execution flow.

Breakpoint formats (via options.breakpoints):

* "42" — line 42 in the entrypoint file
Expand All @@ -330,21 +423,28 @@ async def stream(
self._controller.update_breakpoints(breakpoints)
self._controller.resume()

event_type, data = await asyncio.to_thread(self._controller.wait_for_event)
yield self._to_runtime_event(event_type, data)
async for event in self._drain_events():
yield event
return

# No breakpoints, transparent delegation
if not breakpoints:
# Build the set of tracked functions from the call graph so we
# can emit state events even without breakpoints.
tracked = self._build_tracked_functions()

# Nothing to trace → transparent delegation. The controller
# path runs delegate.execute() in a background thread with a
# new asyncio event loop, so we only use it when there is
# something to observe (breakpoints and/or state tracking).
if not breakpoints and not tracked:
async for event in self.delegate.stream(input, options):
yield event
return

# First execution with breakpoints
controller = BreakpointController(
project_dir=str(Path.cwd()),
breakpoints=breakpoints,
breakpoints=breakpoints if breakpoints else [],
entrypoint_path=self._entrypoint_path,
state_tracked_functions=tracked,
)
self._controller = controller

Expand All @@ -355,8 +455,8 @@ async def stream(
)
controller.start(self.delegate, input, delegate_options)

event_type, data = await asyncio.to_thread(controller.wait_for_event)
yield self._to_runtime_event(event_type, data)
async for event in self._drain_events():
yield event

async def get_schema(self) -> UiPathRuntimeSchema:
"""Pass-through to delegate."""
Expand All @@ -369,6 +469,55 @@ async def dispose(self) -> None:
self._controller = None
await self.delegate.dispose()

def _build_tracked_functions(self) -> dict[str, set[str]] | None:
"""Build a mapping of abs_file → {func_names} from the call graph.

Returns None when the graph cannot be built (missing path / name).
"""
if not self._entrypoint_path or not self._function_name:
return None

try:
from .graph_builder import build_call_graph

graph = build_call_graph(
self._entrypoint_path,
self._function_name,
project_dir=str(Path.cwd()),
)

tracked: dict[str, set[str]] = {}
for node in graph.nodes:
file_rel = (node.metadata or {}).get("file")
if not file_rel:
continue
abs_path = os.path.abspath(file_rel)
tracked.setdefault(abs_path, set()).add(node.name)

return tracked if tracked else None
except Exception:
logger.debug("Failed to build call graph for state tracking", exc_info=True)
return None

async def _drain_events(self) -> AsyncGenerator[UiPathRuntimeEvent, None]:
"""Drain events from the controller, yielding state events and stopping at a terminal event."""
while self._controller is not None:
event_type, data = await asyncio.to_thread(self._controller.wait_for_event)
if event_type == "state":
yield self._to_state_event(data)
else:
yield self._to_runtime_event(event_type, data)
return

@staticmethod
def _to_state_event(data: dict[str, Any]) -> UiPathRuntimeStateEvent:
"""Convert a trace state event into a UiPathRuntimeStateEvent."""
return UiPathRuntimeStateEvent(
node_name=data["function"],
qualified_node_name=_format_location(data["file"], data["line"]),
payload=data["locals"],
)

def _to_runtime_event(self, event_type: str, data: Any) -> UiPathRuntimeEvent:
"""Convert a BreakpointController event into a UiPathRuntimeEvent."""
if event_type == "breakpoint":
Expand Down
1 change: 1 addition & 0 deletions src/uipath/functions/factory.py
Original file line number Diff line number Diff line change
Expand Up @@ -102,4 +102,5 @@ def _create_runtime(self, entrypoint: str) -> UiPathRuntimeProtocol:
return UiPathDebugFunctionsRuntime(
delegate=inner,
entrypoint_path=str(full_path),
function_name=function_name,
)
Loading